Spaces:
Sleeping
Sleeping
rsnarsna
fix: Update Google OAuth token and refresh token handling; add error handling for token refresh and improve transcript fetching with fallback mechanisms
ade38de | #!/usr/bin/env python3 | |
| import os | |
| import json | |
| import base64 | |
| import hashlib | |
| import secrets | |
| import tempfile | |
| import threading | |
| from pathlib import Path | |
| from email.mime.text import MIMEText | |
| from datetime import datetime, timezone | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import RedirectResponse, HTMLResponse, FileResponse | |
| from pydantic import BaseModel | |
| from google_auth_oauthlib.flow import Flow | |
| from google.auth.transport.requests import Request as GoogleRequest | |
| from google.oauth2.credentials import Credentials | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaFileUpload | |
| from gemini_transcript import TranscriptSummaryPipeline | |
# ============================================================================
# CONFIG
# ============================================================================
# Allow oauthlib to process plain-http URLs (e.g. the default
# http://localhost:8000 REDIRECT_URI below). setdefault keeps any value that
# is already present in the environment.
os.environ.setdefault("OAUTHLIB_INSECURE_TRANSPORT", "1")
# Directory containing this file; the default file paths below live here.
BASE_DIR = Path(__file__).resolve().parent
# OAuth client secrets file (override with the CLIENT_SECRETS env var).
CLIENT_SECRETS = os.getenv("CLIENT_SECRETS", str(BASE_DIR / "client_secret.json"))
# Persisted authorized-user token; written after consent and after refreshes.
TOKEN_PATH = os.getenv("GOOGLE_OAUTH_TOKEN_PATH", str(BASE_DIR / "Google_oauth_token.json"))
# Redirect URI passed to every OAuth Flow (see create_flow).
REDIRECT_URI = os.getenv("REDIRECT_URI", "http://localhost:8000/auth/callback")
# JSON map of pending OAuth `state` -> PKCE code verifier.
STATE_FILE = BASE_DIR / "oauth_states.json"
# Spreadsheet used for the job log and as the default for the Sheets helpers.
DEFAULT_SPREADSHEET_ID = os.getenv("DEFAULT_SPREADSHEET_ID", "1XA3vW_guHBT-ktkYvhktmUqcquECBe8exGZAoSQS3Ag")
# Parent for newly created Drive folders and default folder for listings.
DEFAULT_DRIVE_FOLDER_ID = os.getenv("DEFAULT_DRIVE_FOLDER_ID", "1hI6dNXysR_2p9gHkDpsI-iwMExmy2hhR")
# OAuth scopes requested at consent time (Sheets, Gmail send, Drive files, YouTube).
SCOPES = [
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/gmail.send",
    "https://www.googleapis.com/auth/drive.file",
    "https://www.googleapis.com/auth/youtube.force-ssl",
]
# Header row of the job-log sheet; each comment gives the column letter.
SHEETS_HEADERS = [
    "Timestamp",              # A
    "Job ID",                 # B
    "Video Title",            # C
    "YouTube URL",            # D
    "Model Used",             # E
    "Transcript Method",      # F
    "Status",                 # G
    "Summary Drive Link",     # H
    "Q&A Drive Link",         # I
    "Transcript Drive Link",  # J
    "Email Sent To",          # K
    "Email Status",           # L
    "Email Message ID",       # M
    "Completed At",           # N
    "Error",                  # O
]
# ============================================================================
# IN-MEMORY JOB STORE
# ============================================================================
# job_id -> job record (built by _new_job). Plain process memory, so all jobs
# are lost when the process restarts.
_jobs: dict[str, dict] = {}
# Guards _jobs: records are mutated from background pipeline threads.
_jobs_lock = threading.Lock()
# Ordered pipeline step names; every job tracks a state for each of these.
STEPS = [
    "fetch_transcript",
    "summarize",
    "create_drive_folder",
    "upload_summary",
    "upload_qa",
    "upload_transcript",
    "send_email",
    "log_sheet",
]
def _new_job(job_id: str, youtube_url: str, email_to: str) -> dict:
    """Create, register and return a fresh job record.

    The record starts with status "initiated" and every step in STEPS set to
    "pending". It is stored in _jobs under job_id, replacing any existing
    entry with that id.
    """
    started = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    record = dict(
        job_id=job_id,
        status="initiated",
        youtube_url=youtube_url,
        email_to=email_to,
        started_at=started,
        completed_at=None,
        steps=dict.fromkeys(STEPS, "pending"),
        result=None,
        error=None,
    )
    with _jobs_lock:
        _jobs[job_id] = record
    return record
def _update_job(job_id: str, **kwargs):
    """Merge kwargs into the stored record for job_id; unknown ids are ignored."""
    with _jobs_lock:
        try:
            target = _jobs[job_id]
        except KeyError:
            return
        target.update(kwargs)
def _set_step(job_id: str, step: str, state: str):
    """Record `state` for pipeline `step` of job_id; unknown ids are ignored."""
    with _jobs_lock:
        if job_id not in _jobs:
            return
        _jobs[job_id]["steps"][step] = state
# ============================================================================
# APP
# ============================================================================
# FastAPI application object. The "7.0.0" version should stay in sync with the
# version string reported by health().
app = FastAPI(title="Google Integration API", version="7.0.0")
# ============================================================================
# MODELS
# ============================================================================
class GenerateRequest(BaseModel):
    """Body for starting the transcript/summary pipeline (generate)."""
    youtube_url: str  # video to process
    email_to: str  # recipient of the completion email


