# indexer/watcher.py
import os
import time

import yaml
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from indexer.pipeline import IndexingPipeline
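
# The pipeline and handler are configured via config.yaml. A minimal sketch of
# the keys this module relies on (illustrative; only debounce_seconds is read
# here directly, and the exact layout of the crawler settings is an assumption):
#
#     debounce_seconds: 2.0      # collapse repeat events within this window
#     watch_paths:               # directories the Watcher observes
#       - ~/Documents
#     include_extensions:        # file types worth indexing
#       - .md
#       - .pdf
#       - .txt

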
class IndexHandler(FileSystemEventHandler):
"""
Handles filesystem events detected by watchdog.
watchdog calls these methods automatically:
- on_created(event) → new file added
- on_modified(event) → existing file changed
- on_deleted(event) → file removed
"""

    def __init__(self, pipeline, config_path="config.yaml"):
        """
        Args:
            pipeline (IndexingPipeline) — existing pipeline instance
            config_path (str) — YAML config file providing debounce_seconds
        """
with open(config_path) as f:
config = yaml.safe_load(f)
self._debounce_seconds = config["debounce_seconds"]
self.pipeline = pipeline
self.include_extensions = self.pipeline.crawler.include_extensions
self._last_event = {} # {filepath: timestamp}

    def _is_duplicate(self, filepath):
"""
Check if we've already handled an event for this file recently.
Returns True if we should skip this event.
"""
now = time.time()
last = self._last_event.get(filepath, 0)
if now - last < self._debounce_seconds:
return True
self._last_event[filepath] = now
return False
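
    # Illustrative example for _is_duplicate: with debounce_seconds = 2.0, a save
    # that fires both a "created" and a "modified" event 0.5 s apart is processed
    # once; the second call falls inside the window and is skipped.
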
def _is_relevant(self, filepath):
"""
Check if a file event is for a file type we care about.
Args:
filepath (str) — path from the event
Returns:
bool — True if the file extension is in our include list
"""
ext = os.path.splitext(filepath)[1].lower()
return ext in self.include_extensions
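
    # Illustrative example for _is_relevant: with include_extensions containing
    # ".md" and ".pdf", "/docs/Report.PDF" passes (the extension is lowercased)
    # while "/docs/diagram.png" does not.
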
def on_created(self, event):
"""
Called when a new file is created.
Args:
event — watchdog event
"""
        if event.is_directory:
            return
        if not self._is_relevant(event.src_path):
            return
if self._is_duplicate(event.src_path):
return
print(f"New file detected: {event.src_path}")
text = self.pipeline.extractor.extract(event.src_path)
        if not text.strip():
            print("  Skipping (no text extracted)")
            return
chunks = self.pipeline.chunker.chunk_file(text, event.src_path)
chunk_texts = [c["text"] for c in chunks]
embeddings = self.pipeline.embedder.embed_chunks(chunk_texts)
self.pipeline.store.remove_file_chunks(event.src_path)
self.pipeline.store.add_chunks(chunks, embeddings)
file_hash = self.pipeline.crawler.compute_hash(event.src_path)
self.pipeline.store.save_file_info(event.src_path, file_hash, len(chunks))
print(f" File stored: {event.src_path}")

    def on_modified(self, event):
"""
Called when an existing file is modified.
Args:
event - watchdog event
"""
        if event.is_directory:
            return
        if not self._is_relevant(event.src_path):
            return
if self._is_duplicate(event.src_path):
return
print(f"File modified: {event.src_path}")
self.pipeline.store.remove_file_chunks(event.src_path)
text = self.pipeline.extractor.extract(event.src_path)
        if not text.strip():
            print("  Skipping (no text extracted)")
            return
chunks = self.pipeline.chunker.chunk_file(text, event.src_path)
chunk_texts = [c["text"] for c in chunks]
embeddings = self.pipeline.embedder.embed_chunks(chunk_texts)
self.pipeline.store.add_chunks(chunks, embeddings)
file_hash = self.pipeline.crawler.compute_hash(event.src_path)
self.pipeline.store.save_file_info(event.src_path, file_hash, len(chunks))
print(f" File saved: {event.src_path}")

    def on_deleted(self, event):
"""
Called when a file is deleted.
Args:
event - watchdog event
"""
        if event.is_directory:
            return
        if not self._is_relevant(event.src_path):
            return
print(f"File deleted: {event.src_path}")
self.pipeline.store.remove_file_chunks(event.src_path)
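

# Illustrative sketch (not part of the runtime path): the handler can be
# exercised without a live observer by feeding it synthetic watchdog events,
# e.g. in a quick manual test (the path and config file are assumptions):
#
#     from watchdog.events import FileCreatedEvent
#     pipeline = IndexingPipeline("config.yaml")
#     handler = IndexHandler(pipeline)
#     handler.on_created(FileCreatedEvent("/docs/new_note.md"))

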
class Watcher:
"""
Starts watchdog observers on all configured watch_paths.
Runs continuously until the user presses Ctrl+C.
"""

    def __init__(self, config_path="config.yaml"):
        """
        Initialize the Watcher.

        Args:
            config_path (str) — YAML config shared with the IndexingPipeline
        """
self.pipeline = IndexingPipeline(config_path)
self.handler = IndexHandler(self.pipeline)
self.watch_paths = self.pipeline.crawler.watch_paths

    def start(self):
"""
Start watching all configured directories.
"""
observer = Observer()
for path in self.watch_paths:
observer.schedule(self.handler, path, recursive=True)
observer.start()
print(f"Watchdog active. Watching {', '.join(self.watch_paths)}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping watcher...")
finally:
observer.stop()
observer.join()
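
    # Note: on network shares or container bind mounts where inotify events are
    # unreliable, watchdog's PollingObserver (watchdog.observers.polling) is a
    # drop-in replacement for Observer with the same schedule()/start()/stop()/
    # join() interface.
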
# --- Test it ---
if __name__ == "__main__":
# First run the full pipeline to index existing files
print("Running initial index...")
watcher = Watcher()
watcher.pipeline.run()
# Then start watching for changes
print("\nStarting file watcher...")
watcher.start()
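
# Assuming the repository root is the working directory and config.yaml lives
# there, this module can be launched with:
#
#     python -m indexer.watcher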