Spaces:
Paused
Paused
| import gradio as gr | |
| import os | |
| import tempfile | |
| import json | |
| from datetime import datetime | |
| import re | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import base64 | |
| try: | |
| import oci | |
| from oci.object_storage import ObjectStorageClient | |
| OCI_AVAILABLE = True | |
| print("β OCI package imported successfully") | |
| except ImportError as e: | |
| OCI_AVAILABLE = False | |
| print(f"β OCI package import failed: {e}") | |
| # OCI Connector Class | |
class OCIStorageConnector:
    """Small wrapper around the OCI Object Storage client.

    Configuration is read from ``OCI_*`` environment variables when the
    instance is created.  If the client cannot be built, the reason is kept
    in ``initialization_error`` and every operation reports it instead of
    raising.
    """

    # Object summaries only carry the name unless extra fields are requested,
    # so ask for the fields that list_files() displays.
    _LIST_FIELDS = "name,size,timeCreated"

    def __init__(self):
        self.config = self._load_config()
        self.client = None
        self.initialization_error = None
        self._initialize_client()

    def _load_config(self):
        """Load configuration from environment variables.

        Returns:
            dict: configuration values; entries are ``None`` when the
            corresponding environment variable is not set.
        """
        config = {
            'user_ocid': os.getenv('OCI_USER_OCID'),
            'tenancy_ocid': os.getenv('OCI_TENANCY_OCID'),
            'key_fingerprint': os.getenv('OCI_FINGERPRINT'),
            'private_key': os.getenv('OCI_PRIVATE_KEY'),
            'region': os.getenv('OCI_REGION'),
            'namespace': os.getenv('OCI_NAMESPACE'),
            'bucket_name': os.getenv('OCI_BUCKET_NAME'),
        }
        # Report (but do not fail on) missing values; the client
        # initialisation decides what is actually required.
        missing = [k for k, v in config.items() if not v]
        if missing:
            print(f"β Missing config values: {missing}")
        return config

    def _initialize_client(self):
        """Create the Object Storage client and verify it with one API call.

        On any failure ``self.client`` stays ``None`` and
        ``self.initialization_error`` describes the problem.
        """
        try:
            if not OCI_AVAILABLE:
                self.initialization_error = "OCI SDK not available"
                return
            # Check if we have all required config
            required_config = ['user_ocid', 'tenancy_ocid', 'key_fingerprint', 'private_key', 'region']
            missing_config = [key for key in required_config if not self.config.get(key)]
            if missing_config:
                self.initialization_error = f"Missing config: {missing_config}"
                return
            print("π§ Attempting to initialize OCI client...")
            print(f" Region: {self.config['region']}")
            # Secrets may store the key with literal "\n" sequences; turn
            # them back into real newlines so the PEM parses.
            private_key_content = self.config['private_key'].replace('\\n', '\n')
            config = {
                "user": self.config['user_ocid'],
                "key_content": private_key_content,
                "fingerprint": self.config['key_fingerprint'],
                "tenancy": self.config['tenancy_ocid'],
                "region": self.config['region'],
            }
            self.client = ObjectStorageClient(config)
            # Test the connection with a simple API call
            try:
                namespace = self.client.get_namespace().data
                print(f"β OCI Client initialized successfully! Namespace: {namespace}")
            except Exception as test_error:
                self.initialization_error = f"Connection test failed: {test_error}"
                print(f"β Connection test failed: {test_error}")
                self.client = None
        except Exception as e:
            self.initialization_error = f"Client initialization failed: {str(e)}"
            print(f"β OCI Client initialization failed: {e}")
            self.client = None

    def upload_file(self, file_path, object_name, project_id=None):
        """Upload a local file to the configured bucket.

        Args:
            file_path: path of the local file to send.
            object_name: full object name to store it under; used as-is.
            project_id: accepted for backward compatibility; not used.

        Returns:
            tuple: ``(success, message)``.
        """
        try:
            if self.initialization_error:
                return False, f"OCI Client Error: {self.initialization_error}"
            if not self.client:
                return False, "OCI client not initialized"
            namespace = self.config['namespace']
            bucket_name = self.config['bucket_name']
            if not namespace or not bucket_name:
                return False, "Namespace or bucket name not configured"
            # Use the provided object_name as-is (it should already include project_id/subfolder/)
            final_object_name = object_name
            print(f"π€ Uploading {file_path} to {final_object_name}")
            with open(file_path, 'rb') as file:
                response = self.client.put_object(
                    namespace_name=namespace,
                    bucket_name=bucket_name,
                    object_name=final_object_name,
                    put_object_body=file
                )
            print(f"β Upload successful! ETag: {response.headers['etag']}")
            return True, f"File uploaded: {final_object_name}"
        except oci.exceptions.ServiceError as e:
            error_msg = f"OCI Service Error (Status: {e.status}): {e.message}"
            print(f"β {error_msg}")
            return False, error_msg
        except Exception as e:
            error_msg = f"Upload failed: {str(e)}"
            print(f"β {error_msg}")
            return False, error_msg

    def list_files(self, prefix=None):
        """List objects in the bucket, optionally filtered by prefix.

        When ``prefix`` is given a ``'/'`` delimiter is also sent, so only
        objects directly under that prefix are returned.  All result pages
        are followed.

        Args:
            prefix: optional object-name prefix.

        Returns:
            list[dict]: one dict per object with keys ``name``, ``size``,
            ``time``, ``size_bytes`` and ``directory``; an empty list on any
            error.
        """
        try:
            if self.initialization_error:
                print(f"β Cannot list files: {self.initialization_error}")
                return []
            if not self.client:
                print("β OCI client not available for listing")
                return []
            namespace = self.config['namespace']
            bucket_name = self.config['bucket_name']
            if not namespace or not bucket_name:
                print("β Namespace or bucket name missing for listing")
                return []
            print(f"π Listing objects in bucket: {bucket_name}" + (f" with prefix: {prefix}" if prefix else ""))
            list_objects_kwargs = {
                'namespace_name': namespace,
                'bucket_name': bucket_name,
                'fields': self._LIST_FIELDS,
            }
            if prefix:
                list_objects_kwargs['prefix'] = prefix
                list_objects_kwargs['delimiter'] = '/'

            # Follow pagination so large buckets are not silently truncated.
            objects = []
            next_start = None
            while True:
                if next_start:
                    list_objects_kwargs['start'] = next_start
                response = self.client.list_objects(**list_objects_kwargs)
                objects.extend(response.data.objects or [])
                next_start = getattr(response.data, 'next_start_with', None)
                if not next_start:
                    break
            print(f"π List response received: {len(objects)} objects")

            files = []
            for obj in objects:
                # Compare against None so that empty (0-byte) objects are
                # reported with their real size.
                size_display = f"{obj.size / 1024 / 1024:.1f} MB" if obj.size is not None else "Unknown size"
                time_display = obj.time_created.strftime("%Y-%m-%d %H:%M:%S") if obj.time_created else "Unknown time"
                files.append({
                    'name': obj.name,
                    'size': size_display,
                    'time': time_display,
                    'size_bytes': obj.size or 0,
                    'directory': '/' in obj.name
                })
            print(f"β Found {len(files)} objects")
            return files
        except oci.exceptions.ServiceError as e:
            print(f"β List failed - Service Error: {e.message} (Status: {e.status})")
            return []
        except Exception as e:
            print(f"β List failed: {e}")
            return []

    def list_directories(self):
        """List the top-level prefixes ("directories") of the bucket.

        Returns:
            list: prefixes reported by the service for a ``'/'`` delimiter,
            without duplicates; an empty list on any error.
        """
        try:
            if self.initialization_error:
                return []
            if not self.client:
                return []
            namespace = self.config['namespace']
            bucket_name = self.config['bucket_name']
            if not namespace or not bucket_name:
                return []
            request = {
                'namespace_name': namespace,
                'bucket_name': bucket_name,
                'delimiter': '/',
            }
            directories = []
            next_start = None
            while True:
                if next_start:
                    request['start'] = next_start
                response = self.client.list_objects(**request)
                directories.extend(getattr(response.data, 'prefixes', None) or [])
                next_start = getattr(response.data, 'next_start_with', None)
                if not next_start:
                    break
            # Preserve order while dropping any prefix repeated across pages.
            return list(dict.fromkeys(directories))
        except Exception as e:
            print(f"β List directories failed: {e}")
            return []

    def download_file(self, object_name, download_path):
        """Download an object from the bucket to a local path.

        Args:
            object_name: name of the object to fetch.
            download_path: local file path to write to.

        Returns:
            tuple: ``(success, message)``.
        """
        try:
            if self.initialization_error:
                return False, f"OCI Client Error: {self.initialization_error}"
            if not self.client:
                return False, "OCI client not initialized"
            namespace = self.config['namespace']
            bucket_name = self.config['bucket_name']
            if not namespace or not bucket_name:
                return False, "Namespace or bucket name not configured"
            print(f"π₯ Downloading {object_name}")
            response = self.client.get_object(
                namespace_name=namespace,
                bucket_name=bucket_name,
                object_name=object_name
            )
            # Stream to disk in 1 MiB chunks instead of loading it all.
            with open(download_path, 'wb') as f:
                for chunk in response.data.raw.stream(1024 * 1024):
                    f.write(chunk)
            print(f"β Download successful: {object_name}")
            return True, "File downloaded successfully!"
        except oci.exceptions.ServiceError as e:
            error_msg = f"OCI Service Error (Status: {e.status}): {e.message}"
            print(f"β {error_msg}")
            return False, error_msg
        except Exception as e:
            error_msg = f"Download failed: {str(e)}"
            print(f"β {error_msg}")
            return False, error_msg

    def delete_file(self, object_name):
        """Delete an object from the bucket.

        Args:
            object_name: name of the object to remove.

        Returns:
            tuple: ``(success, message)``.
        """
        try:
            if self.initialization_error:
                return False, f"OCI Client Error: {self.initialization_error}"
            if not self.client:
                return False, "OCI client not initialized"
            namespace = self.config['namespace']
            bucket_name = self.config['bucket_name']
            if not namespace or not bucket_name:
                return False, "Namespace or bucket name not configured"
            print(f"ποΈ Deleting {object_name}")
            self.client.delete_object(
                namespace_name=namespace,
                bucket_name=bucket_name,
                object_name=object_name
            )
            print(f"β Delete successful: {object_name}")
            return True, "File deleted successfully!"
        except oci.exceptions.ServiceError as e:
            error_msg = f"OCI Service Error (Status: {e.status}): {e.message}"
            print(f"β {error_msg}")
            return False, error_msg
        except Exception as e:
            error_msg = f"Delete failed: {str(e)}"
            print(f"β {error_msg}")
            return False, error_msg
