Preprocessor
Source code in anomaly/preprocessor.py
class Preprocessor:
    """Obtain and prepare the Tor user-count data for the anomaly analysis."""

    def __init__(self, args: argparse.Namespace | None = None):
        """Initialize configuration from module constants, then override the
        run-time options from the parsed CLI ``args`` when given.
        """
        # Static configuration taken from module-level constants.
        self.data_dir: str = DATA_DIR
        self.url: str = URL
        self.orig_df_filename: str = ORIG_DF_FILENAME
        self.strip: bool = True
        self.min_users: int = MIN_USERS
        self.season_length: int = SEASON_LENGTH
        self.window_width: int = WINDOW_WIDTH
        # Run-time options; possibly overridden from ``args`` below.
        self.path: Path | None = None
        self.write: bool = False
        self.read: bool = False
        self.start_date: datetime.date | None = None
        self.end_date: datetime.date | None = None
        self.train_start_date: datetime.date | None = None
        self.train_end_date: datetime.date | None = None
        # Dataframes produced later by `obtain_initial_df` / `prepare_df`;
        # annotations widened to `| None` to reflect the `None` defaults.
        self.long_initial_df: pd.DataFrame | None = None
        self.long_seasoned_df: pd.DataFrame | None = None
        self.wide_seasoned_df: pd.DataFrame | None = None
        self.wide_unseasoned_df: pd.DataFrame | None = None
        if args is not None:
            self.path = args.path
            self.write = args.write
            self.read = args.read
            self.start_date = args.start_date
            self.end_date = args.end_date
            self.train_start_date = args.train_start_date
            self.train_end_date = args.train_end_date
def obtain_initial_df(
    self,
) -> pd.DataFrame:
    """Obtain the (training) data to apply the analysis.

    Either from a file or from the metrics Web.

    Eg. input:
    ```text
    date,country,users,lower,upper,frac
    2011-03-06,,1042399,,,11
    2011-03-06,??,8869,,,11
    ```
    Output:
    - long_initial_df, eg:
    ```
               country  users
    date
    2011-09-01      ad     76
    2011-09-01      ae   5837
    ```

    Returns:
        The raw long-format dataframe, also stored in
        ``self.long_initial_df``. Side effects: adjusts
        ``train_start_date``/``train_end_date`` to the dates actually
        present in the data, and clamps ``start_date``/``end_date`` into
        that interval.
    """
    logger.info("Obtaining data.")
    if self.path:
        logger.info("Reading path %s", self.path)
        self.long_initial_df = read_csv(self.path)  # (575550, 2)
    else:
        # Without dates argument the first date will be `2011-03-06` and
        # the last will be the previous? day.
        # Build the query string locally: the previous code appended
        # `&end=...` without a leading `?` when only the end date was set
        # (invalid URL), and mutated `self.url` so repeated calls kept
        # appending query strings.
        params: list[str] = []
        if self.train_start_date:
            params.append(f"start={self.train_start_date}")
        if self.train_end_date:
            params.append(f"end={self.train_end_date}")
        url = self.url + ("?" + "&".join(params) if params else "")
        logger.info("Reading URL %s", url)
        self.long_initial_df = pd.read_csv(
            url,
            comment="#",
            index_col="date",
            parse_dates=True,
        )
        # Keep plain `datetime.date` values in the index.
        self.long_initial_df.index = self.long_initial_df.index.date
        self.long_initial_df.index.name = "date"
    # #8: log when the args dates and df date doesn't match
    data_start_date = self.long_initial_df.index.min()  # type: ignore
    data_end_date = self.long_initial_df.index.max()  # type: ignore
    # Adjust training date interval to the available data
    self.train_start_date = data_start_date
    self.train_end_date = data_end_date
    logger.info(
        "Training data is in the date interval %s %s",
        self.train_start_date,
        self.train_end_date,
    )
    # Adjust user date interval to the available data: clamp both dates
    # into [train_start_date, train_end_date]. The previous `end_date`
    # expression, `max(min(train_end, end), train_end)`, always collapsed
    # to `train_end_date`; the lower clamp bound must be
    # `train_start_date`, mirroring the `start_date` clamp.
    self.start_date = min(
        max(self.train_start_date, self.start_date),  # type: ignore
        self.train_end_date,  # type: ignore
    )
    self.end_date = max(
        min(self.train_end_date, self.end_date),  # type: ignore
        self.train_start_date,  # type: ignore
    )
    logger.info(
        "Target data is in the date interval %s %s",
        self.start_date,
        self.end_date,
    )
    # #8: log when the args dates and actual dates doesn't match
    return self.long_initial_df
