|
|
import os |
|
|
import re |
|
|
import json |
|
|
import time |
|
|
from typing import Dict, Any, List, Optional |
|
|
from urllib.parse import urlparse, parse_qs |
|
|
|
|
|
import gradio as gr |
|
|
from fastapi import FastAPI, Request, HTTPException |
|
|
from fastapi.responses import JSONResponse |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
|
|
|
# huggingface_hub is optional: without it the server still collects and
# compiles courses, it just never attempts an upload.
try:
    from huggingface_hub import HfApi
except Exception:
    # Any failure while importing (not only ImportError) disables uploads
    # instead of preventing the server from starting.
    HfApi = None
    HF_AVAILABLE = False
else:
    HF_AVAILABLE = True
|
|
|
|
|
|
|
|
# Directory containing this module. It is made absolute so that UPLOAD_DIR (and the
# paths returned to clients) do not depend on the current working directory when
# __file__ happens to be relative.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Compiled course JSON files are written here; the directory is created at import
# time if it does not exist yet.
UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
|
|
|
|
app = FastAPI(
    title="Data Collection Server",
    description=(
        "Receives text/URLs from captioning/image servers, groups by course, "
        "compiles JSON and optionally uploads to HuggingFace."
    ),
)

# Fully permissive CORS so browser-based clients on any origin can call the API.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True means any
# website may issue credentialed cross-origin requests (Starlette may reflect the
# request Origin in this configuration) — confirm this is intended before exposing
# the server publicly.
_CORS_OPTIONS = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
|
|
|
|
|
|
|
|
# In-memory store: course name -> {"items": [entry, ...], "last_updated": epoch
# seconds (time.time()) or None}. Nothing is persisted until a course is compiled.
courses: Dict[str, Dict[str, Any]] = {}

# Loose http(s) URL matcher. Its character class also accepts ".", ",", ":" and
# "?", so a match can swallow trailing sentence punctuation; extract_urls() trims it.
URL_RE = re.compile(r"https?://[\w\-\./?%&=:@,+~#]+")

# Whole-word, case-insensitive completion keywords that make /submit compile a course.
DONE_RE = re.compile(r"\b(done|finished|completed|complete)\b", re.IGNORECASE)

# Optional HuggingFace upload configuration (read once at import time); compile_course
# only uploads when both are set and huggingface_hub imported successfully.
HF_TOKEN = os.getenv("HF_TOKEN")
HF_DATASET_REPO = os.getenv("HF_DATASET_REPO")

# Characters URL_RE accepts that, at the very end of a match, are almost always
# punctuation of the surrounding sentence rather than part of the URL.
_URL_TRAILING_PUNCTUATION = ".,:?"


def extract_urls(text: str) -> List[str]:
    """Return every http(s) URL found in *text*, in order of appearance.

    Trailing sentence punctuation (".", ",", ":", "?") is stripped from each match,
    so "see https://x.org/a?course=c1." yields "https://x.org/a?course=c1" and the
    course query value is not polluted with a dot. *text* may be None or empty, in
    which case an empty list is returned.
    """
    urls = []
    for match in URL_RE.findall(text or ""):
        trimmed = match.rstrip(_URL_TRAILING_PUNCTUATION)
        if trimmed:
            urls.append(trimmed)
    return urls
|
|
|
|
|
|
|
|
def extract_course_from_url(url: str) -> Optional[str]:
    """Return the value of the ``course`` query parameter in *url*, or None.

    The exact keys "course", "Course" and "COURSE" are checked first, in that
    order; any other capitalisation (e.g. "cOurse") is accepted as a fallback.
    When a key is repeated, its first value wins. Parameters with blank values are
    ignored (parse_qs drops them by default).

    Parsing is best effort: a URL that urllib cannot parse (e.g. an unbalanced
    IPv6 bracket) yields None instead of raising.
    """
    try:
        query = parse_qs(urlparse(url).query)
    except (AttributeError, TypeError, ValueError):
        # Malformed or non-string input: treat as "no course in this URL".
        return None

    for key in ("course", "Course", "COURSE"):
        values = query.get(key)
        if values:
            return values[0]

    for key, values in query.items():
        if isinstance(key, str) and key.lower() == "course" and values:
            return values[0]

    return None
|
|
|
|
|
|
|
|
def now_ts() -> str:
    """Current local time as a compact sortable stamp, e.g. ``20240131T235959``.

    The stamp carries no timezone designator.
    """
    local_now = time.localtime()
    return time.strftime("%Y%m%dT%H%M%S", local_now)
|
|
|
|
|
|
|
|
# Keys that may carry the message text, in JSON objects (checked in this order)
# and in form submissions (first non-empty one in form order wins).
_TEXT_KEYS = ("text", "caption", "message", "body", "content")


