# PIL2 / app.py
# Author: Fred808 (last commit 169c9a7 "Update app.py", verified)
import os
import re
import json
import time
from typing import Dict, Any, List, Optional
from urllib.parse import urlparse, parse_qs
import gradio as gr
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
# Optional dependency: HuggingFace upload support is enabled only when
# `huggingface_hub` imports cleanly; any import-time failure disables it.
try:
    from huggingface_hub import HfApi
except Exception:
    HfApi = None
HF_AVAILABLE = HfApi is not None
# Directory to store compiled uploads. Resolve against the absolute path of
# this file: os.path.dirname() of a bare relative name such as "app.py" is "",
# which would silently place "uploads" under the current working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)
app = FastAPI(title="Data Collection Server", description="Receives text/URLs from captioning/image servers, groups by course, compiles JSON and optionally uploads to HuggingFace.")
# Enable CORS so browser clients on other origins can call the API.
# Credentials are disabled: no endpoint in this app reads cookies or auth
# headers, and FastAPI's CORS documentation states that allow_origins,
# allow_methods and allow_headers must not be ["*"] when
# allow_credentials=True.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# In-memory store for course data:
#   course name -> {"items": [entry, ...], "last_updated": epoch seconds or None}
courses: Dict[str, Dict[str, Any]] = {}
# http(s) URLs embedded in free text. The character class also matches common
# sentence punctuation; extract_urls trims that from the end of each match.
URL_RE = re.compile(r"https?://[\w\-\./?%&=:@,+~#]+")
# Words in a submitted message that mark a course as finished (triggers compile).
DONE_RE = re.compile(r"\b(done|finished|completed|complete)\b", re.IGNORECASE)
HF_TOKEN = os.getenv("HF_TOKEN")
HF_DATASET_REPO = os.getenv("HF_DATASET_REPO")  # e.g. "username/dataset-name"
# Characters URL_RE can capture that, at the very end of a match, almost always
# belong to the surrounding sentence ("see https://x/y?course=a." or "…, then").
_URL_TRAILING_PUNCT = ".,:?"


def extract_urls(text: Optional[str]) -> List[str]:
    """Return every http(s) URL found in *text*, in order of appearance.

    Trailing ``.``, ``,``, ``:`` and ``?`` are stripped from each match so that
    sentence punctuation does not leak into the URL (and from there into the
    course name parsed out of its query string). ``None`` or ``""`` yields
    an empty list.
    """
    return [match.rstrip(_URL_TRAILING_PUNCT) for match in URL_RE.findall(text or "")]
def extract_course_from_url(url: str) -> Optional[str]:
    """Return the ``course`` query-string value of *url*, or ``None``.

    The parameter name is matched case-insensitively. An exact lowercase
    ``course`` key wins when several spellings are present. Only the first
    value of the parameter is returned; blank values are ignored because
    ``parse_qs`` drops them by default.

    Malformed URLs (e.g. an unbalanced IPv6 bracket, for which ``urlparse``
    raises ``ValueError``) and non-string input yield ``None`` rather than
    raising, keeping this helper best-effort.
    """
    try:
        query = parse_qs(urlparse(url).query)
    except (ValueError, TypeError, AttributeError):
        return None
    values = query.get("course")
    if not values:
        # Fall back to any other capitalisation: Course, COURSE, cOuRsE, ...
        values = next(
            (vals for key, vals in query.items() if key.lower() == "course"),
            None,
        )
    return values[0] if values else None
def now_ts() -> str:
    """Return the current local time as a compact ``YYYYmmddTHHMMSS`` stamp."""
    current = time.localtime()
    return time.strftime("%Y%m%dT%H%M%S", current)
