| """ | |
| API routes for topic extraction v2. | |
| """ | |
| from fastapi import APIRouter, HTTPException | |
| from pydantic import BaseModel | |
| from typing import List, Optional, Dict | |
| from pathlib import Path | |
| import json | |
| from services.micro_topic_service import ( | |
| extract_micro_topics_v2, | |
| process_events_batch, | |
| get_aggregated_topics, | |
| extract_hashtags | |
| ) | |
| topic_router = APIRouter(prefix="/api/topics", tags=["Topics"]) | |


class ExtractRequest(BaseModel):
    text: str
    language_type: str = "english"


class ExtractResponse(BaseModel):
    text: str
    language_type: str
    hashtags: List[str]
    ner: List[str]
    nouns: List[str]
    text_v1: Optional[str] = None
    micro_topics: List[str]
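
# Illustrative request/response shapes for the models above (a sketch; the
# field values are invented for demonstration, not real extraction output):
#
#   request:  {"text": "Loving the #cricket final!", "language_type": "english"}
#   response: {"text": "Loving the #cricket final!", "language_type": "english",
#              "hashtags": ["cricket"], "ner": [], "nouns": ["final"],
#              "text_v1": null, "micro_topics": ["cricket", "final"]}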


@topic_router.post("/extract", response_model=ExtractResponse)  # assumed path; decorator reconstructed
async def extract_topics_single(request: ExtractRequest):
    """
    Extract micro topics from a single text.
    Useful for testing the extraction pipeline.
    """
    # Create a mock event so the event-oriented extraction function can be reused
    mock_event = {
        "type": "watch",
        "engagement": "active",
        "text_clean": request.text,
        "language_type": request.language_type
    }
    # Run the v2 extraction pipeline on the mock event
    result = extract_micro_topics_v2(mock_event)
    return ExtractResponse(
        text=request.text,
        language_type=request.language_type,
        hashtags=result.get("hashtags", []),
        ner=result.get("ner", []),
        nouns=result.get("nouns", []),
        text_v1=result.get("text_v1"),
        micro_topics=result.get("micro_topics", [])
    )
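
# Minimal client sketch (assumes the app is mounted and served on
# localhost:8000, matching the assumed "/extract" path above):
#
#   import httpx
#   resp = httpx.post(
#       "http://localhost:8000/api/topics/extract",
#       json={"text": "Virat Kohli century highlights #cricket"},
#   )
#   resp.raise_for_status()
#   print(resp.json()["micro_topics"])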


@topic_router.post("/{token}/enrich")  # path per the hint in get_session_topics below
async def enrich_session_topics(token: str):
    """
    Enrich all events in a session with micro topics.
    Only processes events with type=watch and engagement=active.
    Adds hashtags, ner, nouns, text_v1, and micro_topics fields.
    """
    storage_dir = Path("storage")
    file_path = storage_dir / f"preprocessed_{token}.json"
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Session not found")

    # Load events
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    events = data.get("events", [])

    # Count qualifying events before processing
    active_watch_count = sum(
        1 for e in events
        if e.get("type") == "watch" and e.get("engagement") == "active"
    )

    # Process events
    processed_events = process_events_batch(events)

    # Count results
    events_with_topics = sum(1 for e in processed_events if e.get("micro_topics"))
    total_topics = sum(len(e.get("micro_topics", [])) for e in processed_events)
    total_hashtags = sum(len(e.get("hashtags", [])) for e in processed_events)
    total_ner = sum(len(e.get("ner", [])) for e in processed_events)
    total_nouns = sum(len(e.get("nouns", [])) for e in processed_events)

    # Save updated data
    data["events"] = processed_events
    data["micro_topics_extracted"] = True
    data["extraction_version"] = "v2"
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    return {
        "token": token,
        "total_events": len(events),
        "active_watch_events": active_watch_count,
        "events_with_topics": events_with_topics,
        "extraction_stats": {
            "total_hashtags": total_hashtags,
            "total_ner": total_ner,
            "total_nouns": total_nouns,
            "total_micro_topics": total_topics
        },
        "status": "enriched"
    }
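
# Enrichment call sketch (the token "abc123" is hypothetical):
#
#   resp = httpx.post("http://localhost:8000/api/topics/abc123/enrich")
#   print(resp.json()["extraction_stats"])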


@topic_router.get("/{token}")  # assumed path; decorator reconstructed
async def get_session_topics(token: str, top_n: int = 50):
    """
    Get aggregated micro topics for a session.

    Returns:
        - top_hashtags: Most common hashtags
        - top_ner: Most common named entities
        - top_nouns: Most common nouns
        - top_micro_topics: Most common overall
    """
    storage_dir = Path("storage")
    file_path = storage_dir / f"preprocessed_{token}.json"
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Session not found")

    # Load events
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    events = data.get("events", [])

    # Check whether topics have been extracted
    if not data.get("micro_topics_extracted"):
        raise HTTPException(
            status_code=400,
            detail="Topics not extracted yet. Call POST /{token}/enrich first."
        )

    # Aggregate topics across all events
    aggregated = get_aggregated_topics(events, top_n)

    # Add per-language breakdown of topics from active watch events
    language_topics = {"english": [], "hindi": [], "hinglish": [], "unknown": []}
    for event in events:
        if event.get("type") == "watch" and event.get("engagement") == "active":
            lang = event.get("language_type", "unknown")
            topics = event.get("micro_topics", [])
            if lang in language_topics:
                language_topics[lang].extend(topics)
    language_breakdown = {
        lang: [{"topic": t, "count": c} for t, c in Counter(topics).most_common(20)]
        for lang, topics in language_topics.items()
        if topics  # Only include languages with topics
    }

    return {
        "token": token,
        "version": data.get("extraction_version", "v1"),
        "stats": aggregated["stats"],
        "top_hashtags": aggregated["top_hashtags"],
        "top_ner": aggregated["top_ner"],
        "top_nouns": aggregated["top_nouns"],
        "top_micro_topics": aggregated["top_micro_topics"],
        "by_language": language_breakdown
    }
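

# Minimal wiring sketch: mount the router on an app and serve it. The host and
# port are assumptions, not taken from this repo's actual entrypoint.
if __name__ == "__main__":
    import uvicorn
    from fastapi import FastAPI

    app = FastAPI()
    app.include_router(topic_router)
    # Serves POST /api/topics/extract, POST /api/topics/{token}/enrich,
    # and GET /api/topics/{token} as defined above.
    uvicorn.run(app, host="0.0.0.0", port=8000)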