"""
Auto-Sync Module for RAG Personal Diary Chatbot
Handles automatic synchronization between database and vector store
"""
import os
import sys
import sqlite3
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
import streamlit as st
# Add paths for imports
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'Indexingstep'))
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
try:
from pipeline import DiaryIndexingPipeline
from embedding_and_storing import DiaryEmbeddingAndStorage
from run_user_indexing import UserIsolatedIndexingPipeline
except ImportError as e:
logging.error(f"Could not import indexing modules: {e}")
DiaryIndexingPipeline = None
DiaryEmbeddingAndStorage = None
UserIsolatedIndexingPipeline = None
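# When the Indexingstep package cannot be imported, the names above stay None;
# the sync methods below check these before use so the app can degrade gracefully.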
class AutoSyncManager:
"""Manages automatic synchronization between SQL database and vector database"""
def __init__(self, user_id: int = 1):
self.user_id = user_id
# Use user-specific database path
self.db_path = os.path.join(os.path.dirname(__file__), "backend", f"user_{user_id}_diary.db")
self.vector_db_path = os.path.join(os.path.dirname(__file__), "..", "Indexingstep", f"user_{user_id}_vector_db")
self.collection_name = f"user_{user_id}_diary_entries"
# Load API key
from dotenv import load_dotenv
load_dotenv(os.path.join(os.path.dirname(__file__), '..', 'Indexingstep', '.env'))
self.api_key = os.getenv("GOOGLE_API_KEY")
# Setup logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def get_last_sync_time(self) -> Optional[datetime]:
"""Get the last sync timestamp from a tracking file"""
sync_file = os.path.join(os.path.dirname(__file__), f"last_sync_user_{self.user_id}.txt")
try:
if os.path.exists(sync_file):
with open(sync_file, 'r') as f:
timestamp_str = f.read().strip()
return datetime.fromisoformat(timestamp_str)
except Exception as e:
self.logger.warning(f"Could not read last sync time: {e}")
return None
    def update_last_sync_time(self, timestamp: Optional[datetime] = None):
"""Update the last sync timestamp"""
if timestamp is None:
timestamp = datetime.now()
sync_file = os.path.join(os.path.dirname(__file__), f"last_sync_user_{self.user_id}.txt")
try:
with open(sync_file, 'w') as f:
f.write(timestamp.isoformat())
except Exception as e:
self.logger.warning(f"Could not update last sync time: {e}")
def get_changed_entries(self, since: Optional[datetime] = None) -> Dict[str, List]:
"""Get entries that changed since the last sync"""
if since is None:
since = self.get_last_sync_time()
if since is None:
since = datetime.now() - timedelta(days=7) # Default to last week
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get new/updated entries
since_str = since.strftime('%Y-%m-%d %H:%M:%S')
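            # created_at is compared as text here, so this query assumes timestamps
            # are stored in the same '%Y-%m-%d %H:%M:%S' format used for since_str.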
cursor.execute("""
SELECT id, date, content, created_at, tags
FROM diary_entries
WHERE user_id = ? AND created_at > ?
ORDER BY created_at DESC
""", (self.user_id, since_str))
new_entries = []
for row in cursor.fetchall():
new_entries.append({
'id': row[0],
'date': row[1],
'content': row[2],
'created_at': row[3],
'tags': row[4] or ''
})
conn.close()
return {
'new_entries': new_entries,
'deleted_entries': [] # TODO: Implement deletion tracking
}
except Exception as e:
self.logger.error(f"Error getting changed entries: {e}")
return {'new_entries': [], 'deleted_entries': []}
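    # TODO (sketch): deletion tracking is not implemented above. One possible
    # approach would be for the entry-delete handler to record removed IDs in an
    # audit table (e.g. a hypothetical 'deleted_entries' table) that this method
    # reads into 'deleted_entries' and clears after a successful sync.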
def auto_index_new_entries(self, entries: List[Dict]) -> bool:
"""Automatically index new entries"""
if not entries or not self.api_key:
return True
try:
# Run incremental indexing for new entries
config = {
"google_api_key": self.api_key,
"db_path": self.db_path,
"persist_directory": self.vector_db_path,
"collection_name": self.collection_name,
"embedding_model": "models/embedding-001",
"chunk_size": 800,
"chunk_overlap": 100,
"batch_size": 50
}
# Get date range for new entries
if entries:
dates = [entry['date'] for entry in entries]
start_date = min(dates)
end_date = max(dates)
# Use the USER-ISOLATED indexing approach
if UserIsolatedIndexingPipeline:
pipeline = UserIsolatedIndexingPipeline(
user_id=self.user_id,
google_api_key=config["google_api_key"],
base_db_path=os.path.dirname(config["db_path"]),
base_persist_directory=os.path.dirname(config["persist_directory"]),
embedding_model=config["embedding_model"],
chunk_size=config["chunk_size"],
chunk_overlap=config["chunk_overlap"],
batch_size=config["batch_size"]
)
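                # Note: run_incremental_indexing() takes no arguments, so the
                # pipeline determines what to index on its own (presumably by
                # re-scanning the user's database); the `entries` list is only
                # used here for logging counts.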
# Run incremental indexing
success = pipeline.run_incremental_indexing()
if success:
self.logger.info(f"Successfully indexed {len(entries)} new entries")
return True
else:
self.logger.warning(f"Indexing completed with warnings")
return False
else:
# Fallback to basic pipeline if UserIsolatedIndexingPipeline not available
self.logger.warning("UserIsolatedIndexingPipeline not available, falling back to basic pipeline")
if DiaryIndexingPipeline:
pipeline = DiaryIndexingPipeline()
pipeline.run()
return True
else:
self.logger.error("No indexing pipeline available")
return False
except Exception as e:
self.logger.error(f"Error auto-indexing new entries: {e}")
return False
def auto_remove_deleted_entries(self, deleted_entry_ids: List[int]) -> bool:
"""Automatically remove deleted entries from vector database"""
        if not deleted_entry_ids or not self.api_key:
            return True
        if DiaryEmbeddingAndStorage is None:
            self.logger.error("DiaryEmbeddingAndStorage is not available; cannot remove deleted entries")
            return False
try:
embedding_storage = DiaryEmbeddingAndStorage(
user_id=self.user_id,
api_key=self.api_key,
base_persist_directory=os.path.dirname(self.vector_db_path),
embedding_model="models/embedding-001"
)
# Remove each deleted entry
for entry_id in deleted_entry_ids:
filter_criteria = {"entry_id": str(entry_id)}
success = embedding_storage.delete_documents_by_metadata(filter_criteria)
self.logger.info(f"Removed entry {entry_id} from vector DB: {success}")
return True
except Exception as e:
self.logger.error(f"Error removing deleted entries: {e}")
return False
def run_sync(self) -> Dict[str, Any]:
"""
Run the complete synchronization process and return results.
This is the main entry point to be called from the UI.
"""
self.logger.info(f"🚀 Starting sync for user {self.user_id}...")
results = {'status': 'failed', 'indexed_count': 0, 'deleted_count': 0, 'error': None}
try:
# 1. Get changes from the database
last_sync_time = self.get_last_sync_time()
self.logger.info(f"Last sync time: {last_sync_time}")
changed_data = self.get_changed_entries(last_sync_time)
new_entries = changed_data.get('new_entries', [])
# deleted_ids = changed_data.get('deleted_entries', []) # Deletion not implemented yet
self.logger.info(f"Found {len(new_entries)} new entries to index.")
if not new_entries:
results['status'] = 'success'
results['message'] = "No new entries to index."
self.logger.info("✅ Sync finished: No new entries.")
self.update_last_sync_time() # Update sync time even if no changes
return results
# 2. Index new entries
index_success = self.auto_index_new_entries(new_entries)
if not index_success:
raise RuntimeError("Failed to index new entries.")
results['indexed_count'] = len(new_entries)
# 3. Update last sync time
self.update_last_sync_time()
results['status'] = 'success'
results['message'] = f"Successfully indexed {len(new_entries)} new entries."
self.logger.info(f"✅ Sync successful for user {self.user_id}.")
except Exception as e:
self.logger.error(f"❌ Sync failed for user {self.user_id}: {e}", exc_info=True)
results['error'] = str(e)
return results
def perform_auto_sync(self) -> Dict[str, Any]:
"""Perform automatic synchronization"""
try:
# Get changes since last sync
changes = self.get_changed_entries()
new_entries = changes['new_entries']
deleted_entries = changes['deleted_entries']
results = {
'success': True,
'new_entries_count': len(new_entries),
'deleted_entries_count': len(deleted_entries),
'errors': []
}
# Index new entries
if new_entries:
index_success = self.auto_index_new_entries(new_entries)
if not index_success:
results['errors'].append("Failed to index some new entries")
# Remove deleted entries
if deleted_entries:
delete_success = self.auto_remove_deleted_entries(deleted_entries)
if not delete_success:
results['errors'].append("Failed to remove some deleted entries")
# Update sync timestamp
self.update_last_sync_time()
results['success'] = len(results['errors']) == 0
return results
except Exception as e:
self.logger.error(f"Auto-sync failed: {e}")
return {
'success': False,
'new_entries_count': 0,
'deleted_entries_count': 0,
'errors': [str(e)]
}
# Streamlit helper functions
def run_auto_sync(user_id: Optional[int] = None) -> bool:
"""Run auto-sync and show results in Streamlit"""
if user_id is None:
user_id = getattr(st.session_state, 'current_user_id', 1)
try:
# Simple approach: call the indexing script directly
import subprocess
script_path = os.path.join(
os.path.dirname(__file__),
'..',
'Indexingstep',
'run_user_indexing.py'
)
if not os.path.exists(script_path):
return False
# Get virtual environment python
venv_python = os.path.join(
os.path.dirname(__file__),
'..',
'..',
'.venv',
'Scripts',
'python.exe'
)
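        # 'Scripts/python.exe' is the Windows virtualenv layout; on other platforms
        # this path won't exist and sys.executable is used as the fallback below.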
python_cmd = venv_python if os.path.exists(venv_python) else sys.executable
# Run incremental indexing for the user
result = subprocess.run(
[python_cmd, script_path, '--user-id', str(user_id)],
cwd=os.path.dirname(script_path),
capture_output=True,
text=True,
timeout=120 # 2 minutes timeout
)
        # A zero return code means the indexing script finished successfully
        return result.returncode == 0
    except Exception as e:
        logging.getLogger(__name__).error(f"Auto-sync failed: {e}")
        return False
def run_auto_sync_legacy(user_id: Optional[int] = None) -> bool:
"""Legacy auto-sync using the AutoSyncManager class"""
if user_id is None:
user_id = getattr(st.session_state, 'current_user_id', 1)
try:
sync_manager = AutoSyncManager(user_id)
results = sync_manager.perform_auto_sync()
if results['success']:
if results['new_entries_count'] > 0:
st.success(f"✅ Auto-sync: {results['new_entries_count']} new entries indexed")
return True
else:
st.warning(f"⚠️ Auto-sync completed with warnings: {', '.join(results['errors'])}")
return True
except Exception as e:
st.warning(f"⚠️ Auto-sync failed: {str(e)}")
return False
def schedule_auto_sync():
"""Schedule auto-sync to run periodically"""
# This could be enhanced with background tasks or scheduled jobs
# For now, we'll call it manually when entries are created/deleted
pass
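

# --- Illustrative usage (sketch, not part of the Streamlit app) ---
# The helper below shows one possible way to run auto-sync periodically, in the
# spirit of schedule_auto_sync() above. The 15-minute interval, the daemon
# threading.Timer approach, and the start_periodic_sync name are illustrative
# assumptions, not something the app currently uses.
def start_periodic_sync(user_id: int = 1, interval_seconds: int = 900) -> None:
    """Sketch: re-run AutoSyncManager.run_sync() for one user on a fixed interval."""
    import threading

    def _tick():
        try:
            AutoSyncManager(user_id).run_sync()
        finally:
            # Re-arm the timer so syncing continues until the process exits.
            timer = threading.Timer(interval_seconds, _tick)
            timer.daemon = True
            timer.start()

    first = threading.Timer(interval_seconds, _tick)
    first.daemon = True
    first.start()


if __name__ == "__main__":
    # Minimal manual run for debugging: sync user 1 once and print the result.
    # Assumes the user_1 database and the Indexingstep modules are available.
    print(AutoSyncManager(user_id=1).run_sync())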