#!/usr/bin/env python3
import base64
import hashlib
import json
import os
import secrets
import tempfile
import threading
from datetime import datetime, timezone
from email.mime.text import MIMEText
from pathlib import Path

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import RedirectResponse, HTMLResponse, FileResponse
from google.auth.transport.requests import Request as GoogleRequest
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from pydantic import BaseModel

from gemini_transcript import TranscriptSummaryPipeline

# ============================================================================
# CONFIG
# ============================================================================

# Only set when the variable is not already present in the environment.
# NOTE(review): presumably needed because the default REDIRECT_URI below is
# plain http on localhost — confirm before deploying behind https.
os.environ.setdefault("OAUTHLIB_INSECURE_TRANSPORT", "1")

# Directory containing this file; default locations below are relative to it.
BASE_DIR = Path(__file__).resolve().parent

# Each setting can be overridden through the environment variable of the
# same name as its first argument.
CLIENT_SECRETS = os.environ.get(
    "CLIENT_SECRETS", str(BASE_DIR / "client_secret.json")
)
TOKEN_PATH = os.environ.get(
    "GOOGLE_OAUTH_TOKEN_PATH", str(BASE_DIR / "Google_oauth_token.json")
)
REDIRECT_URI = os.environ.get(
    "REDIRECT_URI", "http://localhost:8000/auth/callback"
)

# JSON file holding OAuth states awaiting their callback.
STATE_FILE = BASE_DIR / "oauth_states.json"

DEFAULT_SPREADSHEET_ID = os.environ.get(
    "DEFAULT_SPREADSHEET_ID", "1XA3vW_guHBT-ktkYvhktmUqcquECBe8exGZAoSQS3Ag"
)
DEFAULT_DRIVE_FOLDER_ID = os.environ.get(
    "DEFAULT_DRIVE_FOLDER_ID", "1hI6dNXysR_2p9gHkDpsI-iwMExmy2hhR"
)

# OAuth scopes requested during consent and used when loading the token.
SCOPES = [
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/gmail.send",
    "https://www.googleapis.com/auth/drive.file",
    "https://www.googleapis.com/auth/youtube.force-ssl",
]

# Header row of the job-log sheet; the trailing letter is the sheet column.
SHEETS_HEADERS = [
    "Timestamp",              # A
    "Job ID",                 # B
    "Video Title",            # C
    "YouTube URL",            # D
    "Model Used",             # E
    "Transcript Method",      # F
    "Status",                 # G
    "Summary Drive Link",     # H
    "Q&A Drive Link",         # I
    "Transcript Drive Link",  # J
    "Email Sent To",          # K
    "Email Status",           # L
    "Email Message ID",       # M
    "Completed At",           # N
    "Error",                  # O
]
# ============================================================================
# IN-MEMORY JOB STORE
# ============================================================================

# Job records keyed by job_id. The helpers below take _jobs_lock around every
# read-modify-write of this dict.
_jobs: dict[str, dict] = {}
_jobs_lock = threading.Lock()

# Pipeline step names, in execution order; each job tracks one state per step.
STEPS = [
    "fetch_transcript",
    "summarize",
    "create_drive_folder",
    "upload_summary",
    "upload_qa",
    "upload_transcript",
    "send_email",
    "log_sheet",
]


def _new_job(job_id: str, youtube_url: str, email_to: str) -> dict:
    """Build the tracking record for a job, store it in ``_jobs``, return it.

    The record starts with status ``"initiated"``, a UTC start timestamp and
    every entry of ``STEPS`` marked ``"pending"``.
    """
    started = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    record = {
        "job_id": job_id,
        "status": "initiated",
        "youtube_url": youtube_url,
        "email_to": email_to,
        "started_at": started,
        "completed_at": None,
        "steps": dict.fromkeys(STEPS, "pending"),
        "result": None,
        "error": None,
    }
    with _jobs_lock:
        _jobs[job_id] = record
    return record


def _update_job(job_id: str, **kwargs):
    """Merge ``kwargs`` into the record of ``job_id``; unknown ids are ignored."""
    with _jobs_lock:
        record = _jobs.get(job_id)
        if record is not None:
            record.update(kwargs)


def _set_step(job_id: str, step: str, state: str):
    """Set the state of one step of ``job_id``; unknown ids are ignored."""
    with _jobs_lock:
        record = _jobs.get(job_id)
        if record is not None:
            record["steps"][step] = state


# ============================================================================
# APP
# ============================================================================

app = FastAPI(title="Google Integration API", version="7.0.0")


# ============================================================================
# MODELS
# ============================================================================

# The models are documented with comments rather than docstrings so the
# generated schema descriptions stay exactly as they were.

# Body of POST /generate.
class GenerateRequest(BaseModel):
    youtube_url: str
    email_to: str


# Body of POST /email.
class EmailRequest(BaseModel):
    to: str
    subject: str
    body: str


# Body of POST /drive/create.
class CreateFileRequest(BaseModel):
    filename: str
    content: str
    mimetype: str = "text/plain"
    folder_id: str | None = None
    make_public: bool = True