# #9: Take relays+bridges, or just relays.
def prepare_df(
    self,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Prepare the (training) data to apply the analysis.

    In particular:
    - remove not needed columns: `lower`, `upper`, `frac`,
    - remove the total counts column
    - remove date outliers columns (`2011-03-06`)
    - remove countries outliers rows
    - remove any columns that are constant over any time window, as this
      breaks prcomp with the SCALE=TRUE parameter.
    - remove all columns (countries) with a maximum lower than the
      `MAX_USERS` number of users, to avoid the unacceptably high variance
    - remove seasonal (weekly) components from each time series by Loess,
      to later reconstruct the time series without the weekly cycle
      components.

    ```text
    Tor usage data, [...] exhibits significant seasonality,
    typically on a weekly basis, reflecting changes between usage on
    weekdays and at weekends. This continual cyclical change in usage
    can reduce the accuracy of principal component analysis due to
    varying levels of seasonality exhibited by different countries.
    We employ the Seasonal and Trend Decomposition using Loess
    (STL) method of Cleveland et al. to remove the seasonal compo-
    nent of each series, leaving the trend component and the residual
    noise as inputs to our anomaly detector
    ```

    NOTE: R `stl` and Python `statsmodels.tsa.seasonal.STL` doesn't return
    the same values with the same input.

    Input:
    - long_initial_df:
    ```
             date country      users
    0  2011-03-06     NaN  1042399.0
    1  2011-03-06      ??     8869.0
    2  2011-03-06      a1     1441.0
    ```
    Output:
    - long_seasoned_df, eg:
    ```
               country  users
    date
    2011-09-01      ae   5837
    2011-09-01      al    212
    ```
    - wide_unseasoned_df, eg:
    ```
    country              ae          al
    date
    2011-09-01  5619.767371  223.472308
    ```
    """
    logger.info(
        "Preparing data with min_allowed %s and window_width %s.",
        self.min_users,
        self.window_width,
    )
    # Strip outliers
    # Remove no needed columns
    # copy the original df, so it can be saved as it was
    long_initial_df = self.long_initial_df.copy()
    long_initial_df.drop(
        columns=["lower", "upper", "frac"], inplace=True
    )  # [575550 rows x 2 columns]
    # Pivot to wide form: one column per country, one row per date.
    # The country=NaN "total" rows are dropped by pivot_table here.
    wide_initial_df: pd.DataFrame = long_initial_df.pivot_table(
        index="date", columns="country", values="users"
    ).copy()  # [2403 rows x 258 columns
    # In R, this matrix will already have replaced NAN by 0s
    wide_initial_df.fillna(0, inplace=True)
    # Remove the outlier date row.
    wide_initial_df = wide_initial_df.drop(
        pd.to_datetime(DATE_TO_RM).date()
    )  # [2402 rows x 258 columns]
    # Remove outliers countries rows
    wide_initial_df = wide_initial_df.drop(
        columns=[
            col for col in COLUMNS_TO_RM if col in wide_initial_df.columns
        ]
    )  # [2402 rows x 253 columns]
    # Remove last date rows
    wide_initial_df = wide_initial_df.iloc[:-1]
    # [2401 rows x 253 columns]
    # Remove constant countries: slide a window of `window_width` rows and
    # collect the positional indices of columns whose (population, ddof=0)
    # variance is zero in ANY window.
    indices = []
    for i in range(len(wide_initial_df) - self.window_width + 1):
        window_data = wide_initial_df.iloc[i : i + self.window_width]
        variances = window_data.var(axis=0, skipna=True, ddof=0)
        const_cols = np.where(variances == 0)[
            0
        ]  # indices of columns with zero variance
        indices.append(const_cols)
    # Flatten and get unique column indices
    remove_columns = sorted(
        set(idx for sublist in indices for idx in sublist)
    )
    constant_countries = wide_initial_df.columns[remove_columns].tolist()
    wide_initial_df.drop(columns=constant_countries, inplace=True)
    logger.debug("Constant countries: %s", sorted(constant_countries))
    # #10: Check why the constant countries aren't the same as in R code
    # Drop all columns (countries) with a maximum lower than the
    # max.users number of users.
    self.wide_seasoned_df = wide_initial_df.loc[
        :, wide_initial_df.max() > self.min_users
    ].copy()
    # [2401 rows x 135 columns]
    logger.debug("wide_seasoned.shape %s", self.wide_seasoned_df.shape)
    # [2401 rows x 226 columns]
    self.long_seasoned_df = self.wide_seasoned_df.melt(
        var_name="country",
        value_name="users",
        ignore_index=False,
    )  # [324135 rows x 2 columns]
    logger.debug("long_seasoned_df.shape %s", self.long_seasoned_df.shape)
    # Remove seasonality: per-country STL decomposition, keeping
    # trend + residual (i.e. the series minus its seasonal component).
    new_columns = []
    for col in self.wide_seasoned_df:
        # `seasonal` argument isn't the same as R `s.window`
        stl_fit = STL(
            self.wide_seasoned_df[col],
            period=self.season_length,
            seasonal_deg=0,
            robust=True,
        ).fit()
        new_col = pd.Series(
            stl_fit.trend + stl_fit.resid,
            name=col,
            index=self.wide_seasoned_df.index,
        )
        # To plot stl per row: `stl_fit.plot(); plt.show()`
        new_columns.append(new_col)
    self.wide_unseasoned_df = pd.concat(new_columns, axis=1)
    # self.wide_unseasoned_df = self.wide_unseasoned_df.dropna()
    # [2401 rows x 135 columns]
    logger.debug(
        "wide_unseasoned_df.shape %s", self.wide_unseasoned_df.shape
    )
    return self.long_seasoned_df, self.wide_unseasoned_df
def read_csv(self) -> tuple[pd.DataFrame, pd.DataFrame] | None:
    """Try to load the precomputed dataframes from disk.

    Returns the ``(long_seasoned_df, wide_unseasoned_df)`` pair when both
    files could be read, ``None`` when reading is disabled or any file is
    missing (in which case the caller recomputes them).
    """
    if not self.read:
        return None
    start = self.train_start_date.strftime("%Y-%m-%d")  # type: ignore
    end = self.train_end_date.strftime("%Y-%m-%d")  # type: ignore
    self.wide_unseasoned_df = read_csv_subdir("wide_unseasoned", start, end)
    self.long_seasoned_df = read_csv_subdir("long_seasoned", start, end)
    # Guard clause: any missing file means we fall back to computing.
    if self.wide_unseasoned_df is None or self.long_seasoned_df is None:
        logger.info("Couldn't read files, computing them.")
        return None
    return self.long_seasoned_df, self.wide_unseasoned_df
def write_csv(self) -> None:
    """Write main dataframes to files.

    In order to do not compute them again later on. No-op unless the
    ``write`` option is enabled.
    """
    if not self.write:
        return
    # pylint: disable-next=[line-too-long]
    start_str: str = self.train_start_date.strftime("%Y-%m-%d")  # type: ignore
    # pylint: disable-next=[line-too-long]
    end_str: str = self.train_end_date.strftime("%Y-%m-%d")  # type: ignore
    # Every dataframe goes to the same date-stamped subdirectory layout;
    # iterate over (df, subdir name) pairs instead of repeating the call.
    for df, name in (
        (self.long_initial_df, "long_initial"),
        (self.wide_seasoned_df, "wide_seasoned"),
        (self.long_seasoned_df, "long_seasoned"),
        (self.wide_unseasoned_df, "wide_unseasoned"),
    ):
        write_csv_subdir(df, name, start_str, end_str)
def run(
    self,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return the prepared dataframes, reading cached files when possible.

    Falls back to downloading/reading the raw data, preparing it, and
    optionally writing the results to disk.
    """
    cached = self.read_csv()
    if cached is not None:
        return cached
    self.obtain_initial_df()
    self.prepare_df()
    self.write_csv()
    # `wide_unseasoned_df.shape` [2401 rows x 135 columns]
    # `long_seasoned_df.shape`: (324135, 2)
    return self.long_seasoned_df, self.wide_unseasoned_df
obtain_initial_df()
Obtain the (training) data to apply the analysis.
Either from a file or from the metrics Web.
Eg. input:
date,country,users,lower,upper,frac
2011-03-06,,1042399,,,11
2011-03-06,??,8869,,,11
Output:
- long_initial_df, eg:
country users
date
2011-09-01 ad 76
2011-09-01 ae 5837
Source code in anomaly/preprocessor.py
def obtain_initial_df(
    self,
) -> pd.DataFrame:
    """Obtain the (training) data to apply the analysis.

    Either from a file or from the metrics Web.

    Eg. input:
    ```text
    date,country,users,lower,upper,frac
    2011-03-06,,1042399,,,11
    2011-03-06,??,8869,,,11
    ```
    Output:
    - long_initial_df, eg:
    ```
               country  users
    date
    2011-09-01      ad     76
    2011-09-01      ae   5837
    ```

    Returns:
        The raw long-format dataframe, also stored in
        ``self.long_initial_df``. Side effects: adjusts
        ``train_start_date``/``train_end_date`` to the dates actually
        present in the data, and clamps ``start_date``/``end_date`` into
        that interval.
    """
    logger.info("Obtaining data.")
    if self.path:
        logger.info("Reading path %s", self.path)
        self.long_initial_df = read_csv(self.path)  # (575550, 2)
    else:
        # Without dates argument the first date will be `2011-03-06` and
        # the last will be the previous? day.
        # Build the query string locally: the previous code appended
        # `&end=...` without a leading `?` when only the end date was set
        # (invalid URL), and mutated `self.url` so repeated calls kept
        # appending query strings.
        params: list[str] = []
        if self.train_start_date:
            params.append(f"start={self.train_start_date}")
        if self.train_end_date:
            params.append(f"end={self.train_end_date}")
        url = self.url + ("?" + "&".join(params) if params else "")
        logger.info("Reading URL %s", url)
        self.long_initial_df = pd.read_csv(
            url,
            comment="#",
            index_col="date",
            parse_dates=True,
        )
        # Keep plain `datetime.date` values in the index.
        self.long_initial_df.index = self.long_initial_df.index.date
        self.long_initial_df.index.name = "date"
    # #8: log when the args dates and df date doesn't match
    data_start_date = self.long_initial_df.index.min()  # type: ignore
    data_end_date = self.long_initial_df.index.max()  # type: ignore
    # Adjust training date interval to the available data
    self.train_start_date = data_start_date
    self.train_end_date = data_end_date
    logger.info(
        "Training data is in the date interval %s %s",
        self.train_start_date,
        self.train_end_date,
    )
    # Adjust user date interval to the available data: clamp both dates
    # into [train_start_date, train_end_date]. The previous `end_date`
    # expression, `max(min(train_end, end), train_end)`, always collapsed
    # to `train_end_date`; the lower clamp bound must be
    # `train_start_date`, mirroring the `start_date` clamp.
    self.start_date = min(
        max(self.train_start_date, self.start_date),  # type: ignore
        self.train_end_date,  # type: ignore
    )
    self.end_date = max(
        min(self.train_end_date, self.end_date),  # type: ignore
        self.train_start_date,  # type: ignore
    )
    logger.info(
        "Target data is in the date interval %s %s",
        self.start_date,
        self.end_date,
    )
    # #8: log when the args dates and actual dates doesn't match
    return self.long_initial_df
prepare_df()
Prepare the (training) data to apply the analysis.
In particular:
- remove not needed columns: `lower`, `upper`, `frac`
- remove the total counts column
- remove date outliers columns (`2011-03-06`)
- remove countries outliers rows
- remove any columns that are constant over any time window, as this breaks prcomp with the SCALE=TRUE parameter.
- remove all columns (countries) with a maximum lower than the
  `MAX_USERS` number of users, to avoid the unacceptably high variance
- remove seasonal (weekly) components from each time series by Loess, to
  later reconstruct the time series without the weekly cycle components.
Tor usage data, [...] exhibits significant seasonality,
typically on a weekly basis, reflecting changes between usage on
weekdays and at weekends. This continual cyclical change in usage
can reduce the accuracy of principal component analysis due to
varying levels of seasonality exhibited by different countries.
We employ the Seasonal and Trend Decomposition using Loess
(STL) method of Cleveland et al. to remove the seasonal compo-
nent of each series, leaving the trend component and the residual
noise as inputs to our anomaly detector
NOTE: R `stl` and Python `statsmodels.tsa.seasonal.STL` don't return
the same values with the same input.
Input:
- long_initial_df:
date country users
0 2011-03-06 NaN 1042399.0
1 2011-03-06 ?? 8869.0
2 2011-03-06 a1 1441.0
Output:
- long_seasoned_df, eg:
country users
date
2011-09-01 ae 5837
2011-09-01 al 212
- wide_unseasoned_df, eg:
country ae al
date
2011-09-01 5619.767371 223.472308
Source code in anomaly/preprocessor.py
def prepare_df(
    self,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Prepare the (training) data to apply the analysis.

    In particular:
    - remove not needed columns: `lower`, `upper`, `frac`,
    - remove the total counts column
    - remove date outliers columns (`2011-03-06`)
    - remove countries outliers rows
    - remove any columns that are constant over any time window, as this
      breaks prcomp with the SCALE=TRUE parameter.
    - remove all columns (countries) with a maximum lower than the
      `MAX_USERS` number of users, to avoid the unacceptably high variance
    - remove seasonal (weekly) components from each time series by Loess,
      to later reconstruct the time series without the weekly cycle
      components.

    ```text
    Tor usage data, [...] exhibits significant seasonality,
    typically on a weekly basis, reflecting changes between usage on
    weekdays and at weekends. This continual cyclical change in usage
    can reduce the accuracy of principal component analysis due to
    varying levels of seasonality exhibited by different countries.
    We employ the Seasonal and Trend Decomposition using Loess
    (STL) method of Cleveland et al. to remove the seasonal compo-
    nent of each series, leaving the trend component and the residual
    noise as inputs to our anomaly detector
    ```

    NOTE: R `stl` and Python `statsmodels.tsa.seasonal.STL` doesn't return
    the same values with the same input.

    Input:
    - long_initial_df:
    ```
             date country      users
    0  2011-03-06     NaN  1042399.0
    1  2011-03-06      ??     8869.0
    2  2011-03-06      a1     1441.0
    ```
    Output:
    - long_seasoned_df, eg:
    ```
               country  users
    date
    2011-09-01      ae   5837
    2011-09-01      al    212
    ```
    - wide_unseasoned_df, eg:
    ```
    country              ae          al
    date
    2011-09-01  5619.767371  223.472308
    ```
    """
    logger.info(
        "Preparing data with min_allowed %s and window_width %s.",
        self.min_users,
        self.window_width,
    )
    # Strip outliers
    # Remove no needed columns
    # copy the original df, so it can be saved as it was
    long_initial_df = self.long_initial_df.copy()
    long_initial_df.drop(
        columns=["lower", "upper", "frac"], inplace=True
    )  # [575550 rows x 2 columns]
    # Pivot to wide form: one column per country, one row per date.
    # The country=NaN "total" rows are dropped by pivot_table here.
    wide_initial_df: pd.DataFrame = long_initial_df.pivot_table(
        index="date", columns="country", values="users"
    ).copy()  # [2403 rows x 258 columns
    # In R, this matrix will already have replaced NAN by 0s
    wide_initial_df.fillna(0, inplace=True)
    # Remove the outlier date row.
    wide_initial_df = wide_initial_df.drop(
        pd.to_datetime(DATE_TO_RM).date()
    )  # [2402 rows x 258 columns]
    # Remove outliers countries rows
    wide_initial_df = wide_initial_df.drop(
        columns=[
            col for col in COLUMNS_TO_RM if col in wide_initial_df.columns
        ]
    )  # [2402 rows x 253 columns]
    # Remove last date rows
    wide_initial_df = wide_initial_df.iloc[:-1]
    # [2401 rows x 253 columns]
    # Remove constant countries: slide a window of `window_width` rows and
    # collect the positional indices of columns whose (population, ddof=0)
    # variance is zero in ANY window.
    indices = []
    for i in range(len(wide_initial_df) - self.window_width + 1):
        window_data = wide_initial_df.iloc[i : i + self.window_width]
        variances = window_data.var(axis=0, skipna=True, ddof=0)
        const_cols = np.where(variances == 0)[
            0
        ]  # indices of columns with zero variance
        indices.append(const_cols)
    # Flatten and get unique column indices
    remove_columns = sorted(
        set(idx for sublist in indices for idx in sublist)
    )
    constant_countries = wide_initial_df.columns[remove_columns].tolist()
    wide_initial_df.drop(columns=constant_countries, inplace=True)
    logger.debug("Constant countries: %s", sorted(constant_countries))
    # #10: Check why the constant countries aren't the same as in R code
    # Drop all columns (countries) with a maximum lower than the
    # max.users number of users.
    self.wide_seasoned_df = wide_initial_df.loc[
        :, wide_initial_df.max() > self.min_users
    ].copy()
    # [2401 rows x 135 columns]
    logger.debug("wide_seasoned.shape %s", self.wide_seasoned_df.shape)
    # [2401 rows x 226 columns]
    self.long_seasoned_df = self.wide_seasoned_df.melt(
        var_name="country",
        value_name="users",
        ignore_index=False,
    )  # [324135 rows x 2 columns]
    logger.debug("long_seasoned_df.shape %s", self.long_seasoned_df.shape)
    # Remove seasonality: per-country STL decomposition, keeping
    # trend + residual (i.e. the series minus its seasonal component).
    new_columns = []
    for col in self.wide_seasoned_df:
        # `seasonal` argument isn't the same as R `s.window`
        stl_fit = STL(
            self.wide_seasoned_df[col],
            period=self.season_length,
            seasonal_deg=0,
            robust=True,
        ).fit()
        new_col = pd.Series(
            stl_fit.trend + stl_fit.resid,
            name=col,
            index=self.wide_seasoned_df.index,
        )
        # To plot stl per row: `stl_fit.plot(); plt.show()`
        new_columns.append(new_col)
    self.wide_unseasoned_df = pd.concat(new_columns, axis=1)
    # self.wide_unseasoned_df = self.wide_unseasoned_df.dropna()
    # [2401 rows x 135 columns]
    logger.debug(
        "wide_unseasoned_df.shape %s", self.wide_unseasoned_df.shape
    )
    return self.long_seasoned_df, self.wide_unseasoned_df
write_csv()
Write main dataframes to files.
In order to do not compute them again later on.
Source code in anomaly/preprocessor.py
def write_csv(self) -> None:
    """Write main dataframes to files.

    In order to do not compute them again later on. No-op unless the
    ``write`` option is enabled.
    """
    if not self.write:
        return
    # pylint: disable-next=[line-too-long]
    start_str: str = self.train_start_date.strftime("%Y-%m-%d")  # type: ignore
    # pylint: disable-next=[line-too-long]
    end_str: str = self.train_end_date.strftime("%Y-%m-%d")  # type: ignore
    # Every dataframe goes to the same date-stamped subdirectory layout;
    # iterate over (df, subdir name) pairs instead of repeating the call.
    for df, name in (
        (self.long_initial_df, "long_initial"),
        (self.wide_seasoned_df, "wide_seasoned"),
        (self.long_seasoned_df, "long_seasoned"),
        (self.wide_unseasoned_df, "wide_unseasoned"),
    ):
        write_csv_subdir(df, name, start_str, end_str)