"""Collection of functions to convert API responses into Python objects
and vice versa.
"""
from contextlib import contextmanager
from functools import wraps
from inspect import signature
import json
import re
import pandas as pd
def _dataframe_to_json(payload_df):
payload_df.index.name = 'timestamp'
json_vals = payload_df.tz_convert("UTC").reset_index().to_json(
orient="records", date_format='iso', date_unit='s')
return '{"values":' + json_vals + '}'
def observation_df_to_json_payload(
        observation_df, default_quality_flag=None):
    """Extracts a variable from an observation DataFrame and formats it
    into a JSON payload for posting to the Solar Forecast Arbiter API.

    Parameters
    ----------
    observation_df : DataFrame
        Dataframe of observation data. Must contain a tz-aware DateTimeIndex
        and a 'value' column. May contain a column of data quality
        flags labeled 'quality_flag'.
    default_quality_flag : int
        If None, the 'quality_flag' column of `observation_df` is used and
        must be present. Otherwise the quality flag of every row is set to
        this value and any 'quality_flag' column in `observation_df` is
        ignored.

    Returns
    -------
    string
        SolarForecastArbiter API JSON payload for posting to the observation
        endpoint. See Notes section for example.

    Notes
    -----
    Function returns a JSON string in the following format:

    .. code::

      {
        "values": [
          {
            "timestamp": "2018-11-22T12:01:48Z",  # ISO 8601 datetime in UTC
            "value": 10.23,  # floating point value of observation
            "quality_flag": 0
          },...
        ]
      }

    Raises
    ------
    KeyError
        When 'value' is missing from the columns or 'quality_flag'
        is missing and default_quality_flag is None
    """
    if default_quality_flag is None:
        payload_df = observation_df[['value', 'quality_flag']]
    else:
        # assign() builds a new frame rather than writing a column into a
        # slice of the caller's data, which avoids SettingWithCopyWarning.
        payload_df = observation_df[['value']].assign(
            quality_flag=int(default_quality_flag))
    return _dataframe_to_json(payload_df)
def forecast_object_to_json(forecast_series):
    """
    Build the JSON body used to post forecast values to the
    SolarForecastArbiter API.

    Parameters
    ----------
    forecast_series : pandas.Series
        Forecast values indexed by a datetime index.

    Returns
    -------
    string
        The JSON encoded forecast values dict
    """
    # The API expects the data under a 'value' field, so name the single
    # column accordingly before serializing.
    return _dataframe_to_json(forecast_series.to_frame(name='value'))
def _json_to_dataframe(json_payload):
# in the future, might worry about reading the response in chunks
# to stream the data and avoid having it all in memory at once,
# but 30 days of 1 minute data is probably ~4 MB of text. A better
# approach would probably be to switch to a binary format.
vals = json_payload['values']
if len(vals) == 0:
df = pd.DataFrame([], columns=['value', 'quality_flag'],
index=pd.DatetimeIndex([], name='timestamp',
tz='UTC'))
else:
df = pd.DataFrame.from_dict(json_payload['values'])
df.index = pd.to_datetime(df['timestamp'], utc=True,
infer_datetime_format=True)
return df
def json_payload_to_observation_df(json_payload):
    """
    Build an observation DataFrame from the JSON payload dict returned by
    the SolarForecastArbiter API observations/values endpoint.

    Parameters
    ----------
    json_payload : dict
        Dictionary as returned by the API with a "values" key which is a list
        of dicts like {'timestamp': <timestamp>, 'value': <float>,
        'quality_flag': <int>}

    Returns
    -------
    pandas.DataFrame
        With a tz-aware DatetimeIndex and ['value', 'quality_flag'] columns
        and dtypes {'value': float, 'quality_flag': int}
    """
    # Column order of the result follows the insertion order of this mapping.
    observation_dtypes = {'value': float, 'quality_flag': int}
    frame = _json_to_dataframe(json_payload)
    return frame[list(observation_dtypes)].astype(observation_dtypes)
