Visualizer
Source code in anomaly/visualizer.py
class Visualizer:
    """Visualize anomalous Tor-usage countries.

    Consumes data frames prepared by a preprocessor (seasonally adjusted
    series plus date bounds) and a processor (PCA reconstruction
    residuals plus rolling median/MAD summary) and produces rankings,
    plots and CSV exports.
    """

    def __init__(self, preprocessor, processor):
        """Copy configuration and data frames from the two pipeline stages.

        Args:
            preprocessor: supplies the date bounds and long_seasoned_df.
            processor: supplies residuals, median summary and settings.
        """
        logger.info("Initializing visualizer.")
        # It doesn't depend on ndays, always 7
        self.max_gap: int = 7
        self.threshold: float | None = THRESHOLD  # =2.5
        self.output_dir: Path = Path(OUTPUT_DIR)
        self.text_width: int = TEXT_WIDTH
        self.plot_length: int = PLOT_LENGTH
        self.window_reconstruct_width: int = (
            processor.window_reconstruct_width
        )  # WINDOW_RECONSTRUCT_WIDTH
        # Only used to tag anomalies, not get_suspicious
        self.topn: int = processor.topn  # TOPN=10
        self.write: bool = processor.write
        # Not using self.read here, since it depends on the user dates and
        # several dfs are needed
        self.start_date: datetime.date = preprocessor.start_date
        self.end_date: datetime.date = preprocessor.end_date
        self.train_start_date: datetime.date = preprocessor.train_start_date
        self.train_end_date: datetime.date = preprocessor.train_end_date
        self.ndays: int = (self.end_date - self.start_date).days
        self.long_seasoned_df: pd.DataFrame = preprocessor.long_seasoned_df
        self.long_reconstructed_df: pd.DataFrame = (
            processor.long_reconstructed_df
        )
        self.long_median_summary_df: pd.DataFrame = (
            processor.long_median_summary_df
        )
        # The attributes below are populated lazily by the pipeline
        # methods (set_anomalous_countries, set_plot_df, set_anomalies,
        # highlight_anomalies, set_median).
        self.anomalous_countries: list[str] | None = None  #
        self.long_reconstructed_interval_df: pd.DataFrame | None = None
        self.wide_anomaly_df: pd.DataFrame | None = None
        self.plot_end: datetime.date
        self.plot_start: datetime.date
        self.long_reconstructed_plot_df: pd.DataFrame | None = None
        self.long_seasoned_plot_df: pd.DataFrame | None = None
        self.wide_seasoned_df: pd.DataFrame | None = None
        self.wide_seasoned_plot_df: pd.DataFrame | None = None
        self.long_prepared_plot_df: pd.DataFrame | None = None
        self.anom_rect_df: pd.DataFrame | None = None
        self.long_median_df: pd.DataFrame | None = None
