import logging
import os
import pathlib
import shutil
import time
import traceback
import uuid
from urllib.parse import urlparse

from django.conf import settings
from django.core.exceptions import SuspiciousFileOperation
from django.core.files.storage import FileSystemStorage
from rest_framework import status, permissions
from rest_framework.response import Response
from rest_framework.views import APIView

from gradio_client import Client, exceptions, file as gradio_file

from .gradio_helpers import get_space_details

# NOTE(review): the original file repeated several of these imports mid-file and
# also did `from .views import log_backend_message, log_backend_error` from
# inside this very module (a paste artifact). All imports are consolidated here.

# In-memory log storage for debugging.
# NOTE(review): module-level mutable state is shared by every request handled by
# this process — adequate for single-worker debugging, not safe across workers.
BACKEND_LOGS = []


def log_backend_message(message):
    """Log *message* at INFO level and mirror it into the in-memory store."""
    logging.info(message)
    BACKEND_LOGS.append(f"INFO: {message}")


def log_backend_error(message, exc_info=False):
    """Log *message* at ERROR level and mirror it into the in-memory store.

    ``exc_info=True`` attaches the active exception traceback to the standard
    logging record; the in-memory copy keeps only the message text.
    """
    logging.error(message, exc_info=exc_info)
    BACKEND_LOGS.append(f"ERROR: {message}")


class BackendLogView(APIView):
    """An endpoint to fetch and clear backend logs for debugging."""

    permission_classes = [permissions.AllowAny]

    def get(self, request, *args, **kwargs):
        # Logs may contain file paths and session ids — only expose under DEBUG.
        if not settings.DEBUG:
            return Response({"error": "Logs are disabled in production"},
                            status=status.HTTP_403_FORBIDDEN)
        logs = BACKEND_LOGS.copy()
        BACKEND_LOGS.clear()
        return Response({"logs": logs})


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# job_id -> {'job': gradio job handle, 'owner_session': str, 'input_types': [str]}
JOBS = {}
# Short-lived cache for completed results to tolerate trailing polls from the frontend
COMPLETED_RESULTS = {}
COMPLETED_TTL_SECONDS = 30
# space_id -> gradio_client.Client, reused across requests
CLIENT_CACHE = {}


def _upload_roots():
    """Return the (real) directories uploads are stored under.

    Mirrors FileUploadView's root selection, including the /data/uploads
    fallback used on read-only root filesystems (HF Spaces).
    """
    roots = [
        os.getenv('DJANGO_UPLOADS_ROOT',
                  str(getattr(settings, 'UPLOADS_ROOT', settings.BASE_DIR / 'uploads'))),
        '/data/uploads',
    ]
    return [os.path.realpath(root) for root in roots]


