Spaces:
Paused
Paused
File size: 2,467 Bytes
5da3cbc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
import time
import os
import shutil
import logging
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Configuration
WATCH_DIR = os.getenv("KNOWLEDGE_WATCH_DIR", "/knowledge_data")
# Agent Zero internal knowledge directory
TARGET_DIR = "/a0/knowledge/default" # 'default' knowledge base
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)
class KnowledgeHandler(FileSystemEventHandler):
def on_created(self, event):
if event.is_directory:
return
filename = os.path.basename(event.src_path)
# unexpected files filter
if filename.startswith(".") or filename.endswith(".tmp"):
return
logger.info(f"New file detected: {event.src_path}")
# We wait a bit to ensure file write is complete
time.sleep(1)
try:
# Copy to Agent Zero knowledge directory
# This triggers Agent Zero's internal mechanisms if it has any auto-ingest,
# OR we simply place it there so it's available for the 'knowledge' tool.
# Agent Zero 0.9.7+ might need explicit embedding or indexing.
# However, simply having it in the folder makes it accessible to 'knowledge' tool usually.
target_path = os.path.join(TARGET_DIR, filename)
if os.path.exists(target_path):
logger.info(f"File already exists in knowledge base: {filename}")
return
shutil.copy2(event.src_path, target_path)
logger.info(f"Ingested {filename} into {target_path}")
# TODO: If explicit API call is needed to trigger embedding, do it here.
# For now, we assume A0 indexes on access or via background jobs.
except Exception as e:
logger.error(f"Error processing file {event.src_path}: {e}")
if __name__ == "__main__":
# Ensure directories exist
os.makedirs(WATCH_DIR, exist_ok=True)
os.makedirs(TARGET_DIR, exist_ok=True)
event_handler = KnowledgeHandler()
observer = Observer()
observer.schedule(event_handler, WATCH_DIR, recursive=False)
logger.info(f"Starting Knowledge Watcher on {WATCH_DIR} -> {TARGET_DIR}")
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
|