import os
import logging
import asyncio
import hashlib
from typing import List, Dict, Any, Optional
from pathlib import Path
from datetime import datetime
import time

import requests
from bs4 import BeautifulSoup
from llama_index.core import Document, VectorStoreIndex
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.readers.file import PDFReader, DocxReader, UnstructuredReader
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Import database for vector store access
from src.database import get_vector_store, get_embedding_model

logger = logging.getLogger(__name__)
class DocumentProcessor:
    """Handles document processing and embedding generation."""

    def __init__(self):
        self.vector_store = get_vector_store()
        self.embedding_model = get_embedding_model()
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
        self.processed_hashes = set()  # Track processed documents

    def _compute_hash(self, content: str) -> str:
        """Compute SHA256 hash of content to detect duplicates."""
        return hashlib.sha256(content.encode()).hexdigest()
    def process_file(self, file_path: Path) -> bool:
        """Process a single file and add to vector store."""
        try:
            logger.info(f"Processing file: {file_path}")

            # Read file content based on type
            content = self._read_file(file_path)
            if not content:
                return False

            # Check if already processed
            content_hash = self._compute_hash(content)
            if content_hash in self.processed_hashes:
                logger.info(f"File already processed: {file_path}")
                return False

            # Create document metadata
            metadata = {
                "source": str(file_path),
                "file_name": file_path.name,
                "file_type": file_path.suffix,
                "ingestion_time": datetime.now().isoformat(),
                "content_hash": content_hash
            }

            # Split into chunks
            chunks = self.text_splitter.split_text(content)

            # Create LlamaIndex documents
            documents = []
            for i, chunk in enumerate(chunks):
                doc = Document(
                    text=chunk,
                    metadata={
                        **metadata,
                        "chunk_index": i,
                        "total_chunks": len(chunks)
                    }
                )
                documents.append(doc)

            # Add to vector store
            index = VectorStoreIndex.from_documents(
                documents,
                vector_store=self.vector_store,
                embed_model=self.embedding_model,
                show_progress=True
            )

            # Mark as processed
            self.processed_hashes.add(content_hash)
            logger.info(f"Successfully ingested {len(documents)} chunks from {file_path}")
            return True

        except Exception as e:
            logger.error(f"Error processing file {file_path}: {e}")
            return False
    def _read_file(self, file_path: Path) -> Optional[str]:
        """Read content from various file types."""
        try:
            suffix = file_path.suffix.lower()

            if suffix == '.pdf':
                reader = PDFReader()
                docs = reader.load_data(file_path)
                return "\n".join([doc.text for doc in docs])
            elif suffix in ['.docx', '.doc']:
                reader = DocxReader()
                docs = reader.load_data(file_path)
                return "\n".join([doc.text for doc in docs])
            elif suffix in ['.txt', '.md', '.py', '.js', '.json', '.yaml', '.yml']:
                return file_path.read_text(encoding='utf-8')
            else:
                # Try unstructured reader for other formats
                reader = UnstructuredReader()
                docs = reader.load_data(file_path)
                return "\n".join([doc.text for doc in docs])

        except Exception as e:
            logger.error(f"Failed to read file {file_path}: {e}")
            return None
    def process_url(self, url: str) -> bool:
        """Process content from a URL."""
        try:
            logger.info(f"Processing URL: {url}")

            # Fetch content
            response = requests.get(url, timeout=30)
            response.raise_for_status()

            # Parse HTML
            soup = BeautifulSoup(response.text, 'html.parser')

            # Extract text content: remove script and style elements first
            for script in soup(["script", "style"]):
                script.decompose()

            # Normalize whitespace: strip each line, break lines into phrases,
            # and rejoin the non-empty pieces with single spaces
            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            content = ' '.join(chunk for chunk in chunks if chunk)

            if not content:
                return False

            # Check if already processed
            content_hash = self._compute_hash(content)
            if content_hash in self.processed_hashes:
                logger.info(f"URL already processed: {url}")
                return False

            # Create document metadata
            metadata = {
                "source": url,
                "source_type": "web",
                "title": soup.title.string if soup.title else "Untitled",
                "ingestion_time": datetime.now().isoformat(),
                "content_hash": content_hash
            }

            # Split and process the same way as files
            chunks = self.text_splitter.split_text(content)
            documents = []
            for i, chunk in enumerate(chunks):
                doc = Document(
                    text=chunk,
                    metadata={
                        **metadata,
                        "chunk_index": i,
                        "total_chunks": len(chunks)
                    }
                )
                documents.append(doc)

            # Add to vector store
            index = VectorStoreIndex.from_documents(
                documents,
                vector_store=self.vector_store,
                embed_model=self.embedding_model
            )

            self.processed_hashes.add(content_hash)
            logger.info(f"Successfully ingested {len(documents)} chunks from {url}")
            return True

        except Exception as e:
            logger.error(f"Error processing URL {url}: {e}")
            return False
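
# Usage sketch (illustrative only; the path and URL below are placeholders,
# not part of this module):
#
#     processor = DocumentProcessor()
#     processor.process_file(Path("./documents/example.pdf"))
#     processor.process_url("https://example.com/article")
#
# Each call chunks the content, indexes it against the configured vector store
# and embedding model, and skips inputs whose content hash has already been
# seen in this process.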
class KnowledgeLifecycleManager:
    """Manages knowledge base lifecycle including reindexing and cache invalidation."""

    def __init__(self, db_client=None, cache=None):
        self.db_client = db_client
        self.cache = cache
        self.last_reindex = datetime.now()
        self.reindex_threshold = 10  # Reindex after N new documents

    async def update_knowledge_lifecycle(self, doc_id: str, doc_metadata: Dict[str, Any]):
        """Update knowledge lifecycle tracking."""
        if not self.db_client:
            return

        try:
            # Update the knowledge_lifecycle table
            await self.db_client.table("knowledge_lifecycle").upsert({
                "doc_id": doc_id,
                "last_updated": datetime.now().isoformat(),
                "metadata": doc_metadata,
                "status": "active"
            }).execute()
            logger.info(f"Updated knowledge lifecycle for document {doc_id}")
        except Exception as e:
            logger.error(f"Failed to update knowledge lifecycle: {e}")

    async def trigger_reindex(self, force: bool = False):
        """Trigger vector store reindexing."""
        try:
            # Check if reindex is needed
            if not force:
                doc_count = await self._get_recent_document_count()
                if doc_count < self.reindex_threshold:
                    logger.debug(f"Reindex not needed yet ({doc_count} recent docs)")
                    return

            logger.info("Triggering vector store reindex...")
            # In a real implementation, this would rebuild the vector index
            await self._perform_reindex()
            self.last_reindex = datetime.now()
            logger.info("Vector store reindex completed")
        except Exception as e:
            logger.error(f"Failed to trigger reindex: {e}")

    async def invalidate_cache(self, pattern: str = "knowledge_base:*"):
        """Invalidate knowledge base cache."""
        if not self.cache:
            return

        try:
            self.cache.invalidate(pattern)
            logger.info(f"Invalidated cache pattern: {pattern}")
        except Exception as e:
            logger.error(f"Failed to invalidate cache: {e}")

    async def _get_recent_document_count(self) -> int:
        """Get count of recently added documents."""
        if not self.db_client:
            return 0

        try:
            # Query documents updated since the last reindex from the knowledge_lifecycle table
            result = await self.db_client.table("knowledge_lifecycle").select(
                "doc_id"
            ).gte("last_updated", self.last_reindex.isoformat()).execute()
            return len(result.data) if result.data else 0
        except Exception as e:
            logger.error(f"Failed to get recent document count: {e}")
            return 0

    async def _perform_reindex(self):
        """Perform the actual reindexing operation."""
        # Placeholder for the actual reindexing logic.
        # In a real implementation, this would:
        # 1. Rebuild the vector index
        # 2. Update metadata
        # 3. Optimize storage
        logger.info("Performing vector store reindex...")
        await asyncio.sleep(1)  # Simulate reindexing time
class KnowledgeIngestionService:
    """Enhanced knowledge ingestion service with lifecycle management."""

    def __init__(self, watch_directories: List[str] = None, poll_urls: List[str] = None,
                 db_client=None, cache=None):
        self.processor = DocumentProcessor()
        self.lifecycle_manager = KnowledgeLifecycleManager(db_client, cache)
        self.watch_directories = watch_directories or []
        self.poll_urls = poll_urls or []
        self.running = False
        self.db_client = db_client
        self.cache = cache

    async def ingest_document(self, doc_path: str) -> str:
        """Enhanced document ingestion with lifecycle management."""
        try:
            file_path = Path(doc_path)
            if not file_path.exists():
                raise FileNotFoundError(f"Document not found: {doc_path}")

            # Process document
            success = self.processor.process_file(file_path)
            if not success:
                raise Exception(f"Failed to process document: {doc_path}")

            # Generate document ID
            doc_id = f"doc_{int(time.time())}_{hashlib.md5(str(file_path).encode()).hexdigest()[:8]}"

            # Update lifecycle
            doc_metadata = {
                "source": str(file_path),
                "file_name": file_path.name,
                "file_type": file_path.suffix,
                "ingestion_time": datetime.now().isoformat()
            }
            await self.lifecycle_manager.update_knowledge_lifecycle(doc_id, doc_metadata)

            # Trigger reindex if needed
            await self.lifecycle_manager.trigger_reindex()

            # Invalidate cache
            await self.lifecycle_manager.invalidate_cache()

            logger.info(f"Successfully ingested document {doc_id} from {doc_path}")
            return doc_id

        except Exception as e:
            logger.error(f"Failed to ingest document {doc_path}: {e}")
            raise

    async def ingest_url(self, url: str) -> str:
        """Enhanced URL ingestion with lifecycle management."""
        try:
            # Process URL
            success = self.processor.process_url(url)
            if not success:
                raise Exception(f"Failed to process URL: {url}")

            # Generate document ID
            doc_id = f"url_{int(time.time())}_{hashlib.md5(url.encode()).hexdigest()[:8]}"

            # Update lifecycle
            doc_metadata = {
                "source": url,
                "source_type": "web",
                "ingestion_time": datetime.now().isoformat()
            }
            await self.lifecycle_manager.update_knowledge_lifecycle(doc_id, doc_metadata)

            # Trigger reindex if needed
            await self.lifecycle_manager.trigger_reindex()

            # Invalidate cache
            await self.lifecycle_manager.invalidate_cache()

            logger.info(f"Successfully ingested URL {doc_id} from {url}")
            return doc_id

        except Exception as e:
            logger.error(f"Failed to ingest URL {url}: {e}")
            raise
    def start(self):
        """Start the ingestion service."""
        if self.running:
            logger.warning("Ingestion service already running")
            return

        self.running = True
        logger.info("Starting knowledge ingestion service...")

        # Process existing directories
        for directory in self.watch_directories:
            self._process_directory(directory)

        # Start URL polling in the background. Note: asyncio.create_task()
        # needs a running event loop, so start() must be called from async
        # context whenever poll_urls is configured.
        if self.poll_urls:
            asyncio.create_task(self._poll_urls())
    def stop(self):
        """Stop the ingestion service."""
        self.running = False
        logger.info("Stopped knowledge ingestion service")

    def _process_directory(self, directory: str):
        """Process all files in a directory."""
        try:
            dir_path = Path(directory)
            if not dir_path.exists():
                logger.warning(f"Directory not found: {directory}")
                return

            for file_path in dir_path.rglob("*"):
                if file_path.is_file():
                    self.processor.process_file(file_path)
        except Exception as e:
            logger.error(f"Error processing directory {directory}: {e}")

    async def _poll_urls(self):
        """Poll URLs for updates."""
        while self.running:
            try:
                for url in self.poll_urls:
                    self.processor.process_url(url)

                # Wait before next poll
                await asyncio.sleep(3600)  # Poll every hour
            except Exception as e:
                logger.error(f"Error polling URLs: {e}")
                await asyncio.sleep(60)  # Wait before retry

    def add_watch_directory(self, directory: str):
        """Add a directory to watch for new documents."""
        self.watch_directories.append(directory)
        logger.info(f"Added watch directory: {directory}")

    def add_poll_url(self, url: str):
        """Add a URL to poll for updates."""
        self.poll_urls.append(url)
        logger.info(f"Added poll URL: {url}")
def run_ingestion_service(config: Dict[str, Any]):
    """Run the knowledge ingestion service with configuration."""
    try:
        # Extract configuration
        watch_dirs = config.get("watch_directories", [])
        poll_urls = config.get("poll_urls", [])
        db_client = config.get("db_client")
        cache = config.get("cache")

        # Create service
        service = KnowledgeIngestionService(
            watch_directories=watch_dirs,
            poll_urls=poll_urls,
            db_client=db_client,
            cache=cache
        )

        # Start service
        service.start()
        return service
    except Exception as e:
        logger.error(f"Failed to run ingestion service: {e}")
        raise
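
# The function below is an illustrative sketch, not part of the original
# service: it shows one way to drive the async ingestion API, since
# ingest_document()/ingest_url() are coroutines and start() only schedules
# URL polling when an event loop is already running. The helper name and the
# default document path are placeholders.
async def example_async_ingestion(doc_path: str = "./documents/example.pdf") -> str:
    """Hypothetical helper: ingest a single document from within an event loop."""
    service = KnowledgeIngestionService(watch_directories=["./documents"])
    service.start()
    doc_id = await service.ingest_document(doc_path)
    service.stop()
    return doc_id

# Example invocation (sketch): asyncio.run(example_async_ingestion())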
if __name__ == "__main__":
    # Example configuration
    config = {
        "watch_directories": [
            "./documents",
            "./knowledge_base",
            os.path.expanduser("~/Documents/AI_Agent_Knowledge")
        ],
        "poll_urls": [
            # Add URLs to periodically check for updates
        ]
    }

    logging.basicConfig(level=logging.INFO)
    run_ingestion_service(config)