yukee1992 commited on
Commit
e24fe54
Β·
verified Β·
1 Parent(s): aca7a8d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +132 -204
app.py CHANGED
@@ -1,27 +1,19 @@
1
  import os
2
- import json
3
  import uuid
 
 
 
4
  from datetime import datetime
5
  from typing import Dict, List, Optional
6
- from fastapi import FastAPI, HTTPException, BackgroundTasks
7
  from fastapi.middleware.cors import CORSMiddleware
8
  from pydantic import BaseModel
9
  import requests
10
- from supabase import create_client
11
- import threading
12
- import time
13
-
14
- # =============================================
15
- # CONFIGURATION
16
- # =============================================
17
- SUPABASE_URL = os.environ.get("SUPABASE_URL")
18
- SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
19
- SUPABASE = create_client(SUPABASE_URL, SUPABASE_KEY) if SUPABASE_URL and SUPABASE_KEY else None
20
 
21
  print("=" * 60)
22
  print("πŸš€ STARTING STATUS TRACKER")
23
  print("=" * 60)
24
- print(f"Supabase connected: {SUPABASE is not None}")
25
 
26
  # =============================================
27
  # DATA MODELS
@@ -30,53 +22,44 @@ print(f"Supabase connected: {SUPABASE is not None}")
30
  class ServiceConfig(BaseModel):
31
  """Configuration for a service to track"""
32
  name: str
33
- type: str # 'tts', 'image', 'video', etc.
34
- expected_count: int = 1 # How many items expected from this service
35
- timeout_minutes: int = 10
36
- webhook_url: Optional[str] = None
 
 
 
 
 
37
 
38
  class StatusUpdate(BaseModel):
39
  """Status update from a service"""
40
  project_id: str
41
- service_type: str # 'tts', 'image', etc.
42
- status: str # 'started', 'processing', 'completed', 'failed'
43
  file_urls: Optional[List[str]] = None
44
- file_count: Optional[int] = 1
45
  error: Optional[str] = None
46
- metadata: Optional[Dict] = None
47
- timestamp: str = datetime.now().isoformat()
48
 
49
- class ProjectStatus(BaseModel):
50
- """Track status of a complete project"""
51
- project_id: str
52
- created_at: str
53
- services: Dict[str, ServiceStatus]
54
- n8n_webhook: Optional[str]
55
- all_completed: bool = False
56
- completed_at: Optional[str] = None
57
- final_data: Dict = {}
58
-
59
- class ServiceStatus(BaseModel):
60
- """Status of a single service"""
61
- type: str
62
- status: str # 'pending', 'processing', 'completed', 'failed'
63
- file_urls: List[str] = []
64
- file_count: int = 0
65
- started_at: str
66
- completed_at: Optional[str] = None
67
- error: Optional[str] = None
68
- metadata: Dict = {}
69
 
70
  # =============================================
71
- # STORAGE (In-memory + Supabase)
72
  # =============================================
73
 
74
- projects = {} # project_id -> ProjectStatus
75
- services_config = {} # service_type -> ServiceConfig
76
 
77
  # Load service configurations from environment
78
  def load_service_configs():
79
- """Load service configurations from environment variables"""
80
  config_str = os.environ.get("SERVICE_CONFIGS", "[]")
81
  try:
82
  configs = json.loads(config_str)
@@ -85,9 +68,9 @@ def load_service_configs():
85
  print(f"βœ… Loaded {len(configs)} service configurations")
86
  except Exception as e:
87
  print(f"❌ Failed to load service configs: {e}")
88
- # Default configs if none provided
89
- services_config["tts"] = ServiceConfig(name="TTS Service", type="tts", expected_count=1)
90
- services_config["image"] = ServiceConfig(name="Image Generator", type="image", expected_count=4)
91
 
92
  load_service_configs()
93
 
