import os

# Cache/env configuration must happen BEFORE the heavyweight ML imports below
# so the libraries pick up writable /tmp locations on first import.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/transformers"
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["SENTENCE_TRANSFORMERS_HOME"] = "/tmp/sentence_transformers"
os.environ["TORCH_HOME"] = "/tmp/torch"

import requests
import torch
import time
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import Tool
from langchain.prompts import PromptTemplate
import threading
from datetime import datetime

# Environment
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
ROUTING_URL = os.environ.get("ROUTING_URL")
SPACE_URL = os.environ.get("SPACE_URL", "http://localhost:7860")

# Label Dictionary: maps raw classifier label codes to human-readable names.
# I* = Impact, U* = Urgency, T* = ticket Type.
LABEL_DICTIONARY = {
    "I1": "Low Impact",
    "I2": "Medium Impact",
    "I3": "High Impact",
    "I4": "Critical Impact",
    "U1": "Low Urgency",
    "U2": "Medium Urgency",
    "U3": "High Urgency",
    "U4": "Critical Urgency",
    "T1": "Information",
    "T2": "Incident",
    "T3": "Problem",
    "T4": "Request",
    "T5": "Question",
}

# Classification Model (downloaded/loaded once at import time)
clf_model_name = "DavinciTech/BERT_Categorizer"
clf_tokenizer = AutoTokenizer.from_pretrained(clf_model_name, cache_dir="/tmp/transformers")
clf_model = AutoModelForSequenceClassification.from_pretrained(clf_model_name, cache_dir="/tmp/transformers")


def classify_ticket(text):
    """Classify ticket into Impact, Urgency, and Type.

    The model emits one logit vector that this code slices as
    [I1..I4, U1..U4, T1..T5], taking an independent argmax per segment.
    (NOTE(review): the slice layout is assumed from the label dictionary —
    confirm against the model card.)

    Args:
        text: Free-form ticket description.

    Returns:
        dict with human-readable "impact", "urgency" and "type" labels.
    """
    inputs = clf_tokenizer(text, return_tensors="pt", truncation=True)
    # Fix: inference must not track gradients — the original built an autograd
    # graph on every call, wasting memory and CPU for no benefit.
    with torch.no_grad():
        outputs = clf_model(**inputs)
    logits = outputs.logits[0]
    impact_idx = torch.argmax(logits[:4]).item() + 1
    urgency_idx = torch.argmax(logits[4:8]).item() + 1
    type_idx = torch.argmax(logits[8:]).item() + 1
    return {
        "impact": LABEL_DICTIONARY[f"I{impact_idx}"],
        "urgency": LABEL_DICTIONARY[f"U{urgency_idx}"],
        "type": LABEL_DICTIONARY[f"T{type_idx}"],
    }


# Routing Function
def call_routing(text, retries=3, delay=5):
    """Route ticket to appropriate department.

    Best-effort: tries the routing endpoint up to `retries` times, sleeping
    `delay` seconds between attempts, and falls back to "General IT" when the
    service is unreachable or returns an unusable response.
    """
    url = ROUTING_URL if ROUTING_URL else f"{SPACE_URL}/route"
    for attempt in range(retries):
        try:
            resp = requests.post(url, json={"text": text}, timeout=30)
            resp.raise_for_status()
            return resp.json().get("department", "General IT")
        except Exception as e:
            # Broad catch is deliberate: any failure (network, HTTP status,
            # invalid JSON) should retry and ultimately degrade gracefully.
            print(f"Routing attempt {attempt+1} failed: {e}")
            if attempt < retries - 1:
                time.sleep(delay)
    return "General IT"


# Knowledge Base
CHROMA_PATH = "/tmp/chroma"
COLLECTION_NAME = "knowledge_base"
kb_collection = None  # lazily initialized singleton, guarded by kb_lock
kb_lock = threading.Lock()
encoder = SentenceTransformer("all-MiniLM-L6-v2", cache_folder="/tmp/sentence_transformers")


def get_kb_collection():
    """Lazily create (once) and return the shared Chroma collection.

    Uses double-checked locking so concurrent callers do not both create a
    client. Returns None if the collection could not be initialized.
    """
    global kb_collection
    if kb_collection is None:
        with kb_lock:
            if kb_collection is None:
                try:
                    chroma_client = chromadb.PersistentClient(
                        path=CHROMA_PATH,
                        settings=Settings(anonymized_telemetry=False, allow_reset=True)
                    )
                    kb_collection = chroma_client.get_or_create_collection(COLLECTION_NAME)
                except Exception as e:
                    print(f"Could not get KB collection: {e}")
    return kb_collection