def set_anomalous_countries(
self,
# In this function is always "mad"
topn_function: str = "mad",
# In this function is always None
threshold: float | None = None,
start_date: datetime.date | None = None,
end_date: datetime.date | None = None,
) -> list[str]:
"""Return list of anomalous countries.
Input:
- long_reconstructed_df
- threshold = None
- topn_function = "mad"
- start_date
- end_date
Output:
- anomalous_countries
"""
logger.info("Obtaining anomalous countries.")
logger.debug("topn_function %s", topn_function)
logger.debug("threshold %s", threshold)
start_date = start_date or self.start_date
end_date = end_date or self.end_date
logger.debug(
"Target date interval start %s end %s", start_date, end_date
)
if start_date is not None and end_date is not None:
self.start_date = min(
start_date, self.long_reconstructed_df.index.max()
)
self.end_date = min(
end_date, self.long_reconstructed_df.index.max()
)
logger.debug(
"Adjusted start %s end %s to reconstructed",
self.start_date,
self.end_date,
)
self.long_reconstructed_interval_df = self.long_reconstructed_df[
(self.long_reconstructed_df.index >= self.start_date)
& (self.long_reconstructed_df.index <= self.end_date)
].copy() # [135 rows x 3 columns]
logger.debug(
"self.long_reconstructed_interval_df.shape d%s",
self.long_reconstructed_interval_df.shape,
)
# Pivot to wide format: date ~ country
wide_reconstructed_df: pd.DataFrame = (
self.long_reconstructed_interval_df.pivot_table(
index="date", columns="country", values="res"
)
) # [1 rows x 135 columns]
logger.debug(
"wide_reconstructed_interval_df.shape d%s",
wide_reconstructed_df.shape,
)
if len(wide_reconstructed_df) > 1: # Always False
abs_vals = wide_reconstructed_df.abs()
scores = abs_vals.apply(lambda col: col.max()).sort_values(
ascending=False
)
else:
# Single row case
scores = (
wide_reconstructed_df.abs()
.squeeze()
.sort_values(ascending=False)
)
# country
# sy 12.101697
# tm 7.356141
# ir 6.743156
# sa 1.528195
# ae 0.991938
# ...
# sd 0.205191
# il 0.144944
# eg 0.099077
# ph 0.027896
# kr 0.009907
# Always None in this function
# Return countries based on threshold or topn
suspicious: list[str]
if threshold is not None:
suspicious = scores[scores > threshold].index.tolist()
else:
suspicious = scores.head(self.topn).index.tolist()
self.anomalous_countries = suspicious
# ['sy', 'tm', 'ir', 'sa', 'ae', 'kg', 'id', 'om', 'ua', 'th']
logger.debug("Suspicious countries: %r", self.anomalous_countries)
return self.anomalous_countries
def write_rankings(self, end_date: datetime.date | None = None) -> None:
"""Write anomalous countries rankings to file.
Input:
- end_date
- anomalous_countries
"""
logger.info("Writing rankings.")
end_date = end_date or pd.to_datetime(self.end_date).strftime(
"%Y-%m-%d"
)
rankings_path: Path = self.output_dir.joinpath(
f"{end_date}-rankings.txt"
)
log_text: tuple[str, str] = (
f"Most Anomalous Countries by Tor Usage:\n{end_date}\n",
"(In order of median anomaly score over time period.)\n",
)
logger.info(log_text)
countries: list[str] = translate_country_codes(
self.anomalous_countries
)
text: str = "".join((f"Last {self.ndays}(s):\n=============\n",))
# Print translated suspicious countries
text += "\n".join(countries)
text += "\n\n"
log_write_file(rankings_path, text)
def set_plot_df(self):
"""Set matrices that will be used to plot the anomalies.
Input:
- long_reconstructed_df
- long_seasoned_df
- anomalous_countries
- plot_start
- plot_end
Output:
- long_reconstructed_plot_df
- long_seasoned_plot_df
- long_prepared_plot_df
"""
logger.info("Preparing plot data")
self.plot_end = self.long_reconstructed_df.index.drop_duplicates()[-1]
self.plot_start = self.long_reconstructed_df.index.drop_duplicates()[
-self.plot_length - 1
]
logger.debug("plot start %s end %s", self.plot_start, self.plot_end)
# Subset dataframes with the plot dates
if self.plot_start and self.plot_end:
mask = (self.long_reconstructed_df.index >= self.plot_start) & (
self.long_reconstructed_df.index <= self.plot_end
)
self.long_reconstructed_plot_df = (
self.long_reconstructed_df.copy().loc[mask]
) # [181 rows x 3 columns] # [24435 rows x 3 columns]
logger.debug(
"long_reconstructed_plot_df.shape %s",
self.long_reconstructed_plot_df.shape,
)
mask = (self.long_seasoned_df.index >= self.plot_start) & (
self.long_seasoned_df.index <= self.plot_end
)
self.long_seasoned_plot_df = self.long_seasoned_df.copy().loc[
mask
] # [1 rows x 181 columns
logger.debug(
"long_seasoned_plot_df.shape %s",
self.long_seasoned_plot_df.shape,
)
# pylint: disable-next=[line-too-long]
self.long_prepared_plot_df = self.long_seasoned_plot_df[ # type: ignore
self.long_seasoned_plot_df["country"].isin( # type: ignore
self.anomalous_countries
)
].copy()
self.long_prepared_plot_df[
"country_label"
] = self.long_prepared_plot_df["country"].apply(
lambda x: str_wrap(
translate_country_codes(x), width=self.text_width
)
)
logger.debug(
"long_prepared_plot_df shape %s",
self.long_prepared_plot_df.shape,
) # [1810 rows x 3 columns]
return self.long_prepared_plot_df
def set_anomalies(
self,
threshold: float = THRESHOLD,
) -> pd.DataFrame:
"""Detect anomalies based on median and MAD.
Return a boolean DataFrame where True = anomaly.
```text
For each country we calculate the difference between the true
value and the reconstructed value, providing a residual error that
was not captured by the restricted set of principal components.
We maintain a rolling calculation of both the median observed
residual error and the median absolute deviation of the errors for
each country. We mark a day as anomalous if the observed resid-
ual error falls outside of 2.5 median absolute deviations from the
median.
```
Threshold refers to a number of deviations from the median, not an
absolute value.
Input:
- self.long_reconstructed_plot_df
- self.long_median_summary_df
- threshold
- max_gap
Output:
- wide_anomaly_df
"""
logger.info("Tagging wide_anomaly_df (robust).")
threshold = threshold or self.threshold # type: ignore
logger.debug("threshold %d", threshold)
# Merge the residuals object with the median.summary object, then
# simply calculate for each row if the absolute value of the "res" is
# greater than the absolute of ( "median" + threshold*"mad" )
long_reconstructed_plot_median_df = pd.merge(
self.long_reconstructed_plot_df.reset_index(), # type: ignore
self.long_median_summary_df.reset_index(),
on=["date", "country"],
)
logger.debug(
"long_reconstructed_plot_median_df.shape %s",
long_reconstructed_plot_median_df.shape,
) # [24435 rows x 6 columns]
# Calculate anomaly threshold using median and MAD
threshold_upper = (
long_reconstructed_plot_median_df["median"]
+ threshold * long_reconstructed_plot_median_df["mad"]
) # 24435
threshold_lower = (
long_reconstructed_plot_median_df["median"]
- threshold * long_reconstructed_plot_median_df["mad"]
) # 24435
# Assign anomalous to 1 for all days where the residual exceeds the
# threshold
long_reconstructed_plot_median_df["anomalous"] = (
(long_reconstructed_plot_median_df["res"] > threshold_upper)
| (long_reconstructed_plot_median_df["res"] < threshold_lower)
).astype(
int
) # [24435 rows x 7 columns]
# Pivot
wide_reconstructed_plot_median_df = (
long_reconstructed_plot_median_df.pivot(
index="date", columns="country", values="anomalous"
).fillna(0)
) # [181 rows x 135 columns]
# Apply smoothing per country column
self.wide_anomaly_df = wide_reconstructed_plot_median_df.apply(
lambda col: smooth_gaps(col, max_gap=self.max_gap)
) # [181 rows x 135 columns]
logger.debug("wide_anomaly_df.shape %s", self.wide_anomaly_df.shape)
return self.wide_anomaly_df
def anomaly_rect_starts_ends(
self, country: str
) -> tuple[list[int], list[int]]:
"""
Extract start/end positions of anomalous windows using run-length
encoding.
Input:
- wide_anomaly_df
- country
Output:
- starts, ends
"""
values = self.wide_anomaly_df[country].values # type: ignore
groups = [(k, sum(1 for _ in g)) for k, g in groupby(values)]
cumsum = np.cumsum([length for _, length in groups])
anomaly_idx = [i for i, (val, _) in enumerate(groups) if val]
if not anomaly_idx:
return [], []
starts = [cumsum[i] - groups[i][1] for i in anomaly_idx]
ends = [cumsum[i] - 1 for i in anomaly_idx]
logger.debug("starts %s, ends %s", starts, ends)
return starts, ends
def highlight_anomalies_rect(self, country: str) -> pd.DataFrame:
"""
Create rectangle coordinates for anomalous windows in a single country.
Input:
- wide_anomaly_df
- country
Output:
- country's anom_rect
"""
logger.debug("highlight_anomalies_rect")
logger.debug("country %s", country)
starts, ends = self.anomaly_rect_starts_ends(country)
if not starts:
return pd.DataFrame()
rect: pd.DataFrame = pd.DataFrame(
{
"xmin": self.wide_anomaly_df.index[starts], # type: ignore
"xmax": self.wide_anomaly_df.index[ends], # type: ignore
"ymin": -np.inf,
"ymax": np.inf,
"country": country,
}
)
logger.debug("rect.shape %s", rect.shape)
return rect
def highlight_anomalies(
self,
) -> pd.DataFrame:
"""Highlight anomalies for multiple countries.
Input:
- anomalous_countries
Output:
- anom_rect_df
"""
logger.info("Highlighting anomalies.")
rects = [
self.highlight_anomalies_rect(country)
for country in self.anomalous_countries # type: ignore
]
rects = [r for r in rects if not r.empty]
self.anom_rect_df = (
pd.concat(rects, ignore_index=True) if rects else pd.DataFrame()
)
logger.debug("self.anom_rect_df.shape %s", self.anom_rect_df.shape)
return self.anom_rect_df
def set_median(
self,
):
"""Median values to plot.
Input:
- long_seasoned_plot_df
- anomalous_countries
Output:
- long_median_df
"""
logger.info("Setting median")
long_median_df = self.long_seasoned_plot_df.loc[ # type: ignore
(
self.long_seasoned_plot_df.index # type: ignore
>= self.start_date # Adjusted date to reconstructed
)
& (
self.long_seasoned_plot_df.index # type: ignore
<= self.end_date
)
].copy()
long_median_df = long_median_df.query(
"country in @self.anomalous_countries"
)
self.long_median_df = long_median_df.groupby(
"country", as_index=False
)["users"].median()
self.long_median_df.columns = ["country", "median_value"]
logger.debug("long_median_df.shape %s", self.long_median_df.shape)
return self.long_median_df
def plot(self, linesize: float = 0.3):
"""Plot graphs for the most suspicious countries for the user dates.
For this function, the threshold is in numbers of median average
deviations from the median, rather than an absolute value:
- 2 allows lots of errors to be detected, with higher false positive
rates.
- 2.5 is more conservative.
- 3 is highly conservative.
Input:
- linesize
- anomalous_countries
- long_prepared_plot_df
- anom_rect_df
- long_median_df
Output:
- fig, axes
"""
# Create subplots for each country
n_countries: int = len(self.anomalous_countries) # type: ignore
fig, axes = plt.subplots(
nrows=n_countries,
ncols=1,
sharex=True, # Share x axis (dates)
figsize=(10, 3 * n_countries),
)
axes = axes.flatten() # In case there's only one
# Convert to dict for easy access
axes_dict = dict(zip(self.anomalous_countries, axes)) # type: ignore
# Set target countries
for country in self.anomalous_countries: # type: ignore
ax = axes_dict[country]
data = self.long_prepared_plot_df[ # type: ignore
self.long_prepared_plot_df["country"] # type: ignore
== country # type: ignore
]
ax.plot(
data.index,
data["users"],
color="blue",
linewidth=linesize,
label=country,
)
_ymin, ymax = ax.get_ylim()
ax.set_yticks([0, ymax]) # Show only 0 and max value
# ax.set_ylabel("Estimated Daily Users")
ax.tick_params(axis="y", labelsize=8)
ax.set_title(
data["country_label"].iloc[0], fontsize=8, loc="right"
)
# Add anomaly rectangles if they exist
if not self.anom_rect_df.empty: # type: ignore
country_anoms = self.anom_rect_df[ # type: ignore
self.anom_rect_df["country"] == country # type: ignore
]
for _, row in country_anoms.iterrows():
ax.axvspan(
xmin=row["xmin"],
xmax=row["xmax"],
ymin=0,
ymax=1,
color="grey",
alpha=0.5,
linewidth=0,
zorder=0,
)
# Add median horizontal line if available
if not self.long_median_df.empty: # type: ignore
median_row = self.long_median_df[ # type: ignore
self.long_median_df["country"] == country # type: ignore
]
if not median_row.empty:
ax.axhline(
y=median_row["median_value"].iloc[0],
color="#56B4E9",
linestyle="dotted",
zorder=1,
)
# Set x-label on last axis
axes[-1].set_xlabel("Date")
fig.suptitle("Anomalous Periods in Tor Usage\n", fontsize=12)
# Adjust layout
plt.tight_layout()
# plt.subplots_adjust(hspace=0.3) # space between subplots
plt.subplots_adjust(top=0.93, hspace=0.4)
return (fig, axes)
def save_plot(self, fig, axes) -> sns:
logger.info("Plotting recent suspicious")
# Remove y-tick labels and ticks
for ax in axes:
ax.tick_params(axis="y", which="both", labelleft=False, left=False)
# Save plot
fname = self.output_dir.joinpath(f"{self.end_date}-plot.svg")
plt.savefig(fname, dpi=100, bbox_inches="tight")
plt.show()
plt.close()
logger.info("Plotted recent suspicious %s", fname)
return fig
def write_csv(self) -> None:
if not self.write:
return
# pylint: disable-next=[line-too-long]
start_str: str = self.train_start_date.strftime("%Y-%m-%d") # type: ignore
end_str: str = self.train_end_date.strftime("%Y-%m-%d") # type: ignore
# pylint: disable-next=[duplicate-code]
write_csv_subdir(
self.long_prepared_plot_df,
"long_prepared_plot",
start_str,
end_str,
)
write_csv_subdir(
self.wide_anomaly_df, "wide_anomaly", start_str, end_str
)
def run(
self,
):
# Ensure output directory exists
os.makedirs(self.output_dir, exist_ok=True)
self.set_anomalous_countries() # "mad", None, start_date, end_date)
self.write_rankings() # end_date)
self.set_plot_df()
# Detect anomalies
self.set_anomalies()
self.highlight_anomalies()
self.set_median()
# Generate plot
fig, axes = self.plot()
self.save_plot(fig, axes)
self.write_csv()
anomaly_rect_starts_ends(country)
Extract start/end positions of anomalous windows using run-length encoding.
Input:
- wide_anomaly_df
- country
Output:
- starts, ends
Source code in anomaly/visualizer.py
def anomaly_rect_starts_ends(
    self, country: str
) -> tuple[list[int], list[int]]:
    """
    Extract start/end positions of anomalous windows using run-length
    encoding.
    Input:
    - wide_anomaly_df
    - country
    Output:
    - starts, ends
    """
    # 0/1 anomaly flags for this country, one entry per date row.
    values = self.wide_anomaly_df[country].values  # type: ignore
    # Run-length encode: (value, run length) per consecutive run.
    groups = [(k, sum(1 for _ in g)) for k, g in groupby(values)]
    # Cumulative (exclusive) end position of each run.
    cumsum = np.cumsum([length for _, length in groups])
    # Indices of runs whose flag is truthy, i.e. anomalous windows.
    anomaly_idx = [i for i, (val, _) in enumerate(groups) if val]
    if not anomaly_idx:
        return [], []
    # start = exclusive end minus run length; end = inclusive last index.
    starts = [cumsum[i] - groups[i][1] for i in anomaly_idx]
    ends = [cumsum[i] - 1 for i in anomaly_idx]
    logger.debug("starts %s, ends %s", starts, ends)
    return starts, ends
