Spaces:
Sleeping
Sleeping
| """ | |
| LangChain Tools for Meeting Intelligence Agent | |
| This module defines tools that can be used by LangChain agents to interact | |
| with meeting transcripts stored in Pinecone. | |
| Tools follow the official @tool decorator pattern from LangChain. | |
| Reference: https://docs.langchain.com/oss/python/langchain/tools#create-tools | |
| """ | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime | |
| import uuid | |
| import requests | |
| import traceback | |
| from langchain.tools import tool | |
| from langchain_core.documents import Document | |
| from src.retrievers.pipeline import process_transcript_to_documents | |
| from src.processing.metadata_extractor import MetadataExtractor | |
| from src.config.settings import Config | |
# Global reference to PineconeManager (will be set during initialization)
_pinecone_manager = None


def initialize_tools(pinecone_manager):
    """
    Initialize tools with a PineconeManager instance.

    Must be called once at startup, before any tool that queries Pinecone
    is used; those tools check `_pinecone_manager` and return an error
    string while it is still None.

    Args:
        pinecone_manager: Instance of PineconeManager for database access
    """
    global _pinecone_manager
    _pinecone_manager = pinecone_manager
def search_meetings(query: str, max_results: int = 5, meeting_id: Optional[str] = None) -> str:
    """
    Search meeting transcripts for relevant information using semantic search.

    Use this tool when you need to find specific information across meeting transcripts.
    The search uses AI-powered semantic matching to find the most relevant segments.

    Args:
        query: The search query or question to find relevant meeting content
        max_results: Maximum number of results to return (default: 5)
        meeting_id: Optional meeting ID to search within a specific meeting
            (e.g., "meeting_abc12345"). DO NOT use indices like "1" or "2".

    Returns:
        A formatted string containing the most relevant meeting transcript segments

    Example:
        search_meetings("What were the action items?", max_results=3)
        search_meetings("budget discussion", meeting_id="meeting_abc12345")
    """
    if not _pinecone_manager:
        return "Error: Pinecone service is not initialized. Cannot search meetings."
    try:
        # Build search kwargs; restrict to a single meeting when requested.
        search_kwargs: Dict[str, Any] = {"k": max_results}
        if meeting_id:
            search_kwargs["filter"] = {"meeting_id": {"$eq": meeting_id}}

        # Get retriever and perform the semantic search.
        retriever = _pinecone_manager.get_retriever(
            namespace=Config.PINECONE_NAMESPACE,
            search_kwargs=search_kwargs
        )
        docs = retriever.invoke(query)
        if not docs:
            return "No relevant meeting segments found for your query."

        # Format each hit with its metadata so the agent can cite meetings precisely.
        result_parts = [f"Found {len(docs)} relevant meeting segments:\n"]
        for i, doc in enumerate(docs, 1):
            metadata = doc.metadata
            # Distinct local name: do not shadow the `meeting_id` filter parameter.
            doc_meeting_id = metadata.get("meeting_id", "unknown")
            meeting_date = metadata.get("meeting_date", "N/A")
            meeting_title = metadata.get("meeting_title", "Untitled")
            chunk_index = metadata.get("chunk_index", "?")
            summary = metadata.get("summary", "N/A")
            speakers = metadata.get("speaker_mapping", "N/A")
            result_parts.append(
                f"\n--- Segment {i} ---\n"
                f"Meeting: {meeting_title} (ID: {doc_meeting_id})\n"
                f"Date: {meeting_date}\n"
                f"Summary: {summary}\n"
                f"Speakers: {speakers}\n"
                f"Chunk: {chunk_index}\n"
                f"Content:\n{doc.page_content}\n"
            )
        return "".join(result_parts)
    except Exception as e:
        traceback.print_exc()
        return f"Error searching meetings: {str(e)}"
