from django.utils.translation import gettext_lazy as _
from stemp_abw.visualizations import highcharts
from stemp_abw.models import Scenario, RegMun, MunData
from stemp_abw.results.io import oemof_json_to_results
from stemp_abw.app_settings import node_labels, SIMULATION_CFG as SIM_CFG
from stemp_abw.config.prepare_texts import label_data
from oemof.outputlib import views
import pandas as pd
import json
[docs]class Results(object):
"""Results associated to Simulation
TODO: Complete docstring
"""
def __init__(self, simulation):
self.sq_results_raw, self.sq_param_results_raw = oemof_json_to_results(
Scenario.objects.get(name=str(_('Status quo'))).results.data)
self.results_raw = self.sq_results_raw
self.param_results_raw = self.sq_param_results_raw
self.layer_results = None
self.status = 'init'
self.simulation = simulation
[docs] def set_result_raw_data(self, results_raw, param_results_raw):
self.results_raw = results_raw
self.param_results_raw = param_results_raw
self.status = 'up_to_date'
[docs] @staticmethod
def get_raw_results_df(results_raw):
"""Return DataFrame with optimization results (timeseries) for all
nodes for given raw results
Parameters
----------
results_raw : :obj:`dict` of :pandas:`pandas.DataFrame`
Raw result data from optimization as created by oemof
Returns
-------
:pandas:`pandas.DataFrame`
Node results (timeseries)
"""
timerange = pd.date_range(start=SIM_CFG['date_from'],
end=SIM_CFG['date_to'],
freq=SIM_CFG['freq'])
df = pd.concat([v['sequences'].rename(columns={'flow': k})
for k, v in results_raw.items()], axis=1)
df.index = timerange
return df
[docs] def get_node_results_df(self, node_label):
"""Return DataFrame with optimization results (timeseries) for single
node.
Parameters
----------
node_label : :obj:`str`
Label of node the data should be looked up for
Returns
-------
:pandas:`pandas.DataFrame`
Node results (timeseries)
"""
if node_label in [str(n) for n in self.simulation.esys.nodes]:
return views.node(self.results_raw, node_label)
else:
raise ValueError(f'Node "{node_label}" not found in energy system!')
[docs] def get_result_charts_data(self):
"""Analyze results and return data for panel display"""
#####################
# Energy production #
#####################
# node sources and targets
nodes_from_prod = ['gen_el_wind',
'gen_el_pv_ground',
'gen_el_pv_roof',
'gen_el_hydro',
'gen_el_bio',
'gen_el_conventional',
'shortage_el']
nodes_to_prod = ['bus_el']
# 1) Annual data (user scenario + SQ)
#####################################
# aggregate raw data
data_power_prod_a_user_scn = self.aggregate_flow_results(
nodes_from_prod,
nodes_to_prod,
self.results_raw,
resample_mode='A',
agg_mode='sum'
)
data_power_prod_a_sq_scn = self.aggregate_flow_results(
nodes_from_prod,
nodes_to_prod,
self.sq_results_raw,
resample_mode='A',
agg_mode='sum'
)
# prepare chart data
hc_column_power_prod_both_scn = [{'name': k1, 'data': [v1[0], v2[0]]}
for (k1, v1), (k2, v2) in
zip(data_power_prod_a_user_scn,
data_power_prod_a_sq_scn)]
hc_pie_power_production_user_scn = [{'name': k, 'y': v[0]}
for (k, v) in data_power_prod_a_user_scn]
hc_pie_power_production_sq_scn = [{'name': k, 'y': v[0]}
for (k, v) in data_power_prod_a_sq_scn]
# 2) Monthly data (user scenario)
#################################
data_power_prod_m_user_scn = self.aggregate_flow_results(
nodes_from=['gen_el_wind',
'gen_el_pv_ground',
'gen_el_pv_roof',
'gen_el_hydro',
'gen_el_bio'],
nodes_to=['bus_el'],
results_raw=self.results_raw,
resample_mode='M',
agg_mode='sum'
)
data_power_prod_m_user_scn = {k: v
for (k, v) in data_power_prod_m_user_scn}
data_power_prod_m_user_scn[str(_('Photovoltaik'))] = [
round(x1+x2, 1)
for x1, x2 in zip(data_power_prod_m_user_scn.pop(str(_('PV Freifläche'))),
data_power_prod_m_user_scn.pop(str(_('PV Dach'))))]
hc_column_power_prod_m_user_scn = [{'name': k, 'data': v}
for k, v in data_power_prod_m_user_scn.items()]
#################
# Energy demand #
#################
# node sources and targets
nodes_from_dem = ['bus_el']
nodes_to_dem = ['dem_el_hh',
'dem_el_rca',
'dem_el_ind',
'excess_el']
# aggregate raw data
data_power_dem_a_user_scn = self.aggregate_flow_results(
nodes_from_dem,
nodes_to_dem,
self.results_raw,
resample_mode='A',
agg_mode='sum'
)
data_power_dem_a_sq_scn = self.aggregate_flow_results(
nodes_from_dem,
nodes_to_dem,
self.sq_results_raw,
resample_mode='A',
agg_mode='sum'
)
# prepare chart data
hc_column_power_dem_both_scn = [{'name': k1, 'data': [v1[0], v2[0]]}
for (k1, v1), (k2, v2) in
zip(data_power_dem_a_user_scn,
data_power_dem_a_sq_scn)]
###################
# Own consumption #
###################
# 1) column 1: balance
######################
data_power_prod_a_user_scn_sum = sum([v[0] for (k,v)
in data_power_prod_a_user_scn
if k != str(_('Import'))])
data_power_prod_a_sq_scn_sum = sum([v[0] for (k, v)
in data_power_prod_a_sq_scn
if k != str(_('Import'))])
data_power_dem_a_user_scn_sum = sum([v[0] for (k,v)
in data_power_dem_a_user_scn
if k != str(_('Export'))])
data_power_dem_a_sq_scn_sum = sum([v[0] for (k, v)
in data_power_dem_a_sq_scn
if k != str(_('Export'))])
# prepare chart data
hc_column_power_own_cons_both_scn_balance = [
round(data_power_prod_a_user_scn_sum /
data_power_dem_a_user_scn_sum * 100, 1),
round(data_power_prod_a_sq_scn_sum /
data_power_dem_a_sq_scn_sum * 100, 1)]
# 2) column 2: simultaneous
###########################
# production
nodes_from = ['gen_el_wind',
'gen_el_pv_ground',
'gen_el_pv_roof',
'gen_el_hydro',
'gen_el_bio',
'gen_el_conventional']
nodes_to = ['bus_el']
data_power_prod_user_scn = self.aggregate_flow_results(
nodes_from,
nodes_to,
results_raw=self.results_raw
).sum(axis=1)
data_power_prod_sq_scn = self.aggregate_flow_results(
nodes_from,
nodes_to,
self.sq_results_raw
).sum(axis=1)
# demand
nodes_from = ['bus_el']
nodes_to = ['dem_el_hh',
'dem_el_rca',
'dem_el_ind']
data_power_dem_a_user_scn = self.aggregate_flow_results(
nodes_from,
nodes_to,
self.results_raw
).sum(axis=1)
data_power_dem_a_sq_scn = self.aggregate_flow_results(
nodes_from,
nodes_to,
self.sq_results_raw
).sum(axis=1)
# calc time share where prod >= demand
data_power_prod_both_scn_simult = [
round(sum(data_power_prod_user_scn >= data_power_dem_a_user_scn) /
len(data_power_prod_user_scn) * 100, 1),
round(sum(data_power_prod_sq_scn >= data_power_dem_a_sq_scn) /
len(data_power_prod_sq_scn) * 100, 1)]
# prepare chart data
hc_column_power_own_cons_both_scn = [
{'name': str(_('Bilanziell')),
'data': hc_column_power_own_cons_both_scn_balance},
{'name': str(_('Zeitgleich')),
'data': data_power_prod_both_scn_simult}]
######################
# make dict for json #
######################
chart_data = {
'hc_column_power_prod_both_scn':
hc_column_power_prod_both_scn,
'hc_column_power_dem_both_scn':
hc_column_power_dem_both_scn,
'hc_column_power_own_cons_both_scn':
hc_column_power_own_cons_both_scn,
'hc_pie_power_production_user_scn':
hc_pie_power_production_user_scn,
'hc_pie_power_production_sq_scn':
hc_pie_power_production_sq_scn,
'hc_column_power_prod_m_user_scn':
hc_column_power_prod_m_user_scn,
'hc_res_wind_time':
[5, 5, 5, 4, 2, 0, 2, 8, 1, 7, 1]
}
return chart_data
[docs] def get_layer_results(self):
"""Analyze results and return data for layer display"""
# TODO: better var names below!
# get user scn results for muns
scn_data = json.loads(self.simulation.session.user_scenario.data.data)
mun_results = pd.DataFrame.from_dict(scn_data['mun_data'],
orient='index')
mun_results.index = mun_results.index.astype(int)
mun_data = pd.DataFrame(list(
MunData.objects \
.values_list('ags', 'area', named=True))).set_index(['ags'])
mun_data.index = mun_data.index.astype(int)
results = pd.DataFrame(list(
RegMun.objects \
.values_list('ags', named=True))).set_index(['ags'])
# create DF with properties equivalent to those defined in models.py
results['pop'] = mun_results['pop']
results['pop_density'] = (
mun_results['pop'] / mun_data['area']).round()
results['gen_energy_re'] = (mun_results[[
'gen_el_energy_wind',
'gen_el_energy_pv_roof',
'gen_el_energy_pv_ground',
'gen_el_energy_hydro',
'gen_el_energy_bio']].sum(axis=1) / 1e3).round()
results['dem_el_energy'] = (mun_results[[
'dem_el_energy_hh',
'dem_el_energy_rca',
'dem_el_energy_ind']].sum(axis=1) / 1e3).round()
results['energy_re_el_dem_share'] = (
results['gen_energy_re'] / results['dem_el_energy'] * 100).round()
results['gen_energy_re_per_capita'] = (
results['gen_energy_re'] / results['pop']).round(decimals=1)
results['gen_energy_re_density'] = (
results['gen_energy_re'] * 1e3 / mun_data['area']).round(decimals=1)
results['gen_cap_re'] = (mun_results[[
'gen_capacity_wind',
'gen_capacity_pv_roof_large',
'gen_capacity_pv_ground',
'gen_capacity_hydro',
'gen_capacity_bio']].sum(axis=1)).round()
results['gen_cap_re_density'] = (
results['gen_cap_re'] / mun_data['area']).round(decimals=2)
results['gen_count_wind_density'] = (
mun_results['gen_count_wind'] / mun_data['area']).round(decimals=2)
results['dem_el_energy_per_capita'] = (
results['dem_el_energy'] * 1e6 / results['pop']).round()
results['dem_th_energy'] = (mun_results[[
'dem_th_energy_hh',
'dem_th_energy_rca']].sum(axis=1) / 1e3).round()
results['dem_th_energy_per_capita'] = (
results['dem_th_energy'] / results['pop']).round()
results = results.add_suffix(suffix='_result')
return results
[docs] def update_mun_energy_results_post_simulation(self):
"""Update energy results in mun data of user scenario
Returns
-------
"""
# get user scn results for muns
scn_data = json.loads(self.simulation.session.user_scenario.data.data)
scn_mun_data = pd.DataFrame.from_dict(scn_data['mun_data'],
orient='index')
scn_mun_data.index = scn_mun_data.index.astype(int)
# get energy columns to be updated
gen_el_energy_cols = [col for col in scn_mun_data if col.startswith('gen_el_energy')]
dem_el_energy_cols = [col for col in scn_mun_data if col.startswith('dem_el_energy')]
# get results (production)
nodes_from_prod = ['gen_el_wind',
'gen_el_pv_ground',
'gen_el_pv_roof',
'gen_el_hydro',
'gen_el_bio',
'gen_el_conventional']
nodes_to_prod = ['bus_el']
data_power_prod_user_scn = self.aggregate_flow_results(
nodes_from_prod,
nodes_to_prod,
self.results_raw
).sum(axis=0)
data_power_prod_user_scn.index = [src.replace('gen_el', 'gen_el_energy')
for (src, tar) in
data_power_prod_user_scn.index]
# get results (demand)
nodes_from_dem = ['bus_el']
nodes_to_dem = ['dem_el_hh',
'dem_el_rca',
'dem_el_ind']
data_power_dem_user_scn = self.aggregate_flow_results(
nodes_from_dem,
nodes_to_dem,
self.results_raw
).sum(axis=0)
data_power_dem_user_scn.index = [tar.replace('dem_el', 'dem_el_energy')
for (src, tar) in
data_power_dem_user_scn.index]
# calc results and update mun scn data
gen_el_energy_results = data_power_prod_user_scn * \
self.simulation.session.mun_to_reg_ratios[
gen_el_energy_cols]
gen_el_energy_results.index = gen_el_energy_results.index.astype(int)
dem_el_energy_results = data_power_dem_user_scn * \
self.simulation.session.mun_to_reg_ratios[
dem_el_energy_cols]
dem_el_energy_results.index = dem_el_energy_results.index.astype(int)
scn_mun_data.update(gen_el_energy_results)
scn_mun_data.update(dem_el_energy_results)
scn_mun_data.index = scn_mun_data.index.astype(str)
scn_data['mun_data'] = scn_mun_data.to_dict(orient='index')
self.simulation.session.user_scenario.data.data = json.dumps(
scn_data,
sort_keys=True
)
[docs] def aggregate_flow_results(self, nodes_from, nodes_to, results_raw,
resample_mode=None, agg_mode='sum'):
"""Aggregate raw data for each node in `nodes_from` to `nodes_to`.
Either `nodes_from` or `nodes_to` must contain a single node
label, the other one can contain one or more labels.
Parameters
----------
nodes_from : :obj:`list` of :obj:`str`
Source node labels, e.g. ['bus_el']
nodes_to : :obj:`list` of :obj:`str`
Target node labels, e.g. ['gen_el_wind', 'gen_el_pv_ground']
results_raw : :obj:`dict` of :pandas:`pandas.DataFrame`
Raw result data from optimization as created by oemof
resample_mode : :obj:`str` or None
Resampling option according to :pandas:`pandas.DataFrame.resample`
If None, no resampling takes place and `agg_mode` is not used.
Examples: 'A' (year), 'M' (month)
Default: None
agg_mode : :obj:`str`
Aggregation mode for resampling given in `resample_mode`,
possible values: 'sum', 'mean'
Default: 'sum'
Returns
-------
* If `resample_mode` is None:
:pandas:`pandas.DataFrame` with raw timeseries
* If `resample_mode` is not None:
:obj:`list` of :obj:`tuple`
Sum of annual flow by source or target node,
format: [('name_1', [value_11, ..., value_1n]),
...,
('name_n', [value_n1, ..., value_nn])]
# TODO: Always return DF, move conversion to chart-usable format to other fct
"""
# extract requested columns
if len(nodes_to) == 1 and len(nodes_from) >= 1:
ts = self.get_raw_results_df(results_raw)[[(n_from, nodes_to[0])
for n_from in nodes_from]]
multiple_nodes = 'from'
elif len(nodes_from) == 1 and len(nodes_to) >= 1:
ts = self.get_raw_results_df(results_raw)[[(nodes_from[0], n_to)
for n_to in nodes_to]]
multiple_nodes = 'to'
else:
raise ValueError(str(_('One of source and target nodes '
'must contain exactly 1 node, the '
'other >=1 nodes.')))
# resampling and aggregation
if resample_mode is not None:
if agg_mode == 'sum':
agg_data = ts.resample(resample_mode).sum()
elif agg_mode == 'mean':
agg_data = ts.resample(resample_mode).mean()
else:
raise ValueError(str(_('Aggregation mode is invalid.')))
else:
return ts
# reformat
if multiple_nodes == 'from':
agg_data = [(node_labels()[k[0]], [round(_/1000, 1) for _ in v])
for k, v in agg_data.to_dict(orient='list').items()]
elif multiple_nodes == 'to':
agg_data = [(node_labels()[k[1]], [round(_/1000, 1) for _ in v])
for k, v in agg_data.to_dict(orient='list').items()]
return agg_data
[docs]class ResultChart(object):
"""
Scenarios are loaded, analyzed and visualized within this class
"""
def __init__(self, setup_labels, type=None, data=None):
self.setup_labels = setup_labels
self.type = type
self.data = data
[docs] def visualize(self, **kwargs):
# load tooltip text from labels using container id
container_id = kwargs.get('renderTo', None)
if container_id is not None:
tooltip_section = label_data()['charts'].get(container_id, None)
if tooltip_section is not None:
tooltip_text = tooltip_section.get('text', '')
else:
tooltip_text = ''
else:
tooltip_text = ''
# prepare chart
if self.type == 'line':
visualization = highcharts.HCTimeseries(
data=self.data,
setup_labels=self.setup_labels,
tooltip_text=tooltip_text,
style='display: inline-block',
**kwargs
)
elif self.type == 'pie':
visualization = highcharts.HCPiechart(
data=self.data,
setup_labels=self.setup_labels,
tooltip_text=tooltip_text,
style='display: inline-block',
**kwargs
)
elif self.type == 'column':
visualization = highcharts.HCStackedColumn(
data=self.data,
setup_labels=self.setup_labels,
tooltip_text=tooltip_text,
style='display: inline-block',
**kwargs
)
else:
raise ValueError
return visualization