parthnuwal7 commited on
Commit
7175d0a
·
1 Parent(s): 4b62d23

Adding app.py

Browse files
Files changed (2) hide show
  1. app.py +493 -0
  2. requirements.txt +24 -25
app.py ADDED
@@ -0,0 +1,493 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
"""
FastAPI backend wrapper for HF Spaces
Provides REST API endpoints while keeping Streamlit UI
"""

# Load environment variables FIRST before any other imports
# (downstream utils.* modules read env vars at import time)
from dotenv import load_dotenv
load_dotenv()

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, List, Any, Optional
import pandas as pd
import sys
import os
import uvicorn
import asyncio
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
import subprocess

# Add src to path for imports
current_dir = os.path.dirname(os.path.abspath(__file__))
src_path = os.path.join(current_dir, 'src')
if src_path not in sys.path:
    sys.path.insert(0, src_path)

# Project-local imports — resolvable only via the src/ path inserted above
from utils.data_processor import DataProcessor
from utils.task_manager import get_task_manager
from utils.rate_limit_middleware import RateLimitMiddleware
from utils.mongodb_service import get_mongodb_service
from utils.redis_service import get_redis_service
from utils.task_queue import get_task_queue
from utils.ip_location_service import get_ip_location_service
from utils.admin_endpoints import router as admin_router
38
app = FastAPI(title="ABSA ML Backend API", version="1.0.0")

# Add CORS middleware
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# wide open — confirm this is intended for production deployments.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Add rate limiting middleware (2 requests per 60-second window)
app.add_middleware(RateLimitMiddleware, max_requests=2, window_seconds=60)

# Include admin router
app.include_router(admin_router)

# Initialize processor and task manager
processor = None  # lazily constructed by get_processor() on first use
task_manager = get_task_manager()
# Thread pool for running the blocking ML pipeline off the event loop;
# worker count configurable via MAX_WORKERS env var (default 2).
executor = ThreadPoolExecutor(max_workers=int(os.getenv('MAX_WORKERS', '2')))

# Initialize services
mongodb_service = get_mongodb_service()
redis_service = get_redis_service()
ip_location_service = get_ip_location_service()

# Initialize task queue with processor (will be set later by get_processor())
task_queue = None
67
+
68
def get_processor():
    """Return the shared DataProcessor, building it (and the job queue) on first call."""
    global processor, task_queue
    if processor is not None:
        return processor

    processor = DataProcessor()
    processor.set_task_manager(task_manager)
    # First initialization also wires up the background job queue and its worker.
    task_queue = get_task_queue(processor)
    task_queue.start_worker()
    return processor
78
+
79
def calculate_timeout(num_reviews: int) -> float:
    """
    Calculate dynamic timeout based on dataset size.

    Args:
        num_reviews: Number of reviews to process

    Returns:
        Timeout in seconds: a 5-minute base plus 0.3 s per review,
        capped at a 15-minute absolute maximum.
    """
    base_timeout = 300.0      # 5 minutes
    per_review_time = 0.3     # 0.3 seconds per review
    max_timeout = 900.0       # 15 minutes absolute max

    return min(base_timeout + (num_reviews * per_review_time), max_timeout)
95
+
96
class ReviewData(BaseModel):
    # One uploaded review row, as received from the client.
    id: int
    reviews_title: str
    review: str
    date: str  # NOTE(review): free-form string; no date format is validated here
    user_id: str
102
+
103
class ProcessRequest(BaseModel):
    # Batch-processing request body for /submit-job and /process-reviews.
    data: List[ReviewData]
    # NOTE(review): clients may send options=null, making this None rather
    # than {} — downstream code should guard before calling .get().
    options: Optional[Dict[str, Any]] = {}
    user_id: Optional[str] = "default"
107
+
108
class ProcessResponse(BaseModel):
    # Envelope returned by /process-reviews: status plus optional payload/message.
    status: str
    data: Optional[Dict[str, Any]] = None
    message: Optional[str] = None
