"""Data collection server.

Receives text/URLs from captioning/image servers over HTTP, groups the
submissions by course in memory, compiles each course to a JSON file under
``uploads/`` and optionally uploads that file to a HuggingFace dataset repo.
A small Gradio UI is mounted at ``/ui`` for monitoring and manual control.
"""

import os
import re
import json
import time
from typing import Dict, Any, List, Optional
from urllib.parse import urlparse, parse_qs

import gradio as gr
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware

try:
    from huggingface_hub import HfApi
    HF_AVAILABLE = True
except Exception:
    # huggingface_hub is optional; uploads are skipped when it is missing.
    HfApi = None
    HF_AVAILABLE = False

# Directory to store compiled uploads
BASE_DIR = os.path.dirname(__file__)
UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)

app = FastAPI(
    title="Data Collection Server",
    description="Receives text/URLs from captioning/image servers, groups by course, compiles JSON and optionally uploads to HuggingFace.",
)

# Enable CORS for Gradio
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# In-memory store for course data:
#   {course_name: {"items": [entry, ...], "last_updated": epoch_seconds | None}}
courses: Dict[str, Dict[str, Any]] = {}

URL_RE = re.compile(r"https?://[\w\-\./?%&=:@,+~#]+")
DONE_RE = re.compile(r"\b(done|finished|completed|complete)\b", re.IGNORECASE)

HF_TOKEN = os.getenv("HF_TOKEN")
HF_DATASET_REPO = os.getenv("HF_DATASET_REPO")  # e.g. "username/dataset-name"


def extract_urls(text: str) -> List[str]:
    """Return every http(s) URL found in *text* (empty list for empty/None)."""
    return URL_RE.findall(text or "")


def extract_course_from_url(url: str) -> Optional[str]:
    """Return the ``course`` query parameter of *url*, or None if absent.

    The parameter name is looked up as ``course``, ``Course`` and ``COURSE``
    (in that order); the first value of the first match is returned.
    Malformed URLs yield None rather than raising.
    """
    try:
        qs = parse_qs(urlparse(url).query)
    except ValueError:
        # urlparse/parse_qs reject some malformed inputs; treat as "no course".
        return None
    course = qs.get("course") or qs.get("Course") or qs.get("COURSE")
    if course:
        return course[0]
    return None


def now_ts() -> str:
    """Return the current local time as a compact ``YYYYMMDDTHHMMSS`` string."""
    return time.strftime("%Y%m%dT%H%M%S")


async def parse_request(request: Request) -> Dict[str, Any]:
    """Read an incoming request in any format and normalise it to a dict.

    The returned dict always has the keys ``text``, ``json``, ``form`` and
    ``headers``; a ``course`` key is added when the body (JSON object or
    form) carries an explicit course field.

    Text is chosen with this precedence:
      1. a JSON string body, or the first string-valued field among
         ``text``/``caption``/``message``/``body``/``content`` in a JSON object;
      2. the first form field among ``text``/``caption``/``message``/``content``;
      3. the raw request body (when the body is not JSON);
      4. the JSON payload re-serialised (e.g. for a JSON list).
    """
    payload: Dict[str, Any] = {
        "text": "",
        "json": None,
        "form": {},
        "headers": dict(request.headers),
    }
    # True while payload["text"] holds only the undecoded raw body, so a
    # properly decoded form field is allowed to replace it below.
    text_is_raw_fallback = False

    # Try JSON
    try:
        body = await request.json()
    except Exception:
        # not JSON - fall back to the raw body
        try:
            payload["text"] = (await request.body()).decode("utf-8", errors="ignore")
            text_is_raw_fallback = True
        except Exception:
            payload["text"] = ""
    else:
        payload["json"] = body
        if isinstance(body, str):
            # a simple string payload inside JSON
            payload["text"] = body
        elif isinstance(body, dict):
            # flatten likely fields
            for k in ["text", "caption", "message", "body", "content"]:
                if k in body and isinstance(body[k], str):
                    payload["text"] = body[k]
                    break
            # allow explicit course field
            if "course" in body and isinstance(body["course"], str):
                payload["course"] = body["course"]

    # Try form (urlencoded or multipart/form-data)
    try:
        form = await request.form()
        for k, v in form.multi_items():
            payload["form"][k] = str(v)
            # Take the first text-like value. It overrides a raw-body fallback
            # because the raw body of a form post is still URL/multipart
            # encoded, which hides URLs from extract_urls().
            if k in ("text", "caption", "message", "content") and (
                text_is_raw_fallback or not payload["text"]
            ):
                payload["text"] = str(v)
                text_is_raw_fallback = False
            if k == "course":
                payload["course"] = str(v)
    except Exception:
        # Best-effort: form parsing is unavailable or the body is not a form.
        pass

    # If no text yet but JSON is a list or similar, stringify (best-effort)
    if not payload["text"] and payload.get("json") is not None:
        try:
            payload["text"] = json.dumps(payload["json"])
        except Exception:
            payload["text"] = str(payload["json"])

    return payload


