"""
Make benchmark irradiance and power forecasts.
The functions in this module use the
:py:mod:`solarforecastarbiter.datamodel` objects.
"""
from collections import namedtuple, defaultdict
import itertools
import json
import logging
import re
import pandas as pd
from solarforecastarbiter import datamodel, pvmodel
from solarforecastarbiter.utils import generate_continuous_chunks
from solarforecastarbiter.io import api
from solarforecastarbiter.io.fetch import nwp as fetch_nwp
from solarforecastarbiter.io.utils import adjust_timeseries_for_interval_label
from solarforecastarbiter.reference_forecasts import persistence, models, utils
logger = logging.getLogger(__name__)
def run_nwp(forecast, model, run_time, issue_time):
    """
    Calculate benchmark irradiance and power forecasts for a Forecast or
    ProbabilisticForecast.

    Forecasts may be run operationally or retrospectively. For
    operational forecasts, *run_time* is typically set to now. For
    retrospective forecasts, *run_time* is the time by which the
    forecast should be run so that it could have been delivered for
    the *issue_time*. Forecasts will only use data with timestamps
    before *run_time*.

    Parameters
    ----------
    forecast : datamodel.Forecast or datamodel.ProbabilisticForecast
        The metadata of the desired forecast.
    model : function
        NWP model loading and processing function.
        See :py:mod:`solarforecastarbiter.reference_forecasts.models`
        for options.
    run_time : pd.Timestamp
        Run time of the forecast.
    issue_time : pd.Timestamp
        Issue time of the forecast run.

    Returns
    -------
    ghi : pd.Series or pd.DataFrame
    dni : pd.Series or pd.DataFrame
    dhi : pd.Series or pd.DataFrame
    air_temperature : pd.Series or pd.DataFrame
    wind_speed : pd.Series or pd.DataFrame
    ac_power : pd.Series or pd.DataFrame

        Series are returned for deterministic forecasts, DataFrames are
        returned for probabilistic forecasts.

    Examples
    --------
    The following code would return hourly average forecasts derived
    from the subhourly HRRR model.

    .. testsetup::

       import datetime
       from solarforecastarbiter import datamodel
       from solarforecastarbiter.reference_forecasts import models
       from solarforecastarbiter.reference_forecasts.main import *

    >>> run_time = pd.Timestamp('20190515T0200Z')
    >>> issue_time = pd.Timestamp('20190515T0000Z')
    >>> modeling_parameters = datamodel.FixedTiltModelingParameters(
    ...     surface_tilt=30, surface_azimuth=180,
    ...     ac_capacity=10, dc_capacity=15,
    ...     temperature_coefficient=-0.4, dc_loss_factor=0,
    ...     ac_loss_factor=0)
    >>> power_plant = datamodel.SolarPowerPlant(
    ...     name='Test plant', latitude=32.2, longitude=-110.9,
    ...     elevation=715, timezone='America/Phoenix',
    ...     modeling_parameters=modeling_parameters)
    >>> forecast = datamodel.Forecast(
    ...     name='Test plant fx',
    ...     site=power_plant,
    ...     variable='ac_power',
    ...     interval_label='ending',
    ...     interval_value_type='interval_mean',
    ...     interval_length='1h',
    ...     issue_time_of_day=datetime.time(hour=0),
    ...     run_length=pd.Timedelta('24h'),
    ...     lead_time_to_start=pd.Timedelta('0h'))
    >>> ghi, dni, dhi, temp_air, wind_speed, ac_power = run_nwp(
    ...     forecast, models.hrrr_subhourly_to_hourly_mean,
    ...     run_time, issue_time)
    """
    fetch_metadata = fetch_nwp.model_map[models.get_nwp_model(model)]
    # absolute date and time for model run most recently available
    # as of run_time
    init_time = utils.get_init_time(run_time, fetch_metadata)
    # absolute start and end times. interval_label still controls
    # inclusive/exclusive
    start, end = utils.get_forecast_start_end(forecast, issue_time)
    site = forecast.site
    logger.info(
        'Calculating forecast for model %s starting at %s from %s to %s',
        model, init_time, start, end)
    # model will account for interval_label
    *forecasts, resampler, solar_position_calculator = model(
        site.latitude, site.longitude, site.elevation,
        init_time, start, end, forecast.interval_label)

    if isinstance(site, datamodel.SolarPowerPlant):
        solar_position = solar_position_calculator()
        if isinstance(forecasts[0], pd.DataFrame):
            # must iterate over columns because pvmodel.irradiance_to_power
            # calls operations that do not properly broadcast Series along
            # a DataFrame time index. pvlib.irradiance.haydavies operation
            # (AI = dni_ens / dni_extra) is the known culprit, though there
            # may be more.
            ac_power = {}
            for col in forecasts[0].columns:
                member_fx = [fx.get(col) for fx in forecasts]
                member_ac_power = pvmodel.irradiance_to_power(
                    site.modeling_parameters,
                    solar_position['apparent_zenith'],
                    solar_position['azimuth'],
                    *member_fx)
                ac_power[col] = member_ac_power
            ac_power = pd.DataFrame(ac_power)
        else:
            ac_power = pvmodel.irradiance_to_power(
                site.modeling_parameters, solar_position['apparent_zenith'],
                solar_position['azimuth'], *forecasts)
    else:
        # no modeling parameters available to convert irradiance to power
        ac_power = None

    # resample data after power calculation
    resampled = list(map(resampler, (*forecasts, ac_power)))
    nwpoutput = namedtuple(
        'NWPOutput', ['ghi', 'dni', 'dhi', 'air_temperature', 'wind_speed',
                      'ac_power'])
    return nwpoutput(*resampled)
