|
|
| import os
|
| import re
|
| import json
|
| import base64
|
| import stat
|
| import shutil
|
| import asyncio
|
| import logging
|
| import sys
|
| from typing import List, Optional
|
| from datetime import datetime
|
|
|
| import httpx
|
| import git
|
| from fastapi import FastAPI, HTTPException, Request, Query
|
| from fastapi.responses import JSONResponse, PlainTextResponse
|
| from pydantic import BaseModel, Field
|
| from pydantic_settings import BaseSettings
|
|
|
|
|
| class Settings(BaseSettings):
|
| OPENAI_API_KEY: str = Field("", env="OPENAI_API_KEY")
|
| GITHUB_TOKEN: str = Field("", env="GITHUB_TOKEN")
|
| GITHUB_USERNAME: str = Field("", env="GITHUB_USERNAME")
|
| STUDENT_SECRET: str = Field("", env="STUDENT_SECRET")
|
| LOG_FILE_PATH: str = Field("logs/app.log", env="LOG_FILE_PATH")
|
| MAX_CONCURRENT_TASKS: int = Field(2, env="MAX_CONCURRENT_TASKS")
|
| KEEP_ALIVE_INTERVAL_SECONDS: int = Field(30, env="KEEP_ALIVE_INTERVAL_SECONDS")
|
| GITHUB_API_BASE: str = Field("https://api.github.com", env="GITHUB_API_BASE")
|
| GITHUB_PAGES_BASE: Optional[str] = None
|
|
|
| class Config:
|
| env_file = ".env"
|
| env_file_encoding = "utf-8"
|
|
|
| settings = Settings()
|
| if not settings.GITHUB_PAGES_BASE:
|
| settings.GITHUB_PAGES_BASE = f"https://{settings.GITHUB_USERNAME}.github.io"
|
|
|
|
|
| os.makedirs(os.path.dirname(settings.LOG_FILE_PATH), exist_ok=True)
|
| logger = logging.getLogger("task_receiver")
|
| logger.setLevel(logging.INFO)
|
| console_handler = logging.StreamHandler(sys.stdout)
|
| file_handler = logging.FileHandler(settings.LOG_FILE_PATH, mode="a", encoding="utf-8")
|
| fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
|
| console_handler.setFormatter(fmt)
|
| file_handler.setFormatter(fmt)
|
| logger.handlers = []
|
| logger.addHandler(console_handler)
|
| logger.addHandler(file_handler)
|
| logger.propagate = False
|
|
|
| def flush_logs():
|
| try:
|
| sys.stdout.flush()
|
| sys.stderr.flush()
|
| for h in logger.handlers:
|
| try:
|
| h.flush()
|
| except Exception:
|
| pass
|
| except Exception:
|
| pass
|
|
|
|
|
| class Attachment(BaseModel):
|
| name: str
|
| url: str
|
|
|
| class TaskRequest(BaseModel):
|
| task: str
|
| email: str
|
| round: int
|
| brief: str
|
| evaluation_url: str
|
| nonce: str
|
| secret: str
|
| attachments: List[Attachment] = []
|
|
|
|
|
| app = FastAPI(title="Automated Task Receiver & Processor", description="LLM-driven code generation and deployment")
|
| background_tasks_list: List[asyncio.Task] = []
|
| task_semaphore = asyncio.Semaphore(settings.MAX_CONCURRENT_TASKS)
|
| last_received_task: Optional[dict] = None
|
| OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"
|
|
|
|
|
| def verify_secret(secret_from_request: str) -> bool:
|
| return secret_from_request == settings.STUDENT_SECRET
|
|
|
| def safe_makedirs(path: str):
|
| os.makedirs(path, exist_ok=True)
|
|
|
| def remove_local_path(path: str):
|
| if not os.path.exists(path):
|
| return
|
| def onerror(func, path_arg, exc_info):
|
| try:
|
| os.chmod(path_arg, stat.S_IWUSR)
|
| func(path_arg)
|
| except Exception as exc:
|
| logger.exception(f"Failed in rmtree on {path_arg}: {exc}")
|
| raise
|
| logger.info(f"[CLEANUP] Removing local directory: {path}")
|
| shutil.rmtree(path, onerror=onerror)
|
| flush_logs()
|
|
|
|
|
| def is_image_data_uri(data_uri: str) -> bool:
|
| if not data_uri or not data_uri.startswith("data:"):
|
| return False
|
| return re.search(r"data:image/[^;]+;base64,", data_uri, re.IGNORECASE) is not None
|
|
|
| def data_uri_to_gemini_part(data_uri: str) -> Optional[dict]:
|
| if not data_uri or not data_uri.startswith("data:"):
|
| return None
|
| match = re.search(r"data:(?P<mime_type>[^;]+);base64,(?P<base64_data>.*)", data_uri, re.IGNORECASE)
|
| if not match:
|
| return None
|
| mime_type = match.group('mime_type')
|
| base64_data = match.group('base64_data')
|
| if not mime_type.startswith("image/"):
|
| return None
|
| return {"inlineData": {"data": base64_data, "mimeType": mime_type}}
|
|
|
| async def attachment_to_gemini_part(attachment_url: str) -> Optional[dict]:
|
| if not attachment_url:
|
| return None
|
| if attachment_url.startswith("data:"):
|
| return data_uri_to_gemini_part(attachment_url)
|
| if attachment_url.startswith(("http://", "https://")):
|
| try:
|
| async with httpx.AsyncClient(timeout=15) as client:
|
| resp = await client.get(attachment_url)
|
| resp.raise_for_status()
|
| mime = resp.headers.get("Content-Type", "")
|
| if not mime.startswith("image/"):
|
| logger.info(f"[ATTACHMENT] Skipping non-image MIME: {mime}")
|
| return None
|
| b64 = base64.b64encode(resp.content).decode("utf-8")
|
| return {"inlineData": {"data": b64, "mimeType": mime}}
|
| except Exception as e:
|
| logger.warning(f"[ATTACHMENT] Failed to fetch/encode attachment {attachment_url}: {e}")
|
| return None
|
| return None
|
|
|
|
|
| async def save_generated_files_locally(task_id: str, files: dict) -> str:
|
| base_dir = os.path.join(os.getcwd(), "generated_tasks")
|
| task_dir = os.path.join(base_dir, task_id)
|
| safe_makedirs(task_dir)
|
| logger.info(f"[LOCAL_SAVE] Saving generated files to: {task_dir}")
|
| for filename, content in files.items():
|
| file_path = os.path.join(task_dir, filename)
|
| try:
|
| with open(file_path, "w", encoding="utf-8") as f:
|
| f.write(content)
|
| logger.info(f" -> Saved: {filename} (bytes: {len(content)})")
|
| except Exception as e:
|
| logger.exception(f"Failed to save generated file {filename}: {e}")
|
| raise
|
| flush_logs()
|
| return task_dir
|
|
|
| async def save_attachments_locally(task_dir: str, attachments: List[Attachment]) -> List[str]:
|
| saved_files = []
|
| logger.info(f"[ATTACHMENTS] Processing {len(attachments)} attachments for {task_dir}")
|
| async with httpx.AsyncClient(timeout=30) as client:
|
| for attachment in attachments:
|
| filename = attachment.name
|
| url = attachment.url
|
| file_bytes = None
|
| if not filename or not url:
|
| logger.warning(f"Skipping invalid attachment entry: {filename}")
|
| continue
|
| try:
|
| if url.startswith("data:"):
|
| m = re.search(r"base64,(.*)", url, re.IGNORECASE)
|
| if m:
|
| file_bytes = base64.b64decode(m.group(1))
|
| elif url.startswith(("http://", "https://")):
|
| resp = await client.get(url)
|
| resp.raise_for_status()
|
| file_bytes = resp.content
|
| if file_bytes is None:
|
| logger.warning(f"No content for attachment: {filename}")
|
| continue
|
| file_path = os.path.join(task_dir, filename)
|
| with open(file_path, "wb") as f:
|
| f.write(file_bytes)
|
| logger.info(f" -> Saved Attachment: {filename} (bytes: {len(file_bytes)})")
|
| saved_files.append(filename)
|
| except Exception as e:
|
| logger.exception(f"Failed to save attachment {filename}: {e}")
|
| flush_logs()
|
| return saved_files
|
|
|
|
|
| async def setup_local_repo(local_path: str, repo_name: str, repo_url_auth: str, repo_url_http: str, round_index: int) -> git.Repo:
|
| github_token = settings.GITHUB_TOKEN
|
| headers = {
|
| "Authorization": f"token {github_token}",
|
| "Accept": "application/vnd.github.v3+json",
|
| "X-GitHub-Api-Version": "2022-11-28"
|
| }
|
| async with httpx.AsyncClient(timeout=45) as client:
|
| try:
|
| if round_index == 1:
|
| logger.info(f"[GIT] R1: Creating remote repo '{repo_name}'")
|
| payload = {"name": repo_name, "private": False, "auto_init": True}
|
| resp = await client.post(f"{settings.GITHUB_API_BASE}/user/repos", json=payload, headers=headers)
|
| resp.raise_for_status()
|
| repo = git.Repo.init(local_path)
|
| repo.create_remote('origin', repo_url_auth)
|
| logger.info("[GIT] Local repo initialized")
|
| else:
|
| logger.info(f"[GIT] R{round_index}: Cloning {repo_url_http}")
|
| repo = git.Repo.clone_from(repo_url_auth, local_path)
|
| logger.info("[GIT] Cloned repo")
|
| flush_logs()
|
| return repo
|
| except httpx.HTTPStatusError as e:
|
| logger.exception(f"GitHub API error: {getattr(e.response, 'text', '')}")
|
| raise
|
| except git.GitCommandError as e:
|
| logger.exception(f"Git command error: {e}")
|
| raise
|
|
|
| async def commit_and_publish(repo: git.Repo, task_id: str, round_index: int, repo_name: str) -> dict:
|
| github_username = settings.GITHUB_USERNAME
|
| github_token = settings.GITHUB_TOKEN
|
| headers = {
|
| "Authorization": f"token {github_token}",
|
| "Accept": "application/vnd.github.v3+json",
|
| "X-GitHub-Api-Version": "2022-11-28"
|
| }
|
| repo_url_http = f"https://github.com/{github_username}/{repo_name}"
|
| async with httpx.AsyncClient(timeout=45) as client:
|
| try:
|
| repo.git.add(A=True)
|
| commit_message = f"Task {task_id} - Round {round_index}: automated update"
|
| repo.index.commit(commit_message)
|
| commit_sha = repo.head.object.hexsha
|
| logger.info(f"[GIT] Committed: {commit_sha}")
|
| repo.git.branch('-M', 'main')
|
| repo.git.push('--set-upstream', 'origin', 'main', force=True)
|
| logger.info("[GIT] Pushed to origin/main")
|
|
|
|
|
| pages_api_url = f"{settings.GITHUB_API_BASE}/repos/{github_username}/{repo_name}/pages"
|
| pages_payload = {"source": {"branch": "main", "path": "/"}}
|
| pages_max_retries = 5
|
| pages_base_delay = 3
|
| for attempt in range(pages_max_retries):
|
| try:
|
| pages_response = await client.get(pages_api_url, headers=headers)
|
| is_configured = (pages_response.status_code == 200)
|
| if is_configured:
|
| await client.put(pages_api_url, json=pages_payload, headers=headers)
|
| else:
|
| await client.post(pages_api_url, json=pages_payload, headers=headers)
|
| logger.info("[GIT] Pages configured")
|
| break
|
| except httpx.HTTPStatusError as e:
|
| text = getattr(e.response, "text", "")
|
| if e.response.status_code == 422 and "main branch must exist" in text and attempt < pages_max_retries - 1:
|
| delay = pages_base_delay * (2 ** attempt)
|
| logger.warning(f"[GIT] Pages timing issue, retrying in {delay}s")
|
| await asyncio.sleep(delay)
|
| continue
|
| logger.exception(f"[GIT] Pages configuration failed: {text}")
|
| raise
|
|
|
| await asyncio.sleep(5)
|
| pages_url = f"{settings.GITHUB_PAGES_BASE}/{repo_name}/"
|
| flush_logs()
|
| return {"repo_url": repo_url_http, "commit_sha": commit_sha, "pages_url": pages_url}
|
| except git.GitCommandError as e:
|
| logger.exception("Git operation failed during deployment.")
|
| raise
|
| except httpx.HTTPStatusError as e:
|
| logger.exception("GitHub API error during deployment.")
|
| raise
|
|
|
|
|
| async def call_openai_api(user_prompt: str, system_prompt: str, response_format: dict, max_retries: int = 3, timeout: int = 60) -> dict:
|
| payload = {
|
| "model": "gpt-4o-mini",
|
| "messages": [
|
| {"role": "system", "content": system_prompt},
|
| {"role": "user", "content": user_prompt}
|
| ],
|
| "response_format": {
|
| "type": "json_schema",
|
| "json_schema": {
|
| "name": "response",
|
| "strict": True,
|
| "schema": response_format
|
| }
|
| }
|
| }
|
| base_delay = 1
|
| for attempt in range(max_retries):
|
| try:
|
| if not settings.OPENAI_API_KEY:
|
| raise Exception("OPENAI_API_KEY not configured.")
|
| headers = {
|
| "Authorization": f"Bearer {settings.OPENAI_API_KEY}",
|
| "Content-Type": "application/json"
|
| }
|
| async with httpx.AsyncClient(timeout=timeout) as client:
|
| resp = await client.post(OPENAI_API_URL, json=payload, headers=headers)
|
| resp.raise_for_status()
|
| result = resp.json()
|
| content = result.get("choices", [{}])[0].get("message", {}).get("content", "")
|
| if not content:
|
| raise ValueError("No content in LLM response")
|
| return json.loads(content)
|
| except httpx.HTTPStatusError as e:
|
| logger.warning(f"[OPENAI] HTTP error attempt {attempt+1}: {e}")
|
| logger.warning(f"[OPENAI] Response: {e.response.text if hasattr(e, 'response') else 'N/A'}")
|
| except (httpx.RequestError, KeyError, json.JSONDecodeError, ValueError) as e:
|
| logger.warning(f"[OPENAI] Processing error attempt {attempt+1}: {e}")
|
| if attempt < max_retries - 1:
|
| delay = base_delay * (2 ** attempt)
|
| logger.info(f"[OPENAI] Retrying in {delay}s...")
|
| await asyncio.sleep(delay)
|
| raise Exception("LLM generation failed after retries")
|
|
|
|
|
| async def call_llm_round2_surgical_update(task_id: str, brief: str, existing_index_html: str) -> dict:
|
| system_prompt = (
|
| "You are an expert full-stack engineer tasked with making SURGICAL and MINIMAL changes. "
|
| "Your MOST CRITICAL instruction is to preserve the existing application's core logic and structure. "
|
| "Only apply the specific changes requested in the 'New Brief'. "
|
| "Return a JSON object with 'index.html', 'README.md', and 'LICENSE'. "
|
| "If README.md / LICENSE exist, copy them verbatim unless a change is strictly required."
|
| )
|
| response_schema = {
|
| "type": "object",
|
| "properties": {
|
| "index_html": {"type": "string"},
|
| "readme": {"type": "string"},
|
| "license": {"type": "string"},
|
| },
|
| "required": ["index_html", "readme", "license"],
|
| "additionalProperties": False
|
| }
|
| prompt = (
|
| f"UPDATE INSTRUCTION (SAFE MODE):\n\n"
|
| f"New Brief: {brief}\n\n"
|
| f"--- EXISTING index.html START ---\n{existing_index_html}\n--- EXISTING index.html END ---\n\n"
|
| "Only make the minimal changes necessary to implement the brief. Do NOT remove or break core scripts, event handlers, or layout. "
|
| "Return FULL JSON with 'index_html', 'readme', 'license'. If you make no changes to README/LICENSE, copy their existing contents."
|
| )
|
|
|
| try:
|
| result = await call_openai_api(user_prompt=prompt, system_prompt=system_prompt, response_format=response_schema, max_retries=4, timeout=90)
|
|
|
| result = {
|
| "index.html": result.get("index_html", ""),
|
| "README.md": result.get("readme", ""),
|
| "LICENSE": result.get("license", "")
|
| }
|
| except Exception as e:
|
| logger.exception(f"[ROUND2] LLM call failed: {e}")
|
|
|
| return {"index.html": existing_index_html or "<!-- original index.html preserved due to LLM failure -->",
|
| "README.md": "", "LICENSE": ""}
|
|
|
|
|
| new_html = (result.get("index.html") or "").strip()
|
| if not new_html:
|
| logger.warning("[SAFE] LLM returned empty index.html — reverting to existing.")
|
| result["index.html"] = existing_index_html
|
| else:
|
|
|
| try:
|
| orig_len = len(existing_index_html or "")
|
| new_len = len(new_html)
|
| if orig_len > 0 and new_len < max(200, int(orig_len * 0.3)):
|
| logger.warning("[SAFE] LLM index.html appears destructive (too small). Reverting to existing.")
|
| result["index.html"] = existing_index_html
|
| except Exception:
|
|
|
| result["index.html"] = existing_index_html
|
|
|
|
|
| result["README.md"] = result.get("README.md") or ""
|
| result["LICENSE"] = result.get("LICENSE") or ""
|
| return result
|
|
|
|
|
| async def notify_evaluation_server(evaluation_url: str, email: str, task_id: str, round_index: int, nonce: str, repo_url: str, commit_sha: str, pages_url: str) -> bool:
|
| payload = {
|
| "email": email,
|
| "task": task_id,
|
| "round": round_index,
|
| "nonce": nonce,
|
| "repo_url": repo_url,
|
| "commit_sha": commit_sha,
|
| "pages_url": pages_url
|
| }
|
| max_retries = 3
|
| base_delay = 1
|
| logger.info(f"[NOTIFY] Notifying evaluation server at {evaluation_url}")
|
| for attempt in range(max_retries):
|
| try:
|
| async with httpx.AsyncClient(timeout=10) as client:
|
| resp = await client.post(evaluation_url, json=payload)
|
| resp.raise_for_status()
|
| logger.info(f"[NOTIFY] Notification succeeded: {resp.status_code}")
|
| flush_logs()
|
| return True
|
| except httpx.HTTPStatusError as e:
|
| logger.warning(f"[NOTIFY] HTTP error attempt {attempt+1}: {e}")
|
| except httpx.RequestError as e:
|
| logger.warning(f"[NOTIFY] Request error attempt {attempt+1}: {e}")
|
| if attempt < max_retries - 1:
|
| await asyncio.sleep(base_delay * (2 ** attempt))
|
| logger.error("[NOTIFY] Failed to notify evaluation server after retries.")
|
| flush_logs()
|
| return False
|
|
|
|
|
| async def generate_files_and_deploy(task_data: TaskRequest):
|
| acquired = False
|
| try:
|
| await task_semaphore.acquire()
|
| acquired = True
|
| logger.info(f"[PROCESS START] Task: {task_data.task} Round: {task_data.round}")
|
| flush_logs()
|
|
|
| task_id = task_data.task
|
| email = task_data.email
|
| round_index = task_data.round
|
| brief = task_data.brief
|
| evaluation_url = task_data.evaluation_url
|
| nonce = task_data.nonce
|
| attachments = task_data.attachments or []
|
|
|
| repo_name = task_id.replace(" ", "-").lower()
|
| github_username = settings.GITHUB_USERNAME
|
| github_token = settings.GITHUB_TOKEN
|
| repo_url_auth = f"https://{github_username}:{github_token}@github.com/{github_username}/{repo_name}.git"
|
| repo_url_http = f"https://github.com/{github_username}/{repo_name}"
|
|
|
| base_dir = os.path.join(os.getcwd(), "generated_tasks")
|
| local_path = os.path.join(base_dir, task_id)
|
|
|
|
|
| if os.path.exists(local_path):
|
| try:
|
| remove_local_path(local_path)
|
| except Exception as e:
|
| logger.exception(f"Cleanup failed for {local_path}: {e}")
|
| raise
|
| safe_makedirs(local_path)
|
|
|
|
|
| repo = await setup_local_repo(local_path, repo_name, repo_url_auth, repo_url_http, round_index)
|
|
|
|
|
| image_parts = []
|
| for attachment in attachments:
|
| part = await attachment_to_gemini_part(attachment.url)
|
| if part:
|
| image_parts.append(part)
|
|
|
|
|
| attachment_descriptions = ""
|
| if attachments:
|
| attachment_descriptions = "\nThe following attachments are provided (saved in the same folder):\n"
|
| for att in attachments:
|
| attachment_descriptions += f"- {att.name}\n"
|
| attachment_descriptions += (
|
| "Use these exact file names in your HTML (for example: "
|
| "<img src='sample.png'>). Do NOT rename or use external links.\n"
|
| )
|
|
|
|
|
| if round_index == 1:
|
| logger.info("[WORKFLOW] Round 1: full generation")
|
|
|
|
|
| enriched_brief = f"{brief}\n\n{attachment_descriptions}".strip()
|
|
|
| system_prompt = (
|
| "You are an expert full-stack engineer. Produce a JSON object with keys 'index_html', 'readme', and 'license'. "
|
| "index_html must be a single-file responsive HTML app using Tailwind CSS. "
|
| "If image attachments are mentioned below, reference them using <img src='filename'> exactly as provided. "
|
| "readme should be professional with project description, setup, usage, and license info. "
|
| "license should contain the full MIT license text."
|
| )
|
|
|
| response_schema = {
|
| "type": "object",
|
| "properties": {
|
| "index_html": {"type": "string"},
|
| "readme": {"type": "string"},
|
| "license": {"type": "string"},
|
| },
|
| "required": ["index_html", "readme", "license"],
|
| "additionalProperties": False
|
| }
|
|
|
|
|
|
|
| generated_raw = await call_openai_api(
|
| user_prompt=enriched_brief,
|
| system_prompt=system_prompt,
|
| response_format=response_schema,
|
| max_retries=4,
|
| timeout=120,
|
| )
|
|
|
|
|
| generated = {
|
| "index.html": generated_raw.get("index_html", ""),
|
| "README.md": generated_raw.get("readme", ""),
|
| "LICENSE": generated_raw.get("license", "")
|
| }
|
|
|
|
|
| else:
|
| logger.info("[WORKFLOW] Round 2+: surgical update (Base.py style). Loading existing index.html only.")
|
| existing_index_html = ""
|
| idx_path = os.path.join(local_path, "index.html")
|
| if os.path.exists(idx_path):
|
| try:
|
| with open(idx_path, "r", encoding="utf-8") as f:
|
| existing_index_html = f.read()
|
| logger.info("[WORKFLOW] Read existing index.html for context.")
|
| except Exception as e:
|
| logger.warning(f"[WORKFLOW] Could not read existing index.html: {e}")
|
| existing_index_html = ""
|
|
|
|
|
| brief_with_attachments = f"{brief}\n\n{attachment_descriptions}".strip()
|
| generated = await call_llm_round2_surgical_update(
|
| task_id=task_id, brief=brief_with_attachments, existing_index_html=existing_index_html
|
| )
|
|
|
|
|
| readme_path = os.path.join(local_path, "README.md")
|
| license_path = os.path.join(local_path, "LICENSE")
|
| if not generated.get("README.md") and os.path.exists(readme_path):
|
| with open(readme_path, "r", encoding="utf-8") as f:
|
| generated["README.md"] = f.read()
|
| if not generated.get("LICENSE") and os.path.exists(license_path):
|
| with open(license_path, "r", encoding="utf-8") as f:
|
| generated["LICENSE"] = f.read()
|
|
|
|
|
| await save_generated_files_locally(task_id, generated)
|
|
|
|
|
| await save_attachments_locally(os.path.join(base_dir, task_id), attachments)
|
|
|
|
|
| deployment_info = await commit_and_publish(repo, task_id, round_index, repo_name)
|
|
|
|
|
| await notify_evaluation_server(
|
| evaluation_url=evaluation_url,
|
| email=email,
|
| task_id=task_id,
|
| round_index=round_index,
|
| nonce=nonce,
|
| repo_url=deployment_info["repo_url"],
|
| commit_sha=deployment_info["commit_sha"],
|
| pages_url=deployment_info["pages_url"],
|
| )
|
|
|
| logger.info(f"[DEPLOYMENT] Success. Repo: {deployment_info['repo_url']} Pages: {deployment_info['pages_url']}")
|
|
|
| except Exception as exc:
|
| logger.exception(f"[CRITICAL FAILURE] Task {getattr(task_data, 'task', 'unknown')} failed: {exc}")
|
| finally:
|
| if acquired:
|
| task_semaphore.release()
|
| flush_logs()
|
| logger.info(
|
| f"[PROCESS END] Task: {getattr(task_data, 'task', 'unknown')} Round: {getattr(task_data, 'round', 'unknown')}"
|
| )
|
|
|
|
|
|
|
| def _task_done_callback(task: asyncio.Task):
|
| try:
|
| exc = task.exception()
|
| if exc:
|
| logger.error(f"[BACKGROUND TASK] Task finished with exception: {exc}")
|
| logger.exception(exc)
|
| else:
|
| logger.info("[BACKGROUND TASK] Task finished successfully.")
|
| except asyncio.CancelledError:
|
| logger.warning("[BACKGROUND TASK] Task was cancelled.")
|
| finally:
|
| flush_logs()
|
|
|
| @app.post("/ready", status_code=200)
|
| async def receive_task(task_data: TaskRequest, request: Request):
|
| global last_received_task, background_tasks_list
|
| if not verify_secret(task_data.secret):
|
| logger.warning(f"Unauthorized attempt for task {task_data.task} from {request.client.host if request.client else 'unknown'}")
|
| raise HTTPException(status_code=401, detail="Unauthorized: Secret mismatch")
|
|
|
| last_received_task = {
|
| "task": task_data.task,
|
| "email": task_data.email,
|
| "round": task_data.round,
|
| "brief": (task_data.brief[:250] + "...") if len(task_data.brief) > 250 else task_data.brief,
|
| "time": datetime.utcnow().isoformat() + "Z"
|
| }
|
|
|
| bg_task = asyncio.create_task(generate_files_and_deploy(task_data))
|
| bg_task.add_done_callback(_task_done_callback)
|
| background_tasks_list.append(bg_task)
|
|
|
| logger.info(f"Received task {task_data.task}. Background processing started.")
|
| flush_logs()
|
|
|
| return JSONResponse(status_code=200, content={"status": "ready", "message": f"Task {task_data.task} received and processing started."})
|
|
|
| @app.get("/")
|
| async def root():
|
| return {"message": "Task Receiver Service running. POST /ready to submit."}
|
|
|
| @app.get("/status")
|
| async def get_status():
|
| global last_received_task, background_tasks_list
|
| if last_received_task:
|
| background_tasks_list[:] = [t for t in background_tasks_list if not t.done()]
|
| return {"last_received_task": last_received_task, "running_background_tasks": len(background_tasks_list)}
|
| return {"message": "Awaiting first task submission to /ready"}
|
|
|
| @app.get("/health")
|
| async def health():
|
| return {"status": "ok", "timestamp": datetime.utcnow().isoformat() + "Z"}
|
|
|
| @app.get("/logs")
|
| async def get_logs(lines: int = Query(200, ge=1, le=5000)):
|
| path = settings.LOG_FILE_PATH
|
| if not os.path.exists(path):
|
| return PlainTextResponse("Log file not found.", status_code=404)
|
| try:
|
| with open(path, "rb") as f:
|
| f.seek(0, os.SEEK_END)
|
| file_size = f.tell()
|
| buffer = bytearray()
|
| block_size = 1024
|
| blocks = 0
|
|
|
| while file_size > 0 and len(buffer) < lines * 2000 and blocks < 1024:
|
| read_size = min(block_size, file_size)
|
| f.seek(file_size - read_size)
|
| buffer.extend(f.read(read_size))
|
| file_size -= read_size
|
| blocks += 1
|
| text = buffer.decode(errors="ignore").splitlines()
|
| last_lines = "\n".join(text[-lines:])
|
| return PlainTextResponse(last_lines)
|
| except Exception as e:
|
| logger.exception(f"Error reading log file: {e}")
|
| return PlainTextResponse(f"Error reading log file: {e}", status_code=500)
|
|
|
|
|
| @app.on_event("startup")
|
| async def startup_event():
|
| async def keep_alive():
|
| while True:
|
| try:
|
| logger.info("[KEEPALIVE] Service heartbeat")
|
| flush_logs()
|
| except Exception:
|
| pass
|
| await asyncio.sleep(settings.KEEP_ALIVE_INTERVAL_SECONDS)
|
| asyncio.create_task(keep_alive())
|
|
|
| @app.on_event("shutdown")
|
| async def shutdown_event():
|
| logger.info("[SHUTDOWN] Waiting for background tasks to finish (graceful shutdown)...")
|
| for t in background_tasks_list:
|
| if not t.done():
|
| try:
|
| t.cancel()
|
| except Exception:
|
| pass
|
| await asyncio.sleep(0.5)
|
| flush_logs()
|
|
|