# Body of POST /sheets/write and POST /sheets/append.
class SheetWriteRequest(BaseModel):
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID
    range_name: str = "Sheet1!A1"
    values: list[list]


# Body of POST /sheets/clear.
class SheetClearRequest(BaseModel):
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID
    range_name: str = "Sheet1!A1:Z1000"
# ============================================================================
# STATE PERSISTENCE
# ============================================================================

def load_states() -> dict:
    """Return the pending OAuth ``state -> verifier`` map stored in STATE_FILE.

    A missing, unreadable or malformed file yields an empty dict, as does a
    file whose top-level JSON value is not an object, so callers can always
    treat the result as a mapping.
    """
    try:
        states = json.loads(STATE_FILE.read_text())
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and decoding errors.
        return {}
    return states if isinstance(states, dict) else {}


def save_states(states: dict) -> None:
    """Overwrite STATE_FILE with ``states`` serialised as JSON."""
    STATE_FILE.write_text(json.dumps(states))


# ============================================================================
# AUTH
# ============================================================================

def create_flow() -> Flow:
    """Build an OAuth flow from the client-secrets file.

    Raises:
        FileNotFoundError: if CLIENT_SECRETS does not exist.
    """
    if not os.path.exists(CLIENT_SECRETS):
        raise FileNotFoundError(f"Missing client secret: {CLIENT_SECRETS}")
    return Flow.from_client_secrets_file(
        CLIENT_SECRETS, scopes=SCOPES, redirect_uri=REDIRECT_URI
    )


def load_credentials() -> Credentials | None:
    """Load the stored credentials, refreshing them when expired.

    Returns:
        Valid credentials, or ``None`` when no usable token is available
        (no token file, unreadable/corrupt token file, credentials that
        cannot be refreshed, or a failed refresh).
    """
    if not os.path.exists(TOKEN_PATH):
        return None

    try:
        creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)
    except (OSError, ValueError) as exc:
        # A corrupt token file means "not authenticated", not a server error.
        print(f"[WARN] Could not read token file: {exc}. Forcing re-auth.")
        return None

    if creds.valid:
        return creds
    if not (creds.expired and creds.refresh_token):
        return None

    try:
        creds.refresh(GoogleRequest())
        Path(TOKEN_PATH).write_text(creds.to_json(), encoding="utf-8")
    except Exception as e:
        print(f"[WARN] Failed to refresh token: {e}. Forcing re-auth.")
        # missing_ok: another request may already have removed the file.
        Path(TOKEN_PATH).unlink(missing_ok=True)
        return None
    return creds


def require_credentials() -> Credentials:
    """Return valid credentials or raise.

    Raises:
        HTTPException: 401 when ``load_credentials()`` returns ``None``.
    """
    creds = load_credentials()
    if creds is None:
        raise HTTPException(
            status_code=401,
            detail="Not authenticated. Visit http://localhost:8000/auth/start",
        )
    return creds


# ============================================================================
# GMAIL
# ============================================================================

def _raw_message(to: str, subject: str, body: str) -> dict:
    """Build the ``{"raw": ...}`` payload: a URL-safe base64 MIME text message."""
    msg = MIMEText(body)
    msg["to"] = to
    msg["subject"] = subject
    return {"raw": base64.urlsafe_b64encode(msg.as_bytes()).decode()}


def send_email(
    to: str, subject: str, body: str, creds: Credentials | None = None
) -> dict:
    """Send a plain-text e-mail as the authenticated user ("me").

    ``creds`` defaults to ``require_credentials()``. Returns the Gmail API
    response of the send call.
    """
    if creds is None:
        creds = require_credentials()
    return (
        build("gmail", "v1", credentials=creds, cache_discovery=False)
        .users().messages()
        .send(userId="me", body=_raw_message(to, subject, body))
        .execute()
    )


# ============================================================================
# DRIVE
# ============================================================================

def _direct_link(file_id: str) -> str:
    """Return the ``uc?export=download`` URL for ``file_id``."""
    return f"https://drive.google.com/uc?export=download&id={file_id}"


def _make_public(service, file_id: str) -> dict:
    """Grant "anyone / reader" on the file and return its refreshed metadata."""
    service.permissions().create(
        fileId=file_id,
        body={"type": "anyone", "role": "reader"},
    ).execute()
    return service.files().get(
        fileId=file_id,
        fields="id,name,webViewLink,webContentLink,mimeType,size,createdTime,modifiedTime",
    ).execute()


def _drive(creds: Credentials):
    """Build a Drive v3 client for ``creds``."""
    return build("drive", "v3", credentials=creds, cache_discovery=False)


