# indexer/watcher.py
import os
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from indexer.pipeline import IndexingPipeline
import yaml
class IndexHandler(FileSystemEventHandler):
    """
    Handles filesystem events detected by watchdog and keeps the index in sync.

    watchdog calls these methods automatically:
    - on_created(event)  -> new file added
    - on_modified(event) -> existing file changed
    - on_moved(event)    -> file renamed/moved
    - on_deleted(event)  -> file removed
    """

    # Debounce window (seconds) used when the config has no "debounce_seconds".
    DEFAULT_DEBOUNCE_SECONDS = 1.0

    def __init__(self, pipeline, config_path="config.yaml"):
        """
        Args:
            pipeline (IndexingPipeline) - existing pipeline instance
            config_path (str) - YAML config file; only "debounce_seconds"
                is read from it here
        """
        # YAML is UTF-8 by specification; do not depend on the locale default.
        with open(config_path, encoding="utf-8") as f:
            # safe_load returns None for an empty document.
            config = yaml.safe_load(f) or {}
        self._debounce_seconds = config.get(
            "debounce_seconds", self.DEFAULT_DEBOUNCE_SECONDS
        )
        self.pipeline = pipeline
        # NOTE(review): _is_relevant compares against lower-cased extensions
        # that include the leading dot (e.g. ".txt") - confirm the crawler
        # stores them in that form.
        self.include_extensions = self.pipeline.crawler.include_extensions
        self._last_event = {}  # {filepath: monotonic timestamp of last handled event}

    def _is_duplicate(self, filepath):
        """
        Check if we've already handled an event for this file recently.

        Records the current time for the file when the event is accepted.

        Returns:
            bool - True if the event falls inside the debounce window and
            should be skipped
        """
        # Monotonic clock: wall-clock adjustments must not break debouncing.
        now = time.monotonic()
        last = self._last_event.get(filepath)
        # None (never seen) is the sentinel; the monotonic epoch is arbitrary,
        # so comparing against 0 would not be safe.
        if last is not None and now - last < self._debounce_seconds:
            return True
        self._last_event[filepath] = now
        return False

    def _is_relevant(self, filepath):
        """
        Check if a file event is for a file type we care about.

        Args:
            filepath (str) - path from the event
        Returns:
            bool - True if the file extension is in our include list
        """
        ext = os.path.splitext(filepath)[1].lower()
        return ext in self.include_extensions

    def _index_file(self, filepath, done_label):
        """
        Extract, chunk, embed and store one file, replacing its old chunks.

        Old chunks are only dropped once the replacement data is ready, so a
        failure while extracting or embedding does not leave the file
        un-indexed.

        Args:
            filepath (str) - file to (re)index
            done_label (str) - prefix of the message printed on success
        """
        store = self.pipeline.store
        try:
            text = self.pipeline.extractor.extract(filepath)
            file_hash = self.pipeline.crawler.compute_hash(filepath)
        except OSError as exc:
            # The file can disappear (temp files, quick renames) between the
            # event and this read. An uncaught exception here would propagate
            # into watchdog's dispatch thread.
            print(f" Skipping (could not read file: {exc})")
            return
        if not text.strip():
            # No text now: make sure no stale chunks stay searchable.
            store.remove_file_chunks(filepath)
            print(" Skipping (no text extracted)")
            return
        chunks = self.pipeline.chunker.chunk_file(text, filepath)
        embeddings = self.pipeline.embedder.embed_chunks(
            [chunk["text"] for chunk in chunks]
        )
        store.remove_file_chunks(filepath)
        store.add_chunks(chunks, embeddings)
        store.save_file_info(filepath, file_hash, len(chunks))
        print(f" {done_label}: {filepath}")

    def _handle_upsert(self, event, detected_label, done_label):
        """Shared filter + index path for created and modified events."""
        if event.is_directory:
            return
        filepath = event.src_path
        # Relevance is checked first so ignored files never enter the
        # debounce table.
        if not self._is_relevant(filepath) or self._is_duplicate(filepath):
            return
        print(f"{detected_label}: {filepath}")
        self._index_file(filepath, done_label)

    def on_created(self, event):
        """
        Called when a new file is created.

        Args:
            event - watchdog event
        """
        self._handle_upsert(event, "New file detected", "File stored")

    def on_modified(self, event):
        """
        Called when an existing file is modified.

        Args:
            event - watchdog event
        """
        self._handle_upsert(event, "File modified", "File saved")

    def on_moved(self, event):
        """
        Called when a file is renamed or moved inside the watched tree.

        Drops the chunks stored under the old path and indexes the new path,
        so a rename does not leave stale entries behind.

        Args:
            event - watchdog move event (has src_path and dest_path)
        """
        if event.is_directory:
            return
        old_path, new_path = event.src_path, event.dest_path
        if self._is_relevant(old_path):
            print(f"File moved: {old_path} -> {new_path}")
            self.pipeline.store.remove_file_chunks(old_path)
            self._last_event.pop(old_path, None)
        if self._is_relevant(new_path) and not self._is_duplicate(new_path):
            print(f"New file detected: {new_path}")
            self._index_file(new_path, "File stored")

    def on_deleted(self, event):
        """
        Called when a file is deleted.

        Args:
            event - watchdog event
        """
        if event.is_directory:
            return
        if not self._is_relevant(event.src_path):
            return
        print(f"File deleted: {event.src_path}")
        self.pipeline.store.remove_file_chunks(event.src_path)
        # Forget the debounce entry: keeps the table bounded and lets a file
        # re-created right away be indexed again.
        self._last_event.pop(event.src_path, None)
class Watcher:
    """
    Starts a watchdog observer on all configured watch_paths.

    Runs continuously until the user presses Ctrl+C or the observer thread
    stops.
    """

    def __init__(self, config_path="config.yaml"):
        """
        Initialize the Watcher.

        Args:
            config_path (str) - config file used for both the pipeline and
                the event handler
        """
        self.pipeline = IndexingPipeline(config_path)
        # Pass config_path through so the handler reads the same file as the
        # pipeline instead of always falling back to "config.yaml".
        self.handler = IndexHandler(self.pipeline, config_path)
        self.watch_paths = self.pipeline.crawler.watch_paths

    def start(self):
        """
        Start watching all configured directories (recursively) and block
        until interrupted.
        """
        observer = Observer()
        for path in self.watch_paths:
            observer.schedule(self.handler, path, recursive=True)
        observer.start()
        # str() so the message also works if paths are not plain strings.
        watched = ", ".join(str(path) for path in self.watch_paths)
        print(f"Watchdog active. Watching {watched}")
        try:
            # Poll the observer thread instead of sleeping forever: if it
            # dies, the loop ends rather than idling with nothing watching.
            while observer.is_alive():
                observer.join(timeout=1)
        except KeyboardInterrupt:
            print("Stopping watcher...")
        finally:
            observer.stop()
            observer.join()
# --- Script entry point ---
def main():
    """Index existing files once, then watch for changes until interrupted."""
    # First run the full pipeline to index existing files
    print("Running initial index...")
    watcher = Watcher()
    watcher.pipeline.run()
    # Then start watching for changes
    print("\nStarting file watcher...")
    watcher.start()


if __name__ == "__main__":
    main()