project1 / main.py
tvesha's picture
Upload 3 files
5d05538 verified
# main.py
import os
import re
import json
import base64
import stat
import shutil
import asyncio
import logging
import sys
from typing import List, Optional
from datetime import datetime
import httpx
import git
from fastapi import FastAPI, HTTPException, Request, Query
from fastapi.responses import JSONResponse, PlainTextResponse
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
# ------------------------- Settings -------------------------
class Settings(BaseSettings):
    """Service configuration read from the process environment and ``.env``.

    Each field is populated from the environment variable that carries the
    same name as the field, so no per-field alias is declared. The previous
    ``Field(..., env=...)`` keyword is not used by pydantic-settings v2 and
    only triggered a deprecation warning.
    """

    # pydantic-settings v2 style configuration (replaces the deprecated inner
    # ``class Config``). ``extra="ignore"`` keeps unrelated keys in a shared
    # ``.env`` file from aborting start-up with a validation error.
    model_config = {
        "env_file": ".env",
        "env_file_encoding": "utf-8",
        "extra": "ignore",
    }

    # Credentials; empty by default so the module can still be imported.
    OPENAI_API_KEY: str = ""
    GITHUB_TOKEN: str = ""
    GITHUB_USERNAME: str = ""
    # Shared secret compared against the ``secret`` field of incoming tasks.
    STUDENT_SECRET: str = ""
    # Operational knobs.
    LOG_FILE_PATH: str = "logs/app.log"
    MAX_CONCURRENT_TASKS: int = 2
    KEEP_ALIVE_INTERVAL_SECONDS: int = 30
    GITHUB_API_BASE: str = "https://api.github.com"
    # Derived from GITHUB_USERNAME right after instantiation when left unset.
    GITHUB_PAGES_BASE: Optional[str] = None
# Single shared configuration object, built once at import time.
settings = Settings()
# Default the Pages host to the configured user's github.io domain when no
# explicit base URL was supplied.
settings.GITHUB_PAGES_BASE = (
    settings.GITHUB_PAGES_BASE or f"https://{settings.GITHUB_USERNAME}.github.io"
)
# ------------------------- Logging -------------------------
# A bare file name such as "app.log" has an empty dirname, and
# os.makedirs("") raises FileNotFoundError; fall back to the current directory.
os.makedirs(os.path.dirname(settings.LOG_FILE_PATH) or ".", exist_ok=True)
logger = logging.getLogger("task_receiver")
logger.setLevel(logging.INFO)
# Every record goes both to stdout and to the append-mode log file.
console_handler = logging.StreamHandler(sys.stdout)
file_handler = logging.FileHandler(settings.LOG_FILE_PATH, mode="a", encoding="utf-8")
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
console_handler.setFormatter(fmt)
file_handler.setFormatter(fmt)
# Reset handlers so a re-import does not attach duplicates, and keep records
# out of the root logger.
logger.handlers = []
logger.addHandler(console_handler)
logger.addHandler(file_handler)
logger.propagate = False
def flush_logs():
    """Flush stdout, stderr and every handler attached to ``logger``.

    Best effort only: a failing handler is skipped, and any other failure
    ends the flush silently. This function never raises.
    """
    try:
        for stream in (sys.stdout, sys.stderr):
            stream.flush()
        for handler in logger.handlers:
            try:
                handler.flush()
            except Exception:
                # One broken handler must not stop the remaining ones.
                continue
    except Exception:
        return
# ------------------------- Models -------------------------
class Attachment(BaseModel):
    """A file that accompanies a task request."""

    # File name of the attachment.
    name: str
    # Where the bytes come from: a ``data:`` URI or an http(s) URL.
    url: str
class TaskRequest(BaseModel):
    """Request body accepted by the ``/ready`` endpoint."""

    # Task identifier.
    task: str
    email: str
    # Round number of the task.
    round: int
    # Free-text description of what to build or change.
    brief: str
    # URL that receives the deployment notification.
    evaluation_url: str
    nonce: str
    # Shared secret authenticating the sender.
    secret: str
    # Optional files supplied with the task; defaults to an empty list.
    attachments: List[Attachment] = Field(default_factory=list)