async def parse_request(request: Request) -> Dict[str, Any]:
    """Read an incoming request in any format and normalise it into a dict.

    Returned keys:
      text    -- best-effort message text (precedence below)
      json    -- the parsed JSON body, or None if the body is not valid JSON
      form    -- form fields converted to str (the last value wins for repeated keys)
      headers -- all request headers
      course  -- present only when a JSON string "course" or a form "course"
                 field was sent

    Text precedence:
      1. a JSON string body, or the first string-valued _TEXT_KEYS entry of a
         JSON object;
      2. the first non-empty _TEXT_KEYS form field;
      3. the raw body decoded as UTF-8 (undecodable bytes dropped);
      4. if still empty, the JSON body re-serialised with json.dumps.
    Form text outranks the raw body because, for url-encoded/multipart posts, the
    raw body is the encoded payload (e.g. "text=hi&course=x"), not the message.
    """
    payload: Dict[str, Any] = {"text": "", "json": None, "form": {}, "headers": dict(request.headers)}
    # True while payload["text"] is only the undecoded raw-body fallback.
    text_is_raw_body = False

    try:
        body = await request.json()
    except Exception:
        # Not JSON (or unreadable): fall back to the raw body as text.
        try:
            payload["text"] = (await request.body()).decode("utf-8", errors="ignore")
            text_is_raw_body = True
        except Exception:
            payload["text"] = ""
    else:
        payload["json"] = body
        if isinstance(body, str):
            payload["text"] = body
        elif isinstance(body, dict):
            for key in _TEXT_KEYS:
                if isinstance(body.get(key), str):
                    payload["text"] = body[key]
                    break
            if isinstance(body.get("course"), str):
                payload["course"] = body["course"]

    form_text = ""
    try:
        # Best effort: form parsing may be unavailable (e.g. missing multipart
        # support) or the body may not be a form at all.
        form = await request.form()
        for key, value in form.multi_items():
            payload["form"][key] = str(value)
            if key in _TEXT_KEYS and not form_text:
                form_text = str(value)
            if key == "course":
                payload["course"] = str(value)
    except Exception:
        pass

    if form_text and (text_is_raw_body or not payload["text"]):
        payload["text"] = form_text

    if not payload["text"] and payload["json"] is not None:
        try:
            payload["text"] = json.dumps(payload["json"])
        except (TypeError, ValueError):
            payload["text"] = str(payload["json"])

    return payload
|
|
|
|
|
|
|
|
def add_entry(course: str, entry: Dict[str, Any]):
    """Append *entry* to *course* in the in-memory store, creating the course if new.

    Also stamps the course's ``last_updated`` with the current ``time.time()``.
    """
    bucket = courses.get(course)
    if bucket is None:
        bucket = {"items": [], "last_updated": None}
        courses[course] = bucket
    bucket["items"].append(entry)
    bucket["last_updated"] = time.time()
|
|
|
|
|
|
|
|
def _reserve_output_file(directory: str, stem: str, ext: str = ".json"):
    """Create and open (text, UTF-8) a file that did not exist before, in *directory*.

    Tries ``<stem><ext>``, then ``<stem>_1<ext>``, ``<stem>_2<ext>``, ... Exclusive
    create mode ("x") guarantees an earlier export is never overwritten.
    Returns ``(filename, path, open_file)``; the caller must close the file.
    """
    suffix = 0
    while True:
        name = f"{stem}{ext}" if suffix == 0 else f"{stem}_{suffix}{ext}"
        path = os.path.join(directory, name)
        try:
            return name, path, open(path, "x", encoding="utf-8")
        except FileExistsError:
            suffix += 1


