"""
Functions to connect to and process data from SolarForecastArbiter API
"""
import json
import logging
import requests
from urllib3 import Retry
from solarforecastarbiter import datamodel
from solarforecastarbiter.io.utils import (
json_payload_to_observation_df,
json_payload_to_forecast_series,
observation_df_to_json_payload,
forecast_object_to_json,
adjust_timeseries_for_interval_label,
serialize_data, deserialize_data,
serialize_raw_report, deserialize_raw_report,
HiddenToken)
BASE_URL = 'https://api.solarforecastarbiter.org'
logger = logging.getLogger(__name__)
[docs]def request_cli_access_token(user, password):
req = requests.post(
'https://solarforecastarbiter.auth0.com/oauth/token',
data={'grant_type': 'password', 'username': user,
'audience': BASE_URL,
'password': password,
'client_id': 'c16EJo48lbTCQEhqSztGGlmxxxmZ4zX7'})
req.raise_for_status()
return req.json()['access_token']
[docs]class APISession(requests.Session):
"""
Subclass of requests.Session to handle requets to the SolarForecastArbiter
API. The Session provides connection pooling, automatic retries for certain
types of requets, default timeouts, and a default base url. Responses are
converted into the appropriate class from datamodel.py or a pandas object.
Parameters
----------
access_token : string or HiddenToken
The base64 encoded Bearer token to authenticate with the API
default_timeout : float or tuple, optional
A default timeout to add to all requests. If a tuple, the first element
is the connection timeout and the second is the read timeout.
Default is 10 seconds for connection and 60 seconds to read from the
server.
base_url : string
URL to use as the base for endpoints to APISession
"""
[docs] def __init__(self, access_token, default_timeout=(10, 60),
base_url=None):
super().__init__()
if isinstance(access_token, HiddenToken):
access_token = access_token.token
self.headers = {'Authorization': f'Bearer {access_token}',
'Accept': 'application/json',
'Accept-Encoding': 'gzip,deflate'}
self.default_timeout = default_timeout
self.base_url = base_url or BASE_URL
# set requests to automatically retry
retries = Retry(total=10, connect=3, read=3, status=3,
status_forcelist=[408, 423, 444, 500, 501, 502, 503,
504, 507, 508, 511, 599],
backoff_factor=0.5,
raise_on_status=False,
remove_headers_on_redirect=[])
adapter = requests.adapters.HTTPAdapter(max_retries=retries)
self.mount(self.base_url, adapter)
[docs] def request(self, method, url, *args, **kwargs):
"""
Modify the default Session.request to add in the default timeout
and make requests relative to the base_url. Users will likely
use the standard get and post methods instead of calling this directly.
Raises
------
requests.exceptions.HTTPError
When an error is encountered in when making the request to the API
"""
if url.startswith('/'):
url = f'{self.base_url}{url}'
else:
url = f'{self.base_url}/{url}'
# set a defautl timeout so we never hang indefinitely
if 'timeout' not in kwargs:
kwargs['timeout'] = self.default_timeout
result = super().request(method, url, *args, **kwargs)
if result.status_code >= 400:
raise requests.exceptions.HTTPError(
f'{result.status_code} API Request Error: {result.reason} for '
f'url: {result.url} and text: {result.text}',
response=result)
return result
def _process_site_dict(self, site_dict):
if (
site_dict.get('modeling_parameters', {}).get(
'tracking_type', '') in ('fixed', 'single_axis')
):
return datamodel.SolarPowerPlant.from_dict(site_dict)
else:
return datamodel.Site.from_dict(site_dict)
[docs] def get_site(self, site_id):
"""
Retrieve site metadata for site_id from the API and process
into the proper model.
Parameters
----------
site_id : string
UUID of the site to retrieve metadata for
Returns
-------
datamodel.Site or datamodel.SolarPowerPlant
Dataclass with all the metadata for the site depending on if
the Site is a power plant with modeling parameters or not.
"""
req = self.get(f'/sites/{site_id}')
site_dict = req.json()
return self._process_site_dict(site_dict)
[docs] def list_sites(self):
"""
List all the sites available to a user.
Returns
-------
list of datamodel.Sites and datamodel.SolarPowerPlants
"""
req = self.get('/sites/')
return [self._process_site_dict(site_dict)
for site_dict in req.json()]
[docs] def create_site(self, site):
"""
Create a new site in the API with the given Site model
Parameters
----------
site : datamodel.Site or datamodel.SolarPowerPlant
Site to create in the API
Returns
-------
datamodel.Site or datamodel.SolarPowerPlant
With the appropriate parameters such as site_id set by the API
"""
site_dict = site.to_dict()
for k in ('site_id', 'provider'):
site_dict.pop(k, None)
site_json = json.dumps(site_dict)
req = self.post('/sites/', data=site_json,
headers={'Content-Type': 'application/json'})
new_id = req.text
return self.get_site(new_id)
[docs] def get_observation(self, observation_id):
"""
Get the metadata from the API for the a given observation_id
in an Observation object.
Parameters
----------
observation_id : string
UUID of the observation to retrieve
Returns
-------
datamodel.Observation
"""
req = self.get(f'/observations/{observation_id}/metadata')
obs_dict = req.json()
site = self.get_site(obs_dict['site_id'])
obs_dict['site'] = site
return datamodel.Observation.from_dict(obs_dict)
[docs] def list_observations(self):
"""
List the observations a user has access to.
Returns
-------
list of datamodel.Observation
"""
req = self.get('/observations/')
obs_dicts = req.json()
if len(obs_dicts) == 0:
return []
sites = {site.site_id: site for site in self.list_sites()}
out = []
for obs_dict in obs_dicts:
obs_dict['site'] = sites.get(obs_dict['site_id'])
out.append(datamodel.Observation.from_dict(obs_dict))
return out
[docs] def create_observation(self, observation):
"""
Create a new observation in the API with the given Observation model
Parameters
----------
observation : datamodel.Observation
Observation to create in the API
Returns
-------
datamodel.Observation
With the appropriate parameters such as observation_id set by the
API
"""
obs_dict = observation.to_dict()
obs_dict.pop('observation_id')
site = obs_dict.pop('site')
obs_dict['site_id'] = site['site_id']
obs_json = json.dumps(obs_dict)
req = self.post('/observations/', data=obs_json,
headers={'Content-Type': 'application/json'})
new_id = req.text
return self.get_observation(new_id)
[docs] def get_forecast(self, forecast_id):
"""
Get Forecast metadata from the API for the given forecast_id
Parameters
----------
forecast_id : string
UUID of the forecast to get metadata for
Returns
-------
datamodel.Forecast
"""
req = self.get(f'/forecasts/single/{forecast_id}/metadata')
fx_dict = req.json()
site = self.get_site(fx_dict['site_id'])
fx_dict['site'] = site
return datamodel.Forecast.from_dict(fx_dict)
[docs] def list_forecasts(self):
"""
List all Forecasts a user has access to.
Returns
-------
list of datamodel.Forecast
"""
req = self.get('/forecasts/single/')
fx_dicts = req.json()
if len(fx_dicts) == 0:
return []
sites = {site.site_id: site for site in self.list_sites()}
out = []
for fx_dict in fx_dicts:
fx_dict['site'] = sites.get(fx_dict['site_id'])
out.append(datamodel.Forecast.from_dict(fx_dict))
return out
[docs] def create_forecast(self, forecast):
"""
Create a new forecast in the API with the given Forecast model
Parameters
----------
forecast : datamodel.Forecast
Forecast to create in the API
Returns
-------
datamodel.Forecast
With the appropriate parameters such as forecast_id set by the API
"""
fx_dict = forecast.to_dict()
fx_dict.pop('forecast_id')
site = fx_dict.pop('site')
fx_dict['site_id'] = site['site_id']
fx_json = json.dumps(fx_dict)
req = self.post('/forecasts/single/', data=fx_json,
headers={'Content-Type': 'application/json'})
new_id = req.text
return self.get_forecast(new_id)
[docs] def get_observation_values(self, observation_id, start, end,
interval_label=None):
"""
Get observation values from start to end for observation_id from the
API
Parameters
----------
observation_id : string
UUID of the observation object.
start : timelike object
Start time in interval to retrieve values for
end : timelike object
End time of the interval
interval_label : str or None
If beginning, ending, adjust the data to return only data that is
valid between start and end. If None or instant, return any data
between start and end inclusive of the endpoints.
Returns
-------
pandas.DataFrame
With a datetime index and (value, quality_flag) columns
"""
req = self.get(f'/observations/{observation_id}/values',
params={'start': start, 'end': end})
out = json_payload_to_observation_df(req.json())
return adjust_timeseries_for_interval_label(
out, interval_label, start, end)
[docs] def get_forecast_values(self, forecast_id, start, end,
interval_label=None):
"""
Get forecast values from start to end for forecast_id
Parameters
----------
forecast_id : string
UUID of the forecast object
start : timelike object
Start of the interval to retrieve values for
end : timelike object
End of the interval
interval_label : str or None
If beginning, ending, adjust the data to return only data that is
valid between start and end. If None or instant, return any data
between start and end inclusive of the endpoints.
Returns
-------
pandas.Series
With the forecast values and a datetime index
"""
req = self.get(f'/forecasts/single/{forecast_id}/values',
params={'start': start, 'end': end})
out = json_payload_to_forecast_series(req.json())
return adjust_timeseries_for_interval_label(
out, interval_label, start, end)
[docs] def post_observation_values(self, observation_id, observation_df,
params=None):
"""
Upload the given observation values to the appropriate observation_id
of the API.
Parameters
----------
observation_id : string
UUID of the observation to add values for
observation_df : pandas.DataFrame
Dataframe with a datetime index and the (required) value and
quality_flag columns to upload to the API.
params : dict, list, string, default None
Parameters passed through POST request. Types are the same as
Requests <https://2.python-requests.org/en/master/api/#requests.Request>
""" # NOQA
json_vals = observation_df_to_json_payload(observation_df)
self.post(f'/observations/{observation_id}/values',
data=json_vals, params=params,
headers={'Content-Type': 'application/json'})
[docs] def post_forecast_values(self, forecast_id, forecast_series):
"""
Upload the given forecast values to the appropriate forecast_id of the
API
Parameters
----------
forecast_id : string
UUID of the forecast to upload values to
forecast_obj : pandas.Series
Pandas series with a datetime index that contains the values to
upload to the API
"""
json_vals = forecast_object_to_json(forecast_series)
self.post(f'/forecasts/single/{forecast_id}/values',
data=json_vals,
headers={'Content-Type': 'application/json'})
def _process_report_dict(self, rep_dict):
req_dict = rep_dict['report_parameters']
for key in ('name', 'report_id', 'status'):
req_dict[key] = rep_dict[key]
req_dict['metrics'] = tuple(req_dict['metrics'])
req_dict['forecast_observations'] = tuple([
datamodel.ForecastObservation(self.get_forecast(o[0]),
self.get_observation(o[1]))
for o in req_dict['object_pairs']])
return datamodel.Report.from_dict(req_dict)
def get_report(self, report_id):
"""
Get the metadata, and possible raw report if it has processed,
from the API for the given report_id in a Report object.
Parameters
----------
report_id : string
UUID of the report to retrieve
Returns
-------
datamodel.Report
"""
req = self.get(f'/reports/{report_id}')
resp = req.json()
raw = resp.pop('raw_report')
report = self._process_report_dict(resp)
if raw is not None:
raw_report = deserialize_raw_report(raw)
processed_fxobs = self.get_raw_report_processed_data(
report_id, raw_report, resp['values'])
report = report.replace(raw_report=raw_report.replace(
processed_forecasts_observations=processed_fxobs))
return report
def list_reports(self):
"""
List the reports a user has access to. Does not load the raw
report data, use :py:meth:`~.APISession.get_report`.
Returns
-------
list of datamodel.Report
"""
req = self.get('/reports')
rep_dicts = req.json()
if len(rep_dicts) == 0:
return []
out = []
for rep_dict in rep_dicts:
out.append(self._process_report_dict(rep_dict))
return out
def create_report(self, report):
"""
Post the report request to the API. A completed report should post
the raw_report with :py:meth:`~.APISession.post_raw_report`.
Parameters
----------
report : datamodel.Report
Returns
-------
datamodel.Report
As returned by the API
"""
report_dict = report.to_dict()
report_dict.pop('report_id')
name = report_dict.pop('name')
for key in ('raw_report', '__version__', 'status'):
del report_dict[key]
report_dict['filters'] = []
fxobs = report_dict.pop('forecast_observations')
report_dict['object_pairs'] = [
(_fo['forecast']['forecast_id'],
_fo['observation']['observation_id'])
for _fo in fxobs]
params = {'name': name,
'report_parameters': report_dict}
req = self.post('/reports/', json=params,
headers={'Content-Type': 'application/json'})
new_id = req.text
return self.get_report(new_id)
def post_raw_report_processed_data(self, report_id, raw_report):
"""
Post the processed data that was used to make the report to the
API.
Parameters
----------
report_id : str
ID of the report to post values to
raw_report : datamodel.RawReport
The raw report object with processed_forecasts_observations
Returns
-------
tuple
of datamodel.ProcessedForecastObservation with `forecast_values`
and `observations_values` replaced with report value IDs for later
retrieval
"""
posted_fxobs = []
for fxobs in raw_report.processed_forecasts_observations:
fx_data = {
'object_id': fxobs.original.forecast.forecast_id,
'processed_values': serialize_data(fxobs.forecast_values)}
fx_post = self.post(
f'/reports/{report_id}/values',
json=fx_data, headers={'Content-Type': 'application/json'})
obs_data = {
'object_id': fxobs.original.observation.observation_id,
'processed_values': serialize_data(fxobs.observation_values)}
obs_post = self.post(
f'/reports/{report_id}/values',
json=obs_data, headers={'Content-Type': 'application/json'})
processed_fx_id = fx_post.text
processed_obs_id = obs_post.text
new_fxobs = fxobs.replace(forecast_values=processed_fx_id,
observation_values=processed_obs_id)
posted_fxobs.append(new_fxobs)
return tuple(posted_fxobs)
def get_raw_report_processed_data(self, report_id, raw_report,
values=None):
"""
Load the processed forecast/observation data into the
datamodel.ProcessedForecastObservation objects of the raw_report.
Parameters
----------
report_id : str
ID of the report that values will be loaded from
raw_report : datamodel.RawReport
The raw report with processed_forecasts_observations to
be replaced
values : list or None
The report values dict as returned by the API. If None, fetch
the values from the API for the given report_id
Returns
-------
tuple
Of datamodel.ProcessedForecastObservation with values loaded into
`forecast_values` and `observation_values`
"""
if values is None:
val_req = self.get(f'/reports/{report_id}/values')
values = val_req.json()
val_dict = {v['id']: v['processed_values'] for v in values}
out = []
for fxobs in raw_report.processed_forecasts_observations:
fx_vals = val_dict.get(fxobs.forecast_values, None)
if fx_vals is not None:
fx_vals = deserialize_data(fx_vals)
obs_vals = val_dict.get(fxobs.observation_values, None)
if obs_vals is not None:
obs_vals = deserialize_data(obs_vals)
new_fxobs = fxobs.replace(forecast_values=fx_vals,
observation_values=obs_vals)
out.append(new_fxobs)
return tuple(out)
def post_raw_report(self, report_id, raw_report):
"""
Update the report with the raw report and metrics
Parameters
----------
report_id : str
ID of the report to update
raw_report : datamodel.RawReport
The raw report object to add to the report
"""
posted_fxobs = self.post_raw_report_processed_data(
report_id, raw_report)
to_post = raw_report.replace(
processed_forecasts_observations=posted_fxobs)
compressed_bundle = serialize_raw_report(to_post)
# metrics not really meaningful right now as JSON
self.post(f'/reports/{report_id}/metrics',
json={'metrics': {}, 'raw_report': compressed_bundle},
headers={'Content-Type': 'application/json'})
self.update_report_status(report_id, 'complete')
def update_report_status(self, report_id, status):
"""
Update the status of the report
Parameters
----------
report_id : str
ID of the report to update
status : str
New status of the report
"""
self.post(f'/reports/{report_id}/status/{status}')