# ------------------------- App & Globals -------------------------
app = FastAPI(
    title="Automated Task Receiver & Processor",
    description="LLM-driven code generation and deployment",
)
# Handles of the asyncio tasks spawned for background processing.
background_tasks_list: List[asyncio.Task] = []
# Semaphore sized by the MAX_CONCURRENT_TASKS setting.
task_semaphore = asyncio.Semaphore(settings.MAX_CONCURRENT_TASKS)
# Summary of the most recent accepted request; None until one arrives.
last_received_task: Optional[dict] = None
OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"
# ------------------------- Utility -------------------------
def verify_secret(secret_from_request: str) -> bool:
    """Return True when the supplied secret equals the configured STUDENT_SECRET.

    The comparison is constant-time so response timing does not reveal how
    many leading characters of a guess were correct.

    NOTE(review): when STUDENT_SECRET is left empty, an empty request secret
    still matches (unchanged behaviour) — confirm deployments always set it.
    """
    import hmac  # local import: the module header does not import hmac

    # compare_digest only accepts non-ASCII text as bytes, so encode both sides.
    supplied = secret_from_request.encode("utf-8")
    expected = settings.STUDENT_SECRET.encode("utf-8")
    return hmac.compare_digest(supplied, expected)
def safe_makedirs(path: str):
    """Create ``path`` and any missing parents; an existing directory is fine."""
    os.makedirs(name=path, exist_ok=True)
def remove_local_path(path: str):
    """Delete the directory tree at ``path``; do nothing if it does not exist.

    Entries that cannot be removed are made owner-writable and the failed
    operation is retried once. If the retry fails too, the error is logged
    and re-raised.
    """

    def _retry_writable(failed_op, failed_path, exc_info):
        # rmtree error hook: grant write permission, then repeat the operation.
        try:
            os.chmod(failed_path, stat.S_IWUSR)
            failed_op(failed_path)
        except Exception as exc:
            logger.exception(f"Failed in rmtree on {failed_path}: {exc}")
            raise

    if os.path.exists(path):
        logger.info(f"[CLEANUP] Removing local directory: {path}")
        shutil.rmtree(path, onerror=_retry_writable)
        flush_logs()
# ------------------------- Attachment helpers -------------------------
def is_image_data_uri(data_uri: str) -> bool:
    """Return True when ``data_uri`` is a base64 ``data:`` URI with an image MIME type.

    Only the header (the text between ``data:`` and the first comma) is
    inspected. A payload that merely contains ``data:image/...;base64,`` can
    therefore no longer make a non-image URI look like an image. Extra header
    parameters such as ``;name=a.png`` are allowed before the final
    ``;base64`` marker.
    """
    if not data_uri or not data_uri.startswith("data:"):
        return False
    header, comma, _payload = data_uri[len("data:"):].partition(",")
    if not comma:
        return False
    mime_type, *params = header.split(";")
    mime_type = mime_type.strip().lower()
    is_base64 = bool(params) and params[-1].strip().lower() == "base64"
    # Require a non-empty subtype after "image/".
    has_image_type = mime_type.startswith("image/") and len(mime_type) > len("image/")
    return is_base64 and has_image_type
def data_uri_to_gemini_part(data_uri: str) -> Optional[dict]:
    """Convert a base64 image ``data:`` URI into an ``inlineData`` part.

    Returns ``{"inlineData": {"data": <base64>, "mimeType": <mime>}}``, or
    None when the input is not a base64 data URI or its MIME type is not an
    image type.

    Fixes over the previous regex version:
    * the header is parsed from the start of the string only, so a payload
      containing ``data:...;base64,`` cannot be mistaken for the header;
    * a payload spanning several lines is kept whole (it was cut at the first
      newline) and its whitespace is removed;
    * the MIME type is matched case-insensitively and returned in lower case;
    * header parameters before ``;base64`` (e.g. ``;name=a.png``) are accepted.
    """
    if not data_uri or not data_uri.startswith("data:"):
        return None
    header, comma, payload = data_uri[len("data:"):].partition(",")
    if not comma:
        return None
    mime_type, *params = header.split(";")
    if not params or params[-1].strip().lower() != "base64":
        return None
    mime_type = mime_type.strip().lower()
    if not mime_type.startswith("image/"):
        return None
    # Line breaks and spaces are not part of the base64 alphabet.
    base64_data = "".join(payload.split())
    return {"inlineData": {"data": base64_data, "mimeType": mime_type}}
async def attachment_to_gemini_part(attachment_url: str) -> Optional[dict]:
    """Turn an attachment location into an ``inlineData`` image part.

    ``data:`` URIs are delegated to ``data_uri_to_gemini_part``. http(s) URLs
    are downloaded (15 s timeout) and base64-encoded when the response is an
    image. Returns None for an empty URL, an unsupported scheme, a non-image
    response or any download failure (which is logged, not raised).
    """
    if not attachment_url:
        return None
    if attachment_url.startswith("data:"):
        return data_uri_to_gemini_part(attachment_url)
    if not attachment_url.startswith(("http://", "https://")):
        return None
    try:
        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.get(attachment_url)
            resp.raise_for_status()
            content_type = resp.headers.get("Content-Type", "")
            # Content-Type may carry parameters ("image/png; charset=...") and
            # mixed case; keep only the normalised media type.
            mime = content_type.split(";", 1)[0].strip().lower()
            if not mime.startswith("image/"):
                logger.info(f"[ATTACHMENT] Skipping non-image MIME: {content_type}")
                return None
            b64 = base64.b64encode(resp.content).decode("utf-8")
            return {"inlineData": {"data": b64, "mimeType": mime}}
    except Exception as e:
        logger.warning(f"[ATTACHMENT] Failed to fetch/encode attachment {attachment_url}: {e}")
        return None
