#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Contains the Evaluator class.

Part of symenergy. Copyright 2018 authors listed in AUTHORS.
"""
import os
import sys
import gc
import py_compile
import sympy as sp
import numpy as np
from importlib import reload
from multiprocessing import current_process
import pandas as pd
import itertools
import random
from hashlib import md5
from functools import partial
import time
from sympy.utilities.lambdify import lambdastr
import symenergy

from symenergy.auxiliary.parallelization import parallelize_df
from symenergy.auxiliary.parallelization import log_time_progress
from symenergy import multiproc_params
from symenergy.auxiliary.parallelization import get_default_nworkers
from symenergy.auxiliary.parallelization import MP_COUNTER, MP_EMA
from symenergy.auxiliary import parallelization
from symenergy.auxiliary.decorators import hexdigest
from symenergy.auxiliary.io import EvaluatorCache

from symenergy.core.model import Model
from symenergy import _get_logger

logger = _get_logger(__name__)

pd.options.mode.chained_assignment = None

THRESHOLD_UNEXPECTED_ZEROS = 1e-9

def log_info_mainprocess(logstr):
    if current_process().name == 'MainProcess':
        logger.info(logstr)

def _eval(func, df_x):
    '''
    Vectorized evaluation of a single lambdified function.

    Parameters
    ----------
    func : pandas.Series
        single-element series holding the lambdified function; only the
        first element is used
    df_x : pandas.DataFrame
        table of parameter values; its columns must match the function
        arguments (in order)
    '''

    new_index = df_x.set_index(df_x.columns.tolist()).index
    data = func.iloc[0](*df_x.values.T)
    if not isinstance(data, np.ndarray):  # constant value --> expand
        data = np.ones(df_x.iloc[:, 0].values.shape) * data

    res = pd.DataFrame(data, index=new_index)
    MP_COUNTER.increment()

    return res
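

# Illustrative sketch (not part of the SymEnergy API): demonstrates how
# `_eval` broadcasts a single lambdified function over a parameter grid.
# The toy function and parameter names below are hypothetical.
def _example_eval():
    func = pd.Series([lambda a, b: a * b + 1.0])
    df_x = pd.DataFrame({'a': [0.0, 1.0, 2.0], 'b': [10.0, 10.0, 10.0]})
    # -> DataFrame indexed by (a, b), column 0 holding a * b + 1 per row
    return _eval(func, df_x)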


class Expander():
    '''
    Evaluates the functions in the `lambd_func` column of `df` for all
    parameter value combinations in the `df_x_vals` table.
    '''

    def __init__(self, x_vals):

        self.df_x_vals = x_vals


    def _expand(self, df):

        logger.warning('_expand: Generating dataframe with length %d' % (
                        len(df) * len(self.df_x_vals)))
        if not multiproc_params['nworkers'] or multiproc_params['nworkers'] == 1:
            df_result = self._call_eval(df)
        else:
            self.nparallel = len(df)
            df_result = parallelize_df(df=df[['func', 'idx', 'lambd_func']],
                                       func=self._wrapper_call_eval)

        return df_result.rename(columns={0: 'lambd'}).reset_index()


    def _call_eval(self, df):

        df_result = (df.groupby(['func', 'idx'])
                        .lambd_func
                        .apply(_eval, df_x=self.df_x_vals))
        return df_result


    def _restore_columns(self, df_result, df):

        ind = ['func', 'idx']
        cols = ['is_positive']
        return df_result.join(df.set_index(ind)[cols], on=ind)


    def _wrapper_call_eval(self, df):

        name, ntot = 'Vectorized evaluation', self.nparallel
        return log_time_progress(self._call_eval)(self, df, name, ntot)


    def run(self, df):

        df_result = self._expand(df)
        return self._restore_columns(df_result, df)
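

# Hypothetical usage sketch (not part of the SymEnergy API): Expander
# evaluates each row's 'lambd_func' on the full parameter grid, returning
# one row per (func, idx, parameter combination); with a single worker
# configured, this runs serially through _call_eval.
def _example_expander():
    df_x = pd.DataFrame({'a': [0.0, 0.5, 1.0]})
    df = pd.DataFrame({'func': ['f'], 'idx': [0], 'is_positive': [0],
                       'lambd_func': [lambda a: 3.0 * a]})
    return Expander(df_x).run(df)  # 'lambd' column holds 0.0, 1.5, 3.0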


# %% ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

class EvalAnalysis():
    '''
    Identifies optimal and infeasible solutions.
    '''

    def __init__(self, x_vals, map_col_func, dict_cap, dict_constrs,
                 tolerance, drop_non_optimum):

        self.x_vals = x_vals
        self.map_col_func = map_col_func
        self.tolerance = tolerance
        self.drop_non_optimum = drop_non_optimum
        self.dict_cap = dict_cap
        self.dict_constrs = dict_constrs

        self.x_name = list(map(lambda x: x.name, self.x_vals))


    def run(self, df):

        if not multiproc_params['nworkers'] or multiproc_params['nworkers'] == 1:
            df_exp = self._evaluate_by_x_new(df)

        else:
            group_params = self._get_optimum_group_params()
            df_split = [df for _, df in (df.groupby(group_params))]

            self.nparallel = len(df_split)
            df_exp = parallelize_df(df=df_split,
                                    func=self._wrapper_call_evaluate_by_x)
        return df_exp


    def _get_optimum_group_params(self):
        '''
        Identify the groupby columns whose number of parameter value
        combinations is closest to the target number of chunks (see the
        worked example below).

        evaluate_by_x must be applied to full sets of constraint
        combinations, since constraint combinations are compared with each
        other.
        '''

        nchunks = get_default_nworkers() * multiproc_params['chunks_per_worker']

        param_combs = \
            itertools.chain.from_iterable(itertools.combinations(self.x_vals, i)
                                          for i in range(1, len(self.x_vals) + 1))
        len_param_combs = {params: np.prod(list(len(self.x_vals[par])
                                                for par in params))
                           for params in param_combs}

        dev_param_combs = {key: abs((len_ - nchunks) / nchunks)
                           for key, len_ in len_param_combs.items()}

        group_params = min(dev_param_combs, key=lambda x: dev_param_combs[x])
        group_params = list(map(lambda x: x.name, group_params))

        return group_params
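
    # =========================================================================
    # Worked example of the selection above (hypothetical numbers): with
    # x_vals sizes {vre_scale: 51, C_n: 3} and a target of nchunks = 8, the
    # candidate groupings have 51, 3, and 51 * 3 = 153 combinations; the
    # relative deviations |len - nchunks| / nchunks are 5.375, 0.625, and
    # 18.125, so grouping by C_n alone (3 chunks) is selected.
    # =========================================================================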


    def _get_map_sanitize(self, df):
        '''
        Identify variable values which are (numerically) zero although the
        corresponding zero (positivity) constraints are not binding; these
        are subsequently replaced by NaN.
        '''

        map_ = pd.Series([False] * len(df), index=df.index)

        for col, func in self.map_col_func:
            map_new = ((df.func == func)
                       & df.idx.isin(self.dict_constrs[col])
                       & (df['lambd'].abs() <= THRESHOLD_UNEXPECTED_ZEROS))
            map_ |= map_new

        return map_


    def _evaluate_by_x_new(self, df):

        MP_COUNTER.increment()

        log_info_mainprocess('Sanitizing unexpected zeros.')
        df['map_sanitize'] = self._get_map_sanitize(df)
        df.loc[df.map_sanitize, 'lambd'] = np.nan

        log_info_mainprocess('Getting mask of valid solutions.')
        mask_valid = self._get_mask_valid_solutions(df)
        df = df.join(mask_valid, on=mask_valid.index.names)
        df.loc[:, 'lambd'] = df.lambd.astype(float)

        log_info_mainprocess('Identifying cost optimum.')
        df.loc[:, 'is_optimum'] = self.init_cost_optimum(df)

        if self.drop_non_optimum:
            df = df.loc[df.is_optimum]

        return df

#    def _call_evaluate_by_x(self, df):
#        return self._evaluate_by_x_new(df)

    def _wrapper_call_evaluate_by_x(self, df):

        name, ntot = 'Evaluate', self.nparallel
        return log_time_progress(self._evaluate_by_x_new)(self, df, name, ntot)


    def _get_mask_valid_solutions(self, df, return_full=False):
        '''
        Obtain a mask identifying valid solutions for each parameter value set
        and constraint combination.

        Indexed by x_name and constraint combination idx, *not* by function.

        Parameters
        ----------
        df : pandas.DataFrame
        return_full : bool
            if True, returns the full mask for debugging, i.e. indexed by
            functions prior to consolidation
        '''

        df = df.copy()  # avoid mutating the caller's DataFrame

        mask_valid = pd.Series(True, index=df.index)
        mask_valid &= self._get_mask_valid_positive(df)
        mask_valid &= self._get_mask_valid_capacity(df.copy())

        df.loc[:, 'mask_valid'] = mask_valid

        if return_full:  # for debugging
            return df

        # consolidate mask by constraint combination and x values
        index = self.x_name + ['idx']
        mask_valid = df.pivot_table(index=index, values='mask_valid',
                                    aggfunc=min)

        return mask_valid
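
    # =========================================================================
    # Sketch of the consolidation step above on hypothetical data: the
    # pivot_table call with aggfunc=min implements a logical AND of
    # 'mask_valid' over all functions sharing the same parameter values and
    # constraint combination, e.g.
    #
    #     df = pd.DataFrame({'vre_scale_none': [0.0, 0.0, 0.5, 0.5],
    #                        'idx': [1, 1, 1, 1],
    #                        'mask_valid': [True, False, True, True]})
    #     df.pivot_table(index=['vre_scale_none', 'idx'],
    #                    values='mask_valid', aggfunc=min)
    #     # -> (0.0, 1): False, (0.5, 1): True
    # =========================================================================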


    def _get_mask_valid_positive(self, df):
        ''' Called by _get_mask_valid_solutions '''

        msk_pos = df.is_positive == 1
        mask_positive = pd.Series(True, index=df.index)
        mask_positive.loc[msk_pos] = df.loc[msk_pos].lambd + self.tolerance >= 0

        return mask_positive


    def _get_mask_valid_capacity(self, df):
        ''' Called by _get_mask_valid_solutions '''

        mask_valid = pd.Series(True, index=df.index)

        for C, pp in (self.dict_cap if self.dict_cap else []):

            slct_func = [symb.name for symb in pp]

            mask_slct_func = df.func.isin(slct_func)

            # the capacity value is either taken from the x_vals columns
            # (if the capacity is varied) or from the parameter's fixed value
            if C in self.x_vals.keys():
                val_cap = df[C.name]
            else:
                val_cap = pd.Series(C.value, index=df.index)

            # need to add additional and subtract retired capacity
            for addret, sign in {'add': +1, 'ret': -1}.items():
                func_C_addret = [variab for variab in slct_func
                                 if 'C_%s_none' % addret in variab]
                func_C_addret = func_C_addret[0] if func_C_addret else None
                if func_C_addret:
                    mask_addret = df.func.str.contains(func_C_addret)
                    df_C = df.loc[mask_addret].copy()
                    df_C = (df_C.set_index(['idx'] + self.x_name)['lambd']
                                .rename('_C_%s' % addret))
                    df = df.join(df_C, on=df_C.index.names)

                    # doesn't apply to itself, hence ~mask_addret
                    val_cap.loc[~mask_addret] += \
                        sign * df.loc[~mask_addret, '_C_%s' % addret]

            constraint_met = pd.Series(True, index=df.index)
            constraint_met.loc[mask_slct_func] = \
                                (df.loc[mask_slct_func].lambd
                                 * (1 - self.tolerance)
                                 <= val_cap.loc[mask_slct_func])

            # delete temporary columns
            df = df[[c for c in df.columns
                     if c not in ['_C_ret', '_C_add']]]

            mask_valid &= constraint_met

        return mask_valid
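
    # =========================================================================
    # Numeric illustration of the capacity check above (hypothetical values):
    # with capacity C = 3500, added capacity C_add = 500 and retired capacity
    # C_ret = 1000, a power value lambd = 3100 satisfies
    #     3100 * (1 - tolerance) <= 3500 + 500 - 1000 = 3000
    # only for a tolerance of at least ~3.2%; for the default tolerance of
    # 1e-9 the row would therefore be marked invalid.
    # =========================================================================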


    def init_cost_optimum(self, df_result):
        ''' Adds binary cost optimum column to the expanded dataframe. '''

        cols = ['lambd', 'idx'] + self.x_name
        tc = df_result.loc[(df_result.func == 'tc')
                           & df_result.mask_valid, cols].copy()

        if not tc.empty:

            def get_cost_optimum_single(df):
                df = df.sort_values('lambd')
                df.loc[:, 'is_optimum'] = False
                df.iloc[0, -1] = True  # 'is_optimum' is the last column
                return df[['is_optimum']]

            mask_is_opt = (tc.set_index('idx')
                             .groupby(self.x_name)
                             .apply(get_cost_optimum_single))

            df_result = df_result.join(mask_is_opt, on=mask_is_opt.index.names)

            # rows with mask_valid == False have is_optimum == NaN here
            df_result.is_optimum.fillna(False, inplace=True)

        else:

            df_result.loc[:, 'is_optimum'] = False

        return df_result.is_optimum
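

# Minimal sketch (hypothetical data, not part of the SymEnergy API) of the
# cost-optimum selection implemented in init_cost_optimum: for each
# parameter value combination, the valid 'tc' row with the smallest total
# cost is flagged as optimal.
def _example_cost_optimum():
    tc = pd.DataFrame({'idx': [1, 2, 1, 2],
                       'vre_scale_none': [0.0, 0.0, 0.5, 0.5],
                       'lambd': [10.0, 12.0, 9.0, 7.0]})
    is_opt = (tc.groupby('vre_scale_none')['lambd'].transform('min')
              == tc['lambd'])
    return is_opt  # True for idx 1 at vre_scale 0.0 and idx 2 at 0.5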


# %% ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

class Evaluator():
    '''
    Evaluates model results for selected parameter values.

    Parameters
    ----------
    model : :class:`symenergy.core.model.Model`
        SymEnergy model instance
    x_vals : dict
        dictionary ``{parameter_instance_0: iterable_of_values}``
    drop_non_optimum : bool
        if False, also keeps constraint combinations associated with
        non-optimal solutions
    tolerance : float
        absolute tolerance for constraint evaluation to allow for
        numerical inaccuracies

    Example
    -------

    .. code-block:: python

        >>> import numpy as np
        >>> from symenergy.core.model import Model
        >>> from symenergy.evaluator.evaluator import Evaluator

        >>> m = Model(curtailment=True)
        >>> m.add_slot(name='day', load=4500, vre=4500)
        >>> m.add_plant(name='n', vc0=10, vc1=1, capacity=3500)
        >>> m.add_plant(name='g', vc0=90, vc1=10)
        >>> m.generate_solve()

        >>> x_vals = {m.vre_scale: np.linspace(0, 1, 51),
        ...           m.comps['n'].C: [0, 1000, 3000]}
        >>> ev = Evaluator(m, x_vals=x_vals)

    The attribute ``ev.df_x_vals`` is a table with all parameter value
    combinations:

    .. code-block:: python

        >>> ev.df_x_vals
            vre_scale_none  C_n_none
        0             0.00         0
        1             0.00      1000
        2             0.00      3000
        3             0.02         0
        4             0.02      1000
        ...

    The methods

    * :func:`symenergy.evaluator.evaluator.Evaluator.get_evaluated_lambdas_parallel` and
    * :func:`symenergy.evaluator.evaluator.Evaluator.expand_to_x_vals_parallel`

    are used to perform the actual evaluation.

    .. seealso::

        :ref:`label_example_minimal`
            minimal SymEnergy example demonstrating the use of the
            evaluator class
    '''

    def __init__(self, model: Model, x_vals: dict, drop_non_optimum=False,
                 tolerance=1e-9):

        self.drop_non_optimum = drop_non_optimum
        self.model = model
        self.x_vals = x_vals

        self.cache_lambd, self.cache_eval = self._get_caches()

        self.eval_analysis, self.expander = \
            self._get_helpers(drop_non_optimum, tolerance)

        self.dfev = self._get_dfev()
        self.dict_param_values = self._get_param_values()

        # attribute name must match the self.df_exp column name
        self.is_positive = \
            self.model.constraints('expr_0', is_positivity_constraint=True)


    @property
    def fn_temp_module(self):
        fn = self.cache_lambd.fn.replace('.pickle', '_eval_temp.py')
        return fn

    @fn_temp_module.setter
    def fn_temp_module(self, _):
        raise AttributeError("Attempt to change evaluator temp path. Modify "
                             "the symenergy.cache_params['path'] value "
                             "instead, prior to initializing the "
                             "Evaluator class.")


    def _get_helpers(self, drop_non_optimum, tolerance):

        map_col_func_pos = \
            self.model.constraints(('col', 'var_name'),
                                   is_positivity_constraint=True)

        dict_cap = [(cap, val)
                    for comp in self.model.comps.values()
                    if comp not in self.model.slots.values()  # exclude slots
                    for cap, val in comp.get_constrained_variabs()]

        dict_constrs_inactive = (
                pd.melt(self.model.df_comb, id_vars=['idx'],
                        var_name='act_col',
                        value_vars=self.model.constrs_cols_neq,
                        value_name='active')
                  .assign(inactive=lambda x: ~x.active)
                  .groupby('act_col')
                  .apply(lambda x: set(x.loc[x.inactive].idx))
                  .to_dict())

        eval_analysis = EvalAnalysis(self.x_vals, map_col_func_pos, dict_cap,
                                     dict_constrs_inactive,
                                     tolerance=tolerance,
                                     drop_non_optimum=drop_non_optimum)
        expander = Expander(self.df_x_vals)

        return eval_analysis, expander


    def _get_dfev(self):
        '''
        Returns a modified copy of the main model DataFrame `df_comb`.
        Variables and multipliers are converted from sympy symbols to
        strings.
        '''

        cols = ['variabs_multips', 'result', 'idx', 'tc']
        dfev = self.model.df_comb[cols].copy()
        dfev.variabs_multips = dfev.variabs_multips.apply(
                                            lambda x: list(map(str, x)))
        return dfev


    @property
    def x_vals(self):
        return self._x_vals

    @x_vals.setter
    def x_vals(self, x_vals):

        x_keys_old = ([val for val in self._x_vals]
                      if hasattr(self, '_x_vals') else None)
        if x_keys_old:
            assert list(x_vals) == x_keys_old, \
                'Keys of x_vals attribute must not change.'

        frozen_params = [x.name for x in x_vals if x._is_frozen]
        assert not frozen_params, ('Encountered frozen parameters %s in '
                                   'x_vals.') % str(frozen_params)

        self._x_vals = x_vals

        self.x_symb = [x.symb for x in self._x_vals.keys()]
        self.x_name = [x.name for x in self.x_symb]
        self.x_name_str = '(%s)' % ','.join(self.x_name)

        self.df_x_vals = self._get_x_vals_combs()

        if hasattr(self, 'eval_analysis'):
            self.eval_analysis.x_vals = self._x_vals


    def _get_caches(self):
        ''' Separate cache instances for lambda functions and results. '''

        hash_lambd = self._get_evaluator_hash_name(include_x_vals=False)
        cache_lambd = EvaluatorCache(hash_lambd, 'cache_lambd')

        hash_eval = self._get_evaluator_hash_name(include_x_vals=True)
        cache_eval = EvaluatorCache(hash_eval, 'cache_eval')

        return cache_lambd, cache_eval


    @property
    def df_x_vals(self):
        return self._df_x_vals

    @df_x_vals.setter
    def df_x_vals(self, df_x_vals):

        self._df_x_vals = df_x_vals.reset_index(drop=True)

        # caches must be refreshed upon changes to both x_vals and df_x_vals
        self.cache_lambd, self.cache_eval = self._get_caches()

        if hasattr(self, 'expander'):
            self.expander.df_x_vals = self._df_x_vals


    def _get_list_dep_var(self, skip_multipliers=False):

        list_dep_var = ['tc']
        list_dep_var += list(map(str, self.model.variables('symb')))

        # the supply constraint multiplier is included even if
        # skip_multipliers is True
        list_dep_var += [mlt for mlt
                         in map(str, self.model.constraints('mlt'))
                         if (('supply' in mlt)
                             if skip_multipliers else True)]

        if skip_multipliers:
            # exclude all multipliers except the supply constraint's
            list_dep_var = [v for v in list_dep_var
                            if ('lb_' not in v and 'pi_' not in v)
                            or 'supply' in v]

        return list_dep_var


    def get_evaluated_lambdas(self, skip_multipliers=True):
        '''
        For each dependent variable and the total cost, get a lambda
        function evaluated by constant parameters. These functions are
        subsequently evaluated for all x_vals combinations.

        Generated attributes:
            - df_lam_func: holds all lambda functions for each dependent
              variable and each constraint combination.
        '''

        # get dependent variables (variabs and multips)
        list_dep_var = self._get_list_dep_var(skip_multipliers)

        for slct_eq in list_dep_var:

            logger.info('Generating lambda functions for %s.' % slct_eq)

            if slct_eq != 'tc':

                # the function idx depends on the constraint combination,
                # since not all constraint combinations contain the same
                # functions
                get_func = lambda x: self._get_func_from_idx(x, slct_eq)
                self.dfev.loc[:, slct_eq] = self.dfev.apply(get_func, axis=1)

            logger.debug('substituting...')
            expr_plot = self.dfev[slct_eq].apply(self._subs_param_values)

            lambdify = lambda res_plot: sp.lambdify(self.x_symb, res_plot,
                                                    modules=['numpy'],
                                                    dummify=False)

            logger.debug('lambdify...')
            self.dfev.loc[:, slct_eq] = expr_plot.apply(lambdify)
            logger.debug('done.')

        idx = ['idx']
        df_lam_func = self.dfev.set_index(idx).copy()[list_dep_var]

        col_names = {'level_1': 'func', 0: 'lambd_func'}
        df_lam_func = (df_lam_func.stack().reset_index()
                                  .rename(columns=col_names))

        df_lam_func = (df_lam_func.reset_index(drop=True)
                                  .reset_index())

        df_lam_func = df_lam_func.join(
                self.model.df_comb.set_index('idx')[
                        self.model.constrs_cols_neq], on='idx')

        df_lam_func = df_lam_func.set_index(self.model.constrs_cols_neq
                                            + ['func', 'idx'])

        self.df_lam_func = df_lam_func


    def _get_func_from_idx(self, x, slct_eq):
        '''
        Get the result expression corresponding to the selected variable
        slct_eq.

        From the result set of row x, get the expression corresponding to
        the selected variable/multiplier slct_eq. This first finds the index
        of the corresponding expression through comparison with slct_eq and
        then returns the expression itself.
        '''

        if (slct_eq in x.variabs_multips
                and not isinstance(x.result, sp.sets.EmptySet)):

            idx = x.variabs_multips.index(slct_eq)
            func = x.result[idx]

            return func

# =============================================================================
#     def _expand_dfev(self, slct_eq):
#         ''' Returns the dfev DataFrame for a single var/mlt slct_eq. '''
#
#         MP_COUNTER.increment()
#
#         df = self.dfev
#
#         get_func = partial(self._get_func_from_idx, slct_eq=slct_eq)
#         if slct_eq != 'tc':
#             df['expr'] = df.apply(get_func, axis=1)
#         else:
#             df['expr'] = df.tc
#
#         df['func'] = slct_eq
#
#         return df[['idx', 'expr', 'func']]
#
#
#     def _call_expand_dfev(self, lst_slct_eq):
#         ''' Note: here the df argument of the
#         parallelization.parallelize_df function is a list of strings, for
#         each of which the whole self.dfev is evaluated. '''
#
#         return [self._expand_dfev(slct_eq) for slct_eq in lst_slct_eq]
#
#
#     def _wrapper_call_expand_dfev(self, lst_slct_eq):
#
#         name, ntot = 'Expand by variable/multiplier', self.nparallel
#         return log_time_progress(self._call_expand_dfev)(self, lst_slct_eq,
#                                                          name, ntot)
# =============================================================================


    def _lambdify(self, expr):
        ''' Convert sympy expressions to function strings. '''

        MP_COUNTER.increment()

        return lambdastr(args=self.x_symb,
                         expr=self._subs_param_values(expr),
                         dummify=False)


    def _make_hash(self, func_str):
        '''
        Generate a function hash from the function string.

        The idea is to avoid multiple definitions of identical functions
        which return e.g. constant zero.

        Not using the @hexdigest decorator due to the leading "_".
        '''

        # salt = str(random.randint(0, 1e12))
        return '_' + md5(func_str.encode('utf-8')).hexdigest()


    def _call_lambdify(self, df):

        df['func_str'] = df.expr.apply(self._lambdify)
        df['func_hash'] = df.func_str.apply(self._make_hash)
        return df


    def _wrapper_call_lambdify(self, df):

        name, ntot = 'Lambdify expressions', self.nparallel
        return log_time_progress(self._call_lambdify)(self, df, name, ntot)


    def _replace_func_str_name(self, x):
        '''
        Convert func_str to a top-level function string using the name
        defined by func_hash.
        '''

        func_str = x.func_str
        func_hash = x.func_hash

        x_name_str = self.x_name_str
        func_str_new = ('def ' + func_hash + x_name_str + ':\n    return'
                        + func_str[len(x_name_str) + 7:])

        return func_str_new


    def _expand_results_df(self, df, skip_multipliers):
        '''
        Expands the result lists to rows.

        * zips the variable/multiplier names and the results
        * adds the total cost
        * explodes the resulting tuple

        Parameters
        ----------
        df : pandas.DataFrame
            cols ["variabs_multips", "result", "idx", "tc"]
        skip_multipliers : bool
            if True, only include supply constraint multipliers and skip
            all others
        '''

        list_dep_var = self._get_list_dep_var(skip_multipliers)

        df['result_sep'] = df.apply(
                lambda x: tuple((key, val) for key, val
                                in zip(x.variabs_multips, x.result)
                                if key in list_dep_var)
                          + (('tc', x.tc),), axis=1)

        df = df[['idx', 'result_sep']].explode(column='result_sep')

        df[['func', 'expr']] = pd.DataFrame(df.result_sep.tolist(),
                                            index=df.index)
        df.drop('result_sep', axis=1, inplace=True)

        logger.info('Length expanded function DataFrame: %d' % len(df))

        return df


    @staticmethod
    def _write_import_function_module(fn, list_func):
        '''
        Write and import an ad-hoc module containing the evaluated
        functions.

        Parameters
        ----------
        fn : str
            filename of the temporary module
        list_func : iterable of str
            function definition strings to be written to the module

        Returns the handle of the loaded module.
        '''

        try:
            os.remove(fn)
        except Exception as e:
            logger.debug(e)

        module_str = '\n'.join(list_func)
        module_str = f'from numpy import sqrt\n\n{module_str}'

        if not os.path.isdir(os.path.dirname(fn)):
            os.mkdir(os.path.dirname(fn))

        with open(fn, "w") as f:
            f.write(module_str)

        py_compile.compile(fn)

        et = __import__(os.path.basename(fn).replace('.py', ''), level=0,
                        globals={"__name__": __name__})

        logger.info(f'Imported temporary module from file {et.__file__}')

        return et
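
# =============================================================================
#     Sketch (hypothetical expression) of the lambdify/hash pipeline used by
#     get_evaluated_lambdas_parallel: sympy's lambdastr converts an
#     expression into a lambda source string, whose md5 hash then serves as
#     the name of a top-level function in the temporary module, e.g.
#
#         x = sp.Symbol('x')
#         func_str = lambdastr(args=(x,), expr=2*x + 1, dummify=False)
#         # -> 'lambda x: (2*x + 1)'
#         func_hash = '_' + md5(func_str.encode('utf-8')).hexdigest()
#         # -> '_<32 hex digits>', a valid Python identifier
# =============================================================================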

    def get_evaluated_lambdas_parallel(self, skip_multipliers=True):
        '''
        For each model variable and constraint combination, generate a
        function evaluated by all constant parameter values, but *not* by
        the varied values of the ``x_vals`` table.

        This results in a DataFrame attribute ``df_lam_func``, which is
        subsequently used
        (:func:`symenergy.evaluator.evaluator.Evaluator.expand_to_x_vals_parallel`)
        to generate the numerical values for variables and multipliers.

        Continuing the example from
        :class:`symenergy.evaluator.evaluator.Evaluator`:

        .. code-block:: python

            >>> import inspect
            >>> ev.cache_lambd.delete()
            >>> ev.get_evaluated_lambdas_parallel()
            >>> func = (ev.df_lam_func.set_index(['idx', 'func'])
            ...                       .loc[(4, 'g_p_day')]
            ...                       .lambd_func)
            >>> print(inspect.getsource(func))
            def _ef0ba06865119477ac40bc0b40038a25(vre_scale_none,C_n_none):
                return(-C_n_none - 4500*vre_scale_none + 4500)

        In the example above, the generated function depends on the
        parameters named ``vre_scale_none`` and ``C_n_none`` (the model VRE
        scale and the power plant capacity, as specified through the
        ``x_vals`` attribute).

        **Note**: The DataFrame ``ev.df_lam_func`` is typically only used
        internally. Instead, access the tables

        * ``Model.df_comb`` to obtain the result expressions for the model
          variables
        * ``Evaluator.df_exp`` to obtain the fully evaluated numerical
          solutions
        '''

        sys.path.append(os.path.dirname(self.fn_temp_module))

        if self.cache_lambd.file_exists:
            self.df_lam_func = self.cache_lambd.load()
            return

        dfev_exp = self._expand_results_df(self.dfev, skip_multipliers)

        self.nparallel = len(dfev_exp)
        dfev_func_str = parallelize_df(dfev_exp, self._wrapper_call_lambdify)

        # get unique function strings with function names from hashes
        list_func = (dfev_func_str[['func_hash', 'func_str']]
                     .drop_duplicates()
                     .apply(self._replace_func_str_name, axis=1))

        logger.info('Number of unique function strings: %d' % len(list_func))

        # write to and import from the temporary module
        et = self._write_import_function_module(self.fn_temp_module,
                                                list_func)

        # retrieve eval_temp functions based on their hash names
        dfev_func_str['lambd_func'] = (
                dfev_func_str.func_hash.apply(lambda x: getattr(et, x)))

        self.df_lam_func = (dfev_func_str[['func', 'lambd_func', 'idx']]
                            .reset_index(drop=True))

        self.cache_lambd.write(self.df_lam_func)

    @hexdigest
    def _get_evaluator_hash_name(self, include_x_vals=False):

        hash_input = str(self.x_name)
        if include_x_vals:
            hash_input += str(pd.util.hash_pandas_object(self.df_x_vals.T,
                                                         index=False).values)
        hash_input += str(self.drop_non_optimum == True)
        hash_input += str(self.model.get_model_hash_name())

        logger.debug(f'hash_input for include_x_vals={include_x_vals}: '
                     f'{hash_input}')

        return hash_input


    def _init_constraints_active(self, df):
        '''
        Create binary columns indicating whether the constraints for each
        particular variable are active or not.
        '''

        def set_constr(x, lst):
            return 1 if x in map(str, getattr(self, lst)) else 0

        for lst in ['is_positive']:
            df[lst] = df.func.apply(lambda x: set_constr(x, lst))

        return df

    def expand_to_x_vals_parallel(self):
        '''
        Generates a table indexed by:

        * model variable/multiplier (column ``func``)
        * constraint combination (column ``idx``)
        * varied parameters (columns specified by the list
          ``Evaluator.x_name``)

        with all numerically evaluated values of functions and multipliers.
        Other key columns are:

        * ``lambd``: numerical value
        * ``is_optimum``: boolean; if the Evaluator was initialized with
          ``drop_non_optimum=True``, all non-optimal rows are dropped
        * ``mask_valid``: indicates whether the constraint combination
          yields valid results under the corresponding parameter values; see
          documentation section :ref:`label_theory_minimal` for explanations
          on constraint combinations rendered infeasible by certain
          parameter values.

        Continuing the example from
        :func:`symenergy.evaluator.evaluator.Evaluator.get_evaluated_lambdas_parallel`:

        .. code-block:: python

            >>> ev.expand_to_x_vals_parallel()
            >>> (ev.df_exp.query('is_optimum')
            ...           .set_index(['func', 'idx'] + ev.x_name)[
            ...     ['lambd', 'is_optimum', 'mask_valid']]).head()
                                                         lambd  is_optimum  mask_valid
            func          idx vre_scale_none C_n_none
            curt_p_day    1   1.0            0            0.0        True        True
            g_p_day       1   1.0            0            0.0        True        True
            n_p_day       1   1.0            0            0.0        True        True
            pi_supply_day 1   1.0            0            0.0        True        True
            curt_p_day    2   0.0            0            0.0        True        True
            ...

        **Note:** Under some circumstances the serial evaluation is overall
        faster than the parallel approach. Serial evaluation is obtained by
        setting the SymEnergy multiprocessing *nworkers* parameter to
        ``None``:

        .. code-block:: python

            >>> from symenergy.auxiliary.parallelization import multiproc_params
            >>> multiproc_params['nworkers'] = None
        '''

        if self.cache_eval.file_exists:
            logger.debug('expand_to_x_vals_parallel: file '
                         f'{self.cache_eval.fn} found.')
            self.df_exp = self.cache_eval.load()
        else:
            logger.debug('expand_to_x_vals_parallel: NOT FOUND file '
                         f'{self.cache_eval.fn}.')

            cpos = self.model.constraints('col',
                                          is_positivity_constraint=True)
            ccap = self.model.constraints('col',
                                          is_capacity_constraint=True)
            self.df_lam_func = (self.df_lam_func
                                .join(self.model.df_comb.set_index('idx')[
                                        cpos + ccap], on='idx'))

            self.df_lam_func = \
                self._init_constraints_active(self.df_lam_func)

            df_result = self.expander.run(self.df_lam_func)

            self.df_exp = self.eval_analysis.run(df_result)

            self.cache_eval.write(self.df_exp)

        self._map_func_to_slot()
        self.build_supply_table()

    def _get_x_vals_combs(self):
        '''
        Generates a dataframe with all combinations of x_vals. Used as the
        default by expand_to_x_vals; can also be used externally to select
        subsets of the parameter value combinations.
        '''

        return pd.DataFrame(list(itertools.product(*self.x_vals.values())),
                            columns=[col.name for col in self.x_vals.keys()])


    def _get_param_values(self):
        '''
        Initialize the dict attribute defining fixed parameter values, i.e.
        of all parameters not in `self.x_vals`.
        '''

        dict_param_values = self.model.parameters.to_dict({'symb': 'value'})
        dict_param_values = {kk: vv for kk, vv in dict_param_values.items()
                             if kk not in [x.symb for x in self.x_vals]}

        return dict_param_values


    def _subs_param_values(self, x):
        '''
        Substitutes all parameter values except for the ones selected as
        independent variables.
        '''

        if isinstance(x, float) and np.isnan(x):
            return np.nan
        else:
            return x.subs(self.dict_param_values)

# =============================================================================
#     def get_full_mask_valid(self, slct_idx):
#
#         df_slct = self.df_exp.query('idx in %s' % str(slct_idx))
#
#         return self._get_mask_valid_solutions(df=df_slct, return_full=True)
# =============================================================================


    def expand_to_x_vals(self, verbose=True):
        '''
        Applies evaluate_by_x to all df_x_vals rows.

        * by_x_vals -- if True: expand x_vals for all const_combs/func
                       if False: expand const_combs/func for all x_vals
        '''

        # keep positivity columns to sanitize zero equality constraints
        cols_pos = self.model.constraints('col',
                                          is_positivity_constraint=True)
        # keep capacity columns to sanitize capacity equality constraints
        cols_cap = self.model.constraints('col',
                                          is_capacity_constraint=True)
        keep_cols = ['func', 'lambd_func', 'idx'] + cols_pos + cols_cap
        df_lam_func = self.df_lam_func.reset_index()[keep_cols]

        df_lam_func = self._init_constraints_active(df_lam_func)

        df_x = self.df_x_vals
        df_lam = df_lam_func

        logger.debug('_call_eval')
        t = time.time()
        df_result = (df_lam.groupby(['func', 'idx'])
                           .lambd_func
                           .apply(_eval, df_x=df_x))
        df_result = df_result.rename(columns={0: 'lambd'})
        logger.debug('done _call_eval in %fs, length df_lam %d, '
                     'length df_x %d' % (time.time() - t,
                                         len(self.df_lam_func),
                                         len(self.df_x_vals)))

        logger.debug('expand_to_x_vals intermediate')
        t = time.time()
        cols = [c for c in df_lam.columns
                if c.startswith('act_')] + ['is_positive']
        ind = ['func', 'idx']
        df_result = df_result.reset_index().join(
                df_lam.set_index(ind)[cols], on=ind)
        logger.debug('done expand_to_x_vals intermediate '
                     'in %fs' % (time.time() - t))

        logger.debug('_evaluate_by_x_new')
        t = time.time()
        df_exp_0 = self.eval_analysis._evaluate_by_x_new(df_result)
        df_exp_0 = df_exp_0.reset_index(drop=True)
        self.df_exp = df_exp_0
        self.const_comb_opt = self.df_exp.loc[self.df_exp.is_optimum,
                                              'idx'].unique().tolist()
        logger.debug('done _evaluate_by_x_new in %fs' % (time.time() - t))

        logger.debug('_map_func_to_slot')
        t = time.time()
        self._map_func_to_slot()
        logger.debug('done _map_func_to_slot in %fs' % (time.time() - t))

        self.build_supply_table()


    def build_supply_table(self, df=None):
        '''
        Generates a table representing the supply constraint, for easy
        plotting.
        '''

        if not isinstance(df, pd.DataFrame):
            df = self.df_exp

        df_bal = df.loc[df.is_optimum].copy()

        # base dataframe: all operational variables
        drop = ['tc_', 'pi_', 'lb_']
        df_bal = df_bal.loc[~df_bal.func.str.contains('|'.join(drop))]
        df_bal = df_bal[['func', 'idx', 'func_no_slot', 'slot', 'lambd']
                        + self.x_name]

        # map to pwr/erg
        list_erg_var = [var_e.name
                        for store in self.model.storages.values()
                        for var_e in store.e.values()]
        list_erg_func = [f for f in df_bal.func.unique()
                         if any(f.startswith(var_e)
                                for var_e in list_erg_var)]
        df_bal.loc[:, 'pwrerg'] = (df_bal.assign(pwrerg='erg').pwrerg
                                   .where(df_bal.func.isin(list_erg_func),
                                          'pwr'))

        # add parameters
        par_add = ['l', 'vre']
        pars = [getattr(slot, var) for var in par_add
                for slot in self.model.slots.values()
                if hasattr(slot, var)]
        pars_x = [p for p in pars if p.name in self.x_name]
        pars = [p for p in pars if p.name not in self.x_name]

        df_bal_add = pd.DataFrame(df_bal[self.x_name + ['idx']]
                                  .drop_duplicates())
        for par in pars:
            df_bal_add.loc[:, par.name] = par.value
        for par in pars_x:
            df_bal_add.loc[:, 'y_' + par.name] = df_bal_add[par.name]

        df_bal_add = (df_bal_add.set_index(self.x_name + ['idx'])
                                .stack().rename('lambd').reset_index())
        df_bal_add = df_bal_add.rename(
                columns={'level_%d' % (1 + len(self.x_name)): 'func'})
        df_bal_add.func = df_bal_add.func.apply(lambda x:
                                                x.replace('y_', ''))
        df_bal_add.loc[:, 'func_no_slot'] = df_bal_add.func.apply(
                lambda x: '_'.join(x.split('_')[:-1]))
        df_bal_add.loc[:, 'slot'] = df_bal_add.func.apply(
                lambda x: x.split('_')[-1])
        df_bal_add.loc[:, 'pwrerg'] = 'pwr'

        df_bal = pd.concat([df_bal, df_bal_add], axis=0, sort=True)

        # if the VRE scaling parameter is varied, scale all vre values by
        # the corresponding x_vals column
        if self.model.vre_scale in self.x_vals:
            mask_vre = df_bal.func.str.contains('vre')
            df_bal.loc[mask_vre, 'lambd'] *= df_bal.loc[mask_vre,
                                                        'vre_scale_none']

        # sign flip by func_no_slot
        varpar_neg = ['l', 'curt_p']
        df_bal.loc[df_bal.func_no_slot.isin(varpar_neg), 'lambd'] *= -1

        # sign flip by func (storage charging)
        varpar_neg = [store.name + '_p' + chgdch + '_' + slot_name
                      for store in self.model.storages.values()
                      for chgdch, slots_names in store.slots_map.items()
                      for slot_name in slots_names if chgdch == 'chg']
        df_bal.loc[df_bal.func.isin(varpar_neg), 'lambd'] *= -1

        self.df_bal = df_bal


    def drop_non_optimal_combinations(self):
        '''
        Creates a new attribute df_exp_opt containing optimal constraint
        combinations only.

        Note: This keeps all constraint combinations which are optimal for
        *some* parameter combinations.
        '''

        constrs_opt = self.df_exp.loc[self.df_exp.is_optimum]
        constrs_opt = constrs_opt['const_comb'].unique().tolist()

        mask_opt = self.df_exp.const_comb.isin(constrs_opt)
        self.df_exp_opt = self.df_exp.loc[mask_opt].copy()


    def _map_func_to_slot(self):

        logger.info('Mapping model variables to time slots')

        func_list = self.df_exp.func.unique()
        slot_name_list = list(self.model.slots.keys())

        slot_map = {func: '+'.join([ss for ss in slot_name_list
                                    if ss in func])
                    for func in func_list}
        func_map = {func: func.replace('_None', '').replace(slot, '')
                    for func, slot in slot_map.items()}
        func_map = {func: (func_new[:-1] if func_new.endswith('_')
                           else func_new)
                    for func, func_new in func_map.items()}
        slot_map = {func: slot if not slot == '' else 'global'
                    for func, slot in slot_map.items()}

        self.df_exp.loc[:, 'slot'] = self.df_exp['func'].replace(slot_map)
        self.df_exp.loc[:, 'func_no_slot'] = \
            self.df_exp['func'].replace(func_map)


    def get_readable_cc_dict(self):

        cc_h = self.model.df_comb.set_index('const_comb')[
                self.model.constrs_cols_neq].copy()

        cc_h_sto = (cc_h.act_lb_phs_pos_e_None.replace(
                            {1: 'no storage', 0: ''})
                    + cc_h.act_lb_phs_p_cap_C_day.replace(
                            {1: 'max storage (day)', 0: ''})
                    + cc_h.act_lb_phs_p_cap_C_night.replace(
                            {1: 'max storage (night)', 0: ''})
                    + cc_h.act_lb_phs_e_cap_E_None.replace(
                            {1: 'max storage (e)', 0: ''}))

        cc_h_peak = (cc_h.act_lb_g_pos_p_night.replace(
                             {0: 'peak (night)', 1: 'no peak (night)'})
                     + cc_h.act_lb_g_pos_p_day.replace(
                             {0: 'peak (day)', 1: 'no peak (day)'})
                     ).replace({'peak (night)peak (day)': 'all peak',
                                'no peak (night)no peak (day)':
                                    'no peak at all'})

        cc_h_curt = (cc_h.act_lb_curt_pos_p_night.replace(
                             {0: 'curt (night)', 1: ''})
                     + cc_h.act_lb_curt_pos_p_day.replace(
                             {0: 'curt (day)', 1: ''})
                     ).replace({'curt (night)curt (day)':
                                    'curtailment both'})

        cc_h_ret = (cc_h.act_lb_n_C_ret_cap_C_None.replace(
                            {1: 'maximum retirement', 0: ''})
                    + cc_h.act_lb_n_pos_C_ret_None.replace(
                            {1: 'no retirement', 0: ''}))

        cc_h_base = (cc_h.act_lb_n_pos_p_day.replace(
                             {1: 'no base (day)', 0: ''})
                     + cc_h.act_lb_n_p_cap_C_day.replace(
                             {1: 'max base (day)', 0: ''}))

        dict_cc_h = pd.concat([cc_h_sto, cc_h_peak, cc_h_curt, cc_h_ret,
                               cc_h_base], axis=1).apply(
                lambda x: ' | '.join(x).replace(' | | ', ' | '), axis=1)
        dict_cc_h = dict_cc_h.to_dict()

        return dict_cc_h
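

# Sketch of end-to-end usage, adapted from the Evaluator class docstring
# (model structure and parameter values are taken from that example):
if __name__ == '__main__':

    m = Model(curtailment=True)
    m.add_slot(name='day', load=4500, vre=4500)
    m.add_plant(name='n', vc0=10, vc1=1, capacity=3500)
    m.add_plant(name='g', vc0=90, vc1=10)
    m.generate_solve()

    x_vals = {m.vre_scale: np.linspace(0, 1, 51),
              m.comps['n'].C: [0, 1000, 3000]}
    ev = Evaluator(m, x_vals=x_vals)

    ev.get_evaluated_lambdas_parallel()
    ev.expand_to_x_vals_parallel()

    print(ev.df_exp.query('is_optimum').head())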