112
+
113
+ @app.get("/")
114
+ async def root():
115
+ return {"message": "ABSA ML Backend API", "status": "running"}
116
+
117
+ @app.post("/log-session")
118
+ async def log_session(request: dict):
119
+ """
120
+ Log session metadata with IP and location (gated by Redis).
121
+
122
+ Expected payload:
123
+ {
124
+ "device_id": "string",
125
+ "user_id": "string (optional)",
126
+ "ip_address": "string",
127
+ "user_agent": "string (optional)"
128
+ }
129
+ """
130
+ device_id = request.get("device_id")
131
+ user_id = request.get("user_id")
132
+ ip_address = request.get("ip_address")
133
+ user_agent = request.get("user_agent")
134
+
135
+ if not device_id or not ip_address:
136
+ raise HTTPException(status_code=400, detail="device_id and ip_address required")
137
+
138
+ # Log session metadata (gated by Redis)
139
+ logged = ip_location_service.log_session_metadata(
140
+ device_id=device_id,
141
+ ip_address=ip_address,
142
+ user_id=user_id,
143
+ user_agent=user_agent
144
+ )
145
+
146
+ return {
147
+ "status": "success",
148
+ "logged": logged,
149
+ "message": "Session metadata logged" if logged else "Already logged within TTL window"
150
+ }
151
+
152
+ @app.post("/log-event")
153
+ async def log_event(request: dict):
154
+ """
155
+ Log a telemetry event to MongoDB.
156
+
157
+ Expected payload:
158
+ {
159
+ "event_type": "DASHBOARD_VIEW | ANALYSIS_REQUEST | etc.",
160
+ "device_id": "string",
161
+ "user_id": "string (optional)",
162
+ "metadata": {} (optional)
163
+ }
164
+ """
165
+ event_type = request.get("event_type")
166
+ device_id = request.get("device_id")
167
+ user_id = request.get("user_id")
168
+ metadata = request.get("metadata")
169
+
170
+ if not event_type or not device_id:
171
+ raise HTTPException(status_code=400, detail="event_type and device_id required")
172
+
173
+ success = mongodb_service.log_event(
174
+ event_type=event_type,
175
+ device_id=device_id,
176
+ user_id=user_id,
177
+ metadata=metadata
178
+ )
179
+
180
+ return {
181
+ "status": "success" if success else "error",
182
+ "logged": success
183
+ }
184
+
185
+ @app.get("/health")
186
+ async def health_check():
187
+ try:
188
+ proc = get_processor()
189
+ return {
190
+ "status": "healthy",
191
+ "translation_service": "available" if hasattr(proc.translator, 'model') else "unavailable",
192
+ "absa_service": "available" if hasattr(proc.absa_processor, 'aspect_extractor') else "unavailable",
193
+ "mongodb": "connected" if mongodb_service._client else "disconnected",
194
+ "redis": "connected" if redis_service.is_connected() else "disconnected"
195
+ }
196
+ except Exception as e:
197
+ return {"status": "error", "message": str(e)}
198
+
199
+ @app.post("/submit-job", response_model=Dict[str, Any])
200
+ async def submit_job(request: ProcessRequest):
201
+ """
202
+ Submit ABSA job to async queue.
203
+
204
+ Returns job_id for status tracking.
205
+ """
206
+ try:
207
+ # Get device_id and user_id from request headers or body
208
+ device_id = request.options.get("device_id", "unknown")
209
+ user_id = request.user_id
210
+
211
+ # Convert request data to dict
212
+ data_list = [item.model_dump() if hasattr(item, 'model_dump') else item.dict() for item in request.data]
213
+
214
+ # Ensure task_queue is initialized
215
+ get_processor()
216
+
217
+ # Submit job to queue
218
+ job_id = task_queue.submit_job(
219
+ data={"csv_data": data_list, "options": request.options},
220
+ device_id=device_id,
221
+ user_id=user_id
222
+ )
223
+
224
+ return {
225
+ "status": "queued",
226
+ "job_id": job_id,
227
+ "message": "Job submitted successfully. Use /job-status/{job_id} to check progress."
228
+ }
229
+
230
+ except Exception as e:
231
+ import logging
232
+ logger = logging.getLogger(__name__)
233
+ logger.error(f"Failed to submit job: {str(e)}")
234
+ raise HTTPException(status_code=500, detail=str(e))
235
+
236
+ @app.get("/job-status/{job_id}")
237
+ async def get_job_status(job_id: str):
238
+ """Get status of queued job."""
239
+ get_processor() # Ensure task_queue is initialized
240
+
241
+ status = task_queue.get_job_status(job_id)
242
+
243
+ if status is None:
244
+ raise HTTPException(status_code=404, detail="Job not found")
245
+
246
+ response = {
247
+ "job_id": job_id,
248
+ "status": status
249
+ }
250
+
251
+ # If job is done, include result
252
+ if status == "DONE":
253
+ result = task_queue.get_job_result(job_id)
254
+ if result:
255
+ response["result"] = result
256
+
257
+ return response
258
+
259
+ @app.post("/process-reviews", response_model=ProcessResponse)
260
+ async def process_reviews(request: ProcessRequest):
261
+ """
262
+ Process reviews with cancellation support and timeout.
263
+ """
264
+ # Create task for tracking
265
+ task_id = task_manager.create_task(user_id=request.user_id)
266
+
267
+ try:
268
+ # Convert request data to DataFrame (using model_dump for Pydantic v2)
269
+ data_list = [item.model_dump() if hasattr(item, 'model_dump') else item.dict() for item in request.data]
270
+ df = pd.DataFrame(data_list)
271
+
272
+ # Calculate dynamic timeout
273
+ timeout = calculate_timeout(len(df))
274
+
275
+ # Update task status
276
+ task_manager.update_task(task_id, status='processing', message=f'Processing {len(df)} reviews')
277
+
278
+ # Run processing in background thread with timeout
279
+ proc = get_processor()
280
+ loop = asyncio.get_event_loop()
281
+
282
+ try:
283
+ results = await asyncio.wait_for(
284
+ loop.run_in_executor(
285
+ executor,
286
+ proc.process_uploaded_data,
287
+ df,
288
+ task_id
289
+ ),
290
+ timeout=timeout
291
+ )
292
+ except asyncio.TimeoutError:
293
+ # Mark task as failed and cleanup
294
+ task_manager.complete_task(task_id, success=False, message=f'Processing timeout ({timeout}s exceeded)')
295
+ task_manager.cleanup_task(task_id)
296
+
297
+ return ProcessResponse(
298
+ status="timeout",
299
+ message=f"Processing exceeded {timeout:.0f} second limit. Try with fewer reviews or wait and retry."
300
+ )
301
+
302
+ # Check if cancelled during processing
303
+ if isinstance(results, dict) and results.get('status') == 'cancelled':
304
+ task_manager.mark_cancelled(task_id)
305
+ task_manager.cleanup_task(task_id)
306
+
307
+ return ProcessResponse(
308
+ status="cancelled",
309
+ message=results.get('message', 'Task was cancelled by user')
310
+ )
311
+
312
+ # Check for errors
313
+ if 'error' in results:
314
+ task_manager.complete_task(task_id, success=False, message=str(results['error']))
315
+ raise HTTPException(status_code=400, detail=results['error'])
316
+
317
+ # Mark task as complete
318
+ task_manager.complete_task(task_id, success=True, message='Processing completed successfully')
319
+
320
+ # Serialize for API response
321
+ serialized_results = serialize_for_api(results)
322
+ serialized_results['task_id'] = task_id
323
+ serialized_results['timeout_used'] = timeout
324
+
325
+ return ProcessResponse(
326
+ status="success",
327
+ data=serialized_results
328
+ )
329
+
330
+ except HTTPException:
331
+ raise
332
+ except Exception as e:
333
+ import traceback
334
+ error_detail = {
335
+ "error": str(e),
336
+ "traceback": traceback.format_exc(),
337
+ "task_id": task_id
338
+ }
339
+
340
+ task_manager.complete_task(task_id, success=False, message=str(e))
341
+ task_manager.cleanup_task(task_id)
342
+
343
+ # Log full error
344
+ import logging
345
+ logger = logging.getLogger(__name__)
346
+ logger.error(f"Processing error for task {task_id}: {str(e)}")
347
+ logger.error(f"Traceback: {traceback.format_exc()}")
348
+
349
+ raise HTTPException(status_code=500, detail=error_detail)
350
+
351
+ @app.post("/cancel-task/{task_id}")
352
+ async def cancel_task(task_id: str):
353
+ """Cancel a running task."""
354
+ success = task_manager.cancel_task(task_id)
355
+
356
+ if success:
357
+ return {
358
+ "status": "success",
359
+ "message": f"Task {task_id} cancellation requested",
360
+ "task_id": task_id
361
+ }
362
+ else:
363
+ return {
364
+ "status": "error",
365
+ "message": "Task not found or already completed",
366
+ "task_id": task_id
367
+ }
368
+
369
+ @app.get("/task-status/{task_id}")
370
+ async def get_task_status(task_id: str):
371
+ """Get status of a specific task."""
372
+ status = task_manager.get_task_status(task_id)
373
+
374
+ if status:
375
+ return {
376
+ "status": "success",
377
+ "task": status
378
+ }
379
+ else:
380
+ raise HTTPException(status_code=404, detail="Task not found")
381
+
382
+ @app.post("/cancel-user-tasks/{user_id}")
383
+ async def cancel_user_tasks(user_id: str):
384
+ """Cancel all tasks for a specific user."""
385
+ count = task_manager.cancel_user_tasks(user_id)
386
+
387
+ return {
388
+ "status": "success",
389
+ "message": f"Cancelled {count} tasks for user {user_id}",
390
+ "cancelled_count": count
391
+ }
392
+
393
+ @app.get("/user-tasks/{user_id}")
394
+ async def get_user_tasks(user_id: str):
395
+ """Get all tasks for a specific user."""
396
+ tasks = task_manager.get_user_tasks(user_id)
397
+
398
+ return {
399
+ "status": "success",
400
+ "user_id": user_id,
401
+ "task_count": len(tasks),
402
+ "tasks": tasks
403
+ }
404
+
405
+ @app.get("/task-stats")
406
+ async def get_task_stats():
407
+ """Get overall task statistics."""
408
+ stats = task_manager.get_stats()
409
+
410
+ return {
411
+ "status": "success",
412
+ "stats": stats
413
+ }
414
+
415
+ @app.post("/cleanup-old-tasks")
416
+ async def cleanup_old_tasks(max_age_hours: int = 1):
417
+ """Clean up old completed tasks."""
418
+ max_age_seconds = max_age_hours * 3600
419
+ task_manager.cleanup_old_tasks(max_age_seconds)
420
+
421
+ return {
422
+ "status": "success",
423
+ "message": f"Cleaned up tasks older than {max_age_hours} hour(s)"
424
+ }
425
+
426
def serialize_for_api(results: Dict) -> Dict:
    """Convert complex objects to JSON-serializable format.

    DataFrames become lists of row dicts, Series become plain dicts, and
    the 'aspect_network' NetworkX graph becomes node-link data; everything
    else is passed through unchanged.

    Args:
        results: mapping of result name -> value from the processing pipeline.

    Returns:
        A new dict with the same keys and JSON-friendly values.
    """
    serialized = {}

    for key, value in results.items():
        if key == 'aspect_network':
            # Convert NetworkX graph to node-link dict (duck-typed via .nodes)
            import networkx as nx
            serialized[key] = nx.node_link_data(value) if hasattr(value, 'nodes') else value
        elif isinstance(value, pd.DataFrame):
            # FIX: this explicit check previously sat AFTER the generic
            # hasattr(value, 'to_dict') branch and was unreachable.
            serialized[key] = value.to_dict('records')
        elif isinstance(value, pd.Series):
            # FIX: Series.to_dict() takes no orient argument — the old
            # to_dict('records') call raised TypeError on Series values.
            serialized[key] = value.to_dict()
        elif hasattr(value, 'to_dict'):
            # Other DataFrame-like objects exposing a records orient
            serialized[key] = value.to_dict('records')
        else:
            # Keep as is for basic types
            serialized[key] = value

    return serialized
