import os from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from langchain_openai import ChatOpenAI from crewai import Agent, Task, Crew, LLM from crewai_tools import SerperDevTool from typing import List, Optional import uvicorn # Environment variables OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") SERPER_API_KEY = os.getenv("SERPER_API_KEY") if not OPENROUTER_API_KEY: raise ValueError("Missing OPENROUTER_API_KEY environment variable") # Initialize FastAPI app = FastAPI( title="Construction AI Assistant", description="Expert construction chatbot powered by DeepSeek R1", version="1.0.0" ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # LLM configurations crew_llm = LLM( model="openrouter/deepseek/deepseek-r1", base_url="https://openrouter.ai/api/v1", api_key=OPENROUTER_API_KEY, temperature=0.7 ) direct_llm = ChatOpenAI( model="deepseek/deepseek-r1", openai_api_key=OPENROUTER_API_KEY, openai_api_base="https://openrouter.ai/api/v1", temperature=0.7, max_tokens=2000 ) # Pydantic models class ChatMessage(BaseModel): role: str content: str class ChatRequest(BaseModel): message: str session_id: Optional[str] = "default" class ChatResponse(BaseModel): response: str session_id: str memory_count: int search_enabled: bool class ClearRequest(BaseModel): session_id: Optional[str] = "default" class ConstructionChatbot: def __init__(self): self.sessions = {} # Store memory per session self.setup_tools() self.setup_crew() def get_session_memory(self, session_id: str) -> List[tuple]: """Get or create session memory""" if session_id not in self.sessions: self.sessions[session_id] = [] return self.sessions[session_id] def setup_tools(self): """Set up web search tools""" try: if SERPER_API_KEY: self.search_tool = SerperDevTool() print("✅ Web search tool initialized successfully") else: self.search_tool = None print("⚠️ Warning: SERPER_API_KEY not set - web search disabled") except Exception as e: self.search_tool = None print(f"⚠️ Warning: Could not initialize web search tool: {e}") def setup_crew(self): """Set up CrewAI agents and tasks""" tools = [self.search_tool] if self.search_tool else [] self.construction_agent = Agent( role='Construction Expert Assistant', goal='Provide accurate construction-related information ONLY. Reject all non-construction queries.', backstory="""You are a specialized construction industry expert with deep knowledge in: - Building safety and regulations - Fire safety codes and compliance - Construction materials and costs - Project management methodologies - Heavy machinery and equipment - Civil engineering principles - Structural design and analysis - Site management and safety protocols IMPORTANT: You MUST ONLY respond to construction-related questions. If a user asks about anything not related to construction, building, engineering, safety, materials, or project management, you must respond with EXACTLY: "I can only assist with construction-related queries. Please ask about building, safety, materials, project management, or engineering topics." When you need current information about construction topics, use the search tool.""", llm=crew_llm, tools=tools, verbose=True, allow_delegation=False, max_iter=3, max_execution_time=45 ) if self.search_tool: self.research_agent = Agent( role='Construction Research Specialist', goal='Search and gather current construction-related information from the internet ONLY', backstory="""You are a specialized researcher focused exclusively on construction industry topics. You search for the most current information about: - Construction practices and regulations - Building costs and material prices - Safety standards and compliance requirements - Industry trends and new technologies - Engineering standards and best practices You ONLY research construction-related topics. If asked to research non-construction topics, decline politely and redirect to construction subjects.""", llm=crew_llm, tools=[self.search_tool], verbose=True, allow_delegation=False, max_iter=2, max_execution_time=30 ) else: self.research_agent = None def add_to_memory(self, session_id: str, user_query: str, response: str): """Add interaction to rolling memory window""" memory = self.get_session_memory(session_id) memory.append((user_query, response)) if len(memory) > 5: memory.pop(0) def get_chat_history(self, session_id: str) -> str: """Format chat history for prompt""" memory = self.get_session_memory(session_id) if not memory: return "No previous conversation." history = "" for i, (user_msg, bot_msg) in enumerate(memory, 1): history += f"Message {i}:\nUser: {user_msg}\nAssistant: {bot_msg}\n\n" return history.strip() def is_construction_related(self, query: str) -> bool: """Simple check if query is construction-related""" construction_keywords = [ 'construction', 'building', 'concrete', 'steel', 'foundation', 'safety', 'project management', 'engineering', 'structure', 'material', 'cost', 'regulation', 'fire safety', 'osha', 'machinery', 'equipment', 'site', 'contractor', 'cement', 'rebar', 'excavation', 'blueprint', 'architect', 'electrical', 'plumbing', 'hvac', 'roofing', 'insulation', 'drywall' ] query_lower = query.lower() return any(keyword in query_lower for keyword in construction_keywords) def generate_response_with_crew(self, user_query: str, session_id: str) -> str: """Generate response using CrewAI with web search capabilities""" if not self.is_construction_related(user_query): response = "I can only assist with construction-related queries. Please ask about building, safety, materials, project management, or engineering topics." self.add_to_memory(session_id, user_query, response) return response chat_history = self.get_chat_history(session_id) try: search_keywords = ['current', 'latest', 'recent', 'today', '2024', '2025', 'price', 'cost', 'regulation', 'new', 'trend'] needs_search = any(keyword in user_query.lower() for keyword in search_keywords) if needs_search and self.research_agent: research_task = Task( description=f"""Search for current construction-related information about: {user_query} Focus on finding: - Latest construction industry data - Current material prices and costs - Recent regulations and safety updates - New construction technologies and methods - Industry trends and market information Search query should be concise and focused on construction industry information. """, expected_output="Current, accurate construction industry information and data", agent=self.research_agent ) response_task = Task( description=f"""Based on research findings and chat history, provide a comprehensive response to: {user_query} Chat history: {chat_history} Guidelines: - Use the research data to provide accurate, current information - Focus on construction industry expertise - Provide practical, actionable advice - Include specific details like prices, regulations, or technical specifications when available - Structure the response clearly and professionally """, expected_output="Detailed, informative construction industry response with current data", agent=self.construction_agent, context=[research_task] ) crew = Crew( agents=[self.research_agent, self.construction_agent], tasks=[research_task, response_task], verbose=False ) else: response_task = Task( description=f"""Provide expert construction advice for: {user_query} Chat history: {chat_history} Guidelines: - Draw from your construction industry expertise - Provide detailed, accurate information - Include relevant safety considerations - Suggest best practices and standards - Structure the response professionally """, expected_output="Expert construction industry advice and information", agent=self.construction_agent ) crew = Crew( agents=[self.construction_agent], tasks=[response_task], verbose=False ) result = crew.kickoff() response = str(result).strip() if not response or len(response) < 10: response = "I apologize, but I'm having trouble generating a proper response. Could you please rephrase your construction-related question?" self.add_to_memory(session_id, user_query, response) return response except Exception as e: print(f"CrewAI Error: {e}") return self.generate_response_direct(user_query, session_id) def generate_response_direct(self, user_query: str, session_id: str) -> str: """Fallback method using direct LLM with construction filtering""" if not self.is_construction_related(user_query): response = "I can only assist with construction-related queries. Please ask about building, safety, materials, project management, or engineering topics." self.add_to_memory(session_id, user_query, response) return response chat_history = self.get_chat_history(session_id) prompt = f"""You are a specialized construction industry AI assistant with expertise in building, safety, materials, project management, and engineering. Chat history: {chat_history} User question: {user_query} Provide a detailed, professional response focusing on construction industry knowledge. Include specific information about safety standards, building codes, material specifications, cost estimates, or project management advice as relevant to the question. Response:""" try: response = direct_llm.invoke(prompt) if hasattr(response, 'content'): response_text = response.content else: response_text = str(response) self.add_to_memory(session_id, user_query, response_text) return response_text except Exception as e: fallback_response = f"""I apologize, but I'm experiencing technical difficulties. However, I can still help with construction-related questions about safety, materials, project management, and engineering. Please try rephrasing your question. Technical error: {str(e)[:100]}...""" self.add_to_memory(session_id, user_query, fallback_response) return fallback_response def generate_response(self, user_query: str, session_id: str = "default") -> str: """Main response generation method""" try: return self.generate_response_with_crew(user_query, session_id) except Exception as e: print(f"Crew method failed, using direct method: {e}") return self.generate_response_direct(user_query, session_id) def clear_session(self, session_id: str): """Clear a specific session's memory""" if session_id in self.sessions: self.sessions[session_id].clear() # Initialize chatbot chatbot = ConstructionChatbot() # API Endpoints @app.get("/") async def root(): """Root endpoint with API information""" return { "message": "Construction AI Assistant API", "version": "1.0.0", "endpoints": { "/chat": "POST - Send a message to the chatbot", "/clear": "POST - Clear conversation history", "/status": "GET - Check system status", "/docs": "GET - Interactive API documentation" } } @app.get("/status") async def status(): """Get system status""" return { "status": "online", "model": "DeepSeek R1", "web_search_enabled": chatbot.search_tool is not None, "active_sessions": len(chatbot.sessions) } @app.post("/chat", response_model=ChatResponse) async def chat(request: ChatRequest): """ Send a message to the construction chatbot - **message**: Your construction-related question - **session_id**: Optional session identifier for maintaining conversation context """ try: if not request.message or not request.message.strip(): raise HTTPException(status_code=400, detail="Message cannot be empty") response = chatbot.generate_response(request.message, request.session_id) memory_count = len(chatbot.get_session_memory(request.session_id)) return ChatResponse( response=response, session_id=request.session_id, memory_count=memory_count, search_enabled=chatbot.search_tool is not None ) except Exception as e: raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}") @app.post("/clear") async def clear_chat(request: ClearRequest): """ Clear conversation history for a session - **session_id**: Optional session identifier to clear (default: "default") """ try: chatbot.clear_session(request.session_id) return { "message": f"Conversation history cleared for session: {request.session_id}", "session_id": request.session_id } except Exception as e: raise HTTPException(status_code=500, detail=f"Error clearing session: {str(e)}") if __name__ == "__main__": print("🏗️ Starting Construction AI Assistant API...") print(f"🧠 Model: DeepSeek R1") print(f"🌐 Web Search: {'ENABLED' if chatbot.search_tool else 'DISABLED'}") print(f"📡 Server starting at: http://localhost:8000") print(f"📚 API Docs available at: http://localhost:8000/docs") uvicorn.run(app, host="0.0.0.0", port=8000)