import gradio as gr import json from llm.ollama_llm import query_ollama from llm.rag_pipeline import retrieve_context from logger import get_logger logger = get_logger(__name__) # Function to load latest sensor data def get_latest_sensor_data(path="data/farm_data_log.json", num_entries=3): try: with open(path, "r") as f: data = json.load(f) return data[-num_entries:] if data else [] except FileNotFoundError: logger.error(f"Sensor data file {path} not found.") return [] except json.JSONDecodeError as e: logger.error(f"Invalid JSON in {path}: {e}") return [] # Global query history query_history = [] def process_query(user_query): """Handles user query and returns response + updated history.""" if not user_query.strip(): return "Please enter a question.", "\n".join(format_history()) logger.info("User query: %s", user_query) try: # Prepare sensor data sensor_data_entries = get_latest_sensor_data() combined_sensor_data = { entry["timestamp"]: { "soil": entry["soil"], "water": entry["water"], "environment": entry["environment"] } for entry in sensor_data_entries } # Retrieve context and query LLM rag_context = retrieve_context(user_query) response = query_ollama(user_query, combined_sensor_data, rag_context) logger.info("--- FARM ASSISTANT RESPONSE ---") # Add to query history query_history.append((user_query, response)) return response, format_history() except Exception as e: logger.error(f"Query processing failed: {e}") return "Error: Could not process query. Please try again.", "\n".join(format_history()) def format_history(): """Format query history as text for display.""" lines = [] for i, (q, r) in enumerate(query_history[-5:], start=1): lines.append(f"### Query {i}\n**Q:** {q}\n**A:** {r}\n") return "\n\n".join(lines) def clear_history(): query_history.clear() return "", "" # Clears both output panels # Show latest sensor data as markdown def display_sensor_data(): sensor_data_entries = get_latest_sensor_data() if not sensor_data_entries: return "No sensor data available." latest_entry = sensor_data_entries[-1] text = f""" **Latest Reading: {latest_entry['timestamp']}** ### Soil - Moisture: {latest_entry['soil']['moisture']} - pH: {latest_entry['soil']['pH']} - Temperature: {latest_entry['soil']['temperature']} ### Water - pH: {latest_entry['water']['pH']} - Turbidity: {latest_entry['water']['turbidity']} - Temperature: {latest_entry['water']['temperature']} ### Environment - Humidity: {latest_entry['environment']['humidity']} - Temperature: {latest_entry['environment']['temperature']} - Rainfall: {latest_entry['environment']['rainfall']} """ return text # Gradio UI with gr.Blocks(theme=gr.themes.Soft(primary_hue="green")) as demo: gr.Markdown("# 🌾 AgriEdge: Smart Farm Assistant") gr.Markdown("Ask about your farm's conditions and get tailored advice based on sensor data.") with gr.Tab("Ask Assistant"): query = gr.Textbox( label="Enter your farm-related question", placeholder="e.g., What should I do about soil moisture?" ) submit_btn = gr.Button("Submit Query") clear_btn = gr.Button("Clear History") response_box = gr.Markdown() history_box = gr.Markdown() submit_btn.click(process_query, inputs=query, outputs=[response_box, history_box]) clear_btn.click(clear_history, inputs=None, outputs=[response_box, history_box]) with gr.Tab("Recent Sensor Data"): sensor_md = gr.Markdown(display_sensor_data()) demo.launch()