// kinaiok
// Initial deployment setup for Hugging Face Spaces
// 5ef6e9d
import { Router, type Response as ExpressResponse, type Request } from "express";
import { db, videosTable, usersTable, configTable, creditTransactionsTable } from "@workspace/db";
import { desc, eq, and, or, sql } from "drizzle-orm";
import { randomUUID } from "crypto";
import {
getValidBearerToken,
refreshAccessToken,
getPoolToken,
tryRefreshPoolAccount,
} from "./config";
import { getTurnstileToken, invalidateTurnstileToken } from "../captcha";
import { generateGuardId } from "../guardId";
import { optionalJwtAuth } from "./auth";
import { downloadAndStoreVideo, streamStoredVideo, isStorageReady } from "../lib/videoStorage";
const router = Router();

// ── Upstream geminigen.ai endpoints ─────────────────────────────────────────
const GEMINIGEN_BASE = "https://api.geminigen.ai";
/** Grok-3 generation endpoint; answers with a streamed event body. */
const GROK_ENDPOINT = GEMINIGEN_BASE + "/api/video-gen/grok-stream";
/** Veo generation endpoint; answers with JSON carrying a task identifier. */
const VEO_ENDPOINT = GEMINIGEN_BASE + "/api/video-gen/veo";
/** Desktop Chrome user agent sent on every upstream request. */
const USER_AGENT = [
  "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
  "AppleWebKit/537.36 (KHTML, like Gecko)",
  "Chrome/133.0.0.0 Safari/537.36",
].join(" ");
// ── In-memory task store ─────────────────────────────────────────────────────
// Tasks live here between POST /generate (which creates them) and the
// SSE /progress/:taskId endpoint which a client subscribes to.
// NOTE: the store is a process-local Map — tasks do not survive a restart and
// are not shared between processes.
type TaskStatus = "pending" | "complete" | "failed";

/** One event delivered to a task's SSE subscribers (and kept for replay). */
interface ProgressEvent {
  type: "start" | "progress" | "complete" | "error";
  message?: string;
  /** Completion percentage (0–100) when known; `null` when upstream gave none. */
  progress?: number | null;
  status?: number;
  uuid?: string;
  video?: unknown;
  errorCode?: string;
}

interface Task {
  status: TaskStatus;
  createdAt: number;
  buffered: ProgressEvent[]; // events buffered before client connects
  clients: Set<(e: ProgressEvent) => void>; // active SSE listeners
}

const tasks = new Map<string, Task>();

// Every 5 min, drop tasks created more than 30 min ago.
const taskSweepTimer: unknown = setInterval(() => {
  const cutoff = Date.now() - 30 * 60 * 1000;
  for (const [id, t] of tasks) {
    if (t.createdAt < cutoff) tasks.delete(id);
  }
}, 5 * 60 * 1000);
// Node timers keep the event loop alive; unref() so this housekeeping timer
// never blocks shutdown on its own. (No-op where timers are plain numbers.)
(taskSweepTimer as { unref?: () => void }).unref?.();
/** Register a fresh pending task in the in-memory store and return its id. */
function createTask(): string {
  const taskId = randomUUID();
  const entry: Task = {
    status: "pending",
    createdAt: Date.now(),
    buffered: [],
    clients: new Set(),
  };
  tasks.set(taskId, entry);
  return taskId;
}
/**
 * Deliver an intermediate event to every connected listener. While the task is
 * still pending the event is also appended to `buffered`, so that clients that
 * connect later can be sent the earlier events.
 */
function broadcast(task: Task, event: ProgressEvent): void {
  if (task.status === "pending") {
    task.buffered.push(event);
  }
  task.clients.forEach((send) => {
    try {
      send(event);
    } catch {
      // A listener whose connection is gone must not affect the others.
    }
  });
}
/**
 * Mark a task as finished: record the terminal status, append the final event
 * to the replay buffer, deliver it to every listener, then detach them all.
 */
function finishTask(task: Task, status: TaskStatus, final: ProgressEvent): void {
  task.status = status;
  task.buffered.push(final);
  task.clients.forEach((deliver) => {
    try {
      deliver(final);
    } catch {
      // listener already disconnected — nothing to do
    }
  });
  task.clients.clear();
}
// ── Video generation options ──────────────────────────────────────────────────
/** Upstream model identifiers this router can drive. */
export type VideoModel = "grok-3" | "veo-3-fast";

/**
 * Normalised video-generation request. `parseVideoOptions` fills in every field
 * except the reference-image pair, which is presumably attached by the caller.
 */
export interface VideoGenOptions {
  /** Upstream model to call. */
  model: VideoModel;
  /** Positive prompt text. */
  prompt: string;
  /** Optional negative prompt; left undefined when blank. */
  negativePrompt?: string;
  /** Requested frame ratio (Veo only honours 16:9 and 9:16). */
  aspectRatio: "16:9" | "9:16" | "1:1" | "3:4" | "4:3";
  /** Requested output resolution (Grok-3 is clamped to 720p). */
  resolution: "480p" | "720p" | "1080p";
  /** Clip length in seconds. */
  duration: 5 | 6 | 8 | 10;
  /** Whether the upstream should enhance/rewrite the prompt. */
  enhancePrompt: boolean;
  /** Reference image as raw base64 (decoded with Buffer.from(..., "base64")). */
  refImageBase64?: string;
  /** MIME type of the reference image; both ref fields must be set to be used. */
  refImageMime?: string;
}
// ── Model constraints ─────────────────────────────────────────────────────────
// Grok-3 limits (API confirmed)
const GROK3_MAX_DURATION = 6;
const GROK3_MAX_RESOLUTION = "720p";
// Veo 3.1 Fast limits (API confirmed): 8-second clips, 16:9 / 9:16 only
const VEO_VALID_ASPECT_RATIOS = new Set(["16:9", "9:16"]);
const VEO_DURATION = 8 as const;
// Grok-3 aspect ratio mapping: raw user ratio → API keyword
const GROK_ASPECT_RATIO_MAP: Record<string, string> = {
  "16:9": "landscape",
  "4:3": "landscape",
  "3:2": "3:2",
  "9:16": "portrait",
  "3:4": "portrait",
  "2:3": "2:3",
  "1:1": "square",
};
// Values accepted from the client before per-model clamping.
const VALID_ASPECT_RATIOS = new Set(["16:9", "9:16", "1:1", "3:4", "4:3"]);
const VALID_RESOLUTIONS = new Set(["480p", "720p", "1080p"]);
const VALID_DURATIONS = new Set([5, 6, 8, 10]);
// Ordering used to compare resolutions when clamping.
const RESOLUTION_RANK: Record<string, number> = { "480p": 0, "720p": 1, "1080p": 2 };

/**
 * Turn an untrusted request body into a valid {@link VideoGenOptions}.
 *
 * Unknown or missing values fall back to defaults (grok-3, 16:9, 480p, 6s,
 * enhancePrompt on). The result is then clamped to the chosen model's limits:
 * Grok-3 → at most 720p and 6s; Veo → exactly 8s and 16:9 / 9:16 only.
 *
 * @param body   parsed request body (fields are validated here, not trusted).
 * @param prompt already-validated prompt text, copied through unchanged.
 */
export function parseVideoOptions(body: Record<string, unknown>, prompt: string): VideoGenOptions {
  type AspectRatio = VideoGenOptions["aspectRatio"];
  type Resolution = VideoGenOptions["resolution"];
  type Duration = VideoGenOptions["duration"];

  const model: VideoModel = body.model === "veo-3-fast" ? "veo-3-fast" : "grok-3";

  let aspectRatio: AspectRatio = "16:9";
  if (VALID_ASPECT_RATIOS.has(body.aspectRatio as string)) {
    aspectRatio = body.aspectRatio as AspectRatio;
  }

  let resolution: Resolution = "480p";
  if (VALID_RESOLUTIONS.has(body.resolution as string)) {
    resolution = body.resolution as Resolution;
  }

  // Accepts numeric strings too (e.g. "6" from a form field).
  const requestedDuration = Number(body.duration);
  let duration: Duration = 6;
  if (VALID_DURATIONS.has(requestedDuration)) {
    duration = requestedDuration as Duration;
  }

  switch (model) {
    case "grok-3":
      if (RESOLUTION_RANK[resolution] > RESOLUTION_RANK[GROK3_MAX_RESOLUTION]) {
        resolution = GROK3_MAX_RESOLUTION;
      }
      if (duration > GROK3_MAX_DURATION) {
        duration = GROK3_MAX_DURATION as Duration;
      }
      break;
    case "veo-3-fast":
      duration = VEO_DURATION;
      if (!VEO_VALID_ASPECT_RATIOS.has(aspectRatio)) {
        aspectRatio = "16:9";
      }
      break;
  }

  const rawNegative = body.negativePrompt;
  const negativePrompt = typeof rawNegative === "string" && rawNegative.trim()
    ? rawNegative.trim()
    : undefined;
  // Only an explicit boolean `false` disables enhancement.
  const enhancePrompt = body.enhancePrompt === false ? false : true;

  return { model, prompt, negativePrompt, aspectRatio, resolution, duration, enhancePrompt };
}
// ── Helper: build FormData ───────────────────────────────────────────────────
/** Decode a base64 payload into a Blob tagged with the given MIME type. */
function base64ToBlob(base64: string, mime: string): Blob {
  return new Blob([Buffer.from(base64, "base64")], { type: mime });
}
// ── Grok-3 form builder ───────────────────────────────────────────────────────
/**
 * Build the multipart body for the Grok-3 streaming endpoint.
 * Optional fields (negative prompt, reference image) are appended only when set.
 */