def create_drive_folder(
    folder_name: str,
    parent_folder_id: str | None = None,
    creds: Credentials | None = None,
) -> str:
    """Create a Drive folder and return its id.

    The folder is placed under ``parent_folder_id``, falling back to
    DEFAULT_DRIVE_FOLDER_ID; with neither set, no parent is specified.
    """
    if creds is None:
        creds = require_credentials()
    meta = {
        "name": folder_name,
        "mimeType": "application/vnd.google-apps.folder",
    }
    pid = parent_folder_id or DEFAULT_DRIVE_FOLDER_ID
    if pid:
        meta["parents"] = [pid]
    folder = _drive(creds).files().create(body=meta, fields="id").execute()
    return folder["id"]


def create_file_on_drive(
    filename: str,
    content: str,
    mimetype: str = "text/plain",
    creds: Credentials | None = None,
    folder_id: str | None = None,
    make_public: bool = True,
) -> dict:
    """
    Upload a string as a Drive file via a temporary file.
    No permanent local files are created.

    Args:
        filename: Name of the file on Drive; its suffix is reused for the
            temporary file (".txt" when it has none).
        content: Text written to the file as UTF-8.
        mimetype: MIME type passed to the upload.
        creds: Credentials; defaults to ``require_credentials()``.
        folder_id: Optional parent folder id.
        make_public: When true, grant "anyone / reader" after the upload.

    Returns:
        The file metadata plus a ``direct_download_link`` entry.
    """
    if creds is None:
        creds = require_credentials()
    svc = _drive(creds)

    meta = {"name": filename}
    if folder_id:
        meta["parents"] = [folder_id]

    suffix = Path(filename).suffix or ".txt"
    tmp = tempfile.NamedTemporaryFile(
        mode="w", suffix=suffix, delete=False, encoding="utf-8"
    )
    tmp_path = tmp.name
    media = None
    try:
        with tmp:
            tmp.write(content)
        media = MediaFileUpload(tmp_path, mimetype=mimetype, resumable=True)
        uploaded = svc.files().create(
            body=meta,
            media_body=media,
            fields="id,name,webViewLink,webContentLink,mimeType,size",
        ).execute()
    finally:
        # Close the upload's handle on failure too, not only on success:
        # otherwise the descriptor leaks and the unlink below can fail on
        # platforms that refuse to delete open files.
        upload_fd = getattr(media, "_fd", None)
        if upload_fd is not None:
            upload_fd.close()
        try:
            os.unlink(tmp_path)
        except OSError as exc:
            # Best effort: a leftover temp file must not fail the upload.
            print(f"[WARN] Could not remove temp file {tmp_path}: {exc}")

    file_id = uploaded["id"]
    if make_public:
        uploaded = _make_public(svc, file_id)

    uploaded["direct_download_link"] = _direct_link(file_id)
    return uploaded


def get_drive_file(file_id: str, creds: Credentials | None = None) -> dict:
    """Return metadata for ``file_id`` plus a ``direct_download_link`` entry."""
    if creds is None:
        creds = require_credentials()
    file = _drive(creds).files().get(
        fileId=file_id,
        fields="id,name,webViewLink,webContentLink,mimeType,size,createdTime,modifiedTime",
    ).execute()
    file["direct_download_link"] = _direct_link(file_id)
    return file


def list_drive_files(
    folder_id: str | None = None,
    page_size: int = 20,
    creds: Credentials | None = None,
) -> list:
    """List non-trashed files, each with a ``direct_download_link`` entry.

    Files are restricted to ``folder_id`` (falling back to
    DEFAULT_DRIVE_FOLDER_ID); with neither set, no folder filter is applied.
    Only the first page of ``page_size`` results is returned.
    """
    if creds is None:
        creds = require_credentials()
    fid = folder_id or DEFAULT_DRIVE_FOLDER_ID
    if fid:
        # folder_id can come straight from a query parameter: escape the
        # characters that would terminate the quoted literal in the query.
        quoted = fid.replace("\\", "\\\\").replace("'", "\\'")
        query = f"'{quoted}' in parents and trashed=false"
    else:
        query = "trashed=false"
    files = _drive(creds).files().list(
        q=query,
        pageSize=page_size,
        fields="files(id,name,webViewLink,webContentLink,mimeType,size,createdTime)",
    ).execute().get("files", [])
    for f in files:
        f["direct_download_link"] = _direct_link(f["id"])
    return files


# ============================================================================
# SHEETS
# ============================================================================

def _sheets(creds: Credentials):
    """Build a Sheets v4 client for ``creds``."""
    return build("sheets", "v4", credentials=creds, cache_discovery=False)


def read_sheet(
    spreadsheet_id: str,
    range_name: str = "Sheet1!A1:Z1000",
    creds: Credentials | None = None,
) -> list:
    """Return the rows of ``range_name`` (``[]`` when the response has none)."""
    if creds is None:
        creds = require_credentials()
    return (
        _sheets(creds).spreadsheets().values()
        .get(spreadsheetId=spreadsheet_id, range=range_name)
        .execute().get("values", [])
    )