# Build the single, module-wide storage connector at import time.
print("π Initializing OCI Connector...")
oci_connector = OCIStorageConnector()

# FastAPI application that hosts the REST endpoints.
app = FastAPI(title="OCI Storage API")

# Open CORS policy so that n8n (and other callers) can reach the API.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive combination - confirm credentialed cross-origin requests
# are really required.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
@app.post("/api/upload")
async def api_upload(
    project_id: str = Form(...),
    file: UploadFile = File(...),
    subfolder: str = Form(None)
):
    """Upload a multipart file into ``<project_id>/[<subfolder>/]<filename>``.

    Intended for n8n and other services.  ``project_id`` and ``subfolder``
    are reduced to letters, digits, ``_`` and ``-`` before being used in the
    object name.  Errors are reported in the returned dict rather than
    raised.

    Args:
        project_id: project the file belongs to (required, non-blank).
        file: the uploaded file.
        subfolder: optional sub-directory below the project.

    Returns:
        dict: ``status`` ("success" or "error"), ``message`` and, depending
        on the outcome, ``project_id``, ``subfolder`` and ``object_name``.
    """
    temp_path = None
    try:
        print(f"π₯ API Upload received")
        print(f" Project ID: {project_id}")
        print(f" Subfolder: {subfolder}")
        print(f" Filename: {file.filename}")
        print(f" Content Type: {file.content_type}")

        # Check if project_id is provided
        if not project_id or project_id.strip() == "":
            print("β Error: project_id is required")
            return {"status": "error", "message": "project_id is required"}

        sanitized_project_id = re.sub(r'[^a-zA-Z0-9_\-]', '', project_id.strip())
        if not sanitized_project_id:
            # Nothing usable is left after sanitising; refuse instead of
            # creating an object name that starts with "/".
            return {
                "status": "error",
                "message": "project_id must contain letters, digits, '_' or '-'",
            }
        if not file.filename:
            return {"status": "error", "message": "uploaded file has no filename"}

        file_content = await file.read()
        print(f" File size: {len(file_content)} bytes")

        # The client-supplied filename is untrusted, so it must never be
        # used to build a local path; let tempfile choose a unique one.
        fd, temp_path = tempfile.mkstemp(prefix="oci_upload_")
        with os.fdopen(fd, "wb") as f:
            f.write(file_content)
        print(f" Temporary file created: {temp_path}")

        # Assemble the object name, skipping a subfolder that is missing or
        # sanitises to nothing (avoids "project//file").
        name_parts = [sanitized_project_id]
        sanitized_subfolder = ""
        if subfolder and subfolder.strip():
            sanitized_subfolder = re.sub(r'[^a-zA-Z0-9_\-]', '', subfolder.strip())
        if sanitized_subfolder:
            name_parts.append(sanitized_subfolder)
            print(f" Using subfolder: {sanitized_subfolder}")
        else:
            print(f" No subfolder specified")
        name_parts.append(file.filename)
        final_object_name = "/".join(name_parts)
        print(f" Final object name: {final_object_name}")

        # Upload to OCI with the final object name
        success, message = oci_connector.upload_file(temp_path, final_object_name, None)

        if success:
            print(f"β Upload successful: {message}")
            return {
                "status": "success",
                "message": message,
                "project_id": project_id,
                "subfolder": subfolder,
                "object_name": final_object_name
            }
        print(f"β Upload failed: {message}")
        return {
            "status": "error",
            "message": message,
            "project_id": project_id,
            "subfolder": subfolder
        }
    except Exception as e:
        error_msg = f"API error: {str(e)}"
        print(f"β {error_msg}")
        import traceback
        print(f"Traceback: {traceback.format_exc()}")
        return {
            "status": "error",
            "message": error_msg,
            "project_id": project_id,
            "subfolder": subfolder
        }
    finally:
        # Always remove the temporary copy, including on error paths.
        if temp_path is not None:
            try:
                os.remove(temp_path)
                print(f" Temporary file cleaned up")
            except OSError:
                print(" Warning: Could not remove temporary file")
