Spaces:
Sleeping
Sleeping
| import os | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from langchain_openai import ChatOpenAI | |
| from crewai import Agent, Task, Crew, LLM | |
| from crewai_tools import SerperDevTool | |
| from typing import List, Optional | |
| import uvicorn | |
| # Environment variables | |
| OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") | |
| SERPER_API_KEY = os.getenv("SERPER_API_KEY") | |
| if not OPENROUTER_API_KEY: | |
| raise ValueError("Missing OPENROUTER_API_KEY environment variable") | |
| # Initialize FastAPI | |
| app = FastAPI( | |
| title="Construction AI Assistant", | |
| description="Expert construction chatbot powered by DeepSeek R1", | |
| version="1.0.0" | |
| ) | |
| # CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # LLM configurations | |
| crew_llm = LLM( | |
| model="openrouter/deepseek/deepseek-r1", | |
| base_url="https://openrouter.ai/api/v1", | |
| api_key=OPENROUTER_API_KEY, | |
| temperature=0.7 | |
| ) | |
| direct_llm = ChatOpenAI( | |
| model="deepseek/deepseek-r1", | |
| openai_api_key=OPENROUTER_API_KEY, | |
| openai_api_base="https://openrouter.ai/api/v1", | |
| temperature=0.7, | |
| max_tokens=2000 | |
| ) | |
| # Pydantic models | |
| class ChatMessage(BaseModel): | |
| role: str | |
| content: str | |
| class ChatRequest(BaseModel): | |
| message: str | |
| session_id: Optional[str] = "default" | |
| class ChatResponse(BaseModel): | |
| response: str | |
| session_id: str | |
| memory_count: int | |
| search_enabled: bool | |
| class ClearRequest(BaseModel): | |
| session_id: Optional[str] = "default" | |
| class ConstructionChatbot: | |
| def __init__(self): | |
| self.sessions = {} # Store memory per session | |
| self.setup_tools() | |
| self.setup_crew() | |
| def get_session_memory(self, session_id: str) -> List[tuple]: | |
| """Get or create session memory""" | |
| if session_id not in self.sessions: | |
| self.sessions[session_id] = [] | |
| return self.sessions[session_id] | |
| def setup_tools(self): | |
| """Set up web search tools""" | |
| try: | |
| if SERPER_API_KEY: | |
| self.search_tool = SerperDevTool() | |
| print("✅ Web search tool initialized successfully") | |
| else: | |
| self.search_tool = None | |
| print("⚠️ Warning: SERPER_API_KEY not set - web search disabled") | |
| except Exception as e: | |
| self.search_tool = None | |
| print(f"⚠️ Warning: Could not initialize web search tool: {e}") | |
| def setup_crew(self): | |
| """Set up CrewAI agents and tasks""" | |
| tools = [self.search_tool] if self.search_tool else [] | |
| self.construction_agent = Agent( | |
| role='Construction Expert Assistant', | |
| goal='Provide accurate construction-related information ONLY. Reject all non-construction queries.', | |
| backstory="""You are a specialized construction industry expert with deep knowledge in: | |
| - Building safety and regulations | |
| - Fire safety codes and compliance | |
| - Construction materials and costs | |
| - Project management methodologies | |
| - Heavy machinery and equipment | |
| - Civil engineering principles | |
| - Structural design and analysis | |
| - Site management and safety protocols | |
| IMPORTANT: You MUST ONLY respond to construction-related questions. | |
| If a user asks about anything not related to construction, building, | |
| engineering, safety, materials, or project management, you must respond | |
| with EXACTLY: "I can only assist with construction-related queries. Please ask about building, safety, materials, project management, or engineering topics." | |
| When you need current information about construction topics, use the search tool.""", | |
| llm=crew_llm, | |
| tools=tools, | |
| verbose=True, | |
| allow_delegation=False, | |
| max_iter=3, | |
| max_execution_time=45 | |
| ) | |
| if self.search_tool: | |
| self.research_agent = Agent( | |
| role='Construction Research Specialist', | |
| goal='Search and gather current construction-related information from the internet ONLY', | |
| backstory="""You are a specialized researcher focused exclusively on construction industry topics. | |
| You search for the most current information about: | |
| - Construction practices and regulations | |
| - Building costs and material prices | |
| - Safety standards and compliance requirements | |
| - Industry trends and new technologies | |
| - Engineering standards and best practices | |
| You ONLY research construction-related topics. If asked to research non-construction | |
| topics, decline politely and redirect to construction subjects.""", | |
| llm=crew_llm, | |
| tools=[self.search_tool], | |
| verbose=True, | |
| allow_delegation=False, | |
| max_iter=2, | |
| max_execution_time=30 | |
| ) | |
| else: | |
| self.research_agent = None | |
| def add_to_memory(self, session_id: str, user_query: str, response: str): | |
| """Add interaction to rolling memory window""" | |
| memory = self.get_session_memory(session_id) | |
| memory.append((user_query, response)) | |
| if len(memory) > 5: | |
| memory.pop(0) | |
| def get_chat_history(self, session_id: str) -> str: | |
| """Format chat history for prompt""" | |
| memory = self.get_session_memory(session_id) | |
| if not memory: | |
| return "No previous conversation." | |
| history = "" | |
| for i, (user_msg, bot_msg) in enumerate(memory, 1): | |
| history += f"Message {i}:\nUser: {user_msg}\nAssistant: {bot_msg}\n\n" | |
| return history.strip() | |
| def is_construction_related(self, query: str) -> bool: | |
| """Simple check if query is construction-related""" | |
| construction_keywords = [ | |
| 'construction', 'building', 'concrete', 'steel', 'foundation', 'safety', | |
| 'project management', 'engineering', 'structure', 'material', 'cost', | |
| 'regulation', 'fire safety', 'osha', 'machinery', 'equipment', 'site', | |
| 'contractor', 'cement', 'rebar', 'excavation', 'blueprint', 'architect', | |
| 'electrical', 'plumbing', 'hvac', 'roofing', 'insulation', 'drywall' | |
| ] | |
| query_lower = query.lower() | |
| return any(keyword in query_lower for keyword in construction_keywords) | |
| def generate_response_with_crew(self, user_query: str, session_id: str) -> str: | |
| """Generate response using CrewAI with web search capabilities""" | |
| if not self.is_construction_related(user_query): | |
| response = "I can only assist with construction-related queries. Please ask about building, safety, materials, project management, or engineering topics." | |
| self.add_to_memory(session_id, user_query, response) | |
| return response | |
| chat_history = self.get_chat_history(session_id) | |
| try: | |
| search_keywords = ['current', 'latest', 'recent', 'today', '2024', '2025', 'price', 'cost', 'regulation', 'new', 'trend'] | |
| needs_search = any(keyword in user_query.lower() for keyword in search_keywords) | |
| if needs_search and self.research_agent: | |
| research_task = Task( | |
| description=f"""Search for current construction-related information about: {user_query} | |
| Focus on finding: | |
| - Latest construction industry data | |
| - Current material prices and costs | |
| - Recent regulations and safety updates | |
| - New construction technologies and methods | |
| - Industry trends and market information | |
| Search query should be concise and focused on construction industry information. | |
| """, | |
| expected_output="Current, accurate construction industry information and data", | |
| agent=self.research_agent | |
| ) | |
| response_task = Task( | |
| description=f"""Based on research findings and chat history, provide a comprehensive response to: {user_query} | |
| Chat history: {chat_history} | |
| Guidelines: | |
| - Use the research data to provide accurate, current information | |
| - Focus on construction industry expertise | |
| - Provide practical, actionable advice | |
| - Include specific details like prices, regulations, or technical specifications when available | |
| - Structure the response clearly and professionally | |
| """, | |
| expected_output="Detailed, informative construction industry response with current data", | |
| agent=self.construction_agent, | |
| context=[research_task] | |
| ) | |
| crew = Crew( | |
| agents=[self.research_agent, self.construction_agent], | |
| tasks=[research_task, response_task], | |
| verbose=False | |
| ) | |
| else: | |
| response_task = Task( | |
| description=f"""Provide expert construction advice for: {user_query} | |
| Chat history: {chat_history} | |
| Guidelines: | |
| - Draw from your construction industry expertise | |
| - Provide detailed, accurate information | |
| - Include relevant safety considerations | |
| - Suggest best practices and standards | |
| - Structure the response professionally | |
| """, | |
| expected_output="Expert construction industry advice and information", | |
| agent=self.construction_agent | |
| ) | |
| crew = Crew( | |
| agents=[self.construction_agent], | |
| tasks=[response_task], | |
| verbose=False | |
| ) | |
| result = crew.kickoff() | |
| response = str(result).strip() | |
| if not response or len(response) < 10: | |
| response = "I apologize, but I'm having trouble generating a proper response. Could you please rephrase your construction-related question?" | |
| self.add_to_memory(session_id, user_query, response) | |
| return response | |
| except Exception as e: | |
| print(f"CrewAI Error: {e}") | |
| return self.generate_response_direct(user_query, session_id) | |
| def generate_response_direct(self, user_query: str, session_id: str) -> str: | |
| """Fallback method using direct LLM with construction filtering""" | |
| if not self.is_construction_related(user_query): | |
| response = "I can only assist with construction-related queries. Please ask about building, safety, materials, project management, or engineering topics." | |
| self.add_to_memory(session_id, user_query, response) | |
| return response | |
| chat_history = self.get_chat_history(session_id) | |
| prompt = f"""You are a specialized construction industry AI assistant with expertise in building, safety, materials, project management, and engineering. | |
| Chat history: {chat_history} | |
| User question: {user_query} | |
| Provide a detailed, professional response focusing on construction industry knowledge. Include specific information about safety standards, building codes, material specifications, cost estimates, or project management advice as relevant to the question. | |
| Response:""" | |
| try: | |
| response = direct_llm.invoke(prompt) | |
| if hasattr(response, 'content'): | |
| response_text = response.content | |
| else: | |
| response_text = str(response) | |
| self.add_to_memory(session_id, user_query, response_text) | |
| return response_text | |
| except Exception as e: | |
| fallback_response = f"""I apologize, but I'm experiencing technical difficulties. However, I can still help with construction-related questions about safety, materials, project management, and engineering. Please try rephrasing your question. | |
| Technical error: {str(e)[:100]}...""" | |
| self.add_to_memory(session_id, user_query, fallback_response) | |
| return fallback_response | |
| def generate_response(self, user_query: str, session_id: str = "default") -> str: | |
| """Main response generation method""" | |
| try: | |
| return self.generate_response_with_crew(user_query, session_id) | |
| except Exception as e: | |
| print(f"Crew method failed, using direct method: {e}") | |
| return self.generate_response_direct(user_query, session_id) | |
| def clear_session(self, session_id: str): | |
| """Clear a specific session's memory""" | |
| if session_id in self.sessions: | |
| self.sessions[session_id].clear() | |
| # Initialize chatbot | |
| chatbot = ConstructionChatbot() | |
| # API Endpoints | |
| async def root(): | |
| """Root endpoint with API information""" | |
| return { | |
| "message": "Construction AI Assistant API", | |
| "version": "1.0.0", | |
| "endpoints": { | |
| "/chat": "POST - Send a message to the chatbot", | |
| "/clear": "POST - Clear conversation history", | |
| "/status": "GET - Check system status", | |
| "/docs": "GET - Interactive API documentation" | |
| } | |
| } | |
| async def status(): | |
| """Get system status""" | |
| return { | |
| "status": "online", | |
| "model": "DeepSeek R1", | |
| "web_search_enabled": chatbot.search_tool is not None, | |
| "active_sessions": len(chatbot.sessions) | |
| } | |
| async def chat(request: ChatRequest): | |
| """ | |
| Send a message to the construction chatbot | |
| - **message**: Your construction-related question | |
| - **session_id**: Optional session identifier for maintaining conversation context | |
| """ | |
| try: | |
| if not request.message or not request.message.strip(): | |
| raise HTTPException(status_code=400, detail="Message cannot be empty") | |
| response = chatbot.generate_response(request.message, request.session_id) | |
| memory_count = len(chatbot.get_session_memory(request.session_id)) | |
| return ChatResponse( | |
| response=response, | |
| session_id=request.session_id, | |
| memory_count=memory_count, | |
| search_enabled=chatbot.search_tool is not None | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}") | |
| async def clear_chat(request: ClearRequest): | |
| """ | |
| Clear conversation history for a session | |
| - **session_id**: Optional session identifier to clear (default: "default") | |
| """ | |
| try: | |
| chatbot.clear_session(request.session_id) | |
| return { | |
| "message": f"Conversation history cleared for session: {request.session_id}", | |
| "session_id": request.session_id | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error clearing session: {str(e)}") | |
| if __name__ == "__main__": | |
| print("🏗️ Starting Construction AI Assistant API...") | |
| print(f"🧠 Model: DeepSeek R1") | |
| print(f"🌐 Web Search: {'ENABLED' if chatbot.search_tool else 'DISABLED'}") | |
| print(f"📡 Server starting at: http://localhost:8000") | |
| print(f"📚 API Docs available at: http://localhost:8000/docs") | |
| uvicorn.run(app, host="0.0.0.0", port=8000) |