Spaces:
Restarting
Restarting
Ahmed Mostafa
feat: implement YouTube note generation API with background task processing and duration scraping
8813304 | from datetime import datetime | |
| from contextlib import asynccontextmanager | |
| import os | |
| from fastapi import FastAPI | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from src.api.auth_routes import router as auth_router | |
| from src.api.notes_routes import router as notes_router | |
| from src.api.recommendation_routes import router as recommendation_router | |
| from src.utils.logger import setup_logger | |
| logger = setup_logger(__name__) | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log application startup and shutdown around the serving period.

    Passed to ``FastAPI(lifespan=...)``, which expects an async context
    manager; the decorator turns this async generator into one.

    Args:
        app: The FastAPI application being started. Not used by the body.

    Yields:
        None. Control returns to the framework while the app is serving.
    """
    # NOTE(review): the leading "π" in both messages looks like a mis-encoded
    # emoji; the strings are left untouched here — confirm the intended text.
    logger.info("π AIdea API starting up...")
    yield
    logger.info("π AIdea API shutting down...")
# The ASGI application; `lifespan` supplies the startup/shutdown log hooks.
app = FastAPI(
    lifespan=lifespan,
    title="AIdea API",
    description="YouTube Study Notes Generation Engine",
)

# Fully permissive CORS policy: any origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation — confirm this is intended
# or replace the wildcard with an explicit origin list.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)

# Routers are registered in the same order as before: notes,
# recommendations, then auth.
for _router in (notes_router, recommendation_router, auth_router):
    app.include_router(_router)
def read_root():
    """Return a static payload confirming that the API is reachable."""
    # NOTE(review): no route decorator is visible in this excerpt —
    # presumably this handler is registered on `app`; confirm.
    payload = dict(
        status="online",
        message="Welcome to AIdea API! Everything is working perfectly.",
    )
    return payload
async def health_check():
    """Report DNS resolution and outbound HTTPS connectivity diagnostics.

    For each probed domain the system resolver is queried (stored under
    ``<domain>_sys``) and, when dnspython is importable, the nameserver
    8.8.8.8 is queried for A records (stored under ``<domain>_ext``).
    ``https://www.youtube.com`` is then fetched, through the proxy named by
    the ``PROXY_URL`` or ``YOUTUBE_PROXY`` environment variable when either
    is set to a non-blank value.

    Individual probe failures are recorded in the payload as strings instead
    of being raised.

    Returns:
        A dict with the keys ``status``, ``dnspython``, ``dns``,
        ``connectivity`` and ``timestamp``.
    """
    # NOTE(review): no route decorator is visible in this excerpt —
    # presumably this handler is registered on `app`; confirm.
    import asyncio
    import socket

    import httpx

    try:
        import dns.resolver
        has_dnspython = True
    except ImportError:
        has_dnspython = False

    # Both DNS lookups below are blocking calls. Running them in the default
    # executor keeps the event loop free to serve other requests while a
    # slow or failing resolver times out.
    loop = asyncio.get_running_loop()

    dns_results = {}
    for domain in ["www.youtube.com", "google.com"]:
        # System resolver (whatever the host/container is configured with).
        try:
            dns_results[f"{domain}_sys"] = await loop.run_in_executor(
                None, socket.gethostbyname, domain
            )
        except Exception as e:
            dns_results[f"{domain}_sys"] = f"Error: {e}"

        if not has_dnspython:
            dns_results[f"{domain}_ext"] = "dnspython not installed"
            continue

        # External resolver, bypassing the system DNS configuration.
        try:
            resolver = dns.resolver.Resolver()
            resolver.nameservers = ['8.8.8.8']
            answer = await loop.run_in_executor(
                None, resolver.resolve, domain, 'A'
            )
            dns_results[f"{domain}_ext"] = [str(rdata) for rdata in answer]
        except Exception as e:
            dns_results[f"{domain}_ext"] = f"Error: {repr(e)}"

    connectivity = {}
    # PROXY_URL takes precedence; a blank value falls through to
    # YOUTUBE_PROXY, and no proxy is used when both are blank or unset.
    proxy_url = (
        os.environ.get("PROXY_URL", "").strip()
        or os.environ.get("YOUTUBE_PROXY", "").strip()
    )
    async with httpx.AsyncClient(timeout=5.0, proxy=proxy_url or None) as client:
        for url in ["https://www.youtube.com"]:
            try:
                resp = await client.get(url, follow_redirects=True)
                connectivity[url] = f"OK ({resp.status_code})"
            except Exception as e:
                connectivity[url] = f"Failed: {repr(e)}"

    return {
        "status": "v7-supadata-only",
        "dnspython": has_dnspython,
        "dns": dns_results,
        "connectivity": connectivity,
        # Naive server-local time, kept as-is so the response is unchanged.
        "timestamp": datetime.now(),
    }