@app.post("/api/upload-text")
async def api_upload_text(
    project_id: str = Form(...),
    content: str = Form(...),
    filename: str = Form("text_file.txt"),
    subfolder: str = Form(None)
):
    """Store text content as an object, with optional project/subfolder path.

    ``project_id`` and ``subfolder`` are reduced to letters, digits, ``_``
    and ``-`` before being used in the object name; a blank project id puts
    the object at the bucket root.  Errors are reported in the returned
    dict rather than raised.

    Args:
        project_id: project the text belongs to.
        content: text to store (written as UTF-8).
        filename: object file name; defaults to ``text_file.txt``.
        subfolder: optional sub-directory below the project.

    Returns:
        dict: ``status``, ``message``, ``project_id``, ``subfolder`` and, on
        success, ``object_name``.
    """
    temp_path = None
    try:
        print(f"π₯ API Text Upload received - Project: {project_id}, File: {filename}, Subfolder: {subfolder}")

        if not filename:
            return {
                "status": "error",
                "message": "filename is required",
                "project_id": project_id,
                "subfolder": subfolder
            }

        # ``filename`` is caller-controlled, so it must not be used to
        # build a local path; let tempfile choose a unique one.
        fd, temp_path = tempfile.mkstemp(prefix="oci_upload_", suffix=".txt")
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(content)

        # Build the object name from the non-empty sanitised segments only,
        # so names never contain a leading "/" or an empty "//" component.
        name_parts = []
        if project_id and project_id.strip():
            sanitized_project_id = re.sub(r'[^a-zA-Z0-9_\-]', '', project_id.strip())
            if sanitized_project_id:
                name_parts.append(sanitized_project_id)
                if subfolder and subfolder.strip():
                    sanitized_subfolder = re.sub(r'[^a-zA-Z0-9_\-]', '', subfolder.strip())
                    if sanitized_subfolder:
                        name_parts.append(sanitized_subfolder)
        name_parts.append(filename)
        final_object_name = "/".join(name_parts)

        # Upload to OCI with the final object name
        success, message = oci_connector.upload_file(temp_path, final_object_name, None)

        if success:
            return {
                "status": "success",
                "message": message,
                "project_id": project_id,
                "subfolder": subfolder,
                "object_name": final_object_name
            }
        return {
            "status": "error",
            "message": message,
            "project_id": project_id,
            "subfolder": subfolder
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"API error: {str(e)}",
            "project_id": project_id,
            "subfolder": subfolder
        }
    finally:
        # Best-effort removal of the temporary copy on every path.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass
