Processor

Source code in anomaly/processor.py
class Processor:
    """Rolling-PCA anomaly-detection pipeline over per-country usage data.

    Consumes the preprocessed wide (date x country) dataframe, computes a
    rolling principal component analysis over fixed-width windows,
    reconstructs the data to obtain relative residuals, and summarises the
    residuals with rolling median / MAD statistics used for anomaly
    thresholding.
    """

    def __init__(self, preprocessor):
        """Copy configuration and data handed over by the preprocessor stage.

        :param preprocessor: finished preprocessor exposing ``data_dir``,
            ``read``, ``write``, ``window_width``, ``train_start_date``,
            ``train_end_date`` and ``wide_unseasoned_df``.
        """
        # State carried over from the preprocessing stage.
        self.data_dir: str = preprocessor.data_dir  # DATA_DIR
        self.read: bool = preprocessor.read
        self.write: bool = preprocessor.write
        self.window_width: int = preprocessor.window_width  # WINDOW_WIDTH
        self.train_start_date: datetime.date = preprocessor.train_start_date
        self.train_end_date: datetime.date = preprocessor.train_end_date
        self.wide_unseasoned_df: pd.DataFrame = preprocessor.wide_unseasoned_df

        # Module-level configuration constants frozen onto the instance.
        self.pcs_number: int = PCS_NUMBER
        self.window_reconstruct_width: int = WINDOW_RECONSTRUCT_WIDTH
        self.threshold: float = THRESHOLD
        self.topn: int = TOPN

        # Populated lazily by calculate_pca_df / reconstruct_df /
        # median_summarise (or loaded from CSV in read_csv).
        self.pca_df: pd.DataFrame | None = None
        self.long_reconstructed_df: pd.DataFrame | None = None
        self.long_median_summary_df: pd.DataFrame | None = None

    def calculate_pca_win(self, i: int) -> dict:
        """Fit a scaler and a PCA on the window starting at row ``i``.

        ```text
        For each 180-day period in the dataset we apply a principal component
        analysis over the usage time series for all countries, resulting in a
        set of components for that time window.
        ```

        Input:

        - wide_unseasoned_df
        - window_width (180)

        eg. window_df for i = 0:


        ```
        country                ae          al
        date
        2011-09-01    5619.767371  223.472308
        ...
        2012-02-29    2725.496242  117.144923
        ```
        R's `scale()` uses Bessel's correction (ddof=1, dividing by n-1), while
        StandardScaler uses ddof=0 (dividing by n).

        There is no need to store `scaler.scale_`, `scaler.mean_`,
        or `pca.components_`: since `scaler` and `pca` themselves are stored,
        the matrix can be reconstructed by calling their inverse functions.

        Without the inverse functions, the matrices generated by R can be
        created using:
        - `scale`: `scaler.scale_`
        - `x`: `pca.fit_transform(scaled)`
        - `rotation`: `pca.components_`
        - `center`: `scaler.mean_`

        eg. pca_win_dict:

        ```
        'scale': array([7.82486484e+02, 4.67978686e+01  # 135
        'x': array([[ 2.53382531e+01,  1.92390266e+00,  # 180 x 135 = 24300
        'sdev': array([8.89094766e+00, 3.25341552e+00,  # 135
        'rotation': array([[ 0.10952302, -0.05344188,   # 135 x 135 = 18225
        'center': array([3.54811646e+03, 1.82723558e+02,  # 135
        ```

        :param i: zero-based starting row of the window.
        :return: dict with the fitted ``scaler``, fitted ``pca`` and the
            transformed scores ``x``.
        """
        j: int = i + self.window_width
        window_df = self.wide_unseasoned_df.iloc[i:j]  # 180 x 135
        # logger.debug("window_df.shape %s", window_df.shape)
        pca_win_dict: dict = {}
        scaler = StandardScaler()
        scaled = scaler.fit_transform(window_df)  # 180 x 135
        # Here `n_components` will be min(min(window_df.shape), window_width)
        # and has to be initialized on each iteration
        pca = PCA()
        pca_win_dict["scaler"] = scaler
        pca_win_dict["pca"] = pca
        pca_win_dict["x"] = pca.fit_transform(scaled)
        return pca_win_dict

    def calculate_pca_df(
        self,
    ) -> pd.DataFrame:
        """Perform Principal Component Analysis (PCA)

        using singular value decomposition (SVD) on the (centered and
        optionally scaled) matrix.

        ```text
        To account for developing patterns, therefore, we perform a rolling
        principal component analysis over smaller time windows within
        the series. For the purposes of our experiments, we make use of
        a 180-day window as a balance between sufficient data for useful
        principal component analysis, given the number of dimension in
        the data, against the evolution of the daily Tor metrics.
        ```

        Input:

        - wide_unseasoned_df [2401 rows x 135 columns]

        Output:

        - pca_df (2222, 3)
          - x.iloc[0] (180, 135) = 24300
          - scaler
          - pca

        """
        logger.info(
            "Calculating rolling Principal Components Analysis over dataset."
        )
        logger.debug("window_width %s", self.window_width)

        self.pca_df = pd.DataFrame(
            columns=[
                "scaler",
                "pca",
                "x",
            ]
        )

        # One window per possible starting row.
        total: int = len(self.wide_unseasoned_df) - self.window_width + 1
        # Range will go from 0 to 2221 (i = 2221, j = i + 180 = 2401)
        for i in tqdm(range(total), desc="Rolling PCA"):
            pca_win_dict: dict = self.calculate_pca_win(i)
            self.pca_df.loc[i] = [
                pca_win_dict["scaler"],
                pca_win_dict["pca"],
                pca_win_dict["x"],
            ]
        logger.debug("self.pca_df.shape %s", self.pca_df.shape)  # (2222, 3)
        return self.pca_df

    def reconstruct_pc_win_df(
        self, pca_win_df: pd.DataFrame, pcs_array: None | np.ndarray = None
    ) -> pd.DataFrame:
        """Invert the PCA and the scaling to approximate the original window.

        We can use this to 'reconstruct' an approximation of the original data
        by applying the inverse (transpose) of the rotation matrix to the
        rotated data. (Rotations are stored in the $rotation matrix. The
        rotated data is stored in the $x matrix. So $rotation' x $x should
        reconstruct the original.)
        $rotation[,<some pcs>]' x $x will make the rotation using only some
        principal components, and then the residuals should be meaningful.

        Input:

        - pca_win_df  # [1 rows x 5 columns], eg:

          ```
            0  [782.4864835661228, 46.797868643198996, 16.018...
            [1 rows x 5 columns]
          ```

        - `pcs_array`: a vector of principal components to be used for
          reconstruction (default includes all pcs)

        Output:

        - recon # [180 rows x 135 columns], eg:

          ```
          recon[180, 135] 1053.506204
          ```

        """
        # logger.info("Reconstructing")
        if pcs_array is None:
            # NOTE(review): pcs_array is computed here but never used below —
            # the reconstruction always uses *all* components. Confirm whether
            # restricting to `pcs_array` (e.g. the first `pcs_number`
            # components) was intended.
            # Also note `pca_win_df["x"][0]` is a *label* lookup, so it only
            # works when row label 0 is present in the slice.
            pcs_array = np.arange(len(pca_win_df["x"][0]))  # 135

        # Reconstruct: undo the PCA rotation, then undo the standardisation.
        recon_scaled = pca_win_df.pca.iloc[0].inverse_transform(
            pca_win_df.x.iloc[0]
        )
        recon = pca_win_df.scaler.iloc[0].inverse_transform(recon_scaled)

        return recon

    def reconstruct_win_df(
        self,
        window: int = 1,
        countries: list | None = None,
    ) -> pd.DataFrame:
        """Obtain residuals using selected components over selected countries.

        ```text
        After reconstructing data from principal components, the result is
        a set of residuals that express variances in the observed data not
        captured by the current principal component model. A sufficiently
        large-scale residual represents behaviour that deviates significantly
        from previous patterns, and is thus of interest.
        ```

        ```text
        The residual errors calculated during the reconstruction accounts
        for variance in the dataset that is not expressed by the chosen
        principle components in the approximate model.
        • Positive residuals represent drops in expected Tor usage for
        a country.
        • Negative residuals represent increases in expected Tor usage
        for a country.
        • Magnitude of residuals expresses how much a country varies
        from its previous behaviour relative to other countries.
        ```

        Input:

        - pca_df
        - wide_unseasoned_df  # [2401 rows x 136 columns]
        - `window`: the starting index of the window
        - `countries`: a list of countries of interest

        Output:

        - long_residuals  # [24300 rows x 2 columns], eg:

          ```
          2012-02-29      za  0.449914
          ```
        """
        # logger.info("Reconstruct with window %s and pcs %s", window, pcs)

        # Retrieves the PC scores for a data point (row)
        # NOTE(review): iloc[:window] selects rows 0..window-1, and
        # reconstruct_pc_win_df then reads position 0 of that slice — i.e.
        # it always uses the *first* window's scaler/pca/x regardless of
        # `window`, while the dates/origin window below do advance. Confirm
        # whether `self.pca_df.iloc[window - 1 : window]` was intended.
        pca_win_df: pd.DataFrame = self.pca_df.iloc[:window]  # type: ignore
        # [1 rows x 3 columns]

        # Reconstruct PCA data
        # `self.pcs_number` isn't used.
        recon = self.reconstruct_pc_win_df(
            pca_win_df
        )  # [180 rows x 135 columns]
        recon_df = pd.DataFrame(recon)  # [180 rows x 135 columns]

        # Add back dates (the window is 1-based, hence the -1 offsets).
        recon_df.index = self.wide_unseasoned_df.index[
            window - 1 : window + self.window_width - 1
        ]

        # Add back columns
        recon_df.columns = countries

        # Extract the original data window
        orig_win = self.wide_unseasoned_df.iloc[
            window - 1 : window + self.window_width - 1
        ][
            countries
        ]  # [180 rows x 135 columns]

        # Calculate relative residuals: positive when the reconstruction
        # overshoots the observed value, negative when it undershoots.
        # no need for `orig_win[countries]` since orig_win comes from it
        residuals = (
            recon_df - orig_win
        ) / orig_win  # [180 rows x 135 columns]

        # Long form: one (date, country, res) row per cell.
        long_residuals = residuals.melt(
            var_name="country",
            value_name="res",
            ignore_index=False,
        )
        return long_residuals  # [24300 rows x 2 columns]

    def reconstruct_df(
        self,
        # window: int = 1,
        # countries=None,
    ) -> pd.DataFrame:
        """Reconstruct the whole series window-by-window into residuals.

        ```text
        For PCA, the full set of principal components allows reconstruction of
        the full data set. As fewer components are selected, less variance in
        the original dataset is captured.
        ```
        ```text
        our experimental results suggest twelve principal components as
        broadly optimal across the dataset.
        ```

        Though the same number of components (180) is being used in practice.

        ```text
        With appropriately calculated principal components, we can
        reconstruct an approximate value for each day’s Tor usage based
        on previous behaviour.
        ```
        ```text
        we reconstruct a day’s usage based on principal components in order to
        compare against the true observed value, and thus to calculate deviance
        from prior behaviour relative to other countries.
        ```
        ```text
        Taking the true observed usage for each country for the final day of
        each window, we calculate the approximated value from the first 12
        principal components. This provides the expected value for each country
        based on previous behaviour.
        ```

        Input:

        - pcs_number
        - window
        - countries
        - wide_seasoned_df

        Output:

        - long_reconstructed_df

        """
        logger.info(
            "Reconstructing data from restricted principle components. "
            "(%s components).",
            self.pcs_number,
        )

        countries = self.wide_unseasoned_df.columns.tolist()
        # logger.debug("countries %s", countries)
        # 2222
        total: int = len(self.pca_df["x"])  # type: ignore
        logger.debug("total %s", total)
        logger.debug("window_width %s", self.window_width)
        reconstructed_cache: list = []
        # [135 rows x 3 columns]

        # Reconstruct the data for the first window (180 rows for each 135 PC),
        # and note the window number.
        reconstructed_tmp = self.reconstruct_win_df(
            1, countries
        )  # [32580 rows x 2 columns]

        # Just keep the min date for each country only for window 1 (why?)
        reconstructed_tmp = reconstructed_tmp.loc[
            reconstructed_tmp.index == reconstructed_tmp.index.min()
        ]  # [135 rows x 2 columns]
        reconstructed_tmp.loc[:, "window"] = 1  # [135 rows x 3 columns]
        reconstructed_cache.insert(0, reconstructed_tmp)

        for i in tqdm(range(2, total + 1), desc="Reconstructing data"):
            # Reconstitute data for current window
            reconstructed_tmp = self.reconstruct_win_df(
                i, countries
            )  # [32580 rows x 2 columns]

            # Select row with maximum date within the current window, i.e.
            # keep only the most recent day of each window (why?)
            reconstructed_tmp = reconstructed_tmp[
                reconstructed_tmp.index == reconstructed_tmp.index.max()
            ]  # [181 rows x 2 columns]

            # Append to cumulative list of data frames
            reconstructed_tmp.loc[:, "window"] = i  # [181 rows x 3 columns]
            reconstructed_cache.insert(i - 1, reconstructed_tmp)

        # Collapse list of data frames to a single frame.
        self.long_reconstructed_df = pd.concat(reconstructed_cache)
        # [299970 rows x 3 columns]

        # There are a small number of 'Inf' values in the data caused by
        # divisions by 0. Simply set those values to 0.
        self.long_reconstructed_df.loc[
            np.isinf(self.long_reconstructed_df["res"]), "res"
        ] = 0
        logger.debug(
            "long_reconstructed_df.shape: %s", self.long_reconstructed_df.shape
        )
        return self.long_reconstructed_df  # [910611 rows x 3 columns]

    def median_summarise(
        self,
    ) -> pd.DataFrame:
        """
        Combination of the median.error and median.absolute.deviation functions
        that returns a long combination of the two.

        Input:

        - long_reconstructed_df
        - window_reconstruct_width

        Output:

        - long_median_summary_df

        """
        logger.info("Calculating median summary.")

        # Create a wide form of the residuals dataset
        # pylint: disable-next=[line-too-long]
        wide_residuals_df = self.long_reconstructed_df.pivot_table(  # type: ignore
            index="date", columns="country", values="res", aggfunc="sum"
        )  # [2222 rows x 136 columns]
        window_width = self.window_reconstruct_width

        # (Window width must be odd so the rolling window has a centre.)
        if window_width % 2 == 0:
            window_width += 1
        logger.debug("window_width %s", window_width)

        # Get the rolling median
        wide_median_error_df = median_error(
            wide_residuals_df, window_width
        )  # [2180 rows x 135 columns]
        long_median_errors_df = pd.melt(
            wide_median_error_df,
            ignore_index=False,
            var_name="country",
            value_name="median",
        )

        # Get the rolling mad (median absolute deviation)
        wide_mad_df = median_absolute_deviation(
            wide_residuals_df, window_width
        )
        long_mad_df = pd.melt(
            wide_mad_df,
            ignore_index=False,
            var_name="country",
            value_name="mad",
        )
        # Join the data frames on (date, country)
        self.long_median_summary_df = pd.merge(
            long_median_errors_df,
            long_mad_df,
            on=["date", "country"],
        )  # [294300 rows x 3 columns]

        logger.debug(
            "long_median_summary_df.shape %s",
            self.long_median_summary_df.shape,
        )
        return self.long_median_summary_df

    def list_anomalous_date(self, target_date=None):
        """Return the countries flagged as anomalous on ``target_date``.

        Defaults to the last date present in ``long_reconstructed_df``.
        A (date, country) pair is anomalous when
        ``abs(res) > median + threshold * mad``.
        """
        if target_date is None:
            target_date = self.long_reconstructed_df.index[-1]  # type: ignore

        # Subset data to the given date (>= and <= together act as ==).
        long_reconstructed_df = self.long_reconstructed_df[  # type: ignore
            (self.long_reconstructed_df.index >= target_date)  # type: ignore
            & (self.long_reconstructed_df.index <= target_date)  # type: ignore
        ]

        # Merge the residuals object with the long_median_summary_df object,
        # then for each row check if abs("res") exceeds
        # "median" + threshold * "mad".
        # NOTE(review): an earlier comment described the right-hand side as
        # abs(median + threshold*mad), but no abs() is applied to it here —
        # a negative median lowers the threshold. Confirm which is intended.
        residual_anomaly_threshold = pd.merge(
            long_reconstructed_df, self.long_median_summary_df, how="inner"
        )

        # Create a column to tag a day/country column as 'anomalous'
        residual_anomaly_threshold["anomalous"] = 0

        # Assign anomalous to 1 for all rows where the residual exceeds the
        # threshold
        condition = abs(residual_anomaly_threshold["res"]) > (
            residual_anomaly_threshold["median"]
            + self.threshold * residual_anomaly_threshold["mad"]
        )
        residual_anomaly_threshold.loc[condition, "anomalous"] = 1

        anomalous_date = residual_anomaly_threshold.loc[
            residual_anomaly_threshold["anomalous"] == 1, "country"
        ]
        logger.info("anomalous_date %r", anomalous_date)
        return anomalous_date

    def read_csv(self) -> tuple[pd.DataFrame, pd.DataFrame] | None:
        """Try to load previously computed dataframes from CSV.

        Returns ``(long_reconstructed_df, long_median_summary_df)`` when both
        files could be read, ``None`` when reading is disabled or either file
        is missing (in which case the caller recomputes them).
        """
        if not self.read:
            return None
        self.long_reconstructed_df = read_csv_subdir(
            "long_reconstructed",
            self.train_start_date.strftime("%Y-%m-%d"),  # type: ignore
            self.train_end_date.strftime("%Y-%m-%d"),  # type: ignore
        )
        self.long_median_summary_df = read_csv_subdir(
            "long_median_summary",
            self.train_start_date.strftime("%Y-%m-%d"),  # type: ignore
            self.train_end_date.strftime("%Y-%m-%d"),  # type: ignore
        )
        if (
            self.long_reconstructed_df is not None
            and self.long_median_summary_df is not None
        ):
            return self.long_reconstructed_df, self.long_median_summary_df
        logger.info("Couldn't read files, computing them.")
        return None

    def write_csv(self) -> None:
        """Write main dataframes to files.

        So they don't have to be computed again later on. Does nothing when
        writing is disabled.
        """
        # pylint: disable=R0801
        if not self.write:
            return
        # pylint: disable-next=[line-too-long]
        start_str: str = self.train_start_date.strftime("%Y-%m-%d")  # type: ignore
        # pylint: disable-next=[line-too-long]
        end_str: str = self.train_end_date.strftime("%Y-%m-%d")  # type: ignore
        write_csv_subdir(
            self.long_reconstructed_df,
            "long_reconstructed",
            start_str,
            end_str,
        )
        write_csv_subdir(self.pca_df, "pca", start_str, end_str)
        write_csv_subdir(
            self.long_median_summary_df,
            "long_median_summary",
            start_str,
            end_str,
        )

    def run(
        self,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Full pipeline: load from CSV if possible, otherwise compute.

        Returns ``(long_reconstructed_df, long_median_summary_df)``.
        """
        # pylint: disable=R0801
        read = self.read_csv()
        if read is not None:
            return read
        # Calculate the rolling PCA and clean the results
        self.calculate_pca_df()  # "dim(pca.calculated)" 2222    5
        # Reconstruct the data
        self.reconstruct_df()  # (299700, 3)

        self.median_summarise()  # (294030, 3)
        # self.list_anomalous_date()
        self.write_csv()
        return self.long_reconstructed_df, self.long_median_summary_df

calculate_pca_df()

Perform Principal Component Analysis (PCA)

using singular value decomposition (SVD) on the (centered and optionally scaled) matrix.

To account for developing patterns, therefore, we perform a rolling
principal component analysis over smaller time windows within
the series. For the purposes of our experiments, we make use of
a 180-day window as a balance between sufficient data for useful
principal component analysis, given the number of dimension in
the data, against the evolution of the daily Tor metrics.

Input:

  • wide_unseasoned_df [2401 rows x 135 columns]

Output:

  • pca_df (2222, 3)
  • x.iloc[0] (180, 135) = 24300
  • scaler
  • pca
Source code in anomaly/processor.py
def calculate_pca_df(
    self,
) -> pd.DataFrame:
    """Perform Principal Component Analysis (PCA)

    using singular value decomposition (SVD) on the (centered and
    optionally scaled) matrix.

    ```text
    To account for developing patterns, therefore, we perform a rolling
    principal component analysis over smaller time windows within
    the series. For the purposes of our experiments, we make use of
    a 180-day window as a balance between sufficient data for useful
    principal component analysis, given the number of dimension in
    the data, against the evolution of the daily Tor metrics.
    ```

    Input:

    - wide_unseasoned_df [2401 rows x 135 columns]

    Output:

    - pca_df (2222, 3)
      - x.iloc[0] (180, 135) = 24300
      - scaler
      - pca

    """
    logger.info(
        "Calculating rolling Principal Components Analysis over dataset."
    )
    logger.debug("window_width %s", self.window_width)

    self.pca_df = pd.DataFrame(
        columns=[
            "scaler",
            "pca",
            "x",
        ]
    )

    total: int = len(self.wide_unseasoned_df) - self.window_width + 1
    # Range will go from 0 to 2221 (i = 2221, j = i + 180 = 2401)
    for i in tqdm(range(total), desc="Rolling PCA"):
        pca_win_dict: dict = self.calculate_pca_win(i)
        self.pca_df.loc[i] = [
            pca_win_dict["scaler"],
            pca_win_dict["pca"],
            pca_win_dict["x"],
        ]
    logger.debug("self.pca_df.shape %s", self.pca_df.shape)  #  (2222, 5)
    return self.pca_df

calculate_pca_win(i)

For each 180-day period in the dataset we apply a principal component
analysis over the usage time series for all countries, resulting in a
set of components for that time window.

Input:

  • wide_unseasoned_df
  • window_width (180)

eg. window_df for i = 0:

country                ae          al
date
2011-09-01    5619.767371  223.472308
...
2012-02-29    2725.496242  117.144923

R's scale() uses Bessel's correction (ddof=1, dividing by n-1), while StandardScaler uses ddof=0 (dividing by n).

There is no need to store scaler.scale_, scaler.mean_, or pca.components_: since scaler and pca themselves are stored, the matrix can be reconstructed by calling their inverse functions.

Without the inverse functions, the matrices generated by R can be created using: scale from scaler.scale_; x from pca.fit_transform(scaled); rotation from pca.components_; center from scaler.mean_.

eg. pca_win_dict:

'scale': array([7.82486484e+02, 4.67978686e+01  # 135
'x': array([[ 2.53382531e+01,  1.92390266e+00,  # 180 x 135 = 24300
'sdev': array([8.89094766e+00, 3.25341552e+00,  # 135
'rotation': array([[ 0.10952302, -0.05344188,   # 135 x 135 = 18225
'center': array([3.54811646e+03, 1.82723558e+02,  # 135
Source code in anomaly/processor.py
def calculate_pca_win(self, i: int) -> dict:
    """

    ```text
    For each 180-day period in the dataset we apply a principal component
    analysis over the usage time series for all countries, resulting in a
    set of components for that time window.
    ```

    Input:

    - wide_unseasoned_df
    - window_width (180)

    eg. window_df for i = 0:


    ```
    country                ae          al
    date
    2011-09-01    5619.767371  223.472308
    ...
    2012-02-29    2725.496242  117.144923
    ```
    R's `scale()` uses Bessel's correction (ddof=1, dividing by n-1), while
    StandardScaler uses ddof=0 (dividing by n).

    There is no need to store `scaler._scale`, `scaler.mean_`,
    or `pca.components_`, since storing `scaler` and `pca` the matrix can
    be reconstructed calling the inverse functions.

    Without the inverse functions, the matrices generated by R can be
    created using:
    - `scale`: `scaler.scale_ `
    - `x`: `pca.fit_transform(scaled)`
    - `rotation`: `pca.components_`
    - `center`: `scaler.mean_`

    eg. pca_win_dict:

    ```
    'scale': array([7.82486484e+02, 4.67978686e+01  # 135
    'x': array([[ 2.53382531e+01,  1.92390266e+00,  # 180 x 135 = 24300
    'sdev': array([8.89094766e+00, 3.25341552e+00,  # 135
    'rotation': array([[ 0.10952302, -0.05344188,   # 135 x 135 = 18225
    'center': array([3.54811646e+03, 1.82723558e+02,  # 135
    ```

    """
    j: int = i + self.window_width
    window_df = self.wide_unseasoned_df.iloc[i:j]  # 180 x 135
    # logger.debug("window_df.shape %s", window_df.shape)
    pca_win_dict: dict = {}
    scaler = StandardScaler()
    scaled = scaler.fit_transform(window_df)  # 180 x 135
    # Here `n_components` will be min(min(window_df.shape), window_width)
    # and has to be initialized on each iteration
    pca = PCA()
    pca_win_dict["scaler"] = scaler
    pca_win_dict["pca"] = pca
    pca_win_dict["x"] = pca.fit_transform(scaled)
    return pca_win_dict

median_summarise()

Combination of the median.error and median.absolute.deviation functions that returns a long combination of the two.

Input:

  • long_reconstructed_df
  • window_reconstruct_width

Output:

  • long_median_summary_df
Source code in anomaly/processor.py
def median_summarise(
    self,
) -> pd.DataFrame:
    """
    Combination of the median.error and median.absolute.deviation functions
    that returns a long combination of the two.

    Input:

    - long_reconstructed_df
    - window_reconstruct_width

    Output:

    - long_median_summary_df

    """
    logger.info("Calculating median summary.")

    # Create a wide form of the residuals dataset
    # pylint: disable-next=[line-too-long]
    wide_residuals_df = self.long_reconstructed_df.pivot_table(  # type: ignore
        index="date", columns="country", values="res", aggfunc="sum"
    )  # [2222 rows x 136 columns]
    window_width = self.window_reconstruct_width

    # (Window width must be odd.)
    if window_width % 2 == 0:
        window_width += 1
    logger.debug("window_width %s", window_width)

    # Get the rolling median
    wide_median_error_df = median_error(
        wide_residuals_df, window_width
    )  # [2180 rows x 135 columns]
    long_median_errors_df = pd.melt(
        wide_median_error_df,
        ignore_index=False,
        var_name="country",
        value_name="median",
    )

    # Get the rolling mad
    wide_mad_df = median_absolute_deviation(
        wide_residuals_df, window_width
    )
    long_mad_df = pd.melt(
        wide_mad_df,
        ignore_index=False,
        var_name="country",
        value_name="mad",
    )
    # Join the data frames
    self.long_median_summary_df = pd.merge(
        long_median_errors_df,
        long_mad_df,
        on=["date", "country"],
    )  # [294300 rows x 3 columns]

    logger.debug(
        "long_median_summary_df.shape %s",
        self.long_median_summary_df.shape,
    )
    return self.long_median_summary_df

reconstruct_df()

For PCA, the full set of principal components allows reconstruction of
the full data set. As fewer components are selected, less variance in
the original dataset is captured.
our experimental results suggest twelve principal components as
broadly optimal across the dataset.

Though the same number of components (180) is being used in practice.

With appropriately calculated principal components, we can
reconstruct an approximate value for each day’s Tor usage based
on previous behaviour.
we reconstruct a day’s usage based on principal components in order to
compare against the true observed value, and thus to calculate deviance
from prior behaviour relative to other countries.
Taking the true observed usage for each country for the final day of
each window, we calculate the approximated value from the first 12
principal components. This provides the expected value for each country
based on previous behaviour.

Input:

  • pcs_number
  • window
  • countries
  • wide_seasoned_df

Output:

  • long_reconstructed_df
Source code in anomaly/processor.py
def reconstruct_df(
    self,
    # window: int = 1,
    # countries=None,
) -> pd.DataFrame:
    """
    ```text
    For PCA, the full set of principal components allows reconstruction of
    the full data set. As fewer components are selected, less variance in
    the original dataset is captured.
    ```
    ```text
    our experimental results suggest twelve principal components as
    broadly optimal across the dataset.
    ```

    Though the same number of components (180) is being used in practice.

    ```text
    With appropriately calculated principal components, we can
    reconstruct an approximate value for each day’s Tor usage based
    on previous behaviour.
    ```
    ```text
    we reconstruct a day’s usage based on principal components in order to
    compare against the true observed value, and thus to calculate deviance
    from prior behaviour relative to other countries.
    ```
    ```text
    Taking the true observed usage for each country for the final day of
    each window, we calculate the approximated value from the first 12
    principal components. This provides the expected value for each country
    based on previous behaviour.
    ```

    Input:

    - pcs_number
    - window
    - countries
    - wide_seasoned_df

    Output:

    - long_reconstructed_df

    """
    logger.info(
        "Reconstructing data from restricted principle components. "
        "(%s components).",
        self.pcs_number,
    )

    countries = self.wide_unseasoned_df.columns.tolist()
    # logger.debug("countries %s", countries)
    # 2222
    total: int = len(self.pca_df["x"])  # type: ignore
    logger.debug("total %s", total)
    logger.debug("window_width %s", self.window_width)
    reconstructed_cache: list = []
    # [ 135 rows x 3 columns]]

    # Reconstruct the data for the first window (180 rows for each 135 PC),
    # and note the window number.
    reconstructed_tmp = self.reconstruct_win_df(
        1, countries
    )  # [32580 rows x 2 columns]

    # Just keep the min date for each country only for window 1 (why?)
    reconstructed_tmp = reconstructed_tmp.loc[
        reconstructed_tmp.index == reconstructed_tmp.index.min()
    ]  # [135 rows x 2 columns]
    reconstructed_tmp.loc[:, "window"] = 1  # [135 rows x 3 columns]
    reconstructed_cache.insert(0, reconstructed_tmp)

    for i in tqdm(range(2, total + 1), desc="Reconstructing data"):
        # Reconstitute data for current window
        reconstructed_tmp = self.reconstruct_win_df(
            i, countries
        )  # [32580 rows x 2 columns]

        # Select row with maximum date within the current window (why?)
        reconstructed_tmp = reconstructed_tmp[
            reconstructed_tmp.index == reconstructed_tmp.index.max()
        ]  # [181 rows x 2 columns]

        # Append to cumulative list of data frames
        reconstructed_tmp.loc[:, "window"] = i  # [181 rows x 3 columns]
        reconstructed_cache.insert(i - 1, reconstructed_tmp)

    # Collapse list of data frames to a single frame.
    self.long_reconstructed_df = pd.concat(reconstructed_cache)
    # [299970 rows x 3 columns]

    # There are a small number of 'Inf' values in the data caused by
    # divisions by 0. Simply set those values to 0.
    self.long_reconstructed_df.loc[
        np.isinf(self.long_reconstructed_df["res"]), "res"
    ] = 0
    logger.debug(
        "long_reconstructed_df.shape: %s", self.long_reconstructed_df.shape
    )
    return self.long_reconstructed_df  # [910611 rows x 3 columns]

reconstruct_pc_win_df(pca_win_df, pcs_array=None)

We can use this to 'reconstruct' an approximation of the original data by applying the inverse (transpose) of the rotation matrix to the rotated data. (Rotations are stored in the $rotation matrix. The rotated data is stored in the $x matrix. So $rotation' x $x should reconstruct the original.) $rotation[,<some pcs>]' x $x will make the rotation using only some principal components, and then the residuals should be meaningful.

Input:

  • pca_win_df # [1 rows x 5 columns], eg:

0 [782.4864835661228, 46.797868643198996, 16.018... [1 rows x 5 columns]

  • pcs_array: a vector of principal components to be used for reconstruction (default includes all pcs)

Output:

  • recon # [180 rows x 135 columns], eg:

recon[180, 135] 1053.506204

Source code in anomaly/processor.py
def reconstruct_pc_win_df(
    self, pca_win_df: pd.DataFrame, pcs_array: None | np.ndarray = None
) -> pd.DataFrame:
    """

    We can use this to 'reconstruct' an approximation of the original data
    by applying the inverse (transpose) of the rotation matrix to the
    rotated data. (Rotations are stored in the $rotation matrix. The
    rotated data is stored in the $x matrix. So $rotation' x $x should
    reconstruct the original.
    $rotation[,<some pcs>]' x $x will make the rotation using only some
    principal components, and then the residuals should be meaningful.

    Input:

    - pca_win_df  # [1 rows x 5 columns], eg:

      ```
        0  [782.4864835661228, 46.797868643198996, 16.018...
        [1 rows x 5 columns]
      ```

    - `pcs_array`: a vector of principal components to be used for
      reconstruction (default includes all pcs)

    Output:

    - recon # [180 rows x 135 columns], eg:

      ```
      recon[180, 135] 1053.506204
      ```

    """
    # logger.info("Reconstructing")
    if pcs_array is None:
        pcs_array = np.arange(len(pca_win_df["x"][0]))  # 135

    # Reconstruct
    recon_scaled = pca_win_df.pca.iloc[0].inverse_transform(
        pca_win_df.x.iloc[0]
    )
    recon = pca_win_df.scaler.iloc[0].inverse_transform(recon_scaled)

    return recon

reconstruct_win_df(window=1, countries=None)

Obtain residuals using selected components over selected countries.

After reconstructing data from principal components, the result is
a set of residuals that express variances in the observed data not
captured by the current principal component model. A sufficiently
large-scale residual represents behaviour that deviates significantly
from previous patterns, and is thus of interest.
The residual errors calculated during the reconstruction account
for variance in the dataset that is not expressed by the chosen
principal components in the approximate model.
• Positive residuals represent drops in expected Tor usage for
a country.
• Negative residuals represent increases in expected Tor usage
for a country.
• Magnitude of residuals expresses how much a country varies
from its previous behaviour relative to other countries.

Input:

  • pca_df
  • wide_unseasoned_df # [2401 rows x 136 columns]
  • window: the starting index of the window
  • countries: a list of countries of interest

Output:

  • long_residuals # [24300 rows x 2 columns], eg:

2012-02-29 za 0.449914

Source code in anomaly/processor.py
def reconstruct_win_df(
    self,
    window: int = 1,
    countries: list | None = None,
) -> pd.DataFrame:
    """Obtain residuals using selected components over selected countries.

    ```text
    After reconstructing data from principal components, the result is
    a set of residuals that express variances in the observed data not
    captured by the current principal component model. A sufficiently
    large-scale residual represents behaviour that deviates significantly
    from previous patterns, and is thus of interest.
    ```

    ```text
    The residual errors calculated during the reconstruction accounts
    for variance in the dataset that is not expressed by the chosen
    principle components in the approximate model.
    • Positive residuals represent drops in expected Tor usage for
    a country.
    • Negative residuals represent increases in expected Tor usage
    for a country.
    • Magnitude of residuals expresses how much a country varies
    from its previous behaviour relative to other countries.
    ```

    Input:

    - pca_df
    - wide_unseasoned_df  # [2401 rows x 136 columns]
    - `window`: the starting index of the window
    - `countries`: a list of countries of interest

    Output:

    - long_residuals  # [24300 rows x 2 columns], eg:

      ```
      2012-02-29      za  0.449914
      ```
    """
    # logger.info("Reconstruct with window %s and pcs %s", window, pcs)

    # Retrieves the PC scores for a data point (row)
    pca_win_df: pd.DataFrame = self.pca_df.iloc[:window]  # type: ignore
    # [1 rows x 3 columns]

    # Reconstruct PCA data
    # `self.pcs_number` isn't used.
    recon = self.reconstruct_pc_win_df(
        pca_win_df
    )  # [180 rows x 135 columns]
    recon_df = pd.DataFrame(recon)  # [180 rows x 135 columns]

    # Add back dates
    recon_df.index = self.wide_unseasoned_df.index[
        window - 1 : window + self.window_width - 1
    ]

    # Add back columns
    recon_df.columns = countries

    # Extract the original data window
    orig_win = self.wide_unseasoned_df.iloc[
        window - 1 : window + self.window_width - 1
    ][
        countries
    ]  # [180 rows x 135 columns]

    # Calculate residuals
    # no need for `orig_win[countries]` since orig_win comes from it
    residuals = (
        recon_df - orig_win
    ) / orig_win  # [180 rows x 135 columns]

    long_residuals = residuals.melt(
        var_name="country",
        value_name="res",
        ignore_index=False,
    )
    return long_residuals  # [24300 rows x 2 columns]

write_csv()

Write main dataframes to files.

So that they do not have to be computed again later on.

Source code in anomaly/processor.py
def write_csv(self) -> None:
    """Persist the main dataframes as CSV files.

    Saving them avoids recomputing them in later runs.
    """
    # pylint: disable=R0801
    if not self.write:
        return

    date_fmt = "%Y-%m-%d"
    # pylint: disable-next=[line-too-long]
    start_str: str = self.train_start_date.strftime(date_fmt)  # type: ignore
    # pylint: disable-next=[line-too-long]
    end_str: str = self.train_end_date.strftime(date_fmt)  # type: ignore

    # One (dataframe, label) pair per output file, written in order.
    outputs = (
        (self.long_reconstructed_df, "long_reconstructed"),
        (self.pca_df, "pca"),
        (self.long_median_summary_df, "long_median_summary"),
    )
    for frame, label in outputs:
        write_csv_subdir(frame, label, start_str, end_str)