Spaces:
Sleeping
Sleeping
| # indexer/watcher.py | |
| import os | |
| import time | |
| from watchdog.observers import Observer | |
| from watchdog.events import FileSystemEventHandler | |
| from indexer.pipeline import IndexingPipeline | |
| import yaml | |
class IndexHandler(FileSystemEventHandler):
    """Handle filesystem events from watchdog and keep the index in sync.

    watchdog calls these methods automatically:
      - on_created(event)  -> new file added
      - on_modified(event) -> existing file changed
      - on_deleted(event)  -> file removed
    """

    def __init__(self, pipeline, config_path="config.yaml"):
        """
        Args:
            pipeline (IndexingPipeline): existing pipeline instance whose
                extractor/chunker/embedder/store/crawler are reused.
            config_path (str): path to the YAML config supplying
                ``debounce_seconds``.
        """
        with open(config_path) as f:
            config = yaml.safe_load(f)
        # Fall back to a short debounce window if the key is missing so an
        # incomplete config does not crash the watcher at startup.
        self._debounce_seconds = config.get("debounce_seconds", 2)
        self.pipeline = pipeline
        self.include_extensions = self.pipeline.crawler.include_extensions
        self._last_event = {}  # {filepath: timestamp of last handled event}

    def _is_duplicate(self, filepath):
        """Return True if an event for this file arrived within the debounce
        window (editors often emit several events for a single save).

        Side effect: records the current time as the last-handled timestamp
        when the event is NOT a duplicate.
        """
        now = time.time()
        last = self._last_event.get(filepath, 0)
        if now - last < self._debounce_seconds:
            return True
        self._last_event[filepath] = now
        return False

    def _is_relevant(self, filepath):
        """Return True if the file's extension is in the include list.

        Args:
            filepath (str): path from the event.
        """
        ext = os.path.splitext(filepath)[1].lower()
        return ext in self.include_extensions

    def _should_handle(self, event):
        """Shared guard for all callbacks: skip directories, irrelevant
        file types, and debounced duplicate events."""
        if event.is_directory:
            return False
        if not self._is_relevant(event.src_path):
            return False
        return not self._is_duplicate(event.src_path)

    def _index_file(self, filepath):
        """Re-index one file: extract, chunk, embed, store.

        Old chunks are always removed first so a file whose new content
        yields no text is dropped from the index instead of serving stale
        chunks. (remove_file_chunks is a no-op for an unseen file, so this
        ordering is safe for freshly created files too.)

        Returns:
            bool: True if the file was (re)indexed, False if skipped.
        """
        self.pipeline.store.remove_file_chunks(filepath)
        text = self.pipeline.extractor.extract(filepath)
        if not text.strip():
            print(" Skipping (no text extracted)")
            return False
        chunks = self.pipeline.chunker.chunk_file(text, filepath)
        chunk_texts = [c["text"] for c in chunks]
        embeddings = self.pipeline.embedder.embed_chunks(chunk_texts)
        self.pipeline.store.add_chunks(chunks, embeddings)
        # Persist the content hash so the crawler can skip unchanged files.
        file_hash = self.pipeline.crawler.compute_hash(filepath)
        self.pipeline.store.save_file_info(filepath, file_hash, len(chunks))
        return True

    def on_created(self, event):
        """Called by watchdog when a new file is created."""
        if not self._should_handle(event):
            return
        print(f"New file detected: {event.src_path}")
        if self._index_file(event.src_path):
            print(f" File stored: {event.src_path}")

    def on_modified(self, event):
        """Called by watchdog when an existing file is modified."""
        if not self._should_handle(event):
            return
        print(f"File modified: {event.src_path}")
        if self._index_file(event.src_path):
            print(f" File saved: {event.src_path}")

    def on_deleted(self, event):
        """Called by watchdog when a file is deleted; drop its chunks.

        Note: deliberately no debounce here — a delete is terminal and
        removing chunks twice is harmless.
        """
        if event.is_directory:
            return
        if not self._is_relevant(event.src_path):
            return
        print(f"File deleted: {event.src_path}")
        self.pipeline.store.remove_file_chunks(event.src_path)
class Watcher:
    """Run watchdog observers over every configured watch path.

    Blocks until the user interrupts with Ctrl+C.
    """

    def __init__(self, config_path="config.yaml"):
        """Build the pipeline, the event handler, and the watch-path list."""
        self.pipeline = IndexingPipeline(config_path)
        self.handler = IndexHandler(self.pipeline)
        self.watch_paths = self.pipeline.crawler.watch_paths

    def start(self):
        """Schedule an observer on each watch path and block forever.

        Stops the observer cleanly on KeyboardInterrupt (or any other
        exception) via the finally clause.
        """
        obs = Observer()
        for watch_dir in self.watch_paths:
            obs.schedule(self.handler, watch_dir, recursive=True)
        obs.start()
        print(f"Watchdog active. Watching {', '.join(self.watch_paths)}")
        try:
            # The observer runs in its own thread; idle here until Ctrl+C.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Stopping watcher...")
        finally:
            obs.stop()
            obs.join()
# --- Test it ---
if __name__ == "__main__":
    # One-shot pass over everything already on disk...
    print("Running initial index...")
    watcher = Watcher()
    watcher.pipeline.run()

    # ...then switch to incremental mode and react to changes.
    print("\nStarting file watcher...")
    watcher.start()