transcript / app.py
rsnarsna
fix: Update Google OAuth token and refresh token handling; add error handling for token refresh and improve transcript fetching with fallback mechanisms
ade38de
Raw
History Blame Contribute Delete
37.1 kB
#!/usr/bin/env python3
import os
import json
import base64
import hashlib
import secrets
import tempfile
import threading
from pathlib import Path
from email.mime.text import MIMEText
from datetime import datetime, timezone
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import RedirectResponse, HTMLResponse, FileResponse
from pydantic import BaseModel
from google_auth_oauthlib.flow import Flow
from google.auth.transport.requests import Request as GoogleRequest
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from gemini_transcript import TranscriptSummaryPipeline
# ============================================================================
# CONFIG
# ============================================================================
# Let oauthlib run the OAuth flow over plain HTTP; setdefault leaves any value
# already present in the environment untouched.
# NOTE(review): presumably meant for local development (the default redirect
# URI below is http://localhost) — confirm before exposing this service.
os.environ.setdefault("OAUTHLIB_INSECURE_TRANSPORT", "1")
# Directory containing this file; default home of the credential files below.
BASE_DIR = Path(__file__).resolve().parent
# OAuth client-secrets JSON (override with the CLIENT_SECRETS env var).
CLIENT_SECRETS = os.getenv("CLIENT_SECRETS", str(BASE_DIR / "client_secret.json"))
# Authorized-user token JSON written after a successful OAuth callback.
TOKEN_PATH = os.getenv("GOOGLE_OAUTH_TOKEN_PATH", str(BASE_DIR / "Google_oauth_token.json"))
# Redirect URI handed to the OAuth flow; handled by the /auth/callback route.
REDIRECT_URI = os.getenv("REDIRECT_URI", "http://localhost:8000/auth/callback")
# JSON file mapping pending OAuth `state` values to their PKCE verifiers.
STATE_FILE = BASE_DIR / "oauth_states.json"
# Spreadsheet and Drive folder used when a caller does not name one.
DEFAULT_SPREADSHEET_ID = os.getenv("DEFAULT_SPREADSHEET_ID", "1XA3vW_guHBT-ktkYvhktmUqcquECBe8exGZAoSQS3Ag")
DEFAULT_DRIVE_FOLDER_ID = os.getenv("DEFAULT_DRIVE_FOLDER_ID", "1hI6dNXysR_2p9gHkDpsI-iwMExmy2hhR")
# OAuth scopes requested during authorization and when loading the token.
SCOPES = [
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/gmail.send",
    "https://www.googleapis.com/auth/drive.file",
    "https://www.googleapis.com/auth/youtube.force-ssl",
]
# Header row of the job-log sheet; position in the list is the sheet column.
SHEETS_HEADERS = [
    "Timestamp", # A
    "Job ID", # B
    "Video Title", # C
    "YouTube URL", # D
    "Model Used", # E
    "Transcript Method", # F
    "Status", # G
    "Summary Drive Link", # H
    "Q&A Drive Link", # I
    "Transcript Drive Link", # J
    "Email Sent To", # K
    "Email Status", # L
    "Email Message ID", # M
    "Completed At", # N
    "Error", # O
]
# ============================================================================
# IN-MEMORY JOB STORE
# ============================================================================
# Job records keyed by job id. Held only in process memory, so the contents
# do not survive a restart.
_jobs: dict[str, dict] = {}
# Guards every read and write of `_jobs` made through the helpers below.
_jobs_lock = threading.Lock()
# Pipeline step names, in the order the worker runs them; each job tracks a
# per-step state under its "steps" key.
STEPS = [
    "fetch_transcript",
    "summarize",
    "create_drive_folder",
    "upload_summary",
    "upload_qa",
    "upload_transcript",
    "send_email",
    "log_sheet",
]
def _new_job(job_id: str, youtube_url: str, email_to: str) -> dict:
    """Build the initial record for a job and register it in the job store.

    Every step listed in ``STEPS`` starts out as ``"pending"`` and the job
    status starts as ``"initiated"``. The start time is recorded in UTC.

    Returns:
        The same dict object that was stored in ``_jobs``.
    """
    started = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    record = dict(
        job_id=job_id,
        status="initiated",
        youtube_url=youtube_url,
        email_to=email_to,
        started_at=started,
        completed_at=None,
        steps=dict.fromkeys(STEPS, "pending"),
        result=None,
        error=None,
    )
    with _jobs_lock:
        _jobs[job_id] = record
    return record
def _update_job(job_id: str, **kwargs):
    """Merge ``kwargs`` into the stored record for ``job_id``.

    Unknown job ids are ignored. The update happens under ``_jobs_lock``.
    """
    with _jobs_lock:
        record = _jobs.get(job_id)
        if record is not None:
            record.update(kwargs)
def _set_step(job_id: str, step: str, state: str):
    """Record ``state`` for one pipeline ``step`` of the given job.

    Does nothing when the job id is not in the store.
    """
    with _jobs_lock:
        record = _jobs.get(job_id)
        if record is None:
            return
        record["steps"][step] = state
# ============================================================================
# APP
# ============================================================================
# FastAPI application object; every route below is registered on it through
# the @app.<method> decorators.
app = FastAPI(title="Google Integration API", version="7.0.0")
# ============================================================================
# MODELS
# ============================================================================
# Request body for POST /generate.
class GenerateRequest(BaseModel):
    youtube_url: str  # video handed to the transcript pipeline
    email_to: str  # recipient of the result email
# Request body for POST /email.
class EmailRequest(BaseModel):
    to: str  # recipient address
    subject: str  # subject header
    body: str  # plain-text message body
