"""Chat-O-Berry: strawberry plant-health advisor.

Pipeline:
    1. An EfficientNet-B3 + Transformer-decoder model captions the uploaded image.
    2. The caption is matched against a small keyword knowledge base (RAG).
    3. An instruction-tuned LLM turns caption + retrieved context into a
       structured recommendation and powers a follow-up advisory chat.
    4. A Gradio Blocks UI ties everything together.
"""

import io
import os
import re
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
import torch
import torch.nn as nn
from PIL import Image
from torchvision import transforms
from torchvision.models import EfficientNet_B3_Weights, efficientnet_b3
from transformers import AutoModelForCausalLM, AutoTokenizer

# ---------------------------------------------------------------------------
# Knowledge base
# ---------------------------------------------------------------------------
# Each entry maps a condition to trigger keywords and four labelled text chunks.
# Dictionary order doubles as tie-break priority in retrieve_knowledge().
# NOTE(review): "fruit mildew" appears in both FUNGAL_LEAVES and FUNGAL_FRUITS;
# on a tie FUNGAL_LEAVES wins because it is listed first -- confirm intent.
KNOWLEDGE_BASE: Dict[str, Dict[str, Any]] = {
    "DROUGHT_LEAVES": {
        "keywords": ["drought", "wilt", "dehydrated", "scorched leaf", "shriveled leaf", "water stress", "leaf margin"],
        "chunks": {
            "DESCRIPTION_AND_CAUSE": "**The leaves are undergoing desiccation (drying out) because the plant lacks sufficient soil moisture.** This is often triggered by ** prolonged drought, high heat, or windy conditions ** that cause the plant to lose water faster than its roots can supply it.",
            "DIAGNOSTIC_CLUES": "Look for leaf margins that are ** brown, brittle, or curled inwards **. The leaves will ** wilt noticeably during the midday sun **, even if they recover slightly overnight. Check soil is dry 4-6 inches deep.",
            "IMMEDIATE_ACTION": "Water the plants ** deeply and evenly ** using ** drip irrigation ** or a soaker hose, checking the soil moisture 6 inches down. If possible, ** apply shade nets ** during the peak afternoon heat.",
            "PREVENTION_AND_LONGTERM": "** Apply a thick layer of organic mulch ** (straw or dried leaves) to cool the soil and drastically reduce water evaporation. Ensure your irrigation system is ** consistent and efficient **.",
        },
    },
    "DROUGHT_FRUITS": {
        "keywords": ["drought fruit", "dry fruit", "shriveled fruit", "dried fruit", "leathery fruit", "fruit desiccation"],
        "chunks": {
            "DESCRIPTION_AND_CAUSE": "**The fruits are small, hard, and shriveled because water stress limits the plant’s ability to send enough water to the developing fruit tissue.** This is a symptom of ** severe or prolonged drought ** during the critical fruit enlargement stage.",
            "DIAGNOSTIC_CLUES": "The fruits will feel ** hard or leathery ** instead of plump. They may show ** uneven ripening ** or stop enlarging completely.",
            "IMMEDIATE_ACTION": "Immediately ** ensure consistent, deep irrigation ** to stabilize soil moisture. Lightly misting the foliage early in the morning can provide temporary relief.",
            "PREVENTION_AND_LONGTERM": "Maintain a ** strict irrigation schedule ** based on the weather forecast and plant stage. Consider a ** foliar spray of potassium ** during fruiting to improve the fruit's water-holding capacity.",
        },
    },
    "UNRIPE_FRUITS": {
        "keywords": ["unripe", "green fruit", "immature", "delayed", "slow color", "potassium deficiency"],
        "chunks": {
            "DESCRIPTION_AND_CAUSE": "**Ripening is delayed because the necessary sugar accumulation and pigment production enzymes are inhibited.** Common causes include ** low temperatures, insufficient sunlight due to shading, or nutrient imbalances **, particularly ** low phosphorus or potassium ** and excessive nitrogen.",
            "DIAGNOSTIC_CLUES": "Fruits remain ** firm and primarily green or pale ** for an extended period after reaching full size. Check for ** dense foliage ** that is blocking light.",
            "IMMEDIATE_ACTION": "** Remove excessive foliage ** (light pruning) to expose the fruits to 6–8 hours of direct sunlight per day. If available, apply a quick-release ** potassium-rich fertilizer ** near the plants.",
            "PREVENTION_AND_LONGTERM": "** Conduct a soil test ** to check your P:K:N balance. Ensure adequate potassium and phosphorus levels are maintained before and during the fruiting period. Choose a variety suited to your local climate.",
        },
    },
    "HEALTHY_RIPE": {
        "keywords": ["ripe", "mature", "healthy", "lush", "uniformly red", "quality", "post-harvest", "no spots"],
        "chunks": {
            "DESCRIPTION_AND_CAUSE": "The plant is in ** optimal health with successful maturity **. The fruit's uniform color and firmness are due to ** balanced water supply, sufficient nutrients, and proper cultural practices ** that allow natural ripening processes to proceed efficiently.",
            "DIAGNOSTIC_CLUES": "** Fruits are uniformly red, glossy, firm, and aromatic **, without any signs of spots, mold, or shriveling. Leaves are a ** vibrant, dark green **.",
            "IMMEDIATE_ACTION": "** Harvest the fruit in the morning ** when the fruits are cool. ** Handle gently ** to avoid bruising and cool the fruit quickly after picking to prolong shelf life.",
            "PREVENTION_AND_LONGTERM": "Maintain a ** balanced fertilization program ** and continue ** regular scouting ** for early signs of pests and diseases. ** Prioritize good drainage ** to prevent waterlogging.",
        },
    },
    "FUNGAL_LEAVES": {
        "keywords": ["dark spot", "purplish spot", "leaf spot", "blight", "leaf mildew", "fruit mildew", "white powder leaf"],
        "chunks": {
            "DESCRIPTION_AND_CAUSE": "**Leaves are infected by a fungal pathogen**, causing cell necrosis (death) or surface growth. This infection is ** favored by extended periods of leaf wetness, high humidity, or poor air circulation **.",
            "DIAGNOSTIC_CLUES": "Look for ** dark or purplish circular spots ** on the leaves, or a ** fuzzy white/gray powder ** coating the leaf surface. New growth may appear stunted or distorted.",
            "IMMEDIATE_ACTION": "** Immediately remove and destroy ** all infected leaves and plant debris. ** Switch from overhead irrigation ** to drip or soaker methods, and ** water only at the base ** of the plant in the morning.",
            "PREVENTION_AND_LONGTERM": "** Ensure adequate plant spacing ** to improve air circulation. Consider applying an approved ** copper-based or systemic fungicide ** according to local guidelines, and practice ** crop rotation ** (3–4 years).",
        },
    },
    "FUNGAL_FRUITS": {
        "keywords": ["fruit rot", "gray mold", "botrytis", "moldy fruit", "soft fruit", "fruit mildew", "white powder fruit"],
        "chunks": {
            "DESCRIPTION_AND_CAUSE": "**Fruit tissue is being decomposed by fungi (like *Botrytis cinerea*) that colonize the fruit.** This is often caused by ** extended wet periods, poor ventilation in the canopy, or damage/wounds ** on the fruit surface.",
            "DIAGNOSTIC_CLUES": "The fruit becomes ** soft, mushy, and often develops a fuzzy gray mold ** or a ** white powdery coating **. The rot spreads quickly, especially where fruits are clustered or touch the ground.",
            "IMMEDIATE_ACTION": "** Harvest frequently and immediately remove and discard (do not compost) ** all rotten and infected fruits. ** Apply mulch ** beneath the plants to prevent fruit contact with the soil.",
            "PREVENTION_AND_LONGTERM": "Maintain a ** clean field environment **. Implement a preventative fungicide or bio-control program during the flowering and fruiting stage, and ensure ** good airflow ** within the plant canopy.",
        },
    },
}