class EmailRequest(BaseModel):
    """Body for sending a plain-text email through Gmail."""
    to: str
    subject: str
    body: str


class CreateFileRequest(BaseModel):
    """Body for uploading a text payload as a new Drive file."""
    filename: str
    content: str
    mimetype: str = "text/plain"
    folder_id: str | None = None  # None -> no explicit parent folder is set
    make_public: bool = True  # grant "anyone" reader permission after upload


class SheetWriteRequest(BaseModel):
    """Body for writing or appending rows of values to a spreadsheet range."""
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID
    range_name: str = "Sheet1!A1"  # A1-notation start of the write/append
    values: list[list]  # rows of cell values


class SheetClearRequest(BaseModel):
    """Body for clearing the values in a spreadsheet range."""
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID
    range_name: str = "Sheet1!A1:Z1000"
# ============================================================================
# STATE PERSISTENCE
# ============================================================================
def load_states() -> dict:
    """Return the pending OAuth `state` -> PKCE verifier map from STATE_FILE.

    Best-effort: a missing, unreadable, corrupt or non-object file yields an
    empty dict, which only invalidates OAuth attempts already in flight.
    """
    try:
        data = json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers JSONDecodeError
        # and UnicodeDecodeError (bad bytes).
        return {}
    # Callers index and assign by state, so anything but a JSON object
    # (e.g. "[]") is treated as empty rather than crashing them later.
    return data if isinstance(data, dict) else {}
def save_states(states: dict) -> None:
    """Persist the OAuth state map to STATE_FILE atomically.

    The JSON goes to a temp file in the same directory and is then swapped in
    with os.replace. A concurrent load_states() or a crash mid-write
    therefore never sees a truncated file.
    """
    payload = json.dumps(states)
    fd, tmp_name = tempfile.mkstemp(
        dir=str(STATE_FILE.parent), prefix=".oauth_states.", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(payload)
        os.replace(tmp_name, STATE_FILE)
    except BaseException:
        # Do not leave the temp file behind; re-raise the original error.
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise
# ============================================================================
# AUTH
# ============================================================================
def create_flow() -> Flow:
    """Build an OAuth Flow from CLIENT_SECRETS with SCOPES and REDIRECT_URI.

    Raises:
        FileNotFoundError: if the client secrets file does not exist.
    """
    secrets_path = CLIENT_SECRETS
    if os.path.exists(secrets_path):
        return Flow.from_client_secrets_file(
            secrets_path,
            scopes=SCOPES,
            redirect_uri=REDIRECT_URI,
        )
    raise FileNotFoundError(f"Missing client secret: {CLIENT_SECRETS}")
def load_credentials() -> Credentials | None:
    """Load the stored user credentials, refreshing them when expired.

    Returns:
        Valid Credentials, or None when (re-)authorization is required. That
        covers: no token file; an unreadable or corrupt token file; invalid
        credentials that cannot be refreshed; a failed refresh (the token
        file is deleted in that case).
    """
    if not os.path.exists(TOKEN_PATH):
        return None
    try:
        creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)
    except (OSError, ValueError) as e:
        # A truncated or malformed token file used to raise out of every
        # caller (including health/auth_status); treat it as "not
        # authenticated" so the user can simply re-run the consent flow.
        print(f"[WARN] Could not read token file {TOKEN_PATH}: {e}. Forcing re-auth.")
        return None
    if creds.valid:
        return creds
    if not (creds.expired and creds.refresh_token):
        # Invalid but not refreshable (e.g. no refresh token stored).
        return None
    try:
        creds.refresh(GoogleRequest())
        Path(TOKEN_PATH).write_text(creds.to_json(), encoding="utf-8")
    except Exception as e:
        print(f"[WARN] Failed to refresh token: {e}. Forcing re-auth.")
        if os.path.exists(TOKEN_PATH):
            os.remove(TOKEN_PATH)
        return None
    return creds
def require_credentials() -> Credentials:
    """Return usable credentials, or abort the request with HTTP 401."""
    creds = load_credentials()
    if creds is not None:
        return creds
    raise HTTPException(
        status_code=401,
        detail="Not authenticated. Visit http://localhost:8000/auth/start",
    )
| # ============================================================================ | |
| # GMAIL | |
| # ============================================================================ | |
| def _raw_message(to: str, subject: str, body: str) -> dict: | |
| msg = MIMEText(body) | |
| msg["to"] = to | |
| msg["subject"] = subject | |
| return {"raw": base64.urlsafe_b64encode(msg.as_bytes()).decode()} | |
def send_email(to: str, subject: str, body: str, creds: Credentials = None) -> dict:
    """Send a plain-text email from the authenticated account via Gmail.

    Uses require_credentials() when creds is not supplied and returns the
    Gmail API response of users.messages.send.
    """
    creds = creds if creds is not None else require_credentials()
    gmail = build("gmail", "v1", credentials=creds, cache_discovery=False)
    request = gmail.users().messages().send(
        userId="me",
        body=_raw_message(to, subject, body),
    )
    return request.execute()
