Spaces:
Sleeping
Sleeping
| import lancedb | |
| import uuid | |
| import os | |
| from typing import List, Dict, Optional | |
| from datetime import datetime | |
| from app.services.llm import llm_service | |
class JournalService:
    """Store and query per-user journal entries in a LanceDB table.

    Each row holds the entry's embedding vector alongside its id, owner,
    text, verses, tags and an ISO-8601 timestamp string.

    NOTE(review): the LanceDB calls made by the async methods are not
    awaited, so they run synchronously inside the coroutine -- confirm this
    is acceptable for the expected table sizes.
    """

    LANCE_PATH = "data/journal_lancedb"
    TABLE_NAME = "journal_nodes"

    def __init__(self):
        """Make sure the ``data`` directory exists and connect to LanceDB."""
        # exist_ok=True removes the check-then-create race of the
        # exists()/makedirs() pair when two processes start together.
        os.makedirs("data", exist_ok=True)
        self.db = lancedb.connect(self.LANCE_PATH)

    @staticmethod
    def _user_filter(user_id: str) -> str:
        """Build the SQL predicate that restricts a query to one user.

        Single quotes are doubled (the standard SQL string-literal escape)
        so a ``user_id`` containing ``'`` cannot terminate the literal and
        change the meaning of the filter.
        """
        escaped = str(user_id).replace("'", "''")
        return f"user_id = '{escaped}'"

    def _existing_table(self):
        """Return the journal table, or ``None`` if it was never created."""
        if self.TABLE_NAME not in self.db.table_names():
            return None
        return self.db.open_table(self.TABLE_NAME)

    async def create_entry(
        self,
        user_id: str,
        text: str,
        verses: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
    ) -> Optional[str]:
        """Embed ``text`` and store it as a new journal entry.

        Args:
            user_id: Owner of the entry.
            text: Entry body; also the input to the embedding call.
            verses: Optional verse references; stored as ``[]`` when omitted.
            tags: Optional tags; stored as ``[]`` when omitted.

        Returns:
            The generated entry id, or ``None`` when no embedding could be
            produced (nothing is stored in that case).
        """
        entry_id = str(uuid.uuid4())
        timestamp = datetime.now().isoformat()

        # 1. Generate the embedding; without a vector the row is not stored.
        vector = await llm_service.get_embedding(text)
        if not vector:
            print("JournalService: Failed to generate embedding.")
            return None

        # 2. Prepare the single row to insert.
        data = [{
            "vector": vector,
            "id": entry_id,
            "user_id": user_id,
            "text": text,
            "verses": verses or [],
            "tags": tags or [],
            "timestamp": timestamp,
        }]

        # 3. Append to the table, creating it from this row on first use.
        table = self._existing_table()
        if table is None:
            self.db.create_table(self.TABLE_NAME, data=data)
        else:
            table.add(data)

        print(f"JournalService: Created entry {entry_id} for user {user_id}")
        return entry_id

    async def get_related_entries(self, user_id: str, entry_text: str, limit: int = 3) -> List[Dict]:
        """Find the user's entries most similar to ``entry_text``.

        These are the 'Zettelkasten links': a vector search over the
        embedding of ``entry_text``, restricted to rows owned by ``user_id``.
        Rows whose text equals ``entry_text`` are dropped so an entry does
        not link to itself.

        Returns:
            At most ``limit`` rows as dicts; ``[]`` when ``limit`` is not
            positive, the table does not exist, or no embedding is produced.
        """
        if limit <= 0:
            # Nothing can be returned; skip the embedding call entirely.
            return []

        table = self._existing_table()
        if table is None:
            return []

        query_vec = await llm_service.get_embedding(entry_text)
        if not query_vec:
            return []

        # Fetch one extra row so that dropping the entry itself can still
        # leave ``limit`` results.
        results = (
            table.search(query_vec)
            .where(self._user_filter(user_id), prefilter=True)
            .limit(limit + 1)
            .to_list()
        )
        related = [row for row in results if row["text"] != entry_text]
        return related[:limit]

    async def get_user_entries(self, user_id: str, limit: int = 20) -> List[Dict]:
        """Return the user's entries, newest first.

        Returns:
            At most ``limit`` rows as dicts ordered by descending
            ``timestamp``; ``[]`` when ``limit`` is not positive or the table
            does not exist.
        """
        if limit <= 0:
            return []

        table = self._existing_table()
        if table is None:
            return []

        # Simple fetch-then-sort, intended for small POC datasets.
        # NOTE(review): no explicit .limit() is set on this query-less
        # search; confirm the installed LanceDB version returns every
        # matching row rather than applying a default cap.
        rows = table.search().where(self._user_filter(user_id)).to_list()
        newest_first = sorted(rows, key=lambda row: row["timestamp"], reverse=True)
        return newest_first[:limit]
# Shared module-level instance; JournalService.__init__ runs when this module is imported.
journal_service = JournalService()