# pydfnWorks: python wrapper for dfnWorks
# graph_transport.py
"""
.. module:: graph_transport.py
   :synopsis: simulate transport on a pipe network representation of a DFN
.. moduleauthor:: Shriram Srinivasan <shrirams@lanl.gov>

"""
import math
import multiprocessing as mp
import sys

import networkx as nx
import numpy as np
import numpy.random
import scipy.special

# pydfnworks modules
19 
def create_neighbor_list(Gtilde):
    """ Create a list of downstream neighbor vertices for every vertex on NetworkX graph obtained after running graph_flow

    Parameters
    ----------
        Gtilde: NetworkX graph
            obtained from output of graph_flow

    Returns
    -------
        dict : nested dictionary.

    Notes
    -----
    dict[n]['child'] is a list of vertices downstream to vertex n
    dict[n]['prob'] is a list of probabilities for choosing a downstream node for vertex n
    """

    nbrs_dict = {}

    for i in nx.nodes(Gtilde):

        # Outlet vertices have no downstream neighbors.
        if Gtilde.nodes[i]['outletflag']:
            continue

        node_list = []
        prob_list = []
        nbrs_dict[i] = {}

        for v in nx.neighbors(Gtilde, i):
            delta_p = Gtilde.nodes[i]['pressure'] - Gtilde.nodes[v]['pressure']

            # A neighbor counts as downstream only when the pressure drop
            # exceeds the floating-point spacing at this node's pressure,
            # i.e. the drop is distinguishable from round-off.
            if delta_p > np.spacing(Gtilde.nodes[i]['pressure']):
                node_list.append(v)
                prob_list.append(Gtilde.edges[i, v]['flux'])

        if node_list:
            nbrs_dict[i]['child'] = node_list
            # Flux-weighted probabilities for choosing the next vertex.
            nbrs_dict[i]['prob'] = np.array(prob_list,
                                            dtype=float) / sum(prob_list)
        else:
            # Dead-end vertex: tracking stops here (see Particle.track).
            nbrs_dict[i]['child'] = None
            nbrs_dict[i]['prob'] = None

    return nbrs_dict
65 
66 
class Particle():
    '''
    Class for graph particle tracking, instantiated for each particle


    Attributes:
        * time : Total advective time of travel of particle [s]
        * tdrw_time : Total advection+diffusion time of travel of particle [s]
        * dist : total distance travelled in advection [m]
        * flag : True if particle exited system, else False
        * frac_seq : Dictionary, contains information about fractures through which the particle went
    '''

    def __init__(self):
        # Per-fracture travel data, keyed by fracture index.
        self.frac_seq = {}
        # Totals are (re)set by set_start_time_dist() before tracking begins.
        self.time = 0.0
        self.tdrw_time = 0.0
        self.dist = 0.0
        self.flag = False

    def set_start_time_dist(self, t, L):
        """ Set initial value for travel time and distance

        Parameters
        ----------
            self: object
            t : double
                time in seconds
            L : double
                distance in metres

        Returns
        -------

        """

        self.time = t
        self.tdrw_time = t
        self.dist = L

    def track(self, Gtilde, nbrs_dict, frac_porosity, tdrw_flag,
              matrix_porosity, matrix_diffusivity):
        """ track a particle from inlet vertex to outlet vertex

        Parameters
        ----------
            Gtilde : NetworkX graph
                graph obtained from graph_flow

            nbrs_dict: nested dictionary
                dictionary of downstream neighbors for each vertex

            frac_porosity: float
                porosity of fracture; scales the advective edge travel time

            tdrw_flag : bool
                if True, add a time-domain-random-walk matrix-diffusion delay

            matrix_porosity: float
                used only when tdrw_flag is True

            matrix_diffusivity: float
                used only when tdrw_flag is True

        Returns
        -------

        """

        Inlet = [v for v in nx.nodes(Gtilde) if Gtilde.nodes[v]['inletflag']]

        # Start from a uniformly random inlet vertex.
        curr_v = numpy.random.choice(Inlet)

        while True:

            if Gtilde.nodes[curr_v]['outletflag']:
                # Particle exited the system.
                self.flag = True
                break

            if nbrs_dict[curr_v]['child'] is None:
                # Dead end: no downstream neighbor to move to.
                self.flag = False
                break

            # Choose the next vertex with flux-weighted probability.
            next_v = numpy.random.choice(nbrs_dict[curr_v]['child'],
                                         p=nbrs_dict[curr_v]['prob'])

            frac = Gtilde.edges[curr_v, next_v]['frac']

            # Advective travel time along the edge, scaled by porosity.
            t = Gtilde.edges[curr_v, next_v]['time'] * frac_porosity

            if tdrw_flag:
                # Time-domain random walk: sample a matrix-diffusion delay
                # via the inverse complementary error function.
                a_nondim = matrix_porosity * math.sqrt(
                    matrix_diffusivity /
                    (12 * Gtilde.edges[curr_v, next_v]['perm']))
                xi = numpy.random.random_sample()
                t_tdrw = t + math.pow(a_nondim * t / scipy.special.erfcinv(xi),
                                      2)
            else:
                t_tdrw = t

            L = Gtilde.edges[curr_v, next_v]['length']
            self.add_frac_data(frac, t, t_tdrw, L)
            curr_v = next_v

    def add_frac_data(self, frac, t, t_tdrw, L):
        """ add details of fracture through which particle traversed

        Parameters
        ----------
            self: object

            frac: int
                index of fracture in graph

            t : double
                time in seconds
            t_tdrw: double
                time in seconds
            L : double
                distance in metres

        Returns
        -------

        """

        # Initialize the entry only on the first visit so that revisiting a
        # fracture accumulates instead of resetting previously stored data.
        if frac not in self.frac_seq:
            self.frac_seq[frac] = {'time': 0.0, 'tdrw_time': 0.0, 'dist': 0.0}
        self.frac_seq[frac]['time'] += t
        self.frac_seq[frac]['tdrw_time'] += t_tdrw
        self.frac_seq[frac]['dist'] += L
        self.time += t
        self.tdrw_time += t_tdrw
        self.dist += L

    def write_file(self, partime_file=None, frac_id_file=None):
        """ write particle data to output files, if supplied

        Parameters
        ----------
            self: object

            partime_file : string
                name of file to which the total travel times and lengths will be written for each particle, default is None

            frac_id_file : string
                name of file to which detailed information of each particle's travel will be written, default is None

        Returns
        -------
        """

        if partime_file is not None:
            with open(partime_file, "a") as f1:
                f1.write("{:3.3E} {:3.3E} {:3.3E} {:3.3E} \n".format(
                    self.time, self.tdrw_time, self.tdrw_time - self.time,
                    self.dist))

        if frac_id_file is not None:
            # Keys of frac_seq are fracture indices (ints).
            frac_ids = [
                key for key in self.frac_seq if not isinstance(key, dict)
            ]
            with open(frac_id_file, "a") as f2:
                for fid in frac_ids:
                    f2.write("{:d} ".format(fid))
                f2.write("\n")
