Spaces:
Sleeping
Sleeping
File size: 12,009 Bytes
d1e5882 0452a50 d1e5882 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 |
"""
AI-Generated Knowledge Base Metadata Extraction Service
Extracts rich metadata from documents during ingestion:
- Title
- Summary
- Tags
- Topics (via LLM)
- Date detection
- Document quality score
"""
import os
import re
from typing import Dict, Any, Optional, List
from datetime import datetime
from ..services.llm_client import LLMClient
class MetadataExtractor:
    """
    Extracts structured metadata from document content using LLM and pattern matching.

    The LLM is asked for a summary, tags, topics and a domain. Any field it
    does not provide (or the whole set, when the call fails) is filled in by
    the local heuristics defined on this class.
    """

    # Upper bound for the heuristic summary, so text without sentence
    # punctuation cannot turn the whole document into its own "summary".
    _MAX_SUMMARY_CHARS = 500

    _MONTH_PATTERN = (
        "January|February|March|April|May|June|July|August|"
        "September|October|November|December"
    )
    _MONTH_NAMES = tuple(_MONTH_PATTERN.lower().split("|"))

    # (compiled pattern, allow DD/MM fallback) in priority order. Named groups
    # are used so the full date is always recoverable from a match.
    _DATE_PATTERNS = (
        (re.compile(r"\b(?P<y>\d{4})-(?P<m>\d{2})-(?P<d>\d{2})\b"), False),  # YYYY-MM-DD
        (re.compile(r"\b(?P<m>\d{2})/(?P<d>\d{2})/(?P<y>\d{4})\b"), True),   # MM/DD/YYYY
        (re.compile(r"\b(?P<y>\d{4})/(?P<m>\d{2})/(?P<d>\d{2})\b"), False),  # YYYY/MM/DD
        (
            re.compile(
                r"\b(?P<mon>" + _MONTH_PATTERN + r")\s+(?P<d>\d{1,2}),?\s+(?P<y>\d{4})\b",
                re.IGNORECASE,
            ),
            False,
        ),  # Month D, YYYY
        (
            re.compile(
                r"\b(?P<d>\d{1,2})\s+(?P<mon>" + _MONTH_PATTERN + r")\s+(?P<y>\d{4})\b",
                re.IGNORECASE,
            ),
            False,
        ),  # D Month YYYY
    )

    # Common keywords that might indicate topics
    _TAG_KEYWORDS = (
        "api", "documentation", "guide", "tutorial", "reference", "manual",
        "policy", "procedure", "process", "workflow", "system", "application",
        "security", "authentication", "authorization", "data", "database",
        "server", "client", "network", "protocol", "framework", "library",
    )

    # Capitalized sentence starters that are not meaningful as tags.
    _TAG_STOPWORDS = frozenset({
        "the", "this", "that", "these", "those", "there", "here", "when",
        "where", "what", "which", "while", "with", "from", "for", "and",
        "but", "not", "you", "your", "our", "their", "they", "has", "have",
        "are", "was", "were", "can", "will", "all", "any", "each", "some",
        "then", "than", "also", "how", "why", "who", "use", "see", "note",
        "please", "more", "most", "such", "once", "after", "before", "into",
        "if", "in", "it", "is", "as", "at", "an", "on", "of", "to", "we",
        "he", "she", "by", "be", "do", "or", "so", "no", "yes",
    })

    # (topic label, terms whose presence suggests that topic)
    _TOPIC_INDICATORS = (
        ("Introduction", ("introduction", "overview", "getting started")),
        ("API", ("api", "endpoint", "request", "response")),
        ("Examples", ("example", "sample", "demo")),
        ("Troubleshooting", ("error", "troubleshoot", "issue")),
    )

    def __init__(self, llm_client: Optional["LLMClient"] = None):
        """
        Args:
            llm_client: Client used for LLM calls. When omitted, an LLMClient
                is built from the GROQ_API_KEY and GROQ_MODEL environment
                variables.
        """
        # `is None` rather than truthiness, so a caller-supplied client is
        # never replaced just because it happens to evaluate as falsy.
        if llm_client is None:
            llm_client = LLMClient(
                api_key=os.getenv("GROQ_API_KEY"),
                model=os.getenv("GROQ_MODEL")
            )
        self.llm = llm_client

    async def extract_metadata(
        self,
        content: str,
        filename: Optional[str] = None,
        url: Optional[str] = None,
        source_type: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Extract comprehensive metadata from document content.

        Args:
            content: Document text content
            filename: Original filename (if available)
            url: Source URL (if available)
            source_type: Document type (pdf, docx, txt, etc.)

        Returns:
            Dictionary with extracted metadata:
            - title: Extracted or inferred title
            - summary: Brief summary
            - tags: List of relevant tags
            - topics: List of main topics/themes
            - detected_date: First valid date found (ISO YYYY-MM-DD) or None
            - quality_score: Document quality score (0.0-1.0)
            - word_count: Word count
            - char_count: Character count
            - source_type: The given source_type, or "unknown"
            - extraction_method: "llm" when the summary came from the LLM,
              otherwise "fallback"
        """
        content = content or ""

        # Basic metadata (always available)
        word_count = len(content.split())
        char_count = len(content)

        title = self._extract_title(content, filename, url)
        detected_date = self._detect_date(content)

        # Try LLM extraction for rich metadata. `used_llm` is tracked
        # separately because the fallback dict also carries a summary, so the
        # summary's presence alone cannot tell the two paths apart.
        used_llm = False
        try:
            llm_metadata = await self._extract_with_llm(content, title)
            used_llm = bool(llm_metadata.get("summary"))
        except Exception as e:
            print(f"LLM metadata extraction failed: {e}, using fallback")
            llm_metadata = self._extract_fallback(content, title)

        # `or` (not a dict.get default) so that fields the LLM returned empty
        # are filled in too, and heuristics only run when actually needed.
        summary = llm_metadata.get("summary") or self._generate_basic_summary(content)
        tags = llm_metadata.get("tags") or self._extract_basic_tags(content)
        topics = llm_metadata.get("topics") or self._extract_basic_topics(content)

        quality_score = self._calculate_quality_score(content, word_count, summary)

        return {
            "title": title,
            "summary": summary,
            "tags": tags,
            "topics": topics,
            "detected_date": detected_date,
            "quality_score": quality_score,
            "word_count": word_count,
            "char_count": char_count,
            "source_type": source_type or "unknown",
            "extraction_method": "llm" if used_llm else "fallback"
        }

    @staticmethod
    def _humanize(name: str) -> str:
        """Turn a file/URL slug such as 'user_guide-v2' into 'User Guide V2'."""
        return name.replace('_', ' ').replace('-', ' ').title()

    def _extract_title(self, content: str, filename: Optional[str] = None, url: Optional[str] = None) -> str:
        """
        Extract title from content, filename, or URL.

        Sources are tried in this order: filename stem, the first heading or
        short plain line near the top of the content, the last URL path
        segment, and finally the first 50 characters of the content.
        """
        # Try filename first (directory part and extension removed)
        if filename:
            base = re.split(r'[\\/]', filename)[-1]
            stem = base.rsplit('.', 1)[0] if '.' in base else base
            if len(stem) > 3:
                return self._humanize(stem)

        # Walk the top of the document in order, so a leading markdown heading
        # wins over the paragraph that follows it, while a plain first-line
        # title still wins over a later section heading.
        for index, raw_line in enumerate(content.split('\n')[:10]):
            line = raw_line.strip()
            heading = re.match(r'#{1,6}[ \t]+(.+)', line)
            if heading:
                return heading.group(1).strip()
            looks_like_title = (
                index < 5
                and line
                and len(line) < 200
                and not line.startswith('#')
                and len(line.split()) <= 15
            )
            if looks_like_title:
                return line

        # Try URL path
        if url:
            from urllib.parse import unquote, urlparse
            slug = unquote(urlparse(url).path.strip('/').split('/')[-1])
            # Drop a trailing file extension (".html") but keep dotted
            # version numbers such as "python-3.12".
            slug = re.sub(r'\.[A-Za-z][A-Za-z0-9]{1,4}$', '', slug)
            if len(slug) > 3:
                return self._humanize(slug)

        # Fallback: first 50 chars
        return content[:50].strip() + "..." if len(content) > 50 else content.strip()

    @classmethod
    def _to_iso_date(cls, parts: Dict[str, Any], day_first_fallback: bool = False) -> Optional[str]:
        """
        Convert the named groups of a date match to 'YYYY-MM-DD'.

        Returns None when the groups do not form a real calendar date.
        """
        try:
            year = int(parts["y"])
            day = int(parts["d"])
            month_name = parts.get("mon")
            if month_name:
                month = cls._MONTH_NAMES.index(month_name.lower()) + 1
            else:
                month = int(parts["m"])
                # "25/12/2023" cannot be MM/DD, so read it as DD/MM.
                if day_first_fallback and month > 12 >= day:
                    month, day = day, month
            return datetime(year, month, day).date().isoformat()
        except ValueError:
            return None

    def _detect_date(self, content: str) -> Optional[str]:
        """
        Detect dates in various formats.

        Patterns are tried in priority order (ISO, slash-separated, then
        month-name forms). The first match that is a valid calendar date is
        returned as an ISO 'YYYY-MM-DD' string; None if there is none.
        """
        for pattern, day_first_fallback in self._DATE_PATTERNS:
            for match in pattern.finditer(content):
                iso_date = self._to_iso_date(match.groupdict(), day_first_fallback)
                if iso_date is not None:
                    return iso_date
        return None

    @staticmethod
    def _strip_code_fence(response: str) -> str:
        """Remove markdown code fences and any text surrounding the JSON object."""
        text = response.strip()
        if text.startswith("```"):
            text = text[3:]
            if text[:4].lower() == "json":
                text = text[4:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()
        # Tolerate explanatory text before/after the object.
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            text = text[start:end + 1]
        return text

    @staticmethod
    def _as_str_list(value: Any) -> List[str]:
        """Coerce a list (or comma-separated string) into a list of non-empty strings."""
        if isinstance(value, str):
            items = value.split(",")
        elif isinstance(value, (list, tuple)):
            items = value
        else:
            return []
        cleaned = (str(item).strip() for item in items)
        return [item for item in cleaned if item]

    async def _extract_with_llm(self, content: str, title: str) -> Dict[str, Any]:
        """
        Extract metadata using LLM.

        Returns:
            Dict with "summary" (str), "tags" (list), "topics" (list) and
            "domain" (str); fields missing from the reply are empty.

        Raises:
            Exception: if the call times out (20 s), fails, or the reply is
                not a JSON object. The original error is chained as the cause.
        """
        import asyncio
        import json

        # Truncate content for LLM (first 2000 chars for efficiency)
        preview = content[:2000] + "..." if len(content) > 2000 else content
        prompt = f"""Analyze the following document and extract structured metadata.
Title: {title}
Content Preview:
{preview}
Extract the following information:
1. A concise summary (2-3 sentences) of what this document is about
2. 5-8 relevant tags (single words or short phrases, comma-separated)
3. 3-5 main topics/themes (comma-separated)
4. The primary subject matter or domain
Respond in JSON format:
{{
"summary": "Brief 2-3 sentence summary of the document",
"tags": ["tag1", "tag2", "tag3"],
"topics": ["topic1", "topic2", "topic3"],
"domain": "primary domain or subject area"
}}
Only return valid JSON, no additional text:"""
        try:
            response = await asyncio.wait_for(
                self.llm.simple_call(prompt, temperature=0.3),
                timeout=20.0  # 20 second timeout
            )
            data = json.loads(self._strip_code_fence(response))
            if not isinstance(data, dict):
                raise ValueError("expected a JSON object")
        except asyncio.TimeoutError as e:
            raise Exception("LLM timeout") from e
        except Exception as e:
            raise Exception(f"LLM extraction failed: {e}") from e

        return {
            "summary": str(data.get("summary") or "").strip(),
            "tags": self._as_str_list(data.get("tags")),
            "topics": self._as_str_list(data.get("topics")),
            "domain": str(data.get("domain") or "").strip()
        }

    def _extract_fallback(self, content: str, title: str) -> Dict[str, Any]:
        """Fallback metadata extraction without LLM (same keys as the LLM result)."""
        return {
            "summary": self._generate_basic_summary(content),
            "tags": self._extract_basic_tags(content),
            "topics": self._extract_basic_topics(content),
            "domain": ""
        }

    def _generate_basic_summary(self, content: str) -> str:
        """
        Generate a basic summary from the first (up to three) sentences.

        Sentences are split after '.', '!' or '?' followed by whitespace, so
        terminators are kept and decimals such as "3.14" stay intact. The
        result is capped at _MAX_SUMMARY_CHARS characters plus an ellipsis.
        """
        text = " ".join(content.split())
        if not text:
            return ""
        sentences = [s for s in re.split(r'(?<=[.!?])\s+', text) if s]
        summary = " ".join(sentences[:3])
        if len(summary) > self._MAX_SUMMARY_CHARS:
            return summary[:self._MAX_SUMMARY_CHARS].rstrip() + "..."
        if summary[-1] not in ".!?":
            summary += "."
        return summary

    @staticmethod
    def _mentions(text_lower: str, term: str) -> bool:
        """
        Return True if `term` occurs in `text_lower` as a whole word.

        Simple inflections (-s, -es, -ing, -y/-ies) also count, but
        substrings of unrelated words do not ("api" does not match "rapid").
        """
        if term.endswith("y"):
            pattern = r'\b' + re.escape(term[:-1]) + r'(?:y|ies)\b'
        else:
            pattern = r'\b' + re.escape(term) + r'(?:s|es|ing)?\b'
        return re.search(pattern, text_lower) is not None

    def _extract_basic_tags(self, content: str) -> List[str]:
        """
        Extract basic tags: known keywords first, then frequent proper nouns.

        Returns up to 8 unique lowercase tags in a deterministic order.
        """
        from collections import Counter

        content_lower = content.lower()
        found_tags = [
            keyword for keyword in self._TAG_KEYWORDS
            if self._mentions(content_lower, keyword)
        ]

        # Also extract capitalized words (might be proper nouns/important
        # terms), ignoring ordinary sentence starters like "The".
        capitalized = Counter(
            word.lower()
            for word in re.findall(r'\b[A-Z][a-z]+\b', content)
            if word.lower() not in self._TAG_STOPWORDS
        )
        found_tags.extend(word for word, _ in capitalized.most_common(3))

        # dict.fromkeys de-duplicates while keeping order; a set would make
        # both the order and the choice of 8 survivors vary between runs.
        return list(dict.fromkeys(found_tags))[:8]

    def _extract_basic_topics(self, content: str) -> List[str]:
        """
        Extract basic topics from content structure.

        Uses up to five markdown section headers, followed by generic topic
        labels whose indicator terms appear in the text. Returns at most five
        unique topics, or ["General"] when nothing is found.
        """
        # Section headers (markdown style); [ \t] keeps the match on one line.
        headers = re.findall(r'^#+[ \t]+(.+)$', content, re.MULTILINE)
        topics = [header.strip() for header in headers[:5]]

        content_lower = content.lower()
        for label, terms in self._TOPIC_INDICATORS:
            if any(self._mentions(content_lower, term) for term in terms):
                topics.append(label)

        unique_topics = list(dict.fromkeys(topic for topic in topics if topic))
        return unique_topics[:5] if unique_topics else ["General"]

    def _calculate_quality_score(self, content: str, word_count: int, summary: str) -> float:
        """
        Calculate document quality score (0.0-1.0), rounded to two decimals.

        Factors:
        - Length (up to 0.3; optimal is 200-5000 words)
        - Structure (0.2 for >= 2 paragraphs, 0.2 for >= 5 sentences)
        - Completeness (0.2 for a summary longer than 20 characters)
        - Readability (0.1 when the text has spaces and is mostly letters)
        """
        score = 0.0

        # Length score (optimal: 200-5000 words)
        if 200 <= word_count <= 5000:
            score += 0.3
        elif 100 <= word_count <= 10000:
            score += 0.2
        elif word_count > 10000:
            score += 0.1

        # Structure score; empty fragments produced by the split are ignored
        # so trailing punctuation or blank lines do not inflate the counts.
        paragraphs = [p for p in re.split(r'\n\s*\n', content) if p.strip()]
        if len(paragraphs) >= 2:
            score += 0.2
        sentences = [s for s in re.split(r'[.!?]+', content) if s.strip()]
        if len(sentences) >= 5:
            score += 0.2

        # Completeness score (has summary)
        if summary and len(summary) > 20:
            score += 0.2

        # Readability score (has spaces, more than half the characters are
        # letters); isalpha() also counts non-ASCII letters.
        letter_count = sum(1 for char in content if char.isalpha())
        if ' ' in content and letter_count > len(content) * 0.5:
            score += 0.1

        # Rounding removes float accumulation noise (e.g. 0.9999999999999999).
        return round(min(score, 1.0), 2)
|