# Smart-Helpdesk-AI-Agent / agent_langchain.py
# (Hugging Face Hub metadata: uploaded by Pulastya0, commit 5bf6a85 "Update agent_langchain.py", verified)
# Runtime/cache environment knobs. These MUST be set before the transformers,
# sentence_transformers and torch imports below, because those libraries read
# the variables at import time. All cache paths point at /tmp — presumably
# because the app directory is read-only in the deployment container (e.g. a
# HF Space); confirm against the deployment target.
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"  # silence tokenizers fork warning
os.environ["TRANSFORMERS_CACHE"] = "/tmp/transformers"
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["SENTENCE_TRANSFORMERS_HOME"] = "/tmp/sentence_transformers"
os.environ["TORCH_HOME"] = "/tmp/torch"
# Standard library
import threading
import time
import uuid
from datetime import datetime

# Third-party
import chromadb
import numpy as np
import requests
import torch
from chromadb.config import Settings
from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain.tools import Tool
from langchain_google_genai import ChatGoogleGenerativeAI
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# Environment
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")  # consumed by ChatGoogleGenerativeAI below
ROUTING_URL = os.environ.get("ROUTING_URL")  # optional; call_routing falls back to {SPACE_URL}/route
SPACE_URL = os.environ.get("SPACE_URL", "http://localhost:7860")
# Label Dictionary
# Maps coded labels to human-readable names; classify_ticket() builds keys
# such as "I3" / "U2" / "T5" from per-segment argmaxes of the classifier logits.
LABEL_DICTIONARY = {
    "I1": "Low Impact", "I2": "Medium Impact", "I3": "High Impact", "I4": "Critical Impact",
    "U1": "Low Urgency", "U2": "Medium Urgency", "U3": "High Urgency", "U4": "Critical Urgency",
    "T1": "Information", "T2": "Incident", "T3": "Problem", "T4": "Request", "T5": "Question"
}
# Classification Model
# Loaded eagerly at import time. The head appears to emit a single logit
# vector sliced as 4 impact + 4 urgency + 5 type values — inferred from
# classify_ticket()'s slicing; confirm against the model card.
clf_model_name = "DavinciTech/BERT_Categorizer"
clf_tokenizer = AutoTokenizer.from_pretrained(clf_model_name, cache_dir="/tmp/transformers")
clf_model = AutoModelForSequenceClassification.from_pretrained(clf_model_name, cache_dir="/tmp/transformers")
def classify_ticket(text):
    """Classify ticket into Impact, Urgency, and Type.

    Args:
        text: Free-form ticket description.

    Returns:
        dict with human-readable "impact", "urgency" and "type" labels,
        looked up in LABEL_DICTIONARY.
    """
    inputs = clf_tokenizer(text, return_tensors="pt", truncation=True)
    # Inference only: disable autograd so no graph is built and no
    # intermediate activations are retained (lower memory, faster).
    with torch.no_grad():
        outputs = clf_model(**inputs)
    logits = outputs.logits[0]
    # The logit vector is sliced into independent segments: [0:4] impact,
    # [4:8] urgency, [8:] type. "+ 1" maps the 0-based argmax to the
    # 1-based codes used in LABEL_DICTIONARY (I1..I4, U1..U4, T1..T5).
    impact_idx = torch.argmax(logits[:4]).item() + 1
    urgency_idx = torch.argmax(logits[4:8]).item() + 1
    type_idx = torch.argmax(logits[8:]).item() + 1
    return {
        "impact": LABEL_DICTIONARY[f"I{impact_idx}"],
        "urgency": LABEL_DICTIONARY[f"U{urgency_idx}"],
        "type": LABEL_DICTIONARY[f"T{type_idx}"]
    }