240 
241 
def prepare_output_files(partime_file, frac_id_file):
    """ opens the output files partime_file and frac_id_file and writes the
    header for each

    Parameters
    ----------

        partime_file : string
            name of file to which the total travel times and lengths will be written for each particle

        frac_id_file : string
            name of file to which detailed information of each particle's travel will be written

    Returns
    -------
        None
    """

    # Narrow except clauses: a bare `except:` would also swallow
    # SystemExit / KeyboardInterrupt; file-open failures raise OSError.
    try:
        with open(partime_file, "w") as f1:
            f1.write(
                "# advective time (s) advection+diffusion time (s) diffusion time (s) total advection distance covered (m)\n"
            )
    except OSError:
        error = "ERROR: Unable to open supplied partime_file file {}\n".format(
            partime_file)
        sys.stderr.write(error)
        sys.exit(1)

    try:
        with open(frac_id_file, "w") as f2:
            f2.write("# List of fractures each particle visits\n")
    except OSError:
        error = "ERROR: Unable to open supplied frac_id_file file {}\n".format(
            frac_id_file)
        sys.stderr.write(error)
        sys.exit(1)
282 
283 
def dump_particle_info(particles, partime_file, frac_id_file):
    """ If running graph transport in parallel, this function dumps out all the
    particle information in a single pass rather than opening and closing the
    files for every particle


    Parameters
    ----------
        particles : list
            list of particle objects

        partime_file : string
            name of file to which the total travel times and lengths will be written for each particle

        frac_id_file : string
            name of file to which detailed information of each particle's travel will be written

    Returns
    -------
        pfailcount : int
            Number of particles that do not exit the domain

    """

    prepare_output_files(partime_file, frac_id_file)

    pfailcount = 0

    # Context managers guarantee the files are closed even if a write fails
    # partway through the particle list.
    with open(partime_file, "a") as f1, open(frac_id_file, "a") as f2:
        for particle in particles:
            if not particle.flag:
                # Particle never reached an outlet.
                pfailcount += 1
                continue

            f1.write("{:3.3E} {:3.3E} {:3.3E} {:3.3E} \n".format(
                particle.time, particle.tdrw_time,
                particle.tdrw_time - particle.time, particle.dist))

            # Keys of frac_seq are fracture indices (ints).
            frac_ids = [
                key for key in particle.frac_seq
                if not isinstance(key, dict)
            ]
            for fid in frac_ids:
                f2.write("{:d} ".format(fid))
            f2.write("\n")

    return pfailcount