def _upload_to_hf(path: str, path_in_repo: str) -> None:
    """Best-effort upload of *path* to the configured HuggingFace dataset repo.

    Does nothing unless HF_TOKEN, HF_DATASET_REPO and huggingface_hub are all
    available. Upload failures are printed as warnings, never raised.
    """
    if not (HF_TOKEN and HF_DATASET_REPO and HF_AVAILABLE):
        return
    try:
        api = HfApi()
        api.upload_file(
            path_or_fileobj=path,
            path_in_repo=path_in_repo,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except Exception as e:
        print(f"[WARN] HuggingFace upload failed: {e}")


def compile_course(course: str) -> str:
    """Write a course's collected items to a JSON file in UPLOAD_DIR and return its path.

    The file contains {"course", "compiled_at", "count", "items"} and is named
    ``<course>_<timestamp>.json`` with characters outside [A-Za-z0-9_.-] replaced
    by "_"; a numeric suffix is added if that name already exists, so repeated
    compiles within the same second never overwrite each other. If HuggingFace
    upload is configured, the file is also uploaded (best effort).

    Only the items that were exported are removed from the course afterwards;
    entries appended while the file was being written/uploaded stay queued for
    the next compile.

    Raises:
        ValueError: if *course* has never received any entry.
        OSError: if the output file cannot be created or written (items are kept).
    """
    if course not in courses:
        raise ValueError(f"Unknown course: {course}")

    bucket = courses[course]
    # Snapshot so concurrent appends neither leak into this export nor get wiped.
    items = list(bucket["items"])
    compiled_at = now_ts()  # one timestamp for both the payload and the filename

    data = {
        "course": course,
        "compiled_at": compiled_at,
        "count": len(items),
        "items": items,
    }

    # Sanitising the stem keeps the name inside UPLOAD_DIR (no path separators).
    stem = re.sub(r"[^a-zA-Z0-9_\-\.]+", "_", f"{course}_{compiled_at}")
    safe_filename, path, out = _reserve_output_file(UPLOAD_DIR, stem)
    with out:
        json.dump(data, out, ensure_ascii=False, indent=2)

    _upload_to_hf(path, safe_filename)

    # Items are only ever appended, so the exported ones are the leading slice.
    bucket["items"] = bucket["items"][len(items):]
    return path
|
|
|
|
|
|
|
|
@app.post("/submit")
async def submit(request: Request):
    """Receive any data (text, JSON, form), attach it to a course and store it.

    Course resolution: an explicit ``course`` field (JSON body or form, surrounding
    whitespace ignored), else the first extracted URL with a ``course`` query
    parameter, else ``"unknown_course"``.

    If the message contains 'done' or a similar keyword outside of any URL, the
    course is compiled to JSON (and uploaded if configured).

    Returns {"status": "stored", "course", "count"} or
    {"status": "compiled", "course", "path"}; compilation errors become HTTP 500.
    """
    payload = await parse_request(request)
    text = (payload.get("text") or "").strip()
    urls = extract_urls(text)

    course = (payload.get("course") or "").strip()
    if not course:
        from_urls = (c for c in map(extract_course_from_url, urls) if c)
        course = next(from_urls, None) or "unknown_course"

    entry = {
        "timestamp": now_ts(),
        "text": text,
        "json": payload.get("json"),
        "form": payload.get("form"),
        "urls": urls,
        # Keep only a few non-sensitive headers for provenance.
        "headers": {k: v for k, v in payload.get("headers", {}).items() if k.lower() in ("user-agent", "host", "content-type")},
    }
    add_entry(course, entry)

    # Search for the completion keyword with URLs blanked out, so a link such as
    # https://host/completed/img.png does not trigger a compile.
    # NOTE(review): DONE_RE also matches ordinary words ("finished", "complete"),
    # so a caption containing them still triggers compilation — confirm intended.
    if DONE_RE.search(URL_RE.sub(" ", text)):
        try:
            path = compile_course(course)
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e
        return JSONResponse({"status": "compiled", "course": course, "path": path})

    return JSONResponse({"status": "stored", "course": course, "count": len(courses[course]["items"])})
|
|
|
|
|
|
|
|
@app.get("/status")
async def status():
    """Report how many items each course currently holds and when it last changed.

    Returns {"courses": {name: {"count": int, "last_updated": epoch seconds or None}}}.
    """
    per_course = {}
    for name, bucket in courses.items():
        per_course[name] = {
            "count": len(bucket["items"]),
            "last_updated": bucket["last_updated"],
        }
    return {"courses": per_course}
|
|
|
|
|
|
|
|
@app.post("/compile")
async def compile_endpoint(course: Optional[str] = None):
    """Force-compile a course (``?course=<name>``).

    Without a course parameter, the single existing course is compiled.
    Errors:
      400 -- no course given and there are zero or several courses;
      404 -- the named course does not exist (same detail as /debug/{course});
      500 -- compilation itself failed.
    """
    if not course:
        if not courses:
            raise HTTPException(status_code=400, detail="No courses have been collected yet.")
        if len(courses) > 1:
            raise HTTPException(status_code=400, detail="Provide course query parameter when multiple courses exist.")
        course = next(iter(courses))

    if course not in courses:
        raise HTTPException(status_code=404, detail="Course not found")

    try:
        path = compile_course(course)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"status": "compiled", "course": course, "path": path}
|
|
|
|
|
|
|
|
@app.get("/debug/{course}")
async def debug_course(course: str):
    """Return the raw in-memory record for *course* (items and last_updated); 404 if unknown."""
    try:
        return courses[course]
    except KeyError:
        raise HTTPException(status_code=404, detail="Course not found") from None