def write_sheet(
    spreadsheet_id: str,
    range_name: str,
    values: list[list],
    creds: Credentials | None = None,
) -> dict:
    """Overwrite ``range_name`` with ``values`` (USER_ENTERED input)."""
    if creds is None:
        creds = require_credentials()
    return (
        _sheets(creds).spreadsheets().values()
        .update(
            spreadsheetId=spreadsheet_id,
            range=range_name,
            valueInputOption="USER_ENTERED",
            body={"values": values},
        ).execute()
    )


def append_sheet(
    spreadsheet_id: str,
    range_name: str,
    values: list[list],
    creds: Credentials | None = None,
) -> dict:
    """Append ``values`` as new rows (USER_ENTERED input, INSERT_ROWS)."""
    if creds is None:
        creds = require_credentials()
    return (
        _sheets(creds).spreadsheets().values()
        .append(
            spreadsheetId=spreadsheet_id,
            range=range_name,
            valueInputOption="USER_ENTERED",
            insertDataOption="INSERT_ROWS",
            body={"values": values},
        ).execute()
    )


def clear_sheet(
    spreadsheet_id: str,
    range_name: str,
    creds: Credentials | None = None,
) -> dict:
    """Clear the values in ``range_name`` and return the API response."""
    if creds is None:
        creds = require_credentials()
    return (
        _sheets(creds).spreadsheets().values()
        .clear(spreadsheetId=spreadsheet_id, range=range_name)
        .execute()
    )


def get_sheet_metadata(
    spreadsheet_id: str,
    creds: Credentials | None = None,
) -> dict:
    """Return id, title, URL and per-sheet id/title/row/column counts."""
    if creds is None:
        creds = require_credentials()
    info = _sheets(creds).spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
    return {
        "spreadsheet_id": info["spreadsheetId"],
        "title": info["properties"]["title"],
        "url": f"https://docs.google.com/spreadsheets/d/{info['spreadsheetId']}",
        "sheets": [
            {
                "sheet_id": s["properties"]["sheetId"],
                "title": s["properties"]["title"],
                "rows": s["properties"]["gridProperties"]["rowCount"],
                "cols": s["properties"]["gridProperties"]["columnCount"],
            }
            for s in info.get("sheets", [])
        ],
    }


def append_row_to_sheet(
    values: list,
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID,
    range_name: str = "Sheet1!A1",
    creds: Credentials | None = None,
):
    """Append one row; returns ``None`` without calling the API when
    ``spreadsheet_id`` is empty."""
    if not spreadsheet_id:
        return None
    return append_sheet(spreadsheet_id, range_name, [values], creds=creds)


# ============================================================================
# SHEETS — JOB RECORD HELPERS
# ============================================================================

def ensure_sheet_header(creds: Credentials | None = None) -> None:
    """Write SHEETS_HEADERS to row 1 of the default sheet if row 1 is empty.

    Best effort: any failure is reported as a warning and not raised.
    """
    if not DEFAULT_SPREADSHEET_ID:
        return
    try:
        existing = read_sheet(DEFAULT_SPREADSHEET_ID, "Sheet1!A1:Z1", creds=creds)
        if not existing:
            write_sheet(
                DEFAULT_SPREADSHEET_ID, "Sheet1!A1",
                [SHEETS_HEADERS], creds=creds,
            )
    except Exception as exc:
        print(f"[WARN] Could not write sheet header: {exc}")


def _find_job_row(job_id: str, creds: Credentials) -> int | None:
    """Find 1-based row number of job_id in column B.

    Returns ``None`` when the id is absent or the sheet cannot be read; a
    read failure is reported as a warning so it is distinguishable in the
    logs from a row that is simply missing.
    """
    try:
        rows = read_sheet(DEFAULT_SPREADSHEET_ID, "Sheet1!B:B", creds=creds)
    except Exception as exc:
        print(f"[WARN] Could not read job ids from sheet: {exc}")
        return None
    for i, row in enumerate(rows, start=1):
        if row and row[0] == job_id:
            return i
    return None


def _create_sheet_record(
    job_id: str,
    timestamp: str,
    youtube_url: str,
    email_to: str,
    creds: Credentials,
) -> None:
    """Insert initial row when job starts.

    Best effort: does nothing when no default spreadsheet is configured, and
    reports any failure as a warning instead of raising.
    """
    # Same guard as _update_sheet_record: skip the API call entirely when
    # there is no spreadsheet to log to.
    if not DEFAULT_SPREADSHEET_ID:
        return
    try:
        row = [
            timestamp,    # A — Timestamp
            job_id,       # B — Job ID
            "",           # C — Video Title
            youtube_url,  # D — YouTube URL
            "",           # E — Model Used
            "",           # F — Transcript Method
            "initiated",  # G — Status
            "",           # H — Summary Link
            "",           # I — Q&A Link
            "",           # J — Transcript Link
            email_to,     # K — Email Sent To
            "",           # L — Email Status
            "",           # M — Email Message ID
            "",           # N — Completed At
            "",           # O — Error
        ]
        append_sheet(DEFAULT_SPREADSHEET_ID, "Sheet1!A1", [row], creds=creds)
    except Exception as exc:
        print(f"[WARN] Could not create sheet record: {exc}")