# Request body for POST /drive/create.
class CreateFileRequest(BaseModel):
    filename: str  # name given to the Drive file
    content: str  # text content uploaded as the file body
    mimetype: str = "text/plain"  # MIME type reported to Drive
    folder_id: str | None = None  # parent folder; omitted when None/empty
    make_public: bool = True  # grant "anyone: reader" after upload
# Request body shared by POST /sheets/write and POST /sheets/append.
class SheetWriteRequest(BaseModel):
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID  # target spreadsheet
    range_name: str = "Sheet1!A1"  # A1-notation range or anchor cell
    values: list[list]  # rows of cell values
# Request body for POST /sheets/clear.
class SheetClearRequest(BaseModel):
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID  # target spreadsheet
    range_name: str = "Sheet1!A1:Z1000"  # A1-notation range to clear
# ============================================================================
# STATE PERSISTENCE
# ============================================================================
def load_states() -> dict:
    """Read the pending OAuth states from ``STATE_FILE``.

    Any failure (missing file, unreadable file, invalid JSON) yields an
    empty dict instead of an exception.
    """
    try:
        raw = STATE_FILE.read_text()
        return json.loads(raw)
    except Exception:
        return {}
def save_states(states: dict) -> None:
    """Serialize ``states`` as JSON and overwrite ``STATE_FILE`` with it."""
    serialized = json.dumps(states)
    STATE_FILE.write_text(serialized)
# ============================================================================
# AUTH
# ============================================================================
def create_flow() -> Flow:
    """Build an OAuth ``Flow`` from the client-secrets file.

    Raises:
        FileNotFoundError: when ``CLIENT_SECRETS`` does not exist on disk.
    """
    if os.path.exists(CLIENT_SECRETS):
        return Flow.from_client_secrets_file(
            CLIENT_SECRETS,
            scopes=SCOPES,
            redirect_uri=REDIRECT_URI,
        )
    raise FileNotFoundError(f"Missing client secret: {CLIENT_SECRETS}")
def load_credentials() -> Credentials | None:
    """Load the stored Google credentials, refreshing them when expired.

    Returns:
        Usable credentials, or ``None`` when no token file exists, the
        credentials are invalid and cannot be refreshed, or the refresh
        attempt fails.

    Side effects:
        A successful refresh rewrites ``TOKEN_PATH``. A failed refresh logs
        a warning and deletes ``TOKEN_PATH`` so the user has to re-authorize.
    """
    if not os.path.exists(TOKEN_PATH):
        return None

    creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)
    if creds.valid:
        return creds

    # Invalid credentials are only recoverable when they are expired AND a
    # refresh token is available.
    if not (creds.expired and creds.refresh_token):
        return None

    try:
        creds.refresh(GoogleRequest())
        Path(TOKEN_PATH).write_text(creds.to_json(), encoding="utf-8")
    except Exception as e:
        print(f"[WARN] Failed to refresh token: {e}. Forcing re-auth.")
        if os.path.exists(TOKEN_PATH):
            os.remove(TOKEN_PATH)
        return None
    return creds
def require_credentials() -> Credentials:
    """Return usable credentials or abort the request with HTTP 401.

    Raises:
        HTTPException: status 401 when ``load_credentials`` returns ``None``.
    """
    creds = load_credentials()
    if creds is not None:
        return creds
    raise HTTPException(
        status_code=401,
        detail="Not authenticated. Visit http://localhost:8000/auth/start",
    )
# ============================================================================
# GMAIL
# ============================================================================
def _raw_message(to: str, subject: str, body: str) -> dict:
    """Build the ``{"raw": ...}`` payload for a Gmail ``messages.send`` call.

    The message is a ``MIMEText`` part with ``to`` and ``subject`` headers,
    serialized to bytes and URL-safe base64 encoded.
    """
    mime = MIMEText(body)
    for header, value in (("to", to), ("subject", subject)):
        mime[header] = value
    encoded = base64.urlsafe_b64encode(mime.as_bytes())
    return {"raw": encoded.decode()}
def send_email(to: str, subject: str, body: str, creds: Credentials = None) -> dict:
    """Send a plain-text email through the Gmail API as the authorized user.

    Args:
        to: Recipient address.
        subject: Subject header.
        body: Message text.
        creds: Credentials to use; resolved via ``require_credentials`` when
            omitted.

    Returns:
        The response dict of the Gmail ``users.messages.send`` request.
    """
    if creds is None:
        creds = require_credentials()
    gmail = build("gmail", "v1", credentials=creds, cache_discovery=False)
    payload = _raw_message(to, subject, body)
    request = gmail.users().messages().send(userId="me", body=payload)
    return request.execute()
# ============================================================================
# DRIVE
# ============================================================================
def _direct_link(file_id: str) -> str:
    """Return the ``drive.google.com/uc`` download URL for a Drive file id."""
    return "https://drive.google.com/uc?export=download&id={}".format(file_id)
def _make_public(service, file_id: str) -> dict:
    """Grant "anyone: reader" on a Drive file and return its metadata.

    Args:
        service: Drive API service object (as returned by ``_drive``).
        file_id: Id of the file to share.

    Returns:
        The file resource re-fetched after the permission was created.
    """
    anyone_reader = {"type": "anyone", "role": "reader"}
    service.permissions().create(fileId=file_id, body=anyone_reader).execute()

    wanted = "id,name,webViewLink,webContentLink,mimeType,size,createdTime,modifiedTime"
    lookup = service.files().get(fileId=file_id, fields=wanted)
    return lookup.execute()
def _drive(creds: Credentials):
    """Create a Drive v3 API service bound to ``creds``."""
    service = build("drive", "v3", credentials=creds, cache_discovery=False)
    return service