def _default_load_data(session):
def load_data(observation, data_start, data_end):
df = session.get_observation_values(observation.observation_id,
data_start, data_end,
observation.interval_label)
df = df.tz_convert(observation.site.timezone)
return df['value']
return load_data
def run_persistence(session, observation, forecast, run_time, issue_time,
                    index=False, load_data=None):
    """
    Run a persistence *forecast* for an *observation*.

    For intraday forecasts, the *index* argument controls if the
    forecast is constructed using persistence of the measured values
    (*index = False*) or persistence using clear sky index or AC power
    index.

    For day ahead forecasts, only persistence of measured values
    (*index = False*) is supported.

    Forecasts may be run operationally or retrospectively. For
    operational forecasts, *run_time* is typically set to now. For
    retrospective forecasts, *run_time* is the time by which the
    forecast should be run so that it could have been delivered for
    the *issue_time*. Forecasts will only use data with timestamps
    before *run_time*.

    The persistence *window* is the time over which the persistence
    quantity (irradiance, power, clear sky index, or power index) is
    averaged. The persistence window is automatically determined
    from the *forecast* attributes:

    - Intraday persistence forecasts:

      + ``window = forecast.run_length``. No longer than 1 hour.

    - Day ahead forecasts (all but net load) and week ahead forecasts (net
      load only):

      + ``window = forecast.interval_length``.

    Users that would like more flexibility may use the lower-level
    functions in
    :py:mod:`solarforecastarbiter.reference_forecasts.persistence`.

    Parameters
    ----------
    session : api.Session
        The session object to use to request data from the
        SolarForecastArbiter API.
    observation : datamodel.Observation
        The metadata of the observation to be used to create the
        forecast.
    forecast : datamodel.Forecast
        The metadata of the desired forecast.
    run_time : pd.Timestamp
        Run time of the forecast.
    issue_time : pd.Timestamp
        Issue time of the forecast run.
    index : bool, default False
        If False, use persistence of observed value. If True, use
        persistence of clear sky or AC power index.
    load_data : function
        Function to load the observation data 'value' series given
        (observation, data_start, data_end) arguments. Typically,
        calls `session.get_observation_values` and selects the 'value'
        column. May also have data preloaded to then slice from
        data_start to data_end.

    Returns
    -------
    forecast : pd.Series
        Forecast conforms to the metadata specified by the *forecast*
        argument.

    Raises
    ------
    ValueError
        If forecast and issue_time are incompatible.
    ValueError
        If data is required from after run_time.
    ValueError
        If persistence window < observation.interval_length.
    ValueError
        If forecast.run_length >= 1 day and index=True.
    ValueError
        If instantaneous forecast and instantaneous observation interval
        lengths do not match.
    ValueError
        If average observations are used to make instantaneous forecast.

    Notes
    -----
    For non-intraday net load forecasts, this function will use a weekahead
    persistence due to the fact that net load exhibits stronger correlation
    week-to-week than day-to-day. For example, the net load on a Monday tends
    to look more similar to the previous Monday than it does to the previous
    day (Sunday).
    """
    utils.check_persistence_compatibility(observation, forecast, index)
    forecast_start, forecast_end = utils.get_forecast_start_end(
        forecast, issue_time, False)
    intraday = utils._is_intraday(forecast)
    if load_data is None:
        load_data = _default_load_data(session)
    data_start, data_end = utils.get_data_start_end(
        observation, forecast, run_time, issue_time)
    if data_end > run_time:
        raise ValueError(
            'Persistence forecast requires data from after run_time')

    if isinstance(forecast, datamodel.ProbabilisticForecast):
        cvs = [f.constant_value for f in forecast.constant_values]
        fx = persistence.persistence_probabilistic(
            observation, data_start, data_end, forecast_start, forecast_end,
            forecast.interval_length, forecast.interval_label, load_data,
            forecast.axis, cvs)
    elif intraday and index:
        fx = persistence.persistence_scalar_index(
            observation, data_start, data_end, forecast_start, forecast_end,
            forecast.interval_length, forecast.interval_label, load_data)
    elif intraday and not index:
        fx = persistence.persistence_scalar(
            observation, data_start, data_end, forecast_start, forecast_end,
            forecast.interval_length, forecast.interval_label, load_data)
    elif not intraday and not index:
        fx = persistence.persistence_interval(
            observation, data_start, data_end, forecast_start,
            forecast.interval_length, forecast.interval_label, load_data)
    else:  # pragma: no cover
        raise ValueError(
            'index=True not supported for forecasts with run_length >= 1day')
    return fx
