Visualizer

Source code in anomaly/visualizer.py
class Visualizer:
    """Visualize anomaly-detection results.

    Ranks the most anomalous countries, writes the ranking to a text
    file, and plots each suspicious country's user series with the
    anomalous windows highlighted.
    """

    def __init__(self, preprocessor, processor):
        """Copy the dates, settings and dataframes the plots need.

        ``preprocessor`` supplies the analysis/train date interval and
        the seasoned (observed) data; ``processor`` supplies the
        reconstructed residuals and their rolling median/MAD summary.
        """
        logger.info("Initializing visualizer.")
        # It doesn't depend on ndays, always 7
        self.max_gap: int = 7
        self.threshold: float | None = THRESHOLD  # =2.5
        self.output_dir: Path = Path(OUTPUT_DIR)
        self.text_width: int = TEXT_WIDTH
        self.plot_length: int = PLOT_LENGTH

        self.window_reconstruct_width: int = (
            processor.window_reconstruct_width
        )  # WINDOW_RECONSTRUCT_WIDTH
        # Only used to tag anomalies, not get_suspicious
        self.topn: int = processor.topn  # TOPN=10
        self.write: bool = processor.write
        # Not using self.read here, since it depends on the user dates and
        # several dfs are needed
        self.start_date: datetime.date = preprocessor.start_date
        self.end_date: datetime.date = preprocessor.end_date
        self.train_start_date: datetime.date = preprocessor.train_start_date
        self.train_end_date: datetime.date = preprocessor.train_end_date
        self.ndays: int = (self.end_date - self.start_date).days
        self.long_seasoned_df: pd.DataFrame = preprocessor.long_seasoned_df
        self.long_reconstructed_df: pd.DataFrame = (
            processor.long_reconstructed_df
        )
        self.long_median_summary_df: pd.DataFrame = (
            processor.long_median_summary_df
        )

        # The attributes below are populated later by the set_* /
        # highlight_* methods; run() fills them in order.
        self.anomalous_countries: list[str] | None = None  # set_anomalous_countries
        self.long_reconstructed_interval_df: pd.DataFrame | None = None
        self.wide_anomaly_df: pd.DataFrame | None = None  # set_anomalies
        self.plot_end: datetime.date  # set_plot_df
        self.plot_start: datetime.date  # set_plot_df
        self.long_reconstructed_plot_df: pd.DataFrame | None = None
        self.long_seasoned_plot_df: pd.DataFrame | None = None
        self.wide_seasoned_df: pd.DataFrame | None = None
        self.wide_seasoned_plot_df: pd.DataFrame | None = None
        self.long_prepared_plot_df: pd.DataFrame | None = None
        self.anom_rect_df: pd.DataFrame | None = None  # highlight_anomalies
        self.long_median_df: pd.DataFrame | None = None  # set_median

    def set_anomalous_countries(
        self,
        # In this function is always "mad"
        topn_function: str = "mad",
        # In this function is always None
        threshold: float | None = None,
        start_date: datetime.date | None = None,
        end_date: datetime.date | None = None,
    ) -> list[str]:
        """Return list of anomalous countries.

        Scores each country by the magnitude of its reconstruction
        residual over [start_date, end_date], then keeps the countries
        above ``threshold`` or, when ``threshold`` is None, the
        ``self.topn`` highest.

        Side effect: clamps ``self.start_date``/``self.end_date`` to the
        last reconstructed date.

        Input:

        - long_reconstructed_df
        - threshold = None
        - topn_function = "mad"
        - start_date
        - end_date

        Output:

        - anomalous_countries

        """
        logger.info("Obtaining anomalous countries.")
        logger.debug("topn_function %s", topn_function)
        logger.debug("threshold %s", threshold)
        start_date = start_date or self.start_date
        end_date = end_date or self.end_date
        logger.debug(
            "Target date interval start %s end %s", start_date, end_date
        )
        # NOTE(review): after the defaults above this guard is always True
        # unless self.start_date/self.end_date themselves can be None —
        # confirm against the preprocessor.
        if start_date is not None and end_date is not None:
            # Never look past the last date with reconstructed data.
            self.start_date = min(
                start_date, self.long_reconstructed_df.index.max()
            )
            self.end_date = min(
                end_date, self.long_reconstructed_df.index.max()
            )
            logger.debug(
                "Adjusted start %s end %s to reconstructed",
                self.start_date,
                self.end_date,
            )
        self.long_reconstructed_interval_df = self.long_reconstructed_df[
            (self.long_reconstructed_df.index >= self.start_date)
            & (self.long_reconstructed_df.index <= self.end_date)
        ].copy()  # [135 rows x 3 columns]
        logger.debug(
            "self.long_reconstructed_interval_df.shape d%s",
            self.long_reconstructed_interval_df.shape,
        )
        # Pivot to wide format: date ~ country
        wide_reconstructed_df: pd.DataFrame = (
            self.long_reconstructed_interval_df.pivot_table(
                index="date", columns="country", values="res"
            )
        )  # [1 rows x 135 columns]
        logger.debug(
            "wide_reconstructed_interval_df.shape d%s",
            wide_reconstructed_df.shape,
        )

        if len(wide_reconstructed_df) > 1:  # Always False
            # Multi-date interval: score = largest absolute residual per
            # country over the interval.
            abs_vals = wide_reconstructed_df.abs()
            scores = abs_vals.apply(lambda col: col.max()).sort_values(
                ascending=False
            )
        else:
            # Single row case
            scores = (
                wide_reconstructed_df.abs()
                .squeeze()
                .sort_values(ascending=False)
            )
            # country
            # sy    12.101697
            # tm     7.356141
            # ir     6.743156
            # sa     1.528195
            # ae     0.991938
            #         ...
            # sd     0.205191
            # il     0.144944
            # eg     0.099077
            # ph     0.027896
            # kr     0.009907

        # Always None in this function
        # Return countries based on threshold or topn
        suspicious: list[str]
        if threshold is not None:
            suspicious = scores[scores > threshold].index.tolist()
        else:
            suspicious = scores.head(self.topn).index.tolist()
        self.anomalous_countries = suspicious
        # ['sy', 'tm', 'ir', 'sa', 'ae', 'kg', 'id', 'om', 'ua', 'th']
        logger.debug("Suspicious countries: %r", self.anomalous_countries)
        return self.anomalous_countries

    def write_rankings(self, end_date: datetime.date | str | None = None) -> None:
        """Write anomalous countries rankings to file.

        Input:

        - end_date: label used in the output file name; defaults to
          ``self.end_date`` formatted as YYYY-MM-DD.
        - anomalous_countries

        """
        logger.info("Writing rankings.")
        # When no argument is given, end_date becomes a *string*
        # (YYYY-MM-DD); from here on it is only used as a label.
        end_date = end_date or pd.to_datetime(self.end_date).strftime(
            "%Y-%m-%d"
        )
        rankings_path: Path = self.output_dir.joinpath(
            f"{end_date}-rankings.txt"
        )
        # Log via lazy %-args; passing a tuple directly to logger.info()
        # would log the tuple's repr instead of the text.
        logger.info(
            "Most Anomalous Countries by Tor Usage:\n%s\n"
            "(In order of median anomaly score over time period.)\n",
            end_date,
        )
        countries: list[str] = translate_country_codes(
            self.anomalous_countries
        )

        text: str = f"Last {self.ndays}(s):\n=============\n"
        # Print translated suspicious countries
        text += "\n".join(countries)
        text += "\n\n"
        log_write_file(rankings_path, text)

    def set_plot_df(self):
        """Set matrices that will be used to plot the anomalies.

        Input:

        - long_reconstructed_df
        - long_seasoned_df
        - anomalous_countries
        - plot_start
        - plot_end

        Output:

        - long_reconstructed_plot_df
        - long_seasoned_plot_df
        - long_prepared_plot_df

        """
        logger.info("Preparing plot data")
        # Plot window: the last plot_length + 1 distinct dates that have
        # reconstructed data.
        self.plot_end = self.long_reconstructed_df.index.drop_duplicates()[-1]
        self.plot_start = self.long_reconstructed_df.index.drop_duplicates()[
            -self.plot_length - 1
        ]
        logger.debug("plot start %s end %s", self.plot_start, self.plot_end)
        # Subset dataframes with the plot dates
        if self.plot_start and self.plot_end:
            mask = (self.long_reconstructed_df.index >= self.plot_start) & (
                self.long_reconstructed_df.index <= self.plot_end
            )
            self.long_reconstructed_plot_df = (
                self.long_reconstructed_df.copy().loc[mask]
            )  # [181 rows x 3 columns] # [24435 rows x 3 columns]
            logger.debug(
                "long_reconstructed_plot_df.shape %s",
                self.long_reconstructed_plot_df.shape,
            )

            mask = (self.long_seasoned_df.index >= self.plot_start) & (
                self.long_seasoned_df.index <= self.plot_end
            )
            self.long_seasoned_plot_df = self.long_seasoned_df.copy().loc[
                mask
            ]  # [1 rows x 181 columns
            logger.debug(
                "long_seasoned_plot_df.shape %s",
                self.long_seasoned_plot_df.shape,
            )
        # NOTE(review): if the branch above is skipped,
        # long_seasoned_plot_df is still None and the subscript below
        # raises — presumably plot_start/plot_end are always truthy here;
        # confirm.
        # pylint: disable-next=[line-too-long]
        self.long_prepared_plot_df = self.long_seasoned_plot_df[  # type: ignore
            self.long_seasoned_plot_df["country"].isin(  # type: ignore
                self.anomalous_countries
            )
        ].copy()

        # Human-readable, line-wrapped country names used as subplot titles.
        self.long_prepared_plot_df[
            "country_label"
        ] = self.long_prepared_plot_df["country"].apply(
            lambda x: str_wrap(
                translate_country_codes(x), width=self.text_width
            )
        )

        logger.debug(
            "long_prepared_plot_df shape %s",
            self.long_prepared_plot_df.shape,
        )  # [1810 rows x 3 columns]
        return self.long_prepared_plot_df

    def set_anomalies(
        self,
        threshold: float = THRESHOLD,
    ) -> pd.DataFrame:
        """Detect anomalies based on median and MAD.

        Return a boolean DataFrame where True = anomaly.

        ```text
        For each country we calculate the difference between the true
        value and the reconstructed value, providing a residual error that
        was not captured by the restricted set of principal components.
        We maintain a rolling calculation of both the median observed
        residual error and the median absolute deviation of the errors for
        each country. We mark a day as anomalous if the observed resid-
        ual error falls outside of 2.5 median absolute deviations from the
        median.
        ```

        Threshold refers to a number of deviations from the median, not an
        absolute value.

        Input:

        - self.long_reconstructed_plot_df
        - self.long_median_summary_df
        - threshold
        - max_gap

        Output:

        - wide_anomaly_df

        """
        logger.info("Tagging wide_anomaly_df (robust).")
        # NOTE: `or` falls back to self.threshold for 0 as well as None, so
        # a threshold of exactly 0 cannot be requested.
        threshold = threshold or self.threshold  # type: ignore
        # %s rather than %d: %d would truncate 2.5 to 2 in the log line.
        logger.debug("threshold %s", threshold)

        # Merge the residuals object with the median.summary object, then
        # simply calculate for each row if the absolute value of the "res" is
        # greater than the absolute of ( "median" + threshold*"mad" )
        long_reconstructed_plot_median_df = pd.merge(
            self.long_reconstructed_plot_df.reset_index(),  # type: ignore
            self.long_median_summary_df.reset_index(),
            on=["date", "country"],
        )
        logger.debug(
            "long_reconstructed_plot_median_df.shape %s",
            long_reconstructed_plot_median_df.shape,
        )  # [24435 rows x 6 columns]

        # Calculate anomaly threshold using median and MAD
        threshold_upper = (
            long_reconstructed_plot_median_df["median"]
            + threshold * long_reconstructed_plot_median_df["mad"]
        )  # 24435
        threshold_lower = (
            long_reconstructed_plot_median_df["median"]
            - threshold * long_reconstructed_plot_median_df["mad"]
        )  # 24435

        # Assign anomalous to 1 for all days where the residual exceeds the
        # threshold
        long_reconstructed_plot_median_df["anomalous"] = (
            (long_reconstructed_plot_median_df["res"] > threshold_upper)
            | (long_reconstructed_plot_median_df["res"] < threshold_lower)
        ).astype(
            int
        )  # [24435 rows x 7 columns]

        # Pivot
        wide_reconstructed_plot_median_df = (
            long_reconstructed_plot_median_df.pivot(
                index="date", columns="country", values="anomalous"
            ).fillna(0)
        )  # [181 rows x 135 columns]

        # Apply smoothing per country column
        self.wide_anomaly_df = wide_reconstructed_plot_median_df.apply(
            lambda col: smooth_gaps(col, max_gap=self.max_gap)
        )  # [181 rows x 135 columns]
        logger.debug("wide_anomaly_df.shape %s", self.wide_anomaly_df.shape)
        return self.wide_anomaly_df

    def anomaly_rect_starts_ends(
        self, country: str
    ) -> tuple[list[int], list[int]]:
        """
        Extract start/end positions of anomalous windows using run-length
        encoding.

        Input:

        - wide_anomaly_df
        - country

        Output:

        - starts, ends

        """
        flags = self.wide_anomaly_df[country].values  # type: ignore

        # Run-length encode the flag column into (value, run length) pairs.
        runs = []
        for value, run in groupby(flags):
            runs.append((value, sum(1 for _ in run)))

        # Cumulative length up to and including each run.
        run_ends = np.cumsum([length for _, length in runs])
        flagged = [pos for pos, (value, _) in enumerate(runs) if value]

        if not flagged:
            return [], []

        # A run's first index is its cumulative end minus its length.
        starts = [run_ends[pos] - runs[pos][1] for pos in flagged]
        ends = [run_ends[pos] - 1 for pos in flagged]
        logger.debug("starts %s, ends %s", starts, ends)
        return starts, ends

    def highlight_anomalies_rect(self, country: str) -> pd.DataFrame:
        """
        Create rectangle coordinates for anomalous windows in a single country.

        Input:

        - wide_anomaly_df
        - country

        Output:

        - country's anom_rect

        """
        logger.debug("highlight_anomalies_rect")
        logger.debug("country %s", country)
        starts, ends = self.anomaly_rect_starts_ends(country)

        if not starts:
            return pd.DataFrame()

        # Translate positional runs into date coordinates; the y extent is
        # infinite so each rectangle spans the whole vertical axis.
        dates = self.wide_anomaly_df.index  # type: ignore
        rect: pd.DataFrame = pd.DataFrame(
            {
                "xmin": dates[starts],
                "xmax": dates[ends],
                "ymin": -np.inf,
                "ymax": np.inf,
                "country": country,
            }
        )
        logger.debug("rect.shape %s", rect.shape)
        return rect

    def highlight_anomalies(
        self,
    ) -> pd.DataFrame:
        """Highlight anomalies for multiple countries.

        Input:

        - anomalous_countries

        Output:

        - anom_rect_df

        """
        logger.info("Highlighting anomalies.")
        # Collect one rectangle frame per country, skipping the empty ones.
        non_empty = []
        for country in self.anomalous_countries:  # type: ignore
            rect = self.highlight_anomalies_rect(country)
            if not rect.empty:
                non_empty.append(rect)

        if non_empty:
            self.anom_rect_df = pd.concat(non_empty, ignore_index=True)
        else:
            self.anom_rect_df = pd.DataFrame()
        logger.debug("self.anom_rect_df.shape %s", self.anom_rect_df.shape)
        return self.anom_rect_df

    def set_median(
        self,
    ):
        """Median values to plot.

        Input:

        - long_seasoned_plot_df
        - anomalous_countries

        Output:

        - long_median_df

        """
        logger.info("Setting median")
        seasoned = self.long_seasoned_plot_df  # type: ignore
        # Restrict to the (reconstruction-adjusted) analysis interval.
        in_interval = (
            seasoned.index >= self.start_date  # Adjusted to reconstructed
        ) & (seasoned.index <= self.end_date)
        interval_df = seasoned.loc[in_interval].copy()
        # Keep only the countries flagged as anomalous.
        suspicious_df = interval_df[
            interval_df["country"].isin(self.anomalous_countries)
        ]
        # One median of "users" per country.
        self.long_median_df = suspicious_df.groupby(
            "country", as_index=False
        )["users"].median()
        self.long_median_df.columns = ["country", "median_value"]
        logger.debug("long_median_df.shape %s", self.long_median_df.shape)
        return self.long_median_df

    def plot(self, linesize: float = 0.3):
        """Plot graphs for the most suspicious countries for the user dates.

        For this function, the threshold is in numbers of median absolute
        deviations from the median, rather than an absolute value:

        - 2 allows lots of errors to be detected, with higher false positive
        rates.
        - 2.5 is more conservative.
        - 3 is highly conservative.

        Input:

        - linesize
        - anomalous_countries
        - long_prepared_plot_df
        - anom_rect_df
        - long_median_df

        Output:

        - fig, axes

        """

        # Create subplots for each country
        n_countries: int = len(self.anomalous_countries)  # type: ignore
        fig, axes = plt.subplots(
            nrows=n_countries,
            ncols=1,
            sharex=True,  # Share x axis (dates)
            figsize=(10, 3 * n_countries),
        )
        # plt.subplots returns a bare Axes (not an array) when there is a
        # single subplot; atleast_1d makes the flatten safe in that case.
        axes = np.atleast_1d(axes).flatten()

        # Convert to dict for easy access
        axes_dict = dict(zip(self.anomalous_countries, axes))  # type: ignore

        # Set target countries
        for country in self.anomalous_countries:  # type: ignore
            ax = axes_dict[country]
            data = self.long_prepared_plot_df[  # type: ignore
                self.long_prepared_plot_df["country"]  # type: ignore
                == country  # type: ignore
            ]
            ax.plot(
                data.index,
                data["users"],
                color="blue",
                linewidth=linesize,
                label=country,
            )
            _ymin, ymax = ax.get_ylim()
            ax.set_yticks([0, ymax])  # Show only 0 and max value
            # ax.set_ylabel("Estimated Daily Users")
            ax.tick_params(axis="y", labelsize=8)
            ax.set_title(
                data["country_label"].iloc[0], fontsize=8, loc="right"
            )

            # Add anomaly rectangles if they exist
            if not self.anom_rect_df.empty:  # type: ignore
                country_anoms = self.anom_rect_df[  # type: ignore
                    self.anom_rect_df["country"] == country  # type: ignore
                ]
                for _, row in country_anoms.iterrows():
                    ax.axvspan(
                        xmin=row["xmin"],
                        xmax=row["xmax"],
                        ymin=0,
                        ymax=1,
                        color="grey",
                        alpha=0.5,
                        linewidth=0,
                        zorder=0,
                    )

            # Add median horizontal line if available
            if not self.long_median_df.empty:  # type: ignore
                median_row = self.long_median_df[  # type: ignore
                    self.long_median_df["country"] == country  # type: ignore
                ]
                if not median_row.empty:
                    ax.axhline(
                        y=median_row["median_value"].iloc[0],
                        color="#56B4E9",
                        linestyle="dotted",
                        zorder=1,
                    )

        # Set x-label on last axis
        axes[-1].set_xlabel("Date")
        fig.suptitle("Anomalous Periods in Tor Usage\n", fontsize=12)

        # Adjust layout
        plt.tight_layout()

        # plt.subplots_adjust(hspace=0.3)  # space between subplots
        plt.subplots_adjust(top=0.93, hspace=0.4)
        return (fig, axes)

    def save_plot(self, fig, axes) -> plt.Figure:
        """Strip y-tick decoration, save the figure as SVG and return it.

        Input:

        - fig, axes (as produced by ``plot``)

        Output:

        - fig, after writing ``{end_date}-plot.svg`` into ``output_dir``.

        """
        logger.info("Plotting recent suspicious")

        # Remove y-tick labels and ticks
        for ax in axes:
            ax.tick_params(axis="y", which="both", labelleft=False, left=False)

        # Save plot
        fname = self.output_dir.joinpath(f"{self.end_date}-plot.svg")
        plt.savefig(fname, dpi=100, bbox_inches="tight")
        plt.show()
        plt.close()
        logger.info("Plotted recent suspicious %s", fname)

        return fig

    def write_csv(self) -> None:
        """Write the plot and anomaly matrices to CSV, if writing is enabled.

        No-op when ``self.write`` is False. Output file names are tagged
        with the training interval dates.
        """
        if not self.write:
            return
        # pylint: disable-next=[line-too-long]
        start_str: str = self.train_start_date.strftime("%Y-%m-%d")  # type: ignore
        end_str: str = self.train_end_date.strftime("%Y-%m-%d")  # type: ignore
        # pylint: disable-next=[duplicate-code]
        write_csv_subdir(
            self.long_prepared_plot_df,
            "long_prepared_plot",
            start_str,
            end_str,
        )
        write_csv_subdir(
            self.wide_anomaly_df, "wide_anomaly", start_str, end_str
        )

    def run(
        self,
    ):
        """Run the whole visualization pipeline, in dependency order.

        Ranks countries, writes the rankings file, prepares the plot
        matrices, tags anomalies, builds the per-country figure and
        (optionally) writes the CSV outputs.
        """
        # Ensure output directory exists
        os.makedirs(self.output_dir, exist_ok=True)
        self.set_anomalous_countries()  # "mad", None, start_date, end_date)
        self.write_rankings()  # end_date)
        self.set_plot_df()

        # Detect anomalies
        self.set_anomalies()
        self.highlight_anomalies()
        self.set_median()

        # Generate plot
        fig, axes = self.plot()
        self.save_plot(fig, axes)
        self.write_csv()

