# Hugging Face Spaces page residue ("Spaces: Sleeping") removed; kept as a comment so the file parses.
| """ | |
| ITSM Knowledge Base Chatbot - Hugging Face Deployment | |
| Optimized for Spaces with minimal dependencies | |
| """ | |
# --- Standard library ---
import os
import sys
import json
import pickle
import uuid
import threading
from datetime import datetime
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict

# --- Third party ---
import faiss
import numpy as np
import pandas as pd
from flask import Flask, request, jsonify, render_template, session
from sentence_transformers import SentenceTransformer
from dotenv import load_dotenv

# Groq is an optional dependency: without it (or without an API key) the bot
# falls back to simple template responses.
try:
    from groq import Groq
    GROQ_AVAILABLE = True
except ImportError:
    GROQ_AVAILABLE = False
    print("WARNING: Groq not installed. Using simple responses.")
# ==================== CONFIGURATION ====================
# Load variables from a local .env file (no-op when the file is absent).
load_dotenv()
class Config:
    """Central configuration; secrets come from environment variables."""
    EMBEDDING_MODEL = "all-MiniLM-L6-v2"  # small model suits free-tier Spaces
    CSV_PATH = "data/synthetic_knowledge_items.csv"
    VECTOR_STORE_DIR = "vector_store"
    INDEX_PATH = f"{VECTOR_STORE_DIR}/kb_index.faiss"
    METADATA_PATH = f"{VECTOR_STORE_DIR}/kb_metadata.pkl"
    GROQ_MODEL = "llama3-70b-8192"
    HISTORY_FILE = "chat_data.json"
    # Fallback key is for local dev only; set SECRET_KEY in production.
    SECRET_KEY = os.getenv("SECRET_KEY", "dev-key-for-huggingface")
    # True only when the HF_SPACE env var is exactly the string "True".
    HF_SPACE = os.getenv("HF_SPACE", "False") == "True"
# ==================== DATACLASSES ====================
@dataclass  # decorator is required: the code constructs this with kwargs and calls asdict()
class KnowledgeItem:
    """One knowledge-base article loaded from the CSV."""
    id: str
    topic: str
    text: str
    alt_text: str = ""        # alternate article text (CSV column 'alt_ki_text')
    bad_text: str = ""        # known-bad article text (CSV column 'bad_ki_text')
    category: str = "General"
@dataclass  # decorator restores normal dataclass construction/serialization
class SupportTicket:
    """A support ticket tracked by the chat engine."""
    id: str
    description: str
    status: str
    priority: str
    category: str
    created_at: str  # ISO-8601 timestamp string
    created_by: str
    estimated_resolution: str = ""
