Source code for solarforecastarbiter.io.utils

# coding: utf-8

"""Collection of Functions to convert API responses into python objects
and vice versa.
"""
import base64
from functools import wraps
from inspect import signature
import zlib


import pandas as pd
import pyarrow as pa


from solarforecastarbiter import datamodel


def _dataframe_to_json(payload_df):
    payload_df.index.name = 'timestamp'
    json_vals = payload_df.tz_convert("UTC").reset_index().to_json(
        orient="records", date_format='iso', date_unit='s')
    return '{"values":' + json_vals + '}'


[docs]def observation_df_to_json_payload( observation_df, default_quality_flag=None): """Extracts a variable from an observation DataFrame and formats it into a JSON payload for posting to the Solar Forecast Arbiter API. Parameters ---------- observation_df : DataFrame Dataframe of observation data. Must contain a tz-aware DateTimeIndex and a 'value' column. May contain a column of data quality flags labeled 'quality_flag'. default_quality_flag : int If 'quality_flag' is not a column, the quality flag for each row is set to this value. Returns ------- string SolarForecastArbiter API JSON payload for posting to the observation endpoint. See Notes section for example. Notes ----- Function returns an object in the following format: .. code:: { 'values': [ { “timestamp”: “2018-11-22T12:01:48Z”, # ISO 8601 datetime in UTC “value”: 10.23, # floating point value of observation “quality_flag”: 0 },... ] } Raises ------ KeyError When 'value' is missing from the columns or 'quality_flag' is missing and default_quality_flag is None """ if default_quality_flag is None: payload_df = observation_df[['value', 'quality_flag']] else: payload_df = observation_df[['value']] payload_df['quality_flag'] = int(default_quality_flag) return _dataframe_to_json(payload_df)
[docs]def forecast_object_to_json(forecast_series): """ Converts a forecast Series to JSON to post to the SolarForecastArbiter API. Parameters ---------- forecast_series : pandas.Series The series that contains the forecast values with a datetime index. Returns ------- string The JSON encoded forecast values dict """ payload_df = forecast_series.to_frame('value') return _dataframe_to_json(payload_df)
def _json_to_dataframe(json_payload): # in the future, might worry about reading the response in chunks # to stream the data and avoid having it all in memory at once, # but 30 days of 1 minute data is probably ~4 MB of text. A better # approach would probably be to switch to a binary format. vals = json_payload['values'] if len(vals) == 0: df = pd.DataFrame([], columns=['value', 'quality_flag'], index=pd.DatetimeIndex([], name='timestamp')) else: df = pd.DataFrame.from_dict(json_payload['values']) df.index = pd.to_datetime(df['timestamp'], utc=True, infer_datetime_format=True) return df
[docs]def json_payload_to_observation_df(json_payload): """ Convert the JSON payload dict as returned by the SolarForecastArbiter API observations/values endpoint into a DataFrame Parameters ---------- json_payload : dict Dictionary as returned by the API with a "values" key which is a list of dicts like {'timestamp': <timestamp>, 'value': <float>, 'quality_flag': <int>} Returns ------- pandas.DataFrame With a tz-aware DatetimeIndex and ['value', 'quality_flag'] columns """ df = _json_to_dataframe(json_payload) return df[['value', 'quality_flag']]
[docs]def json_payload_to_forecast_series(json_payload): """ Convert the JSON payload dict as returned by the SolarForecastArbiter API forecasts/values endpoing into a Series Parameters ---------- json_payload : dict Dictionary as returned by the API with a "values" key which is a list of dicts like {'timestamp': <timestamp>, 'value': <float>} Returns ------- pandas.Series With a tz-aware DatetimeIndex """ df = _json_to_dataframe(json_payload) return df['value']
[docs]def adjust_start_end_for_interval_label(interval_label, start, end, limit_instant=False): """ Adjusts the start and end times depending on the interval_label. Parameters ---------- interval_label : str or None The interval label for the the object the data represents start : pandas.Timestamp Start time to restrict data to end : pandas.Timestamp End time to restrict data to limit_instant : boolean If true, an interval label of 'instant' will remove a nanosecond from end to ensure forecasts do not overlap. If False, instant returns start, end unmodified Returns ------- start, end Return the adjusted start and end Raises ------ ValueError If an invalid interval_label is given Examples -------- .. testsetup:: from solarforecastarbiter.io.utils import * Define input start/end: >>> start = pd.Timestamp('20190101 1200Z') >>> end = pd.Timestamp('20190101 1300Z') Beginning: >>> adjust_start_end_for_interval_label('beginning', start, end) (Timestamp('2019-01-01 12:00:00+0000', tz='UTC'), Timestamp('2019-01-01 12:59:59.999999999+0000', tz='UTC')) Ending: >>> adjust_start_end_for_interval_label('ending', start, end) (Timestamp('2019-01-01 12:00:00.000000001+0000', tz='UTC'), Timestamp('2019-01-01 13:00:00+0000', tz='UTC')) Instantaneous: >>> adjust_start_end_for_interval_label('instant', start, end) (Timestamp('2019-01-01 12:00:00+0000', tz='UTC'), Timestamp('2019-01-01 13:00:00+0000', tz='UTC')) >>> adjust_start_end_for_interval_label('instant', start, end, ... limit_instant=True) (Timestamp('2019-01-01 12:00:00+0000', tz='UTC'), Timestamp('2019-01-01 12:59:59.999999999+0000', tz='UTC')) """ # NOQA if ( interval_label is not None and interval_label not in ('instant', 'beginning', 'ending') ): raise ValueError('Invalid interval_label') if ( interval_label == 'beginning' or (interval_label == 'instant' and limit_instant) ): end -= pd.Timedelta(1, unit='nano') elif interval_label == 'ending': start += pd.Timedelta(1, unit='nano') return start, end
[docs]def adjust_timeseries_for_interval_label(data, interval_label, start, end): """ Adjusts the index of the data depending on the interval_label, start, and end. Will always return the data located between start, end. Parameters ---------- data : pandas.Series or pandas.DataFrame The data with a localized DatetimeIndex interval_label : str or None The interval label for the the object the data represents start : pandas.Timestamp Start time to restrict data to end : pandas.Timestamp End time to restrict data to Returns ------- pandas.Series or pandas.DataFrame Return data between start and end, in/excluding the endpoints depending on interval_label Raises ------ ValueError If an invalid interval_label is given or data is not localized. """ start, end = adjust_start_end_for_interval_label(interval_label, start, end) data = data.sort_index(axis=0) # pandas >= 0.25.1 requires start, end to have same tzinfo. # unexpected behavior when data is not localized, so prevent that if data.empty: return data if data.index.tzinfo is None: raise ValueError('data must be localized') start = start.tz_convert(data.index.tzinfo) end = end.tz_convert(data.index.tzinfo) return data.loc[start:end]
def serialize_data(values): serialized_buf = pa.serialize(values).to_buffer() compressed_bytes = zlib.compress(serialized_buf) encoded = base64.b64encode(compressed_bytes) return encoded.decode('ascii') # bytes to str def deserialize_data(data): compressed = base64.b64decode(data) serialized = zlib.decompress(compressed) values = pa.deserialize(serialized) return values def serialize_raw_report(raw): bundle = {'metrics': raw.metrics, 'template': raw.template, 'metadata': raw.metadata.to_dict(), 'processed_forecasts_observations': [ pfx.to_dict() for pfx in raw.processed_forecasts_observations]} return serialize_data(bundle) def deserialize_raw_report(encoded_bundle, version=0): bundle = deserialize_data(encoded_bundle) return datamodel.RawReport.from_dict(bundle) class HiddenToken: """ Obscure the representation of the input string `token` to avoid saving or displaying access tokens in logs. """ def __init__(self, token): self.token = str(token) # make sure it isn't a localproxy def __repr__(self): return '****ACCESS*TOKEN****'
[docs]def ensure_timestamps(*time_args): """ Decorator that converts the specified time arguments of the wrapped function to pandas.Timestamp objects Parameters ---------- strings Function arguments to convert to pandas.Timestamp before executing function Raises ------ ValueError If any of time_args cannot be converted to pandas.Timestamp Examples -------- .. testsetup:: import datetime as dt from solarforecastarbiter.io.utils import * >>> @ensure_timestamps('start', 'end') ... def get_values(start, end, other_arg): ... # do stuff with start, end assumed to be pandas.Timestamps ... if isinstance(start, pd.Timestamp): ... return True >>> get_values('2019-01-01T00:00Z', dt.datetime(2019, 1, 2, 12), 'other') True """ def decorator(f): @wraps(f) def wrapper(*args, **kwargs): sig = signature(f) inds = {k: None for k in time_args} for i, k in enumerate(sig.parameters.keys()): if k in inds: inds[k] = i nargs = list(args) for k, ind in inds.items(): if k in kwargs: kwargs[k] = pd.Timestamp(kwargs[k]) elif ind is not None: nargs[ind] = pd.Timestamp(args[ind]) return f(*nargs, **kwargs) return wrapper return decorator