def create_drive_folder(
    folder_name: str,
    parent_folder_id: str | None = None,
    creds: Credentials = None,
) -> str:
    """Create a folder on Drive and return its id.

    Args:
        folder_name: Name of the new folder.
        parent_folder_id: Parent folder; falls back to
            ``DEFAULT_DRIVE_FOLDER_ID`` when empty. With no parent at all the
            ``parents`` field is left out of the request.
        creds: Credentials to use; resolved via ``require_credentials`` when
            omitted.
    """
    if creds is None:
        creds = require_credentials()

    parent = parent_folder_id or DEFAULT_DRIVE_FOLDER_ID
    body = {
        "name": folder_name,
        "mimeType": "application/vnd.google-apps.folder",
    }
    if parent:
        body["parents"] = [parent]

    created = _drive(creds).files().create(body=body, fields="id").execute()
    return created["id"]
def create_file_on_drive(
    filename: str,
    content: str,
    mimetype: str = "text/plain",
    creds: Credentials = None,
    folder_id: str | None = None,
    make_public: bool = True,
) -> dict:
    """Upload a string as a Drive file via a temporary file.

    No permanent local files are created: the temporary file is removed
    whether or not the upload succeeds.

    Args:
        filename: Name of the Drive file; its suffix (default ``.txt``) is
            reused for the temporary file.
        content: Text written to the file (UTF-8).
        mimetype: MIME type passed to the upload.
        creds: Credentials to use; resolved via ``require_credentials`` when
            omitted.
        folder_id: Optional parent folder id.
        make_public: When true, grant "anyone: reader" and return the
            re-fetched metadata.

    Returns:
        The Drive file resource plus a ``direct_download_link`` key.
    """
    if creds is None:
        creds = require_credentials()
    svc = _drive(creds)
    meta = {"name": filename}
    if folder_id:
        meta["parents"] = [folder_id]
    suffix = Path(filename).suffix or ".txt"
    # delete=False: the file is closed here and reopened by MediaFileUpload.
    tmp = tempfile.NamedTemporaryFile(
        mode="w", suffix=suffix, delete=False, encoding="utf-8"
    )
    tmp_path = tmp.name
    media = None
    try:
        with tmp:
            tmp.write(content)
        media = MediaFileUpload(tmp_path, mimetype=mimetype, resumable=True)
        file = svc.files().create(
            body=meta, media_body=media,
            fields="id,name,webViewLink,webContentLink,mimeType,size",
        ).execute()
    finally:
        # Close the upload's file handle on failure as well as success;
        # otherwise the descriptor leaks and the unlink below can fail on
        # platforms that refuse to delete open files.
        upload_fd = getattr(media, "_fd", None)
        if upload_fd is not None:
            upload_fd.close()
        try:
            os.unlink(tmp_path)
        except OSError:
            # Best effort: a leftover temp file must not mask the real error.
            pass
    file_id = file["id"]
    if make_public:
        file = _make_public(svc, file_id)
    file["direct_download_link"] = _direct_link(file_id)
    return file
def get_drive_file(file_id: str, creds: Credentials = None) -> dict:
    """Fetch metadata for one Drive file and attach a direct download link.

    Args:
        file_id: Id of the file to look up.
        creds: Credentials to use; resolved via ``require_credentials`` when
            omitted.
    """
    if creds is None:
        creds = require_credentials()

    wanted = "id,name,webViewLink,webContentLink,mimeType,size,createdTime,modifiedTime"
    metadata = _drive(creds).files().get(fileId=file_id, fields=wanted).execute()
    metadata["direct_download_link"] = _direct_link(file_id)
    return metadata
def list_drive_files(
    folder_id: str | None = None,
    page_size: int = 20,
    creds: Credentials = None,
) -> list:
    """List non-trashed Drive files, optionally restricted to one folder.

    Args:
        folder_id: Folder to list; falls back to ``DEFAULT_DRIVE_FOLDER_ID``
            when empty. With no folder at all, every non-trashed file visible
            to the credentials is listed.
        page_size: Maximum number of files requested from the API.
        creds: Credentials to use; resolved via ``require_credentials`` when
            omitted.

    Returns:
        File resources, each extended with a ``direct_download_link`` key.
    """
    if creds is None:
        creds = require_credentials()
    fid = folder_id or DEFAULT_DRIVE_FOLDER_ID
    if fid:
        # The folder id can come straight from an HTTP query parameter, so
        # escape backslashes and single quotes before embedding it in the
        # quoted Drive query literal.
        safe_fid = str(fid).replace("\\", "\\\\").replace("'", "\\'")
        query = f"'{safe_fid}' in parents and trashed=false"
    else:
        query = "trashed=false"
    files = _drive(creds).files().list(
        q=query, pageSize=page_size,
        fields="files(id,name,webViewLink,webContentLink,mimeType,size,createdTime)",
    ).execute().get("files", [])
    for f in files:
        f["direct_download_link"] = _direct_link(f["id"])
    return files
# ============================================================================
# SHEETS
# ============================================================================
def _sheets(creds: Credentials):
    """Create a Sheets v4 API service bound to ``creds``."""
    service = build("sheets", "v4", credentials=creds, cache_discovery=False)
    return service
def read_sheet(
    spreadsheet_id: str,
    range_name: str = "Sheet1!A1:Z1000",
    creds: Credentials = None,
) -> list:
    """Read a range from a spreadsheet.

    Returns:
        The ``values`` list of the API response, or ``[]`` when the response
        has no ``values`` key.
    """
    if creds is None:
        creds = require_credentials()
    values_api = _sheets(creds).spreadsheets().values()
    response = values_api.get(spreadsheetId=spreadsheet_id, range=range_name).execute()
    return response.get("values", [])