451
+
452
def run_streamlit():
    """Run Streamlit in a separate thread (optional - only if app file exists)"""
    import logging
    logger = logging.getLogger(__name__)

    # First existing candidate wins; None means API-only mode.
    candidates = ["frontend_light.py", "app_enhanced.py", "app.py"]
    streamlit_app = next((name for name in candidates if os.path.exists(name)), None)

    if streamlit_app is None:
        logger.info("No Streamlit app found. Running FastAPI only (API-only mode)")
        return

    logger.info(f"Starting Streamlit UI with {streamlit_app}")
    # Blocks until the Streamlit process exits; callers run this in a thread.
    subprocess.run([
        "streamlit", "run", streamlit_app,
        "--server.port=8502",
        "--server.address=0.0.0.0"
    ])
475
+
476
+ if __name__ == "__main__":
477
+ import logging
478
+ logging.basicConfig(level=logging.INFO)
479
+ logger = logging.getLogger(__name__)
480
+
481
+ # Try to start Streamlit in background (optional)
482
+ streamlit_available = any(os.path.exists(f) for f in ["frontend_light.py", "app_enhanced.py", "app.py"])
483
+
484
+ if streamlit_available:
485
+ logger.info("🎨 Starting Streamlit UI in background...")
486
+ streamlit_thread = Thread(target=run_streamlit, daemon=True)
487
+ streamlit_thread.start()
488
+ else:
489
+ logger.info("📡 Running in API-only mode (no Streamlit UI)")
490
+
491
+ # Start FastAPI
492
+ logger.info("🚀 Starting FastAPI server on http://0.0.0.0:7860")
493
+ uvicorn.run(app, host="0.0.0.0", port=7860)
requirements.txt CHANGED
@@ -1,24 +1,23 @@
1
- altair
2
- pandas
3
- # Streamlit Cloud Requirements - Optimized for API approach
4
- streamlit>=1.28.0
5
- pandas>=1.5.0
6
- numpy>=1.24.0
7
- plotly>=5.15.0
8
- seaborn>=0.12.0
9
- matplotlib>=3.7.0
10
- scikit-learn>=1.3.0
11
- wordcloud>=1.9.0
 
 
 
 
12
  langdetect>=1.0.9
