Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| import threading | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| from typing import Dict | |
| from pathlib import Path | |
| from datetime import datetime | |
| from fastapi.responses import FileResponse | |
| # Import from cursor_tracker | |
| from cursor_tracker import ( | |
| main_processing_loop, | |
| processing_status, | |
| CURSOR_TRACKING_OUTPUT_FOLDER, | |
| CURSOR_TEMPLATES_DIR, | |
| log_message | |
| ) | |
| # FastAPI App Definition | |
| app = FastAPI(title="Cursor Tracking API", description="API to access cursor tracking results", version="1.0.0") | |
| # Add CORS middleware to allow cross-origin requests | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], # Allows all origins | |
| allow_credentials=True, | |
| allow_methods=["*"], # Allows all methods | |
| allow_headers=["*"], | |
| ) | |
| # Global variable to track if processing is running | |
| processing_thread = None | |
| def log_message(message): | |
| """Add a log message with timestamp""" | |
| timestamp = datetime.now().strftime("%H:%M:%S") | |
| log_entry = f"[{timestamp}] {message}" | |
| processing_status["logs"].append(log_entry) | |
| # Keep only the last 100 logs | |
| if len(processing_status["logs"]) > 100: | |
| processing_status["logs"] = processing_status["logs"][-100:] | |
| print(log_entry) | |
| async def startup_event(): | |
| """Run the processing loop in the background when the API starts""" | |
| global processing_thread | |
| if not (processing_thread and processing_thread.is_alive()): | |
| log_message("π Starting RAR extraction, frame extraction, and cursor tracking pipeline in background...") | |
| processing_thread = threading.Thread(target=main_processing_loop) | |
| processing_thread.daemon = True | |
| processing_thread.start() | |
| from fastapi.staticfiles import StaticFiles | |
| # app.mount("/static", StaticFiles(directory="static"), name="static") | |
| # Serve your main HTML file | |
| async def root(): | |
| return () | |
| # return FileResponse("index.html") | |
| # # Optional: If you need to serve other static files individually | |
| # @app.get("/{filename}") | |
| # async def serve_file(filename: str): | |
| # if filename in ['style.css', 'script.js']: | |
| # return FileResponse(f"static/{filename}") | |
| # return FileResponse(f"static/{filename}") | |
| async def get_status(): | |
| """Get current processing status""" | |
| return { | |
| "processing_status": processing_status, | |
| "cursor_tracking_folder": CURSOR_TRACKING_OUTPUT_FOLDER, | |
| "folder_exists": os.path.exists(CURSOR_TRACKING_OUTPUT_FOLDER) | |
| } | |
| async def list_cursor_data(): | |
| """List all available cursor tracking JSON files""" | |
| if not os.path.exists(CURSOR_TRACKING_OUTPUT_FOLDER): | |
| return {"files": [], "message": "Cursor tracking output folder does not exist yet"} | |
| json_files = [] | |
| for file in os.listdir(CURSOR_TRACKING_OUTPUT_FOLDER): | |
| if file.endswith(".json"): | |
| file_path = os.path.join(CURSOR_TRACKING_OUTPUT_FOLDER, file) | |
| file_stats = os.stat(file_path) | |
| json_files.append({ | |
| "filename": file, | |
| "size_bytes": file_stats.st_size, | |
| "modified_time": time.ctime(file_stats.st_mtime), | |
| "download_url": f"/cursor-data/{file}" | |
| }) | |
| return { | |
| "files": json_files, | |
| "total_files": len(json_files), | |
| "folder_path": CURSOR_TRACKING_OUTPUT_FOLDER | |
| } | |
| from fastapi.encoders import jsonable_encoder | |
| def get_disk_usage(path: str) -> Dict[str, float]: | |
| """Get disk usage statistics in GB""" | |
| statvfs = os.statvfs(path) | |
| total = statvfs.f_frsize * statvfs.f_blocks / (1024**3) | |
| free = statvfs.f_frsize * statvfs.f_bavail / (1024**3) | |
| used = total - free | |
| return {"total": total, "free": free, "used": used} | |
| class SafeJSONEncoder(json.JSONEncoder): | |
| def default(self, obj): | |
| try: | |
| if isinstance(obj, float): | |
| if obj != obj: # Check for NaN | |
| return None | |
| if obj == float('inf') or obj == float('-inf'): | |
| return None | |
| return super().default(obj) | |
| except: | |
| return None | |
| async def get_cursor_data(filename: str): | |
| """Get specific cursor tracking data by filename""" | |
| if not filename.endswith(".json"): | |
| raise HTTPException(status_code=400, detail="File must be a JSON file") | |
| file_path = os.path.join(CURSOR_TRACKING_OUTPUT_FOLDER, filename) | |
| if not os.path.exists(file_path): | |
| raise HTTPException(status_code=404, detail=f"File {filename} not found") | |
| try: | |
| with open(file_path, "r") as f: | |
| data = json.load(f) | |
| # Clean the data of any NaN or infinity values | |
| def clean_floats(obj): | |
| if isinstance(obj, float): | |
| if obj != obj: # NaN | |
| return None | |
| if obj == float('inf') or obj == float('-inf'): | |
| return None | |
| return obj | |
| elif isinstance(obj, dict): | |
| return {k: clean_floats(v) for k, v in obj.items()} | |
| elif isinstance(obj, list): | |
| return [clean_floats(v) for v in obj] | |
| return obj | |
| cleaned_data = clean_floats(data) | |
| # Add metadata | |
| file_stats = os.stat(file_path) | |
| response_data = { | |
| "filename": filename, | |
| "file_size_bytes": file_stats.st_size, | |
| "modified_time": time.ctime(file_stats.st_mtime), | |
| "total_frames": len(cleaned_data), | |
| "cursor_active_frames": len([frame for frame in cleaned_data if frame.get("cursor_active", False)]), | |
| "data": cleaned_data | |
| } | |
| return JSONResponse(content=jsonable_encoder(response_data)) | |
| except json.JSONDecodeError: | |
| raise HTTPException(status_code=500, detail=f"Invalid JSON in file {filename}") | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error reading file {filename}: {str(e)}") | |
| async def start_processing(background_tasks: BackgroundTasks, start_index: int = 0): | |
| """Start the RAR processing pipeline in the background""" | |
| global processing_thread | |
| if processing_thread and processing_thread.is_alive(): | |
| return {"message": "Processing is already running", "status": "already_running"} | |
| if processing_status["is_running"]: | |
| return {"message": "Processing is already running", "status": "already_running"} | |
| # Start processing in a background thread | |
| processing_thread = threading.Thread(target=main_processing_loop, args=(start_index,)) | |
| processing_thread.daemon = True | |
| processing_thread.start() | |
| return {"message": f"Processing started in background from index {start_index}", "status": "started"} | |
| async def stop_processing(): | |
| """Stop the RAR processing pipeline""" | |
| global processing_thread | |
| if not processing_status["is_running"] and (not processing_thread or not processing_thread.is_alive()): | |
| return {"message": "No processing is currently running", "status": "not_running"} | |
| # Note: This is a graceful stop request. The actual stopping depends on the processing loop | |
| # checking the processing_status["is_running"] flag | |
| processing_status["is_running"] = False | |
| return {"message": "Stop signal sent to processing pipeline", "status": "stop_requested"} | |
| async def get_cursor_data_summary(filename: str): | |
| """Get a summary of cursor tracking data without the full frame data""" | |
| if not filename.endswith(".json"): | |
| raise HTTPException(status_code=400, detail="File must be a JSON file") | |
| file_path = os.path.join(CURSOR_TRACKING_OUTPUT_FOLDER, filename) | |
| if not os.path.exists(file_path): | |
| raise HTTPException(status_code=404, detail=f"File {filename} not found") | |
| try: | |
| with open(file_path, "r") as f: | |
| data = json.load(f) | |
| # Clean the data first | |
| def clean_floats(obj): | |
| if isinstance(obj, float): | |
| if obj != obj: # NaN | |
| return None | |
| if obj == float('inf') or obj == float('-inf'): | |
| return None | |
| return obj | |
| elif isinstance(obj, dict): | |
| return {k: clean_floats(v) for k, v in obj.items()} | |
| elif isinstance(obj, list): | |
| return [clean_floats(v) for v in obj] | |
| return obj | |
| cleaned_data = clean_floats(data) | |
| # Calculate summary statistics | |
| total_frames = len(cleaned_data) | |
| cursor_active_frames = len([frame for frame in cleaned_data if frame.get("cursor_active", False)]) | |
| cursor_inactive_frames = total_frames - cursor_active_frames | |
| # Get unique templates used | |
| templates_used = set() | |
| confidence_scores = [] | |
| for frame in cleaned_data: | |
| if frame.get("cursor_active", False) and frame.get("template"): | |
| templates_used.add(frame["template"]) | |
| if frame.get("confidence") is not None: | |
| # Ensure confidence is a valid number | |
| try: | |
| conf = float(frame["confidence"]) | |
| if not (conf != conf or conf == float('inf') or conf == float('-inf')): | |
| confidence_scores.append(conf) | |
| except (ValueError, TypeError): | |
| pass | |
| # Calculate confidence statistics | |
| avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0 | |
| max_confidence = max(confidence_scores) if confidence_scores else 0 | |
| min_confidence = min(confidence_scores) if confidence_scores else 0 | |
| file_stats = os.stat(file_path) | |
| summary = { | |
| "filename": filename, | |
| "file_size_bytes": file_stats.st_size, | |
| "modified_time": time.ctime(file_stats.st_mtime), | |
| "total_frames": total_frames, | |
| "cursor_active_frames": cursor_active_frames, | |
| "cursor_inactive_frames": cursor_inactive_frames, | |
| "cursor_detection_rate": cursor_active_frames / total_frames if total_frames > 0 else 0, | |
| "templates_used": list(templates_used), | |
| "confidence_stats": { | |
| "average": avg_confidence, | |
| "maximum": max_confidence, | |
| "minimum": min_confidence, | |
| "total_measurements": len(confidence_scores) | |
| } | |
| } | |
| return JSONResponse(content=jsonable_encoder(summary)) | |
| except json.JSONDecodeError: | |
| raise HTTPException(status_code=500, detail=f"Invalid JSON in file {filename}") | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error reading file {filename}: {str(e)}") | |
| if __name__ == "__main__": | |
| # Start the FastAPI server | |
| print("Starting Cursor Tracking FastAPI Server...") | |
| print("API Documentation will be available at: http://localhost:8000/docs") | |
| print("API Root endpoint: http://localhost:8000/") | |
| # Ensure the cursor tracking output folder exists | |
| os.makedirs(CURSOR_TRACKING_OUTPUT_FOLDER, exist_ok=True) | |
| uvicorn.run( | |
| app, | |
| host="0.0.0.0", | |
| port=8000, | |
| log_level="info", | |
| reload=False # Set to False for production | |
| ) | |