AIdea-Server / src/api/notes_routes.py
Ahmed Mostafa
feat: implement YouTube note generation API with background task processing and duration scraping
8813304
import json
import os
import re
import uuid
import asyncio
from datetime import datetime
from typing import Dict, List
from curl_cffi import requests as curl_requests
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel, HttpUrl
from src.api.downloader import YouTubeDownloader
from src.auth.dependencies import get_current_user
from src.db.models import User
from src.summarization.note_generator import NoteGenerator
from src.utils.config import settings
from src.utils.logger import setup_logger
# Module-level logger keyed to this module's import path.
logger = setup_logger(__name__)
router = APIRouter(tags=["Notes"])
# In-memory task registry: task_id -> mutable status/progress/result dict.
# NOTE(review): not persisted and process-local — assumes a single-worker
# deployment; confirm before scaling to multiple processes.
tasks: Dict[str, Dict] = {}
def _set_task_status(task_id: str, status: str, message: str) -> None:
    """Record the current status code and human-readable message for a task."""
    entry = tasks[task_id]
    entry["status"] = status
    entry["message"] = message
def _proxy_dict() -> dict | None:
proxy_url = os.environ.get("PROXY_URL", "").strip() or os.environ.get("YOUTUBE_PROXY", "").strip()
if not proxy_url:
return None
return {
"http": proxy_url,
"https": proxy_url,
}
def _extract_video_id(url: str) -> str:
"""Extract the 11-character YouTube video ID from any URL format."""
match = re.search(r"(?:v=|youtu\.be/|shorts/|embed/)([A-Za-z0-9_-]{11})", str(url))
return match.group(1) if match else ""
def _duration_via_supadata(video_id: str) -> int:
    """Estimate video duration from Supadata transcript segment timestamps.

    Uses the last transcript segment's offset + duration (both in ms) as a
    lower-bound estimate of total length. Best-effort: any failure (missing
    API key, HTTP error, unexpected payload) yields 0.
    """
    api_key = os.environ.get("SUPADATA_API_KEY", "").strip()
    if not api_key:
        # No credentials configured — skip this strategy entirely.
        return 0
    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    endpoint = f"https://api.supadata.ai/v1/youtube/transcript?url={watch_url}"
    try:
        response = curl_requests.get(
            endpoint,
            headers={"x-api-key": api_key},
            impersonate="chrome124",
            timeout=20,
            proxies=_proxy_dict(),
        )
        response.raise_for_status()
        payload = response.json()
        # The API has returned segments under either "segments" or "content".
        segment_list = payload.get("segments") or payload.get("content", [])
        if isinstance(segment_list, list) and segment_list:
            tail = segment_list[-1]
            # Field names vary between payload versions; try both spellings.
            start_ms = tail.get("offset", 0) or tail.get("start", 0)
            length_ms = tail.get("duration", 0) or tail.get("dur", 0)
            estimated_s = (int(start_ms) + int(length_ms)) // 1000
            if estimated_s > 0:
                logger.info("[S2-supadata] duration~%ds", estimated_s)
                return estimated_s
    except Exception as exc:
        logger.warning("[S2-supadata] failed: %s", exc)
    return 0