| # ============================================================================ | |
| # DRIVE | |
| # ============================================================================ | |
| def _direct_link(file_id: str) -> str: | |
| return f"https://drive.google.com/uc?export=download&id={file_id}" | |
def _make_public(service, file_id: str) -> dict:
    """Grant "anyone" reader permission on file_id, then return its fresh metadata."""
    permission = {"type": "anyone", "role": "reader"}
    service.permissions().create(fileId=file_id, body=permission).execute()
    metadata_fields = "id,name,webViewLink,webContentLink,mimeType,size,createdTime,modifiedTime"
    lookup = service.files().get(fileId=file_id, fields=metadata_fields)
    return lookup.execute()
def _drive(creds: Credentials):
    """Return a Drive v3 API client bound to creds (discovery cache disabled)."""
    api_name, api_version = "drive", "v3"
    return build(api_name, api_version, credentials=creds, cache_discovery=False)
def create_drive_folder(
    folder_name: str,
    parent_folder_id: str | None = None,
    creds: Credentials = None,
) -> str:
    """Create a Drive folder and return its id.

    The parent is parent_folder_id, falling back to DEFAULT_DRIVE_FOLDER_ID.
    When both are empty, no explicit parent is set.
    """
    if creds is None:
        creds = require_credentials()
    parent = parent_folder_id or DEFAULT_DRIVE_FOLDER_ID
    body = {"name": folder_name, "mimeType": "application/vnd.google-apps.folder"}
    if parent:
        body["parents"] = [parent]
    created = _drive(creds).files().create(body=body, fields="id").execute()
    return created["id"]
def create_file_on_drive(
    filename: str,
    content: str,
    mimetype: str = "text/plain",
    creds: Credentials = None,
    folder_id: str | None = None,
    make_public: bool = True,
) -> dict:
    """Upload a string as a new Drive file and return its metadata.

    The content is UTF-8 encoded and streamed from memory, so no local file
    (temporary or permanent) is created.

    Args:
        filename: Name of the new Drive file.
        content: Text to upload.
        mimetype: MIME type sent with the upload.
        creds: Credentials to use; require_credentials() when omitted.
        folder_id: Optional parent folder id.
        make_public: Grant "anyone" reader permission and re-fetch metadata.

    Returns:
        Drive file metadata plus a "direct_download_link" key.
    """
    import io
    from googleapiclient.http import MediaIoBaseUpload

    if creds is None:
        creds = require_credentials()
    svc = _drive(creds)
    meta = {"name": filename}
    if folder_id:
        meta["parents"] = [folder_id]
    # Streaming from an in-memory buffer avoids the temp file entirely. It
    # also avoids closing MediaFileUpload's private `_fd`, which leaked the
    # handle whenever the upload raised.
    with io.BytesIO(content.encode("utf-8")) as buffer:
        media = MediaIoBaseUpload(buffer, mimetype=mimetype, resumable=True)
        file = svc.files().create(
            body=meta,
            media_body=media,
            fields="id,name,webViewLink,webContentLink,mimeType,size",
        ).execute()
    file_id = file["id"]
    if make_public:
        file = _make_public(svc, file_id)
    file["direct_download_link"] = _direct_link(file_id)
    return file
def get_drive_file(file_id: str, creds: Credentials = None) -> dict:
    """Fetch Drive metadata for file_id and attach a "direct_download_link"."""
    if creds is None:
        creds = require_credentials()
    wanted = "id,name,webViewLink,webContentLink,mimeType,size,createdTime,modifiedTime"
    request = _drive(creds).files().get(fileId=file_id, fields=wanted)
    metadata = request.execute()
    metadata["direct_download_link"] = _direct_link(file_id)
    return metadata
def list_drive_files(
    folder_id: str | None = None,
    page_size: int = 20,
    creds: Credentials = None,
) -> list:
    """List non-trashed Drive files (first result page only).

    folder_id falls back to DEFAULT_DRIVE_FOLDER_ID. When both are empty, the
    query is just "trashed=false". Each entry gets a "direct_download_link".
    """
    if creds is None:
        creds = require_credentials()
    fid = folder_id or DEFAULT_DRIVE_FOLDER_ID
    if fid:
        # folder_id can arrive straight from a request parameter. Escape the
        # characters that are special inside a quoted Drive query literal, so
        # a stray quote cannot break out of (or rewrite) the query.
        escaped = fid.replace("\\", "\\\\").replace("'", "\\'")
        query = f"'{escaped}' in parents and trashed=false"
    else:
        query = "trashed=false"
    files = _drive(creds).files().list(
        q=query,
        pageSize=page_size,
        fields="files(id,name,webViewLink,webContentLink,mimeType,size,createdTime)",
    ).execute().get("files", [])
    for f in files:
        f["direct_download_link"] = _direct_link(f["id"])
    return files
# ============================================================================
# SHEETS
# ============================================================================
def _sheets(creds: Credentials):
    """Return a Sheets v4 API client bound to creds (discovery cache disabled)."""
    api_name, api_version = "sheets", "v4"
    return build(api_name, api_version, credentials=creds, cache_discovery=False)
def read_sheet(
    spreadsheet_id: str,
    range_name: str = "Sheet1!A1:Z1000",
    creds: Credentials = None,
) -> list:
    """Return the cell values in range_name as a list of rows ([] when empty)."""
    if creds is None:
        creds = require_credentials()
    values_api = _sheets(creds).spreadsheets().values()
    response = values_api.get(spreadsheetId=spreadsheet_id, range=range_name).execute()
    return response.get("values", [])
