Fred808 committed on
Commit
5e5412c
·
verified ·
1 Parent(s): 2f2e31e

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +233 -0
app.py ADDED
@@ -0,0 +1,233 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import json
import os
import re
import time
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, urlparse

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
10
+
11
# huggingface_hub is an optional dependency: when it cannot be imported the
# server still runs and simply skips the upload step.
try:
    from huggingface_hub import HfApi
except Exception:
    HfApi = None
    HF_AVAILABLE = False
else:
    HF_AVAILABLE = True
17
+
18
# Directory where compiled course files are written.
# abspath() anchors the location to this module even when __file__ is a
# relative path (or a bare filename), so it no longer depends on the
# process's working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)
22
+
23
# ASGI application object; the route decorators below attach to it.
app = FastAPI(
    title="Data Collection Server",
    description="Receives text/URLs from captioning/image servers, groups by course, compiles JSON and optionally uploads to HuggingFace.",
)

# Process-local store: course name -> {"items": [...], "last_updated": ts}.
# Nothing here is persisted until a course is compiled.
courses: Dict[str, Dict[str, Any]] = {}
27
+
28
# Matches http(s) URLs made of word characters and common URL punctuation.
URL_RE = re.compile(r"https?://[\w\-\./?%&=:@,+~#]+")
# Matches a standalone completion keyword, in any letter case.
DONE_RE = re.compile(r"\b(done|finished|completed|complete)\b", re.I)
30
+
31
# HuggingFace upload settings, read once from the environment at import time.
# Both stay None when the variables are unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO")  # e.g. "username/dataset-name"
33
+
34
+
35
def extract_urls(text: str) -> List[str]:
    """Return every http(s) URL found in *text*, in order of appearance.

    ``None`` or empty input yields an empty list. Trailing sentence
    punctuation (``.``, ``,``, ``:``, ``?``) is stripped from each match:
    the URL pattern accepts those characters, so a URL at the end of a
    sentence would otherwise keep the full stop and corrupt anything parsed
    out of it (for example the ``course`` query parameter).
    """
    return [match.rstrip(".,:?") for match in URL_RE.findall(text or "")]
37
+
38
+
39
def extract_course_from_url(url: str) -> Optional[str]:
    """Return the value of the ``course`` query parameter of *url*, if any.

    The exact spellings ``course``, ``Course`` and ``COURSE`` are tried first,
    in that order; any other casing of the key (e.g. ``cOurse``) is accepted
    as a fallback. When the parameter occurs several times the first value is
    returned.

    Returns:
        The course name, or ``None`` when the URL cannot be parsed or carries
        no non-empty ``course`` parameter.
    """
    try:
        query = parse_qs(urlparse(url).query)
    except (ValueError, TypeError, AttributeError):
        # Best-effort: a malformed URL (e.g. a broken IPv6 host) or a
        # non-string argument just means "no course".
        return None

    for key in ("course", "Course", "COURSE"):
        values = query.get(key)
        if values:
            return values[0]

    # Fallback: accept any other letter casing of the key.
    for key, values in query.items():
        if key.lower() == "course" and values:
            return values[0]
    return None
49
+
50
+
51
def now_ts() -> str:
    """Return the current local time as a compact ``YYYYMMDDTHHMMSS`` string."""
    current = time.localtime()
    return time.strftime("%Y%m%dT%H%M%S", current)
53
+
54
+
55
async def parse_request(request: Request) -> Dict[str, Any]:
    """Read an incoming request in any supported format.

    The body is tried as JSON first; if that fails, the raw body is decoded
    as UTF-8 text. Form parsing is then attempted as well.

    Returns:
        A dict with the keys ``text`` (best-effort message text), ``json``
        (decoded JSON body or ``None``), ``form`` (form fields as strings)
        and ``headers`` (all request headers). A ``course`` key is added
        when the JSON body or the form names one explicitly.
    """
    payload: Dict[str, Any] = {
        "text": "",
        "json": None,
        "form": {},
        "headers": dict(request.headers),
    }
    # True while payload["text"] is only the undecoded raw body. For form
    # posts that raw body is the url-encoded/multipart wire format, so a real
    # text field from the parsed form must be allowed to replace it.
    text_is_raw_body = False

    # Try JSON
    try:
        body = await request.json()
    except Exception:
        # Not JSON - fall back to the raw body.
        try:
            payload["text"] = (await request.body()).decode("utf-8", errors="ignore")
            text_is_raw_body = True
        except Exception:
            payload["text"] = ""
    else:
        payload["json"] = body
        if isinstance(body, str):
            # A bare JSON string is the message itself.
            payload["text"] = body
        elif isinstance(body, dict):
            # Use the first likely text field that holds a string.
            for key in ("text", "caption", "message", "body", "content"):
                if isinstance(body.get(key), str):
                    payload["text"] = body[key]
                    break
            # Allow an explicit course field.
            if isinstance(body.get("course"), str):
                payload["course"] = body["course"]

    # Try form (urlencoded or multipart/form-data)
    try:
        form = await request.form()
        for key, value in form.multi_items():
            payload["form"][key] = str(value)
            # The first text-like field wins; it also overrides a raw-body
            # fallback, which is not meaningful text for a form post.
            if key in ("text", "caption", "message", "content") and (
                text_is_raw_body or not payload["text"]
            ):
                payload["text"] = str(value)
                text_is_raw_body = False
            if key == "course":
                payload["course"] = str(value)
    except Exception:
        # Best-effort: form parsing is optional, keep whatever was found.
        pass

    # If there is still no text but JSON was decoded (a list, a number, a
    # dict without a text field), stringify it.
    if not payload["text"] and payload.get("json") is not None:
        try:
            payload["text"] = json.dumps(payload["json"])
        except Exception:
            payload["text"] = str(payload["json"])

    return payload