# Routing Function
def call_routing(text, retries=3, delay=5):
    """Route ticket to appropriate department.

    POSTs the ticket text to the routing service and returns the department
    name, retrying on transient failures.

    Args:
        text: Ticket description to route.
        retries: Maximum number of attempts.
        delay: Seconds to sleep between attempts.

    Returns:
        Department name from the service, or "General IT" if the service
        cannot be reached or returns no department.
    """
    url = ROUTING_URL if ROUTING_URL else f"{SPACE_URL}/route"
    for attempt in range(retries):
        try:
            resp = requests.post(url, json={"text": text}, timeout=30)
            resp.raise_for_status()
            return resp.json().get("department", "General IT")
        # Narrowed from bare Exception: RequestException covers connection,
        # timeout and HTTP errors; ValueError covers a non-JSON body from
        # resp.json(). Programming errors now propagate instead of being
        # silently retried.
        except (requests.RequestException, ValueError) as e:
            print(f"Routing attempt {attempt+1} failed: {e}")
            if attempt < retries - 1:
                time.sleep(delay)
    # All attempts failed: fall back to the default queue.
    return "General IT"
# Knowledge Base
CHROMA_PATH = "/tmp/chroma"  # persistent Chroma store location (writable /tmp)
COLLECTION_NAME = "knowledge_base"
kb_collection = None  # lazily initialised by get_kb_collection()
kb_lock = threading.Lock()  # guards the one-time collection creation
# Sentence embedding model for KB queries; loaded eagerly at import time.
encoder = SentenceTransformer("all-MiniLM-L6-v2", cache_folder="/tmp/sentence_transformers")
def get_kb_collection():
    """Lazily create and cache the Chroma KB collection (thread-safe).

    Double-checked locking: the unlocked fast path avoids lock contention
    once initialised; the re-check under the lock prevents two threads from
    both creating a client.

    Returns:
        The shared collection, or None if creation failed (the next call
        will retry, since kb_collection stays None on failure).
    """
    global kb_collection
    if kb_collection is None:
        with kb_lock:
            if kb_collection is None:  # re-check: another thread may have won the race
                try:
                    chroma_client = chromadb.PersistentClient(
                        path=CHROMA_PATH,
                        settings=Settings(anonymized_telemetry=False, allow_reset=True)
                    )
                    kb_collection = chroma_client.get_or_create_collection(COLLECTION_NAME)
                except Exception as e:
                    # Best-effort: leave kb_collection as None so callers degrade gracefully.
                    print(f"Could not get KB collection: {e}")
    return kb_collection
def query_kb(text: str, top_k: int = 1):
    """Query KB and return answer with confidence.

    Args:
        text: Natural-language query to embed and search for.
        top_k: Number of nearest documents to request (only the top hit is used).

    Returns:
        {"answer": str | None, "confidence": float}. Confidence is
        1 - distance/2 clamped at 0, which assumes distances lie in [0, 2]
        (true for cosine distance on normalised embeddings).
        NOTE(review): Chroma's default metric is L2, which is unbounded —
        confirm the collection was created with cosine space, otherwise
        the confidence scale is off.
    """
    collection = get_kb_collection()
    if not collection or collection.count() == 0:
        return {"answer": None, "confidence": 0.0}
    try:
        query_embedding = encoder.encode([text])[0].tolist()
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            include=["documents", "distances", "metadatas"]
        )
        if not results or not results.get("documents") or len(results["documents"][0]) == 0:
            return {"answer": None, "confidence": 0.0}
        answer = results["documents"][0][0]
        # Fallback distance 1.0 yields confidence 0.5 when distances are absent.
        distance = results["distances"][0][0] if results.get("distances") else 1.0
        confidence = max(0.0, 1.0 - (distance / 2.0))
        return {"answer": answer, "confidence": round(float(confidence), 3)}
    except Exception as e:
        # Degrade to "no answer" on any backend failure rather than crashing the agent.
        print(f"KB query failed: {e}")
        return {"answer": None, "confidence": 0.0}
# Gemini LLM
# Low temperature for mostly-deterministic tool selection. GEMINI_API_KEY
# must be present in the environment or calls will fail to authenticate.
llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp",
    temperature=0.3,
    google_api_key=GEMINI_API_KEY
)
# Global conversation storage
# In-memory and unbounded: state is lost on restart and entries accumulate
# for the life of the process.
conversations = {}
# Tool Functions for Agent
def classify_tool(query: str) -> str:
    """Classifies IT ticket into impact, urgency, and type. Use this FIRST."""
    labels = classify_ticket(query)
    fields = [
        f"Impact: {labels['impact']}",
        f"Urgency: {labels['urgency']}",
        f"Type: {labels['type']}",
    ]
    return ", ".join(fields)