function buildGrokForm(turnstileToken: string, opts: VideoGenOptions): FormData {
  const fields: Array<[string, string]> = [
    ["prompt", opts.prompt],
    ["model", "grok-3"],
    ["model_name", "grok-3"],
    // API expects 'landscape' | 'portrait' | 'square' | '3:2' | '2:3'
    ["aspect_ratio", GROK_ASPECT_RATIO_MAP[opts.aspectRatio] ?? "landscape"],
    ["resolution", opts.resolution],
    ["duration", String(opts.duration)],
    ["num_result", "1"],
    ["enhance_prompt", opts.enhancePrompt ? "true" : "false"],
    ["turnstile_token", turnstileToken],
  ];
  const form = new FormData();
  for (const [name, value] of fields) {
    form.append(name, value);
  }
  if (opts.negativePrompt) {
    form.append("negative_prompt", opts.negativePrompt);
  }
  const hasRefImage = Boolean(opts.refImageBase64 && opts.refImageMime);
  if (hasRefImage) {
    // Attaching 'files' is what makes this an image-to-video request.
    // 'mode' is a creativity level (normal | custom | extremely-crazy |
    // extremely-spicy-or-crazy), NOT 'image_to_video'.
    form.append("files", base64ToBlob(opts.refImageBase64!, opts.refImageMime!), "reference.jpg");
    form.append("mode", "custom");
  }
  return form;
}
// ── Veo form builder ──────────────────────────────────────────────────────────
/**
 * Build the multipart body for the Veo endpoint. Veo takes the raw ratio
 * string ('16:9' / '9:16') and no resolution field.
 */
function buildVeoForm(turnstileToken: string, opts: VideoGenOptions): FormData {
  const form = new FormData();
  const required: Record<string, string> = {
    prompt: opts.prompt,
    model: "veo-3-fast",
    aspect_ratio: opts.aspectRatio,
    duration: String(opts.duration),
    enhance_prompt: opts.enhancePrompt ? "true" : "false",
    turnstile_token: turnstileToken,
  };
  for (const [name, value] of Object.entries(required)) {
    form.append(name, value);
  }
  if (opts.negativePrompt) {
    form.append("negative_prompt", opts.negativePrompt);
  }
  const { refImageBase64, refImageMime } = opts;
  if (refImageBase64 && refImageMime) {
    form.append("ref_images", base64ToBlob(refImageBase64, refImageMime), "reference.jpg");
    form.append("mode_image", "image_to_video");
  }
  return form;
}
// ── Grok-3 SSE endpoint ───────────────────────────────────────────────────────
/**
 * POST a Grok-3 generation request. The raw fetch Response is returned
 * unchecked so the caller can inspect the status and consume the event stream.
 */
async function callGrokEndpoint(
  bearerToken: string,
  turnstileToken: string,
  opts: VideoGenOptions,
): Promise<Response> {
  const headers = {
    Authorization: `Bearer ${bearerToken}`,
    "x-guard-id": generateGuardId("/api/video-gen/grok-stream", "post"),
    "User-Agent": USER_AGENT,
    Accept: "text/event-stream, application/json",
  };
  const body = buildGrokForm(turnstileToken, opts);
  return fetch(GROK_ENDPOINT, { method: "POST", headers, body });
}
// ── Veo POST endpoint (returns {uuid}, then poll) ─────────────────────────────
/**
 * Submit a Veo generation job and return its upstream identifier.
 *
 * @throws Error augmented with `code`, `status` and `raw` when the HTTP status
 *   is not OK (message is the lower-cased upstream message or `HTTP nnn`).
 * @throws Error when the success body carries none of uuid / history_uuid /
 *   task_id / id.
 */
