#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Contains the main Model class.
Part of symenergy. Copyright 2018 authors listed in AUTHORS.
"""
import sys
from pathlib import Path
import itertools
from collections import Counter
from orderedset import OrderedSet
import pandas as pd
import sympy as sp
import wrapt
import numpy as np
import textwrap
from sympy.tensor.array import derive_by_array
from symenergy.patches.symenergy_solveset import linear_eq_to_matrix
from symenergy.assets.plant import Plant
from symenergy.assets.storage import Storage
from symenergy.assets.curtailment import Curtailment
from symenergy.core.constraint import Constraint
from symenergy.core.slot import Slot, SlotBlock, noneslot
from symenergy.core.parameter import Parameter
from symenergy.auxiliary.parallelization import parallelize_df
from symenergy.auxiliary.parallelization import MP_COUNTER, MP_EMA
from symenergy.auxiliary.parallelization import log_time_progress
from symenergy.auxiliary.decorators import hexdigest
from symenergy import _get_logger
from symenergy.patches.sympy_linsolve import linsolve
#from symenergy.patches.sympy_linear_coeffs import linear_coeffs
from symenergy.auxiliary.constrcomb import filter_constraint_combinations
from symenergy.auxiliary import io
logger = _get_logger(__name__)
sp.linsolve = linsolve # Monkey-patching sympy.linsolve
if __name__ == '__main__': sys.exit()
[docs]class Model:
'''
Instantiate a model object. Start from here.
Parameters
----------
slot_weight : float
default time slot weight (hours); this instantiates a singleton
parameter to avoid the definition of individual parameter for each
slot; it can be overwritten for individual time slots if the weight
parameter is provided
constraint_filt : str
see :func:`symenergy.core.model.Model.init_constraint_combinations`
curtailment : bool or list of Slots, default False
Allow for curtailment in each time slot (True) or in a selection of
time slots. This generates a
:class:`symenergy.assets.curtailment.Curtailment` instance `curt`,
which defines the positive curtailment power variables `curt.p`
for each of the relevant time slots.
nworkers : int or False
number of workers to be used for parallel model setup and solving;
passed to the :class:`multiprocessing.Pool` initializer;
defaults to `multiprocessing.cpu_count() - 1`;
if False, no multiprocessing is used
'''
mutually_exclusive = {
'No power production when curtailing':
(('pos_p', 'this', False), ('curt_pos_p', 'this', False)),
'No discharging when curtailing':
(('pos_pdch', 'this', False), ('curt_pos_p', 'this', False))
}
def __init__(self, nworkers='default', curtailment=False,
slot_weight=1, constraint_filt=''):
self.plants = {}
self.slots = {}
self.slot_blocks = {}
self.storages = {}
self.comps = {}
self.curt = {}
self._cache = None
self.df_comb = None
self.df_comb_invalid = None
self.nworkers = nworkers
self.constraint_filt = constraint_filt
self._slot_weights = Parameter('w', noneslot, slot_weight)
# global vre scaling parameter, set to 1; used for evaluation
self.vre_scale = Parameter('vre_scale', noneslot, 1)
self.noneslot = noneslot
self.curtailment = curtailment
self._ncomb = None # determined through construction of self.df_comb
self._nress = None # number of valid results
@wrapt.decorator
def _update_component_list(f, self, args, kwargs):
'''
Rebuild all derived model attributes.
This is triggered every time a relevant change is made to
the model through the public API.
'''
f(*args, **kwargs)
self.comps = {}
self.comps.update(self.plants)
self.comps.update(self.slots)
self.comps.update(self.storages)
self.comps.update(self.slot_blocks)
if not 'curt' in self.comps and self.curtailment:
logger.debug('Auto-adding curtailment')
self._add_curtailment(self.slots)
self.comps.update(self.curt)
# aggregate all attribute collections
self.parameters = sum(c.parameters._copy() for c in self.comps.values())
self.parameters.append(self.vre_scale)
self.variables = sum(c.variables._copy() for c in self.comps.values())
self.constraints = sum(c.constraints._copy()
for c in self.comps.values())
self._init_supply_constraints()
self._init_total_cost()
self.cache = io.Cache(self.get_model_hash_name())
# self._assert_slot_block_validity()
self.constrs_cols_neq = self.constraints.tolist('col',
is_equality_constraint=False)
@wrapt.decorator
def _check_component_replacement(f, self, args, kwargs):
if 'name' in kwargs:
name = kwargs['name']
elif args and isinstance(args[0], str):
name = args[0]
else:
raise AttributeError('Name required for all components.')
assert name not in {**self.comps, **self.slot_blocks}, (
'A component or slot_block `%s` has already been '
'defined.')%name
return f(*args, **kwargs)
@property
def cache(self):
if self._cache is None:
raise AttributeError('Model cache not yet initialized. Please add '
'a plant or storage asset.')
return self._cache
@cache.setter
def cache(self, cache):
self._cache = cache
[docs] @_update_component_list
def freeze_parameters(self, exceptions=None):
'''
Switch from variable to numerical value for all model parameters.
This automatically updates the definition of all sympy expressions
(total cost, lagrange, ...)
Parameters
----------
exceptions : list(str)
names of parameters to be excluded; corresponds to elements of the
list `Model.parameters('name')`
Example
-------
.. code-block:: python
>>> from symenery.core import model
>>> m = model.Model()
>>> m.add_slot('s0', load=2, vre=3)
>>> m.add_plant('nuc', vc0=1, vc1=0.01)
>>> print(m.tc)
nuc_p_s0*w_none*(nuc_p_s0*vc1_nuc_none + 2*vc0_nuc_none)/2
>>> m.freeze_parameters()
>>> print(m.tc)
nuc_p_s0*(0.005*nuc_p_s0 + 1.0)
Here only the power output variable `nuc_p_s0` is left in the equation.
All other symbols (all parameters) have been substituted with their
respective numerical values.
Excluding parameters through the `exceptions` argument causes a
partial substitution:
.. code-block:: python
>>> m.freeze_parameters(exceptions=['vc0_nuc_none'])
>>> print(m.tc)
nuc_p_s0*(0.005*nuc_p_s0 + 1.0*vc0_nuc_none)
'''
for param in self.parameters():
param._unfreeze_value()
exceptions = [] if not exceptions else exceptions
list_valid = self.parameters('name')
list_invalid = set(exceptions) - set(list_valid)
assert not list_invalid, ('Invalid names %s in exceptions parameter. '
'Valid options are %s.'
)%(', '.join(list_invalid),
', '.join(list_valid))
param_list = [param for param, name in self.parameters(('', 'name'))
if not name in exceptions]
for param in param_list:
param._freeze_value()
def _assert_slot_block_validity(self):
'''
If slot blocks are used, only the case with 2 blocks containing 2 slots
each is implemented.
'''
# adding first non-slot/non-slot_block component
slots_done = (len(self.comps) - hasattr(self, 'curt')
> len(self.slots) + len(self.slot_blocks))
# check validity of time slot block definition
if (self.slot_blocks and (
len(self.slots) > 4 or (slots_done and len(self.slots) < 4))):
raise RuntimeError('Number of time slots must be equal to 4 '
'if time slot blocks are used.')
if len(self.comps) > 0: # only when other components are added
assert len(self.slot_blocks) in [0, 2], \
'Number of slot blocks must be 0 or 2.'
if self.slot_blocks and slots_done:
slots_per_block = Counter(s.block for s in self.slots.values())
assert set(slots_per_block.values()) == set((2,)), \
'Each slot block must be associated with exactly 2 slots.'
@wrapt.decorator
def _add_slots_to_kwargs(f, self, args, kwargs):
if not 'slots' in kwargs:
kwargs.update(dict(slots=self.slots))
return f(*args, **kwargs)
@property
def df_comb(self):
if self._df_comb is not None:
return self._df_comb
else:
raise AttributeError('Model attribute df_comb not yet defined. '
'Construct a valid model and call '
'Model.generate_solve() or '
'Model.init_constraint_combinations()')
@df_comb.setter
def df_comb(self, df_comb):
if df_comb is None:
return
self._df_comb = df_comb.reset_index(drop=True)
if self._nress:
self._nress = len(self._df_comb)
else:
self._ncomb = len(self._df_comb)
[docs] @_check_component_replacement
def add_slot_block(self, name, repetitions):
'''
Add a time slot block to the model.
%s
'''
self.slot_blocks.update({name: SlotBlock(name, repetitions)})
[docs] @_update_component_list
@_add_slots_to_kwargs
@_check_component_replacement
def add_storage(self, name, *args, **kwargs):
r'''
Add generic storage capacity to the model.
%s
''' # Storage docstring added
kwargs['_slot_blocks'] = self.slot_blocks
self.storages.update({name: Storage(name, **kwargs)})
[docs] @_update_component_list
@_add_slots_to_kwargs
@_check_component_replacement
def add_plant(self, name, *args, **kwargs):
r'''
Add a dispatchable power plant to the model.
%s
''' # Plant docstring added
self.plants.update({name: Plant(name, **kwargs)})
# note: no _update_component_list since slots alone make no model
[docs] @_check_component_replacement
def add_slot(self, name, *args, **kwargs):
'''
Add a time slot to the model.
%s
'''
if 'block' in kwargs:
bk = kwargs['block']
assert bk in self.slot_blocks, 'Unknown block %s'%bk
kwargs['block'] = self.slot_blocks[bk]
elif self.slot_blocks:
raise RuntimeError(('Error in `add_slot(%s)`: If any of the slots '
'are assigned to a block, all slots must be.'
)%name)
if not 'weight' in kwargs: # use default weight parameter
kwargs['weight'] = self._slot_weights
self.slots.update({name: Slot(name, **kwargs)})
[docs] @_update_component_list
def add_curtailment(self, slots):
'''
Add curtailment to the model. Must specify the time slots.
This method is only used if curtailment is defined for a subset
of time slots. Use the model parameter `curtailment=True` to enable
curtailment globally.
Parameters
----------
slots : list
list of time slot names, e.g. ``['day', 'night']``
'''
if self.curtailment:
raise RuntimeError('Cannot manually add curtailment if model '
'level curtailment is True.')
self._add_curtailment(slots)
def _add_curtailment(self, slots):
logger.debug('_add_curtailment with slots=%s'%str(slots))
if not isinstance(slots, (dict)):
slots = {slot: self.slots[slot] for slot in slots}
self.curt.update({'curt': Curtailment('curt', slots=slots)})
def _init_total_cost(self):
'''
Generate total cost and base lagrange attributes.
Collects all cost components to calculate their total sum `tc`. Adds
the equality constraints to the model's total cost to generate the base
lagrange function `_lagrange_0`.
Costs and constraint expression of all components are re-initialized.
This is important in case parameter values are frozen.
'''
comp_list = list(self.plants.values()) + list(self.storages.values())
for comp in comp_list:
comp._init_cost_component()
comp._reinit_all_constraints()
eq_cstrs = self.constraints.tolist('expr', is_equality_constraint=True)
self.tc = sum(p.cc for p in comp_list)
self._lagrange_0 = self.tc + sum(eq_cstrs)
# =============================================================================
# =============================================================================
def _supply_cstr_expr_func(self, slot):
'''
Initialize the load constraints for a given time slot.
Note: this accesses all plants, therefore method of the model class.
'''
total_chg = sum(store.pchg[slot]
for store in self.storages.values()
if slot in store.pchg)
total_dch = sum(store.pdch[slot]
for store in self.storages.values()
if slot in store.pdch)
equ = (slot.l.symb
+ total_chg
- total_dch
- sum(plant.p[slot] for plant in self.plants.values()))
if self.curt and slot in self.curt['curt'].p:
equ += self.curt['curt'].p[slot]
if hasattr(slot, 'vre'):
equ -= slot.vre.symb * self.vre_scale.symb
return slot.w.symb * equ
def _init_supply_constraints(self):
'''
Defines a dictionary cstr_load {slot: supply constraint}
'''
for slot in self.slots.values():
cstr = Constraint('supply', expr_func=self._supply_cstr_expr_func,
slot=slot, is_equality_constraint=True,
expr_args=(slot,), comp_name=slot.name)
self.constraints.append(cstr)
# =============================================================================
# =============================================================================
[docs] def generate_solve(self):
'''
Initialize the constraint combinations, generate the problems, and
solve. This calls the following methods:
- `Model.init_constraint_combinations()`
- `Model.define_problems()`
- `Model.solve_all()`
- `Model.filter_invalid_solutions()`
- `Model.generate_total_costs()`
- `Model.cache.write(Model.df_comb)`
'''
if self.cache.file_exists:
self.df_comb = self.cache.load()
else:
self.init_constraint_combinations(self.constraint_filt)
self.define_problems()
self.solve_all()
self.filter_invalid_solutions()
self.generate_total_costs()
self.cache.write(self.df_comb)
def _get_model_mutually_exclusive_cols(self):
'''
Expand model `_MUTUALLY_EXCLUSIVE` to plants and time slots.
The initial list of constraint combinations is filtered only according
to constraint combinations within each component separately
(component `_MUTUALLY_EXCLUSIVE` dictionaries). Here,
additional constraint combinations from different components are
removed.
Assuming only `'this'` as slottype.
TODO: Integrate with constrcomb.CstrCombBase or derived class thereof.
'''
list_col_names = []
dict_struct = {('comp_name', 'base_name'): {('slot',): ''}}
cstrs_all = self.constraints.to_dict(dict_struct=dict_struct)
for mename, me in self.mutually_exclusive.items():
# expand to all components
me_exp = [tuple((cstrs, name_cstr[0], me_slct[-1])
for name_cstr, cstrs in cstrs_all.items()
if name_cstr[1].endswith(me_slct[0]))
for me_slct in me]
# all components of the combination's two constraints
# for example, ('n', 'g'), 'curt' --> ('n', curt), ('g', curt)
me_exp = list(itertools.product(*me_exp))
# remove double components, also: remove component names
me_exp = [tuple((cstr[0], cstr[2]) for cstr in cstrs)
for cstrs in me_exp
if not cstrs[0][1] == cstrs[1][1]]
me_exp = [tuple({slot: (cstr, cstrs[1])
for slot, cstr in cstrs[0].items()}
for cstrs in cstr_comb)
for cstr_comb in me_exp]
# split by time slots for existing time slots
me_exp = [(cstr_comb[0][slot], cstr_comb[1][slot])
for cstr_comb in me_exp
for slot in self.slots.values()
if all(slot in cc for cc in cstr_comb)]
# switch from constraint objects to column names
me_exp = [tuple((cstr[0].col, cstr[1])
for cstr in cstrs) for cstrs in me_exp]
list_col_names += me_exp
return list_col_names
[docs] def init_constraint_combinations(self, constraint_filt=None):
'''
Generates dataframe `model.df_comb` with constraint combinations.
1. Obtains relevant constraint combinations from components (see
:func:`symenergy.core.component.Component._get_constraint_combinations`)
2. Generates table corresponding to the full cross-product of all
component constraint combinations.
3. Filters constraint combinations according to the
:attr:`model.mutually_exclusive` class attribute.
This function initilizes the `symenergy.df_comb` attribute
Parameters
----------
constraint_filt : str
:func:`pandas.DataFrame.query` string to filter the constraint
activation columns of the `df_comb` dataframe. A list of relevant
column names of a model object `m` can be retrieved through
``m.constraints('col', is_equality_constraint=False)``
'''
list_dfcomb = []
for comp in self.comps.values():
list_dfcomb.append(comp._get_constraint_combinations())
list_dfcomb = [df for df in list_dfcomb if not df.empty]
dfcomb = pd.DataFrame({'dummy': 1}, index=[0])
for df in list_dfcomb:
dfcomb = pd.merge(dfcomb, df.assign(dummy=1),
on='dummy', how='outer')
logger.info('Length of merged df_comb: %d'%len(dfcomb))
# filter according to model mutually_exclusive
logger.info('*'*30 + 'model filtering' + '*'*30)
model_mut_excl_cols = self._get_model_mutually_exclusive_cols()
dfcomb = filter_constraint_combinations(dfcomb, model_mut_excl_cols)
self.df_comb = dfcomb.drop('dummy', axis=1)
if constraint_filt:
self.df_comb = self.df_comb.query(constraint_filt)
self._ncomb = len(self.df_comb)
logger.info('Remaining df_comb rows: %d' % self._ncomb)
# get index
self.df_comb = self.df_comb[[c for c in self.df_comb.columns
if not c == 'idx']].reset_index()
self.df_comb = self.df_comb.rename(columns={'index': 'idx'})
#
# def get_variabs_params(self):
# '''
# Generate lists of parameters and variables.
#
# Gathers all parameters and variables from its components.
# This is needed for the definition of the linear equation system.
# '''
#
## self.params = {par: comp
## for comp in self.comps.values()
## for par in comp.get_params_dict()}
#
# # add time-dependent variables
## self.variabs = {var: comp
## for comp in self.comps.values()
## for var in comp.get_variabs()}
#
# # parameter multips
## self.multips = {cstr.mlt: comp for cstr, comp in self.constrs.items()}
# # supply multips
## self.multips.update({cstr.mlt: slot
## for slot, cstr in self.cstr_supply.items()})
# =============================================================================
# Various solver-related methods
# =============================================================================
def _solve(self, x):
# substitute variables with binding positivitiy constraints
cpos = self.constraints.tolist(('col', ''),
is_positivity_constraint=True)
subs_zero = {cstr.expr_0: sp.Integer(0) for col, cstr
in cpos if x[cstr.col]}
mat = derive_by_array(x.lagrange, x.variabs_multips)
mat = sp.Matrix(mat).expand()
mat = mat.subs(subs_zero)
variabs_multips_slct = list(OrderedSet(x.variabs_multips) - OrderedSet(subs_zero))
A, b = linear_eq_to_matrix(mat, variabs_multips_slct)
MP_COUNTER.increment()
solution_0 = sp.linsolve((A, b), variabs_multips_slct)
if isinstance(solution_0, sp.sets.EmptySet):
return None
else:
# init with zeros
solution_dict = dict.fromkeys(x.variabs_multips, sp.Integer(0))
# update with solutions
solution_dict.update(dict(zip(variabs_multips_slct,
list(solution_0)[0])))
solution = tuple(solution_dict.values())
return solution
def _wrapper_call_solve_df(self, df, *args):
name, ntot = 'Solve', self._ncomb
return log_time_progress(self._call_solve_df)(self, df, name, ntot)
def _call_solve_df(self, df):
''' Applies to dataframe. '''
return df.apply(self._solve, axis=1).tolist()
[docs] def solve_all(self):
logger.info('Solving')
if __name__ == '__main__':
x = self.df_comb.iloc[0]
if not self.nworkers:
self.df_comb['result'] = self._call_solve_df(self.df_comb)
else:
func = self._wrapper_call_solve_df
self.df_comb['result'] = parallelize_df(self.df_comb, func,
nworkers=self.nworkers)
# =============================================================================
# =============================================================================
def _subs_total_cost(self, x):
'''
Substitutes solution into TC variables.
This expresses the total cost as a function of the parameters.
'''
res, var = x.result, x.variabs_multips
dict_var = {var: res[ivar]
if not isinstance(res, sp.sets.EmptySet)
else np.nan for ivar, var
in enumerate(var)}
MP_COUNTER.increment()
return self.tc.copy().subs(dict_var)
def _call_subs_tc(self, df):
return df.apply(self._subs_total_cost, axis=1)
def _wrapper_call_subs_tc(self, df, *args):
name = 'Substituting total cost'
ntot = self._nress
return log_time_progress(self._call_subs_tc)(self, df, name, ntot)
[docs] def generate_total_costs(self):
'''
Substitute result variable expressions into total costs.
This adds an additional total cost column ``'tc'`` to the
:attr:`symenergy.core.model.Model.df_comb` table. The total cost is
calculated by substituting the solutions for all variables into
the total cost expression ``Model.tc`` (for each constraint combination).
The execution is parallelized. The ``Model.nworkers`` attribute defines
the number of workers for multiprocessing.
'''
logger.info('Generating total cost expressions...')
df = self.df_comb[['result', 'variabs_multips', 'idx']]
if not self.nworkers:
self.df_comb['tc'] = self._call_subs_tc(df)
else:
func = self._wrapper_call_subs_tc
self.df_comb['tc'] = parallelize_df(df, func,
nworkers=self.nworkers)
# =============================================================================
# =============================================================================
def _construct_lagrange(self, row):
lagrange = self._lagrange_0
active_cstrs = row[row == 1].index.values
lagrange += sum(expr for col, expr
in self.constraints.tolist(('col', 'expr'))
if col in active_cstrs)
MP_COUNTER.increment()
return lagrange
def _call__construct_lagrange(self, df):
'''
Top-level method for parallelization of _construct_lagrange.
'''
return df.apply(self._construct_lagrange, axis=1).tolist()
def _wrapper_call__construct_lagrange(self, df, *args):
name = 'Construct lagrange'
ntot = self._ncomb
return log_time_progress(self._call__construct_lagrange
)(self, df, name, ntot)
# =============================================================================
# =============================================================================
def _get_variabs_multips_slct(self, lagrange):
'''
Returns all relevant variables and multipliers for this model.
Starting from the complete set of variables and multipliers, they are
filtered depending on whether they occur in a specific lagrange
function.
Parameters:
* lagrange -- sympy expression; lagrange function
Return values:
* variabs_slct --
* variabs_time_slct --
* multips_slct --
'''
lfs = lagrange.free_symbols
MP_COUNTER.increment()
list_vm = [ss for ss in lfs
if ss in self.variables.tolist('symb')
+ self.constraints.tolist('mlt')]
return sorted(list_vm, key=str)
def _call_get_variabs_multips_slct(self, df):
# res = list(map(self.get_variabs_multips_slct, df))
return df.apply(self._get_variabs_multips_slct)
def _wrapper_call_get_variabs_multips_slct(self, df, *args):
name = 'Get variabs/multipliers'
ntot = self._ncomb
func = self._call_get_variabs_multips_slct
return log_time_progress(func)(self, df, name, ntot)
# =============================================================================
# =============================================================================
def fix_linear_dependencies(self, x):
'''
All solutions showing linear dependencies are set to zero. See doc
of symenergy.core.model.Model._get_mask_linear_dependencies
'''
MP_COUNTER.increment()
if __name__ == '__main__':
x = self.df_comb.iloc[0]
if x.code_lindep == 0:
list_res_new = x.result
elif x.code_lindep == 3:
list_res_new = x.result
elif x.code_lindep == 1:
list_res = x.result
list_var = x.variabs_multips
collect = {}
list_res_new = [res for res in list_res]
for nres, res in enumerate(list_res):
free_symbs = [var for var in list_var if var in res.free_symbols]
if free_symbs:
list_res_new[nres] = sp.numbers.Zero()
for res in list_res_new:
res.subs(dict.fromkeys(free_symbs, sp.numbers.Zero()))
collect[list_var[nres]] = ', '.join(map(str, free_symbs))
if collect:
logger.debug('idx=%d'%x.idx)
for res, var in collect.items():
logger.debug((' Solution for %s contained variabs '
'%s.')%(res, var))
else:
raise ValueError('code_lindep must be 0, 3, or 1')
return list_res_new
def _call_fix_linear_dependencies(self, df):
return df.apply(self.fix_linear_dependencies, axis=1)
def _wrapper_call_fix_linear_dependencies(self, df, *args):
name = 'Fix linear dependencies'
ntot = self._nress
func = self._call_fix_linear_dependencies
return log_time_progress(func)(self, df, name, ntot)
# =============================================================================
# =============================================================================
[docs] def define_problems(self):
'''
For each combination of constraints, define
* the Lagrange functions (new column *lagrange*
in the ``df_comb`` table)
* the endogenous (dependent) variables and multipliers
(new column *variabs_multips* in the ``df_comb`` table)
'''
logger.info('Defining lagrangians...')
df = self.df_comb[self.constrs_cols_neq]
if not self.nworkers:
self.df_comb['lagrange'] = self._call__construct_lagrange(df)
else:
func = self._wrapper_call__construct_lagrange
nworkers = self.nworkers
self.df_comb['lagrange'] = parallelize_df(df, func,
nworkers=nworkers)
logger.info('Getting selected variables/multipliers...')
df = self.df_comb.lagrange
if not self.nworkers:
self.list_variabs_multips = self._call_get_variabs_multips_slct(df)
self.df_comb['variabs_multips'] = self.list_variabs_multips
else:
func = self._wrapper_call_get_variabs_multips_slct
nworkers = self.nworkers
self.df_comb['variabs_multips'] = parallelize_df(df, func,
nworkers=nworkers)
def _get_mask_empty_solution(self):
'''
Infeasible solutions are empty.
'''
mask_empty = self.df_comb.result.isnull()
return mask_empty
[docs] def _get_mask_linear_dependencies(self):
'''
Solutions of problems containing linear dependencies.
In case of linear dependencies SymPy returns solutions containing
variables which we are actually solving for. To fix this, we
differentiate between two cases:
0. No dependencies
1. All corresponding solutions belong to the same component.
Overspecification occurs if the variables of the same component
depend on each other but are all zero. E.g. charging,
discharging, and stored energy in the case of storage.
They are set to zero.
2. Linear dependent variables belonging to different components.
This occurs if the model is underspecified, e.g. if it doesn't
matter which component power is used. Then the solution can
be discarded without loss of generality. All cases will still
be captured by other constraint combinations.
3. Different components but same component classes. If multiple idling
storage plants are present, their mulitpliers show linear
dependencies.
Returns
-------
code_lindep : pandas.Series
Series with same length as ``df_comb`` with linear dependency codes
as defined above
'''
res_vars = self.df_comb[['result', 'variabs_multips', 'idx']].copy()
# map variable/multiplier -> component
dict_vm_cp = {**self.variables.to_dict({('symb',): 'comp_name'}),
**self.constraints.to_dict({('mlt', ): 'comp_name'})}
dict_vm_cp = {vm: self.comps[cp] for vm, cp in dict_vm_cp.items()}
# map variable/multipler -> component class
dict_vm_cl = {vm: cp.__class__ for vm, cp in dict_vm_cp.items()}
return_series = lambda *args: pd.Series(args,
index=['ncompunq', 'nclassunq'])
# for each individual solution, get residual variables/multipliers
def get_residual_vars(x):
if __name__ == '__main__':
x = res_vars.iloc[0]
varmlt = x.variabs_multips
result = x.result
# identify free variables/multipliers in results
resvars = [r.free_symbols & set(varmlt) for r in result]
# add the corresponding solution variable to all non-empty sets
resvars = [rv | {vm} for rv, vm in zip(resvars, varmlt) if rv]
if not resvars:
return return_series(0, 0)
# get components corresponding to symbols (unique for each result)
rescomps = [set(map(lambda x: dict_vm_cp[x], rv)) for rv in resvars]
# maximum number of distinct components
ncompunq = max(map(len, rescomps))
# get classes corresponding to symbols (unique for each result)
resclass = [set(map(lambda x: dict_vm_cl[x], rv)) for rv in resvars]
# maximum number of distinct classes
nclassunq = max(map(len, resclass))
return return_series(ncompunq, nclassunq)
max_cnt = res_vars.apply(get_residual_vars, axis=1)
# generate lindep codes
max_cnt['code_lindep'] = 0
mask_1 = (max_cnt.ncompunq < 2) & (max_cnt.ncompunq > 0)
max_cnt.loc[mask_1, 'code_lindep'] = 1
mask_2 = (max_cnt.ncompunq >= 2) & (max_cnt.nclassunq >= 2)
max_cnt.loc[mask_2, 'code_lindep'] = 2
mask_3 = (max_cnt.ncompunq >= 2) & (max_cnt.nclassunq <= 1)
max_cnt.loc[mask_3, 'code_lindep'] = 3
return max_cnt.code_lindep
[docs] def filter_invalid_solutions(self):
'''
Analyzes the model result expressions to filter invalid rows.
This method modifies and shortens the ``Model.df_comb`` table.
* Identify empty solutions as returned by the linsolve method
* Remove empty solutions from ``Model.df_comb``. Invalid solutions
are kept in the ``Model.df_comb_invalid`` dataframe.
* Analyze and classify remaining solutions with respect to linear
dependencies of solutions
(:func:`symenergy.core.model.Model._get_mask_linear_dependencies`).
* Fix results with fixable linear dependencies.
'''
mask_empty = self._get_mask_empty_solution()
ncomb0 = len(self.df_comb)
nempty = mask_empty.sum()
shareempty = nempty / ncomb0 * 100
logger.info('Number of empty solutions: '
'{:d} ({:.1f}%)'.format(nempty, shareempty))
# keep empty solutions constraint combinations for post-analysis
self.df_comb_invalid = self.df_comb.loc[mask_empty,
self.constrs_cols_neq]
# remove invalid constraint combinations
self.df_comb = self.df_comb.loc[-mask_empty]
# get info on linear combinations
mask_lindep = self._get_mask_linear_dependencies()
ncomb0 = len(self.df_comb)
nkey1, nkey2, nkey3 = ((mask_lindep == 1).sum(),
(mask_lindep == 2).sum(),
(mask_lindep == 3).sum())
logger.warning(('Number of solutions with linear dependencies: '
'Key 1: {:d} ({:.1f}%), Key 2: {:d} ({:.1f}%), Key 3: {:d} ({:.1f}%)'
).format(nkey1, nkey1/ncomb0*100,
nkey2, nkey2/ncomb0*100,
nkey3, nkey3/ncomb0*100))
self.df_comb['code_lindep'] = mask_lindep
self.df_comb = self.df_comb.loc[-(self.df_comb.code_lindep == 2)]
self._nress = len(self.df_comb)
# adjust results for single-component linear dependencies
if not self.nworkers:
self.df_comb['result'] = \
self._call_fix_linear_dependencies(self.df_comb)
else:
func = self._wrapper_call_fix_linear_dependencies
nworkers = self.nworkers
self.df_comb['result'] = parallelize_df(self.df_comb, func,
nworkers=nworkers)
[docs] def get_results_dict(self, idx, df=None, slct_var_mlt=None,
substitute=None, diff=None, diff_then_subs=True):
'''
Get dictionary with `{variable_name: result_expression}`.
Apply substitutions or derivatives with respect to a parameter.
Parameters
----------
idx : int
index of the constraint combination for which the results are
to be returned
df : df
DataFrame containing the results and the index; defaults to
the model's `df_comb` table
substitute : dict
substitutions to be performed prior to expression
simplification; main use case: setting the energy cost
parameter ``ec`` of the storage class to zero.
slct_var_mlt : list of str
list of variable or multiplier names; must be a subset of
`set(m.variables('name')) | set(map(str, m.constraints('mlt')))`
diff : name or sympy.symbol.Symbol
parameter name or symbol for differentiation
diff_then_subs : bool
If True, first differentiate expressions, then substitute values;
defaults to True
The input DataFrame must have the following columns:
* `variabs_multips`: iterable of variable and multiplier symbols
for which the results are to be printed
* `result`: list of expressions corresponding for each of the
`variabs_multips` symbols
'''
name_to_symb = self.parameters.to_dict({'name': 'symb'})
if diff:
assert isinstance(diff, (sp.symbol.Symbol, str)), \
f'diff must be SymPy symbol or string, got {type(diff)}'
if isinstance(diff, str):
assert diff in name_to_symb, f'Unknown parameter name "{diff}"'
diff = name_to_symb[diff]
if not isinstance(df, pd.DataFrame):
df = self.df_comb
assert idx in set(df.idx), \
f'get_results_dict: idx={idx} not found in df'
if not substitute:
substitute = {}
def sanitize_subs(par):
if isinstance(par, str):
assert par in name_to_symb,\
f'Unknown parameter name "{par}"'
return name_to_symb[par]
else:
assert par in name_to_symb.values(),\
f'Unknown parameter symbol "{par}"'
return par
substitute = {sanitize_subs(par): val for par, val in
substitute.items()}
x = df.loc[lambda x: x.idx == idx].iloc[0]
resdict = dict(sorted(zip(map(str, x.variabs_multips), x.result)))
if slct_var_mlt:
resdict = {var: res for var, res in resdict.items()
if var in slct_var_mlt}
def finalize(diff, diff_then_subs, substitute):
if diff and not diff_then_subs:
fin = lambda res: sp.diff(res.subs(substitute), diff)
elif diff and diff_then_subs:
fin = lambda res: sp.diff(res, diff).subs(substitute)
else:
fin = lambda res: res.subs(substitute)
return lambda res: sp.simplify(fin(res))
return {var: finalize(diff, diff_then_subs, substitute)(res)
for var, res in resdict.items()}
[docs] def print_results(self, *args, **kwargs):
'''
Print result expressions for all variables and multipliers for a
certain constraint combination index.
Parameters are passed to
:func:`symenergy.core.model.Model.get_results_dict`
'''
resdict = self.get_results_dict(*args, **kwargs)
for var, res in resdict.items():
print('*'*20, var, '*'*20)
print(res)
def __repr__(self):
ret = str(self.__class__)
return ret
def print_mutually_exclusive_post(self, logging=False):
print_func = logger.info if logging else print
dfiv = self.df_comb_invalid
dfvl = self.df_comb[self.df_comb_invalid.columns]
tot_list_excl = []
ncols = 3
for ncols in range(2, len(dfvl.columns)):
print_func('ncols=%d'%ncols)
for slct_cols in tuple(itertools.combinations(dfvl.columns, ncols)):
get_combs = lambda df: (df[list(slct_cols)].drop_duplicates()
.apply(tuple, axis=1).tolist())
vals_dfiv_slct = get_combs(dfiv)
vals_dfvl_slct = get_combs(dfvl)
# any in dfvl
vals_remain = [comb for comb in vals_dfiv_slct
if not comb in vals_dfvl_slct]
if vals_remain:
list_exc = [tuple(zip(*colvals)) for colvals
in list(zip([slct_cols] * ncols, vals_remain))]
# check not superset of tot_list_excl elements
list_exc = [comb for comb in list_exc
if not any(set(comb_old).issubset(set(comb))
for comb_old in tot_list_excl)]
if list_exc:
print_func(list_exc)
tot_list_excl += list_exc
def draw_slots(self, graphwidth=70):
slotlist = [(slotname, slot.l.value, slot.vre.value)
for slotname, slot in self.slots.items()]
maxlen = len(max([slot[0] for slot in slotlist], key=len))
maxpwr = max(itertools.chain.from_iterable(slot[1:]
for slot in slotlist))
ljust_all = lambda lst, newlen: [(x[0].ljust(newlen),) + x[1:]
for x in lst]
slotlist = ljust_all(slotlist, maxlen + 1)
slotlist = [(slotname,
round(l / maxpwr * graphwidth),
round(vre / maxpwr * graphwidth))
for slotname, l, vre in slotlist]
bar = lambda l, vre: ((l - vre) * "\u2588" + vre * "\u2591")
slotlist = [(name, bar(l, vre).ljust(graphwidth))
for name, l, vre in slotlist]
for slotbar, slotobj in zip(slotlist, self.slots.values()):
slot, bar = slotbar
data = 'l={:.1f}/vre={:.1f}'.format(slotobj.l.value, slotobj.vre.value)
print(slot, bar, data, sep=' | ', end='\n', flush=True)
@hexdigest
def get_model_hash_name(self):
hash_input = ''.join(comp._get_hash_name()
for comp in self.comps.values())
hash_input += str(self._lagrange_0)
hash_input += self.constraint_filt
return hash_input
# add component class docs to the component adder docstrings
for addermethod, compclass in [(Model.add_storage, Storage),
(Model.add_plant, Plant),
(Model.add_slot, Slot),
(Model.add_slot_block, SlotBlock)]:
doc = addermethod.__doc__
classdoc = textwrap.dedent(compclass.__doc__)
lines = doc.split('\n')
ind = min(len(line) - len(line.strip(' ')) for line in lines
if not line == '')
classdoc = textwrap.indent(classdoc, ' ' * ind)
addermethod.__doc__ = doc % classdoc