Spaces:
Running
Running
| """ | |
| Preprocessing API routes | |
| Endpoints for preprocessing YouTube Takeout data into normalized Event format | |
| """ | |
| from fastapi import APIRouter, HTTPException | |
| from models.schemas import ( | |
| PreprocessRequest, | |
| PreprocessResult, | |
| PreprocessStats, | |
| LanguageBreakdown, | |
| Event, | |
| PreprocessedSessionData | |
| ) | |
| from services.preprocess_service import ( | |
| preprocess_watch_history, | |
| preprocess_search_history, | |
| preprocess_subscriptions, | |
| preprocess_all_files | |
| ) | |
| from services.session_service import create_session, get_session | |
| from datetime import datetime | |
| import uuid | |
| import json | |
| from pathlib import Path | |
| router = APIRouter() | |
| # Storage for preprocessed sessions | |
| STORAGE_DIR = Path(__file__).parent.parent / "storage" | |
| async def preprocess_watch(request: PreprocessRequest): | |
| """ | |
| Preprocess watch-history.json only. | |
| Returns list of normalized watch events. | |
| """ | |
| watch_file = None | |
| for f in request.files: | |
| if f.filename == "watch-history.json": | |
| watch_file = f | |
| break | |
| if not watch_file: | |
| raise HTTPException(status_code=400, detail="watch-history.json not found in files") | |
| events = preprocess_watch_history(watch_file.content, request.timezone) | |
| return [Event(**e) for e in events] | |
| async def preprocess_search(request: PreprocessRequest): | |
| """ | |
| Preprocess search-history.json only. | |
| Returns list of normalized search events. | |
| """ | |
| search_file = None | |
| for f in request.files: | |
| if f.filename == "search-history.json": | |
| search_file = f | |
| break | |
| if not search_file: | |
| raise HTTPException(status_code=400, detail="search-history.json not found in files") | |
| events = preprocess_search_history(search_file.content, request.timezone) | |
| return [Event(**e) for e in events] | |
| async def preprocess_subs(request: PreprocessRequest): | |
| """ | |
| Preprocess subscriptions.csv only. | |
| Returns list of normalized subscription events. | |
| """ | |
| subs_file = None | |
| for f in request.files: | |
| if f.filename == "subscriptions.csv": | |
| subs_file = f | |
| break | |
| if not subs_file: | |
| raise HTTPException(status_code=400, detail="subscriptions.csv not found in files") | |
| events = preprocess_subscriptions(subs_file.content, request.timezone) | |
| return [Event(**e) for e in events] | |
| async def preprocess_all(request: PreprocessRequest): | |
| """ | |
| Preprocess all files and return combined results. | |
| Processes in order: watch-history -> search-history -> subscriptions | |
| Returns individual results and combined events. | |
| """ | |
| files_dict = [{"filename": f.filename, "content": f.content} for f in request.files] | |
| result = preprocess_all_files(files_dict, request.timezone) | |
| return PreprocessResult( | |
| watch_history=[Event(**e) for e in result["watch_history"]], | |
| search_history=[Event(**e) for e in result["search_history"]], | |
| subscriptions=[Event(**e) for e in result["subscriptions"]], | |
| combined_events=[Event(**e) for e in result["combined_events"]], | |
| stats=PreprocessStats( | |
| total_watch=result["stats"]["total_watch"], | |
| total_search=result["stats"]["total_search"], | |
| total_subscribe=result["stats"]["total_subscribe"], | |
| total_events=result["stats"]["total_events"], | |
| language_breakdown=LanguageBreakdown(**result["stats"]["language_breakdown"]) | |
| ), | |
| timezone=request.timezone | |
| ) | |
| async def preprocess_and_store(request: PreprocessRequest): | |
| """ | |
| Preprocess all files, combine results, and store in session. | |
| Returns session token with preprocessed events. | |
| """ | |
| files_dict = [{"filename": f.filename, "content": f.content} for f in request.files] | |
| result = preprocess_all_files(files_dict, request.timezone) | |
| # Create session data | |
| STORAGE_DIR.mkdir(parents=True, exist_ok=True) | |
| token = str(uuid.uuid4()) | |
| created_at = datetime.utcnow().isoformat() + "Z" | |
| session_data = { | |
| "token": token, | |
| "events": result["combined_events"], | |
| "stats": result["stats"], | |
| "timezone": request.timezone, | |
| "created_at": created_at | |
| } | |
| # Save to file | |
| session_path = STORAGE_DIR / f"preprocessed_{token}.json" | |
| with open(session_path, 'w', encoding='utf-8') as f: | |
| json.dump(session_data, f, indent=2) | |
| stats = PreprocessStats( | |
| total_watch=result["stats"]["total_watch"], | |
| total_search=result["stats"]["total_search"], | |
| total_subscribe=result["stats"]["total_subscribe"], | |
| total_events=result["stats"]["total_events"], | |
| language_breakdown=LanguageBreakdown(**result["stats"]["language_breakdown"]) | |
| ) | |
| return PreprocessedSessionData( | |
| token=token, | |
| events=[Event(**e) for e in result["combined_events"]], | |
| stats=stats, | |
| timezone=request.timezone, | |
| created_at=created_at | |
| ) | |
| async def get_preprocessed_session(token: str): | |
| """ | |
| Retrieve preprocessed session data by token. | |
| """ | |
| session_path = STORAGE_DIR / f"preprocessed_{token}.json" | |
| if not session_path.exists(): | |
| raise HTTPException(status_code=404, detail="Preprocessed session not found") | |
| try: | |
| with open(session_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| return PreprocessedSessionData( | |
| token=data["token"], | |
| events=[Event(**e) for e in data["events"]], | |
| stats=PreprocessStats(**data["stats"]), | |
| timezone=data["timezone"], | |
| created_at=data["created_at"] | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error reading session: {str(e)}") | |