"""Image upload gallery: FastAPI app with a Gradio UI mounted at ``/gradio``.

Uploaded images are checked with the Sightengine nudity model, copied into
temporary or permanent storage, thumbnailed, and tracked in an in-memory
analytics store that drives the gallery and the "top liked" panel.

NOTE(review): the HTML/CSS/JS markup inside the string templates of the
reviewed source was not recoverable (only the visible text survived), so the
templates below use deliberately minimal markup around that text. Confirm
against the intended design before shipping.
"""

import base64
import json
import os
import shutil
import uuid
from collections import defaultdict
from datetime import datetime, timezone
from html import escape
from io import BytesIO

import gradio as gr
import requests
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from PIL import Image

# Directories
TEMP_DIR = "cloud_storage/temp"
PERM_DIR = "cloud_storage/permanent"
THUMB_DIR = "cloud_storage/thumbs"
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(PERM_DIR, exist_ok=True)
os.makedirs(THUMB_DIR, exist_ok=True)

# Load variables from .env file
load_dotenv()

# Sightengine API credentials (set as environment variables)
API_USER = os.getenv('SIGHTENGINE_API_USER')
API_SECRET = os.getenv('SIGHTENGINE_API_SECRET')

MODERATION_URL = 'https://api.sightengine.com/1.0/check.json'
# Without a timeout a stalled moderation request would block the upload
# handler indefinitely.
MODERATION_TIMEOUT_SECONDS = 30

# An image is accepted only if the "none" score is at least this high ...
MIN_SAFE_SCORE = 0.9
# ... and none of the categories below scores above this.
MAX_CATEGORY_SCORE = 0.1
NSFW_CATEGORIES = (
    'sexual_activity',
    'sexual_display',
    'erotica',
    'very_suggestive',
    'suggestive',
    'mildly_suggestive',
)

# FastAPI app
app = FastAPI()


# Redirect to '/gradio' endpoint of FastAPI.
@app.get("/")
async def root():
    """Redirect the site root to the Gradio UI (light theme)."""
    return RedirectResponse(url="/gradio/?__theme=light", status_code=307)


# Serve images and thumbnails as static files
app.mount("/images/temp", StaticFiles(directory=TEMP_DIR), name="temp_images")
app.mount("/images/permanent", StaticFiles(directory=PERM_DIR), name="perm_images")
app.mount("/images/thumbs", StaticFiles(directory=THUMB_DIR), name="thumbnails")

# In-memory analytics store, keyed by (filename, storage).
# Because this is a defaultdict, *indexing* an unknown key creates an entry;
# code that must not create entries uses ``in`` / ``.get`` / ``.pop`` instead.
analytics = defaultdict(
    lambda: {"views": 0, "likes": 0, "upload_time": None, "size": 0, "thumb": None, "tags": []}
)


def _short_name(filename):
    """Return *filename* truncated to 8 characters plus "..." when longer than 10."""
    return filename[:8] + "..." if len(filename) > 10 else filename


# Generate thumbnail for an image
def generate_thumbnail(src_path, filename):
    """Write a thumbnail (at most 300x300) of *src_path* into THUMB_DIR.

    Returns the thumbnail's URL path, or None if the image could not be
    opened or saved (the error is printed).
    """
    thumb_path = os.path.join(THUMB_DIR, filename)
    try:
        with Image.open(src_path) as img:
            img.thumbnail((300, 300))
            img.save(thumb_path, quality=85)
        return f"/images/thumbs/{filename}"
    except Exception as e:
        print(f"Error generating thumbnail: {e}")
        return None


# Generate data URI for preview
def generate_preview(image_path):
    """Return a base64 PNG data URI (at most 100x100) for *image_path*, or None on error."""
    try:
        with Image.open(image_path) as img:
            img.thumbnail((100, 100))
            buffered = BytesIO()
            img.save(buffered, format="PNG")
        return f"data:image/png;base64,{base64.b64encode(buffered.getvalue()).decode()}"
    except Exception as e:
        print(f"Error generating preview: {e}")
        return None