def all_equal(iterable):
    """Return True if all the elements of *iterable* are equal to each
    other (vacuously True for an empty iterable)."""
    iterator = iter(iterable)
    try:
        first = next(iterator)
    except StopIteration:
        # no elements: vacuously all equal
        return True
    return all(item == first for item in iterator)
def _verify_nwp_forecasts_compatible(fx_group):
    """Verify that all the forecasts grouped by piggyback_on are compatible.

    Returns a list of the parameter names that differ within the group;
    an empty list means the group is fully compatible.
    """
    errors = []
    if len(fx_group.model.unique()) != 1:
        errors.append('model')
    checked_params = ('issue_time_of_day', 'lead_time_to_start',
                      'interval_length', 'run_length', 'interval_label',
                      'interval_value_type', 'site')
    errors.extend(
        param for param in checked_params
        if not all_equal(getattr(fx, param) for fx in fx_group.forecast))
    return errors
def _is_reference_forecast(extra_params_string):
match = re.search(r'is_reference_forecast(["\s\:]*)true',
extra_params_string, re.I)
return match is not None
def find_reference_nwp_forecasts(forecasts, run_time=None):
    """
    Sort through all *forecasts* to find those that should be generated
    by the Arbiter from NWP models. The forecast must have a *model* key
    in *extra_parameters* (formatted as a JSON string). If *piggyback_on*
    is also defined in *extra_parameters*, it should be the forecast_id
    of another forecast that has the same parameters, including site,
    except the variable.

    Parameters
    ----------
    forecasts : list of datamodel.Forecasts
        The forecasts that should be filtered to find references.
    run_time : pandas.Timestamp or None, default None
        The run_time of that forecast generation is taking place. If not
        None, the next issue time for each forecast is added to the output.

    Returns
    -------
    pandas.DataFrame
        NWP reference forecasts with index of forecast_id and columns
        (forecast, piggyback_on, model, next_issue_time).
    """
    df_vals = []
    for fx in forecasts:
        # more explicit than filter()
        if not _is_reference_forecast(fx.extra_parameters):
            logger.debug('Forecast %s is not labeled as a reference forecast',
                         fx.forecast_id)
            continue

        try:
            extra_parameters = json.loads(fx.extra_parameters)
        except json.JSONDecodeError:
            logger.warning(
                'Failed to decode extra_parameters for %s: %s as JSON',
                fx.name, fx.forecast_id)
            continue

        try:
            model = extra_parameters['model']
        except KeyError:
            logger.error(
                'Forecast, %s: %s, has no model. Cannot make forecast.',
                fx.name, fx.forecast_id)
            continue

        if run_time is not None:
            next_issue_time = utils.get_next_issue_time(fx, run_time)
        else:
            next_issue_time = None
        # a forecast with no piggyback_on key is the key forecast of its
        # own group
        piggyback_on = extra_parameters.get('piggyback_on', fx.forecast_id)
        df_vals.append((fx.forecast_id, fx, piggyback_on, model,
                        next_issue_time))

    forecast_df = pd.DataFrame(
        df_vals, columns=['forecast_id', 'forecast', 'piggyback_on', 'model',
                          'next_issue_time']
        ).set_index('forecast_id')
    return forecast_df
