| """ | |
| Auto-Sync Module for RAG Personal Diary Chatbot | |
| Handles automatic synchronization between database and vector store | |
| """ | |
| import os | |
| import sys | |
| import sqlite3 | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Optional, Dict, Any, List | |
| import streamlit as st | |
# Add paths for imports
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'Indexingstep'))
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

try:
    from pipeline import DiaryIndexingPipeline
    from embedding_and_storing import DiaryEmbeddingAndStorage
    from run_user_indexing import UserIsolatedIndexingPipeline
except ImportError as e:
    logging.error(f"Could not import indexing modules: {e}")
    DiaryIndexingPipeline = None
    DiaryEmbeddingAndStorage = None
    UserIsolatedIndexingPipeline = None


class AutoSyncManager:
    """Manages automatic synchronization between the SQL database and the vector database."""

    def __init__(self, user_id: int = 1):
        self.user_id = user_id
        # Use user-specific database paths
        self.db_path = os.path.join(os.path.dirname(__file__), "backend", f"user_{user_id}_diary.db")
        self.vector_db_path = os.path.join(os.path.dirname(__file__), "..", "Indexingstep", f"user_{user_id}_vector_db")
        self.collection_name = f"user_{user_id}_diary_entries"

        # Load the API key from the indexing step's .env file
        from dotenv import load_dotenv
        load_dotenv(os.path.join(os.path.dirname(__file__), '..', 'Indexingstep', '.env'))
        self.api_key = os.getenv("GOOGLE_API_KEY")

        # Set up logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_last_sync_time(self) -> Optional[datetime]:
        """Get the last sync timestamp from a tracking file."""
        sync_file = os.path.join(os.path.dirname(__file__), f"last_sync_user_{self.user_id}.txt")
        try:
            if os.path.exists(sync_file):
                with open(sync_file, 'r') as f:
                    timestamp_str = f.read().strip()
                    return datetime.fromisoformat(timestamp_str)
        except Exception as e:
            self.logger.warning(f"Could not read last sync time: {e}")
        return None

    def update_last_sync_time(self, timestamp: Optional[datetime] = None):
        """Update the last sync timestamp."""
        if timestamp is None:
            timestamp = datetime.now()
        sync_file = os.path.join(os.path.dirname(__file__), f"last_sync_user_{self.user_id}.txt")
        try:
            with open(sync_file, 'w') as f:
                f.write(timestamp.isoformat())
        except Exception as e:
            self.logger.warning(f"Could not update last sync time: {e}")

    def get_changed_entries(self, since: Optional[datetime] = None) -> Dict[str, List]:
        """Get entries that changed since the last sync."""
        if since is None:
            since = self.get_last_sync_time()
        if since is None:
            since = datetime.now() - timedelta(days=7)  # Default to the last week

        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            # Get new/updated entries created after the last sync
            since_str = since.strftime('%Y-%m-%d %H:%M:%S')
            cursor.execute("""
                SELECT id, date, content, created_at, tags
                FROM diary_entries
                WHERE user_id = ? AND created_at > ?
                ORDER BY created_at DESC
            """, (self.user_id, since_str))

            new_entries = []
            for row in cursor.fetchall():
                new_entries.append({
                    'id': row[0],
                    'date': row[1],
                    'content': row[2],
                    'created_at': row[3],
                    'tags': row[4] or ''
                })
            conn.close()

            return {
                'new_entries': new_entries,
                'deleted_entries': []  # TODO: Implement deletion tracking
            }
        except Exception as e:
            self.logger.error(f"Error getting changed entries: {e}")
            return {'new_entries': [], 'deleted_entries': []}
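
    # NOTE: One way to make the deletion TODO above concrete is a "tombstone"
    # table populated by a SQLite trigger. A minimal sketch, assuming a
    # hypothetical `deleted_entries` table that does NOT exist in the current
    # schema:
    #
    #     CREATE TABLE IF NOT EXISTS deleted_entries (
    #         entry_id INTEGER,
    #         user_id INTEGER,
    #         deleted_at TEXT DEFAULT CURRENT_TIMESTAMP
    #     );
    #     CREATE TRIGGER IF NOT EXISTS track_entry_deletes
    #     AFTER DELETE ON diary_entries
    #     BEGIN
    #         INSERT INTO deleted_entries (entry_id, user_id)
    #         VALUES (OLD.id, OLD.user_id);
    #     END;
    #
    # get_changed_entries() could then select entry ids with
    # deleted_at > last sync time and return them under 'deleted_entries'.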

    def auto_index_new_entries(self, entries: List[Dict]) -> bool:
        """Automatically index new entries."""
        if not entries or not self.api_key:
            return True

        try:
            # Configuration for incremental indexing of the new entries
            config = {
                "google_api_key": self.api_key,
                "db_path": self.db_path,
                "persist_directory": self.vector_db_path,
                "collection_name": self.collection_name,
                "embedding_model": "models/embedding-001",
                "chunk_size": 800,
                "chunk_overlap": 100,
                "batch_size": 50
            }

            # Get the date range for the new entries (logged for visibility)
            dates = [entry['date'] for entry in entries]
            start_date, end_date = min(dates), max(dates)
            self.logger.info(f"Indexing entries dated {start_date} to {end_date}")

            # Use the USER-ISOLATED indexing approach when available
            if UserIsolatedIndexingPipeline:
                pipeline = UserIsolatedIndexingPipeline(
                    user_id=self.user_id,
                    google_api_key=config["google_api_key"],
                    base_db_path=os.path.dirname(config["db_path"]),
                    base_persist_directory=os.path.dirname(config["persist_directory"]),
                    embedding_model=config["embedding_model"],
                    chunk_size=config["chunk_size"],
                    chunk_overlap=config["chunk_overlap"],
                    batch_size=config["batch_size"]
                )

                # Run incremental indexing
                success = pipeline.run_incremental_indexing()
                if success:
                    self.logger.info(f"Successfully indexed {len(entries)} new entries")
                    return True
                else:
                    self.logger.warning("Indexing completed with warnings")
                    return False
            else:
                # Fall back to the basic pipeline if UserIsolatedIndexingPipeline is unavailable
                self.logger.warning("UserIsolatedIndexingPipeline not available, falling back to basic pipeline")
                if DiaryIndexingPipeline:
                    pipeline = DiaryIndexingPipeline()
                    pipeline.run()
                    return True
                else:
                    self.logger.error("No indexing pipeline available")
                    return False
        except Exception as e:
            self.logger.error(f"Error auto-indexing new entries: {e}")
            return False

    def auto_remove_deleted_entries(self, deleted_entry_ids: List[int]) -> bool:
        """Automatically remove deleted entries from the vector database."""
        if not deleted_entry_ids or not self.api_key:
            return True
        if DiaryEmbeddingAndStorage is None:
            self.logger.error("DiaryEmbeddingAndStorage not available; cannot remove entries")
            return False

        try:
            embedding_storage = DiaryEmbeddingAndStorage(
                user_id=self.user_id,
                api_key=self.api_key,
                base_persist_directory=os.path.dirname(self.vector_db_path),
                embedding_model="models/embedding-001"
            )

            # Remove each deleted entry by its metadata filter
            for entry_id in deleted_entry_ids:
                filter_criteria = {"entry_id": str(entry_id)}
                success = embedding_storage.delete_documents_by_metadata(filter_criteria)
                self.logger.info(f"Removed entry {entry_id} from vector DB: {success}")
            return True
        except Exception as e:
            self.logger.error(f"Error removing deleted entries: {e}")
            return False

    def run_sync(self) -> Dict[str, Any]:
        """
        Run the complete synchronization process and return results.
        This is the main entry point to be called from the UI.
        """
        self.logger.info(f"🚀 Starting sync for user {self.user_id}...")
        results = {'status': 'failed', 'indexed_count': 0, 'deleted_count': 0, 'error': None}

        try:
            # 1. Get changes from the database
            last_sync_time = self.get_last_sync_time()
            self.logger.info(f"Last sync time: {last_sync_time}")
            changed_data = self.get_changed_entries(last_sync_time)
            new_entries = changed_data.get('new_entries', [])
            # deleted_ids = changed_data.get('deleted_entries', [])  # Deletion not implemented yet
            self.logger.info(f"Found {len(new_entries)} new entries to index.")

            if not new_entries:
                results['status'] = 'success'
                results['message'] = "No new entries to index."
                self.logger.info("✅ Sync finished: No new entries.")
                self.update_last_sync_time()  # Update sync time even if nothing changed
                return results

            # 2. Index new entries
            index_success = self.auto_index_new_entries(new_entries)
            if not index_success:
                raise RuntimeError("Failed to index new entries.")
            results['indexed_count'] = len(new_entries)

            # 3. Update last sync time
            self.update_last_sync_time()
            results['status'] = 'success'
            results['message'] = f"Successfully indexed {len(new_entries)} new entries."
            self.logger.info(f"✅ Sync successful for user {self.user_id}.")
        except Exception as e:
            self.logger.error(f"❌ Sync failed for user {self.user_id}: {e}", exc_info=True)
            results['error'] = str(e)
        return results

    def perform_auto_sync(self) -> Dict[str, Any]:
        """Perform automatic synchronization."""
        try:
            # Get changes since the last sync
            changes = self.get_changed_entries()
            new_entries = changes['new_entries']
            deleted_entries = changes['deleted_entries']

            results = {
                'success': True,
                'new_entries_count': len(new_entries),
                'deleted_entries_count': len(deleted_entries),
                'errors': []
            }

            # Index new entries
            if new_entries:
                index_success = self.auto_index_new_entries(new_entries)
                if not index_success:
                    results['errors'].append("Failed to index some new entries")

            # Remove deleted entries
            if deleted_entries:
                delete_success = self.auto_remove_deleted_entries(deleted_entries)
                if not delete_success:
                    results['errors'].append("Failed to remove some deleted entries")

            # Update sync timestamp
            self.update_last_sync_time()
            results['success'] = len(results['errors']) == 0
            return results
        except Exception as e:
            self.logger.error(f"Auto-sync failed: {e}")
            return {
                'success': False,
                'new_entries_count': 0,
                'deleted_entries_count': 0,
                'errors': [str(e)]
            }


# Streamlit helper functions
def run_auto_sync(user_id: Optional[int] = None) -> bool:
    """Run auto-sync by invoking the indexing script in a subprocess."""
    if user_id is None:
        user_id = getattr(st.session_state, 'current_user_id', 1)

    try:
        # Simple approach: call the indexing script directly
        import subprocess
        script_path = os.path.join(
            os.path.dirname(__file__),
            '..',
            'Indexingstep',
            'run_user_indexing.py'
        )
        if not os.path.exists(script_path):
            return False

        # Prefer the virtual environment's Python (Windows layout), if present
        venv_python = os.path.join(
            os.path.dirname(__file__),
            '..',
            '..',
            '.venv',
            'Scripts',
            'python.exe'
        )
        python_cmd = venv_python if os.path.exists(venv_python) else sys.executable

        # Run incremental indexing for the user
        result = subprocess.run(
            [python_cmd, script_path, '--user-id', str(user_id)],
            cwd=os.path.dirname(script_path),
            capture_output=True,
            text=True,
            timeout=120  # 2-minute timeout
        )
        return result.returncode == 0
    except Exception:
        return False


def run_auto_sync_legacy(user_id: Optional[int] = None) -> bool:
    """Legacy auto-sync using the AutoSyncManager class."""
    if user_id is None:
        user_id = getattr(st.session_state, 'current_user_id', 1)

    try:
        sync_manager = AutoSyncManager(user_id)
        results = sync_manager.perform_auto_sync()
        if results['success']:
            if results['new_entries_count'] > 0:
                st.success(f"✅ Auto-sync: {results['new_entries_count']} new entries indexed")
            return True
        else:
            st.warning(f"⚠️ Auto-sync completed with warnings: {', '.join(results['errors'])}")
            return True
    except Exception as e:
        st.warning(f"⚠️ Auto-sync failed: {str(e)}")
        return False


def schedule_auto_sync():
    """Schedule auto-sync to run periodically."""
    # This could be enhanced with background tasks or scheduled jobs.
    # For now, sync is triggered manually when entries are created/deleted.
    pass
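

# A minimal usage sketch for running a sync outside Streamlit. This block is
# illustrative only: it assumes the per-user SQLite database and the
# Indexingstep modules imported above are present, and that GOOGLE_API_KEY is
# set in the Indexingstep .env file.
if __name__ == "__main__":
    manager = AutoSyncManager(user_id=1)
    sync_results = manager.run_sync()
    print(f"Status: {sync_results['status']}")
    print(f"Indexed: {sync_results['indexed_count']} entries")
    if sync_results.get('error'):
        print(f"Error: {sync_results['error']}")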