13
- streamlit-option-menu>=0.3.6
14
- streamlit-aggrid>=0.3.4
15
- joblib>=1.3.0
16
- pillow>=10.0.0
17
  requests>=2.31.0
18
- faker>=18.0.0
19
- networkx>=3.0
20
- openpyxl>=3.1.0
21
- reportlab>=4.0.0
22
  fastapi>=0.104.0
23
  uvicorn>=0.24.0
24
  pydantic>=2.0.0
@@ -27,10 +26,10 @@ pydantic>=2.0.0
27
  pymongo>=4.6.0
28
  redis>=5.0.0
29
 
30
- # Removed heavy ML dependencies for API approach:
31
- # - torch (saved ~1GB)
32
- # - transformers (saved ~500MB)
33
- # - pyabsa (saved download issues)
34
- # - sentencepiece, sacremoses, faiss-cpu (not needed)
35
 
36
- # Using HF Inference API instead - much more reliable!
 
 
1
+ # Backend Requirements - HF Spaces with PyABSA
2
+ # Full ML stack for high-accuracy processing
3
+
4
+ # Core ML frameworks
5
+ torch>=2.0.0,<2.2.0
6
+ transformers>=4.30.0,<4.37.0
7
+ pyabsa>=2.4.0,<3.0.0
8
+ sentencepiece>=0.1.99
9
+ sacremoses>=0.0.53
10
+ faiss-cpu>=1.7.4
11
+
12
+ # Data processing
13
+ pandas>=1.5.0,<2.1.0
14
+ numpy>=1.24.0,<1.26.0
15
+ scikit-learn>=1.3.0,<1.4.0
16
  langdetect>=1.0.9
17
+
18
+ # API and web framework
19
+ streamlit>=1.36.0
 
20
  requests>=2.31.0
 
 
 
 
21
  fastapi>=0.104.0
22
  uvicorn>=0.24.0
23
  pydantic>=2.0.0
 
26
  pymongo>=4.6.0
27
  redis>=5.0.0
28
 
29
+ # Utilities
30
+ joblib>=1.3.0
31
+ tqdm>=4.65.0
32
+ pillow>=10.0.0,<10.2.0
 
33
 
34
+ # Optional for network analysis (if needed)
35
+ networkx>=3.0