|
|
import json
import os
import random
import re
import tempfile
import traceback

import google.generativeai as genai
import gradio as gr
from PIL import Image
|
|
|
|
|
# Print the installed SDK version once, when the module is imported.
_sdk_version = genai.__version__
print("Google Generative AI SDK version:", _sdk_version)
|
|
|
|
|
# SECURITY: these keys are committed in plain text. Anyone with read access to
# the file can use them. Rotate them and supply replacements through the
# GOOGLE_API_KEYS / GOOGLE_API_KEY environment variables; the embedded list is
# kept only as a backward-compatible fallback and should be removed.
_EMBEDDED_API_KEYS = (
    "AIzaSyBAZ1Zm2KCZHlmVKPgVf2Cch0c-0YJfJVg",
    "AIzaSyCWiMI9o4riw_9ucsSrIyModT3YD3a8BsU",
    "AIzaSyBJFwj-Wzq-kLOLlsodR5Lwf4qIT6d2dJQ",
    "AIzaSyAPFCgH8uSjANmPRF9iHYIYcneTOod8Qi0",
    "AIzaSyBbK-1P3JD6HPyE3QLhkOps6_-Xo3wUFbs",
)


def load_api_keys(environ=None):
    """Return the list of API keys to use.

    Keys are read from ``GOOGLE_API_KEYS`` (comma-separated) or, when that is
    unset or empty, from ``GOOGLE_API_KEY``. Blank entries and surrounding
    whitespace are dropped. When neither variable yields a key, the embedded
    fallback keys are returned.

    Args:
        environ: Mapping to read the variables from. Defaults to
            ``os.environ``.

    Returns:
        A new list of key strings.
    """
    env = os.environ if environ is None else environ
    raw = env.get("GOOGLE_API_KEYS") or env.get("GOOGLE_API_KEY") or ""
    keys = [part.strip() for part in raw.split(",") if part.strip()]
    return keys or list(_EMBEDDED_API_KEYS)


API_KEYS = load_api_keys()
|
|
|
|
|
# Running count of keys handed out so far; advanced by get_next_key().
key_index = 0


def get_next_key():
    """Return the next entry of ``API_KEYS``, wrapping around at the end.

    Each call advances the module-level ``key_index`` counter by one.
    """
    global key_index
    position = key_index % len(API_KEYS)
    selected = API_KEYS[position]
    key_index = key_index + 1
    return selected
|
|
|
|
|
|
|
|
|
|
|
def extract_json(text):
    """Parse a JSON value out of free-form response text.

    The text inside a ```json fenced block is used when one is present;
    otherwise the whole stripped text is used. Parsing is attempted on that
    text as-is, then on its outermost ``{...}`` span, then on its outermost
    ``[...]`` span, and the first successful parse is returned.

    Args:
        text: Response text. Non-string values (for example ``None``) are
            not parsed.

    Returns:
        The decoded JSON value, or ``{"raw_response": text}`` when nothing
        could be decoded.
    """
    if not isinstance(text, str):
        return {"raw_response": text}

    fenced = re.search(r"```json\s*(.*?)\s*```", text, re.IGNORECASE | re.DOTALL)
    json_text = fenced.group(1).strip() if fenced else text.strip()

    # Candidates in priority order. The object span is tried before the array
    # span so text that previously decoded to an object still does.
    candidates = [json_text]
    for opener, closer in (("{", "}"), ("[", "]")):
        first, last = json_text.find(opener), json_text.rfind(closer)
        if first != -1 and last > first:
            candidates.append(json_text[first:last + 1])

    for candidate in candidates:
        try:
            return json.loads(candidate)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError; RecursionError covers
            # pathologically nested input.
            continue

    return {"raw_response": text}
|
|
|
|
|
|
|
|
|
|
|
def _flatten_for_jpeg(img):
    """Return ``img`` in a mode the JPEG writer accepts, alpha composited onto white."""
    has_alpha = img.mode in ("RGBA", "LA") or (
        img.mode == "P" and "transparency" in img.info
    )
    if has_alpha:
        rgba = img.convert("RGBA")
        background = Image.new("RGB", rgba.size, (255, 255, 255))
        # Use the alpha band as the paste mask so transparent areas stay white.
        background.paste(rgba, mask=rgba.split()[3])
        return background
    if img.mode not in ("RGB", "L", "CMYK"):
        # Palette, 16-bit and float images cannot be written as JPEG directly.
        return img.convert("RGB")
    return img