def _duration_via_html_scrape(url: str) -> int:
    """Scrape the watch page and parse duration hints.

    Tries three extraction strategies in order of cheapness:
      a. quoted "lengthSeconds" regex,
      b. "approxDurationMs" regex (converted to seconds),
      c. full JSON parse of the inlined ytInitialPlayerResponse object.
    Returns 0 when the fetch fails or no strategy matches.
    """
    browser_headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": (
            "text/html,application/xhtml+xml,application/xml;"
            "q=0.9,image/avif,image/webp,*/*;q=0.8"
        ),
        "Connection": "keep-alive",
        "DNT": "1",
        "Upgrade-Insecure-Requests": "1",
    }
    try:
        page = curl_requests.get(
            url,
            headers=browser_headers,
            impersonate="chrome124",
            timeout=15,
            proxies=_proxy_dict(),
        )
        page.raise_for_status()
        html = page.text
    except Exception as exc:
        logger.warning("[S3-scrape] HTTP fetch failed: %s", exc)
        return 0
    # Strategy a: duration quoted as a string in the player config.
    if (hit := re.search(r'"lengthSeconds"\s*:\s*"(\d+)"', html)) is not None:
        seconds = int(hit.group(1))
        logger.info("[S3a-regex-quoted] duration=%ds", seconds)
        return seconds
    # Strategy b: millisecond variant, floor-divided down to whole seconds.
    if (hit := re.search(r'"approxDurationMs"\s*:\s*"(\d+)"', html)) is not None:
        seconds = int(hit.group(1)) // 1000
        logger.info("[S3b-approxMs] duration=%ds", seconds)
        return seconds
    # Strategy c: parse the inlined player response JSON blob.
    # NOTE(review): the non-greedy {.*?} stops at the first "};" — may truncate
    # the blob and fail the JSON parse; acceptable since this is a last resort.
    blob = re.search(
        r"var\s+ytInitialPlayerResponse\s*=\s*(\{.*?\})\s*;",
        html,
        re.DOTALL,
    )
    if blob is not None:
        try:
            details = json.loads(blob.group(1)).get("videoDetails", {})
            raw_length = details.get("lengthSeconds", "")
            if raw_length and str(raw_length).isdigit():
                seconds = int(raw_length)
                logger.info("[S3c-jsonParse] duration=%ds", seconds)
                return seconds
        except (json.JSONDecodeError, AttributeError) as exc:
            logger.warning("[S3c-jsonParse] JSON decode failed: %s", exc)
    return 0
def get_youtube_duration(
url: str,
preferred_duration: int = 0,
strategy: str | None = None,
) -> int:
"""Fetch YouTube duration in seconds using Supadata, then page scraping."""
video_id = _extract_video_id(url)
if preferred_duration > 0:
return preferred_duration
if video_id:
duration = _duration_via_supadata(video_id)
if duration > 0:
return duration
duration = _duration_via_html_scrape(url)
if duration > 0:
return duration
logger.warning("[duration] All strategies exhausted for: %s", url)
return 0
class GenerateNotesRequest(BaseModel):
    """Request payload for POST /generate."""
    youtube_url: HttpUrl  # validated absolute URL; video ID is extracted downstream
    language: str = "en"  # transcription language hint passed to the deep-scan transcriber
    deep_scan: bool = False  # True: skip subtitle lookup and transcribe audio directly
class TaskResponse(BaseModel):
    """Response returned by POST /generate when a background task is queued."""
    task_id: str  # UUID4 string used to poll /status/{task_id}
    status: str  # initial status, always "pending" at creation time
    message: str  # human-readable progress message
class GeneratedNoteFile(BaseModel):
    """Metadata for a generated markdown notes file on disk."""
    filename: str  # file name including the "_notes.md" suffix
    title: str  # filename with the "_notes.md" suffix stripped
    created_at: float  # file modification time (st_mtime, epoch seconds)
    size: int  # file size in bytes
@router.post("/generate", response_model=TaskResponse)
async def generate_note(
    request: GenerateNotesRequest,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user),
):
    """Queue asynchronous note generation for a YouTube video.

    Registers a task entry and schedules processing in the background;
    the caller polls GET /status/{task_id} for progress and results.
    """
    task_id = str(uuid.uuid4())
    url_text = str(request.youtube_url)
    # Register the entry before scheduling so /status can see it immediately.
    tasks[task_id] = {
        "status": "pending",
        "message": "Initializing...",
        "youtube_url": url_text,
        "user_id": current_user.id,
        "usedDeepScan": False,
        "created_at": datetime.now(),
    }
    background_tasks.add_task(
        process_video_task,
        task_id,
        url_text,
        request.language,
        current_user.id,
        request.deep_scan,
    )
    return TaskResponse(
        task_id=task_id,
        status="pending",
        message="Generation started successfully.",
    )
