ehratm APIs
ecmwfplevels.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 # *******************************************************
4 # * International Data Centre *
5 # * Comprehensive Nuclear Test Ban Treaty Organization *
6 # * Vienna *
7 # * Austria *
8 # *
9 # * Don Morton (DM) *
10 # * Boreal Scientific Computing *
11 # * Fairbanks, Alaska USA *
12 # *******************************************************
13 
14 
15 
16 #-----------------------------------------------------------------------------
17 
18 #import argparse
19 import datetime
20 import logging
21 import os
22 import uuid
23 
24 import f90nml
25 
26 import ehratm.defaults
27 import ehratm.mylogger
28 
29 import nwpservice.wps.namelistwps
30 import nwpservice.wps.calcecmwfp
31 
33 
35 
36 
class EcmwfPlevelsWorkflow(object):

    '''Class for managing preparation and execution of ecmwf plevels components

    The constructor validates its inputs (paths, ungribbed-data dict and
    time range) so that problems are reported early.  run_ecmwfplevels()
    then writes a namelist.wps and drives the nwpservice CalcEcmwfPlevels
    object.

    Relies on the module-level DEFAULTS and LOGGER objects.
    '''

    def __init__(self,
                 wpswrf_distro_path=None,
                 working_rootdir=None,
                 ecmwf_coeffs_path=None,
                 start_time_dt=None,
                 stop_time_dt=None,
                 hours_intvl=None,
                 ungribbed_data_dict=None,
                 log_level=None):

        '''Initialises the class

        Brings in the ecmwfplevels, time and ungribbed_data dicts, to be used
        for making the request to nwpservice

        Parameters
        ----------
        wpswrf_distro_path : str
            (Optional) Full path to the WPS/WRF distribution to be used.
            This is assumed to have been installed in a way that is compatible
            with the nwpservice module.  If arg is not present, uses a default
            value
        working_rootdir : str
            (Optional) Full path to a directory (assumed to have already
            been created), to be used as scratch space for setting up and
            running this ecmwfplevels instance.  If arg is not present, uses
            a default value
        ecmwf_coeffs_path : str
            (Optional) Full path to ecmwf_coeffs file.  If arg is not present,
            uses a default value
        start_time_dt : datetime
            Time of first ungribbed sfc/ml file
        stop_time_dt : datetime
            Time of last ungribbed sfc/ml file
        hours_intvl : int
            Interval (in hours) between ungribbed sfc/ml files.  Must be
            greater than zero
        ungribbed_data_dict : dict
            Necessary information on the ecmwf_ml and ecmwf_sfc ungribbed
            files.  We expect dict with entries {'FG_NAME' : <path>, ...},
            and expect FG_NAME to be either DEFAULTS.ecmwf_sfc_ungrib_prefix()
            or DEFAULTS.ecmwf_ml_ungrib_prefix()
        log_level : int
            Python logging level (e.g. logging.INFO)

        Raises
        ------
        FileNotFoundError
            If wpswrf_distro_path or working_rootdir is not a directory, if
            the ecmwf_coeffs file is not a file, or if an expected ungribbed
            sfc/ml file is missing
        ValueError
            If ungribbed_data_dict is empty/undefined, its keys cannot be
            retrieved, one of the two expected prefixes is missing, or the
            time range arguments are undefined or hours_intvl is not positive
        '''

        # A falsy log_level (None, 0) selects the default level
        if log_level:
            self._logging_level = log_level
        else:
            self._logging_level = DEFAULTS.log_level()
        LOGGER.setLevel(self._logging_level)

        # NOW I can use the logger
        LOGGER.debug('started')

        # Do some checks (need to expand on these over the years)
        # Essentially, would be good to control bail here if there's a problem
        # as opposed to further down

        if not wpswrf_distro_path:
            wpswrf_distro_path = DEFAULTS.wpswrf_distro_path()
        self._wpswrf_distro_path = wpswrf_distro_path
        LOGGER.debug('wpswrf_distro_path: %s', self._wpswrf_distro_path)
        if not os.path.isdir(self._wpswrf_distro_path):
            raise FileNotFoundError('wpswrf_distro_path not found: %s' %
                                    self._wpswrf_distro_path)

        if not working_rootdir:
            working_rootdir = DEFAULTS.working_scratch_rootdir()
        self._working_rootdir = working_rootdir
        if not os.path.isdir(self._working_rootdir):
            raise FileNotFoundError('working_rootdir not found: %s' %
                                    self._working_rootdir)
        LOGGER.debug('working_rootdir: %s', self._working_rootdir)

        # Check that ecmwf_coeffs, either the arg or the default, is
        # accessible
        ecmwf_coeffs = ecmwf_coeffs_path or DEFAULTS.ecmwf_coeffs()
        if not os.path.isfile(ecmwf_coeffs):
            raise FileNotFoundError('ecmwf_coeffs: %s' % ecmwf_coeffs)
        self._ecmwf_coeffs_path = ecmwf_coeffs

        # Go through the ungribbed data, ensure we have the two types we want,
        # and ensure they have the timestamps we're expecting
        if not ungribbed_data_dict:
            raise ValueError("ungribbed_data_dict not defined")

        # First, we ensure that the expected SFC and ML types are in there
        sfc_prefix = DEFAULTS.ecmwf_sfc_ungrib_prefix()
        ml_prefix = DEFAULTS.ecmwf_ml_ungrib_prefix()

        try:
            ungribbed_data_keys = ungribbed_data_dict.keys()
        except (AttributeError, TypeError) as err:
            # Not dict-like: report it as a bad argument, keeping the cause
            raise ValueError(
                'Unable to retrieve ungribbed_data_keys') from err
        for prefix in (sfc_prefix, ml_prefix):
            if prefix not in ungribbed_data_keys:
                raise ValueError('Missing key: %s' % prefix)

        # Then, we iterate through expected timestamps and ensure that
        # the ungribbed files are there.  The helper rejects undefined times
        # and non-positive intervals, which would otherwise fail obscurely
        # or never terminate.
        ml_path = ungribbed_data_dict[ml_prefix]
        sfc_path = ungribbed_data_dict[sfc_prefix]
        for curr_dt in self._expected_timestamps(start_time_dt,
                                                 stop_time_dt,
                                                 hours_intvl):
            LOGGER.debug('curr_dt: %s', curr_dt)
            curr_timestr = curr_dt.strftime('%Y-%m-%d_%H')
            LOGGER.debug('curr_timestr: %s', curr_timestr)

            # Ungribbed files are named <prefix>:<YYYY-MM-DD_HH>
            ml_filepath = os.path.join(ml_path,
                                       ml_prefix + ':' + curr_timestr)
            sfc_filepath = os.path.join(sfc_path,
                                        sfc_prefix + ':' + curr_timestr)
            if not os.path.isfile(ml_filepath):
                raise FileNotFoundError('Input not found: %s' % ml_filepath)
            if not os.path.isfile(sfc_filepath):
                raise FileNotFoundError('Input not found: %s' % sfc_filepath)

        self._start_time_dt = start_time_dt
        self._stop_time_dt = stop_time_dt
        self._hours_intvl = hours_intvl

        self._ungribbed_data_dict = ungribbed_data_dict

    @staticmethod
    def _expected_timestamps(start_time_dt, stop_time_dt, hours_intvl):

        '''Return the list of datetimes at which ungribbed files are expected

        Parameters
        ----------
        start_time_dt : datetime
            First expected time (inclusive)
        stop_time_dt : datetime
            Last expected time (inclusive, if it falls on the interval)
        hours_intvl : int
            Positive interval, in hours, between consecutive times

        Returns
        -------
        timestamps : list of datetime
            start_time_dt, start_time_dt + hours_intvl, ... up to and
            including stop_time_dt.  Empty if start_time_dt is later than
            stop_time_dt

        Raises
        ------
        ValueError
            If either time is None, or hours_intvl is None or not positive
        '''

        if start_time_dt is None or stop_time_dt is None:
            raise ValueError(
                'start_time_dt and stop_time_dt must both be defined')
        if hours_intvl is None or hours_intvl <= 0:
            # A zero or negative step would never reach stop_time_dt
            raise ValueError('hours_intvl must be a positive number of '
                             'hours: %r' % (hours_intvl,))

        step = datetime.timedelta(hours=hours_intvl)
        timestamps = []
        curr_dt = start_time_dt
        while curr_dt <= stop_time_dt:
            timestamps.append(curr_dt)
            curr_dt += step
        return timestamps

    def run_ecmwfplevels(self, ecmwfplevels_output_stagedir=None):

        '''Set up and run, via nwpservice module, instance of ecmwfplevels

        Writes ecmwfplevels_namelist.wps into the working root dir, then
        creates, sets up and runs an nwpservice CalcEcmwfPlevels object using
        <working_rootdir>/ecmwfplevels_rundir as its run dir, and finally
        asks it to stage its output.

        Parameters
        ----------
        ecmwfplevels_output_stagedir : str
            (Optional) Full path to dir where ecmwfplevels files will be
            staged.  If not specified, no staging will be done.  If
            specified, we assume the dir already exists.

        Returns
        -------
        return_dict : dict
            Dictionary with manifest of the ecmwfplevels files
            ('ecmwfp_output_manifest') and the location of staged files
            ('staging_dir', None if staging did not report success)

        Raises
        ------
        FileNotFoundError
            If ecmwfplevels_output_stagedir is given but is not an existing,
            writable directory
        '''

        LOGGER.debug('Start run_ecmwfplevels()')

        # Check that if the output stagedir is an arg, that it exists and is
        # writable
        if ecmwfplevels_output_stagedir:
            stagedir_ok = (os.path.isdir(ecmwfplevels_output_stagedir) and
                           os.access(ecmwfplevels_output_stagedir, os.W_OK))
            if not stagedir_ok:
                raise FileNotFoundError('Problem with output stage dir: %s' %
                                        ecmwfplevels_output_stagedir)
            LOGGER.debug('Output stage dir: %s', ecmwfplevels_output_stagedir)

        # NOTE(review): the namelist uses a fixed name, so a reused
        # working_rootdir may already contain this file - confirm how the
        # nwpservice writer handles an existing destination
        namelist_wps_path = os.path.join(self._working_rootdir,
                                         'ecmwfplevels_namelist.wps')
        LOGGER.debug('namelist_wps_path: %s', namelist_wps_path)

        # nwpservice object expects this dir to NOT exist - it will create it
        # itself.  Error will occur if it already exists
        wpswrf_rundir = os.path.join(self._working_rootdir,
                                     'ecmwfplevels_rundir')

        # Create namelist.wps
        # The share section
        start_time_str = self._start_time_dt.strftime('%Y%m%d%H%M%S')
        end_time_str = self._stop_time_dt.strftime('%Y%m%d%H%M%S')
        interval_secs = int(self._hours_intvl*3600)
        share_section_dict = {
            'max_dom' : 1,
            'start_date_list' : [start_time_str],
            'end_date_list' : [end_time_str],
            'interval_seconds' : interval_secs
        }
        LOGGER.debug('share_section_dict: %s', share_section_dict)

        # The metgrid section
        metgrid_section_dict = {
            'fg_name' : [DEFAULTS.ecmwf_sfc_ungrib_prefix(),
                         DEFAULTS.ecmwf_ml_ungrib_prefix()]
        }
        LOGGER.debug('metgrid_section_dict: %s', metgrid_section_dict)

        section_data_dict = {'share' : share_section_dict,
                             'metgrid' : metgrid_section_dict}
        LOGGER.debug('section_data_dict: %s', section_data_dict)

        namelist_writer = nwpservice.wps.namelistwps.NamelistWpsWriter(
            destpath=namelist_wps_path,
            section_data_dict=section_data_dict,
            log_level=self._logging_level
        )
        LOGGER.debug('Writing namelist: %s', namelist_wps_path)
        namelist_writer.write()

        # Create the nwpservice object
        nwpecmwfp_obj = nwpservice.wps.calcecmwfp.CalcEcmwfPlevels(
            wpswrf_distro_path=self._wpswrf_distro_path,
            wpswrf_rundir=wpswrf_rundir,
            ungribbed_data=self._ungribbed_data_dict,
            ecmwf_coeffs_path=self._ecmwf_coeffs_path,
            namelist_wps=namelist_wps_path,
            output_dir=ecmwfplevels_output_stagedir,
            log_level=self._logging_level
        )

        nwpecmwfp_obj.setup()
        output_manifest = nwpecmwfp_obj.run()
        LOGGER.debug('output_manifest: %s', output_manifest)

        # If this wasn't successful (even if an output staging dir was
        # never specified), it should return False
        stage_success = nwpecmwfp_obj.stage_output(auxfiles=False)
        staging_dir = ecmwfplevels_output_stagedir if stage_success else None

        return_dict = {
            'ecmwfp_output_manifest' : output_manifest,
            'staging_dir' : staging_dir
        }

        LOGGER.debug('return_dict: %s', return_dict)

        return return_dict