highlight_anomalies()
Highlight anomalies for multiple countries.
Input:
- anomalous_countries
Output:
- anom_rect_df
Source code in anomaly/visualizer.py
def highlight_anomalies(
    self,
) -> pd.DataFrame:
    """Highlight anomalies for multiple countries.
    Input:
    - anomalous_countries
    Output:
    - anom_rect_df
    """
    logger.info("Highlighting anomalies.")
    # One rectangle frame per anomalous country.
    rects = [
        self.highlight_anomalies_rect(country)
        for country in self.anomalous_countries  # type: ignore
    ]
    # Drop countries that produced no anomalous windows.
    rects = [r for r in rects if not r.empty]
    # Empty-frame fallback keeps downstream ".empty" checks working.
    self.anom_rect_df = (
        pd.concat(rects, ignore_index=True) if rects else pd.DataFrame()
    )
    logger.debug("self.anom_rect_df.shape %s", self.anom_rect_df.shape)
    return self.anom_rect_df
highlight_anomalies_rect(country)
Create rectangle coordinates for anomalous windows in a single country.
Input:
- wide_anomaly_df
- country
Output:
- country's anom_rect
Source code in anomaly/visualizer.py
def highlight_anomalies_rect(self, country: str) -> pd.DataFrame:
    """
    Create rectangle coordinates for anomalous windows in a single country.
    Input:
    - wide_anomaly_df
    - country
    Output:
    - country's anom_rect
    """
    logger.debug("highlight_anomalies_rect")
    logger.debug("country %s", country)
    starts, ends = self.anomaly_rect_starts_ends(country)
    if not starts:
        # No anomalous windows: empty frame signals "nothing to draw".
        return pd.DataFrame()
    # One row per window: x bounds are dates, y bounds span the axis.
    rect: pd.DataFrame = pd.DataFrame(
        {
            "xmin": self.wide_anomaly_df.index[starts],  # type: ignore
            "xmax": self.wide_anomaly_df.index[ends],  # type: ignore
            "ymin": -np.inf,
            "ymax": np.inf,
            "country": country,
        }
    )
    logger.debug("rect.shape %s", rect.shape)
    return rect
