import streamlit as st import json from datetime import datetime import uuid import os import time import base64 from PIL import Image import io from main import call_agent as agent_invoker from main import DemoState def call_agent(query): """ Mock function - replace this with your actual call_agent function """ # This is a mock response for demonstration mock_response = [ {"id": 1, "task": "Fetch DEM data for the specified region", "tool": "DEMFetcher"}, {"id": 2, "task": "Extract drainage networks from available data", "tool": "DrainageExtractor"}, {"id": 3, "task": "Analyze hydrological flow patterns", "tool": "HydrologyAnalyzer"}, {"id": 4, "task": "Generate flood risk assessment maps", "tool": "LLM Reasoning"} ] state = agent_invoker(query) # Mock state object class MockState: def __init__(self): self.query = state.get('query', query) self.response = state.get('response', mock_response) self.output_files_path = state.get('output_files_path', [f"outputs/geospatial_plan_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}.json"]) return MockState() def get_image_base64(image_path): """Convert local image to base64 string for embedding in HTML""" try: if os.path.exists(image_path): with open(image_path, "rb") as img_file: return base64.b64encode(img_file.read()).decode() else: # Return a placeholder SVG if image doesn't exist placeholder_svg = f""" Image not found: {os.path.basename(image_path)} """ return base64.b64encode(placeholder_svg.encode()).decode() except Exception as e: # Return error placeholder error_svg = f""" Error loading image: {str(e)} """ return base64.b64encode(error_svg.encode()).decode() def create_placeholder_image(text, width=800, height=600, bg_color="#2196F3", text_color="#FFFFFF"): """Create a placeholder image with text""" try: img = Image.new('RGB', (width, height), bg_color) # Note: This creates a simple colored rectangle. For text rendering, you'd need PIL's ImageDraw # For now, we'll return a base64 encoded simple image buffer = io.BytesIO() img.save(buffer, format='PNG') return base64.b64encode(buffer.getvalue()).decode() except: # Fallback to SVG svg = f""" {text} """ return base64.b64encode(svg.encode()).decode() # Sample analysis data with local image paths # Update these paths to point to your actual local images SAMPLE_ANALYSES = [ { "image_path": "output/Chennai_dem_filled.png", # Update this path "caption": "Digital elevation model for CHENNAI,tamil nadu , India", "query": "Create a flood risk assessment map for Chennai during monsoon season" }, { "image_path": "output/chennai_impervious.png", # Update this path "caption": "Impervious Structure in chennai", "query": "Create a flood risk assessment map for Chennai during monsoon season" }, { "image_path": "output/chennai_flow_direction.png", # Update this path "caption": "Chennai Water flow direction", "query": "Create a flood risk assessment map for Chennai during monsoon season" }, { "image_path": "output/chennai_streams.png", # Update this path "caption": "Chennai Stream Flow", "query": "Create a flood risk assessment map for Chennai during monsoon season" }, { "image_path": "output/chennai_raods.png", # Update this path "caption": "Roads Pattern of Chennai", "query": "Create a flood risk assessment map for Chennai during monsoon season" } ] # Page configuration st.set_page_config( page_title="Geospatial AI Task Planner", page_icon="🌍", layout="wide", initial_sidebar_state="collapsed" ) # Custom CSS for modern design (same as before) st.markdown(""" """, unsafe_allow_html=True) # Initialize session state if 'messages' not in st.session_state: st.session_state.messages = [] if 'generated_json' not in st.session_state: st.session_state.generated_json = None if 'processing' not in st.session_state: st.session_state.processing = False if 'current_slide' not in st.session_state: st.session_state.current_slide = 0 if 'last_slide_change' not in st.session_state: st.session_state.last_slide_change = time.time() # Header with logos and title st.markdown("""

Bharatvarsha Hackathon 2025-26

Geospatial AI Task Planner

5: # 5 seconds st.session_state.current_slide = (st.session_state.current_slide + 1) % len(SAMPLE_ANALYSES) st.session_state.last_slide_change = current_time # Create two columns using st.columns col1, col2 = st.columns([1, 1], gap="medium") # Left column - Sample Analysis Slideshow with col1: st.markdown("""

Sample Analysis

""", unsafe_allow_html=True) # Display current query current_analysis = SAMPLE_ANALYSES[st.session_state.current_slide] st.markdown(f"""
Query:

{current_analysis['query']}

""", unsafe_allow_html=True) # Display the current slide image using Streamlit's native image display current_image_path = current_analysis['image_path'] try: if os.path.exists(current_image_path): # Use Streamlit's native image display for better performance st.image( current_image_path, caption=current_analysis['caption'], ) else: # Show a placeholder message when image is not found st.markdown(f"""