def get_meeting_metadata(meeting_id: str) -> str:
    """
    Retrieve metadata and summary information for a specific meeting.

    Use this tool when you need to get details about a specific meeting,
    such as date, title, participants, or other metadata.

    Args:
        meeting_id: The unique identifier for the meeting (e.g., "meeting_abc12345")

    Returns:
        A formatted string containing the meeting's metadata

    Example:
        get_meeting_metadata("meeting_abc12345")
    """
    if not _pinecone_manager:
        return "Error: Pinecone service is not initialized. Cannot retrieve metadata."
    try:
        # Any single chunk from the meeting carries the full meeting-level metadata,
        # so fetch one document filtered by meeting_id using a generic query.
        retriever = _pinecone_manager.get_retriever(
            namespace=Config.PINECONE_NAMESPACE,
            search_kwargs={
                "k": 1,
                "filter": {"meeting_id": {"$eq": meeting_id}}
            }
        )
        docs = retriever.invoke("meeting content")
        if not docs:
            return f"No meeting found with ID: {meeting_id}"

        # Render the metadata as a fixed table of (label, metadata key) pairs.
        metadata = docs[0].metadata
        fields = [
            ("Date", "meeting_date"),
            ("Title", "meeting_title"),
            ("Summary", "summary"),
            ("Source", "source"),
            ("Source File", "source_file"),
            ("Language", "language"),
            ("Transcription Model", "transcription_model"),
            ("Duration", "meeting_duration"),
        ]
        lines = [f"Meeting Information for {meeting_id}:\n"]
        lines.extend(f"- {label}: {metadata.get(key, 'N/A')}" for label, key in fields)
        return "\n".join(lines)
    except Exception as e:
        traceback.print_exc()
        return f"Error retrieving meeting metadata: {str(e)}"
def list_recent_meetings(limit: int = 10) -> str:
    """
    Get a list of recent meetings stored in the system.

    Use this tool when you need to see what meetings are available,
    or to help the user understand what they can ask about.

    Args:
        limit: Maximum number of meetings to return (default: 10)

    Returns:
        A formatted string listing recent meetings with their IDs and dates

    Example:
        list_recent_meetings(limit=5)
    """
    if not _pinecone_manager:
        return "Error: Pinecone service is not initialized. Cannot list meetings."
    try:
        # Pull a large batch of chunks so de-duplication yields enough unique meetings.
        retriever = _pinecone_manager.get_retriever(
            namespace=Config.PINECONE_NAMESPACE,
            search_kwargs={"k": 500}
        )
        # Generic query: we only care about metadata, not relevance.
        docs = retriever.invoke("meeting")
        if not docs:
            return "No meetings found in the system."

        # Keep the first chunk seen per meeting_id, up to `limit` meetings.
        unique: Dict[str, Dict[str, Any]] = {}
        for doc in docs:
            md = doc.metadata
            mid = md.get("meeting_id")
            if not mid or mid in unique:
                continue
            unique[mid] = {
                "date": md.get("meeting_date", "N/A"),
                "title": md.get("meeting_title", "N/A"),
                "source_file": md.get("source_file", "N/A")
            }
            if len(unique) >= limit:
                break
        if not unique:
            return "No meetings found in the system."

        lines = [f"Found {len(unique)} recent meetings:\n"]
        for idx, (mid, info) in enumerate(unique.items(), 1):
            lines.append(
                f"\n{idx}. {mid}\n"
                f" Date: {info['date']}\n"
                f" Title: {info['title']}\n"
                f" Source: {info['source_file']}"
            )
        return "\n".join(lines)
    except Exception as e:
        traceback.print_exc()
        return f"Error listing meetings: {str(e)}"