plot(linesize=0.3)
Plot graphs for the most suspicious countries for the user dates.
For this function, the threshold is expressed in numbers of median absolute deviations (MADs) from the median, rather than as an absolute value:
- 2 allows lots of errors to be detected, with higher false positive rates.
- 2.5 is more conservative.
- 3 is highly conservative.
Input:
- linesize
- anomalous_countries
- long_prepared_plot_df
- anom_rect_df
- long_median_df
Output:
- fig, axes
Source code in anomaly/visualizer.py
def plot(self, linesize: float = 0.3):
    """Plot graphs for the most suspicious countries for the user dates.
    For this function, the threshold is in numbers of median absolute
    deviations from the median, rather than an absolute value:
    - 2 allows lots of errors to be detected, with higher false positive
    rates.
    - 2.5 is more conservative.
    - 3 is highly conservative.
    Input:
    - linesize
    - anomalous_countries
    - long_prepared_plot_df
    - anom_rect_df
    - long_median_df
    Output:
    - fig, axes
    """
    # Create subplots for each country
    n_countries: int = len(self.anomalous_countries)  # type: ignore
    fig, axes = plt.subplots(
        nrows=n_countries,
        ncols=1,
        sharex=True,  # Share x axis (dates)
        figsize=(10, 3 * n_countries),
    )
    # NOTE(review): with a single anomalous country plt.subplots returns
    # a bare Axes (no .flatten()) — pass squeeze=False to be safe.
    axes = axes.flatten()  # In case there's only one
    # Convert to dict for easy access
    axes_dict = dict(zip(self.anomalous_countries, axes))  # type: ignore
    # Set target countries
    for country in self.anomalous_countries:  # type: ignore
        ax = axes_dict[country]
        # Rows of the prepared frame belonging to this country.
        data = self.long_prepared_plot_df[  # type: ignore
            self.long_prepared_plot_df["country"]  # type: ignore
            == country  # type: ignore
        ]
        ax.plot(
            data.index,
            data["users"],
            color="blue",
            linewidth=linesize,
            label=country,
        )
        _ymin, ymax = ax.get_ylim()
        ax.set_yticks([0, ymax])  # Show only 0 and max value
        # ax.set_ylabel("Estimated Daily Users")
        ax.tick_params(axis="y", labelsize=8)
        ax.set_title(
            data["country_label"].iloc[0], fontsize=8, loc="right"
        )
        # Add anomaly rectangles if they exist
        if not self.anom_rect_df.empty:  # type: ignore
            country_anoms = self.anom_rect_df[  # type: ignore
                self.anom_rect_df["country"] == country  # type: ignore
            ]
            for _, row in country_anoms.iterrows():
                # Grey band across the full y extent of the window.
                ax.axvspan(
                    xmin=row["xmin"],
                    xmax=row["xmax"],
                    ymin=0,
                    ymax=1,
                    color="grey",
                    alpha=0.5,
                    linewidth=0,
                    zorder=0,
                )
        # Add median horizontal line if available
        if not self.long_median_df.empty:  # type: ignore
            median_row = self.long_median_df[  # type: ignore
                self.long_median_df["country"] == country  # type: ignore
            ]
            if not median_row.empty:
                ax.axhline(
                    y=median_row["median_value"].iloc[0],
                    color="#56B4E9",
                    linestyle="dotted",
                    zorder=1,
                )
    # Set x-label on last axis
    axes[-1].set_xlabel("Date")
    fig.suptitle("Anomalous Periods in Tor Usage\n", fontsize=12)
    # Adjust layout
    plt.tight_layout()
    # plt.subplots_adjust(hspace=0.3) # space between subplots
    plt.subplots_adjust(top=0.93, hspace=0.4)
    return (fig, axes)