ehratm.wps.ecmwfplevels.EcmwfPlevelsWorkflow._start_time_dt
_start_time_dt
Definition: ecmwfplevels.py:159
ehratm.wps.ecmwfplevels.EcmwfPlevelsWorkflow._ecmwf_coeffs_path
_ecmwf_coeffs_path
Definition: ecmwfplevels.py:120
ehratm.defaults
Definition: defaults.py:1
ehratm.wps.ecmwfplevels.EcmwfPlevelsWorkflow._wpswrf_distro_path
_wpswrf_distro_path
Definition: ecmwfplevels.py:98
ehratm.wps.ecmwfplevels.EcmwfPlevelsWorkflow._ungribbed_data_dict
_ungribbed_data_dict
Definition: ecmwfplevels.py:164
ehratm.wps.ecmwfplevels.EcmwfPlevelsWorkflow
Definition: ecmwfplevels.py:37
ehratm.wps.ecmwfplevels.EcmwfPlevelsWorkflow.__init__
def __init__(self, wpswrf_distro_path=None, working_rootdir=None, ecmwf_coeffs_path=None, start_time_dt=None, stop_time_dt=None, hours_intvl=None, ungribbed_data_dict=None, log_level=None)
Definition: ecmwfplevels.py:44
ehratm.wps.ecmwfplevels.EcmwfPlevelsWorkflow.run_ecmwfplevels
def run_ecmwfplevels(self, ecmwfplevels_output_stagedir=None)
Definition: ecmwfplevels.py:181
ehratm.defaults.Defaults
Definition: defaults.py:9
ehratm.mylogger.getlogger
def getlogger()
Definition: mylogger.py:11
ehratm.mylogger
Definition: mylogger.py:1
ehratm.wps.ecmwfplevels.EcmwfPlevelsWorkflow._hours_intvl
_hours_intvl
Definition: ecmwfplevels.py:161
ehratm.wps.ecmwfplevels.EcmwfPlevelsWorkflow._logging_level
_logging_level
Definition: ecmwfplevels.py:84
ehratm.wps.ecmwfplevels.EcmwfPlevelsWorkflow._stop_time_dt
_stop_time_dt
Definition: ecmwfplevels.py:160
ehratm.wps.ecmwfplevels.EcmwfPlevelsWorkflow._working_rootdir
_working_rootdir
Definition: ecmwfplevels.py:106