| | """
|
| | Logging & Monitoring Module
|
| | Author: AI Generated
|
| | Created: 2025-11-24
|
| | Purpose: Track pipeline performance, errors, and model drift
|
| | """
|
| |
|
| | import logging
|
| | from datetime import datetime
|
| | from typing import Dict, Any, Optional
|
| | import json
|
| | from pathlib import Path
|
| | import numpy as np
|
| |
|
| | from database import db
|
| |
|
| |
|
| |
|
# Directory for pipeline log files; created eagerly so FileHandler below
# does not fail on first import. exist_ok makes repeated imports safe.
LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True)

# NOTE(review): configuring the root logger at import time is a module-level
# side effect; if another module calls basicConfig first, this call is a no-op.
# Logs go both to logs/pipeline.log and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_DIR / 'pipeline.log'),
        logging.StreamHandler()
    ]
)

# Module-scoped logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
|
| |
|
| |
|
class PipelineMonitor:
    """
    Monitor AI pipeline performance and log metrics.

    Each ``log_*`` method emits a log line and persists a structured
    document to the ``PipelineMetrics`` collection (errors go to
    ``PipelineErrors``). The ``detect_drift_*`` helpers compare the
    current run against the most recently stored run of the same
    pipeline to flag model drift.
    """

    def __init__(self):
        # Collection that receives every per-run metric document.
        self.metrics_collection = "PipelineMetrics"

    def _record_metrics(self, doc: Dict[str, Any]) -> None:
        """
        Stamp *doc* with the current UTC time and persist it to the
        metrics collection.

        NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        migrating to timezone-aware datetime.now(timezone.utc) would turn
        stored timestamps from naive into aware -- confirm downstream
        consumers before changing.
        """
        doc["timestamp"] = datetime.utcnow()
        db.get_collection(self.metrics_collection).insert_one(doc)

    def log_segmentation_run(self, metrics: Dict[str, Any]) -> None:
        """
        Log segmentation pipeline metrics.

        Metrics should include:
        - n_users: Number of users processed
        - n_segments: Number of segments created
        - inertia: K-means inertia
        - execution_time: Time in seconds
        - outliers_removed: Count
        - centroids: (optional) centroid coordinates; needed as the
          baseline for detect_drift_segmentation on the next run
        """
        # Lazy %-style args so the dict is only formatted if INFO is enabled.
        logger.info("Segmentation Run: %s", metrics)
        self._record_metrics({"pipeline": "segmentation", "metrics": metrics})

    def log_sentiment_run(self, metrics: Dict[str, Any]) -> None:
        """
        Log sentiment analysis metrics.

        Metrics should include:
        - n_comments: Number of comments analyzed
        - sentiment_distribution: {Positive: X, Negative: Y, Neutral: Z}
        - avg_confidence: Average confidence score
        - execution_time: Time in seconds
        """
        logger.info("Sentiment Analysis Run: %s", metrics)
        self._record_metrics({"pipeline": "sentiment", "metrics": metrics})

    def log_genai_run(self, task: str, metrics: Dict[str, Any]) -> None:
        """
        Log Generative AI metrics.

        Metrics should include:
        - n_generated: Number of items generated
        - avg_generation_time: Average time per item
        - total_time: Total execution time
        """
        logger.info("GenAI Run (%s): %s", task, metrics)
        self._record_metrics({"pipeline": "genai", "task": task, "metrics": metrics})

    def log_error(self, pipeline: str, error: Exception, context: Optional[Dict[str, Any]] = None) -> None:
        """
        Log a pipeline error (with traceback) and persist it to the
        ``PipelineErrors`` collection.

        Args:
            pipeline: Name of the pipeline the error occurred in.
            error: The caught exception.
            context: Optional extra context stored alongside the error.
        """
        # exc_info=True attaches the active traceback to the log record.
        logger.error("Error in %s: %s", pipeline, error, exc_info=True)

        doc = {
            "pipeline": pipeline,
            "timestamp": datetime.utcnow(),
            "error": str(error),
            "error_type": type(error).__name__,
            "context": context or {},
        }
        db.get_collection("PipelineErrors").insert_one(doc)

    def detect_drift_segmentation(self, current_centroids: np.ndarray, threshold: float = 0.5) -> Dict:
        """
        Detect drift in K-means clustering by comparing *current_centroids*
        against the centroids stored with the previous segmentation run.

        Args:
            current_centroids: Centroid matrix of shape (k, n_features).
            threshold: Average per-centroid Euclidean shift above which
                drift is flagged (default 0.5, the original hard-coded value).

        Returns:
            Dict with at least ``drift_detected``; plus ``reason`` when no
            comparison was possible, or drift magnitudes otherwise.
        """
        last_metric = db.get_collection(self.metrics_collection).find_one(
            {"pipeline": "segmentation"},
            sort=[("timestamp", -1)]
        )

        if not last_metric or "centroids" not in last_metric["metrics"]:
            logger.info("No previous centroids found for drift detection")
            return {"drift_detected": False, "reason": "no_baseline"}

        prev_centroids = np.array(last_metric["metrics"]["centroids"])

        # Different cluster count or feature dimension: incomparable,
        # treat as drift so callers can retrain/re-baseline.
        if prev_centroids.shape != current_centroids.shape:
            return {"drift_detected": True, "reason": "shape_mismatch"}

        # Per-centroid Euclidean displacement between runs.
        distances = np.linalg.norm(current_centroids - prev_centroids, axis=1)
        avg_drift = float(np.mean(distances))
        max_drift = float(np.max(distances))

        drift_detected = avg_drift > threshold

        result = {
            "drift_detected": drift_detected,
            "avg_drift": avg_drift,
            "max_drift": max_drift,
            "threshold": threshold,
        }

        if drift_detected:
            logger.warning("⚠️ Cluster drift detected: avg=%.3f, max=%.3f", avg_drift, max_drift)

        return result

    def detect_drift_sentiment(self, current_distribution: Dict[str, int], threshold: float = 0.1) -> Dict:
        """
        Detect drift in the sentiment label distribution relative to the
        previous sentiment run.

        Args:
            current_distribution: Label -> count mapping for the current run.
            threshold: Maximum absolute change in any label's share above
                which drift is flagged (default 0.1, the original value).

        Returns:
            Dict with ``drift_detected`` and either a ``reason`` or the
            per-label ``changes`` and ``max_change``.
        """
        last_metric = db.get_collection(self.metrics_collection).find_one(
            {"pipeline": "sentiment"},
            sort=[("timestamp", -1)]
        )

        if not last_metric:
            return {"drift_detected": False, "reason": "no_baseline"}

        prev_dist = last_metric["metrics"].get("sentiment_distribution", {})

        prev_total = sum(prev_dist.values())
        curr_total = sum(current_distribution.values())

        # Avoid division by zero when either run has no labeled comments.
        if prev_total == 0 or curr_total == 0:
            return {"drift_detected": False, "reason": "insufficient_data"}

        # Absolute change in each label's share of the total.
        changes = {}
        for label in ["Positive", "Negative", "Neutral"]:
            prev_pct = prev_dist.get(label, 0) / prev_total
            curr_pct = current_distribution.get(label, 0) / curr_total
            changes[label] = abs(curr_pct - prev_pct)

        max_change = max(changes.values())
        drift_detected = max_change > threshold

        result = {
            "drift_detected": drift_detected,
            "changes": changes,
            "max_change": max_change,
            "threshold": threshold,
        }

        if drift_detected:
            logger.warning("⚠️ Sentiment drift detected: max_change=%.1f%%", max_change * 100)

        return result

    def get_performance_summary(self, pipeline: str, days: int = 7) -> Dict:
        """
        Summarize runs of *pipeline* over the last *days* days.

        Returns:
            Dict with run count, average execution time and last-run
            timestamp, or ``{"error": ...}`` when no runs are recorded
            in the window.
        """
        from datetime import timedelta  # local import, matching the original module's import set

        cutoff = datetime.utcnow() - timedelta(days=days)

        metrics = list(db.get_collection(self.metrics_collection).find({
            "pipeline": pipeline,
            "timestamp": {"$gte": cutoff}
        }).sort("timestamp", -1))

        if not metrics:
            return {"error": "No metrics found"}

        # Cast to plain float: np.float64 is not BSON/JSON-serializable,
        # so returning it raw would break any later persistence of this dict.
        avg_time = float(np.mean([m["metrics"].get("execution_time", 0) for m in metrics]))

        return {
            "pipeline": pipeline,
            "period_days": days,
            "total_runs": len(metrics),
            "avg_execution_time": avg_time,
            # Sorted descending above, so index 0 is the most recent run.
            "last_run": metrics[0]["timestamp"],
        }
|
| |
|
| |
|
| |
|
# Shared module-level singleton; pipeline code imports this instead of
# constructing its own PipelineMonitor.
monitor = PipelineMonitor()
|
| |
|