Example jrnr scriptΒΆ

'''

This module demonstrates the specification of a jrnr script

This jrnr script will compute daily average temperature 
as a the average of daily max and min temperature. 

To run, jrnr requires a parameterized job spec. This is constructed 
as a list on line 105 and handed to jrnr's `slurm_runner` decorator on line 123.

Each job, interactive and batch, in jrnr receives a 
dictionary which fully parameterizes the input arguments. 

To parameterize your job spec jrnr takes the 
cartesian product of the items in lists of dictionaries. 
`JOB_SPEC` on line 112 is simply a list of those lists. 
'''

import os
import logging
import time
import xarray as xr
import pandas as pd
import numpy as np
import climate_toolbox.climate_toolbox as ctb 
from jrnr.jrnr import slurm_runner

#set up logging format and configuration
FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(format=FORMAT)
logger = logging.getLogger('uploader')
logger.setLevel('DEBUG')


description = '\n\n'.join(
        map(lambda s: ' '.join(s.split('\n')),
            __doc__.strip().split('\n\n')))

oneline = description.split('\n')[0]

__author__ = 'Justin Simcock'
__contact__ = 'jsimcock@rhg.com'
__version__ = '1.0'


READ_PATH = (
    '/global/scratch/groups/co_laika/gcp/climate/nasa_bcsd/reformatted/' +
    '{scenario}/{model}/{variable}/' +
    '{variable}_day_BCSD_{scenario}_r1i1p1_{model}_{year}/1.0.nc4')

WRITE_PATH = ('/global/scratch/groups/co_laika/gcp/climate/nasa_bcsd/reformatted/' +
                '{scenario}/{model}/{variable}/' + 
                '{variable}_day_BCSD_{scenario}_r1i1p1_{model}_{year}/' +
                '{version}.nc4')


ADDITIONAL_METADATA = dict(
    oneline=oneline,
    description=description,
    author=__author__,
    contact=__contact__,
    version=__version__,
    repo='https://gitlab.com/ClimateImpactLab/make_tas/',
    file=str(__file__),
    execute='python {} run'.format(str(__file__)),
    project='gcp',
    team='climate',
    frequency='annual',
    write_variable='tas',
    dependencies='')


def make_tas_ds(tasmax, tasmin):

    tas = xr.Dataset()
    tas['tas'] = (tasmax.tasmax + tasmin.tasmin) / 2.
    return tas

PERIODS = (
    [dict(scenario='historical', year=y) for y in range(1981, 2006)] +
    [dict(scenario='rcp45',  year=y) for y in range(2006, 2100)] +
    [dict(scenario='rcp85', year=y) for y in range(2006, 2100)])


MODELS = list(map(lambda x: dict(model=x), [
    'ACCESS1-0',
    'bcc-csm1-1',
    'BNU-ESM',
    'CanESM2',
    'CCSM4',
    'CESM1-BGC',
    'CNRM-CM5',
    'CSIRO-Mk3-6-0',
    'GFDL-CM3',
    'GFDL-ESM2G',
    'GFDL-ESM2M',
    'IPSL-CM5A-LR',
    'IPSL-CM5A-MR',
    'MIROC-ESM-CHEM',
    'MIROC-ESM',
    'MIROC5',
    'MPI-ESM-LR',
    'MPI-ESM-MR',
    'MRI-CGCM3',
    'inmcm4',
    'NorESM1-M'
    ]))


JOB_SPEC = [MODELS, PERIODS]

def validate_tas(tas):
    '''
    Make sure NaNs are not present
    '''
    msg_null = 'DataArray contains NaNs'
    msg_shape = 'DataSet dims {} do not match expected'
    tas_nan = tas.tas.sel(lat=slice(-85,85)).isnull().sum().values
    assert tas_nan == 0 , msg_null
    assert tas.dims['lat'] == 720,  msg_shape.format(tas.dims['lat'])
    assert tas.dims['lon'] == 1440, msg_shape.format(tas.dims['lon'])
    assert tas.dims['time'] in [364, 365, 366], msg_shape.format(tas.dims['time']) 
    assert tas.lon.min().values == -179.875
    assert tas.lon.max().values == 179.875
    return


@slurm_runner(job_spec=JOB_SPEC)
def make_tas(metadata,
            scenario,
            year,
            model,
            interactive=False
            ):


    metadata.update(ADDITIONAL_METADATA)

    tasmin_read = READ_PATH.format(variable='tasmin', **metadata)
    tasmax_read = READ_PATH.format(variable='tasmax', **metadata)

    metadata['dependencies'] = str([tasmin_read, tasmax_read])

    tas_write = WRITE_PATH.format(variable='tas', **metadata)


    if os.path.isfile(tas_write) and not interactive:
        tas = xr.open_dataset(tas_write, autoclose=True, chunks={'time': 100}).load()
        tas = ctb._standardize_longitude_dimension(tas)



    else: 
        tasmax = xr.open_dataset(tasmax_read, autoclose=True, chunks={'time': 100}).load()
        tasmin = xr.open_dataset(tasmin_read, autoclose=True, chunks={'time': 100}).load()

        logger.debug('beginning')

        logger.debug('producing_tas')
        tas = make_tas_ds(tasmax, tasmin)
        tas = ctb._standardize_longitude_dimension(tas)
    
    tas.attrs.update(metadata)
    
    if interactive:
        return tas

    logger.debug('checking_tas_path')
    if not os.path.isdir(os.path.dirname(tas_write)):
        os.makedirs(os.path.dirname(tas_write))

    tas.to_netcdf(tas_write + '~', encoding={var : {'dtype': 'float32'} for var in tas.data_vars.keys()})
    logger.debug('write_tas_path')

    validate_tas(tas)
    logger.debug('validate_tas')
    os.rename(tas_write + '~', tas_write)


    logger.debug('job_complete')


if __name__ == '__main__':
    make_tas()