def _keyword_in_caption(keyword: str, caption_lower: str) -> bool:
    """Return True when *keyword* occurs in the caption starting at a word boundary.

    Anchoring the start of the keyword stops "ripe" from firing inside
    "unripe" (and "mature" inside "immature") while still letting inflected
    forms such as "wilted" or "droughts" match their stem.
    """
    pattern = r"(?<![a-z0-9])" + re.escape(keyword.lower())
    return re.search(pattern, caption_lower) is not None


def retrieve_knowledge(caption: str, knowledge_base: Dict[str, Dict[str, Any]]) -> List[Tuple[str, str]]:
    """Return the (label, text) chunks of the best-matching knowledge-base entry.

    Every matching keyword scores one point and multi-word keywords score a
    second point, so specific phrases outrank generic single words. Ties keep
    the entry that appears first in ``knowledge_base``.

    Args:
        caption: Image caption to match; ``None`` or empty yields no match.
        knowledge_base: Mapping of entry name to ``{"keywords": [...], "chunks": {...}}``.

    Returns:
        The chunks of the winning entry in insertion order, or ``[]`` when
        nothing matches.
    """
    caption_lower = (caption or "").lower()

    best_match_key: Optional[str] = None
    max_matches = 0
    for key, entry in knowledge_base.items():
        hits = [kw for kw in entry["keywords"] if _keyword_in_caption(kw, caption_lower)]
        matches = len(hits) + sum(1 for kw in hits if " " in kw)
        if matches > max_matches:  # strict ">" keeps the earlier entry on ties
            max_matches = matches
            best_match_key = key

    if best_match_key is not None:
        return list(knowledge_base[best_match_key]["chunks"].items())

    # Legacy fallback: a caller-supplied base without a match may still be
    # answered from the built-in HEALTHY_RIPE entry.
    healthy = KNOWLEDGE_BASE.get("HEALTHY_RIPE", {})
    if any(_keyword_in_caption(kw, caption_lower) for kw in healthy.get("keywords", [])):
        return list(healthy.get("chunks", {}).items())
    return []


