Anomaly Detection Pipeline

Purpose

An end-to-end anomaly detection pipeline that combines unsupervised detectors (Isolation Forest, Gaussian mixture model), optional dimensionality reduction (PCA), and evaluation with or without ground-truth labels. It applies to fraud detection, infrastructure monitoring, data quality checks, and industrial defect detection.

Examples

  • Isolation Forest on tabular sensor / transaction data
  • PCA for high-dimensional data compression before detection
  • GMM density-based anomaly scoring
  • Evaluation in the absence of ground-truth labels

Architecture

┌────────────────────────────┐
│  Raw Feature Matrix         │  Tabular, potentially high-dim
└──────────────┬─────────────┘
               │
      ┌────────▼────────┐
      │  EDA + Cleaning │  Missingness, outlier profiling
      └────────┬────────┘
               │
      ┌────────▼────────┐
      │  Preprocessing  │  StandardScaler, PCA (optional)
      └────────┬────────┘
               │
    ┌──────────┴──────────┐
    │                     │
┌───▼───────────┐  ┌──────▼──────────┐
│ Isolation     │  │  GMM Density    │
│ Forest        │  │  Scoring        │
└───┬───────────┘  └──────┬──────────┘
    │                     │
    └──────────┬──────────┘
               │
      ┌────────▼────────┐
      │ Score Ensemble  │  Rank / threshold
      └────────┬────────┘
               │
      ┌────────▼────────┐
      │ Evaluation      │  Precision@k, PR-AUC if labels exist
      │ + Alerts        │
      └─────────────────┘

Step-by-Step Implementation

1. EDA and Preprocessing

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
 
df = pd.read_csv("transactions.csv")
 
# Quick profiling
print(df.describe())
print(f"Missing: {df.isnull().mean().sort_values(ascending=False).head()}")
 
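# Scale numeric features; drop the label column (used only for evaluation in step 5)
features = df.select_dtypes(include="number").drop(columns=["is_fraud"], errors="ignore")
scaler = StandardScaler()
X = scaler.fit_transform(features)
 
# Optional: PCA to 95% variance for high-dimensional data
# (PCA rejects NaN, so impute or drop missing values first)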
pca = PCA(n_components=0.95, random_state=42)
X_pca = pca.fit_transform(X)
print(f"PCA reduced to {X_pca.shape[1]} components")
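
If profiling shows missing values, the PCA step above will fail, because scikit-learn's PCA does not accept NaN. A minimal sketch of one way to handle this, assuming median imputation is acceptable for the data at hand: chain imputation, scaling, and PCA into a single pipeline object. The synthetic matrix and the names `X_demo` and `prep` are illustrative assumptions, not part of the original pipeline.

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for the numeric feature matrix (assumption, not real data)
rng = np.random.default_rng(0)
X_demo = rng.normal(size=(500, 10))
X_demo[rng.random(X_demo.shape) < 0.02] = np.nan   # inject roughly 2% missing values

# Impute -> scale -> PCA, fitted as one object so the same transforms
# are applied at training time and at scoring time
prep = make_pipeline(
    SimpleImputer(strategy="median"),
    StandardScaler(),
    PCA(n_components=0.95, random_state=42),
)
X_demo_pca = prep.fit_transform(X_demo)
print(f"Output shape: {X_demo_pca.shape}")
```

Persisting the fitted pipeline as one artefact also removes the risk of applying the scaler and PCA in the wrong order at deployment.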

2. Isolation Forest

from sklearn.ensemble import IsolationForest
 
iso = IsolationForest(
    n_estimators=200,
    contamination=0.05,   # set based on expected anomaly rate
    max_samples="auto",
    random_state=42,
    n_jobs=-1
)
iso.fit(X_pca)
 
scores_iso = iso.score_samples(X_pca)   # lower = more anomalous
labels_iso = iso.predict(X_pca)         # -1 = anomaly
print(f"Anomalies detected: {(labels_iso == -1).sum()}")
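
To make the role of `contamination` concrete, the sketch below checks how `score_samples`, `decision_function`, and `predict` relate in scikit-learn: the decision function is the raw score shifted by the fitted `offset_`, and `predict` flags the points whose decision function is negative. The data is synthetic and the names `X_demo` and `iso_demo` are illustrative assumptions.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

# Synthetic data: a dense Gaussian cloud plus scattered uniform points (assumption)
rng = np.random.default_rng(0)
X_demo = np.vstack([
    rng.normal(0, 1, size=(950, 4)),
    rng.uniform(-6, 6, size=(50, 4)),
])

iso_demo = IsolationForest(n_estimators=100, contamination=0.05, random_state=42)
iso_demo.fit(X_demo)

raw = iso_demo.score_samples(X_demo)           # lower = more anomalous
shifted = iso_demo.decision_function(X_demo)   # raw score minus the fitted offset_
pred = iso_demo.predict(X_demo)                # -1 = anomaly, 1 = inlier

# decision_function is score_samples shifted by offset_
assert np.allclose(shifted, raw - iso_demo.offset_)
# predict flags exactly the points with a negative decision_function
assert np.array_equal(pred == -1, shifted < 0)

flagged_fraction = (pred == -1).mean()
print(f"Flagged fraction on the training data: {flagged_fraction:.3f}")
```

In practice this means `contamination` only moves the cut-off; the ranking given by `score_samples` is unaffected, which is why the ensemble step works from raw scores rather than from the hard labels.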

3. GMM Density Scoring

from sklearn.mixture import GaussianMixture
 
# Select k by BIC
k_range = range(2, 10)
bics = [GaussianMixture(k, n_init=3, random_state=42).fit(X_pca).bic(X_pca)
        for k in k_range]
best_k = k_range[int(np.argmin(bics))]   # k with the lowest BIC
 
gm = GaussianMixture(best_k, n_init=5, covariance_type="full", random_state=42)
gm.fit(X_pca)
 
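log_probs = gm.score_samples(X_pca)   # per-sample log-density; low = anomaly
threshold = np.percentile(log_probs, 5)   # flag the lowest 5%, matching contamination=0.05
labels_gmm = (log_probs < threshold).astype(int)   # 1 = anomaly (IsolationForest.predict uses -1)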
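
The BIC search above varies only the number of components. A possible extension, sketched here under the assumption that the covariance structure is also uncertain, is to search over `covariance_type` in the same loop. The helper `select_gmm_by_bic` and the synthetic two-cluster data are hypothetical, introduced only for illustration.

```python
import numpy as np
from sklearn.mixture import GaussianMixture

# Synthetic data: two well-separated Gaussian clusters (assumption)
rng = np.random.default_rng(0)
X_demo = np.vstack([
    rng.normal(-3, 1, size=(300, 2)),
    rng.normal(3, 1, size=(300, 2)),
])

def select_gmm_by_bic(X, k_values=range(1, 6), cov_types=("full", "diag")):
    """Return the fitted GaussianMixture with the lowest BIC on X."""
    best_model, best_bic = None, np.inf
    for cov in cov_types:
        for k in k_values:
            model = GaussianMixture(
                k, covariance_type=cov, n_init=3, random_state=42
            ).fit(X)
            bic = model.bic(X)
            if bic < best_bic:
                best_model, best_bic = model, bic
    return best_model

gm_demo = select_gmm_by_bic(X_demo)
print(f"Selected k={gm_demo.n_components}, covariance_type={gm_demo.covariance_type}")
```

Restricting the covariance to `"diag"` reduces the parameter count considerably, which can help when the number of PCA components is large relative to the sample size.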

4. Score Ensemble

from scipy.stats import rankdata
 
# Rank-normalise scores to (0, 1] (higher = more anomalous)
rank_iso = rankdata(-scores_iso) / len(scores_iso)
rank_gmm = rankdata(-log_probs)  / len(log_probs)
 
ensemble_score = 0.5 * rank_iso + 0.5 * rank_gmm
 
# Flag top-k as anomalies
k = 100
top_k_idx = np.argsort(ensemble_score)[-k:]
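
A small worked example may help to check the sign conventions in the rank ensemble. The four score values below are made up for illustration; as in the pipeline, lower raw scores mean more anomalous, so both arrays are negated before ranking.

```python
import numpy as np
from scipy.stats import rankdata

# Made-up raw scores for four points; lower = more anomalous for both detectors
toy_iso = np.array([-0.70, -0.40, -0.45, -0.42])   # like IsolationForest.score_samples
toy_gmm = np.array([-25.0, -3.0, -2.5, -9.0])      # like GaussianMixture.score_samples

# Negate so that the most anomalous point receives the highest rank
toy_rank_iso = rankdata(-toy_iso) / len(toy_iso)   # [1.0, 0.25, 0.75, 0.5]
toy_rank_gmm = rankdata(-toy_gmm) / len(toy_gmm)   # [1.0, 0.5, 0.25, 0.75]

toy_ensemble = 0.5 * toy_rank_iso + 0.5 * toy_rank_gmm
print(toy_ensemble.tolist())   # [1.0, 0.375, 0.5, 0.625]

# Indices ordered from most to least anomalous
toy_order = np.argsort(toy_ensemble)[::-1]
print(toy_order.tolist())      # [0, 3, 2, 1]
```

Point 0 is ranked most anomalous by both detectors and reaches the maximum score of 1.0, while points on which the detectors disagree land in the middle. The smallest attainable rank is 1/n rather than 0.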

5. Evaluation

If ground-truth labels are available (used here for evaluation only, not for training):

from sklearn.metrics import roc_auc_score, average_precision_score
 
y_true = df["is_fraud"].values  # 0/1
 
print(f"IF  ROC-AUC: {roc_auc_score(y_true, -scores_iso):.3f}")
print(f"GMM ROC-AUC: {roc_auc_score(y_true, -log_probs):.3f}")
print(f"Ens ROC-AUC: {roc_auc_score(y_true, ensemble_score):.3f}")
 
# Average precision (summarises the PR curve); more informative than ROC-AUC under heavy class imbalance
print(f"Ens AP: {average_precision_score(y_true, ensemble_score):.3f}")
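
The architecture diagram lists Precision@k, which the metrics above do not cover. A minimal sketch follows; the helper name `precision_at_k` and the toy arrays are assumptions for illustration, and the function expects scores where higher means more anomalous (as with `ensemble_score`).

```python
import numpy as np

def precision_at_k(y_true, scores, k):
    """Fraction of true anomalies among the k highest-scoring points."""
    y_true = np.asarray(y_true)
    scores = np.asarray(scores)
    k = min(k, len(scores))                   # guard against k larger than the data
    top_k = np.argsort(scores)[::-1][:k]      # indices of the k largest scores
    return float(y_true[top_k].mean())

# Toy labels and scores (made up); 1 = anomaly
y_demo = np.array([0, 0, 1, 0, 1, 0, 0, 1])
s_demo = np.array([0.1, 0.2, 0.9, 0.3, 0.8, 0.7, 0.05, 0.4])

p_at_3 = precision_at_k(y_demo, s_demo, k=3)   # 2 of the top 3 are true anomalies
print(f"Precision@3: {p_at_3:.3f}")
```

Precision@k matches the operational setting where analysts can review only a fixed number of alerts per period, so k would normally be set to that review capacity.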

Without labels (unsupervised evaluation):

# Visual inspection: project the data to 2-D with UMAP and colour by anomaly score
import matplotlib.pyplot as plt
import umap  # provided by the umap-learn package
 
reducer = umap.UMAP(n_components=2, random_state=42)
X_2d = reducer.fit_transform(X_pca)
 
plt.scatter(X_2d[:, 0], X_2d[:, 1], c=ensemble_score, cmap="Reds", s=2, alpha=0.5)
plt.colorbar(label="Anomaly score")
plt.title("UMAP: anomaly score overlay")
plt.show()
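
Visual inspection is qualitative. One label-free check that can complement it, offered here as a heuristic and not as part of the original pipeline, is to measure how stable the top-k anomaly set is when the detector is refitted with different random seeds. The data is synthetic and the helper names `top_k_set` and `jaccard` are hypothetical.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

# Synthetic data: Gaussian bulk plus scattered uniform points (assumption)
rng = np.random.default_rng(0)
X_demo = np.vstack([
    rng.normal(0, 1, size=(950, 4)),
    rng.uniform(-8, 8, size=(50, 4)),
])

def top_k_set(scores, k):
    """Indices of the k most anomalous points (lowest score_samples values)."""
    return set(np.argsort(scores)[:k].tolist())

def jaccard(a, b):
    """Jaccard similarity between two index sets."""
    return len(a & b) / len(a | b)

k_demo = 50
tops = []
for seed in (0, 1, 2):
    model = IsolationForest(n_estimators=200, random_state=seed).fit(X_demo)
    tops.append(top_k_set(model.score_samples(X_demo), k_demo))

# Pairwise overlap of the flagged sets across seeds
overlaps = [jaccard(tops[i], tops[j]) for i in range(3) for j in range(i + 1, 3)]
print(f"Mean top-{k_demo} Jaccard overlap across seeds: {np.mean(overlaps):.2f}")
```

A low overlap suggests the flagged set is driven by randomness in the model more than by structure in the data, in which case more trees or a smaller k may be worth trying. High overlap does not by itself show that the flagged points are true anomalies.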

6. Operational Deployment

import joblib
 
joblib.dump({
    "scaler": scaler,
    "pca": pca,
    "iso": iso,
    "gm": gm
}, "anomaly_detector.pkl")
 
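def score_new_batch(X_new: np.ndarray, artefacts: dict) -> np.ndarray:
    """Ensemble anomaly scores in (0, 1] for one batch (higher = more anomalous).

    X_new must contain the same feature columns, in the same order, as the training data.
    """
    X_s = artefacts["scaler"].transform(X_new)
    X_p = artefacts["pca"].transform(X_s)
    r_iso = -artefacts["iso"].score_samples(X_p)   # negate so higher = more anomalous
    r_gmm = -artefacts["gm"].score_samples(X_p)
    # Caveat: ranks are computed within the batch, so scores are comparable only
    # inside one batch, and a single-row batch always scores 1.0.
    return (0.5 * (rankdata(r_iso) / len(r_iso))
            + 0.5 * (rankdata(r_gmm) / len(r_gmm)))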
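
Because `score_new_batch` ranks scores within each batch, the result for a given row depends on what else is in the batch. One alternative, sketched here as an assumption and not as the original design, is to store the training-time anomaly scores and convert each new score into its percentile against that fixed reference. The helper `percentile_against_reference` and the toy arrays are hypothetical.

```python
import numpy as np

def percentile_against_reference(ref_scores, new_scores):
    """Fraction of reference scores that are <= each new score, in [0, 1]."""
    ref_sorted = np.sort(np.asarray(ref_scores))
    # side="right" counts reference values less than or equal to the new value
    pos = np.searchsorted(ref_sorted, np.asarray(new_scores), side="right")
    return pos / len(ref_sorted)

# Toy reference, standing in for e.g. -iso.score_samples(X_pca) on the training data
ref = np.array([0.1, 0.2, 0.3, 0.4, 0.5])
new = np.array([0.05, 0.35, 0.9])

pct = percentile_against_reference(ref, new)
print(pct.tolist())   # [0.0, 0.6, 1.0]
```

With this approach the reference arrays for each detector would be saved alongside the other artefacts, and a single incoming row receives the same score regardless of batch size.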

Trade-offs

  • Isolation Forest: fast, scalable, tree-based; usually copes well with high-dimensional data; not differentiable, so it cannot be trained end-to-end with gradient-based models.
  • GMM: provides density (likelihood) estimates; sensitive to the choice of the number of components and covariance type; struggles in very high dimensions.
  • Ensemble: more robust to individual model failure; harder to explain to stakeholders.
  • When ground-truth labels become available, switch to a supervised model (e.g. XGBoost) trained on confirmed anomalies and confirmed normal cases.

References