def json_payload_to_forecast_series(json_payload):
    """
    Build a forecast Series from the JSON payload dict returned by the
    SolarForecastArbiter API forecasts/values endpoint.

    Parameters
    ----------
    json_payload : dict
        Dictionary as returned by the API with a "values" key which is a list
        of dicts like {'timestamp': <timestamp>, 'value': <float>}

    Returns
    -------
    pandas.Series
        With a tz-aware DatetimeIndex and float dtype
    """
    forecast_values = _json_to_dataframe(json_payload)['value']
    return forecast_values.astype(float)
def adjust_start_end_for_interval_label(interval_label, start, end,
                                        limit_instant=False):
    """
    Shift start or end by one nanosecond so that the closed range
    [start, end] matches the given interval_label.

    Parameters
    ----------
    interval_label : str or None
        The interval label for the object the data represents
    start : pandas.Timestamp
        Start time to restrict data to
    end : pandas.Timestamp
        End time to restrict data to
    limit_instant : boolean
        If true, an interval label of 'instant' will remove a nanosecond
        from end to ensure forecasts do not overlap. If False, instant
        returns start, end unmodified

    Returns
    -------
    start, end
       Return the adjusted start and end

    Raises
    ------
    ValueError
       If an invalid interval_label is given

    Examples
    --------
    .. testsetup::

       from solarforecastarbiter.io.utils import *

    Define input start/end:

    >>> start = pd.Timestamp('20190101 1200Z')
    >>> end = pd.Timestamp('20190101 1300Z')

    Beginning:

    >>> adjust_start_end_for_interval_label('beginning', start, end)
    (Timestamp('2019-01-01 12:00:00+0000', tz='UTC'), Timestamp('2019-01-01 12:59:59.999999999+0000', tz='UTC'))

    Ending:

    >>> adjust_start_end_for_interval_label('ending', start, end)
    (Timestamp('2019-01-01 12:00:00.000000001+0000', tz='UTC'), Timestamp('2019-01-01 13:00:00+0000', tz='UTC'))

    Instantaneous:

    >>> adjust_start_end_for_interval_label('instant', start, end)
    (Timestamp('2019-01-01 12:00:00+0000', tz='UTC'), Timestamp('2019-01-01 13:00:00+0000', tz='UTC'))

    >>> adjust_start_end_for_interval_label('instant', start, end,
    ...                                     limit_instant=True)
    (Timestamp('2019-01-01 12:00:00+0000', tz='UTC'), Timestamp('2019-01-01 12:59:59.999999999+0000', tz='UTC'))
    """  # NOQA
    known_labels = ('instant', 'beginning', 'ending')
    if not (interval_label is None or interval_label in known_labels):
        raise ValueError('Invalid interval_label')

    nanosecond = pd.Timedelta(1, unit='nano')
    if interval_label == 'ending':
        # 'ending' intervals exclude their start instant
        return start + nanosecond, end

    excludes_end = interval_label == 'beginning' or (
        interval_label == 'instant' and limit_instant)
    if excludes_end:
        return start, end - nanosecond
    return start, end
def adjust_timeseries_for_interval_label(data, interval_label, start, end):
    """
    Restrict the data to the window between start and end, where the
    window endpoints are first adjusted according to interval_label.
    The returned data is sorted by its index.

    Parameters
    ----------
    data : pandas.Series or pandas.DataFrame
        The data with a localized DatetimeIndex
    interval_label : str or None
        The interval label for the object the data represents
    start : pandas.Timestamp
        Start time to restrict data to
    end : pandas.Timestamp
        End time to restrict data to

    Returns
    -------
    pandas.Series or pandas.DataFrame
        Return data between start and end, in/excluding the endpoints
        depending on interval_label. Empty data is returned as is (after
        sorting) without the localization check.

    Raises
    ------
    ValueError
        If an invalid interval_label is given or non-empty data is not
        localized.
    """
    window_start, window_end = adjust_start_end_for_interval_label(
        interval_label, start, end)
    ordered = data.sort_index(axis=0)
    if ordered.empty:
        return ordered

    # pandas >= 0.25.1 requires start, end to have same tzinfo.
    # unexpected behavior when data is not localized, so prevent that
    index_tz = ordered.index.tzinfo
    if index_tz is None:
        raise ValueError('data must be localized')
    lower = window_start.tz_convert(index_tz)
    upper = window_end.tz_convert(index_tz)
    return ordered.loc[lower:upper]
