ido / api /preprocess_routes.py
Parthnuwal7
Adding backend to HF spaces
27d04ef
"""
Preprocessing API routes
Endpoints for preprocessing YouTube Takeout data into normalized Event format
"""
from fastapi import APIRouter, HTTPException
from models.schemas import (
PreprocessRequest,
PreprocessResult,
PreprocessStats,
LanguageBreakdown,
Event,
PreprocessedSessionData
)
from services.preprocess_service import (
preprocess_watch_history,
preprocess_search_history,
preprocess_subscriptions,
preprocess_all_files
)
from services.session_service import create_session, get_session
from datetime import datetime
import uuid
import json
from pathlib import Path
router = APIRouter()
# Storage for preprocessed sessions
STORAGE_DIR = Path(__file__).parent.parent / "storage"
@router.post("/watch-history", response_model=list[Event])
async def preprocess_watch(request: PreprocessRequest):
"""
Preprocess watch-history.json only.
Returns list of normalized watch events.
"""
watch_file = None
for f in request.files:
if f.filename == "watch-history.json":
watch_file = f
break
if not watch_file:
raise HTTPException(status_code=400, detail="watch-history.json not found in files")
events = preprocess_watch_history(watch_file.content, request.timezone)
return [Event(**e) for e in events]
@router.post("/search-history", response_model=list[Event])
async def preprocess_search(request: PreprocessRequest):
"""
Preprocess search-history.json only.
Returns list of normalized search events.
"""
search_file = None
for f in request.files:
if f.filename == "search-history.json":
search_file = f
break
if not search_file:
raise HTTPException(status_code=400, detail="search-history.json not found in files")
events = preprocess_search_history(search_file.content, request.timezone)
return [Event(**e) for e in events]
@router.post("/subscriptions", response_model=list[Event])
async def preprocess_subs(request: PreprocessRequest):
"""
Preprocess subscriptions.csv only.
Returns list of normalized subscription events.
"""
subs_file = None
for f in request.files:
if f.filename == "subscriptions.csv":
subs_file = f
break
if not subs_file:
raise HTTPException(status_code=400, detail="subscriptions.csv not found in files")
events = preprocess_subscriptions(subs_file.content, request.timezone)
return [Event(**e) for e in events]
@router.post("/all", response_model=PreprocessResult)
async def preprocess_all(request: PreprocessRequest):
"""
Preprocess all files and return combined results.
Processes in order: watch-history -> search-history -> subscriptions
Returns individual results and combined events.
"""
files_dict = [{"filename": f.filename, "content": f.content} for f in request.files]
result = preprocess_all_files(files_dict, request.timezone)
return PreprocessResult(
watch_history=[Event(**e) for e in result["watch_history"]],
search_history=[Event(**e) for e in result["search_history"]],
subscriptions=[Event(**e) for e in result["subscriptions"]],
combined_events=[Event(**e) for e in result["combined_events"]],
stats=PreprocessStats(
total_watch=result["stats"]["total_watch"],
total_search=result["stats"]["total_search"],
total_subscribe=result["stats"]["total_subscribe"],
total_events=result["stats"]["total_events"],
language_breakdown=LanguageBreakdown(**result["stats"]["language_breakdown"])
),
timezone=request.timezone
)
@router.post("/all-and-store", response_model=PreprocessedSessionData)
async def preprocess_and_store(request: PreprocessRequest):
"""
Preprocess all files, combine results, and store in session.
Returns session token with preprocessed events.
"""
files_dict = [{"filename": f.filename, "content": f.content} for f in request.files]
result = preprocess_all_files(files_dict, request.timezone)
# Create session data
STORAGE_DIR.mkdir(parents=True, exist_ok=True)
token = str(uuid.uuid4())
created_at = datetime.utcnow().isoformat() + "Z"
session_data = {
"token": token,
"events": result["combined_events"],
"stats": result["stats"],
"timezone": request.timezone,
"created_at": created_at
}
# Save to file
session_path = STORAGE_DIR / f"preprocessed_{token}.json"
with open(session_path, 'w', encoding='utf-8') as f:
json.dump(session_data, f, indent=2)
stats = PreprocessStats(
total_watch=result["stats"]["total_watch"],
total_search=result["stats"]["total_search"],
total_subscribe=result["stats"]["total_subscribe"],
total_events=result["stats"]["total_events"],
language_breakdown=LanguageBreakdown(**result["stats"]["language_breakdown"])
)
return PreprocessedSessionData(
token=token,
events=[Event(**e) for e in result["combined_events"]],
stats=stats,
timezone=request.timezone,
created_at=created_at
)
@router.get("/session/{token}", response_model=PreprocessedSessionData)
async def get_preprocessed_session(token: str):
"""
Retrieve preprocessed session data by token.
"""
session_path = STORAGE_DIR / f"preprocessed_{token}.json"
if not session_path.exists():
raise HTTPException(status_code=404, detail="Preprocessed session not found")
try:
with open(session_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return PreprocessedSessionData(
token=data["token"],
events=[Event(**e) for e in data["events"]],
stats=PreprocessStats(**data["stats"]),
timezone=data["timezone"],
created_at=data["created_at"]
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error reading session: {str(e)}")