#TODO: a simple function interface that takes as arguments a format,
# a set of filenames, and a date format (e.g., $Y$m$d) to extract
# from the file names.
# Expand globbed file expressions to a flat list of files
def expand_file_list (file_list, sort=True):
  ''' Expands a glob expression (or a list of them) into a flat list of
      matching filenames, optionally sorted alphabetically. '''
  from glob import iglob
  from os.path import exists
  if not isinstance(file_list, (list,tuple)): file_list = [file_list]
  files = [f for file_glob in file_list for f in iglob(file_glob)]
  assert len(files) > 0, 'No matches found'
  for f in files: assert exists(f), str(f) + " doesn't exist"
  if sort: files.sort()
  return files
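# e.g. expand_file_list("data_????.nc") returns a sorted list of every filename
# matching the glob (the pattern shown is hypothetical)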
# Interface for loading multiple files at a time (via a file glob)
# Inputs: the format, the glob, and any additional keywords to pass
# E.g.: openall(files="file???.nc", format='netcdf')
# NOTE: for a large number of homogeneous files, use open_multi (below) instead
def openall (files, format=None, opener=None, **kwargs):
''' Returns a :class:`Dataset` containing variables merged across multiple files.
Parameters
==========
files : string, list, or tuple
Either a single filename or a list of filenames. Wildcards are supported, :func:`glob.iglob` is
used to expand these into an explicit list of files.
  format : string, optional
    String specifying the format of the file to open. If none is given, the format is
    automatically detected from the first filename (see :func:`autodetectformat`).
opener : function, optional
Function to open individual files. If none is provided, uses the
format-specific version of :func:`open`. The datasets returned by this
function are then concatenated and returned. See Notes.
  sorted : boolean, optional
    If True, the filenames are sorted alphabetically prior to opening each file, and
    the axes on the returned dataset are sorted by calling :meth:`Dataset.sorted`.
  **kwargs : keyword arguments
    These are passed on to the function ``opener``.
Returns
=======
dataset
A dataset containing the variables concatenated across all specified files.
The variable data itself is not loaded into memory.
Notes
=====
The function ``opener`` must take a single positional argument - the filename of the file
to open - and keyword arguments that are passed through from this function. It must return
a :class:`Dataset` object with the loaded variables. By default the standard
:func:`open` is used, but providing a custom opener can be useful for any reshaping of the
variables that must be done prior to concatenating the whole dataset.
Once every file has been opened, the resulting datasets are concatenated
using :func:`dataset.concat`.
This function is best suited for a moderate number of files. Because each
file must be explicitly opened to read the metadata, even this can take a
significant amount of time if a large number of files are being opened. For
these cases using :func:`open_multi` can be much more efficient, though it
requires more coding effort initially. The underlying concatenation is also
more efficient when the data is actually accessed.
See Also
========
open
open_multi
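
  Examples
  ========
  A minimal sketch; the filename glob is hypothetical::

    # Merge every matching file into a single dataset (data is loaded lazily)
    ds = openall("data_month??.nc", format='netcdf')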
'''
from pygeode.dataset import concat
from pygeode.formats import autodetectformat
sort = kwargs.pop('sorted', True)
files = expand_file_list (files, sort)
if opener is None:
if format is None: format = autodetectformat(files[0])
if not hasattr(format, 'open'):
try:
format = __import__("pygeode.formats.%s" % format, fromlist=["pygeode.formats"])
except ImportError:
raise ValueError('Unrecognized format module %s.' % format)
opener = format.open
datasets = [ opener(f, **kwargs) for f in files]
ds = concat(*[d for d in datasets if d is not None])
if sort: ds = ds.sorted()
return ds
def open_multi (files, format=None, opener=None, pattern=None, file2date=None, **kwargs):
# {{{
''' Returns a :class:`Dataset` containing variables merged across many files.
Parameters
==========
files : string, list, or tuple
Either a single filename or a list of filenames. Wildcards are supported, :func:`glob.iglob` is
used to expand these into an explicit list of files.
  format : string, optional
    String specifying the format of the file to open. If none is given, the format is
    automatically detected from the first filename (see :func:`autodetectformat`).
opener : function, optional
Function to open individual files. If none is provided, uses the
format-specific version of :func:`open`. The datasets returned by this
function are then concatenated and returned. See Notes.
  pattern : string, optional
    A regex pattern used to extract date stamps from the filenames; used by the
    default ``file2date``. Matching groups must be named <year>, <month>, <day>,
    <hour>, or <minute>. Abbreviations are available for the above: $Y matches a
    four-digit year; $m, $d, $H, and $M match a two-digit month, day, hour, and
    minute, respectively.
file2date : function, optional
Function which returns a date dictionary given a filename. By default this is produced
by applying the regex pattern ``pattern`` to the filename.
  sorted : boolean, optional
    If True, the filenames are sorted alphabetically prior to opening each file, and
    the axes on the returned dataset are sorted by calling :meth:`Dataset.sorted`.
  **kwargs : keyword arguments
    These are passed on to the function ``opener``.
Returns
=======
dataset
A dataset containing the variables concatenated across all specified files.
The variable data itself is not loaded into memory.
Notes
=====
This is intended to provide access to large datasets whose files are
separated by timestep. To avoid opening every file individually, the time
axis is constructed by opening the first and the last file in the list of
files provided. This is done to provide a template of what variables and what
times are stored in each file - it is assumed that the number of timesteps
  (and their offsets) is the same across the whole dataset. The time axis is
then constructed from the filenames themselves, using the function
``file2date`` to generate a date from each filename. As a result only two files
need to be opened, which makes this a very efficient way to work with very large
datasets.
  However, no explicit check is made of the integrity of the files - if there
  are corrupt or missing data within individual files, this will not become
  clear until that data is actually accessed. This can be checked with
  :func:`check_multi`, which attempts to access all of the data and returns a
  list of any problems encountered; this can take a long time, but is a useful
  check (and is more likely to produce helpful error messages).
The function ``opener`` must take a single positional argument - the filename
of the file to open - and keyword arguments that are passed through from this
function. It must return a :class:`Dataset` object with the loaded variables.
By default the standard :func:`open` is used, but providing a custom opener
can be useful for any reshaping of the variables that must be done prior to
concatenating the whole dataset.
See Also
========
open
openall
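
  Examples
  ========
  A hypothetical sketch; the filename pattern and format are illustrative only::

    # One file per day, named like "model_20090101.nc"; the time axis is
    # reconstructed from the $Y$m$d date stamp embedded in each filename.
    ds = open_multi("model_*.nc", format='netcdf', pattern="model_$Y$m$d\\.nc")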
'''
from pygeode.timeaxis import Time, StandardTime
from pygeode.timeutils import reltime, delta
from pygeode.dataset import Dataset
from pygeode.tools import common_dict
from pygeode.formats import open, autodetectformat
import numpy as np
files = expand_file_list(files)
nfiles = len(files)
assert nfiles > 0
if opener is None:
if format is None: format = autodetectformat(files[0])
if not hasattr(format, 'open'):
try:
format = __import__("pygeode.formats.%s" % format, fromlist=["pygeode.formats"])
except ImportError:
raise ValueError('Unrecognized format module %s.' % format)
opener = format.open
# Apply keyword arguments
if len(kwargs) > 0:
old_opener = opener
opener = lambda f: old_opener (f, **kwargs)
# Degenerate case: only one file was given
if nfiles == 1: return opener(files[0])
# We'll need a function to translate filenames to dates
# (if we don't have one, use the supplied pattern to make one)
  if file2date is None:
    import re
    assert pattern is not None, "either 'pattern' or 'file2date' must be provided to extract dates from the filenames"
regex = pattern
regex = regex.replace('$Y', '(?P<year>[0-9]{4})')
regex = regex.replace('$m', '(?P<month>[0-9]{2})')
regex = regex.replace('$d', '(?P<day>[0-9]{2})')
regex = regex.replace('$H', '(?P<hour>[0-9]{2})')
regex = regex.replace('$M', '(?P<minute>[0-9]{2})')
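    # e.g. the pattern "$Y$m$d" expands to the regex
    # "(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})"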
regex = re.compile(regex)
    def file2date (f):
      d = regex.search(f)
      assert d is not None, "the given pattern does not match the filename '%s'" % f
      d = d.groupdict()
      d = dict([k,int(v)] for k,v in d.items() if v is not None)
      # Apply default values (e.g. for hour/minute/second if they're not part of the filename)
      d = dict({'hour':0, 'minute':0, 'second':0}, **d)
      return d
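  # e.g. with pattern "$Y$m$d", a (hypothetical) file "model_20090101.nc" yields
  # {'year': 2009, 'month': 1, 'day': 1, 'hour': 0, 'minute': 0, 'second': 0}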
# Get the starting date of each file
dates = [file2date(f) for f in files]
dates = dict((k,[d[k] for d in dates]) for k in list(dates[0].keys()))
# Open a file to get a time axis
file = opener(files[0])
T = None
for v in file.vars:
if v.hasaxis(Time):
T = type(v.getaxis(Time))
break
if T is None: T = StandardTime
del file
# Generate a lower-resolution time axis (the start of *each* file)
faxis = T(units='days',**dates)
# Re-sort the files, if they weren't in order
S = faxis.argsort()
faxis = faxis.slice[S]
files = [files[s] for s in S]
# Re-init the faxis to force the proper start date
faxis = type(faxis)(units=faxis.units, **faxis.auxarrays)
# Open the first and last file, so we know what the variables & timesteps are
first = opener(files[0])
last = opener(files[-1])
names = [v.name for v in first.vars]
  for n in names: assert n in last, "variable '%s' is missing from the last file" % n
# Get global attributes
global_atts = common_dict (first.atts, last.atts)
#---
timedict = {None:faxis}
for v1 in first:
if not v1.hasaxis(Time): continue
t1 = v1.getaxis(Time)
if t1.name in timedict: continue # already handled this one
t2 = last[v1.name].getaxis(Time)
# Construct a full time axis from these pieces
# One timestep per file? (check for an offset for the var time compared
# to the file time)
if max(len(t1),len(t2)) == 1:
offset = reltime(t1, startdate=faxis.startdate, units=faxis.units)[0]
taxis = faxis.withnewvalues(faxis.values + offset)
# At least one of first/last files has multiple timesteps?
else:
assert t1.units == t2.units
dt = max(delta(t1),delta(t2))
assert dt > 0
val1 = t1.values[0]
val2 = reltime(t2, startdate=t1.startdate)[-1]
      ntf = (val2 - val1)/dt
      nt = np.round(ntf)
      # ntf counts timesteps, so compare against a (dimensionless) tolerance of
      # a tenth of a step rather than a time interval
      if np.abs(ntf - nt) > 0.1:
        raise ValueError('Cannot infer a well-formed time axis from the provided files. Times and dates in the first and last files are not aligned.')
nt = int(nt) + 1
      if nt <= 0:
        raise ValueError('Cannot infer a well-formed time axis from the provided files. Times and dates in the last file precede those in the first file.')
taxis = t1.withnewvalues(np.arange(nt)*dt + val1)
timedict[t1.name] = taxis
#---
# Create the multifile version of the vars
vars = [Multifile_Var(v1, opener, files, faxis, timedict) for v1 in first]
return Dataset(vars,atts=global_atts)
# }}}
from pygeode.var import Var
class Multifile_Var (Var):
# {{{
def __init__ (self, v1, opener, files, faxis, timedict):
# {{{
# v1 - var chunk from the first file
# opener - method for opening a file given a filename
# files - list of filenames
# faxis - starting time of each file, as a time axis
# timedict - dictionary of pre-constructed time axes
from pygeode.var import Var
from pygeode.timeaxis import Time
self.opener = opener
self.files = files
self.faxis = faxis
n = v1.getaxis(Time).name if v1.hasaxis(Time) else None
taxis = timedict[n]
T = type(taxis)
axes = list(v1.axes)
# Replace the existing time axis?
# (look for either a Time axis or a generic axis with the name 'time')
if v1.hasaxis(T): axes[v1.whichaxis(T)] = taxis
elif v1.hasaxis('time'): axes[v1.whichaxis('time')] = taxis
else: axes = [taxis] + axes
Var.__init__ (self, axes, dtype=v1.dtype, name=v1.name, atts=v1.atts, plotatts=v1.plotatts)
# }}}
def getview (self, view, pbar):
# {{{
from pygeode.timeaxis import Time
from pygeode.timeutils import reltime
import numpy as np
from warnings import warn
out = np.empty(view.shape, self.dtype)
out[()] = float('nan')
# Get the times
itime = view.index(Time)
times = view.subaxis(Time)
handled_times = np.zeros([len(times)], dtype='bool')
# Map these times to values along the 'file' axis
x = reltime(times, startdate=self.faxis.startdate, units=self.faxis.units)
file_indices = np.searchsorted(self.faxis.values, x, side='right') - 1 # -1 because we want the file that has a date *before* the specified timestep
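    # e.g. if faxis.values = [0., 31., 59.] (file start dates), then a requested
    # time x = 32.5 maps to file index 1 (the file starting at day 31)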
diff = np.diff(file_indices)
# Where a new file needs to be loaded
newfile_pos = list(np.where(diff != 0)[0] + 1)
newfile_pos = [0] + newfile_pos
# Loop over each file, git 'er done
for i,p in enumerate(newfile_pos):
file_index = file_indices[p]
try:
file = self.opener(self.files[file_index])
except Exception as e:
raise Exception("Multifile: error encountered when opening file '%s': %s"%(self.files[file_index], str(e)))
if self.name not in file:
raise Exception("Multifile: var '%s' was expected to be in file '%s', but it's not there!"%(self.name, self.files[file_index]))
var = file[self.name] # abandon all hope, ye who use non-unique variable names
# How does this var map to the overall time axis?
if var.hasaxis(Time):
timechunk = var.getaxis(Time)
else:
timechunk = self.faxis.slice[file_index]
# Remove any vestigial internal time axis
if var.hasaxis('time'):
assert len(var.getaxis('time')) == 1, "unresolved time axis. this should have been caught at init time!"
var = var.squeeze()
bigmap, smallmap = times.common_map(timechunk)
# Check for any funky problems with the map
if len(bigmap) == 0:
raise Exception("Multifile: Expected to find data for variable %s for times %s in file %s, but none is present. Perhaps a file is missing, or filename to date mapping is incorrect?" % (self.name, timechunk, self.files[file_index]))
slices = [slice(None)] * self.naxes
slices[itime] = bigmap
slices = tuple(slices)
newview = view.replace_axis(Time, times, bigmap)
try:
data = newview.get(var, pbar=pbar.part(i,len(newfile_pos)))
except Exception as e:
raise Exception("Multifile: problem fetching variable '%s' from file '%s': %s"%(self.name, self.files[file_index], str(e)))
# Stick this data into the output
out[slices] = data
handled_times[bigmap] = True
if not np.all(handled_times):
missing_times = ~handled_times
missing_files = [self.files[i] for i in np.unique(file_indices[missing_times])]
itms = times[missing_times]
if len(itms) < 5:
stms = ', '.join([times.formatvalue(t) for t in itms])
else:
stms = ', ... '.join([times.formatvalue(t) for t in [itms[0], itms[-1]]])
if len(missing_files) < 4:
sfiles = ', '.join(missing_files)
else:
sfiles = ', ... '.join([missing_files[0], missing_files[-1]])
raise Exception("Multifile: can't find %d timesteps (%s) from %d files (%s) for variable '%s'." % \
(len(itms), stms, len(missing_files), sfiles, self.name))
return out
# }}}
# }}}
del Var
# Decorator for turning an 'open' method into a multifile open method
def multifile (opener):
def new_opener (files, pattern=None, file2date=None, **kwargs):
o = lambda f: opener(f, **kwargs) # attach the kwargs to the underlying format opener
return open_multi (files, pattern=pattern, file2date=file2date, opener=o)
new_opener.__doc__ = opener.__doc__
return new_opener
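# Hypothetical usage, wrapping a format module's single-file opener:
#   from pygeode.formats import netcdf
#   open_many = multifile(netcdf.open)
#   ds = open_many("model_*.nc", pattern="model_$Y$m$d\\.nc")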
def check_multi (*args, **kwargs):
  ''' Validates the given files for completeness and consistency with the
      assumptions made by :func:`open_multi`. Accepts the same positional and
      keyword arguments as :func:`open_multi`; any problems found are printed.
  '''
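  # Hypothetical invocation, mirroring the corresponding open_multi call:
  #   check_multi("model_*.nc", format='netcdf', pattern="model_$Y$m$d\\.nc")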
from pygeode.timeutils import reltime
import numpy as np
# First, query open_multi to find out what we *expect* to see in all the files
full_dataset = open_multi (*args, **kwargs)
# Dig into this object, to find the list of files and the file opener.
# (this will break if open_multi or Multifile_Var are ever changed!)
sample_var = full_dataset.vars[0]
assert isinstance(sample_var,Multifile_Var)
files = sample_var.files
faxis = sample_var.faxis
opener = sample_var.opener
full_taxis = sample_var.getaxis('time')
del sample_var
# Helper method - associate a time axis value with a particular file.
def find_file (t):
i = np.searchsorted(faxis.values, t, side='right') - 1
if i == -1: return '(some missing file?)'
return files[i]
# Similar to above, but return all files that should cover all given timesteps.
def find_files (t_array):
return sorted(set(map(find_file,t_array)))
# Loop over each file, and check the contents.
all_ok = True
all_expected_times = set(full_taxis.values)
# Check for uniformity in the data, and report any potential holes.
dt = np.diff(full_taxis.values)
expected_dt = min(dt[dt > 0])
gaps = full_taxis.values[np.where(dt > expected_dt)]
if len(gaps) > 0:
print("ERROR: detected gaps on or after file(s):")
for filename in find_files(gaps):
print(filename)
print("There may be missing files near those files.")
all_ok = False
covered_times = set()
for i,filename in enumerate(files):
print("Scanning "+filename)
try:
current_file = opener(filename)
except Exception as e:
print(" ERROR: Can't even open the file. Reason: %s"%str(e))
all_ok = False
continue
for var in current_file:
if var.name not in full_dataset:
print(" ERROR: unexpected variable '%s'"%var.name)
all_ok = False
continue
for var in full_dataset:
if var.name not in current_file:
print(" ERROR: missing variable '%s'"%var.name)
all_ok = False
continue
try:
source_data = current_file[var.name].get().flatten()
except Exception as e:
print(" ERROR: unable to read source variable '%s'. Reason: %s"%(var.name, str(e)))
all_ok = False
continue
try:
file_taxis = current_file[var.name].getaxis('time')
times = reltime(file_taxis, startdate=full_taxis.startdate, units=full_taxis.units)
rtimes = reltime(file_taxis, startdate=faxis.startdate, units=faxis.units)
multifile_data = var(l_time=list(times)).get().flatten()
except Exception as e:
print(" ERROR: unable to read multifile variable '%s'. Reason: %s"%(var.name, str(e)))
all_ok = False
continue
if len(source_data) != len(multifile_data):
print(" ERROR: size mismatch for variable '%s'"%var.name)
all_ok = False
continue
source_mask = ~np.isfinite(source_data)
multifile_mask = ~np.isfinite(multifile_data)
if not np.all(source_mask == multifile_mask):
print(" ERROR: different missing value masks found in multifile vs. direct access for '%s'"%var.name)
all_ok = False
continue
source_data = np.ma.masked_array(source_data, mask=source_mask)
multifile_data = np.ma.masked_array(multifile_data, mask=multifile_mask)
if not np.all(source_data == multifile_data):
print(" ERROR: get different data from multifile vs. direct access for '%s'"%var.name)
all_ok = False
continue
covered_times.update(times)
if i < len(files)-1 and np.any(rtimes >= faxis[i+1]):
print(" ERROR: found timesteps beyond the expected range of this file.")
all_ok = False
if np.any(rtimes < faxis[i]):
print(" ERROR: found timestep(s) earlier than the expected start of this file.")
all_ok = False
missing_times = all_expected_times - covered_times
if len(missing_times) > 0:
print("ERROR: did not get full time coverage. Missing some timesteps for file(s):")
for filename in find_files(missing_times):
print(filename)
all_ok = False
extra_times = covered_times - all_expected_times
if len(extra_times) > 0:
print("ERROR: found extra (unexpected) timesteps in the following file(s):")
for filename in find_files(extra_times):
print(filename)
all_ok = False
if all_ok:
print("Scan completed without any errors.")
else:
print("One or more errors occurred while scanning the files.")