async def parse_request(request: Request) -> Dict[str, Any]:
    """Read an incoming request in any format and normalise it into a dict.

    Returns a dict with keys ``text`` (str), ``json`` (decoded JSON body or
    None), ``form`` (field name -> str) and ``headers`` (all request headers).
    It also has ``course`` when a JSON object (string value) or a form
    supplies that field.

    Text is taken from the first source that provides it:
      1. a text-like field of a JSON object, or a JSON string body;
      2. a text-like form field (urlencoded or multipart);
      3. the raw body decoded as UTF-8, when the body was not valid JSON;
      4. the decoded JSON re-serialised (lists, numbers, objects without a
         text-like field).
    Reading form fields before the raw body matters: otherwise a form post
    would be stored as its encoded wire format ("text=hello+world&...").
    """
    text_fields = ("text", "caption", "message", "body", "content")
    payload: Dict[str, Any] = {"text": "", "json": None, "form": {}, "headers": dict(request.headers)}

    # 1) JSON body.
    json_ok = False
    try:
        body = await request.json()
    except Exception:
        pass  # Not JSON (or unreadable): the form / raw-body fallbacks below apply.
    else:
        json_ok = True
        payload["json"] = body
        if isinstance(body, str):
            payload["text"] = body
        elif isinstance(body, dict):
            for key in text_fields:
                if isinstance(body.get(key), str):
                    payload["text"] = body[key]
                    break
            # allow explicit course field
            if isinstance(body.get("course"), str):
                payload["course"] = body["course"]

    # 2) Form fields (urlencoded / multipart). Non-form requests yield no fields.
    try:
        form = await request.form()
        for key, value in form.multi_items():
            payload["form"][key] = str(value)
            if key in text_fields and not payload["text"]:
                payload["text"] = str(value)
            if key == "course":
                payload["course"] = str(value)
    except Exception as e:
        # Best-effort: keep going with whatever was collected, but leave a trace.
        print(f"[WARN] form parsing failed: {e}")

    # 3) Raw body, for non-JSON requests that provided no text-like field.
    if not payload["text"] and not json_ok:
        try:
            payload["text"] = (await request.body()).decode("utf-8", errors="ignore")
        except Exception:
            payload["text"] = ""

    # 4) JSON without a usable text field: stringify it (best-effort).
    if not payload["text"] and payload["json"] is not None:
        try:
            payload["text"] = json.dumps(payload["json"])
        except (TypeError, ValueError):
            payload["text"] = str(payload["json"])
    return payload
def add_entry(course: str, entry: Dict[str, Any]):
    """Append *entry* to the buffer of *course*, creating the course on first use.

    Also stamps the course's ``last_updated`` with the current epoch time.
    """
    empty_record = {"items": [], "last_updated": None}
    record = courses.setdefault(course, empty_record)
    record["items"].append(entry)
    record["last_updated"] = time.time()
def compile_course(course: str) -> str:
    """Write a course's buffered items to a JSON file and optionally upload it.

    The file goes to UPLOAD_DIR as ``<course>_<timestamp>.json``. Characters
    outside ``[A-Za-z0-9_.-]`` (including path separators) are replaced by
    ``_``. If that name already exists, e.g. after two compiles of the same
    course within one second, a numeric suffix (``_1``, ``_2``, ...) is added
    instead of overwriting the earlier file.

    When HF_TOKEN, HF_DATASET_REPO and huggingface_hub are all available, the
    file is also uploaded to the root of that dataset repo under the same
    name. Upload failures are logged and do not fail the compile.

    The compiled items are removed from the course's buffer. If writing the
    file fails, they are put back and the error is re-raised.

    Returns:
        Path of the written JSON file.

    Raises:
        ValueError: if ``course`` is not a known course (no entry was ever added).
    """
    if course not in courses:
        raise ValueError(f"Unknown course: {course}")
    record = courses[course]
    # Detach the buffered items before the (possibly slow) write/upload, so
    # entries appended after this point land in a fresh list and survive for
    # the next compile instead of being wiped by a clear at the end.
    items = record["items"]
    record["items"] = []
    try:
        stamp = now_ts()  # one timestamp for both "compiled_at" and the filename
        data = {
            "course": course,
            "compiled_at": stamp,
            "count": len(items),
            "items": items,
        }
        base_name = re.sub(r"[^a-zA-Z0-9_\-\.]+", "_", f"{course}_{stamp}")
        suffix = 0
        while True:
            safe_filename = f"{base_name}.json" if suffix == 0 else f"{base_name}_{suffix}.json"
            path = os.path.join(UPLOAD_DIR, safe_filename)
            try:
                # Exclusive-create mode: never overwrite an earlier compile.
                out = open(path, "x", encoding="utf-8")
            except FileExistsError:
                suffix += 1
                continue
            with out:
                json.dump(data, out, ensure_ascii=False, indent=2)
            break
    except Exception:
        # Put the items back (ahead of anything added meanwhile) so a failed
        # write loses no data; the caller still sees the error.
        record["items"] = items + record["items"]
        raise

    # Optionally upload to HuggingFace
    if HF_TOKEN and HF_DATASET_REPO and HF_AVAILABLE:
        try:
            api = HfApi()
            # upload path at root of repo with same filename
            api.upload_file(
                path_or_fileobj=path,
                path_in_repo=safe_filename,
                repo_id=HF_DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN,
            )
        except Exception as e:
            # Log but don't fail the compile: the local file is already written.
            print(f"[WARN] HuggingFace upload failed: {e}")
    return path