anomaly_rect_starts_ends(country)

Extract start/end positions of anomalous windows using run-length encoding.

Input:

  • wide_anomaly_df
  • country

Output:

  • starts, ends
Source code in anomaly/visualizer.py
def anomaly_rect_starts_ends(
    self, country: str
) -> tuple[list[int], list[int]]:
    """
    Extract start/end positions of anomalous windows using run-length
    encoding.

    Input:

    - wide_anomaly_df
    - country

    Output:

    - starts, ends

    """
    flags = self.wide_anomaly_df[country].values  # type: ignore

    # Run-length encode the flag column into (value, run length) pairs.
    runs = []
    for value, run in groupby(flags):
        runs.append((value, sum(1 for _ in run)))

    # Cumulative length up to and including each run.
    run_ends = np.cumsum([length for _, length in runs])
    flagged = [pos for pos, (value, _) in enumerate(runs) if value]

    if not flagged:
        return [], []

    # A run's first index is its cumulative end minus its length.
    starts = [run_ends[pos] - runs[pos][1] for pos in flagged]
    ends = [run_ends[pos] - 1 for pos in flagged]
    logger.debug("starts %s, ends %s", starts, ends)
    return starts, ends

highlight_anomalies()

Highlight anomalies for multiple countries.

Input:

  • anomalous_countries