# ---------------------------------------------------------------------------
# Runtime configuration and caption tokenizer
# ---------------------------------------------------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
dtype = torch.float16 if device.type == "cuda" else torch.float32

try:
    from transformers import LlamaTokenizerFast

    tokenizer = LlamaTokenizerFast.from_pretrained("hf-internal-testing/llama-tokenizer")
except Exception:
    # Best-effort fallback so the app can still start without the Llama tokenizer.
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token if hasattr(tokenizer, "eos_token") else "<|pad|>"

VOCAB_SIZE = getattr(tokenizer, "vocab_size", 30522)
ENCODER_FEATURE_DIM = 1536
MAX_LEN = 30
DROPOUT_RATE = 0.45
MAX_PROMPT_TOKENS = 1024  # tokenizer truncation limit used for every LLM prompt


# ---------------------------------------------------------------------------
# Captioning model
# ---------------------------------------------------------------------------
class ImageEncoder(nn.Module):
    """EfficientNet-B3 convolutional trunk producing a sequence of patch features."""

    def __init__(self, embed_dim=ENCODER_FEATURE_DIM, pretrained=True):
        """Build the encoder.

        Args:
            embed_dim: Kept for interface compatibility; currently unused.
            pretrained: Load ImageNet weights. Pass ``False`` when a full
                checkpoint is about to be loaded, to skip the download.
        """
        super().__init__()
        weights = EfficientNet_B3_Weights.IMAGENET1K_V1 if pretrained else None
        backbone = efficientnet_b3(weights=weights)
        self.feature_extractor = backbone.features

    def forward(self, x):
        """Flatten the spatial grid into a sequence and put channels last."""
        x = self.feature_extractor(x)
        return x.flatten(2).permute(0, 2, 1)


