AIdea-Server / src /api /main.py
Ahmed Mostafa
feat: implement YouTube note generation API with background task processing and duration scraping
8813304
from datetime import datetime
from contextlib import asynccontextmanager
import os
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.api.auth_routes import router as auth_router
from src.api.notes_routes import router as notes_router
from src.api.recommendation_routes import router as recommendation_router
from src.utils.logger import setup_logger
logger = setup_logger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("πŸš€ AIdea API starting up...")
yield
logger.info("πŸ›‘ AIdea API shutting down...")
app = FastAPI(
title="AIdea API",
description="YouTube Study Notes Generation Engine",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(notes_router)
app.include_router(recommendation_router)
app.include_router(auth_router)
@app.get("/")
def read_root():
return {
"status": "online",
"message": "Welcome to AIdea API! Everything is working perfectly.",
}
@app.get("/health")
async def health_check():
import socket
import httpx
try:
import dns.resolver
has_dnspython = True
except ImportError:
has_dnspython = False
dns_results = {}
for domain in ["www.youtube.com", "google.com"]:
try:
dns_results[f"{domain}_sys"] = socket.gethostbyname(domain)
except Exception as e:
dns_results[f"{domain}_sys"] = f"Error: {e}"
if has_dnspython:
try:
resolver = dns.resolver.Resolver()
resolver.nameservers = ['8.8.8.8']
answer = resolver.resolve(domain, 'A')
dns_results[f"{domain}_ext"] = [str(rdata) for rdata in answer]
except Exception as e:
dns_results[f"{domain}_ext"] = f"Error: {repr(e)}"
else:
dns_results[f"{domain}_ext"] = "dnspython not installed"
connectivity = {}
proxy_url = os.environ.get("PROXY_URL", "").strip() or os.environ.get("YOUTUBE_PROXY", "").strip()
async with httpx.AsyncClient(timeout=5.0, proxy=proxy_url or None) as client:
for url in ["https://www.youtube.com"]:
try:
resp = await client.get(url, follow_redirects=True)
connectivity[url] = f"OK ({resp.status_code})"
except Exception as e:
connectivity[url] = f"Failed: {repr(e)}"
return {
"status": "v7-supadata-only",
"dnspython": has_dnspython,
"dns": dns_results,
"connectivity": connectivity,
"timestamp": datetime.now()
}