import warnings
from functools import partial
import pandas as pd
from sid.statistics import calculate_r_effective
from src.calculate_moments import calculate_period_outcome_sim
from src.config import BLD
from src.config import SID_DEPENDENCIES
from src.config import SRC
from src.contact_models.get_contact_models import get_all_contact_models
from src.create_initial_states.create_initial_conditions import (
create_initial_conditions,
)
from src.events.events import introduce_b117
from src.events.events import introduce_delta
from src.policies.policy_tools import combine_dictionaries
from src.simulation import scenario_simulation_inputs
from src.simulation.calculate_susceptibility import calculate_susceptibility
from src.simulation.seasonality import seasonality_model
from src.testing.shared import get_piecewise_linear_interpolation
from src.testing.testing_models import allocate_tests
from src.testing.testing_models import demand_test
from src.testing.testing_models import process_tests
[docs]def get_simulation_dependencies(debug, is_resumed):
"""Collect paths on which the simulation depends.
This contains both paths to python modules and data paths.
It only covers sid-germany specific paths, i.e. not sid.
Args:
debug (bool): Whether to use the debug initial states.
is_resumed (bool): Whether the simulation is a resumed simulation. If False the
path to the initial states from BLD / "data" are given.
Returns:
paths (dict): Dictionary with the dependencies for the simulation.
"""
out = {
**SID_DEPENDENCIES,
# to ensure that the checks on the initial states run before the
# simulations we add the output of task_check_initial_states here
# even though we don't use it.
"output_of_check_initial_states": BLD
/ "figures"
/ "data"
/ "how_well_our_synthetic_population_matches_the_german_age_distribution.pdf",
"contact_models.py": SRC / "contact_models" / "get_contact_models.py",
"contact_policies.py": SRC / "policies" / "enacted_policies.py",
"testing_models.py": SRC / "testing" / "testing_models.py",
"share_of_tests_for_symptomatics_series": BLD
/ "data"
/ "testing"
/ "share_of_tests_for_symptomatics_series.pkl",
"initial_conditions.py": SRC
/ "create_initial_states"
/ "create_initial_conditions.py",
"initial_infections.py": SRC
/ "create_initial_states"
/ "create_initial_infections.py",
"initial_immunity.py": SRC
/ "create_initial_states"
/ "create_initial_immunity.py",
"susceptibility_factor_model.py": SRC
/ "simulation"
/ "calculate_susceptibility.py",
"virus_shares": BLD / "data" / "virus_strains" / "virus_shares_dict.pkl",
"find_people_to_vaccinate.py": SRC / "policies" / "find_people_to_vaccinate.py",
"vaccination_shares": BLD
/ "data"
/ "vaccinations"
/ "vaccination_shares_extended.pkl",
"rapid_tests.py": SRC / "testing" / "rapid_tests.py",
"rapid_test_reactions.py": SRC / "testing" / "rapid_test_reactions.py",
"seasonality.py": SRC / "simulation" / "seasonality.py",
"params": BLD / "params.pkl",
"scenario_simulation_inputs.py": SRC
/ "simulation"
/ "scenario_simulation_inputs.py",
"params_scenarios.py": SRC / "simulation" / "params_scenarios.py",
"rki": BLD / "data" / "processed_time_series" / "rki.pkl",
"rki_age_groups": BLD / "data" / "population_structure" / "age_groups_rki.pkl",
"load_simulation_inputs.py": SRC / "simulation" / "load_simulation_inputs.py",
"load_params.py": SRC / "simulation" / "load_params.py",
"calculate_moments.py": SRC / "calculate_moments.py",
# not strictly necessary because changes to scenario_config would change the
# parametrization but for safety put it here
"scenario_config.py": SRC / "simulation" / "scenario_config.py",
"testing_shared.py": SRC / "testing" / "shared.py",
"policy_tools.py": SRC / "policies" / "policy_tools.py",
"create_rapid_test_statistics": SRC
/ "testing"
/ "create_rapid_test_statistics.py",
}
if not is_resumed:
if debug:
out["initial_states"] = BLD / "data" / "debug_initial_states.parquet"
else:
out["initial_states"] = BLD / "data" / "initial_states.parquet"
return out
[docs]def create_period_outputs():
period_outputs = {}
incidence_outcomes = [
"newly_infected",
"new_known_case",
"newly_deceased",
"currently_infected", # only used for share known cases
"knows_currently_infected", # only used for share known cases
"ever_vaccinated",
]
groupbys = ["state", "age_group_rki", None]
for outcome in incidence_outcomes:
for groupby in groupbys:
gb_str = f"_by_{groupby}" if groupby is not None else ""
period_outputs[outcome + gb_str] = partial(
calculate_period_outcome_sim,
outcome=outcome,
groupby=groupby,
)
period_outputs["r_effective"] = partial(calculate_r_effective, window_length=7)
period_outputs["share_b117"] = partial(calculate_period_virus_share, strain="b117")
period_outputs["share_delta"] = partial(
calculate_period_virus_share, strain="delta"
)
for groupby in groupbys:
gb_str = f"_by_{groupby}" if groupby is not None else ""
period_outputs["share_ever_rapid_test" + gb_str] = partial(
_calculate_share_ever_rapid_test, groupby=groupby
)
period_outputs["share_rapid_test_in_last_week" + gb_str] = partial(
_calculate_share_rapid_test_in_last_week, groupby=groupby
)
period_outputs["share_doing_rapid_test_today" + gb_str] = partial(
_calculate_share_doing_rapid_test, groupby=groupby
)
return period_outputs
[docs]def _calculate_share_ever_rapid_test(df, groupby):
sid_start_value = -9999
ever_tested_share_per_group = _calculate_share_rapid_test_countdown_between(
df=df,
groupby=groupby,
lower=sid_start_value + 1,
upper=0,
name="ever_had_a_rapid_test",
)
return ever_tested_share_per_group
[docs]def _calculate_share_rapid_test_in_last_week(df, groupby):
tested_last_week = _calculate_share_rapid_test_countdown_between(
df=df,
groupby=groupby,
lower=-6,
upper=0,
name="last_rapid_test_in_the_last_week",
)
return tested_last_week
[docs]def _calculate_share_rapid_test_countdown_between(df, groupby, lower, upper, name):
"""Share by groupby with their last rapid test between lower and upper (inclusively).
Args:
df (pandas.DataFrame): states DataFrame
groupby (str or None): groupby column
lower (int): lower bound (individuals with this value are included)
upper (int): upper bound (individuals with this value are included)
name (str): name of the Series to be returned
"""
if groupby is None:
groupby = []
elif isinstance(groupby, str):
groupby = [groupby]
within_interval = df["cd_received_rapid_test"].between(lower, upper, inclusive=True)
within_interval = within_interval.to_frame(name=name)
within_interval["date"] = df["date"]
if groupby:
within_interval[groupby] = df[groupby]
grouper = [pd.Grouper(key="date", freq="D")] + groupby
within_interval_share_per_group = within_interval.groupby(grouper)[name].mean()
return within_interval_share_per_group
[docs]def calculate_period_virus_share(df, strain):
df = df[["virus_strain", "date", "newly_infected"]].copy(deep=True)
df = df[df["newly_infected"]]
df[strain] = df["virus_strain"] == strain
out = df.groupby([pd.Grouper(key="date", freq="D")])[strain].mean().fillna(0)
return out
[docs]def _calculate_share_doing_rapid_test(df, groupby):
if groupby is None:
groupby = []
elif isinstance(groupby, str):
groupby = [groupby]
df = df[["date", "cd_received_rapid_test"] + groupby].copy(deep=True)
# has to be -1 because period outputs are done after countdowns are updated.
df["did_rapid_test_today"] = df["cd_received_rapid_test"] == -1
out = (
df.groupby([pd.Grouper(key="date", freq="D")] + groupby)["did_rapid_test_today"]
.mean()
.fillna(0)
)
return out