# Commit: "better Error calling" (1ac656e)
import logging
import uuid
import os
import shutil
import pathlib
from urllib.parse import urlparse
from django.conf import settings
from django.core.files.storage import FileSystemStorage
from django.core.exceptions import SuspiciousFileOperation
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, permissions
from .gradio_helpers import get_space_details
from gradio_client import Client, exceptions, file as gradio_file
import time
# In-memory log storage for debugging
BACKEND_LOGS = []
# Cap the store so it cannot grow without bound: it is only ever cleared by
# the view handlers, so a backend with no polling client would otherwise leak.
MAX_BACKEND_LOGS = 1000


def _trim_backend_logs():
    """Drop the oldest entries once the in-memory store exceeds the cap."""
    if len(BACKEND_LOGS) > MAX_BACKEND_LOGS:
        del BACKEND_LOGS[:-MAX_BACKEND_LOGS]


def log_backend_message(message):
    """Log *message* at INFO level and mirror it into the in-memory store."""
    logging.info(message)
    BACKEND_LOGS.append(f"INFO: {message}")
    _trim_backend_logs()


def log_backend_error(message, exc_info=False):
    """Log *message* at ERROR level and mirror it into the in-memory store.

    exc_info: forwarded to logging.error so the current traceback can be
    attached when called from inside an except block.
    """
    logging.error(message, exc_info=exc_info)
    BACKEND_LOGS.append(f"ERROR: {message}")
    _trim_backend_logs()
class BackendLogView(APIView):
    """Debug-only endpoint: returns the buffered backend logs and clears them."""
    permission_classes = [permissions.AllowAny]

    def get(self, request, *args, **kwargs):
        # Never expose internal logs outside DEBUG mode.
        if not settings.DEBUG:
            return Response(
                {"error": "Logs are disabled in production"},
                status=status.HTTP_403_FORBIDDEN,
            )
        snapshot = list(BACKEND_LOGS)
        BACKEND_LOGS.clear()
        return Response({"logs": snapshot})
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# In-flight gradio_client jobs keyed by our own UUID; values hold the job
# handle, the owning session key and the submitted input types.
JOBS = {}
# Short-lived cache for completed results to tolerate trailing polls from the frontend
COMPLETED_RESULTS = {}
COMPLETED_TTL_SECONDS = 30  # seconds a completed result stays servable after eviction from JOBS
# Reused gradio_client.Client instances keyed by space_id (avoids reconnect cost per request).
CLIENT_CACHE = {}
class FileUploadView(APIView):
    """Store a single uploaded file under a per-session directory.

    Expects a multipart POST with a 'file' field. Validates the extension
    against an allow-list and enforces a 10 MB size limit, then saves the
    file via FileSystemStorage and returns the absolute server-side path,
    which the frontend later passes back as a prediction input.
    """
    permission_classes = [permissions.AllowAny]

    def post(self, request, *args, **kwargs):
        log_backend_message("FileUploadView: Received file upload request.")
        file_obj = request.FILES.get('file')
        if not file_obj:
            log_backend_error("FileUploadView: No file provided in the request.")
            return Response({"error": "File not provided"}, status=status.HTTP_400_BAD_REQUEST)
        # Ensure a session exists so uploads are grouped per user session.
        if not request.session.session_key:
            request.session.create()
        user_id = request.session.session_key or 'anonymous'
        log_backend_message(f"FileUploadView: Upload initiated by user_id: {user_id}")
        # Validate filename and content type
        allowed_extensions = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.txt', '.csv', '.json', '.pdf'}
        extension = os.path.splitext(file_obj.name)[1].lower()
        if extension not in allowed_extensions:
            log_backend_error(f"FileUploadView: Disallowed file extension: {extension}")
            return Response({"error": "Unsupported file type"}, status=status.HTTP_400_BAD_REQUEST)
        max_size_bytes = 10 * 1024 * 1024  # 10 MB
        if file_obj.size and file_obj.size > max_size_bytes:
            log_backend_error("FileUploadView: Uploaded file exceeds size limit")
            return Response({"error": "File too large"}, status=status.HTTP_400_BAD_REQUEST)
        # Store uploads outside of STATIC/MEDIA served paths, under per-user directory
        # Use a writable uploads dir under /data
        upload_root = os.getenv('DJANGO_UPLOADS_ROOT', str(getattr(settings, 'UPLOADS_ROOT', settings.BASE_DIR / 'uploads')))
        upload_dir = os.path.join(upload_root, user_id)
        try:
            os.makedirs(upload_dir, exist_ok=True)
        except PermissionError:
            # Fallback to /data/uploads if root filesystem is read-only (HF Spaces)
            upload_root = '/data/uploads'
            upload_dir = os.path.join(upload_root, user_id)
            os.makedirs(upload_dir, exist_ok=True)
        fs = FileSystemStorage(location=upload_dir)
        try:
            # FileSystemStorage sanitizes the name and de-duplicates collisions.
            filename = fs.save(file_obj.name, file_obj)
        except (SuspiciousFileOperation, OSError) as e:
            log_backend_error(f"FileUploadView: Failed to save file securely: {e}")
            return Response({"error": "Invalid filename"}, status=status.HTTP_400_BAD_REQUEST)
        file_path = fs.path(filename)
        # BUG FIX: this log line previously contained the literal string
        # '(unknown)' instead of the name the file was actually stored under.
        log_backend_message(f"FileUploadView: File '{filename}' saved to '{file_path}'.")
        return Response({"path": file_path}, status=status.HTTP_201_CREATED)