# ==================== INDEX BUILDER ====================
class KnowledgeIndexBuilder:
    """Builds a FAISS inner-product index over the knowledge-base CSV."""

    def __init__(self):
        # Small sentence-transformer keeps memory low on Hugging Face.
        self.model = SentenceTransformer(Config.EMBEDDING_MODEL)
        self.index = None
        self.knowledge_items = []

    def build_index(self):
        """Build the FAISS index from the CSV and persist it to disk.

        Returns:
            self, to allow chaining.
        Raises:
            FileNotFoundError: if the CSV is missing.
            ValueError: if the CSV contains no usable rows.
        """
        print(f"Building index from: {Config.CSV_PATH}")
        if not os.path.exists(Config.CSV_PATH):
            raise FileNotFoundError(f"CSV file not found: {Config.CSV_PATH}")

        df = pd.read_csv(Config.CSV_PATH)
        print(f"Loaded {len(df)} rows")

        processed_items = []
        texts_for_embedding = []
        for idx, row in df.iterrows():
            # Cap the index size on Hugging Face Spaces to stay within quota.
            if Config.HF_SPACE and idx >= 100:
                break
            topic = str(row.get('ki_topic', '')).strip()
            text = str(row.get('ki_text', '')).strip()
            alt_text = str(row.get('alt_ki_text', '')).strip()
            bad_text = str(row.get('bad_ki_text', '')).strip()
            if not topic and not text:
                continue  # skip rows with no usable content
            item = KnowledgeItem(
                id=f"kb_{idx:04d}",
                topic=topic,
                text=text,
                alt_text=alt_text,
                bad_text=bad_text,
                category=self._extract_category(topic),
            )
            processed_items.append(item)
            # Topic + body together give the embedding more context.
            texts_for_embedding.append(f"{topic}. {text}")

        print(f"Processed {len(processed_items)} unique items")
        if not processed_items:
            raise ValueError("No valid knowledge items found")

        print("Generating embeddings...")
        embeddings = self.model.encode(
            texts_for_embedding,
            convert_to_numpy=True,
            normalize_embeddings=True,  # unit vectors -> inner product == cosine
            show_progress_bar=True,
        )

        dim = embeddings.shape[1]
        self.index = faiss.IndexFlatIP(dim)
        self.index.add(embeddings.astype('float32'))
        self.knowledge_items = processed_items

        self._save_to_disk()
        self._print_statistics()
        return self

    def _extract_category(self, topic: str) -> str:
        """Map a topic string to a coarse category via keyword matching."""
        categories = {
            'password': 'Authentication', 'login': 'Authentication',
            'vpn': 'Network', 'wifi': 'Network',
            'email': 'Communication', 'outlook': 'Communication',
            'software': 'Software', 'install': 'Software',
            'printer': 'Hardware', 'hardware': 'Hardware',
        }
        topic_lower = topic.lower()
        for keyword, category in categories.items():
            if keyword in topic_lower:
                return category
        return 'General'

    def _save_to_disk(self):
        """Persist the FAISS index, item metadata, and summary statistics."""
        os.makedirs(Config.VECTOR_STORE_DIR, exist_ok=True)

        faiss.write_index(self.index, Config.INDEX_PATH)
        print(f"Saved FAISS index to: {Config.INDEX_PATH}")

        # Metadata is stored as plain dicts so the retriever can load it
        # without needing the dataclass definition.
        metadata = [asdict(item) for item in self.knowledge_items]
        with open(Config.METADATA_PATH, 'wb') as f:
            pickle.dump(metadata, f)
        print(f"Saved metadata to: {Config.METADATA_PATH}")

        stats = {
            'total_items': len(self.knowledge_items),
            'categories': {},
            'created_at': datetime.now().isoformat(),
            'deployment': 'huggingface' if Config.HF_SPACE else 'local',
        }
        for item in self.knowledge_items:
            stats['categories'][item.category] = stats['categories'].get(item.category, 0) + 1
        stats_path = f"{Config.VECTOR_STORE_DIR}/stats.json"
        with open(stats_path, 'w') as f:
            json.dump(stats, f, indent=2)
        print(f"Saved statistics to: {stats_path}")

    def _print_statistics(self):
        """Print a short summary of the freshly built index."""
        print("\nIndex Statistics:")
        print(f"  Total knowledge items: {len(self.knowledge_items)}")
        print(f"  Embedding dimension: {self.index.d}")
        print(f"  Index size: {self.index.ntotal} vectors")
        print(f"  Deployment: {'Hugging Face' if Config.HF_SPACE else 'Local'}")
# ==================== KNOWLEDGE RETRIEVER ====================
class KnowledgeRetriever:
    """Loads the persisted FAISS index and serves similarity queries."""

    def __init__(self):
        """Load the existing index, building it first if necessary."""
        self.model = SentenceTransformer(Config.EMBEDDING_MODEL)
        self.index = None
        self.knowledge_data = []  # list of metadata dicts, aligned with index rows
        self._load_index()

    def _load_index(self):
        """Load the FAISS index and item metadata from disk, building on demand."""
        if not os.path.exists(Config.INDEX_PATH):
            print("Index not found. Building now...")
            KnowledgeIndexBuilder().build_index()
        self.index = faiss.read_index(Config.INDEX_PATH)
        with open(Config.METADATA_PATH, 'rb') as f:
            self.knowledge_data = pickle.load(f)
        print(f"Loaded {len(self.knowledge_data)} knowledge items")

    def retrieve(self, query: str, top_k: int = 3, similarity_threshold: float = 0.3) -> List[Dict]:
        """Return up to ``top_k`` items whose cosine similarity to ``query``
        meets ``similarity_threshold``, sorted by similarity (descending)."""
        if not self.knowledge_data:
            return []  # empty index: nothing to search (avoids a k=0 search)

        query_embedding = self.model.encode([query], convert_to_numpy=True).astype('float32')
        faiss.normalize_L2(query_embedding)  # cosine similarity via inner product

        # Over-fetch so the threshold filter still leaves enough candidates.
        k = min(top_k * 2, len(self.knowledge_data))
        scores, indices = self.index.search(query_embedding, k)

        results = []
        for score, idx in zip(scores[0], indices[0]):
            # FAISS pads with -1 when fewer than k neighbours exist.
            if idx < 0 or idx >= len(self.knowledge_data):
                continue
            similarity = float(score)
            if similarity >= similarity_threshold:
                item = self.knowledge_data[idx].copy()
                item['similarity'] = similarity
                results.append(item)

        results.sort(key=lambda x: x['similarity'], reverse=True)
        return results[:top_k]
