"""
Preprocessing API routes
Endpoints for preprocessing YouTube Takeout data into normalized Event format
"""
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path

from fastapi import APIRouter, HTTPException

from models.schemas import (
    PreprocessRequest,
    PreprocessResult,
    PreprocessStats,
    LanguageBreakdown,
    Event,
    PreprocessedSessionData
)
from services.preprocess_service import (
    preprocess_watch_history,
    preprocess_search_history,
    preprocess_subscriptions,
    preprocess_all_files
)
from services.session_service import create_session, get_session
router = APIRouter()
# Storage for preprocessed sessions
STORAGE_DIR = Path(__file__).parent.parent / "storage"
@router.post("/watch-history", response_model=list[Event])
async def preprocess_watch(request: PreprocessRequest):
    """
    Preprocess watch-history.json only.
    Returns list of normalized watch events.
    """
    # Pick the first upload named watch-history.json, if any.
    watch_file = next(
        (upload for upload in request.files if upload.filename == "watch-history.json"),
        None,
    )
    if watch_file is None:
        raise HTTPException(status_code=400, detail="watch-history.json not found in files")
    raw_events = preprocess_watch_history(watch_file.content, request.timezone)
    return [Event(**raw) for raw in raw_events]
@router.post("/search-history", response_model=list[Event])
async def preprocess_search(request: PreprocessRequest):
    """
    Preprocess search-history.json only.
    Returns list of normalized search events.
    """
    # Pick the first upload named search-history.json, if any.
    search_file = next(
        (upload for upload in request.files if upload.filename == "search-history.json"),
        None,
    )
    if search_file is None:
        raise HTTPException(status_code=400, detail="search-history.json not found in files")
    raw_events = preprocess_search_history(search_file.content, request.timezone)
    return [Event(**raw) for raw in raw_events]
@router.post("/subscriptions", response_model=list[Event])
async def preprocess_subs(request: PreprocessRequest):
    """
    Preprocess subscriptions.csv only.
    Returns list of normalized subscription events.
    """
    # Pick the first upload named subscriptions.csv, if any.
    subs_file = next(
        (upload for upload in request.files if upload.filename == "subscriptions.csv"),
        None,
    )
    if subs_file is None:
        raise HTTPException(status_code=400, detail="subscriptions.csv not found in files")
    raw_events = preprocess_subscriptions(subs_file.content, request.timezone)
    return [Event(**raw) for raw in raw_events]
@router.post("/all", response_model=PreprocessResult)
async def preprocess_all(request: PreprocessRequest):
    """
    Preprocess all files and return combined results.
    Processes in order: watch-history -> search-history -> subscriptions
    Returns individual results and combined events.
    """
    payload = [{"filename": upload.filename, "content": upload.content} for upload in request.files]
    processed = preprocess_all_files(payload, request.timezone)

    def _to_events(key: str) -> list[Event]:
        # Convert the raw event dicts for one result category into Event models.
        return [Event(**item) for item in processed[key]]

    raw_stats = processed["stats"]
    stats = PreprocessStats(
        total_watch=raw_stats["total_watch"],
        total_search=raw_stats["total_search"],
        total_subscribe=raw_stats["total_subscribe"],
        total_events=raw_stats["total_events"],
        language_breakdown=LanguageBreakdown(**raw_stats["language_breakdown"]),
    )
    return PreprocessResult(
        watch_history=_to_events("watch_history"),
        search_history=_to_events("search_history"),
        subscriptions=_to_events("subscriptions"),
        combined_events=_to_events("combined_events"),
        stats=stats,
        timezone=request.timezone,
    )
@router.post("/all-and-store", response_model=PreprocessedSessionData)
async def preprocess_and_store(request: PreprocessRequest):
    """
    Preprocess all files, combine results, and store in session.

    Runs the full preprocessing pipeline, persists the combined events and
    stats to a JSON file under STORAGE_DIR keyed by a fresh UUID token, and
    returns the session payload so the client can fetch it later via
    GET /session/{token}.
    """
    files_dict = [{"filename": f.filename, "content": f.content} for f in request.files]
    result = preprocess_all_files(files_dict, request.timezone)

    # Create session data
    STORAGE_DIR.mkdir(parents=True, exist_ok=True)
    token = str(uuid.uuid4())
    # datetime.utcnow() is naive and deprecated (Python 3.12+); use an aware
    # UTC timestamp and normalize the "+00:00" offset to the original "Z" suffix.
    created_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    session_data = {
        "token": token,
        "events": result["combined_events"],
        "stats": result["stats"],
        "timezone": request.timezone,
        "created_at": created_at
    }

    # Save to file; ensure_ascii=False keeps non-ASCII titles/queries readable
    # in the UTF-8 file instead of \uXXXX escapes.
    session_path = STORAGE_DIR / f"preprocessed_{token}.json"
    with open(session_path, 'w', encoding='utf-8') as f:
        json.dump(session_data, f, indent=2, ensure_ascii=False)

    stats = PreprocessStats(
        total_watch=result["stats"]["total_watch"],
        total_search=result["stats"]["total_search"],
        total_subscribe=result["stats"]["total_subscribe"],
        total_events=result["stats"]["total_events"],
        language_breakdown=LanguageBreakdown(**result["stats"]["language_breakdown"])
    )
    return PreprocessedSessionData(
        token=token,
        events=[Event(**e) for e in result["combined_events"]],
        stats=stats,
        timezone=request.timezone,
        created_at=created_at
    )
@router.get("/session/{token}", response_model=PreprocessedSessionData)
async def get_preprocessed_session(token: str):
    """
    Retrieve preprocessed session data by token.
    """
    session_path = STORAGE_DIR / f"preprocessed_{token}.json"
    if not session_path.exists():
        raise HTTPException(status_code=404, detail="Preprocessed session not found")
    try:
        with open(session_path, 'r', encoding='utf-8') as fh:
            payload = json.load(fh)
        # Rehydrate the stored dicts back into their Pydantic models.
        events = [Event(**item) for item in payload["events"]]
        return PreprocessedSessionData(
            token=payload["token"],
            events=events,
            stats=PreprocessStats(**payload["stats"]),
            timezone=payload["timezone"],
            created_at=payload["created_at"]
        )
    except Exception as e:
        # Corrupt JSON, missing keys, or validation failures surface as 500.
        raise HTTPException(status_code=500, detail=f"Error reading session: {str(e)}")