pydfnWorks: Python wrapper for dfnWorks
run_meshing.py
1 """
2 .. module:: run_meshing.py
3  :synopsis: functions to mesh fracture network in parallel
4 .. moduleauthor:: Jeffrey Hyman <jhyman@lanl.gov>
5 
6 """
7 
8 import subprocess
9 import os
10 import sys
11 import timeit
12 import glob
13 
14 import multiprocessing as mp
15 from shutil import copy, rmtree
16 from numpy import genfromtxt, sort
17 from pydfnworks.dfnGen.meshing import mesh_dfn_helper as mh
18 from pydfnworks.dfnGen.meshing.poisson_disc.poisson_functions import single_fracture_poisson
19 
20 
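The two entry points defined later in this file, mesh_fractures_header and merge_the_meshes, are normally called from the dfnWorks meshing driver. As a minimal, hedged sketch of how they fit together (the import path and the example values of num_poly, ncpu, n_jobs, and h are assumptions, not the official pydfnworks workflow):

# Sketch only: assumes the working directory already contains the per-fracture
# inputs (polys/, parameters/, lagrit_scripts/, intersections/) written earlier
# in the dfnWorks workflow, and that this module is importable at this path.
from pydfnworks.dfnGen.meshing.run_meshing import (mesh_fractures_header,
                                                   merge_the_meshes)

num_poly = 100                           # total number of fractures (example)
fracture_list = range(1, num_poly + 1)   # fracture ids are 1-based
ncpu = 4                                 # worker processes (example)
visual_mode = False                      # full meshing, not the reduced mesh
h = 0.1                                  # mesh resolution length scale (example)

# Mesh every fracture in parallel; a truthy return means at least one failure.
if mesh_fractures_header(fracture_list, ncpu, visual_mode, h):
    raise SystemExit("At least one fracture failed to mesh; see failure.txt")

# Merge the per-fracture meshes in batches, then into the final mesh.
n_jobs = 4                               # number of merge batches (example)
merge_the_meshes(num_poly, ncpu, n_jobs, visual_mode)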
21 def cleanup_failed_run(fracture_id, digits, quiet=True):
22  """ If meshing fails, this function copies all relevant files
23  to a folder for debugging
24 
25  Parameters
26  ----------
27  fracture_id : int
28  Current Fracture ID number
29  digits : int
30  Number of digits in total number of fractures
31  quiet : bool, optional. If True (default), warning messages are suppressed.
32  Returns
33  -------
34  None
35 
36  Notes
37  ------
38  Depending on where in the workflow this is called, some of the relevant files may not
39  have been created yet, so warnings about missing files can safely be ignored. See the sketch after this function.
40 
41 """
42 
43  if not quiet:
44  print(f"--> Cleaning up meshing run for fracture {fracture_id}")
45 
46  if not os.path.isfile("failure.txt"):
47  with open('failure.txt', "w+") as failure_file:
48  failure_file.write(f"{fracture_id}\n")
49  else:
50  with open('failure.txt', "a") as failure_file:
51  failure_file.write(f"{fracture_id}\n")
52 
53  folder = f"failure_{fracture_id}/"
54  try:
55  os.mkdir(folder)
56  except:
57  if not quiet:
58  print(f"Warning! Unable to make new folder: {folder}")
59  pass
60 
61  files = [
62  f"mesh_{fracture_id}.inp", f"{fracture_id}_mesh_errors.txt",
63  f"id_tri_node_{fracture_id}.list",
64  f"lagrit_logs/log_lagrit_{fracture_id:0{digits}d}.out"
65  ]
66 
67  for f in files:
68  try:
69  copy(f, folder)
70  except:
71  if not quiet:
72  print(f'--> Warning: Could not copy {f} to failure folder')
73  pass
74 
75  symlinks = [
76  f"poly_{fracture_id}.inp", f"intersections_{fracture_id}.inp",
77  f"parameters_{fracture_id}.mlgi", f"mesh_poly_{fracture_id}.lgi",
78  f'points_{fracture_id}.xyz'
79  ]
80 
81  for f in symlinks:
82  try:
83  os.unlink(f)
84  except:
85  if not quiet:
86  print(f'--> Warning: Could not unlink {f}')
87  pass
88 
89  if not quiet:
90  print(f"--> Cleanup for Fracture {fracture_id} complete")
91 
92 
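cleanup_failed_run records each failing fracture id in failure.txt (one id per line) and gathers the relevant files into failure_<fracture_id>/. A small, hedged sketch of inspecting that bookkeeping after a run; the file and folder names follow the conventions used above, and nothing here is part of the pydfnworks API:

import os

# Collect the ids of fractures whose meshing failed (one id per line).
failed_ids = []
if os.path.isfile("failure.txt"):
    with open("failure.txt") as fp:
        failed_ids = [int(line) for line in fp if line.strip()]

# Report what was salvaged for each failed fracture.
for fid in failed_ids:
    folder = f"failure_{fid}"
    if os.path.isdir(folder):
        print(f"Fracture {fid}: {sorted(os.listdir(folder))}")
    else:
        print(f"Fracture {fid}: no debug folder found")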
93 def mesh_fracture(fracture_id, visual_mode, num_poly):
94  """Child function for parallelized meshing of fractures
95 
96  Parameters
97  ----------
98  fracture_id : int
99  Current Fracture ID number
100  visual_mode : bool
101  True/False for reduced meshing
102  num_poly : int
103  Total Number of Fractures in the DFN
104 
105  Returns
106  -------
107  (fracture_id, error_code) : tuple of int, where error_code is one of:
108  0 - run was successful
109  -1 - error creating a symbolic link
110  -2 - run failed during Poisson sampling
111  -3 - run failed to produce mesh files
112  -4 - line of intersection not preserved in the mesh
113 
114  Notes
115  -----
116  If a meshing run fails, information about that fracture is collected in a directory failure_(fracture_id). A sketch of interpreting the returned error code follows this function.
117 
118  """
119 
120  # get current process information
121  p = mp.current_process()
122  _, cpu_id = p.name.split("-")
123  cpu_id = int(cpu_id)
124  # get leading digits
125  digits = len(str(num_poly))
126 
127  print(
128  f"--> Fracture {fracture_id:0{digits}d} out of {num_poly} is starting on worker {cpu_id}"
129  )
130 
131  tic = timeit.default_timer()
132  # Create Symbolic Links
133  try:
134  os.symlink(f"polys/poly_{fracture_id}.inp", f"poly_{fracture_id}.inp")
135  except:
136  print(f"-->\n\n\nError creating link for poly_{fracture_id}.inp\n\n\n")
137  return (fracture_id, -1)
138 
139  try:
140  os.symlink(f"parameters/parameters_{fracture_id}.mlgi",\
141  f"parameters_{fracture_id}.mlgi")
142  except:
143  print(
144  f"-->\n\n\nError creating link for parameters_{fracture_id}.mlgi\n\n\n"
145  )
146  return (fracture_id, -1)
147 
148  try:
149  os.symlink(f"lagrit_scripts/mesh_poly_{fracture_id}.lgi",\
150  f"mesh_poly_{fracture_id}.lgi")
151  except:
152  print(
153  f"-->\n\n\nError creating link for mesh_poly_{fracture_id}.lgi\n\n\n"
154  )
155  return (fracture_id, -1)
156 
157  if not visual_mode:
158  try:
159  os.symlink(f"intersections/intersections_{fracture_id}.inp",\
160  f"intersections_{fracture_id}.inp")
161  except:
162  print(
163  f"\n\n\n--> Error creating link for intersections_{fracture_id}.inp\n\n\n"
164  )
165  return (fracture_id, -1)
166 
167 
168  try:
169  single_fracture_poisson(fracture_id)
170  except:
171  print(" Error:", sys.exc_info()[0])
172  print(
173  f"-->\n\n\nERROR occurred generating points for fracture {fracture_id}\n\n\n"
174  )
175  cleanup_failed_run(fracture_id, digits)
176  return (fracture_id, -2)
177 
178 
179  # check if points were created, if not exit
180  if not os.path.isfile(f'points/points_{fracture_id}.xyz'):
181  print(
182  f"-->\n\n\nERROR occurred generating points for fracture {fracture_id}\n\n\n"
183  )
184  cleanup_failed_run(fracture_id, digits)
185  return (fracture_id, -2)
186  try:
187  os.symlink(f"points/points_{fracture_id}.xyz",
188  f"points_{fracture_id}.xyz")
189  except:
190  print(
191  f"-->\n\n\nError creating link for points_{fracture_id}.xyz\n\n\n"
192  )
193  return (fracture_id, -1)
194 
195  # run LaGriT Meshing
196  try:
197  mh.run_lagrit_script(
198  f"mesh_poly_{fracture_id}.lgi",
199  output_file=f"lagrit_logs/log_lagrit_{fracture_id:0{digits}d}.out",
200  quiet=True)
201  except:
202  print(
203  f"\n\n\n--> ERROR occurred during meshing fracture {fracture_id}\n\n\n"
204  )
205  cleanup_failed_run(fracture_id, digits)
206  return (fracture_id, -3)
207 
208  # Check if mesh*.lg file was created, if not exit.
209  if not os.path.isfile(f'mesh_{fracture_id}.lg') or os.stat(
210  f'mesh_{fracture_id}.lg').st_size == 0:
211  print(
212  f"\n\n\n--> ERROR occurred during meshing fracture {fracture_id}\n\n\n"
213  )
214  cleanup_failed_run(fracture_id, digits)
215  return (fracture_id, -3)
216 
217 
218  if not visual_mode:
219 
220  cmd_check = f"{os.environ['CONNECT_TEST_EXE']} \
221  intersections_{fracture_id}.inp \
222  id_tri_node_{fracture_id}.list \
223  mesh_{fracture_id}.inp \
224  {fracture_id}"
225 
226  # If the lines of intersection are not in the final mesh, copy the relevant files
227  # into a directory for debugging.
228  try:
229  if subprocess.call(cmd_check, shell=True):
230  print(
231  f"\n\n\n--> ERROR: MESH CHECKING FAILED on {fracture_id}!!!\n\nEXITING PROGRAM\n\n\n"
232  )
233  cleanup_failed_run(fracture_id, digits)
234  return (fracture_id, -4)
235  except:
236  print(
237  f"\n\n\n--> ERROR: MESH CHECKING FAILED on {fracture_id}!!!\n\nEXITING PROGRAM\n\n\n"
238  )
239  cleanup_failed_run(fracture_id, digits)
240  return (fracture_id, -4)
241 
242  # Mesh checking was a success. Remove check files and move on
243  files = [f"id_tri_node_{fracture_id}.list", f"mesh_{fracture_id}.inp"]
244  for f in files:
245  try:
246  os.remove(f)
247  except:
248  print(f"--> Could not remove {f}\n")
249  pass
250 
251  # Remove symbolic links
252  if visual_mode:
253  files = [
254  f'poly_{fracture_id}.inp', f'parameters_{fracture_id}.mlgi',
255  f"mesh_poly_{fracture_id}.lgi"
256  ]
257  else:
258  files = [
259  f'poly_{fracture_id}.inp', f'intersections_{fracture_id}.inp',
260  f'points_{fracture_id}.xyz', f'parameters_{fracture_id}.mlgi',
261  f"mesh_poly_{fracture_id}.lgi"
262  ]
263  for f in files:
264  try:
265  os.unlink(f)
266  except:
267  print(f'--> Warning: Could not unlink {f}')
268  pass
269 
270  elapsed = timeit.default_timer() - tic
271  print(
272  f"--> Fracture {fracture_id:0{digits}d} out of {num_poly} is complete on worker {cpu_id}. Time required: {elapsed:.2f} seconds\n"
273  )
274  return (fracture_id, 0)
275 
276 
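mesh_fracture returns a (fracture_id, error_code) tuple rather than raising, so the caller decides how to react. A hedged sketch of translating those codes into messages; the ERROR_MESSAGES mapping and report_result helper are illustrative names, not part of pydfnworks:

# Map the error codes documented in mesh_fracture's docstring to messages.
ERROR_MESSAGES = {
    0: "run was successful",
    -1: "error creating a symbolic link",
    -2: "run failed during Poisson sampling",
    -3: "run failed to produce mesh files",
    -4: "line of intersection not preserved in the mesh",
}

def report_result(result):
    """Print a one-line summary for a (fracture_id, error_code) tuple."""
    fracture_id, error_code = result
    message = ERROR_MESSAGES.get(error_code, "unknown error code")
    print(f"Fracture {fracture_id}: {message} ({error_code})")

# e.g. report_result(mesh_fracture(7, visual_mode=False, num_poly=100))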
277 def mesh_fractures_header(fracture_list, ncpu, visual_mode, h):
278  """ Header function for parallel meshing of fractures
279
280  Creates a queue of the fracture numbers in fracture_list.
281
282  Each fracture is meshed using mesh_fracture, called within the
283  worker function.
284 
285  If any fracture fails to mesh properly, then a folder is created with
286  that fracture information and the fracture number is written into
287  failure.txt.
288 
289  Parameters
290  ----------
291  fracture_list : list
292  Fractures to be meshed
293  ncpu : int
294  Number of processors used for meshing
295  visual_mode : bool
296  True/False for reduced meshing
297  h : float, the mesh resolution length scale
298  Returns
299  -------
300  failure_flag : bool
301  False - all fractures have been meshed successfully (failure.txt was not created)
302  True - at least one fracture failed to mesh; its id is recorded in failure.txt
303 
304  Notes
305  -----
306  If any fracture fails to mesh, the remaining workers are terminated and a failure is returned. See the pool/callback sketch after this function.
307 
308  """
309  t_all = timeit.default_timer()
310  print()
311  print('=' * 80)
312  print(
313  f"\n--> Triangulating {len(fracture_list)} fractures using {ncpu} processors\n\n"
314  )
315  dirs = ["points", "lagrit_logs"]
316  for d in dirs:
317  if os.path.isdir(d):
318  rmtree(d)
319  os.mkdir(d)
320  else:
321  os.mkdir(d)
322 
323  pool = mp.Pool(ncpu)
324  result_list = []
325 
326  def log_result(result):
327  # This is called whenever mesh_fracture(i, ...) returns a result.
328  # result_list is modified only by the main process, not the pool workers.
329  result_list.append(result)
330  if result[1] != 0:
331  pool.terminate()
332  # If a run fails, kill all other processes, and clean up the directory
333  names = [
334  "poly_*.inp", "mesh_poly_*.lgi", "parameters_*.mlgi",
335  "intersections_*.inp", "points_*.xyz"
336  ]
337  for name in names:
338  files_to_remove = glob.glob(name)
339  for f in files_to_remove:
340  os.remove(f)
341 
342  for i in fracture_list:
343  pool.apply_async(mesh_fracture,
344  args=(i, visual_mode, len(fracture_list)),
345  callback=log_result)
346 
347  pool.close()
348  pool.join()
349 
350  for result in result_list:
351  if result[1] != 0:
352  print(
353  f"\n\n--> Fracture number {result[0]} failed with error {result[1]}\n"
354  )
355  details = """
356 Error Index:
357 -1 - error making symbolic link
358 -2 - run failed in Poisson Sampling
359 -3 - run failed to produce mesh files
360 -4 - line of intersection not preserved
361  """
362  print(details)
363  return True
364 
365  elapsed = timeit.default_timer() - t_all
366 
367  if os.path.isfile("failure.txt"):
368  failure_list = genfromtxt("failure.txt")
369  failure_flag = True
370  # genfromtxt returns a 0-d array for a single failure and a 1-d array otherwise
371  if failure_list.ndim > 0:
372  failure_list = sort(failure_list)
373  print('--> Fractures:', failure_list, 'failed to mesh')
374  print('--> Main process exiting.')
375  else:
376  failure_flag = False
377 
378  print('--> Triangulating Polygons: Complete')
379  time_sec = elapsed
380  time_min = elapsed / 60
381  time_hrs = elapsed / 3600
382 
383  print("--> Total Time to Mesh Network:")
384  print(
385  f"--> {time_sec:.2e} seconds\t{time_min:.2e} minutes\t{time_hrs:.2e} hours"
386  )
387 
388  print('=' * 80)
389  return failure_flag
390 
391 
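mesh_fractures_header combines multiprocessing.Pool, apply_async, and a result callback so that one failed fracture can terminate the remaining workers early. A stripped-down, hedged sketch of that pattern in isolation; the work function is a stand-in for mesh_fracture, and the sleep only emulates a long-running meshing job:

import multiprocessing as mp
import time

def work(i):
    """Stand-in for mesh_fracture: returns (task id, error code)."""
    time.sleep(0.5)                    # emulate a long-running meshing job
    return (i, 0 if i != 3 else -3)    # pretend task 3 fails

if __name__ == "__main__":
    results = []
    pool = mp.Pool(4)

    def log_result(result):
        # Runs in the parent process each time a task finishes.
        results.append(result)
        if result[1] != 0:
            pool.terminate()           # stop the remaining workers early

    for i in range(1, 11):
        pool.apply_async(work, args=(i,), callback=log_result)
    pool.close()
    pool.join()

    failed = [task for task, code in results if code != 0]
    print("failed tasks:", failed)

The callback runs in the parent process, which is why the result bookkeeping and the call to pool.terminate() are safe to do there; this mirrors log_result above.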
392 def merge_worker(job):
393  """Parallel worker for merging meshes into the final mesh
394 
395  Parameters
396  ----------
397  job : int
398  job number
399 
400  Returns
401  -------
402  bool : True if the merge failed, False if it succeeded (see the sketch after this function)
403 
404  Notes
405  -----
406  """
407 
408  print(f"--> Starting merge: {job}")
409  tic = timeit.default_timer()
410 
411  if mh.run_lagrit_script(f"lagrit_scripts/merge_poly_part_{job}.lgi",
412  f"lagrit_logs/log_merge_poly_part{job}.out",
413  quiet=True):
414  print(f"Error: merge job {job} failed")
415  return True
416 
417  elapsed = timeit.default_timer() - tic
418  print(
419  f"--> Merge Number {job} Complete. Time elapsed: {elapsed:.2f} seconds."
420  )
421  return False
422 
423 
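merge_worker returns True on failure and False on success, which is what merge_the_meshes checks after pool.map. When a merge fails it can help to rerun the jobs serially to isolate the failing batch; a hedged sketch, assuming the lagrit_scripts/merge_poly_part_<job>.lgi files are already in the working directory and the import path below is correct:

# Sketch only: run the merge jobs one at a time (no pool) for debugging.
from pydfnworks.dfnGen.meshing.run_meshing import merge_worker  # path assumed

n_jobs = 4                                                      # example value
failed_jobs = [job for job in range(1, n_jobs + 1) if merge_worker(job)]
if failed_jobs:
    print("Merge batches that failed:", failed_jobs)
else:
    print("All merge batches succeeded")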
424 def merge_the_meshes(num_poly, ncpu, n_jobs, visual_mode):
425  """Runs the LaGriT scripts to merge meshes into the final mesh
426 
427  Parameters
428  ----------
429  num_poly : int
430  Number of Fractures
431  ncpu : int
432  Number of Processors
433  n_jobs : int
434  Number of mesh pieces
435  visual_mode : bool
436  True/False for reduced meshing
437 
438  Returns
439  -------
440  None
441 
442  Notes
443  -----
444  Meshes are merged in batches for efficiency; see the batching sketch after this function.
445  """
446  print('=' * 80)
447  if n_jobs == 1:
448  print(
449  f"--> Merging triangulated fracture meshes using {n_jobs} processor."
450  )
451  else:
452  print(
453  f"--> Merging triangulated fracture meshes using {n_jobs} processors."
454  )
455 
456  jobs = range(1, n_jobs + 1)
457  tic = timeit.default_timer()
458  num_cpu = len(jobs)
459  pool = mp.Pool(num_cpu)
460  outputs = pool.map(merge_worker, jobs)
461  pool.close()
462  pool.join()
463  pool.terminate()
464  elapsed = timeit.default_timer() - tic
465  print(
466  f"\n--> Initial merging complete. Time elapsed: {elapsed:.2f} seconds.\n"
467  )
468  for output in outputs:
469  if output:
470  error = "ERROR!!! One of the merges failed\nExiting\n"
471  sys.stderr.write(error)
472  sys.exit(1)
473 
474  print('=' * 80)
475  print("--> Starting Final Merge")
476  tic = timeit.default_timer()
477 
478  mh.run_lagrit_script('lagrit_scripts/merge_rmpts.lgi',
479  'lagrit_logs/log_merge_all.out',
480  quiet=True)
481 
482  elapsed = timeit.default_timer() - tic
483  print(f"--> Final merge took {elapsed:.2f} seconds")
484 
485  if not visual_mode:
486  if (os.stat("full_mesh.lg").st_size > 0):
487  print("--> Final merge successful")
488 
489  else:
490  error = "ERROR: Final merge Failed\n"
491  sys.stderr.write(error)
492  sys.exit(1)
493 
494  else:
495  if os.stat("reduced_mesh.inp").st_size > 0:
496  print("--> Final merge successful")
497  else:
498  error = "ERROR: Final merge Failed\n"
499  sys.stderr.write(error)
500  sys.exit(1)
501  print('=' * 80)
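merge_the_meshes notes that meshes are merged in batches for efficiency: the per-fracture meshes are first combined into n_jobs intermediate pieces by merge_poly_part_<job>.lgi and then merged by merge_rmpts.lgi. The actual assignment of fractures to batches happens when those LaGriT scripts are written elsewhere in pydfnworks; the sketch below is only an illustration of splitting num_poly fractures evenly across n_jobs batches:

def split_into_batches(num_poly, n_jobs):
    """Split fracture ids 1..num_poly into n_jobs roughly equal batches."""
    ids = list(range(1, num_poly + 1))
    size, extra = divmod(len(ids), n_jobs)
    batches, start = [], 0
    for job in range(n_jobs):
        stop = start + size + (1 if job < extra else 0)
        batches.append(ids[start:stop])
        start = stop
    return batches

# Example: 10 fractures across 3 merge jobs
print(split_into_batches(10, 3))   # [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]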
def mesh_fracture(fracture_id, visual_mode, num_poly)
Definition: run_meshing.py:93
def mesh_fractures_header(fracture_list, ncpu, visual_mode, h)
Definition: run_meshing.py:277
def cleanup_failed_run(fracture_id, digits, quiet=True)
Definition: run_meshing.py:21
def merge_the_meshes(num_poly, ncpu, n_jobs, visual_mode)
Definition: run_meshing.py:424