Output:

  • anom_rect_df
Source code in anomaly/visualizer.py
def highlight_anomalies(
    self,
) -> pd.DataFrame:
    """Highlight anomalies for multiple countries.

    Input:

    - anomalous_countries

    Output:

    - anom_rect_df

    """
    logger.info("Highlighting anomalies.")
    # Collect one rectangle frame per country, skipping the empty ones.
    non_empty = []
    for country in self.anomalous_countries:  # type: ignore
        rect = self.highlight_anomalies_rect(country)
        if not rect.empty:
            non_empty.append(rect)

    if non_empty:
        self.anom_rect_df = pd.concat(non_empty, ignore_index=True)
    else:
        self.anom_rect_df = pd.DataFrame()
    logger.debug("self.anom_rect_df.shape %s", self.anom_rect_df.shape)
    return self.anom_rect_df

highlight_anomalies_rect(country)

Create rectangle coordinates for anomalous windows in a single country.

Input:

  • wide_anomaly_df
  • country

Output:

  • country's anom_rect
Source code in anomaly/visualizer.py
def highlight_anomalies_rect(self, country: str) -> pd.DataFrame:
    """
    Create rectangle coordinates for anomalous windows in a single country.

    Input:

    - wide_anomaly_df
    - country

    Output:

    - country's anom_rect

    """
    logger.debug("highlight_anomalies_rect")
    logger.debug("country %s", country)
    starts, ends = self.anomaly_rect_starts_ends(country)

    if not starts:
        return pd.DataFrame()

    # Translate positional runs into date coordinates; the y extent is
    # infinite so each rectangle spans the whole vertical axis.
    dates = self.wide_anomaly_df.index  # type: ignore
    rect: pd.DataFrame = pd.DataFrame(
        {
            "xmin": dates[starts],
            "xmax": dates[ends],
            "ymin": -np.inf,
            "ymax": np.inf,
            "country": country,
        }
    )
    logger.debug("rect.shape %s", rect.shape)
    return rect

