Source code for solarforecastarbiter.utils

from contextlib import contextmanager
from hashlib import sha256
import logging
import warnings


import numpy as np
import pandas as pd


from solarforecastarbiter import datamodel


def _observation_valid(index, obs_id, aggregate_observations):
    """
    Indicates where the observation data is valid. For now,
    effective_from and effective_until are inclusive, so data missing
    at those times is marked as missing in the aggregate.
    """
    nindex = pd.DatetimeIndex([], tz=index.tz)
    for aggobs in aggregate_observations:
        if aggobs['observation_id'] == obs_id:
            if aggobs['observation_deleted_at'] is None:
                locs = index.slice_locs(aggobs['effective_from'],
                                        aggobs['effective_until'])
                nindex = nindex.union(index[locs[0]:locs[1]])
            elif (
                    aggobs['effective_until'] is None or
                    aggobs['effective_until'] >= index[0]
            ):
                raise ValueError(
                    'Deleted Observation data cannot be retrieved'
                    ' to include in Aggregate')
            else:  # observation deleted and effective_until before index
                return pd.Series(False, index=index)
    return pd.Series(1, index=nindex).reindex(index).fillna(0).astype(bool)


def _make_aggregate_index(data, interval_length, interval_label,
                          timezone):
    """
    Compute the aggregate the index should have based on the min and
    max timestamps in the data, the interval length, label, and timezone.
    """
    # first, find limits for a new index
    start = pd.Timestamp('20380119T031407Z')
    end = pd.Timestamp('19700101T000001Z')
    for df in data.values():
        start = min(start, min(df.index))
        end = max(end, max(df.index))
    # adjust start, end to nearest interval
    # hard to understand what this interval should be for
    # odd (e.g. 52min) intervals, so required that interval
    # is a divisor of one day
    if 86400 % pd.Timedelta(interval_length).total_seconds() != 0:
        raise ValueError(
            'interval_length must be a divisor of one day')
    if interval_label == 'ending':
        start = start.ceil(interval_length)
        end = end.ceil(interval_length)
    elif interval_label == 'beginning':
        start = start.floor(interval_length)
        end = end.floor(interval_length)
    else:
        raise ValueError(
            'interval_label must be beginning or ending for aggregates')
    # raise the error if unlocalized
    start = start.tz_convert(timezone)
    end = end.tz_convert(timezone)
    return pd.date_range(
        start, end, freq=interval_length, tz=timezone)


def compute_aggregate(data, interval_length, interval_label,
                      timezone, agg_func, aggregate_observations,
                      new_index=None):
    """
    Computes an aggregate quantity according to agg_func of the data.
    This function assumes the data has an interval_value_type of
    interval_mean or instantaneous and that the data interval_length
    is less than or equal to the aggregate interval_length.
    NaNs in the output are the result of missing data from an
    underlying observation of the aggregate.

    Parameters
    ----------
    data : dict of pandas.DataFrames
        With keys 'observation_id' corresponding to observation in
        aggregate_observations. DataFrames must have 'value' and
        'quality_flag' columns.
    interval_length : str or pandas.Timedelta
        The time between timesteps in the aggregate result.
    interval_label : str
        Whether the timestamps in the aggregated output represent the
        beginning or ending of the interval
    timezone : str
        The IANA timezone for the output index
    agg_func : str
        The aggregation function (e.g 'sum', 'mean', 'min') to create the
        aggregate
    aggregate_observations : tuple of dicts
        Each dict should have 'observation_id' (string),
        'effective_from' (timestamp), 'effective_until' (timestamp or None),
        and 'observation_deleted_at' (timestamp or None) fields.
    new_index : pandas.DatetimeIndex
        The index to resample data to. Will attempt to infer an index if not
        provided.

    Returns
    -------
    pandas.DataFrame
        - Index is a DatetimeIndex that adheres to interval_length and
          interval_label
        - Columns are 'value', for the aggregated value according to agg_func,
          and 'quality_flag', the bitwise or of all flags in the aggregate for
          the interval.
        - A 'value' of NaN means that data from one or more
          observations was missing in that interval.

    Raises
    ------
    KeyError
        If data is missing a key for an observation that is effective
        somewhere in the index
        + Or, if any DataFrames in data do not have 'value' or 'quality_flag'
        columns
    ValueError
        If interval_length is not a divisor of one day and an index is
        not provided.
        + Or, if an observation has been deleted but the data is required for
        the aggregate
        + Or, if interval_label is not beginning or ending and an index
        is not provided
        + Or, if no observation is effective anywhere in the index (e.g.
        data is empty and an index is not provided).
    """
    if new_index is None:
        new_index = _make_aggregate_index(
            data, interval_length, interval_label, timezone)

    # one validity mask per distinct observation of the aggregate
    valid_mask = {}
    for obs_id in {ao['observation_id'] for ao in aggregate_observations}:
        valid_mask[obs_id] = _observation_valid(
            new_index, obs_id, aggregate_observations)

    expected_observations = {
        obs_id for obs_id, mask in valid_mask.items() if mask.any()}
    # Raise an exception if no observations are valid
    if not expected_observations:
        raise ValueError(
            'No effective observations in data')

    missing_from_data_dict = expected_observations - set(data)
    if missing_from_data_dict:
        raise KeyError(
            'Cannot aggregate data with missing keys '
            f'{", ".join(missing_from_data_dict)}')

    closed = datamodel.CLOSED_MAPPING[interval_label]
    value_is_missing = pd.Series(False, index=new_index)
    values_by_obs = {}
    flags_by_obs = {}
    for obs_id, df in data.items():
        resampled = df.resample(
            interval_length, closed=closed, label=closed)
        obs_value = resampled['value'].mean().reindex(new_index)
        # data is missing when the resampled value is NaN and the data
        # should be valid according to effective_from/until
        obs_valid = valid_mask[obs_id]
        absent = obs_value.isna() & obs_valid
        if absent.any():
            warnings.warn('Values missing for one or more observations')
            value_is_missing[absent] = True
        values_by_obs[obs_id] = obs_value[obs_valid]
        flags_by_obs[obs_id] = resampled['quality_flag'].apply(
            np.bitwise_or.reduce)

    combined_values = pd.DataFrame(values_by_obs).reindex(new_index)
    final_value = combined_values.aggregate(agg_func, axis=1)
    final_value[value_is_missing] = np.nan
    # have to fill in nans and convert to int to do bitwise_or
    # only works with pandas >= 0.25.0
    combined_flags = pd.DataFrame(flags_by_obs).reindex(
        new_index).fillna(0).astype(int)
    final_qf = combined_flags.aggregate(np.bitwise_or.reduce, axis=1)
    return pd.DataFrame({'value': final_value, 'quality_flag': final_qf})
