GravityFalls / python /tools /watch_knowledge.py
OnyxMunk's picture
Add Hugging Face Spaces deployment configuration (Dockerfile, supervisord, watcher)
5da3cbc
import time
import os
import shutil
import logging
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Configuration
# Directory that is watched for newly dropped files; the default can be
# overridden through the KNOWLEDGE_WATCH_DIR environment variable.
WATCH_DIR = os.environ.get("KNOWLEDGE_WATCH_DIR", "/knowledge_data")

# Destination inside Agent Zero: the folder of its 'default' knowledge base.
TARGET_DIR = "/a0/knowledge/default"

# Process-wide logging: INFO level, each record prefixed with a timestamp.
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
class KnowledgeHandler(FileSystemEventHandler):
    """Copy files that appear in the watched directory into ``TARGET_DIR``.

    A file is picked up when it is created in the watched directory or when
    it is renamed/moved to a new name there. The second case matters because
    many tools write to a temporary name (which the filter below ignores)
    and then rename the finished file into place.

    Placing the file in ``TARGET_DIR`` only makes it available on disk.
    Whether Agent Zero indexes it automatically is not established by this
    script (Agent Zero 0.9.7+ might need explicit embedding or indexing).
    """

    # Seconds between two size checks while waiting for a writer to finish.
    # One interval is always waited, so small files take about one second.
    SETTLE_INTERVAL = 1.0
    # Upper bound, in seconds, on how long a still-growing file is waited for.
    SETTLE_TIMEOUT = 30.0

    def on_created(self, event):
        """Ingest a file that was newly created in the watched directory."""
        if event.is_directory:
            return
        self._ingest(event.src_path)

    def on_moved(self, event):
        """Ingest a file that was renamed or moved to a new path.

        The destination name is what gets filtered and copied, so a finished
        upload renamed from ``name.tmp`` to ``name`` is ingested as ``name``.
        """
        if event.is_directory:
            return
        self._ingest(event.dest_path)

    def _ingest(self, path):
        """Copy ``path`` into ``TARGET_DIR`` unless it is filtered or present.

        Hidden files and ``.tmp`` files are ignored, and a file whose name
        already exists in ``TARGET_DIR`` is left untouched. Errors are logged
        rather than raised.
        """
        filename = os.path.basename(path)
        # unexpected files filter
        if filename.startswith(".") or filename.endswith(".tmp"):
            return

        logger.info(f"New file detected: {path}")
        try:
            # Wait for the writer to finish so a half-written file is not
            # copied; a truncated copy would never be corrected because of
            # the "already exists" check below.
            self._wait_until_settled(path)

            target_path = os.path.join(TARGET_DIR, filename)
            if os.path.exists(target_path):
                logger.info(f"File already exists in knowledge base: {filename}")
                return

            shutil.copy2(path, target_path)
            logger.info(f"Ingested {filename} into {target_path}")
            # TODO: If explicit API call is needed to trigger embedding, do it here.
            # For now, we assume A0 indexes on access or via background jobs.
        except Exception as e:
            # Deliberately broad: this runs inside watchdog's dispatch
            # thread, and a failure on one file must not escape into it.
            logger.exception(f"Error processing file {path}: {e}")

    def _wait_until_settled(self, path):
        """Block until the size of ``path`` stops changing.

        The size is sampled every ``SETTLE_INTERVAL`` seconds. Once two
        consecutive samples match, the file is considered complete. If the
        file is still growing after ``SETTLE_TIMEOUT`` seconds a warning is
        logged and the method returns anyway, so the copy stays best-effort.

        Raises:
            OSError: if the file disappears while being waited for.
        """
        deadline = time.monotonic() + self.SETTLE_TIMEOUT
        previous_size = os.path.getsize(path)
        while True:
            time.sleep(self.SETTLE_INTERVAL)
            current_size = os.path.getsize(path)
            if current_size == previous_size:
                return
            if time.monotonic() >= deadline:
                logger.warning(
                    f"File still changing after {self.SETTLE_TIMEOUT}s, "
                    f"copying anyway: {path}"
                )
                return
            previous_size = current_size
if __name__ == "__main__":
    # Imported here because only the script entry point needs it.
    import signal

    def _raise_keyboard_interrupt(signum, frame):
        """Route SIGTERM through the same shutdown path as Ctrl+C."""
        raise KeyboardInterrupt

    # Ensure directories exist
    os.makedirs(WATCH_DIR, exist_ok=True)
    os.makedirs(TARGET_DIR, exist_ok=True)

    event_handler = KnowledgeHandler()
    observer = Observer()
    # Only the top level of WATCH_DIR is watched; sub-directories are ignored.
    observer.schedule(event_handler, WATCH_DIR, recursive=False)

    # Process supervisors usually stop a program with SIGTERM rather than
    # SIGINT; without this handler the observer would never be stopped.
    signal.signal(signal.SIGTERM, _raise_keyboard_interrupt)

    logger.info(f"Starting Knowledge Watcher on {WATCH_DIR} -> {TARGET_DIR}")
    observer.start()
    try:
        # Leave the loop if the observer thread dies, so the process exits
        # instead of idling forever without watching anything.
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        pass
    finally:
        observer.stop()
        observer.join()