IntegraChat / backend /api /services /metadata_extractor.py
nothingworry's picture
Add Docker support and remove Ollama
0452a50
raw
history blame
12 kB
"""
AI-Generated Knowledge Base Metadata Extraction Service
Extracts rich metadata from documents during ingestion:
- Title
- Summary
- Tags
- Topics (via LLM)
- Date detection
- Document quality score
"""
import os
import re
from typing import Dict, Any, Optional, List
from datetime import datetime
from ..services.llm_client import LLMClient
class MetadataExtractor:
    """
    Extracts structured metadata from document content using LLM and pattern matching.
    """
    def __init__(self, llm_client: Optional[LLMClient] = None):
        # Use the injected client when given; otherwise build a default one
        # from the GROQ_* environment variables.
        # NOTE(review): os.getenv may return None here -- presumably LLMClient
        # tolerates missing credentials; confirm against its constructor.
        self.llm = llm_client or LLMClient(
            api_key=os.getenv("GROQ_API_KEY"),
            model=os.getenv("GROQ_MODEL")
        )
async def extract_metadata(
self,
content: str,
filename: Optional[str] = None,
url: Optional[str] = None,
source_type: Optional[str] = None
) -> Dict[str, Any]:
"""
Extract comprehensive metadata from document content.
Args:
content: Document text content
filename: Original filename (if available)
url: Source URL (if available)
source_type: Document type (pdf, docx, txt, etc.)
Returns:
Dictionary with extracted metadata:
- title: Extracted or inferred title
- summary: Brief summary (2-3 sentences)
- tags: List of relevant tags
- topics: List of main topics/themes
- detected_date: Extracted date (ISO format or None)
- quality_score: Document quality score (0.0-1.0)
- word_count: Word count
- language: Detected language (if available)
"""
# Basic metadata (always available)
word_count = len(content.split())
char_count = len(content)
# Extract title (try multiple methods)
title = self._extract_title(content, filename, url)
# Detect date
detected_date = self._detect_date(content)
# Try LLM extraction for rich metadata
llm_metadata = {}
try:
llm_metadata = await self._extract_with_llm(content, title)
except Exception as e:
print(f"LLM metadata extraction failed: {e}, using fallback")
llm_metadata = self._extract_fallback(content, title)
# Calculate quality score
quality_score = self._calculate_quality_score(
content, word_count, llm_metadata.get("summary", "")
)
return {
"title": title,
"summary": llm_metadata.get("summary", self._generate_basic_summary(content)),
"tags": llm_metadata.get("tags", self._extract_basic_tags(content)),
"topics": llm_metadata.get("topics", self._extract_basic_topics(content)),
"detected_date": detected_date,
"quality_score": quality_score,
"word_count": word_count,
"char_count": char_count,
"source_type": source_type or "unknown",
"extraction_method": "llm" if llm_metadata.get("summary") else "fallback"
}
def _extract_title(self, content: str, filename: Optional[str] = None, url: Optional[str] = None) -> str:
"""Extract title from content, filename, or URL."""
# Try filename first (remove extension)
if filename:
title = filename.rsplit('.', 1)[0] if '.' in filename else filename
if title and len(title) > 3:
return title.replace('_', ' ').replace('-', ' ').title()
# Try first line (common in markdown/docs)
lines = content.split('\n')
for line in lines[:5]:
line = line.strip()
if line and len(line) < 200 and not line.startswith('#'):
# Check if it looks like a title
if len(line.split()) <= 15:
return line
# Try markdown headers
for line in lines[:10]:
if line.startswith('# '):
return line[2:].strip()
if line.startswith('## '):
return line[3:].strip()
# Try URL path
if url:
from urllib.parse import urlparse
parsed = urlparse(url)
path = parsed.path.strip('/').split('/')[-1]
if path and len(path) > 3:
return path.replace('_', ' ').replace('-', ' ').title()
# Fallback: first 50 chars
return content[:50].strip() + "..." if len(content) > 50 else content.strip()
def _detect_date(self, content: str) -> Optional[str]:
"""Detect dates in various formats."""
# Common date patterns
patterns = [
r'\b(\d{4}-\d{2}-\d{2})\b', # YYYY-MM-DD
r'\b(\d{2}/\d{2}/\d{4})\b', # MM/DD/YYYY
r'\b(\d{4}/\d{2}/\d{2})\b', # YYYY/MM/DD
r'\b(January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b',
r'\b\d{1,2}\s+(January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}\b',
]
for pattern in patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
if matches:
try:
# Try to parse and normalize
date_str = matches[0] if isinstance(matches[0], str) else ' '.join(matches[0])
# Return first valid date found
return date_str
except:
continue
return None
async def _extract_with_llm(self, content: str, title: str) -> Dict[str, Any]:
"""Extract metadata using LLM."""
# Truncate content for LLM (first 2000 chars for efficiency)
preview = content[:2000] + "..." if len(content) > 2000 else content
prompt = f"""Analyze the following document and extract structured metadata.
Title: {title}
Content Preview:
{preview}
Extract the following information:
1. A concise summary (2-3 sentences) of what this document is about
2. 5-8 relevant tags (single words or short phrases, comma-separated)
3. 3-5 main topics/themes (comma-separated)
4. The primary subject matter or domain
Respond in JSON format:
{{
"summary": "Brief 2-3 sentence summary of the document",
"tags": ["tag1", "tag2", "tag3"],
"topics": ["topic1", "topic2", "topic3"],
"domain": "primary domain or subject area"
}}
Only return valid JSON, no additional text:"""
try:
import asyncio
response = await asyncio.wait_for(
self.llm.simple_call(prompt, temperature=0.3),
timeout=20.0 # 20 second timeout
)
# Clean up response
response = response.strip()
if response.startswith("```json"):
response = response[7:]
if response.startswith("```"):
response = response[3:]
if response.endswith("```"):
response = response[:-3]
response = response.strip()
import json
data = json.loads(response)
return {
"summary": data.get("summary", ""),
"tags": data.get("tags", []),
"topics": data.get("topics", []),
"domain": data.get("domain", "")
}
except asyncio.TimeoutError:
raise Exception("LLM timeout")
except Exception as e:
raise Exception(f"LLM extraction failed: {e}")
def _extract_fallback(self, content: str, title: str) -> Dict[str, Any]:
"""Fallback metadata extraction without LLM."""
return {
"summary": self._generate_basic_summary(content),
"tags": self._extract_basic_tags(content),
"topics": self._extract_basic_topics(content),
"domain": ""
}
def _generate_basic_summary(self, content: str) -> str:
"""Generate a basic summary from first sentences."""
sentences = re.split(r'[.!?]+', content)
sentences = [s.strip() for s in sentences if s.strip()]
if len(sentences) >= 3:
return ' '.join(sentences[:3]) + '.'
elif len(sentences) >= 1:
return sentences[0] + '.'
else:
return content[:200] + "..." if len(content) > 200 else content
def _extract_basic_tags(self, content: str) -> List[str]:
"""Extract basic tags using keyword frequency."""
# Common keywords that might indicate topics
keywords = [
"api", "documentation", "guide", "tutorial", "reference", "manual",
"policy", "procedure", "process", "workflow", "system", "application",
"security", "authentication", "authorization", "data", "database",
"server", "client", "network", "protocol", "framework", "library"
]
content_lower = content.lower()
found_tags = []
for keyword in keywords:
if keyword in content_lower:
found_tags.append(keyword)
# Also extract capitalized words (might be proper nouns/important terms)
capitalized = re.findall(r'\b[A-Z][a-z]+\b', content)
# Count frequency and take top 5
from collections import Counter
top_caps = [word.lower() for word, count in Counter(capitalized).most_common(5)]
found_tags.extend(top_caps[:3]) # Add top 3
return list(set(found_tags))[:8] # Return up to 8 unique tags
def _extract_basic_topics(self, content: str) -> List[str]:
"""Extract basic topics from content structure."""
topics = []
# Look for section headers (markdown style)
headers = re.findall(r'^#+\s+(.+)$', content, re.MULTILINE)
if headers:
topics.extend([h.strip() for h in headers[:5]])
# Look for common topic indicators
if any(word in content.lower() for word in ["introduction", "overview", "getting started"]):
topics.append("Introduction")
if any(word in content.lower() for word in ["api", "endpoint", "request", "response"]):
topics.append("API")
if any(word in content.lower() for word in ["example", "sample", "demo"]):
topics.append("Examples")
if any(word in content.lower() for word in ["error", "troubleshoot", "issue"]):
topics.append("Troubleshooting")
return topics[:5] if topics else ["General"]
def _calculate_quality_score(self, content: str, word_count: int, summary: str) -> float:
"""
Calculate document quality score (0.0-1.0).
Factors:
- Length (not too short, not too long)
- Structure (has paragraphs, sentences)
- Completeness (has summary/metadata)
"""
score = 0.0
# Length score (optimal: 200-5000 words)
if 200 <= word_count <= 5000:
score += 0.3
elif 100 <= word_count < 200 or 5000 < word_count <= 10000:
score += 0.2
elif word_count > 10000:
score += 0.1
# Structure score (has paragraphs and sentences)
paragraphs = content.split('\n\n')
if len(paragraphs) >= 2:
score += 0.2
sentences = re.split(r'[.!?]+', content)
if len(sentences) >= 5:
score += 0.2
# Completeness score (has summary)
if summary and len(summary) > 20:
score += 0.2
# Readability score (not too many special chars, has spaces)
if ' ' in content and len(re.findall(r'[a-zA-Z]', content)) > len(content) * 0.5:
score += 0.1
return min(score, 1.0)