def sha256_pandas_object_hash(obj):
    """
    Compute a hash for a pandas object. No sorting of the object is
    performed, so an object with the same data in a different order
    returns a different hash.

    Parameters
    ----------
    obj: pandas.Series or pandas.DataFrame

    Returns
    -------
    str
        Hex digest of the SHA-256 hash of the individual object row hashes
    """
    row_hashes = pd.util.hash_pandas_object(obj)
    digest = sha256()
    digest.update(row_hashes.values.tobytes())
    return digest.hexdigest()
class ListHandler(logging.Handler): """ A logger handler that appends each log record to a list. """ def __init__(self): super().__init__() self.records = [] def emit(self, record): self.records.append(record) def export_records(self, level=logging.WARNING): """ Convert each log record in the records list with level greater than or equal to `level` to a :py:class:`solarforecastarbiter.datamodel.ReportMessage` and return the tuple of messages. """ out = [] for rec in self.records: if rec.levelno >= level: out.append( datamodel.ReportMessage( message=rec.getMessage(), step=rec.name, level=rec.levelname, function=rec.funcName ) ) return tuple(out) def _get_children(name): return {k for k in logging.getLogger(name).manager.loggerDict.keys() if k.startswith(name + '.')} @contextmanager def hijack_loggers(loggers, level=logging.INFO): """ Context manager to temporarily set the handler of each logger in `loggers`. Parameters ---------- loggers: list of str Loggers to change level: logging LEVEL int Level to set the temporary handler to Returns ------- ListHandler The handler that will be temporarily assigned to each logger. 
Notes ----- This may not capture all records when used in a distributed or multiprocessing workflow """ from sentry_sdk.integrations import logging as sentry_logging handler = ListHandler() handler.setLevel(level) all_loggers = set() for name in loggers: all_loggers.add(name) all_loggers |= _get_children(name) logger_info = {} for name in all_loggers: logger = logging.getLogger(name) logger_info[name] = (logger.handlers, logger.propagate) logger.handlers = [handler] sentry_logging.ignore_logger(name) logger.propagate = False yield handler for name in all_loggers: logger = logging.getLogger(name) hnd, prop = logger_info[name] logger.handlers = hnd logger.propagate = prop try: sentry_logging._IGNORED_LOGGERS.remove(name) except Exception: pass del handler def _unique_key(try_key, keys, i=0): if try_key in keys: try_key += str(i) i += 1 return _unique_key(try_key, keys, i) else: return try_key
def generate_continuous_chunks(data, freq):
    """
    Generator to split data into continuous chunks with spacing of freq.
    The input object is never modified.

    Parameters
    ----------
    data : pandas.Series or pandas.DataFrame
        Data to split. Must have a DatetimeIndex.
    freq : pd.Timedelta
        Expected frequency to split data into continuous chunks

    Yields
    ------
    continuous_data : same as data
        Each continuous chunk that conforms to freq. When data has fewer
        than two rows, data itself is yielded.

    Raises
    ------
    TypeError
        If data is not a pandas Series or DataFrame, or does not
        have a DatetimeIndex
    ValueError
        If freq cannot be converted to a pandas.Timedelta

    Examples
    --------
    The following code would post two forecast series ignoring
    the missing period in the middle.

    .. testsetup::

       import pandas as pd
       from solarforecastarbiter.io import api

    >>> series = pd.Series(
    ...     [1.0, 2.0, 3.0, 7.0, 8.0],
    ...     index=[
    ...         pd.Timestamp('2020-07-01T01:00Z'),
    ...         pd.Timestamp('2020-07-01T02:00Z'),
    ...         pd.Timestamp('2020-07-01T03:00Z'),
    ...         pd.Timestamp('2020-07-01T07:00Z'),
    ...         pd.Timestamp('2020-07-01T08:00Z'),
    ...     ])
    >>> session = api.APISession('token')
    >>> for cser in generate_continuous_chunks(series, pd.Timedelta('1h')):
    ...     session.post_forecast_values('forecast_id', cser)
    """
    if (
            not isinstance(data, (pd.Series, pd.DataFrame)) or
            not isinstance(data.index, pd.DatetimeIndex)
    ):
        raise TypeError(
            'data must be a pandas Series or DataFrame with DatetimeIndex')
    if not isinstance(freq, pd.Timedelta):
        freq = pd.Timedelta(freq)
    if len(data.index) < 2:
        yield data
        return
    # first value is NaT, rest are timedeltas
    delta = data.index.to_series().diff()
    # A chunk starts wherever the step from the previous timestamp is not
    # freq (NaT != freq, so the first row always starts one); the running
    # count of starts labels each row with its chunk. Grouping on this
    # positional array, rather than on a helper column, leaves the
    # caller's object untouched and cannot collide with an existing
    # column or Series name.
    chunk_ids = (delta != freq).cumsum().to_numpy()
    for _, chunk in data.groupby(chunk_ids):
        yield chunk
