"""
RAG (Retrieval Augmented Generation) implementation for project assistant.

Meeting notes are loaded from a directory, rendered to text, split into
chunks, embedded, and stored in a persistent ChromaDB collection. The
``ProjectRAG`` class also offers structured views (action items, blockers,
decisions) computed directly from the in-memory meeting list.
"""

from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime

import chromadb
from chromadb.config import Settings
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

from src.parsers import MeetingNote, load_meetings_from_directory


def _display_date(meeting) -> str:
    """Return the meeting date as ``YYYY-MM-DD``, or ``'Unknown'`` if unset."""
    return meeting.date.strftime('%Y-%m-%d') if meeting.date else 'Unknown'


def _date_sort_key(meeting):
    """Sort key that orders undated meetings before dated ones.

    A ``(has_date, date)`` tuple is used instead of substituting
    ``datetime.min`` for missing dates: the substitute would be compared
    against real dates and raises ``TypeError`` if those are ``date`` objects
    or timezone-aware datetimes. With the tuple, a missing date is never
    compared against a real one.
    """
    return (meeting.date is not None, meeting.date)


def _action_item_text(item) -> str:
    """Render ``[assignee: ]task[ (by deadline)]`` for one action item."""
    assignee = f"{item.assignee}: " if item.assignee else ""
    deadline = f" (by {item.deadline})" if item.deadline else ""
    return f"{assignee}{item.task}{deadline}"


def _index_text(meeting) -> str:
    """Build the plain-text representation of a meeting used for indexing."""
    parts = [
        f"Project: {meeting.project_name}",
        f"Meeting: {meeting.title}",
        f"Date: {_display_date(meeting)}",
    ]

    if meeting.participants:
        parts.append(f"Participants: {', '.join(meeting.participants)}")

    if meeting.discussion:
        parts.append(f"Discussion:\n{meeting.discussion}")

    if meeting.decisions:
        parts.append("Decisions:")
        parts.extend(f"- {d}" for d in meeting.decisions)

    if meeting.action_items:
        parts.append("Action Items:")
        for item in meeting.action_items:
            status = "✓" if item.completed else "○"
            parts.append(f"{status} {_action_item_text(item)}")

    if meeting.blockers:
        parts.append("Blockers:")
        parts.extend(f"- {b}" for b in meeting.blockers)

    return "\n".join(parts)


def _markdown_text(meeting) -> str:
    """Build the Markdown representation of a meeting (full document view)."""
    parts = [
        f"# Meeting: {meeting.title}",
        f"**Date:** {_display_date(meeting)}",
    ]

    if meeting.participants:
        parts.append(f"**Participants:** {', '.join(meeting.participants)}")

    if meeting.discussion:
        parts.append(f"\n## Discussion\n{meeting.discussion}")

    if meeting.decisions:
        parts.append("\n## Decisions")
        parts.extend(f"- {d}" for d in meeting.decisions)

    if meeting.action_items:
        parts.append("\n## Action Items")
        for item in meeting.action_items:
            status = "[x]" if item.completed else "[ ]"
            parts.append(f"- {status} {_action_item_text(item)}")

    if meeting.blockers:
        parts.append("\n## Blockers")
        parts.extend(f"- {b}" for b in meeting.blockers)

    return "\n".join(parts)


