Processor
Source code in anomaly/processor.py
class Processor:
    """Rolling-PCA anomaly processor.

    Consumes the preprocessed wide usage dataframe and derives
    per-country residuals and rolling median/MAD summaries used for
    anomaly detection.
    """

    def __init__(self, preprocessor):
        """Copy configuration and data from the preprocessing stage.

        :param preprocessor: object exposing the attributes read below.
        """
        # Configuration and data inherited from the preprocessor.
        self.data_dir: str = preprocessor.data_dir  # DATA_DIR
        self.read: bool = preprocessor.read
        self.write: bool = preprocessor.write
        self.window_width: int = preprocessor.window_width  # WINDOW_WIDTH
        self.train_start_date: datetime.date = preprocessor.train_start_date
        self.train_end_date: datetime.date = preprocessor.train_end_date
        self.wide_unseasoned_df: pd.DataFrame = preprocessor.wide_unseasoned_df
        # Analysis parameters taken from module-level constants.
        self.pcs_number: int = PCS_NUMBER
        self.window_reconstruct_width: int = WINDOW_RECONSTRUCT_WIDTH
        self.threshold: float = THRESHOLD
        self.topn: int = TOPN
        # Results, populated by run() and the calculate_*/reconstruct_* methods.
        self.pca_df: pd.DataFrame | None = None
        self.long_reconstructed_df: pd.DataFrame | None = None
        self.long_median_summary_df: pd.DataFrame | None = None
def calculate_pca_win(self, i: int) -> dict:
    """Fit a scaler and a PCA over a single rolling window.

    The window spans ``window_width`` consecutive rows of
    ``wide_unseasoned_df`` starting at row ``i`` (one column per
    country).

    Note: R's ``scale()`` uses Bessel's correction (ddof=1, dividing
    by n-1), while ``StandardScaler`` uses ddof=0 (dividing by n).

    Only the fitted ``scaler`` and ``pca`` objects are stored, not the
    individual R-style matrices: ``scale``/``center``/``rotation`` are
    recoverable as ``scaler.scale_``, ``scaler.mean_`` and
    ``pca.components_``, and the data can be rebuilt through the
    inverse transforms.

    :param i: starting row index of the window.
    :returns: dict with keys ``scaler``, ``pca`` and ``x`` (the
        projected scores, shape window_width x n_countries).
    """
    end: int = i + self.window_width
    window_df = self.wide_unseasoned_df.iloc[i:end]
    scaler = StandardScaler()
    standardized = scaler.fit_transform(window_df)
    # A fresh PCA is needed per window: n_components defaults to
    # min(n_samples, n_features) of this particular window.
    pca = PCA()
    scores = pca.fit_transform(standardized)
    return {"scaler": scaler, "pca": pca, "x": scores}
def calculate_pca_df(
    self,
) -> pd.DataFrame:
    """Run a rolling PCA over the whole dataset.

    One PCA (plus its scaler) is fitted per ``window_width``-long
    window of ``wide_unseasoned_df``; the fitted objects and projected
    scores are collected into ``self.pca_df``, one row per window,
    columns ``scaler``, ``pca`` and ``x``.

    :returns: ``self.pca_df``.
    """
    logger.info(
        "Calculating rolling Principal Components Analysis over dataset."
    )
    logger.debug("window_width %s", self.window_width)
    self.pca_df = pd.DataFrame(columns=["scaler", "pca", "x"])
    # Number of complete windows that fit in the dataset.
    num_windows: int = len(self.wide_unseasoned_df) - self.window_width + 1
    for start in tqdm(range(num_windows), desc="Rolling PCA"):
        win: dict = self.calculate_pca_win(start)
        self.pca_df.loc[start] = [win["scaler"], win["pca"], win["x"]]
    logger.debug("self.pca_df.shape %s", self.pca_df.shape)  # (n_windows, 3)
    return self.pca_df