def serialize_timeseries(ser):
    """
    Serialize a timeseries to a JSON string holding a schema and the data.
    Timestamps are written in UTC at second resolution, so any sub-second
    portion of the index is discarded.

    Parameters
    ----------
    ser : {pandas.Series, pandas.DataFrame}
        Must have a tz-localized datetime index

    Returns
    -------
    str
        The JSON serialized data along with a schema

    Raises
    ------
    TypeError
        If the input is invalid
    """
    is_series = isinstance(ser, pd.Series)
    supported = (
        (is_series or isinstance(ser, pd.DataFrame)) and
        isinstance(ser.index, pd.DatetimeIndex) and
        ser.index.tzinfo is not None
    )
    if not supported:
        raise TypeError(
            'Only pandas Series or DataFrame with a localized DatetimeIndex '
            'is supported')

    # work on a copy so naming the index does not touch the input
    labeled = ser.copy()
    labeled.index.name = 'timestamp'
    in_utc = labeled.tz_convert('UTC')
    if is_series:
        records = in_utc.reset_index(name='value')
        column = 'value'
        dtype = str(labeled.dtype)
        objtype = 'Series'
    else:
        records = in_utc.reset_index()
        column = labeled.columns.astype(str).to_list()
        dtype = labeled.dtypes.astype(str).to_list()
        objtype = 'DataFrame'
    jsonvals = records.to_json(
        orient='records', date_format='iso', date_unit='s')

    schema = dict(
        version=1,
        orient='records',
        timezone='UTC',
        column=column,
        index='timestamp',
        dtype=dtype,
        objtype=objtype,
    )
    return ''.join(
        ('{"schema":', json.dumps(schema), ',"data":', jsonvals, '}'))
def deserialize_timeseries(data):
    """
    Deserializes a timeseries from JSON

    Parameters
    ----------
    data : str
        JSON string to deserialize. Must have schema and data keys.

    Returns
    -------
    pandas.Series or pandas.DataFrame
        Deserialized timeseries with a localized DatetimeIndex

    Raises
    ------
    ValueError
        If "schema" or "data" keys are not found in the JSON string, or if
        the schema names an unsupported objtype
    KeyError
        If the schema object does not contain the proper keys
    """
    # imported locally so this function does not depend on an edit to the
    # module-level imports
    from io import StringIO

    # the schema is a flat object (no nested braces) following "schema":
    schema_str = re.search(r'(?<="schema":)\s*{[^{}]*}\s*(?=(,|}))', data)
    if schema_str is None:
        raise ValueError('Could not locate schema in data string')
    schema = json.loads(schema_str.group(0))

    if schema['version'] == 0:
        # compatibility with data serialized and stored before the
        # objtype key was added to schema and DataFrames were supported in v1
        objtype_str = 'Series'
    else:
        objtype_str = schema['objtype']

    # find between "data": and , or }, with only one set of []
    data_str = re.search(r'(?<="data":)\s*\[[^\[\]]*\](?=\s*(,|}))', data)
    if data_str is None:
        raise ValueError('Could not locate data key in data string')

    # passing a literal JSON string to read_json is deprecated; wrapping it
    # in a file-like object works across pandas versions
    df = pd.read_json(StringIO(data_str.group(0)), orient=schema['orient'],
                      convert_dates=True)

    if objtype_str == 'Series':
        if df.empty:
            return pd.Series([], name=schema['column'], index=pd.DatetimeIndex(
                [], tz=schema['timezone'], name='timestamp'),
                dtype=schema.get('dtype', float))
        out = df.set_index(schema['index'])[schema['column']].astype(
            schema['dtype'])
    elif objtype_str == 'DataFrame':
        if df.empty:
            try:
                dtype = schema['dtype'][0]
            except IndexError:
                dtype = float
            return pd.DataFrame(
                [], columns=schema['column'], index=pd.DatetimeIndex(
                    [], tz=schema['timezone'], name='timestamp'), dtype=dtype)
        out = df.set_index(schema['index'])
        # pd.read_json sets all column names to strings, so columns that
        # originally had non-string names would need to be mapped back.
        # That is not needed while columns are always strings.
        out = out.astype(dict(zip(schema['column'], schema['dtype'])))
    else:
        # previously fell through to an UnboundLocalError on `out`
        raise ValueError(
            f'Unsupported objtype in schema: {objtype_str!r}')

    # read_json may return naive timestamps; the schema records their zone
    if out.index.tzinfo is None:
        out = out.tz_localize(schema['timezone'])
    return out
