Samfredoly committed
Commit cfa5302 · verified · 1 Parent(s): c14f762

Update app.py

Files changed (1)
  1. app.py +705 -53
app.py CHANGED
@@ -1,61 +1,713 @@
- import logging
- import re
- import requests
- from fastapi import FastAPI, HTTPException
- from pydantic import BaseModel, EmailStr, Field
- import uvicorn
-
- # =======================
- # Configuration Settings
- # =======================
- SMTP_API_URL = "https://smtp-server-ten.vercel.app/smtp"  # Change to your deployed Vercel endpoint
-
- # =======================
- # Logging Configuration
- # =======================
- logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
- logger = logging.getLogger("bot_server")
-
- # =======================
- # Pydantic Model for Email Requests
- # =======================
- class EmailRequest(BaseModel):
-     from_: EmailStr = Field(..., alias="from")
-     to: EmailStr
-     subject: str
-     body: str
-
- # =======================
- # FastAPI Bot Application
- # =======================
- app = FastAPI()
-
- def send_email_via_api(from_addr: str, to_addr: str, subject: str, body: str) -> bool:
-     payload = {
-         "from": from_addr,
-         "to": to_addr,
-         "subject": subject,
-         "body": body,
      }
-     logger.debug("Calling SMTP API with payload: %s", payload)
      try:
-         response = requests.post(SMTP_API_URL, json=payload, timeout=10)
-         response.raise_for_status()
-         result = response.json()
-         logger.info("SMTP API response: %s", result)
-         return result.get("success", False)
      except Exception as e:
-         logger.error("Error calling SMTP API: %s", e)
          return False

- @app.post("/send-email")
- async def send_email(request_data: EmailRequest):
-     logger.debug("Received email request: %s", request_data.dict(by_alias=True))
-     success = send_email_via_api(request_data.from_, request_data.to, request_data.subject, request_data.body)
-     if success:
-         return {"status": "success", "message": "Email sent successfully."}
      else:
-         raise HTTPException(status_code=500, detail="Failed to send email.")

  if __name__ == "__main__":
-     uvicorn.run("bot_server:app", host="0.0.0.0", port=5000, reload=True)
+ import os
+ import json
+ import time
+ import asyncio
+ import aiohttp
+ import zipfile
+ import shutil
+ from typing import Dict, List, Set, Optional, Tuple, Any
+ from urllib.parse import quote
+ from datetime import datetime
+ from pathlib import Path
+ import io
+
+ from fastapi import FastAPI, BackgroundTasks, HTTPException, status
+ from pydantic import BaseModel, Field
+ from huggingface_hub import HfApi, hf_hub_download
+
+ # --- Configuration ---
+ AUTO_START_INDEX = 376  # Hardcoded default start index if no progress is found
+ FLOW_ID = os.getenv("FLOW_ID", "flow_default")
+ FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
+ HF_TOKEN = os.getenv("HF_TOKEN", "")
+ HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3")  # Source dataset for zip files
+ HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "fred808/helium")  # Target dataset for captions
+
+ # Progress and State Tracking
+ PROGRESS_FILE = Path("processing_progress.json")
+ HF_STATE_FILE = "processing_state_captions.json"  # State file in helium dataset
+ LOCAL_STATE_FOLDER = Path(".state")  # Local folder for state file
+ LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
+
+ # Directory within the HF dataset where the zip files are located
+ ZIP_FILE_PREFIX = "frames_zips/"
+
+ # Using the full list from the user's original code for actual deployment
+ CAPTION_SERVERS = [
+     "https://elias80-elen-1.hf.space/analyze",
+     "https://elias80-elen-2.hf.space/analyze",
+     "https://elias80-elen-3.hf.space/analyze",
+     "https://elias80-elen-4.hf.space/analyze",
+     "https://elias80-elen-5.hf.space/analyze",
+     "https://elias80-elen-6.hf.space/analyze",
+     "https://elias80-elen-7.hf.space/analyze",
+     "https://elias80-elen-8.hf.space/analyze",
+     "https://elias80-elen-9.hf.space/analyze",
+     "https://elias80-elen-10.hf.space/analyze",
+     "https://elias80-elen-11.hf.space/analyze",
+     "https://elias80-elen-12.hf.space/analyze",
+     "https://elias80-elen-13.hf.space/analyze",
+     "https://elias80-elen-14.hf.space/analyze",
+     "https://elias80-elen-15.hf.space/analyze",
+     "https://elias80-elen-16.hf.space/analyze",
+     "https://elias80-elen-17.hf.space/analyze",
+     "https://elias80-elen-18.hf.space/analyze",
+     "https://elias80-elen-19.hf.space/analyze",
+     "https://elias80-elen-20.hf.space/analyze",
+     "https://onewayto-jean-1.hf.space/analyze",
+     "https://onewayto-jean-2.hf.space/analyze",
+     "https://onewayto-jean-3.hf.space/analyze",
+     "https://onewayto-jean-4.hf.space/analyze",
+     "https://onewayto-jean-5.hf.space/analyze",
+     "https://onewayto-jean-6.hf.space/analyze",
+     "https://onewayto-jean-7.hf.space/analyze",
+     "https://onewayto-jean-8.hf.space/analyze",
+     "https://onewayto-jean-9.hf.space/analyze",
+     "https://onewayto-jean-10.hf.space/analyze",
+     "https://onewayto-jean-11.hf.space/analyze",
+     "https://onewayto-jean-12.hf.space/analyze",
+     "https://onewayto-jean-13.hf.space/analyze",
+     "https://onewayto-jean-14.hf.space/analyze",
+     "https://onewayto-jean-15.hf.space/analyze",
+     "https://onewayto-jean-16.hf.space/analyze",
+     "https://onewayto-jean-17.hf.space/analyze",
+     "https://onewayto-jean-18.hf.space/analyze",
+     "https://onewayto-jean-19.hf.space/analyze",
+     "https://onewayto-jean-20.hf.space/analyze",
+     "https://Elias2211-bam-1.hf.space/analyze",
+     "https://Elias2211-bam-2.hf.space/analyze",
+     "https://Elias2211-bam-3.hf.space/analyze",
+     "https://Elias2211-bam-4.hf.space/analyze",
+     "https://Elias2211-bam-5.hf.space/analyze",
+     "https://Elias2211-bam-6.hf.space/analyze",
+     "https://Elias2211-bam-7.hf.space/analyze",
+     "https://Elias2211-bam-8.hf.space/analyze",
+     "https://Elias2211-bam-9.hf.space/analyze",
+     "https://Elias2211-bam-10.hf.space/analyze",
+     "https://Elias2211-bam-11.hf.space/analyze",
+     "https://Elias2211-bam-12.hf.space/analyze",
+     "https://Elias2211-bam-13.hf.space/analyze",
+     "https://Elias2211-bam-14.hf.space/analyze",
+     "https://Elias2211-bam-15.hf.space/analyze",
+     "https://Elias2211-bam-16.hf.space/analyze",
+     "https://Elias2211-bam-17.hf.space/analyze",
+     "https://Elias2211-bam-18.hf.space/analyze",
+     "https://Elias2211-bam-19.hf.space/analyze",
+     "https://Elias2211-bam-20.hf.space/analyze",
+     "https://samfred2-isherelike-1.hf.space/analyze",
+     "https://samfred2-isherelike-2.hf.space/analyze",
+     "https://samfred2-isherelike-3.hf.space/analyze",
+     "https://samfred2-isherelike-4.hf.space/analyze",
+     "https://samfred2-isherelike-5.hf.space/analyze",
+     "https://samfred2-isherelike-6.hf.space/analyze",
+     "https://samfred2-isherelike-7.hf.space/analyze",
+     "https://samfred2-isherelike-8.hf.space/analyze",
+     "https://samfred2-isherelike-9.hf.space/analyze",
+     "https://samfred2-isherelike-10.hf.space/analyze",
+     "https://samfred2-isherelike-11.hf.space/analyze",
+     "https://samfred2-isherelike-12.hf.space/analyze",
+     "https://samfred2-isherelike-13.hf.space/analyze",
+     "https://samfred2-isherelike-14.hf.space/analyze",
+     "https://samfred2-isherelike-15.hf.space/analyze",
+     "https://samfred2-isherelike-16.hf.space/analyze",
+     "https://samfred2-isherelike-17.hf.space/analyze",
+     "https://samfred2-isherelike-18.hf.space/analyze",
+     "https://samfred2-isherelike-19.hf.space/analyze",
+     "https://Fred800-jam-1.hf.space/analyze",
+     "https://Fred800-jam-2.hf.space/analyze",
+     "https://Fred800-jam-3.hf.space/analyze",
+     "https://Fred800-jam-4.hf.space/analyze",
+     "https://Fred800-jam-5.hf.space/analyze",
+     "https://Fred800-jam-6.hf.space/analyze",
+     "https://Fred800-jam-7.hf.space/analyze",
+     "https://Fred800-jam-8.hf.space/analyze",
+     "https://Fred800-jam-9.hf.space/analyze",
+     "https://Fred800-jam-10.hf.space/analyze",
+     "https://Fred800-jam-11.hf.space/analyze",
+     "https://Fred800-jam-12.hf.space/analyze",
+     "https://Fred800-jam-13.hf.space/analyze",
+     "https://Fred800-jam-14.hf.space/analyze",
+     "https://Fred800-jam-15.hf.space/analyze",
+     "https://Fred800-jam-16.hf.space/analyze",
+     "https://Fred800-jam-17.hf.space/analyze",
+     "https://Fred800-jam-18.hf.space/analyze",
+     "https://Fred800-jam-19.hf.space/analyze",
+     "https://Fred800-jam-20.hf.space/analyze"
+ ]
+ MODEL_TYPE = "Florence-2-large"
+
+ # Temporary storage for images
+ TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
+ TEMP_DIR.mkdir(exist_ok=True)
+
+ # --- Models ---
+ class ProcessStartRequest(BaseModel):
+     start_index: int = Field(AUTO_START_INDEX, ge=1, description="The index number of the zip file to start processing from (1-indexed).")
+
+ class CaptionServer:
+     def __init__(self, url):
+         self.url = url
+         self.busy = False
+         self.total_processed = 0
+         self.total_time = 0
+         self.model = MODEL_TYPE
+
+     @property
+     def fps(self):
+         return self.total_processed / self.total_time if self.total_time > 0 else 0
+
+ # Global state for caption servers
+ servers = [CaptionServer(url) for url in CAPTION_SERVERS]
+ server_index = 0
+
+ # --- Progress and State Management Functions ---
+
+ def load_progress() -> Dict:
+     """Loads the local processing progress from the JSON file."""
+     if PROGRESS_FILE.exists():
+         try:
+             with PROGRESS_FILE.open('r') as f:
+                 return json.load(f)
+         except json.JSONDecodeError:
+             print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
+             # Fall through to return the default structure
+
+     # Default structure
+     return {
+         "last_processed_index": 0,
+         "processed_files": {},  # {index: repo_path}
+         "file_list": []  # Full list of all zip files found in the dataset
      }
+
+ def save_progress(progress_data: Dict):
+     """Saves the local processing progress to the JSON file."""
      try:
+         with PROGRESS_FILE.open('w') as f:
+             json.dump(progress_data, f, indent=4)
      except Exception as e:
+         print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
+
+ def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
+     """Load state from JSON file with migration logic for new structure."""
+     if os.path.exists(file_path):
+         try:
+             with open(file_path, "r") as f:
+                 data = json.load(f)
+
+             # Migration Logic
+             if "file_states" not in data or not isinstance(data["file_states"], dict):
+                 print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
+                 data["file_states"] = {}
+
+             if "next_download_index" not in data:
+                 data["next_download_index"] = 0
+
+             return data
+         except json.JSONDecodeError:
+             print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
+     return default_value
+
+ def save_json_state(file_path: str, data: Dict[str, Any]):
+     """Save state to JSON file"""
+     with open(file_path, "w") as f:
+         json.dump(data, f, indent=2)
+
+ async def download_hf_state() -> Dict[str, Any]:
+     """Downloads the state file from Hugging Face or returns a default state."""
+     local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
+     default_state = {"next_download_index": 0, "file_states": {}}
+
+     try:
+         # Check if the file exists in the helium repo
+         files = HfApi(token=HF_TOKEN).list_repo_files(
+             repo_id=HF_OUTPUT_DATASET_ID,
+             repo_type="dataset"
+         )
+
+         if HF_STATE_FILE not in files:
+             print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
+             return default_state
+
+         # Download the file
+         hf_hub_download(
+             repo_id=HF_OUTPUT_DATASET_ID,
+             filename=HF_STATE_FILE,
+             repo_type="dataset",
+             local_dir=LOCAL_STATE_FOLDER,
+             local_dir_use_symlinks=False,
+             token=HF_TOKEN
+         )
+
+         print(f"[{FLOW_ID}] Successfully downloaded state file.")
+         return load_json_state(str(local_path), default_state)
+
+     except Exception as e:
+         print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
+         return default_state
+
+ async def upload_hf_state(state: Dict[str, Any]) -> bool:
+     """Uploads the state file to Hugging Face."""
+     local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
+
+     try:
+         # Save state locally first
+         save_json_state(str(local_path), state)
+
+         # Upload to helium dataset
+         HfApi(token=HF_TOKEN).upload_file(
+             path_or_fileobj=str(local_path),
+             path_in_repo=HF_STATE_FILE,
+             repo_id=HF_OUTPUT_DATASET_ID,
+             repo_type="dataset",
+             commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
+         )
+         print(f"[{FLOW_ID}] Successfully uploaded state file.")
+         return True
+     except Exception as e:
+         print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
+         return False
+
+ async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
+     """Marks a file as 'processing' in the state file and uploads the lock."""
+     print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {zip_filename}")
+
+     # Update state locally
+     state["file_states"][zip_filename] = "processing"
+
+     # Upload the updated state file immediately to establish the lock
+     if await upload_hf_state(state):
+         print(f"[{FLOW_ID}] ✅ Successfully locked file: {zip_filename}")
+         return True
+     else:
+         print(f"[{FLOW_ID}] ❌ Failed to lock file: {zip_filename}")
+         # Revert local state
+         if zip_filename in state["file_states"]:
+             del state["file_states"][zip_filename]
          return False

+ async def unlock_file_as_processed(zip_filename: str, state: Dict[str, Any], next_index: int) -> bool:
+     """Marks a file as 'processed', updates the index, and uploads the state."""
+     print(f"[{FLOW_ID}] 🔓 Marking file as processed: {zip_filename}")
+
+     # Update state locally
+     state["file_states"][zip_filename] = "processed"
+     state["next_download_index"] = next_index
+
+     # Upload the updated state
+     if await upload_hf_state(state):
+         print(f"[{FLOW_ID}] ✅ Successfully marked as processed: {zip_filename}")
+         return True
      else:
+         print(f"[{FLOW_ID}] Failed to update state for: {zip_filename}")
+         return False
+
+ # --- Hugging Face Utility Functions ---
+
+ async def get_zip_file_list(progress_data: Dict) -> List[str]:
+     """
+     Fetches the list of all zip files from the dataset, or uses the cached list.
+     Updates the progress_data with the file list if a new list is fetched.
+     """
+     if progress_data['file_list']:
+         print(f"[{FLOW_ID}] Using cached file list with {len(progress_data['file_list'])} files.")
+         return progress_data['file_list']
+
+     print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
+     try:
+         api = HfApi(token=HF_TOKEN)
+         repo_files = api.list_repo_files(
+             repo_id=HF_DATASET_ID,
+             repo_type="dataset"
+         )
+
+         # Filter for zip files in the specified directory and sort them alphabetically for consistent indexing
+         zip_files = sorted([
+             f for f in repo_files
+             if f.startswith(ZIP_FILE_PREFIX) and f.endswith('.zip')
+         ])
+
+         if not zip_files:
+             raise FileNotFoundError(f"No zip files found in '{ZIP_FILE_PREFIX}' directory of dataset '{HF_DATASET_ID}'.")
+
+         print(f"[{FLOW_ID}] Found {len(zip_files)} zip files.")
+
+         # Update and save the progress data
+         progress_data['file_list'] = zip_files
+         save_progress(progress_data)
+
+         return zip_files
+
+     except Exception as e:
+         print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
+         return []
+
+ async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
+     """Downloads the zip file for the given index and extracts its contents."""
+
+     # Extract the base name for the extraction directory
+     zip_full_name = Path(repo_file_full_path).name
+     course_name = zip_full_name.replace('.zip', '')  # Use the file name as the course/job name
+
+     print(f"[{FLOW_ID}] Processing file #{file_index}: {repo_file_full_path}. Full name: {zip_full_name}")
+
+     try:
+         # Use hf_hub_download to get the file path
+         zip_path = hf_hub_download(
+             repo_id=HF_DATASET_ID,
+             filename=repo_file_full_path,  # Use the full path in the repo
+             repo_type="dataset",
+             token=HF_TOKEN,
+         )
+
+         print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")
+
+         # Create a temporary directory for extraction
+         extract_dir = TEMP_DIR / course_name
+         # Ensure a clean directory for extraction
+         if extract_dir.exists():
+             shutil.rmtree(extract_dir)
+         extract_dir.mkdir(exist_ok=True)
+
+         with zipfile.ZipFile(zip_path, 'r') as zip_ref:
+             zip_ref.extractall(extract_dir)
+
+         print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")
+
+         # Clean up the downloaded zip file to save space
+         os.remove(zip_path)
+
+         return extract_dir
+
+     except Exception as e:
+         print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
+         return None
+
+ async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
+     """Uploads the final captions JSON file to the output dataset."""
+     # Use the full zip name, replacing the extension with .json
+     caption_filename = Path(zip_full_name).with_suffix('.json').name
+
+     try:
+         print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")
+
+         # Create JSON content in memory
+         json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
+
+         api = HfApi(token=HF_TOKEN)
+         api.upload_file(
+             path_or_fileobj=io.BytesIO(json_content),
+             path_in_repo=caption_filename,
+             repo_id=HF_OUTPUT_DATASET_ID,
+             repo_type="dataset",
+             commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
+         )
+
+         print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
+         return True
+
+     except Exception as e:
+         print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
+         return False
+
+ # --- Core Processing Functions (Modified) ---
+
+ async def get_available_server(timeout: float = 300.0) -> CaptionServer:
+     """Round-robin selection of an available caption server."""
+     global server_index
+     start_time = time.time()
+     while True:
+         # Round-robin check for an available server
+         for _ in range(len(servers)):
+             server = servers[server_index]
+             server_index = (server_index + 1) % len(servers)
+             if not server.busy:
+                 return server
+
+         # If all servers are busy, wait for a short period and check again
+         await asyncio.sleep(0.5)
+
+         # Check if the timeout has been reached
+         if time.time() - start_time > timeout:
+             raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
+
+ async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
+     """Sends a single image to a caption server for processing."""
+     # This function now handles server selection and retries internally
+     MAX_RETRIES = 3
+     for attempt in range(MAX_RETRIES):
+         server = None
+         try:
+             # 1. Get an available server (will wait if all are busy, with a timeout)
+             server = await get_available_server()
+             server.busy = True
+             start_time = time.time()
+
+             # Print a less verbose message only on the first attempt
+             if attempt == 0:
+                 print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")
+
+             # 2. Prepare request data
+             form_data = aiohttp.FormData()
+             form_data.add_field('file',
+                                 image_path.open('rb'),
+                                 filename=image_path.name,
+                                 content_type='image/jpeg')
+             form_data.add_field('model_choice', MODEL_TYPE)
+
+             # 3. Send request
+             async with aiohttp.ClientSession() as session:
+                 # Timeout increased to 10 minutes (600s), as requested in the user's problem description
+                 async with session.post(server.url, data=form_data, timeout=600) as resp:
+                     if resp.status == 200:
+                         result = await resp.json()
+                         caption = result.get("caption")
+
+                         if caption:
+                             # Update the progress counter
+                             progress_tracker['completed'] += 1
+                             if progress_tracker['completed'] % 50 == 0:
+                                 print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")
+
+                             # Log success only if it's not a progress report interval
+                             if progress_tracker['completed'] % 50 != 0:
+                                 print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}")
+
+                             return {
+                                 "course": course_name,
+                                 "image_path": image_path.name,
+                                 "caption": caption,
+                                 "timestamp": datetime.now().isoformat()
+                             }
+                         else:
+                             print(f"[{FLOW_ID}] Server {server.url} returned success but no caption for {image_path.name}. Retrying...")
+                             continue  # Retry with a different server
+                     else:
+                         error_text = await resp.text()
+                         print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
+                         continue  # Retry with a different server
+
+         except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
+             print(f"[{FLOW_ID}] Connection/timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
+             continue  # Retry with a different server
+         except Exception as e:
+             print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
+             continue  # Retry with a different server
+         finally:
+             if server:
+                 end_time = time.time()
+                 server.busy = False
+                 server.total_processed += 1
+                 server.total_time += (end_time - start_time)
+
+     print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
+     return None
+
+ async def process_dataset_task(start_index: int):
+     """Main task to process the dataset sequentially starting from a given index."""
+
+     # Load both local progress and HF state
+     progress = load_progress()
+     current_state = await download_hf_state()
+     file_list = await get_zip_file_list(progress)
+
+     if not file_list:
+         print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
+         return False
+
+     # Ensure start_index is within bounds
+     if start_index > len(file_list):
+         print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
+         return True
+
+     # Determine the actual starting index in the 0-indexed list
+     start_list_index = start_index - 1
+
+     print(f"[{FLOW_ID}] Starting dataset processing from file index: {start_index} out of {len(file_list)}.")
+
+     global_success = True
+
+     for i in range(start_list_index, len(file_list)):
+         file_index = i + 1  # 1-indexed for user display and progress tracking
+         repo_file_full_path = file_list[i]
+         zip_full_name = Path(repo_file_full_path).name
+         course_name = zip_full_name.replace('.zip', '')  # Use the file name as the course/job name
+
+         # Check the file state in both local and HF state
+         file_state = current_state["file_states"].get(zip_full_name)
+         if file_state == "processed":
+             print(f"[{FLOW_ID}] Skipping {zip_full_name}: Already processed in global state.")
+             continue
+         elif file_state == "processing":
+             print(f"[{FLOW_ID}] Skipping {zip_full_name}: Currently being processed by another worker.")
+             continue
+
+         # Try to lock the file
+         if not await lock_file_for_processing(zip_full_name, current_state):
+             print(f"[{FLOW_ID}] Failed to lock {zip_full_name}. Skipping.")
+             continue
+
+         extract_dir = None
+         current_file_success = False
+
+         try:
+             # 1. Download and Extract
+             extract_dir = await download_and_extract_zip_by_index(file_index, repo_file_full_path)
+
+             if not extract_dir:
+                 raise Exception("Failed to download or extract zip file.")
+
+             # 2. Find Images
+             # Use a recursive glob to find images in subdirectories
+             image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
+             print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")
+
+             if not image_paths:
+                 print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
+                 current_file_success = True
+             else:
+                 # 3. Process Images (Captioning)
+                 progress_tracker = {
+                     'total': len(image_paths),
+                     'completed': 0
+                 }
+                 print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")
+
+                 # Create a semaphore to limit concurrent tasks to the number of available servers
+                 semaphore = asyncio.Semaphore(len(servers))
+
+                 async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
+                     async with semaphore:
+                         return await send_image_for_captioning(image_path, course_name, progress_tracker)
+
+                 # Create a list of tasks for parallel captioning
+                 caption_tasks = [limited_send_image_for_captioning(p, course_name, progress_tracker) for p in image_paths]
+
+                 # Run all captioning tasks concurrently
+                 results = await asyncio.gather(*caption_tasks)
+
+                 # Filter out failed results
+                 all_captions = [r for r in results if r is not None]
+
+                 # Final progress report for the current file
+                 if len(all_captions) == len(image_paths):
+                     print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
+                 else:
+                     print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions.")
+
+                 # Consider the file successful if we have any captions
+                 current_file_success = len(all_captions) > 0
+
+                 # 4. Upload Results
+                 if all_captions:
+                     print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
+                     if await upload_captions_to_hf(zip_full_name, all_captions):
+                         print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
+                         # Keep current_file_success True: we have captions and uploaded them successfully
+                         current_file_success = True
+                     else:
+                         print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
+                         current_file_success = False
+                 else:
+                     print(f"[{FLOW_ID}] No captions generated. Skipping upload for {zip_full_name}.")
+                     current_file_success = False
+
+         except Exception as e:
+             print(f"[{FLOW_ID}] Critical error in process_dataset_task for file #{file_index} ({zip_full_name}): {e}")
+             current_file_success = False
+             global_success = False  # Mark the overall task as failed if any file fails critically
+
+         finally:
+             # 5. Cleanup and Update Progress
+             if extract_dir and extract_dir.exists():
+                 shutil.rmtree(extract_dir, ignore_errors=True)
+                 print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
+
+             if current_file_success:
+                 # Update both local progress and HF state
+                 progress['last_processed_index'] = file_index
+                 progress['processed_files'][str(file_index)] = repo_file_full_path
+                 save_progress(progress)
+
+                 # Update HF state and unlock the file
+                 if await unlock_file_as_processed(zip_full_name, current_state, file_index + 1):
+                     print(f"[{FLOW_ID}] Progress saved and file unlocked: {zip_full_name}")
+                 else:
+                     print(f"[{FLOW_ID}] Warning: File processed but state update failed: {zip_full_name}")
+             else:
+                 # Mark as failed in the state and continue with the next file
+                 current_state["file_states"][zip_full_name] = "failed"
+                 await upload_hf_state(current_state)
+                 print(f"[{FLOW_ID}] File {zip_full_name} marked as failed. Continuing with the next file.")
+                 global_success = False
+
+     print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
+     return global_success
+
+ # --- FastAPI App and Endpoints ---
+
+ app = FastAPI(
+     title=f"Flow Server {FLOW_ID} API",
+     description="Sequentially processes zip files from a dataset, captions images, and tracks progress.",
+     version="1.0.0"
+ )
+
+ @app.on_event("startup")
+ async def startup_event():
+     print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}.")
+
+     # Get both local progress and HF state
+     progress = load_progress()
+     current_state = await download_hf_state()
+
+     # Get the next_download_index from HF state if available
+     hf_next_index = current_state.get("next_download_index", 0)
+
+     # If the HF state has a meaningful index, use that instead of local progress
+     if hf_next_index > 0:
+         start_index = hf_next_index
+         print(f"[{FLOW_ID}] Using next_download_index from HF state: {start_index}")
+     else:
+         # Fall back to local progress if HF state doesn't have a meaningful index
+         start_index = progress.get('last_processed_index', 0) + 1
+         if start_index < AUTO_START_INDEX:
+             start_index = AUTO_START_INDEX
+
+     # Note: FastAPI's startup events can't directly use BackgroundTasks, but asyncio.create_task
+     # runs the long-running process in the background without blocking the server startup.
+     print(f"[{FLOW_ID}] Auto-starting processing from index: {start_index}...")
+     asyncio.create_task(process_dataset_task(start_index))
+
+ @app.get("/")
+ async def root():
+     progress = load_progress()
+     return {
+         "flow_id": FLOW_ID,
+         "status": "ready",
+         "last_processed_index": progress['last_processed_index'],
+         "total_files_in_list": len(progress['file_list']),
+         "processed_files_count": len(progress['processed_files']),
+         "total_servers": len(servers),
+         "busy_servers": sum(1 for s in servers if s.busy),
+     }
+
+ @app.post("/start_processing")
+ async def start_processing(request: ProcessStartRequest, background_tasks: BackgroundTasks):
+     """
+     Starts the sequential processing of zip files from the given index in the background.
+     """
+     start_index = request.start_index
+
+     print(f"[{FLOW_ID}] Received request to start processing from index: {start_index}. Starting background task.")
+
+     # Start the heavy processing in a background task so the API call returns immediately
+     # Note: The server is already auto-starting, but this allows for manual restart/override.
+     background_tasks.add_task(process_dataset_task, start_index)
+
+     return {"status": "processing", "start_index": start_index, "message": "Dataset processing started in background."}

  if __name__ == "__main__":
+     import uvicorn
+     # Note: When running in the sandbox, we need to use 0.0.0.0 to expose the port.
+     uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT)
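
For reference, a minimal sketch of how one of these flow servers could be driven by hand, assuming an instance is reachable at http://localhost:8001 (the FLOW_PORT default). The base URL, start index, and zip-file names below are illustrative assumptions, not part of the commit; the endpoints, response fields, and state values come from the root(), start_processing(), and state-management handlers above.

import requests

BASE_URL = "http://localhost:8001"  # assumed address of a single flow server

# GET / reports the worker's local progress and caption-server pool load.
status = requests.get(f"{BASE_URL}/", timeout=10).json()
print(f"last_processed_index={status['last_processed_index']}, "
      f"busy_servers={status['busy_servers']}/{status['total_servers']}")

# POST /start_processing (re)starts processing from a 1-indexed zip position.
# The body matches the ProcessStartRequest model; the work runs as a FastAPI
# background task, so the call returns immediately.
resp = requests.post(f"{BASE_URL}/start_processing", json={"start_index": 376}, timeout=10)
print(resp.json())  # {"status": "processing", "start_index": 376, ...}

# Shape of the shared coordination file (processing_state_captions.json in
# HF_OUTPUT_DATASET_ID) that download_hf_state, lock_file_for_processing, and
# unlock_file_as_processed read and write; the file names are hypothetical:
state = {
    "next_download_index": 378,
    "file_states": {
        "frames_batch_001.zip": "processed",
        "frames_batch_002.zip": "processing",
        "frames_batch_003.zip": "failed",
    },
}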