@app.get("/api/health")
async def api_health():
    """Health check endpoint.

    Returns:
        dict: a fixed ``"healthy"`` status, whether the OCI SDK was
        imported, whether the connector holds a client, and the current
        local timestamp in ISO format.
    """
    return {
        "status": "healthy",
        "oci_available": OCI_AVAILABLE,
        "oci_initialized": oci_connector.client is not None,
        "timestamp": datetime.now().isoformat()
    }
@app.get("/api/files/{project_id}")
async def api_list_files(project_id: str):
    """List the files stored directly under a project directory.

    The project id is sanitised the same way the upload endpoints do and is
    turned into a ``"<id>/"`` prefix.  The trailing slash matters: the
    connector lists with a ``'/'`` delimiter, so a prefix without it would
    roll every project object up into a common prefix and return nothing.
    Objects inside subfolders are not included for the same reason.

    Args:
        project_id: project whose files should be listed.

    Returns:
        dict: ``status``, and either ``project_id`` plus ``files`` or an
        error ``message``.
    """
    try:
        sanitized_project_id = re.sub(r'[^a-zA-Z0-9_\-]', '', project_id.strip())
        if not sanitized_project_id:
            return {"status": "error", "message": "project_id is required"}
        files = oci_connector.list_files(prefix=f"{sanitized_project_id}/")
        return {"status": "success", "project_id": project_id, "files": files}
    except Exception as e:
        return {"status": "error", "message": str(e)}
