import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/transformers"
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["SENTENCE_TRANSFORMERS_HOME"] = "/tmp/sentence_transformers"
os.environ["TORCH_HOME"] = "/tmp/torch"

import requests
import torch
import time
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import Tool
from langchain.prompts import PromptTemplate
import threading
from datetime import datetime

# Environment
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
ROUTING_URL = os.environ.get("ROUTING_URL")
SPACE_URL = os.environ.get("SPACE_URL", "http://localhost:7860")

# Label Dictionary
LABEL_DICTIONARY = {
    "I1": "Low Impact", "I2": "Medium Impact", "I3": "High Impact", "I4": "Critical Impact",
    "U1": "Low Urgency", "U2": "Medium Urgency", "U3": "High Urgency", "U4": "Critical Urgency",
    "T1": "Information", "T2": "Incident", "T3": "Problem", "T4": "Request", "T5": "Question"
}
# Classification Model
clf_model_name = "DavinciTech/BERT_Categorizer"
clf_tokenizer = AutoTokenizer.from_pretrained(clf_model_name, cache_dir="/tmp/transformers")
clf_model = AutoModelForSequenceClassification.from_pretrained(clf_model_name, cache_dir="/tmp/transformers")

def classify_ticket(text):
    """Classify ticket into Impact, Urgency, and Type."""
    inputs = clf_tokenizer(text, return_tensors="pt", truncation=True)
    outputs = clf_model(**inputs)
    logits = outputs.logits[0]
    impact_idx = torch.argmax(logits[:4]).item() + 1
    urgency_idx = torch.argmax(logits[4:8]).item() + 1
    type_idx = torch.argmax(logits[8:]).item() + 1
    return {
        "impact": LABEL_DICTIONARY[f"I{impact_idx}"],
        "urgency": LABEL_DICTIONARY[f"U{urgency_idx}"],
        "type": LABEL_DICTIONARY[f"T{type_idx}"]
    }
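# Illustrative usage only (not part of the app's request flow). The exact label
# layout depends on the DavinciTech/BERT_Categorizer classification head, so the
# output shown below is hypothetical:
#   classify_ticket("Email server is down for the whole finance team")
#   -> {"impact": "High Impact", "urgency": "Critical Urgency", "type": "Incident"}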
# Routing Function
def call_routing(text, retries=3, delay=5):
    """Route ticket to appropriate department."""
    url = ROUTING_URL if ROUTING_URL else f"{SPACE_URL}/route"
    for attempt in range(retries):
        try:
            resp = requests.post(url, json={"text": text}, timeout=30)
            resp.raise_for_status()
            return resp.json().get("department", "General IT")
        except Exception as e:
            print(f"Routing attempt {attempt+1} failed: {e}")
            if attempt < retries - 1:
                time.sleep(delay)
    return "General IT"
# Knowledge Base
CHROMA_PATH = "/tmp/chroma"
COLLECTION_NAME = "knowledge_base"
kb_collection = None
kb_lock = threading.Lock()
encoder = SentenceTransformer("all-MiniLM-L6-v2", cache_folder="/tmp/sentence_transformers")

def get_kb_collection():
    global kb_collection
    if kb_collection is None:
        with kb_lock:
            if kb_collection is None:
                try:
                    chroma_client = chromadb.PersistentClient(
                        path=CHROMA_PATH,
                        settings=Settings(anonymized_telemetry=False, allow_reset=True)
                    )
                    kb_collection = chroma_client.get_or_create_collection(COLLECTION_NAME)
                except Exception as e:
                    print(f"Could not get KB collection: {e}")
    return kb_collection

def query_kb(text: str, top_k: int = 1):
    """Query KB and return answer with confidence."""
    collection = get_kb_collection()
    if not collection or collection.count() == 0:
        return {"answer": None, "confidence": 0.0}
    try:
        query_embedding = encoder.encode([text])[0].tolist()
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            include=["documents", "distances", "metadatas"]
        )
        if not results or not results.get("documents") or len(results["documents"][0]) == 0:
            return {"answer": None, "confidence": 0.0}
        answer = results["documents"][0][0]
        distance = results["distances"][0][0] if results.get("distances") else 1.0
        confidence = max(0.0, 1.0 - (distance / 2.0))
        return {"answer": answer, "confidence": round(float(confidence), 3)}
    except Exception as e:
        print(f"KB query failed: {e}")
        return {"answer": None, "confidence": 0.0}
# Gemini LLM
llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp",
    temperature=0.3,
    google_api_key=GEMINI_API_KEY
)

# Global conversation storage
conversations = {}

# Tool Functions for Agent
def classify_tool(query: str) -> str:
    """Classifies IT ticket into impact, urgency, and type. Use this FIRST."""
    result = classify_ticket(query)
    return f"Impact: {result['impact']}, Urgency: {result['urgency']}, Type: {result['type']}"

def routing_tool(query: str) -> str:
    """Determines which IT department should handle this ticket. Use this SECOND."""
    dept = call_routing(query)
    return f"Department: {dept}"

def kb_tool(query: str) -> str:
    """Searches knowledge base for solutions. Returns answer and confidence score. Use this THIRD."""
    result = query_kb(query)
    if result["answer"] and result["confidence"] > 0.5:
        return f"[KB Confidence: {result['confidence']}]\n{result['answer']}"
    return f"[KB Confidence: {result['confidence']}] No relevant solution found in knowledge base."

def escalation_tool(reason: str) -> str:
    """Escalates ticket to human agent. Use ONLY when KB confidence < 0.75 OR user says solution didn't work."""
    ticket_id = f"TKT-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    return f"ESCALATED: Ticket {ticket_id} created. Reason: {reason}. Human agent will respond in 2-4 hours."
# Define Tools
tools = [
    Tool(
        name="ClassifyTicket",
        func=classify_tool,
        description="Classifies IT ticket severity. Input: user's issue description. Use this FIRST for every new ticket."
    ),
    Tool(
        name="RouteTicket",
        func=routing_tool,
        description="Determines responsible department. Input: user's issue description. Use this SECOND after classification."
    ),
    Tool(
        name="SearchKnowledgeBase",
        func=kb_tool,
        description="Searches for solutions with confidence score. Input: user's issue description. Use this THIRD to find solutions."
    ),
    Tool(
        name="EscalateToHuman",
        func=escalation_tool,
        description="Creates escalation ticket. Input: brief reason. Use ONLY when: 1) KB confidence < 0.75, 2) User reports solution failed, 3) Complex/unusual issue."
    )
]
# Agent Prompt Template
AGENT_PROMPT = """You are an intelligent IT Helpdesk AI Agent. Resolve tickets efficiently using available tools.
TOOLS:
{tools}
WORKFLOW FOR NEW TICKETS:
1. Use ClassifyTicket to understand severity
2. Use RouteTicket to determine responsible team
3. Use SearchKnowledgeBase to find solutions
4. Evaluate KB confidence score:
   - If confidence >= 0.75: Provide the solution to user
   - If confidence < 0.75: Use EscalateToHuman with clear reason
WORKFLOW FOR FOLLOW-UPS:
- If user confirms solution worked: Thank them and close positively
- If user says solution didn't work: Use SearchKnowledgeBase again OR EscalateToHuman
- For clarification questions: Answer directly
RULES:
- Be professional, empathetic, and clear
- Provide step-by-step instructions
- Trust high-confidence KB solutions (>= 0.75)
- Don't escalate prematurely
- Remember conversation context
Use this format:
Question: the input question
Thought: think about what to do
Action: the action to take, must be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
(repeat as needed)
Thought: I now know the final answer
Final Answer: the final answer
Begin!
Question: {input}
Thought: {agent_scratchpad}"""
prompt = PromptTemplate.from_template(AGENT_PROMPT)

# Create Agent
agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=6,
    handle_parsing_errors=True,
    return_intermediate_steps=True
)
# Main Processing Function
def process_with_agent(user_message: str, conversation_id: str = None):
    """Process user message through autonomous AI agent."""
    if not conversation_id:
        conversation_id = f"conv_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hash(user_message) % 10000}"
    if conversation_id not in conversations:
        conversations[conversation_id] = {
            "messages": [],
            "ticket_info": {},
            "created_at": datetime.now().isoformat()
        }
    conv = conversations[conversation_id]
    conv["messages"].append({
        "role": "user",
        "content": user_message,
        "timestamp": datetime.now().isoformat()
    })

    # Build context for follow-ups
    if len(conv["messages"]) > 1:
        context = "PREVIOUS CONVERSATION:\n"
        for msg in conv["messages"][-5:-1]:
            context += f"{msg['role'].upper()}: {msg['content']}\n"
        context += f"\nCURRENT USER MESSAGE: {user_message}"
        agent_input = context
    else:
        agent_input = user_message

    try:
        result = agent_executor.invoke({"input": agent_input})
        agent_response = result.get("output", "I apologize, I encountered an error.")
        intermediate_steps = result.get("intermediate_steps", [])

        status = "resolved"
        if "ESCALATED" in agent_response or "TKT-" in agent_response:
            status = "escalated"
        elif len(conv["messages"]) > 1:
            status = "in_progress"

        reasoning_trace = []
        for action, observation in intermediate_steps:
            reasoning_trace.append({
                "tool": action.tool,
                "input": action.tool_input,
                "output": str(observation)[:200]
            })

        conv["messages"].append({
            "role": "assistant",
            "content": agent_response,
            "timestamp": datetime.now().isoformat(),
            "reasoning": reasoning_trace
        })

        return {
            "conversation_id": conversation_id,
            "response": agent_response,
            "status": status,
            "message_count": len(conv["messages"]),
            "reasoning_trace": reasoning_trace
        }
    except Exception as e:
        print(f"Agent error: {e}")
        import traceback
        traceback.print_exc()
        error_response = "I apologize, I encountered an error. Please try again."
        conv["messages"].append({
            "role": "assistant",
            "content": error_response,
            "timestamp": datetime.now().isoformat()
        })
        return {
            "conversation_id": conversation_id,
            "response": error_response,
            "status": "error",
            "error": str(e)
        }
def get_conversation_history(conversation_id: str):
    """Get conversation history."""
    return conversations.get(conversation_id)
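
# Illustrative usage sketch (not part of the original listing): requires a valid
# GEMINI_API_KEY, network access to the Gemini API, and ideally a seeded knowledge
# base. The ticket text below is made up.
if __name__ == "__main__":
    first = process_with_agent("My laptop cannot connect to the office WiFi")
    print(first["status"])
    print(first["response"])

    # A follow-up reuses the returned conversation_id so prior context is included
    follow_up = process_with_agent("That didn't fix it", conversation_id=first["conversation_id"])
    print(follow_up["status"])
    print(get_conversation_history(first["conversation_id"]))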