plot(linesize=0.3)

Plot graphs for the most suspicious countries for the user dates.

For this function, the threshold is in numbers of median absolute deviations (MAD) from the median, rather than an absolute value:

  • 2 allows lots of errors to be detected, with higher false positive rates.
  • 2.5 is more conservative.
  • 3 is highly conservative.

Input:

  • linesize
  • anomalous_countries
  • long_prepared_plot_df
  • anom_rect_df
  • long_median_df

Output:

  • fig, axes
Source code in anomaly/visualizer.py
def plot(self, linesize: float = 0.3):
    """Plot graphs for the most suspicious countries for the user dates.

    For this function, the threshold is in numbers of median absolute
    deviations from the median, rather than an absolute value:

    - 2 allows lots of errors to be detected, with higher false positive
    rates.
    - 2.5 is more conservative.
    - 3 is highly conservative.

    Input:

    - linesize
    - anomalous_countries
    - long_prepared_plot_df
    - anom_rect_df
    - long_median_df

    Output:

    - fig, axes

    """

    # Create subplots for each country
    n_countries: int = len(self.anomalous_countries)  # type: ignore
    fig, axes = plt.subplots(
        nrows=n_countries,
        ncols=1,
        sharex=True,  # Share x axis (dates)
        figsize=(10, 3 * n_countries),
    )
    # plt.subplots returns a bare Axes (not an array) when there is a
    # single subplot; atleast_1d makes the flatten safe in that case.
    axes = np.atleast_1d(axes).flatten()

    # Convert to dict for easy access
    axes_dict = dict(zip(self.anomalous_countries, axes))  # type: ignore

    # Set target countries
    for country in self.anomalous_countries:  # type: ignore
        ax = axes_dict[country]
        data = self.long_prepared_plot_df[  # type: ignore
            self.long_prepared_plot_df["country"]  # type: ignore
            == country  # type: ignore
        ]
        ax.plot(
            data.index,
            data["users"],
            color="blue",
            linewidth=linesize,
            label=country,
        )
        _ymin, ymax = ax.get_ylim()
        ax.set_yticks([0, ymax])  # Show only 0 and max value
        # ax.set_ylabel("Estimated Daily Users")
        ax.tick_params(axis="y", labelsize=8)
        ax.set_title(
            data["country_label"].iloc[0], fontsize=8, loc="right"
        )

        # Add anomaly rectangles if they exist
        if not self.anom_rect_df.empty:  # type: ignore
            country_anoms = self.anom_rect_df[  # type: ignore
                self.anom_rect_df["country"] == country  # type: ignore
            ]
            for _, row in country_anoms.iterrows():
                ax.axvspan(
                    xmin=row["xmin"],
                    xmax=row["xmax"],
                    ymin=0,
                    ymax=1,
                    color="grey",
                    alpha=0.5,
                    linewidth=0,
                    zorder=0,
                )

        # Add median horizontal line if available
        if not self.long_median_df.empty:  # type: ignore
            median_row = self.long_median_df[  # type: ignore
                self.long_median_df["country"] == country  # type: ignore
            ]
            if not median_row.empty:
                ax.axhline(
                    y=median_row["median_value"].iloc[0],
                    color="#56B4E9",
                    linestyle="dotted",
                    zorder=1,
                )

    # Set x-label on last axis
    axes[-1].set_xlabel("Date")
    fig.suptitle("Anomalous Periods in Tor Usage\n", fontsize=12)

    # Adjust layout
    plt.tight_layout()

    # plt.subplots_adjust(hspace=0.3)  # space between subplots
    plt.subplots_adjust(top=0.93, hspace=0.4)
    return (fig, axes)

