""" Lightweight local preview server for testing the API structure. This version uses simple image processing instead of AI models. For full AI processing, deploy to Hugging Face Spaces. """ import io import os import uuid import threading import base64 from pathlib import Path from http.server import HTTPServer, SimpleHTTPRequestHandler import json import urllib.parse try: import httpx HAS_HTTPX = True except ImportError: import urllib.request HAS_HTTPX = False HF_API_URL = "https://router.huggingface.co/hf-inference/models/black-forest-labs/FLUX.1-schnell" jobs = {} def get_hf_token(): """Get Hugging Face API token from environment.""" token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN") if not token: return None return token def generate_image_from_hf(prompt: str, width: int = 1024, height: int = 1024) -> bytes: """Generate image using Hugging Face Inference API with FLUX.1-schnell model.""" token = get_hf_token() if not token: raise Exception("Hugging Face API token not configured. Please set HF_TOKEN secret.") headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } payload = { "inputs": prompt, "parameters": { "width": width, "height": height, "num_inference_steps": 4 } } if HAS_HTTPX: with httpx.Client(timeout=120.0) as client: response = client.post(HF_API_URL, headers=headers, json=payload) if response.status_code == 503: error_data = response.json() if "estimated_time" in error_data: raise Exception(f"Model is loading. Estimated time: {error_data['estimated_time']:.0f}s. Please retry shortly.") if response.status_code != 200: try: error_detail = response.json() except: error_detail = response.text raise Exception(f"Hugging Face API error: {error_detail}") return response.content else: req = urllib.request.Request(HF_API_URL, data=json.dumps(payload).encode(), headers=headers, method='POST') try: with urllib.request.urlopen(req, timeout=120) as response: return response.read() except urllib.error.HTTPError as e: raise Exception(f"Hugging Face API error: {e.read().decode()}") def process_generate_image_job(job_id: str, prompt: str, width: int, height: int, output_path: Path): """Background task for image generation.""" try: jobs[job_id] = {"status": "processing", "progress": 20.0, "message": "Sending prompt to FLUX.1-schnell..."} image_bytes = generate_image_from_hf(prompt, width, height) jobs[job_id] = {"status": "processing", "progress": 80.0, "message": "Saving image..."} from PIL import Image generated_image = Image.open(io.BytesIO(image_bytes)) generated_image.save(output_path, "PNG") jobs[job_id] = { "status": "completed", "progress": 100.0, "message": f"Image generated: {generated_image.width}x{generated_image.height}", "result": str(output_path) } except Exception as e: jobs[job_id] = {"status": "failed", "progress": 0, "error": str(e)} UPLOAD_DIR = Path("uploads") OUTPUT_DIR = Path("outputs") UPLOAD_DIR.mkdir(exist_ok=True) OUTPUT_DIR.mkdir(exist_ok=True) class APIHandler(SimpleHTTPRequestHandler): def do_GET(self): parsed = urllib.parse.urlparse(self.path) path = parsed.path if path == "/" or path == "": self.serve_html() elif path == "/docs": self.serve_swagger() elif path == "/health": self.send_json({ "status": "healthy", "version": "2.2.0 (preview)", "features": ["enhance", "remove-background", "denoise", "generate-image"] }) elif path == "/model-info": self.send_json({ "models": { "super_resolution": { "name": "Real-ESRGAN x4plus", "description": "State-of-the-art image super-resolution", "note": "Preview mode - deploy to HF Spaces for full AI" }, "background_removal": { "name": "BiRefNet-general", "description": "High-accuracy background removal", "note": "Preview mode - deploy to HF Spaces for full AI" }, "noise_reduction": { "name": "Non-Local Means Denoising", "description": "Advanced noise reduction algorithm" }, "image_generation": { "name": "FLUX.1-schnell", "description": "Fast, high-quality text-to-image generation by Black Forest Labs", "max_resolution": "1440x1440", "default_resolution": "1024x1024", "source": "https://huggingface.co/black-forest-labs/FLUX.1-schnell" } }, "supported_formats": ["png", "jpg", "jpeg", "webp", "bmp"] }) elif path == "/openapi.json": self.serve_openapi() elif path.startswith("/outputs/"): self.serve_file(path) elif path.startswith("/progress/"): job_id = path.split("/progress/")[1] if job_id in jobs: self.send_json(jobs[job_id]) else: self.send_error(404, "Job not found") elif path.startswith("/result/"): job_id = path.split("/result/")[1] if job_id in jobs: job = jobs[job_id] if job.get("status") == "completed" and job.get("result"): result_path = Path(job["result"]) if result_path.exists(): self.send_response(200) self.send_header('Content-Type', 'image/png') self.send_header('Content-Disposition', f'attachment; filename="{result_path.name}"') self.end_headers() self.wfile.write(result_path.read_bytes()) else: self.send_error(404, "Result file not found") elif job.get("status") == "failed": self.send_error(500, job.get("error", "Job failed")) else: self.send_json({"status": job.get("status"), "progress": job.get("progress"), "message": "Job still processing"}) else: self.send_error(404, "Job not found") else: super().do_GET() def do_POST(self): parsed = urllib.parse.urlparse(self.path) path = parsed.path query = urllib.parse.parse_qs(parsed.query) if path == "/enhance" or path == "/enhance/base64": self.handle_enhance(path, query) elif path == "/remove-background" or path == "/remove-background/base64": self.handle_remove_background(path, query) elif path == "/denoise" or path == "/denoise/base64": self.handle_denoise(path, query) elif path == "/generate-image" or path == "/generate-image/base64": self.handle_generate_image(path, query) elif path == "/generate-image/async": self.handle_generate_image_async(query) else: self.send_error(404, "Not Found") def parse_multipart(self): content_type = self.headers.get('Content-Type', '') if 'multipart/form-data' not in content_type: return None boundary = None for part in content_type.split(';'): if 'boundary=' in part: boundary = part.split('=')[1].strip() break if not boundary: return None content_length = int(self.headers.get('Content-Length', 0)) body = self.rfile.read(content_length) boundary_bytes = boundary.encode() parts = body.split(b'--' + boundary_bytes) for part in parts: if b'filename=' in part: header_end = part.find(b'\r\n\r\n') if header_end != -1: file_data = part[header_end + 4:] if file_data.endswith(b'\r\n'): file_data = file_data[:-2] if file_data.endswith(b'--'): file_data = file_data[:-2] if file_data.endswith(b'\r\n'): file_data = file_data[:-2] return file_data return None def handle_enhance(self, path, query): try: file_data = self.parse_multipart() if not file_data: self.send_error(400, "No file uploaded") return scale = int(query.get('scale', [4])[0]) if scale not in [2, 4]: scale = 4 from PIL import Image, ImageEnhance input_image = Image.open(io.BytesIO(file_data)) if input_image.mode != "RGB": input_image = input_image.convert("RGB") new_size = (input_image.width * scale, input_image.height * scale) upscaled = input_image.resize(new_size, Image.LANCZOS) enhancer = ImageEnhance.Sharpness(upscaled) sharpened = enhancer.enhance(1.3) enhancer = ImageEnhance.Contrast(sharpened) enhanced = enhancer.enhance(1.1) file_id = str(uuid.uuid4()) output_path = OUTPUT_DIR / f"{file_id}_enhanced.png" enhanced.save(output_path, "PNG") if "/base64" in path: import base64 buffer = io.BytesIO() enhanced.save(buffer, format="PNG") buffer.seek(0) img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") self.send_json({ "success": True, "image_base64": img_base64, "original_size": {"width": input_image.width, "height": input_image.height}, "enhanced_size": {"width": enhanced.width, "height": enhanced.height}, "scale_factor": scale, "note": "Preview mode - deploy to Hugging Face for AI enhancement" }) else: self.send_response(200) self.send_header('Content-Type', 'image/png') self.send_header('Content-Disposition', 'attachment; filename="enhanced.png"') self.end_headers() with open(output_path, 'rb') as f: self.wfile.write(f.read()) except Exception as e: self.send_error(500, f"Error processing image: {str(e)}") def handle_remove_background(self, path, query): try: file_data = self.parse_multipart() if not file_data: self.send_error(400, "No file uploaded") return bgcolor = query.get('bgcolor', ['transparent'])[0] from PIL import Image input_image = Image.open(io.BytesIO(file_data)) if input_image.mode != "RGBA": input_image = input_image.convert("RGBA") output_image = input_image file_id = str(uuid.uuid4()) output_path = OUTPUT_DIR / f"{file_id}_nobg.png" output_image.save(output_path, "PNG") if "/base64" in path: import base64 buffer = io.BytesIO() output_image.save(buffer, format="PNG") buffer.seek(0) img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") self.send_json({ "success": True, "image_base64": img_base64, "original_size": {"width": input_image.width, "height": input_image.height}, "background": bgcolor, "note": "Preview mode - deploy to Hugging Face for AI background removal" }) else: self.send_response(200) self.send_header('Content-Type', 'image/png') self.send_header('Content-Disposition', 'attachment; filename="nobg.png"') self.end_headers() with open(output_path, 'rb') as f: self.wfile.write(f.read()) except Exception as e: self.send_error(500, f"Error processing image: {str(e)}") def handle_denoise(self, path, query): try: file_data = self.parse_multipart() if not file_data: self.send_error(400, "No file uploaded") return strength = int(query.get('strength', [10])[0]) from PIL import Image, ImageFilter input_image = Image.open(io.BytesIO(file_data)) if input_image.mode != "RGB": input_image = input_image.convert("RGB") output_image = input_image.filter(ImageFilter.SMOOTH_MORE) if strength > 10: output_image = output_image.filter(ImageFilter.SMOOTH_MORE) if strength > 20: output_image = output_image.filter(ImageFilter.SMOOTH_MORE) file_id = str(uuid.uuid4()) output_path = OUTPUT_DIR / f"{file_id}_denoised.png" output_image.save(output_path, "PNG") if "/base64" in path: import base64 buffer = io.BytesIO() output_image.save(buffer, format="PNG") buffer.seek(0) img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") self.send_json({ "success": True, "image_base64": img_base64, "original_size": {"width": input_image.width, "height": input_image.height}, "strength": strength, "note": "Preview mode - deploy to Hugging Face for AI denoising" }) else: self.send_response(200) self.send_header('Content-Type', 'image/png') self.send_header('Content-Disposition', 'attachment; filename="denoised.png"') self.end_headers() with open(output_path, 'rb') as f: self.wfile.write(f.read()) except Exception as e: self.send_error(500, f"Error processing image: {str(e)}") def handle_generate_image(self, path, query): """Handle synchronous image generation.""" try: prompt = query.get('prompt', [''])[0] if not prompt: self.send_error(400, "Missing 'prompt' parameter") return width = int(query.get('width', [1024])[0]) height = int(query.get('height', [1024])[0]) width = max(256, min(1440, width)) height = max(256, min(1440, height)) async_mode = query.get('async_mode', ['false'])[0].lower() == 'true' if async_mode: self.handle_generate_image_async(query) return from PIL import Image image_bytes = generate_image_from_hf(prompt, width, height) generated_image = Image.open(io.BytesIO(image_bytes)) file_id = str(uuid.uuid4()) output_path = OUTPUT_DIR / f"{file_id}_generated.png" generated_image.save(output_path, "PNG") if "/base64" in path: buffer = io.BytesIO() generated_image.save(buffer, format="PNG") buffer.seek(0) img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") self.send_json({ "success": True, "image_base64": img_base64, "size": {"width": generated_image.width, "height": generated_image.height}, "model": "FLUX.1-schnell", "prompt": prompt }) else: self.send_response(200) self.send_header('Content-Type', 'image/png') self.send_header('Content-Disposition', f'attachment; filename="generated_{file_id[:8]}.png"') self.end_headers() with open(output_path, 'rb') as f: self.wfile.write(f.read()) except Exception as e: self.send_error(500, f"Error generating image: {str(e)}") def handle_generate_image_async(self, query): """Handle async image generation with progress tracking.""" try: prompt = query.get('prompt', [''])[0] if not prompt: self.send_error(400, "Missing 'prompt' parameter") return width = int(query.get('width', [1024])[0]) height = int(query.get('height', [1024])[0]) width = max(256, min(1440, width)) height = max(256, min(1440, height)) token = get_hf_token() if not token: self.send_error(500, "Hugging Face API token not configured. Please set HF_TOKEN secret.") return job_id = str(uuid.uuid4()) file_id = str(uuid.uuid4()) output_path = OUTPUT_DIR / f"{file_id}_generated.png" jobs[job_id] = {"status": "pending", "progress": 0, "message": "Starting image generation..."} thread = threading.Thread( target=process_generate_image_job, args=(job_id, prompt, width, height, output_path) ) thread.start() self.send_json({ "job_id": job_id, "status": "processing", "message": "Image generation started. Poll /progress/{job_id} for updates.", "progress_url": f"/progress/{job_id}", "result_url": f"/result/{job_id}", "model": "FLUX.1-schnell", "prompt": prompt }) except Exception as e: self.send_error(500, f"Error starting image generation: {str(e)}") def serve_html(self): html_path = Path("templates/index.html") if html_path.exists(): self.send_response(200) self.send_header('Content-Type', 'text/html') self.send_header('Cache-Control', 'no-cache') self.end_headers() self.wfile.write(html_path.read_bytes()) else: self.send_error(404, "Template not found") def serve_swagger(self): swagger_html = """