def reconstruct_pc_win_df(
self, pca_win_df: pd.DataFrame, pcs_array: None | np.ndarray = None
) -> pd.DataFrame:
"""
We can use this to 'reconstruct' an approximation of the original data
by applying the inverse (transpose) of the rotation matrix to the
rotated data. (Rotations are stored in the $rotation matrix. The
rotated data is stored in the $x matrix. So $rotation' x $x should
reconstruct the original.
$rotation[,<some pcs>]' x $x will make the rotation using only some
principal components, and then the residuals should be meaningful.
Input:
- pca_win_df # [1 rows x 5 columns], eg:
```
0 [782.4864835661228, 46.797868643198996, 16.018...
[1 rows x 5 columns]
```
- `pcs_array`: a vector of principal components to be used for
reconstruction (default includes all pcs)
Output:
- recon # [180 rows x 135 columns], eg:
```
recon[180, 135] 1053.506204
```
"""
# logger.info("Reconstructing")
if pcs_array is None:
pcs_array = np.arange(len(pca_win_df["x"][0])) # 135
# Reconstruct
recon_scaled = pca_win_df.pca.iloc[0].inverse_transform(
pca_win_df.x.iloc[0]
)
recon = pca_win_df.scaler.iloc[0].inverse_transform(recon_scaled)
return recon
def reconstruct_win_df(
    self,
    window: int = 1,
    countries: list | None = None,
) -> pd.DataFrame:
    """Compute relative residuals for one reconstruction window.

    The residuals express variance in the observed data not captured
    by the principal component model:
    - positive residuals represent drops in expected usage;
    - negative residuals represent increases in expected usage;
    - magnitude expresses how much a country deviates from its
      previous behaviour relative to other countries.

    Fix: the window selection used ``self.pca_df.iloc[:window]``
    (windows 0..window-1) while the reconstruction always read row 0,
    so every window was reconstructed from the *first* window's PCA
    but labelled with window ``window``'s dates. Select the single row
    for the requested window instead. Also, ``countries=None`` used to
    crash on ``recon_df.columns = None``; it now defaults to all
    columns.

    :param window: 1-based window index; window ``w`` covers rows
        ``w - 1`` to ``w - 1 + window_width`` of ``wide_unseasoned_df``.
    :param countries: countries of interest; defaults to every column.
    :returns: long dataframe with ``country``/``res`` columns, indexed
        by date.
    """
    if countries is None:
        countries = self.wide_unseasoned_df.columns.tolist()
    # Single row holding this window's fitted scaler/pca/scores.
    # reset_index keeps positional label 0 for downstream consumers.
    pca_win_df: pd.DataFrame = self.pca_df.iloc[  # type: ignore
        window - 1 : window
    ].reset_index(drop=True)
    recon = self.reconstruct_pc_win_df(pca_win_df)
    recon_df = pd.DataFrame(recon)
    # Restore the window's date index and country columns.
    recon_df.index = self.wide_unseasoned_df.index[
        window - 1 : window + self.window_width - 1
    ]
    recon_df.columns = countries
    # The original data for the same window and countries.
    orig_win = self.wide_unseasoned_df.iloc[
        window - 1 : window + self.window_width - 1
    ][countries]
    # Relative residual: positive when the reconstruction exceeds the
    # observed value.
    residuals = (recon_df - orig_win) / orig_win
    long_residuals = residuals.melt(
        var_name="country",
        value_name="res",
        ignore_index=False,
    )
    return long_residuals