class HiddenToken:
    """
    Wrapper around an access token whose representation is a fixed
    placeholder, so the token itself is not saved or displayed in logs.
    The real value stays available on the `token` attribute.
    """

    def __init__(self, token) -> None:
        """Store `token` coerced to a plain string."""
        # make sure it isn't a localproxy
        self.token = str(token)

    def __repr__(self) -> str:
        """Return a constant mask instead of the token."""
        return '****ACCESS*TOKEN****'
def ensure_timestamps(*time_args):
    """
    Decorator that converts the specified time arguments of the wrapped
    function to pandas.Timestamp objects

    Parameters
    ----------
    strings
        Function arguments to convert to pandas.Timestamp before
        executing function

    Raises
    ------
    ValueError
        If any of time_args cannot be converted to pandas.Timestamp
    TypeError
        If the call arguments do not match the wrapped function's signature

    Notes
    -----
    A time argument is converted when it is passed by keyword, or when it
    is a positional parameter and was passed positionally. Time arguments
    that are not supplied keep the wrapped function's default.

    Examples
    --------
    .. testsetup::

       import datetime as dt
       from solarforecastarbiter.io.utils import *

    >>> @ensure_timestamps('start', 'end')
    ... def get_values(start, end, other_arg):
    ...     # do stuff with start, end assumed to be pandas.Timestamps
    ...     if isinstance(start, pd.Timestamp):
    ...         return True

    >>> get_values('2019-01-01T00:00Z', dt.datetime(2019, 1, 2, 12), 'other')
    True
    """
    # de-duplicate while keeping the order the names were given in
    names = tuple(dict.fromkeys(time_args))

    def decorator(f):
        # the signature never changes, so inspect it once at decoration time
        sig = signature(f)
        # Positional slot of each time argument. Keyword-only and variadic
        # parameters have no slot: indexing into *args for them would
        # convert an unrelated positional value.
        positions = {
            name: pos
            for pos, (name, param) in enumerate(sig.parameters.items())
            if name in names and param.kind in (
                param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD)
        }

        @wraps(f)
        def wrapper(*args, **kwargs):
            nargs = list(args)
            for name in names:
                if name in kwargs:
                    kwargs[name] = pd.Timestamp(kwargs[name])
                    continue
                pos = positions.get(name)
                # skip arguments that were not supplied (default applies)
                if pos is not None and pos < len(nargs):
                    nargs[pos] = pd.Timestamp(nargs[pos])
            # fail with the usual TypeError for a bad call before running f
            sig.bind(*nargs, **kwargs)
            return f(*nargs, **kwargs)
        return wrapper
    return decorator