# ------------------------- Filesystem Save Helpers -------------------------
async def save_generated_files_locally(task_id: str, files: dict) -> str:
    """Write each ``files`` entry (name -> text) under ``generated_tasks/<task_id>``.

    The directory is resolved against the current working directory and is
    created if missing. Returns the directory path. A failed write is logged
    and re-raised.
    """
    task_dir = os.path.join(os.getcwd(), "generated_tasks", task_id)
    safe_makedirs(task_dir)
    logger.info(f"[LOCAL_SAVE] Saving generated files to: {task_dir}")
    for filename in files:
        content = files[filename]
        target = os.path.join(task_dir, filename)
        try:
            with open(target, "w", encoding="utf-8") as out:
                out.write(content)
        except Exception as e:
            logger.exception(f"Failed to save generated file {filename}: {e}")
            raise
        else:
            logger.info(f" -> Saved: {filename} (bytes: {len(content)})")
    flush_logs()
    return task_dir
async def save_attachments_locally(task_dir: str, attachments: List[Attachment]) -> List[str]:
    """Store each attachment inside ``task_dir`` and return the names saved.

    ``data:`` URIs are base64-decoded and http(s) URLs are downloaded (30 s
    timeout). Attachment names come from the request body, so each target
    path is resolved and must stay inside ``task_dir`` and outside its
    ``.git`` directory; otherwise the attachment is skipped. A failure on one
    attachment is logged and does not stop the others.
    """
    saved_files = []
    root = os.path.realpath(task_dir)
    logger.info(f"[ATTACHMENTS] Processing {len(attachments)} attachments for {task_dir}")
    async with httpx.AsyncClient(timeout=30) as client:
        for attachment in attachments:
            filename = attachment.name
            url = attachment.url
            if not filename or not url:
                logger.warning(f"Skipping invalid attachment entry: {filename}")
                continue
            try:
                # Refuse "../", absolute paths, the directory itself, and
                # anything under the repository's .git folder.
                file_path = os.path.realpath(os.path.join(root, filename))
                first_part = os.path.relpath(file_path, root).split(os.sep)[0]
                if first_part in (os.pardir, os.curdir, ".git"):
                    logger.warning(f"Skipping attachment with unsafe name: {filename}")
                    continue

                file_bytes = None
                if url.startswith("data:"):
                    # DOTALL keeps payloads that are wrapped over several lines.
                    m = re.search(r"base64,(.*)", url, re.IGNORECASE | re.DOTALL)
                    if m:
                        file_bytes = base64.b64decode(m.group(1))
                elif url.startswith(("http://", "https://")):
                    resp = await client.get(url)
                    resp.raise_for_status()
                    file_bytes = resp.content
                if file_bytes is None:
                    logger.warning(f"No content for attachment: {filename}")
                    continue

                # Names with a sub-directory need that directory to exist.
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                with open(file_path, "wb") as f:
                    f.write(file_bytes)
                logger.info(f" -> Saved Attachment: {filename} (bytes: {len(file_bytes)})")
                saved_files.append(filename)
            except Exception as e:
                logger.exception(f"Failed to save attachment {filename}: {e}")
    flush_logs()
    return saved_files