set_anomalies(threshold=THRESHOLD)
Detect anomalies based on median and MAD.
Return a boolean DataFrame where True = anomaly.
For each country we calculate the difference between the true
value and the reconstructed value, providing a residual error that
was not captured by the restricted set of principal components.
We maintain a rolling calculation of both the median observed
residual error and the median absolute deviation of the errors for
each country. We mark a day as anomalous if the observed resid-
ual error falls outside of 2.5 median absolute deviations from the
median.
Threshold refers to a number of deviations from the median, not an absolute value.
Input:
- self.long_reconstructed_plot_df
- self.long_median_summary_df
- threshold
- max_gap
Output:
- wide_anomaly_df
Source code in anomaly/visualizer.py
def set_anomalies(
    self,
    threshold: float = THRESHOLD,
) -> pd.DataFrame:
    """Detect anomalies based on median and MAD.
    Return a boolean DataFrame where True = anomaly.
    ```text
    For each country we calculate the difference between the true
    value and the reconstructed value, providing a residual error that
    was not captured by the restricted set of principal components.
    We maintain a rolling calculation of both the median observed
    residual error and the median absolute deviation of the errors for
    each country. We mark a day as anomalous if the observed resid-
    ual error falls outside of 2.5 median absolute deviations from the
    median.
    ```
    Threshold refers to a number of deviations from the median, not an
    absolute value.
    Input:
    - self.long_reconstructed_plot_df
    - self.long_median_summary_df
    - threshold
    - max_gap
    Output:
    - wide_anomaly_df
    """
    logger.info("Tagging wide_anomaly_df (robust).")
    # NOTE(review): "or" discards a legitimate threshold of 0.0; an
    # explicit None check would be safer.
    threshold = threshold or self.threshold  # type: ignore
    # NOTE(review): %d truncates fractional thresholds (2.5 logs as 2).
    logger.debug("threshold %d", threshold)
    # Merge the residuals object with the median.summary object, then
    # simply calculate for each row if the absolute value of the "res" is
    # greater than the absolute of ( "median" + threshold*"mad" )
    long_reconstructed_plot_median_df = pd.merge(
        self.long_reconstructed_plot_df.reset_index(),  # type: ignore
        self.long_median_summary_df.reset_index(),
        on=["date", "country"],
    )
    logger.debug(
        "long_reconstructed_plot_median_df.shape %s",
        long_reconstructed_plot_median_df.shape,
    )  # [24435 rows x 6 columns]
    # Calculate anomaly threshold using median and MAD
    threshold_upper = (
        long_reconstructed_plot_median_df["median"]
        + threshold * long_reconstructed_plot_median_df["mad"]
    )  # 24435
    threshold_lower = (
        long_reconstructed_plot_median_df["median"]
        - threshold * long_reconstructed_plot_median_df["mad"]
    )  # 24435
    # Assign anomalous to 1 for all days where the residual exceeds the
    # threshold
    long_reconstructed_plot_median_df["anomalous"] = (
        (long_reconstructed_plot_median_df["res"] > threshold_upper)
        | (long_reconstructed_plot_median_df["res"] < threshold_lower)
    ).astype(
        int
    )  # [24435 rows x 7 columns]
    # Pivot to wide format: date x country
    wide_reconstructed_plot_median_df = (
        long_reconstructed_plot_median_df.pivot(
            index="date", columns="country", values="anomalous"
        ).fillna(0)
    )  # [181 rows x 135 columns]
    # Apply smoothing per country column (bridge gaps up to max_gap days)
    self.wide_anomaly_df = wide_reconstructed_plot_median_df.apply(
        lambda col: smooth_gaps(col, max_gap=self.max_gap)
    )  # [181 rows x 135 columns]
    logger.debug("wide_anomaly_df.shape %s", self.wide_anomaly_df.shape)
    return self.wide_anomaly_df