def load_report_values(raw_report, values):
    """
    Replace the value references on each
    datamodel.ProcessedForecastObservation of the raw_report with the
    deserialized forecast/observation timeseries.

    Parameters
    ----------
    raw_report : datamodel.RawReport
        The raw report with processed_forecasts_observations to
        be replaced
    values : list
        The report values as returned by the API; each item is a dict
        with 'id' and 'processed_values' keys.

    Returns
    -------
    tuple
        Of datamodel.ProcessedForecastObservation with values loaded into
        `forecast_values`, `observation_values`, and
        `reference_forecast_values`. A reference with no matching id in
        `values` is replaced by None.
    """
    serialized_by_id = {
        item['id']: item['processed_values'] for item in values}

    def _load(value_id):
        # None when the id is unknown or its stored value is None
        serialized = serialized_by_id.get(value_id)
        if serialized is None:
            return None
        return deserialize_timeseries(serialized)

    return tuple(
        fxobs.replace(
            forecast_values=_load(fxobs.forecast_values),
            observation_values=_load(fxobs.observation_values),
            reference_forecast_values=_load(
                fxobs.reference_forecast_values))
        for fxobs in raw_report.processed_forecasts_observations
    )
[docs]@contextmanager
def mock_raw_report_endpoints(base_url):
"""
Mock API report endpoints under base_url to enable testing
of the report generation task run via the dashboard. Requires
requests_mock>=1.8.0. Catches all report endpoints, but if
report generation requires POSTing to other endpoints in the
future, they will need to be added here.

Data POSTed to the mocked endpoints is kept in memory for the lifetime
of the context and is served back by the mocked report GET endpoint.

Parameters
----------
base_url : str
    Prefix of the mocked URLs. It is interpolated directly into the
    regular expressions used for matching (it is not escaped).

Yields
------
None
    The endpoints are mocked while the context is active.
"""
# third-party dependency imported lazily, only when the mock is used
import requests_mock
value_dict = {} # for raw processed values
# POST .../values: store the processed values under the posted object_id
# and answer with that id as the response text
def post_value_callback(request, context):
context.status_code = 200
rjson = request.json()
id_ = rjson['object_id']
value_dict[id_] = rjson['processed_values']
return id_
raw_dict = {} # for the raw reports
# POST .../raw: merge the posted JSON body into raw_dict; returns None
def post_raw_callback(request, context):
context.status_code = 200
raw_dict.update(request.json())
report_dict = {} # for new and full reports
# POST to the reports endpoint: merge the posted JSON body into report_dict
# and answer with the placeholder text 'no_id'
def post_report_callback(request, context):
context.status_code = 200
report_dict.update(request.json())
return 'no_id'
# GET report: a copy of the stored report plus the raw report, the stored
# values, and a 'complete' status
# NOTE(review): indentation was lost in this copy of the file, so it is
# ambiguous which of the statements below belong to the `if raw_dict:`
# body -- confirm against the canonical source
def get_report_callback(request, context):
context.status_code = 200
out = report_dict.copy()
if raw_dict:
out['raw_report'] = raw_dict
out['values'] = [
{'id': k, 'processed_values': v}
for k, v in value_dict.items()]
out['status'] = 'complete'
return out
# NOTE(review): real_http=True presumably lets requests that match none of
# the registered URIs go out to the real network -- verify against the
# requests_mock documentation
with requests_mock.Mocker(real_http=True) as m:
value_re = re.compile(f'{base_url}/reports/.*/values')
raw_re = re.compile(f'{base_url}/reports/.*/raw')
m.register_uri('GET', re.compile(
f'{base_url}/reports/.*'),
json=get_report_callback)
# NOTE(review): '/reports/*' means "zero or more '/' characters", not
# "anything after /reports/"; confirm this broad POST pattern is intended.
# The more specific POST patterns are registered after it, which
# presumably relies on requests_mock trying later registrations first --
# verify before reordering
m.register_uri('POST', re.compile(
f'{base_url}/reports/*'),
text=post_report_callback)
m.register_uri('POST', re.compile(
f'{base_url}/reports/.*/status/.*'))
m.register_uri('POST', value_re, text=post_value_callback)
m.register_uri('POST', raw_re, json=post_raw_callback)
yield