104
+
105
+
106
def add_entry(course: str, entry: Dict[str, Any]):
    """Append *entry* to the in-memory bucket of *course*.

    The bucket is created on first use, and its ``last_updated`` field is
    set to the current epoch time.
    """
    bucket = courses.get(course)
    if bucket is None:
        bucket = {"items": [], "last_updated": None}
        courses[course] = bucket
    bucket["items"].append(entry)
    bucket["last_updated"] = time.time()
110
+
111
+
112
def compile_course(course: str) -> str:
    """Write the stored items of *course* to a JSON file and clear them.

    The file is saved in ``UPLOAD_DIR`` as ``<course>_<timestamp>.json``,
    with every character outside ``[a-zA-Z0-9_.-]`` replaced by ``_``. When
    ``HF_TOKEN`` and ``HF_DATASET_REPO`` are set and huggingface_hub is
    importable, the file is also uploaded to the root of that dataset repo;
    an upload failure is printed and does not fail the compile.

    Returns:
        Path of the JSON file that was written.

    Raises:
        ValueError: if *course* has no entry in the in-memory store.
    """
    if course not in courses:
        raise ValueError(f"Unknown course: {course}")

    items = courses[course]["items"]
    # Take the timestamp once so "compiled_at" and the filename always agree.
    stamp = now_ts()
    data = {
        "course": course,
        "compiled_at": stamp,
        "count": len(items),
        "items": items,
    }

    filename = f"{course}_{stamp}.json"
    # The course name comes from clients; collapsing unsafe characters keeps
    # the file inside UPLOAD_DIR (no path separators survive).
    safe_filename = re.sub(r"[^a-zA-Z0-9_\-\.]+", "_", filename)
    path = os.path.join(UPLOAD_DIR, safe_filename)

    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    # Optionally upload to HuggingFace
    if HF_TOKEN and HF_DATASET_REPO and HF_AVAILABLE:
        try:
            api = HfApi()
            # Upload to the root of the repo under the same filename.
            api.upload_file(
                path_or_fileobj=path,
                path_in_repo=safe_filename,
                repo_id=HF_DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN,
            )
        except Exception as e:
            # Log but don't fail the compile: the local file is already saved.
            print(f"[WARN] HuggingFace upload failed: {e}")

    # After compiling, clear stored items for that course
    courses[course]["items"] = []
    return path
150
+
151
+
152
@app.post("/submit")
async def submit(request: Request):
    """Store one incoming message (text, JSON or form) under its course.

    The course is taken from an explicit ``course`` field, else from the
    first URL in the text that carries a course parameter, else it is
    ``unknown_course``. If the text contains a completion keyword such as
    'done', the course is compiled to JSON (and uploaded if configured)
    right after the entry is stored.
    """
    parsed = await parse_request(request)
    text = (parsed.get("text") or "").strip()
    urls = extract_urls(text)

    # Explicit field first, then the first URL that names a course.
    course = (
        parsed.get("course")
        or next(filter(None, map(extract_course_from_url, urls)), None)
        or "unknown_course"
    )

    # Only a few identifying headers are kept with the entry.
    kept_headers = ("user-agent", "host", "content-type")
    stored_headers = {
        name: value
        for name, value in parsed.get("headers", {}).items()
        if name.lower() in kept_headers
    }

    add_entry(
        course,
        {
            "timestamp": now_ts(),
            "text": text,
            "json": parsed.get("json"),
            "form": parsed.get("form"),
            "urls": urls,
            "headers": stored_headers,
        },
    )

    # NOTE: a switch to a different course between requests is not detected;
    # no per-source "last course" is tracked, so only a completion keyword
    # (or the /compile endpoint) triggers compilation.
    if not DONE_RE.search(text):
        return JSONResponse({"status": "stored", "course": course, "count": len(courses[course]["items"])})

    try:
        path = compile_course(course)
        return JSONResponse({"status": "compiled", "course": course, "path": path})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
199
+
200
+
201
@app.get("/status")
async def status():
    """Report, per course, the number of pending items and the last update time."""
    summary = {}
    for name, bucket in courses.items():
        summary[name] = {
            "count": len(bucket["items"]),
            "last_updated": bucket["last_updated"],
        }
    return {"courses": summary}
205
+
206
+
207
@app.post("/compile")
async def compile_endpoint(course: Optional[str] = None):
    """Force compile a course.

    If ``course`` is not provided and exactly one course exists, that one is
    compiled.

    Raises:
        HTTPException: 400 when no course is given and the store does not
            hold exactly one course; 404 when the named course is unknown;
            500 when compilation itself fails.
    """
    if not course:
        if len(courses) != 1:
            raise HTTPException(status_code=400, detail="Provide course query parameter when multiple courses exist.")
        course = next(iter(courses))

    # An unknown course is a client error, reported the same way as in
    # /debug/{course}, rather than a 500 from compile_course's ValueError.
    if course not in courses:
        raise HTTPException(status_code=404, detail="Course not found")

    try:
        path = compile_course(course)
        return {"status": "compiled", "course": course, "path": path}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
221
+
222
+
223
@app.get("/debug/{course}")
async def debug_course(course: str):
    """Return the raw in-memory bucket of *course*, or 404 if it is unknown."""
    try:
        return courses[course]
    except KeyError:
        raise HTTPException(status_code=404, detail="Course not found")
228
+
229
+
230
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, on the port given
    # by the PORT environment variable (8000 when unset).
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "8000")))