set_anomalies(threshold=THRESHOLD)

Detect anomalies based on median and MAD.

Return a boolean DataFrame where True = anomaly.

For each country we calculate the difference between the true
value and the reconstructed value, providing a residual error that
was not captured by the restricted set of principal components.
We maintain a rolling calculation of both the median observed
residual error and the median absolute deviation of the errors for
each country. We mark a day as anomalous if the observed resid-
ual error falls outside of 2.5 median absolute deviations from the
median.

Threshold refers to a number of deviations from the median, not an absolute value.

Input:

  • self.long_reconstructed_plot_df
  • self.long_median_summary_df
  • threshold
  • max_gap

Output:

  • wide_anomaly_df
Source code in anomaly/visualizer.py
def set_anomalies(
    self,
    threshold: float = THRESHOLD,
) -> pd.DataFrame:
    """Detect anomalies based on median and MAD.

    Return a boolean DataFrame where True = anomaly.

    ```text
    For each country we calculate the difference between the true
    value and the reconstructed value, providing a residual error that
    was not captured by the restricted set of principal components.
    We maintain a rolling calculation of both the median observed
    residual error and the median absolute deviation of the errors for
    each country. We mark a day as anomalous if the observed resid-
    ual error falls outside of 2.5 median absolute deviations from the
    median.
    ```

    Threshold refers to a number of deviations from the median, not an
    absolute value.

    Input:

    - self.long_reconstructed_plot_df
    - self.long_median_summary_df
    - threshold
    - max_gap

    Output:

    - wide_anomaly_df

    """
    logger.info("Tagging wide_anomaly_df (robust).")
    # NOTE: `or` falls back to self.threshold for 0 as well as None, so
    # a threshold of exactly 0 cannot be requested.
    threshold = threshold or self.threshold  # type: ignore
    # %s rather than %d: %d would truncate 2.5 to 2 in the log line.
    logger.debug("threshold %s", threshold)

    # Merge the residuals object with the median.summary object, then
    # simply calculate for each row if the absolute value of the "res" is
    # greater than the absolute of ( "median" + threshold*"mad" )
    long_reconstructed_plot_median_df = pd.merge(
        self.long_reconstructed_plot_df.reset_index(),  # type: ignore
        self.long_median_summary_df.reset_index(),
        on=["date", "country"],
    )
    logger.debug(
        "long_reconstructed_plot_median_df.shape %s",
        long_reconstructed_plot_median_df.shape,
    )  # [24435 rows x 6 columns]

    # Calculate anomaly threshold using median and MAD
    threshold_upper = (
        long_reconstructed_plot_median_df["median"]
        + threshold * long_reconstructed_plot_median_df["mad"]
    )  # 24435
    threshold_lower = (
        long_reconstructed_plot_median_df["median"]
        - threshold * long_reconstructed_plot_median_df["mad"]
    )  # 24435

    # Assign anomalous to 1 for all days where the residual exceeds the
    # threshold
    long_reconstructed_plot_median_df["anomalous"] = (
        (long_reconstructed_plot_median_df["res"] > threshold_upper)
        | (long_reconstructed_plot_median_df["res"] < threshold_lower)
    ).astype(
        int
    )  # [24435 rows x 7 columns]

    # Pivot
    wide_reconstructed_plot_median_df = (
        long_reconstructed_plot_median_df.pivot(
            index="date", columns="country", values="anomalous"
        ).fillna(0)
    )  # [181 rows x 135 columns]

    # Apply smoothing per country column
    self.wide_anomaly_df = wide_reconstructed_plot_median_df.apply(
        lambda col: smooth_gaps(col, max_gap=self.max_gap)
    )  # [181 rows x 135 columns]
    logger.debug("wide_anomaly_df.shape %s", self.wide_anomaly_df.shape)
    return self.wide_anomaly_df

