Source code for src.prepare_data.task_prepare_eu_data

import gzip
from pathlib import Path

import numpy as np
import pandas as pd
import pytask

from src.config import BLD
from src.config import SRC


@pytask.mark.depends_on(
    {
        "data": SRC
        / "original_data"
        / "population_structure"
        / "eu_age_structure.tsv.gz",
    }
)
@pytask.mark.produces(BLD / "data" / "population_structure" / "eu_age_structure.pkl")
def task_eu_age_distribution(depends_on, produces):
    """Build the 2019 age distribution of selected European countries.

    The gzipped table is read, restricted to rows with ``sex == "T"`` and to
    the countries listed below, and every single-year age count is divided by
    the country's ``TOTAL`` row. The result is pickled to ``produces``.

    Args:
        depends_on (dict): Mapping whose ``"data"`` entry is the path to the
            gzipped age structure table.
        produces (pathlib.Path): Target path of the pickled DataFrame. Its
            index is the integer age and its columns (named ``"country"``)
            are the country codes.

    """
    age_data_path = Path(depends_on["data"])
    with gzip.open(age_data_path, "rb") as f:
        # The file mixes commas and tabs as delimiters, hence the regex
        # separator and the python parsing engine.
        data = pd.read_csv(f, sep=",|\t", engine="python")

    # NOTE(review): "T" presumably stands for both sexes combined -- confirm.
    # The trailing blank in "2019 " is part of the column name in the file.
    data = data[data["sex"] == "T"][["age", r"geo\time", "2019 "]]
    countries = ["BE", "DE_TOT", "FI", "IT", "LU", "NL", "PL", "UK"]
    data = data[data[r"geo\time"].isin(countries)]
    data = data[~data["age"].isin(["UNK"])]

    total_pop = data[data["age"] == "TOTAL"].set_index(r"geo\time")["2019 "].astype(int)

    # Y_OPEN is people above 99 years old which was not asked in the sample data and
    # who are very few.
    age_data = data[~data["age"].isin(["TOTAL", "Y_OPEN"])].copy()
    # Assign the result back instead of calling ``replace(..., inplace=True)`` on
    # the selected column: the in-place variant is a chained assignment that does
    # not update ``age_data`` when pandas' Copy-on-Write is active, which would
    # leave "Y_LT1" in place and break the integer conversion below.
    age_data["age"] = age_data["age"].replace({"Y_LT1": "Y0"})
    # Strip the leading "Y" so that e.g. "Y12" becomes 12.
    age_data["age"] = age_data["age"].str[1:].astype(int)

    age_data = age_data.set_index([r"geo\time", "age"])
    age_data = age_data.unstack()["2019 "].astype(int).T
    # Columns are countries, so the division aligns with total_pop's index.
    # Because Y_OPEN was dropped the shares need not add up to exactly one.
    age_data = age_data / total_pop
    age_data.columns.name = "country"
    age_data.to_pickle(produces)
@pytask.mark.depends_on(
    SRC / "original_data" / "population_structure" / "eu_hh_sizes.zip"
)
@pytask.mark.produces(BLD / "data" / "population_structure" / "eu_hh_size_shares.pkl")
def task_eu_hh_size_distribution(depends_on, produces):
    """Build the 2018 household size distribution of selected European countries.

    The table is read, the ``"2018 "`` column is selected for the countries
    listed below, divided by 100 and pickled to ``produces``.

    Args:
        depends_on (pathlib.Path): Path to the compressed household size table.
        produces (pathlib.Path): Target path of the pickled DataFrame. Its
            index is the household size (integers plus an interval for the
            ``"GE6"`` category) and its columns are the country codes.

    """
    # NOTE(review): the file is named ``.zip`` but is opened with gzip --
    # confirm that it really is gzip-compressed.
    with gzip.open(Path(depends_on), "rb") as archive:
        raw = pd.read_csv(archive, sep=",|\t", engine="python")

    # The open-ended category "GE6" is represented by the interval (5, inf].
    open_ended = {"GE6": pd.Interval(5, np.inf)}
    raw["hh_size"] = raw["n_person"].replace(open_ended)

    # we only have West Germany. Use it for Germany as a whole.
    raw["country"] = raw[r"geo\time"].replace("DE", "DE_TOT")

    selected_countries = ["BE", "DE_TOT", "FI", "IT", "LU", "NL", "PL", "UK"]
    values = raw.set_index(["country", "hh_size"])["2018 "]
    values = values.loc[selected_countries]
    values = values.sort_index()
    values = values.astype(float)

    # NOTE(review): dividing by 100 presumably turns percentages into shares.
    hh_size_shares = values.unstack().transpose() / 100

    # Household sizes read from the file are strings; make them integers and
    # leave the interval label untouched.
    converted_labels = []
    for label in hh_size_shares.index:
        if isinstance(label, str):
            converted_labels.append(int(label))
        else:
            converted_labels.append(label)
    hh_size_shares.index = converted_labels

    hh_size_shares.to_pickle(produces)