set_anomalous_countries(topn_function='mad', threshold=None, start_date=None, end_date=None)
Return list of anomalous countries.
Input:
- long_reconstructed_df
- threshold = None
- topn_function = "mad"
- start_date
- end_date
Output:
- anomalous_countries
Source code in anomaly/visualizer.py
def set_anomalous_countries(
    self,
    # In this function is always "mad"
    topn_function: str = "mad",
    # In this function is always None
    threshold: float | None = None,
    start_date: datetime.date | None = None,
    end_date: datetime.date | None = None,
) -> list[str]:
    """Return list of anomalous countries.
    Input:
    - long_reconstructed_df
    - threshold = None
    - topn_function = "mad"
    - start_date
    - end_date
    Output:
    - anomalous_countries
    """
    logger.info("Obtaining anomalous countries.")
    logger.debug("topn_function %s", topn_function)
    logger.debug("threshold %s", threshold)
    start_date = start_date or self.start_date
    end_date = end_date or self.end_date
    logger.debug(
        "Target date interval start %s end %s", start_date, end_date
    )
    if start_date is not None and end_date is not None:
        # Clamp both bounds so they never exceed the last reconstructed
        # date (the reconstruction may lag the requested interval).
        self.start_date = min(
            start_date, self.long_reconstructed_df.index.max()
        )
        self.end_date = min(
            end_date, self.long_reconstructed_df.index.max()
        )
        logger.debug(
            "Adjusted start %s end %s to reconstructed",
            self.start_date,
            self.end_date,
        )
    self.long_reconstructed_interval_df = self.long_reconstructed_df[
        (self.long_reconstructed_df.index >= self.start_date)
        & (self.long_reconstructed_df.index <= self.end_date)
    ].copy()  # [135 rows x 3 columns]
    # NOTE(review): "shape d%s" below has a stray "d" in the format text.
    logger.debug(
        "self.long_reconstructed_interval_df.shape d%s",
        self.long_reconstructed_interval_df.shape,
    )
    # Pivot to wide format: date ~ country
    wide_reconstructed_df: pd.DataFrame = (
        self.long_reconstructed_interval_df.pivot_table(
            index="date", columns="country", values="res"
        )
    )  # [1 rows x 135 columns]
    logger.debug(
        "wide_reconstructed_interval_df.shape d%s",
        wide_reconstructed_df.shape,
    )
    if len(wide_reconstructed_df) > 1:  # Always False
        # Multi-day interval: score = max |res| per country.
        abs_vals = wide_reconstructed_df.abs()
        scores = abs_vals.apply(lambda col: col.max()).sort_values(
            ascending=False
        )
    else:
        # Single row case: squeeze to a Series of |res| per country.
        scores = (
            wide_reconstructed_df.abs()
            .squeeze()
            .sort_values(ascending=False)
        )
    # Example scores (country ~ |res|):
    # sy 12.101697
    # tm 7.356141
    # ir 6.743156
    # ...
    # kr 0.009907
    # Always None in this function
    # Return countries based on threshold or topn
    suspicious: list[str]
    if threshold is not None:
        suspicious = scores[scores > threshold].index.tolist()
    else:
        suspicious = scores.head(self.topn).index.tolist()
    self.anomalous_countries = suspicious
    # ['sy', 'tm', 'ir', 'sa', 'ae', 'kg', 'id', 'om', 'ua', 'th']
    logger.debug("Suspicious countries: %r", self.anomalous_countries)
    return self.anomalous_countries
