Batch ML Prediction Pipeline

Purpose

A complete reference architecture for a batch ML prediction system: from raw data through feature engineering, model training, offline evaluation, batch scoring, and drift monitoring. This end-to-end example demonstrates how the individual engineering patterns combine into a production-grade system for tabular classification or regression.

Examples

Churn prediction: Score all active customers nightly, write propensity scores to a data warehouse, trigger retention campaigns for high-risk segments.

Credit risk scoring: Retrain a gradient boosting model weekly on the latest transaction history; score the entire loan portfolio in bulk; monitor score distributions for drift.

Architecture

System Overview

Raw Data (S3/Warehouse)
         │
         ▼
┌──────────────────┐
│ Data Pipeline    │  DVC + dbt / Spark
│ (batch, daily)   │  schema validation, feature transforms
└────────┬─────────┘
         │  versioned feature parquet
         ▼
┌──────────────────┐
│ Training Job     │  Python + scikit-learn / XGBoost
│ (weekly / event) │  MLflow tracking, DVC experiment
└────────┬─────────┘
         │  registered model
         ▼
┌──────────────────┐
│ Offline Eval     │  train/val/test splits + holdout
│ & Promotion      │  AUC, calibration, fairness checks
└────────┬─────────┘
         │  champion alias set
         ▼
┌──────────────────┐
│ Batch Scorer     │  scheduled container job (Airflow DAG)
│ (nightly)        │  loads model via MLflow alias, scores data
└────────┬─────────┘
         │  predictions parquet → DWH
         ▼
┌──────────────────┐
│ Drift Monitor    │  Evidently weekly report
│ (weekly)         │  PSI check on top features + predictions
└──────────────────┘

Component 1 — Data Pipeline

# scripts/build_features.py
import pandas as pd
from pathlib import Path
 
def build_features(raw_path: str, output_path: str) -> None:
    df = pd.read_parquet(raw_path)
 
    # Feature engineering
    df["tenure_days"] = (pd.Timestamp.now() - pd.to_datetime(df["signup_date"])).dt.days
    df["avg_monthly_spend"] = df["total_spend"] / df["months_active"].clip(lower=1)
    df["support_contacts_30d"] = df["support_contacts"].clip(upper=10)
 
    # Data quality assertions
    assert df["customer_id"].nunique() == len(df), "Duplicate customer_ids"
    assert df[["tenure_days", "avg_monthly_spend"]].notna().all().all(), "Nulls in features"
 
    # Write versioned output
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path, index=False)
    print(f"Wrote {len(df):,} rows to {output_path}")

Component 2 — Model Training

# scripts/train.py
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.metrics import roc_auc_score
 
mlflow.set_experiment("churn-prediction")
 
with mlflow.start_run(run_name="weekly-retrain"):
    df = pd.read_parquet("data/splits/train.parquet")
    X, y = df.drop(columns=["churned", "customer_id"]), df["churned"]
 
    params = {
        "n_estimators": 400,
        "learning_rate": 0.05,
        "max_depth": 4,
        "subsample": 0.8,
        "min_samples_leaf": 50,
    }
    mlflow.log_params(params)
 
    model = GradientBoostingClassifier(**params, random_state=42)
 
    # Cross-validated AUC on training set
    cv_aucs = cross_val_score(model, X, y, cv=StratifiedKFold(5), scoring="roc_auc")
    mlflow.log_metric("cv_auc_mean", cv_aucs.mean())
    mlflow.log_metric("cv_auc_std", cv_aucs.std())
 
    # Fit on full training data
    model.fit(X, y)
 
    # Evaluate on holdout
    val = pd.read_parquet("data/splits/val.parquet")
    val_auc = roc_auc_score(val["churned"], model.predict_proba(val.drop(columns=["churned", "customer_id"]))[:, 1])
    mlflow.log_metric("val_auc", val_auc)
 
    # Register model
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="churn-predictor",
        input_example=X.head(5),
    )
    print(f"val_auc={val_auc:.4f}")

Component 3 — Offline Evaluation and Promotion