def write_sheet(
    spreadsheet_id: str,
    range_name: str,
    values: list[list],
    creds: Credentials = None,
) -> dict:
    """Overwrite a range with ``values`` using ``USER_ENTERED`` input mode.

    Returns:
        The response dict of the Sheets ``values.update`` request.
    """
    if creds is None:
        creds = require_credentials()
    values_api = _sheets(creds).spreadsheets().values()
    request = values_api.update(
        spreadsheetId=spreadsheet_id,
        range=range_name,
        valueInputOption="USER_ENTERED",
        body={"values": values},
    )
    return request.execute()
def append_sheet(
    spreadsheet_id: str,
    range_name: str,
    values: list[list],
    creds: Credentials = None,
) -> dict:
    """Append rows after the table found at ``range_name``.

    Rows are inserted (``INSERT_ROWS``) and parsed as ``USER_ENTERED`` input.

    Returns:
        The response dict of the Sheets ``values.append`` request.
    """
    if creds is None:
        creds = require_credentials()
    values_api = _sheets(creds).spreadsheets().values()
    request = values_api.append(
        spreadsheetId=spreadsheet_id,
        range=range_name,
        valueInputOption="USER_ENTERED",
        insertDataOption="INSERT_ROWS",
        body={"values": values},
    )
    return request.execute()
def clear_sheet(
    spreadsheet_id: str,
    range_name: str,
    creds: Credentials = None,
) -> dict:
    """Clear the values in a range.

    Returns:
        The response dict of the Sheets ``values.clear`` request.
    """
    if creds is None:
        creds = require_credentials()
    values_api = _sheets(creds).spreadsheets().values()
    request = values_api.clear(spreadsheetId=spreadsheet_id, range=range_name)
    return request.execute()
def get_sheet_metadata(
    spreadsheet_id: str,
    creds: Credentials = None,
) -> dict:
    """Summarize a spreadsheet: id, title, URL and per-tab dimensions.

    Returns:
        A dict with ``spreadsheet_id``, ``title``, ``url`` and ``sheets``;
        each entry of ``sheets`` holds ``sheet_id``, ``title``, ``rows`` and
        ``cols`` taken from the tab's properties.
    """
    if creds is None:
        creds = require_credentials()
    info = _sheets(creds).spreadsheets().get(spreadsheetId=spreadsheet_id).execute()

    sid = info["spreadsheetId"]
    title = info["properties"]["title"]
    link = f"https://docs.google.com/spreadsheets/d/{info['spreadsheetId']}"

    tabs = []
    for sheet in info.get("sheets", []):
        tabs.append(
            {
                "sheet_id": sheet["properties"]["sheetId"],
                "title": sheet["properties"]["title"],
                "rows": sheet["properties"]["gridProperties"]["rowCount"],
                "cols": sheet["properties"]["gridProperties"]["columnCount"],
            }
        )

    return {
        "spreadsheet_id": sid,
        "title": title,
        "url": link,
        "sheets": tabs,
    }
def append_row_to_sheet(
    values: list,
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID,
    range_name: str = "Sheet1!A1",
    creds: Credentials = None,
):
    """Append a single row to a sheet.

    Returns:
        The ``append_sheet`` response, or ``None`` when ``spreadsheet_id``
        is empty (nothing is sent in that case).
    """
    if spreadsheet_id:
        return append_sheet(spreadsheet_id, range_name, [values], creds=creds)
    return None
# ============================================================================
# SHEETS — JOB RECORD HELPERS
# ============================================================================
def ensure_sheet_header(creds: Credentials = None) -> None:
    """Write ``SHEETS_HEADERS`` into row 1 of the default sheet if it is empty.

    Does nothing when no default spreadsheet is configured. Failures are
    logged as warnings and never raised.
    """
    if not DEFAULT_SPREADSHEET_ID:
        return
    try:
        first_row = read_sheet(DEFAULT_SPREADSHEET_ID, "Sheet1!A1:Z1", creds=creds)
        if first_row:
            # Row 1 already has content; leave it alone.
            return
        write_sheet(DEFAULT_SPREADSHEET_ID, "Sheet1!A1", [SHEETS_HEADERS], creds=creds)
    except Exception as exc:
        print(f"[WARN] Could not write sheet header: {exc}")
def _find_job_row(job_id: str, creds: Credentials) -> int | None:
    """Find the 1-based row number of ``job_id`` in column B.

    Args:
        job_id: Job id to look for.
        creds: Credentials passed through to ``read_sheet``.

    Returns:
        The row number of the first match, or ``None`` when the id is not
        present or the sheet could not be read. A read failure is logged
        rather than silently swallowed, so it can be told apart from a
        missing row.
    """
    try:
        rows = read_sheet(DEFAULT_SPREADSHEET_ID, "Sheet1!B:B", creds=creds)
    except Exception as exc:
        print(f"[WARN] Could not look up sheet row for job {job_id}: {exc}")
        return None
    for row_number, row in enumerate(rows, start=1):
        # Empty cells come back as empty lists, hence the truthiness check.
        if row and row[0] == job_id:
            return row_number
    return None