# Generate HTML for previews
def generate_preview_html(image_paths):
    """Build the upload-preview HTML for the selected files.

    Files whose preview cannot be generated are skipped.
    """
    if not image_paths:
        return "<p>No images selected.</p>"

    cards = []
    for path in image_paths:
        data_uri = generate_preview(path)
        if not data_uri:
            continue
        # The name comes from the uploaded file, so escape it before embedding.
        short_name = escape(_short_name(os.path.basename(path)))
        cards.append(
            '<div style="text-align:center;">'
            f'<img src="{data_uri}" alt="{short_name}">'
            f'<p>{short_name}</p>'
            '</div>'
        )
    return '<div style="display:flex;flex-wrap:wrap;gap:10px;">' + "".join(cards) + "</div>"


def _is_nsfw(nudity):
    """Return True when the Sightengine ``nudity`` scores fail the safety thresholds.

    A missing "none" score is treated as unsafe.
    """
    if nudity.get('none', 0) < MIN_SAFE_SCORE:
        return True
    return any(nudity.get(category, 0) > MAX_CATEGORY_SCORE for category in NSFW_CATEGORIES)


def _store_image(image_path, save_dir, store_type, tag_list):
    """Copy an approved image into *save_dir*, thumbnail it and register its analytics entry."""
    ext = os.path.splitext(image_path)[1]
    new_filename = f"{uuid.uuid4().hex}{ext}"
    dest_path = os.path.join(save_dir, new_filename)
    shutil.copy(image_path, dest_path)
    thumb_src = generate_thumbnail(dest_path, new_filename)
    analytics[(new_filename, store_type)] = {
        "views": 0,
        "likes": 0,
        "upload_time": datetime.now(timezone.utc).isoformat(),
        "size": os.path.getsize(dest_path),
        "thumb": thumb_src,
        "tags": list(tag_list),
    }


# Save images to chosen directory with NSFW moderation
def save_image(image_paths, store_type, tags):
    """Moderate and store the uploaded images.

    Args:
        image_paths: paths of the uploaded files (may be None or empty).
        store_type: "Temporary" selects TEMP_DIR; anything else selects PERM_DIR.
        tags: comma-separated tag string (may be empty or None).

    Returns:
        A 6-tuple matching the upload button's outputs: status message,
        cleared preview HTML, gallery HTML, analytics HTML, and two updates
        that reset the file input and the tags box.

    An image is rejected when moderation flags it, when the moderation call
    reports failure, or when any error occurs while processing it.
    """
    if not image_paths:
        return ("No images uploaded.", "", get_gallery_html(), get_analytics_html(),
                gr.update(value=None), gr.update(value=None))

    save_dir = TEMP_DIR if store_type == "Temporary" else PERM_DIR
    # Drop empty entries produced by inputs such as "a,,b" or a trailing comma.
    tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()] if tags else []
    params = {
        'models': 'nudity-2.1',
        'api_user': API_USER,
        'api_secret': API_SECRET,
    }

    saved_count = 0
    rejected = []
    for image_path in image_paths:
        display_name = os.path.basename(image_path)
        try:
            # Moderate the image using Sightengine API. The context manager
            # guarantees the upload handle is closed even if the request fails.
            with open(image_path, 'rb') as media:
                r = requests.post(
                    MODERATION_URL,
                    files={'media': media},
                    data=params,
                    timeout=MODERATION_TIMEOUT_SECONDS,
                )
            output = json.loads(r.text)

            if output['status'] != 'success':
                print(f"Error moderating image {image_path}: {output.get('error', 'Unknown error')}")
                rejected.append(display_name)
                continue

            if _is_nsfw(output['nudity']):
                rejected.append(display_name)
                continue

            _store_image(image_path, save_dir, store_type, tag_list)
            saved_count += 1
        except Exception as e:
            # Best-effort per file: one bad upload must not abort the batch.
            print(f"Error processing image {image_path}: {e}")
            rejected.append(display_name)

    status_message = f"Saved {saved_count} out of {len(image_paths)} images to {store_type} storage."
    if rejected:
        status_message += f" Rejected: {', '.join(rejected)}"
    return (status_message, "", get_gallery_html(), get_analytics_html(),
            gr.update(value=None), gr.update(value=None))