@app.post("/submit")
async def submit(request: Request):
    """Receive any data (text, JSON, form) and store it as an entry for its course.

    The course is taken from an explicit ``course`` JSON/form field
    (whitespace-only counts as missing). Failing that, it comes from the
    first URL in the text carrying a ``course`` query parameter, and
    otherwise defaults to ``"unknown_course"``.

    If the message text, with any URLs blanked out, contains "done",
    "finished", "complete" or "completed", the course is compiled to JSON
    immediately (and uploaded if configured).

    Returns:
        ``{"status": "compiled", "course", "path"}`` after a compile, otherwise
        ``{"status": "stored", "course", "count"}``.

    Raises:
        HTTPException(500): if compiling the course fails.
    """
    payload = await parse_request(request)
    text = (payload.get("text") or "").strip()
    urls = extract_urls(text)

    # Explicit field first, then the first URL that names a course.
    course = (payload.get("course") or "").strip()
    if not course:
        course = next(
            (found for found in map(extract_course_from_url, urls) if found),
            "unknown_course",
        )

    entry = {
        "timestamp": now_ts(),
        "text": text,
        "json": payload.get("json"),
        "form": payload.get("form"),
        "urls": urls,
        # Keep only a few provenance headers (drops e.g. Authorization / Cookie).
        "headers": {k: v for k, v in payload.get("headers", {}).items() if k.lower() in ("user-agent", "host", "content-type")},
    }
    add_entry(course, entry)

    # Detect completion. URLs are blanked out first so that paths such as
    # ".../lesson-complete?course=x" do not trigger a compile by accident.
    if DONE_RE.search(URL_RE.sub(" ", text)):
        try:
            path = compile_course(course)
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e
        return JSONResponse({"status": "compiled", "course": course, "path": path})

    # TODO: when a URL switches to a different course, compile the previous
    # course first. This needs per-source tracking of the last course seen.
    return JSONResponse({"status": "stored", "course": course, "count": len(courses[course]["items"])})
@app.get("/status")
async def status():
    """Report every course's buffered item count and last update time (epoch seconds or None)."""
    summary = {}
    for name, record in courses.items():
        summary[name] = {"count": len(record["items"]), "last_updated": record["last_updated"]}
    return {"courses": summary}
@app.post("/compile")
async def compile_endpoint(course: Optional[str] = None):
    """Force-compile a course.

    If ``course`` is omitted and exactly one course exists, that one is compiled.

    Raises:
        HTTPException(400): no ``course`` given and zero or several courses exist.
        HTTPException(404): the named course is unknown (same as ``/debug``).
        HTTPException(500): compiling the course failed.
    """
    if not course:
        if len(courses) != 1:
            detail = (
                "No courses available to compile."
                if not courses
                else "Provide course query parameter when multiple courses exist."
            )
            raise HTTPException(status_code=400, detail=detail)
        course = next(iter(courses))
    if course not in courses:
        raise HTTPException(status_code=404, detail="Course not found")
    try:
        path = compile_course(course)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"status": "compiled", "course": course, "path": path}
@app.get("/debug/{course}")
async def debug_course(course: str):
    """Return the raw stored record (``items`` and ``last_updated``) for *course*; 404 if unknown."""
    record = courses.get(course)
    if record is None:
        raise HTTPException(status_code=404, detail="Course not found")
    return record