# ------------------------- GitHub helpers -------------------------
async def setup_local_repo(local_path: str, repo_name: str, repo_url_auth: str, repo_url_http: str, round_index: int) -> git.Repo:
    """Return a local git repository for the task at ``local_path``.

    Round 1 creates a public remote repository through the GitHub API, then
    initialises a local repository whose ``origin`` is ``repo_url_auth``.
    Any other round clones ``repo_url_auth`` into ``local_path``.
    ``repo_url_http`` is used for logging only.

    Raises:
        httpx.HTTPStatusError: the repository-creation request failed.
        git.GitCommandError: a git command failed.
    """
    api_headers = {
        "Authorization": f"token {settings.GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    async with httpx.AsyncClient(timeout=45) as client:
        try:
            if round_index != 1:
                logger.info(f"[GIT] R{round_index}: Cloning {repo_url_http}")
                repo = git.Repo.clone_from(repo_url_auth, local_path)
                logger.info("[GIT] Cloned repo")
            else:
                logger.info(f"[GIT] R1: Creating remote repo '{repo_name}'")
                create_resp = await client.post(
                    f"{settings.GITHUB_API_BASE}/user/repos",
                    json={"name": repo_name, "private": False, "auto_init": True},
                    headers=api_headers,
                )
                create_resp.raise_for_status()
                repo = git.Repo.init(local_path)
                repo.create_remote("origin", repo_url_auth)
                logger.info("[GIT] Local repo initialized")
            flush_logs()
            return repo
        except httpx.HTTPStatusError as e:
            logger.exception(f"GitHub API error: {getattr(e.response, 'text', '')}")
            raise
        except git.GitCommandError as e:
            logger.exception(f"Git command error: {e}")
            raise
async def commit_and_publish(repo: git.Repo, task_id: str, round_index: int, repo_name: str) -> dict:
    """Commit the working tree, force-push it to ``main`` and enable GitHub Pages.

    Returns a dict with ``repo_url``, ``commit_sha`` and ``pages_url``.

    The Pages create/update responses are now checked with
    ``raise_for_status()``. Previously they were never checked, so the retry
    branch below could not run and a rejected request was logged as success.

    Raises:
        git.GitCommandError: committing or pushing failed.
        httpx.HTTPStatusError: the Pages API rejected the configuration.
    """
    github_username = settings.GITHUB_USERNAME
    headers = {
        "Authorization": f"token {settings.GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    repo_url_http = f"https://github.com/{github_username}/{repo_name}"
    async with httpx.AsyncClient(timeout=45) as client:
        try:
            repo.git.add(A=True)
            commit_message = f"Task {task_id} - Round {round_index}: automated update"
            repo.index.commit(commit_message)
            commit_sha = repo.head.object.hexsha
            logger.info(f"[GIT] Committed: {commit_sha}")
            repo.git.branch('-M', 'main')
            repo.git.push('--set-upstream', 'origin', 'main', force=True)
            logger.info("[GIT] Pushed to origin/main")

            # Configure GitHub Pages with retries
            pages_api_url = f"{settings.GITHUB_API_BASE}/repos/{github_username}/{repo_name}/pages"
            pages_payload = {"source": {"branch": "main", "path": "/"}}
            pages_max_retries = 5
            pages_base_delay = 3
            for attempt in range(pages_max_retries):
                try:
                    # A 200 on GET means a Pages site exists: update it.
                    # Anything else: try to create it.
                    pages_response = await client.get(pages_api_url, headers=headers)
                    if pages_response.status_code == 200:
                        change = await client.put(pages_api_url, json=pages_payload, headers=headers)
                    else:
                        change = await client.post(pages_api_url, json=pages_payload, headers=headers)
                    change.raise_for_status()
                    logger.info("[GIT] Pages configured")
                    break
                except httpx.HTTPStatusError as e:
                    text = getattr(e.response, "text", "")
                    # The freshly pushed branch may not be visible to the API
                    # yet; back off exponentially and try again.
                    if e.response.status_code == 422 and "main branch must exist" in text and attempt < pages_max_retries - 1:
                        delay = pages_base_delay * (2 ** attempt)
                        logger.warning(f"[GIT] Pages timing issue, retrying in {delay}s")
                        await asyncio.sleep(delay)
                        continue
                    logger.exception(f"[GIT] Pages configuration failed: {text}")
                    raise

            await asyncio.sleep(5)  # allow pages to deploy
            pages_url = f"{settings.GITHUB_PAGES_BASE}/{repo_name}/"
            flush_logs()
            return {"repo_url": repo_url_http, "commit_sha": commit_sha, "pages_url": pages_url}
        except git.GitCommandError:
            logger.exception("Git operation failed during deployment.")
            raise
        except httpx.HTTPStatusError:
            logger.exception("GitHub API error during deployment.")
            raise
# ------------------------- OpenAI / LLM helpers -------------------------
async def call_openai_api(user_prompt: str, system_prompt: str, response_format: dict, max_retries: int = 3, timeout: int = 60) -> dict:
    """Request a JSON-schema-constrained chat completion and return it parsed.

    Args:
        user_prompt: Content of the user message.
        system_prompt: Content of the system message.
        response_format: JSON schema placed in the strict ``json_schema``
            response format.
        max_retries: Number of attempts; the delay doubles after each failure.
        timeout: Per-request timeout in seconds.

    Returns:
        The message content decoded with ``json.loads``.

    Raises:
        RuntimeError: the API key is missing, or every attempt failed (the
            last underlying error is chained as ``__cause__``).
    """
    if not settings.OPENAI_API_KEY:
        raise RuntimeError("OPENAI_API_KEY not configured.")

    payload = {
        "model": "gpt-4o-mini",
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "response_format": {
            "type": "json_schema",
            "json_schema": {
                "name": "response",
                "strict": True,
                "schema": response_format,
            },
        },
    }
    headers = {
        "Authorization": f"Bearer {settings.OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    base_delay = 1
    last_error = None
    for attempt in range(max_retries):
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                resp = await client.post(OPENAI_API_URL, json=payload, headers=headers)
                resp.raise_for_status()
                result = resp.json()
            content = result["choices"][0]["message"]["content"]
            if not content:
                raise ValueError("No content in LLM response")
            return json.loads(content)
        except httpx.HTTPStatusError as e:
            last_error = e
            logger.warning(f"[OPENAI] HTTP error attempt {attempt+1}: {e}")
            logger.warning(f"[OPENAI] Response: {e.response.text}")
        except (httpx.RequestError, KeyError, IndexError, TypeError, AttributeError, ValueError) as e:
            # IndexError/TypeError/AttributeError cover a malformed body such
            # as an empty "choices" list; these used to escape the retry loop.
            # json.JSONDecodeError is a ValueError subclass.
            last_error = e
            logger.warning(f"[OPENAI] Processing error attempt {attempt+1}: {e}")
        if attempt < max_retries - 1:
            delay = base_delay * (2 ** attempt)
            logger.info(f"[OPENAI] Retrying in {delay}s...")
            await asyncio.sleep(delay)
    raise RuntimeError("LLM generation failed after retries") from last_error
# ------------------------- Round 2 surgical update (Base.py style) -------------------------
async def call_llm_round2_surgical_update(task_id: str, brief: str, existing_index_html: str) -> dict:
    """Ask the LLM for a minimal edit of the existing page and sanity-check it.

    Returns a dict with the keys ``index.html``, ``README.md`` and
    ``LICENSE``. If the LLM call fails, the existing page (or a placeholder
    comment when there is none) is returned with empty README/LICENSE. If the
    new page is empty or much shorter than the old one, the old page is kept.
    ``task_id`` is accepted but not used.
    """
    system_prompt = (
        "You are an expert full-stack engineer tasked with making SURGICAL and MINIMAL changes. "
        "Your MOST CRITICAL instruction is to preserve the existing application's core logic and structure. "
        "Only apply the specific changes requested in the 'New Brief'. "
        "Return a JSON object with 'index.html', 'README.md', and 'LICENSE'. "
        "If README.md / LICENSE exist, copy them verbatim unless a change is strictly required."
    )
    response_schema = {
        "type": "object",
        "properties": {
            "index_html": {"type": "string"},
            "readme": {"type": "string"},
            "license": {"type": "string"},
        },
        "required": ["index_html", "readme", "license"],
        "additionalProperties": False
    }
    prompt = (
        f"UPDATE INSTRUCTION (SAFE MODE):\n\n"
        f"New Brief: {brief}\n\n"
        f"--- EXISTING index.html START ---\n{existing_index_html}\n--- EXISTING index.html END ---\n\n"
        "Only make the minimal changes necessary to implement the brief. Do NOT remove or break core scripts, event handlers, or layout. "
        "Return FULL JSON with 'index_html', 'readme', 'license'. If you make no changes to README/LICENSE, copy their existing contents."
    )

    try:
        raw = await call_openai_api(
            user_prompt=prompt,
            system_prompt=system_prompt,
            response_format=response_schema,
            max_retries=4,
            timeout=90,
        )
        # Map the schema's key names onto the file names callers expect.
        result = {
            "index.html": raw.get("index_html", ""),
            "README.md": raw.get("readme", ""),
            "LICENSE": raw.get("license", ""),
        }
    except Exception as e:
        logger.exception(f"[ROUND2] LLM call failed: {e}")
        preserved = existing_index_html or "<!-- original index.html preserved due to LLM failure -->"
        return {"index.html": preserved, "README.md": "", "LICENSE": ""}

    # Safe mode: never publish an empty or drastically shrunken page.
    candidate = (result.get("index.html") or "").strip()
    if not candidate:
        logger.warning("[SAFE] LLM returned empty index.html — reverting to existing.")
        result["index.html"] = existing_index_html
    else:
        original_size = len(existing_index_html or "")
        # Reject output below 30% of the original length (floor of 200 chars).
        minimum_size = max(200, int(original_size * 0.3))
        if original_size > 0 and len(candidate) < minimum_size:
            logger.warning("[SAFE] LLM index.html appears destructive (too small). Reverting to existing.")
            result["index.html"] = existing_index_html

    # Missing or None README/LICENSE values become empty strings.
    for key in ("README.md", "LICENSE"):
        result[key] = result.get(key) or ""
    return result
# ------------------------- Notifier -------------------------
async def notify_evaluation_server(evaluation_url: str, email: str, task_id: str, round_index: int, nonce: str, repo_url: str, commit_sha: str, pages_url: str) -> bool:
    """POST the deployment details to ``evaluation_url``.

    Makes up to three attempts, waiting 1 s and then 2 s between them.
    Returns True as soon as a request succeeds and False once all attempts
    have failed. HTTP and transport errors are logged, not raised.
    """
    max_retries = 3
    base_delay = 1
    payload = {
        "email": email,
        "task": task_id,
        "round": round_index,
        "nonce": nonce,
        "repo_url": repo_url,
        "commit_sha": commit_sha,
        "pages_url": pages_url,
    }
    logger.info(f"[NOTIFY] Notifying evaluation server at {evaluation_url}")
    for attempt_no in range(1, max_retries + 1):
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.post(evaluation_url, json=payload)
                resp.raise_for_status()
                logger.info(f"[NOTIFY] Notification succeeded: {resp.status_code}")
                flush_logs()
                return True
        except httpx.HTTPStatusError as e:
            logger.warning(f"[NOTIFY] HTTP error attempt {attempt_no}: {e}")
        except httpx.RequestError as e:
            logger.warning(f"[NOTIFY] Request error attempt {attempt_no}: {e}")
        # Exponential back-off, skipped after the final attempt.
        if attempt_no < max_retries:
            await asyncio.sleep(base_delay * (2 ** (attempt_no - 1)))
    logger.error("[NOTIFY] Failed to notify evaluation server after retries.")
    flush_logs()
    return False
# ------------------------- Main orchestration -------------------------
def _describe_attachments(attachments) -> str:
    """Return the prompt paragraph naming the attachments ("" when there are none)."""
    if not attachments:
        return ""
    listing = "".join(f"- {att.name}\n" for att in attachments)
    return (
        "\nThe following attachments are provided (saved in the same folder):\n"
        + listing
        + "Use these exact file names in your HTML (for example: "
        "<img src='sample.png'>). Do NOT rename or use external links.\n"
    )


async def _generate_round1_files(enriched_brief: str) -> dict:
    """Round 1: ask the LLM for a complete index.html, README.md and LICENSE."""
    system_prompt = (
        "You are an expert full-stack engineer. Produce a JSON object with keys 'index_html', 'readme', and 'license'. "
        "index_html must be a single-file responsive HTML app using Tailwind CSS. "
        "If image attachments are mentioned below, reference them using <img src='filename'> exactly as provided. "
        "readme should be professional with project description, setup, usage, and license info. "
        "license should contain the full MIT license text."
    )
    response_schema = {
        "type": "object",
        "properties": {
            "index_html": {"type": "string"},
            "readme": {"type": "string"},
            "license": {"type": "string"},
        },
        "required": ["index_html", "readme", "license"],
        "additionalProperties": False
    }
    generated_raw = await call_openai_api(
        user_prompt=enriched_brief,
        system_prompt=system_prompt,
        response_format=response_schema,
        max_retries=4,
        timeout=120,
    )
    # Map the schema's key names onto the file names written to the repo.
    return {
        "index.html": generated_raw.get("index_html", ""),
        "README.md": generated_raw.get("readme", ""),
        "LICENSE": generated_raw.get("license", ""),
    }


async def _generate_round2_files(task_id: str, enriched_brief: str, local_path: str) -> dict:
    """Round 2+: surgically update the cloned index.html, keeping README/LICENSE if the LLM returns none."""
    existing_index_html = ""
    idx_path = os.path.join(local_path, "index.html")
    if os.path.exists(idx_path):
        try:
            with open(idx_path, "r", encoding="utf-8") as f:
                existing_index_html = f.read()
            logger.info("[WORKFLOW] Read existing index.html for context.")
        except Exception as e:
            logger.warning(f"[WORKFLOW] Could not read existing index.html: {e}")
            existing_index_html = ""

    generated = await call_llm_round2_surgical_update(
        task_id=task_id, brief=enriched_brief, existing_index_html=existing_index_html
    )

    # Fall back to the files already in the clone when the LLM left them empty.
    for name in ("README.md", "LICENSE"):
        existing_path = os.path.join(local_path, name)
        if not generated.get(name) and os.path.exists(existing_path):
            with open(existing_path, "r", encoding="utf-8") as f:
                generated[name] = f.read()
    return generated


async def generate_files_and_deploy(task_data: TaskRequest):
    """Run the whole pipeline for one task: generate, commit, publish, notify.

    Runs while holding ``task_semaphore``. Any exception is logged as a
    critical failure and swallowed, so this coroutine never raises an
    ``Exception`` to its caller.

    Changes from the previous version:
    * the task id is rejected if it could escape ``generated_tasks`` (it
      feeds both a recursive delete and file writes);
    * attachments are no longer downloaded just to build an ``image_parts``
      list that was never used;
    * a failed evaluation-server notification is logged instead of ignored;
    * the semaphore is held with ``async with`` rather than a manual flag.
    """
    try:
        async with task_semaphore:
            logger.info(f"[PROCESS START] Task: {task_data.task} Round: {task_data.round}")
            flush_logs()

            task_id = task_data.task
            # The id becomes a directory name that is rmtree'd below.
            if task_id in ("", ".", "..") or "/" in task_id or "\\" in task_id:
                raise ValueError(f"Unsafe task identifier: {task_id!r}")

            round_index = task_data.round
            attachments = task_data.attachments or []
            repo_name = task_id.replace(" ", "-").lower()
            github_username = settings.GITHUB_USERNAME
            github_token = settings.GITHUB_TOKEN
            repo_url_auth = f"https://{github_username}:{github_token}@github.com/{github_username}/{repo_name}.git"
            repo_url_http = f"https://github.com/{github_username}/{repo_name}"
            local_path = os.path.join(os.getcwd(), "generated_tasks", task_id)

            # Start from a clean working directory.
            if os.path.exists(local_path):
                try:
                    remove_local_path(local_path)
                except Exception as e:
                    logger.exception(f"Cleanup failed for {local_path}: {e}")
                    raise
            safe_makedirs(local_path)

            # Setup repo (init or clone)
            repo = await setup_local_repo(local_path, repo_name, repo_url_auth, repo_url_http, round_index)

            # The prompt only carries attachment file names (text-only LLM call).
            enriched_brief = f"{task_data.brief}\n\n{_describe_attachments(attachments)}".strip()
            if round_index == 1:
                logger.info("[WORKFLOW] Round 1: full generation")
                generated = await _generate_round1_files(enriched_brief)
            else:
                logger.info("[WORKFLOW] Round 2+: surgical update (Base.py style). Loading existing index.html only.")
                generated = await _generate_round2_files(task_id, enriched_brief, local_path)

            # Write generated files and attachments into the repo folder.
            await save_generated_files_locally(task_id, generated)
            await save_attachments_locally(local_path, attachments)

            deployment_info = await commit_and_publish(repo, task_id, round_index, repo_name)

            notified = await notify_evaluation_server(
                evaluation_url=task_data.evaluation_url,
                email=task_data.email,
                task_id=task_id,
                round_index=round_index,
                nonce=task_data.nonce,
                repo_url=deployment_info["repo_url"],
                commit_sha=deployment_info["commit_sha"],
                pages_url=deployment_info["pages_url"],
            )
            logger.info(f"[DEPLOYMENT] Success. Repo: {deployment_info['repo_url']} Pages: {deployment_info['pages_url']}")
            if not notified:
                logger.error(f"[DEPLOYMENT] Task {task_id} was deployed but the evaluation server was not notified.")
    except Exception as exc:
        logger.exception(f"[CRITICAL FAILURE] Task {getattr(task_data, 'task', 'unknown')} failed: {exc}")
    finally:
        flush_logs()
        logger.info(
            f"[PROCESS END] Task: {getattr(task_data, 'task', 'unknown')} Round: {getattr(task_data, 'round', 'unknown')}"
        )
# ------------------------- Endpoint handlers -------------------------
def _task_done_callback(task: asyncio.Task):
    """Log how a background task ended: success, exception or cancellation.

    The exception is passed through ``exc_info`` so its traceback is logged.
    ``logger.exception()`` is only meaningful inside an ``except`` block;
    called from this callback it printed ``NoneType: None`` instead.
    """
    try:
        exc = task.exception()  # raises CancelledError for a cancelled task
    except asyncio.CancelledError:
        logger.warning("[BACKGROUND TASK] Task was cancelled.")
    else:
        if exc is not None:
            logger.error(f"[BACKGROUND TASK] Task finished with exception: {exc}", exc_info=exc)
        else:
            logger.info("[BACKGROUND TASK] Task finished successfully.")
    finally:
        flush_logs()
@app.post("/ready", status_code=200)
async def receive_task(task_data: TaskRequest, request: Request):
    """Authenticate a task request and start processing it in the background.

    Responds 401 when the secret does not match. Otherwise records a summary
    of the request, schedules ``generate_files_and_deploy`` as an asyncio
    task and returns immediately with a ``ready`` status.
    """
    global last_received_task
    if not verify_secret(task_data.secret):
        logger.warning(f"Unauthorized attempt for task {task_data.task} from {request.client.host if request.client else 'unknown'}")
        raise HTTPException(status_code=401, detail="Unauthorized: Secret mismatch")

    brief = task_data.brief
    last_received_task = {
        "task": task_data.task,
        "email": task_data.email,
        "round": task_data.round,
        # Keep the stored summary short.
        "brief": (brief[:250] + "...") if len(brief) > 250 else brief,
        "time": datetime.utcnow().isoformat() + "Z",
    }

    # Drop finished tasks here as well, so the list cannot grow without bound
    # when nobody polls /status.
    background_tasks_list[:] = [t for t in background_tasks_list if not t.done()]
    bg_task = asyncio.create_task(generate_files_and_deploy(task_data))
    bg_task.add_done_callback(_task_done_callback)
    background_tasks_list.append(bg_task)

    logger.info(f"Received task {task_data.task}. Background processing started.")
    flush_logs()
    return JSONResponse(status_code=200, content={"status": "ready", "message": f"Task {task_data.task} received and processing started."})
@app.get("/")
async def root():
    """Landing endpoint: return a short usage hint."""
    greeting = "Task Receiver Service running. POST /ready to submit."
    return {"message": greeting}
@app.get("/status")
async def get_status():
    """Report the last accepted task and the number of unfinished background tasks.

    Finished tasks are removed from ``background_tasks_list`` as a side
    effect. Before any task has been received, a placeholder message is
    returned instead.
    """
    if not last_received_task:
        return {"message": "Awaiting first task submission to /ready"}
    still_running = [t for t in background_tasks_list if not t.done()]
    # Slice assignment mutates the shared list in place.
    background_tasks_list[:] = still_running
    return {
        "last_received_task": last_received_task,
        "running_background_tasks": len(background_tasks_list),
    }
@app.get("/health")
async def health():
    """Liveness probe: return status "ok" and the current UTC time."""
    now_utc = datetime.utcnow().isoformat()
    return {"status": "ok", "timestamp": f"{now_utc}Z"}
@app.get("/logs")
async def get_logs(lines: int = Query(200, ge=1, le=5000)):
    """Return the last ``lines`` lines of the log file as plain text.

    At most ``lines * 2000`` bytes, capped at 1 MiB, are read from the end of
    the file. Responds 404 when the file is missing and 500 on a read error.

    The tail is read in one forward read from a computed offset. The previous
    loop read 1 KiB blocks backwards but appended them in read order, so any
    log larger than one block came back with its blocks reversed.
    """
    path = settings.LOG_FILE_PATH
    if not os.path.exists(path):
        return PlainTextResponse("Log file not found.", status_code=404)
    max_bytes = min(lines * 2000, 1024 * 1024)
    try:
        with open(path, "rb") as f:
            f.seek(0, os.SEEK_END)
            file_size = f.tell()
            start = max(0, file_size - max_bytes)
            f.seek(start)
            data = f.read(max_bytes)
        text = data.decode("utf-8", errors="ignore").splitlines()
        # Starting mid-file usually cuts the first line in half; drop it
        # unless it is the only line we have.
        if start > 0 and len(text) > 1:
            text = text[1:]
        return PlainTextResponse("\n".join(text[-lines:]))
    except Exception as e:
        logger.exception(f"Error reading log file: {e}")
        return PlainTextResponse(f"Error reading log file: {e}", status_code=500)
# ------------------------- Startup / Shutdown -------------------------
@app.on_event("startup")
async def startup_event():
    """Start the periodic heartbeat logger when the application starts."""

    async def keep_alive():
        # Log a heartbeat every KEEP_ALIVE_INTERVAL_SECONDS; logging problems
        # must never stop the loop.
        while True:
            try:
                logger.info("[KEEPALIVE] Service heartbeat")
                flush_logs()
            except Exception:
                pass
            await asyncio.sleep(settings.KEEP_ALIVE_INTERVAL_SECONDS)

    # The event loop keeps only weak references to tasks, so an unreferenced
    # task can be garbage-collected while running. Keep a strong reference on
    # the application state.
    app.state.keep_alive_task = asyncio.create_task(keep_alive())
@app.on_event("shutdown")
async def shutdown_event():
    """Cancel unfinished background tasks and wait briefly for them to stop.

    Instead of sleeping a fixed 0.5 s, the cancelled tasks are awaited (for
    at most 5 s) so their ``finally`` blocks get a chance to run before the
    logs are flushed.
    """
    logger.info("[SHUTDOWN] Waiting for background tasks to finish (graceful shutdown)...")
    pending = [t for t in background_tasks_list if not t.done()]
    for t in pending:
        t.cancel()
    if pending:  # asyncio.wait() rejects an empty collection
        await asyncio.wait(pending, timeout=5)
    flush_logs()