Imgenhance / app_local.py
azdxit
Update Hugging Face API endpoint to use the new router URL
7843402
"""
Lightweight local preview server for testing the API structure.
This version uses simple image processing instead of AI models.
For full AI processing, deploy to Hugging Face Spaces.
"""
import io
import os
import uuid
import threading
import base64
from pathlib import Path
from http.server import HTTPServer, SimpleHTTPRequestHandler
import json
import urllib.parse
try:
import httpx
HAS_HTTPX = True
except ImportError:
import urllib.request
HAS_HTTPX = False
HF_API_URL = "https://router.huggingface.co/hf-inference/models/black-forest-labs/FLUX.1-schnell"
jobs = {}
def get_hf_token():
"""Get Hugging Face API token from environment."""
token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN")
if not token:
return None
return token
def generate_image_from_hf(prompt: str, width: int = 1024, height: int = 1024) -> bytes:
"""Generate image using Hugging Face Inference API with FLUX.1-schnell model."""
token = get_hf_token()
if not token:
raise Exception("Hugging Face API token not configured. Please set HF_TOKEN secret.")
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"inputs": prompt,
"parameters": {
"width": width,
"height": height,
"num_inference_steps": 4
}
}
if HAS_HTTPX:
with httpx.Client(timeout=120.0) as client:
response = client.post(HF_API_URL, headers=headers, json=payload)
if response.status_code == 503:
error_data = response.json()
if "estimated_time" in error_data:
raise Exception(f"Model is loading. Estimated time: {error_data['estimated_time']:.0f}s. Please retry shortly.")
if response.status_code != 200:
try:
error_detail = response.json()
except:
error_detail = response.text
raise Exception(f"Hugging Face API error: {error_detail}")
return response.content
else:
req = urllib.request.Request(HF_API_URL, data=json.dumps(payload).encode(), headers=headers, method='POST')
try:
with urllib.request.urlopen(req, timeout=120) as response:
return response.read()
except urllib.error.HTTPError as e:
raise Exception(f"Hugging Face API error: {e.read().decode()}")
def process_generate_image_job(job_id: str, prompt: str, width: int, height: int, output_path: Path):
"""Background task for image generation."""
try:
jobs[job_id] = {"status": "processing", "progress": 20.0, "message": "Sending prompt to FLUX.1-schnell..."}
image_bytes = generate_image_from_hf(prompt, width, height)
jobs[job_id] = {"status": "processing", "progress": 80.0, "message": "Saving image..."}
from PIL import Image
generated_image = Image.open(io.BytesIO(image_bytes))
generated_image.save(output_path, "PNG")
jobs[job_id] = {
"status": "completed",
"progress": 100.0,
"message": f"Image generated: {generated_image.width}x{generated_image.height}",
"result": str(output_path)
}
except Exception as e:
jobs[job_id] = {"status": "failed", "progress": 0, "error": str(e)}
UPLOAD_DIR = Path("uploads")
OUTPUT_DIR = Path("outputs")
UPLOAD_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
class APIHandler(SimpleHTTPRequestHandler):
def do_GET(self):
parsed = urllib.parse.urlparse(self.path)
path = parsed.path
if path == "/" or path == "":
self.serve_html()
elif path == "/docs":
self.serve_swagger()
elif path == "/health":
self.send_json({
"status": "healthy",
"version": "2.2.0 (preview)",
"features": ["enhance", "remove-background", "denoise", "generate-image"]
})
elif path == "/model-info":
self.send_json({
"models": {
"super_resolution": {
"name": "Real-ESRGAN x4plus",
"description": "State-of-the-art image super-resolution",
"note": "Preview mode - deploy to HF Spaces for full AI"
},
"background_removal": {
"name": "BiRefNet-general",
"description": "High-accuracy background removal",
"note": "Preview mode - deploy to HF Spaces for full AI"
},
"noise_reduction": {
"name": "Non-Local Means Denoising",
"description": "Advanced noise reduction algorithm"
},
"image_generation": {
"name": "FLUX.1-schnell",
"description": "Fast, high-quality text-to-image generation by Black Forest Labs",
"max_resolution": "1440x1440",
"default_resolution": "1024x1024",
"source": "https://huggingface.co/black-forest-labs/FLUX.1-schnell"
}
},
"supported_formats": ["png", "jpg", "jpeg", "webp", "bmp"]
})
elif path == "/openapi.json":
self.serve_openapi()
elif path.startswith("/outputs/"):
self.serve_file(path)
elif path.startswith("/progress/"):
job_id = path.split("/progress/")[1]
if job_id in jobs:
self.send_json(jobs[job_id])
else:
self.send_error(404, "Job not found")
elif path.startswith("/result/"):
job_id = path.split("/result/")[1]
if job_id in jobs:
job = jobs[job_id]
if job.get("status") == "completed" and job.get("result"):
result_path = Path(job["result"])
if result_path.exists():
self.send_response(200)
self.send_header('Content-Type', 'image/png')
self.send_header('Content-Disposition', f'attachment; filename="{result_path.name}"')
self.end_headers()
self.wfile.write(result_path.read_bytes())
else:
self.send_error(404, "Result file not found")
elif job.get("status") == "failed":
self.send_error(500, job.get("error", "Job failed"))
else:
self.send_json({"status": job.get("status"), "progress": job.get("progress"), "message": "Job still processing"})
else:
self.send_error(404, "Job not found")
else:
super().do_GET()
def do_POST(self):
parsed = urllib.parse.urlparse(self.path)
path = parsed.path
query = urllib.parse.parse_qs(parsed.query)
if path == "/enhance" or path == "/enhance/base64":
self.handle_enhance(path, query)
elif path == "/remove-background" or path == "/remove-background/base64":
self.handle_remove_background(path, query)
elif path == "/denoise" or path == "/denoise/base64":
self.handle_denoise(path, query)
elif path == "/generate-image" or path == "/generate-image/base64":
self.handle_generate_image(path, query)
elif path == "/generate-image/async":
self.handle_generate_image_async(query)
else:
self.send_error(404, "Not Found")
def parse_multipart(self):
content_type = self.headers.get('Content-Type', '')
if 'multipart/form-data' not in content_type:
return None
boundary = None
for part in content_type.split(';'):
if 'boundary=' in part:
boundary = part.split('=')[1].strip()
break
if not boundary:
return None
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length)
boundary_bytes = boundary.encode()
parts = body.split(b'--' + boundary_bytes)
for part in parts:
if b'filename=' in part:
header_end = part.find(b'\r\n\r\n')
if header_end != -1:
file_data = part[header_end + 4:]
if file_data.endswith(b'\r\n'):
file_data = file_data[:-2]
if file_data.endswith(b'--'):
file_data = file_data[:-2]
if file_data.endswith(b'\r\n'):
file_data = file_data[:-2]
return file_data
return None
def handle_enhance(self, path, query):
try:
file_data = self.parse_multipart()
if not file_data:
self.send_error(400, "No file uploaded")
return
scale = int(query.get('scale', [4])[0])
if scale not in [2, 4]:
scale = 4
from PIL import Image, ImageEnhance
input_image = Image.open(io.BytesIO(file_data))
if input_image.mode != "RGB":
input_image = input_image.convert("RGB")
new_size = (input_image.width * scale, input_image.height * scale)
upscaled = input_image.resize(new_size, Image.LANCZOS)
enhancer = ImageEnhance.Sharpness(upscaled)
sharpened = enhancer.enhance(1.3)
enhancer = ImageEnhance.Contrast(sharpened)
enhanced = enhancer.enhance(1.1)
file_id = str(uuid.uuid4())
output_path = OUTPUT_DIR / f"{file_id}_enhanced.png"
enhanced.save(output_path, "PNG")
if "/base64" in path:
import base64
buffer = io.BytesIO()
enhanced.save(buffer, format="PNG")
buffer.seek(0)
img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
self.send_json({
"success": True,
"image_base64": img_base64,
"original_size": {"width": input_image.width, "height": input_image.height},
"enhanced_size": {"width": enhanced.width, "height": enhanced.height},
"scale_factor": scale,
"note": "Preview mode - deploy to Hugging Face for AI enhancement"
})
else:
self.send_response(200)
self.send_header('Content-Type', 'image/png')
self.send_header('Content-Disposition', 'attachment; filename="enhanced.png"')
self.end_headers()
with open(output_path, 'rb') as f:
self.wfile.write(f.read())
except Exception as e:
self.send_error(500, f"Error processing image: {str(e)}")
def handle_remove_background(self, path, query):
try:
file_data = self.parse_multipart()
if not file_data:
self.send_error(400, "No file uploaded")
return
bgcolor = query.get('bgcolor', ['transparent'])[0]
from PIL import Image
input_image = Image.open(io.BytesIO(file_data))
if input_image.mode != "RGBA":
input_image = input_image.convert("RGBA")
output_image = input_image
file_id = str(uuid.uuid4())
output_path = OUTPUT_DIR / f"{file_id}_nobg.png"
output_image.save(output_path, "PNG")
if "/base64" in path:
import base64
buffer = io.BytesIO()
output_image.save(buffer, format="PNG")
buffer.seek(0)
img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
self.send_json({
"success": True,
"image_base64": img_base64,
"original_size": {"width": input_image.width, "height": input_image.height},
"background": bgcolor,
"note": "Preview mode - deploy to Hugging Face for AI background removal"
})
else:
self.send_response(200)
self.send_header('Content-Type', 'image/png')
self.send_header('Content-Disposition', 'attachment; filename="nobg.png"')
self.end_headers()
with open(output_path, 'rb') as f:
self.wfile.write(f.read())
except Exception as e:
self.send_error(500, f"Error processing image: {str(e)}")
def handle_denoise(self, path, query):
try:
file_data = self.parse_multipart()
if not file_data:
self.send_error(400, "No file uploaded")
return
strength = int(query.get('strength', [10])[0])
from PIL import Image, ImageFilter
input_image = Image.open(io.BytesIO(file_data))
if input_image.mode != "RGB":
input_image = input_image.convert("RGB")
output_image = input_image.filter(ImageFilter.SMOOTH_MORE)
if strength > 10:
output_image = output_image.filter(ImageFilter.SMOOTH_MORE)
if strength > 20:
output_image = output_image.filter(ImageFilter.SMOOTH_MORE)
file_id = str(uuid.uuid4())
output_path = OUTPUT_DIR / f"{file_id}_denoised.png"
output_image.save(output_path, "PNG")
if "/base64" in path:
import base64
buffer = io.BytesIO()
output_image.save(buffer, format="PNG")
buffer.seek(0)
img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
self.send_json({
"success": True,
"image_base64": img_base64,
"original_size": {"width": input_image.width, "height": input_image.height},
"strength": strength,
"note": "Preview mode - deploy to Hugging Face for AI denoising"
})
else:
self.send_response(200)
self.send_header('Content-Type', 'image/png')
self.send_header('Content-Disposition', 'attachment; filename="denoised.png"')
self.end_headers()
with open(output_path, 'rb') as f:
self.wfile.write(f.read())
except Exception as e:
self.send_error(500, f"Error processing image: {str(e)}")
def handle_generate_image(self, path, query):
"""Handle synchronous image generation."""
try:
prompt = query.get('prompt', [''])[0]
if not prompt:
self.send_error(400, "Missing 'prompt' parameter")
return
width = int(query.get('width', [1024])[0])
height = int(query.get('height', [1024])[0])
width = max(256, min(1440, width))
height = max(256, min(1440, height))
async_mode = query.get('async_mode', ['false'])[0].lower() == 'true'
if async_mode:
self.handle_generate_image_async(query)
return
from PIL import Image
image_bytes = generate_image_from_hf(prompt, width, height)
generated_image = Image.open(io.BytesIO(image_bytes))
file_id = str(uuid.uuid4())
output_path = OUTPUT_DIR / f"{file_id}_generated.png"
generated_image.save(output_path, "PNG")
if "/base64" in path:
buffer = io.BytesIO()
generated_image.save(buffer, format="PNG")
buffer.seek(0)
img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
self.send_json({
"success": True,
"image_base64": img_base64,
"size": {"width": generated_image.width, "height": generated_image.height},
"model": "FLUX.1-schnell",
"prompt": prompt
})
else:
self.send_response(200)
self.send_header('Content-Type', 'image/png')
self.send_header('Content-Disposition', f'attachment; filename="generated_{file_id[:8]}.png"')
self.end_headers()
with open(output_path, 'rb') as f:
self.wfile.write(f.read())
except Exception as e:
self.send_error(500, f"Error generating image: {str(e)}")
def handle_generate_image_async(self, query):
"""Handle async image generation with progress tracking."""
try:
prompt = query.get('prompt', [''])[0]
if not prompt:
self.send_error(400, "Missing 'prompt' parameter")
return
width = int(query.get('width', [1024])[0])
height = int(query.get('height', [1024])[0])
width = max(256, min(1440, width))
height = max(256, min(1440, height))
token = get_hf_token()
if not token:
self.send_error(500, "Hugging Face API token not configured. Please set HF_TOKEN secret.")
return
job_id = str(uuid.uuid4())
file_id = str(uuid.uuid4())
output_path = OUTPUT_DIR / f"{file_id}_generated.png"
jobs[job_id] = {"status": "pending", "progress": 0, "message": "Starting image generation..."}
thread = threading.Thread(
target=process_generate_image_job,
args=(job_id, prompt, width, height, output_path)
)
thread.start()
self.send_json({
"job_id": job_id,
"status": "processing",
"message": "Image generation started. Poll /progress/{job_id} for updates.",
"progress_url": f"/progress/{job_id}",
"result_url": f"/result/{job_id}",
"model": "FLUX.1-schnell",
"prompt": prompt
})
except Exception as e:
self.send_error(500, f"Error starting image generation: {str(e)}")
def serve_html(self):
html_path = Path("templates/index.html")
if html_path.exists():
self.send_response(200)
self.send_header('Content-Type', 'text/html')
self.send_header('Cache-Control', 'no-cache')
self.end_headers()
self.wfile.write(html_path.read_bytes())
else:
self.send_error(404, "Template not found")
def serve_swagger(self):
swagger_html = """<!DOCTYPE html>
<html>
<head>
<title>AI Image Processing - API Documentation</title>
<link rel="stylesheet" type="text/css" href="https://unpkg.com/swagger-ui-dist@5/swagger-ui.css">
<style>
body { margin: 0; padding: 0; }
.swagger-ui .topbar { display: none; }
</style>
</head>
<body>
<div id="swagger-ui"></div>
<script src="https://unpkg.com/swagger-ui-dist@5/swagger-ui-bundle.js"></script>
<script>
window.onload = function() {
SwaggerUIBundle({
url: "/openapi.json",
dom_id: '#swagger-ui',
presets: [SwaggerUIBundle.presets.apis, SwaggerUIBundle.SwaggerUIStandalonePreset],
layout: "BaseLayout"
});
};
</script>
</body>
</html>"""
self.send_response(200)
self.send_header('Content-Type', 'text/html')
self.send_header('Cache-Control', 'no-cache')
self.end_headers()
self.wfile.write(swagger_html.encode())
def serve_openapi(self):
openapi_spec = {
"openapi": "3.1.0",
"info": {
"title": "AI Image Processing API",
"description": "Comprehensive AI-powered image processing API.\n\n**Features:**\n- Image enhancement and upscaling (Real-ESRGAN)\n- Background removal (BiRefNet)\n- Noise reduction (OpenCV NLM)\n- Image generation from text (FLUX.1-schnell)\n\n**Note:** This is a preview deployment. Deploy to Hugging Face Spaces for full AI processing.",
"version": "2.2.0"
},
"servers": [{"url": "/", "description": "Current server"}],
"paths": {
"/enhance": {
"post": {
"summary": "Enhance image (upscale)",
"description": "Upscale and enhance image quality using Real-ESRGAN.",
"parameters": [{"name": "scale", "in": "query", "schema": {"type": "integer", "default": 4, "enum": [2, 4]}}],
"requestBody": {"required": True, "content": {"multipart/form-data": {"schema": {"type": "object", "properties": {"file": {"type": "string", "format": "binary"}}}}}},
"responses": {"200": {"description": "Enhanced image"}}
}
},
"/remove-background": {
"post": {
"summary": "Remove background",
"description": "Remove background from image using BiRefNet AI model.",
"parameters": [{"name": "bgcolor", "in": "query", "schema": {"type": "string", "default": "transparent"}}],
"requestBody": {"required": True, "content": {"multipart/form-data": {"schema": {"type": "object", "properties": {"file": {"type": "string", "format": "binary"}}}}}},
"responses": {"200": {"description": "Image with background removed"}}
}
},
"/denoise": {
"post": {
"summary": "Reduce noise",
"description": "Reduce image noise using Non-Local Means Denoising.",
"parameters": [{"name": "strength", "in": "query", "schema": {"type": "integer", "default": 10, "minimum": 1, "maximum": 30}}],
"requestBody": {"required": True, "content": {"multipart/form-data": {"schema": {"type": "object", "properties": {"file": {"type": "string", "format": "binary"}}}}}},
"responses": {"200": {"description": "Denoised image"}}
}
},
"/generate-image": {
"post": {
"summary": "Generate image from text",
"description": "Generate an image from a text prompt using FLUX.1-schnell AI model.",
"parameters": [
{"name": "prompt", "in": "query", "required": True, "schema": {"type": "string"}, "description": "Text prompt describing the image to generate"},
{"name": "width", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}},
{"name": "height", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}},
{"name": "async_mode", "in": "query", "schema": {"type": "boolean", "default": False}, "description": "Use async mode with progress tracking"}
],
"responses": {"200": {"description": "Generated image"}}
}
},
"/generate-image/async": {
"post": {
"summary": "Generate image (async)",
"description": "Start async image generation with progress tracking using FLUX.1-schnell.",
"parameters": [
{"name": "prompt", "in": "query", "required": True, "schema": {"type": "string"}, "description": "Text prompt describing the image to generate"},
{"name": "width", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}},
{"name": "height", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}}
],
"responses": {"200": {"description": "Job ID for tracking progress"}}
}
},
"/generate-image/base64": {
"post": {
"summary": "Generate image (base64)",
"description": "Generate an image and return as base64-encoded string.",
"parameters": [
{"name": "prompt", "in": "query", "required": True, "schema": {"type": "string"}, "description": "Text prompt describing the image to generate"},
{"name": "width", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}},
{"name": "height", "in": "query", "schema": {"type": "integer", "default": 1024, "minimum": 256, "maximum": 1440}}
],
"responses": {"200": {"description": "Base64 encoded image"}}
}
},
"/progress/{job_id}": {
"get": {
"summary": "Get job progress",
"description": "Get the progress of an async image processing job.",
"parameters": [{"name": "job_id", "in": "path", "required": True, "schema": {"type": "string"}}],
"responses": {"200": {"description": "Job progress"}}
}
},
"/result/{job_id}": {
"get": {
"summary": "Get job result",
"description": "Get the result of a completed async job.",
"parameters": [{"name": "job_id", "in": "path", "required": True, "schema": {"type": "string"}}],
"responses": {"200": {"description": "Processed image"}}
}
},
"/health": {"get": {"summary": "Health check", "responses": {"200": {"description": "API status"}}}},
"/model-info": {"get": {"summary": "Model information", "responses": {"200": {"description": "Model details"}}}}
}
}
self.send_json(openapi_spec)
def serve_file(self, path):
file_path = Path("." + path)
if file_path.exists():
self.send_response(200)
self.send_header('Content-Type', 'image/png')
self.end_headers()
self.wfile.write(file_path.read_bytes())
else:
self.send_error(404, "File not found")
def send_json(self, data):
response = json.dumps(data, indent=2)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Cache-Control', 'no-cache')
self.end_headers()
self.wfile.write(response.encode())
def run_server(port=5000):
server_address = ('0.0.0.0', port)
httpd = HTTPServer(server_address, APIHandler)
print(f"AI Image Processing API running at http://0.0.0.0:{port}")
print(f"API Documentation: http://0.0.0.0:{port}/docs")
print(f"Frontend: http://0.0.0.0:{port}/")
print("\nNote: This is preview mode using simple processing.")
print("Deploy to Hugging Face Spaces for full AI features.")
httpd.serve_forever()
if __name__ == "__main__":
run_server()