class ProjectRAG:
    """RAG system for project meeting notes.

    Attributes:
        data_dir: Directory the meeting notes are loaded from.
        persist_dir: Directory backing the persistent ChromaDB client.
        embeddings: Embedding model used for both documents and queries.
        client: Persistent ChromaDB client.
        collection: The ``meeting_notes`` collection (cosine space).
        text_splitter: Splitter producing ~500-character chunks with a
            50-character overlap.
        meetings: Meetings loaded by the last ``load_and_index`` call;
            empty until then.
    """

    # Single source of truth for the collection identity, shared by
    # ``__init__`` and ``load_and_index`` so the two cannot drift apart.
    _COLLECTION_NAME = "meeting_notes"
    _COLLECTION_METADATA = {"hnsw:space": "cosine"}

    def __init__(self, data_dir: Path, persist_dir: Optional[Path] = None):
        """Initialize the RAG system.

        Args:
            data_dir: Directory containing the meeting notes.
            persist_dir: Where ChromaDB persists its data. Defaults to
                ``./chroma_db`` when omitted (or otherwise falsy).
        """
        self.data_dir = data_dir
        self.persist_dir = persist_dir or Path("./chroma_db")

        # Initialize embeddings
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )

        # Initialize ChromaDB
        self.client = chromadb.PersistentClient(path=str(self.persist_dir))
        self.collection = self.client.get_or_create_collection(
            name=self._COLLECTION_NAME,
            # Pass a copy so the class-level constant can never be mutated.
            metadata=dict(self._COLLECTION_METADATA),
        )

        # Text splitter for chunking
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            separators=["\n\n", "\n", ". ", " ", ""]
        )

        self.meetings: List[MeetingNote] = []

    def _meetings_for(self, project: Optional[str]) -> List[MeetingNote]:
        """Return meetings of ``project``, or all meetings if it is falsy."""
        if not project:
            return list(self.meetings)
        return [m for m in self.meetings if m.project_name == project]

    def load_and_index(self) -> None:
        """Load all meetings and index them in the vector store.

        Replaces ``self.meetings`` with the freshly loaded list. If no
        meetings are found the method returns early and the existing
        collection is left untouched. Otherwise every meeting is rendered to
        text, chunked and embedded, and the collection is then recreated and
        filled with the new chunks.
        """
        print("Loading meetings from directory...")
        self.meetings = load_meetings_from_directory(self.data_dir)
        print(f"Loaded {len(self.meetings)} meetings")

        if not self.meetings:
            print("No meetings found. Please add meeting notes to the data directory.")
            return

        print("Indexing meetings...")

        documents: List[str] = []
        metadatas: List[Dict[str, Any]] = []
        ids: List[str] = []

        for idx, meeting in enumerate(self.meetings):
            base_metadata = {
                "meeting_idx": idx,
                "project": meeting.project_name,
                "title": meeting.title,
                "date": meeting.date.isoformat() if meeting.date else "",
                # NOTE(review): file_path may be a pathlib.Path; it is
                # coerced to str because the vector store presumably only
                # accepts primitive metadata values - confirm against the
                # parser and the ChromaDB version in use.
                "file_path": (
                    str(meeting.file_path)
                    if meeting.file_path is not None else ""
                ),
            }

            chunks = self.text_splitter.split_text(_index_text(meeting))
            for chunk_idx, chunk in enumerate(chunks):
                documents.append(chunk)
                metadatas.append({**base_metadata, "chunk_idx": chunk_idx})
                ids.append(f"meeting_{idx}_chunk_{chunk_idx}")

        # Embed BEFORE dropping the old collection: if embedding fails, the
        # previously indexed data is still intact.
        embeddings_list = (
            self.embeddings.embed_documents(documents) if documents else []
        )

        # Clear existing collection
        self.client.delete_collection(self._COLLECTION_NAME)
        self.collection = self.client.create_collection(
            name=self._COLLECTION_NAME,
            metadata=dict(self._COLLECTION_METADATA),
        )

        # Add to ChromaDB
        if documents:
            self.collection.add(
                embeddings=embeddings_list,
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )

        print(f"Indexed {len(documents)} chunks from {len(self.meetings)} meetings")

    def search(
        self,
        query: str,
        n_results: int = 5,
        project_filter: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Search for relevant meeting content.

        Args:
            query: Free-text query; embedded with the same model as the
                indexed chunks.
            n_results: Maximum number of chunks requested from the store.
            project_filter: If truthy, only chunks whose ``project``
                metadata equals this value are considered.

        Returns:
            A list of dicts with keys ``content`` (chunk text), ``metadata``
            (the chunk's stored metadata) and ``distance`` (the value
            reported by the store, or ``None`` if the store returned no
            distances). Empty if nothing matched.
        """
        # Embed the query
        query_embedding = self.embeddings.embed_query(query)

        # Prepare where clause for filtering
        where = {"project": project_filter} if project_filter else None

        # Search in ChromaDB
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            where=where
        )

        # Results are batched per query embedding; we sent exactly one.
        doc_batches = results['documents']
        if not doc_batches or not doc_batches[0]:
            return []

        docs = doc_batches[0]
        metas = results['metadatas'][0]
        # The key may be present but None; fall back to None per hit
        # instead of failing on the subscript.
        dist_batches = results.get('distances')
        dists = dist_batches[0] if dist_batches else [None] * len(docs)

        return [
            {'content': doc, 'metadata': meta, 'distance': dist}
            for doc, meta, dist in zip(docs, metas, dists)
        ]

    def get_all_projects(self) -> List[str]:
        """Get list of all project names.

        Names are de-duplicated and returned in the order they first appear
        in ``self.meetings``, so the result is stable across runs.
        """
        return list(dict.fromkeys(m.project_name for m in self.meetings))

    def get_open_action_items(self, project: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get all open action items, optionally filtered by project.

        Args:
            project: If truthy, restrict to meetings of this project.

        Returns:
            One dict per action item that is not completed, with keys
            ``project``, ``meeting``, ``date``, ``assignee``, ``task`` and
            ``deadline``, in meeting order.
        """
        return [
            {
                'project': meeting.project_name,
                'meeting': meeting.title,
                'date': meeting.date,
                'assignee': item.assignee,
                'task': item.task,
                'deadline': item.deadline
            }
            for meeting in self._meetings_for(project)
            for item in meeting.action_items
            if not item.completed
        ]

    def get_blockers(self, project: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get all blockers, optionally filtered by project.

        Args:
            project: If truthy, restrict to meetings of this project.

        Returns:
            One dict per blocker with keys ``project``, ``meeting``,
            ``date`` and ``blocker``, in meeting order.
        """
        return [
            {
                'project': meeting.project_name,
                'meeting': meeting.title,
                'date': meeting.date,
                'blocker': blocker
            }
            for meeting in self._meetings_for(project)
            for blocker in meeting.blockers
        ]

    def get_recent_decisions(self, project: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent decisions, optionally filtered by project.

        Args:
            project: If truthy, restrict to meetings of this project.
            limit: Maximum number of decisions to return. A value of zero or
                less yields an empty list.

        Returns:
            Up to ``limit`` dicts with keys ``project``, ``meeting``,
            ``date`` and ``decision``, newest meeting first; decisions from
            undated meetings come last.
        """
        if limit <= 0:
            # Without this guard the loop below would append one decision
            # before it ever checks the limit.
            return []

        newest_first = sorted(
            self._meetings_for(project), key=_date_sort_key, reverse=True
        )

        decisions: List[Dict[str, Any]] = []
        for meeting in newest_first:
            for decision in meeting.decisions:
                decisions.append({
                    'project': meeting.project_name,
                    'meeting': meeting.title,
                    'date': meeting.date,
                    'decision': decision
                })
                if len(decisions) >= limit:
                    return decisions

        return decisions

    def get_project_documents(self, project: str) -> List:
        """Get all meeting documents for a specific project.

        Args:
            project: Project name; only meetings whose ``project_name``
                equals it exactly are included.

        Returns:
            One ``langchain_core`` ``Document`` per matching meeting, oldest
            first (undated meetings first). ``page_content`` is a Markdown
            rendering of the meeting; ``metadata`` carries ``project``,
            ``title`` and ``date`` (ISO format, or ``""`` if undated).
        """
        # Imported locally, as in the original code, so the dependency is
        # only needed when this method is actually called.
        from langchain_core.documents import Document

        matching = [m for m in self.meetings if m.project_name == project]

        return [
            Document(
                page_content=_markdown_text(meeting),
                metadata={
                    "project": meeting.project_name,
                    "title": meeting.title,
                    "date": meeting.date.isoformat() if meeting.date else ""
                }
            )
            for meeting in sorted(matching, key=_date_sort_key)
        ]