# Gradio Interface
def get_course_summary():
    """Build a human-readable summary of every course for the Gradio status tab.

    For each course it lists:
      * the name;
      * the number of buffered items;
      * the last update time (local time, or "never");
      * up to the last 3 items, each with at most the first 200 characters of
        its text ("..." only when actually truncated) and its URL count.
    """
    if not courses:
        return "No courses available"
    preview_len = 200
    lines = []
    # Iterate over a snapshot: /submit may add a course while Gradio runs this
    # handler (presumably in a worker thread), and iterating the live dict
    # could then raise "dictionary changed size during iteration".
    for course, data in list(courses.items()):
        items = data["items"]
        last_updated = data["last_updated"]
        last_updated_str = (
            time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_updated))
            if last_updated
            else "never"
        )
        lines.append(f"Course: {course}")
        lines.append(f"Items: {len(items)}")
        lines.append(f"Last Updated: {last_updated_str}")
        if items:
            lines.append("\nLast 3 captions:")
            for item in items[-3:]:
                text = (item.get("text") or "").strip()
                preview = text if len(text) <= preview_len else text[:preview_len] + "..."
                url_count = len(item.get("urls") or [])
                lines.append(f"- {preview} ({url_count} URLs)")
        lines.append("\n" + "-" * 40 + "\n")
    return "\n".join(lines)
def compile_course_ui(course_name: str) -> str:
    """Gradio handler: compile *course_name* and describe the outcome as text."""
    if course_name:
        try:
            saved_to = compile_course(course_name)
        except Exception as exc:
            return f"Error compiling course: {str(exc)}"
        return f"Successfully compiled course to {saved_to}"
    return "Please enter a course name"
def add_test_caption(course_name: str, caption_text: str) -> str:
    """Gradio handler: store *caption_text* as a manual entry for *course_name*."""
    if not (course_name and caption_text):
        return "Please enter both course name and caption text"
    add_entry(
        course_name,
        {
            "timestamp": now_ts(),
            "text": caption_text,
            "urls": extract_urls(caption_text),
            "json": None,
            "form": {},
            "headers": {"user-agent": "Gradio UI"},
        },
    )
    total = len(courses[course_name]["items"])
    return f"Added caption to course '{course_name}'. Total items: {total}"
def create_gradio_interface():
    """Build the Gradio monitoring UI.

    Tabs:
      * "Course Status": text summary of all courses (get_course_summary),
        filled on page load and on "Refresh Status".
      * "Manual Controls": compile a course by name, or add a test caption.

    Returns:
        The ``gr.Blocks`` app, ready to be mounted on the FastAPI app.
    """
    with gr.Blocks(title="Data Collection Server UI") as interface:
        gr.Markdown("# Data Collection Server Monitor")
        with gr.Tab("Course Status"):
            status_output = gr.Textbox(label="Current Status", value="Loading...", lines=15)
            refresh_btn = gr.Button("Refresh Status")
            refresh_btn.click(get_course_summary, outputs=status_output)
        with gr.Tab("Manual Controls"):
            gr.Markdown("### Compile Course")
            with gr.Row():
                compile_course_input = gr.Textbox(label="Course Name")
                compile_btn = gr.Button("Compile to JSON")
            compile_output = gr.Textbox(label="Compile Result", lines=3)
            compile_btn.click(compile_course_ui, inputs=compile_course_input, outputs=compile_output)
            gr.Markdown("### Add Test Caption")
            with gr.Row():
                test_course_input = gr.Textbox(label="Course Name")
                test_caption_input = gr.Textbox(label="Caption Text", lines=3)
                test_btn = gr.Button("Add Caption")
            test_output = gr.Textbox(label="Result", lines=2)
            test_btn.click(
                add_test_caption,
                inputs=[test_course_input, test_caption_input],
                outputs=test_output,
            )
        # Fill the status box whenever the page loads. `gr.on` expects a list
        # of event listeners (e.g. [interface.load]), not the string "load";
        # Blocks.load registers the page-load event directly.
        interface.load(get_course_summary, outputs=status_output)
    return interface
# Mount the Gradio UI under /ui and rebind `app` to the object returned by
# mount_gradio_app, so the served app includes both the API and the UI.
interface = create_gradio_interface()
app = gr.mount_gradio_app(app, interface, path="/ui")

if __name__ == "__main__":
    import uvicorn

    port = int(os.environ.get("PORT", "8000"))
    banner = (
        f"[INFO] Server starting on port {port}",
        f"[INFO] FastAPI docs: http://localhost:{port}/docs",
        f"[INFO] Gradio UI: http://localhost:{port}/ui",
    )
    for line in banner:
        print(line)
    uvicorn.run(app, host="0.0.0.0", port=port)