def add_entry(course: str, entry: Dict[str, Any]):
    """Append *entry* to *course* (creating the course if needed) and stamp it."""
    c = courses.setdefault(course, {"items": [], "last_updated": None})
    c["items"].append(entry)
    c["last_updated"] = time.time()


def compile_course(course: str) -> str:
    """Compile course data to a JSON file and optionally upload it to HuggingFace.

    The course's stored items are cleared after the file is written, whether
    or not the upload succeeds.

    Returns:
        Path to the saved file.

    Raises:
        ValueError: if *course* has never received an entry.
    """
    if course not in courses:
        raise ValueError(f"Unknown course: {course}")

    # One timestamp for both the payload and the filename so they always agree.
    stamp = now_ts()
    items = courses[course]["items"]
    data = {
        "course": course,
        "compiled_at": stamp,
        "count": len(items),
        "items": items,
    }

    filename = f"{course}_{stamp}.json"
    # Course names come from request data; reduce them to a safe basename.
    safe_filename = re.sub(r"[^a-zA-Z0-9_\-\.]+", "_", filename)
    path = os.path.join(UPLOAD_DIR, safe_filename)

    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    # Optionally upload to HuggingFace
    if HF_TOKEN and HF_DATASET_REPO and HF_AVAILABLE:
        try:
            api = HfApi()
            # upload path at root of repo with same filename
            api.upload_file(
                path_or_fileobj=path,
                path_in_repo=safe_filename,
                repo_id=HF_DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN,
            )
        except Exception as e:
            # Log but don't fail the compile
            print(f"[WARN] HuggingFace upload failed: {e}")

    # After compiling, clear stored items for that course
    courses[course]["items"] = []
    return path


@app.post("/submit")
async def submit(request: Request):
    """Receive any data (text, JSON, form) and store it under a course.

    The course is taken from an explicit ``course`` field, else from the
    first URL in the text that has a ``course`` query parameter, else
    ``"unknown_course"``. If the text contains 'done' or similar, the course
    is compiled to JSON (and uploaded if configured).
    """
    payload = await parse_request(request)
    text = (payload.get("text") or "").strip()

    # Collect urls found
    urls = extract_urls(text)

    # Determine course from payload (explicit field) or from any URL
    course = payload.get("course")
    if not course:
        for u in urls:
            c = extract_course_from_url(u)
            if c:
                course = c
                break
    if not course:
        course = "unknown_course"

    entry = {
        "timestamp": now_ts(),
        "text": text,
        "json": payload.get("json"),
        "form": payload.get("form"),
        "urls": urls,
        # Keep only a few identifying headers.
        "headers": {
            k: v
            for k, v in payload.get("headers", {}).items()
            if k.lower() in ("user-agent", "host", "content-type")
        },
    }
    add_entry(course, entry)

    # Detect completion
    if DONE_RE.search(text):
        try:
            path = compile_course(course)
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e
        return JSONResponse({"status": "compiled", "course": course, "path": path})

    # NOTE: detecting a "course change" between consecutive submissions (and
    # compiling the previous course first) is not implemented; the server
    # does not track the last course per source.
    return JSONResponse(
        {"status": "stored", "course": course, "count": len(courses[course]["items"])}
    )


@app.get("/status")
async def status():
    """Return item count and last-update time for every known course."""
    summary = {
        c: {"count": len(v["items"]), "last_updated": v["last_updated"]}
        for c, v in courses.items()
    }
    return {"courses": summary}