set_median()
Median values to plot.
Input:
- long_seasoned_plot_df
- anomalous_countries
Output:
- long_median_df
Source code in anomaly/visualizer.py
def set_median(
    self,
):
    """Median values to plot.
    Input:
    - long_seasoned_plot_df
    - anomalous_countries
    Output:
    - long_median_df
    """
    logger.info("Setting median")
    # Slice the seasoned plot frame to the adjusted user interval.
    long_median_df = self.long_seasoned_plot_df.loc[  # type: ignore
        (
            self.long_seasoned_plot_df.index  # type: ignore
            >= self.start_date  # Adjusted date to reconstructed
        )
        & (
            self.long_seasoned_plot_df.index  # type: ignore
            <= self.end_date
        )
    ].copy()
    # Keep only the anomalous countries.
    long_median_df = long_median_df.query(
        "country in @self.anomalous_countries"
    )
    # Median of "users" per country, as a two-column frame.
    self.long_median_df = long_median_df.groupby(
        "country", as_index=False
    )["users"].median()
    self.long_median_df.columns = ["country", "median_value"]
    logger.debug("long_median_df.shape %s", self.long_median_df.shape)
    return self.long_median_df
set_plot_df()
Set matrices that will be used to plot the anomalies.
Input:
- long_reconstructed_df
- long_seasoned_df
- anomalous_countries
- plot_start
- plot_end
Output:
- long_reconstructed_plot_df
- long_seasoned_plot_df
- long_prepared_plot_df
Source code in anomaly/visualizer.py
def set_plot_df(self):
    """Set matrices that will be used to plot the anomalies.
    Input:
    - long_reconstructed_df
    - long_seasoned_df
    - anomalous_countries
    - plot_start
    - plot_end
    Output:
    - long_reconstructed_plot_df
    - long_seasoned_plot_df
    - long_prepared_plot_df
    """
    logger.info("Preparing plot data")
    # Plot window: last plot_length distinct reconstruction dates.
    self.plot_end = self.long_reconstructed_df.index.drop_duplicates()[-1]
    self.plot_start = self.long_reconstructed_df.index.drop_duplicates()[
        -self.plot_length - 1
    ]
    logger.debug("plot start %s end %s", self.plot_start, self.plot_end)
    # Subset dataframes with the plot dates
    if self.plot_start and self.plot_end:
        mask = (self.long_reconstructed_df.index >= self.plot_start) & (
            self.long_reconstructed_df.index <= self.plot_end
        )
        self.long_reconstructed_plot_df = (
            self.long_reconstructed_df.copy().loc[mask]
        )  # [181 rows x 3 columns] # [24435 rows x 3 columns]
        logger.debug(
            "long_reconstructed_plot_df.shape %s",
            self.long_reconstructed_plot_df.shape,
        )
        mask = (self.long_seasoned_df.index >= self.plot_start) & (
            self.long_seasoned_df.index <= self.plot_end
        )
        self.long_seasoned_plot_df = self.long_seasoned_df.copy().loc[
            mask
        ]  # [1 rows x 181 columns
        logger.debug(
            "long_seasoned_plot_df.shape %s",
            self.long_seasoned_plot_df.shape,
        )
    # Restrict the plot frame to the anomalous countries only.
    # pylint: disable-next=[line-too-long]
    self.long_prepared_plot_df = self.long_seasoned_plot_df[  # type: ignore
        self.long_seasoned_plot_df["country"].isin(  # type: ignore
            self.anomalous_countries
        )
    ].copy()
    # Human-readable, width-wrapped label for each country code.
    self.long_prepared_plot_df[
        "country_label"
    ] = self.long_prepared_plot_df["country"].apply(
        lambda x: str_wrap(
            translate_country_codes(x), width=self.text_width
        )
    )
    logger.debug(
        "long_prepared_plot_df shape %s",
        self.long_prepared_plot_df.shape,
    )  # [1810 rows x 3 columns]
    return self.long_prepared_plot_df