📁 Image not found

Please place your image at: {current_image_path}

Or update the image path in the SAMPLE_ANALYSES list

""", unsafe_allow_html=True) except Exception as e: st.markdown(f"""

⚠️ Error loading image

Path: {current_image_path}

Error: {str(e)}

""", unsafe_allow_html=True) # Navigation buttons _,col_prev, col_next = st.columns([0.5,0.8, 1]) with col_prev: if st.button("◀", key="prev_slide"): st.session_state.current_slide = (st.session_state.current_slide - 1) % len(SAMPLE_ANALYSES) st.session_state.last_slide_change = time.time() st.rerun() with col_next: if st.button("▶", key="next_slide"): st.session_state.current_slide = (st.session_state.current_slide + 1) % len(SAMPLE_ANALYSES) st.session_state.last_slide_change = time.time() st.rerun() # Right column - Chat section with col2: # Chat header st.markdown("""
💬 AI Task Planner Chat
""", unsafe_allow_html=True) # Display chat messages for message in st.session_state.messages: if message["role"] == "user": st.markdown(f"""
You: {message["content"]}
""", unsafe_allow_html=True) else: # AI message if message["content"] == "task_plan": # Render task plan using native Streamlit components st.markdown("""
🤖 AI Task Planner:
""", unsafe_allow_html=True) for task in message["tasks"]: st.markdown(f"""
Task {task['id']}: {task['task']}
{task['tool']}
""", unsafe_allow_html=True) st.markdown("""
📄 Task Plan JSON Generated Successfully!

View the complete JSON structure below and download it for your records.
""", unsafe_allow_html=True) else: # This is plain text content st.markdown(f"""
🤖 AI Task Planner:

{message["content"]}
""", unsafe_allow_html=True) # If no messages, show welcome message if not st.session_state.messages: st.markdown("""
🤖 Welcome to Geospatial AI Task Planner!

I'm here to help you break down complex geospatial analysis tasks into manageable steps.

Simply describe your geospatial analysis needs in the text area below, and I'll create a detailed task plan for you.
""", unsafe_allow_html=True) # Text input for query user_input = st.text_area( "Enter your geospatial analysis query:", height=120, placeholder="e.g., Create a flood risk assessment map for Mumbai during monsoon season...", key="user_input", disabled=st.session_state.processing ) # Submit button if st.button("🚀 Generate Task Plan", type="primary", use_container_width=True, disabled=st.session_state.processing): if user_input.strip(): # Set processing state st.session_state.processing = True # Add user message st.session_state.messages.append({"role": "user", "content": user_input}) # Show processing message with spinner with st.spinner("🔄 Generating your geospatial task plan..."): try: # Call your agent result = call_agent(user_input) # Create JSON structure task_plan_json = { "query": user_input, "timestamp": datetime.now().isoformat(), "task_id": str(uuid.uuid4()), "tasks": result.response, "output_files": result.output_files_path, "status": "completed" } # Store the tasks in session state for rendering st.session_state.generated_json = task_plan_json # Instead of storing HTML, store the task data st.session_state.messages.append({ "role": "assistant", "content": "task_plan", "tasks": result.response }) # Show success message st.success("✅ Task plan generated successfully!") except Exception as e: error_msg = f"❌ Error processing request: {str(e)}" st.session_state.messages.append({"role": "assistant", "content": error_msg}) st.error(error_msg) st.session_state.generated_json = None # Reset processing state st.session_state.processing = False # Rerun to update the interface st.rerun() else: st.warning("Please enter a query before submitting.") # Display JSON and download button if JSON is generated if st.session_state.generated_json: st.markdown("### 📋 Generated Task Plan JSON") # Display JSON in a formatted container json_str = json.dumps(st.session_state.generated_json, indent=2) st.markdown(f'
{json_str}
', unsafe_allow_html=True) # Download button filename = f"geospatial_task_plan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" st.download_button( label="📥 Download Task Plan JSON", data=json_str, file_name=filename, mime="application/json", use_container_width=True ) # Clear chat button (at the bottom) if st.session_state.messages: col_clear1, col_clear2 = st.columns([1, 1]) with col_clear1: if st.button("🗑️ Clear Chat", key="clear_chat", use_container_width=True): st.session_state.messages = [] st.session_state.generated_json = None st.session_state.processing = False st.rerun() with col_clear2: if st.session_state.generated_json and st.button("🔄 Generate New Plan", key="new_plan", use_container_width=True): st.session_state.generated_json = None st.rerun()