""" AI-Generated Knowledge Base Metadata Extraction Service Extracts rich metadata from documents during ingestion: - Title - Summary - Tags - Topics (via LLM) - Date detection - Document quality score """ import os import re from typing import Dict, Any, Optional, List from datetime import datetime from ..services.llm_client import LLMClient class MetadataExtractor: """ Extracts structured metadata from document content using LLM and pattern matching. """ def __init__(self, llm_client: Optional[LLMClient] = None): self.llm = llm_client or LLMClient( backend=os.getenv("LLM_BACKEND", "ollama"), url=os.getenv("OLLAMA_URL"), api_key=os.getenv("GROQ_API_KEY"), model=os.getenv("OLLAMA_MODEL", "llama3.1:latest") ) async def extract_metadata( self, content: str, filename: Optional[str] = None, url: Optional[str] = None, source_type: Optional[str] = None ) -> Dict[str, Any]: """ Extract comprehensive metadata from document content. Args: content: Document text content filename: Original filename (if available) url: Source URL (if available) source_type: Document type (pdf, docx, txt, etc.) Returns: Dictionary with extracted metadata: - title: Extracted or inferred title - summary: Brief summary (2-3 sentences) - tags: List of relevant tags - topics: List of main topics/themes - detected_date: Extracted date (ISO format or None) - quality_score: Document quality score (0.0-1.0) - word_count: Word count - language: Detected language (if available) """ # Basic metadata (always available) word_count = len(content.split()) char_count = len(content) # Extract title (try multiple methods) title = self._extract_title(content, filename, url) # Detect date detected_date = self._detect_date(content) # Try LLM extraction for rich metadata llm_metadata = {} try: llm_metadata = await self._extract_with_llm(content, title) except Exception as e: print(f"LLM metadata extraction failed: {e}, using fallback") llm_metadata = self._extract_fallback(content, title) # Calculate quality score quality_score = self._calculate_quality_score( content, word_count, llm_metadata.get("summary", "") ) return { "title": title, "summary": llm_metadata.get("summary", self._generate_basic_summary(content)), "tags": llm_metadata.get("tags", self._extract_basic_tags(content)), "topics": llm_metadata.get("topics", self._extract_basic_topics(content)), "detected_date": detected_date, "quality_score": quality_score, "word_count": word_count, "char_count": char_count, "source_type": source_type or "unknown", "extraction_method": "llm" if llm_metadata.get("summary") else "fallback" } def _extract_title(self, content: str, filename: Optional[str] = None, url: Optional[str] = None) -> str: """Extract title from content, filename, or URL.""" # Try filename first (remove extension) if filename: title = filename.rsplit('.', 1)[0] if '.' 
    def _extract_title(
        self,
        content: str,
        filename: Optional[str] = None,
        url: Optional[str] = None,
    ) -> str:
        """Extract title from content, filename, or URL."""
        # Try filename first (remove extension)
        if filename:
            title = filename.rsplit('.', 1)[0] if '.' in filename else filename
            if title and len(title) > 3:
                return title.replace('_', ' ').replace('-', ' ').title()

        # Try first line (common in markdown/docs)
        lines = content.split('\n')
        for line in lines[:5]:
            line = line.strip()
            if line and len(line) < 200 and not line.startswith('#'):
                # Check if it looks like a title (short, few words)
                if len(line.split()) <= 15:
                    return line

        # Try markdown headers
        for line in lines[:10]:
            if line.startswith('# '):
                return line[2:].strip()
            if line.startswith('## '):
                return line[3:].strip()

        # Try URL path
        if url:
            parsed = urlparse(url)
            path = parsed.path.strip('/').split('/')[-1]
            if path and len(path) > 3:
                return path.replace('_', ' ').replace('-', ' ').title()

        # Fallback: first 50 chars
        return (content[:50].strip() + "...") if len(content) > 50 else content.strip()

    def _detect_date(self, content: str) -> Optional[str]:
        """Detect dates in various formats; return the first match as written."""
        months = (
            "January|February|March|April|May|June|"
            "July|August|September|October|November|December"
        )
        # Common date patterns. Each pattern has exactly one capture group
        # around the full date so the whole match can be returned verbatim.
        patterns = [
            r'\b(\d{4}-\d{2}-\d{2})\b',                      # YYYY-MM-DD
            r'\b(\d{2}/\d{2}/\d{4})\b',                      # MM/DD/YYYY
            r'\b(\d{4}/\d{2}/\d{2})\b',                      # YYYY/MM/DD
            rf'\b((?:{months})\s+\d{{1,2}},?\s+\d{{4}})\b',  # Month D, YYYY
            rf'\b(\d{{1,2}}\s+(?:{months})\s+\d{{4}})\b',    # D Month YYYY
        ]

        for pattern in patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                # Return the first date found, as it appears in the document
                return match.group(1)
        return None
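    # Example inputs the patterns above match (illustrative, not exhaustive):
    #   "2024-01-15"      -> "2024-01-15"
    #   "03/15/2024"      -> "03/15/2024"
    #   "March 15, 2024"  -> "March 15, 2024"
    #   "15 March 2024"   -> "15 March 2024"
    # Matches are returned verbatim; they are not normalized to ISO format.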
    async def _extract_with_llm(self, content: str, title: str) -> Dict[str, Any]:
        """Extract metadata using the LLM."""
        # Truncate content for the LLM (first 2000 chars for efficiency)
        preview = (content[:2000] + "...") if len(content) > 2000 else content

        prompt = f"""Analyze the following document and extract structured metadata.

Title: {title}

Content Preview:
{preview}

Extract the following information:
1. A concise summary (2-3 sentences) of what this document is about
2. 5-8 relevant tags (single words or short phrases, comma-separated)
3. 3-5 main topics/themes (comma-separated)
4. The primary subject matter or domain

Respond in JSON format:
{{
    "summary": "Brief 2-3 sentence summary of the document",
    "tags": ["tag1", "tag2", "tag3"],
    "topics": ["topic1", "topic2", "topic3"],
    "domain": "primary domain or subject area"
}}

Only return valid JSON, no additional text:"""

        try:
            response = await asyncio.wait_for(
                self.llm.simple_call(prompt, temperature=0.3),
                timeout=20.0,  # 20 second timeout
            )

            # Strip any markdown code fences around the JSON
            response = response.strip()
            if response.startswith("```json"):
                response = response[7:]
            if response.startswith("```"):
                response = response[3:]
            if response.endswith("```"):
                response = response[:-3]
            response = response.strip()

            data = json.loads(response)
            return {
                "summary": data.get("summary", ""),
                "tags": data.get("tags", []),
                "topics": data.get("topics", []),
                "domain": data.get("domain", ""),
            }
        except asyncio.TimeoutError:
            raise Exception("LLM timeout")
        except Exception as e:
            raise Exception(f"LLM extraction failed: {e}")

    def _extract_fallback(self, content: str, title: str) -> Dict[str, Any]:
        """Fallback metadata extraction without LLM."""
        return {
            "summary": self._generate_basic_summary(content),
            "tags": self._extract_basic_tags(content),
            "topics": self._extract_basic_topics(content),
            "domain": "",
        }

    def _generate_basic_summary(self, content: str) -> str:
        """Generate a basic summary from the first sentences."""
        sentences = re.split(r'[.!?]+', content)
        sentences = [s.strip() for s in sentences if s.strip()]

        if len(sentences) >= 3:
            return ' '.join(sentences[:3]) + '.'
        elif len(sentences) >= 1:
            return sentences[0] + '.'
        else:
            return (content[:200] + "...") if len(content) > 200 else content

    def _extract_basic_tags(self, content: str) -> List[str]:
        """Extract basic tags using keyword matching and frequency."""
        # Common keywords that might indicate topics
        keywords = [
            "api", "documentation", "guide", "tutorial", "reference",
            "manual", "policy", "procedure", "process", "workflow",
            "system", "application", "security", "authentication",
            "authorization", "data", "database", "server", "client",
            "network", "protocol", "framework", "library",
        ]

        content_lower = content.lower()
        found_tags = [keyword for keyword in keywords if keyword in content_lower]

        # Also extract capitalized words (might be proper nouns/important terms)
        capitalized = re.findall(r'\b[A-Z][a-z]+\b', content)
        # Count frequency and add the top 3 most common
        top_caps = [word.lower() for word, _ in Counter(capitalized).most_common(5)]
        found_tags.extend(top_caps[:3])

        return list(set(found_tags))[:8]  # Return up to 8 unique tags

    def _extract_basic_topics(self, content: str) -> List[str]:
        """Extract basic topics from content structure."""
        topics = []

        # Look for section headers (markdown style)
        headers = re.findall(r'^#+\s+(.+)$', content, re.MULTILINE)
        if headers:
            topics.extend([h.strip() for h in headers[:5]])

        # Look for common topic indicators
        content_lower = content.lower()
        if any(word in content_lower for word in ["introduction", "overview", "getting started"]):
            topics.append("Introduction")
        if any(word in content_lower for word in ["api", "endpoint", "request", "response"]):
            topics.append("API")
        if any(word in content_lower for word in ["example", "sample", "demo"]):
            topics.append("Examples")
        if any(word in content_lower for word in ["error", "troubleshoot", "issue"]):
            topics.append("Troubleshooting")

        return topics[:5] if topics else ["General"]
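    # Illustrative fallback behavior on hypothetical inputs (not asserted
    # output; tag order is not guaranteed due to the set() deduplication):
    #   _extract_basic_tags("The Payments API stores data in a server-side database.")
    #   -> e.g. ["api", "data", "database", "server", "payments", "the"]
    #   _extract_basic_topics("# Overview\nThis guide shows an example request.")
    #   -> ["Overview", "Introduction", "API", "Examples"]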
    def _calculate_quality_score(self, content: str, word_count: int, summary: str) -> float:
        """
        Calculate document quality score (0.0-1.0).

        Factors:
        - Length (not too short, not too long)
        - Structure (has paragraphs, sentences)
        - Completeness (has summary/metadata)
        """
        score = 0.0

        # Length score (optimal: 200-5000 words)
        if 200 <= word_count <= 5000:
            score += 0.3
        elif 100 <= word_count < 200 or 5000 < word_count <= 10000:
            score += 0.2
        elif word_count > 10000:
            score += 0.1

        # Structure score (has paragraphs and sentences)
        paragraphs = content.split('\n\n')
        if len(paragraphs) >= 2:
            score += 0.2

        sentences = re.split(r'[.!?]+', content)
        if len(sentences) >= 5:
            score += 0.2

        # Completeness score (has summary)
        if summary and len(summary) > 20:
            score += 0.2

        # Readability score (mostly alphabetic characters, contains spaces)
        if ' ' in content and len(re.findall(r'[a-zA-Z]', content)) > len(content) * 0.5:
            score += 0.1

        return min(score, 1.0)
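
# Minimal usage sketch (illustrative only). It assumes the module is run in its
# package context (e.g. `python -m <package>.metadata_extraction`) so the
# relative LLMClient import resolves, and that the LLM environment variables
# (LLM_BACKEND, OLLAMA_URL, ...) are set; the content and filename below are
# placeholders, not real data.
if __name__ == "__main__":
    async def _demo() -> None:
        extractor = MetadataExtractor()
        meta = await extractor.extract_metadata(
            content="Acme API Guide. This document describes the REST endpoints...",
            filename="acme_api_guide.pdf",
            source_type="pdf",
        )
        print(meta["title"], meta["extraction_method"], meta["quality_score"])

    asyncio.run(_demo())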