def _post_forecast_values(session, fx, fx_vals, model_str):
    """Upload forecast values to the API.

    For a :py:class:`datamodel.ProbabilisticForecast` generated from a GEFS
    model, *fx_vals* must be a DataFrame with exactly 21 ensemble-member
    columns (0 through 20); each column is posted to the constant value
    forecast whose ``constant_value`` percentile matches. Deterministic
    forecasts are posted directly.

    Raises
    ------
    ValueError
        If *fx* is probabilistic but *model_str* is not a GEFS model.
    TypeError
        If *fx* is probabilistic and *fx_vals* is not a 21 column DataFrame.
    """
    if isinstance(fx, datamodel.ProbabilisticForecast):
        if not model_str.startswith('gefs'):
            raise ValueError(
                'Can only process probabilistic forecast from GEFS')

        if not isinstance(fx_vals, pd.DataFrame) or len(fx_vals.columns) != 21:
            raise TypeError(
                'Could not post probabilistic forecast values: '
                'forecast values in unknown format')
        # map ensemble member index (0-20) to its percentile constant value
        # (0.0, 5.0, ..., 100.0); range(21) matches the 21 columns
        # validated above
        cv_df = fx_vals.rename(columns={i: i * 5.0 for i in range(21)})
        for cv_fx in fx.constant_values:
            # will raise a KeyError if no match
            cv_vals = cv_df[cv_fx.constant_value]
            logger.debug('Posting %s values to %s', len(cv_vals),
                         cv_fx.forecast_id)
            session.post_probabilistic_forecast_constant_value_values(
                cv_fx.forecast_id, cv_vals
            )
    else:
        session.post_forecast_values(fx.forecast_id, fx_vals)