class FileUploadView(APIView):
    """Accept a single file upload and store it in a per-session directory."""

    permission_classes = [permissions.AllowAny]

    # Whitelisted extensions and size cap for uploads.
    ALLOWED_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.txt', '.csv', '.json', '.pdf'}
    MAX_SIZE_BYTES = 10 * 1024 * 1024  # 10 MB

    def post(self, request, *args, **kwargs):
        log_backend_message("FileUploadView: Received file upload request.")
        file_obj = request.FILES.get('file')
        if not file_obj:
            log_backend_error("FileUploadView: No file provided in the request.")
            return Response({"error": "File not provided"}, status=status.HTTP_400_BAD_REQUEST)

        if not request.session.session_key:
            request.session.create()
        user_id = request.session.session_key or 'anonymous'
        log_backend_message(f"FileUploadView: Upload initiated by user_id: {user_id}")

        # Validate filename extension and size before touching the disk.
        extension = os.path.splitext(file_obj.name)[1].lower()
        if extension not in self.ALLOWED_EXTENSIONS:
            log_backend_error(f"FileUploadView: Disallowed file extension: {extension}")
            return Response({"error": "Unsupported file type"}, status=status.HTTP_400_BAD_REQUEST)
        if file_obj.size and file_obj.size > self.MAX_SIZE_BYTES:
            log_backend_error("FileUploadView: Uploaded file exceeds size limit")
            return Response({"error": "File too large"}, status=status.HTTP_400_BAD_REQUEST)

        # Store uploads outside of STATIC/MEDIA served paths, under a per-user
        # directory. Use a writable uploads dir under /data on HF Spaces.
        upload_root = os.getenv(
            'DJANGO_UPLOADS_ROOT',
            str(getattr(settings, 'UPLOADS_ROOT', settings.BASE_DIR / 'uploads')))
        upload_dir = os.path.join(upload_root, user_id)
        try:
            os.makedirs(upload_dir, exist_ok=True)
        except PermissionError:
            # Fallback to /data/uploads if root filesystem is read-only (HF Spaces)
            upload_root = '/data/uploads'
            upload_dir = os.path.join(upload_root, user_id)
            os.makedirs(upload_dir, exist_ok=True)

        fs = FileSystemStorage(location=upload_dir)
        try:
            filename = fs.save(file_obj.name, file_obj)
        except (SuspiciousFileOperation, OSError) as e:
            log_backend_error(f"FileUploadView: Failed to save file securely: {e}")
            return Response({"error": "Invalid filename"}, status=status.HTTP_400_BAD_REQUEST)

        file_path = fs.path(filename)
        # Fix: log the stored filename instead of the literal placeholder "(unknown)".
        log_backend_message(f"FileUploadView: File '{filename}' saved to '{file_path}'.")
        return Response({"path": file_path}, status=status.HTTP_201_CREATED)


class GradioView(APIView):
    """
    API endpoint to fetch details of a Gradio Space.

    Expects a 'space_id' query parameter (either a Hugging Face Space ID or a
    full URL). Returns structured information about the space's endpoints and
    components.
    """

    permission_classes = [permissions.AllowAny]

    # Map helper error types to the HTTP status code returned to the client.
    ERROR_STATUS_MAP = {
        "authentication": status.HTTP_401_UNAUTHORIZED,
        "build": status.HTTP_503_SERVICE_UNAVAILABLE,
        "server": status.HTTP_502_BAD_GATEWAY,
        "not_found": status.HTTP_404_NOT_FOUND,
        "timeout": status.HTTP_504_GATEWAY_TIMEOUT,
    }

    def get(self, request, *args, **kwargs):
        log_backend_message("GradioView: Received request to fetch space details.")
        space_id = request.query_params.get('space_id')
        log_backend_message(f"GradioView: Requested space_id: '{space_id}'")
        if not space_id:
            log_backend_error("GradioView: space_id parameter is missing.")
            return Response(
                {"error": {"type": "missing_param", "message": "space_id is required"}},
                status=status.HTTP_400_BAD_REQUEST
            )

        # If the user provided a full HF Space URL, extract the ID.
        if space_id.startswith('http'):
            log_backend_message("GradioView: space_id is a URL, attempting to parse.")
            try:
                parsed_url = urlparse(space_id)
                path_parts = parsed_url.path.strip('/').split('/')
                # Typical format: /spaces/username/space-name
                if len(path_parts) >= 3 and path_parts[0] == 'spaces':
                    extracted_id = f"{path_parts[1]}/{path_parts[2]}"
                    log_backend_message(f"GradioView: Extracted space_id '{extracted_id}' from URL.")
                    space_id = extracted_id
                else:
                    raise ValueError("URL does not match expected /spaces/username/space-name pattern")
            except Exception as e:
                log_backend_error(f"GradioView: Failed to parse URL: {e}")
                return Response(
                    {"error": {"type": "invalid_url", "message": "Invalid Hugging Face Space URL provided."}},
                    status=status.HTTP_400_BAD_REQUEST
                )

        # Call the helper that returns structured {success, data|error} results.
        try:
            log_backend_message(f"GradioView: Calling get_space_details for '{space_id}'.")
            result = get_space_details(space_id)
            if result.get("success"):
                log_backend_message(f"GradioView: Successfully fetched details for '{space_id}'.")
                return Response(result["data"])
            # Known error types map to specific HTTP status codes.
            error_info = result["error"]
            error_type = error_info.get("type", "unknown")
            http_status = self.ERROR_STATUS_MAP.get(error_type, status.HTTP_500_INTERNAL_SERVER_ERROR)
            log_backend_error(f"GradioView: Failed to get space details for '{space_id}'. Error: {error_info}")
            return Response({"error": error_info}, status=http_status)
        except Exception as e:
            # Should not happen if get_space_details catches everything, but just in case.
            log_backend_error(f"GradioView: Unhandled exception while processing '{space_id}': {traceback.format_exc()}")
            return Response(
                {
                    "error": {
                        "type": "unhandled",
                        "message": f"Internal server error: {str(e)}",
                        # Only leak tracebacks to the client in DEBUG mode.
                        "detail": traceback.format_exc() if settings.DEBUG else None
                    }
                },
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )


class PredictView(APIView):
    """Submit a prediction job to a Gradio Space and return a polling job id."""

    permission_classes = [permissions.AllowAny]

    @staticmethod
    def _is_allowed_file(path):
        """True if *path* is an existing regular file inside an upload root.

        Security fix: the original uploaded ANY existing server file named in
        `inputs` to the remote Space, letting a request exfiltrate arbitrary
        files (e.g. /etc/passwd). Only paths produced by FileUploadView — i.e.
        under the configured upload roots — are treated as files now; anything
        else passes through as a plain string.
        """
        if not (os.path.exists(path) and os.path.isfile(path)):
            return False
        real = os.path.realpath(path)
        for root in _upload_roots():
            try:
                if os.path.commonpath([real, root]) == root:
                    return True
            except ValueError:
                # Different drives (Windows) — cannot share a common path.
                continue
        return False

    def post(self, request, *args, **kwargs):
        BACKEND_LOGS.clear()  # Clear logs for new run
        log_backend_message("PredictView: Received new prediction request.")
        space_id = request.data.get('space_id')
        api_name = request.data.get('api_name')
        inputs = request.data.get('inputs', [])
        log_backend_message(f"PredictView: Space: {space_id}, API: {api_name}, Inputs: {inputs}")
        if not all([space_id, api_name]):
            log_backend_error("PredictView: Missing space_id or api_name.")
            return Response({"error": "space_id and api_name are required"}, status=status.HTTP_400_BAD_REQUEST)

        try:
            client = CLIENT_CACHE.get(space_id)
            if client is not None:
                log_backend_message(f"PredictView: Reusing existing Gradio client for space: {space_id}")
            else:
                log_backend_message(f"PredictView: Initializing new Gradio client for space: {space_id}")
                # Use HF_TOKEN from environment if available for gated/private spaces
                hf_token = os.getenv("HF_TOKEN")
                client = Client(space_id, hf_token=hf_token, httpx_kwargs={"timeout": 1000}, verbose=True)
                CLIENT_CACHE[space_id] = client
                log_backend_message(f"PredictView: New client for {space_id} cached.")

            # Convert local upload paths into gradio file objects; everything
            # else is forwarded unchanged.
            processed_inputs = []
            for input_value in inputs:
                if isinstance(input_value, str) and self._is_allowed_file(input_value):
                    log_backend_message(f"PredictView: Converting local file path '{input_value}' to file content")
                    try:
                        processed_inputs.append(gradio_file(input_value))
                    except Exception as file_error:
                        log_backend_error(f"PredictView: Failed to process file '{input_value}': {str(file_error)}")
                        return Response({"error": f"File processing error: {str(file_error)}"},
                                        status=status.HTTP_400_BAD_REQUEST)
                else:
                    processed_inputs.append(input_value)

            log_backend_message("PredictView: Submitting job to Gradio client with processed inputs...")
            job = client.submit(*processed_inputs, api_name=api_name)
            job_id = str(uuid.uuid4())
            # Store input types for better error reporting
            input_types = [type(inp).__name__ for inp in inputs]
            if not request.session.session_key:
                request.session.create()
            JOBS[job_id] = {'job': job, 'owner_session': request.session.session_key, 'input_types': input_types}
            log_backend_message(f"PredictView: Job submitted with temporary ID: {job_id}")
            return Response({"job_id": job_id}, status=status.HTTP_202_ACCEPTED)
        except Exception as e:
            log_backend_error(f"PredictView: Prediction failed for space '{space_id}': {str(e)}", exc_info=True)
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class ResultView(APIView):
    """Poll a previously-submitted Gradio job and return its result.

    While the job is alive every response uses HTTP 200 so the frontend can
    keep polling; the JSON ``status`` field distinguishes ``processing`` /
    ``completed`` / ``error`` / ``not_found`` / terminal job states.
    """

    permission_classes = [permissions.AllowAny]

    def _process_result(self, result, request):
        """Recursively convert local file paths in *result* into public MEDIA URLs.

        Strings containing a path separator are treated as candidate file paths
        (EAFP): the file is copied into a per-session MEDIA_ROOT directory under
        a UUID filename and the absolute URL is returned. Anything that cannot
        be copied falls through unchanged.
        """
        log_backend_message(f"ResultView._process_result: Processing item. Type: {type(result)}. Value: {result}")

        # Recurse into sequences; tuples come back as lists (JSON-friendly,
        # same as the original behavior).
        if isinstance(result, (list, tuple)):
            log_backend_message(f"ResultView._process_result: Item is a {type(result).__name__}, processing each element recursively.")
            return [self._process_result(item, request) for item in result]

        if isinstance(result, str):
            try:
                log_backend_message(f"ResultView._process_result: Item is a string. Attempting to copy '{result}' as a file.")
                # Strings without a path separator are clearly not paths.
                if os.path.sep not in result:
                    log_backend_message(f"ResultView._process_result: '{result}' does not contain a path separator. Assuming it's a regular string.")
                    return result

                if not request.session.session_key:
                    request.session.create()
                user_id = request.session.session_key or 'anonymous'
                user_media_dir = os.path.join(settings.MEDIA_ROOT, user_id)
                if not os.path.isdir(user_media_dir):
                    # exist_ok avoids the check-then-create race of the original.
                    os.makedirs(user_media_dir, exist_ok=True)
                    log_backend_message(f"ResultView._process_result: Created user media directory: {user_media_dir}")

                # Create a unique filename to avoid conflicts
                original_filename = os.path.basename(result)
                unique_filename = str(uuid.uuid4()) + os.path.splitext(original_filename)[1]
                destination_path = os.path.join(user_media_dir, unique_filename)
                log_backend_message(f"ResultView._process_result: Destination path for copy: {destination_path}")
                shutil.copy(result, destination_path)
                log_backend_message("ResultView._process_result: File copied successfully.")

                # Construct the public URL
                file_url_path = f"{settings.MEDIA_URL}{user_id}/{unique_filename}"
                url = request.build_absolute_uri(file_url_path)
                log_backend_message(f"ResultView._process_result: Successfully converted file path to public URL: '{url}'")
                return url
            except (FileNotFoundError, IsADirectoryError, OSError) as e:
                # Triggered when `result` is not a valid file path.
                log_backend_message(f"ResultView._process_result: Could not treat '{result}' as a file. It's likely a regular string. Error: {e}")
                return result
            except Exception as e:
                # Catch any other unexpected errors during the copy.
                log_backend_error(f"ResultView._process_result: An unexpected error occurred while processing '{result}'. Error: {e}", exc_info=True)
                return result

        log_backend_message(f"ResultView._process_result: Item is not a string, list, or tuple ({type(result).__name__}), returning as is.")
        return result

    @staticmethod
    def _evict_expired(now_ts):
        """Opportunistic cleanup of expired completed-result cache entries."""
        for key, value in list(COMPLETED_RESULTS.items()):
            if value.get('expiry', 0) <= now_ts:
                COMPLETED_RESULTS.pop(key, None)

    @staticmethod
    def _respond_missing(job_id, now_ts):
        """Serve a briefly-cached completed result, or report not_found."""
        completed_entry = COMPLETED_RESULTS.get(job_id)
        if completed_entry and completed_entry.get('expiry', 0) > now_ts:
            log_backend_message(f"ResultView: Serving cached completed result for job {job_id}.")
            return Response({
                "status": "completed",
                "result": completed_entry.get('result'),
                "logs": []
            }, status=status.HTTP_200_OK)
        log_backend_message(f"ResultView: Job with ID {job_id} not found (likely already completed and evicted).")
        return Response({"status": "not_found"}, status=status.HTTP_200_OK)

    @staticmethod
    def _check_ownership(request, job_id, job_info):
        """Return a 403 Response on session mismatch, or None if access is allowed."""
        owner_session = job_info.get('owner_session')
        current_session = request.session.session_key
        if owner_session == current_session:
            return None
        # In embedded contexts (HF Spaces iframe), browsers may block third-party
        # cookies, causing session changes between requests. Allow bypass if
        # explicitly enabled. NOTE(review): the default of '1' disables the
        # ownership check entirely — consider defaulting to '0' in production.
        if os.getenv('DISABLE_JOB_SESSION_CHECK', '1') == '1':
            log_backend_message(
                f"ResultView: Session mismatch for job {job_id} (owner={owner_session}, current={current_session}). Bypassing due to DISABLE_JOB_SESSION_CHECK=1."
            )
            return None
        log_backend_error(
            f"ResultView: Session {current_session} attempted to access job {job_id} owned by {owner_session}"
        )
        return Response({"error": "Forbidden"}, status=status.HTTP_403_FORBIDDEN)

    def _respond_app_error(self, job_id, job_info, exc):
        """Translate a Gradio AppError into a 400 response with friendly messaging."""
        error_message = str(exc)
        log_backend_error(f"ResultView: Gradio app error for job {job_id}: {error_message}")
        # Special frontend console logging with the error details.
        log_backend_message(f"FRONTEND_LOG: ⚠️ Gradio API Error: {error_message}")

        # Provide a more user-friendly message for the "show_error=True" issue.
        if "show_error=True in launch()" in error_message:
            # Guess whether an image upload caused it from the stored input types.
            input_types = job_info.get('input_types', [])
            friendly_message = "The remote Gradio app encountered an error but didn't provide detailed information."
            if any('file' in str(t).lower() for t in input_types) or any('image' in str(t).lower() for t in input_types):
                friendly_message += " This may be related to image upload. Try using a different image format or size."

            log_backend_message("FRONTEND_LOG: 📋 Technical Details: The Gradio space needs 'show_error=True' in its launch() method to provide more specific error information.")
            log_backend_message("FRONTEND_LOG: 💡 Recommendation: Try a different input or contact the space owner about enabling detailed error reporting.")
            import datetime
            # Fix: datetime.utcnow() is deprecated; use an aware UTC timestamp.
            timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
            log_backend_message(f"FRONTEND_LOG: 🕒 Error occurred at (UTC): {timestamp}")
            log_backend_message(f"ResultView: Providing friendly error message: {friendly_message}")
            JOBS.pop(job_id, None)
            return Response({
                "status": "error",
                "error": friendly_message,
                "original_error": error_message,
                "logs": BACKEND_LOGS.copy()
            }, status=status.HTTP_400_BAD_REQUEST)

        # For other AppErrors, forward the original message but also log for frontend
        log_backend_message(f"FRONTEND_LOG: ⚠️ Error details: {error_message}")
        JOBS.pop(job_id, None)
        return Response({
            "status": "error",
            "error": error_message,
            "logs": BACKEND_LOGS.copy()
        }, status=status.HTTP_400_BAD_REQUEST)

    def _respond_finished(self, request, job_id, job_info, job):
        """Fetch a finished job's result, post-process it, and build the response."""
        log_backend_message(f"ResultView: Job {job_id} is finished. Processing final result.")
        try:
            raw_results = job.result()
            log_backend_message(f"ResultView: Job {job_id} completed with raw result: {raw_results}")
            if not isinstance(raw_results, list):
                raw_results = [raw_results]
            final_result = self._process_result(raw_results, request)
            JOBS.pop(job_id, None)
            # Fix: log the final result BEFORE snapshotting BACKEND_LOGS — the
            # original logged it after the copy/clear, so the line was lost.
            log_backend_message(f"ResultView: Final processed result for job {job_id}: {final_result}")
            # Capture the logs from this final processing step.
            final_logs = BACKEND_LOGS.copy()
            BACKEND_LOGS.clear()
            # Cache the result briefly to absorb trailing polls
            COMPLETED_RESULTS[job_id] = {"result": final_result, "expiry": time.time() + COMPLETED_TTL_SECONDS}
            return Response({
                "status": "completed",
                "result": final_result,
                "logs": final_logs
            }, status=status.HTTP_200_OK)
        except exceptions.AppError as e:
            return self._respond_app_error(job_id, job_info, e)
        except Exception as e:
            log_backend_error(f"ResultView: Error getting result for job {job_id}: {e}", exc_info=True)
            log_backend_message(f"FRONTEND_LOG: ❌ Unexpected error: {str(e)}")
            JOBS.pop(job_id, None)
            return Response({
                "status": "error",
                "error": f"Error processing result: {str(e)}",
                "logs": BACKEND_LOGS.copy()
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def get(self, request, job_id, *args, **kwargs):
        log_backend_message(f"ResultView: Received result request for job_id: {job_id}")
        now_ts = time.time()
        self._evict_expired(now_ts)

        job_info = JOBS.get(job_id)
        if job_info is None:
            return self._respond_missing(job_id, now_ts)

        if not request.session.session_key:
            request.session.create()
        forbidden = self._check_ownership(request, job_id, job_info)
        if forbidden is not None:
            return forbidden

        job = job_info['job']
        try:
            try:
                status_name = job.status().code.name.lower()
            except Exception:
                # If the client discarded the job (e.g., remote space returned
                # 404 on queue), treat as finished to force a result fetch
                # attempt which surfaces the error.
                status_name = "finished"
            log_backend_message(f"ResultView: Job {job_id} status is '{status_name}'.")

            if status_name == "finished":
                return self._respond_finished(request, job_id, job_info, job)
            elif status_name in ["cancelled", "failed"]:
                log_backend_message(f"ResultView: Job {job_id} ended with terminal status: {status_name}")
                log_backend_message(f"FRONTEND_LOG: ⚠️ Job {status_name}: The process was terminated before completion.")
                JOBS.pop(job_id, None)
                return Response({"status": status_name, "error": f"Job ended with status: {status_name}"},
                                status=status.HTTP_200_OK)
            else:
                # The job is still running; hand accumulated logs to the poller.
                log_backend_message(f"ResultView: Job {job_id} still processing.")
                logs = BACKEND_LOGS.copy()
                BACKEND_LOGS.clear()
                return Response({"status": "processing", "detail": status_name, "logs": logs},
                                status=status.HTTP_200_OK)
        except Exception as e:
            log_backend_error(f"ResultView: Error processing job {job_id}: {e}", exc_info=True)
            log_backend_message(f"FRONTEND_LOG: ❌ Job processing error: {str(e)}")
            JOBS.pop(job_id, None)
            return Response({"status": "error", "error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)