ehratm APIs
srm.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 # *******************************************************
4 # * International Data Centre *
5 # * Comprehensive Nuclear Test Ban Treaty Organization *
6 # * Vienna *
7 # * Austria *
8 # *
9 # * Don Morton (DM) *
10 # * Boreal Scientific Computing *
11 # * Fairbanks, Alaska USA *
12 # *******************************************************
13 
14 
15 
16 #-----------------------------------------------------------------------------
17 
18 #import argparse
19 import datetime
20 import fnmatch
21 import glob
22 import logging
23 import os
24 import shutil
25 import subprocess
26 import uuid
27 
28 import f90nml
29 
30 import ehratm.defaults
31 import ehratm.mylogger
32 
34 
36 
class SrmWorkflow(object):

    '''Class for managing preparation and execution of srm component

    This component currently uses the standard ATM Pipeline flexpart2ctbt
    for the conversion.  This means that, as-is, it will not process
    an OUTGRID nest, but only the top-level nest.

    Typical use is to construct an instance (which validates its
    arguments and creates a 'srm_rundir' scratch directory) and then call
    run_srm() once.
    '''

    def __init__(self,
                 srm_exec_path=None,
                 working_rootdir=None,
                 flexwrfout_dir=None,
                 levels_list=None,
                 log_level=None
                 ):

        '''Check and initialise the class

        All arguments are validated before the run directory is created,
        so a rejected argument does not leave a stray 'srm_rundir' behind.

        Parameters
        ----------
        srm_exec_path : str
            (Optional) Full path to the SRM executable.  If given, it must
            be an existing, executable file.  If arg is not present, uses
            the value of DEFAULTS.srm_flexpart2ctbt_path() (not checked
            here)
        working_rootdir : str
            (Optional) Full path to a directory (assumed to have already
            been created), to be used as scratch space for setting up and
            running this srm instance.  If arg is not present, uses
            DEFAULTS.working_scratch_rootdir().  A 'srm_rundir' subdir
            will be created in here
        flexwrfout_dir : str
            Full path to a directory containing the FlexWRF binary output
            files, including the header file.  Required.
        levels_list : list
            (Optional) List of integers designating the vertical levels
            (indexed from 1) to produce SRM files for.  If arg is not
            present (or None), assumes [1]
        log_level : int
            Python logging level (e.g. logging.INFO).  If not present,
            uses DEFAULTS.log_level()

        Raises
        ------
        FileNotFoundError
            If srm_exec_path is missing or not executable, or if
            working_rootdir or flexwrfout_dir is not an existing directory
        ValueError
            If flexwrfout_dir is not specified, if levels_list is not a
            list, or if it contains a non-integer element
        OSError
            From os.mkdir(), if 'srm_rundir' cannot be created (for
            example, because it already exists)
        '''

        if log_level:
            self._logging_level = log_level
        else:
            self._logging_level = DEFAULTS.log_level()
        LOGGER.setLevel(self._logging_level)

        # Now I can use the logger
        LOGGER.debug('started')

        # --- SRM executable -------------------------------------------
        if srm_exec_path:
            if not os.path.isfile(srm_exec_path):
                raise FileNotFoundError('srm_exec_path not found: %s' %
                                        srm_exec_path)
            if not os.access(srm_exec_path, os.X_OK):
                raise FileNotFoundError('srm_exec_path not executable: %s' %
                                        srm_exec_path)
            self._srm_exec_path = srm_exec_path
        else:
            # Use the default
            self._srm_exec_path = DEFAULTS.srm_flexpart2ctbt_path()

        # --- working root dir -----------------------------------------
        # Try to use a default value if it wasn't passed in as an arg
        if not working_rootdir:
            working_rootdir = DEFAULTS.working_scratch_rootdir()
        self._working_rootdir = working_rootdir
        if not os.path.isdir(self._working_rootdir):
            raise FileNotFoundError('working_rootdir not found: %s' %
                                    self._working_rootdir)
        LOGGER.debug('working_rootdir: %s', self._working_rootdir)

        # --- FlexWRF output dir ---------------------------------------
        if not flexwrfout_dir:
            # ValueError is still caught by callers using 'except Exception'
            raise ValueError('No flexwrfout_dir specified')
        if not os.path.isdir(flexwrfout_dir):
            raise FileNotFoundError('flexwrfout_dir not found: %s' %
                                    flexwrfout_dir)
        self._flexwrfout_dir = flexwrfout_dir

        # --- vertical levels ------------------------------------------
        if levels_list is None:
            levels_list = [1]
        if not isinstance(levels_list, list):
            raise ValueError('levels_list is not a list')
        for level in levels_list:
            # bool is a subclass of int, but True/False are not levels
            if isinstance(level, bool) or not isinstance(level, int):
                raise ValueError('levels_list has a non-integer element')

        # Keep a private copy so later changes to the caller's list
        # cannot alter this instance
        self._levels_list = list(levels_list)

        # --- run directory (only side effect; done last) --------------
        # os.mkdir() raises if the directory cannot be created
        self._srm_rundir = os.path.join(self._working_rootdir,
                                        'srm_rundir')
        os.mkdir(self._srm_rundir, 0o755)

    def run_srm(self, srm_output_stagedir=None,
                multiplier=1.0,
                compress=False):

        '''Set up and run, via command line, an instance of flexpart2ctbt
        for each level in the levels_list

        For every level a control file is written into the run directory,
        the SRM executable is run with that file on its standard input,
        and the *.srm files found for that level are recorded in the
        manifest.  Afterwards the output is optionally staged.

        Note that this changes the current working directory of the
        process to the run directory and does not change it back.

        Parameters
        ----------
        srm_output_stagedir : str
            (Optional) Full path to dir where srm output files will be
            staged.  If not specified, no staging will be done.  If
            specified, we assume the dir already exists.
        multiplier : float
            (Optional) Factor by which to multiply concentrations
        compress : boolean
            (Optional) NOT YET IMPLEMENTED - only produces a warning if
            set to True

        Returns
        -------
        return_dict : dict
            {'srm_output_manifest': {'rundir': <run dir>,
                                     'levels': {'<level>': {<fname>:
                                                {'bytes': <size>}}}},
             'staging_dir': <srm_output_stagedir, or None if the output
                             was not staged>}

        Raises
        ------
        FileNotFoundError
            If srm_output_stagedir is specified but is not an existing,
            writable directory
        '''

        LOGGER.debug('Starting run_srm()...')

        # If compress was requested, warn that it's not implemented yet
        if compress:
            LOGGER.warning('SRM compress is not yet implemented.')

        # Check that srm_output_stagedir exists (if it's specified
        # as an arg) and is writable
        if srm_output_stagedir:
            if not os.path.isdir(srm_output_stagedir):
                raise FileNotFoundError(
                    'srm_output_stagedir not found: %s' % srm_output_stagedir
                )
            if not os.access(srm_output_stagedir, os.W_OK):
                raise FileNotFoundError(
                    'srm_output_stagedir not writable: %s' %
                    srm_output_stagedir
                )

        # The executable is run from inside the run directory
        os.chdir(self._srm_rundir)

        # One entry per level.  Entries are added to this dict (never
        # replacing it) so that every requested level is reported.
        levels_manifest = {}
        for level in self._levels_list:
            LOGGER.debug('Creating SRM file for level: %d', level)

            control_fname = self._write_control_file(level, multiplier)
            self._run_converter(control_fname)
            levels_manifest[str(level)] = self._collect_level_manifest(level)

        srm_output_manifest = {
            'rundir': self._srm_rundir,
            'levels': levels_manifest
        }
        LOGGER.debug('srm_output_manifest: %s', srm_output_manifest)

        # Consider a staging, like we normally would do down at
        # nwpservice level.  If unsuccessful, simply returns a False.
        # Assumes that we also want auxfiles (in this case, the control
        # files) copied over, too
        stage_success = self._stage_output(
            staging_dir=srm_output_stagedir,
            auxfiles=True
        )
        LOGGER.debug('stage_success: %s', stage_success)
        staging_dir = srm_output_stagedir if stage_success else None

        # Create the return dict
        return_dict = {
            'srm_output_manifest': srm_output_manifest,
            'staging_dir': staging_dir
        }
        LOGGER.debug('return_dict: %s', return_dict)

        return return_dict

    def _write_control_file(self, level, multiplier):

        '''Write the control file for one level into the run directory

        Returns the bare filename (control_level_NN) of the file written.
        '''

        control_fname = 'control_level_%02d' % level
        control_path = os.path.join(self._srm_rundir, control_fname)
        with open(control_path, 'w') as cfh:
            # Be sure to add trailing / to path, as required by flexpart2ctbt
            cfh.write(self._flexwrfout_dir + '/' + '\n')
            cfh.write('============================================\n')
            cfh.write('%d\n' % level)
            cfh.write('%e\n' % multiplier)
            cfh.write('fp\n')
            cfh.write('l%d\n' % level)
        return control_fname

    def _run_converter(self, control_fname):

        '''Run the SRM executable with a control file on its stdin

        Failures are logged rather than raised, so that the remaining
        levels are still attempted.  Returns the exit code, or None if
        the executable could not be launched.
        '''

        control_path = os.path.join(self._srm_rundir, control_fname)
        with open(control_path, 'r') as stdin_fh:
            try:
                # Argument list + stdin redirection instead of a shell
                # string, so paths with spaces or shell metacharacters
                # are passed through untouched
                exit_code = subprocess.call([self._srm_exec_path],
                                            stdin=stdin_fh,
                                            cwd=self._srm_rundir)
            except OSError as err:
                LOGGER.error('Unable to run %s: %s',
                             self._srm_exec_path, err)
                return None

        if exit_code != 0:
            LOGGER.error('%s exited with code %s for %s',
                         self._srm_exec_path, exit_code, control_fname)
        return exit_code

    def _collect_level_manifest(self, level):

        '''Find the srm files in the run directory for one level

        Returns a dict mapping each matching filename to
        {'bytes': <file size>}.
        '''

        # These are example filenames for a single release, for
        # Level 1
        #
        #    SICILY.fp.20171023000000.l1.srm
        #    SICILY.fp.20171023000000.l1.l0.wet.srm
        #    SICILY.fp.20171023000000.l1.l0.dry.srm

        # It's a bit of a pattern-matching hack, but it should work
        prepattern = '*.fp.' + '[0-9]' * 14 + '.l%d' % level
        patterns = (
            prepattern + '.srm',
            prepattern + '.l0.dry.srm',
            prepattern + '.l0.wet.srm',
        )

        matched_fnames = [
            fname for fname in sorted(os.listdir(self._srm_rundir))
            if any(fnmatch.fnmatch(fname, pattern) for pattern in patterns)
        ]
        LOGGER.debug('matched_fnames: %s', matched_fnames)

        level_manifest = {}
        for fname in matched_fnames:
            n_bytes = os.path.getsize(os.path.join(self._srm_rundir, fname))
            level_manifest[fname] = {'bytes': n_bytes}
        return level_manifest

    def _stage_output(self, staging_dir=None, auxfiles=False):

        '''Stages output to specified dir

        Copies every *.srm file in the run directory and, optionally,
        the control_level_* files to the specified directory

        Parameters
        ----------
        staging_dir : str
            Must already exist and be writable.  If not specified, or
            not an existing writable directory, nothing is copied and
            False is returned to caller
        auxfiles : boolean
            If set to True, will also copy in the control_level_* files

        Returns
        -------
        return value : boolean
            True if the files were copied, False if there was no usable
            staging dir
        '''

        if not staging_dir:
            LOGGER.info('No valid, writable staging dir. Returning False')
            return False

        if not (os.path.isdir(staging_dir) and
                os.access(staging_dir, os.W_OK)):
            LOGGER.error('staging_dir not accessible and/or writable: %s',
                         staging_dir)
            return False

        dest_dir = staging_dir
        LOGGER.debug('dest_dir: %s', dest_dir)

        # Escape the run dir so glob metacharacters in its path are
        # taken literally
        rundir_pattern = glob.escape(self._srm_rundir)

        # Get the *.srm filenames and copy over
        srm_paths = glob.glob(os.path.join(rundir_pattern, '*.srm'))
        LOGGER.debug('Copying srm files: %s',
                     [os.path.basename(p) for p in srm_paths])
        for path in srm_paths:
            shutil.copy(path, dest_dir)

        # Optionally, copy over the control files
        if auxfiles:
            aux_paths = glob.glob(
                os.path.join(rundir_pattern, 'control_level_*'))
            LOGGER.debug('Copying aux files: %s',
                         [os.path.basename(p) for p in aux_paths])
            for path in aux_paths:
                shutil.copy(path, dest_dir)

        return True