def _update_sheet_record(
    job_id: str,
    creds: Credentials,
    video_title: str = "",
    model_used: str = "",
    extraction_method: str = "",
    status: str = "",
    summary_link: str = "",
    qa_link: str = "",
    transcript_link: str = "",
    email_status: str = "",
    email_msg_id: str = "",
    completed_at: str = "",
    error: str = "",
) -> None:
    """Find job row by job_id and overwrite with updated values.

    An argument left as ``""`` keeps the cell's current value. Best effort:
    failures are reported as warnings and never raised.
    """
    if not DEFAULT_SPREADSHEET_ID:
        return
    try:
        row_num = _find_job_row(job_id, creds)
        if row_num is None:
            print(f"[WARN] Row for job {job_id} not found in sheet.")
            return

        existing = read_sheet(
            DEFAULT_SPREADSHEET_ID,
            f"Sheet1!A{row_num}:O{row_num}",
            creds=creds,
        )
        existing_row = existing[0] if existing else [""] * 15

        def _v(new: str, idx: int) -> str:
            # Prefer the new value; otherwise keep the existing cell. The
            # length check covers rows returned shorter than 15 cells.
            return new if new != "" else (
                existing_row[idx] if len(existing_row) > idx else ""
            )

        updated_row = [
            _v("", 0),                 # A — Timestamp (immutable)
            job_id,                    # B — Job ID (immutable)
            _v(video_title, 2),        # C — Video Title
            _v("", 3),                 # D — YouTube URL (immutable)
            _v(model_used, 4),         # E — Model Used
            _v(extraction_method, 5),  # F — Transcript Method
            _v(status, 6),             # G — Status
            _v(summary_link, 7),       # H — Summary Link
            _v(qa_link, 8),            # I — Q&A Link
            _v(transcript_link, 9),    # J — Transcript Link
            _v("", 10),                # K — Email Sent To (immutable)
            _v(email_status, 11),      # L — Email Status
            _v(email_msg_id, 12),      # M — Email Message ID
            _v(completed_at, 13),      # N — Completed At
            _v(error, 14),             # O — Error
        ]

        write_sheet(
            DEFAULT_SPREADSHEET_ID,
            f"Sheet1!A{row_num}:O{row_num}",
            [updated_row],
            creds=creds,
        )
    except Exception as exc:
        print(f"[WARN] Could not update sheet record: {exc}")


# ============================================================================
# STARTUP
# ============================================================================

@app.on_event("startup")
def on_startup():
    """Ensure the job-log sheet has its header row when already authenticated."""
    creds = load_credentials()
    if creds:
        ensure_sheet_header(creds=creds)


# ============================================================================
# BASIC ROUTES
# ============================================================================

@app.get("/")
def root():
    """Serve index.html (path is relative to the working directory)."""
    return FileResponse("index.html")


@app.get("/health")
def health():
    """Report service status, authentication state and the endpoint index."""
    creds = load_credentials()
    return {
        "status": "ok",
        "version": "7.0.0",
        "authenticated": creds is not None,
        "endpoints": {
            "auth": ["/auth/start", "/auth/status", "/auth/revoke"],
            "gmail": ["/email"],
            "drive": ["/drive/create", "/drive/file/{id}", "/drive/files"],
            "sheets": ["/sheets/info", "/sheets/read", "/sheets/write",
                       "/sheets/append", "/sheets/clear"],
            "jobs": ["/generate", "/status/{job_id}", "/jobs"],
            "misc": ["/health"],
        },
    }


# ============================================================================
# OAUTH
# ============================================================================

@app.get("/auth/start")
def auth_start():
    """Start the OAuth flow with an S256 PKCE challenge and redirect to consent.

    The verifier is stored under the returned state so the callback can
    retrieve it.
    """
    flow = create_flow()
    verifier = secrets.token_urlsafe(64)
    challenge = (
        base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest())
        .rstrip(b"=").decode()
    )
    auth_url, state = flow.authorization_url(
        access_type="offline",
        include_granted_scopes="true",
        prompt="consent",
        code_challenge=challenge,
        code_challenge_method="S256",
    )
    states = load_states()
    states[state] = verifier
    save_states(states)
    return RedirectResponse(auth_url)


@app.get("/auth/callback")
def auth_callback(request: Request):
    """Complete the OAuth flow and persist the token to TOKEN_PATH.

    Raises:
        HTTPException: 400 when the ``state`` parameter is unknown.
    """
    state = request.query_params.get("state", "")
    states = load_states()
    if state not in states:
        raise HTTPException(status_code=400, detail="Invalid or expired OAuth state.")
    # Each state is single-use: remove it before exchanging the code.
    verifier = states.pop(state)
    save_states(states)

    flow = create_flow()
    flow.fetch_token(
        authorization_response=str(request.url).replace("https://", "http://"),
        code_verifier=verifier,
    )
    Path(TOKEN_PATH).write_text(flow.credentials.to_json(), encoding="utf-8")
    return HTMLResponse("""
Gmail, Drive and Sheets are now connected.
You can close this tab.
""")