def _create_sheet_record(
    job_id: str,
    timestamp: str,
    youtube_url: str,
    email_to: str,
    creds: Credentials,
) -> None:
    """Insert the initial log row when a job starts.

    The row has one cell per ``SHEETS_HEADERS`` column; only the timestamp,
    job id, YouTube URL, status (``"initiated"``) and recipient are filled.
    Does nothing when no default spreadsheet is configured, matching the
    other job-record helpers. Failures are logged, never raised.

    NOTE(review): ``youtube_url`` and ``email_to`` are request-supplied and
    are written with ``USER_ENTERED`` input mode, so a value starting with
    ``=`` would presumably be evaluated as a formula — confirm and sanitize.
    """
    if not DEFAULT_SPREADSHEET_ID:
        return
    try:
        row = [
            timestamp,    # A - Timestamp
            job_id,       # B - Job ID
            "",           # C - Video Title
            youtube_url,  # D - YouTube URL
            "",           # E - Model Used
            "",           # F - Transcript Method
            "initiated",  # G - Status
            "",           # H - Summary Link
            "",           # I - Q&A Link
            "",           # J - Transcript Link
            email_to,     # K - Email Sent To
            "",           # L - Email Status
            "",           # M - Email Message ID
            "",           # N - Completed At
            "",           # O - Error
        ]
        append_sheet(DEFAULT_SPREADSHEET_ID, "Sheet1!A1", [row], creds=creds)
    except Exception as exc:
        print(f"[WARN] Could not create sheet record: {exc}")
def _update_sheet_record(
    job_id: str,
    creds: Credentials,
    video_title: str = "",
    model_used: str = "",
    extraction_method: str = "",
    status: str = "",
    summary_link: str = "",
    qa_link: str = "",
    transcript_link: str = "",
    email_status: str = "",
    email_msg_id: str = "",
    completed_at: str = "",
    error: str = "",
) -> None:
    """Rewrite the log row of ``job_id``, merging in the non-empty arguments.

    The row is located by job id in column B, read back (columns A–O), and
    written again. An argument left as ``""`` keeps the cell's current value.
    Timestamp (A), YouTube URL (D) and recipient (K) are always kept; the job
    id (B) is always rewritten from ``job_id``.

    Does nothing when no default spreadsheet is configured or the row cannot
    be found. Failures are logged as warnings and never raised.
    """
    if not DEFAULT_SPREADSHEET_ID:
        return
    try:
        row_num = _find_job_row(job_id, creds)
        if row_num is None:
            print(f"[WARN] Row for job {job_id} not found in sheet.")
            return

        cell_range = f"Sheet1!A{row_num}:O{row_num}"
        fetched = read_sheet(DEFAULT_SPREADSHEET_ID, cell_range, creds=creds)
        current = fetched[0] if fetched else [""] * 15

        # Column index -> replacement value. Columns absent from this map
        # (0 = Timestamp, 3 = YouTube URL, 10 = Email Sent To) are immutable.
        overrides = {
            2: video_title,         # C - Video Title
            4: model_used,          # E - Model Used
            5: extraction_method,   # F - Transcript Method
            6: status,              # G - Status
            7: summary_link,        # H - Summary Link
            8: qa_link,             # I - Q&A Link
            9: transcript_link,     # J - Transcript Link
            11: email_status,       # L - Email Status
            12: email_msg_id,       # M - Email Message ID
            13: completed_at,       # N - Completed At
            14: error,              # O - Error
        }

        merged = []
        for col in range(15):
            if col == 1:
                # B - Job ID is always taken from the argument.
                merged.append(job_id)
                continue
            replacement = overrides.get(col, "")
            if replacement != "":
                merged.append(replacement)
            elif len(current) > col:
                # The read may return a short row; only reuse cells that exist.
                merged.append(current[col])
            else:
                merged.append("")

        write_sheet(DEFAULT_SPREADSHEET_ID, cell_range, [merged], creds=creds)
    except Exception as exc:
        print(f"[WARN] Could not update sheet record: {exc}")
# ============================================================================
# STARTUP
# ============================================================================
@app.on_event("startup")
def on_startup():
    """On application start, make sure the log sheet has its header row.

    Skipped when no credentials are available yet.
    """
    creds = load_credentials()
    if not creds:
        return
    ensure_sheet_header(creds=creds)
# ============================================================================
# BASIC ROUTES
# ============================================================================
@app.get("/")
def root():
    """Serve the front-end page.

    ``index.html`` is resolved next to this module (``BASE_DIR``), like the
    other files this app reads, so the route does not depend on the working
    directory the server was started from.
    """
    return FileResponse(BASE_DIR / "index.html")
@app.get("/health")
def health():
    """Report service status, version, auth state and the endpoint catalogue."""
    authenticated = load_credentials() is not None
    endpoints = {
        "auth": ["/auth/start", "/auth/status", "/auth/revoke"],
        "gmail": ["/email"],
        "drive": ["/drive/create", "/drive/file/{id}", "/drive/files"],
        "sheets": [
            "/sheets/info",
            "/sheets/read",
            "/sheets/write",
            "/sheets/append",
            "/sheets/clear",
        ],
        "jobs": ["/generate", "/status/{job_id}", "/jobs"],
        "misc": ["/health"],
    }
    return {
        "status": "ok",
        "version": "7.0.0",
        "authenticated": authenticated,
        "endpoints": endpoints,
    }
# ============================================================================
# OAUTH
# ============================================================================
@app.get("/auth/start")
def auth_start():
    """Begin the OAuth flow and redirect the browser to the consent page.

    A fresh PKCE verifier is generated; its S256 challenge goes into the
    authorization URL and the verifier is stored under the returned ``state``
    so the callback can retrieve it.
    """
    flow = create_flow()

    verifier = secrets.token_urlsafe(64)
    digest = hashlib.sha256(verifier.encode()).digest()
    # Base64url without padding, as used for the S256 code challenge.
    challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()

    auth_url, state = flow.authorization_url(
        access_type="offline",
        include_granted_scopes="true",
        prompt="consent",
        code_challenge=challenge,
        code_challenge_method="S256",
    )

    pending = load_states()
    pending[state] = verifier
    save_states(pending)
    return RedirectResponse(auth_url)