@@ -109,137 +92,101 @@ app.add_middleware(
109
  # API ENDPOINTS
110
  # =============================================
111
 
112
- @app.post("/api/register-project")
113
- async def register_project(request: dict):
114
- """Register a new project to track"""
115
  try:
116
- project_id = request.get("project_id") or str(uuid.uuid4())
117
- n8n_webhook = request.get("n8n_webhook")
118
- services = request.get("services", list(services_config.keys()))
119
 
120
- print(f"πŸ“ Registering project: {project_id}")
121
- print(f" Services to track: {services}")
 
 
122
 
123
- # Create project status
124
- project = ProjectStatus(
125
  project_id=project_id,
126
- created_at=datetime.now().isoformat(),
127
- services={},
128
- n8n_webhook=n8n_webhook
129
  )
130
 
131
- # Initialize each service
132
- for service_type in services:
133
- config = services_config.get(service_type)
134
- if config:
135
- project.services[service_type] = ServiceStatus(
136
- type=service_type,
137
- status="pending",
138
- file_urls=[],
139
- file_count=0,
140
- started_at=datetime.now().isoformat()
141
- )
142
-
143
- projects[project_id] = project
144
-
145
- # Also save to Supabase if available
146
- if SUPABASE:
147
- try:
148
- SUPABASE.table("tracked_projects").insert({
149
- "project_id": project_id,
150
- "data": project.dict(),
151
- "created_at": datetime.now().isoformat()
152
- }).execute()
153
- except Exception as e:
154
- print(f"⚠️ Failed to save to Supabase: {e}")
155
-
156
  return {
157
  "status": "registered",
158
  "project_id": project_id,
159
- "services": list(project.services.keys())
 
160
  }
161
 
162
  except Exception as e:
163
  print(f"❌ Registration error: {e}")
164
  raise HTTPException(status_code=500, detail=str(e))
165
 
166
- @app.post("/api/update-status")
167
  async def update_status(update: StatusUpdate):
168
  """Receive status update from a service"""
169
  try:
170
  project_id = update.project_id
171
  service_type = update.service_type
172
 
173
- print(f"πŸ“₯ Status update for {project_id} - {service_type}: {update.status}")
 
 
 
174
 
 
175
  if project_id not in projects:
176
- # Try to load from Supabase
177
- if SUPABASE:
178
- result = SUPABASE.table("tracked_projects").select("*").eq("project_id", project_id).execute()
179
- if result.data:
180
- projects[project_id] = ProjectStatus(**result.data[0]["data"])
181
- else:
182
- return {"error": "Project not found"}
183
- else:
184
- return {"error": "Project not found"}
185
 
186
  project = projects[project_id]
 
187
 
188
  # Update service status
189
  if service_type in project.services:
190
- service = project.services[service_type]
191
- service.status = update.status
192
  if update.file_urls:
193
- service.file_urls.extend(update.file_urls)
194
- if update.file_count:
195
- service.file_count = update.file_count
196
  if update.error:
197
- service.error = update.error
198
- if update.metadata:
199
- service.metadata.update(update.metadata)
200
-
201
- if update.status == "completed":
202
- service.completed_at = datetime.now().isoformat()
203
- else:
204
- # New service not in original registration
205
- config = services_config.get(service_type)
206
- if config:
207
- project.services[service_type] = ServiceStatus(
208
- type=service_type,
209
- status=update.status,
210
- file_urls=update.file_urls or [],
211
- file_count=update.file_count or 0,
212
- started_at=datetime.now().isoformat(),
213
- completed_at=datetime.now().isoformat() if update.status == "completed" else None
214
- )
215
 
216
  # Check if all services are complete
217
  all_completed = all(
218
- s.status == "completed" for s in project.services.values()
219
  )
220
 
221
- if all_completed and not project.all_completed:
222
- project.all_completed = True
223
- project.completed_at = datetime.now().isoformat()
224
 
225
  # Prepare final data
226
- for service_type, service in project.services.items():
227
- project.final_data[service_type] = {
228
- "file_urls": service.file_urls,
229
- "file_count": service.file_count,
230
- "metadata": service.metadata
 
 
 
 
 
 
 
231
  }
232
 
233
- # Send webhook to n8n if configured
234
- if project.n8n_webhook:
235
- await send_n8n_webhook(project)
236
-
237
- # Update Supabase
238
- if SUPABASE:
239
- SUPABASE.table("tracked_projects").update({
240
- "data": project.dict(),
241
- "updated_at": datetime.now().isoformat()
242
- }).eq("project_id", project_id).execute()
 
 
 
 
243
 
244
  return {
245
  "status": "updated",
@@ -251,95 +198,76 @@ async def update_status(update: StatusUpdate):
251
  print(f"❌ Update error: {e}")
252
  raise HTTPException(status_code=500, detail=str(e))
253
 
254
- @app.get("/api/project-status/{project_id}")
255
- async def get_project_status(project_id: str):
256
  """Get current status of a project"""
257
- if project_id in projects:
258
- return projects[project_id].dict()
259
 
260
- # Try Supabase
261
- if SUPABASE:
262
- result = SUPABASE.table("tracked_projects").select("*").eq("project_id", project_id).execute()
263
- if result.data:
264
- return result.data[0]["data"]
265
 
266
- raise HTTPException(status_code=404, detail="Project not found")
 
 
 
 
 
 
267
 
268
  @app.get("/api/services")
269
  async def list_services():
270
  """List all configured services"""
271
  return {
272
  "services": [
273
- {
274
- "type": stype,
275
- "config": config.dict()
276
- }
277
  for stype, config in services_config.items()
278
  ]
279
  }
280
 
281
- @app.post("/api/services/add")
282
- async def add_service(config: ServiceConfig):
283
- """Add a new service to track"""
284
- services_config[config.type] = config
285
- return {"status": "added", "service": config.dict()}
286
-
287
  @app.get("/health")
288
  async def health():
 
289
  return {
290
  "status": "healthy",
291
- "projects_tracking": len(projects),
292
- "services_configured": list(services_config.keys())
 
293
  }
294
 
295
- # =============================================
296
- # HELPER FUNCTIONS
297
- # =============================================
298
-
299
- async def send_n8n_webhook(project: ProjectStatus):
300
- """Send completion webhook to n8n"""
301
- try:
302
- webhook_data = {
303
- "project_id": project.project_id,
304
- "status": "completed",
305
- "completed_at": project.completed_at,
306
- "services": {}
307
- }
308
-
309
- for service_type, service in project.services.items():
310
- webhook_data["services"][service_type] = {
311
- "file_urls": service.file_urls,
312
- "file_count": service.file_count,
313
- "metadata": service.metadata,
314
- "completed_at": service.completed_at
315
- }
316
-
317
- response = requests.post(
318
- project.n8n_webhook,
319
- json=webhook_data,
320
- timeout=10,
321
- headers={"Content-Type": "application/json"}
322
- )
323
-
324
- print(f"πŸ“€ Webhook sent to n8n: {response.status_code}")
325
-
326
- except Exception as e:
327
- print(f"❌ Failed to send webhook: {e}")
328
 
329
  # =============================================
330
- # BACKGROUND CLEANUP TASK
331
  # =============================================
332
 
333
  def cleanup_old_projects():
334
- """Remove projects older than 24 hours"""
335
  while True:
336
- time.sleep(3600) # Check every hour
337
  now = datetime.now()
338
  to_delete = []
339
 
340
  for pid, project in projects.items():
341
- created = datetime.fromisoformat(project.created_at)
342
- if (now - created).total_seconds() > 86400: # 24 hours
 
 
 
343
  to_delete.append(pid)
344
 
345
  for pid in to_delete:
@@ -354,10 +282,10 @@ cleanup_thread.start()
354
  # RUN
355
  # =============================================
356
  if __name__ == "__main__":
357
- import uvicorn
358
  print("\n" + "=" * 60)
359
  print("🌐 Status Tracker running on port 7860")
360
- print(f"πŸ“Š Tracking {len(projects)} projects")
361
- print(f"πŸ”§ Configured services: {list(services_config.keys())}")
362
  print("=" * 60)
 
363
  uvicorn.run(app, host="0.0.0.0", port=7860)
 
1
  import os
 
2
  import uuid
3
+ import json
4
+ import time
5
+ import threading
6
  from datetime import datetime
7
  from typing import Dict, List, Optional
8
+ from fastapi import FastAPI, HTTPException
9
  from fastapi.middleware.cors import CORSMiddleware
10
  from pydantic import BaseModel
11
  import requests
12
+ import uvicorn
 
 
 
 
 
 
 
 
 
13
 
14
  print("=" * 60)
15
  print("πŸš€ STARTING STATUS TRACKER")
16
  print("=" * 60)
 
17
 
18
  # =============================================
19
  # DATA MODELS
 
22
  class ServiceConfig(BaseModel):
23
  """Configuration for a service to track"""
24
  name: str
25
+ type: str
26
+ expected_count: int = 1
27
+ timeout_minutes: int = 30
28
+
29
+ class RegisterRequest(BaseModel):
30
+ """Request to register a new project"""
31
+ project_id: Optional[str] = None
32
+ n8n_webhook: str
33
+ services: List[str]
34
 
35
  class StatusUpdate(BaseModel):
36
  """Status update from a service"""
37
  project_id: str
38
+ service_type: str
39
+ status: str # 'processing', 'completed', 'failed'
40
  file_urls: Optional[List[str]] = None
 
41
  error: Optional[str] = None
 
 
42
 
43
+ class Project:
44
+ """Track a single project"""
45
+ def __init__(self, project_id, n8n_webhook, services):
46
+ self.project_id = project_id
47
+ self.n8n_webhook = n8n_webhook
48
+ self.services = {s: {"status": "pending", "file_urls": []} for s in services}
49
+ self.created_at = datetime.now()
50
+ self.completed_at = None
51
+ self.last_update = datetime.now()
 
 
 
 
 
 
 
 
 
 
 
52
 
53
  # =============================================
54
+ # STORAGE (In-memory only - no database needed)
55
  # =============================================
56
 
57
+ projects = {} # project_id -> Project
58
+ services_config = {} # Load from environment
59
 
60
  # Load service configurations from environment
61
  def load_service_configs():
62
+ """Load service configurations from environment variable"""
63
  config_str = os.environ.get("SERVICE_CONFIGS", "[]")
64
  try:
65
  configs = json.loads(config_str)
 
68
  print(f"βœ… Loaded {len(configs)} service configurations")
69
  except Exception as e:
70
  print(f"❌ Failed to load service configs: {e}")
71
+ # Default configs
72
+ services_config["tts"] = ServiceConfig(name="TTS", type="tts", timeout_minutes=30)
73
+ services_config["image"] = ServiceConfig(name="Image", type="image", timeout_minutes=30)
74
 
75
  load_service_configs()
76
 
 
92
  # API ENDPOINTS
93
  # =============================================
94
 
95
+ @app.post("/api/register")
96
+ async def register_project(request: RegisterRequest):
97
+ """Register a new project with its own webhook URL"""
98
  try:
99
+ project_id = request.project_id or str(uuid.uuid4())[:8]
 
 
100
 
101
+ print(f"\nπŸ“ Registering new project:")
102
+ print(f" Project ID: {project_id}")
103
+ print(f" Webhook: {request.n8n_webhook}")
104
+ print(f" Services: {request.services}")
105
 
106
+ # Create new project
107
+ projects[project_id] = Project(
108
  project_id=project_id,
109
+ n8n_webhook=request.n8n_webhook,
110
+ services=request.services
 
111
  )
112
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
113
  return {
114
  "status": "registered",
115
  "project_id": project_id,
116
+ "webhook": request.n8n_webhook,
117
+ "services": request.services
118
  }
119
 
120
  except Exception as e:
121
  print(f"❌ Registration error: {e}")
122
  raise HTTPException(status_code=500, detail=str(e))
123
 
124
+ @app.post("/api/update")
125
  async def update_status(update: StatusUpdate):
126
  """Receive status update from a service"""
127
  try:
128
  project_id = update.project_id
129
  service_type = update.service_type
130
 
131
+ print(f"\nπŸ“₯ Status update:")
132
+ print(f" Project: {project_id}")
133
+ print(f" Service: {service_type}")
134
+ print(f" Status: {update.status}")
135
 
136
+ # Check if project exists
137
  if project_id not in projects:
138
+ print(f"❌ Project not found: {project_id}")
139
+ return {"error": "Project not found"}
 
 
 
 
 
 
 
140
 
141
  project = projects[project_id]
142
+ project.last_update = datetime.now()
143
 
144
  # Update service status
145
  if service_type in project.services:
146
+ project.services[service_type]["status"] = update.status
 
147
  if update.file_urls:
148
+ project.services[service_type]["file_urls"] = update.file_urls
 
 
149
  if update.error:
150
+ project.services[service_type]["error"] = update.error
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
151
 
152
  # Check if all services are complete
153
  all_completed = all(
154
+ s["status"] == "completed" for s in project.services.values()
155
  )
156
 
157
+ if all_completed and not project.completed_at:
158
+ project.completed_at = datetime.now()
159
+ print(f"πŸŽ‰ Project {project_id} COMPLETED!")
160
 
161
  # Prepare final data
162
+ final_data = {
163
+ "project_id": project_id,
164
+ "status": "completed",
165
+ "completed_at": project.completed_at.isoformat(),
166
+ "services": {}
167
+ }
168
+
169
+ for s_type, s_data in project.services.items():
170
+ final_data["services"][s_type] = {
171
+ "file_urls": s_data.get("file_urls", []),
172
+ "file_count": len(s_data.get("file_urls", [])),
173
+ "status": s_data["status"]
174
  }
175
 
176
+ # Send webhook to n8n
177
+ try:
178
+ webhook_response = requests.post(
179
+ project.n8n_webhook,
180
+ json=final_data,
181
+ timeout=10,
182
+ headers={"Content-Type": "application/json"}
183
+ )
184
+ print(f"πŸ“€ Webhook sent to n8n: {webhook_response.status_code}")
185
+ except Exception as e:
186
+ print(f"❌ Failed to send webhook: {e}")
187
+
188
+ # Clean up old project (optional)
189
+ # del projects[project_id]
190
 
191
  return {
192
  "status": "updated",
 
198
  print(f"❌ Update error: {e}")
199
  raise HTTPException(status_code=500, detail=str(e))
200
 
201
+ @app.get("/api/status/{project_id}")
202
+ async def get_status(project_id: str):
203
  """Get current status of a project"""
204
+ if project_id not in projects:
205
+ raise HTTPException(status_code=404, detail="Project not found")
206
 
207
+ project = projects[project_id]
 
 
 
 
208
 
209
+ return {
210
+ "project_id": project_id,
211
+ "created_at": project.created_at.isoformat(),
212
+ "completed_at": project.completed_at.isoformat() if project.completed_at else None,
213
+ "services": project.services,
214
+ "all_completed": all(s["status"] == "completed" for s in project.services.values())
215
+ }
216
 
217
  @app.get("/api/services")
218
  async def list_services():
219
  """List all configured services"""
220
  return {
221
  "services": [
222
+ {"type": stype, "config": config.dict()}
 
 
 
223
  for stype, config in services_config.items()
224
  ]
225
  }
226
 
 
 
 
 
 
 
227
  @app.get("/health")
228
  async def health():
229
+ """Health check endpoint"""
230
  return {
231
  "status": "healthy",
232
+ "active_projects": len(projects),
233
+ "services_configured": list(services_config.keys()),
234
+ "timestamp": datetime.now().isoformat()
235
  }
236
 
237
+ @app.get("/")
238
+ async def root():
239
+ """Root endpoint"""
240
+ return {
241
+ "name": "Status Tracker API",
242
+ "version": "1.0.0",
243
+ "endpoints": {
244
+ "register": "POST /api/register",
245
+ "update": "POST /api/update",
246
+ "status": "GET /api/status/{project_id}",
247
+ "services": "GET /api/services",
248
+ "health": "GET /health"
249
+ },
250
+ "active_projects": len(projects),
251
+ "services": list(services_config.keys())
252
+ }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
253
 
254
  # =============================================
255
+ # BACKGROUND CLEANUP TASK (Optional)
256
  # =============================================
257
 
258
  def cleanup_old_projects():
259
+ """Remove projects older than 1 hour"""
260
  while True:
261
+ time.sleep(300) # Check every 5 minutes
262
  now = datetime.now()
263
  to_delete = []
264
 
265
  for pid, project in projects.items():
266
+ # Remove projects completed more than 1 hour ago
267
+ if project.completed_at and (now - project.completed_at).total_seconds() > 3600:
268
+ to_delete.append(pid)
269
+ # Remove projects older than 2 hours (stuck)
270
+ elif (now - project.created_at).total_seconds() > 7200:
271
  to_delete.append(pid)
272
 
273
  for pid in to_delete:
 
282
  # RUN
283
  # =============================================
284
  if __name__ == "__main__":
 
285
  print("\n" + "=" * 60)
286
  print("🌐 Status Tracker running on port 7860")
287
+ print(f"πŸ“Š Active projects: {len(projects)}")
288
+ print(f"πŸ”§ Services: {list(services_config.keys())}")
289
  print("=" * 60)
290
+
291
  uvicorn.run(app, host="0.0.0.0", port=7860)