@app.get("/auth/status")
def auth_status():
    """Report whether usable credentials are currently available."""
    creds = load_credentials()
    return {"authenticated": creds is not None}


@app.delete("/auth/revoke")
def auth_revoke():
    """Delete the local token and pending-state files."""
    for p in [Path(TOKEN_PATH), STATE_FILE]:
        # missing_ok avoids the exists()/unlink() race between requests.
        p.unlink(missing_ok=True)
    return {"status": "revoked"}


# ============================================================================
# EMAIL
# ============================================================================

@app.post("/email")
def email(payload: EmailRequest):
    """Send an e-mail and return the resulting message id."""
    creds = require_credentials()
    result = send_email(payload.to, payload.subject, payload.body, creds=creds)
    return {"status": "sent", "message_id": result.get("id")}


# ============================================================================
# DRIVE ROUTES
# ============================================================================

@app.post("/drive/create")
def drive_create(payload: CreateFileRequest):
    """Create a Drive file from the posted text and return its links."""
    creds = require_credentials()
    file = create_file_on_drive(
        filename=payload.filename,
        content=payload.content,
        mimetype=payload.mimetype,
        creds=creds,
        folder_id=payload.folder_id,
        make_public=payload.make_public,
    )
    return {
        "file_id": file["id"],
        "name": file["name"],
        "mime_type": file.get("mimeType"),
        "web_view_link": file.get("webViewLink"),
        "direct_download_link": file["direct_download_link"],
    }


@app.get("/drive/file/{file_id}")
def drive_get_file(file_id: str):
    """Return metadata and links for one Drive file."""
    creds = require_credentials()
    file = get_drive_file(file_id, creds=creds)
    return {
        "file_id": file["id"],
        "name": file["name"],
        "mime_type": file.get("mimeType"),
        "size_bytes": file.get("size"),
        "created": file.get("createdTime"),
        "modified": file.get("modifiedTime"),
        "web_view_link": file.get("webViewLink"),
        "direct_download_link": file["direct_download_link"],
    }


@app.get("/drive/files")
def drive_list_files(folder_id: str = "", limit: int = 20):
    """List up to ``limit`` files in ``folder_id`` (default folder if empty)."""
    creds = require_credentials()
    files = list_drive_files(
        folder_id=folder_id or None,
        page_size=limit,
        creds=creds,
    )
    return {"count": len(files), "files": files}
# ============================================================================
# SHEETS ROUTES
# ============================================================================

@app.get("/sheets/info")
def sheets_info(spreadsheet_id: str = DEFAULT_SPREADSHEET_ID):
    """Return title, URL and per-sheet dimensions of a spreadsheet."""
    if not spreadsheet_id:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    return get_sheet_metadata(spreadsheet_id, creds=require_credentials())


@app.get("/sheets/read")
def sheets_read(
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID,
    range_name: str = "Sheet1!A1:Z1000",
):
    """Return the rows of ``range_name`` together with their count."""
    if not spreadsheet_id:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    rows = read_sheet(spreadsheet_id, range_name, creds=require_credentials())
    return {
        "spreadsheet_id": spreadsheet_id,
        "range": range_name,
        "row_count": len(rows),
        "values": rows,
    }


@app.post("/sheets/write")
def sheets_write(payload: SheetWriteRequest):
    """Overwrite a range and report what was updated."""
    if not payload.spreadsheet_id:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    result = write_sheet(
        payload.spreadsheet_id, payload.range_name,
        payload.values, creds=require_credentials(),
    )
    return {
        "status": "written",
        "updated_range": result.get("updatedRange"),
        "updated_rows": result.get("updatedRows"),
        "updated_columns": result.get("updatedColumns"),
        "updated_cells": result.get("updatedCells"),
    }


@app.post("/sheets/append")
def sheets_append(payload: SheetWriteRequest):
    """Append rows and report what was updated."""
    if not payload.spreadsheet_id:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    result = append_sheet(
        payload.spreadsheet_id, payload.range_name,
        payload.values, creds=require_credentials(),
    )
    updates = result.get("updates", {})
    return {
        "status": "appended",
        "updated_range": updates.get("updatedRange"),
        "updated_rows": updates.get("updatedRows"),
        "updated_cells": updates.get("updatedCells"),
    }


@app.post("/sheets/clear")
def sheets_clear(payload: SheetClearRequest):
    """Clear a range and report what was cleared."""
    if not payload.spreadsheet_id:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    result = clear_sheet(
        payload.spreadsheet_id, payload.range_name,
        creds=require_credentials(),
    )
    return {
        "status": "cleared",
        "cleared_range": result.get("clearedRange"),
        "spreadsheet_id": result.get("spreadsheetId"),
    }


# ============================================================================
# JOB STATUS ROUTES
# ============================================================================

