167AliRaza's picture
Create app.py
df6b3ab verified
import os
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from crewai import Agent, Task, Crew, LLM
from crewai_tools import SerperDevTool
from typing import List, Optional
import uvicorn
# Environment variables
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
if not OPENROUTER_API_KEY:
raise ValueError("Missing OPENROUTER_API_KEY environment variable")
# Initialize FastAPI
app = FastAPI(
title="Construction AI Assistant",
description="Expert construction chatbot powered by DeepSeek R1",
version="1.0.0"
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# LLM configurations
crew_llm = LLM(
model="openrouter/deepseek/deepseek-r1",
base_url="https://openrouter.ai/api/v1",
api_key=OPENROUTER_API_KEY,
temperature=0.7
)
direct_llm = ChatOpenAI(
model="deepseek/deepseek-r1",
openai_api_key=OPENROUTER_API_KEY,
openai_api_base="https://openrouter.ai/api/v1",
temperature=0.7,
max_tokens=2000
)
# Pydantic models
class ChatMessage(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
message: str
session_id: Optional[str] = "default"
class ChatResponse(BaseModel):
response: str
session_id: str
memory_count: int
search_enabled: bool
class ClearRequest(BaseModel):
session_id: Optional[str] = "default"
class ConstructionChatbot:
def __init__(self):
self.sessions = {} # Store memory per session
self.setup_tools()
self.setup_crew()
def get_session_memory(self, session_id: str) -> List[tuple]:
"""Get or create session memory"""
if session_id not in self.sessions:
self.sessions[session_id] = []
return self.sessions[session_id]
def setup_tools(self):
"""Set up web search tools"""
try:
if SERPER_API_KEY:
self.search_tool = SerperDevTool()
print("✅ Web search tool initialized successfully")
else:
self.search_tool = None
print("⚠️ Warning: SERPER_API_KEY not set - web search disabled")
except Exception as e:
self.search_tool = None
print(f"⚠️ Warning: Could not initialize web search tool: {e}")
def setup_crew(self):
"""Set up CrewAI agents and tasks"""
tools = [self.search_tool] if self.search_tool else []
self.construction_agent = Agent(
role='Construction Expert Assistant',
goal='Provide accurate construction-related information ONLY. Reject all non-construction queries.',
backstory="""You are a specialized construction industry expert with deep knowledge in:
- Building safety and regulations
- Fire safety codes and compliance
- Construction materials and costs
- Project management methodologies
- Heavy machinery and equipment
- Civil engineering principles
- Structural design and analysis
- Site management and safety protocols
IMPORTANT: You MUST ONLY respond to construction-related questions.
If a user asks about anything not related to construction, building,
engineering, safety, materials, or project management, you must respond
with EXACTLY: "I can only assist with construction-related queries. Please ask about building, safety, materials, project management, or engineering topics."
When you need current information about construction topics, use the search tool.""",
llm=crew_llm,
tools=tools,
verbose=True,
allow_delegation=False,
max_iter=3,
max_execution_time=45
)
if self.search_tool:
self.research_agent = Agent(
role='Construction Research Specialist',
goal='Search and gather current construction-related information from the internet ONLY',
backstory="""You are a specialized researcher focused exclusively on construction industry topics.
You search for the most current information about:
- Construction practices and regulations
- Building costs and material prices
- Safety standards and compliance requirements
- Industry trends and new technologies
- Engineering standards and best practices
You ONLY research construction-related topics. If asked to research non-construction
topics, decline politely and redirect to construction subjects.""",
llm=crew_llm,
tools=[self.search_tool],
verbose=True,
allow_delegation=False,
max_iter=2,
max_execution_time=30
)
else:
self.research_agent = None
def add_to_memory(self, session_id: str, user_query: str, response: str):
"""Add interaction to rolling memory window"""
memory = self.get_session_memory(session_id)
memory.append((user_query, response))
if len(memory) > 5:
memory.pop(0)
def get_chat_history(self, session_id: str) -> str:
"""Format chat history for prompt"""
memory = self.get_session_memory(session_id)
if not memory:
return "No previous conversation."
history = ""
for i, (user_msg, bot_msg) in enumerate(memory, 1):
history += f"Message {i}:\nUser: {user_msg}\nAssistant: {bot_msg}\n\n"
return history.strip()
def is_construction_related(self, query: str) -> bool:
"""Simple check if query is construction-related"""
construction_keywords = [
'construction', 'building', 'concrete', 'steel', 'foundation', 'safety',
'project management', 'engineering', 'structure', 'material', 'cost',
'regulation', 'fire safety', 'osha', 'machinery', 'equipment', 'site',
'contractor', 'cement', 'rebar', 'excavation', 'blueprint', 'architect',
'electrical', 'plumbing', 'hvac', 'roofing', 'insulation', 'drywall'
]
query_lower = query.lower()
return any(keyword in query_lower for keyword in construction_keywords)
def generate_response_with_crew(self, user_query: str, session_id: str) -> str:
"""Generate response using CrewAI with web search capabilities"""
if not self.is_construction_related(user_query):
response = "I can only assist with construction-related queries. Please ask about building, safety, materials, project management, or engineering topics."
self.add_to_memory(session_id, user_query, response)
return response
chat_history = self.get_chat_history(session_id)
try:
search_keywords = ['current', 'latest', 'recent', 'today', '2024', '2025', 'price', 'cost', 'regulation', 'new', 'trend']
needs_search = any(keyword in user_query.lower() for keyword in search_keywords)
if needs_search and self.research_agent:
research_task = Task(
description=f"""Search for current construction-related information about: {user_query}
Focus on finding:
- Latest construction industry data
- Current material prices and costs
- Recent regulations and safety updates
- New construction technologies and methods
- Industry trends and market information
Search query should be concise and focused on construction industry information.
""",
expected_output="Current, accurate construction industry information and data",
agent=self.research_agent
)
response_task = Task(
description=f"""Based on research findings and chat history, provide a comprehensive response to: {user_query}
Chat history: {chat_history}
Guidelines:
- Use the research data to provide accurate, current information
- Focus on construction industry expertise
- Provide practical, actionable advice
- Include specific details like prices, regulations, or technical specifications when available
- Structure the response clearly and professionally
""",
expected_output="Detailed, informative construction industry response with current data",
agent=self.construction_agent,
context=[research_task]
)
crew = Crew(
agents=[self.research_agent, self.construction_agent],
tasks=[research_task, response_task],
verbose=False
)
else:
response_task = Task(
description=f"""Provide expert construction advice for: {user_query}
Chat history: {chat_history}
Guidelines:
- Draw from your construction industry expertise
- Provide detailed, accurate information
- Include relevant safety considerations
- Suggest best practices and standards
- Structure the response professionally
""",
expected_output="Expert construction industry advice and information",
agent=self.construction_agent
)
crew = Crew(
agents=[self.construction_agent],
tasks=[response_task],
verbose=False
)
result = crew.kickoff()
response = str(result).strip()
if not response or len(response) < 10:
response = "I apologize, but I'm having trouble generating a proper response. Could you please rephrase your construction-related question?"
self.add_to_memory(session_id, user_query, response)
return response
except Exception as e:
print(f"CrewAI Error: {e}")
return self.generate_response_direct(user_query, session_id)
def generate_response_direct(self, user_query: str, session_id: str) -> str:
"""Fallback method using direct LLM with construction filtering"""
if not self.is_construction_related(user_query):
response = "I can only assist with construction-related queries. Please ask about building, safety, materials, project management, or engineering topics."
self.add_to_memory(session_id, user_query, response)
return response
chat_history = self.get_chat_history(session_id)
prompt = f"""You are a specialized construction industry AI assistant with expertise in building, safety, materials, project management, and engineering.
Chat history: {chat_history}
User question: {user_query}
Provide a detailed, professional response focusing on construction industry knowledge. Include specific information about safety standards, building codes, material specifications, cost estimates, or project management advice as relevant to the question.
Response:"""
try:
response = direct_llm.invoke(prompt)
if hasattr(response, 'content'):
response_text = response.content
else:
response_text = str(response)
self.add_to_memory(session_id, user_query, response_text)
return response_text
except Exception as e:
fallback_response = f"""I apologize, but I'm experiencing technical difficulties. However, I can still help with construction-related questions about safety, materials, project management, and engineering. Please try rephrasing your question.
Technical error: {str(e)[:100]}..."""
self.add_to_memory(session_id, user_query, fallback_response)
return fallback_response
def generate_response(self, user_query: str, session_id: str = "default") -> str:
"""Main response generation method"""
try:
return self.generate_response_with_crew(user_query, session_id)
except Exception as e:
print(f"Crew method failed, using direct method: {e}")
return self.generate_response_direct(user_query, session_id)
def clear_session(self, session_id: str):
"""Clear a specific session's memory"""
if session_id in self.sessions:
self.sessions[session_id].clear()
# Initialize chatbot
chatbot = ConstructionChatbot()
# API Endpoints
@app.get("/")
async def root():
"""Root endpoint with API information"""
return {
"message": "Construction AI Assistant API",
"version": "1.0.0",
"endpoints": {
"/chat": "POST - Send a message to the chatbot",
"/clear": "POST - Clear conversation history",
"/status": "GET - Check system status",
"/docs": "GET - Interactive API documentation"
}
}
@app.get("/status")
async def status():
"""Get system status"""
return {
"status": "online",
"model": "DeepSeek R1",
"web_search_enabled": chatbot.search_tool is not None,
"active_sessions": len(chatbot.sessions)
}
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""
Send a message to the construction chatbot
- **message**: Your construction-related question
- **session_id**: Optional session identifier for maintaining conversation context
"""
try:
if not request.message or not request.message.strip():
raise HTTPException(status_code=400, detail="Message cannot be empty")
response = chatbot.generate_response(request.message, request.session_id)
memory_count = len(chatbot.get_session_memory(request.session_id))
return ChatResponse(
response=response,
session_id=request.session_id,
memory_count=memory_count,
search_enabled=chatbot.search_tool is not None
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}")
@app.post("/clear")
async def clear_chat(request: ClearRequest):
"""
Clear conversation history for a session
- **session_id**: Optional session identifier to clear (default: "default")
"""
try:
chatbot.clear_session(request.session_id)
return {
"message": f"Conversation history cleared for session: {request.session_id}",
"session_id": request.session_id
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error clearing session: {str(e)}")
if __name__ == "__main__":
print("🏗️ Starting Construction AI Assistant API...")
print(f"🧠 Model: DeepSeek R1")
print(f"🌐 Web Search: {'ENABLED' if chatbot.search_tool else 'DISABLED'}")
print(f"📡 Server starting at: http://localhost:8000")
print(f"📚 API Docs available at: http://localhost:8000/docs")
uvicorn.run(app, host="0.0.0.0", port=8000)