Fred808 commited on
Commit
a1e320e
·
verified ·
1 Parent(s): 10dec39

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +262 -65
app.py CHANGED
@@ -3,11 +3,16 @@ import json
3
  import time
4
  import asyncio
5
  import aiohttp
6
- from typing import Dict, List, Set
7
  from urllib.parse import quote, urljoin
8
  from datetime import datetime
9
  from pathlib import Path
10
 
 
 
 
 
 
11
  # Path for storing caption data
12
  CAPTIONS_DIR = Path("captions_data")
13
  CAPTIONS_DIR.mkdir(exist_ok=True)
@@ -58,34 +63,225 @@ CAPTION_SERVERS = [
58
  "https://fred1012-fred1012-ptlc5u.hf.space/analyze",
59
  "https://fred1012-fred1012-u7lh57.hf.space/analyze",
60
  "https://fred1012-fred1012-q8djv1.hf.space/analyze",
61
- "https://fredalone-fredalone-ozugrp.hf.space/analyze",
62
- "https://fredalone-fredalone-9brxj2.hf.space/analyze",
63
- "https://fredalone-fredalone-p8vq9a.hf.space/analyze",
64
- "https://fredalone-fredalone-vbli2y.hf.space/analyze",
65
- "https://fredalone-fredalone-uggger.hf.space/analyze",
66
- "https://fredalone-fredalone-nmi7e8.hf.space/analyze",
67
- "https://fredalone-fredalone-d1f26d.hf.space/analyze",
68
- "https://fredalone-fredalone-461jp2.hf.space/analyze",
69
- "https://fredalone-fredalone-3enfg4.hf.space/analyze",
70
- "https://fredalone-fredalone-dqdbpv.hf.space/analyze",
71
- "https://fredalone-fredalone-ivtjua.hf.space/analyze",
72
- "https://fredalone-fredalone-6bezt2.hf.space/analyze",
73
- "https://fredalone-fredalone-e0wfnk.hf.space/analyze",
74
- "https://fredalone-fredalone-zu2t7j.hf.space/analyze",
75
- "https://fredalone-fredalone-dqtv1o.hf.space/analyze",
76
- "https://fredalone-fredalone-wclyog.hf.space/analyze",
77
- "https://fredalone-fredalone-t27vig.hf.space/analyze",
78
- "https://fredalone-fredalone-gahbxh.hf.space/analyze",
79
- "https://fredalone-fredalone-kw2po4.hf.space/analyze",
80
- "https://fredalone-fredalone-8h285h.hf.space/analyze"
81
  ]
82
  MODEL_TYPE = "Florence-2-large" # Explicitly request large model
83
  DATA_COLLECTION_SERVER = "https://fred808-flow.hf.space"
84
 
85
- # Tracking state
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  processed_images: Dict[str, Set[str]] = {} # {course: set(image_names)}
87
  course_captions: Dict[str, List[Dict]] = {} # {course: [{image, caption, metadata}]}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
  async def fetch_courses() -> List[str]:
90
  """Fetch available courses from source server"""
91
  async with aiohttp.ClientSession() as session:
@@ -137,18 +333,6 @@ async def get_model_info():
137
  print(f"Couldn't get model info from {server}: {e}")
138
  return model_info
139
 
140
- class CaptionServer:
141
- def __init__(self, url):
142
- self.url = url
143
- self.busy = False
144
- self.model = "unknown"
145
- self.total_processed = 0
146
- self.total_time = 0
147
-
148
- @property
149
- def fps(self):
150
- return self.total_processed / self.total_time if self.total_time > 0 else 0
151
-
152
  async def process_image(server: CaptionServer, course: str, image: Dict) -> Dict:
153
  """Process single image through one caption server"""
154
  if server.busy:
@@ -230,7 +414,7 @@ async def process_course(course: str, servers: List[CaptionServer]):
230
  print(f"\nProcessing {len(images)} images for course {course}")
231
  remaining_images = [img for img in images if img['filename'] not in processed_images[course]]
232
 
233
- while remaining_images:
234
  # Create tasks for each available server
235
  tasks = []