def process_image(image, prompt):
    """Send an image and a prompt to the Gemini model and return parsed JSON.

    The image is re-encoded as a JPEG in a temporary file, uploaded with
    ``genai.upload_file``, and passed together with ``prompt`` to
    ``generate_content``. The response text is handed to ``extract_json``.
    The temporary file and the uploaded file are removed before returning.

    Args:
        image: An object with a ``save`` method (used as the image directly),
            a filesystem path, or an object whose ``name`` attribute is a
            path to an existing file.
        prompt: Prompt text sent alongside the image.

    Returns:
        The value produced by ``extract_json`` on success, or
        ``{"error": <message>}`` when any step fails.
    """
    temp_file = None
    uploaded_file = None
    opened_image = None

    try:
        print(f"Received image: {type(image)}")
        print(f"Received prompt: {prompt}")

        # Check for keys before choosing one: random.choice raises on an
        # empty sequence, which would hide the real problem.
        api_key = random.choice(API_KEYS) if API_KEYS else None
        if not api_key:
            return {"error": "ERROR: Missing GOOGLE_API_KEY."}

        # NOTE(review): genai.configure looks like process-wide state, so
        # concurrent requests may race on the active key - confirm.
        genai.configure(api_key=api_key)
        model_name = "gemini-2.5-flash"
        model = genai.GenerativeModel(model_name=model_name, generation_config={})

        COMPRESSION_QUALITY = 80

        # Resolve the input before creating the temp file so a rejected
        # input leaves nothing behind on disk.
        if hasattr(image, "save"):
            img = image
        else:
            if isinstance(image, (str, os.PathLike)):
                path = image
            else:
                path = getattr(image, "name", None)
            if not (isinstance(path, (str, os.PathLike)) and os.path.exists(path)):
                raise ValueError(f"Unsupported image type: {type(image)}")
            opened_image = Image.open(path)
            img = opened_image

        # Record the path immediately so the finally block can delete it even
        # if encoding fails.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpeg") as tmp:
            temp_file = tmp.name

        _flatten_for_jpeg(img).save(
            temp_file,
            format="JPEG",
            optimize=True,
            quality=COMPRESSION_QUALITY,
        )
        print("Temp file:", temp_file)

        uploaded_file = genai.upload_file(path=temp_file, mime_type="image/jpeg")
        print("Uploaded:", uploaded_file.name)

        response = model.generate_content([prompt, uploaded_file])
        response_text = response.text
        print("Raw response:", response_text[:200], "...")

        return extract_json(response_text)

    except Exception as e:
        # Boundary handler: report every failure to the UI as JSON.
        print("Error:", e)
        traceback.print_exc()
        return {"error": str(e)}

    finally:
        if opened_image is not None:
            opened_image.close()
        if temp_file and os.path.exists(temp_file):
            try:
                os.remove(temp_file)
            except OSError as cleanup_error:
                print("Warning: could not remove temp file:", cleanup_error)
        if uploaded_file:
            try:
                genai.delete_file(uploaded_file.name)
            except Exception as cleanup_error:
                # Best effort: a failed remote delete must not mask the result.
                print("Warning: could not delete uploaded file:", cleanup_error)
|
|
|
|
|
|
|
|
|
|
|
# Build the components first, in the same order the Interface receives them.
_image_input = gr.File(label="Upload Image", file_types=["image"])
_prompt_input = gr.Textbox(
    lines=5,
    placeholder="Enter your prompt here...",
    label="Prompt",
)
_json_output = gr.JSON(label="Response")

# Single-function UI: (file, prompt) -> process_image -> JSON viewer.
demo = gr.Interface(
    fn=process_image,
    inputs=[_image_input, _prompt_input],
    outputs=_json_output,
    title="OCR & Analyzer (RAG Enhanced)",
    description="Upload an image + prompt → analyze with RAG store",
    flagging_mode="never",
)

# NOTE(review): assigned after construction; confirm this attribute is still
# read once the Interface has been built.
demo.api_name = "/predict"
|
|
|
|
|
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from PORT, defaulting to 7860.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": int(os.getenv("PORT", "7860")),
        "show_error": True,
        "debug": True,
    }
    demo.launch(**launch_options)
|
|
|