from fastapi import FastAPI, File, UploadFile import numpy as np from PIL import Image import io import tensorflow as tf from docx import Document import os import google.generativeai as genai from pydantic import BaseModel from ultralytics import YOLO import matplotlib.pyplot as plt from io import BytesIO import base64 from langchain.document_loaders import TextLoader # Or a custom loader for .docs from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_google_genai import GoogleGenerativeAIEmbeddings from langchain.vectorstores import FAISS from langchain_chroma import Chroma from langchain.chains import RetrievalQA from langchain_google_genai import ChatGoogleGenerativeAI # Import the GoogleGenerativeAI class from langchain.prompts import PromptTemplate from langchain.chains import RetrievalQA from langchain.chains import LLMChain import pandas as pd import numpy as np from sklearn.preprocessing import LabelEncoder from sklearn.svm import SVC import joblib model = tf.keras.models.load_model('946_.keras', compile=False) modelPalm = tf.keras.models.load_model('palm_model.h5', compile=False) file_path = "Algeria Plant Disease Treatment Plan.docx" def docx_to_knowledge_base(file_path): try: doc = Document(file_path) except Exception as e: print(f"Error reading file: {e}") # 1. Extract paragraphs paragraphs = [p.text.strip() for p in doc.paragraphs] # 2. Extract tables as sentences table_sentences = [] for table in doc.tables: rows = table.rows headers = [cell.text.strip() for cell in rows[0].cells] for row in rows[1:]: values = [cell.text.strip() for cell in row.cells] entry = ', '.join([f"{h}: {v}" for h, v in zip(headers, values)]) table_sentences.append(entry) # 3. Combine everything into one text block knowledge_text = "\n".join(paragraphs + table_sentences) return knowledge_text # Usage knowledge = docx_to_knowledge_base(file_path) # Configure Gemini API (replace with your actual API key) genai.configure(api_key=os.environ["GOOGLE_API_KEY"]) gemini_model_name = "gemini-2.0-flash" embedding_model_name = "models/embedding-001" # 2. Chunking text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100) docs = text_splitter.create_documents([knowledge]) # 3. Embedding embeddings = GoogleGenerativeAIEmbeddings(model=embedding_model_name) # 4. Vector Database Chorma_path = "/code/Chroma/" # Check if the directory exists if not os.path.exists(Chorma_path): try: os.makedirs(Chorma_path) print(f"Directory created: {Chorma_path}") except PermissionError: print(f"Permission denied: Unable to create directory {Chorma_path}") except Exception as e: print(f"Error creating directory {Chorma_path}: {e}") else: print(f"Directory already exists: {Chorma_path}") # Create a new empty Chroma DB (or load existing if path exists) db = Chroma( collection_name="plant_treatments", embedding_function=embeddings, persist_directory= Chorma_path # Local folder to persist data ) db.add_documents(docs) retriever = db.as_retriever() # 5. RAG Pipeline llm = ChatGoogleGenerativeAI(model=gemini_model_name, temperature=0.7) def create_prompt(disease_name, severity,language="english"): prompt = f""" for the next prompt you should answer with the following language and it is obligatory : {language} """ +""" You are an expert in plant disease treatment for a platform called Growth. Use the following context to answer the user's question. If you don't know the answer, try to find the answer from the context try to make similair answer to the context. If the question is not related to the context, Generate from your exsisting knowlodge about the context. When responding dont start your answer by saying Ok i will, or Yes i will give you, Make it like you are an Assistant and you generate solutions directly. Context: {context} Question: {question} Given a plant disease and its severity, suggest: 1. what is Organic treatment and how it will help Organic treatment 2. what is Chemical product and how it will help Chemical product 3. Application schedule plan of the treatment (explain in bullet points the schedule in simple/detailed/understandable way) """+f""" Now here is a new case: Disease: {disease_name}, Severity: {severity} - Organic:""" return prompt def create_prompt_chat(question,language): prompt = f""" for the next prompt you should answer with the following language and it is obligatory : {language} """ +""" You are a chatbot called Growth, and you assist users with plant leaf disease identification and solutions. Use the following context to answer the user's question. If you don't know the answer, try to find the answer from the context try to make similair answer to the context. If the question is not related to the context, Generate from your exsisting knowlodge about the context. Context: {context} Answer the following question: """+f"""{question}""" return prompt # Load the saved model loaded_svm_model = joblib.load('svm_model.joblib') # Load the saved scaler loaded_scaler = joblib.load('scaler.joblib') # Load the saved label encoder loaded_label_encoder = joblib.load('label_encoder.joblib') Geomodel = tf.keras.models.load_model('keras_model.h5') def predict_crop_disease(input_data): # Load the saved model, scaler, and label encoder loaded_svm_model = joblib.load('svm_model.joblib') loaded_scaler = joblib.load('scaler.joblib') loaded_label_encoder = joblib.load('label_encoder.joblib') # Define the features columns features_columns = ['region', 'crop_type', 'soil_moisture_%', 'soil_pH', 'temperature_C', 'rainfall_mm', 'humidity_%', 'sunlight_hours', 'irrigation_type', 'fertilizer_type', 'pesticide_usage_ml', 'total_days', 'yield_kg_per_hectare', 'latitude', 'longitude', 'NDVI_index'] # Create a DataFrame from the input data input_df = pd.DataFrame([input_data],columns=features_columns) # Ensure the input DataFrame has the correct columns and order input_df = input_df[features_columns] # Encoding categorical features (using the loaded LabelEncoder) # Assuming these are the categorical columns and their corresponding mapping based on your training data categorical_cols = ['region', 'crop_type', 'irrigation_type', 'fertilizer_type'] # Define the mapping (replace with actual mapping from your training data) region_mapping = {'Central USA': 0, 'East Africa': 1, 'North India': 2, 'South India': 3, 'South USA': 4} crop_mapping = {'Cotton': 0, 'Maize': 1, 'Rice': 2, 'Soybean': 3, 'Wheat': 4} irrigation_mapping = {'Drip': 0, 'Manual': 1, 'Sprinkler': 2} fertilizer_mapping = {'Inorganic': 0, 'Mixed': 1, 'Organic': 2} for col in categorical_cols: if col == 'region': input_df[col] = input_df[col].map(region_mapping).fillna(0) # Handle missing values elif col == 'crop_type': input_df[col] = input_df[col].map(crop_mapping).fillna(0) # Handle missing values elif col == 'irrigation_type': input_df[col] = input_df[col].map(irrigation_mapping).fillna(0) # Handle missing values elif col == 'fertilizer_type': input_df[col] = input_df[col].map(fertilizer_mapping).fillna(0) # Handle missing values # Scale the input features input_scaled = loaded_scaler.transform(input_df.values.reshape(-1, 1)).reshape(1,-1) print(input_scaled.reshape(1,-1).shape) # Make predictions prediction = Geomodel.predict(input_scaled) # Inverse transform the prediction (if needed, depending on your model) #predicted_label = loaded_label_encoder.inverse_transform(prediction) return prediction[0] #predicted_label[0] def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None): grad_model = tf.keras.models.Model( [model.inputs], [model.get_layer(last_conv_layer_name).output, model.output] ) with tf.GradientTape() as tape: last_conv_layer_output, preds = grad_model(img_array) if pred_index is None: pred_index = tf.argmax(preds[0]) class_channel = preds[:, pred_index] grads = tape.gradient(class_channel, last_conv_layer_output) pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2)) last_conv_layer_output = last_conv_layer_output[0] heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis] heatmap = tf.squeeze(heatmap) heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap) return heatmap.numpy() def display_gradcam(img, heatmap, alpha=0.4): # Create heatmap from the given heatmap values heatmap = np.uint8(255 * heatmap) jet = plt.cm.get_cmap("jet") jet_colors = jet(np.arange(256))[:, :3] jet_heatmap = jet_colors[heatmap] jet_heatmap = tf.keras.preprocessing.image.array_to_img(jet_heatmap) jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0])) jet_heatmap = tf.keras.preprocessing.image.img_to_array(jet_heatmap) # Superimpose the heatmap onto the original image superimposed_img = jet_heatmap * alpha + img superimposed_img = tf.keras.preprocessing.image.array_to_img(superimposed_img) # Save the image to a BytesIO object instead of showing it with plt.imshow() img_byte_arr = BytesIO() superimposed_img.save(img_byte_arr, format='PNG') img_byte_arr = img_byte_arr.getvalue() # Return the image as base64 return base64.b64encode(img_byte_arr).decode('utf-8') def calculate_activation_ratio(heatmap, threshold=0.2): """Calculates the ratio of activated to non-activated pixels in a heatmap. Args: heatmap: A NumPy array representing the Grad-CAM heatmap. threshold: The threshold for classifying pixels as activated or not. Returns: The ratio of activated pixels to non-activated pixels. """ activated_pixels = np.sum(heatmap > threshold) total_pixels = heatmap.size non_activated_pixels = total_pixels - activated_pixels if non_activated_pixels == 0: return 1.0 # Avoid division by zero if all pixels are activated return activated_pixels / non_activated_pixels # Example usage within the existing code (assuming heatmap is calculated as before): last_conv_layer_name = "block3_conv2" app = FastAPI() @app.post("/classify") async def classify(image: UploadFile = File(...)): if image is not None: img = Image.open(io.BytesIO(await image.read())) img = img.resize((64,64)) img_array = np.array(img) / 255.0 img_array = np.expand_dims(img_array, axis=0) predictions = model.predict(img_array) predicted_class_idx = np.argmax(predictions) predicted_class_idx = int(predicted_class_idx) heatmap = make_gradcam_heatmap(img_array, model, last_conv_layer_name) base64_image = display_gradcam( np.array(img), heatmap) # Return the base64 encoded image in the response # Calculate and print the activation ratio ratio = calculate_activation_ratio(heatmap) return {"prediction": predicted_class_idx,"gradcam": base64_image,"ration":ratio} else: return {"error": "No image provided"} @app.post("/palmclassify") async def palmclassify(image: UploadFile = File(...)): if image is not None: img = Image.open(io.BytesIO(await image.read())) img = img.resize((64,64)) img_array = np.array(img) / 255.0 img_array = np.expand_dims(img_array, axis=0) predictions = modelPalm.predict(img_array) predicted_class_idx = np.argmax(predictions) predicted_class_idx = int(predicted_class_idx) last_mb = "Conv_1" heatmap = make_gradcam_heatmap(img_array, modelPalm, last_mb) base64_image = display_gradcam( np.array(img), heatmap) # Return the base64 encoded image in the response # Calculate and print the activation ratio ratio = calculate_activation_ratio(heatmap) return {"prediction": predicted_class_idx,"gradcam": base64_image,"ration":ratio} else: return {"error": "No image provided"} yolomodel = YOLO("yolo11m.pt") @app.post("/multiclassify") async def classify(image: UploadFile = File(...)): if image is not None: img = Image.open(io.BytesIO(await image.read())) results = yolomodel(img) output=[] for i, box in enumerate(results[0].boxes): # Extract box coordinates x1, y1, x2, y2 = box.xyxy[0].tolist() confidence = box.conf[0].item() class_id = box.cls[0].item() # Crop the image based on bounding box cropped_image = results[0].orig_img[int(y1):int(y2), int(x1):int(x2)] cropped_image = Image.fromarray(cropped_image) cropped_image = cropped_image.resize((64,64)) img_array = np.array(cropped_image) / 255.0 img_array = np.expand_dims(img_array, axis=0) predictions = model.predict(img_array) predicted_class_idx = np.argmax(predictions) predicted_class_idx = int(predicted_class_idx) print("HIIIIIIIIIIIIIIIIII") output.append({ "box": {"x1": x1, "y1": y1, "x2": x2, "y2": y2}, "confidence": round(confidence, 2), "predicted_class": predicted_class_idx }) return {"output": output} else: return {"error": "No image provided"} class DiseaseQuery(BaseModel): disease: str severity: str # "normal" or "severe" language: str @app.post("/RAG") async def rag_classify(query: DiseaseQuery): try: prompt_template = create_prompt(query.disease, query.severity,query.language) prompt = PromptTemplate( input_variables=["context", "question"], template=prompt_template, ) qa = RetrievalQA.from_chain_type( llm=llm, chain_type="stuff", retriever=retriever, chain_type_kwargs={"prompt": prompt}, verbose=True, ) final_query = f"What is the best treatment plan for {query.disease} in a {query.severity} case?" result = qa.run(final_query) return {"answer": result} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) class ChatQuery(BaseModel): question: str language: str @app.post("/RAGChat") async def rag_chat(query: ChatQuery): try: prompt_template = create_prompt_chat(query.question,query.language) prompt = PromptTemplate( input_variables=["context", "question"], template=prompt_template, ) qa = RetrievalQA.from_chain_type( llm=llm, chain_type="stuff", retriever=retriever, chain_type_kwargs={"prompt": prompt}, verbose=True, ) final_query = f"Answer the Provided questino like a human, and remember the chat history {query.question}" result = qa.run(final_query) return {"answer": result} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) class GeoSenQuery(BaseModel): region: str crop_type: str soil_moisture: float soil_pH: float temperature_C: float rainfall_mm: float humidity: float sunlight_hours: float irrigation_type: str fertilizer_type: str pesticide_usage_ml: float total_days: int yield_kg_per_hectare: float latitude: float longitude: float NDVI_index: float @app.post("/GeoSensor") async def geo_sensor_classify(query: GeoSenQuery): try: logits = predict_crop_disease([query.region,query.crop_type,query.soil_moisture,query.soil_pH,query.temperature_C, query.rainfall_mm,query.humidity,query.sunlight_hours,query.irrigation_type,query.fertilizer_type, query.pesticide_usage_ml,query.total_days,query.yield_kg_per_hectare,query.latitude,query.longitude, query.NDVI_index]) return { "answer": int(np.argmax(logits)), "logits": logits.tolist() if isinstance(logits, np.ndarray) else float(logits) } except Exception as e: raise HTTPException(status_code=500, detail=str(e))