set_anomalous_countries(topn_function='mad', threshold=None, start_date=None, end_date=None)

Return list of anomalous countries.

Input:

  • long_reconstructed_df
  • threshold = None
  • topn_function = "mad"
  • start_date
  • end_date

Output:

  • anomalous_countries
Source code in anomaly/visualizer.py
def set_anomalous_countries(
    self,
    # In this function is always "mad"
    topn_function: str = "mad",
    # In this function is always None, so the top-N branch below is
    # the one actually taken
    threshold: float | None = None,
    start_date: datetime.date | None = None,
    end_date: datetime.date | None = None,
) -> list[str]:
    """Return list of anomalous countries.

    Countries are ranked by the maximum absolute reconstruction
    residual ("res") over the requested date interval; the top
    ``self.topn`` (or those above ``threshold``, when given) are kept.

    Input:

    - long_reconstructed_df
    - threshold = None
    - topn_function = "mad"
    - start_date
    - end_date

    Output:

    - anomalous_countries

    """
    logger.info("Obtaining anomalous countries.")
    logger.debug("topn_function %s", topn_function)
    logger.debug("threshold %s", threshold)
    start_date = start_date or self.start_date
    end_date = end_date or self.end_date
    logger.debug(
        "Target date interval start %s end %s", start_date, end_date
    )
    if start_date is not None and end_date is not None:
        # Clamp both dates to the last reconstructed date so the
        # interval never extends past the available data.
        last_reconstructed = self.long_reconstructed_df.index.max()
        self.start_date = min(start_date, last_reconstructed)
        self.end_date = min(end_date, last_reconstructed)
        logger.debug(
            "Adjusted start %s end %s to reconstructed",
            self.start_date,
            self.end_date,
        )
    self.long_reconstructed_interval_df = self.long_reconstructed_df[
        (self.long_reconstructed_df.index >= self.start_date)
        & (self.long_reconstructed_df.index <= self.end_date)
    ].copy()  # [135 rows x 3 columns]
    logger.debug(
        "self.long_reconstructed_interval_df.shape %s",
        self.long_reconstructed_interval_df.shape,
    )
    # Pivot to wide format: date ~ country
    wide_reconstructed_df: pd.DataFrame = (
        self.long_reconstructed_interval_df.pivot_table(
            index="date", columns="country", values="res"
        )
    )  # [1 rows x 135 columns]
    logger.debug(
        "wide_reconstructed_interval_df.shape %s",
        wide_reconstructed_df.shape,
    )

    # Per-country anomaly score: maximum absolute residual over the
    # interval.  DataFrame.max() reduces over rows, so one expression
    # covers both the multi-row and the single-row case.  (The
    # previous squeeze()-based single-row branch broke on a 1x1
    # frame, where squeeze() yields a scalar instead of a Series.)
    scores = wide_reconstructed_df.abs().max().sort_values(
        ascending=False
    )
    # country
    # sy    12.101697
    # tm     7.356141
    # ...

    # threshold is always None at the current call sites, so the
    # top-N branch is the one taken in practice.
    suspicious: list[str]
    if threshold is not None:
        suspicious = scores[scores > threshold].index.tolist()
    else:
        suspicious = scores.head(self.topn).index.tolist()
    self.anomalous_countries = suspicious
    # e.g. ['sy', 'tm', 'ir', 'sa', 'ae', 'kg', 'id', 'om', 'ua', 'th']
    logger.debug("Suspicious countries: %r", self.anomalous_countries)
    return self.anomalous_countries

