| | """ |
| | LangChain-Compatible Tools for SPARKNET |
| | All tools follow LangChain's tool interface for seamless integration |
| | with LangGraph agents and workflows. |
| | """ |
| |
|
| | from typing import Optional, List, Dict, Any |
| | from pydantic import BaseModel, Field |
| | from langchain_core.tools import StructuredTool, tool |
| | from loguru import logger |
| | import json |

# Optional dependencies: each backend is imported defensively so that a
# missing library disables the corresponding tool instead of breaking the
# whole module.
try:
    import PyPDF2
    import fitz  # PyMuPDF
    PDF_AVAILABLE = True
except ImportError:
    PDF_AVAILABLE = False
    logger.warning("PDF libraries not available. Install PyPDF2 and pymupdf.")

try:
    from reportlab.lib.pagesizes import letter
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
    from reportlab.lib.styles import getSampleStyleSheet
    REPORTLAB_AVAILABLE = True
except ImportError:
    REPORTLAB_AVAILABLE = False
    logger.warning("ReportLab not available. Install reportlab for PDF generation.")

try:
    from duckduckgo_search import DDGS
    DDGS_AVAILABLE = True
except ImportError:
    DDGS_AVAILABLE = False
    logger.warning("DuckDuckGo search not available.")

try:
    import wikipedia
    WIKIPEDIA_AVAILABLE = True
except ImportError:
    WIKIPEDIA_AVAILABLE = False
    logger.warning("Wikipedia not available.")

try:
    import arxiv
    ARXIV_AVAILABLE = True
except ImportError:
    ARXIV_AVAILABLE = False
    logger.warning("Arxiv not available.")

from ..utils.gpu_manager import get_gpu_manager


# ============================================================================
# Input Schemas
# ============================================================================

class PDFExtractorInput(BaseModel):
    """Input schema for PDF extraction."""
    file_path: str = Field(..., description="Path to the PDF file")
    page_range: Optional[str] = Field(None, description="Page range (e.g., '1-5', 'all')")
    extract_metadata: bool = Field(True, description="Extract PDF metadata")


class PatentParserInput(BaseModel):
    """Input schema for patent parsing."""
    text: str = Field(..., description="Patent text to parse")
    extract_claims: bool = Field(True, description="Extract patent claims")
    extract_abstract: bool = Field(True, description="Extract abstract")
    extract_description: bool = Field(True, description="Extract description")


class WebSearchInput(BaseModel):
    """Input schema for web search."""
    query: str = Field(..., description="Search query")
    max_results: int = Field(5, description="Maximum number of results")
    region: str = Field("wt-wt", description="Search region (e.g., 'us-en', 'wt-wt')")


class WikipediaInput(BaseModel):
    """Input schema for Wikipedia lookup."""
    query: str = Field(..., description="Wikipedia search query")
    sentences: int = Field(3, description="Number of sentences to return")


class ArxivInput(BaseModel):
    """Input schema for Arxiv search."""
    query: str = Field(..., description="Search query")
    max_results: int = Field(5, description="Maximum number of results")
    sort_by: str = Field("relevance", description="Sort by: relevance, lastUpdatedDate, submittedDate")


class DocumentGeneratorInput(BaseModel):
    """Input schema for document generation."""
    output_path: str = Field(..., description="Output PDF file path")
    title: str = Field(..., description="Document title")
    content: str = Field(..., description="Document content (markdown or plain text)")
    author: Optional[str] = Field(None, description="Document author")


class GPUMonitorInput(BaseModel):
    """Input schema for GPU monitoring."""
    gpu_id: Optional[int] = Field(None, description="Specific GPU ID or None for all GPUs")


# ============================================================================
# Document Processing Tools
# ============================================================================

def pdf_extractor_func(file_path: str, page_range: Optional[str] = None,
                       extract_metadata: bool = True) -> str:
    """
    Extract text and metadata from PDF files using the PyMuPDF (fitz) backend.

    Args:
        file_path: Path to PDF file
        page_range: Page range like '1-5', a single page like '3', or 'all' (default: all)
        extract_metadata: Whether to extract metadata

    Returns:
        Extracted text and metadata as formatted string
    """
    if not PDF_AVAILABLE:
        return "Error: PDF libraries not installed. Run: pip install PyPDF2 pymupdf"

    try:
        # Open the document with PyMuPDF
        doc = fitz.open(file_path)

        # Resolve the requested page range (1-indexed, clamped to the document)
        if page_range and page_range.lower() != 'all':
            if '-' in page_range:
                start, end = map(int, page_range.split('-'))
            else:
                start = end = int(page_range)
            pages = range(start - 1, min(end, len(doc)))
        else:
            pages = range(len(doc))

        # Extract text page by page
        text_parts = []
        for page_num in pages:
            page = doc[page_num]
            text_parts.append(f"--- Page {page_num + 1} ---\n{page.get_text()}")

        extracted_text = "\n\n".join(text_parts)

        # Assemble the formatted result
        result = f"PDF: {file_path}\n"
        result += f"Total Pages: {len(doc)}\n"
        result += f"Extracted Pages: {len(pages)}\n\n"

        if extract_metadata:
            metadata = doc.metadata
            result += "Metadata:\n"
            for key, value in metadata.items():
                if value:
                    result += f"  {key}: {value}\n"
            result += "\n"

        result += "=" * 80 + "\n"
        result += "EXTRACTED TEXT:\n"
        result += "=" * 80 + "\n"
        result += extracted_text

        doc.close()

        logger.info(f"Extracted {len(pages)} pages from {file_path}")
        return result

    except Exception as e:
        logger.error(f"PDF extraction failed: {e}")
        return f"Error extracting PDF: {str(e)}"


def patent_parser_func(text: str, extract_claims: bool = True,
                       extract_abstract: bool = True, extract_description: bool = True) -> str:
    """
    Parse patent document structure and extract key sections.
    Uses heuristics to identify: abstract, claims, description, drawings.

    Args:
        text: Patent text (from PDF or plain text)
        extract_claims: Extract patent claims
        extract_abstract: Extract abstract
        extract_description: Extract detailed description

    Returns:
        Structured patent information as JSON string
    """
    try:
        result = {
            "abstract": "",
            "claims": [],
            "description": "",
            "metadata": {}
        }

        lines = text.split('\n')
        current_section = None

        # Walk the document line by line, tracking the active section
        for line in lines:
            line_lower = line.lower().strip()

            # Short lines containing a section keyword are treated as headers
            if 'abstract' in line_lower and len(line_lower) < 50:
                current_section = 'abstract'
                continue
            elif 'claim' in line_lower and len(line_lower) < 50:
                current_section = 'claims'
                continue
            elif 'description' in line_lower and len(line_lower) < 100:
                current_section = 'description'
                continue
            elif ('drawing' in line_lower or 'figure' in line_lower) and len(line_lower) < 50:
                current_section = 'drawings'
                continue

            # Accumulate content under the active section
            if current_section == 'abstract' and extract_abstract:
                if line.strip():
                    result['abstract'] += line + "\n"
            elif current_section == 'claims' and extract_claims:
                if line.strip() and (line.strip()[0].isdigit() or 'wherein' in line_lower):
                    result['claims'].append(line.strip())
            elif current_section == 'description' and extract_description:
                if line.strip():
                    result['description'] += line + "\n"

        # Heuristic: look for a patent number near the top of the document
        for line in lines[:20]:
            if 'patent' in line.lower() and any(char.isdigit() for char in line):
                result['metadata']['patent_number'] = line.strip()
                break

        # Format the human-readable report
        output = "PATENT ANALYSIS\n"
        output += "=" * 80 + "\n\n"

        if result['abstract']:
            output += "ABSTRACT:\n"
            output += result['abstract'].strip()[:500]
            output += "\n\n"

        if result['claims']:
            output += f"CLAIMS ({len(result['claims'])} found):\n"
            for i, claim in enumerate(result['claims'][:10], 1):
                output += f"\n{i}. {claim}\n"
            output += "\n"

        if result['description']:
            output += "DESCRIPTION (excerpt):\n"
            output += result['description'].strip()[:1000]
            output += "\n\n"

        output += "=" * 80 + "\n"
        output += f"JSON OUTPUT:\n{json.dumps(result, indent=2)}"

        logger.info(f"Parsed patent: {len(result['claims'])} claims extracted")
        return output

    except Exception as e:
        logger.error(f"Patent parsing failed: {e}")
        return f"Error parsing patent: {str(e)}"


# ============================================================================
# Research Tools
# ============================================================================

def web_search_func(query: str, max_results: int = 5, region: str = "wt-wt") -> str:
    """
    Search the web using DuckDuckGo.
    Returns top results with title, snippet, and URL.

    Args:
        query: Search query
        max_results: Maximum number of results
        region: Search region code

    Returns:
        Formatted search results
    """
    if not DDGS_AVAILABLE:
        return "Error: DuckDuckGo search not installed. Run: pip install duckduckgo-search"

    try:
        ddgs = DDGS()
        results = list(ddgs.text(query, region=region, max_results=max_results))

        if not results:
            return f"No results found for: {query}"

        output = f"WEB SEARCH RESULTS: {query}\n"
        output += "=" * 80 + "\n\n"

        for i, result in enumerate(results, 1):
            output += f"{i}. {result.get('title', 'No title')}\n"
            output += f"   {result.get('body', 'No description')}\n"
            output += f"   URL: {result.get('href', 'No URL')}\n\n"

        logger.info(f"Web search completed: {len(results)} results for '{query}'")
        return output

    except Exception as e:
        logger.error(f"Web search failed: {e}")
        return f"Error performing web search: {str(e)}"


def wikipedia_func(query: str, sentences: int = 3) -> str:
    """
    Search Wikipedia and return a summary.

    Args:
        query: Wikipedia search query
        sentences: Number of sentences to return

    Returns:
        Wikipedia summary
    """
    if not WIKIPEDIA_AVAILABLE:
        return "Error: Wikipedia not installed. Run: pip install wikipedia"

    try:
        # Find candidate pages for the query
        search_results = wikipedia.search(query)

        if not search_results:
            return f"No Wikipedia page found for: {query}"

        # Load the top match; auto_suggest=False avoids surprise redirects
        page = wikipedia.page(search_results[0], auto_suggest=False)

        # Fetch a short summary of the same page
        summary = wikipedia.summary(search_results[0], sentences=sentences, auto_suggest=False)

        output = f"WIKIPEDIA: {page.title}\n"
        output += "=" * 80 + "\n\n"
        output += summary + "\n\n"
        output += f"URL: {page.url}\n"
        output += f"Categories: {', '.join(page.categories[:5])}\n"

        logger.info(f"Wikipedia lookup completed: {page.title}")
        return output

    except wikipedia.exceptions.DisambiguationError as e:
        options = ', '.join(e.options[:5])
        return f"Disambiguation needed for '{query}'. Options: {options}"
    except wikipedia.exceptions.PageError:
        return f"No Wikipedia page found for: {query}"
    except Exception as e:
        logger.error(f"Wikipedia lookup failed: {e}")
        return f"Error: {str(e)}"


def arxiv_func(query: str, max_results: int = 5, sort_by: str = "relevance") -> str:
    """
    Search Arxiv for academic papers.

    Args:
        query: Search query
        max_results: Maximum number of results
        sort_by: Sort by relevance, lastUpdatedDate, or submittedDate

    Returns:
        Formatted Arxiv results
    """
    if not ARXIV_AVAILABLE:
        return "Error: Arxiv not installed. Run: pip install arxiv"

    try:
        # Map the string argument onto the library's sort criteria
        sort_map = {
            "relevance": arxiv.SortCriterion.Relevance,
            "lastUpdatedDate": arxiv.SortCriterion.LastUpdatedDate,
            "submittedDate": arxiv.SortCriterion.SubmittedDate,
        }
        sort_criterion = sort_map.get(sort_by, arxiv.SortCriterion.Relevance)

        # Build and run the search via a Client (Search.results() is
        # deprecated in arxiv 2.x)
        search = arxiv.Search(
            query=query,
            max_results=max_results,
            sort_by=sort_criterion
        )

        results = list(arxiv.Client().results(search))

        if not results:
            return f"No Arxiv papers found for: {query}"

        output = f"ARXIV SEARCH: {query}\n"
        output += "=" * 80 + "\n\n"

        for i, paper in enumerate(results, 1):
            output += f"{i}. {paper.title}\n"
            output += f"   Authors: {', '.join(str(author) for author in paper.authors[:3])}\n"
            output += f"   Published: {paper.published.strftime('%Y-%m-%d')}\n"
            output += f"   Summary: {paper.summary[:200]}...\n"
            output += f"   PDF: {paper.pdf_url}\n"
            output += f"   Categories: {', '.join(paper.categories)}\n\n"

        logger.info(f"Arxiv search completed: {len(results)} papers for '{query}'")
        return output

    except Exception as e:
        logger.error(f"Arxiv search failed: {e}")
        return f"Error searching Arxiv: {str(e)}"


# ============================================================================
# Document Generation Tools
# ============================================================================

def document_generator_func(output_path: str, title: str, content: str,
                            author: Optional[str] = None) -> str:
    """
    Generate a PDF document from text content.
    Supports basic formatting and styling.

    Args:
        output_path: Output PDF file path
        title: Document title
        content: Document content (plain text or simple markdown)
        author: Optional author name

    Returns:
        Success message with file path
    """
    if not REPORTLAB_AVAILABLE:
        return "Error: ReportLab not installed. Run: pip install reportlab"

    try:
        # Set up the document and its flowable "story"
        doc = SimpleDocTemplate(output_path, pagesize=letter)
        styles = getSampleStyleSheet()
        story = []

        # Title
        title_style = styles['Title']
        story.append(Paragraph(title, title_style))
        story.append(Spacer(1, 12))

        # Optional author byline
        if author:
            author_style = styles['Normal']
            story.append(Paragraph(f"By: {author}", author_style))
            story.append(Spacer(1, 12))

        # Split content on blank lines and render each paragraph
        paragraphs = content.split('\n\n')
        for para in paragraphs:
            if para.strip():
                if para.strip().startswith('#'):
                    # Markdown-style heading
                    heading_text = para.strip().lstrip('#').strip()
                    story.append(Paragraph(heading_text, styles['Heading2']))
                else:
                    # Regular body paragraph
                    story.append(Paragraph(para.strip(), styles['Normal']))
                story.append(Spacer(1, 6))

        # Render the PDF
        doc.build(story)

        logger.info(f"Generated PDF: {output_path}")
        return f"Successfully generated PDF: {output_path}\nTitle: {title}\nParagraphs: {len(paragraphs)}"

    except Exception as e:
        logger.error(f"PDF generation failed: {e}")
        return f"Error generating PDF: {str(e)}"


# ============================================================================
# GPU Monitoring Tools
# ============================================================================

def gpu_monitor_func(gpu_id: Optional[int] = None) -> str:
    """
    Monitor GPU status, memory usage, and utilization.

    Args:
        gpu_id: Specific GPU ID or None for all GPUs

    Returns:
        Formatted GPU status information
    """
    try:
        gpu_manager = get_gpu_manager()

        if gpu_id is not None:
            # Report on a single GPU
            info = gpu_manager.get_gpu_info(gpu_id)

            if "error" in info:
                return f"Error: {info['error']}"

            output = f"GPU {info['gpu_id']}: {info['name']}\n"
            output += f"Memory: {info['memory_used'] / 1024**3:.2f} GB / {info['memory_total'] / 1024**3:.2f} GB "
            output += f"({info['memory_percent']:.1f}% used)\n"
            output += f"Free Memory: {info['memory_free'] / 1024**3:.2f} GB\n"
            output += f"GPU Utilization: {info['gpu_utilization']}%\n"
            output += f"Temperature: {info['temperature']}°C\n"

            return output
        else:
            # Summarize all GPUs via the manager
            return gpu_manager.monitor()

    except Exception as e:
        logger.error(f"GPU monitoring error: {e}")
        return f"Error monitoring GPU: {str(e)}"


# ============================================================================
# LangChain Tool Instances
# ============================================================================

pdf_extractor_tool = StructuredTool.from_function(
    func=pdf_extractor_func,
    name="pdf_extractor",
    description=(
        "Extract text and metadata from PDF files. "
        "Useful for analyzing patent documents, research papers, and legal documents. "
        "Supports page range selection and metadata extraction."
    ),
    args_schema=PDFExtractorInput,
    return_direct=False,
)

patent_parser_tool = StructuredTool.from_function(
    func=patent_parser_func,
    name="patent_parser",
    description=(
        "Parse patent document structure and extract key sections: abstract, claims, description. "
        "Useful for analyzing patent documents and identifying key innovations."
    ),
    args_schema=PatentParserInput,
    return_direct=False,
)

web_search_tool = StructuredTool.from_function(
    func=web_search_func,
    name="web_search",
    description=(
        "Search the web using DuckDuckGo. Returns top results with titles, snippets, and URLs. "
        "Useful for market research, competitor analysis, and finding relevant information."
    ),
    args_schema=WebSearchInput,
    return_direct=False,
)

wikipedia_tool = StructuredTool.from_function(
    func=wikipedia_func,
    name="wikipedia",
    description=(
        "Search Wikipedia and get article summaries. "
        "Useful for background information on technologies, companies, and concepts."
    ),
    args_schema=WikipediaInput,
    return_direct=False,
)

arxiv_tool = StructuredTool.from_function(
    func=arxiv_func,
    name="arxiv_search",
    description=(
        "Search Arxiv for academic papers and preprints. "
        "Useful for finding relevant research, state-of-the-art methods, and technical background."
    ),
    args_schema=ArxivInput,
    return_direct=False,
)

document_generator_tool = StructuredTool.from_function(
    func=document_generator_func,
    name="document_generator",
    description=(
        "Generate PDF documents from text content. "
        "Useful for creating reports, briefs, and documentation."
    ),
    args_schema=DocumentGeneratorInput,
    return_direct=False,
)

gpu_monitor_tool = StructuredTool.from_function(
    func=gpu_monitor_func,
    name="gpu_monitor",
    description=(
        "Monitor GPU status including memory usage, utilization, and temperature. "
        "Useful for checking GPU availability before running models."
    ),
    args_schema=GPUMonitorInput,
    return_direct=False,
)
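
# StructuredTool instances are invoked with a dict matching their args_schema.
# Illustrative call (hypothetical file path):
#
#   pdf_extractor_tool.invoke({"file_path": "patent.pdf", "page_range": "1-2"})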


# ============================================================================
# Tool Registry
# ============================================================================

class VISTAToolRegistry:
    """
    Registry of tools organized by VISTA scenario.
    Enables scenario-specific tool selection for optimal performance.
    """

    SCENARIO_TOOLS = {
        "patent_wakeup": [
            pdf_extractor_tool,
            patent_parser_tool,
            web_search_tool,
            wikipedia_tool,
            arxiv_tool,
            document_generator_tool,
        ],
        "agreement_safety": [
            pdf_extractor_tool,
            web_search_tool,
            document_generator_tool,
        ],
        "partner_matching": [
            web_search_tool,
            wikipedia_tool,
            arxiv_tool,
        ],
        "general": [
            pdf_extractor_tool,
            patent_parser_tool,
            web_search_tool,
            wikipedia_tool,
            arxiv_tool,
            document_generator_tool,
            gpu_monitor_tool,
        ],
    }

    @classmethod
    def get_tools(cls, scenario: str = "general") -> List[StructuredTool]:
        """
        Get tools for a specific VISTA scenario.

        Args:
            scenario: VISTA scenario type

        Returns:
            List of LangChain tools
        """
        tools = cls.SCENARIO_TOOLS.get(scenario, cls.SCENARIO_TOOLS["general"])
        logger.info(f"Retrieved {len(tools)} tools for scenario: {scenario}")
        return tools

    @classmethod
    def get_all_tools(cls) -> List[StructuredTool]:
        """Get all available tools."""
        return cls.SCENARIO_TOOLS["general"]

    @classmethod
    def list_scenarios(cls) -> List[str]:
        """List available scenarios."""
        return list(cls.SCENARIO_TOOLS.keys())
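
# Illustrative registry usage:
#
#   VISTAToolRegistry.list_scenarios()
#   # -> ['patent_wakeup', 'agreement_safety', 'partner_matching', 'general']
#   tools = VISTAToolRegistry.get_tools("partner_matching")  # 3 research tools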


# ============================================================================
# Convenience Functions
# ============================================================================

def get_vista_tools(scenario: str = "general") -> List[StructuredTool]:
    """
    Get LangChain tools for a VISTA scenario.

    Args:
        scenario: Scenario name (patent_wakeup, agreement_safety, partner_matching, general)

    Returns:
        List of LangChain StructuredTool instances
    """
    return VISTAToolRegistry.get_tools(scenario)


def get_all_tools() -> List[StructuredTool]:
    """Get all available LangChain tools."""
    return VISTAToolRegistry.get_all_tools()
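
# Sketch of wiring these tools into a LangGraph agent (illustrative;
# assumes langgraph is installed and a chat model is configured --
# ChatOllama and the model name below are placeholder choices):
#
#   from langgraph.prebuilt import create_react_agent
#   from langchain_ollama import ChatOllama
#
#   agent = create_react_agent(
#       ChatOllama(model="llama3.1"),
#       tools=get_vista_tools("patent_wakeup"),
#   )
#   state = agent.invoke(
#       {"messages": [("user", "Summarize the claims in patent.pdf")]}
#   )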


__all__ = [
    "pdf_extractor_tool",
    "patent_parser_tool",
    "web_search_tool",
    "wikipedia_tool",
    "arxiv_tool",
    "document_generator_tool",
    "gpu_monitor_tool",
    "VISTAToolRegistry",
    "get_vista_tools",
    "get_all_tools",
]