# Delete selected image
def delete_image(filename, storage):
    """Delete an image, its thumbnail and its analytics entry.

    Returns (status message, gallery HTML, analytics HTML).
    """
    dir_path = TEMP_DIR if storage == "Temporary" else PERM_DIR
    file_path = os.path.join(dir_path, filename)
    thumb_path = os.path.join(THUMB_DIR, filename)

    if not os.path.exists(file_path):
        return "Image not found.", get_gallery_html(), get_analytics_html()

    os.remove(file_path)
    if os.path.exists(thumb_path):
        os.remove(thumb_path)
    # pop() instead of del: the file may exist on disk without an analytics
    # entry (the store is in-memory only), which must not raise KeyError.
    analytics.pop((filename, storage), None)
    return f"Deleted {filename} from {storage} storage.", get_gallery_html(), get_analytics_html()


# Like an image
def like_image(filename, storage):
    """Increment the like counter of a known image.

    Returns (status message, gallery HTML, analytics HTML).
    """
    key = (filename, storage)
    # Check membership first: indexing the defaultdict with an unknown key
    # would silently create a phantom gallery entry.
    if key not in analytics:
        return "Image not found.", get_gallery_html(), get_analytics_html()
    analytics[key]["likes"] += 1
    return f"Liked {filename}.", get_gallery_html(), get_analytics_html()


# NOTE(review): the original <style> body was not visible; minimal layout only.
_GALLERY_CSS = """
<style>
.gallery-grid { column-width: 300px; column-gap: 12px; }
.gallery-item { display: inline-block; margin: 6px 0; text-align: center; break-inside: avoid; }
.gallery-item img { max-width: 100%; border-radius: 8px; cursor: pointer; }
</style>
"""

# Inline handler that opens the modal for the clicked thumbnail; it reads the
# full-size URL from the element's data-full attribute.
_OPEN_MODAL_JS = (
    "document.getElementById('gallery-modal-img').src=this.dataset.full;"
    "document.getElementById('gallery-modal-download').href=this.dataset.full;"
    "document.getElementById('gallery-modal').style.display='block';"
)

_MODAL_HTML = """
<div id="gallery-modal" style="display:none;position:fixed;inset:0;z-index:1000;background:rgba(0,0,0,0.85);text-align:center;">
  <span style="position:absolute;top:12px;right:24px;color:#fff;font-size:32px;cursor:pointer;"
        onclick="document.getElementById('gallery-modal').style.display='none'">×</span>
  <img id="gallery-modal-img" alt="Enlarged Image" style="max-width:90%;max-height:80vh;margin-top:5vh;">
  <div><a id="gallery-modal-download" download style="color:#fff;">⬇ Download</a></div>
</div>
"""


def _click_js(elem_id):
    """Return a JS snippet that clicks the element with the given id.

    getElementById is used rather than querySelector('#id') because stored
    filenames contain a dot, which a CSS selector would read as a class.
    json.dumps produces a safely quoted JS string literal.
    """
    return f"document.getElementById({json.dumps(elem_id)}).click()"


# Build gallery HTML with masonry-like layout and updated hover behavior
def get_gallery_html():
    """Render the gallery HTML for every image in the analytics store."""

    def make_img_tag(src, filename, storage, meta):
        # "thumb" is always present but is None when thumbnailing failed, so
        # fall back with `or` rather than a .get() default.
        thumb_src = meta.get("thumb") or src
        views = meta["views"]
        likes = meta["likes"]
        upload_time = meta["upload_time"]
        size_kb = meta["size"] // 1024
        tags = ", ".join(meta["tags"]) if meta["tags"] else "No tags"
        delete_event = _click_js(f"delete_{filename}_{storage}")
        like_event = _click_js(f"like_{filename}_{storage}")
        short_name = _short_name(filename)
        share_link = f"{src}"
        modal_data = f"{short_name}|{storage}|{views}|{size_kb}|{upload_time}|{tags}"
        # Everything interpolated into attributes is escaped: file extensions
        # and tags are user-supplied.
        return f'''
<div class="gallery-item">
  <img src="{escape(thumb_src)}" alt="{escape(short_name)}" title="{escape(modal_data)}"
       data-full="{escape(src)}" data-meta="{escape(modal_data)}" loading="lazy"
       onclick="{_OPEN_MODAL_JS}">
  <div>
    <button type="button" onclick="{escape(like_event)}">❤ {likes}</button>
    <button type="button" onclick="{escape(delete_event)}">🗑 Delete</button>
    <a href="{escape(share_link)}" target="_blank" rel="noopener">🔗 Share</a>
  </div>
</div>
'''

    temp_cards = []
    perm_cards = []
    # Iterate over a snapshot so a concurrent upload/delete cannot change the
    # dict's size mid-iteration.
    for (filename, storage), meta in list(analytics.items()):
        src = f"/images/{'temp' if storage == 'Temporary' else 'permanent'}/{filename}"
        img_tag = make_img_tag(src, filename, storage, meta)
        if storage == "Temporary":
            temp_cards.append(img_tag)
        else:
            perm_cards.append(img_tag)

    temp_html = "".join(temp_cards)
    perm_html = "".join(perm_cards)
    return f"""
{_GALLERY_CSS}
<div class="gallery-grid gallery-temp">{temp_html}</div>
<div class="gallery-grid gallery-perm">{perm_html}</div>
{_MODAL_HTML}
"""