def merge_ranges(ranges):
    """
    Generator to merge the ranges like (min_val, max_val) removing any
    overlap. Results will be sorted in ascending order. The type of
    values in each range set should have well defined behaviour with
    the comparison operators, namely >, >=, <, <=.

    Parameters
    ----------
    ranges: iterable
        Any iterable (list, tuple, generator, ...) of two-element
        ranges. It is consumed once.

    Yields
    ------
    next_value: same type as the first range in `ranges`
        Each merged range is built by calling that type on a
        ``[min_val, max_val]`` list.

    Raises
    ------
    ValueError
        If any range is not properly sorted
    TypeError
        If any range values cannot be compared

    Examples
    --------
    .. testsetup::

       import pandas as pd
       from solarforecastarbiter.utils import merge_ranges

    >>> list(merge_ranges([[0, 1], [9, 15], [-1, 3]]))
    [[-1, 3], [9, 15]]

    >>> merged = list(merge_ranges([
    ...     (pd.Timestamp('2020-01-01T00:00Z'),
    ...      pd.Timestamp('2020-01-05T12:00Z')),
    ...     (pd.Timestamp('2020-01-02T00:00Z'),
    ...      pd.Timestamp('2020-01-03T12:00Z')),
    ... ]))
    >>> merged == [(pd.Timestamp('2020-01-01T00:00Z'),
    ...             pd.Timestamp('2020-01-05T12:00Z'))]
    True
    """
    # materialise once so that iterators and generators are accepted too
    ranges = list(ranges)
    if not ranges:
        return
    # output ranges take the type of the first range as given, not of the
    # first range after sorting
    type_ = type(ranges[0])
    ranges.sort()
    last = list(ranges[0])
    for rset in ranges:
        rset = list(rset)
        if rset[1] < rset[0]:
            raise ValueError(
                'All ranges must be properly sorted like (min, max)')
        # values for which none of <, >, == holds (NaN-like) cannot be
        # merged meaningfully
        if not (rset[0] < rset[1] or rset[0] > rset[1] or
                rset[0] == rset[1]):
            raise TypeError(
                f'Cannot properly compare ({rset[0]}, {rset[1]})')
        if rset[0] <= last[1]:
            # overlaps or touches the range being built, so extend it
            last[1] = max(rset[1], last[1])
        else:
            yield type_(last)
            last = rset
    yield type_(last)