@app.get("/auth/callback")
def auth_callback(request: Request):
    """Finish the OAuth flow: validate state, exchange the code, store the token.

    Raises:
        HTTPException: status 400 when the ``state`` is unknown, or when the
            provider redirected back with an ``error`` parameter (for
            example the user declined consent) instead of a code.
    """
    params = request.query_params
    state = params.get("state", "")
    states = load_states()
    if state not in states:
        raise HTTPException(status_code=400, detail="Invalid or expired OAuth state.")
    # A state is single-use: remove it before doing anything else.
    verifier = states.pop(state)
    save_states(states)

    # An error redirect carries no authorization code; report it as a client
    # error instead of letting the token exchange fail with a 500.
    error = params.get("error")
    if error:
        raise HTTPException(status_code=400, detail=f"Authorization failed: {error}")

    flow = create_flow()
    flow.fetch_token(
        authorization_response=str(request.url).replace("https://", "http://"),
        code_verifier=verifier,
    )
    Path(TOKEN_PATH).write_text(flow.credentials.to_json(), encoding="utf-8")
    return HTMLResponse("""
<html>
<body style="font-family:sans-serif;text-align:center;padding-top:80px;
background:#f0fdf4;color:#166534">
<h1>✅ Authorization Successful</h1>
<p>Gmail, Drive and Sheets are now connected.</p>
<p>You can close this tab.</p>
</body>
</html>
""")
@app.get("/auth/status")
def auth_status():
    """Report whether usable credentials are currently available."""
    return {"authenticated": load_credentials() is not None}
@app.delete("/auth/revoke")
def auth_revoke():
    """Delete the local token file and the pending-state file, if present.

    Only local files are removed here; no request is sent to Google.
    """
    for target in (Path(TOKEN_PATH), STATE_FILE):
        if not target.exists():
            continue
        target.unlink()
    return {"status": "revoked"}
# ============================================================================
# EMAIL
# ============================================================================
@app.post("/email")
def email(payload: EmailRequest):
    """Send one email via Gmail and return the resulting message id."""
    creds = require_credentials()
    sent = send_email(payload.to, payload.subject, payload.body, creds=creds)
    message_id = sent.get("id")
    return {"status": "sent", "message_id": message_id}
# ============================================================================
# DRIVE ROUTES
# ============================================================================
@app.post("/drive/create")
def drive_create(payload: CreateFileRequest):
    """Create a Drive file from the posted text and return its key fields."""
    creds = require_credentials()
    created = create_file_on_drive(
        filename=payload.filename,
        content=payload.content,
        mimetype=payload.mimetype,
        creds=creds,
        folder_id=payload.folder_id,
        make_public=payload.make_public,
    )
    response = {
        "file_id": created["id"],
        "name": created["name"],
        "mime_type": created.get("mimeType"),
        "web_view_link": created.get("webViewLink"),
        "direct_download_link": created["direct_download_link"],
    }
    return response
@app.get("/drive/file/{file_id}")
def drive_get_file(file_id: str):
    """Return metadata and links for a single Drive file."""
    creds = require_credentials()
    info = get_drive_file(file_id, creds=creds)
    response = {
        "file_id": info["id"],
        "name": info["name"],
        "mime_type": info.get("mimeType"),
        "size_bytes": info.get("size"),
        "created": info.get("createdTime"),
        "modified": info.get("modifiedTime"),
        "web_view_link": info.get("webViewLink"),
        "direct_download_link": info["direct_download_link"],
    }
    return response
@app.get("/drive/files")
def drive_list_files(folder_id: str = "", limit: int = 20):
    """List Drive files in a folder (or the default folder when none given)."""
    creds = require_credentials()
    # An empty query parameter means "no folder specified".
    chosen_folder = folder_id or None
    listing = list_drive_files(folder_id=chosen_folder, page_size=limit, creds=creds)
    return {"count": len(listing), "files": listing}
# ============================================================================
# SHEETS ROUTES
# ============================================================================
@app.get("/sheets/info")
def sheets_info(spreadsheet_id: str = DEFAULT_SPREADSHEET_ID):
    """Return title, URL and tab dimensions of a spreadsheet.

    Raises:
        HTTPException: status 400 when ``spreadsheet_id`` is empty.
    """
    if spreadsheet_id:
        creds = require_credentials()
        return get_sheet_metadata(spreadsheet_id, creds=creds)
    raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
@app.get("/sheets/read")
def sheets_read(
    spreadsheet_id: str = DEFAULT_SPREADSHEET_ID,
    range_name: str = "Sheet1!A1:Z1000",
):
    """Read a range and return its rows together with a row count.

    Raises:
        HTTPException: status 400 when ``spreadsheet_id`` is empty.
    """
    if not spreadsheet_id:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    creds = require_credentials()
    rows = read_sheet(spreadsheet_id, range_name, creds=creds)
    response = {
        "spreadsheet_id": spreadsheet_id,
        "range": range_name,
        "row_count": len(rows),
        "values": rows,
    }
    return response
@app.post("/sheets/write")
def sheets_write(payload: SheetWriteRequest):
    """Overwrite a range with the posted values and report what was updated.

    Raises:
        HTTPException: status 400 when ``spreadsheet_id`` is empty.
    """
    if not payload.spreadsheet_id:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    creds = require_credentials()
    outcome = write_sheet(
        payload.spreadsheet_id,
        payload.range_name,
        payload.values,
        creds=creds,
    )
    response = {"status": "written"}
    response["updated_range"] = outcome.get("updatedRange")
    response["updated_rows"] = outcome.get("updatedRows")
    response["updated_columns"] = outcome.get("updatedColumns")
    response["updated_cells"] = outcome.get("updatedCells")
    return response
