yukee1992's picture
Update app.py
9c6a37d verified
import gradio as gr
import os
import tempfile
import json
from datetime import datetime
import re
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import base64
try:
import oci
from oci.object_storage import ObjectStorageClient
OCI_AVAILABLE = True
print("βœ… OCI package imported successfully")
except ImportError as e:
OCI_AVAILABLE = False
print(f"❌ OCI package import failed: {e}")
# OCI Connector Class
class OCIStorageConnector:
def __init__(self):
self.config = self._load_config()
self.client = None
self.initialization_error = None
self._initialize_client()
def _load_config(self):
"""Load configuration from environment variables"""
config = {
'user_ocid': os.getenv('OCI_USER_OCID'),
'tenancy_ocid': os.getenv('OCI_TENANCY_OCID'),
'key_fingerprint': os.getenv('OCI_FINGERPRINT'),
'private_key': os.getenv('OCI_PRIVATE_KEY'),
'region': os.getenv('OCI_REGION'),
'namespace': os.getenv('OCI_NAMESPACE'),
'bucket_name': os.getenv('OCI_BUCKET_NAME')
}
# Check for missing config
missing = [k for k, v in config.items() if not v]
if missing:
print(f"❌ Missing config values: {missing}")
return config
def _initialize_client(self):
"""Initialize OCI client with detailed error handling"""
try:
if not OCI_AVAILABLE:
self.initialization_error = "OCI SDK not available"
return
# Check if we have all required config
required_config = ['user_ocid', 'tenancy_ocid', 'key_fingerprint', 'private_key', 'region']
missing_config = [key for key in required_config if not self.config.get(key)]
if missing_config:
self.initialization_error = f"Missing config: {missing_config}"
return
print("πŸ”§ Attempting to initialize OCI client...")
print(f" Region: {self.config['region']}")
# FIX: Replace \n with actual newlines for Hugging Face secrets
private_key_content = self.config['private_key'].replace('\\n', '\n')
config = {
"user": self.config['user_ocid'],
"key_content": private_key_content,
"fingerprint": self.config['key_fingerprint'],
"tenancy": self.config['tenancy_ocid'],
"region": self.config['region']
}
self.client = ObjectStorageClient(config)
# Test the connection with a simple API call
try:
namespace = self.client.get_namespace().data
print(f"βœ… OCI Client initialized successfully! Namespace: {namespace}")
except Exception as test_error:
self.initialization_error = f"Connection test failed: {test_error}"
print(f"❌ Connection test failed: {test_error}")
self.client = None
except Exception as e:
self.initialization_error = f"Client initialization failed: {str(e)}"
print(f"❌ OCI Client initialization failed: {e}")
self.client = None
def upload_file(self, file_path, object_name, project_id=None):
"""Upload a file to OCI Object Storage"""
try:
if self.initialization_error:
return False, f"OCI Client Error: {self.initialization_error}"
if not self.client:
return False, "OCI client not initialized"
namespace = self.config['namespace']
bucket_name = self.config['bucket_name']
if not namespace or not bucket_name:
return False, "Namespace or bucket name not configured"
# Use the provided object_name as-is (it should already include project_id/subfolder/)
final_object_name = object_name
print(f"πŸ“€ Uploading {file_path} to {final_object_name}")
# Upload the file
with open(file_path, 'rb') as file:
response = self.client.put_object(
namespace_name=namespace,
bucket_name=bucket_name,
object_name=final_object_name,
put_object_body=file
)
print(f"βœ… Upload successful! ETag: {response.headers['etag']}")
return True, f"File uploaded: {final_object_name}"
except oci.exceptions.ServiceError as e:
error_msg = f"OCI Service Error (Status: {e.status}): {e.message}"
print(f"❌ {error_msg}")
return False, error_msg
except Exception as e:
error_msg = f"Upload failed: {str(e)}"
print(f"❌ {error_msg}")
return False, error_msg
def list_files(self, prefix=None):
"""List files in OCI bucket, optionally filtered by prefix"""
try:
if self.initialization_error:
print(f"❌ Cannot list files: {self.initialization_error}")
return []
if not self.client:
print("❌ OCI client not available for listing")
return []
namespace = self.config['namespace']
bucket_name = self.config['bucket_name']
if not namespace or not bucket_name:
print("❌ Namespace or bucket name missing for listing")
return []
print(f"πŸ“‹ Listing objects in bucket: {bucket_name}" + (f" with prefix: {prefix}" if prefix else ""))
# List objects in the bucket
list_objects_kwargs = {
'namespace_name': namespace,
'bucket_name': bucket_name
}
if prefix:
list_objects_kwargs['prefix'] = prefix
list_objects_kwargs['delimiter'] = '/'
response = self.client.list_objects(**list_objects_kwargs)
print(f"πŸ“Š List response received: {len(response.data.objects)} objects")
files = []
for obj in response.data.objects:
# Handle None values
size_display = f"{obj.size / 1024 / 1024:.1f} MB" if obj.size else "Unknown size"
time_display = obj.time_created.strftime("%Y-%m-%d %H:%M:%S") if obj.time_created else "Unknown time"
files.append({
'name': obj.name,
'size': size_display,
'time': time_display,
'size_bytes': obj.size or 0,
'directory': '/' in obj.name
})
print(f"βœ… Found {len(files)} objects")
return files
except oci.exceptions.ServiceError as e:
print(f"❌ List failed - Service Error: {e.message} (Status: {e.status})")
return []
except Exception as e:
print(f"❌ List failed: {e}")
return []
def list_directories(self):
"""List all directories (prefixes) in the bucket"""
try:
if self.initialization_error:
return []
if not self.client:
return []
namespace = self.config['namespace']
bucket_name = self.config['bucket_name']
# List objects with delimiter to get directories
response = self.client.list_objects(
namespace_name=namespace,
bucket_name=bucket_name,
delimiter='/'
)
directories = []
if hasattr(response.data, 'prefixes'):
directories = response.data.prefixes
return directories
except Exception as e:
print(f"❌ List directories failed: {e}")
return []
def download_file(self, object_name, download_path):
"""Download a file from OCI bucket"""
try:
if self.initialization_error:
return False, f"OCI Client Error: {self.initialization_error}"
if not self.client:
return False, "OCI client not initialized"
namespace = self.config['namespace']
bucket_name = self.config['bucket_name']
print(f"πŸ“₯ Downloading {object_name}")
# Download the file
response = self.client.get_object(
namespace_name=namespace,
bucket_name=bucket_name,
object_name=object_name
)
# Save to file
with open(download_path, 'wb') as f:
for chunk in response.data.raw.stream(1024 * 1024):
f.write(chunk)
print(f"βœ… Download successful: {object_name}")
return True, f"File downloaded successfully!"
except oci.exceptions.ServiceError as e:
error_msg = f"OCI Service Error (Status: {e.status}): {e.message}"
print(f"❌ {error_msg}")
return False, error_msg
except Exception as e:
error_msg = f"Download failed: {str(e)}"
print(f"❌ {error_msg}")
return False, error_msg
def delete_file(self, object_name):
"""Delete a file from OCI bucket"""
try:
if self.initialization_error:
return False, f"OCI Client Error: {self.initialization_error}"
if not self.client:
return False, "OCI client not initialized"
namespace = self.config['namespace']
bucket_name = self.config['bucket_name']
print(f"πŸ—‘οΈ Deleting {object_name}")
# Delete the file
response = self.client.delete_object(
namespace_name=namespace,
bucket_name=bucket_name,
object_name=object_name
)
print(f"βœ… Delete successful: {object_name}")
return True, f"File deleted successfully!"
except oci.exceptions.ServiceError as e:
error_msg = f"OCI Service Error (Status: {e.status}): {e.message}"
print(f"❌ {error_msg}")
return False, error_msg
except Exception as e:
error_msg = f"Delete failed: {str(e)}"
print(f"❌ {error_msg}")
return False, error_msg
# Initialize OCI connector
print("πŸš€ Initializing OCI Connector...")
oci_connector = OCIStorageConnector()
# Create FastAPI app for API endpoints
app = FastAPI(title="OCI Storage API")
# Add CORS middleware to allow requests from n8n
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/api/upload")
async def api_upload(
project_id: str = Form(...),
file: UploadFile = File(...),
subfolder: str = Form(None)
):
"""API endpoint for n8n and other services to upload files with subfolder support"""
try:
print(f"πŸ“₯ API Upload received")
print(f" Project ID: {project_id}")
print(f" Subfolder: {subfolder}")
print(f" Filename: {file.filename}")
print(f" Content Type: {file.content_type}")
# Check if project_id is provided
if not project_id or project_id.strip() == "":
print("❌ Error: project_id is required")
return {"status": "error", "message": "project_id is required"}
# Read file content
file_content = await file.read()
print(f" File size: {len(file_content)} bytes")
# Create temporary file
temp_path = f"/tmp/{file.filename}"
with open(temp_path, "wb") as f:
f.write(file_content)
print(f" Temporary file created: {temp_path}")
# Create object name with project_id and subfolder directory
if project_id and project_id.strip():
# Sanitize project_id for use in path
sanitized_project_id = re.sub(r'[^a-zA-Z0-9_\-]', '', project_id.strip())
# Add subfolder if specified
if subfolder and subfolder.strip():
sanitized_subfolder = re.sub(r'[^a-zA-Z0-9_\-]', '', subfolder.strip())
final_object_name = f"{sanitized_project_id}/{sanitized_subfolder}/{file.filename}"
print(f" Using subfolder: {sanitized_subfolder}")
else:
final_object_name = f"{sanitized_project_id}/{file.filename}"
print(f" No subfolder specified")
else:
final_object_name = file.filename
print(f" Final object name: {final_object_name}")
# Upload to OCI with the final object name
success, message = oci_connector.upload_file(temp_path, final_object_name, None)
# Clean up temporary file
try:
os.remove(temp_path)
print(f" Temporary file cleaned up")
except:
print(" Warning: Could not remove temporary file")
pass
if success:
print(f"βœ… Upload successful: {message}")
return {
"status": "success",
"message": message,
"project_id": project_id,
"subfolder": subfolder,
"object_name": final_object_name
}
else:
print(f"❌ Upload failed: {message}")
return {
"status": "error",
"message": message,
"project_id": project_id,
"subfolder": subfolder
}
except Exception as e:
error_msg = f"API error: {str(e)}"
print(f"❌ {error_msg}")
import traceback
print(f"Traceback: {traceback.format_exc()}")
return {
"status": "error",
"message": error_msg,
"project_id": project_id,
"subfolder": subfolder
}
@app.post("/api/upload-text")
async def api_upload_text(
project_id: str = Form(...),
content: str = Form(...),
filename: str = Form("text_file.txt"),
subfolder: str = Form(None)
):
"""API endpoint for uploading text content directly with subfolder support"""
try:
print(f"πŸ“₯ API Text Upload received - Project: {project_id}, File: {filename}, Subfolder: {subfolder}")
# Create temporary file with text content
temp_path = f"/tmp/{filename}"
with open(temp_path, "w", encoding="utf-8") as f:
f.write(content)
# Create object name with project_id and subfolder directory
if project_id and project_id.strip():
# Sanitize project_id for use in path
sanitized_project_id = re.sub(r'[^a-zA-Z0-9_\-]', '', project_id.strip())
# Add subfolder if specified
if subfolder and subfolder.strip():
sanitized_subfolder = re.sub(r'[^a-zA-Z0-9_\-]', '', subfolder.strip())
final_object_name = f"{sanitized_project_id}/{sanitized_subfolder}/{filename}"
else:
final_object_name = f"{sanitized_project_id}/{filename}"
else:
final_object_name = filename
# Upload to OCI with the final object name
success, message = oci_connector.upload_file(temp_path, final_object_name, None)
# Clean up temporary file
try:
os.remove(temp_path)
except:
pass
if success:
return {
"status": "success",
"message": message,
"project_id": project_id,
"subfolder": subfolder,
"object_name": final_object_name
}
else:
return {
"status": "error",
"message": message,
"project_id": project_id,
"subfolder": subfolder
}
except Exception as e:
return {
"status": "error",
"message": f"API error: {str(e)}",
"project_id": project_id,
"subfolder": subfolder
}
@app.get("/api/health")
async def api_health():
"""Health check endpoint"""
return {
"status": "healthy",
"oci_available": OCI_AVAILABLE,
"oci_initialized": oci_connector.client is not None,
"timestamp": datetime.now().isoformat()
}
@app.get("/api/files/{project_id}")
async def api_list_files(project_id: str):
"""API endpoint to list files in a project directory"""
try:
files = oci_connector.list_files(prefix=project_id)
return {"status": "success", "project_id": project_id, "files": files}
except Exception as e:
return {"status": "error", "message": str(e)}
@app.get("/api/debug/bucket-structure")
async def debug_bucket_structure():
"""Debug endpoint to check bucket structure"""
try:
# List all objects to see the current structure
all_files = oci_connector.list_files()
# Group by project and subfolder
structure = {}
for file in all_files:
parts = file['name'].split('/')
if len(parts) >= 2:
project = parts[0]
if project not in structure:
structure[project] = {}
if len(parts) >= 3:
subfolder = parts[1]
filename = '/'.join(parts[2:])
if subfolder not in structure[project]:
structure[project][subfolder] = []
structure[project][subfolder].append(filename)
else:
if 'root' not in structure[project]:
structure[project]['root'] = []
structure[project]['root'].append(parts[1])
return {
"status": "success",
"bucket_structure": structure,
"total_files": len(all_files)
}
except Exception as e:
return {"status": "error", "message": str(e)}
# Gradio Functions
def upload_video(file, project_id, subfolder=None):
"""Upload file to OCI with project_id directory and optional subfolder"""
if file is None:
return "❌ Please select a file first"
try:
# Get filename
filename = os.path.basename(file.name)
# Create object name with project_id and subfolder directory
if project_id and project_id.strip():
# Sanitize project_id for use in path
sanitized_project_id = re.sub(r'[^a-zA-Z0-9_\-]', '', project_id.strip())
# Add subfolder if specified
if subfolder and subfolder.strip():
sanitized_subfolder = re.sub(r'[^a-zA-Z0-9_\-]', '', subfolder.strip())
final_object_name = f"{sanitized_project_id}/{sanitized_subfolder}/{filename}"
else:
final_object_name = f"{sanitized_project_id}/{filename}"
else:
final_object_name = filename
# Upload to OCI with the final object name
success, message = oci_connector.upload_file(file.name, final_object_name, None)
if success:
return f"βœ… {message}"
else:
return f"❌ {message}"
except Exception as e:
return f"❌ Unexpected error: {str(e)}"
def list_files(prefix=None):
"""List files in OCI bucket, optionally filtered by prefix"""
try:
files = oci_connector.list_files(prefix)
if not files:
return gr.Dropdown(choices=[], value=None), "πŸ“ No files found" + (f" in '{prefix}'" if prefix else "")
# Create dropdown options
file_options = [f['name'] for f in files]
file_info = f"πŸ“ Files{(' in ' + prefix) if prefix else ''}:\n\n" + "\n".join([f"β€’ {f['name']} ({f['size']}) - {f['time']}" for f in files])
return gr.Dropdown(choices=file_options, value=file_options[0] if file_options else None), file_info
except Exception as e:
return gr.Dropdown(choices=[], value=None), f"❌ Error listing files: {str(e)}"
def list_directories():
"""List all directories in the bucket"""
try:
directories = oci_connector.list_directories()
if not directories:
return gr.Dropdown(choices=[], value=None), "πŸ“ No directories found"
dir_options = [d for d in directories if d] # Filter out empty strings
dir_info = "πŸ“ Project Directories:\n\n" + "\n".join([f"β€’ {d}" for d in dir_options])
return gr.Dropdown(choices=dir_options, value=dir_options[0] if dir_options else None), dir_info
except Exception as e:
return gr.Dropdown(choices=[], value=None), f"❌ Error listing directories: {str(e)}"
def download_file(selected_file):
"""Download file from OCI bucket"""
if not selected_file:
return None, "❌ Please select a file to download"
try:
# Create temp file for download
download_dir = "/tmp/downloads"
os.makedirs(download_dir, exist_ok=True)
filename = os.path.basename(selected_file)
download_path = os.path.join(download_dir, filename)
# Download from OCI
success, message = oci_connector.download_file(selected_file, download_path)
if success:
return download_path, f"βœ… {message}"
else:
return None, f"❌ {message}"
except Exception as e:
return None, f"❌ Download error: {str(e)}"
def delete_file(selected_file):
"""Delete file from OCI bucket"""
if not selected_file:
return "❌ Please select a file to delete"
try:
# Delete from OCI
success, message = oci_connector.delete_file(selected_file)
if success:
return f"βœ… {message}"
else:
return f"❌ {message}"
except Exception as e:
return f"❌ Delete error: {str(e)}"
def check_environment():
"""Check if environment variables are set correctly"""
required_vars = ['OCI_USER_OCID', 'OCI_FINGERPRINT', 'OCI_TENANCY_OCID',
'OCI_REGION', 'OCI_NAMESPACE', 'OCI_BUCKET_NAME', 'OCI_PRIVATE_KEY']
result = "πŸ” Environment Check:\n\n"
all_set = True
for var in required_vars:
value = os.getenv(var)
if value:
display_value = value[:20] + "..." if len(value) > 20 else value
result += f"βœ… {var}: {display_value}\n"
else:
result += f"❌ {var}: MISSING\n"
all_set = False
if hasattr(oci_connector, 'initialization_error') and oci_connector.initialization_error:
result += f"\nπŸ”§ OCI Client: ❌ {oci_connector.initialization_error}"
elif oci_connector and oci_connector.client:
result += f"\nπŸ”§ OCI Client: βœ… INITIALIZED"
else:
result += f"\nπŸ”§ OCI Client: ⏳ INITIALIZING"
result += f"\nπŸ”§ OCI SDK: {'βœ… AVAILABLE' if OCI_AVAILABLE else '❌ NOT AVAILABLE'}"
result += f"\nπŸ“Š Status: {'βœ… ALL SET' if all_set else '❌ MISSING VARIABLES'}"
return result
# Create Gradio interface
with gr.Blocks(title="OCI Storage Control Board", theme="soft") as demo:
gr.Markdown("# πŸ—‚οΈ OCI Storage Control Board")
gr.Markdown("Complete file management with project_id-based organization")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## πŸ” Configuration")
check_btn = gr.Button("Check Environment", variant="secondary")
check_output = gr.Textbox(label="Environment Status", interactive=False, lines=10)
with gr.Column(scale=2):
gr.Markdown("## πŸ“€ Upload Files")
project_id_input = gr.Textbox(
label="Project ID",
placeholder="Enter project_id from n8n (e.g., 'my_video_project_x7f3')",
interactive=True
)
subfolder_input = gr.Textbox(
label="Subfolder (Optional)",
placeholder="Enter subfolder name (e.g., 'video', 'audio')",
interactive=True
)
file_input = gr.File(
label="Select File to Upload",
file_types=[
".mp4", ".mov", ".avi", ".mkv", ".wav", ".mp3", ".ogg", ".flac",
".txt", ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx",
".jpg", ".jpeg", ".png", ".gif", ".bmp", ".svg",
".zip", ".rar", ".7z", ".tar", ".gz",
".py", ".js", ".html", ".css", ".json", ".xml"
]
)
upload_btn = gr.Button("πŸ“€ Upload to OCI", variant="primary")
upload_output = gr.Textbox(label="Upload Status", interactive=False)
with gr.Row():
gr.Markdown("## πŸ“‹ File Management")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### πŸ—‚οΈ Browse Projects")
dir_btn = gr.Button("πŸ“ List Projects", variant="secondary")
dir_dropdown = gr.Dropdown(
label="Select Project Directory",
choices=[],
interactive=True
)
dir_info = gr.Textbox(label="Project Contents", interactive=False, lines=6)
with gr.Column(scale=2):
gr.Markdown("### πŸ“„ File Operations")
list_btn = gr.Button("πŸ”„ Refresh File List", variant="secondary")
file_dropdown = gr.Dropdown(
label="Select File",
choices=[],
interactive=True
)
file_info = gr.Textbox(label="File List", interactive=False, lines=8)
with gr.Row():
download_btn = gr.Button("πŸ“₯ Download", variant="secondary")
delete_btn = gr.Button("πŸ—‘οΈ Delete", variant="stop")
download_output = gr.Textbox(label="Download Status", interactive=False)
delete_output = gr.Textbox(label="Delete Status", interactive=False)
with gr.Row():
download_file_component = gr.File(label="Downloaded File", visible=False)
# Connect functions to buttons
check_btn.click(check_environment, inputs=None, outputs=check_output)
upload_btn.click(upload_video, inputs=[file_input, project_id_input, subfolder_input], outputs=upload_output)
dir_btn.click(
fn=list_directories,
inputs=None,
outputs=[dir_dropdown, dir_info]
)
dir_dropdown.change(
fn=lambda x: list_files(x),
inputs=[dir_dropdown],
outputs=[file_dropdown, file_info]
)
list_btn.click(
fn=lambda: list_files(None),
inputs=None,
outputs=[file_dropdown, file_info]
)
download_btn.click(
fn=download_file,
inputs=[file_dropdown],
outputs=[download_file_component, download_output]
)
delete_btn.click(
fn=delete_file,
inputs=[file_dropdown],
outputs=delete_output
).then(
fn=lambda: list_files(dir_dropdown.value) if dir_dropdown.value else list_files(None),
inputs=None,
outputs=[file_dropdown, file_info]
)
# Auto-load on start
demo.load(
fn=list_directories,
inputs=None,
outputs=[dir_dropdown, dir_info]
)
# Mount Gradio app to FastAPI
app = gr.mount_gradio_app(app, demo, path="/")
# Hugging Face needs this specific variable name for detection
def get_app():
"""Return the FastAPI app for Hugging Face"""
return app
# For Hugging Face Spaces, we need to expose the app differently
if __name__ == "__main__":
# This is for local development only
print("βœ… App initialized successfully!")
print("πŸ“‘ API Endpoints available:")
print(" - POST /api/upload")
print(" - POST /api/upload-text")
print(" - GET /api/health")
print(" - GET /api/files/{project_id}")
print(" - GET /api/debug/bucket-structure")
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)
else:
# For Hugging Face deployment
print("πŸš€ Hugging Face Space detected - using automatic deployment")