async function callVeoEndpoint(
  bearerToken: string,
  turnstileToken: string,
  opts: VideoGenOptions,
): Promise<{ uuid: string }> {
  const response = await fetch(VEO_ENDPOINT, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${bearerToken}`,
      "x-guard-id": generateGuardId("/api/video-gen/veo", "post"),
      "User-Agent": USER_AGENT,
      Accept: "application/json",
    },
    body: buildVeoForm(turnstileToken, opts),
  });

  if (!response.ok) {
    const rawBody = await response.text().catch(() => "");
    const parsedErr = parseErrBody(rawBody);
    const failure = new Error(parsedErr.msg || `HTTP ${response.status}`);
    throw Object.assign(failure, { code: parsedErr.code, status: response.status, raw: rawBody });
  }

  const payload = (await response.json()) as { uuid?: string; history_uuid?: string; task_id?: string; id?: string };
  const preview = JSON.stringify(payload);
  console.log("[veo-submit] response:", preview.slice(0, 300));
  // Upstream has used several names for the job identifier.
  const uuid = payload.uuid || payload.history_uuid || payload.task_id || payload.id;
  if (!uuid) {
    throw new Error(`Veo API did not return a UUID. Response: ${preview.slice(0, 200)}`);
  }
  return { uuid };
}
// ── One-shot history fetch: get the R2 pre-signed URL after generation ────────
// geminigen.ai stores completed videos to Cloudflare R2 and returns a 7-day
// pre-signed URL in generated_video[0].video_url. This works without any
// additional auth, unlike the intermediate assets.grok.com CDN URL.
/**
 * Look up a finished generation in the history API and return its video and
 * thumbnail URLs.
 *
 * Best-effort: an HTTP error, network error, timeout, unparsable body or a
 * missing `video_url` yields `null` (with a warning) instead of throwing.
 *
 * @param uuid        upstream history identifier.
 * @param bearerToken token for the Authorization header.
 * @param timeoutMs   abort the request after this many ms so a stalled
 *   connection cannot leave the calling task pending forever (default 15s,
 *   the same per-request timeout used when polling Veo history).
 */
async function fetchHistoryVideoUrl(
  uuid: string,
  bearerToken: string,
  timeoutMs = 15_000,
): Promise<{ videoUrl: string; thumbnailUrl: string | null } | null> {
  try {
    const guardId = generateGuardId("/api/history/" + uuid, "get");
    const resp = await fetch(`${GEMINIGEN_BASE}/api/history/${uuid}`, {
      headers: {
        Authorization: `Bearer ${bearerToken}`,
        "x-guard-id": guardId,
        "User-Agent": USER_AGENT,
        Accept: "application/json",
      },
      // Covers both connect and body read; an abort lands in the catch below.
      signal: AbortSignal.timeout(timeoutMs),
    });
    if (!resp.ok) {
      console.warn("[history-fetch] HTTP", resp.status, "for uuid", uuid);
      return null;
    }
    const data = await resp.json() as {
      status?: number;
      generated_video?: Array<{ video_url?: string; thumbnail_url?: string }>;
    };
    const vid = data.generated_video?.[0];
    const videoUrl = vid?.video_url ?? null;
    const thumbnailUrl = vid?.thumbnail_url ?? null;
    if (videoUrl) {
      console.log("[history-fetch] got R2 URL:", videoUrl.slice(0, 120));
      return { videoUrl, thumbnailUrl };
    }
    return null;
  } catch (err) {
    console.warn("[history-fetch] error:", err instanceof Error ? err.message : err);
    return null;
  }
}
// ── Veo polling: GET /api/history/{uuid} until status 2/3 ────────────────────
// Veo generation typically takes 2–8 minutes on geminigen.ai; we poll up to
// 12 minutes with a 20s interval (first check after 15s to catch fast results).
/**
 * Poll the history record for a Veo job until it completes, fails, or the
 * polling budget runs out.
 *
 * Status 2 = complete, 3 = failed; anything else keeps polling. Per-attempt
 * fetch / HTTP / parse problems are logged and retried on the next tick.
 *
 * @param onProgress called once per attempt with a user-facing status line.
 * @param maxWaitMs  polling budget; checked before each sleep, so the last
 *   attempt may finish slightly after it.
 * @returns the video/thumbnail URLs, or `null` on failure (status 3), on
 *   completion without any video URL, or on timeout.
 */
async function pollVeoHistory(
  uuid: string,
  bearerToken: string,
  onProgress: (msg: string) => void,
  maxWaitMs = 720_000, // 12 minutes
): Promise<{ videoUrl: string; thumbnailUrl: string | null } | null> {
  type HistoryRecord = {
    status?: number;
    generated_video?: Array<{ video_url?: string; thumbnail_url?: string; media_url?: string }>;
    video_url?: string;
  };
  const REQUEST_TIMEOUT_MS = 15_000;
  const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

  // Every request gets a fresh guard id and a bounded lifetime, so a stalled
  // connection (including the post-completion confirm request) cannot hang us.
  const requestHistory = (): Promise<Response> =>
    fetch(`${GEMINIGEN_BASE}/api/history/${uuid}`, {
      headers: {
        Authorization: `Bearer ${bearerToken}`,
        "x-guard-id": generateGuardId("/api/history/" + uuid, "get"),
        "User-Agent": USER_AGENT,
        Accept: "application/json",
      },
      signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS),
    });

  // Upstream has used several field names for the finished video URL.
  const pickVideoUrl = (rec: HistoryRecord): string | null => {
    const vid = rec.generated_video?.[0];
    return vid?.video_url || vid?.media_url || rec.video_url || null;
  };

  const start = Date.now();
  let attempt = 0;
  let waitMs = 15_000; // first check sooner; subsequent checks every 20s
  while (Date.now() - start < maxWaitMs) {
    await sleep(waitMs);
    waitMs = 20_000; // steady-state interval
    attempt++;
    const elapsed = Math.round((Date.now() - start) / 1000);
    onProgress(`Veo 生成中,第 ${attempt} 次確認... (已等待 ${elapsed}s)`);

    let resp: Response;
    try {
      resp = await requestHistory();
    } catch (fetchErr) {
      console.warn("[veo-poll] fetch error on attempt", attempt, ":", fetchErr instanceof Error ? fetchErr.message : fetchErr);
      continue;
    }
    if (!resp.ok) {
      console.warn("[veo-poll] history HTTP", resp.status, "for uuid", uuid, "attempt", attempt);
      continue;
    }

    let data: HistoryRecord;
    try {
      const body: unknown = await resp.json();
      // A literal `null` / non-object body would crash on `data.status` below.
      if (body === null || typeof body !== "object") throw new Error("non-object history payload");
      data = body as HistoryRecord;
    } catch {
      console.warn("[veo-poll] JSON parse failed on attempt", attempt);
      continue;
    }
    console.log("[veo-poll] attempt", attempt, "status=", data.status,
      "has_video=", !!(data.generated_video?.[0]?.video_url || data.video_url));

    if (data.status === 2) {
      const videoUrl = pickVideoUrl(data);
      const thumbnailUrl = data.generated_video?.[0]?.thumbnail_url ?? null;
      if (videoUrl) {
        console.log("[veo-poll] completed with video URL:", videoUrl.slice(0, 120));
        return { videoUrl, thumbnailUrl };
      }
      // Status 2 but no URL yet — wait a bit and try exactly once more.
      console.warn("[veo-poll] status=2 but no video URL, retrying in 10s...");
      await sleep(10_000);
      const resp2 = await requestHistory().catch(() => null);
      if (resp2?.ok) {
        const data2 = (await resp2.json().catch(() => ({}))) as HistoryRecord;
        const videoUrl2 = data2 ? pickVideoUrl(data2) : null;
        if (videoUrl2) {
          return { videoUrl: videoUrl2, thumbnailUrl: data2.generated_video?.[0]?.thumbnail_url ?? null };
        }
      }
      return null; // complete but no URL
    }
    if (data.status === 3) {
      console.warn("[veo-poll] status=3 (failed) for uuid", uuid);
      return null; // failed
    }
    // status 1 = still generating; any other status: continue polling
  }
  console.warn("[veo-poll] timed out after", maxWaitMs / 1000, "s for uuid", uuid);
  return null; // timeout
}
// ── Helper: parse error body ─────────────────────────────────────────────────
/**
 * Extract an error code and a lower-cased message from an upstream error body.
 *
 * Recognised JSON shapes (the same ones the stream reader treats as errors):
 *   - `{"detail": {"error_code": "...", "error_message": "..."}}`
 *   - `{"detail": "plain message"}`
 *   - `{"error_code": "...", "error_message": "..."}` / `{"error": "..."}`
 * A body that is not JSON is returned whole as the message.
 *
 * The message is lower-cased because callers match substrings such as
 * "captcha" or "token". Missing parts come back as "".
 */
function parseErrBody(text: string): { code: string; msg: string } {
  let parsed: unknown;
  try {
    parsed = JSON.parse(text);
  } catch {
    return { code: "", msg: text.toLowerCase() };
  }
  if (parsed === null || typeof parsed !== "object") return { code: "", msg: "" };

  const top = parsed as {
    detail?: unknown;
    error_code?: unknown;
    error_message?: unknown;
    error?: unknown;
  };
  const lower = (v: unknown): string => (typeof v === "string" ? v.toLowerCase() : "");
  const asCode = (v: unknown): string => (v ? String(v) : "");

  if (typeof top.detail === "string") {
    return { code: "", msg: top.detail.toLowerCase() };
  }
  if (top.detail !== null && typeof top.detail === "object") {
    const d = top.detail as { error_code?: unknown; error_message?: unknown };
    return { code: asCode(d.error_code), msg: lower(d.error_message) };
  }
  if (top.error_code) {
    return { code: asCode(top.error_code), msg: lower(top.error_message) };
  }
  return { code: "", msg: lower(top.error) };
}
// ── SSE stream reader ─────────────────────────────────────────────────────────
/** Outcome of reading one generation event stream. */
interface StreamResult {
  /** Absolute URL of the generated video, or null if none was seen. */
  videoUrl: string | null;
  /** Absolute thumbnail URL, when the stream provided one. */
  thumbnailUrl: string | null;
  /** Upstream history identifier, used for the follow-up history lookup. */
  uuid: string | null;
  /** Last successfully parsed event, kept raw. */
  lastEvent: unknown;
  /** Error code reported by the stream, or null. */
  errorCode: string | null;
  /** Human-readable error message reported by the stream, or null. */
  errorMsg: string | null;
}
/**
 * Resolve a potentially-relative video (or thumbnail) URL to an absolute URL.
 *
 * geminigen.ai's SSE stream may return a path such as
 *   "users/{userId}/generated/{videoId}/gen_xxx.mp4?signed=..."
 * which must be prefixed with the asset CDN base. An already-absolute
 * http(s) URL is returned unchanged.
 *
 * NOTE: confirmed from production logs — relative video paths are served from
 * https://assets.grok.com/ NOT from https://api.geminigen.ai/
 */
const VIDEO_CDN_BASE = "https://assets.grok.com/";
function resolveVideoUrl(rawUrl: string): string {
  if (!rawUrl) return rawUrl;
  // Scheme names are case-insensitive.
  if (/^https?:\/\//i.test(rawUrl)) return rawUrl;
  // Protocol-relative ("//host/path"): keep the host, force https.
  if (rawUrl.startsWith("//")) return "https:" + rawUrl;
  // Relative path → prepend CDN base; strip leading slashes so "/users/..."
  // does not become "https://assets.grok.com//users/...". The query string
  // (e.g. a signature) is kept byte-for-byte.
  return VIDEO_CDN_BASE + rawUrl.replace(/^\/+/, "");
}
/**
 * Extract the geminigen.ai streaming video generation response from a parsed
 * SSE event. The structure (confirmed from production logs) is:
 *
 *   parsed.data.result.response.streamingVideoGenerationResponse
 *     .progress : 0–100
 *     .videoUrl : relative or absolute URL (only when progress === 100)
 *
 * Returns the innermost object, or `null` if any level along that path is
 * missing or is not a plain object. Non-object values (e.g. a stray string)
 * are rejected so the caller never mistakes them for a progress event.
 */
function extractSVGR(parsed: Record<string, unknown>): Record<string, unknown> | null {
  const isRecord = (v: unknown): v is Record<string, unknown> =>
    typeof v === "object" && v !== null && !Array.isArray(v);
  let node: unknown = parsed;
  for (const key of ["data", "result", "response", "streamingVideoGenerationResponse"]) {
    if (!isRecord(node)) return null;
    node = node[key];
  }
  return isRecord(node) ? node : null;
}
/**
 * Consume a geminigen.ai generation stream (SSE `data:` lines or bare JSON
 * lines) and reduce it to a single {@link StreamResult}.
 *
 * Returns early on the first recognised error event, on the explicit
 * "Video generation complete" signal, or on a legacy terminal status (2/3, or
 * >3 as failure). Otherwise it returns whatever was collected when the stream
 * ends or `maxWaitMs` elapses. `onEvent` receives simplified
 * `{type:"started"}` / `{type:"progress"}` notifications. The reader is always
 * cancelled on exit.
 *
 * NOTE: the deadline is only checked between reads; a single stalled `read()`
 * is not interrupted.
 */
async function readVideoStream(
  resp: Response,
  onEvent: (parsed: Record<string, unknown>) => void,
  maxWaitMs = 300_000,
): Promise<StreamResult> {
  const decoder = new TextDecoder();
  const reader = resp.body?.getReader();
  const empty: StreamResult = {
    videoUrl: null, thumbnailUrl: null, uuid: null, lastEvent: null, errorCode: null, errorMsg: null,
  };
  if (!reader) return empty;
  let videoUrl: string | null = null;
  let thumbnailUrl: string | null = null;
  let uuid: string | null = null;
  let lastEvent: unknown = null;
  let errorCode: string | null = null;
  let errorMsg: string | null = null;
  let buffer = "";
  let historyUuid: string | null = null;
  const deadline = Date.now() + maxWaitMs;
  try {
    while (Date.now() < deadline) {
      const { done, value } = await reader.read();
      // At end-of-stream, flush the decoder and terminate the final line so an
      // event sent without a trailing newline is still processed below
      // (previously it was silently dropped).
      buffer += done ? decoder.decode() + "\n" : decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      buffer = lines.pop() ?? "";
      for (const line of lines) {
        const trimmed = line.trim();
        if (!trimmed || trimmed === "data: [DONE]") continue;
        const jsonStr = trimmed.startsWith("data:") ? trimmed.slice(5).trim() : trimmed;
        let parsed: Record<string, unknown>;
        try {
          parsed = JSON.parse(jsonStr);
        } catch {
          continue;
        }
        lastEvent = parsed;
        // Log full event (no truncation — we need the complete video URL)
        console.log("[video-stream]", JSON.stringify(parsed));
        // ── Error detection ───────────────────────────────────────────────────
        // Pattern A: {"detail":{"error_code":"..."}}
        const detail = parsed.detail as Record<string, string> | undefined;
        if (detail?.error_code) {
          errorCode = detail.error_code;
          errorMsg = detail.error_message || detail.error_code;
          console.error(`[video-stream] error: ${errorCode}${errorMsg}`);
          return { videoUrl: null, thumbnailUrl: null, uuid, lastEvent, errorCode, errorMsg };
        }
        // Pattern B: top-level error_code
        if (typeof parsed.error_code === "string" && parsed.error_code) {
          errorCode = parsed.error_code;
          errorMsg = (parsed.error_message as string) || parsed.error_code;
          return { videoUrl: null, thumbnailUrl: null, uuid, lastEvent, errorCode, errorMsg };
        }
        // Pattern C: top-level error string (no status field)
        if (typeof parsed.error === "string" && parsed.error && typeof parsed.status === "undefined") {
          errorCode = "STREAM_ERROR";
          errorMsg = parsed.error;
          return { videoUrl: null, thumbnailUrl: null, uuid, lastEvent, errorCode, errorMsg };
        }
        // ── Initial event: {"success":true,"message":"Video generation started",
        //    "history_id":...,"history_uuid":"...","status":1} ──
        if (parsed.success === true && typeof parsed.history_uuid === "string") {
          historyUuid = parsed.history_uuid;
          uuid = historyUuid;
          if (parsed.message === "Video generation started") {
            onEvent({ type: "started", historyUuid });
            continue;
          }
        }
        // ── Completion signal: {"success":true,"message":"Video generation complete..."} ──
        if (parsed.success === true && typeof parsed.message === "string"
          && parsed.message.includes("Video generation complete")) {
          console.log("[video-stream] generation complete signal received");
          return { videoUrl, thumbnailUrl, uuid, lastEvent, errorCode: null, errorMsg: null };
        }
        // ── HD URL event: {"success":true,"data":{"video_id":"...","hdMediaUrl":"https://assets.grok.com/..."}} ──
        // Prefer this over the relative videoUrl from SVGR since it's already absolute and higher quality.
        {
          const d = parsed.data as Record<string, unknown> | undefined;
          if (d && typeof d.hdMediaUrl === "string" && d.hdMediaUrl.startsWith("https://")) {
            console.log("[video-url] using hdMediaUrl:", d.hdMediaUrl.slice(0, 120));
            videoUrl = d.hdMediaUrl;
          }
        }
        // ── Progress events: data.result.response.streamingVideoGenerationResponse ──
        const svgr = extractSVGR(parsed);
        if (svgr) {
          const progress = typeof svgr.progress === "number" ? svgr.progress : null;
          const rawUrl = typeof svgr.videoUrl === "string" ? svgr.videoUrl : null;
          const videoId = typeof svgr.videoId === "string" ? svgr.videoId : undefined;
          if (rawUrl) {
            videoUrl = resolveVideoUrl(rawUrl);
            // Log the URL in pieces so long signed query strings survive log truncation.
            try {
              const u = new URL(videoUrl);
              console.log("[video-url] origin:", u.origin);
              console.log("[video-url] path:", u.pathname);
              const qs = u.search;
              for (let i = 0; i < qs.length; i += 200) {
                console.log("[video-url] qs[" + Math.floor(i / 200) + "]:", qs.slice(i, i + 200));
              }
              console.log("[video-url] total_length:", videoUrl.length, "| raw_length:", rawUrl.length);
            } catch {
              console.log("[video-url] raw (non-URL):", rawUrl.slice(0, 200));
            }
          }
          // Extract thumbnail URL (only present at progress=100)
          const rawThumb = typeof svgr.thumbnailImageUrl === "string" ? svgr.thumbnailImageUrl : null;
          if (rawThumb) thumbnailUrl = resolveVideoUrl(rawThumb);
          onEvent({ type: "progress", progress, videoId, videoUrl });
          if (progress === 100 && videoUrl) {
            console.log("[video-stream] progress=100, video ready");
            // Don't return yet — wait for the "Video generation complete" signal
            // which ensures the backend has finished processing.
            // But if no more events come, videoUrl is set so we'll return at stream end.
          }
          continue;
        }
        // ── Legacy format fallback ─────────────────────────────────────────────
        // In case geminigen.ai changes format back, still check old fields
        if (!uuid && typeof parsed.uuid === "string") uuid = parsed.uuid;
        const legacyCandidate =
          (parsed.video_url as string | undefined) ||
          (parsed.generated_video as Array<{ video_url?: string }> | undefined)?.[0]?.video_url ||
          null;
        if (legacyCandidate) videoUrl = resolveVideoUrl(legacyCandidate);
        if (typeof parsed.status === "number" && (parsed.status === 2 || parsed.status === 3)) {
          return { videoUrl, thumbnailUrl, uuid, lastEvent, errorCode: null, errorMsg: null };
        }
        if (typeof parsed.status === "number" && parsed.status > 3) {
          errorCode = "GEN_FAILED";
          errorMsg = (parsed.message as string) || `status=${parsed.status}`;
          return { videoUrl: null, thumbnailUrl: null, uuid, lastEvent, errorCode, errorMsg };
        }
      }
      if (done) break;
    }
  } finally {
    reader.cancel().catch(() => {});
  }
  // Stream ended (or deadline passed) — if we have a videoUrl it's a success
  return { videoUrl, thumbnailUrl, uuid, lastEvent, errorCode, errorMsg };
}
// ── Shared token helpers ──────────────────────────────────────────────────────
/**
 * Create per-task token helpers that prefer pool accounts and fall back to
 * the main bearer token.
 *
 * - `pickToken()` picks a pool token (skipping accounts that already failed in
 *   this task) or the main token via `getValidBearerToken()`.
 * - `handleTokenExpiry()` refreshes the current pool account; if that fails it
 *   marks the account failed and moves to the next pool account, and once the
 *   pool is exhausted it refreshes the main token via `refreshAccessToken()`.
 */
function makeTokenHelpers() {
  // Pool accounts that failed to refresh during this task.
  const failedPoolIds: number[] = [];
  // Pool account backing the current token, or null when on the main token.
  let currentAccountId: number | null = null;

  async function pickToken(): Promise<string | null> {
    const poolEntry = await getPoolToken(failedPoolIds);
    if (poolEntry) {
      currentAccountId = poolEntry.accountId;
      return poolEntry.token;
    }
    currentAccountId = null;
    return getValidBearerToken();
  }

  async function handleTokenExpiry(): Promise<string | null> {
    if (currentAccountId !== null) {
      const refreshed = await tryRefreshPoolAccount(currentAccountId);
      if (refreshed) return refreshed;
      failedPoolIds.push(currentAccountId);
      const next = await getPoolToken(failedPoolIds);
      if (next) {
        currentAccountId = next.accountId;
        return next.token;
      }
      // Pool exhausted: from here on we use the main token, so stop tracking
      // the dead pool account. Otherwise a later expiry would retry refreshing
      // it and push a duplicate id.
      currentAccountId = null;
    }
    return refreshAccessToken();
  }

  return { pickToken, handleTokenExpiry };
}
// ── Grok-3 background task runner (SSE streaming) ────────────────────────────
/**
 * Run one Grok-3 generation for `taskId`, reporting progress through the
 * in-memory task store, and persist the resulting video row.
 *
 * Flow:
 * 1. Acquire Turnstile and bearer tokens.
 * 2. POST the generation request, with one retry on captcha rejection or token
 *    expiry.
 * 3. Read the event stream, with one more retry on stream-level captcha/token
 *    errors.
 * 4. Prefer the history-API URL over the stream URL.
 * 5. Insert into `videosTable` and finish the task.
 *
 * When storage is ready, a fire-and-forget job then copies the video and
 * rewrites the row's URL. Errors in the main flow are caught and fail the task
 * with INTERNAL_ERROR.
 */
async function runGrokTask(
  taskId: string,
  opts: VideoGenOptions,
  isPrivate: boolean,
  userId: number | null,
): Promise<void> {
  const task = tasks.get(taskId);
  if (!task) return;
  const { pickToken, handleTokenExpiry } = makeTokenHelpers();
  try {
    broadcast(task, { type: "start", message: "正在取得 Turnstile 驗證碼..." });
    let turnstileToken = await getTurnstileToken();
    broadcast(task, { type: "start", message: "正在取得 Bearer Token..." });
    let token = await pickToken();
    if (!token) {
      finishTask(task, "failed", { type: "error", errorCode: "NO_TOKEN", message: "未設定 API Token,請到管理後台設定" });
      return;
    }
    broadcast(task, { type: "start", message: "正在連接 geminigen.ai (Grok-3)..." });
    let resp = await callGrokEndpoint(token, turnstileToken, opts);
    // ── Handle HTTP-level errors (at most one retry) ─────────────────────────
    if (!resp.ok) {
      const rawText = await resp.text().catch(() => "");
      let { code, msg } = parseErrBody(rawText);
      const isCaptcha = msg.includes("captcha") || msg.includes("turnstile")
        || code === "CAPTCHA_ERROR" || code === "INVALID_CAPTCHA";
      if (isCaptcha) {
        broadcast(task, { type: "start", message: "Turnstile 拒絕,正在重新取得..." });
        invalidateTurnstileToken();
        turnstileToken = await getTurnstileToken();
        resp = await callGrokEndpoint(token, turnstileToken, opts);
      } else {
        const isExpired = resp.status === 401 || resp.status === 403
          || code === "TOKEN_EXPIRED" || code === "INVALID_CREDENTIALS"
          || msg.includes("token") || msg.includes("expired") || msg.includes("credential");
        if (isExpired) {
          broadcast(task, { type: "start", message: "Token 過期,正在刷新..." });
          const newToken = await handleTokenExpiry();
          if (!newToken) {
            finishTask(task, "failed", {
              type: "error", errorCode: "TOKEN_EXPIRED",
              message: "Token 已過期且無法自動刷新,請到管理後台更新 Token",
            });
            return;
          }
          token = newToken;
          resp = await callGrokEndpoint(token, turnstileToken, opts);
        }
      }
      if (!resp.ok) {
        // Without a retry, `resp` is still the original response whose body was
        // consumed above; re-reading it would yield "" and discard the upstream
        // error code/message. Only parse the body of a fresh (retried) response.
        if (!resp.bodyUsed) {
          ({ code, msg } = parseErrBody(await resp.text().catch(() => "")));
        }
        finishTask(task, "failed", {
          type: "error",
          errorCode: code || "API_ERROR",
          message: msg || `HTTP ${resp.status}`,
        });
        return;
      }
    }
    broadcast(task, { type: "start", message: "AI 正在生成影片,這可能需要 1–5 分鐘..." });
    // ── Read SSE stream ──────────────────────────────────────────────────────
    // Translate readVideoStream's simplified events into task broadcasts.
    const makeProgressHandler = (label: string) => (parsed: Record<string, unknown>) => {
      // New format: { type: "progress", progress: 0-100 }
      if (parsed.type === "progress") {
        const pct = typeof parsed.progress === "number" ? parsed.progress : null;
        const pctStr = pct !== null ? ` (${pct}%)` : "";
        broadcast(task, { type: "progress", progress: pct, message: `${label}${pctStr}` });
      } else if (parsed.type === "started") {
        broadcast(task, { type: "progress", message: "AI 開始生成..." });
      } else {
        // Legacy / fallback
        const u = typeof parsed.uuid === "string" ? parsed.uuid : undefined;
        broadcast(task, { type: "progress", uuid: u, message: label });
      }
    };
    let streamResult = await readVideoStream(resp, makeProgressHandler("AI 生成中"));
    // ── Retry logic for stream-level errors (at most one retry) ──────────────
    if (!streamResult.videoUrl && streamResult.errorCode) {
      const ec = streamResult.errorCode;
      const em = (streamResult.errorMsg || "").toLowerCase();
      const isStreamCaptcha = ec === "CAPTCHA_ERROR" || ec === "INVALID_CAPTCHA"
        || em.includes("captcha") || em.includes("turnstile");
      const isStreamToken = ec === "TOKEN_EXPIRED" || ec === "INVALID_CREDENTIALS"
        || ec === "UNAUTHORIZED" || em.includes("token") || em.includes("expired")
        || em.includes("credentials") || em.includes("invalid credential");
      if (isStreamCaptcha) {
        broadcast(task, { type: "start", message: "Turnstile 在串流中拒絕,重新取得並重試..." });
        invalidateTurnstileToken();
        turnstileToken = await getTurnstileToken();
        const retryResp = await callGrokEndpoint(token, turnstileToken, opts);
        if (retryResp.ok) {
          streamResult = await readVideoStream(retryResp, makeProgressHandler("AI 生成中(重試)"));
        }
      } else if (isStreamToken) {
        broadcast(task, { type: "start", message: `Token 錯誤 (${ec}),重新刷新並重試...` });
        const newToken = await handleTokenExpiry();
        if (!newToken) {
          finishTask(task, "failed", {
            type: "error", errorCode: "TOKEN_EXPIRED",
            message: "Token 已過期且無法自動刷新,請到管理後台更新 Token",
          });
          return;
        }
        token = newToken;
        const retryResp = await callGrokEndpoint(token, turnstileToken, opts);
        if (retryResp.ok) {
          streamResult = await readVideoStream(retryResp, makeProgressHandler("AI 生成中(重試)"));
        }
      }
    }
    const { uuid, errorCode: finalCode, errorMsg: finalMsg } = streamResult;
    let { videoUrl, thumbnailUrl } = streamResult;
    // Shared "no video" failure, used before and after the history lookup.
    const failNoVideo = (): void => {
      const msg = finalMsg ? `生成失敗:${finalMsg}` : "未取得影片 URL,生成可能已失敗或超時";
      finishTask(task, "failed", { type: "error", errorCode: finalCode || "NO_VIDEO", message: msg });
    };
    if (!videoUrl && !uuid) {
      failNoVideo();
      return;
    }
    // ── Step 2: Fetch R2 pre-signed URL from history API ─────────────────────
    // The SSE stream gives us an assets.grok.com URL (requires auth + Cloudflare cookies).
    // The history API returns a Cloudflare R2 pre-signed URL (7-day, publicly accessible).
    // Always try to get the R2 URL regardless of whether SSE already gave us a URL.
    if (uuid) {
      broadcast(task, { type: "progress", progress: 100, message: "正在取得影片下載連結..." });
      const historyResult = await fetchHistoryVideoUrl(uuid, token);
      if (historyResult?.videoUrl) {
        videoUrl = historyResult.videoUrl;
        if (historyResult.thumbnailUrl) thumbnailUrl = historyResult.thumbnailUrl;
        console.log("[grok-task] using R2 URL from history API");
      }
    }
    if (!videoUrl) {
      failNoVideo();
      return;
    }
    // Capture narrowed values for the background job below.
    const finalVideoUrl = videoUrl;
    const storageToken = token;
    // Store the URL (R2 pre-signed if available, falls back to assets.grok.com).
    const [saved] = await db
      .insert(videosTable)
      .values({
        videoUrl: finalVideoUrl,
        thumbnailUrl: thumbnailUrl ?? null,
        prompt: opts.prompt,
        negativePrompt: opts.negativePrompt ?? null,
        model: opts.model,
        aspectRatio: opts.aspectRatio,
        resolution: opts.resolution,
        duration: opts.duration,
        hasRefImage: !!(opts.refImageBase64 && opts.refImageMime),
        isPrivate,
        userId,
      })
      .returning();
    finishTask(task, "complete", { type: "complete", video: saved, uuid: uuid ?? undefined });
    // ── Background: copy the video to persistent storage ─────────────────────
    // R2 pre-signed URLs expire in 7 days; the stored copy replaces the row's URL.
    if (isStorageReady()) {
      void (async () => {
        try {
          const storedPath = await downloadAndStoreVideo(finalVideoUrl, storageToken);
          if (storedPath) {
            await db.update(videosTable).set({ videoUrl: storedPath }).where(eq(videosTable.id, saved.id));
            console.log(`[grok-task] video cached in storage: ${storedPath}`);
          }
        } catch (e) {
          console.warn("[grok-task] background storage failed:", e instanceof Error ? e.message : e);
        }
      })();
    }
  } catch (err: unknown) {
    const msg = err instanceof Error ? err.message : String(err);
    console.error("[grok-task] unexpected error:", msg);
    finishTask(task, "failed", { type: "error", errorCode: "INTERNAL_ERROR", message: msg });
  }
}
// ── Veo 3.1 Fast background task runner (POST + polling) ─────────────────────
/**
 * Run one Veo 3.1 Fast generation for `taskId`.
 *
 * Flow:
 * 1. Submit the job, with one retry on captcha rejection or token expiry.
 * 2. Poll the history API until the job finishes (see pollVeoHistory
 *    defaults: first check after 15s, then every 20s, up to 12 minutes).
 * 3. Insert the video row and finish the task.
 *
 * When storage is ready, a fire-and-forget job copies the video and rewrites
 * the row's URL. Errors in the main flow are caught and fail the task with
 * INTERNAL_ERROR.
 */
async function runVeoTask(
  taskId: string,
  opts: VideoGenOptions,
  isPrivate: boolean,
  userId: number | null,
): Promise<void> {
  const task = tasks.get(taskId);
  if (!task) return;
  const { pickToken, handleTokenExpiry } = makeTokenHelpers();
  try {
    broadcast(task, { type: "start", message: "正在取得 Turnstile 驗證碼..." });
    let turnstileToken = await getTurnstileToken();
    broadcast(task, { type: "start", message: "正在取得 Bearer Token..." });
    let token = await pickToken();
    if (!token) {
      finishTask(task, "failed", { type: "error", errorCode: "NO_TOKEN", message: "未設定 API Token,請到管理後台設定" });
      return;
    }
    broadcast(task, { type: "start", message: "正在提交 Veo 3.1 Fast 任務..." });
    // Submit to Veo endpoint (returns UUID immediately)
    let veoResult: { uuid: string };
    try {
      veoResult = await callVeoEndpoint(token, turnstileToken, opts);
    } catch (err: unknown) {
      // callVeoEndpoint attaches `code` / `status` to HTTP errors.
      const e = err as Error & { code?: string; status?: number };
      const code = e.code ?? "";
      const msg = (e.message ?? "").toLowerCase();
      const isCaptcha = msg.includes("captcha") || msg.includes("turnstile")
        || code === "CAPTCHA_ERROR" || code === "INVALID_CAPTCHA";
      // Same token-expiry signals as the Grok runner's HTTP-level check.
      const isExpired = e.status === 401 || e.status === 403
        || code === "TOKEN_EXPIRED" || code === "INVALID_CREDENTIALS"
        || msg.includes("token") || msg.includes("expired") || msg.includes("credential");
      if (isCaptcha) {
        broadcast(task, { type: "start", message: "Turnstile 拒絕,重新取得並重試..." });
        invalidateTurnstileToken();
        turnstileToken = await getTurnstileToken();
        veoResult = await callVeoEndpoint(token, turnstileToken, opts);
      } else if (isExpired) {
        broadcast(task, { type: "start", message: "Token 過期,正在刷新..." });
        const newToken = await handleTokenExpiry();
        if (!newToken) {
          finishTask(task, "failed", { type: "error", errorCode: "TOKEN_EXPIRED", message: "Token 過期且無法自動刷新,請到管理後台更新 Token" });
          return;
        }
        token = newToken;
        veoResult = await callVeoEndpoint(token, turnstileToken, opts);
      } else {
        throw err;
      }
    }
    const { uuid } = veoResult;
    // The interval in this message matches pollVeoHistory's steady-state 20s.
    broadcast(task, {
      type: "progress",
      progress: 5,
      message: `Veo 任務已提交 (UUID: ${uuid.slice(0, 8)}...),等待生成(每 20 秒確認一次)...`,
    });
    // Poll until done (pollVeoHistory defaults: first check after 15s, then every 20s, up to 12 minutes)
    const pollResult = await pollVeoHistory(
      uuid,
      token,
      (msg) => broadcast(task, { type: "progress", progress: null, message: msg }),
    );
    if (!pollResult?.videoUrl) {
      finishTask(task, "failed", { type: "error", errorCode: "NO_VIDEO", message: "Veo 未返回影片 URL,可能生成失敗或超時" });
      return;
    }
    const { videoUrl, thumbnailUrl } = pollResult;
    const absVideoUrl = resolveVideoUrl(videoUrl);
    const absThumbUrl = thumbnailUrl ? resolveVideoUrl(thumbnailUrl) : null;
    const storageToken = token;
    // Store the upstream URL immediately; the background storage copy follows.
    const [saved] = await db
      .insert(videosTable)
      .values({
        videoUrl: absVideoUrl,
        thumbnailUrl: absThumbUrl ?? null,
        prompt: opts.prompt,
        negativePrompt: opts.negativePrompt ?? null,
        model: opts.model,
        aspectRatio: opts.aspectRatio,
        resolution: opts.resolution,
        duration: opts.duration,
        hasRefImage: !!(opts.refImageBase64 && opts.refImageMime),
        isPrivate,
        userId,
      })
      .returning();
    finishTask(task, "complete", { type: "complete", video: saved, uuid });
    // ── Background: copy the video to persistent storage ─────────────────────
    if (isStorageReady()) {
      void (async () => {
        try {
          const storedPath = await downloadAndStoreVideo(absVideoUrl, storageToken);
          if (storedPath) {
            await db.update(videosTable).set({ videoUrl: storedPath }).where(eq(videosTable.id, saved.id));
            console.log(`[veo-task] video cached in storage: ${storedPath}`);
          }
        } catch (e) {
          console.warn("[veo-task] background storage failed:", e instanceof Error ? e.message : e);
        }
      })();
    }
  } catch (err: unknown) {
    const msg = err instanceof Error ? err.message : String(err);
    console.error("[veo-task] unexpected error:", msg);
    finishTask(task, "failed", { type: "error", errorCode: "INTERNAL_ERROR", message: msg });
  }
}
// ── Credit helpers ─────────────────────────────────────────────────────────────
/**
 * Looks up one entry of the key/value config table.
 *
 * @param key - Config key to read.
 * @returns The stored value for `key`, or `null` when no row matches.
 */
async function getVideoConfigVal(key: string): Promise<string | null> {
  const matches = await db
    .select({ value: configTable.value })
    .from(configTable)
    .where(eq(configTable.key, key))
    .limit(1);
  const [first] = matches;
  if (first === undefined) return null;
  return first.value ?? null;
}
/**
 * Charges `cost` credits to `userId` when the credit system is enabled.
 *
 * The balance check and the decrement happen in ONE conditional UPDATE
 * (`... WHERE id = userId AND credits >= cost`). Two concurrent requests
 * therefore cannot both pass the check and push the balance below zero, which
 * the earlier read-then-write version allowed.
 *
 * @param userId - User to charge.
 * @param cost - Number of credits to deduct.
 * @param description - Text stored on the resulting `spend` credit transaction.
 * @returns
 *   - `{ ok: true }` when config `enable_credits` is not `"true"` (nothing charged).
 *   - `{ ok: true, balance }` after a successful charge (balance after deduction).
 *   - `{ ok: false, balance }` when the user has too few credits.
 *   - `{ ok: false }` when the user does not exist.
 */
async function checkAndDeductVideoCredits(
  userId: number,
  cost: number,
  description: string,
): Promise<{ ok: boolean; balance?: number }> {
  const enabled = await getVideoConfigVal("enable_credits");
  if (enabled !== "true") return { ok: true };

  // Atomic check-and-decrement: the row only matches when enough credits remain.
  const [updated] = await db
    .update(usersTable)
    .set({ credits: sql`${usersTable.credits} - ${cost}` })
    .where(and(eq(usersTable.id, userId), sql`${usersTable.credits} >= ${cost}`))
    .returning({ credits: usersTable.credits });

  if (!updated) {
    // Nothing was charged: the user is either missing or short on credits.
    const [user] = await db
      .select({ credits: usersTable.credits })
      .from(usersTable)
      .where(eq(usersTable.id, userId))
      .limit(1);
    return user ? { ok: false, balance: user.credits } : { ok: false };
  }

  // NOTE(review): the ledger insert is not in the same transaction as the
  // decrement; consider db.transaction() if the driver supports it.
  await db.insert(creditTransactionsTable).values({ userId, amount: -cost, type: "spend", description });
  return { ok: true, balance: updated.credits };
}
// ── POST /api/videos/generate ─────────────────────────────────────────────────
// Validates the request, charges credits (for authenticated users when a cost
// is configured), creates an in-memory task and returns `{ taskId }`
// immediately. Generation then runs in the background; clients follow it via
// GET /progress/:taskId.
router.post("/generate", optionalJwtAuth, async (req, res) => {
  // Tolerate a missing/unparsed body instead of throwing a TypeError.
  const body = (req.body ?? {}) as Record<string, unknown>;
  const prompt = typeof body.prompt === "string" ? body.prompt.trim() : "";
  if (!prompt || prompt.length > 2000) {
    return res.status(400).json({ error: "INVALID_BODY", message: "prompt is required (1-2000 chars)" });
  }
  const isPrivate = body.isPrivate === true;

  // Parse video options (aspectRatio, resolution, duration, negativePrompt, enhancePrompt).
  const opts = parseVideoOptions(body, prompt);

  // Attach the reference image only when both the data and its MIME type are present.
  const refImageBase64 = typeof body.referenceImageBase64 === "string" ? body.referenceImageBase64 : undefined;
  const refImageMime = typeof body.referenceImageMime === "string" ? body.referenceImageMime : undefined;
  if (refImageBase64 && refImageMime) {
    opts.refImageBase64 = refImageBase64;
    opts.refImageMime = refImageMime;
  }

  // Set by optionalJwtAuth when a valid token was supplied.
  const userId: number | null = (req as typeof req & { jwtUserId?: number | null }).jwtUserId ?? null;

  // ── Credits check ──────────────────────────────────────────────────────────
  // Wrapped in try/catch: a rejected promise from an async handler is not
  // routed to Express's error handler on Express 4. It would surface as an
  // unhandled rejection and leave the request hanging.
  // NOTE(review): credits are not refunded if the background task later fails.
  if (userId !== null) {
    try {
      const cost = Number(await getVideoConfigVal("video_gen_cost")) || 0;
      if (cost > 0) {
        const creditResult = await checkAndDeductVideoCredits(userId, cost, `影片生成(${opts.model})`);
        if (!creditResult.ok) {
          return res.status(402).json({
            error: "INSUFFICIENT_CREDITS",
            message: `點數不足,此操作需要 ${cost} 點`,
            balance: creditResult.balance ?? 0,
          });
        }
      }
    } catch (err: unknown) {
      console.error("[video-task] credit check failed:", err);
      return res.status(500).json({ error: "INTERNAL_ERROR", message: "Failed to check credits" });
    }
  }

  const taskId = createTask();
  res.json({ taskId });

  // Dispatch to the correct runner based on the model.
  const runner = opts.model === "veo-3-fast" ? runVeoTask : runGrokTask;
  runner(taskId, opts, isPrivate, userId).catch((err: unknown) => {
    console.error("[video-task] uncaught:", err);
    const t = tasks.get(taskId);
    if (t && t.status === "pending") {
      const message = err instanceof Error ? err.message : String(err);
      finishTask(t, "failed", { type: "error", errorCode: "INTERNAL_ERROR", message });
    }
  });
});
// ── GET /api/videos/progress/:taskId (SSE) ──────────────────────────────────
// Streams a task's progress as Server-Sent Events. All events buffered so far
// are replayed first. If the task has already finished, a `{"type":"done"}`
// event is sent and the stream is closed. Otherwise the client is subscribed
// to live events and kept alive with a heartbeat comment every 20 s.
router.get("/progress/:taskId", (req, res: ExpressResponse) => {
  const task = tasks.get(req.params.taskId);
  if (!task) return res.status(404).json({ error: "TASK_NOT_FOUND" });

  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
    "X-Accel-Buffering": "no",
  });
  res.flushHeaders();

  const send = (event: ProgressEvent) => {
    res.write(`data: ${JSON.stringify(event)}\n\n`);
  };

  // Replay buffered events.
  for (const ev of task.buffered) {
    send(ev);
  }

  // If already done, close immediately.
  if (task.status !== "pending") {
    res.write("data: {\"type\":\"done\"}\n\n");
    res.end();
    return;
  }

  // Subscribe to future events.
  task.clients.add(send);

  // Heartbeat so proxies / CDNs don't close an idle SSE connection.
  const heartbeat = setInterval(() => {
    try { res.write(": heartbeat\n\n"); } catch { clearInterval(heartbeat); }
  }, 20_000);

  // Detect the client going away on the RESPONSE. ServerResponse 'close' fires
  // when the connection is torn down. The request's 'end'/'close' events
  // describe the request body instead ('close' fires once the request is
  // complete on Node >= 16). They can fire while the client is still
  // listening, which would silently unsubscribe it.
  res.on("close", () => {
    clearInterval(heartbeat);
    task.clients.delete(send);
  });
});
/**
 * Rewrites a stored video/thumbnail URL into the URL the browser should load.
 *
 * - `null` or empty string → `null`.
 * - Paths already served by this router (`/api/videos/proxy…`,
 *   `/api/videos/stored…`) → returned unchanged.
 * - Cloudflare R2 pre-signed URLs (`*.r2.cloudflarestorage.com`) → returned
 *   unchanged; the browser fetches them directly.
 * - `api.geminigen.ai` and `assets.grok.com` URLs → rewritten to
 *   `/api/videos/proxy?url=<encoded>`, because the proxy adds the upstream
 *   headers/auth these hosts need.
 * - Anything else (other hosts, relative paths, non-URLs) → returned unchanged.
 *
 * @param url - Stored URL, possibly a legacy raw upstream URL.
 * @returns The browser-facing URL, or `null` when `url` is empty.
 */
function toProxyUrl(url: string | null): string | null {
  if (!url) return null;
  if (url.startsWith("/api/videos/proxy") || url.startsWith("/api/videos/stored")) return url;

  let hostname: string;
  try {
    hostname = new URL(url).hostname;
  } catch {
    return url; // relative path or not an absolute URL
  }

  if (hostname.endsWith(".r2.cloudflarestorage.com")) return url;

  if (hostname === "api.geminigen.ai" || hostname === "assets.grok.com") {
    return `/api/videos/proxy?url=${encodeURIComponent(url)}`;
  }
  return url;
}
// ── GET /api/videos/stored/<objectPath> ───────────────────────────────────────
// Serves videos that were downloaded from the CDN into object storage,
// forwarding the Range header so the player can seek.
router.get(/^\/stored\/(.+)$/, async (req, res) => {
  const objectPath = (req.params as Record<string, string>)[0];
  if (!objectPath) return res.status(400).json({ error: "Missing objectPath" });

  // The path comes straight from the URL (already percent-decoded). Reject
  // `.`/`..` segments and NUL bytes as defence in depth. Whether the storage
  // backend would resolve them is not visible here.
  const segments = objectPath.split(/[\\/]/);
  if (objectPath.includes("\0") || segments.some((s) => s === ".." || s === ".")) {
    return res.status(400).json({ error: "Invalid objectPath" });
  }

  try {
    await streamStoredVideo(objectPath, res, req.headers.range);
  } catch (err: unknown) {
    // Without this, a rejection is unhandled and the request hangs.
    console.error("[video-stored] stream failed:", err);
    if (!res.headersSent) res.status(500).json({ error: "Failed to stream stored video" });
    else res.destroy();
  }
});
// ── GET /api/videos/history ───────────────────────────────────────────────────

/**
 * Parses a query-string value as an integer (fractions are truncated).
 * Returns `fallback` for missing, non-numeric, non-finite or zero values,
 * which matches the previous `Number(x) || fallback` semantics.
 */
function parseHistoryQueryInt(value: unknown, fallback: number): number {
  const n = Math.trunc(Number(value));
  return Number.isFinite(n) && n !== 0 ? n : fallback;
}

/**
 * Lists videos newest-first. Anonymous callers see only public videos;
 * authenticated callers additionally see their own private videos.
 * Query params: `limit` (clamped to 1–50, default 20) and `offset`
 * (clamped to >= 0, default 0).
 */
router.get("/history", optionalJwtAuth, async (req, res) => {
  // Set by optionalJwtAuth when a valid token was supplied.
  const userId: number | null = (req as typeof req & { jwtUserId?: number | null }).jwtUserId ?? null;

  // Clamp to integers the database accepts. A negative, fractional or infinite
  // LIMIT/OFFSET would otherwise make the query fail (e.g. PostgreSQL rejects
  // negative values).
  const limit = Math.min(Math.max(parseHistoryQueryInt(req.query.limit, 20), 1), 50);
  const offset = Math.max(parseHistoryQueryInt(req.query.offset, 0), 0);

  const visibilityFilter = userId !== null
    ? or(eq(videosTable.isPrivate, false), and(eq(videosTable.isPrivate, true), eq(videosTable.userId, userId)))
    : eq(videosTable.isPrivate, false);

  const rows = await db
    .select()
    .from(videosTable)
    .where(visibilityFilter)
    .orderBy(desc(videosTable.createdAt))
    .limit(limit)
    .offset(offset);

  // Rewrite legacy raw upstream URLs into browser-loadable URLs.
  const videos = rows.map((v) => ({
    ...v,
    videoUrl: toProxyUrl(v.videoUrl) ?? v.videoUrl,
    thumbnailUrl: toProxyUrl(v.thumbnailUrl),
  }));
  return res.json({ videos, limit, offset });
});
// ── GET /api/videos/proxy ─────────────────────────────────────────────────────
// Proxies video/thumbnail files from api.geminigen.ai and the Grok CDN
// (assets.grok.com), adding the headers/auth those hosts need. The client's
// Range header is forwarded so the browser's <video> player can seek.
//
// Fetch strategy:
//   1. First attempt WITH the bearer token (pool token, else the configured one).
//   2. On failure:
//      - assets.grok.com: retry WITHOUT Authorization (the CDN has been seen to
//        reject the geminigen token), then once more with a refreshed token.
//      - api.geminigen.ai: on 401/403, refresh the token and retry once.
router.get("/proxy", async (req, res) => {
  const raw = typeof req.query.url === "string" ? req.query.url : "";
  if (!raw) return res.status(400).json({ error: "Missing url param" });

  let targetUrl: URL;
  try {
    targetUrl = new URL(raw);
  } catch {
    return res.status(400).json({ error: "Invalid url" });
  }

  // Only proxy HTTPS URLs on the known upstream hosts. Requiring https keeps
  // the bearer token from being sent over a plaintext connection.
  const ALLOWED_PROXY_HOSTS = ["api.geminigen.ai", "assets.grok.com"];
  if (targetUrl.protocol !== "https:" || !ALLOWED_PROXY_HOSTS.includes(targetUrl.hostname)) {
    return res.status(403).json({ error: "URL not allowed" });
  }

  // Forward the Range header if present (needed for video seeking).
  const rangeHeader = req.headers.range;
  const isGrokCdn = targetUrl.hostname === "assets.grok.com";
  // Referer/Origin match the site that normally loads each host.
  const referer = isGrokCdn ? "https://grok.com/" : "https://geminigen.ai/";

  const buildHeaders = (token?: string | null): Record<string, string> => {
    const h: Record<string, string> = {
      "User-Agent": USER_AGENT,
      Accept: "video/mp4,video/*,*/*",
      Referer: referer,
      Origin: isGrokCdn ? "https://grok.com" : "https://geminigen.ai",
    };
    if (rangeHeader) h["Range"] = rangeHeader;
    if (token) h["Authorization"] = `Bearer ${token}`;
    return h;
  };

  try {
    const poolResult = await getPoolToken();
    const bearerToken = poolResult?.token ?? await getValidBearerToken();

    // 1. First attempt: with the bearer token for both hosts.
    let upstream = await fetch(targetUrl.toString(), {
      headers: buildHeaders(bearerToken),
    });

    // 2. Fallbacks on failure.
    if (!upstream.ok && upstream.status !== 206) {
      console.warn(`[video-proxy] first attempt ${upstream.status} for ${targetUrl.hostname}${targetUrl.pathname}`);
      if (isGrokCdn) {
        // Retry without auth, then with a refreshed token.
        upstream = await fetch(targetUrl.toString(), { headers: buildHeaders(null) });
        if (!upstream.ok && upstream.status !== 206) {
          const newToken = await refreshAccessToken();
          if (newToken) {
            upstream = await fetch(targetUrl.toString(), { headers: buildHeaders(newToken) });
          }
        }
      } else {
        // geminigen.ai rejected the request: refresh the token and retry.
        if (!bearerToken) {
          console.error("[video-proxy] no token available");
          return res.status(502).json({ error: "No bearer token available for proxy" });
        }
        if (upstream.status === 401 || upstream.status === 403) {
          console.warn(`[video-proxy] refreshing token after ${upstream.status}...`);
          const newToken = await refreshAccessToken();
          if (newToken) {
            upstream = await fetch(targetUrl.toString(), { headers: buildHeaders(newToken) });
          }
        }
      }
    }

    console.log(`[video-proxy] final status ${upstream.status} for ${targetUrl.hostname}${targetUrl.pathname}`);
    if (!upstream.ok && upstream.status !== 206) {
      console.error(`[video-proxy] upstream ${upstream.status} for ${targetUrl.pathname}`);
      return res.status(upstream.status).json({ error: "Upstream error", status: upstream.status });
    }

    // Forward the response headers the player needs.
    const ct = upstream.headers.get("content-type");
    const cl = upstream.headers.get("content-length");
    const cr = upstream.headers.get("content-range");
    const ac = upstream.headers.get("accept-ranges");
    if (ct) res.setHeader("Content-Type", ct);
    if (cl) res.setHeader("Content-Length", cl);
    if (cr) res.setHeader("Content-Range", cr);
    res.setHeader("Accept-Ranges", ac ?? "bytes");
    // Cache for 1 hour (videos don't change).
    res.setHeader("Cache-Control", "public, max-age=3600");
    res.status(upstream.status);

    if (!upstream.body) {
      return res.end();
    }

    // Stream the body with an iterative loop, replacing the earlier recursive
    // pump that built an ever-growing promise chain. The response's 'close'
    // event marks client disconnect: it cancels the upstream read and also
    // releases any pending backpressure wait, so the loop can never hang on a
    // 'drain' that will not come.
    const reader = upstream.body.getReader();
    let clientGone = false;
    res.once("close", () => {
      clientGone = true;
      reader.cancel().catch(() => {});
    });
    while (!clientGone) {
      const { done, value } = await reader.read();
      if (done || clientGone) break;
      if (!res.write(value)) {
        await new Promise<void>((resolve) => {
          const resume = () => {
            res.off("drain", resume);
            res.off("close", resume);
            resolve();
          };
          res.once("drain", resume);
          res.once("close", resume);
        });
      }
    }
    if (!clientGone) res.end();
  } catch (err) {
    console.error("[video-proxy] fetch error:", err);
    if (!res.headersSent) res.status(502).json({ error: "Proxy fetch failed" });
    // Mid-stream failure: headers are already out, so abort the connection
    // instead of leaving the response open forever.
    else res.destroy();
  }
});
// ── DELETE /api/videos/:id ────────────────────────────────────────────────────
// Deletes one video row. A video with an owner may only be deleted by that
// owner. A video whose userId is null can be deleted by any caller.
// NOTE(review): confirm that ownerless videos being deletable by anyone
// (including unauthenticated callers) is intended.
router.delete("/:id", optionalJwtAuth, async (req, res) => {
  const id = Number(req.params.id);
  // Reject NaN *and* non-integers such as "1.5" or "Infinity". `isNaN` let
  // those through to the integer id column, where the query then failed.
  if (!Number.isSafeInteger(id)) return res.status(400).json({ error: "INVALID_ID" });

  // Set by optionalJwtAuth when a valid token was supplied.
  const userId: number | null = (req as typeof req & { jwtUserId?: number | null }).jwtUserId ?? null;

  const [video] = await db.select().from(videosTable).where(eq(videosTable.id, id)).limit(1);
  if (!video) return res.status(404).json({ error: "NOT_FOUND" });
  if (video.userId !== null && video.userId !== userId) {
    return res.status(403).json({ error: "FORBIDDEN" });
  }

  await db.delete(videosTable).where(eq(videosTable.id, id));
  return res.json({ success: true });
});
export default router;