|
|
"""
Retrieval-Augmented Generation (RAG) implementation for the project assistant.
"""
|
|
from pathlib import Path |
|
|
from typing import List, Dict, Any |
|
|
from datetime import datetime |
|
|
import chromadb |
|
|
from chromadb.config import Settings |
|
|
from langchain_huggingface import HuggingFaceEmbeddings |
|
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
|
from src.parsers import MeetingNote, load_meetings_from_directory |
|
|
|
|
|
|
|
|
class ProjectRAG:
    """RAG system for project meeting notes.

    Meetings are loaded with ``load_meetings_from_directory``, rendered to
    plain text, split into overlapping chunks, embedded with a
    sentence-transformers model and stored in a persistent Chroma collection.
    The parsed meetings are also kept in memory (``self.meetings``) so the
    structured helpers (projects, action items, blockers, decisions) can be
    answered without touching the vector store.
    """

    # Name and settings of the single Chroma collection used by this class.
    _COLLECTION_NAME = "meeting_notes"
    _COLLECTION_METADATA = {"hnsw:space": "cosine"}

    # Upper bound on records sent per ``collection.add`` call. Chroma rejects
    # an add() that exceeds the client's maximum batch size, so large corpora
    # are inserted in slices of this (conservative) size.
    _ADD_BATCH_SIZE = 1000

    def __init__(self, data_dir: Path, persist_dir: Path = None):
        """Initialize the embedding model, vector store and text splitter.

        Args:
            data_dir: Directory passed to ``load_meetings_from_directory``
                when :meth:`load_and_index` runs.
            persist_dir: Directory where Chroma persists its data. Falls back
                to ``./chroma_db`` when omitted (or otherwise falsy).
        """
        self.data_dir = data_dir
        self.persist_dir = persist_dir or Path("./chroma_db")

        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )

        self.client = chromadb.PersistentClient(path=str(self.persist_dir))
        self.collection = self.client.get_or_create_collection(
            name=self._COLLECTION_NAME,
            metadata=dict(self._COLLECTION_METADATA),
        )

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            separators=["\n\n", "\n", ". ", " ", ""],
        )

        # Populated by load_and_index(); empty until then.
        self.meetings: List[MeetingNote] = []

    @staticmethod
    def _render_index_text(meeting: "MeetingNote") -> str:
        """Render one meeting as the plain-text document that is chunked and embedded."""
        date_label = meeting.date.strftime("%Y-%m-%d") if meeting.date else "Unknown"
        lines = [
            f"Project: {meeting.project_name}",
            f"Meeting: {meeting.title}",
            f"Date: {date_label}",
        ]

        if meeting.participants:
            lines.append(f"Participants: {', '.join(meeting.participants)}")

        if meeting.discussion:
            lines.append(f"Discussion:\n{meeting.discussion}")

        if meeting.decisions:
            lines.append("Decisions:")
            lines.extend(f"- {decision}" for decision in meeting.decisions)

        if meeting.action_items:
            lines.append("Action Items:")
            for item in meeting.action_items:
                status = "✓" if item.completed else "○"
                assignee = f"{item.assignee}: " if item.assignee else ""
                deadline = f" (by {item.deadline})" if item.deadline else ""
                lines.append(f"{status} {assignee}{item.task}{deadline}")

        if meeting.blockers:
            lines.append("Blockers:")
            lines.extend(f"- {blocker}" for blocker in meeting.blockers)

        return "\n".join(lines)

    @staticmethod
    def _render_markdown(meeting: "MeetingNote") -> str:
        """Render one meeting as a Markdown document (headings and task checkboxes)."""
        date_label = meeting.date.strftime("%Y-%m-%d") if meeting.date else "Unknown"
        lines = [
            f"# Meeting: {meeting.title}",
            f"**Date:** {date_label}",
        ]

        if meeting.participants:
            lines.append(f"**Participants:** {', '.join(meeting.participants)}")

        if meeting.discussion:
            lines.append(f"\n## Discussion\n{meeting.discussion}")

        if meeting.decisions:
            lines.append("\n## Decisions")
            lines.extend(f"- {decision}" for decision in meeting.decisions)

        if meeting.action_items:
            lines.append("\n## Action Items")
            for item in meeting.action_items:
                status = "[x]" if item.completed else "[ ]"
                assignee = f"{item.assignee}: " if item.assignee else ""
                deadline = f" (by {item.deadline})" if item.deadline else ""
                lines.append(f"- {status} {assignee}{item.task}{deadline}")

        if meeting.blockers:
            lines.append("\n## Blockers")
            lines.extend(f"- {blocker}" for blocker in meeting.blockers)

        return "\n".join(lines)

    def load_and_index(self):
        """Load all meetings and index them in the vector store.

        Replaces ``self.meetings`` with the freshly loaded notes. When at
        least one meeting is found, the collection is dropped and rebuilt
        from scratch; when none are found the existing collection is left
        untouched and the method returns early. Progress is reported with
        ``print``.
        """
        print("Loading meetings from directory...")
        self.meetings = load_meetings_from_directory(self.data_dir)
        print(f"Loaded {len(self.meetings)} meetings")

        if not self.meetings:
            print("No meetings found. Please add meeting notes to the data directory.")
            return

        # Drop and recreate so chunks from a previous run cannot linger.
        self.client.delete_collection(self._COLLECTION_NAME)
        self.collection = self.client.create_collection(
            name=self._COLLECTION_NAME,
            metadata=dict(self._COLLECTION_METADATA),
        )

        print("Indexing meetings...")
        documents = []
        metadatas = []
        ids = []

        for idx, meeting in enumerate(self.meetings):
            date_iso = meeting.date.isoformat() if meeting.date else ""
            chunks = self.text_splitter.split_text(self._render_index_text(meeting))

            for chunk_idx, chunk in enumerate(chunks):
                documents.append(chunk)
                metadatas.append({
                    "meeting_idx": idx,
                    "project": meeting.project_name,
                    "title": meeting.title,
                    "date": date_iso,
                    "file_path": meeting.file_path,
                    "chunk_idx": chunk_idx,
                })
                # meeting index + chunk index is unique within one rebuild.
                ids.append(f"meeting_{idx}_chunk_{chunk_idx}")

        if not documents:
            return

        embeddings_list = self.embeddings.embed_documents(documents)

        # Insert in bounded slices; one oversized add() is rejected by Chroma.
        for start in range(0, len(documents), self._ADD_BATCH_SIZE):
            stop = start + self._ADD_BATCH_SIZE
            self.collection.add(
                embeddings=embeddings_list[start:stop],
                documents=documents[start:stop],
                metadatas=metadatas[start:stop],
                ids=ids[start:stop],
            )

        print(f"Indexed {len(documents)} chunks from {len(self.meetings)} meetings")

    def search(self, query: str, n_results: int = 5, project_filter: str = None) -> List[Dict[str, Any]]:
        """Search for relevant meeting content.

        Args:
            query: Free-text query; embedded with the same model as the index.
            n_results: Maximum number of chunks requested from the collection.
            project_filter: When truthy, restrict matches to chunks whose
                ``project`` metadata equals this value.

        Returns:
            One dict per hit, in the order returned by the collection, with
            keys ``content`` (chunk text), ``metadata`` (chunk metadata) and
            ``distance`` (``None`` when the response carries no distances).
            An empty list when nothing matched.
        """
        query_embedding = self.embeddings.embed_query(query)
        where = {"project": project_filter} if project_filter else None

        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            where=where,
        )

        # Each field is a list with one entry per query (we send exactly one);
        # a field may be absent or None, so fall back to an empty hit list.
        contents = (results.get('documents') or [[]])[0] or []
        metadatas = (results.get('metadatas') or [[]])[0] or []
        distances = (results.get('distances') or [[]])[0] or []

        formatted_results = []
        for position, content in enumerate(contents):
            formatted_results.append({
                'content': content,
                'metadata': metadatas[position] if position < len(metadatas) else None,
                'distance': distances[position] if position < len(distances) else None,
            })

        return formatted_results

    def get_all_projects(self) -> List[str]:
        """Get the distinct project names, in order of first appearance in ``self.meetings``."""
        # dict.fromkeys de-duplicates while keeping a deterministic order
        # (a plain set would return the names in arbitrary order).
        return list(dict.fromkeys(m.project_name for m in self.meetings))

    def get_open_action_items(self, project: str = None) -> List[Dict[str, Any]]:
        """Get all open (not completed) action items, optionally filtered by project.

        Args:
            project: When truthy, only meetings with this exact project name
                are considered.

        Returns:
            Dicts with keys ``project``, ``meeting``, ``date``, ``assignee``,
            ``task`` and ``deadline``, in meeting order.
        """
        return [
            {
                'project': meeting.project_name,
                'meeting': meeting.title,
                'date': meeting.date,
                'assignee': item.assignee,
                'task': item.task,
                'deadline': item.deadline,
            }
            for meeting in self.meetings
            if not project or meeting.project_name == project
            for item in meeting.action_items
            if not item.completed
        ]

    def get_blockers(self, project: str = None) -> List[Dict[str, Any]]:
        """Get all blockers, optionally filtered by project.

        Args:
            project: When truthy, only meetings with this exact project name
                are considered.

        Returns:
            Dicts with keys ``project``, ``meeting``, ``date`` and
            ``blocker``, in meeting order.
        """
        return [
            {
                'project': meeting.project_name,
                'meeting': meeting.title,
                'date': meeting.date,
                'blocker': blocker,
            }
            for meeting in self.meetings
            if not project or meeting.project_name == project
            for blocker in meeting.blockers
        ]

    def get_recent_decisions(self, project: str = None, limit: int = 10) -> List[Dict[str, Any]]:
        """Get the most recent decisions, optionally filtered by project.

        Meetings are visited newest first; meetings without a date sort last.

        Args:
            project: When truthy, only meetings with this exact project name
                are considered.
            limit: Maximum number of decisions to return.

        Returns:
            At most ``limit`` dicts with keys ``project``, ``meeting``,
            ``date`` and ``decision``.
        """
        newest_first = sorted(
            self.meetings,
            key=lambda m: m.date or datetime.min,
            reverse=True,
        )

        decisions = []
        for meeting in newest_first:
            if project and meeting.project_name != project:
                continue

            for decision in meeting.decisions:
                # Enforce the cap per decision so a meeting with several
                # decisions cannot push the result past ``limit``.
                if len(decisions) >= limit:
                    return decisions
                decisions.append({
                    'project': meeting.project_name,
                    'meeting': meeting.title,
                    'date': meeting.date,
                    'decision': decision,
                })

        return decisions

    def get_project_documents(self, project: str) -> List:
        """Get all meeting documents for a specific project.

        Args:
            project: Exact project name to select.

        Returns:
            One ``langchain_core`` ``Document`` per matching meeting, oldest
            first (undated meetings first), whose content is a Markdown
            rendering of the meeting and whose metadata holds ``project``,
            ``title`` and ``date`` (ISO string, empty when unknown).
        """
        # Imported lazily so the dependency is only needed by this method.
        from langchain_core.documents import Document

        oldest_first = sorted(self.meetings, key=lambda m: m.date or datetime.min)

        documents = []
        for meeting in oldest_first:
            if meeting.project_name != project:
                continue

            documents.append(Document(
                page_content=self._render_markdown(meeting),
                metadata={
                    "project": meeting.project_name,
                    "title": meeting.title,
                    "date": meeting.date.isoformat() if meeting.date else "",
                },
            ))

        return documents
|
|
|