def routing_tool(query: str) -> str:
    """Determines which IT department should handle this ticket. Use this SECOND."""
    return f"Department: {call_routing(query)}"
def kb_tool(query: str) -> str:
    """Searches knowledge base for solutions. Returns answer and confidence score. Use this THIRD."""
    hit = query_kb(query)
    confidence = hit["confidence"]
    answer = hit["answer"]
    # Only surface the KB document when it exists and clears the 0.5 floor;
    # the confidence score is always reported so the agent can reason on it.
    if not answer or confidence <= 0.5:
        return f"[KB Confidence: {confidence}] No relevant solution found in knowledge base."
    return f"[KB Confidence: {confidence}]\n{answer}"
def escalation_tool(reason: str) -> str:
    """Escalates ticket to human agent. Use ONLY when KB confidence < 0.75 OR user says solution didn't work."""
    # A second-resolution timestamp alone collides when two escalations land
    # in the same second; a short random suffix keeps ticket IDs unique.
    # The "TKT-<date>-<time>" prefix is preserved for downstream matching
    # (process_with_agent keys its status on "TKT-" / "ESCALATED").
    suffix = uuid.uuid4().hex[:6].upper()
    ticket_id = f"TKT-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{suffix}"
    return f"ESCALATED: Ticket {ticket_id} created. Reason: {reason}. Human agent will respond in 2-4 hours."