|
|
|
|
|
|
|
|
|
|
|
def get_course_summary():
    """Get a formatted, multi-line summary of all courses and their items.

    For each course: its name, item count, last update time (local time, or
    "never"), and a preview of up to the three most recent captions — the first
    200 characters, followed by "..." only when the caption was actually cut —
    with the number of URLs found in each. Returns "No courses available" when
    nothing has been collected.
    """
    # Iterate a snapshot: Gradio may call this from a worker thread while requests
    # add new courses, and iterating the live dict could then raise RuntimeError.
    snapshot = list(courses.items())
    if not snapshot:
        return "No courses available"

    preview_len = 200
    summary = []
    for course, data in snapshot:
        items = data["items"]
        last_updated = data["last_updated"]
        last_updated_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_updated)) if last_updated else "never"

        summary.append(f"Course: {course}")
        summary.append(f"Items: {len(items)}")
        summary.append(f"Last Updated: {last_updated_str}")
        if items:
            summary.append("\nLast 3 captions:")
            for item in items[-3:]:
                text = (item.get("text") or "").strip()
                urls = item.get("urls") or []
                preview = text[:preview_len] + ("..." if len(text) > preview_len else "")
                summary.append(f"- {preview} ({len(urls)} URLs)")
        summary.append("\n" + "-" * 40 + "\n")

    return "\n".join(summary)
|
|
|
|
|
def compile_course_ui(course_name: str) -> str:
    """Gradio handler: compile *course_name* and report the outcome as a message.

    Never raises; any compilation error is turned into an "Error compiling
    course: ..." string for display.
    """
    if not course_name:
        return "Please enter a course name"
    try:
        saved_to = compile_course(course_name)
    except Exception as err:
        return f"Error compiling course: {str(err)}"
    return f"Successfully compiled course to {saved_to}"
|
|
|
|
|
def add_test_caption(course_name: str, caption_text: str) -> str:
    """Gradio handler: store *caption_text* under *course_name* as if it had been submitted.

    The entry mirrors the shape /submit stores (timestamp, text, extracted URLs,
    empty json/form) with a "Gradio UI" user-agent. Returns a status message that
    includes the course's new item count.
    """
    if not (course_name and caption_text):
        return "Please enter both course name and caption text"

    add_entry(course_name, {
        "timestamp": now_ts(),
        "text": caption_text,
        "urls": extract_urls(caption_text),
        "json": None,
        "form": {},
        "headers": {"user-agent": "Gradio UI"},
    })
    total = len(courses[course_name]["items"])
    return f"Added caption to course '{course_name}'. Total items: {total}"
|
|
|
|
|
def create_gradio_interface():
    """Build (but do not launch) the Gradio monitoring UI.

    Tabs:
      * "Course Status" -- text summary from get_course_summary(), refreshed by a
        button and once on every page load.
      * "Manual Controls" -- compile a course by name, or add a test caption.

    Returns the gr.Blocks object.
    """
    with gr.Blocks(title="Data Collection Server UI") as interface:
        gr.Markdown("# Data Collection Server Monitor")

        with gr.Tab("Course Status"):
            status_output = gr.Textbox(label="Current Status", value="Loading...", lines=15)
            refresh_btn = gr.Button("Refresh Status")
            refresh_btn.click(get_course_summary, outputs=status_output)

        with gr.Tab("Manual Controls"):
            gr.Markdown("### Compile Course")
            with gr.Row():
                compile_course_input = gr.Textbox(label="Course Name")
                compile_btn = gr.Button("Compile to JSON")
            compile_output = gr.Textbox(label="Compile Result", lines=3)
            compile_btn.click(compile_course_ui, inputs=compile_course_input, outputs=compile_output)

            gr.Markdown("### Add Test Caption")
            with gr.Row():
                test_course_input = gr.Textbox(label="Course Name")
                test_caption_input = gr.Textbox(label="Caption Text", lines=3)
            test_btn = gr.Button("Add Caption")
            test_output = gr.Textbox(label="Result", lines=2)
            test_btn.click(
                add_test_caption,
                inputs=[test_course_input, test_caption_input],
                outputs=test_output,
            )

        # Fill the status box whenever the page is opened. Blocks.load is the
        # page-load event; gr.on() expects event-listener callables, not the
        # string "load".
        interface.load(get_course_summary, outputs=status_output)

    return interface
|
|
|
|
|
|
|
|
# Build the monitoring UI once at import time and serve it under /ui on the same
# FastAPI application; `app` is rebound to the app returned by Gradio.
interface = create_gradio_interface()
app = gr.mount_gradio_app(app=app, blocks=interface, path="/ui")
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution: serve the combined FastAPI + Gradio app with uvicorn on
    # all interfaces, port taken from $PORT (default 8000).
    import uvicorn

    port = int(os.getenv("PORT", "8000"))
    startup_banner = (
        f"[INFO] Server starting on port {port}",
        f"[INFO] FastAPI docs: http://localhost:{port}/docs",
        f"[INFO] Gradio UI: http://localhost:{port}/ui",
    )
    for line in startup_banner:
        print(line)
    uvicorn.run(app, host="0.0.0.0", port=port)
|
|
|