236
  for server in servers:
@@ -270,23 +454,9 @@ async def process_course(course: str, servers: List[CaptionServer]):
270
  course_captions[course].clear()
271
  break
272
 
273
- async def main():
274
- # Initialize caption servers
275
- servers = [CaptionServer(url) for url in CAPTION_SERVERS]
276
-
277
- # Check for existing caption files and report
278
- existing_captions = list(CAPTIONS_DIR.glob("*_captions.json"))
279
- if existing_captions:
280
- print("\nFound existing caption files:")
281
- for cap_file in existing_captions:
282
- course = cap_file.stem.replace("_captions", "")
283
- try:
284
- with open(cap_file, 'r', encoding='utf-8') as f:
285
- captions = json.load(f)
286
- print(f"- {course}: {len(captions)} captions")
287
- except Exception as e:
288
- print(f"- Error reading {cap_file.name}: {e}")
289
- print()
290
 
291
  # Get model information and verify Florence-2-large availability
292
  model_info = await get_model_info()
@@ -302,21 +472,28 @@ async def main():
302
 
303
  if not available_servers:
304
  print(f"\nError: No servers with {MODEL_TYPE} available!")
 
305
  return
306
 
307
  # Update servers list to only use those with large model
308
- servers = available_servers
309
- print(f"\nUsing {len(servers)} servers with {MODEL_TYPE}")
310
  print()
311
 
312
  start_time = time.time()
313
 
314
- while True:
315
  try:
316
  # Get available courses
317
- courses = await fetch_courses()
 
 
 
 
318
  if not courses:
319
  print("No courses found, waiting...")
 
 
320
  await asyncio.sleep(10)
321
  continue
322
 
@@ -324,30 +501,50 @@ async def main():
324
 
325
  # Process each course with all available servers
326
  for course in courses:
327
- await process_course(course, servers)
 
 
328
 
329
  # Show server stats
330
  print("\nServer Stats:")
331
- total_processed = sum(s.total_processed for s in servers)
332
  elapsed = time.time() - start_time
333
  if elapsed > 0:
334
  print(f"Total images processed: {total_processed}")
335
  print(f"Overall speed: {total_processed/elapsed:.2f} fps")
336
- for s in servers:
337
  print(f"- {s.url}: {s.total_processed} images, {s.fps:.2f} fps")
338
  print()
339
 
 
 
 
340
  # Wait before next check
341
  await asyncio.sleep(5)
342
 
 
 
 
343
  except Exception as e:
344
- print(f"Error in main loop: {e}")
345
  await asyncio.sleep(10)
 
 
346
 
347
- if __name__ == "__main__":
348
- print("Starting caption coordinator...")
 
 
 
 
349
  print(f"Source server: {SOURCE_SERVER}")
350
- print(f"Caption servers: {CAPTION_SERVERS}")
351
  print(f"Dataset server: {DATA_COLLECTION_SERVER}")
352
-
353
- asyncio.run(main())
 
 
 
 
 
 
 
3
  import time
4
  import asyncio
5
  import aiohttp
6
+ from typing import Dict, List, Set, Optional
7
  from urllib.parse import quote, urljoin
8
  from datetime import datetime
9
  from pathlib import Path
10
 
11
+ from fastapi import FastAPI, BackgroundTasks, HTTPException, status
12
+ from fastapi.responses import JSONResponse
13
+ from pydantic import BaseModel, Field
14
+ import uvicorn
15
+
16
  # Path for storing caption data
17
  CAPTIONS_DIR = Path("captions_data")
18
  CAPTIONS_DIR.mkdir(exist_ok=True)
 
63
  "https://fred1012-fred1012-ptlc5u.hf.space/analyze",
64
  "https://fred1012-fred1012-u7lh57.hf.space/analyze",
65
  "https://fred1012-fred1012-q8djv1.hf.space/analyze",