def get_current_time() -> str:
    """
    Get the current date and time.

    Use this tool when you need to answer questions about relative time
    (e.g., "what happened yesterday?", "meetings from last week?").

    Returns:
        Current date and time in YYYY-MM-DD HH:MM format
    """
    # f-string format spec is equivalent to strftime("%Y-%m-%d %H:%M").
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M}"
def import_notion_to_pinecone(query: str) -> str:
    """
    Directly import a Notion page to Pinecone by name.

    Fetch a Notion page and save it TO Pinecone.
    Use this tool ONLY when the user wants to *Import* or *Sync* a page FROM Notion INTO the database.
    Do NOT use this tool to write content TO Notion. Use `API-post-page` or `API-append-block-children` for that.
    This tool handles the entire process (Search -> Fetch Content -> Upsert) automatically.

    Args:
        query: The name of the Notion page to find (e.g., "Meeting 1").

    Returns:
        Status message indicating success or failure.
    """
    if not Config.NOTION_TOKEN:
        return "β Error: NOTION_TOKEN not set in configuration."
    headers = {
        "Authorization": f"Bearer {Config.NOTION_TOKEN}",
        "Notion-Version": "2022-06-28",
        "Content-Type": "application/json"
    }

    def page_title(page: Dict[str, Any]) -> Optional[str]:
        """Return a page's plain-text title, or None when it has no usable title property."""
        props = page.get("properties", {})
        # .get("id") so a malformed property dict cannot raise KeyError.
        title_prop = next((v for v in props.values() if v.get("id") == "title"), None)
        if title_prop and title_prop.get("title"):
            return "".join(t.get("plain_text", "") for t in title_prop["title"])
        return None

    def fetch_blocks_recursive(block_id: str, depth: int = 0) -> List[str]:
        """Recursive helper to fetch blocks and their children."""
        if depth > 5:  # Safety limit for recursion depth
            return []
        collected_text = []
        cursor = None
        has_more = True
        while has_more:
            blocks_url = f"https://api.notion.com/v1/blocks/{block_id}/children"
            params = {"page_size": 100}
            if cursor:
                params["start_cursor"] = cursor
            resp = requests.get(blocks_url, headers=headers, params=params)
            if resp.status_code != 200:
                print(f"β οΈ Error fetching sub-blocks for {block_id}: {resp.text}")
                return []
            data = resp.json()
            for block in data.get("results", []):
                # 1. Extract the plain text carried by this block (when its type has rich_text).
                b_type = block.get("type")
                plain_text = ""
                if b_type and block.get(b_type) and "rich_text" in block[b_type]:
                    rich_text = block[b_type]["rich_text"]
                    plain_text = "".join(t.get("plain_text", "") for t in rich_text)
                if plain_text.strip():
                    collected_text.append(plain_text)
                # 2. Recurse into child blocks (toggles, nested lists, etc.).
                if block.get("has_children", False):
                    collected_text.extend(fetch_blocks_recursive(block["id"], depth + 1))
            has_more = data.get("has_more", False)
            cursor = data.get("next_cursor")
        return collected_text

    try:
        # 1. Search for the page by name (most recently edited first).
        print(f"π Searching Notion for: {query}...")
        search_payload = {
            "query": query,
            "filter": {"value": "page", "property": "object"},
            "sort": {"direction": "descending", "timestamp": "last_edited_time"},
            "page_size": 25
        }
        response = requests.post("https://api.notion.com/v1/search", headers=headers, json=search_payload)
        if response.status_code != 200:
            return f"β Notion Search Error: {response.text}"
        results = response.json().get("results", [])
        if not results:
            return f"β No Notion page found matching '{query}'."

        # 2. Select the best match: an exact title match wins immediately,
        #    otherwise fall back to the first substring match seen.
        query_clean = query.lower().strip()
        exact_match = None
        substring_match = None
        for p in results:
            p_title = page_title(p) or ""
            p_title_clean = p_title.lower().strip()
            if p_title_clean == query_clean:
                exact_match = p
                print(f"β Exact match found: '{p_title}'")
                break  # Found the perfect match
            if query_clean in p_title_clean and substring_match is None:
                substring_match = p
                print(f"π Substring match candidate: '{p_title}'")
            # Print for debugging
            print(f" - Found result: '{p_title}'")

        if exact_match:
            best_page = exact_match
        elif substring_match:
            best_page = substring_match
            print("β οΈ Using substring match.")
        else:
            # No usable match: list the titles we did find to guide the user.
            titles_found = [t for t in (page_title(p) for p in results) if t is not None]
            return f"β Could not find a specific match for '{query}'. Found these pages instead: {', '.join(titles_found)}. Please try again with the exact name."

        page = best_page
        page_id = page["id"]
        title = page_title(page)
        if title is None:
            title = "Untitled"
        print(f"π Found Page: '{title}' ({page_id})")

        # 3. Recursively fetch all text content from the page.
        all_text_lines = fetch_blocks_recursive(page_id)
        if not all_text_lines:
            return f"β οΈ Page '{title}' found but appears empty or has no text blocks."
        full_content = "\n\n".join(all_text_lines)

        # 4. Upsert to Pinecone.
        # NOTE(review): assumes upsert_text_to_pinecone is a LangChain tool exposing
        # .invoke() — the @tool decorator is not visible in this chunk; confirm.
        return upsert_text_to_pinecone.invoke({"text": full_content, "title": title, "source": "Notion"})
    except Exception as e:
        traceback.print_exc()
        return f"β Import failed: {str(e)}"