# scripts/evaluate_and_promote.py
from mlflow.tracking import MlflowClient
import pandas as pd
from sklearn.metrics import roc_auc_score, average_precision_score
from sklearn.calibration import calibration_curve
import mlflow.pyfunc
 
client = MlflowClient()
 
def evaluate_and_promote(model_name: str, new_version: str, threshold: float = 0.92):
    # Load new model
    new_model = mlflow.pyfunc.load_model(f"models:/{model_name}/{new_version}")
 
    # Load holdout test set (never seen during training or validation)
    test = pd.read_parquet("data/splits/test.parquet")
    X_test = test.drop(columns=["churned", "customer_id"])
    y_test = test["churned"]
 
    probs = new_model.predict(X_test)
    auc = roc_auc_score(y_test, probs)
    ap = average_precision_score(y_test, probs)
 
    print(f"New model — AUC={auc:.4f}, AP={ap:.4f}")
 
    # Compare against champion
    try:
        champion = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")
        champion_probs = champion.predict(X_test)
        champion_auc = roc_auc_score(y_test, champion_probs)
        print(f"Champion AUC={champion_auc:.4f}")
 
        if auc > champion_auc and auc >= threshold:
            client.set_registered_model_alias(model_name, "champion", new_version)
            print(f"✓ Promoted version {new_version} to champion")
        else:
            print("✗ New model did not outperform champion; keeping existing champion")
    except Exception:
        # No champion yet — promote directly if above threshold
        if auc >= threshold:
            client.set_registered_model_alias(model_name, "champion", new_version)
            print(f"✓ Set version {new_version} as first champion")

Component 4 — Nightly Batch Scorer

# scripts/batch_score.py (runs nightly via Airflow or cron)
import mlflow.pyfunc
import pandas as pd
from datetime import date
 
def score_daily(scoring_date: str = None):
    scoring_date = scoring_date or str(date.today())
 
    # Load champion model
    model = mlflow.pyfunc.load_model("models:/churn-predictor@champion")
 
    # Load all active customers for today
    df = pd.read_parquet(f"s3://data-lake/customers/date={scoring_date}/")
    features = df.drop(columns=["customer_id"])
 
    df["churn_score"] = model.predict(features)
    df["score_date"] = scoring_date
    df["model_version"] = "champion"
 
    # Write to data warehouse
    df[["customer_id", "churn_score", "score_date", "model_version"]].to_parquet(
        f"s3://predictions/churn/date={scoring_date}/scores.parquet", index=False
    )
    print(f"Scored {len(df):,} customers for {scoring_date}")

Component 5 — Weekly Drift Check

# scripts/weekly_drift.py
from evidently.test_suite import TestSuite
from evidently.tests import TestShareOfDriftedColumns, TestColumnDrift
import pandas as pd
 
reference = pd.read_parquet("data/reference/train_sample.parquet")
current = pd.read_parquet(f"s3://predictions/churn/date=latest/features.parquet")
 
suite = TestSuite(tests=[
    TestShareOfDriftedColumns(lt=0.3),
    TestColumnDrift("avg_monthly_spend", stattest="psi", stattest_threshold=0.2),
    TestColumnDrift("tenure_days", stattest="ks"),
])
suite.run(reference_data=reference, current_data=current)
 
if not suite.as_dict()["summary"]["all_passed"]:
    # Trigger retraining via CI
    import subprocess
    subprocess.run(["gh", "workflow", "run", "train_validate.yml", "--ref", "main"])

Deployment (Airflow DAG)

# dags/churn_pipeline_dag.py
from airflow.decorators import dag, task
from datetime import datetime
 
@dag(schedule="0 2 * * *", start_date=datetime(2026, 1, 1), catchup=False)
def churn_prediction_pipeline():
 
    @task
    def build_features():
        import subprocess
        subprocess.run(["dvc", "repro", "featurise"], check=True)
 
    @task
    def batch_score():
        from scripts.batch_score import score_daily
        score_daily()
 
    @task
    def check_drift():
        from scripts.weekly_drift import run_weekly_drift_check
        run_weekly_drift_check()
 
    build_features() >> batch_score() >> check_drift()
 
churn_prediction_pipeline()

References