def process_nwp_forecast_groups(session, run_time, forecast_df):
    """
    Groups NWP forecasts based on piggyback_on, calculates the forecast as
    appropriate for *run_time*, and uploads the values to the API.

    Parameters
    ----------
    session : io.api.APISession
        API session for uploading forecast values
    run_time : pandas.Timestamp
        Run time of the forecast. Also used along with the forecast metadata
        to determine the issue_time of the forecast.
    forecast_df : pandas.DataFrame
        Dataframe of the forecast objects as produced by
        :py:func:`solarforecastarbiter.reference_forecasts.main.find_reference_nwp_forecasts`.
    """  # NOQA
    for run_for, group in forecast_df.groupby('piggyback_on'):
        logger.info('Computing forecasts for group %s', run_for)
        errors = _verify_nwp_forecasts_compatible(group)
        if errors:
            logger.error(
                'Not all forecasts compatible in group with %s. '
                'The following parameters may differ: %s', run_for, errors)
            continue
        try:
            key_fx = group.loc[run_for].forecast
        except KeyError:
            logger.error('Forecast, %s, that others are piggybacking on not '
                         'found', run_for)
            continue

        model_str = group.loc[run_for].model
        model = getattr(models, model_str)
        issue_time = group.loc[run_for].next_issue_time
        if issue_time is None:
            issue_time = utils.get_next_issue_time(key_fx, run_time)
        try:
            nwp_result = run_nwp(key_fx, model, run_time, issue_time)
        except FileNotFoundError as e:
            logger.error('Could not process group of %s, %s', run_for, str(e))
            continue

        # Series.iteritems() was removed in pandas 2.0; .items() is the
        # equivalent (forecast_id, forecast) iterator
        for fx_id, fx in group['forecast'].items():
            fx_vals = getattr(nwp_result, fx.variable)
            if fx_vals is None:
                logger.warning('No forecast produced for %s in group with %s',
                               fx_id, run_for)
                continue
            logger.info('Posting values %s for %s:%s issued at %s',
                        len(fx_vals), fx.name, fx_id, issue_time)
            _post_forecast_values(session, fx, fx_vals, model_str)
def make_latest_nwp_forecasts(token, run_time, issue_buffer, base_url=None):
    """
    Make all reference NWP forecasts for *run_time* that are within
    *issue_buffer* of the next issue time for the forecast. For example,
    this function may run in a cronjob every five minutes with *run_time*
    set to now. By setting *issue_buffer* to '5min', only forecasts that
    should be issued in the next five minutes will be generated on each
    run. Only forecasts that belong to the same provider/organization
    of the token user will be updated.

    Parameters
    ----------
    token : str
        Access token for the API
    run_time : pandas.Timestamp
        Run time of the forecast generation
    issue_buffer : pandas.Timedelta
        Maximum time between *run_time* and the next initialization time of
        each forecast that will be updated
    base_url : str or None, default None
        Alternate base_url of the API
    """
    session = api.APISession(token, base_url=base_url)
    user_info = session.get_user_info()
    forecasts = session.list_forecasts()
    forecasts += session.list_probabilistic_forecasts()
    # only make forecasts owned by the token user's organization
    forecasts = [fx for fx in forecasts
                 if fx.provider == user_info['organization']]
    forecast_df = find_reference_nwp_forecasts(forecasts, run_time)
    execute_for = forecast_df[
        forecast_df.next_issue_time <= run_time + issue_buffer]
    if execute_for.empty:
        logger.info('No forecasts to be made at %s', run_time)
        return
    process_nwp_forecast_groups(session, run_time, execute_for)
