Spaces:
Paused
Paused
| import time | |
| import os | |
| import shutil | |
| import logging | |
| from watchdog.observers import Observer | |
| from watchdog.events import FileSystemEventHandler | |
# Configuration
# Directory that is watched for newly dropped files; override with the
# KNOWLEDGE_WATCH_DIR environment variable.
WATCH_DIR = os.getenv("KNOWLEDGE_WATCH_DIR", "/knowledge_data")
# Agent Zero internal knowledge directory ('default' knowledge base); override
# with the KNOWLEDGE_TARGET_DIR environment variable. The default is the path
# that used to be hard-coded, so existing deployments behave the same.
TARGET_DIR = os.getenv("KNOWLEDGE_TARGET_DIR", "/a0/knowledge/default")

# Timestamped INFO-level logging on the root logger; this module logs through
# its own named logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)
class KnowledgeHandler(FileSystemEventHandler):
    """Copy newly created files from the watched directory into TARGET_DIR.

    Directories, dot-files and ``*.tmp`` files are ignored. A file whose name
    already exists in TARGET_DIR is left untouched (no overwrite).
    """

    # Seconds between size probes while waiting for a writer to finish.
    SETTLE_INTERVAL = 1.0
    # Upper bound, in seconds, on how long a still-growing file is waited for.
    SETTLE_TIMEOUT = 30.0

    def on_created(self, event):
        """Handle a "created" event by copying the new file into TARGET_DIR.

        Args:
            event: The file-system event; its ``is_directory`` and
                ``src_path`` attributes are read.

        Any ``Exception`` raised while waiting for or copying the file is
        logged with its traceback and not re-raised.
        """
        if event.is_directory:
            return

        filename = os.path.basename(event.src_path)
        # unexpected files filter
        if filename.startswith(".") or filename.endswith(".tmp"):
            return

        logger.info(f"New file detected: {event.src_path}")

        try:
            # A "created" event fires when the file appears, not when the
            # writer is done. Wait for the size to settle so a half-written
            # file is not ingested (it would never be refreshed, because
            # existing targets are skipped below).
            self._wait_until_settled(event.src_path)

            # Copy to Agent Zero knowledge directory so the file is available
            # to the 'knowledge' tool. Whether Agent Zero needs an explicit
            # embedding/indexing trigger is not handled here.
            target_path = os.path.join(TARGET_DIR, filename)
            if os.path.exists(target_path):
                logger.info(f"File already exists in knowledge base: {filename}")
                return

            shutil.copy2(event.src_path, target_path)
            logger.info(f"Ingested {filename} into {target_path}")
            # TODO: If explicit API call is needed to trigger embedding, do it here.
            # For now, we assume A0 indexes on access or via background jobs.
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception(f"Error processing file {event.src_path}: {e}")

    def _wait_until_settled(self, path):
        """Block until the size of *path* is unchanged between two probes.

        Gives up after SETTLE_TIMEOUT seconds (logging a warning) so a file
        that never stops growing cannot stall the handler indefinitely. For a
        file that is already complete this waits one SETTLE_INTERVAL.
        """
        deadline = time.monotonic() + self.SETTLE_TIMEOUT
        try:
            previous_size = os.path.getsize(path)
            while True:
                time.sleep(self.SETTLE_INTERVAL)
                current_size = os.path.getsize(path)
                if current_size == previous_size:
                    return
                if time.monotonic() >= deadline:
                    logger.warning(
                        f"File still changing after {self.SETTLE_TIMEOUT}s, "
                        f"copying current contents: {path}"
                    )
                    return
                previous_size = current_size
        except OSError:
            # The file vanished or cannot be stat'ed; stop waiting and let the
            # caller's existence check / copy step deal with it.
            return
if __name__ == "__main__":
    # Ensure directories exist
    os.makedirs(WATCH_DIR, exist_ok=True)
    os.makedirs(TARGET_DIR, exist_ok=True)

    event_handler = KnowledgeHandler()
    observer = Observer()
    # Only the top level of WATCH_DIR is watched (recursive=False).
    observer.schedule(event_handler, WATCH_DIR, recursive=False)

    logger.info(f"Starting Knowledge Watcher on {WATCH_DIR} -> {TARGET_DIR}")
    observer.start()
    try:
        # join() with a timeout wakes up every second, so Ctrl-C is noticed
        # promptly, and the loop ends if the observer thread dies instead of
        # idling forever with nothing being watched.
        while observer.is_alive():
            observer.join(1)
        logger.warning("Observer thread is no longer running; shutting down")
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the watcher; exit quietly.
        pass
    finally:
        # Always stop and reap the observer thread, whatever ended the loop.
        observer.stop()
        observer.join()