''''
Retrieve ecmwf data from new CDS api.
'''
#!/usr/bin/env python
from datetime import datetime, timedelta
from collections import OrderedDict
import calendar
import sys
#from ecmwfapi import ECMWFDataServer
import cdsapi
from dateutil.relativedelta import *
#from retrying import retry
import logging
import glob
from joblib import Parallel, delayed
import subprocess
#import multiprocessing
[docs]def retrieve_era5_surf( product, startDate,endDate,eraDir, latN,latS,lonE,lonW, step):
""" Sets up era5 surface retrieval.
* Creates list of year/month pairs to iterate through.
* MARS retrievals are most efficient when subset by time.
* Identifies preexisting downloads if restarted.
* Calls api using parallel function.
Args:
product: "reanalysis" (HRES) or "ensemble_members" (EDA)
startDate:
endDate:
eraDir: directory to write output
latN: north latitude of bbox
latS: south latitude of bbox
lonE: easterly lon of bbox
lonW: westerly lon of bbox
Returns:
Monthly era surface files.
"""
# product = config["forcing"]["product"]
# print(product)
# startDate = config["main"]["startDate"]
# endDate = config["main"]["endDate"]
#grd = config["era-interim"]["grid"]
#dataset = config["forcing"]["dataset"]
#grid=str(grd) + "/" + str(grd)
num_cores = 4 #config['main']['num_cores']
bbox=(str(latN) + "/" + str(lonW) + "/" + str(latS) + "/" + str(lonE) )
if (step == "1"):
time = [ '00:00','01:00','02:00',\
'03:00','04:00','05:00',\
'06:00','07:00','08:00',\
'09:00','10:00','11:00',\
'12:00','13:00','14:00',\
'15:00','16:00','17:00',\
'18:00','19:00','20:00',\
'21:00','22:00','23:00']
if (step == "3"):
time = ['00:00','03:00','06:00','09:00','12:00','15:00','18:00','21:00']
if (step == "6"):
time = ['00:00','06:00','12:00','18:00']
# download buffer of +/- 1 month to ensure all necessary timestamps are there for interpolations and consistency between plevel and surf
dates = [str(startDate), str(endDate)]
start = datetime.strptime(dates[0], "%Y-%m-%d")
end = datetime.strptime(dates[1], "%Y-%m-%d")
start = start+relativedelta(months=-1)
end = end+relativedelta(months=+1)
dateList = OrderedDict(((start + timedelta(_)).strftime(r"%Y-%m"), None) for _ in xrange((end - start).days)).keys()
print("Start date = " , dateList[0])
print("End date = " , dateList[len(dateList)-1])
print("cores used = " + str(num_cores))
requestDatesVec = []
targetVec=[]
yearVec=[]
monthVec=[]
for date in dateList:
strsplit = date.split('-' )
year = int(strsplit[0])
month = int(strsplit[1])
# firstDate = "%04d%02d%02d" % (year, month, 1)
# numberOfDays = calendar.monthrange(year, month)[1]
# lastDate = "%04d%02d%02d" % (year, month, numberOfDays)
target = eraDir + "SURF_%04d%02d.nc" % (year, month)
# requestDates = (firstDate + "/TO/" + lastDate)
# requestDatesVec.append(requestDates)
targetVec.append(target)
yearVec.append(year)
monthVec.append(month)
# find files that already downloaded if any with exact matches (in case of restarts)
dataExists = glob.glob(eraDir +"/SURF_??????.nc")
# list only files that dont exist
targetVecNew = [x for x in targetVec if x not in dataExists]
logging.info("ECWMF SURF data found:" )
logging.info(dataExists)
logging.info("Downloading SURF from ECWMF:")
logging.info(targetVecNew)
# Amend requestDatesVec
index = [targetVec.index(x) for x in targetVecNew]
yearVecNew = [yearVec[i] for i in index]
monthVecNew = [monthVec[i] for i in index]
# https://zacharyst.com/2016/03/31/parallelize-a-multifunction-argument-in-python/
Parallel(n_jobs=int(num_cores))(delayed( era5_request_surf)(int(yearVecNew[i]), int(monthVecNew[i]), bbox, targetVecNew[i], product, time) for i in range(0,len(yearVecNew)))
#@retry(wait_random_min=10000, wait_random_max=20000)
[docs]def era5_request_surf(year, month, bbox, target, product, time):
"""CDS surface api call"""
c = cdsapi.Client()
c.retrieve(
'reanalysis-era5-single-levels',
{
'variable':['geopotential', '2m_dewpoint_temperature', 'surface_thermal_radiation_downwards', 'surface_solar_radiation_downwards',
'Total precipitation','2m_temperature', 'TOA incident solar radiation',
'friction_velocity','instantaneous_moisture_flux','instantaneous_surface_sensible_heat_flux'
],
'product_type': product,
"area": bbox,
'year':int(year),
'month':[
int(month)
],
'day':[
'01','02','03',
'04','05','06',
'07','08','09',
'10','11','12',
'13','14','15',
'16','17','18',
'19','20','21',
'22','23','24',
'25','26','27',
'28','29','30',
'31'
],
'time':time,
'format':'netcdf'
},
target)
print(target+ " complete")
[docs]def retrieve_era5_plev(product, startDate,endDate,eraDir, latN,latS,lonE,lonW, step, plevels):
""" Sets up era5 pressure level retrieval.
* Creates list of year/month pairs to iterate through.
* MARS retrievals are most efficient when subset by time.
* Identifies preexisting downloads if restarted.
* Calls api using parallel function.
Args:
config: config object defining INI
eraDir: directory to write output
latN: north latitude of bbox
latS: south latitude of bbox
lonE: easterly lon of bbox
lonW: westerly lon of bbox
Returns:
Monthly era pressure level files.
"""
#product = config["forcing"]["product"]
#startDate = config["main"]["startDate"]
#endDate = config["main"]["endDate"]
#grd = config["era-interim"]["grid"]
#dataset = config["forcing"]["dataset"]
#grid=str(grd) + "/" + str(grd)
num_cores = 4 #config['main']['num_cores']
bbox=(str(latN) + "/" + str(lonW) + "/" + str(latS) + "/" + str(lonE) )
if (step == "1"):
time = [ '00:00','01:00','02:00',\
'03:00','04:00','05:00',\
'06:00','07:00','08:00',\
'09:00','10:00','11:00',\
'12:00','13:00','14:00',\
'15:00','16:00','17:00',\
'18:00','19:00','20:00',\
'21:00','22:00','23:00']
if (step == "3"):
time = ['00:00','03:00','06:00','09:00','12:00','15:00','18:00','21:00']
if (step == "6"):
time = ['00:00','06:00','12:00','18:00']
# download buffer of +/- 1 month to ensure all necessary timestamps are there for interpolations and consistency between plevel and surf
dates = [str(startDate), str(endDate)]
start = datetime.strptime(dates[0], "%Y-%m-%d")
end = datetime.strptime(dates[1], "%Y-%m-%d")
start = start+relativedelta(months=-1)
end = end+relativedelta(months=+1)
dateList = OrderedDict(((start + timedelta(_)).strftime(r"%Y-%m"), None) for _ in xrange((end - start).days)).keys()
print("Start date = " , dateList[0])
print("End date = " , dateList[len(dateList)-1])
print("cores used = " + str(num_cores))
requestDatesVec = []
targetVec=[]
yearVec=[]
monthVec=[]
for date in dateList:
strsplit = date.split('-' )
year = int(strsplit[0])
month = int(strsplit[1])
# firstDate = "%04d%02d%02d" % (year, month, 1)
# numberOfDays = calendar.monthrange(year, month)[1]
# lastDate = "%04d%02d%02d" % (year, month, numberOfDays)
target = eraDir + "PLEV_%04d%02d.nc" % (year, month)
# requestDates = (firstDate + "/TO/" + lastDate)
# requestDatesVec.append(requestDates)
targetVec.append(target)
yearVec.append(year)
monthVec.append(month)
# find files that already downloaded if any with exact matches (in case of restarts)
dataExists = glob.glob(eraDir +"/PLEV_??????.nc")
# list only files that dont exist
targetVecNew = [x for x in targetVec if x not in dataExists]
logging.info("ECWMF PLEV data found:" )
logging.info(dataExists)
logging.info("Downloading PLEV from ECWMF:")
logging.info(targetVecNew)
# Amend requestDatesVec
index = [targetVec.index(x) for x in targetVecNew]
yearVecNew = [yearVec[i] for i in index]
monthVecNew = [monthVec[i] for i in index]
# https://zacharyst.com/2016/03/31/parallelize-a-multifunction-argument-in-python/
Parallel(n_jobs=int(num_cores))(delayed( era5_request_plev)(int(yearVecNew[i]), int(monthVecNew[i]), bbox, targetVecNew[i], product, time,plevels) for i in range(0,len(yearVecNew)))
#@retry(wait_random_min=10000, wait_random_max=20000)
[docs]def era5_request_plev(year, month, bbox, target, product, time,plevels):
"""CDS plevel api call"""
c = cdsapi.Client()
c.retrieve(
'reanalysis-era5-pressure-levels',
{
'product_type': product,
'format':'netcdf',
"area": bbox,
'variable':[
'geopotential','temperature','u_component_of_wind',
'v_component_of_wind', 'relative_humidity'
],
'pressure_level':plevels,
'year':int(year),
'month':[
int(month)
],
'day':[
'01','02','03',
'04','05','06',
'07','08','09',
'10','11','12',
'13','14','15',
'16','17','18',
'19','20','21',
'22','23','24',
'25','26','27',
'28','29','30',
'31'
],
'time':time,
'format':'netcdf'
},
target)
print(target+ " complete")
#if __name__ == "__main__":
# config = sys.argv[1]
# eraDir = sys.argv[2]
# latNorth = str(float(sys.argv[3]))
# latSouth = str(float(sys.argv[4]))
# lonEast = str(float(sys.argv[5]))
# lonWest = str(float(sys.argv[6]))
# retrieve_interim(config,eraDir, latNorth,latSouth,lonEast,lonWest)
[docs]def eraCat(wd, grepStr):
"""
*** CDO ***
Concats monthly era (interim or 5) files by some keyword *grepStr*. Depends on CDO.
- reads monthly ERA5 data files (MARS efficiency)
- concats to single file timeseries
Syntax: cdo -b F64 -f nc2 mergetime SURF* SURF.nc
Args:
wd: directory of era monthly datafiles
grepStr: "PLEV" or "SURF"
Example:
wd=/home/eraDat/
grepStr= "PLEV"
eraCat("/home/joel/mnt/myserver/sim/wfj_era5/eraDat/", "PLEV")
"""
cmd = ("cdo -b F64 -f nc2 mergetime "
+ wd
+ grepStr
+ "* "
+ wd
+"/"
+grepStr
+".nc")
subprocess.check_output(cmd, shell = "TRUE")
[docs]def eraCat5d(wd, grepStr):
"""
*** NCO ***
Updated method to deal with 5D of ensemble datasets NOT supported by CDO
Concats monthly era (interim or 5) files by some keyword *grepStr*. Depends on NCO.
sudo apt-get install nco
- reads monthly ERA5 data files (MARS efficiency)
- concats to single file timeseries
- 2 steps
- assign time to "record" dimeanion in firstfile
- concat files
Args:
wd: directory of era monthly datafiles
grepStr: "PLEV" or "SURF"
Example:
wd=/home/eraDat/
grepStr= "PLEV"
eraCat("/home/joel/mnt/myserver/sim/wfj_era5/eraDat/", "PLEV")
"""
lst = glob.glob(grepStr+'*')
lst.sort()
firstfile = lst[0]
cmd = cmd = ("ncks -O --mk_rec_dmn time " + firstfile + " " + firstfile)
subprocess.check_output(cmd, shell = "TRUE")
cmd = ("ncrcat " +grepStr+"*"+" " + grepStr+".nc")
subprocess.check_output(cmd, shell = "TRUE")
[docs]def retrieve_era5_tpmm( startDate,endDate,eraDir, latN,latS,lonE,lonW):
"""
retrive monthly tp means to correct 6 or 3h TP retrieval
documented era5 here: https://confluence.ecmwf.int/display/CKB/ERA5+data+documentation#ERA5datadocumentation-Monthlymeans
values are mean daily for that month, weight by number of days in month eg annualtotal = sum(wfj_m*c(31,28,31,30,31,30,31,31,30,31,30,31))
"""
# eraDir='/home/joel/sim/imis/forcing/'
# latN=47.9
# latS=45.75
# lonW=5.7
# lonE=10.6
# startDate = '1979-09-01' # adjust to your requirements - as of 2017-07 only 2010-01-01 onwards is available
# endDate = '2018-09-01'
param='total_precipitation' # adjust to your requirements
months = [1,2,3,4,5,6,7,8,9,10,11,12]
dates = [str(startDate), str(endDate)]
start = datetime.strptime(dates[0], "%Y-%m-%d")
end = datetime.strptime(dates[1], "%Y-%m-%d")
start = start+relativedelta(months=-1)
end = end+relativedelta(months=+1)
dateList = OrderedDict(((start + timedelta(_)).strftime(r"%Y-%m"), None) for _ in xrange((end - start).days)).keys()
#target = eraDir + "tp_%04d%02d.nc" % (startDate, endDate)
target = eraDir + "tpmm.nc"
bbox=(str(latN) + "/" + str(lonW) + "/" + str(latS) + "/" + str(lonE) )
yearVec=[]
for date in dateList:
strsplit = date.split('-' )
year = int(strsplit[0])
yearVec.append(year)
myyears = list(sorted(set(yearVec)))
c = cdsapi.Client()
c.retrieve(
'reanalysis-era5-single-levels-monthly-means',
{
'product_type':'reanalysis-monthly-means-of-daily-means',
'area': bbox,
'variable': param,
'year': myyears,
'month':[
'01','02','03',
'04','05','06',
'07','08','09',
'10','11','12'
],
'time':'00:00',
'format':'netcdf'
},
target)