"""
.. module:: mesh_dfn_helper.py
:synopsis: helper functions for meshing DFN using LaGriT
.. moduleauthor:: Jeffrey Hyman <jhyman@lanl.gov>
"""
import os
import glob
from numpy import genfromtxt, sort, zeros
import subprocess
def parse_params_file(quiet=False):
""" Reads params.txt file from DFNGen and parses information
Parameters
---------
quiet : bool
If True details are not printed to screen, if False they area
Returns
-------
num_poly: int
Number of Polygons
h: float
Meshing length scale h
dudded_points: int
Expected number of dudded points in Filter (LaGriT)
visual_mode : bool
If True, reduced_mesh.inp is created (not suitable for flow and transport), if False, full_mesh.inp is created
domain: dict
x,y,z domain sizes
Notes
-----
None
"""
if not quiet:
print("\n--> Parsing params.txt")
fparams = open('params.txt', 'r')
# Line 1 is the number of polygons
num_poly = int(fparams.readline())
#Line 2 is the h scale
h = float(fparams.readline())
# Line 3 is the visualization mode: '1' is True, '0' is False.
visual_mode = int(fparams.readline())
# line 4 dudded points
dudded_points = int(fparams.readline())
# Dict domain contains the length of the domain in x,y, and z
domain = {'x': 0, 'y': 0, 'z': 0}
#Line 5 is the x domain length
domain['x'] = (float(fparams.readline()))
#Line 5 is the x domain length
domain['y'] = (float(fparams.readline()))
#Line 5 is the x domain length
domain['z'] = (float(fparams.readline()))
fparams.close()
if not quiet:
print("--> Number of Polygons: %d" % num_poly)
print("--> H_SCALE %f" % h)
if visual_mode > 0:
visual_mode = True
print("--> Visual mode is on")
else:
visual_mode = False
print("--> Visual mode is off")
print(f"--> Expected Number of dudded points: {dudded_points}")
print(f"--> X Domain Size {domain['x']} m")
print(f"--> Y Domain Size {domain['y']} m")
print(f"--> Z Domain Size {domain['z']} m")
print("--> Parsing params.txt complete\n")
return (num_poly, h, visual_mode, dudded_points, domain)
def check_dudded_points(dudded, hard=False):
"""Parses LaGrit log_merge_all.out and checks if number of dudded points is the expected number
Parameters
---------
dudded : int
Expected number of dudded points from params.txt
hard : bool
If hard is false, up to 1% of nodes in the mesh can be missed. If hard is True, no points can be missed.
Returns
---------
True/False : bool
True if the number of dudded points is correct and False if the number of dudded points is incorrect
Notes
-----
If number of dudded points is incorrect by over 1%, program will exit.
"""
print("--> Checking that number of dudded points is correct\n")
with open("lagrit_logs/log_merge_all.out", "r") as fp:
for line in fp.readlines():
if 'Dudding' in line:
print(f'--> From LaGriT: {line}')
try:
pts = int(line.split()[1])
except:
pts = int(line.split()[-1])
if 'RMPOINT:' in line:
print(f'--> From LaGriT: {line}')
total_points = int(line.split()[-1])
break
diff = abs(dudded - pts)
print(f"--> Expected Number of dudded points: {dudded}")
print(f"--> Actual Number of dudded points: {pts}")
print(f"--> Difference between expected and actual dudded points: {diff}")
if diff == 0:
print('--> The correct number of points were removed. Onward!\n')
return True
elif diff > 0:
## compare with total number poins
print(
'--> WARNING!!! Number of points removed does not match the expected value'
)
diff_ratio = 100 * (float(diff) / float(total_points))
if diff_ratio < 0.01 and hard == False:
print(f"--> However value is small: {diff}")
print("--> Proceeding\n")
return True
else:
print('ERROR! Incorrect Number of points removed')
print(f"Over 0.01% of nodes removed. Value is {diff_ratio:.2f}")
return False
def cleanup_dir():
""" Removes meshing files
Parameters
----------
None
Returns
-------
None
Notes
-----
Only runs if production_mode is True
"""
files_to_remove = [
'part*', 'log_merge*', 'merge*', 'mesh_poly_CPU*', 'mesh*inp',
'mesh*lg'
]
for name in files_to_remove:
for fl in glob.glob(name):
os.remove(fl)
def output_meshing_report(local_jobname, visual_mode):
""" Prints information about the final mesh to file
Parameters
----------
local_jobname : string
Name of current DFN job (not path)
visual_mode : bool
Determines is reduced_mesh or full_mesh is dumped
Returns
-------
None
Notes
-----
None
"""
f = open(local_jobname + '_mesh_information.txt', 'w')
f.write('The final mesh of DFN consists of: \n')
if not visual_mode:
print(
"--> Output files for flow calculations are written in : full_mesh.*"
)
finp = open('full_mesh.inp', 'r')
g = finp.readline()
g = g.split()
NumElems = int(g.pop(1))
NumIntNodes = int(g.pop(0))
f.write(str(NumElems) + ' triangular elements; \n')
f.write(str(NumIntNodes) + ' nodes / control volume cells; \n')
finp.close()
fstor = open('full_mesh.stor', 'r')
fstor.readline()
fstor.readline()
gs = fstor.readline()
gs = gs.split()
NumCoeff = int(gs.pop(0))
f.write(
str(NumCoeff) +
' geometrical coefficients / control volume faces. \n')
fstor.close()
else:
print(
"--> Output files for visualization are written in : reduced_mesh.inp"
)
print("--> Warning!!! Mesh is not suitable for flow and transport.")
finp = open('reduced_mesh.inp', 'r')
g = finp.readline()
g = g.split()
NumElems = int(g.pop(1))
NumIntNodes = int(g.pop(0))
f.write(str(NumElems) + ' triangular elements; \n')
f.write(str(NumIntNodes) + ' nodes / control volume cells. \n')
finp.close()
f.close()
def clean_up_files_after_prune(self):
''' After pruning a DFN to only include the fractures in prune_file this function removes references to those fractures from params.txt, perm.dat, aperature.dat, and poly_info.dat
Parameters
----------
self : DFN object
Returns
-------
None
Notes
-----
This function should always be run after pruning if flow solution is going to be run
'''
print("--> Editing DFN file based on fractures in %s" % self.prune_file)
keep_list = sort(genfromtxt(self.prune_file).astype(int))
num_frac = len(keep_list)
print("--> Editing params.txt file")
fin = open(self.path + '/params.txt')
try:
os.unlink('params.txt')
except:
pass
fout = open('params.txt', 'w')
line = fin.readline()
fout.write('%d\n' % num_frac)
for i in range(7):
line = fin.readline()
fout.write(line)
fin.close()
fout.close()
print("--> Complete")
print("--> Editing poly_info.dat file")
poly_info = genfromtxt(self.path + 'poly_info.dat')[keep_list - 1, :]
try:
os.unlink('poly_info.dat')
except:
pass
f = open('poly_info.dat', 'w')
for i in range(num_frac):
f.write('%d %d %f %f %f %d %f %f %d\n' %
(i + 1, poly_info[i, 1], poly_info[i, 2], poly_info[i, 3],
poly_info[i, 4], poly_info[i, 5], poly_info[i, 6],
poly_info[i, 7], poly_info[i, 8]))
f.close()
print("--> Complete")
print("--> Editing perm.dat file")
perm = genfromtxt(self.path + 'perm.dat', skip_header=1)[keep_list - 1, -1]
f = open('perm.dat', 'w+')
f.write('permeability\n')
for i in range(num_frac):
f.write('-%d 0 0 %e %e %e\n' % (7 + i, perm[i], perm[i], perm[i]))
f.close()
print("--> Complete")
print("--> Editing aperture.dat file")
aperture = genfromtxt(self.path + 'aperture.dat',
skip_header=1)[keep_list - 1, -1]
f = open('aperture.dat', 'w+')
f.write('aperture\n')
for i in range(num_frac):
f.write('-%d 0 0 %e \n' % (7 + i, aperture[i]))
f.close()
print("--> Complete")
print("--> Editing radii_Final.dat file")
fin = open(self.path + 'radii_Final.dat')
fout = open('radii_Final.dat', 'w')
# copy header
line = fin.readline()
fout.write(line)
line = fin.readline()
fout.write(line)
fin.close()
# write radii from remaining fractures
radii = genfromtxt(self.path + 'radii_Final.dat',
skip_header=2)[keep_list - 1, :]
for i in range(num_frac):
fout.write('%f %f %d\n' % (radii[i, 0], radii[i, 1], radii[i, 2]))
fout.close()
print("--> Complete")
print("--> Editing normal_vectors.dat file")
fin = open(self.path + 'normal_vectors.dat')
fout = open('normal_vectors.dat', 'w')
# copy header
normal_vect = genfromtxt(self.path + 'normal_vectors.dat')[keep_list -
1, :]
for i in range(num_frac):
fout.write('%f %f %f\n' %
(normal_vect[i, 0], normal_vect[i, 1], normal_vect[i, 2]))
fout.close()
print("--> Complete")
print("--> Editing translations.dat file")
fin = open(self.path + 'translations.dat')
fout = open('translations.dat', 'w')
# copy header
line = fin.readline()
fout.write(line)
points = []
for line in fin.readlines():
tmp = line.split(' ')
if tmp[-1] != 'R':
points.append((float(tmp[0]), float(tmp[1]), float(tmp[2])))
from numpy import asarray
points = asarray(points)
points = points[keep_list - 1, :]
for i in range(num_frac):
fout.write('%f %f %f\n' % (points[i, 0], points[i, 1], points[i, 2]))
fout.close()
print("--> Complete")
print("--> Editing Fracture Files Complete")
def create_mesh_links(path):
''' Makes symlinks for files in path required for meshing
Parameters
----------
path : string
Path to where meshing files are located
Returns
-------
None
Notes
-----
None
'''
import os.path
from shutil import rmtree
print("--> Creating links for meshing from %s" % path)
files = [
'params.txt', 'poly_info.dat', 'connectivity.dat', 'left.dat',
'right.dat', 'front.dat', 'back.dat', 'top.dat', 'bottom.dat', 'polys'
]
for f in files:
if os.path.isfile(f) or os.path.isdir(f):
print("Removing %s" % f)
try:
rmtree(f)
except:
print("Unable to remove %s" % f)
try:
os.symlink(path + f, f)
except:
print("Unable to make link for %s" % f)
pass
print("--> Complete")
[docs]def inp2gmv(self, inp_file=''):
""" Convert inp file to gmv file, for general mesh viewer. Name of output file for base.inp is base.gmv
Parameters
----------
self : object
DFN Class
inp_file : str
Name of inp file if not an attribure of self
Returns
----------
None
Notes
---------
"""
if inp_file:
self.inp_file = inp_file
else:
inp_file = self.inp_file
if inp_file == '':
error = 'ERROR: inp file must be specified in inp2gmv!\n'
sys.stderr.write(error)
sys.exit(1)
gmv_file = inp_file[:-4] + '.gmv'
with open('inp2gmv.lgi', 'w') as fid:
fid.write(f'read / avs / {inp_file} / mo\n')
fid.write(f'dump / gmv / {gmv_file} / mo\n')
fid.write('finish \n\n')
failure = run_lagrit_script('inp2gmv.lgi')
if failure:
error = 'ERROR: Failed to run LaGrit to get gmv from inp file!\n'
sys.stderr.write(error)
sys.exit(1)
print("--> Finished writing gmv format from avs format")
[docs]def inp2vtk_python(self):
""" Using Python VTK library, convert inp file to VTK file.
Parameters
----------
self : object
DFN Class
Returns
--------
None
Notes
--------
For a mesh base.inp, this dumps a VTK file named base.vtk
"""
import pyvtk as pv
if self.flow_solver != "PFLOTRAN":
error = "ERROR! Wrong flow solver requested\n"
sys.stderr.write(error)
sys.exit(1)
print("--> Using Python to convert inp files to VTK files")
if self.inp_file:
inp_file = self.inp_file
if inp_file == '':
error = 'ERROR: Please provide inp filename!\n'
sys.stderr.write(error)
sys.exit(1)
if self.vtk_file:
vtk_file = self.vtk_file
else:
vtk_file = inp_file[:-4]
self.vtk_file = vtk_file + '.vtk'
print("--> Reading inp data")
with open(inp_file, 'r') as f:
line = f.readline()
num_nodes = int(line.strip(' ').split()[0])
num_elems = int(line.strip(' ').split()[1])
coord = zeros((num_nodes, 3), 'float')
elem_list_tri = []
elem_list_tetra = []
for i in range(num_nodes):
line = f.readline()
coord[i, 0] = float(line.strip(' ').split()[1])
coord[i, 1] = float(line.strip(' ').split()[2])
coord[i, 2] = float(line.strip(' ').split()[3])
for i in range(num_elems):
line = f.readline().strip(' ').split()
line.pop(0)
line.pop(0)
elem_type = line.pop(0)
if elem_type == 'tri':
elem_list_tri.append([int(i) - 1 for i in line])
if elem_type == 'tet':
elem_list_tetra.append([int(i) - 1 for i in line])
print('--> Writing inp data to vtk format')
vtk = pv.VtkData(
pv.UnstructuredGrid(coord,
tetra=elem_list_tetra,
triangle=elem_list_tri),
'Unstructured pflotran grid')
vtk.tofile(vtk_file)
def run_lagrit_script(lagrit_file, output_file=None, quiet=False):
"""
Runs LaGriT
Parameters
-----------
----------
lagrit_file : string
Name of LaGriT script to run
output_file : string
Name of file to dump LaGriT output
quiet : bool
If false, information will be printed to screen.
Returns
----------
failure: int
If the run was successful, then 0 is returned.
"""
if output_file == None:
cmd = f"{os.environ['LAGRIT_EXE']} < {lagrit_file}"
else:
cmd = f"{os.environ['LAGRIT_EXE']} < {lagrit_file} > {output_file}"
if not quiet:
print(f"--> Running: {cmd}")
failure = subprocess.call(cmd, shell=True)
if failure:
error = f"ERROR running LaGriT on script {lagrit_file}. Exiting Program.\n"
return failure