File size: 5,817 Bytes
bb04c5f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# indexer/watcher.py

import os
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from indexer.pipeline import IndexingPipeline
import yaml


class IndexHandler(FileSystemEventHandler):
    """
    Keeps the index in sync with filesystem events detected by watchdog.

    watchdog calls these methods automatically:
        - on_created(event)   → new file added
        - on_modified(event)  → existing file changed
        - on_deleted(event)   → file removed

    Only files whose extension is in the crawler's ``include_extensions``
    are processed. Created/modified events for the same path that arrive
    within ``debounce_seconds`` of a handled event are ignored.
    """

    def __init__(self, pipeline, config_path="config.yaml"):
        """
        Args:
            pipeline (IndexingPipeline) — existing pipeline instance
            config_path (str) — YAML file that must define ``debounce_seconds``

        Raises:
            KeyError — if ``debounce_seconds`` is missing from the config
        """
        super().__init__()

        with open(config_path, encoding="utf-8") as f:
            config = yaml.safe_load(f)
        self._debounce_seconds = config["debounce_seconds"]

        self.pipeline = pipeline
        self.include_extensions = self.pipeline.crawler.include_extensions
        self._last_event = {}    # {filepath: monotonic timestamp of last handled event}

    def _is_duplicate(self, filepath) -> bool:
        """
        Check if we've already handled an event for this file recently.

        Args:
            filepath (str) — path from the event

        Returns:
            bool — True if we should skip this event
        """
        # Monotonic clock: a wall-clock adjustment must not stretch or
        # shrink the debounce window.
        now = time.monotonic()
        last = self._last_event.get(filepath)
        if last is not None and now - last < self._debounce_seconds:
            return True
        self._last_event[filepath] = now
        return False

    def _is_relevant(self, filepath) -> bool:
        """
        Check if a file event is for a file type we care about.

        Args:
            filepath (str) — path from the event

        Returns:
            bool — True if the file extension is in our include list
        """
        ext = os.path.splitext(filepath)[1].lower()
        return ext in self.include_extensions

    def _should_ignore(self, event) -> bool:
        """Return True for directory events and for irrelevant file types."""
        return event.is_directory or not self._is_relevant(event.src_path)

    def _index_file(self, filepath, done_label) -> None:
        """
        Extract, chunk, embed and store one file, replacing its old chunks.

        The old chunks are removed only after the new chunks, embeddings and
        hash have been computed, so a failure part-way through leaves the
        previous index entry intact.

        Args:
            filepath (str) — file to (re)index
            done_label (str) — prefix of the success message
        """
        store = self.pipeline.store
        try:
            text = self.pipeline.extractor.extract(filepath)
            if not text or not text.strip():
                # The file no longer yields text: drop whatever was indexed
                # for it so searches do not return stale content.
                store.remove_file_chunks(filepath)
                print("  Skipping (no text extracted)")
                return

            chunks = self.pipeline.chunker.chunk_file(text, filepath)
            chunk_texts = [c["text"] for c in chunks]
            embeddings = self.pipeline.embedder.embed_chunks(chunk_texts)
            file_hash = self.pipeline.crawler.compute_hash(filepath)
        except OSError as exc:
            # Typical cause: the file disappeared or is locked between the
            # event and the read. An uncaught exception here would end
            # watchdog's dispatch thread and silently stop all watching.
            print(f"  Skipping (could not read file: {exc})")
            return

        store.remove_file_chunks(filepath)
        store.add_chunks(chunks, embeddings)
        store.save_file_info(filepath, file_hash, len(chunks))
        print(f"  {done_label}: {filepath}")

    def on_created(self, event):
        """
        Called when a new file is created.

        Args:
            event — watchdog event
        """
        if self._should_ignore(event) or self._is_duplicate(event.src_path):
            return

        print(f"New file detected: {event.src_path}")
        self._index_file(event.src_path, "File stored")

    def on_modified(self, event):
        """
        Called when an existing file is modified.

        Args:
            event — watchdog event
        """
        if self._should_ignore(event) or self._is_duplicate(event.src_path):
            return

        print(f"File modified: {event.src_path}")
        self._index_file(event.src_path, "File saved")

    def on_deleted(self, event):
        """
        Called when a file is deleted.

        Args:
            event — watchdog event
        """
        if self._should_ignore(event):
            return

        print(f"File deleted: {event.src_path}")
        # Release the debounce entry so the map does not grow forever and a
        # quickly re-created file is not mistaken for a duplicate.
        self._last_event.pop(event.src_path, None)
        self.pipeline.store.remove_file_chunks(event.src_path)


class Watcher:
    """
    Starts a watchdog observer on all configured watch_paths.

    Runs continuously until the user presses Ctrl+C.
    """

    def __init__(self, config_path="config.yaml"):
        """
        Initialize the Watcher.

        Args:
            config_path (str) — YAML config used for both the indexing
                pipeline and the event handler
        """
        self.pipeline = IndexingPipeline(config_path)
        # Forward config_path so the handler reads the same config file as
        # the pipeline instead of silently falling back to "config.yaml".
        self.handler = IndexHandler(self.pipeline, config_path)
        self.watch_paths = self.pipeline.crawler.watch_paths

    def start(self):
        """
        Start watching all configured directories.

        Blocks the calling thread until KeyboardInterrupt, then stops the
        observer and waits for its thread to finish.
        """
        observer = Observer()
        for path in self.watch_paths:
            observer.schedule(self.handler, path, recursive=True)
        observer.start()

        # str() keeps the message working when paths are not plain strings.
        watched = ', '.join(str(path) for path in self.watch_paths)
        print(f"Watchdog active. Watching {watched}")

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Stopping watcher...")
        finally:
            observer.stop()
            observer.join()


# --- Manual smoke run: index once, then watch ---
if __name__ == "__main__":
    # Bring the index up to date with what is already on disk.
    print("Running initial index...")
    file_watcher = Watcher()
    file_watcher.pipeline.run()

    # Hand over to the observer loop; it blocks until Ctrl+C.
    print("\nStarting file watcher...")
    file_watcher.start()