set_median()

Median values to plot.

Input:

  • long_seasoned_plot_df
  • anomalous_countries

Output:

  • long_median_df
Source code in anomaly/visualizer.py
def set_median(
    self,
):
    """Median values to plot.

    Input:

    - long_seasoned_plot_df
    - anomalous_countries

    Output:

    - long_median_df

    """
    logger.info("Setting median")
    seasoned = self.long_seasoned_plot_df  # type: ignore
    # Restrict to the (reconstruction-adjusted) target date interval.
    in_interval = (
        seasoned.index >= self.start_date  # type: ignore
    ) & (
        seasoned.index <= self.end_date  # type: ignore
    )
    interval_df = seasoned.loc[in_interval].copy()  # type: ignore
    # Keep only the countries previously flagged as anomalous.
    interval_df = interval_df[
        interval_df["country"].isin(self.anomalous_countries)
    ]
    # One median of "users" per anomalous country.
    self.long_median_df = interval_df.groupby("country", as_index=False)[
        "users"
    ].median()
    self.long_median_df.columns = ["country", "median_value"]
    logger.debug("long_median_df.shape %s", self.long_median_df.shape)
    return self.long_median_df

set_plot_df()

Set matrices that will be used to plot the anomalies.

Input:

  • long_reconstructed_df
  • long_seasoned_df
  • anomalous_countries
  • plot_start
  • plot_end