def _is_reference_persistence_forecast(extra_params_string):
match = re.search(r'is_reference_persistence_forecast(["\s\:]*)true',
extra_params_string, re.I)
return match is not None
def generate_reference_persistence_forecast_parameters(
        session, forecasts, observations, max_run_time):
    """Sort through all *forecasts* to find those that should be generated
    by the Arbiter from persisting Observation values. The forecast
    must have ``'is_reference_persistence_forecast': true`` and an
    observation_id in Forecast.extra_parameters (formatted as a JSON
    string). A boolean value for "index_persistence" in
    Forecast.extra_parameters controls whether the persistence
    forecast should be made adjusting for clear-sky/AC power index or
    not.

    Parameters
    ----------
    session : solarforecastarbiter.io.api.APISession
    forecasts : list of datamodel.Forecasts
        The forecasts that should be filtered to find references.
    observations : list of datamodel.Observations
        Observations that are available to use to fetch values
        and make persistence forecasts.
    max_run_time : pandas.Timestamp
        The maximum run time/issue time for any forecasts. Usually now.

    Returns
    -------
    generator of (Forecast, Observation, index, data_start, issue_times)
    """
    # define the result type once instead of rebuilding the namedtuple
    # class on every yield
    persistence_parameters = namedtuple(
        'PersistenceParameters',
        ['forecast', 'observation', 'index', 'data_start', 'issue_times'])
    user_info = session.get_user_info()
    observation_dict = {obs.observation_id: obs for obs in observations}
    for fx in forecasts:
        if not _is_reference_persistence_forecast(fx.extra_parameters):
            logger.debug(
                'Forecast %s is not labeled as a reference '
                'persistence forecast', fx.forecast_id)
            continue
        if not fx.provider == user_info['organization']:
            logger.debug(
                "Forecast %s is not in user's organization",
                fx.forecast_id)
            continue

        try:
            extra_parameters = json.loads(fx.extra_parameters)
        except json.JSONDecodeError:
            logger.warning(
                'Failed to decode extra_parameters for %s: %s as JSON',
                fx.name, fx.forecast_id)
            continue

        try:
            observation_id = extra_parameters['observation_id']
        except KeyError:
            logger.error(
                'Forecast, %s: %s, has no observation_id to base forecasts'
                ' off of. Cannot make persistence forecast.',
                fx.name, fx.forecast_id)
            continue
        if observation_id not in observation_dict:
            logger.error(
                'Observation %s not in set of given observations.'
                ' Cannot generate persistence forecast for %s: %s.',
                observation_id, fx.name, fx.forecast_id)
            continue
        observation = observation_dict[observation_id]

        index = extra_parameters.get('index_persistence', False)
        obs_mint, obs_maxt = session.get_observation_time_range(observation_id)
        if pd.isna(obs_maxt):  # no observations to use anyway
            logger.info(
                'No observation values to use for %s: %s from observation %s',
                fx.name, fx.forecast_id, observation_id)
            continue

        if isinstance(fx, datamodel.ProbabilisticForecast):
            fx_mint, fx_maxt = \
                session.get_probabilistic_forecast_constant_value_time_range(
                    fx.constant_values[0].forecast_id)
        else:
            fx_mint, fx_maxt = session.get_forecast_time_range(fx.forecast_id)
        # find the next issue time for the forecast based on the last value
        # in the forecast series
        if pd.isna(fx_maxt):
            # if there is no forecast yet, go back a bit from the last
            # observation. Don't use the start of observations, since it
            # could really stress the workers if we have a few years of
            # data before deciding to make a persistence fx
            next_issue_time = utils.get_next_issue_time(
                fx, obs_maxt - fx.run_length)
        else:
            next_issue_time = utils.find_next_issue_time_from_last_forecast(
                fx, fx_maxt)

        data_start, _ = utils.get_data_start_end(
            observation, fx, next_issue_time, next_issue_time)
        issue_times = tuple(_issue_time_generator(
            observation, fx, obs_mint, obs_maxt,
            next_issue_time, max_run_time))
        if len(issue_times) == 0:
            continue

        yield persistence_parameters(fx, observation, index, data_start,
                                     issue_times)
def _issue_time_generator(observation, fx, obs_mint, obs_maxt, next_issue_time,
                          max_run_time):
    """Yield each issue time from *next_issue_time* through *max_run_time*
    for which the persistence data window falls within the available
    observation range (obs_mint, obs_maxt)."""
    issue_time = next_issue_time
    one_ns = pd.Timedelta('1ns')
    while issue_time <= max_run_time:
        window_start, window_end = utils.get_data_start_end(
            observation, fx, issue_time, issue_time)
        # observations end before the required window: no further issue
        # times can be made
        if window_end > obs_maxt:
            break
        # only yield when the window starts after the first observation
        if window_start > obs_mint:
            yield issue_time
        # nudge past the current issue time to find the next one
        issue_time = utils.get_next_issue_time(fx, issue_time + one_ns)
