Source code for src.simulation.task_process_rapid_test_statistics

from itertools import product

import pandas as pd
import pytask

from src.config import BLD
from src.simulation.scenario_config import (
    create_path_to_rapid_test_statistic_time_series as get_ts_path,
)
from src.simulation.scenario_config import create_path_to_raw_rapid_test_statistics
from src.simulation.scenario_config import get_named_scenarios

[docs]_N_SEEDS = get_named_scenarios()["spring_baseline"]["n_seeds"]
[docs]_DEPENDENCIES = { seed: create_path_to_raw_rapid_test_statistics("spring_baseline", seed) for seed in range(_N_SEEDS)
}
[docs]CHANNELS = ["private", "work", "educ", "overall"]
[docs]OUTCOMES = [ "false_negative", "false_positive", "tested_negative", "tested_positive", "true_negative", "true_positive", "tested",
]
[docs]SHARE_TYPES = ["number", "popshare", "testshare"]
[docs]RAPID_TEST_STATISTICS = []
for out, channel, share_type in product(OUTCOMES, CHANNELS, SHARE_TYPES): RAPID_TEST_STATISTICS.append(f"{share_type}_{out}_by_{channel}")
[docs]_SINGLE_COL_PARAMETRIZATION = [ (column, get_ts_path("spring_baseline", column)) for column in RAPID_TEST_STATISTICS
] @pytask.mark.skipif(_N_SEEDS == 0, reason="spring baseline did not run.") @pytask.mark.depends_on(_DEPENDENCIES) @pytask.mark.parametrize("column, produces", _SINGLE_COL_PARAMETRIZATION)
[docs]def task_process_rapid_test_statistics(depends_on, column, produces): dfs = { seed: pd.read_csv(path, parse_dates=["date"], index_col="date") for seed, path in depends_on.items() } for df in dfs.values(): assert not df.index.duplicated().any(), ( "Duplicates in a rapid test statistic DataFrame's index. " "The csv file must be deleted before every run." ) df = pd.concat({seed: df[column] for seed, df in dfs.items()}, axis=1) df[column] = df.mean(axis=1).rolling(window=7, min_periods=1, center=False).mean() df.to_pickle(produces)
[docs]def _get_rate_parametrization(channels): rate_parametrization = [] for channel in channels: rate_parametrization += [ ( f"true_positive_rate_by_{channel}", { "numerator": get_ts_path( "spring_baseline", f"number_true_positive_by_{channel}" ), "denominator": get_ts_path( "spring_baseline", f"number_tested_positive_by_{channel}" ), }, get_ts_path("spring_baseline", f"true_positive_rate_by_{channel}"), ), ( f"false_positive_rate_by_{channel}", { "numerator": get_ts_path( "spring_baseline", f"number_false_positive_by_{channel}" ), "denominator": get_ts_path( "spring_baseline", f"number_tested_positive_by_{channel}" ), }, get_ts_path("spring_baseline", f"false_positive_rate_by_{channel}"), ), ( f"true_negative_rate_by_{channel}", { "numerator": get_ts_path( "spring_baseline", f"number_true_negative_by_{channel}" ), "denominator": get_ts_path( "spring_baseline", f"number_tested_negative_by_{channel}" ), }, get_ts_path("spring_baseline", f"true_negative_rate_by_{channel}"), ), ( f"false_negative_rate_by_{channel}", { "numerator": get_ts_path( "spring_baseline", f"number_false_negative_by_{channel}" ), "denominator": get_ts_path( "spring_baseline", f"number_tested_negative_by_{channel}" ), }, get_ts_path("spring_baseline", f"false_negative_rate_by_{channel}"), ), ] return rate_parametrization
[docs]_RATE_PARAMETRIZATION = _get_rate_parametrization(CHANNELS)
@pytask.mark.parametrize("name, depends_on, produces", _RATE_PARAMETRIZATION)
[docs]def task_create_rapid_test_statistic_ratios(name, depends_on, produces): numerator = pd.read_pickle(depends_on["numerator"]) denominator = pd.read_pickle(depends_on["denominator"]) seeds = list(range(_N_SEEDS)) rate_df = pd.DataFrame() # needed for plotting single runs for s in seeds: smooth_num = numerator[s].rolling(window=7, min_periods=1, center=False).mean() smooth_denom = ( denominator[s].rolling(window=7, min_periods=1, center=False).mean() ) rate_df[s] = smooth_num / smooth_denom # it's important to first average and smooth and **then** divide to get rid of noise # before the division. rate_df[name] = ( # use that the mean is created **after** the seeds have been added numerator[numerator.columns[-1]] / denominator[denominator.columns[-1]] ) rate_df.to_pickle(produces)
[docs]_ALL_RAPID_TEST_STATISTICS = [path for col, path in _SINGLE_COL_PARAMETRIZATION] + [ spec[-1] for spec in _RATE_PARAMETRIZATION
] @pytask.mark.depends_on(_ALL_RAPID_TEST_STATISTICS) @pytask.mark.produces(BLD / "tables" / "rapid_test_statistics.csv")
[docs]def task_create_nice_rapid_test_statistic_table_for_lookup(produces): column_names = [col for col, _ in _SINGLE_COL_PARAMETRIZATION] + [ spec[0] for spec in _RATE_PARAMETRIZATION ] assert len(set(column_names)) == len(column_names), ( "There are duplicate names in the rapid test statistic columns. " "You probably forgot to specify a channel as part of the column name." ) to_concat = [ pd.read_pickle(path)[[column]] for column, path in _SINGLE_COL_PARAMETRIZATION ] + [pd.read_pickle(path)[[column]] for column, _, path in _RATE_PARAMETRIZATION] df = pd.concat(to_concat, axis=1) df.round(4).to_csv(produces)
@pytask.mark.depends_on(_DEPENDENCIES)
[docs]def task_check_that_a_table_was_created_for_each_rapid_test_statistic(depends_on): statistics_saved_by_sid = pd.read_csv(depends_on[0]).columns to_skip = ["date", "n_individuals", "Unnamed: 0"] should_have_a_table = [x for x in statistics_saved_by_sid if x not in to_skip] assert set(should_have_a_table) == set( RAPID_TEST_STATISTICS ), "Some rapid test statistic columns that should have a table do not."