# Tabular Classification Pipeline

## Purpose
A complete reference architecture for a tabular supervised ML system, from raw CSV to monitored production API. This example synthesizes EDA, feature engineering, XGBoost training with MLflow tracking, SHAP explanations, FastAPI serving, and Evidently drift monitoring into a single coherent system.
## Examples

- **Customer churn prediction:** score all customers daily, flag high-risk segments for retention campaigns, and monitor feature distributions weekly for drift.
- **Credit risk scoring:** retrain the gradient-boosted model monthly on the latest transaction data; explain individual credit decisions via SHAP waterfall plots for regulatory audit.
## Architecture

```
Raw data (CSV / DWH table)
  │
  ├──[1] EDA → data quality report (nulls, distributions, correlations)
  │
  ├──[2] Feature pipeline (sklearn Pipeline: impute → encode → scale)
  │
  ├──[3] XGBoost training + MLflow experiment tracking
  │     ├── hyperparameter tuning (Optuna)
  │     └── evaluation gate (AUC vs. champion)
  │
  ├──[4] SHAP explanations (TreeExplainer → beeswarm + waterfall)
  │
  ├──[5] FastAPI inference endpoint
  │     ├── /predict — return probability + top SHAP contributors
  │     └── /health — liveness probe
  │
  └──[6] Evidently drift monitoring
        ├── weekly data drift report
        └── automated retraining trigger
```
## Step 1: EDA

```python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

df = pd.read_csv("data/customers.csv")

# Data quality summary
print(df.dtypes)
print(df.isnull().sum().sort_values(ascending=False).head(10))
print(df.describe())

# Target distribution
df["churn"].value_counts(normalize=True).plot(kind="bar", title="Churn distribution")
plt.tight_layout()
plt.savefig("reports/target_dist.png")
plt.close()  # close the figure so the heatmap below starts on a clean canvas

# Correlation heatmap (numeric features)
corr = df.select_dtypes("number").corr()
sns.heatmap(corr, cmap="coolwarm", vmax=1, vmin=-1, center=0, square=True)
plt.savefig("reports/correlation_heatmap.png")
plt.close()
```
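EDA findings can be turned into a hard gate before training. A minimal, framework-agnostic sketch of that check (the threshold and the `failing_columns` helper are illustrative, not part of the pipeline above; the input would come from something like `df.isnull().mean().to_dict()`):

```python
def failing_columns(null_fractions: dict[str, float], max_null_frac: float = 0.3) -> list[str]:
    """Return columns whose null fraction exceeds the allowed threshold."""
    return sorted(col for col, frac in null_fractions.items() if frac > max_null_frac)

# Example: fractions as computed by df.isnull().mean().to_dict()
fractions = {"tenure": 0.01, "total_charges": 0.45, "contract_type": 0.0}
print(failing_columns(fractions))  # → ['total_charges']
```

Failing the pipeline here is cheaper than discovering an unusable column after a full training run.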
## Step 2: Feature Pipeline

```python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.impute import SimpleImputer

NUMERIC_FEATS = ["tenure", "monthly_charges", "total_charges", "txn_count_30d"]
CATEGORICAL_FEATS = ["contract_type", "payment_method", "internet_service"]

numeric_pipeline = Pipeline([
    ("impute", SimpleImputer(strategy="median")),
    ("scale", StandardScaler()),
])

categorical_pipeline = Pipeline([
    ("impute", SimpleImputer(strategy="most_frequent")),
    ("encode", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1)),
])

preprocessor = ColumnTransformer([
    ("num", numeric_pipeline, NUMERIC_FEATS),
    ("cat", categorical_pipeline, CATEGORICAL_FEATS),
])
```
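The `OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1)` setting matters at inference time: categories never seen during fit map to -1 instead of raising. The behavior is roughly this (a pure-Python sketch, not the sklearn implementation):

```python
def ordinal_encode(values, categories, unknown_value=-1):
    """Map each value to its index in the fitted category list; unseen values map to unknown_value."""
    index = {cat: i for i, cat in enumerate(categories)}
    return [index.get(v, unknown_value) for v in values]

fitted = ["month-to-month", "one_year", "two_year"]      # learned during fit
print(ordinal_encode(["two_year", "lifetime"], fitted))  # → [2, -1]
```

Without this setting, a new production category (e.g. a contract type launched after training) would crash the `/predict` endpoint rather than degrade gracefully.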
## Step 3: XGBoost Training + MLflow

```python
import xgboost as xgb
import mlflow
import mlflow.xgboost
import optuna
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

X, y = df[NUMERIC_FEATS + CATEGORICAL_FEATS], df["churn"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
# NB: the held-out split doubles as the tuning set below; a separate
# validation split avoids leaking tuning choices into the reported AUC.
X_train_p = preprocessor.fit_transform(X_train)
X_test_p = preprocessor.transform(X_test)

mlflow.set_experiment("churn-xgboost")

def objective(trial):
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 100, 500),
        "max_depth": trial.suggest_int("max_depth", 3, 7),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
        "subsample": trial.suggest_float("subsample", 0.6, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
    }
    model = xgb.XGBClassifier(**params, eval_metric="logloss", random_state=42)
    model.fit(X_train_p, y_train, eval_set=[(X_test_p, y_test)], verbose=False)
    return roc_auc_score(y_test, model.predict_proba(X_test_p)[:, 1])

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50, show_progress_bar=True)

# Final training run with best params
with mlflow.start_run(run_name="challenger_best"):
    best_params = study.best_params
    model = xgb.XGBClassifier(**best_params, eval_metric="logloss", random_state=42)
    model.fit(X_train_p, y_train)
    auc = roc_auc_score(y_test, model.predict_proba(X_test_p)[:, 1])
    mlflow.log_params(best_params)
    mlflow.log_metric("val_auc", auc)
    mlflow.xgboost.log_model(model, "model", registered_model_name="churn_xgboost")
    print(f"AUC: {auc:.4f}")
```
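The architecture's evaluation gate ("AUC vs. champion") is not spelled out in the code above. The decision itself can be as small as the following sketch; the `min_uplift` margin and the idea of fetching the champion's metric from the MLflow registry are assumptions, not part of the tracked run:

```python
def should_promote(challenger_auc: float, champion_auc: float, min_uplift: float = 0.002) -> bool:
    """Promote the challenger only if it beats the current champion by a margin."""
    return challenger_auc >= champion_auc + min_uplift

# champion_auc would typically be read from the registry's current Production version
print(should_promote(0.861, 0.855))  # → True
print(should_promote(0.856, 0.855))  # → False
```

Requiring a margin rather than any improvement avoids churning the registered model on noise-level AUC differences.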
## Step 4: SHAP Explanations

```python
import shap
import joblib

explainer = shap.TreeExplainer(model)
# Wrap the transformed array in a DataFrame so plots show feature names,
# not generic indices (column order matches the ColumnTransformer output)
X_test_named = pd.DataFrame(X_test_p, columns=NUMERIC_FEATS + CATEGORICAL_FEATS)
shap_values = explainer(X_test_named)

# Global: beeswarm plot (show=False keeps the figure open for savefig)
shap.plots.beeswarm(shap_values, max_display=15, show=False)
plt.savefig("reports/shap_beeswarm.png", bbox_inches="tight")
plt.close()

# Local: waterfall for the highest-risk customer
top_risk_idx = model.predict_proba(X_test_p)[:, 1].argmax()
shap.plots.waterfall(shap_values[top_risk_idx], show=False)
plt.savefig("reports/shap_waterfall_top_risk.png", bbox_inches="tight")
plt.close()

# Persist artefacts for serving
joblib.dump(preprocessor, "artefacts/preprocessor.pkl")
model.save_model("artefacts/model.json")
joblib.dump(explainer, "artefacts/explainer.pkl")
```
## Step 5: FastAPI Serving

```python
# api.py
import joblib
import numpy as np
import pandas as pd
import shap
import xgboost as xgb
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Churn Prediction API")

pp = joblib.load("artefacts/preprocessor.pkl")
model = xgb.XGBClassifier()
model.load_model("artefacts/model.json")
exp = joblib.load("artefacts/explainer.pkl")

FEAT_NAMES = ["tenure", "monthly_charges", "total_charges", "txn_count_30d",
              "contract_type", "payment_method", "internet_service"]

class Customer(BaseModel):
    tenure: float
    monthly_charges: float
    total_charges: float
    txn_count_30d: int
    contract_type: str
    payment_method: str
    internet_service: str

@app.post("/predict")
def predict(customer: Customer):
    row = pd.DataFrame([customer.model_dump()])  # .dict() on pydantic v1
    X_p = pp.transform(row)
    prob = float(model.predict_proba(X_p)[0, 1])
    sv = exp(X_p).values[0]
    top3 = sorted(zip(FEAT_NAMES, sv), key=lambda x: abs(x[1]), reverse=True)[:3]
    # cast numpy floats so the response is JSON-serializable
    return {
        "churn_probability": prob,
        "top_contributors": [{"feature": f, "shap": float(s)} for f, s in top3],
    }

@app.get("/health")
def health():
    return {"status": "ok"}
```

Run with:

```shell
uvicorn api:app --host 0.0.0.0 --port 8080 --workers 2
```
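A request body the `/predict` endpoint above would accept, built and round-tripped with the standard library (the field values are illustrative):

```python
import json

payload = {
    "tenure": 3.0,
    "monthly_charges": 89.5,
    "total_charges": 268.5,
    "txn_count_30d": 4,
    "contract_type": "month-to-month",
    "payment_method": "electronic_check",
    "internet_service": "fiber",
}
body = json.dumps(payload)
# Against a running server: requests.post("http://localhost:8080/predict", data=body)
assert set(json.loads(body)) == {
    "tenure", "monthly_charges", "total_charges", "txn_count_30d",
    "contract_type", "payment_method", "internet_service",
}
```

The key set must match the `Customer` model exactly; FastAPI returns a 422 validation error for missing or mistyped fields.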
## Step 6: Drift Monitoring

```python
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

reference = pd.read_parquet("data/training_features.parquet")[FEAT_NAMES]
current = pd.read_parquet("data/production_features_last_week.parquet")[FEAT_NAMES]

report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=reference, current_data=current)
report.save_html("reports/weekly_drift.html")

n_drifted = report.as_dict()["metrics"][0]["result"]["number_of_drifted_columns"]
if n_drifted > 2:
    print(f"WARNING: {n_drifted} features drifted — trigger retraining")
```
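The weekly report and retraining trigger need a scheduler. A crontab sketch, assuming the monitoring code above lives in a hypothetical `scripts/run_drift_check.py` that exits non-zero when the drift threshold is exceeded, and a hypothetical `scripts/retrain.py`:

```shell
# Every Monday at 06:00: run the drift check; on a non-zero exit
# (threshold exceeded), fall through to the retraining script.
0 6 * * 1  cd /srv/churn && python scripts/run_drift_check.py || python scripts/retrain.py
```

In a managed setup the same trigger would more commonly be an Airflow/Prefect task or a CI job, but the exit-code contract stays the same.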
## Key Trade-offs

| Decision | Choice | Alternative |
|---|---|---|
| Model | XGBoost (fast, explainable) | LightGBM (faster on large data), CatBoost (native categoricals) |
| Serving | FastAPI (flexible, fast dev) | BentoML (built-in batching, model versioning) |
| Explainability | SHAP TreeExplainer (exact) | Partial dependence (global only) |
| Monitoring | Evidently (open-source) | Fiddler, Arize (managed) |
## References

### Data Science & Modeling
- Exploratory Data Analysis — EDA patterns and diagnostic plots
- Feature Engineering — encoding, scaling, imputation strategies
- SHAP and Feature Attribution — Shapley value theory

### Model Implementations
- Tree Ensembles Implementation — XGBoost, LightGBM, Optuna tuning code
- Interpretability Implementation — SHAP TreeExplainer, PDP, permutation importance

### System Patterns
- Model Serving with FastAPI — production API patterns
- Training Pipeline Pattern — MLflow + DVC training pipeline
- Model Monitoring System — Evidently + Prometheus + alerting