# NOTE(review): the block below duplicates imports already made at the top of
# this file and looks pasted in together with GradioView. In particular,
# `from .views import ...` presumes these helpers live in a sibling module
# named `views` — if this file IS views.py, that is a self-import; verify
# against the project layout and de-duplicate.
import logging
import traceback
from urllib.parse import urlparse
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, permissions
from .gradio_helpers import get_space_details
from .views import log_backend_message, log_backend_error # adjust import as needed
class GradioView(APIView):
    """Fetch structured details (endpoints, components) of a Gradio Space.

    Accepts a 'space_id' query parameter, which may be either a plain
    Hugging Face Space ID ("user/space") or a full Space URL.
    """
    permission_classes = [permissions.AllowAny]

    def get(self, request, *args, **kwargs):
        log_backend_message("GradioView: Received request to fetch space details.")
        space_id = request.query_params.get('space_id')
        log_backend_message(f"GradioView: Requested space_id: '{space_id}'")

        if not space_id:
            log_backend_error("GradioView: space_id parameter is missing.")
            return Response(
                {"error": {"type": "missing_param", "message": "space_id is required"}},
                status=status.HTTP_400_BAD_REQUEST
            )

        # A full HF Space URL gets reduced to its "username/space-name" ID.
        if space_id.startswith('http'):
            log_backend_message("GradioView: space_id is a URL, attempting to parse.")
            try:
                segments = urlparse(space_id).path.strip('/').split('/')
                # Typical format: /spaces/username/space-name
                if len(segments) < 3 or segments[0] != 'spaces':
                    raise ValueError("URL does not match expected /spaces/username/space-name pattern")
                extracted_id = f"{segments[1]}/{segments[2]}"
                log_backend_message(f"GradioView: Extracted space_id '{extracted_id}' from URL.")
                space_id = extracted_id
            except Exception as e:
                log_backend_error(f"GradioView: Failed to parse URL: {e}")
                return Response(
                    {"error": {"type": "invalid_url", "message": "Invalid Hugging Face Space URL provided."}},
                    status=status.HTTP_400_BAD_REQUEST
                )

        try:
            log_backend_message(f"GradioView: Calling get_space_details for '{space_id}'.")
            result = get_space_details(space_id)
            if result.get("success"):
                log_backend_message(f"GradioView: Successfully fetched details for '{space_id}'.")
                return Response(result["data"])

            # Known failure: translate the helper's error type to an HTTP status.
            error_info = result["error"]
            http_status = {
                "authentication": status.HTTP_401_UNAUTHORIZED,
                "build": status.HTTP_503_SERVICE_UNAVAILABLE,
                "server": status.HTTP_502_BAD_GATEWAY,
                "not_found": status.HTTP_404_NOT_FOUND,
                "timeout": status.HTTP_504_GATEWAY_TIMEOUT,
            }.get(error_info.get("type", "unknown"), status.HTTP_500_INTERNAL_SERVER_ERROR)
            log_backend_error(f"GradioView: Failed to get space details for '{space_id}'. Error: {error_info}")
            return Response({"error": error_info}, status=http_status)
        except Exception as e:
            # Safety net: get_space_details is expected to catch its own errors.
            log_backend_error(f"GradioView: Unhandled exception while processing '{space_id}': {traceback.format_exc()}")
            return Response(
                {
                    "error": {
                        "type": "unhandled",
                        "message": f"Internal server error: {str(e)}",
                        "detail": traceback.format_exc() if settings.DEBUG else None
                    }
                },
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )
class PredictView(APIView):
    """Submit a prediction job to a remote Gradio Space.

    Expects JSON with 'space_id', 'api_name' and optional 'inputs'.
    Local file paths in 'inputs' (produced by FileUploadView) are wrapped
    with gradio_client's file() helper so their contents are uploaded.
    Responds 202 with a job_id that the client polls via ResultView.
    """
    permission_classes = [permissions.AllowAny]

    def post(self, request, *args, **kwargs):
        BACKEND_LOGS.clear()  # Clear logs for new run
        log_backend_message("PredictView: Received new prediction request.")
        space_id = request.data.get('space_id')
        api_name = request.data.get('api_name')
        inputs = request.data.get('inputs', [])
        log_backend_message(f"PredictView: Space: {space_id}, API: {api_name}, Inputs: {inputs}")
        if not all([space_id, api_name]):
            log_backend_error("PredictView: Missing space_id or api_name.")
            return Response({"error": "space_id and api_name are required"}, status=status.HTTP_400_BAD_REQUEST)
        try:
            if space_id in CLIENT_CACHE:
                log_backend_message(f"PredictView: Reusing existing Gradio client for space: {space_id}")
                client = CLIENT_CACHE[space_id]
            else:
                log_backend_message(f"PredictView: Initializing new Gradio client for space: {space_id}")
                # Use HF_TOKEN from environment if available for gated/private spaces
                hf_token = os.getenv("HF_TOKEN")
                client = Client(space_id, hf_token=hf_token, httpx_kwargs={"timeout": 1000}, verbose=True)
                CLIENT_CACHE[space_id] = client
                log_backend_message(f"PredictView: New client for {space_id} cached.")
            # Process inputs to handle file paths correctly
            processed_inputs = []
            for input_value in inputs:
                # Strings naming an existing local file become Gradio file objects.
                if isinstance(input_value, str) and os.path.isfile(input_value):
                    log_backend_message(f"PredictView: Converting local file path '{input_value}' to file content")
                    try:
                        # Use gradio_client's file utility to properly handle the file
                        processed_inputs.append(gradio_file(input_value))
                    except Exception as file_error:
                        log_backend_error(f"PredictView: Failed to process file '{input_value}': {str(file_error)}")
                        return Response({"error": f"File processing error: {str(file_error)}"},
                                        status=status.HTTP_400_BAD_REQUEST)
                else:
                    # Non-file inputs pass through unchanged
                    processed_inputs.append(input_value)
            log_backend_message("PredictView: Submitting job to Gradio client with processed inputs...")
            job = client.submit(*processed_inputs, api_name=api_name)
            job_id = str(uuid.uuid4())
            # Store input types for better error reporting
            input_types = [type(inp).__name__ for inp in inputs]
            if not request.session.session_key:
                request.session.create()
            JOBS[job_id] = {'job': job, 'owner_session': request.session.session_key, 'input_types': input_types}
            log_backend_message(f"PredictView: Job submitted with temporary ID: {job_id}")
            return Response({"job_id": job_id}, status=status.HTTP_202_ACCEPTED)
        except Exception as e:
            # BUG FIX: evict the (possibly stale) cached client so the next
            # request rebuilds the connection instead of failing forever —
            # e.g. when the remote space restarted or went to sleep.
            CLIENT_CACHE.pop(space_id, None)
            log_backend_error(f"PredictView: Prediction failed for space '{space_id}': {str(e)}", exc_info=True)
            return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class ResultView(APIView):
    """Poll the status of a previously submitted Gradio job.

    GET with a job_id returns 'processing' (with incremental logs),
    'completed' (with the processed result), a terminal 'cancelled'/'failed'
    status, or an error payload. Local file paths appearing in results are
    copied into MEDIA_ROOT and rewritten as public URLs for the frontend.
    """
    permission_classes = [permissions.AllowAny]

    def _process_result(self, result, request):
        """Recursively replace local file paths in *result* with public media URLs.

        Lists/tuples are processed element-wise; strings that look like file
        paths are copied into the per-session media directory; everything
        else is returned unchanged.
        """
        log_backend_message(f"ResultView._process_result: Processing item. Type: {type(result)}. Value: {result}")
        # Recursively process lists and tuples
        if isinstance(result, (list, tuple)):
            log_backend_message(f"ResultView._process_result: Item is a {type(result).__name__}, processing each element recursively.")
            return [self._process_result(item, request) for item in result]
        if isinstance(result, str):
            # EAFP (Easier to Ask for Forgiveness than Permission) approach
            try:
                # Attempt to treat the string as a file path
                log_backend_message(f"ResultView._process_result: Item is a string. Attempting to copy '{result}' as a file.")
                # An extra check to avoid trying to copy things that are clearly not paths
                if os.path.sep not in result:
                    log_backend_message(f"ResultView._process_result: '{result}' does not contain a path separator. Assuming it's a regular string.")
                    return result
                if not request.session.session_key:
                    request.session.create()
                user_id = request.session.session_key or 'anonymous'
                user_media_dir = os.path.join(settings.MEDIA_ROOT, user_id)
                if not os.path.exists(user_media_dir):
                    # exist_ok guards against a concurrent request creating the
                    # directory between the check above and this call.
                    os.makedirs(user_media_dir, exist_ok=True)
                    log_backend_message(f"ResultView._process_result: Created user media directory: {user_media_dir}")
                # Create a unique filename to avoid conflicts
                original_filename = os.path.basename(result)
                unique_filename = str(uuid.uuid4()) + os.path.splitext(original_filename)[1]
                destination_path = os.path.join(user_media_dir, unique_filename)
                log_backend_message(f"ResultView._process_result: Destination path for copy: {destination_path}")
                shutil.copy(result, destination_path)
                log_backend_message("ResultView._process_result: File copied successfully.")
                # Construct the public URL
                file_url_path = f"{settings.MEDIA_URL}{user_id}/{unique_filename}"
                url = request.build_absolute_uri(file_url_path)
                log_backend_message(f"ResultView._process_result: Successfully converted file path to public URL: '{url}'")
                return url
            except (FileNotFoundError, IsADirectoryError, OSError) as e:
                # This will trigger if `result` is not a valid file path
                log_backend_message(f"ResultView._process_result: Could not treat '{result}' as a file. It's likely a regular string. Error: {e}")
                return result
            except Exception as e:
                # Catch any other unexpected errors during the copy
                log_backend_error(f"ResultView._process_result: An unexpected error occurred while processing '{result}'. Error: {e}", exc_info=True)
                return result
        log_backend_message(f"ResultView._process_result: Item is not a string, list, or tuple ({type(result).__name__}), returning as is.")
        return result

    def get(self, request, job_id, *args, **kwargs):
        log_backend_message(f"ResultView: Received result request for job_id: {job_id}")
        # Opportunistic cleanup of expired completed entries
        now_ts = time.time()
        for key, value in list(COMPLETED_RESULTS.items()):
            if value.get('expiry', 0) <= now_ts:
                COMPLETED_RESULTS.pop(key, None)
        job_info = JOBS.get(job_id)
        if job_info is None:
            # The job may have finished and been evicted; serve the short-lived
            # cached result so trailing frontend polls still succeed.
            completed_entry = COMPLETED_RESULTS.get(job_id)
            if completed_entry and completed_entry.get('expiry', 0) > now_ts:
                log_backend_message(f"ResultView: Serving cached completed result for job {job_id}.")
                return Response({
                    "status": "completed",
                    "result": completed_entry.get('result'),
                    "logs": []
                }, status=status.HTTP_200_OK)
            log_backend_message(f"ResultView: Job with ID {job_id} not found (likely already completed and evicted).")
            return Response({"status": "not_found"}, status=status.HTTP_200_OK)
        if not request.session.session_key:
            request.session.create()
        owner_session = job_info.get('owner_session')
        current_session = request.session.session_key
        if owner_session != current_session:
            # In embedded contexts (HF Spaces iframe), browsers may block third-party cookies,
            # causing session changes between requests. Allow bypass if explicitly enabled.
            if os.getenv('DISABLE_JOB_SESSION_CHECK', '1') == '1':
                log_backend_message(
                    f"ResultView: Session mismatch for job {job_id} (owner={owner_session}, current={current_session}). Bypassing due to DISABLE_JOB_SESSION_CHECK=1."
                )
            else:
                log_backend_error(
                    f"ResultView: Session {current_session} attempted to access job {job_id} owned by {owner_session}"
                )
                return Response({"error": "Forbidden"}, status=status.HTTP_403_FORBIDDEN)
        job = job_info['job']
        try:
            try:
                status_name = job.status().code.name.lower()
            except Exception:
                # If the client discarded the job (e.g., remote space returned 404 on queue),
                # treat as finished to force a result fetch attempt which surfaces the error.
                status_name = "finished"
            log_backend_message(f"ResultView: Job {job_id} status is '{status_name}'.")
            if status_name == "finished":
                log_backend_message(f"ResultView: Job {job_id} is finished. Processing final result.")
                try:
                    # job.result() re-raises any remote error as an exception.
                    raw_results = job.result()
                    log_backend_message(f"ResultView: Job {job_id} completed with raw result: {raw_results}")
                    if not isinstance(raw_results, list):
                        raw_results = [raw_results]
                    final_result = self._process_result(raw_results, request)
                    JOBS.pop(job_id, None)
                    # Capture the logs from this final processing step
                    final_logs = BACKEND_LOGS.copy()
                    BACKEND_LOGS.clear()
                    log_backend_message(f"ResultView: Final processed result for job {job_id}: {final_result}")
                    # Cache the result briefly to absorb trailing polls
                    COMPLETED_RESULTS[job_id] = {"result": final_result, "expiry": time.time() + COMPLETED_TTL_SECONDS}
                    return Response({
                        "status": "completed",
                        "result": final_result,
                        "logs": final_logs  # Include final logs in the response
                    }, status=status.HTTP_200_OK)
                except exceptions.AppError as e:
                    # Specifically handling Gradio AppError
                    error_message = str(e)
                    log_backend_error(f"ResultView: Gradio app error for job {job_id}: {error_message}")
                    # Add special frontend console logging with the error details
                    log_backend_message(f"FRONTEND_LOG: ⚠️ Gradio API Error: {error_message}")
                    # Provide a more user-friendly message for the "show_error=True" issue
                    if "show_error=True in launch()" in error_message:
                        # Check if this might be an image upload issue based on stored input types
                        input_types = job_info.get('input_types', [])
                        friendly_message = "The remote Gradio app encountered an error but didn't provide detailed information."
                        if any('file' in str(t).lower() for t in input_types) or any('image' in str(t).lower() for t in input_types):
                            friendly_message += " This may be related to image upload. Try using a different image format or size."
                        # Add detailed technical information for the frontend console
                        log_backend_message(f"FRONTEND_LOG: 📋 Technical Details: The Gradio space needs 'show_error=True' in its launch() method to provide more specific error information.")
                        log_backend_message(f"FRONTEND_LOG: 💡 Recommendation: Try a different input or contact the space owner about enabling detailed error reporting.")
                        # Add timestamp for debugging
                        import datetime
                        # BUG FIX: datetime.utcnow() is deprecated; use an
                        # aware UTC datetime (same formatted output).
                        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
                        log_backend_message(f"FRONTEND_LOG: 🕒 Error occurred at (UTC): {timestamp}")
                        log_backend_message(f"ResultView: Providing friendly error message: {friendly_message}")
                        JOBS.pop(job_id, None)
                        return Response({
                            "status": "error",
                            "error": friendly_message,
                            "original_error": error_message,
                            "logs": BACKEND_LOGS.copy()
                        }, status=status.HTTP_400_BAD_REQUEST)
                    else:
                        # For other AppErrors, forward the original message but also log for frontend
                        log_backend_message(f"FRONTEND_LOG: ⚠️ Error details: {error_message}")
                        JOBS.pop(job_id, None)
                        return Response({
                            "status": "error",
                            "error": error_message,
                            "logs": BACKEND_LOGS.copy()
                        }, status=status.HTTP_400_BAD_REQUEST)
                except Exception as e:
                    # Handle other types of exceptions
                    log_backend_error(f"ResultView: Error getting result for job {job_id}: {e}", exc_info=True)
                    log_backend_message(f"FRONTEND_LOG: ❌ Unexpected error: {str(e)}")
                    JOBS.pop(job_id, None)
                    return Response({
                        "status": "error",
                        "error": f"Error processing result: {str(e)}",
                        "logs": BACKEND_LOGS.copy()
                    }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
            elif status_name in ["cancelled", "failed"]:
                log_backend_message(f"ResultView: Job {job_id} ended with terminal status: {status_name}")
                log_backend_message(f"FRONTEND_LOG: ⚠️ Job {status_name}: The process was terminated before completion.")
                JOBS.pop(job_id, None)
                return Response({"status": status_name, "error": f"Job ended with status: {status_name}"}, status=status.HTTP_200_OK)
            else:  # The job is still running
                log_backend_message(f"ResultView: Job {job_id} still processing.")
                # For polling requests, return the current logs
                logs = BACKEND_LOGS.copy()
                BACKEND_LOGS.clear()
                return Response({"status": "processing", "detail": status_name, "logs": logs}, status=status.HTTP_200_OK)
        except Exception as e:
            log_backend_error(f"ResultView: Error processing job {job_id}: {e}", exc_info=True)
            log_backend_message(f"FRONTEND_LOG: ❌ Job processing error: {str(e)}")
            JOBS.pop(job_id, None)
            return Response({"status": "error", "error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)