@router.get("/status/{task_id}")
async def get_task_status(task_id: str):
    """Return the full state dict for a background generation task.

    Raises 404 when the task ID is unknown (never created, or process
    restarted since the in-memory registry is not persisted).
    """
    try:
        return tasks[task_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Task not found")
async def process_video_task(
    task_id: str,
    youtube_url: str,
    language: str,
    user_id: str,
    deep_scan: bool = False,
):
    """Background pipeline: transcript -> AI summary -> formatted notes.

    Mutates tasks[task_id] as it progresses through the status stages
    (validating_url -> extracting_content -> transcript_ready ->
    ai_processing -> structuring_notes -> complete/failed) and stores the
    final results (notes, topics, keyPoints, videoTitle, thumbnail) on the
    same entry. Any exception marks the task "failed" with the message.

    Args:
        task_id: Key into the module-level tasks registry.
        youtube_url: The video URL as a plain string.
        language: Language hint forwarded to the deep-scan transcriber.
        user_id: Requesting user's ID. NOTE(review): currently unused in this
            body — attribution is stored on the task entry by generate_note.
        deep_scan: When True, skip subtitle lookup and transcribe audio.
    """
    downloader = YouTubeDownloader()
    try:
        video_id = _extract_video_id(youtube_url)
        # Placeholder title; replaced with the AI-generated one if available.
        video_title = "YouTube Video"
        _set_task_status(task_id, "validating_url", "Validating video URL...")
        # Scrape the duration up front while the page is cheap to fetch; fed to
        # get_youtube_duration later so it can skip its own network lookups.
        prefetched_duration = _duration_via_html_scrape(youtube_url)
        _set_task_status(
            task_id,
            "extracting_content",
            "Checking for available subtitles...",
        )
        if deep_scan:
            # Caller explicitly requested audio transcription; run the blocking
            # fallback in a worker thread to keep the event loop free.
            transcript_text = await asyncio.to_thread(
                _transcribe_audio_fallback,
                task_id,
                youtube_url,
                language,
                downloader,
            )
        else:
            try:
                # Fast path: reuse existing subtitles when YouTube provides them.
                transcript_text = await asyncio.to_thread(
                    downloader.get_transcript,
                    youtube_url,
                )
            except Exception as transcript_exc:
                # No subtitles (or fetch failed) — fall back to deep scan.
                logger.info(
                    "Subtitle transcript unavailable for task %s, starting deep scan: %s",
                    task_id,
                    transcript_exc,
                )
                transcript_text = await asyncio.to_thread(
                    _transcribe_audio_fallback,
                    task_id,
                    youtube_url,
                    language,
                    downloader,
                )
        _set_task_status(
            task_id,
            "transcript_ready",
            "Transcript ready. Preparing summary...",
        )
        _set_task_status(
            task_id,
            "ai_processing",
            "Generating intelligent summary...",
        )
        note_gen = NoteGenerator()
        summary_json = note_gen.generateSummary(transcript_text, video_title)
        # Prefer the AI-detected title over the generic placeholder.
        resolved_video_title = video_title
        if resolved_video_title == "YouTube Video":
            resolved_video_title = str(summary_json.get("title") or resolved_video_title)
        video_duration = get_youtube_duration(
            youtube_url,
            preferred_duration=prefetched_duration,
        )
        final_markdown = note_gen.format_final_notes(
            note_gen.format_notes_to_markdown(summary_json),
            resolved_video_title,
            youtube_url,
            video_duration,
            detected_language=summary_json.get("detected_language", "English"),
        )
        # Collect one key insight per well-formed segment for the client UI.
        segments = summary_json.get("segments", [])
        key_points_list = [
            seg["key_insight"]
            for seg in segments
            if isinstance(seg, dict) and seg.get("key_insight")
        ]
        # Local import — presumably deferred to keep classifier start-up cost
        # out of module import; confirm before hoisting to the top of the file.
        from src.summarization.topic_classifier import classify_topics
        _set_task_status(
            task_id,
            "structuring_notes",
            "Structuring notes and key points...",
        )
        raw_topics = summary_json.get("topics", [])
        categories = classify_topics(raw_topics) if raw_topics else ["Education & Science"]
        _set_task_status(task_id, "complete", "Generation completed successfully.")
        tasks[task_id]["notes"] = final_markdown
        tasks[task_id]["topics"] = categories
        # "category" mirrors "topics" — kept for client backward compatibility;
        # NOTE(review): confirm both keys are still consumed before removing one.
        tasks[task_id]["category"] = categories
        tasks[task_id]["keyPoints"] = key_points_list
        tasks[task_id]["videoTitle"] = resolved_video_title
        tasks[task_id]["thumbnail"] = (
            f"https://img.youtube.com/vi/{video_id}/mqdefault.jpg" if video_id else ""
        )
        logger.info("Task %s completed successfully", task_id)
    except Exception as exc:
        # Top-level boundary: surface the failure to the polling client.
        logger.error("Task %s failed: %s", task_id, exc)
        _set_task_status(task_id, "failed", str(exc))
def _transcribe_audio_fallback(
    task_id: str,
    youtube_url: str,
    language: str,
    downloader: YouTubeDownloader,
) -> str:
    """Deep scan: download the video's audio and transcribe it with Whisper.

    Marks the task as having used deep scan, updates its status through the
    extraction and transcription stages, and always cleans up the downloaded
    audio file. Wraps any failure in a RuntimeError with a user-facing hint.

    Returns:
        The non-empty transcript text.

    Raises:
        RuntimeError: On download/transcription failure or empty output.
    """
    downloaded_path = None
    try:
        _set_task_status(
            task_id,
            "extracting_audio",
            "No subtitles found. Extracting audio for deep scan...",
        )
        tasks[task_id]["usedDeepScan"] = True
        downloaded_path = downloader.download_audio(youtube_url, task_id)
        _set_task_status(
            task_id,
            "transcribing_audio",
            "Transcribing audio with deep scan...",
        )
        # Deferred import keeps the heavy Whisper stack off the module path.
        from src.transcription.whisper_transcriber import WhisperTranscriber
        result = WhisperTranscriber().transcribe(
            downloaded_path,
            language=language,
            verbose=False,
        )
        text = str(result.get("text", "")).strip()
        if text:
            return text
        raise RuntimeError("Deep scan produced an empty transcript.")
    except Exception as exc:
        raise RuntimeError(
            "Deep scan failed: audio extraction or transcription could not be completed. "
            "The video may be private, restricted, DRM-protected, unavailable, "
            "or YouTube may require YOUTUBE_COOKIES_B64/YOUTUBE_COOKIES for this Space. "
            f"Details: {exc}"
        ) from exc
    finally:
        # Remove the temporary audio file whether we succeeded or not.
        if downloaded_path is not None:
            downloader.cleanup(downloaded_path)
@router.get("/generated", response_model=List[GeneratedNoteFile])
async def list_generated_notes():
    """List previously generated markdown note files, newest first."""
    output_dir = settings.output_dir
    if not output_dir.exists():
        return []
    entries = []
    for md_file in output_dir.glob("*_notes.md"):
        info = md_file.stat()
        entries.append(
            GeneratedNoteFile(
                filename=md_file.name,
                title=md_file.name.replace("_notes.md", ""),
                created_at=info.st_mtime,
                size=info.st_size,
            )
        )
    # Most recently modified first.
    return sorted(entries, key=lambda note: note.created_at, reverse=True)