def query_kb(text: str, top_k: int = 1):
    """Query KB and return answer with confidence.

    Returns:
        {"answer": str | None, "confidence": float} — confidence is a rough
        [0, 1] similarity score derived from the nearest-neighbor distance.
    """
    collection = get_kb_collection()
    # Fix: compare against None explicitly — relying on the truthiness of a
    # Chroma Collection object is fragile (its __bool__/__len__ semantics are
    # not part of the public contract).
    if collection is None or collection.count() == 0:
        return {"answer": None, "confidence": 0.0}
    try:
        query_embedding = encoder.encode([text])[0].tolist()
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            include=["documents", "distances", "metadatas"]
        )
        if not results or not results.get("documents") or len(results["documents"][0]) == 0:
            return {"answer": None, "confidence": 0.0}
        answer = results["documents"][0][0]
        distance = results["distances"][0][0] if results.get("distances") else 1.0
        # Maps distance to a 0..1 score, clamped at 0; presumably tuned for
        # Chroma's default L2 distance on MiniLM embeddings — TODO confirm.
        confidence = max(0.0, 1.0 - (distance / 2.0))
        return {"answer": answer, "confidence": round(float(confidence), 3)}
    except Exception as e:
        print(f"KB query failed: {e}")
        return {"answer": None, "confidence": 0.0}


# Gemini LLM
llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp",
    temperature=0.3,
    google_api_key=GEMINI_API_KEY
)

# Global conversation storage: conversation_id -> {messages, ticket_info, created_at}.
# NOTE(review): in-memory only, unbounded, and mutated without a lock — fine
# for a single-process demo, but a real deployment needs eviction/locking.
conversations = {}


# Tool Functions for Agent
def classify_tool(query: str) -> str:
    """Classifies IT ticket into impact, urgency, and type. Use this FIRST."""
    result = classify_ticket(query)
    return f"Impact: {result['impact']}, Urgency: {result['urgency']}, Type: {result['type']}"


def routing_tool(query: str) -> str:
    """Determines which IT department should handle this ticket. Use this SECOND."""
    dept = call_routing(query)
    return f"Department: {dept}"


def kb_tool(query: str) -> str:
    """Searches knowledge base for solutions. Returns answer and confidence score. Use this THIRD."""
    result = query_kb(query)
    # 0.5 is deliberately looser than the 0.75 escalation threshold so that
    # medium-confidence answers are still surfaced for the agent to judge.
    if result["answer"] and result["confidence"] > 0.5:
        return f"[KB Confidence: {result['confidence']}]\n{result['answer']}"
    return f"[KB Confidence: {result['confidence']}] No relevant solution found in knowledge base."


def escalation_tool(reason: str) -> str:
    """Escalates ticket to human agent. Use ONLY when KB confidence < 0.75 OR user says solution didn't work."""
    # NOTE(review): the ticket is not persisted anywhere — the "TKT-" marker
    # in the response is what process_with_agent uses to detect escalation.
    ticket_id = f"TKT-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    return f"ESCALATED: Ticket {ticket_id} created. Reason: {reason}. Human agent will respond in 2-4 hours."


# Define Tools (descriptions are read by the LLM to choose tools)
tools = [
    Tool(
        name="ClassifyTicket",
        func=classify_tool,
        description="Classifies IT ticket severity. Input: user's issue description. Use this FIRST for every new ticket."
    ),
    Tool(
        name="RouteTicket",
        func=routing_tool,
        description="Determines responsible department. Input: user's issue description. Use this SECOND after classification."
    ),
    Tool(
        name="SearchKnowledgeBase",
        func=kb_tool,
        description="Searches for solutions with confidence score. Input: user's issue description. Use this THIRD to find solutions."
    ),
    Tool(
        name="EscalateToHuman",
        func=escalation_tool,
        description="Creates escalation ticket. Input: brief reason. Use ONLY when: 1) KB confidence < 0.75, 2) User reports solution failed, 3) Complex/unusual issue."
    )
]

