| """ |
| Event Logger — records user interactions for offline analysis and model updates. |
| |
| Events are published to Kafka when available, otherwise persisted in SQLite. |
| The streaming layer (flink_consumer.py) picks them up and updates the feature store. |
| |
| Event schema: |
| { |
| "event_id": str (uuid4), |
| "user_id": int, |
| "event_type": "view" | "click" | "rating" | "search" | "feedback", |
| "movie_id": int | null, |
| "timestamp": ISO-8601 string, |
| "metadata": {} |
| } |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import sqlite3 |
| import uuid |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any, Optional |
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# The Kafka client is an optional dependency: record whether it could be
# imported so EventLogger can decide at construction time which backend to use.
try:
    from kafka import KafkaProducer as _KafkaProducer
except ImportError:
    _KAFKA_AVAILABLE = False
else:
    _KAFKA_AVAILABLE = True


# Kafka topic that events are sent to.
EVENTS_TOPIC = "rec_events"
|
|
|
|
class EventLogger:
    """
    Publishes events to Kafka (preferred) or SQLite (fallback).
    Constructed once at API startup and injected via FastAPI dependency.

    The backend is chosen once, in ``__init__``, and recorded in
    ``self._backend`` as either ``"kafka"`` or ``"sqlite"``.
    """

    # Column order shared by the INSERT and SELECT statements, so decoding a
    # row never depends on the physical column layout of the table.
    _COLUMNS = (
        "event_id",
        "user_id",
        "event_type",
        "movie_id",
        "timestamp",
        "metadata",
    )

    def __init__(
        self,
        kafka_bootstrap: str = "localhost:9092",
        sqlite_path: str | Path = "events.db",
    ):
        """
        Select and initialise the storage backend.

        Args:
            kafka_bootstrap: Bootstrap server address handed to the Kafka
                producer.
            sqlite_path: Location of the SQLite database used when Kafka is
                not importable or the producer cannot be set up.
        """
        self._producer = None
        self._sqlite_path = str(sqlite_path)
        self._backend: str

        if _KAFKA_AVAILABLE:
            producer = None
            try:
                producer = _KafkaProducer(
                    bootstrap_servers=[kafka_bootstrap],
                    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
                    request_timeout_ms=2000,
                )
                # NOTE(review): the boolean result is not inspected; only an
                # exception raised here triggers the SQLite fallback. Confirm
                # whether a False result should also count as "unavailable".
                producer.bootstrap_connected()
            except Exception as e:
                logger.warning(f"Kafka unavailable ({e}), falling back to SQLite.")
                if producer is not None:
                    # The producer was created but the probe failed: release
                    # it instead of silently dropping the reference.
                    try:
                        producer.close(timeout=0)
                    except Exception:
                        logger.debug(
                            "Ignoring error while closing unused Kafka producer",
                            exc_info=True,
                        )
            else:
                self._producer = producer
                self._backend = "kafka"
                logger.info(f"EventLogger connected to Kafka at {kafka_bootstrap}")

        if self._producer is None:
            self._backend = "sqlite"
            self._init_sqlite()
            logger.info(f"EventLogger using SQLite at {sqlite_path}")

    def _init_sqlite(self) -> None:
        """Create the ``events`` table if it does not exist yet."""
        con = sqlite3.connect(self._sqlite_path)
        try:
            con.execute(
                """
                CREATE TABLE IF NOT EXISTS events (
                    event_id TEXT PRIMARY KEY,
                    user_id INTEGER,
                    event_type TEXT,
                    movie_id INTEGER,
                    timestamp TEXT,
                    metadata TEXT
                )
                """
            )
            con.commit()
        finally:
            # Always release the connection, even when the DDL fails.
            con.close()

    def log(
        self,
        user_id: int,
        event_type: str,
        movie_id: Optional[int] = None,
        metadata: Optional[dict[str, Any]] = None,
    ) -> str:
        """
        Record a single event on the active backend.

        Args:
            user_id: Identifier of the user the event belongs to.
            event_type: Event type label, stored as given.
            movie_id: Optional movie the event refers to.
            metadata: Optional extra payload; a missing or empty value is
                stored as ``{}``.

        Returns:
            The freshly generated ``event_id`` (a uuid4 string).
        """
        event_id = str(uuid.uuid4())
        ts = datetime.now(timezone.utc).isoformat()
        event = {
            "event_id": event_id,
            "user_id": user_id,
            "event_type": event_type,
            "movie_id": movie_id,
            "timestamp": ts,
            "metadata": metadata or {},
        }

        if self._backend == "kafka" and self._producer:
            self._producer.send(EVENTS_TOPIC, value=event)
        else:
            self._write_sqlite(event)

        return event_id

    def _write_sqlite(self, event: dict) -> None:
        """
        Insert ``event`` into the ``events`` table.

        A row whose ``event_id`` already exists is left untouched
        (``INSERT OR IGNORE``). The metadata mapping is stored as JSON text.
        """
        con = sqlite3.connect(self._sqlite_path)
        try:
            con.execute(
                """
                INSERT OR IGNORE INTO events
                    (event_id, user_id, event_type, movie_id, timestamp, metadata)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    event["event_id"],
                    event["user_id"],
                    event["event_type"],
                    event.get("movie_id"),
                    event["timestamp"],
                    json.dumps(event.get("metadata", {})),
                ),
            )
            con.commit()
        finally:
            # Always release the connection, even when the insert fails.
            con.close()

    def recent_events(self, limit: int = 100) -> list[dict]:
        """
        Return the most recent events (SQLite backend only).

        Args:
            limit: Maximum number of rows to return.

        Returns:
            Event dicts ordered by ``timestamp`` descending, with ``metadata``
            decoded from JSON. An empty list when the backend is not SQLite.
        """
        if self._backend != "sqlite":
            return []

        # Name the columns explicitly rather than relying on ``SELECT *``
        # returning them in the order the table happened to be created with.
        query = (
            f"SELECT {', '.join(self._COLUMNS)} "
            "FROM events ORDER BY timestamp DESC LIMIT ?"
        )
        con = sqlite3.connect(self._sqlite_path)
        try:
            rows = con.execute(query, (limit,)).fetchall()
        finally:
            con.close()

        events = []
        for row in rows:
            record = dict(zip(self._COLUMNS, row))
            raw_metadata = record["metadata"]
            # A NULL metadata column decodes to an empty mapping instead of
            # making json.loads raise on None.
            record["metadata"] = (
                json.loads(raw_metadata) if raw_metadata is not None else {}
            )
            events.append(record)
        return events

    def close(self) -> None:
        """Flush and close the Kafka producer, if one is in use."""
        if self._producer:
            try:
                self._producer.flush()
            finally:
                # Close the producer even when the final flush fails, so its
                # resources are not left behind; the flush error propagates.
                self._producer.close()
|
|