# Build analytics HTML for top 5 images
def get_analytics_html():
    """Render the five most-liked images, or a placeholder when the store is empty."""
    sorted_images = sorted(
        list(analytics.items()), key=lambda item: item[1]["likes"], reverse=True
    )[:5]
    if not sorted_images:
        return "<p>No analytics data yet.</p>"

    rows = "".join(
        f"<li>{escape(_short_name(filename))} ({escape(str(storage))}): "
        f"❤ {meta['likes']} · 👁 {meta['views']}</li>"
        for (filename, storage), meta in sorted_images
    )
    return f"<div><h3>📊 Top 5 Most Liked Images</h3><ol>{rows}</ol></div>"


# Gradio interface with enhanced features and UI
with gr.Blocks(theme=gr.themes.Soft()) as gradio_app:
    gr.Markdown("# 🌟 Image Upload Gallery")

    # Upload Section
    gr.Markdown("## 📤 Upload Images")
    with gr.Row():
        with gr.Column():
            image_input = gr.File(label="Upload images (multiple allowed)", type="filepath", file_count="multiple")
            store_option = gr.Radio(["Temporary", "Permanent"], value="Temporary", label="Storage Type")
            tags_input = gr.Textbox(label="Tags (comma-separated)", placeholder="e.g., nature, portrait, art")
        with gr.Column():
            preview_output = gr.HTML(label="Upload Preview")
    upload_btn = gr.Button("Upload", variant="primary")
    upload_status = gr.Textbox(label="Status", interactive=False, placeholder="Action status will appear here...")

    # Gallery Section
    gr.Markdown("## 🖼️ Gallery\nExplore and manage your images in a stunning layout.")
    gallery_html = gr.HTML(get_gallery_html)

    # Analytics Section
    gr.Markdown("## 📊 Analytics")
    analytics_html = gr.HTML(get_analytics_html)

    # Hidden buttons for event triggering
    # NOTE(review): this loop runs once, while the UI is being built, and the
    # analytics store is empty at that point. Images uploaded later therefore
    # get no delete/like button and the gallery's click handlers have nothing
    # to target. Fixing this needs a different wiring (e.g. one shared trigger
    # plus a target field) and is left for a follow-up.
    hidden_components = []
    for filename, storage in list(analytics):
        delete_btn = gr.Button(f"Delete {filename}", elem_id=f"delete_{filename}_{storage}", visible=False)
        like_btn = gr.Button(f"Like {filename}", elem_id=f"like_{filename}_{storage}", visible=False)
        delete_btn.click(
            fn=delete_image,
            inputs=[gr.State(filename), gr.State(storage)],
            outputs=[upload_status, gallery_html, analytics_html],
        )
        like_btn.click(
            fn=like_image,
            inputs=[gr.State(filename), gr.State(storage)],
            outputs=[upload_status, gallery_html, analytics_html],
        )
        hidden_components.extend([delete_btn, like_btn])

    # Event Handlers
    upload_btn.click(
        fn=save_image,
        inputs=[image_input, store_option, tags_input],
        outputs=[upload_status, preview_output, gallery_html, analytics_html, image_input, tags_input],
    )
    image_input.change(
        fn=generate_preview_html,
        inputs=image_input,
        outputs=preview_output,
    )

gradio_app.pwa = True  # Enable PWA

# Mount Gradio at /gradio
app = gr.mount_gradio_app(app, gradio_app, path="/gradio")