66
+ "https://fredalone-fredalone-ozugrp.hf.space/analyze",
67
+ "https://fredalone-fredalone-9brxj2.hf.space/analyze",
68
+ "https://fredalone-fredalone-p8vq9a.hf.space/analyze",
69
+ "https://fredalone-fredalone-vbli2y.hf.space/analyze",
70
+ "https://fredalone-fredalone-uggger.hf.space/analyze",
71
+ "https://fredalone-fredalone-nmi7e8.hf.space/analyze",
72
+ "https://fredalone-fredalone-d1f26d.hf.space/analyze",
73
+ "https://fredalone-fredalone-461jp2.hf.space/analyze",
74
+ "https://fredalone-fredalone-3enfg4.hf.space/analyze",
75
+ "https://fredalone-fredalone-dqdbpv.hf.space/analyze",
76
+ "https://fredalone-fredalone-ivtjua.hf.space/analyze",
77
+ "https://fredalone-fredalone-6bezt2.hf.space/analyze",
78
+ "https://fredalone-fredalone-e0wfnk.hf.space/analyze",
79
+ "https://fredalone-fredalone-zu2t7j.hf.space/analyze",
80
+ "https://fredalone-fredalone-dqtv1o.hf.space/analyze",
81
+ "https://fredalone-fredalone-wclyog.hf.space/analyze",
82
+ "https://fredalone-fredalone-t27vig.hf.space/analyze",
83
+ "https://fredalone-fredalone-gahbxh.hf.space/analyze",
84
+ "https://fredalone-fredalone-kw2po4.hf.space/analyze",
85
+ "https://fredalone-fredalone-8h285h.hf.space/analyze"
86
  ]
87
  MODEL_TYPE = "Florence-2-large" # Explicitly request large model
88
  DATA_COLLECTION_SERVER = "https://fred808-flow.hf.space"
89
 
90
+ # FastAPI Models
91
+ class CourseInfo(BaseModel):
92
+ course_folder: str
93
+
94
+ class ImageInfo(BaseModel):
95
+ filename: str
96
+
97
+ class CaptionRequest(BaseModel):
98
+ image_url: str
99
+ model_choice: str = MODEL_TYPE
100
+
101
+ class CaptionResponse(BaseModel):
102
+ success: bool
103
+ caption: Optional[str] = None
104
+ error: Optional[str] = None
105
+
106
+ class ServerStatus(BaseModel):
107
+ url: str
108
+ model: str
109
+ busy: bool
110
+ total_processed: int
111
+ total_time: float
112
+ fps: float
113
+
114
+ class ProcessingStatus(BaseModel):
115
+ course: str
116
+ total_images: int
117
+ processed_images: int
118
+ progress_percent: float
119
+ status: str
120
+
121
+ class StartProcessingRequest(BaseModel):
122
+ courses: Optional[List[str]] = None # If None, process all courses
123
+ continuous: bool = False
124
+
125
+ # FastAPI App
126
+ app = FastAPI(
127
+ title="Caption Coordinator API",
128
+ description="Distributed caption processing coordinator",
129
+ version="1.0.0"
130
+ )
131
+
132
+ # Global state
133
  processed_images: Dict[str, Set[str]] = {} # {course: set(image_names)}
134
  course_captions: Dict[str, List[Dict]] = {} # {course: [{image, caption, metadata}]}