@app.get("/status/{job_id}")
def get_status(job_id: str):
    """Return the current record of one job.

    Raises:
        HTTPException: 404 when the job id is unknown.
    """
    with _jobs_lock:
        job = _jobs.get(job_id)
        if job is not None:
            # Snapshot under the lock: the worker thread keeps mutating the
            # record and its "steps" dict while the response is serialised.
            job = {**job, "steps": dict(job["steps"])}
    if job is None:
        raise HTTPException(status_code=404, detail=f"Job '{job_id}' not found.")
    return job


@app.get("/jobs")
def list_jobs():
    """Return a summary of every job held in memory."""
    with _jobs_lock:
        return {
            "total": len(_jobs),
            "jobs": [
                {
                    "job_id": j["job_id"],
                    "status": j["status"],
                    "youtube_url": j["youtube_url"],
                    "started_at": j["started_at"],
                    "completed_at": j["completed_at"],
                }
                for j in _jobs.values()
            ],
        }


# ============================================================================
# PIPELINE BACKGROUND WORKER
# ============================================================================

def _upload_content(
    content: str,
    filename: str,
    step_key: str,
    job_id: str,
    folder_id: str,
    creds: Credentials,
) -> dict:
    """
    Upload a string directly to Drive as a text file.
    Uses create_file_on_drive which handles its own temp file internally.
    No permanent local files are created.

    The job step ``step_key`` is marked running, then done or failed.
    On failure the exception is not raised; ``{"error": <message>}`` is
    returned instead so the pipeline can continue.
    """
    _set_step(job_id, step_key, "running")
    try:
        result = create_file_on_drive(
            filename=filename,
            content=content,
            mimetype="text/plain",
            creds=creds,
            folder_id=folder_id,
            make_public=True,
        )
        _set_step(job_id, step_key, "done")
        return result
    except Exception as exc:
        _set_step(job_id, step_key, "failed")
        return {"error": str(exc)}