@app.post("/sheets/append")
def sheets_append(payload: SheetWriteRequest):
    """Append the posted rows and report what was updated.

    Raises:
        HTTPException: status 400 when ``spreadsheet_id`` is empty.
    """
    if not payload.spreadsheet_id:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    creds = require_credentials()
    outcome = append_sheet(
        payload.spreadsheet_id,
        payload.range_name,
        payload.values,
        creds=creds,
    )
    # The append response nests its counters under "updates".
    updates = outcome.get("updates", {})
    response = {"status": "appended"}
    response["updated_range"] = updates.get("updatedRange")
    response["updated_rows"] = updates.get("updatedRows")
    response["updated_cells"] = updates.get("updatedCells")
    return response
@app.post("/sheets/clear")
def sheets_clear(payload: SheetClearRequest):
    """Clear a range and report which range was cleared.

    Raises:
        HTTPException: status 400 when ``spreadsheet_id`` is empty.
    """
    if not payload.spreadsheet_id:
        raise HTTPException(status_code=400, detail="spreadsheet_id is required.")
    creds = require_credentials()
    outcome = clear_sheet(payload.spreadsheet_id, payload.range_name, creds=creds)
    response = {"status": "cleared"}
    response["cleared_range"] = outcome.get("clearedRange")
    response["spreadsheet_id"] = outcome.get("spreadsheetId")
    return response
# ============================================================================
# JOB STATUS ROUTES
# ============================================================================
@app.get("/status/{job_id}")
def get_status(job_id: str):
    """Return the stored record of one job.

    Raises:
        HTTPException: status 404 when the job id is unknown.
    """
    with _jobs_lock:
        record = _jobs.get(job_id)
    if record is not None:
        return record
    raise HTTPException(status_code=404, detail=f"Job '{job_id}' not found.")
@app.get("/jobs")
def list_jobs():
    """Return a short summary of every job in the in-memory store."""
    summary_fields = ("job_id", "status", "youtube_url", "started_at", "completed_at")
    with _jobs_lock:
        summaries = []
        for record in _jobs.values():
            summaries.append({field: record[field] for field in summary_fields})
        return {"total": len(_jobs), "jobs": summaries}
# ============================================================================
# PIPELINE BACKGROUND WORKER
# ============================================================================
def _upload_content(
    content: str,
    filename: str,
    step_key: str,
    job_id: str,
    folder_id: str,
    creds: Credentials,
) -> dict:
    """Upload a string to Drive as a public text file and track the step state.

    The step named ``step_key`` is marked ``running`` first, then ``done`` or
    ``failed``. ``create_file_on_drive`` manages its own temporary file, so
    nothing permanent is left on local disk.

    Returns:
        The Drive file dict on success, or ``{"error": <message>}`` on
        failure — exceptions are not propagated.
    """
    _set_step(job_id, step_key, "running")
    try:
        outcome = create_file_on_drive(
            filename=filename,
            content=content,
            mimetype="text/plain",
            creds=creds,
            folder_id=folder_id,
            make_public=True,
        )
        _set_step(job_id, step_key, "done")
    except Exception as exc:
        _set_step(job_id, step_key, "failed")
        outcome = {"error": str(exc)}
    return outcome