def _preload_load_data(session, obs, data_start, data_end):
    """Fetch all the required observation data at once and return a
    ``load_data`` function that slices it as appropriate.

    Much more efficient than per-forecast API requests when generating
    many persistence forecasts from the same observation.
    """
    full_series = session.get_observation_values(
        obs.observation_id, data_start, data_end
    ).tz_convert(obs.site.timezone)['value']

    def load_data(observation, data_start, data_end):
        window = full_series.loc[data_start:data_end]
        return adjust_timeseries_for_interval_label(
            window, observation.interval_label, data_start, data_end)

    return load_data
def make_latest_persistence_forecasts(token, max_run_time, base_url=None):
    """Make all reference persistence forecasts that need to be made up to
    *max_run_time*.

    Parameters
    ----------
    token : str
        Access token for the API
    max_run_time : pandas.Timestamp
        Last possible run time of the forecast generation
    base_url : str or None, default None
        Alternate base_url of the API
    """
    session = api.APISession(token, base_url=base_url)
    forecasts = session.list_forecasts()
    observations = session.list_observations()
    params = generate_reference_persistence_forecast_parameters(
        session, forecasts, observations, max_run_time)
    for fx, obs, index, data_start, issue_times in params:
        # one API request for all the data needed by every issue time
        load_data = _preload_load_data(session, obs, data_start, max_run_time)
        serlist = []
        logger.info('Making persistence forecast for %s:%s from %s to %s',
                    fx.name, fx.forecast_id, issue_times[0], issue_times[-1])
        for issue_time in issue_times:
            run_time = issue_time
            try:
                fx_ser = run_persistence(
                    session, obs, fx, run_time, issue_time,
                    index=index, load_data=load_data)
            except ValueError as e:
                logger.error('Unable to generate persistence forecast: %s', e)
            else:
                serlist.append(fx_ser)
        if len(serlist) > 0:
            ser = pd.concat(serlist)
            # post continuous chunks so gaps are not filled in by the API
            for cser in generate_continuous_chunks(ser, fx.interval_length):
                session.post_forecast_values(fx.forecast_id, cser)
def make_latest_probabilistic_persistence_forecasts(
        token, max_run_time, base_url=None):
    """Make all reference probabilistic persistence forecasts that need to
    be made up to *max_run_time*.

    Parameters
    ----------
    token : str
        Access token for the API
    max_run_time : pandas.Timestamp
        Last possible run time of the forecast generation
    base_url : str or None, default None
        Alternate base_url of the API
    """
    session = api.APISession(token, base_url=base_url)
    forecasts = session.list_probabilistic_forecasts()
    observations = session.list_observations()
    params = generate_reference_persistence_forecast_parameters(
        session, forecasts, observations, max_run_time)
    for fx, obs, index, data_start, issue_times in params:
        # one API request for all the data needed by every issue time
        load_data = _preload_load_data(session, obs, data_start, max_run_time)
        out = defaultdict(list)
        logger.info('Making persistence forecast for %s:%s from %s to %s',
                    fx.name, fx.forecast_id, issue_times[0], issue_times[-1])
        for issue_time in issue_times:
            run_time = issue_time
            try:
                fx_list = run_persistence(
                    session, obs, fx, run_time, issue_time,
                    index=index, load_data=load_data)
            except ValueError as e:
                logger.error('Unable to generate persistence forecast: %s', e)
            else:
                # api requires a post per constant value
                cv_ids = [f.forecast_id for f in fx.constant_values]
                for id_, fx_ser in zip(cv_ids, fx_list):
                    out[id_].append(fx_ser)
        for id_, serlist in out.items():
            if len(serlist) > 0:
                ser = pd.concat(serlist)
                # post continuous chunks so gaps are not filled in by the API
                for cser in generate_continuous_chunks(
                        ser, fx.interval_length):
                    session.post_probabilistic_forecast_constant_value_values(
                        id_, cser)