class CaptionModel(nn.Module):
    """Image encoder followed by a Transformer decoder over caption tokens."""

    def __init__(
        self,
        encoder,
        vocab_size=VOCAB_SIZE,
        d_model=512,
        nhead=8,
        num_layers=4,
        max_len=MAX_LEN,
        dropout_rate=DROPOUT_RATE,
        encoder_feature_dim=ENCODER_FEATURE_DIM,
    ):
        super().__init__()
        self.encoder = encoder
        self.feature_proj = nn.Linear(encoder_feature_dim, d_model)
        self.embedding = nn.Embedding(vocab_size, d_model)
        # Learned positional table; its length bounds the decodable sequence.
        self.pos_encoder = nn.Parameter(torch.zeros(1, max_len, d_model))
        decoder_layer = nn.TransformerDecoderLayer(
            d_model,
            nhead,
            dim_feedforward=d_model * 4,
            dropout=dropout_rate,
            batch_first=True,
        )
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, images, captions):
        """Return vocabulary logits for every caption position (teacher forcing)."""
        features = self.feature_proj(self.encoder(images))
        seq_len = captions.size(1)
        embeddings = self.embedding(captions) + self.pos_encoder[:, :seq_len]
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(seq_len).to(captions.device)
        output = self.transformer_decoder(tgt=embeddings, memory=features, tgt_mask=tgt_mask)
        return self.fc_out(output)


def _tidy_caption(text: str) -> str:
    """Normalise whitespace/periods and keep only the first sentence."""
    caption = text.replace("..", ".").replace(". .", ".").strip()
    caption = " ".join(caption.split())
    if not caption:
        return caption
    first_period_index = caption.find(".")
    if first_period_index != -1:
        return caption[: first_period_index + 1]
    return caption + "."


def generate_caption_beam(
    model,
    img_tensor,
    device,
    max_len=MAX_LEN,
    num_beams=3,
    repetition_penalty=1.5,
    length_penalty=0.7,
):
    """Caption one image with beam search.

    Args:
        model: A ``CaptionModel``-like module (``encoder``, ``feature_proj``,
            ``embedding``, ``pos_encoder``, ``transformer_decoder``, ``fc_out``).
        img_tensor: Pre-processed image tensor without a batch dimension.
        device: Device used for decoding.
        max_len: Maximum decoding steps; capped at the positional table size.
        num_beams: Beam width.
        repetition_penalty: Penalty applied to logits of already emitted tokens.
        length_penalty: Exponent of the length normalisation of beam scores.

    Returns:
        The first sentence of the best-scoring caption.
    """
    model.eval()

    # hasattr() is always true on HF tokenizers; the ids themselves may be None.
    bos_id = getattr(tokenizer, "bos_token_id", None)
    if bos_id is None:
        bos_id = 0
    eos_id = getattr(tokenizer, "eos_token_id", None)

    # Longer sequences would silently get a truncated positional slice and
    # then fail on a shape mismatch.
    max_len = min(max_len, model.pos_encoder.size(1))

    with torch.no_grad():
        img = img_tensor.unsqueeze(0).to(device)
        features = model.feature_proj(model.encoder(img))

        beam = [(torch.tensor([[bos_id]], device=device), 0.0)]
        finished_beams = []

        for _ in range(max_len):
            if len(finished_beams) >= num_beams:
                break
            new_beam = []
            for seq, raw_score in beam:
                if eos_id is not None and seq[0, -1].item() == eos_id:
                    finished_beams.append((seq, raw_score / (seq.size(1) ** length_penalty)))
                    continue

                seq_len = seq.size(1)
                tgt_mask = nn.Transformer.generate_square_subsequent_mask(seq_len).to(device)
                embeddings = model.embedding(seq) + model.pos_encoder[:, :seq_len]
                output = model.transformer_decoder(tgt=embeddings, memory=features, tgt_mask=tgt_mask)
                logits = model.fc_out(output)[0, -1, :]

                # Penalise every previously emitted token (repeated tokens are
                # penalised once per occurrence, as before).
                for prev_id in seq.squeeze(0).tolist():
                    if logits[prev_id] > 0:
                        logits[prev_id] /= repetition_penalty
                    else:
                        logits[prev_id] *= repetition_penalty

                log_probs = torch.log_softmax(logits, dim=-1)
                topk_scores, topk_idx = torch.topk(log_probs, num_beams)
                for step_score, token in zip(topk_scores.tolist(), topk_idx):
                    new_seq = torch.cat([seq, token.view(1, 1)], dim=1)
                    new_beam.append((new_seq, raw_score + step_score))

            new_beam.sort(key=lambda item: item[1], reverse=True)
            beam = new_beam[:num_beams]

        # Hypotheses still alive when decoding stops compete with finished ones.
        for seq, raw_score in beam:
            finished_beams.append((seq, raw_score / (seq.size(1) ** length_penalty)))

    if not finished_beams:
        return "Caption generation failed."

    best_seq, _ = max(finished_beams, key=lambda item: item[1])
    caption = tokenizer.decode(best_seq.squeeze(0).tolist(), skip_special_tokens=True)
    return _tidy_caption(caption)