@app.get("/api/debug/bucket-structure")
async def debug_bucket_structure():
    """Debug endpoint that groups every object by project and subfolder.

    Object names are split on ``'/'``: the first segment is the project,
    the second the subfolder, and the remainder the file name.  Objects
    directly under a project are collected under the ``'root'`` key.
    Objects without any ``'/'`` are counted in ``total_files`` but are not
    placed in the structure.

    Returns:
        dict: ``status`` with ``bucket_structure`` and ``total_files``, or
        an error ``message``.
    """
    try:
        # List all objects to see the current structure
        all_files = oci_connector.list_files()
        structure = {}
        for entry in all_files:
            parts = entry['name'].split('/')
            if len(parts) < 2:
                continue
            project_entry = structure.setdefault(parts[0], {})
            if len(parts) >= 3:
                # Everything after the subfolder is kept as one file name.
                project_entry.setdefault(parts[1], []).append('/'.join(parts[2:]))
            else:
                project_entry.setdefault('root', []).append(parts[1])
        return {
            "status": "success",
            "bucket_structure": structure,
            "total_files": len(all_files)
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}
| # Gradio Functions | |
def upload_video(file, project_id, subfolder=None):
    """Send the selected Gradio file to OCI under project/subfolder.

    The object name is ``<project>/<subfolder>/<basename>``, with the
    project and subfolder stripped of everything except letters, digits,
    ``_`` and ``-``.  Without a project id the bare file name is used.
    Returns a status string for the UI.
    """
    if file is None:
        return "β Please select a file first"
    try:
        base_name = os.path.basename(file.name)
        unsafe_chars = r'[^a-zA-Z0-9_\-]'

        has_project = bool(project_id and project_id.strip())
        if not has_project:
            target_name = base_name
        else:
            clean_project = re.sub(unsafe_chars, '', project_id.strip())
            has_subfolder = bool(subfolder and subfolder.strip())
            if has_subfolder:
                clean_subfolder = re.sub(unsafe_chars, '', subfolder.strip())
                target_name = f"{clean_project}/{clean_subfolder}/{base_name}"
            else:
                target_name = f"{clean_project}/{base_name}"

        # Hand the local temp file over to the connector.
        ok, detail = oci_connector.upload_file(file.name, target_name, None)
        # NOTE(review): the success and failure prefixes look identical in
        # this file - presumably distinct status glyphs were lost to an
        # encoding problem; confirm.
        if not ok:
            return f"β {detail}"
        return f"β {detail}"
    except Exception as e:
        return f"β Unexpected error: {str(e)}"