def write_sheet(
    spreadsheet_id: str,
    range_name: str,
    values: list[list],
    creds: Credentials = None,
) -> dict:
    """Overwrite range_name with values (USER_ENTERED input); return the API response."""
    if creds is None:
        creds = require_credentials()
    values_api = _sheets(creds).spreadsheets().values()
    request = values_api.update(
        spreadsheetId=spreadsheet_id,
        range=range_name,
        valueInputOption="USER_ENTERED",
        body={"values": values},
    )
    return request.execute()
def append_sheet(
    spreadsheet_id: str,
    range_name: str,
    values: list[list],
    creds: Credentials = None,
) -> dict:
    """Append values as new rows (USER_ENTERED, INSERT_ROWS); return the API response."""
    if creds is None:
        creds = require_credentials()
    values_api = _sheets(creds).spreadsheets().values()
    request = values_api.append(
        spreadsheetId=spreadsheet_id,
        range=range_name,
        valueInputOption="USER_ENTERED",
        insertDataOption="INSERT_ROWS",
        body={"values": values},
    )
    return request.execute()
def clear_sheet(
    spreadsheet_id: str,
    range_name: str,
    creds: Credentials = None,
) -> dict:
    """Clear the values in range_name and return the API response."""
    if creds is None:
        creds = require_credentials()
    values_api = _sheets(creds).spreadsheets().values()
    request = values_api.clear(spreadsheetId=spreadsheet_id, range=range_name)
    return request.execute()
def get_sheet_metadata(
    spreadsheet_id: str,
    creds: Credentials = None,
) -> dict:
    """Return a spreadsheet's id, title, URL and per-sheet id/title/size.

    "rows" and "cols" are None for sheets that have no gridProperties.
    """
    if creds is None:
        creds = require_credentials()
    info = _sheets(creds).spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
    sid = info["spreadsheetId"]
    sheets = []
    for s in info.get("sheets", []):
        props = s["properties"]
        # gridProperties is absent for object sheets (a sheet holding only a
        # chart/image); previously that raised KeyError for the whole call.
        grid = props.get("gridProperties", {})
        sheets.append({
            "sheet_id": props["sheetId"],
            "title": props["title"],
            "rows": grid.get("rowCount"),
            "cols": grid.get("columnCount"),
        })
    return {
        "spreadsheet_id": sid,
        "title": info["properties"]["title"],
        "url": f"https://docs.google.com/spreadsheets/d/{sid}",
        "sheets": sheets,
    }
def append_row_to_sheet(
    values: list,
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID,
    range_name: str = "Sheet1!A1",
    creds: Credentials = None,
):
    """Append one row; return None without calling the API when no spreadsheet id is set."""
    return (
        append_sheet(spreadsheet_id, range_name, [values], creds=creds)
        if spreadsheet_id
        else None
    )
# ============================================================================
# SHEETS - JOB RECORD HELPERS
# ============================================================================
def ensure_sheet_header(creds: Credentials = None) -> None:
    """Write SHEETS_HEADERS into row 1 of Sheet1 when that row is empty.

    No-op when DEFAULT_SPREADSHEET_ID is unset. Failures are printed as
    warnings, never raised.
    """
    if not DEFAULT_SPREADSHEET_ID:
        return
    try:
        first_row = read_sheet(DEFAULT_SPREADSHEET_ID, "Sheet1!A1:Z1", creds=creds)
        if first_row:
            return
        write_sheet(DEFAULT_SPREADSHEET_ID, "Sheet1!A1", [SHEETS_HEADERS], creds=creds)
    except Exception as exc:
        print(f"[WARN] Could not write sheet header: {exc}")
def _find_job_row(job_id: str, creds: Credentials) -> int | None:
    """Return the 1-based sheet row whose column B equals job_id, else None.

    Read failures are printed as warnings and also reported as None, keeping
    the sheet logging best-effort.
    """
    try:
        rows = read_sheet(DEFAULT_SPREADSHEET_ID, "Sheet1!B:B", creds=creds)
    except Exception as exc:
        # Previously swallowed silently, so callers printed a misleading
        # "row not found" when the real problem was the API call.
        print(f"[WARN] Could not read job ids from sheet: {exc}")
        return None
    # Relies on the values API returning interior empty rows as [] so that
    # enumerate() stays aligned with sheet row numbers.
    return next(
        (i for i, row in enumerate(rows, start=1) if row and row[0] == job_id),
        None,
    )
def _create_sheet_record(
    job_id: str,
    timestamp: str,
    youtube_url: str,
    email_to: str,
    creds: Credentials,
) -> None:
    """Append the initial log row for a job (column order of SHEETS_HEADERS).

    No-op when DEFAULT_SPREADSHEET_ID is unset, matching ensure_sheet_header
    and _update_sheet_record. API failures are printed, not raised.
    """
    if not DEFAULT_SPREADSHEET_ID:
        return
    try:
        row = [
            timestamp,    # A - Timestamp
            job_id,       # B - Job ID
            "",           # C - Video Title
            youtube_url,  # D - YouTube URL
            "",           # E - Model Used
            "",           # F - Transcript Method
            "initiated",  # G - Status
            "",           # H - Summary Link
            "",           # I - Q&A Link
            "",           # J - Transcript Link
            email_to,     # K - Email Sent To
            "",           # L - Email Status
            "",           # M - Email Message ID
            "",           # N - Completed At
            "",           # O - Error
        ]
        append_sheet(DEFAULT_SPREADSHEET_ID, "Sheet1!A1", [row], creds=creds)
    except Exception as exc:
        print(f"[WARN] Could not create sheet record: {exc}")