def _run_pipeline(job_id: str, youtube_url: str, email_to: str):
    """Run the whole job: transcript, summary, Drive upload, email, sheet log.

    Intended to run in a background thread. Progress is mirrored into the
    in-memory job record (``_update_job`` / ``_set_step``) and into the log
    sheet (``_update_sheet_record``).

    Failure handling:
        * Transcript, summarization and folder-creation errors abort the job;
          the job is marked ``failed`` with the error message, and the
          original exception is chained as the cause.
        * Upload errors do not abort: the affected link becomes ``"N/A"``.
        * An email error does not abort: it is recorded in the email status.

    Args:
        job_id: Id of a job already registered in the job store.
        youtube_url: Video to process.
        email_to: Recipient of the result email.
    """
    creds = load_credentials()
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    _create_sheet_record(
        job_id=job_id,
        timestamp=timestamp,
        youtube_url=youtube_url,
        email_to=email_to,
        creds=creds,
    )
    try:
        # ── STEP 1: Fetch transcript ─────────────────────────────────────
        _update_job(job_id, status="fetching_transcript")
        _set_step(job_id, "fetch_transcript", "running")
        _update_sheet_record(job_id, creds, status="fetching_transcript")
        try:
            pipeline = TranscriptSummaryPipeline(
                youtube_url=youtube_url,
                languages=["en", "en-US", "en-GB"],
                google_creds=creds,
            )
            transcript, extraction_method = pipeline.fetcher.run()
            _set_step(job_id, "fetch_transcript", "done")
            _update_sheet_record(
                job_id, creds,
                video_title=pipeline.video_title,
                extraction_method=extraction_method,
                status="transcript_ready",
            )
        except Exception as exc:
            _set_step(job_id, "fetch_transcript", "failed")
            # Chain the cause so the original traceback is not lost.
            raise RuntimeError(f"Transcript fetch failed: {exc}") from exc
        video_title = pipeline.video_title
        # ── STEP 2: Summarize ────────────────────────────────────────────
        _update_job(job_id, status="summarizing")
        _set_step(job_id, "summarize", "running")
        _update_sheet_record(job_id, creds, status="summarizing")
        try:
            summary, qa, model_used = pipeline.summarizer.run(transcript)
            _set_step(job_id, "summarize", "done")
            _update_sheet_record(
                job_id, creds,
                model_used=model_used,
                status="summarized",
            )
        except Exception as exc:
            _set_step(job_id, "summarize", "failed")
            raise RuntimeError(f"Summarization failed: {exc}") from exc
        # ── STEP 3: Create Drive folder ──────────────────────────────────
        _update_job(job_id, status="creating_drive_folder")
        _set_step(job_id, "create_drive_folder", "running")
        _update_sheet_record(job_id, creds, status="creating_drive_folder")
        try:
            folder_id = create_drive_folder(video_title, creds=creds)
            _set_step(job_id, "create_drive_folder", "done")
        except Exception as exc:
            _set_step(job_id, "create_drive_folder", "failed")
            raise RuntimeError(f"Drive folder creation failed: {exc}") from exc
        # ── STEP 4–6: Upload content strings directly to Drive ───────────
        _update_job(job_id, status="uploading_drive")
        _update_sheet_record(job_id, creds, status="uploading_to_drive")
        # _upload_content never raises; a failed upload yields {"error": ...}
        # and therefore no "direct_download_link" key below.
        summary_drive = _upload_content(
            summary, f"{video_title}__summary.txt", "upload_summary",
            job_id, folder_id, creds,
        )
        qa_drive = _upload_content(
            qa, f"{video_title}__qa.txt", "upload_qa",
            job_id, folder_id, creds,
        )
        transcript_drive = _upload_content(
            transcript, f"{video_title}__transcript.txt", "upload_transcript",
            job_id, folder_id, creds,
        )
        summary_link = summary_drive.get("direct_download_link", "N/A")
        qa_link = qa_drive.get("direct_download_link", "N/A")
        transcript_link = transcript_drive.get("direct_download_link", "N/A")
        _update_sheet_record(
            job_id, creds,
            status="drive_uploaded",
            summary_link=summary_link,
            qa_link=qa_link,
            transcript_link=transcript_link,
        )
        # ── STEP 7: Send email ───────────────────────────────────────────
        _update_job(job_id, status="sending_email")
        _set_step(job_id, "send_email", "running")
        _update_sheet_record(job_id, creds, status="sending_email")
        email_subject = f"✅ YouTube Summary Ready — {video_title}"
        email_body = f"""Hello,
Your YouTube video has been processed successfully.
🎥 Title : {video_title}
🔗 Video URL : {youtube_url}
📄 Summary : {summary_link}
❓ Q&A : {qa_link}
📝 Transcript : {transcript_link}
────────────────────────────────
Model Used : {model_used}
Transcript Extraction : {extraction_method}
────────────────────────────────
Regards,
Google Integration API
"""
        try:
            email_result = send_email(
                to=email_to, subject=email_subject,
                body=email_body, creds=creds,
            )
            email_status = "sent"
            email_msg_id = email_result.get("id", "")
            _set_step(job_id, "send_email", "done")
        except Exception as exc:
            # A failed email is recorded but does not fail the job.
            email_status = f"failed: {exc}"
            email_msg_id = ""
            _set_step(job_id, "send_email", "failed")
        # ── STEP 8: Final sheet update ───────────────────────────────────
        _update_job(job_id, status="logging_sheet")
        _set_step(job_id, "log_sheet", "running")
        completed_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
        try:
            # NOTE(review): _update_sheet_record catches its own exceptions,
            # so the except branch below is not expected to trigger and a
            # failed sheet write is still reported as "logged".
            _update_sheet_record(
                job_id, creds,
                status="completed",
                email_status=email_status,
                email_msg_id=email_msg_id,
                completed_at=completed_at,
            )
            _set_step(job_id, "log_sheet", "done")
            sheets_status = "logged"
        except Exception as exc:
            sheets_status = f"failed: {exc}"
            _set_step(job_id, "log_sheet", "failed")
        # ── COMPLETE ─────────────────────────────────────────────────────
        _update_job(
            job_id,
            status="completed",
            completed_at=completed_at,
            result={
                "video_title": video_title,
                "youtube_url": youtube_url,
                "model_used": model_used,
                "extraction_method": extraction_method,
                "drive": {
                    "folder_id": folder_id,
                    "summary": {
                        "web_view_link": summary_drive.get("webViewLink"),
                        "direct_download_link": summary_link,
                    },
                    "qa": {
                        "web_view_link": qa_drive.get("webViewLink"),
                        "direct_download_link": qa_link,
                    },
                    "transcript": {
                        "web_view_link": transcript_drive.get("webViewLink"),
                        "direct_download_link": transcript_link,
                    },
                },
                "email": {"status": email_status, "message_id": email_msg_id},
                "sheets": {"status": sheets_status},
            },
        )
    except Exception as exc:
        # Top-level boundary of the worker thread: record the failure in the
        # job store and the sheet instead of letting the thread die silently.
        completed_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
        _update_job(
            job_id,
            status="failed",
            completed_at=completed_at,
            error=str(exc),
        )
        _update_sheet_record(
            job_id, creds,
            status="failed",
            completed_at=completed_at,
            error=str(exc),
        )
# ============================================================================
# GENERATE ROUTE
# ============================================================================
@app.post("/generate")
def generate(payload: GenerateRequest):
    """Kick off the full pipeline in a background thread.

    Returns the job id immediately; poll ``GET /status/{job_id}`` for
    progress.

    Raises:
        HTTPException: status 401 (from ``require_credentials``) when the
            service is not authenticated.
    """
    require_credentials()
    job_id = secrets.token_hex(8)
    job = _new_job(job_id, payload.youtube_url, payload.email_to)
    # Take the start time from the record _new_job returned rather than
    # reading the shared _jobs dict without holding _jobs_lock.
    started_at = job["started_at"]
    thread = threading.Thread(
        target=_run_pipeline,
        args=(job_id, payload.youtube_url, payload.email_to),
        daemon=True,
    )
    thread.start()
    return {
        "job_id": job_id,
        "status": "initiated",
        "poll_url": f"/status/{job_id}",
        "started_at": started_at,
    }
# ============================================================================
# RUN
# ============================================================================
# Script entry point: serve the app with uvicorn on all interfaces, port 8000,
# with auto-reload enabled. uvicorn is imported here so it is only needed
# when the file is run directly.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)