Cinewatch-recommender / api /event_logger.py
IntimateUser6969
Deploy CineMatch backend: Two-Tower + DeepFM + MMR + Upstash Redis
1359487
Raw
History Blame Contribute Delete
4.67 kB
"""
Event Logger — records user interactions for offline analysis and model updates.

Events are published to Kafka when available, otherwise persisted in SQLite.
The streaming layer (flink_consumer.py) picks them up and updates the feature store.

Event schema:
    {
        "event_id":   str (uuid4),
        "user_id":    int,
        "event_type": "view" | "click" | "rating" | "search" | "feedback",
        "movie_id":   int | null,
        "timestamp":  ISO-8601 string (UTC),
        "metadata":   object (free-form; {} when none is supplied)
    }
"""
from __future__ import annotations

import json
import logging
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
# Module-wide logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Kafka is an optional dependency: remember whether the client library could
# be imported so EventLogger knows at construction time if it may try Kafka.
try:
    from kafka import KafkaProducer as _KafkaProducer
except ImportError:
    _KAFKA_AVAILABLE = False
else:
    _KAFKA_AVAILABLE = True

# Kafka topic that events are published to.
EVENTS_TOPIC = "rec_events"
class EventLogger:
"""
Publishes events to Kafka (preferred) or SQLite (fallback).
Constructed once at API startup and injected via FastAPI dependency.
"""
def __init__(
self,
kafka_bootstrap: str = "localhost:9092",
sqlite_path: str | Path = "events.db",
):
self._producer = None
self._sqlite_path = str(sqlite_path)
self._backend: str
if _KAFKA_AVAILABLE:
try:
self._producer = _KafkaProducer(
bootstrap_servers=[kafka_bootstrap],
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
request_timeout_ms=2000,
)
# Verify connectivity
self._producer.bootstrap_connected()
self._backend = "kafka"
logger.info(f"EventLogger connected to Kafka at {kafka_bootstrap}")
except Exception as e:
logger.warning(f"Kafka unavailable ({e}), falling back to SQLite.")
self._producer = None
if self._producer is None:
self._backend = "sqlite"
self._init_sqlite()
logger.info(f"EventLogger using SQLite at {sqlite_path}")
def _init_sqlite(self) -> None:
con = sqlite3.connect(self._sqlite_path)
con.execute(
"""
CREATE TABLE IF NOT EXISTS events (
event_id TEXT PRIMARY KEY,
user_id INTEGER,
event_type TEXT,
movie_id INTEGER,
timestamp TEXT,
metadata TEXT
)
"""
)
con.commit()
con.close()
def log(
self,
user_id: int,
event_type: str,
movie_id: Optional[int] = None,
metadata: Optional[dict[str, Any]] = None,
) -> str:
event_id = str(uuid.uuid4())
ts = datetime.now(timezone.utc).isoformat()
event = {
"event_id": event_id,
"user_id": user_id,
"event_type": event_type,
"movie_id": movie_id,
"timestamp": ts,
"metadata": metadata or {},
}
if self._backend == "kafka" and self._producer:
self._producer.send(EVENTS_TOPIC, value=event)
else:
self._write_sqlite(event)
return event_id
def _write_sqlite(self, event: dict) -> None:
con = sqlite3.connect(self._sqlite_path)
con.execute(
"""
INSERT OR IGNORE INTO events
(event_id, user_id, event_type, movie_id, timestamp, metadata)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
event["event_id"],
event["user_id"],
event["event_type"],
event.get("movie_id"),
event["timestamp"],
json.dumps(event.get("metadata", {})),
),
)
con.commit()
con.close()
def recent_events(self, limit: int = 100) -> list[dict]:
"""Return the most recent events (SQLite backend only)."""
if self._backend != "sqlite":
return []
con = sqlite3.connect(self._sqlite_path)
rows = con.execute(
"SELECT * FROM events ORDER BY timestamp DESC LIMIT ?", (limit,)
).fetchall()
con.close()
return [
{
"event_id": r[0], "user_id": r[1], "event_type": r[2],
"movie_id": r[3], "timestamp": r[4], "metadata": json.loads(r[5]),
}
for r in rows
]
def close(self) -> None:
if self._producer:
self._producer.flush()
self._producer.close()