""" Lightweight local preview server for testing the API structure. This version uses simple image processing instead of AI models. For full AI processing, deploy to Hugging Face Spaces. """ import io import os import uuid import threading import base64 from pathlib import Path from http.server import HTTPServer, SimpleHTTPRequestHandler import json import urllib.parse try: import httpx HAS_HTTPX = True except ImportError: import urllib.request HAS_HTTPX = False HF_API_URL = "https://router.huggingface.co/hf-inference/models/black-forest-labs/FLUX.1-schnell" jobs = {} def get_hf_token(): """Get Hugging Face API token from environment.""" token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN") if not token: return None return token def generate_image_from_hf(prompt: str, width: int = 1024, height: int = 1024) -> bytes: """Generate image using Hugging Face Inference API with FLUX.1-schnell model.""" token = get_hf_token() if not token: raise Exception("Hugging Face API token not configured. Please set HF_TOKEN secret.") headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } payload = { "inputs": prompt, "parameters": { "width": width, "height": height, "num_inference_steps": 4 } } if HAS_HTTPX: with httpx.Client(timeout=120.0) as client: response = client.post(HF_API_URL, headers=headers, json=payload) if response.status_code == 503: error_data = response.json() if "estimated_time" in error_data: raise Exception(f"Model is loading. Estimated time: {error_data['estimated_time']:.0f}s. Please retry shortly.") if response.status_code != 200: try: error_detail = response.json() except: error_detail = response.text raise Exception(f"Hugging Face API error: {error_detail}") return response.content else: req = urllib.request.Request(HF_API_URL, data=json.dumps(payload).encode(), headers=headers, method='POST') try: with urllib.request.urlopen(req, timeout=120) as response: return response.read() except urllib.error.HTTPError as e: raise Exception(f"Hugging Face API error: {e.read().decode()}") def process_generate_image_job(job_id: str, prompt: str, width: int, height: int, output_path: Path): """Background task for image generation.""" try: jobs[job_id] = {"status": "processing", "progress": 20.0, "message": "Sending prompt to FLUX.1-schnell..."} image_bytes = generate_image_from_hf(prompt, width, height) jobs[job_id] = {"status": "processing", "progress": 80.0, "message": "Saving image..."} from PIL import Image generated_image = Image.open(io.BytesIO(image_bytes)) generated_image.save(output_path, "PNG") jobs[job_id] = { "status": "completed", "progress": 100.0, "message": f"Image generated: {generated_image.width}x{generated_image.height}", "result": str(output_path) } except Exception as e: jobs[job_id] = {"status": "failed", "progress": 0, "error": str(e)} UPLOAD_DIR = Path("uploads") OUTPUT_DIR = Path("outputs") UPLOAD_DIR.mkdir(exist_ok=True) OUTPUT_DIR.mkdir(exist_ok=True) class APIHandler(SimpleHTTPRequestHandler): def do_GET(self): parsed = urllib.parse.urlparse(self.path) path = parsed.path if path == "/" or path == "": self.serve_html() elif path == "/docs": self.serve_swagger() elif path == "/health": self.send_json({ "status": "healthy", "version": "2.2.0 (preview)", "features": ["enhance", "remove-background", "denoise", "generate-image"] }) elif path == "/model-info": self.send_json({ "models": { "super_resolution": { "name": "Real-ESRGAN x4plus", "description": "State-of-the-art image super-resolution", "note": "Preview mode - deploy to HF Spaces for full AI" }, "background_removal": { "name": "BiRefNet-general", "description": "High-accuracy background removal", "note": "Preview mode - deploy to HF Spaces for full AI" }, "noise_reduction": { "name": "Non-Local Means Denoising", "description": "Advanced noise reduction algorithm" }, "image_generation": { "name": "FLUX.1-schnell", "description": "Fast, high-quality text-to-image generation by Black Forest Labs", "max_resolution": "1440x1440", "default_resolution": "1024x1024", "source": "https://huggingface.co/black-forest-labs/FLUX.1-schnell" } }, "supported_formats": ["png", "jpg", "jpeg", "webp", "bmp"] }) elif path == "/openapi.json": self.serve_openapi() elif path.startswith("/outputs/"): self.serve_file(path) elif path.startswith("/progress/"): job_id = path.split("/progress/")[1] if job_id in jobs: self.send_json(jobs[job_id]) else: self.send_error(404, "Job not found") elif path.startswith("/result/"): job_id = path.split("/result/")[1] if job_id in jobs: job = jobs[job_id] if job.get("status") == "completed" and job.get("result"): result_path = Path(job["result"]) if result_path.exists(): self.send_response(200) self.send_header('Content-Type', 'image/png') self.send_header('Content-Disposition', f'attachment; filename="{result_path.name}"') self.end_headers() self.wfile.write(result_path.read_bytes()) else: self.send_error(404, "Result file not found") elif job.get("status") == "failed": self.send_error(500, job.get("error", "Job failed")) else: self.send_json({"status": job.get("status"), "progress": job.get("progress"), "message": "Job still processing"}) else: self.send_error(404, "Job not found") else: super().do_GET() def do_POST(self): parsed = urllib.parse.urlparse(self.path) path = parsed.path query = urllib.parse.parse_qs(parsed.query) if path == "/enhance" or path == "/enhance/base64": self.handle_enhance(path, query) elif path == "/remove-background" or path == "/remove-background/base64": self.handle_remove_background(path, query) elif path == "/denoise" or path == "/denoise/base64": self.handle_denoise(path, query) elif path == "/generate-image" or path == "/generate-image/base64": self.handle_generate_image(path, query) elif path == "/generate-image/async": self.handle_generate_image_async(query) else: self.send_error(404, "Not Found") def parse_multipart(self): content_type = self.headers.get('Content-Type', '') if 'multipart/form-data' not in content_type: return None boundary = None for part in content_type.split(';'): if 'boundary=' in part: boundary = part.split('=')[1].strip() break if not boundary: return None content_length = int(self.headers.get('Content-Length', 0)) body = self.rfile.read(content_length) boundary_bytes = boundary.encode() parts = body.split(b'--' + boundary_bytes) for part in parts: if b'filename=' in part: header_end = part.find(b'\r\n\r\n') if header_end != -1: file_data = part[header_end + 4:] if file_data.endswith(b'\r\n'): file_data = file_data[:-2] if file_data.endswith(b'--'): file_data = file_data[:-2] if file_data.endswith(b'\r\n'): file_data = file_data[:-2] return file_data return None def handle_enhance(self, path, query): try: file_data = self.parse_multipart() if not file_data: self.send_error(400, "No file uploaded") return scale = int(query.get('scale', [4])[0]) if scale not in [2, 4]: scale = 4 from PIL import Image, ImageEnhance input_image = Image.open(io.BytesIO(file_data)) if input_image.mode != "RGB": input_image = input_image.convert("RGB") new_size = (input_image.width * scale, input_image.height * scale) upscaled = input_image.resize(new_size, Image.LANCZOS) enhancer = ImageEnhance.Sharpness(upscaled) sharpened = enhancer.enhance(1.3) enhancer = ImageEnhance.Contrast(sharpened) enhanced = enhancer.enhance(1.1) file_id = str(uuid.uuid4()) output_path = OUTPUT_DIR / f"{file_id}_enhanced.png" enhanced.save(output_path, "PNG") if "/base64" in path: import base64 buffer = io.BytesIO() enhanced.save(buffer, format="PNG") buffer.seek(0) img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") self.send_json({ "success": True, "image_base64": img_base64, "original_size": {"width": input_image.width, "height": input_image.height}, "enhanced_size": {"width": enhanced.width, "height": enhanced.height}, "scale_factor": scale, "note": "Preview mode - deploy to Hugging Face for AI enhancement" }) else: self.send_response(200) self.send_header('Content-Type', 'image/png') self.send_header('Content-Disposition', 'attachment; filename="enhanced.png"') self.end_headers() with open(output_path, 'rb') as f: self.wfile.write(f.read()) except Exception as e: self.send_error(500, f"Error processing image: {str(e)}") def handle_remove_background(self, path, query): try: file_data = self.parse_multipart() if not file_data: self.send_error(400, "No file uploaded") return bgcolor = query.get('bgcolor', ['transparent'])[0] from PIL import Image input_image = Image.open(io.BytesIO(file_data)) if input_image.mode != "RGBA": input_image = input_image.convert("RGBA") output_image = input_image file_id = str(uuid.uuid4()) output_path = OUTPUT_DIR / f"{file_id}_nobg.png" output_image.save(output_path, "PNG") if "/base64" in path: import base64 buffer = io.BytesIO() output_image.save(buffer, format="PNG") buffer.seek(0) img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") self.send_json({ "success": True, "image_base64": img_base64, "original_size": {"width": input_image.width, "height": input_image.height}, "background": bgcolor, "note": "Preview mode - deploy to Hugging Face for AI background removal" }) else: self.send_response(200) self.send_header('Content-Type', 'image/png') self.send_header('Content-Disposition', 'attachment; filename="nobg.png"') self.end_headers() with open(output_path, 'rb') as f: self.wfile.write(f.read()) except Exception as e: self.send_error(500, f"Error processing image: {str(e)}") def handle_denoise(self, path, query): try: file_data = self.parse_multipart() if not file_data: self.send_error(400, "No file uploaded") return strength = int(query.get('strength', [10])[0]) from PIL import Image, ImageFilter input_image = Image.open(io.BytesIO(file_data)) if input_image.mode != "RGB": input_image = input_image.convert("RGB") output_image = input_image.filter(ImageFilter.SMOOTH_MORE) if strength > 10: output_image = output_image.filter(ImageFilter.SMOOTH_MORE) if strength > 20: output_image = output_image.filter(ImageFilter.SMOOTH_MORE) file_id = str(uuid.uuid4()) output_path = OUTPUT_DIR / f"{file_id}_denoised.png" output_image.save(output_path, "PNG") if "/base64" in path: import base64 buffer = io.BytesIO() output_image.save(buffer, format="PNG") buffer.seek(0) img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") self.send_json({ "success": True, "image_base64": img_base64, "original_size": {"width": input_image.width, "height": input_image.height}, "strength": strength, "note": "Preview mode - deploy to Hugging Face for AI denoising" }) else: self.send_response(200) self.send_header('Content-Type', 'image/png') self.send_header('Content-Disposition', 'attachment; filename="denoised.png"') self.end_headers() with open(output_path, 'rb') as f: self.wfile.write(f.read()) except Exception as e: self.send_error(500, f"Error processing image: {str(e)}") def handle_generate_image(self, path, query): """Handle synchronous image generation.""" try: prompt = query.get('prompt', [''])[0] if not prompt: self.send_error(400, "Missing 'prompt' parameter") return width = int(query.get('width', [1024])[0]) height = int(query.get('height', [1024])[0]) width = max(256, min(1440, width)) height = max(256, min(1440, height)) async_mode = query.get('async_mode', ['false'])[0].lower() == 'true' if async_mode: self.handle_generate_image_async(query) return from PIL import Image image_bytes = generate_image_from_hf(prompt, width, height) generated_image = Image.open(io.BytesIO(image_bytes)) file_id = str(uuid.uuid4()) output_path = OUTPUT_DIR / f"{file_id}_generated.png" generated_image.save(output_path, "PNG") if "/base64" in path: buffer = io.BytesIO() generated_image.save(buffer, format="PNG") buffer.seek(0) img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") self.send_json({ "success": True, "image_base64": img_base64, "size": {"width": generated_image.width, "height": generated_image.height}, "model": "FLUX.1-schnell", "prompt": prompt }) else: self.send_response(200) self.send_header('Content-Type', 'image/png') self.send_header('Content-Disposition', f'attachment; filename="generated_{file_id[:8]}.png"') self.end_headers() with open(output_path, 'rb') as f: self.wfile.write(f.read()) except Exception as e: self.send_error(500, f"Error generating image: {str(e)}") def handle_generate_image_async(self, query): """Handle async image generation with progress tracking.""" try: prompt = query.get('prompt', [''])[0] if not prompt: self.send_error(400, "Missing 'prompt' parameter") return width = int(query.get('width', [1024])[0]) height = int(query.get('height', [1024])[0]) width = max(256, min(1440, width)) height = max(256, min(1440, height)) token = get_hf_token() if not token: self.send_error(500, "Hugging Face API token not configured. Please set HF_TOKEN secret.") return job_id = str(uuid.uuid4()) file_id = str(uuid.uuid4()) output_path = OUTPUT_DIR / f"{file_id}_generated.png" jobs[job_id] = {"status": "pending", "progress": 0, "message": "Starting image generation..."} thread = threading.Thread( target=process_generate_image_job, args=(job_id, prompt, width, height, output_path) ) thread.start() self.send_json({ "job_id": job_id, "status": "processing", "message": "Image generation started. Poll /progress/{job_id} for updates.", "progress_url": f"/progress/{job_id}", "result_url": f"/result/{job_id}", "model": "FLUX.1-schnell", "prompt": prompt }) except Exception as e: self.send_error(500, f"Error starting image generation: {str(e)}") def serve_html(self): html_path = Path("templates/index.html") if html_path.exists(): self.send_response(200) self.send_header('Content-Type', 'text/html') self.send_header('Cache-Control', 'no-cache') self.end_headers() self.wfile.write(html_path.read_bytes()) else: self.send_error(404, "Template not found") def serve_swagger(self): swagger_html = """ AI Image Processing - API Documentation
""" self.send_response(200) self.send_header('Content-Type', 'text/html') self.send_header('Cache-Control', 'no-cache') self.end_headers() self.wfile.write(swagger_html.encode()) def serve_openapi(self): openapi_spec = { "openapi": "3.1.0", "info": { "title": "AI Image Processing API", "description": "Comprehensive AI-powered image processing API.\n\n**Features:**\n- Image enhancement and upscaling (Real-ESRGAN)\n- Background removal (BiRefNet)\n- Noise reduction (OpenCV NLM)\n- Image generation from text (FLUX.1-schnell)\n\n**Note:** This is a preview deployment. Deploy to Hugging Face Spaces for full AI processing.", "version": "2.2.0" }, "servers": [{"url": "/", "description": "Current server"}], "paths": { "/enhance": { "post": { "summary": "Enhance image (upscale)", "description": "Upscale and enhance image quality using Real-ESRGAN.", "parameters": [{"name": "scale", "in": "query", "schema": {"type": "integer", "default": 4, "enum": [2, 4]}}], "requestBody": {"required": True, "content": {"multipart/form-data": {"schema": {"type": "object", "properties": {"file": {"type": "string", "format": "binary"}}}}}}, "responses": {"200": {"description": "Enhanced image"}} } }, "/remove-background": { "post": { "summary": "Remove background", "description": "Remove background from image using BiRefNet AI model.", "parameters": [{"name": "bgcolor", "in": "query", "schema": {"type": "string", "default": "transparent"}}], "requestBody": {"required": True, "content": {"multipart/form-data": {"schema": {"type": "object", "properties": {"file": {"type": "string", "format": "binary"}}}}}}, "responses": {"200": {"description": "Image with background removed"}} } }, "/denoise": { "post": { "summary": "Reduce noise", "description": "Reduce image noise using Non-Local Means Denoising.", "parameters": [{"name": "strength", "in": "query", "schema": {"type": "integer", "default": 10, "minimum": 1, "maximum": 30}}], "requestBody": {"required": True, "content": {"multipart/form-data": {"schema": {"type": "object", "properties": {"file": {"type": "string", "format": "binary"}}}}}}, "responses": {"200": {"description": "Denoised image"}} } }, "/generate-image": { "post": { "summary": "Generate image from text", "description": "Generate an image from a text prompt using FLUX.1-schnell AI model.", "parameters": [ {"name": "prompt", "in": "query", "required": True, "schema": {"type": "string"}, "description": "Text prompt describing the image to generate"}, {"name": "width", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}}, {"name": "height", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}}, {"name": "async_mode", "in": "query", "schema": {"type": "boolean", "default": False}, "description": "Use async mode with progress tracking"} ], "responses": {"200": {"description": "Generated image"}} } }, "/generate-image/async": { "post": { "summary": "Generate image (async)", "description": "Start async image generation with progress tracking using FLUX.1-schnell.", "parameters": [ {"name": "prompt", "in": "query", "required": True, "schema": {"type": "string"}, "description": "Text prompt describing the image to generate"}, {"name": "width", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}}, {"name": "height", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}} ], "responses": {"200": {"description": "Job ID for tracking progress"}} } }, "/generate-image/base64": { "post": { "summary": "Generate image (base64)", "description": "Generate an image and return as base64-encoded string.", "parameters": [ {"name": "prompt", "in": "query", "required": True, "schema": {"type": "string"}, "description": "Text prompt describing the image to generate"}, {"name": "width", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}}, {"name": "height", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}} ], "responses": {"200": {"description": "Base64 encoded image"}} } }, "/progress/{job_id}": { "get": { "summary": "Get job progress", "description": "Get the progress of an async image processing job.", "parameters": [{"name": "job_id", "in": "path", "required": True, "schema": {"type": "string"}}], "responses": {"200": {"description": "Job progress"}} } }, "/result/{job_id}": { "get": { "summary": "Get job result", "description": "Get the result of a completed async job.", "parameters": [{"name": "job_id", "in": "path", "required": True, "schema": {"type": "string"}}], "responses": {"200": {"description": "Processed image"}} } }, "/health": {"get": {"summary": "Health check", "responses": {"200": {"description": "API status"}}}}, "/model-info": {"get": {"summary": "Model information", "responses": {"200": {"description": "Model details"}}}} } } self.send_json(openapi_spec) def serve_file(self, path): file_path = Path("." + path) if file_path.exists(): self.send_response(200) self.send_header('Content-Type', 'image/png') self.end_headers() self.wfile.write(file_path.read_bytes()) else: self.send_error(404, "File not found") def send_json(self, data): response = json.dumps(data, indent=2) self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Cache-Control', 'no-cache') self.end_headers() self.wfile.write(response.encode()) def run_server(port=5000): server_address = ('0.0.0.0', port) httpd = HTTPServer(server_address, APIHandler) print(f"AI Image Processing API running at http://0.0.0.0:{port}") print(f"API Documentation: http://0.0.0.0:{port}/docs") print(f"Frontend: http://0.0.0.0:{port}/") print("\nNote: This is preview mode using simple processing.") print("Deploy to Hugging Face Spaces for full AI features.") httpd.serve_forever() if __name__ == "__main__": run_server()