def list_files(prefix=None):
    """Build the file dropdown and summary text for the UI.

    Returns a ``(gr.Dropdown, str)`` pair: the dropdown is filled with the
    object names found for ``prefix`` (first one preselected) and the text
    lists name, size and creation time of each object.
    """
    try:
        entries = oci_connector.list_files(prefix)
        if not entries:
            location = f" in '{prefix}'" if prefix else ""
            empty_dropdown = gr.Dropdown(choices=[], value=None)
            return empty_dropdown, "π No files found" + location

        names = [entry['name'] for entry in entries]
        heading = f"π Files{(' in ' + prefix) if prefix else ''}:\n\n"
        rows = [
            f"β’ {entry['name']} ({entry['size']}) - {entry['time']}"
            for entry in entries
        ]
        preselected = names[0] if names else None
        return gr.Dropdown(choices=names, value=preselected), heading + "\n".join(rows)
    except Exception as e:
        return gr.Dropdown(choices=[], value=None), f"β Error listing files: {str(e)}"
def list_directories():
    """Build the project-directory dropdown and summary text for the UI.

    Returns a ``(gr.Dropdown, str)`` pair; empty directory names are
    dropped and the first remaining one is preselected.
    """
    try:
        found = oci_connector.list_directories()
        if not found:
            return gr.Dropdown(choices=[], value=None), "π No directories found"

        # Keep only non-empty entries.
        names = list(filter(None, found))
        bullet_lines = [f"β’ {name}" for name in names]
        summary = "π Project Directories:\n\n" + "\n".join(bullet_lines)
        preselected = names[0] if names else None
        return gr.Dropdown(choices=names, value=preselected), summary
    except Exception as e:
        return gr.Dropdown(choices=[], value=None), f"β Error listing directories: {str(e)}"