def _run_pipeline(job_id: str, youtube_url: str, email_to: str):
    """Run the full job in a worker thread.

    Steps: fetch transcript, summarise, create a Drive folder, upload the
    summary / Q&A / transcript, e-mail the links, and log to the sheet.
    Progress is written to the in-memory job record and to the job's sheet
    row. Nothing is raised: a failing transcript, summary or folder step
    marks the job ``"failed"`` with the error message, while upload and
    e-mail failures are recorded and the job still completes.
    """
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

    creds = load_credentials()
    if creds is None:
        # The token can disappear between the request and this thread
        # starting; fail now instead of running without credentials.
        _update_job(
            job_id,
            status="failed",
            completed_at=timestamp,
            error="Not authenticated: Google credentials are missing or expired.",
        )
        return

    _create_sheet_record(
        job_id=job_id,
        timestamp=timestamp,
        youtube_url=youtube_url,
        email_to=email_to,
        creds=creds,
    )

    try:
        # ── STEP 1: Fetch transcript ─────────────────────────────────────
        _update_job(job_id, status="fetching_transcript")
        _set_step(job_id, "fetch_transcript", "running")
        _update_sheet_record(job_id, creds, status="fetching_transcript")

        try:
            pipeline = TranscriptSummaryPipeline(
                youtube_url=youtube_url,
                languages=["en", "en-US", "en-GB"],
                google_creds=creds,
            )
            transcript, extraction_method = pipeline.fetcher.run()
            _set_step(job_id, "fetch_transcript", "done")
            _update_sheet_record(
                job_id, creds,
                video_title=pipeline.video_title,
                extraction_method=extraction_method,
                status="transcript_ready",
            )
        except Exception as exc:
            _set_step(job_id, "fetch_transcript", "failed")
            raise RuntimeError(f"Transcript fetch failed: {exc}") from exc

        video_title = pipeline.video_title

        # ── STEP 2: Summarize ────────────────────────────────────────────
        _update_job(job_id, status="summarizing")
        _set_step(job_id, "summarize", "running")
        _update_sheet_record(job_id, creds, status="summarizing")

        try:
            summary, qa, model_used = pipeline.summarizer.run(transcript)
            _set_step(job_id, "summarize", "done")
            _update_sheet_record(
                job_id, creds,
                model_used=model_used,
                status="summarized",
            )
        except Exception as exc:
            _set_step(job_id, "summarize", "failed")
            raise RuntimeError(f"Summarization failed: {exc}") from exc

        # ── STEP 3: Create Drive folder ──────────────────────────────────
        _update_job(job_id, status="creating_drive_folder")
        _set_step(job_id, "create_drive_folder", "running")
        _update_sheet_record(job_id, creds, status="creating_drive_folder")

        try:
            folder_id = create_drive_folder(video_title, creds=creds)
            _set_step(job_id, "create_drive_folder", "done")
        except Exception as exc:
            _set_step(job_id, "create_drive_folder", "failed")
            raise RuntimeError(f"Drive folder creation failed: {exc}") from exc

        # ── STEP 4–6: Upload content strings directly to Drive ───────────
        _update_job(job_id, status="uploading_drive")
        _update_sheet_record(job_id, creds, status="uploading_to_drive")

        summary_drive = _upload_content(
            summary, f"{video_title}__summary.txt",
            "upload_summary", job_id, folder_id, creds,
        )
        qa_drive = _upload_content(
            qa, f"{video_title}__qa.txt",
            "upload_qa", job_id, folder_id, creds,
        )
        transcript_drive = _upload_content(
            transcript, f"{video_title}__transcript.txt",
            "upload_transcript", job_id, folder_id, creds,
        )

        # A failed upload returns {"error": ...}, so its link becomes "N/A".
        summary_link = summary_drive.get("direct_download_link", "N/A")
        qa_link = qa_drive.get("direct_download_link", "N/A")
        transcript_link = transcript_drive.get("direct_download_link", "N/A")

        _update_sheet_record(
            job_id, creds,
            status="drive_uploaded",
            summary_link=summary_link,
            qa_link=qa_link,
            transcript_link=transcript_link,
        )

        # ── STEP 7: Send email ───────────────────────────────────────────
        _update_job(job_id, status="sending_email")
        _set_step(job_id, "send_email", "running")
        _update_sheet_record(job_id, creds, status="sending_email")

        email_subject = f"✅ YouTube Summary Ready — {video_title}"
        email_body = f"""Hello,

Your YouTube video has been processed successfully.

🎥 Title : {video_title}
🔗 Video URL : {youtube_url}

📄 Summary : {summary_link}
❓ Q&A : {qa_link}
📝 Transcript : {transcript_link}

────────────────────────────────
Model Used : {model_used}
Transcript Extraction : {extraction_method}
────────────────────────────────

Regards,
Google Integration API
"""
        # A failed e-mail is recorded but does not fail the job.
        try:
            email_result = send_email(
                to=email_to,
                subject=email_subject,
                body=email_body,
                creds=creds,
            )
            email_status = "sent"
            email_msg_id = email_result.get("id", "")
            _set_step(job_id, "send_email", "done")
        except Exception as exc:
            email_status = f"failed: {exc}"
            email_msg_id = ""
            _set_step(job_id, "send_email", "failed")

        # ── STEP 8: Final sheet update ───────────────────────────────────
        _update_job(job_id, status="logging_sheet")
        _set_step(job_id, "log_sheet", "running")
        completed_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

        # NOTE(review): _update_sheet_record reports its own failures as
        # warnings instead of raising, so this except branch looks like a
        # safeguard only — confirm whether "logged" should reflect them.
        try:
            _update_sheet_record(
                job_id, creds,
                status="completed",
                email_status=email_status,
                email_msg_id=email_msg_id,
                completed_at=completed_at,
            )
            _set_step(job_id, "log_sheet", "done")
            sheets_status = "logged"
        except Exception as exc:
            sheets_status = f"failed: {exc}"
            _set_step(job_id, "log_sheet", "failed")

        # ── COMPLETE ─────────────────────────────────────────────────────
        _update_job(
            job_id,
            status="completed",
            completed_at=completed_at,
            result={
                "video_title": video_title,
                "youtube_url": youtube_url,
                "model_used": model_used,
                "extraction_method": extraction_method,
                "drive": {
                    "folder_id": folder_id,
                    "summary": {
                        "web_view_link": summary_drive.get("webViewLink"),
                        "direct_download_link": summary_link,
                    },
                    "qa": {
                        "web_view_link": qa_drive.get("webViewLink"),
                        "direct_download_link": qa_link,
                    },
                    "transcript": {
                        "web_view_link": transcript_drive.get("webViewLink"),
                        "direct_download_link": transcript_link,
                    },
                },
                "email": {"status": email_status, "message_id": email_msg_id},
                "sheets": {"status": sheets_status},
            },
        )

    except Exception as exc:
        # Thread boundary: record the failure on the job and in the sheet
        # rather than letting the worker thread die silently.
        completed_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
        _update_job(
            job_id,
            status="failed",
            completed_at=completed_at,
            error=str(exc),
        )
        _update_sheet_record(
            job_id, creds,
            status="failed",
            completed_at=completed_at,
            error=str(exc),
        )


# ============================================================================
# GENERATE ROUTE
# ============================================================================

@app.post("/generate")
def generate(payload: GenerateRequest):
    """
    Kick off full pipeline in background thread.
    Returns job_id immediately — poll GET /status/{job_id} for progress.
    """
    require_credentials()
    job_id = secrets.token_hex(8)
    job = _new_job(job_id, payload.youtube_url, payload.email_to)
    # Read the start time before the worker exists, from the record returned
    # by _new_job, instead of indexing _jobs without the lock.
    started_at = job["started_at"]
    thread = threading.Thread(
        target=_run_pipeline,
        args=(job_id, payload.youtube_url, payload.email_to),
        daemon=True,
    )
    thread.start()
    return {
        "job_id": job_id,
        "status": "initiated",
        "poll_url": f"/status/{job_id}",
        "started_at": started_at,
    }


# ============================================================================
# RUN
# ============================================================================

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)