Model Monitoring
🎯 Maqsad
Bu bobni o'qib bo'lgach:
- ML model monitoring DevOps monitoring'dan qanday farq qilishini bilasiz
- Data drift va Concept drift'ni aniqlay olasiz
- Evidently AI bilan production monitoring qurishasiz
- Business KPI'larni model performance bilan bog'lay olasiz
- Alerts va retraining trigger'lar yarata olasiz
Nimani o'rganish kerak
- 3 darajadagi monitoring — infrastructure, model, business
- Data drift — feature distribution o'zgarishi
- Concept drift — input → output relationship o'zgarishi
- Prediction drift — output distribution o'zgarishi
- Performance metrics — accuracy/loss vaqt o'tishi bilan
- Evidently AI — open source monitoring
- WhyLabs, Arize — managed alternatives
- Prometheus + Grafana — infrastructure
- Alerts va retraining triggers
ML monitoring nima uchun maxsus?
DevOps monitoring (backend dev'lar biladi)
- Server CPU/RAM
- Request latency
- Error rate
- Throughput
ML monitoring qo'shimcha
- Input data quality — schema, missing, range
- Feature distribution — drift!
- Prediction distribution — drift!
- Performance vaqt o'tishi bilan — accuracy/loss
- Business KPI — revenue impact, user satisfaction
Misol — drift muammosi
Day 1 (training): age distribution = N(35, 10)
Day 30 (prod): age distribution = N(45, 12) ← drift!
Model accuracy:
- Day 1: 92%
- Day 30: 75% ← muammo!
Aniqlash: feature drift early warning
Yechim: yangi data bilan retraining
Drift turlari
1. Data drift (Covariate shift)
Input distribution o'zgaradi (P(X) o'zgaradi).
- Misol: yangi xil mijozlar paydo bo'ldi (yoshroq, boshqa region)
2. Concept drift
Input ↔ Output relationship o'zgaradi (P(Y|X) o'zgaradi).
- Misol: bir xil features lekin javob boshqa (COVID kabi external event)
3. Prediction drift
Model output distribution o'zgaradi (P(Ŷ) o'zgaradi).
- Misol: 1% spam → 5% spam predict qilmoqda
4. Label drift (training set'da)
Ground truth distribution o'zgaradi.
Kod misollari
Evidently AI — quick start
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, ClassificationPreset
from evidently import ColumnMapping
import pandas as pd
# Reference (training) va Current (production) data
reference = pd.read_csv("data/training_data.csv")
current = pd.read_csv("data/production_data_last_week.csv")
# Column mapping
column_mapping = ColumnMapping(
target="churned",
prediction="prediction",
numerical_features=["age", "income", "tenure_months"],
categorical_features=["plan_type", "country"],
)
# Run data drift report
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=reference, current_data=current, column_mapping=column_mapping)
# Save report
report.save_html("reports/data_drift.html")
# Programmatic access
result = report.as_dict()
print(result["metrics"][0]["result"]["dataset_drift"]) # True/False
Classification monitoring
from evidently.metric_preset import ClassificationPreset
report = Report(metrics=[ClassificationPreset()])
report.run(
reference_data=reference, # baseline (training)
current_data=current, # production
column_mapping=column_mapping,
)
# Output: accuracy, precision, recall, ROC, etc.
# Reference vs Current comparison
report.save_html("classification_report.html")
Real-time monitoring (production stream)
from evidently.test_suite import TestSuite
from evidently.tests import (
TestNumberOfColumnsWithMissingValues,
TestNumberOfRowsWithMissingValues,
TestNumberOfConstantColumns,
TestNumberOfDuplicatedRows,
TestColumnsType,
)
# Daily test suite
test_suite = TestSuite(tests=[
TestNumberOfColumnsWithMissingValues(),
TestNumberOfRowsWithMissingValues(),
TestNumberOfConstantColumns(),
TestNumberOfDuplicatedRows(),
TestColumnsType(),
])
test_suite.run(reference_data=reference, current_data=daily_batch)
if not test_suite.as_dict()["summary"]["all_passed"]:
send_alert(test_suite.as_dict())
Custom drift detection
from scipy import stats
import numpy as np
def detect_drift_ks(reference: np.ndarray, current: np.ndarray, alpha: float = 0.05) -> dict:
"""Kolmogorov-Smirnov test for distribution drift."""
ks_stat, p_value = stats.ks_2samp(reference, current)
return {
"ks_statistic": float(ks_stat),
"p_value": float(p_value),
"drift_detected": p_value < alpha,
}
def detect_drift_psi(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
"""Population Stability Index (industry standard)."""
breakpoints = np.linspace(reference.min(), reference.max(), bins + 1)
ref_counts = np.histogram(reference, breakpoints)[0]
cur_counts = np.histogram(current, breakpoints)[0]
ref_pct = ref_counts / ref_counts.sum() + 1e-6
cur_pct = cur_counts / cur_counts.sum() + 1e-6
psi = ((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)).sum()
# Interpretation:
# PSI < 0.1 — no significant change
# PSI 0.1-0.2 — moderate change
# PSI > 0.2 — significant change
return float(psi)
# Per-feature drift report
def feature_drift_report(reference_df, current_df, features):
report = {}
for feature in features:
ref_values = reference_df[feature].dropna().values
cur_values = current_df[feature].dropna().values
report[feature] = {
**detect_drift_ks(ref_values, cur_values),
"psi": detect_drift_psi(ref_values, cur_values),
}
return report
Prometheus metrics — production
from prometheus_client import Counter, Histogram, Gauge
import time
# Counters
prediction_count = Counter(
"ml_predictions_total",
"Total predictions",
["model_version", "prediction_class"],
)
# Histograms (distribution)
prediction_latency = Histogram(
"ml_prediction_latency_seconds",
"Prediction latency",
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
)
# Gauges (current state)
model_accuracy = Gauge(
"ml_model_accuracy",
"Current model accuracy (rolling 1h)",
["model_version"],
)
drift_score = Gauge(
"ml_feature_drift_psi",
"PSI drift score per feature",
["feature"],
)
# Usage
@app.post("/predict")
async def predict(features: Features):
start = time.perf_counter()
pred = model.predict(features)
prediction_latency.observe(time.perf_counter() - start)
prediction_count.labels(
model_version="v1.2",
prediction_class=str(pred),
).inc()
return {"prediction": int(pred)}
# Background job — update gauges
@app.on_event("startup")
async def schedule_drift_check():
asyncio.create_task(periodic_drift_check())
async def periodic_drift_check():
while True:
await asyncio.sleep(3600) # har soatda
recent = await fetch_recent_predictions(hours=1)
psi_scores = feature_drift_report(reference_data, recent, features)
for feature, scores in psi_scores.items():
drift_score.labels(feature=feature).set(scores["psi"])
if scores["psi"] > 0.2:
await send_alert(f"High drift on {feature}: PSI={scores['psi']:.3f}")
Grafana dashboard JSON (snippet)
{
"panels": [
{
"title": "Prediction Latency (p95)",
"type": "graph",
"targets": [{
"expr": "histogram_quantile(0.95, rate(ml_prediction_latency_seconds_bucket[5m]))",
}]
},
{
"title": "Predictions per second",
"type": "stat",
"targets": [{
"expr": "sum(rate(ml_predictions_total[1m]))",
}]
},
{
"title": "Feature Drift (PSI)",
"type": "heatmap",
"targets": [{
"expr": "ml_feature_drift_psi",
}]
},
{
"title": "Model Accuracy",
"type": "graph",
"targets": [{
"expr": "ml_model_accuracy",
}]
}
]
}
Alerts (Prometheus AlertManager)
# alerts.yml
groups:
- name: ml_alerts
interval: 30s
rules:
- alert: HighDriftDetected
expr: ml_feature_drift_psi > 0.2
for: 5m
labels:
severity: warning
annotations:
summary: "Feature drift detected on {{ $labels.feature }}"
description: "PSI = {{ $value }}"
- alert: ModelAccuracyDrop
expr: ml_model_accuracy < 0.80
for: 30m
labels:
severity: critical
annotations:
summary: "Model accuracy below 80%"
action: "Consider retraining"
- alert: HighLatency
expr: histogram_quantile(0.95, rate(ml_prediction_latency_seconds_bucket[5m])) > 1
for: 5m
labels:
severity: warning
Auto-retraining trigger
async def check_and_retrain():
"""Drift detected'da avtomatik retraining trigger."""
recent_data = await fetch_recent_predictions(days=7)
drift = feature_drift_report(reference_data, recent_data, features)
critical_drift = [f for f, s in drift.items() if s["psi"] > 0.2]
if len(critical_drift) >= 3: # 3+ feature drift bo'lsa
# Trigger retraining DAG
await trigger_airflow_dag("retrain_pipeline", config={
"reason": "drift_detected",
"drifted_features": critical_drift,
})
# Notify
await send_slack_message(
f"🚨 Retraining triggered due to drift on: {critical_drift}"
)
Backend integratsiyasi
Prediction logging
# Postgres'da har prediction'ni log qilish
async def log_prediction(features: dict, prediction, model_version: str):
await db.execute("""
INSERT INTO predictions (timestamp, features, prediction, model_version)
VALUES ($1, $2, $3, $4)
""", datetime.utcnow(), json.dumps(features), prediction, model_version)
@app.post("/predict")
async def predict(features: Features, background: BackgroundTasks):
pred = model.predict(features)
# Background log (response'ni ushlab turmaslik uchun)
background.add_task(log_prediction, features.dict(), pred, "v1.2")
return {"prediction": pred}
# Feedback endpoint (real outcome'ni qaytarish)
@app.post("/predict/{prediction_id}/feedback")
async def submit_feedback(prediction_id: int, actual: int):
await db.execute(
"UPDATE predictions SET actual = $1, feedback_at = NOW() WHERE id = $2",
actual, prediction_id,
)
Daily monitoring job
# scheduled via Airflow
def daily_monitoring():
# 1. Fetch yesterday's predictions + actuals
df = pd.read_sql("""
SELECT * FROM predictions
WHERE timestamp > NOW() - INTERVAL '24 hours'
AND actual IS NOT NULL
""", engine)
# 2. Calculate metrics
accuracy = (df["prediction"] == df["actual"]).mean()
# 3. Compare with reference
reference = pd.read_csv("reference_data.csv")
drift_report = feature_drift_report(reference, df, FEATURES)
# 4. Log to MLflow
with mlflow.start_run(run_name=f"monitoring_{date.today()}"):
mlflow.log_metric("daily_accuracy", accuracy)
for f, s in drift_report.items():
mlflow.log_metric(f"drift_{f}", s["psi"])
# 5. Generate Evidently report
report = Report(metrics=[DataDriftPreset(), ClassificationPreset()])
report.run(reference_data=reference, current_data=df, column_mapping=column_mapping)
report.save_html(f"reports/monitoring_{date.today()}.html")
# 6. Send to S3
upload_to_s3(f"reports/monitoring_{date.today()}.html")
# 7. Alert if needed
if accuracy < 0.80:
send_alert(f"Daily accuracy: {accuracy:.2%}")
Resurslar
- Evidently AI docs — docs.evidentlyai.com
- "Monitoring Machine Learning Models in Production" — Towards Data Science
- "A Survey on Concept Drift Adaptation" — Gama et al.
- WhyLabs docs — docs.whylabs.ai
- Prometheus best practices — prometheus.io/docs/practices
- "Building Machine Learning Pipelines" — Hapke & Nelson
🏋️ Mashqlar
🟢 Easy
- Evidently AI bilan oddiy drift report.
- PSI calculation manual (numpy).
- Prometheus metric'larini FastAPI'ga qo'shing.
🟡 Medium
- Full monitoring setup: Evidently + Prometheus + Grafana lokal Docker'da.
- Drift simulation: training data'dan distribution biroz o'zgartirib drift'ni kuzating.
- Daily monitoring job: Airflow yoki cron bilan automated reports.
🔴 Hard
- End-to-end monitoring: FastAPI + Postgres logs + Prometheus + Evidently + Slack alerts.
- Auto-retraining trigger: drift detected → Airflow DAG trigger.
- A/B test analytics: bir nechta model versiyalarini comparison dashboard.
Capstone
notebooks/month-06/06_monitoring.ipynb + monitoring/:
- Loyihangizdagi modelni monitoring bilan o'rab oling
- Prediction logging (Postgres)
- Daily Evidently reports
- Grafana dashboard
- Slack alert misol
✅ Tekshirish ro'yxati
- Data drift, concept drift, prediction drift farqi
- PSI metric'ni bilaman va hisoblay olaman
- Evidently AI bilan reports
- Prometheus metric'lar (Counter, Histogram, Gauge)
- Grafana dashboard
- AlertManager rules
- Auto-retraining trigger logic
- Production monitoring stack
CI/CD for ML ga o'tamiz.