ehratm.postproc.srm.SrmWorkflow._flexwrfout_dir
_flexwrfout_dir
Definition: srm.py:118
ehratm.defaults
Definition: defaults.py:1
ehratm.postproc.srm.SrmWorkflow.run_srm
def run_srm(self, srm_output_stagedir=None, multiplier=1.0, compress=False)
Definition: srm.py:145
ehratm.postproc.srm.SrmWorkflow._logging_level
_logging_level
Definition: srm.py:76
ehratm.postproc.srm.SrmWorkflow._working_rootdir
_working_rootdir
Definition: srm.py:102
ehratm.postproc.srm.SrmWorkflow._stage_output
def _stage_output(self, staging_dir=None, auxfiles=False)
Definition: srm.py:298
ehratm.defaults.Defaults
Definition: defaults.py:9
ehratm.postproc.srm.SrmWorkflow.__init__
def __init__(self, srm_exec_path=None, working_rootdir=None, flexwrfout_dir=None, levels_list=[1], log_level=None)
Definition: srm.py:46
ehratm.mylogger.getlogger
def getlogger()
Definition: mylogger.py:11
ehratm.mylogger
Definition: mylogger.py:1
ehratm.postproc.srm.SrmWorkflow._srm_rundir
_srm_rundir
Definition: srm.py:108
ehratm.postproc.srm.SrmWorkflow._srm_exec_path
_srm_exec_path
Definition: srm.py:84
ehratm.postproc.srm.SrmWorkflow._levels_list
_levels_list
Definition: srm.py:136
ehratm.postproc.srm.SrmWorkflow
Definition: srm.py:37