class MockCaptionModel(nn.Module):
    """Placeholder used when the caption checkpoint cannot be loaded.

    It deliberately has no ``encoder`` attribute, which is how
    ``process_image_upload`` detects that captioning is unavailable.
    """


MODEL_PATH = "EfficientNetB3_model.pth"
model_loaded_successfully = False
try:
    if not os.path.exists(MODEL_PATH):
        raise FileNotFoundError(MODEL_PATH)
    # The checkpoint overwrites every weight, so skip the ImageNet download.
    encoder = ImageEncoder(pretrained=False)
    caption_model = CaptionModel(encoder, vocab_size=VOCAB_SIZE, dropout_rate=DROPOUT_RATE).to(device)
    # NOTE(security): torch.load unpickles the file; only load trusted checkpoints.
    caption_model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
    caption_model.eval()
    model_loaded_successfully = True
except Exception as exc:
    # Keep the app usable without the checkpoint, but say why captioning is off.
    print("Caption model failed to load; captioning is disabled:", repr(exc))
    caption_model = MockCaptionModel()

transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

# ---------------------------------------------------------------------------
# LLM
# ---------------------------------------------------------------------------
LLM_MODEL_ID = "Qwen/Qwen2.5-3B-Instruct"
llm = None
llm_tokenizer = None
try:
    llm_tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL_ID)
    llm = AutoModelForCausalLM.from_pretrained(
        LLM_MODEL_ID,
        torch_dtype=dtype,
        device_map="auto" if device.type == "cuda" else "cpu",
    )
    if llm_tokenizer.pad_token is None:
        llm_tokenizer.pad_token = llm_tokenizer.eos_token
    print("LLM loaded:", True)
except Exception as e:
    print("LLM failed to load (this may be expected on CPU-only environments):", e)
    llm = None
    llm_tokenizer = None
    print("LLM loaded:", False)


def _generate_reply(llm_model, tokenizer_model, prompt: str, max_new_tokens: int, temperature: float) -> str:
    """Sample a completion for *prompt* and return only the newly generated text."""
    inputs = tokenizer_model(
        prompt, return_tensors="pt", truncation=True, max_length=MAX_PROMPT_TOKENS
    ).to(llm_model.device)
    output = llm_model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        top_p=0.9,
        do_sample=True,
        repetition_penalty=1.1,
        pad_token_id=tokenizer_model.pad_token_id,
    )
    # Slicing off the prompt tokens is independent of any chat-template markers.
    new_tokens = output[0][inputs["input_ids"].shape[1]:]
    reply = tokenizer_model.decode(new_tokens, skip_special_tokens=True)
    return reply.replace("<|im_end|>", "").strip()


