import json import os import re import uuid import asyncio from datetime import datetime from typing import Dict, List from curl_cffi import requests as curl_requests from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from pydantic import BaseModel, HttpUrl from src.api.downloader import YouTubeDownloader from src.auth.dependencies import get_current_user from src.db.models import User from src.summarization.note_generator import NoteGenerator from src.utils.config import settings from src.utils.logger import setup_logger logger = setup_logger(__name__) router = APIRouter(tags=["Notes"]) tasks: Dict[str, Dict] = {} def _set_task_status(task_id: str, status: str, message: str) -> None: tasks[task_id]["status"] = status tasks[task_id]["message"] = message def _proxy_dict() -> dict | None: proxy_url = os.environ.get("PROXY_URL", "").strip() or os.environ.get("YOUTUBE_PROXY", "").strip() if not proxy_url: return None return { "http": proxy_url, "https": proxy_url, } def _extract_video_id(url: str) -> str: """Extract the 11-character YouTube video ID from any URL format.""" match = re.search(r"(?:v=|youtu\.be/|shorts/|embed/)([A-Za-z0-9_-]{11})", str(url)) return match.group(1) if match else "" def _duration_via_supadata(video_id: str) -> int: """Estimate video duration from Supadata transcript segment timestamps.""" api_key = os.environ.get("SUPADATA_API_KEY", "").strip() if not api_key: return 0 try: api_url = ( "https://api.supadata.ai/v1/youtube/transcript" f"?url=https://www.youtube.com/watch?v={video_id}" ) resp = curl_requests.get( api_url, headers={"x-api-key": api_key}, impersonate="chrome124", timeout=20, proxies=_proxy_dict(), ) resp.raise_for_status() data = resp.json() segments = data.get("segments") or data.get("content", []) if isinstance(segments, list) and segments: last = segments[-1] offset_ms = last.get("offset", 0) or last.get("start", 0) dur_ms = last.get("duration", 0) or last.get("dur", 0) total_s = (int(offset_ms) + int(dur_ms)) // 1000 if total_s > 0: logger.info("[S2-supadata] duration~%ds", total_s) return total_s except Exception as exc: logger.warning("[S2-supadata] failed: %s", exc) return 0 def _duration_via_html_scrape(url: str) -> int: """Scrape the watch page and parse duration hints.""" headers = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36" ), "Accept-Language": "en-US,en;q=0.9", "Accept": ( "text/html,application/xhtml+xml,application/xml;" "q=0.9,image/avif,image/webp,*/*;q=0.8" ), "Connection": "keep-alive", "DNT": "1", "Upgrade-Insecure-Requests": "1", } try: resp = curl_requests.get( url, headers=headers, impersonate="chrome124", timeout=15, proxies=_proxy_dict(), ) resp.raise_for_status() html = resp.text except Exception as exc: logger.warning("[S3-scrape] HTTP fetch failed: %s", exc) return 0 match = re.search(r'"lengthSeconds"\s*:\s*"(\d+)"', html) if match: duration = int(match.group(1)) logger.info("[S3a-regex-quoted] duration=%ds", duration) return duration match = re.search(r'"approxDurationMs"\s*:\s*"(\d+)"', html) if match: duration = int(match.group(1)) // 1000 logger.info("[S3b-approxMs] duration=%ds", duration) return duration match = re.search( r"var\s+ytInitialPlayerResponse\s*=\s*(\{.*?\})\s*;", html, re.DOTALL, ) if match: try: data = json.loads(match.group(1)) seconds_str = data.get("videoDetails", {}).get("lengthSeconds", "") if seconds_str and str(seconds_str).isdigit(): duration = int(seconds_str) logger.info("[S3c-jsonParse] duration=%ds", duration) return duration except (json.JSONDecodeError, AttributeError) as exc: logger.warning("[S3c-jsonParse] JSON decode failed: %s", exc) return 0 def get_youtube_duration( url: str, preferred_duration: int = 0, strategy: str | None = None, ) -> int: """Fetch YouTube duration in seconds using Supadata, then page scraping.""" video_id = _extract_video_id(url) if preferred_duration > 0: return preferred_duration if video_id: duration = _duration_via_supadata(video_id) if duration > 0: return duration duration = _duration_via_html_scrape(url) if duration > 0: return duration logger.warning("[duration] All strategies exhausted for: %s", url) return 0 class GenerateNotesRequest(BaseModel): youtube_url: HttpUrl language: str = "en" deep_scan: bool = False class TaskResponse(BaseModel): task_id: str status: str message: str class GeneratedNoteFile(BaseModel): filename: str title: str created_at: float size: int @router.post("/generate", response_model=TaskResponse) async def generate_note( request: GenerateNotesRequest, background_tasks: BackgroundTasks, current_user: User = Depends(get_current_user), ): task_id = str(uuid.uuid4()) user_id = current_user.id tasks[task_id] = { "status": "pending", "message": "Initializing...", "youtube_url": str(request.youtube_url), "user_id": user_id, "usedDeepScan": False, "created_at": datetime.now(), } background_tasks.add_task( process_video_task, task_id, str(request.youtube_url), request.language, user_id, request.deep_scan, ) return TaskResponse( task_id=task_id, status="pending", message="Generation started successfully.", ) @router.get("/status/{task_id}") async def get_task_status(task_id: str): if task_id not in tasks: raise HTTPException(status_code=404, detail="Task not found") return tasks[task_id] async def process_video_task( task_id: str, youtube_url: str, language: str, user_id: str, deep_scan: bool = False, ): downloader = YouTubeDownloader() try: video_id = _extract_video_id(youtube_url) video_title = "YouTube Video" _set_task_status(task_id, "validating_url", "Validating video URL...") prefetched_duration = _duration_via_html_scrape(youtube_url) _set_task_status( task_id, "extracting_content", "Checking for available subtitles...", ) if deep_scan: transcript_text = await asyncio.to_thread( _transcribe_audio_fallback, task_id, youtube_url, language, downloader, ) else: try: transcript_text = await asyncio.to_thread( downloader.get_transcript, youtube_url, ) except Exception as transcript_exc: logger.info( "Subtitle transcript unavailable for task %s, starting deep scan: %s", task_id, transcript_exc, ) transcript_text = await asyncio.to_thread( _transcribe_audio_fallback, task_id, youtube_url, language, downloader, ) _set_task_status( task_id, "transcript_ready", "Transcript ready. Preparing summary...", ) _set_task_status( task_id, "ai_processing", "Generating intelligent summary...", ) note_gen = NoteGenerator() summary_json = note_gen.generateSummary(transcript_text, video_title) resolved_video_title = video_title if resolved_video_title == "YouTube Video": resolved_video_title = str(summary_json.get("title") or resolved_video_title) video_duration = get_youtube_duration( youtube_url, preferred_duration=prefetched_duration, ) final_markdown = note_gen.format_final_notes( note_gen.format_notes_to_markdown(summary_json), resolved_video_title, youtube_url, video_duration, detected_language=summary_json.get("detected_language", "English"), ) segments = summary_json.get("segments", []) key_points_list = [ seg["key_insight"] for seg in segments if isinstance(seg, dict) and seg.get("key_insight") ] from src.summarization.topic_classifier import classify_topics _set_task_status( task_id, "structuring_notes", "Structuring notes and key points...", ) raw_topics = summary_json.get("topics", []) categories = classify_topics(raw_topics) if raw_topics else ["Education & Science"] _set_task_status(task_id, "complete", "Generation completed successfully.") tasks[task_id]["notes"] = final_markdown tasks[task_id]["topics"] = categories tasks[task_id]["category"] = categories tasks[task_id]["keyPoints"] = key_points_list tasks[task_id]["videoTitle"] = resolved_video_title tasks[task_id]["thumbnail"] = ( f"https://img.youtube.com/vi/{video_id}/mqdefault.jpg" if video_id else "" ) logger.info("Task %s completed successfully", task_id) except Exception as exc: logger.error("Task %s failed: %s", task_id, exc) _set_task_status(task_id, "failed", str(exc)) def _transcribe_audio_fallback( task_id: str, youtube_url: str, language: str, downloader: YouTubeDownloader, ) -> str: audio_path = None try: _set_task_status( task_id, "extracting_audio", "No subtitles found. Extracting audio for deep scan...", ) tasks[task_id]["usedDeepScan"] = True audio_path = downloader.download_audio(youtube_url, task_id) _set_task_status( task_id, "transcribing_audio", "Transcribing audio with deep scan...", ) from src.transcription.whisper_transcriber import WhisperTranscriber transcript_data = WhisperTranscriber().transcribe( audio_path, language=language, verbose=False, ) transcript_text = str(transcript_data.get("text", "")).strip() if not transcript_text: raise RuntimeError("Deep scan produced an empty transcript.") return transcript_text except Exception as exc: raise RuntimeError( "Deep scan failed: audio extraction or transcription could not be completed. " "The video may be private, restricted, DRM-protected, unavailable, " "or YouTube may require YOUTUBE_COOKIES_B64/YOUTUBE_COOKIES for this Space. " f"Details: {exc}" ) from exc finally: if audio_path is not None: downloader.cleanup(audio_path) @router.get("/generated", response_model=List[GeneratedNoteFile]) async def list_generated_notes(): notes = [] output_dir = settings.output_dir if not output_dir.exists(): return [] for file_path in output_dir.glob("*_notes.md"): stats = file_path.stat() notes.append( GeneratedNoteFile( filename=file_path.name, title=file_path.name.replace("_notes.md", ""), created_at=stats.st_mtime, size=stats.st_size, ) ) notes.sort(key=lambda item: item.created_at, reverse=True) return notes