350 
351 
def track_particle(data):
    """ Tracks a single particle through the graph

    all input parameters are in the dictionary named data

    Parameters
    ----------

        Gtilde : NetworkX graph
            obtained from graph_flow

        nbrs_dict : dict
            see function create_neighbor_list

        frac_porosity: float
            porosity of fracture, default is 1.0

        tdrw_flag : Bool
            if False, matrix_porosity, matrix_diffusivity are ignored

        matrix_porosity: float
            default is 0.02

        matrix_diffusivity: float
            default is 1e-11 m^2/s

    Returns
    -------
        particle : object
            particle trajectory information

    """

    # Pull the tracking inputs out of the dictionary up front.
    graph = data["Gtilde"]
    neighbors = data["nbrs_dict"]
    porosity = data["frac_porosity"]
    use_tdrw = data["tdrw_flag"]
    m_porosity = data["matrix_porosity"]
    m_diffusivity = data["matrix_diffusivity"]

    particle = Particle()
    particle.set_start_time_dist(0, 0)
    particle.track(graph, neighbors, porosity, use_tdrw, m_porosity,
                   m_diffusivity)
    return particle
392 
393 
def run_graph_transport(self,
                        Gtilde,
                        nparticles,
                        partime_file,
                        frac_id_file,
                        frac_porosity=1.0,
                        tdrw_flag=False,
                        matrix_porosity=0.02,
                        matrix_diffusivity=1e-11):
    """ Run particle tracking on the given NetworkX graph

    Parameters
    ----------
        self : object
            DFN Class

        Gtilde : NetworkX graph
            obtained from graph_flow

        nparticles: int
            number of particles

        partime_file : string
            name of file to which the total travel times and lengths will be written for each particle

        frac_id_file : string
            name of file to which detailed information of each particle's travel will be written

        frac_porosity: float
            porosity of fracture, default is 1.0
        tdrw_flag : Bool
            if False, matrix_porosity, matrix_diffusivity are ignored
        matrix_porosity: float
            default is 0.02
        matrix_diffusivity: float
            default is 1e-11 in SI units

    Returns
    -------

    Notes
    -----
    Information on individual functions is found therein
    """

    # Announce before doing the work so the message matches what is running.
    print("--> Creating downstream neighbor list")
    nbrs_dict = create_neighbor_list(Gtilde)

    pfailcount = 0
    print("--> Starting particle tracking for %d particles" % nparticles)

    if self.ncpu > 1:
        print("--> Using %d processors" % self.ncpu)
        # One input dictionary per particle for the worker pool.
        mp_input = []
        for i in range(nparticles):
            data = {
                "Gtilde": Gtilde,
                "nbrs_dict": nbrs_dict,
                "frac_porosity": frac_porosity,
                "tdrw_flag": tdrw_flag,
                "matrix_porosity": matrix_porosity,
                "matrix_diffusivity": matrix_diffusivity,
            }
            mp_input.append(data)

        pool = mp.Pool(self.ncpu)
        particles = pool.map(track_particle, mp_input)
        pool.close()
        pool.join()
        pool.terminate()
        print("--> Tracking Complete")
        print("--> Writing Data to files: {} and {}".format(
            partime_file, frac_id_file))
        # Capture the failure count so the summary below is correct in
        # parallel mode (previously the return value was discarded).
        pfailcount = dump_particle_info(particles, partime_file, frac_id_file)
        print("--> Writing Data Complete")

    else:
        prepare_output_files(partime_file, frac_id_file)
        for i in range(nparticles):
            if i % 1000 == 0:
                print("--> Starting particle %d out of %d" % (i, nparticles))
            particle_i = Particle()
            particle_i.set_start_time_dist(0, 0)
            particle_i.track(Gtilde, nbrs_dict, frac_porosity, tdrw_flag,
                             matrix_porosity, matrix_diffusivity)

            if particle_i.flag:
                particle_i.write_file(partime_file, frac_id_file)
            else:
                pfailcount += 1

    if pfailcount == 0:
        print("--> All particles exited")
    else:
        print("--> Out of {} particles, {} particles did not exit".format(
            nparticles, pfailcount))
    return
# Public API summary:
#   create_neighbor_list(Gtilde)
#   Particle.track(self, Gtilde, nbrs_dict, frac_porosity, tdrw_flag, matrix_porosity, matrix_diffusivity)
#   Particle.write_file(self, partime_file=None, frac_id_file=None)
#   Particle.add_frac_data(self, frac, t, t_tdrw, L)
#   dump_particle_info(particles, partime_file, frac_id_file)
#   run_graph_transport(self, Gtilde, nparticles, partime_file, frac_id_file, frac_porosity=1.0, tdrw_flag=False, matrix_porosity=0.02, matrix_diffusivity=1e-11)
#   prepare_output_files(partime_file, frac_id_file)