def get_multiple_recommendations(pred_caption: str, llm_model, tokenizer_model, knowledge_base):
    """Generate a structured Cause / Immediate / Long-term recommendation.

    Args:
        pred_caption: Caption predicted for the uploaded image.
        llm_model: Causal LM, or ``None`` when unavailable.
        tokenizer_model: Tokenizer paired with ``llm_model``, or ``None``.
        knowledge_base: Knowledge base passed to ``retrieve_knowledge``.

    Returns:
        ``(recommendation_text, retrieved_chunks)``. Retrieval needs no LLM, so
        the chunks are returned even when the LLM is unavailable.
    """
    retrieved_chunks = retrieve_knowledge(pred_caption, knowledge_base)

    if llm_model is None or tokenizer_model is None:
        return (
            "Recommendations not available: LLM failed to load. The required models could not be loaded on this device.",
            retrieved_chunks,
        )

    context_text = ""
    if retrieved_chunks:
        context_text = "\n\n--- RAG KNOWLEDGE CONTEXT ---\n"
        context_text += "".join(f"**{label.replace('_', ' ')}**: {text}\n" for label, text in retrieved_chunks)
        context_text += "------------------------------\n\n"

    system_prompt = (
        "You are a highly detailed and precise agricultural assistant specializing in strawberries. "
        "Your task is to generate a rich, professional, and actionable recommendation strictly based on the provided caption and RAG context. "
        "The output MUST be formatted into three distinct sections, each ending with a single paragraph/sentence. "
        "Do not introduce unobserved problems or speculate. Do not use salutations or empathy. "
    )
    user_prompt = (
        f'CAPTION: "{pred_caption}"\n\n'
        f"{context_text}"
        "INSTRUCTION: Generate a comprehensive analysis and recommendation in the following three-part stacked format, with rich descriptive text:\n"
        "1. Cause: A detailed sentence describing the likely cause and condition based on the caption and RAG context.\n"
        "2. Immediate Action: A comprehensive sentence detailing specific, time-sensitive actions the grower must take immediately.\n"
        "3. Long-term Action: A forward-looking sentence outlining preventative and sustainable strategies for the future.\n"
        "Ensure the output strictly follows the 'Label: Text' format below. Do not add extra text, line breaks, or numbering.\n\n"
        "Cause: [Your descriptive text for the cause]\n"
        "Immediate Action: [Your descriptive text for the immediate steps]\n"
        "Long-term Action: [Your descriptive text for the long-term steps]\n"
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    try:
        prompt = tokenizer_model.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    except Exception:
        # Tokenizers without a chat template get a plain concatenated prompt.
        prompt = system_prompt + "\n\n" + user_prompt

    final_recommendations = _generate_reply(
        llm_model, tokenizer_model, prompt, max_new_tokens=300, temperature=0.7
    )
    return final_recommendations, retrieved_chunks


def _render_chat_prompt(system_prompt: str, turns: list, message: str) -> str:
    """Render system prompt, prior turns and the new question into one prompt."""
    messages = [{"role": "system", "content": system_prompt}]
    for user_msg, assistant_msg in turns:
        messages.append({"role": "user", "content": user_msg})
        messages.append({"role": "assistant", "content": assistant_msg})
    messages.append({"role": "user", "content": message})
    try:
        return llm_tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    except Exception:
        # Tokenizers without a chat template get a simple "role: content" transcript.
        prompt_lines = [system_prompt]
        prompt_lines.extend(f"{m['role']}: {m['content']}" for m in messages[1:])
        return "\n\n".join(prompt_lines)


def get_rag_chat_response(message: str, history: list, caption: str, rag_context: str):
    """Answer a follow-up question grounded in the caption and retrieved context.

    Args:
        message: The user's new question.
        history: Prior ``(user, assistant)`` turns; not mutated.
        caption: Caption of the analysed image.
        rag_context: Retrieved knowledge rendered as text.

    Returns:
        ``(history, history)``: the updated turn list, once for the session
        state and once for the chat widget.
    """
    history = list(history or [])  # copy: never mutate the caller's state in place

    if not message or not message.strip():
        return history, history

    if llm is None or llm_tokenizer is None:
        history.append((message, "Chat not available: LLM failed to load on this device."))
        return history, history

    chat_system_prompt = (
        "You are an expert, professional agricultural advisor for strawberry plants. "
        "Base your advice STRICTLY on the visual evidence provided (Image Caption) and the expert RAG Knowledge. "
        "Maintain a helpful, advisory, and professional tone. Keep responses concise unless asked for detail. "
        "Do not introduce unobserved problems or speculate. "
        f"--- Image Analysis ---\nCaption: {caption}\n"
        f"--- RAG Knowledge ---\n{rag_context}\n"
        "-----------------------\n"
        "Answer the user's question, using the provided context."
    )

    # The tokenizer truncates on the right, which would cut off the newest
    # question and the generation header. Drop the oldest turns instead until
    # the prompt fits.
    turns = list(history)
    prompt = _render_chat_prompt(chat_system_prompt, turns, message)
    while turns and len(llm_tokenizer(prompt)["input_ids"]) > MAX_PROMPT_TOKENS:
        turns.pop(0)
        prompt = _render_chat_prompt(chat_system_prompt, turns, message)

    chat_response = _generate_reply(llm, llm_tokenizer, prompt, max_new_tokens=256, temperature=0.8)
    history.append((message, chat_response))
    return history, history


def process_image_upload(image: Image.Image):
    """Run captioning, retrieval and recommendation for an uploaded image.

    Returns:
        ``(image, caption, rag_context_text, recommendation_text, fresh_chat_history)``.
    """
    if image is None:
        # "Analyze" was clicked without an upload.
        return None, "Please upload an image before running the analysis.", "", "", []

    pil_img = image.convert("RGB")
    img_tensor = transform(pil_img)  # generate_caption_beam moves it to the device

    if model_loaded_successfully and hasattr(caption_model, "encoder"):
        try:
            caption = generate_caption_beam(caption_model, img_tensor, device)
        except Exception as e:
            print("Caption generation error:", e)
            caption = "Wrong Plant/Fruit Image!"
    else:
        caption = "Wrong Plant/Fruit Image!"

    recommendations, retrieved_list = get_multiple_recommendations(caption, llm, llm_tokenizer, KNOWLEDGE_BASE)
    if retrieved_list:
        retrieved_str = "\n\n".join(f"**{lab.replace('_', ' ')}**: {txt}" for lab, txt in retrieved_list)
    else:
        retrieved_str = "No RAG context retrieved."
    return pil_img, caption, retrieved_str, recommendations, []


# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
title = "Chat-O-Berry Plant Health Advisor"

with gr.Blocks(title=title) as demo:
    gr.Markdown(""" """)

    chat_history_state = gr.State(value=[])
    rag_state = gr.State(value="")

    with gr.Group(visible=True) as landing_group:
        gr.Markdown("## 🍓 Welcome to Chat-O-Berry")
        with gr.Row():
            with gr.Column(scale=1):
                landing_image = gr.Image(
                    value="samples/strawberry.jpg",
                    label=None,
                    show_label=False,
                    interactive=False,
                    height=260,
                    elem_classes=["hero-img"],
                )
            with gr.Column(scale=2):
                gr.Markdown(
                    """
Ready to assess your plants? Open the advisor below.
"""
                )
        with gr.Row():
            with gr.Column(scale=3):
                gr.Markdown("")
            with gr.Column(scale=4):
                gr.Markdown("")
            with gr.Column(scale=3):
                go_to_advisor_btn = gr.Button(
                    "Open Chat-O-Berry Advisor",
                    variant="primary",
                    size="sm",
                )

    with gr.Group(visible=False) as advisor_group:
        gr.Markdown("# 🍓 Chat‑O‑Berry Plant Health Advisor")
        gr.Markdown("Upload a plant image for AI‑powered health analysis and agronomic recommendations.")
        with gr.Row():
            with gr.Column(scale=1):
                image_in = gr.Image(type="pil", label="Upload Plant Image", interactive=True)
                run_btn = gr.Button("Analyze Plant Health", variant="primary")
                hidden_out_image = gr.Image(visible=False)
                gr.Examples(
                    examples=[
                        ["samples/darkspot.jpg"],
                        ["samples/droughtfruits.jpg"],
                        ["samples/fruitrot.png"],
                        ["samples/healthyleaf.jpg"],
                        ["samples/leafmildew.png"],
                        ["samples/ripefruits.jpg"],
                        ["samples/unripefruit.jpg"],
                    ],
                    inputs=[image_in],
                    label="Sample strawberry images",
                )
            with gr.Column(scale=2):
                gr.Markdown("### 🍓 **Plant Health Caption**")
                caption_out = gr.Textbox(label="", lines=2, interactive=False, container=False)
                with gr.Tabs(elem_classes=["mode-tabs"]):
                    with gr.TabItem("Structured Recommendation"):
                        gr.Markdown("### **Analysis and Action Plan:**")
                        rec_out = gr.Textbox(
                            label="",
                            lines=8,
                            interactive=False,
                            container=False,
                            placeholder="Upload and Analyze an image to receive a structured recommendation here.",
                        )
                    with gr.TabItem("Advisory Chat"):
                        gr.Markdown("### **Interactive Advisory Chat**")
                        chat_box = gr.Chatbot(
                            height=300,
                            label="Advisory Chat based on Image Analysis",
                        )
                        with gr.Row():
                            chat_input = gr.Textbox(
                                scale=4,
                                placeholder="Ask a follow-up question about the plant's health or treatment...",
                                show_label=False,
                            )
                            chat_send_btn = gr.Button("Send", scale=1, variant="secondary")
        with gr.Row():
            with gr.Column(scale=7):
                gr.Markdown("")
            with gr.Column(scale=3):
                back_to_home_btn = gr.Button(
                    "Back to Home Page",
                    variant="primary",
                    size="sm",
                )

    # A new analysis resets the stored chat history, so clear the visible chat
    # window as well; otherwise it keeps showing the previous image's conversation.
    run_btn.click(
        process_image_upload,
        inputs=[image_in],
        outputs=[hidden_out_image, caption_out, rag_state, rec_out, chat_history_state],
    ).then(lambda: [], inputs=None, outputs=[chat_box])

    chat_send_btn.click(
        get_rag_chat_response,
        inputs=[chat_input, chat_history_state, caption_out, rag_state],
        outputs=[chat_history_state, chat_box],
    ).then(lambda: "", inputs=None, outputs=[chat_input])

    chat_input.submit(
        get_rag_chat_response,
        inputs=[chat_input, chat_history_state, caption_out, rag_state],
        outputs=[chat_history_state, chat_box],
    ).then(lambda: "", inputs=None, outputs=[chat_input])

    def show_advisor():
        """Hide the landing page and reveal the advisor."""
        return {
            landing_group: gr.update(visible=False),
            advisor_group: gr.update(visible=True),
        }

    def show_landing():
        """Hide the advisor and return to the landing page."""
        return {
            landing_group: gr.update(visible=True),
            advisor_group: gr.update(visible=False),
        }

    go_to_advisor_btn.click(
        show_advisor,
        outputs=[landing_group, advisor_group],
    )
    back_to_home_btn.click(
        show_landing,
        outputs=[landing_group, advisor_group],
    )

if __name__ == "__main__":
    print("Starting app with Landing + Chat‑O‑Berry Advisor sections.")
    demo.launch()