def reconstruct_df(
    self,
    # window: int = 1,
    # countries=None,
) -> pd.DataFrame:
    """Reconstruct expected usage for every window and collect residuals.

    For each rolling window the data is reconstructed from its
    principal components and one date per window is kept: the earliest
    date of window 1 (so the series starts at the first observation)
    and the latest date of every later window — the day whose expected
    value is derived from the preceding window of behaviour. Infinite
    residuals caused by divisions by zero are set to 0.

    NOTE(review): the paper suggests twelve principal components, but
    all components are used in practice; `self.pcs_number` is only
    logged here, never applied — confirm intent.

    Input:
    - pca_df, wide_unseasoned_df
    Output:
    - long_reconstructed_df (date index; country, res, window columns)
    """
    logger.info(
        "Reconstructing data from restricted principle components. "
        "(%s components).",
        self.pcs_number,
    )
    countries = self.wide_unseasoned_df.columns.tolist()
    # logger.debug("countries %s", countries)
    # Number of windows produced by the rolling PCA.
    total: int = len(self.pca_df["x"])  # type: ignore
    logger.debug("total %s", total)
    logger.debug("window_width %s", self.window_width)
    reconstructed_cache: list = []
    # Window 1: reconstruct, then keep only the first (earliest) date
    # and tag it with its window number.
    reconstructed_tmp = self.reconstruct_win_df(
        1, countries
    )
    reconstructed_tmp = reconstructed_tmp.loc[
        reconstructed_tmp.index == reconstructed_tmp.index.min()
    ]
    reconstructed_tmp.loc[:, "window"] = 1
    reconstructed_cache.insert(0, reconstructed_tmp)
    for i in tqdm(range(2, total + 1), desc="Reconstructing data"):
        # Windows 2..total: reconstruct the current window ...
        reconstructed_tmp = self.reconstruct_win_df(
            i, countries
        )
        # ... and keep only the last (latest) date, i.e. the day being
        # "predicted" from prior behaviour.
        reconstructed_tmp = reconstructed_tmp[
            reconstructed_tmp.index == reconstructed_tmp.index.max()
        ]
        # Tag with the window number and append to the cache.
        reconstructed_tmp.loc[:, "window"] = i
        reconstructed_cache.insert(i - 1, reconstructed_tmp)
    # Collapse the per-window frames into a single long frame.
    self.long_reconstructed_df = pd.concat(reconstructed_cache)
    # A small number of 'Inf' values come from divisions by 0;
    # neutralise them.
    self.long_reconstructed_df.loc[
        np.isinf(self.long_reconstructed_df["res"]), "res"
    ] = 0
    logger.debug(
        "long_reconstructed_df.shape: %s", self.long_reconstructed_df.shape
    )
    return self.long_reconstructed_df
def median_summarise(
    self,
) -> pd.DataFrame:
    """Build a long table of rolling median error and rolling MAD.

    Pivots ``long_reconstructed_df`` to wide form, applies
    ``median_error`` and ``median_absolute_deviation`` with an odd
    window width (``window_reconstruct_width`` rounded up to odd), and
    merges both results into one long dataframe keyed by
    ``date``/``country``.

    :returns: ``self.long_median_summary_df`` with columns ``country``,
        ``median`` and ``mad``.
    """
    logger.info("Calculating median summary.")
    # Wide residuals: one column per country, one row per date.
    # pylint: disable-next=[line-too-long]
    wide_residuals_df = self.long_reconstructed_df.pivot_table(  # type: ignore
        index="date", columns="country", values="res", aggfunc="sum"
    )
    # The rolling window must be odd so the statistic is centred.
    width = self.window_reconstruct_width
    if width % 2 == 0:
        width += 1
    logger.debug("window_width %s", width)
    # Rolling median of the residuals, melted to long form.
    rolling_median = median_error(wide_residuals_df, width)
    long_median_errors_df = pd.melt(
        rolling_median,
        ignore_index=False,
        var_name="country",
        value_name="median",
    )
    # Rolling median absolute deviation, melted to long form.
    rolling_mad = median_absolute_deviation(wide_residuals_df, width)
    long_mad_df = pd.melt(
        rolling_mad,
        ignore_index=False,
        var_name="country",
        value_name="mad",
    )
    # Combine median and MAD on the (date, country) key.
    self.long_median_summary_df = pd.merge(
        long_median_errors_df,
        long_mad_df,
        on=["date", "country"],
    )
    logger.debug(
        "long_median_summary_df.shape %s",
        self.long_median_summary_df.shape,
    )
    return self.long_median_summary_df