def _update_sheet_record(
    job_id: str,
    creds: Credentials,
    video_title: str = "",
    model_used: str = "",
    extraction_method: str = "",
    status: str = "",
    summary_link: str = "",
    qa_link: str = "",
    transcript_link: str = "",
    email_status: str = "",
    email_msg_id: str = "",
    completed_at: str = "",
    error: str = "",
) -> None:
    """Rewrite the job's row (A:O), keeping existing cells for unset fields.

    An argument left as "" means "keep the current cell value". Columns A
    (Timestamp), D (YouTube URL) and K (Email Sent To) are always preserved,
    and column B is rewritten with job_id. No-op when DEFAULT_SPREADSHEET_ID
    is unset; every failure is printed as a warning instead of raised.
    """
    if not DEFAULT_SPREADSHEET_ID:
        return
    try:
        row_num = _find_job_row(job_id, creds)
        if row_num is None:
            print(f"[WARN] Row for job {job_id} not found in sheet.")
            return
        row_range = f"Sheet1!A{row_num}:O{row_num}"
        fetched = read_sheet(DEFAULT_SPREADSHEET_ID, row_range, creds=creds)
        current = fetched[0] if fetched else [""] * 15
        # Column index -> incoming value. Columns missing here (A, D, K) are
        # never overwritten.
        incoming = {
            2: video_title,
            4: model_used,
            5: extraction_method,
            6: status,
            7: summary_link,
            8: qa_link,
            9: transcript_link,
            11: email_status,
            12: email_msg_id,
            13: completed_at,
            14: error,
        }
        merged = []
        for col in range(15):
            if col == 1:
                merged.append(job_id)
                continue
            fresh = incoming.get(col, "")
            if fresh != "":
                merged.append(fresh)
            else:
                merged.append(current[col] if col < len(current) else "")
        write_sheet(DEFAULT_SPREADSHEET_ID, row_range, [merged], creds=creds)
    except Exception as exc:
        print(f"[WARN] Could not update sheet record: {exc}")
# ============================================================================
# STARTUP
# ============================================================================
def on_startup():
    """Ensure the job-log header row exists when credentials are already stored.

    NOTE(review): no registration decorator is visible for this hook; confirm
    it is wired up as a FastAPI startup handler.
    """
    if creds := load_credentials():
        ensure_sheet_header(creds=creds)
# ============================================================================
# BASIC ROUTES
# ============================================================================
def root():
    """Serve index.html from the application directory.

    The path is resolved against BASE_DIR, not the process working directory,
    so the page is found regardless of where the server was launched.
    """
    return FileResponse(BASE_DIR / "index.html")
def health():
    """Report liveness, API version, auth state and the advertised endpoint list."""
    endpoints = {
        "auth": ["/auth/start", "/auth/status", "/auth/revoke"],
        "gmail": ["/email"],
        "drive": ["/drive/create", "/drive/file/{id}", "/drive/files"],
        "sheets": [
            "/sheets/info",
            "/sheets/read",
            "/sheets/write",
            "/sheets/append",
            "/sheets/clear",
        ],
        "jobs": ["/generate", "/status/{job_id}", "/jobs"],
        "misc": ["/health"],
    }
    return {
        "status": "ok",
        "version": "7.0.0",
        "authenticated": load_credentials() is not None,
        "endpoints": endpoints,
    }
# ============================================================================
# OAUTH
# ============================================================================
def auth_start():
    """Begin the OAuth consent flow using PKCE (S256).

    A random code verifier is generated and its SHA-256 challenge goes into
    the authorization URL. The verifier is stored in STATE_FILE under the
    returned `state` for the callback to use. The browser is then redirected
    to the authorization URL.
    """
    flow = create_flow()
    code_verifier = secrets.token_urlsafe(64)
    digest = hashlib.sha256(code_verifier.encode()).digest()
    # base64url(SHA-256(verifier)) with the "=" padding stripped.
    code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")
    auth_url, state = flow.authorization_url(
        access_type="offline",
        include_granted_scopes="true",
        prompt="consent",
        code_challenge=code_challenge,
        code_challenge_method="S256",
    )
    pending = load_states()
    pending[state] = code_verifier
    save_states(pending)
    return RedirectResponse(auth_url)
