Preprocessor

Source code in anomaly/preprocessor.py
class Preprocessor:

    def __init__(self, args: argparse.Namespace | None = None):
        self.data_dir: str = DATA_DIR
        self.url: str = URL
        self.orig_df_filename: str = ORIG_DF_FILENAME
        self.strip: bool = True

        self.min_users: int = MIN_USERS
        self.season_length: int = SEASON_LENGTH
        self.window_width: int = WINDOW_WIDTH

        self.path: Path | None = None
        self.write: bool = False
        self.read: bool = False
        self.start_date: datetime.date | None = None
        self.end_date: datetime.date | None = None
        self.train_start_date: datetime.date | None = None
        self.train_end_date: datetime.date | None = None

        self.long_initial_df: pd.DataFrame = None
        self.long_seasoned_df: pd.DataFrame = None
        self.wide_seasoned_df: pd.DataFrame = None
        self.wide_unseasoned_df: pd.DataFrame = None

        if args is not None:
            self.path = args.path
            self.write = args.write
            self.read = args.read
            self.start_date = args.start_date
            self.end_date = args.end_date
            self.train_start_date = args.train_start_date
            self.train_end_date = args.train_end_date

    def obtain_initial_df(
        self,
    ) -> pd.DataFrame:
        """Obtain the (training) data to apply the analysis.

        Either from a file or from the metrics Web.

        Eg. input:

        ```text
        date,country,users,lower,upper,frac
        2011-03-06,,1042399,,,11
        2011-03-06,??,8869,,,11
        ```

        Output:

        - long_initial_df, eg:

          ```
                  country  users
          date
          2011-09-01      ad     76
          2011-09-01      ae   5837
          ```

        """
        logger.info("Obtaining data.")
        if self.path:
            logger.info("Reading path %s", self.path)
            self.long_initial_df = read_csv(self.path)  # (575550, 2)
        else:
            # Without dates argument the first date will be `2011-03-06` and
            # the last will be the previous? day.
            if self.train_start_date:
                self.url += f"?start={self.train_start_date}"
            if self.train_end_date:
                self.url += f"&end={self.train_end_date}"
            logger.info("Reading URL %s", self.url)
            self.long_initial_df = pd.read_csv(
                self.url,
                comment="#",
                index_col="date",
                parse_dates=True,
            )
            self.long_initial_df.index = self.long_initial_df.index.date
            self.long_initial_df.index.name = "date"
        # #8: log when the args dates and df date doesn't match
        data_start_date = self.long_initial_df.index.min()  # type: ignore
        data_end_date = self.long_initial_df.index.max()  # type: ignore
        # Adjust training date interval to the available data
        self.train_start_date = data_start_date
        self.train_end_date = data_end_date
        logger.info(
            "Training data is in the date interval %s %s",
            self.train_start_date,
            self.train_end_date,
        )
        # Adjust user date interval to the available data
        self.start_date = min(
            max(self.train_start_date, self.start_date),  # type: ignore
            self.train_end_date,  # type: ignore
        )
        self.end_date = max(
            min(self.train_end_date, self.end_date),  # type: ignore
            self.train_end_date,  # type: ignore
        )
        logger.info(
            "Target data is in the date interval %s %s",
            self.start_date,
            self.end_date,
        )
        # #8: log when the args dates and actual dates doesn't match
        return self.long_initial_df

    # #9: Take relays+bridges, or just relays.
    def prepare_df(
        self,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Prepare the (training) data to apply the analysis.

        In particular:

        - remove not needed columns: `lower`, `upper`, `frac`,
        - remove the total counts column
        - remove date outliers columns (`2011-03-06`)
        - remove countries outliers rows
        - remove any columns that are constant over any time window, as this
          breaks prcomp with the SCALE=TRUE parameter.
        - remove all columns (countries) with a maximum lower than the
          `MAX_USERS` number of users, to avoid the unacceptably high variance
        - remove seasonal (weekly) components from each time series by Loess,
          to later reconstruct the time series without the weekly cycle
          components.

        ```text
        Tor usage data, [...] exhibits significant seasonality,
        typically on a weekly basis, reflecting changes between usage on
        weekdays and at weekends. This continual cyclical change in usage
        can reduce the accuracy of principal component analysis due to
        varying levels of seasonality exhibited by different countries.
        We employ the Seasonal and Trend Decomposition using Loess
        (STL) method of Cleveland et al. to remove the seasonal compo-
        nent of each series, leaving the trend component and the residual
        noise as inputs to our anomaly detector
        ```

        NOTE: R `stl` and Python `statsmodels.tsa.seasonal.STL` doesn't return
        the same values with the same input.

        Input:

        - long_initial_df:

          ```
                    date country      users
          0        2011-03-06     NaN  1042399.0
          1        2011-03-06      ??     8869.0
          2        2011-03-06      a1     1441.0
          ```

        Output:

        - long_seasoned_df, eg:

          ```
                  country  users
          date
          2011-09-01      ae   5837
          2011-09-01      al    212
          ```

        - wide_unseasoned_df, eg:

          ```
          country                ae          al
          date
          2011-09-01    5619.767371  223.472308
          ```

        """
        logger.info(
            "Preparing data with min_allowed %s and window_width %s.",
            self.min_users,
            self.window_width,
        )

        # Strip outliers
        # Remove no needed columns
        # copy the original df, so it can be saved as it was
        long_initial_df = self.long_initial_df.copy()
        long_initial_df.drop(
            columns=["lower", "upper", "frac"], inplace=True
        )  # [575550 rows x 2 columns]

        wide_initial_df: pd.DataFrame = long_initial_df.pivot_table(
            index="date", columns="country", values="users"
        ).copy()  # [2403 rows x 258 columns
        # In R, this matrix will already have replaced NAN by 0s
        wide_initial_df.fillna(0, inplace=True)
        wide_initial_df = wide_initial_df.drop(
            pd.to_datetime(DATE_TO_RM).date()
        )  # [2402 rows x 258 columns]

        # Remove outliers countries rows
        wide_initial_df = wide_initial_df.drop(
            columns=[
                col for col in COLUMNS_TO_RM if col in wide_initial_df.columns
            ]
        )  # [2402 rows x 253 columns]

        # Remove last date rows
        wide_initial_df = wide_initial_df.iloc[:-1]
        # [2401 rows x 253 columns]

        # Remove constant countries
        indices = []
        for i in range(len(wide_initial_df) - self.window_width + 1):
            window_data = wide_initial_df.iloc[i : i + self.window_width]
            variances = window_data.var(axis=0, skipna=True, ddof=0)
            const_cols = np.where(variances == 0)[
                0
            ]  # indices of columns with zero variance
            indices.append(const_cols)
        # Flatten and get unique column indices
        remove_columns = sorted(
            set(idx for sublist in indices for idx in sublist)
        )
        constant_countries = wide_initial_df.columns[remove_columns].tolist()
        wide_initial_df.drop(columns=constant_countries, inplace=True)

        logger.debug("Constant countries: %s", sorted(constant_countries))
        # #10: Check why the constant countries aren't the same as in R code

        # Drop all columns (countries) with a maximum lower than the
        # max.users number  of users.
        self.wide_seasoned_df = wide_initial_df.loc[
            :, wide_initial_df.max() > self.min_users
        ].copy()
        # [2401 rows x 135 columns]

        logger.debug("wide_seasoned.shape %s", self.wide_seasoned_df.shape)
        # [2401 rows x 226 columns]

        self.long_seasoned_df = self.wide_seasoned_df.melt(
            var_name="country",
            value_name="users",
            ignore_index=False,
        )  # [324135 rows x 2 columns]
        logger.debug("long_seasoned_df.shape %s", self.long_seasoned_df.shape)

        # Remove seasonality
        new_columns = []
        for col in self.wide_seasoned_df:
            # `seasonal` argument isn't the same as R `s.window`
            stl_fit = STL(
                self.wide_seasoned_df[col],
                period=self.season_length,
                seasonal_deg=0,
                robust=True,
            ).fit()
            new_col = pd.Series(
                stl_fit.trend + stl_fit.resid,
                name=col,
                index=self.wide_seasoned_df.index,
            )
            # To plot stl per row: `stl_fit.plot(); plt.show()`
            new_columns.append(new_col)
        self.wide_unseasoned_df = pd.concat(new_columns, axis=1)
        # self.wide_unseasoned_df = self.wide_unseasoned_df.dropna()
        # [2401 rows x 135 columns]
        logger.debug(
            "wide_unseasoned_df.shape %s", self.wide_unseasoned_df.shape
        )
        return self.long_seasoned_df, self.wide_unseasoned_df

    def read_csv(self) -> tuple[pd.DataFrame, pd.DataFrame] | None:
        if not self.read:
            return None
        self.wide_unseasoned_df = read_csv_subdir(
            "wide_unseasoned",
            self.train_start_date.strftime("%Y-%m-%d"),  # type: ignore
            self.train_end_date.strftime("%Y-%m-%d"),  # type: ignore
        )
        self.long_seasoned_df = read_csv_subdir(
            "long_seasoned",
            self.train_start_date.strftime("%Y-%m-%d"),  # type: ignore
            self.train_end_date.strftime("%Y-%m-%d"),  # type: ignore
        )
        if (
            self.wide_unseasoned_df is not None
            and self.long_seasoned_df is not None
        ):
            return self.long_seasoned_df, self.wide_unseasoned_df
        logger.info("Couldn't read files, computing them.")
        return None

    def write_csv(self) -> None:
        """Write main dataframes to files.

        In order to do not compute them again later on.

        """
        if not self.write:
            return
        # pylint: disable-next=[line-too-long]
        start_str: str = self.train_start_date.strftime("%Y-%m-%d")  # type: ignore
        # pylint: disable-next=[line-too-long]
        end_str: str = self.train_end_date.strftime("%Y-%m-%d")  # type: ignore
        write_csv_subdir(
            self.long_initial_df,
            "long_initial",
            start_str,
            end_str,
        )
        write_csv_subdir(
            self.wide_seasoned_df,
            "wide_seasoned",
            start_str,
            end_str,
        )
        write_csv_subdir(
            self.long_seasoned_df,
            "long_seasoned",
            start_str,
            end_str,
        )
        write_csv_subdir(
            self.wide_unseasoned_df,
            "wide_unseasoned",
            start_str,
            end_str,
        )

    def run(
        self,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        read = self.read_csv()
        if read is not None:
            return read
        self.obtain_initial_df()
        self.prepare_df()
        self.write_csv()
        # `wide_unseasoned_df.shape` [2401 rows x 135 columns]
        # `long_seasoned_df.shape`: (324135, 2)
        return self.long_seasoned_df, self.wide_unseasoned_df

obtain_initial_df()

Obtain the (training) data to apply the analysis.

Either from a file or from the metrics Web.

Eg. input:

date,country,users,lower,upper,frac
2011-03-06,,1042399,,,11
2011-03-06,??,8869,,,11

Output:

  • long_initial_df, eg:

country users date 2011-09-01 ad 76 2011-09-01 ae 5837

Source code in anomaly/preprocessor.py
def obtain_initial_df(
    self,
) -> pd.DataFrame:
    """Obtain the (training) data to apply the analysis.

    Either from a file or from the metrics Web.

    Eg. input:

    ```text
    date,country,users,lower,upper,frac
    2011-03-06,,1042399,,,11
    2011-03-06,??,8869,,,11
    ```

    Output:

    - long_initial_df, eg:

      ```
              country  users
      date
      2011-09-01      ad     76
      2011-09-01      ae   5837
      ```

    """
    logger.info("Obtaining data.")
    if self.path:
        logger.info("Reading path %s", self.path)
        self.long_initial_df = read_csv(self.path)  # (575550, 2)
    else:
        # Without dates argument the first date will be `2011-03-06` and
        # the last will be the previous? day.
        if self.train_start_date:
            self.url += f"?start={self.train_start_date}"
        if self.train_end_date:
            self.url += f"&end={self.train_end_date}"
        logger.info("Reading URL %s", self.url)
        self.long_initial_df = pd.read_csv(
            self.url,
            comment="#",
            index_col="date",
            parse_dates=True,
        )
        self.long_initial_df.index = self.long_initial_df.index.date
        self.long_initial_df.index.name = "date"
    # #8: log when the args dates and df date doesn't match
    data_start_date = self.long_initial_df.index.min()  # type: ignore
    data_end_date = self.long_initial_df.index.max()  # type: ignore
    # Adjust training date interval to the available data
    self.train_start_date = data_start_date
    self.train_end_date = data_end_date
    logger.info(
        "Training data is in the date interval %s %s",
        self.train_start_date,
        self.train_end_date,
    )
    # Adjust user date interval to the available data
    self.start_date = min(
        max(self.train_start_date, self.start_date),  # type: ignore
        self.train_end_date,  # type: ignore
    )
    self.end_date = max(
        min(self.train_end_date, self.end_date),  # type: ignore
        self.train_end_date,  # type: ignore
    )
    logger.info(
        "Target data is in the date interval %s %s",
        self.start_date,
        self.end_date,
    )
    # #8: log when the args dates and actual dates doesn't match
    return self.long_initial_df

prepare_df()

Prepare the (training) data to apply the analysis.

In particular:

  • remove not needed columns: lower, upper, frac,
  • remove the total counts column
  • remove date outliers columns (2011-03-06)
  • remove countries outliers rows
  • remove any columns that are constant over any time window, as this breaks prcomp with the SCALE=TRUE parameter.
  • remove all columns (countries) with a maximum lower than the MAX_USERS number of users, to avoid the unacceptably high variance
  • remove seasonal (weekly) components from each time series by Loess, to later reconstruct the time series without the weekly cycle components.
Tor usage data, [...] exhibits significant seasonality,
typically on a weekly basis, reflecting changes between usage on
weekdays and at weekends. This continual cyclical change in usage
can reduce the accuracy of principal component analysis due to
varying levels of seasonality exhibited by different countries.
We employ the Seasonal and Trend Decomposition using Loess
(STL) method of Cleveland et al. to remove the seasonal compo-
nent of each series, leaving the trend component and the residual
noise as inputs to our anomaly detector

NOTE: R stl and Python statsmodels.tsa.seasonal.STL doesn't return the same values with the same input.

Input:

  • long_initial_df:

date country users 0 2011-03-06 NaN 1042399.0 1 2011-03-06 ?? 8869.0 2 2011-03-06 a1 1441.0

Output:

  • long_seasoned_df, eg:

country users date 2011-09-01 ae 5837 2011-09-01 al 212

  • wide_unseasoned_df, eg:

country ae al date 2011-09-01 5619.767371 223.472308

Source code in anomaly/preprocessor.py
def prepare_df(
    self,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Prepare the (training) data to apply the analysis.

    In particular:

    - remove not needed columns: `lower`, `upper`, `frac`,
    - remove the total counts column
    - remove date outliers columns (`2011-03-06`)
    - remove countries outliers rows
    - remove any columns that are constant over any time window, as this
      breaks prcomp with the SCALE=TRUE parameter.
    - remove all columns (countries) with a maximum lower than the
      `MAX_USERS` number of users, to avoid the unacceptably high variance
    - remove seasonal (weekly) components from each time series by Loess,
      to later reconstruct the time series without the weekly cycle
      components.

    ```text
    Tor usage data, [...] exhibits significant seasonality,
    typically on a weekly basis, reflecting changes between usage on
    weekdays and at weekends. This continual cyclical change in usage
    can reduce the accuracy of principal component analysis due to
    varying levels of seasonality exhibited by different countries.
    We employ the Seasonal and Trend Decomposition using Loess
    (STL) method of Cleveland et al. to remove the seasonal compo-
    nent of each series, leaving the trend component and the residual
    noise as inputs to our anomaly detector
    ```

    NOTE: R `stl` and Python `statsmodels.tsa.seasonal.STL` doesn't return
    the same values with the same input.

    Input:

    - long_initial_df:

      ```
                date country      users
      0        2011-03-06     NaN  1042399.0
      1        2011-03-06      ??     8869.0
      2        2011-03-06      a1     1441.0
      ```

    Output:

    - long_seasoned_df, eg:

      ```
              country  users
      date
      2011-09-01      ae   5837
      2011-09-01      al    212
      ```

    - wide_unseasoned_df, eg:

      ```
      country                ae          al
      date
      2011-09-01    5619.767371  223.472308
      ```

    """
    logger.info(
        "Preparing data with min_allowed %s and window_width %s.",
        self.min_users,
        self.window_width,
    )

    # Strip outliers
    # Remove no needed columns
    # copy the original df, so it can be saved as it was
    long_initial_df = self.long_initial_df.copy()
    long_initial_df.drop(
        columns=["lower", "upper", "frac"], inplace=True
    )  # [575550 rows x 2 columns]

    wide_initial_df: pd.DataFrame = long_initial_df.pivot_table(
        index="date", columns="country", values="users"
    ).copy()  # [2403 rows x 258 columns
    # In R, this matrix will already have replaced NAN by 0s
    wide_initial_df.fillna(0, inplace=True)
    wide_initial_df = wide_initial_df.drop(
        pd.to_datetime(DATE_TO_RM).date()
    )  # [2402 rows x 258 columns]

    # Remove outliers countries rows
    wide_initial_df = wide_initial_df.drop(
        columns=[
            col for col in COLUMNS_TO_RM if col in wide_initial_df.columns
        ]
    )  # [2402 rows x 253 columns]

    # Remove last date rows
    wide_initial_df = wide_initial_df.iloc[:-1]
    # [2401 rows x 253 columns]

    # Remove constant countries
    indices = []
    for i in range(len(wide_initial_df) - self.window_width + 1):
        window_data = wide_initial_df.iloc[i : i + self.window_width]
        variances = window_data.var(axis=0, skipna=True, ddof=0)
        const_cols = np.where(variances == 0)[
            0
        ]  # indices of columns with zero variance
        indices.append(const_cols)
    # Flatten and get unique column indices
    remove_columns = sorted(
        set(idx for sublist in indices for idx in sublist)
    )
    constant_countries = wide_initial_df.columns[remove_columns].tolist()
    wide_initial_df.drop(columns=constant_countries, inplace=True)

    logger.debug("Constant countries: %s", sorted(constant_countries))
    # #10: Check why the constant countries aren't the same as in R code

    # Drop all columns (countries) with a maximum lower than the
    # max.users number  of users.
    self.wide_seasoned_df = wide_initial_df.loc[
        :, wide_initial_df.max() > self.min_users
    ].copy()
    # [2401 rows x 135 columns]

    logger.debug("wide_seasoned.shape %s", self.wide_seasoned_df.shape)
    # [2401 rows x 226 columns]

    self.long_seasoned_df = self.wide_seasoned_df.melt(
        var_name="country",
        value_name="users",
        ignore_index=False,
    )  # [324135 rows x 2 columns]
    logger.debug("long_seasoned_df.shape %s", self.long_seasoned_df.shape)

    # Remove seasonality
    new_columns = []
    for col in self.wide_seasoned_df:
        # `seasonal` argument isn't the same as R `s.window`
        stl_fit = STL(
            self.wide_seasoned_df[col],
            period=self.season_length,
            seasonal_deg=0,
            robust=True,
        ).fit()
        new_col = pd.Series(
            stl_fit.trend + stl_fit.resid,
            name=col,
            index=self.wide_seasoned_df.index,
        )
        # To plot stl per row: `stl_fit.plot(); plt.show()`
        new_columns.append(new_col)
    self.wide_unseasoned_df = pd.concat(new_columns, axis=1)
    # self.wide_unseasoned_df = self.wide_unseasoned_df.dropna()
    # [2401 rows x 135 columns]
    logger.debug(
        "wide_unseasoned_df.shape %s", self.wide_unseasoned_df.shape
    )
    return self.long_seasoned_df, self.wide_unseasoned_df

write_csv()

Write main dataframes to files.

In order to do not compute them again later on.

Source code in anomaly/preprocessor.py
def write_csv(self) -> None:
    """Write main dataframes to files.

    In order to do not compute them again later on.

    """
    if not self.write:
        return
    # pylint: disable-next=[line-too-long]
    start_str: str = self.train_start_date.strftime("%Y-%m-%d")  # type: ignore
    # pylint: disable-next=[line-too-long]
    end_str: str = self.train_end_date.strftime("%Y-%m-%d")  # type: ignore
    write_csv_subdir(
        self.long_initial_df,
        "long_initial",
        start_str,
        end_str,
    )
    write_csv_subdir(
        self.wide_seasoned_df,
        "wide_seasoned",
        start_str,
        end_str,
    )
    write_csv_subdir(
        self.long_seasoned_df,
        "long_seasoned",
        start_str,
        end_str,
    )
    write_csv_subdir(
        self.wide_unseasoned_df,
        "wide_unseasoned",
        start_str,
        end_str,
    )