def list_anomalous_date(self, target_date=None):
    """Return the countries flagged anomalous on ``target_date``.

    A (date, country) row is anomalous when ``abs(res)`` exceeds
    ``median + threshold * mad``.

    :param target_date: the date to inspect; defaults to the last date
        present in ``long_reconstructed_df``.
    :returns: Series of anomalous country codes.
    """
    if target_date is None:
        target_date = self.long_reconstructed_df.index[-1]  # type: ignore
    # Restrict the residuals to the single requested date.
    day_df = self.long_reconstructed_df[  # type: ignore
        (self.long_reconstructed_df.index >= target_date)  # type: ignore
        & (self.long_reconstructed_df.index <= target_date)  # type: ignore
    ]
    # Attach the rolling median/MAD statistics to each residual row.
    merged = pd.merge(day_df, self.long_median_summary_df, how="inner")
    # Tag rows whose residual magnitude exceeds the MAD-based threshold.
    merged["anomalous"] = 0
    exceeds = abs(merged["res"]) > (
        merged["median"] + self.threshold * merged["mad"]
    )
    merged.loc[exceeds, "anomalous"] = 1
    anomalous_date = merged.loc[merged["anomalous"] == 1, "country"]
    logger.info("anomalous_date %r", anomalous_date)
    return anomalous_date
def read_csv(self) -> tuple[pd.DataFrame, pd.DataFrame] | None:
    """Try to load previously computed dataframes from disk.

    :returns: ``(long_reconstructed_df, long_median_summary_df)`` when
        reading is enabled and both files could be read; otherwise
        ``None``.
    """
    if not self.read:
        return None
    start = self.train_start_date.strftime("%Y-%m-%d")  # type: ignore
    end = self.train_end_date.strftime("%Y-%m-%d")  # type: ignore
    self.long_reconstructed_df = read_csv_subdir(
        "long_reconstructed", start, end
    )
    self.long_median_summary_df = read_csv_subdir(
        "long_median_summary", start, end
    )
    # Both frames must be present for the cache to be usable.
    if (
        self.long_reconstructed_df is None
        or self.long_median_summary_df is None
    ):
        logger.info("Couldn't read files, computing them.")
        return None
    return self.long_reconstructed_df, self.long_median_summary_df
def write_csv(self) -> None:
    """Write the main dataframes to files.

    So that they do not need to be computed again later on.
    """
    # pylint: disable=R0801
    if not self.write:
        return
    # pylint: disable-next=[line-too-long]
    start_str: str = self.train_start_date.strftime("%Y-%m-%d")  # type: ignore
    # pylint: disable-next=[line-too-long]
    end_str: str = self.train_end_date.strftime("%Y-%m-%d")  # type: ignore
    # Persist each result under its own subdirectory name.
    for frame, name in (
        (self.long_reconstructed_df, "long_reconstructed"),
        (self.pca_df, "pca"),
        (self.long_median_summary_df, "long_median_summary"),
    ):
        write_csv_subdir(frame, name, start_str, end_str)
