|
|
|
|
|
import os |
|
|
|
|
|
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "") |
|
|
os.environ.setdefault("TRANSFORMERS_NO_ADVISORY_WARNINGS", "1") |
|
|
|
|
|
import sys |
|
|
import traceback |
|
|
import json |
|
|
from typing import List, Optional |
|
|
|
|
|
import requests |
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
from datasets import load_dataset |
|
|
from transformers import ( |
|
|
AutoProcessor, |
|
|
AutoModel, |
|
|
AutoTokenizer, |
|
|
AutoModelForCausalLM, |
|
|
) |
|
|
from PIL import Image |
|
|
import gradio as gr |
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SIGLIP_MODEL_ID = "EYEDOL/siglipFULL-agri-finetuned" |
|
|
LLAVA_MODEL_ID = "liuhaotian/llava-v1.6-vicuna-7b" |
|
|
DATASET_TEMPLATE = "EYEDOL/AGRILLAVA-image-text{}" |
|
|
NUM_DATASETS = 1 |
|
|
BATCH_SIZE = 16 |
|
|
TOP_K_DEFAULT = 3 |
|
|
|
|
|
|
|
|
HF_API_URL = "https://router.huggingface.co/hf-inference" |
|
|
HUGGINGFACE_TOKEN = os.environ.get("HUGGINGFACE_TOKEN", None) |
|
|
|
|
|
|
|
|
device = torch.device("cpu") |
|
|
print("Running on device:", device) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("Loading datasets and computing SigLip text embeddings (startup)...") |
|
|
texts_all: List[str] = [] |
|
|
for i in range(1, NUM_DATASETS + 1): |
|
|
ds = load_dataset(DATASET_TEMPLATE.format(i), split="train") |
|
|
texts_all.extend(ds["text"]) |
|
|
|
|
|
siglip_processor = AutoProcessor.from_pretrained(SIGLIP_MODEL_ID) |
|
|
siglip_model = AutoModel.from_pretrained(SIGLIP_MODEL_ID).to(device) |
|
|
siglip_model.eval() |
|
|
|
|
|
|
|
|
text_embeds_parts = [] |
|
|
for i in tqdm(range(0, len(texts_all), BATCH_SIZE), desc="Encoding texts (CPU)"): |
|
|
batch_texts = texts_all[i : i + BATCH_SIZE] |
|
|
inputs = siglip_processor(text=batch_texts, padding=True, truncation=True, return_tensors="pt") |
|
|
with torch.no_grad(): |
|
|
text_embeds = siglip_model.get_text_features(**inputs) |
|
|
text_embeds = text_embeds / text_embeds.norm(p=2, dim=-1, keepdim=True) |
|
|
text_embeds_parts.append(text_embeds.cpu()) |
|
|
del inputs, text_embeds |
|
|
if text_embeds_parts: |
|
|
text_embeds_all = torch.cat(text_embeds_parts, dim=0) |
|
|
else: |
|
|
text_embeds_all = torch.empty((0, 0)) |
|
|
print(f"Encoded {len(texts_all)} texts. Embeddings shape: {text_embeds_all.shape}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
llava_tokenizer: Optional[AutoTokenizer] = None |
|
|
llava_model = None |
|
|
llava_mode: Optional[str] = None |
|
|
load_errors = [] |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
from llava.model import LlavaForCausalLM |
|
|
|
|
|
print("Loading LlavaForCausalLM from installed 'llava' package (CPU)...") |
|
|
llava_tokenizer = AutoTokenizer.from_pretrained(LLAVA_MODEL_ID, use_fast=False) |
|
|
llava_model = LlavaForCausalLM.from_pretrained( |
|
|
LLAVA_MODEL_ID, |
|
|
device_map={"": "cpu"}, |
|
|
torch_dtype=torch.float32, |
|
|
low_cpu_mem_usage=True, |
|
|
) |
|
|
llava_model.to(device) |
|
|
llava_model.eval() |
|
|
llava_mode = "local" |
|
|
print("✅ Llava loaded from installed package.") |
|
|
except Exception: |
|
|
tb_local = traceback.format_exc() |
|
|
load_errors.append(("local_llava_import", tb_local)) |
|
|
print("Local llava import failed — will try trust_remote_code fallback. See logs for details.") |
|
|
|
|
|
|
|
|
if llava_mode is None: |
|
|
try: |
|
|
print("Attempting AutoModelForCausalLM.from_pretrained(..., trust_remote_code=True) (CPU)...") |
|
|
llava_tokenizer = AutoTokenizer.from_pretrained(LLAVA_MODEL_ID, use_fast=False) |
|
|
llava_model = AutoModelForCausalLM.from_pretrained( |
|
|
LLAVA_MODEL_ID, |
|
|
trust_remote_code=True, |
|
|
device_map={"": "cpu"}, |
|
|
torch_dtype=torch.float32, |
|
|
low_cpu_mem_usage=True, |
|
|
) |
|
|
llava_model.to(device) |
|
|
llava_model.eval() |
|
|
llava_mode = "trust_remote_code" |
|
|
print("✅ Llava loaded via trust_remote_code fallback.") |
|
|
except Exception: |
|
|
tb_trust = traceback.format_exc() |
|
|
load_errors.append(("fallback_trust_remote_code", tb_trust)) |
|
|
print("trust_remote_code fallback failed — will try HF router if token provided.") |
|
|
|
|
|
|
|
|
if llava_mode is None and HUGGINGFACE_TOKEN: |
|
|
llava_mode = "hf_api" |
|
|
print("No usable local model found. Will use Hugging Face router Inference API for generation (HUGGINGFACE_TOKEN detected).") |
|
|
|
|
|
if llava_mode is None: |
|
|
print("WARNING: No Llava model available and no HUGGINGFACE_TOKEN supplied. Generation will return an actionable error.") |
|
|
for name, tb in load_errors: |
|
|
print(f"--- {name} traceback ---\n{tb}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def call_hf_inference_api(prompt: str, max_new_tokens: int = 256, temperature: float = 0.0): |
|
|
if not HUGGINGFACE_TOKEN: |
|
|
raise RuntimeError("HUGGINGFACE_TOKEN not set; cannot call Hugging Face Inference API.") |
|
|
headers = {"Authorization": f"Bearer {HUGGINGFACE_TOKEN}", "Content-Type": "application/json"} |
|
|
payload = { |
|
|
"model": LLAVA_MODEL_ID, |
|
|
"inputs": prompt, |
|
|
"parameters": {"max_new_tokens": max_new_tokens, "temperature": temperature}, |
|
|
"options": {"wait_for_model": True}, |
|
|
} |
|
|
resp = requests.post(HF_API_URL, headers=headers, json=payload, timeout=300) |
|
|
if resp.status_code != 200: |
|
|
raise RuntimeError(f"HF Inference API error {resp.status_code}: {resp.text}") |
|
|
data = resp.json() |
|
|
|
|
|
if isinstance(data, list) and data and isinstance(data[0], dict) and "generated_text" in data[0]: |
|
|
return data[0]["generated_text"] |
|
|
if isinstance(data, dict) and "generated_text" in data: |
|
|
return data["generated_text"] |
|
|
if isinstance(data, str): |
|
|
return data |
|
|
return json.dumps(data) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def retrieve_top_k_texts(image: Image.Image, k: int = TOP_K_DEFAULT): |
|
|
inputs = siglip_processor(images=image, return_tensors="pt") |
|
|
with torch.no_grad(): |
|
|
img_embed = siglip_model.get_image_features(**inputs) |
|
|
img_embed = img_embed / img_embed.norm(p=2, dim=-1, keepdim=True) |
|
|
|
|
|
sims = F.cosine_similarity(img_embed.cpu(), text_embeds_all) |
|
|
topk = torch.topk(sims, k) |
|
|
results = [(texts_all[idx.item()], float(score)) for idx, score in zip(topk.indices, topk.values)] |
|
|
return results |
|
|
|
|
|
def llava_answer(image: Image.Image, retrieved_texts, question: str, max_tokens: int = 256): |
|
|
context_text = "\n".join([f"Retrieved Text: {t}" for t, _ in retrieved_texts]) |
|
|
prompt = ( |
|
|
"You are an agricultural assistant. Use the provided retrieved texts to answer concisely.\n\n" |
|
|
f"Retrieved texts:\n{context_text}\n\n" |
|
|
f"User question: {question}\n\n" |
|
|
"Provide a concise, actionable answer and crop suggestions when applicable." |
|
|
) |
|
|
|
|
|
if llava_mode in ("local", "trust_remote_code"): |
|
|
inputs = llava_tokenizer(prompt, return_tensors="pt") |
|
|
inputs = {k: v.to(device) for k, v in inputs.items()} |
|
|
with torch.no_grad(): |
|
|
output_ids = llava_model.generate(**inputs, max_new_tokens=max_tokens) |
|
|
resp = llava_tokenizer.decode(output_ids[0], skip_special_tokens=True) |
|
|
return resp |
|
|
elif llava_mode == "hf_api": |
|
|
return call_hf_inference_api(prompt, max_new_tokens=max_tokens) |
|
|
else: |
|
|
err = ( |
|
|
"No Llava model is available for generation.\n\n" |
|
|
"Fix options:\n" |
|
|
"1) Install the LLaVA repo in requirements.txt and rebuild the Space:\n" |
|
|
" git+https://github.com/haotian-liu/LLaVA.git@main\n" |
|
|
"2) Or add a valid Hugging Face API token as HUGGINGFACE_TOKEN in Space secrets to use the router.\n\n" |
|
|
"Check Space logs for detailed tracebacks printed at startup." |
|
|
) |
|
|
return err |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def gradio_pipeline(image: Image.Image, question: str, k: int = TOP_K_DEFAULT): |
|
|
if image is None or not question: |
|
|
return None, "Please provide both an image and a question." |
|
|
retrieved = retrieve_top_k_texts(image, k=int(k)) |
|
|
try: |
|
|
answer = llava_answer(image, retrieved, question) |
|
|
except Exception as e: |
|
|
tb = traceback.format_exc() |
|
|
answer = f"Error during generation: {e}\n\nTraceback:\n{tb}" |
|
|
return image, answer |
|
|
|
|
|
with gr.Blocks(title="Agri Image + Question → Llava Response (robust)") as demo: |
|
|
gr.Markdown( |
|
|
"## Agri Image QA\n\nThis app preloads SigLip embeddings at startup. " |
|
|
"Generation uses a local Llava model if available, otherwise the Hugging Face router Inference API " |
|
|
"(requires HUGGINGFACE_TOKEN secret in Space settings)." |
|
|
) |
|
|
with gr.Row(): |
|
|
img_in = gr.Image(type="pil") |
|
|
out_img = gr.Image(type="pil", label="Image") |
|
|
question_input = gr.Textbox(label="Question about the image", lines=2) |
|
|
k_slider = gr.Slider(minimum=1, maximum=10, step=1, value=TOP_K_DEFAULT, label="Top-k retrieval") |
|
|
txt_out = gr.Textbox(label="Llava Response", lines=12) |
|
|
run_btn = gr.Button("Generate Answer") |
|
|
run_btn.click(fn=gradio_pipeline, inputs=[img_in, question_input, k_slider], outputs=[out_img, txt_out]) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.launch(server_name="0.0.0.0", share=False) |
|
|
|