def download_file(selected_file):
    """Fetch the selected object into ``/tmp/downloads`` for the UI.

    Returns ``(local_path, status_text)``; the path is ``None`` when
    nothing was selected or the download failed.
    """
    if not selected_file:
        return None, "β Please select a file to download"
    try:
        # Local target: /tmp/downloads/<basename of the object name>
        target_dir = "/tmp/downloads"
        os.makedirs(target_dir, exist_ok=True)
        local_path = os.path.join(target_dir, os.path.basename(selected_file))

        ok, detail = oci_connector.download_file(selected_file, local_path)
        if not ok:
            return None, f"β {detail}"
        return local_path, f"β {detail}"
    except Exception as e:
        return None, f"β Download error: {str(e)}"
def delete_file(selected_file):
    """Remove the selected object from the bucket and return a status string."""
    if not selected_file:
        return "β Please select a file to delete"
    try:
        ok, detail = oci_connector.delete_file(selected_file)
        if not ok:
            return f"β {detail}"
        return f"β {detail}"
    except Exception as e:
        return f"β Delete error: {str(e)}"
def check_environment():
    """Report which OCI environment variables are set and the client state.

    Values longer than 20 characters are truncated in the report.  The
    report ends with the connector state, SDK availability and an overall
    status line.
    """
    variable_names = (
        'OCI_USER_OCID', 'OCI_FINGERPRINT', 'OCI_TENANCY_OCID',
        'OCI_REGION', 'OCI_NAMESPACE', 'OCI_BUCKET_NAME', 'OCI_PRIVATE_KEY',
    )
    report = ["π Environment Check:\n\n"]
    everything_present = True

    for name in variable_names:
        raw = os.getenv(name)
        if not raw:
            everything_present = False
            report.append(f"β {name}: MISSING\n")
            continue
        shown = raw if len(raw) <= 20 else raw[:20] + "..."
        report.append(f"β {name}: {shown}\n")

    # Connector state: error text, initialised, or still pending.
    if getattr(oci_connector, 'initialization_error', None):
        report.append(f"\nπ§ OCI Client: β {oci_connector.initialization_error}")
    elif oci_connector and oci_connector.client:
        report.append(f"\nπ§ OCI Client: β INITIALIZED")
    else:
        report.append(f"\nπ§ OCI Client: β³ INITIALIZING")

    report.append(f"\nπ§ OCI SDK: {'β AVAILABLE' if OCI_AVAILABLE else 'β NOT AVAILABLE'}")
    report.append(f"\nπ Status: {'β ALL SET' if everything_present else 'β MISSING VARIABLES'}")
    return "".join(report)