def auth_callback(request: Request):
    """Finish the OAuth flow: check state, exchange the code, store the token.

    Raises:
        HTTPException(400): the state is unknown/expired, or the provider
            redirected back with an `error` parameter instead of a code.
    """
    params = request.query_params
    state = params.get("state", "")
    states = load_states()
    if state not in states:
        raise HTTPException(status_code=400, detail="Invalid or expired OAuth state.")
    verifier = states.pop(state)
    save_states(states)
    # An `error` redirect (e.g. consent denied) carries no code; report it as
    # a client error instead of letting the token exchange fail with a 500.
    if "error" in params:
        raise HTTPException(
            status_code=400,
            detail=f"OAuth authorization failed: {params['error']}",
        )
    # Downgrade only the URL scheme. The old str.replace() also rewrote every
    # "https://" inside the query string (such as returned scope URLs).
    # NOTE(review): why http is needed here is not visible in this file; it
    # relies on OAUTHLIB_INSECURE_TRANSPORT being set — confirm still needed.
    response_url = str(request.url)
    if response_url.startswith("https://"):
        response_url = "http://" + response_url[len("https://"):]
    flow = create_flow()
    flow.fetch_token(
        authorization_response=response_url,
        code_verifier=verifier,
    )
    Path(TOKEN_PATH).write_text(flow.credentials.to_json(), encoding="utf-8")
    return HTMLResponse("""
<html>
<body style="font-family:sans-serif;text-align:center;padding-top:80px;
background:#f0fdf4;color:#166534">
<h1>β Authorization Successful</h1>
<p>Gmail, Drive and Sheets are now connected.</p>
<p>You can close this tab.</p>
</body>
</html>
""")
def auth_status():
    """Report whether usable (possibly just-refreshed) credentials exist."""
    authenticated = load_credentials() is not None
    return {"authenticated": authenticated}
def auth_revoke():
    """Forget local OAuth data by deleting the token and pending-state files.

    Only local files are removed; nothing is sent to Google.
    """
    existing = [path for path in (Path(TOKEN_PATH), STATE_FILE) if path.exists()]
    for path in existing:
        path.unlink()
    return {"status": "revoked"}
# ============================================================================
# GMAIL ROUTES
# ============================================================================
def email(payload: EmailRequest):
    """Send the email described by payload and return its Gmail message id."""
    creds = require_credentials()
    sent = send_email(payload.to, payload.subject, payload.body, creds=creds)
    message_id = sent.get("id")
    return {"status": "sent", "message_id": message_id}
# ============================================================================
# DRIVE ROUTES
# ============================================================================
def drive_create(payload: CreateFileRequest):
    """Upload payload.content as a Drive file and return its id, name and links."""
    creds = require_credentials()
    uploaded = create_file_on_drive(
        filename=payload.filename,
        content=payload.content,
        mimetype=payload.mimetype,
        creds=creds,
        folder_id=payload.folder_id,
        make_public=payload.make_public,
    )
    summary = {
        "file_id": uploaded["id"],
        "name": uploaded["name"],
        "mime_type": uploaded.get("mimeType"),
        "web_view_link": uploaded.get("webViewLink"),
        "direct_download_link": uploaded["direct_download_link"],
    }
    return summary
def drive_get_file(file_id: str):
    """Return selected metadata and links for a single Drive file."""
    creds = require_credentials()
    meta = get_drive_file(file_id, creds=creds)
    summary = {
        "file_id": meta["id"],
        "name": meta["name"],
        "mime_type": meta.get("mimeType"),
        "size_bytes": meta.get("size"),
        "created": meta.get("createdTime"),
        "modified": meta.get("modifiedTime"),
        "web_view_link": meta.get("webViewLink"),
        "direct_download_link": meta["direct_download_link"],
    }
    return summary
def drive_list_files(folder_id: str = "", limit: int = 20):
    """List up to `limit` files in folder_id (the default folder when empty)."""
    creds = require_credentials()
    listing = list_drive_files(
        folder_id=folder_id if folder_id else None,
        page_size=limit,
        creds=creds,
    )
    return {"count": len(listing), "files": listing}
# ============================================================================
# SHEETS ROUTES
# ============================================================================
def sheets_info(spreadsheet_id: str = DEFAULT_SPREADSHEET_ID):
    """Return title, URL and per-sheet dimensions of a spreadsheet (400 if no id)."""
    if spreadsheet_id:
        return get_sheet_metadata(spreadsheet_id, creds=require_credentials())
    raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
def sheets_read(
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID,
    range_name: str = "Sheet1!A1:Z1000",
):
    """Return the rows in range_name together with a row count (400 if no id)."""
    if not spreadsheet_id:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    creds = require_credentials()
    values = read_sheet(spreadsheet_id, range_name, creds=creds)
    return {
        "spreadsheet_id": spreadsheet_id,
        "range": range_name,
        "row_count": len(values),
        "values": values,
    }
def sheets_write(payload: SheetWriteRequest):
    """Overwrite payload.range_name with payload.values and report what changed."""
    sid = payload.spreadsheet_id
    if not sid:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    response = write_sheet(
        sid,
        payload.range_name,
        payload.values,
        creds=require_credentials(),
    )
    summary = {"status": "written"}
    summary["updated_range"] = response.get("updatedRange")
    summary["updated_rows"] = response.get("updatedRows")
    summary["updated_columns"] = response.get("updatedColumns")
    summary["updated_cells"] = response.get("updatedCells")
    return summary
def sheets_append(payload: SheetWriteRequest):
    """Append payload.values as new rows and report the appended range."""
    sid = payload.spreadsheet_id
    if not sid:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    response = append_sheet(
        sid,
        payload.range_name,
        payload.values,
        creds=require_credentials(),
    )
    changes = response.get("updates", {})
    summary = {"status": "appended"}
    summary["updated_range"] = changes.get("updatedRange")
    summary["updated_rows"] = changes.get("updatedRows")
    summary["updated_cells"] = changes.get("updatedCells")
    return summary