# ==================== CHAT ENGINE ====================
class ChatEngine:
    """Answers user queries via retrieval plus an optional Groq LLM."""

    def __init__(self):
        self.retriever = KnowledgeRetriever()
        self.groq_client = None
        self.active_tickets = {}  # reserved for ticket tracking
        self._init_groq()

    def _init_groq(self):
        """Create the Groq client when the package and API key are available."""
        if not GROQ_AVAILABLE:
            print("INFO: Using simple response mode")
            return
        api_key = os.getenv("GROQ_API_KEY")
        if not api_key:
            print("WARNING: GROQ_API_KEY not found. Using simple responses.")
            return
        try:
            self.groq_client = Groq(api_key=api_key)
            print(f"Groq integration enabled (Model: {Config.GROQ_MODEL})")
        except Exception as e:
            print(f"WARNING: Groq initialization failed: {e}")
            self.groq_client = None

    def generate_response(self, query: str, context: str) -> str:
        """Generate an answer via Groq, falling back to templates on failure."""
        if self.groq_client:
            try:
                return self._generate_groq_response(query, context)
            except Exception as e:
                print(f"WARNING: Groq error: {e}")
        return self._generate_fallback_response(query, context)

    def _generate_groq_response(self, query: str, context: str) -> str:
        """Generate a response using the Groq chat-completions API."""
        system_prompt = """You are a helpful IT support assistant.
Provide clear, step-by-step instructions based on the company knowledge base.
Be concise but thorough. Use bullet points when helpful."""
        user_prompt = f"""User Query: {query}
Relevant Knowledge Base Information:
{context}
Based on the above knowledge base information, provide a helpful response."""
        try:
            response = self.groq_client.chat.completions.create(
                model=Config.GROQ_MODEL,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.3,
                max_tokens=300,  # keep responses small for the free tier
                timeout=30,      # avoid hanging requests on Spaces
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            print(f"Groq API call failed: {e}")
            return self._generate_fallback_response(query, context)

    def _generate_fallback_response(self, query: str, context: str) -> str:
        """Generate a simple response without an LLM by extracting instruction
        text from the retrieval context.

        Fixes two defects in the previous version: (1) it only looked for the
        instruction on the line *after* the marker, but process_query emits
        "Instruction: <text>" on the same line, so nothing was ever extracted;
        (2) the reply hardcoded "how to connect to VPN" regardless of topic.
        """
        if "No relevant knowledge found" in context:
            return "I couldn't find specific information about that in our knowledge base. Could you please provide more details?"

        lines = context.split('\n')
        instructions = []
        for i, line in enumerate(lines):
            line_lower = line.lower()
            for marker in ("instruction:", "text:"):
                pos = line_lower.find(marker)
                if pos == -1:
                    continue
                # Inline form ("Instruction: do X") as produced by process_query.
                inline = line[pos + len(marker):].strip()
                if inline:
                    instructions.append(inline)
                # Multi-line form: content on the following line.
                elif i + 1 < len(lines):
                    next_line = lines[i + 1].strip()
                    if next_line and not next_line.startswith("["):
                        instructions.append(next_line)
                break

        if instructions:
            # Topic-neutral preamble (the old one always claimed VPN).
            response = "Based on our knowledge base, here's what I found:\n\n"
            response += instructions[0]
            if len(instructions) > 1:
                response += f"\n\nAlternative approach:\n{instructions[1]}"
            return response

        # No marked instructions: surface any reasonably long free-text line.
        for line in lines:
            if line and len(line) > 20 and not line.startswith("[") and ":" not in line:
                return f"Based on our knowledge base:\n\n{line.strip()}"
        return "I found some relevant information in our knowledge base. Please check with IT support for detailed implementation."

    def process_query(self, query: str, user_id: str = "anonymous") -> Dict[str, Any]:
        """Retrieve context for ``query`` and return a response payload."""
        # Word-level greeting check: a plain substring test would fire on
        # words like "which" (contains "hi").
        query_words = {w.strip('.,!?') for w in query.lower().split()}
        if query_words & {"hello", "hi", "hey", "greetings"}:
            return {
                "answer": "Hello! I'm your ITSM assistant. I can help you with IT issues, knowledge base searches, and ticket creation. How can I assist you today?",
                "timestamp": datetime.now().isoformat(),
                "sources": [],
                "intent": "greeting"
            }

        results = self.retriever.retrieve(query, top_k=2)  # small k for Spaces

        if results:
            context_parts = []
            for i, item in enumerate(results, 1):
                context_parts.append(
                    f"[{i}] {item['topic']}\n"
                    f"Category: {item['category']}\n"
                    f"Instruction: {item['text']}\n"
                )
            context = "\n".join(context_parts)
        else:
            context = "No relevant knowledge found."

        answer = self.generate_response(query, context)
        response = {
            "answer": answer,
            "timestamp": datetime.now().isoformat(),
            "sources": [],
            "context_used": bool(results)
        }
        if results:
            response["sources"] = [{
                "topic": r["topic"],
                "category": r["category"],
                "confidence": f"{r.get('similarity', 0):.0%}"
            } for r in results]
        return response
# ==================== CHAT HISTORY MANAGER ====================
class ChatHistoryManager:
    """Thread-safe, file-backed chat history keyed by session id."""

    MAX_MESSAGES = 10  # per-session cap keeps the JSON file small on Spaces

    def __init__(self):
        self.history_file = Config.HISTORY_FILE
        self.lock = threading.RLock()  # guards every read/write of the file
        self._ensure_file()

    def _ensure_file(self):
        """Create the history file with an empty structure if it is missing."""
        if not os.path.exists(self.history_file):
            with open(self.history_file, 'w') as f:
                json.dump({"sessions": {}}, f)

    def add_message(self, session_id: str, role: str, content: str):
        """Append a message to a session, keeping only the most recent 10."""
        with self.lock:
            try:
                with open(self.history_file, 'r') as f:
                    data = json.load(f)
                sessions = data.setdefault('sessions', {})
                entry = sessions.setdefault(session_id, {
                    "created_at": datetime.now().isoformat(),
                    "messages": [],
                })
                entry['messages'].append({
                    "role": role,
                    "content": content,
                    "timestamp": datetime.now().isoformat(),
                })
                # Trim to the most recent messages.
                entry['messages'] = entry['messages'][-self.MAX_MESSAGES:]
                entry['last_activity'] = datetime.now().isoformat()
                with open(self.history_file, 'w') as f:
                    json.dump(data, f)
            except Exception as e:
                # Best-effort persistence: a failed write must not break chat.
                print(f"Error saving chat history: {e}")

    def get_messages(self, session_id: str) -> List[Dict]:
        """Return the message list for a session, or [] when unknown."""
        with self.lock:
            try:
                with open(self.history_file, 'r') as f:
                    data = json.load(f)
                return data.get('sessions', {}).get(session_id, {}).get('messages', [])
            except Exception as e:
                print(f"Error loading chat history: {e}")
                return []
# ==================== FLASK APPLICATION ====================
app = Flask(__name__,
            static_folder='static',
            template_folder='templates')
app.secret_key = Config.SECRET_KEY  # required for signed session cookies

# Singletons shared by all request handlers (created once at import time).
chat_engine = ChatEngine()
history_manager = ChatHistoryManager()
# ==================== ROUTES ====================
# NOTE(review): @app.route decorators were missing from the extracted source;
# the paths below are reconstructed -- confirm against the original deployment.
@app.route('/')
def home():
    """Serve the main chat interface."""
    # Assign a short random session id on first visit.
    if 'session_id' not in session:
        session['session_id'] = str(uuid.uuid4())[:8]
    return render_template('index.html',
                           session_id=session['session_id'],
                           hf_space=Config.HF_SPACE)
@app.route('/chat', methods=['POST'])  # NOTE(review): path reconstructed; decorator lost in extraction
def chat():
    """Handle a chat message: persist it, run the engine, return the answer."""
    try:
        data = request.json
        user_message = data.get('message', '').strip()
        # Prefer the id supplied in the body, fall back to the cookie session.
        session_id = data.get('session_id') or session.get('session_id')
        if not user_message:
            return jsonify({'error': 'Message is required'}), 400
        if not session_id:
            session_id = str(uuid.uuid4())[:8]
            session['session_id'] = session_id

        history_manager.add_message(session_id, 'user', user_message)
        response = chat_engine.process_query(user_message, session_id)
        history_manager.add_message(session_id, 'assistant', response['answer'])

        return jsonify({
            'success': True,
            'session_id': session_id,
            'response': response['answer'],
            'sources': response.get('sources', []),
            'timestamp': response['timestamp']
        })
    except Exception as e:
        # Never leak internals to the client; log and return a generic error.
        print(f"Error in chat endpoint: {e}")
        return jsonify({
            'success': False,
            'error': 'Internal server error',
            'response': 'Sorry, I encountered an error. Please try again.'
        }), 500
@app.route('/history/<session_id>')  # NOTE(review): path reconstructed; decorator lost in extraction
def get_history(session_id):
    """Return the stored chat history for a session."""
    try:
        messages = history_manager.get_messages(session_id)
        return jsonify({
            'success': True,
            'session_id': session_id,
            'messages': messages,
            'count': len(messages)
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/health')  # NOTE(review): path reconstructed; decorator lost in extraction
def health_check():
    """Liveness/readiness probe with basic deployment diagnostics."""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'deployment': 'huggingface' if Config.HF_SPACE else 'local',
        'knowledge_items': len(chat_engine.retriever.knowledge_data),
        # True only when the package imported AND a key is configured.
        'groq_available': GROQ_AVAILABLE and bool(os.getenv("GROQ_API_KEY")),
        'version': '1.0.0'
    })
@app.route('/clear', methods=['POST'])  # NOTE(review): path reconstructed; decorator lost in extraction
def clear_session():
    """Clear the caller's session cookie (on-disk history is untouched)."""
    try:
        # NOTE(review): the body's session_id is read but never used -- only
        # the cookie is cleared, not the stored history; confirm intended.
        session_id = request.json.get('session_id') or session.get('session_id')
        if 'session_id' in session:
            session.pop('session_id')
        return jsonify({
            'success': True,
            'message': 'Session cleared'
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
# ==================== UTILITY FUNCTIONS ====================
def ensure_directories():
    """Create required directories and seed a sample CSV when none exists.

    Returns:
        True always (kept for backward compatibility with callers).
    """
    for directory in ('data', 'vector_store', 'static', 'templates'):
        os.makedirs(directory, exist_ok=True)

    if not os.path.exists(Config.CSV_PATH):
        print(f"WARNING: CSV file not found at {Config.CSV_PATH}")
        # Minimal three-row knowledge base so the Hugging Face demo can start.
        sample_data = pd.DataFrame({
            'ki_topic': [
                'Password Reset Guide',
                'VPN Connection Issues',
                'Email Setup Instructions'
            ],
            'ki_text': [
                'To reset your password, visit the company portal and click "Forgot Password".',
                'For VPN issues, check your internet connection and restart the VPN client.',
                'Configure email by entering server settings: mail.company.com, port 993.'
            ],
            'alt_ki_text': ['', '', ''],
            'bad_ki_text': ['', '', '']
        })
        sample_data.to_csv(Config.CSV_PATH, index=False)
        print(f"Created sample CSV at {Config.CSV_PATH}")
    return True
# ==================== MAIN ENTRY ====================
if __name__ == '__main__':
    ensure_directories()

    # Build the knowledge index on first run; a failure here is logged but
    # does not abort startup (the retriever will rebuild on demand).
    if not os.path.exists(Config.INDEX_PATH):
        print("Building knowledge index...")
        try:
            KnowledgeIndexBuilder().build_index()
            print("Index built successfully!")
        except Exception as e:
            print(f"Error building index: {e}")

    # Hugging Face Spaces inject PORT; compute it before printing the URL so
    # the banner shows the real port (previously hardcoded to 5000).
    port = int(os.getenv("PORT", 5000))

    print("\n" + "=" * 60)
    print("ITSM Knowledge Base Chatbot")
    print("=" * 60)
    print(f"Knowledge Base: {len(chat_engine.retriever.knowledge_data)} items")
    print("Chat Engine: Ready")
    print(f"Deployment: {'Hugging Face Space' if Config.HF_SPACE else 'Local'}")
    print(f"URL: http://localhost:{port}")
    print("=" * 60 + "\n")

    # Debug mode is disabled in production / on Hugging Face.
    app.run(debug=not Config.HF_SPACE, host='0.0.0.0', port=port)