write_rankings(end_date=None)
Write anomalous countries rankings to file.
Input:
- end_date
- anomalous_countries
Source code in anomaly/visualizer.py
def write_rankings(self, end_date: datetime.date | None = None) -> None:
    """Write anomalous countries rankings to file.
    Input:
    - end_date
    - anomalous_countries
    """
    logger.info("Writing rankings.")
    # NOTE: after this line end_date holds a "YYYY-MM-DD" string.
    end_date = end_date or pd.to_datetime(self.end_date).strftime(
        "%Y-%m-%d"
    )
    rankings_path: Path = self.output_dir.joinpath(
        f"{end_date}-rankings.txt"
    )
    log_text: tuple[str, str] = (
        f"Most Anomalous Countries by Tor Usage:\n{end_date}\n",
        "(In order of median anomaly score over time period.)\n",
    )
    # NOTE(review): this logs the tuple's repr, not the joined text.
    logger.info(log_text)
    countries: list[str] = translate_country_codes(
        self.anomalous_countries
    )
    # NOTE(review): header renders as "Last 7(s):" — "day" looks missing.
    text: str = "".join((f"Last {self.ndays}(s):\n=============\n",))
    # Print translated suspicious countries
    text += "\n".join(countries)
    text += "\n\n"
    log_write_file(rankings_path, text)