Output:

  • long_reconstructed_plot_df
  • long_seasoned_plot_df
  • long_prepared_plot_df
Source code in anomaly/visualizer.py
def set_plot_df(self):
    """Set matrices that will be used to plot the anomalies.

    Input:

    - long_reconstructed_df
    - long_seasoned_df
    - anomalous_countries
    - plot_start
    - plot_end

    Output:

    - long_reconstructed_plot_df
    - long_seasoned_plot_df
    - long_prepared_plot_df

    """
    logger.info("Preparing plot data")
    # Plot window: the last plot_length + 1 distinct dates of the
    # reconstructed data.
    unique_dates = self.long_reconstructed_df.index.drop_duplicates()
    self.plot_end = unique_dates[-1]
    self.plot_start = unique_dates[-self.plot_length - 1]
    logger.debug("plot start %s end %s", self.plot_start, self.plot_end)
    # Subset dataframes with the plot dates
    if self.plot_start and self.plot_end:

        def _window(df):
            # Rows whose date index falls inside the plot window.
            inside = (df.index >= self.plot_start) & (
                df.index <= self.plot_end
            )
            return df.copy().loc[inside]

        self.long_reconstructed_plot_df = _window(
            self.long_reconstructed_df
        )  # [181 rows x 3 columns] # [24435 rows x 3 columns]
        logger.debug(
            "long_reconstructed_plot_df.shape %s",
            self.long_reconstructed_plot_df.shape,
        )

        self.long_seasoned_plot_df = _window(
            self.long_seasoned_df
        )  # [1 rows x 181 columns
        logger.debug(
            "long_seasoned_plot_df.shape %s",
            self.long_seasoned_plot_df.shape,
        )
    # Restrict the seasoned plot data to the anomalous countries.
    anomalous_mask = self.long_seasoned_plot_df[  # type: ignore
        "country"
    ].isin(self.anomalous_countries)
    self.long_prepared_plot_df = self.long_seasoned_plot_df.loc[  # type: ignore
        anomalous_mask
    ].copy()

    # Human-readable, wrapped country names for the plot labels.
    self.long_prepared_plot_df["country_label"] = (
        self.long_prepared_plot_df["country"].apply(
            lambda code: str_wrap(
                translate_country_codes(code), width=self.text_width
            )
        )
    )

    logger.debug(
        "long_prepared_plot_df shape %s",
        self.long_prepared_plot_df.shape,
    )  # [1810 rows x 3 columns]
    return self.long_prepared_plot_df

write_rankings(end_date=None)

Write anomalous countries rankings to file.

Input:

  • end_date
  • anomalous_countries
Source code in anomaly/visualizer.py
def write_rankings(self, end_date: datetime.date | None = None) -> None:
    """Write anomalous countries rankings to file.

    Input:

    - end_date
    - anomalous_countries

    """
    logger.info("Writing rankings.")
    # Normalize to an ISO "YYYY-MM-DD" string in every case; the
    # original only converted the self.end_date fallback, leaving a
    # date/str type mix when the caller supplied end_date.
    end_date_str: str = pd.to_datetime(
        end_date or self.end_date
    ).strftime("%Y-%m-%d")
    rankings_path: Path = self.output_dir.joinpath(
        f"{end_date_str}-rankings.txt"
    )
    # Log the header as a single message; passing a tuple to
    # logger.info would log the tuple's repr, not the text.
    logger.info(
        "Most Anomalous Countries by Tor Usage:\n%s\n"
        "(In order of median anomaly score over time period.)\n",
        end_date_str,
    )
    countries: list[str] = translate_country_codes(
        self.anomalous_countries
    )

    # Header ("Last N day(s)" — the original string was missing the
    # word "day"), then one translated suspicious country per line.
    text: str = f"Last {self.ndays} day(s):\n=============\n"
    text += "\n".join(countries)
    text += "\n\n"
    log_write_file(rankings_path, text)