def sheets_clear(payload: SheetClearRequest):
    """Clear payload.range_name and echo back the cleared range."""
    sid = payload.spreadsheet_id
    if not sid:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    response = clear_sheet(sid, payload.range_name, creds=require_credentials())
    summary = {"status": "cleared"}
    summary["cleared_range"] = response.get("clearedRange")
    summary["spreadsheet_id"] = response.get("spreadsheetId")
    return summary
# ============================================================================
# JOB STATUS ROUTES
# ============================================================================
def get_status(job_id: str):
    """Return a snapshot of the job record for job_id.

    The record is deep-copied while _jobs_lock is held. The pipeline thread
    keeps mutating the live dict (status, steps, result), and handing that
    dict out let response serialization race with those writes.

    Raises:
        HTTPException(404): unknown job_id.
    """
    import copy

    with _jobs_lock:
        job = _jobs.get(job_id)
        snapshot = copy.deepcopy(job) if job is not None else None
    if snapshot is None:
        raise HTTPException(status_code=404, detail=f"Job '{job_id}' not found.")
    return snapshot
def list_jobs():
    """Summarize every known job (id, status, URL, start and completion time)."""
    fields = ("job_id", "status", "youtube_url", "started_at", "completed_at")
    with _jobs_lock:
        summaries = [{name: job[name] for name in fields} for job in _jobs.values()]
        return {"total": len(_jobs), "jobs": summaries}
# ============================================================================
# PIPELINE BACKGROUND WORKER
# ============================================================================
def _upload_content(
    content: str,
    filename: str,
    step_key: str,
    job_id: str,
    folder_id: str,
    creds: Credentials,
) -> dict:
    """Upload one pipeline artifact to Drive and track it as step `step_key`.

    Returns the Drive metadata from create_file_on_drive (made public) on
    success. On any failure it returns {"error": "<message>"}; the exception
    is never re-raised.
    """
    _set_step(job_id, step_key, "running")
    try:
        uploaded = create_file_on_drive(
            filename=filename,
            content=content,
            mimetype="text/plain",
            creds=creds,
            folder_id=folder_id,
            make_public=True,
        )
    except Exception as exc:
        _set_step(job_id, step_key, "failed")
        return {"error": str(exc)}
    else:
        _set_step(job_id, step_key, "done")
        return uploaded