def run(
    self,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Load cached results if possible, otherwise compute and persist.

    :returns: ``(long_reconstructed_df, long_median_summary_df)``.
    """
    # pylint: disable=R0801
    cached = self.read_csv()
    if cached is not None:
        return cached
    # Fit the rolling PCA over every window.
    self.calculate_pca_df()
    # Reconstruct the data and derive the residual statistics.
    self.reconstruct_df()
    self.median_summarise()
    # self.list_anomalous_date()
    self.write_csv()
    return self.long_reconstructed_df, self.long_median_summary_df
calculate_pca_df()
Perform Principal Component Analysis (PCA)
using singular value decomposition (SVD) on the (centered and optionally scaled) matrix.
To account for developing patterns, therefore, we perform a rolling
principal component analysis over smaller time windows within
the series. For the purposes of our experiments, we make use of
a 180-day window as a balance between sufficient data for useful
principal component analysis, given the number of dimension in
the data, against the evolution of the daily Tor metrics.
Input:
- wide_unseasoned_df [2401 rows x 135 columns]
Output:
- pca_df (2222, 3)
- x.iloc[0] (180, 135) = 24300
- scaler
- pca
Source code in anomaly/processor.py
def calculate_pca_df(
    self,
) -> pd.DataFrame:
    """Run a rolling PCA over the whole dataset.

    One PCA (plus its scaler) is fitted per ``window_width``-long
    window of ``wide_unseasoned_df``; the fitted objects and projected
    scores are collected into ``self.pca_df``, one row per window,
    columns ``scaler``, ``pca`` and ``x``.

    :returns: ``self.pca_df``.
    """
    logger.info(
        "Calculating rolling Principal Components Analysis over dataset."
    )
    logger.debug("window_width %s", self.window_width)
    self.pca_df = pd.DataFrame(columns=["scaler", "pca", "x"])
    # Number of complete windows that fit in the dataset.
    num_windows: int = len(self.wide_unseasoned_df) - self.window_width + 1
    for start in tqdm(range(num_windows), desc="Rolling PCA"):
        win: dict = self.calculate_pca_win(start)
        self.pca_df.loc[start] = [win["scaler"], win["pca"], win["x"]]
    logger.debug("self.pca_df.shape %s", self.pca_df.shape)  # (n_windows, 3)
    return self.pca_df
calculate_pca_win(i)
For each 180-day period in the dataset we apply a principal component
analysis over the usage time series for all countries, resulting in a
set of components for that time window.
Input:
- wide_unseasoned_df
- window_width (180)
eg. window_df for i = 0:
country ae al
date
2011-09-01 5619.767371 223.472308
...
2012-02-29 2725.496242 117.144923
R's scale() uses Bessel's correction (ddof=1, dividing by n-1), while
StandardScaler uses ddof=0 (dividing by n).
There is no need to store scaler.scale_, scaler.mean_,
or pca.components_, since by storing scaler and pca the matrix can
be reconstructed by calling the inverse functions.
Without the inverse functions, the matrices generated by R can be
created using:
- scale: scaler.scale_
- x: pca.fit_transform(scaled)
- rotation: pca.components_
- center: scaler.mean_
eg. pca_win_dict:
'scale': array([7.82486484e+02, 4.67978686e+01 # 135
'x': array([[ 2.53382531e+01, 1.92390266e+00, # 180 x 135 = 24300
'sdev': array([8.89094766e+00, 3.25341552e+00, # 135
'rotation': array([[ 0.10952302, -0.05344188, # 135 x 135 = 18225
'center': array([3.54811646e+03, 1.82723558e+02, # 135
Source code in anomaly/processor.py
def calculate_pca_win(self, i: int) -> dict:
    """Fit a scaler and a PCA over a single rolling window.

    The window spans ``window_width`` consecutive rows of
    ``wide_unseasoned_df`` starting at row ``i`` (one column per
    country).

    Note: R's ``scale()`` uses Bessel's correction (ddof=1, dividing
    by n-1), while ``StandardScaler`` uses ddof=0 (dividing by n).

    Only the fitted ``scaler`` and ``pca`` objects are stored, not the
    individual R-style matrices: ``scale``/``center``/``rotation`` are
    recoverable as ``scaler.scale_``, ``scaler.mean_`` and
    ``pca.components_``, and the data can be rebuilt through the
    inverse transforms.

    :param i: starting row index of the window.
    :returns: dict with keys ``scaler``, ``pca`` and ``x`` (the
        projected scores, shape window_width x n_countries).
    """
    end: int = i + self.window_width
    window_df = self.wide_unseasoned_df.iloc[i:end]
    scaler = StandardScaler()
    standardized = scaler.fit_transform(window_df)
    # A fresh PCA is needed per window: n_components defaults to
    # min(n_samples, n_features) of this particular window.
    pca = PCA()
    scores = pca.fit_transform(standardized)
    return {"scaler": scaler, "pca": pca, "x": scores}
median_summarise()
Combination of the median.error and median.absolute.deviation functions that returns a long combination of the two.
Input:
- long_reconstructed_df
- window_reconstruct_width
Output:
- long_median_summary_df
Source code in anomaly/processor.py
def median_summarise(
    self,
) -> pd.DataFrame:
    """Build a long table of rolling median error and rolling MAD.

    Pivots ``long_reconstructed_df`` to wide form, applies
    ``median_error`` and ``median_absolute_deviation`` with an odd
    window width (``window_reconstruct_width`` rounded up to odd), and
    merges both results into one long dataframe keyed by
    ``date``/``country``.

    :returns: ``self.long_median_summary_df`` with columns ``country``,
        ``median`` and ``mad``.
    """
    logger.info("Calculating median summary.")
    # Wide residuals: one column per country, one row per date.
    # pylint: disable-next=[line-too-long]
    wide_residuals_df = self.long_reconstructed_df.pivot_table(  # type: ignore
        index="date", columns="country", values="res", aggfunc="sum"
    )
    # The rolling window must be odd so the statistic is centred.
    width = self.window_reconstruct_width
    if width % 2 == 0:
        width += 1
    logger.debug("window_width %s", width)
    # Rolling median of the residuals, melted to long form.
    rolling_median = median_error(wide_residuals_df, width)
    long_median_errors_df = pd.melt(
        rolling_median,
        ignore_index=False,
        var_name="country",
        value_name="median",
    )
    # Rolling median absolute deviation, melted to long form.
    rolling_mad = median_absolute_deviation(wide_residuals_df, width)
    long_mad_df = pd.melt(
        rolling_mad,
        ignore_index=False,
        var_name="country",
        value_name="mad",
    )
    # Combine median and MAD on the (date, country) key.
    self.long_median_summary_df = pd.merge(
        long_median_errors_df,
        long_mad_df,
        on=["date", "country"],
    )
    logger.debug(
        "long_median_summary_df.shape %s",
        self.long_median_summary_df.shape,
    )
    return self.long_median_summary_df
reconstruct_df()
For PCA, the full set of principal components allows reconstruction of
the full data set. As fewer components are selected, less variance in
the original dataset is captured.
our experimental results suggest twelve principal components as
broadly optimal across the dataset.
Though the same number of components (180) is being used in practice.
With appropriately calculated principal components, we can
reconstruct an approximate value for each day’s Tor usage based
on previous behaviour.
we reconstruct a day’s usage based on principal components in order to
compare against the true observed value, and thus to calculate deviance
from prior behaviour relative to other countries.
Taking the true observed usage for each country for the final day of
each window, we calculate the approximated value from the first 12
principal components. This provides the expected value for each country
based on previous behaviour.
Input:
- pcs_number
- window
- countries
- wide_seasoned_df
Output:
- long_reconstructed_df
Source code in anomaly/processor.py
def reconstruct_df(
    self,
    # window: int = 1,
    # countries=None,
) -> pd.DataFrame:
    """Reconstruct expected usage for every window and collect residuals.

    For each rolling window the data is reconstructed from its
    principal components and one date per window is kept: the earliest
    date of window 1 (so the series starts at the first observation)
    and the latest date of every later window — the day whose expected
    value is derived from the preceding window of behaviour. Infinite
    residuals caused by divisions by zero are set to 0.

    NOTE(review): the paper suggests twelve principal components, but
    all components are used in practice; `self.pcs_number` is only
    logged here, never applied — confirm intent.

    Input:
    - pca_df, wide_unseasoned_df
    Output:
    - long_reconstructed_df (date index; country, res, window columns)
    """
    logger.info(
        "Reconstructing data from restricted principle components. "
        "(%s components).",
        self.pcs_number,
    )
    countries = self.wide_unseasoned_df.columns.tolist()
    # logger.debug("countries %s", countries)
    # Number of windows produced by the rolling PCA.
    total: int = len(self.pca_df["x"])  # type: ignore
    logger.debug("total %s", total)
    logger.debug("window_width %s", self.window_width)
    reconstructed_cache: list = []
    # Window 1: reconstruct, then keep only the first (earliest) date
    # and tag it with its window number.
    reconstructed_tmp = self.reconstruct_win_df(
        1, countries
    )
    reconstructed_tmp = reconstructed_tmp.loc[
        reconstructed_tmp.index == reconstructed_tmp.index.min()
    ]
    reconstructed_tmp.loc[:, "window"] = 1
    reconstructed_cache.insert(0, reconstructed_tmp)
    for i in tqdm(range(2, total + 1), desc="Reconstructing data"):
        # Windows 2..total: reconstruct the current window ...
        reconstructed_tmp = self.reconstruct_win_df(
            i, countries
        )
        # ... and keep only the last (latest) date, i.e. the day being
        # "predicted" from prior behaviour.
        reconstructed_tmp = reconstructed_tmp[
            reconstructed_tmp.index == reconstructed_tmp.index.max()
        ]
        # Tag with the window number and append to the cache.
        reconstructed_tmp.loc[:, "window"] = i
        reconstructed_cache.insert(i - 1, reconstructed_tmp)
    # Collapse the per-window frames into a single long frame.
    self.long_reconstructed_df = pd.concat(reconstructed_cache)
    # A small number of 'Inf' values come from divisions by 0;
    # neutralise them.
    self.long_reconstructed_df.loc[
        np.isinf(self.long_reconstructed_df["res"]), "res"
    ] = 0
    logger.debug(
        "long_reconstructed_df.shape: %s", self.long_reconstructed_df.shape
    )
    return self.long_reconstructed_df
reconstruct_pc_win_df(pca_win_df, pcs_array=None)
We can use this to 'reconstruct' an approximation of the original data
by applying the inverse (transpose) of the rotation matrix to the
rotated data. (Rotations are stored in the $rotation matrix. The
rotated data is stored in the $x matrix. So $rotation' x $x should
reconstruct the original.
$rotation[,&lt;some pcs&gt;]' x $x will make the rotation using only some
principal components, and then the residuals should be meaningful.
Input:
- pca_win_df # [1 rows x 5 columns], eg:
0 [782.4864835661228, 46.797868643198996, 16.018...
[1 rows x 5 columns]
pcs_array: a vector of principal components to be used for reconstruction (default includes all pcs)
Output:
- recon # [180 rows x 135 columns], eg:
recon[180, 135] 1053.506204
Source code in anomaly/processor.py
def reconstruct_pc_win_df(
self, pca_win_df: pd.DataFrame, pcs_array: None | np.ndarray = None
) -> pd.DataFrame:
"""
We can use this to 'reconstruct' an approximation of the original data
by applying the inverse (transpose) of the rotation matrix to the
rotated data. (Rotations are stored in the $rotation matrix. The
rotated data is stored in the $x matrix. So $rotation' x $x should
reconstruct the original.
$rotation[,<some pcs>]' x $x will make the rotation using only some
principal components, and then the residuals should be meaningful.
Input:
- pca_win_df # [1 rows x 5 columns], eg:
```
0 [782.4864835661228, 46.797868643198996, 16.018...
[1 rows x 5 columns]
```
- `pcs_array`: a vector of principal components to be used for
reconstruction (default includes all pcs)
Output:
- recon # [180 rows x 135 columns], eg:
```
recon[180, 135] 1053.506204
```
"""
# logger.info("Reconstructing")
if pcs_array is None:
pcs_array = np.arange(len(pca_win_df["x"][0])) # 135
# Reconstruct
recon_scaled = pca_win_df.pca.iloc[0].inverse_transform(
pca_win_df.x.iloc[0]
)
recon = pca_win_df.scaler.iloc[0].inverse_transform(recon_scaled)
return recon
reconstruct_win_df(window=1, countries=None)
Obtain residuals using selected components over selected countries.
After reconstructing data from principal components, the result is
a set of residuals that express variances in the observed data not
captured by the current principal component model. A sufficiently
large-scale residual represents behaviour that deviates significantly
from previous patterns, and is thus of interest.
The residual errors calculated during the reconstruction accounts
for variance in the dataset that is not expressed by the chosen
principle components in the approximate model.
• Positive residuals represent drops in expected Tor usage for
a country.
• Negative residuals represent increases in expected Tor usage
for a country.
• Magnitude of residuals expresses how much a country varies
from its previous behaviour relative to other countries.
Input:
- pca_df
- wide_unseasoned_df # [2401 rows x 136 columns]
- window: the starting index of the window
- countries: a list of countries of interest
Output:
- long_residuals # [24300 rows x 2 columns], eg:
2012-02-29 za 0.449914
Source code in anomaly/processor.py
def reconstruct_win_df(
    self,
    window: int = 1,
    countries: list | None = None,
) -> pd.DataFrame:
    """Compute relative residuals for one reconstruction window.

    The residuals express variance in the observed data not captured
    by the principal component model:
    - positive residuals represent drops in expected usage;
    - negative residuals represent increases in expected usage;
    - magnitude expresses how much a country deviates from its
      previous behaviour relative to other countries.

    Fix: the window selection used ``self.pca_df.iloc[:window]``
    (windows 0..window-1) while the reconstruction always read row 0,
    so every window was reconstructed from the *first* window's PCA
    but labelled with window ``window``'s dates. Select the single row
    for the requested window instead. Also, ``countries=None`` used to
    crash on ``recon_df.columns = None``; it now defaults to all
    columns.

    :param window: 1-based window index; window ``w`` covers rows
        ``w - 1`` to ``w - 1 + window_width`` of ``wide_unseasoned_df``.
    :param countries: countries of interest; defaults to every column.
    :returns: long dataframe with ``country``/``res`` columns, indexed
        by date.
    """
    if countries is None:
        countries = self.wide_unseasoned_df.columns.tolist()
    # Single row holding this window's fitted scaler/pca/scores.
    # reset_index keeps positional label 0 for downstream consumers.
    pca_win_df: pd.DataFrame = self.pca_df.iloc[  # type: ignore
        window - 1 : window
    ].reset_index(drop=True)
    recon = self.reconstruct_pc_win_df(pca_win_df)
    recon_df = pd.DataFrame(recon)
    # Restore the window's date index and country columns.
    recon_df.index = self.wide_unseasoned_df.index[
        window - 1 : window + self.window_width - 1
    ]
    recon_df.columns = countries
    # The original data for the same window and countries.
    orig_win = self.wide_unseasoned_df.iloc[
        window - 1 : window + self.window_width - 1
    ][countries]
    # Relative residual: positive when the reconstruction exceeds the
    # observed value.
    residuals = (recon_df - orig_win) / orig_win
    long_residuals = residuals.melt(
        var_name="country",
        value_name="res",
        ignore_index=False,
    )
    return long_residuals
write_csv()
Write main dataframes to files.
So that they do not need to be computed again later on.
Source code in anomaly/processor.py
def write_csv(self) -> None:
    """Write the main dataframes to files.

    So that they do not need to be computed again later on.
    """
    # pylint: disable=R0801
    if not self.write:
        return
    # pylint: disable-next=[line-too-long]
    start_str: str = self.train_start_date.strftime("%Y-%m-%d")  # type: ignore
    # pylint: disable-next=[line-too-long]
    end_str: str = self.train_end_date.strftime("%Y-%m-%d")  # type: ignore
    # Persist each result under its own subdirectory name.
    for frame, name in (
        (self.long_reconstructed_df, "long_reconstructed"),
        (self.pca_df, "pca"),
        (self.long_median_summary_df, "long_median_summary"),
    ):
        write_csv_subdir(frame, name, start_str, end_str)