135
+ servers = []
136
+ is_processing = False
137
+ current_processing_task = None
138
+
139
+ class CaptionServer:
140
+ def __init__(self, url):
141
+ self.url = url
142
+ self.busy = False
143
+ self.model = "unknown"
144
+ self.total_processed = 0
145
+ self.total_time = 0
146
+
147
+ @property
148
+ def fps(self):
149
+ return self.total_processed / self.total_time if self.total_time > 0 else 0
150
+
151
+ # Initialize servers
152
+ def initialize_servers():
153
+ global servers
154
+ servers = [CaptionServer(url) for url in CAPTION_SERVERS]
155
+
156
+ # API Routes
157
+ @app.get("/")
158
+ async def root():
159
+ return {"message": "Caption Coordinator API", "status": "running"}
160
+
161
+ @app.get("/health")
162
+ async def health():
163
+ return {
164
+ "status": "healthy",
165
+ "servers_available": len([s for s in servers if not s.busy]),
166
+ "total_servers": len(servers),
167
+ "is_processing": is_processing
168
+ }
169
+
170
+ @app.get("/courses")
171
+ async def get_courses():
172
+ """Fetch available courses from source server"""
173
+ try:
174
+ async with aiohttp.ClientSession() as session:
175
+ async with session.get(f"{SOURCE_SERVER}/courses") as resp:
176
+ data = await resp.json()
177
+ if isinstance(data, dict) and 'courses' in data:
178
+ return [c['course_folder'] for c in data['courses'] if isinstance(c, dict)]
179
+ return []
180
+ except Exception as e:
181
+ raise HTTPException(status_code=500, detail=f"Error fetching courses: {e}")
182
+
183
+ @app.get("/courses/{course}/images")
184
+ async def get_course_images(course: str):
185
+ """Fetch images list for a course"""
186
+ try:
187
+ course_frames = f"{course}_frames" if not course.endswith("_frames") else course
188
+ url = f"{SOURCE_SERVER}/images/{quote(course_frames)}"
189
+ async with aiohttp.ClientSession() as session:
190
+ async with session.get(url) as resp:
191
+ data = await resp.json()
192
+ if isinstance(data, dict) and 'images' in data:
193
+ return data['images']
194
+ return []
195
+ except Exception as e:
196
+ raise HTTPException(status_code=500, detail=f"Error fetching images: {e}")
197
+
198
+ @app.get("/servers/status")
199
+ async def get_servers_status():
200
+ """Get status of all caption servers"""
201
+ server_statuses = []
202
+ for server in servers:
203
+ server_statuses.append(ServerStatus(
204
+ url=server.url,
205
+ model=server.model,
206
+ busy=server.busy,
207
+ total_processed=server.total_processed,
208
+ total_time=server.total_time,
209
+ fps=server.fps
210
+ ))
211
+ return server_statuses
212
 
213
+ @app.get("/processing/status")
214
+ async def get_processing_status():
215
+ """Get current processing status"""
216
+ status_info = {}
217
+ for course in processed_images:
218
+ total = len(processed_images[course])
219
+ processed = len(course_captions.get(course, []))
220
+ status_info[course] = ProcessingStatus(
221
+ course=course,
222
+ total_images=total,
223
+ processed_images=processed,
224
+ progress_percent=(processed / total * 100) if total > 0 else 0,
225
+ status="processing" if processed < total else "completed"
226
+ )
227
+ return status_info
228
+
229
+ @app.post("/processing/start")
230
+ async def start_processing(request: StartProcessingRequest, background_tasks: BackgroundTasks):
231
+ """Start caption processing"""
232
+ global is_processing, current_processing_task
233
+
234
+ if is_processing:
235
+ raise HTTPException(status_code=400, detail="Processing is already running")
236
+
237
+ is_processing = True
238
+ current_processing_task = asyncio.create_task(processing_loop(request.courses, request.continuous))
239
+
240
+ return {"message": "Processing started", "continuous": request.continuous}
241
+
242
+ @app.post("/processing/stop")
243
+ async def stop_processing():
244
+ """Stop caption processing"""
245
+ global is_processing, current_processing_task
246
+
247
+ if not is_processing:
248
+ raise HTTPException(status_code=400, detail="Processing is not running")
249
+
250
+ is_processing = False
251
+ if current_processing_task:
252
+ current_processing_task.cancel()
253
+ current_processing_task = None
254
+
255
+ return {"message": "Processing stopped"}
256
+
257
+ @app.get("/captions/{course}")
258
+ async def get_captions(course: str):
259
+ """Get captions for a specific course"""
260
+ captions = load_captions_from_file(course)
261
+ return {
262
+ "course": course,
263
+ "total_captions": len(captions),
264
+ "captions": captions
265
+ }
266
+
267
+ @app.delete("/captions/{course}")
268
+ async def delete_captions(course: str):
269
+ """Delete captions for a specific course"""
270
+ try:
271
+ file_path = get_caption_file_path(course)
272
+ if file_path.exists():
273
+ file_path.unlink()
274
+ if course in processed_images:
275
+ del processed_images[course]
276
+ if course in course_captions:
277
+ del course_captions[course]
278
+ return {"message": f"Captions for {course} deleted"}
279
+ else:
280
+ raise HTTPException(status_code=404, detail=f"No captions found for {course}")
281
+ except Exception as e:
282
+ raise HTTPException(status_code=500, detail=f"Error deleting captions: {e}")
283
+
284
+ # Core processing functions (same as original)
285
  async def fetch_courses() -> List[str]:
286
  """Fetch available courses from source server"""
287
  async with aiohttp.ClientSession() as session:
 
333
  print(f"Couldn't get model info from {server}: {e}")
334
  return model_info
335
 
 
 
 
 
 
 
 
 
 
 
 
 
336
  async def process_image(server: CaptionServer, course: str, image: Dict) -> Dict:
337
  """Process single image through one caption server"""
338
  if server.busy:
 
414
  print(f"\nProcessing {len(images)} images for course {course}")
415
  remaining_images = [img for img in images if img['filename'] not in processed_images[course]]
416
 
417
+ while remaining_images and is_processing:
418
  # Create tasks for each available server
419
  tasks = []
420
  for server in servers:
 
454
  course_captions[course].clear()
455
  break
456
 
457
+ async def processing_loop(specific_courses: Optional[List[str]] = None, continuous: bool = False):
458
+ """Main processing loop"""
459
+ global is_processing
 
 
 
 
 
 
 
 
 
 
 
 
 
 
460
 
461
  # Get model information and verify Florence-2-large availability
462
  model_info = await get_model_info()
 
472
 
473
  if not available_servers:
474
  print(f"\nError: No servers with {MODEL_TYPE} available!")
475
+ is_processing = False
476
  return
477
 
478
  # Update servers list to only use those with large model
479
+ processing_servers = available_servers
480
+ print(f"\nUsing {len(processing_servers)} servers with {MODEL_TYPE}")
481
  print()
482
 
483
  start_time = time.time()
484
 
485
+ while is_processing:
486
  try:
487
  # Get available courses
488
+ if specific_courses:
489
+ courses = specific_courses
490
+ else:
491
+ courses = await fetch_courses()
492
+
493
  if not courses:
494
  print("No courses found, waiting...")
495
+ if not continuous:
496
+ break
497
  await asyncio.sleep(10)
498
  continue
499
 
 
501
 
502
  # Process each course with all available servers
503
  for course in courses:
504
+ if not is_processing:
505
+ break
506
+ await process_course(course, processing_servers)
507
 
508
  # Show server stats
509
  print("\nServer Stats:")
510
+ total_processed = sum(s.total_processed for s in processing_servers)
511
  elapsed = time.time() - start_time
512
  if elapsed > 0:
513
  print(f"Total images processed: {total_processed}")
514
  print(f"Overall speed: {total_processed/elapsed:.2f} fps")
515
+ for s in processing_servers:
516
  print(f"- {s.url}: {s.total_processed} images, {s.fps:.2f} fps")
517
  print()
518
 
519
+ if not continuous:
520
+ break
521
+
522
  # Wait before next check
523
  await asyncio.sleep(5)
524
 
525
+ except asyncio.CancelledError:
526
+ print("Processing cancelled")
527
+ break
528
  except Exception as e:
529
+ print(f"Error in processing loop: {e}")
530
  await asyncio.sleep(10)
531
+
532
+ is_processing = False
533
 
534
+ # Startup event
535
+ @app.on_event("startup")
536
+ async def startup_event():
537
+ """Initialize servers on startup"""
538
+ initialize_servers()
539
+ print("Caption Coordinator API started")
540
  print(f"Source server: {SOURCE_SERVER}")
541
+ print(f"Caption servers: {len(CAPTION_SERVERS)}")
542
  print(f"Dataset server: {DATA_COLLECTION_SERVER}")
543
+
544
+ if __name__ == "__main__":
545
+ uvicorn.run(
546
+ "app:app",
547
+ host="0.0.0.0",
548
+ port=8000,
549
+ reload=True
550
+ )