def _run_pipeline(job_id: str, youtube_url: str, email_to: str):
    """Run the YouTube -> summary -> Drive -> email -> Sheets pipeline for a job.

    Meant to run in a background thread. Progress is mirrored into the
    in-memory job record (_update_job / _set_step) and into the job's row of
    the log spreadsheet. Failures in transcript fetching, summarization or
    Drive-folder creation abort the job with status "failed". Individual
    upload or email failures are recorded, but the job still completes.
    Exceptions from the steps are caught and stored on the job record.

    Args:
        job_id: Id of a job record created by _new_job.
        youtube_url: Video to process.
        email_to: Recipient of the completion email.
    """
    creds = load_credentials()
    if creds is None:
        # Credentials can disappear between the request-time check and this
        # thread starting (token deleted, or refresh failed). Fail fast with a
        # clear message instead of letting every later Google call fail.
        _update_job(
            job_id,
            status="failed",
            completed_at=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC"),
            error="Not authenticated. Visit http://localhost:8000/auth/start",
        )
        return
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    _create_sheet_record(
        job_id=job_id,
        timestamp=timestamp,
        youtube_url=youtube_url,
        email_to=email_to,
        creds=creds,
    )
    try:
        # -- STEP 1: Fetch transcript ------------------------------------
        _update_job(job_id, status="fetching_transcript")
        _set_step(job_id, "fetch_transcript", "running")
        _update_sheet_record(job_id, creds, status="fetching_transcript")
        try:
            pipeline = TranscriptSummaryPipeline(
                youtube_url=youtube_url,
                languages=["en", "en-US", "en-GB"],
                google_creds=creds,
            )
            transcript, extraction_method = pipeline.fetcher.run()
            _set_step(job_id, "fetch_transcript", "done")
            _update_sheet_record(
                job_id, creds,
                video_title=pipeline.video_title,
                extraction_method=extraction_method,
                status="transcript_ready",
            )
        except Exception as exc:
            _set_step(job_id, "fetch_transcript", "failed")
            # `from exc` keeps the underlying traceback for debugging.
            raise RuntimeError(f"Transcript fetch failed: {exc}") from exc
        video_title = pipeline.video_title

        # -- STEP 2: Summarize -------------------------------------------
        _update_job(job_id, status="summarizing")
        _set_step(job_id, "summarize", "running")
        _update_sheet_record(job_id, creds, status="summarizing")
        try:
            summary, qa, model_used = pipeline.summarizer.run(transcript)
            _set_step(job_id, "summarize", "done")
            _update_sheet_record(
                job_id, creds,
                model_used=model_used,
                status="summarized",
            )
        except Exception as exc:
            _set_step(job_id, "summarize", "failed")
            raise RuntimeError(f"Summarization failed: {exc}") from exc

        # -- STEP 3: Create Drive folder ---------------------------------
        _update_job(job_id, status="creating_drive_folder")
        _set_step(job_id, "create_drive_folder", "running")
        _update_sheet_record(job_id, creds, status="creating_drive_folder")
        try:
            folder_id = create_drive_folder(video_title, creds=creds)
            _set_step(job_id, "create_drive_folder", "done")
        except Exception as exc:
            _set_step(job_id, "create_drive_folder", "failed")
            raise RuntimeError(f"Drive folder creation failed: {exc}") from exc

        # -- STEPS 4-6: Upload content strings directly to Drive ---------
        # _upload_content records per-step failures without raising; a
        # missing link then falls back to "N/A".
        _update_job(job_id, status="uploading_drive")
        _update_sheet_record(job_id, creds, status="uploading_to_drive")
        summary_drive = _upload_content(
            summary, f"{video_title}__summary.txt", "upload_summary",
            job_id, folder_id, creds,
        )
        qa_drive = _upload_content(
            qa, f"{video_title}__qa.txt", "upload_qa",
            job_id, folder_id, creds,
        )
        transcript_drive = _upload_content(
            transcript, f"{video_title}__transcript.txt", "upload_transcript",
            job_id, folder_id, creds,
        )
        summary_link = summary_drive.get("direct_download_link", "N/A")
        qa_link = qa_drive.get("direct_download_link", "N/A")
        transcript_link = transcript_drive.get("direct_download_link", "N/A")
        _update_sheet_record(
            job_id, creds,
            status="drive_uploaded",
            summary_link=summary_link,
            qa_link=qa_link,
            transcript_link=transcript_link,
        )

        # -- STEP 7: Send email ------------------------------------------
        _update_job(job_id, status="sending_email")
        _set_step(job_id, "send_email", "running")
        _update_sheet_record(job_id, creds, status="sending_email")
        email_subject = f"β YouTube Summary Ready β {video_title}"
        email_body = f"""Hello,
Your YouTube video has been processed successfully.
π₯ Title : {video_title}
π Video URL : {youtube_url}
π Summary : {summary_link}
β Q&A : {qa_link}
π Transcript : {transcript_link}
ββββββββββββββββββββββββββββββββ
Model Used : {model_used}
Transcript Extraction : {extraction_method}
ββββββββββββββββββββββββββββββββ
Regards,
Google Integration API
"""
        try:
            email_result = send_email(
                to=email_to, subject=email_subject,
                body=email_body, creds=creds,
            )
            email_status = "sent"
            email_msg_id = email_result.get("id", "")
            _set_step(job_id, "send_email", "done")
        except Exception as exc:
            # A failed email is reported in the result but does not fail the job.
            email_status = f"failed: {exc}"
            email_msg_id = ""
            _set_step(job_id, "send_email", "failed")

        # -- STEP 8: Final sheet update ----------------------------------
        _update_job(job_id, status="logging_sheet")
        _set_step(job_id, "log_sheet", "running")
        completed_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
        try:
            # NOTE(review): _update_sheet_record catches and prints its own
            # errors, so this except rarely fires and "logged" may be reported
            # even when the sheet write failed.
            _update_sheet_record(
                job_id, creds,
                status="completed",
                email_status=email_status,
                email_msg_id=email_msg_id,
                completed_at=completed_at,
            )
            _set_step(job_id, "log_sheet", "done")
            sheets_status = "logged"
        except Exception as exc:
            sheets_status = f"failed: {exc}"
            _set_step(job_id, "log_sheet", "failed")

        # -- COMPLETE ----------------------------------------------------
        _update_job(
            job_id,
            status="completed",
            completed_at=completed_at,
            result={
                "video_title": video_title,
                "youtube_url": youtube_url,
                "model_used": model_used,
                "extraction_method": extraction_method,
                "drive": {
                    "folder_id": folder_id,
                    "summary": {
                        "web_view_link": summary_drive.get("webViewLink"),
                        "direct_download_link": summary_link,
                    },
                    "qa": {
                        "web_view_link": qa_drive.get("webViewLink"),
                        "direct_download_link": qa_link,
                    },
                    "transcript": {
                        "web_view_link": transcript_drive.get("webViewLink"),
                        "direct_download_link": transcript_link,
                    },
                },
                "email": {"status": email_status, "message_id": email_msg_id},
                "sheets": {"status": sheets_status},
            },
        )
    except Exception as exc:
        completed_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
        _update_job(
            job_id,
            status="failed",
            completed_at=completed_at,
            error=str(exc),
        )
        _update_sheet_record(
            job_id, creds,
            status="failed",
            completed_at=completed_at,
            error=str(exc),
        )
# ============================================================================
# GENERATE ROUTE
# ============================================================================
def generate(payload: GenerateRequest):
    """Start the full pipeline for payload.youtube_url in a daemon thread.

    Returns right away with the new job_id and a poll URL
    (/status/{job_id}) for tracking progress.
    """
    require_credentials()
    job_id = secrets.token_hex(8)
    job = _new_job(job_id, payload.youtube_url, payload.email_to)
    worker = threading.Thread(
        target=_run_pipeline,
        args=(job_id, payload.youtube_url, payload.email_to),
        daemon=True,
    )
    worker.start()
    # `job` is the same dict registered in _jobs, so this matches the stored
    # start time.
    return {
        "job_id": job_id,
        "status": "initiated",
        "poll_url": f"/status/{job_id}",
        "started_at": job["started_at"],
    }
# ============================================================================
# RUN
# ============================================================================
if __name__ == "__main__":
    import uvicorn

    # Build the import string from this file's name instead of hard-coding
    # "app:app", so reload mode keeps working if the module is renamed.
    uvicorn.run(f"{Path(__file__).stem}:app", host="0.0.0.0", port=8000, reload=True)