# Agent Prompt Template (standard ReAct format required by create_react_agent:
# it must contain {tools}, {tool_names}, {input} and {agent_scratchpad}).
AGENT_PROMPT = """You are an intelligent IT Helpdesk AI Agent. Resolve tickets efficiently using available tools.

TOOLS:
{tools}

WORKFLOW FOR NEW TICKETS:
1. Use ClassifyTicket to understand severity
2. Use RouteTicket to determine responsible team
3. Use SearchKnowledgeBase to find solutions
4. Evaluate KB confidence score:
   - If confidence >= 0.75: Provide the solution to user
   - If confidence < 0.75: Use EscalateToHuman with clear reason

WORKFLOW FOR FOLLOW-UPS:
- If user confirms solution worked: Thank them and close positively
- If user says solution didn't work: Use SearchKnowledgeBase again OR EscalateToHuman
- For clarification questions: Answer directly

RULES:
- Be professional, empathetic, and clear
- Provide step-by-step instructions
- Trust high-confidence KB solutions (>= 0.75)
- Don't escalate prematurely
- Remember conversation context

Use this format:
Question: the input question
Thought: think about what to do
Action: the action to take, must be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (repeat as needed)
Thought: I now know the final answer
Final Answer: the final answer

Begin!

Question: {input}
Thought: {agent_scratchpad}"""

prompt = PromptTemplate.from_template(AGENT_PROMPT)

# Create Agent
agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=6,  # bounds runaway ReAct loops
    handle_parsing_errors=True,
    return_intermediate_steps=True  # needed to build the reasoning trace
)


# Main Processing Function
def process_with_agent(user_message: str, conversation_id: str = None):
    """Process user message through autonomous AI agent.

    Args:
        user_message: Latest message from the user.
        conversation_id: Existing conversation key; a fresh id is generated
            when omitted/falsy.

    Returns:
        dict with conversation_id, response, status
        ("resolved" | "in_progress" | "escalated" | "error") and, on
        success, message_count and reasoning_trace.
    """
    if not conversation_id:
        # hash() is only stable within a single process run, which is enough
        # here: it just adds uniqueness alongside the timestamp.
        conversation_id = f"conv_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(user_message) % 10000}"
    if conversation_id not in conversations:
        conversations[conversation_id] = {
            "messages": [],
            "ticket_info": {},
            "created_at": datetime.now().isoformat()
        }
    conv = conversations[conversation_id]
    conv["messages"].append({
        "role": "user",
        "content": user_message,
        "timestamp": datetime.now().isoformat()
    })
    # Build context for follow-ups: replay up to the last 4 prior messages so
    # the stateless agent can see recent history.
    if len(conv["messages"]) > 1:
        context = f"PREVIOUS CONVERSATION:\n"
        for msg in conv["messages"][-5:-1]:
            context += f"{msg['role'].upper()}: {msg['content']}\n"
        context += f"\nCURRENT USER MESSAGE: {user_message}"
        agent_input = context
    else:
        agent_input = user_message
    try:
        result = agent_executor.invoke({"input": agent_input})
        agent_response = result.get("output", "I apologize, I encountered an error.")
        intermediate_steps = result.get("intermediate_steps", [])
        # Infer status from the response text (see escalation_tool's marker).
        status = "resolved"
        if "ESCALATED" in agent_response or "TKT-" in agent_response:
            status = "escalated"
        elif len(conv["messages"]) > 1:
            status = "in_progress"
        reasoning_trace = []
        for action, observation in intermediate_steps:
            reasoning_trace.append({
                "tool": action.tool,
                "input": action.tool_input,
                "output": str(observation)[:200]  # truncate to keep payload small
            })
        conv["messages"].append({
            "role": "assistant",
            "content": agent_response,
            "timestamp": datetime.now().isoformat(),
            "reasoning": reasoning_trace
        })
        return {
            "conversation_id": conversation_id,
            "response": agent_response,
            "status": status,
            "message_count": len(conv["messages"]),
            "reasoning_trace": reasoning_trace
        }
    except Exception as e:
        # Top-level boundary: log with traceback, record the failure in the
        # conversation, and return a structured error instead of raising.
        print(f"Agent error: {e}")
        import traceback
        traceback.print_exc()
        error_response = "I apologize, I encountered an error. Please try again."
        conv["messages"].append({
            "role": "assistant",
            "content": error_response,
            "timestamp": datetime.now().isoformat()
        })
        return {
            "conversation_id": conversation_id,
            "response": error_response,
            "status": "error",
            "error": str(e)
        }


def get_conversation_history(conversation_id: str):
    """Return the stored conversation dict for an id, or None if unknown."""
    return conversations.get(conversation_id)