@app.post("/compile")
async def compile_endpoint(course: Optional[str] = None):
    """Force compile a course.

    If course is not provided and only one exists, compile that one.
    Responds 400 when the course is ambiguous, 404 when it is unknown and
    500 when compilation itself fails.
    """
    if not course:
        if len(courses) == 1:
            course = next(iter(courses))
        else:
            raise HTTPException(
                status_code=400,
                detail="Provide course query parameter when multiple courses exist.",
            )
    try:
        path = compile_course(course)
    except ValueError as e:
        # Unknown course is a client error, matching /debug/{course}.
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"status": "compiled", "course": course, "path": path}


@app.get("/debug/{course}")
async def debug_course(course: str):
    """Return the raw in-memory record for *course* (404 if unknown)."""
    if course not in courses:
        raise HTTPException(status_code=404, detail="Course not found")
    return courses[course]


# Gradio Interface
def get_course_summary():
    """Get a formatted summary of all courses and their items."""
    if not courses:
        return "No courses available"

    summary = []
    for course, data in courses.items():
        items = data["items"]
        last_updated = data["last_updated"]
        last_updated_str = (
            time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_updated))
            if last_updated
            else "never"
        )

        summary.append(f"Course: {course}")
        summary.append(f"Items: {len(items)}")
        summary.append(f"Last Updated: {last_updated_str}")

        if items:
            summary.append("\nLast 3 captions:")
            for item in items[-3:]:
                text = item.get("text", "").strip()
                urls = item.get("urls", [])
                # Only mark the caption as elided when it was actually cut.
                ellipsis = "..." if len(text) > 200 else ""
                summary.append(f"- {text[:200]}{ellipsis} ({len(urls)} URLs)")
        summary.append("\n" + "-" * 40 + "\n")

    return "\n".join(summary)


def compile_course_ui(course_name: str) -> str:
    """Compile a course from the Gradio UI and return a status message."""
    if not course_name:
        return "Please enter a course name"
    try:
        path = compile_course(course_name)
        return f"Successfully compiled course to {path}"
    except Exception as e:
        return f"Error compiling course: {str(e)}"


def add_test_caption(course_name: str, caption_text: str) -> str:
    """Add a test caption through the Gradio UI and return a status message."""
    if not course_name or not caption_text:
        return "Please enter both course name and caption text"

    entry = {
        "timestamp": now_ts(),
        "text": caption_text,
        "urls": extract_urls(caption_text),
        "json": None,
        "form": {},
        "headers": {"user-agent": "Gradio UI"},
    }
    add_entry(course_name, entry)
    return f"Added caption to course '{course_name}'. Total items: {len(courses[course_name]['items'])}"


def create_gradio_interface():
    """Create the Gradio interface tabs and components."""
    with gr.Blocks(title="Data Collection Server UI") as interface:
        gr.Markdown("# Data Collection Server Monitor")

        with gr.Tab("Course Status"):
            status_output = gr.Textbox(label="Current Status", value="Loading...", lines=15)
            refresh_btn = gr.Button("Refresh Status")
            refresh_btn.click(get_course_summary, outputs=status_output)

        with gr.Tab("Manual Controls"):
            gr.Markdown("### Compile Course")
            with gr.Row():
                compile_course_input = gr.Textbox(label="Course Name")
                compile_btn = gr.Button("Compile to JSON")
            compile_output = gr.Textbox(label="Compile Result", lines=3)
            compile_btn.click(
                compile_course_ui,
                inputs=compile_course_input,
                outputs=compile_output,
            )

            gr.Markdown("### Add Test Caption")
            with gr.Row():
                test_course_input = gr.Textbox(label="Course Name")
                test_caption_input = gr.Textbox(label="Caption Text", lines=3)
            test_btn = gr.Button("Add Caption")
            test_output = gr.Textbox(label="Result", lines=2)
            test_btn.click(
                add_test_caption,
                inputs=[test_course_input, test_caption_input],
                outputs=test_output,
            )

        # Populate the status box when the page loads. Blocks.load is the
        # page-load event listener; gr.on() expects event-listener methods
        # (e.g. btn.click) as triggers, not the string "load".
        interface.load(get_course_summary, outputs=status_output)

    return interface


# Mount Gradio app to FastAPI
interface = create_gradio_interface()
app = gr.mount_gradio_app(app, interface, path="/ui")


if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", "8000"))
    print(f"[INFO] Server starting on port {port}")
    print(f"[INFO] FastAPI docs: http://localhost:{port}/docs")
    print(f"[INFO] Gradio UI: http://localhost:{port}/ui")
    uvicorn.run(app, host="0.0.0.0", port=port)