# Define Tools
# The description strings double as instructions to the LLM: they encode the
# intended call order (FIRST/SECOND/THIRD) and the escalation criteria.
tools = [
    Tool(
        name="ClassifyTicket",
        func=classify_tool,
        description="Classifies IT ticket severity. Input: user's issue description. Use this FIRST for every new ticket."
    ),
    Tool(
        name="RouteTicket",
        func=routing_tool,
        description="Determines responsible department. Input: user's issue description. Use this SECOND after classification."
    ),
    Tool(
        name="SearchKnowledgeBase",
        func=kb_tool,
        description="Searches for solutions with confidence score. Input: user's issue description. Use this THIRD to find solutions."
    ),
    Tool(
        name="EscalateToHuman",
        func=escalation_tool,
        description="Creates escalation ticket. Input: brief reason. Use ONLY when: 1) KB confidence < 0.75, 2) User reports solution failed, 3) Complex/unusual issue."
    )
]
# Agent Prompt Template
# ReAct-style prompt; {tools}, {tool_names}, {input} and {agent_scratchpad}
# are filled in by create_react_agent / AgentExecutor at run time.
# Fix: the RULES section previously said to trust KB solutions ">= 0.45",
# contradicting the 0.75 threshold used in the workflow and in the
# EscalateToHuman tool description; aligned to 0.75 everywhere.
AGENT_PROMPT = """You are an intelligent IT Helpdesk AI Agent. Resolve tickets efficiently using available tools.
TOOLS:
{tools}
WORKFLOW FOR NEW TICKETS:
1. Use ClassifyTicket to understand severity
2. Use RouteTicket to determine responsible team
3. Use SearchKnowledgeBase to find solutions
4. Evaluate KB confidence score:
- If confidence >= 0.75: Provide the solution to user
- If confidence < 0.75: Use EscalateToHuman with clear reason
WORKFLOW FOR FOLLOW-UPS:
- If user confirms solution worked: Thank them and close positively
- If user says solution didn't work: Use SearchKnowledgeBase again OR EscalateToHuman
- For clarification questions: Answer directly
RULES:
- Be professional, empathetic, and clear
- Provide step-by-step instructions
- Trust high-confidence KB solutions (>= 0.75)
- Don't escalate prematurely
- Remember conversation context
Use this format:
Question: the input question
Thought: think about what to do
Action: the action to take, must be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (repeat as needed)
Thought: I now know the final answer
Final Answer: the final answer
Begin!
Question: {input}
Thought: {agent_scratchpad}"""
prompt = PromptTemplate.from_template(AGENT_PROMPT)
# Create Agent
# ReAct agent over the four tools. max_iterations caps the think/act loop so
# a confused model cannot spin forever; handle_parsing_errors feeds malformed
# LLM output back to the model instead of raising; intermediate steps are
# returned so process_with_agent can build a reasoning trace.
agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=6,
    handle_parsing_errors=True,
    return_intermediate_steps=True
)
# Main Processing Function
def process_with_agent(user_message: str, conversation_id: "str | None" = None):
    """Process user message through autonomous AI agent.

    Args:
        user_message: The raw user text for this turn.
        conversation_id: Existing conversation to continue; when falsy a new
            id is generated from the timestamp plus a hash of the message.

    Returns:
        dict with conversation_id, response, status ("resolved" /
        "escalated" / "in_progress" / "error") and, on success,
        message_count and reasoning_trace; on failure, an "error" field.
    """
    if not conversation_id:
        # NOTE(review): hash() is salted per process (PYTHONHASHSEED), so ids
        # are not reproducible across restarts — acceptable for an in-memory store.
        conversation_id = f"conv_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(user_message) % 10000}"
    if conversation_id not in conversations:
        conversations[conversation_id] = {
            "messages": [],
            "ticket_info": {},
            "created_at": datetime.now().isoformat()
        }
    conv = conversations[conversation_id]
    conv["messages"].append({
        "role": "user",
        "content": user_message,
        "timestamp": datetime.now().isoformat()
    })
    # Build context for follow-ups: prepend up to the last 4 prior messages
    # so the stateless agent can see recent conversation history.
    if len(conv["messages"]) > 1:
        context = f"PREVIOUS CONVERSATION:\n"
        for msg in conv["messages"][-5:-1]:
            context += f"{msg['role'].upper()}: {msg['content']}\n"
        context += f"\nCURRENT USER MESSAGE: {user_message}"
        agent_input = context
    else:
        agent_input = user_message
    try:
        result = agent_executor.invoke({"input": agent_input})
        agent_response = result.get("output", "I apologize, I encountered an error.")
        intermediate_steps = result.get("intermediate_steps", [])
        # Status heuristic: escalation_tool output contains "ESCALATED" and a
        # "TKT-" ticket id; otherwise follow-ups count as in-progress.
        status = "resolved"
        if "ESCALATED" in agent_response or "TKT-" in agent_response:
            status = "escalated"
        elif len(conv["messages"]) > 1:
            status = "in_progress"
        # Compact (tool, input, truncated output) trace of each agent step.
        reasoning_trace = []
        for action, observation in intermediate_steps:
            reasoning_trace.append({
                "tool": action.tool,
                "input": action.tool_input,
                "output": str(observation)[:200]  # cap to keep payloads small
            })
        conv["messages"].append({
            "role": "assistant",
            "content": agent_response,
            "timestamp": datetime.now().isoformat(),
            "reasoning": reasoning_trace
        })
        return {
            "conversation_id": conversation_id,
            "response": agent_response,
            "status": status,
            "message_count": len(conv["messages"]),
            "reasoning_trace": reasoning_trace
        }
    except Exception as e:
        # Top-level boundary: log the failure, record it in the conversation,
        # and return an error payload instead of raising to the caller.
        print(f"Agent error: {e}")
        import traceback
        traceback.print_exc()
        error_response = "I apologize, I encountered an error. Please try again."
        conv["messages"].append({
            "role": "assistant",
            "content": error_response,
            "timestamp": datetime.now().isoformat()
        })
        return {
            "conversation_id": conversation_id,
            "response": error_response,
            "status": "error",
            "error": str(e)
        }
def get_conversation_history(conversation_id: str):
    """Return the stored conversation dict for *conversation_id*, or None if unknown."""
    try:
        return conversations[conversation_id]
    except KeyError:
        return None