# Create Gradio interface
# NOTE(review): the source lost its indentation, so the nesting of the layout
# containers below is reconstructed - confirm it matches the intended layout.
with gr.Blocks(title="OCI Storage Control Board", theme="soft") as demo:
    gr.Markdown("# ποΈ OCI Storage Control Board")
    gr.Markdown("Complete file management with project_id-based organization")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## π Configuration")
            check_btn = gr.Button("Check Environment", variant="secondary")
            check_output = gr.Textbox(label="Environment Status", interactive=False, lines=10)
        with gr.Column(scale=2):
            gr.Markdown("## π€ Upload Files")
            project_id_input = gr.Textbox(
                label="Project ID",
                placeholder="Enter project_id from n8n (e.g., 'my_video_project_x7f3')",
                interactive=True
            )
            subfolder_input = gr.Textbox(
                label="Subfolder (Optional)",
                placeholder="Enter subfolder name (e.g., 'video', 'audio')",
                interactive=True
            )
            file_input = gr.File(
                label="Select File to Upload",
                file_types=[
                    ".mp4", ".mov", ".avi", ".mkv", ".wav", ".mp3", ".ogg", ".flac",
                    ".txt", ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx",
                    ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".svg",
                    ".zip", ".rar", ".7z", ".tar", ".gz",
                    ".py", ".js", ".html", ".css", ".json", ".xml"
                ]
            )
            upload_btn = gr.Button("π€ Upload to OCI", variant="primary")
            upload_output = gr.Textbox(label="Upload Status", interactive=False)

    with gr.Row():
        gr.Markdown("## π File Management")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### ποΈ Browse Projects")
            dir_btn = gr.Button("π List Projects", variant="secondary")
            dir_dropdown = gr.Dropdown(
                label="Select Project Directory",
                choices=[],
                interactive=True
            )
            dir_info = gr.Textbox(label="Project Contents", interactive=False, lines=6)
        with gr.Column(scale=2):
            gr.Markdown("### π File Operations")
            list_btn = gr.Button("π Refresh File List", variant="secondary")
            file_dropdown = gr.Dropdown(
                label="Select File",
                choices=[],
                interactive=True
            )
            file_info = gr.Textbox(label="File List", interactive=False, lines=8)
            with gr.Row():
                download_btn = gr.Button("π₯ Download", variant="secondary")
                delete_btn = gr.Button("ποΈ Delete", variant="stop")
            download_output = gr.Textbox(label="Download Status", interactive=False)
            delete_output = gr.Textbox(label="Delete Status", interactive=False)

    with gr.Row():
        # NOTE(review): this component receives the downloaded file but is
        # created hidden - confirm that is intended.
        download_file_component = gr.File(label="Downloaded File", visible=False)

    # Connect functions to buttons
    check_btn.click(check_environment, inputs=None, outputs=check_output)
    upload_btn.click(upload_video, inputs=[file_input, project_id_input, subfolder_input], outputs=upload_output)
    dir_btn.click(
        fn=list_directories,
        inputs=None,
        outputs=[dir_dropdown, dir_info]
    )
    # Selecting a project directory shows its files.
    dir_dropdown.change(
        fn=lambda x: list_files(x),
        inputs=[dir_dropdown],
        outputs=[file_dropdown, file_info]
    )
    list_btn.click(
        fn=lambda: list_files(None),
        inputs=None,
        outputs=[file_dropdown, file_info]
    )
    download_btn.click(
        fn=download_file,
        inputs=[file_dropdown],
        outputs=[download_file_component, download_output]
    )
    # After a delete, refresh the list for the directory that is currently
    # selected.  The selection has to arrive through ``inputs``: reading
    # ``dir_dropdown.value`` inside the callback only yields the value the
    # component was constructed with, never the user's current choice.
    delete_btn.click(
        fn=delete_file,
        inputs=[file_dropdown],
        outputs=delete_output
    ).then(
        fn=lambda selected_dir: list_files(selected_dir) if selected_dir else list_files(None),
        inputs=[dir_dropdown],
        outputs=[file_dropdown, file_info]
    )
    # Auto-load on start
    demo.load(
        fn=list_directories,
        inputs=None,
        outputs=[dir_dropdown, dir_info]
    )

# Mount Gradio app to FastAPI
app = gr.mount_gradio_app(app, demo, path="/")
| # Hugging Face needs this specific variable name for detection | |
def get_app():
    """Accessor for the module-level FastAPI ``app`` object."""
    fastapi_app = app
    return fastapi_app
# Entry point: serve with uvicorn when executed directly; otherwise only
# log that the module was imported by the hosting platform.
if __name__ != "__main__":
    # For Hugging Face deployment
    print("π Hugging Face Space detected - using automatic deployment")
else:
    # This is for local development only
    startup_lines = (
        "β App initialized successfully!",
        "π‘ API Endpoints available:",
        " - POST /api/upload",
        " - POST /api/upload-text",
        " - GET /api/health",
        " - GET /api/files/{project_id}",
        " - GET /api/debug/bucket-structure",
    )
    for startup_line in startup_lines:
        print(startup_line)
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)