# Export all tools for easy import.
# NOTE: some names listed here are defined below this statement; that is fine
# because __all__ only affects `from module import *` resolution at import time.
__all__ = [
    "initialize_tools",
    "search_meetings",
    "get_meeting_metadata",
    "list_recent_meetings",
    "upsert_text_to_pinecone",
    "import_notion_to_pinecone",
    "create_notion_page",
    "get_current_time"
]
def create_notion_page(title: str, content: str, parent_page_id: str = "2bc5a424-5cbb-80ec-8aa9-c4fd989e67bc") -> str:
    """
    Create a new page in Notion with a Title and Text Content.

    Use this tool for ANY request to "Write to Notion", "Save to Notion",
    "Create a page", "Draft an email in Notion".
    This tool handles all the formatting automatically.

    Args:
        title: The title of the new page.
        content: The text content of the page.
        parent_page_id: ID of the parent page to create the new page under.
            Defaults to the "Meetings Summary Test" page.

    Returns:
        Status message with link to the new page.
    """
    if not Config.NOTION_TOKEN:
        return "β Error: NOTION_TOKEN not set."
    headers = {
        "Authorization": f"Bearer {Config.NOTION_TOKEN}",
        "Notion-Version": "2022-06-28",
        "Content-Type": "application/json"
    }
    # Split content into chunks of 2000 chars (Notion block limit) and wrap
    # each chunk in its own paragraph block.
    chunks = [content[i:i + 2000] for i in range(0, len(content), 2000)]
    children_blocks = [
        {
            "object": "block",
            "type": "paragraph",
            "paragraph": {
                "rich_text": [{"type": "text", "text": {"content": chunk}}]
            }
        }
        for chunk in chunks
    ]
    payload = {
        "parent": {"page_id": parent_page_id},
        "properties": {
            "title": [
                {
                    "text": {
                        "content": title
                    }
                }
            ]
        },
        "children": children_blocks
    }
    try:
        resp = requests.post("https://api.notion.com/v1/pages", headers=headers, json=payload)
        if resp.status_code == 200:
            # Separate name for the created page's link (previously shadowed
            # the request-URL variable).
            page_url = resp.json().get('url', 'URL not found')
            return f"β Successfully created Notion page: '{title}'.\nLink: {page_url}"
        else:
            return f"β Failed to create Notion page: {resp.status_code} - {resp.text}"
    except Exception as e:
        traceback.print_exc()
        return f"β Error creating page: {str(e)}"
def upsert_text_to_pinecone(text: str, title: str, source: str = "Manual Entry", date: Optional[str] = None) -> str:
    """
    Upsert any text content (e.g., Notion pages, manual notes) to Pinecone.

    Automatically extracts metadata (summary, date, speakers) from the text.
    Use this tool when retrieving full content from Notion or other sources.

    CRITICAL: Do NOT use this tool if the user wants to "Save to Notion" or "Create a Page".
    Use the Notion tools (`API-post-page`) for that. Use this ONLY for saving to Pinecone/Database.

    Args:
        text: The FULL content to save (do not summarize!)
        title: Title of the document/meeting
        source: Source of the content (e.g., "Notion", "Manual Entry")
        date: Optional date override (YYYY-MM-DD). If not provided, AI extracts it from text or uses today.

    Returns:
        Success message with the generated meeting_id
    """
    if not _pinecone_manager:
        return "Error: Pinecone service is not initialized."
    try:
        # 1. Extract metadata (summary, date, speaker mapping) from the raw text.
        print(f"π Extracting metadata for '{title}'...")
        extractor = MetadataExtractor()
        extracted = extractor.extract_metadata(text)

        # 2. Resolve final metadata values.
        final_summary = extracted.get("summary") or f"Imported from {source}"
        # Date precedence: explicit argument > extracted from text > today.
        final_date = date or extracted.get("meeting_date") or datetime.now().strftime("%Y-%m-%d")
        speaker_mapping = extracted.get("speaker_mapping", {})

        # 3. Apply speaker mapping to the text (improves searchability):
        # replaces placeholders like "SPEAKER_00" with names in the content.
        processed_text = extractor.apply_speaker_mapping(text, speaker_mapping)

        # 4. Generate a unique ID and assemble the meeting-level metadata.
        meeting_id = f"doc_{uuid.uuid4().hex[:8]}"
        meeting_metadata = {
            "meeting_id": meeting_id,
            "meeting_date": final_date,
            "date_transcribed": datetime.now().strftime("%Y-%m-%d"),
            "source": source,
            "meeting_title": title,
            "summary": final_summary,
            "source_file": f"{source.lower()}_upload",
            "transcription_model": "text_import",
            "language": "en",
            "speaker_mapping": speaker_mapping
        }

        # 5. Chunk the processed text into documents for indexing.
        docs = process_transcript_to_documents(
            transcript_text=processed_text,
            speaker_data=None,  # Uses fallback chunking
            meeting_id=meeting_id,
            meeting_metadata=meeting_metadata
        )

        # 6. Upsert to Pinecone.
        _pinecone_manager.upsert_documents(docs, namespace=Config.PINECONE_NAMESPACE)
        return (f"β Successfully saved '{title}' to Pinecone (ID: {meeting_id})\n"
                f" - Date: {final_date}\n"
                f" - Extracted Speakers: {', '.join(speaker_mapping.values()) if speaker_mapping else 'None'}")
    except Exception as e:
        traceback.print_exc()
        return f"β Error saving to Pinecone: {str(e)}"