| """ |
| Financial Report MCP Server using the official MCP Python SDK |
| |
| This server provides tools for downloading and processing financial reports. |
| """ |
|
|
| import asyncio |
| import logging |
| import os |
| import sys |
| from pathlib import Path |
| from typing import Optional, Dict, Any, List |
| from datetime import datetime |
| import aiohttp |
| import ssl |
| import pdfplumber |
| from bs4 import BeautifulSoup |
| import httpx |
| import json |
| import re |
| from huggingface_hub import InferenceClient |
|
|
| |
| logging.basicConfig(level=logging.INFO, stream=sys.stderr) |
| logger = logging.getLogger(__name__) |
|
|
| |
| try: |
| from mcp.server.fastmcp import FastMCP, Context |
| from mcp.server.session import ServerSession |
| logger.info("MCP SDK imported successfully") |
| except ImportError as e: |
| logger.error(f"Failed to import MCP SDK: {e}") |
| raise |
|
|
| |
| mcp = FastMCP("Financial Report MCP Server", "1.0.0") |
|
|
| |
| reports_dir = Path("financial_reports") |
| reports_dir.mkdir(exist_ok=True) |
| logger.info(f"Financial reports directory: {reports_dir.absolute()}") |
|
|
| @mcp.tool() |
| async def download_financial_report(url: str) -> Dict[str, Any]: |
| """ |
| Download a financial report from a URL |
| |
| Args: |
| url: The URL of the financial report to download |
| |
| Returns: |
| Dictionary with download information |
| """ |
| logger.info(f"Downloading financial report from {url}") |
| |
| try: |
| |
| import urllib.parse |
| decoded_url = urllib.parse.unquote(url) |
| logger.info(f"Decoded URL: {decoded_url}") |
| |
| |
| encoded_url = urllib.parse.quote(decoded_url, safe=':/?#[]@!$&\'()*+,;=%') |
| logger.info(f"Re-encoded URL: {encoded_url}") |
| |
| |
| ssl_context = ssl.create_default_context() |
| ssl_context.check_hostname = False |
| ssl_context.verify_mode = ssl.CERT_NONE |
| |
| |
| timeout = aiohttp.ClientTimeout(total=30) |
| headers = { |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' |
| } |
| |
| async with aiohttp.ClientSession(timeout=timeout) as session: |
| async with session.get(encoded_url, ssl=ssl_context, headers=headers) as response: |
| if response.status != 200: |
| raise Exception(f"HTTP {response.status} when downloading {encoded_url}") |
| |
| |
| |
| content_type = response.headers.get('content-type', '').lower() |
| is_html = 'html' in content_type |
| is_investor_page = any(pattern in url.lower() for pattern in ['investor', 'ir.', 'press-release', 'earnings', 'financial']) |
| |
| if is_html and is_investor_page: |
| logger.info(f"[DOWNLOAD] Detected HTML investor relations page, attempting to extract PDF links") |
| |
| pdf_links = await extract_pdf_links_from_page(url, "") |
| if pdf_links: |
| |
| pdf_url = pdf_links[0]["url"] |
| logger.info(f"[DOWNLOAD] Found PDF link, redirecting download to: {pdf_url}") |
| |
| return await download_financial_report(pdf_url) |
| else: |
| logger.warning(f"[DOWNLOAD] No PDF links found on investor page, downloading HTML anyway") |
| |
| |
| filename = decoded_url.split("/")[-1] |
| if not filename or "." not in filename: |
| if 'pdf' in content_type: |
| filename = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf" |
| elif 'html' in content_type: |
| filename = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html" |
| else: |
| filename = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.dat" |
| |
| |
| file_path = Path("financial_reports") / filename |
| content = await response.read() |
| |
| logger.info(f"Saving report to {file_path.absolute()}") |
| with open(file_path, "wb") as f: |
| f.write(content) |
| |
| logger.info(f"Successfully downloaded report to {file_path}") |
| |
| return { |
| "filename": filename, |
| "filepath": str(file_path), |
| "size": len(content), |
| "download_time": datetime.now().isoformat(), |
| "source_url": url |
| } |
| except aiohttp.ClientError as e: |
| logger.error(f"Network error downloading financial report: {str(e)}") |
| raise Exception(f"Network error downloading financial report: {str(e)}. This may be due to network restrictions in the execution environment.") |
| except Exception as e: |
| logger.error(f"Error downloading financial report: {str(e)}") |
| raise Exception(f"Error downloading financial report: {str(e)}") |
|
|
| @mcp.tool() |
| async def list_downloaded_reports() -> Dict[str, Any]: |
| """ |
| List all downloaded financial reports |
| |
| Returns: |
| Dictionary with list of reports |
| """ |
| try: |
| reports = [] |
| download_dir = Path("financial_reports") |
| if download_dir.exists(): |
| for file_path in download_dir.iterdir(): |
| if file_path.is_file(): |
| stat = file_path.stat() |
| |
| import urllib.parse |
| reports.append({ |
| "filename": file_path.name, |
| "filepath": str(file_path), |
| "size": stat.st_size, |
| "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), |
| "encoded_filename": urllib.parse.quote(file_path.name, safe=':/?#[]@!$&\'()*+,;=%') |
| }) |
| |
| return { |
| "reports": reports |
| } |
| except Exception as e: |
| logger.error(f"Error listing downloaded reports: {str(e)}") |
| raise Exception(f"Error listing downloaded reports: {str(e)}") |
|
|
|
|
| @mcp.tool() |
| async def analyze_financial_report_file(filename: str, source_url: str = "") -> Dict[str, Any]: |
| """ |
| Analyze a downloaded financial report file and provide investment insights |
| |
| Args: |
| filename: Name of the financial report file to analyze |
| source_url: Optional original URL where the report was downloaded from |
| |
| Returns: |
| Dictionary with analysis results and investment insights |
| """ |
| logger.info(f"Analyzing financial report file: {filename}") |
| if source_url: |
| logger.info(f"Source URL: {source_url}") |
| |
| try: |
| |
| if not filename or filename.strip() == "": |
| logger.info("[AUTO-DETECT] No filename provided, looking for most recent downloaded file") |
| reports_dir = Path("financial_reports") |
| if reports_dir.exists(): |
| |
| files = [(f, f.stat().st_mtime) for f in reports_dir.iterdir() if f.is_file()] |
| if files: |
| |
| files.sort(key=lambda x: x[1], reverse=True) |
| filename = files[0][0].name |
| logger.info(f"[AUTO-DETECT] Found most recent file: {filename}") |
| else: |
| raise Exception("No filename provided and no downloaded files found in financial_reports directory") |
| else: |
| raise Exception("No filename provided and financial_reports directory does not exist") |
| |
| |
| reports_dir = Path("financial_reports").absolute() |
| file_path = reports_dir / filename |
| |
| if not file_path.exists(): |
| |
| relative_path = Path("financial_reports") / filename |
| if relative_path.exists(): |
| file_path = relative_path |
| else: |
| raise Exception(f"File not found: {filename}. Searched in {reports_dir} and relative path {relative_path}") |
| |
| |
| file_content = "" |
| if filename.lower().endswith('.pdf'): |
| try: |
| import pdfplumber |
| with pdfplumber.open(file_path) as pdf: |
| text = "" |
| |
| pages_to_extract = min(10, len(pdf.pages)) |
| for i in range(pages_to_extract): |
| page = pdf.pages[i] |
| text += page.extract_text() or "" |
| file_content = text |
| except Exception as e: |
| |
| logger.error(f"Error extracting text from PDF {filename}: {str(e)}") |
| file_content = f"Error extracting text from PDF {filename}: {str(e)}" |
| else: |
| |
| with open(file_path, "r", encoding="utf-8") as f: |
| file_content = f.read() |
| |
| |
| is_html = ( |
| filename.lower().endswith('.html') or |
| '<html' in file_content.lower()[:500] or |
| '<!doctype html' in file_content.lower()[:500] or |
| '<meta' in file_content.lower()[:500] |
| ) |
| |
| if is_html and source_url: |
| logger.info(f"[HTML EXTRACTION] Detected HTML content, extracting text from source URL: {source_url}") |
| try: |
| from bs4 import BeautifulSoup |
| |
| |
| async with httpx.AsyncClient(timeout=30.0) as client: |
| response = await client.get(source_url, headers={ |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' |
| }) |
| response.raise_for_status() |
| |
| |
| soup = BeautifulSoup(response.text, 'html.parser') |
| |
| |
| for element in soup(["script", "style", "nav", "header", "footer", "noscript"]): |
| element.decompose() |
| |
| |
| text = soup.get_text(separator='\n', strip=True) |
| |
| |
| lines = [line.strip() for line in text.splitlines() if line.strip()] |
| clean_text = '\n'.join(lines) |
| |
| if clean_text: |
| file_content = clean_text |
| logger.info(f"[HTML EXTRACTION] Successfully extracted {len(file_content)} characters of clean text") |
| else: |
| logger.warning(f"[HTML EXTRACTION] No text extracted, using original HTML") |
| |
| except Exception as e: |
| logger.error(f"[HTML EXTRACTION] Failed to extract text: {str(e)}") |
| logger.info(f"[HTML EXTRACTION] Falling back to original HTML content") |
| |
| |
| |
| if len(file_content) > 15000: |
| file_content = file_content[:15000] + "... (truncated)" |
| |
| |
| |
| result = { |
| "type": "file_analysis_trigger", |
| "file_path": str(file_path), |
| "filename": filename, |
| "content": file_content, |
| "content_preview": file_content[:500] + "... (preview truncated)" if len(file_content) > 500 else file_content |
| } |
| |
| |
| if source_url: |
| result["source_url"] = source_url |
| logger.info(f"Including source URL in analysis result: {source_url}") |
| |
| return result |
| except Exception as e: |
| logger.error(f"Error analyzing financial report file {filename}: {str(e)}") |
| raise Exception(f"Error analyzing financial report file {filename}: {str(e)}") |
|
|
|
|
| |
| @mcp.tool() |
| async def search_and_extract_financial_report(user_query: str) -> Dict[str, Any]: |
| """ |
| Search for financial reports online based on user's query and return raw search results for Agent analysis |
| |
| Args: |
| user_query: The user's complete search query |
| |
| Returns: |
| Dictionary with raw search results for Agent analysis |
| """ |
| |
| search_base_url = 'https://www.googleapis.com/customsearch/v1' |
| |
| params = { |
| "key": "AIzaSyARhFllOKRdpHjij5idJZ-vXa-0fdIQqGI", |
| "cx": "51d2770bb9e304626", |
| "q": user_query |
| } |
|
|
| logger.info(f"Searching for financial reports with query: {user_query}") |
|
|
| try: |
| async with httpx.AsyncClient() as client: |
| response = await client.get(search_base_url, params=params) |
| response.raise_for_status() |
| search_results = response.json() |
| |
| |
| if "items" in search_results and search_results["items"]: |
| |
| return { |
| "type": "search_results", |
| "results": search_results["items"], |
| "message": f"Successfully found {len(search_results['items'])} search results for query: {user_query}" |
| } |
| else: |
| |
| return { |
| "type": "search_no_results", |
| "message": f"No financial reports found for query: {user_query}", |
| "suggestion": "Please provide a direct URL (or PDF format URL) for the financial report you're looking for." |
| } |
| except httpx.RequestError as e: |
| logger.error(f"Error performing web search: {str(e)}") |
| return { |
| "type": "search_error", |
| "error": str(e), |
| "message": f"Exception while searching for financial reports with query '{user_query}': {str(e)}", |
| "suggestion": "Please ask user to provide a direct URL (or PDF format URL) for the financial report due to search error." |
| } |
| |
|
|
| @mcp.tool() |
| def rank_pdf_links_by_relevance(pdf_links: List[Dict[str, str]], user_request: str) -> List[Dict[str, str]]: |
| """ |
| Rank PDF links by relevance to user request |
| |
| Args: |
| pdf_links: List of PDF links to rank |
| user_request: User's specific request |
| |
| Returns: |
| Ranked list of PDF links |
| """ |
| |
| user_request_lower = user_request.lower() |
| |
| |
| scored_links = [] |
| for link in pdf_links: |
| title = link.get("title", "").lower() |
| snippet = link.get("snippet", "").lower() |
| |
| score = 0 |
| |
| |
| request_tokens = set(user_request_lower.split()) |
| title_tokens = set(title.split()) |
| snippet_tokens = set(snippet.split()) |
| |
| |
| title_overlap = len(request_tokens & title_tokens) |
| snippet_overlap = len(request_tokens & snippet_tokens) |
| |
| if title_overlap > 0: |
| score += title_overlap * 2 |
| if snippet_overlap > 0: |
| score += snippet_overlap |
| |
| |
| import re |
| year_matches = re.findall(r'\b(19|20)\d{2}\b', user_request_lower) |
| for year in year_matches: |
| if year in title or year in snippet: |
| score += 1 |
| |
| |
| recent_indicators = ['最近', 'recent', 'latest', 'newest'] |
| if any(indicator in user_request_lower for indicator in recent_indicators): |
| |
| current_year = datetime.now().year |
| for i in range(3): |
| year_str = str(current_year - i) |
| if year_str in title or year_str in snippet: |
| score += (3 - i) |
| |
| scored_links.append((score, link)) |
| |
| |
| scored_links.sort(key=lambda x: x[0], reverse=True) |
| |
| |
| return [link for score, link in scored_links] |
|
|
|
|
| async def extract_pdf_links_from_page(url: str, user_request: str = "") -> List[Dict[str, str]]: |
| """ |
| Extract PDF links from a financial report index page and rank them based on user request |
| |
| Args: |
| url: URL of the index page to parse |
| user_request: User's specific request for filtering relevant PDFs |
| |
| Returns: |
| List of dictionaries containing PDF link information, sorted by relevance |
| """ |
| logger.info(f"Extracting PDF links from page: {url}") |
| |
| try: |
| |
| ssl_context = ssl.create_default_context() |
| ssl_context.check_hostname = False |
| ssl_context.verify_mode = ssl.CERT_NONE |
| |
| |
| timeout = aiohttp.ClientTimeout(total=30) |
| headers = { |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' |
| } |
| |
| async with aiohttp.ClientSession(timeout=timeout) as session: |
| async with session.get(url, ssl=ssl_context, headers=headers) as response: |
| if response.status != 200: |
| logger.warning(f"HTTP {response.status} when fetching {url}") |
| return [] |
| |
| content = await response.text() |
| soup = BeautifulSoup(content, 'html.parser') |
| |
| pdf_links = [] |
| |
| |
| for link_elem in soup.find_all('a', href=True): |
| href = link_elem['href'] |
| title = link_elem.get_text(strip=True) |
| |
| |
| if href.lower().endswith('.pdf'): |
| |
| if href.startswith('//'): |
| href = 'https:' + href |
| elif href.startswith('/'): |
| |
| from urllib.parse import urljoin |
| href = urljoin(url, href) |
| elif not href.startswith('http'): |
| |
| from urllib.parse import urljoin |
| href = urljoin(url, href) |
| |
| pdf_links.append({ |
| "url": href, |
| "title": title or "PDF Report", |
| "snippet": f"PDF document: {title}" |
| }) |
| |
| |
| |
| for link_elem in soup.find_all('a', href=True): |
| href = link_elem['href'] |
| title = link_elem.get_text(strip=True) |
| title_lower = title.lower() |
| |
| |
| |
| request_tokens = set(user_request.lower().split()) if user_request else set() |
| title_tokens = set(title_lower.split()) |
| |
| |
| has_request_match = len(request_tokens & title_tokens) > 0 if request_tokens else False |
| has_pdf_indicator = 'pdf' in title_lower or '.pdf' in href.lower() |
| |
| if has_request_match or has_pdf_indicator: |
| |
| if href.startswith('//'): |
| href = 'https:' + href |
| elif href.startswith('/'): |
| |
| from urllib.parse import urljoin |
| href = urljoin(url, href) |
| elif not href.startswith('http'): |
| |
| from urllib.parse import urljoin |
| href = urljoin(url, href) |
| |
| |
| if href.lower().endswith('.pdf'): |
| pdf_links.append({ |
| "url": href, |
| "title": title or "PDF Report", |
| "snippet": f"PDF document: {title}" |
| }) |
| |
| |
| if user_request: |
| ranked_links = rank_pdf_links_by_relevance(pdf_links, user_request) |
| else: |
| ranked_links = pdf_links |
| |
| logger.info(f"Found {len(ranked_links)} PDF links on page {url}") |
| return ranked_links |
| except Exception as e: |
| logger.error(f"Error extracting PDF links from {url}: {str(e)}") |
| return [] |
|
|
|
|
| @mcp.tool() |
| async def deep_analyze_and_extract_download_link(search_results: List[Dict[str, Any]], user_request: str) -> Dict[str, Any]: |
| """ |
| Deep analyze search results using LLM and extract the most relevant download link based on user request |
| |
| Args: |
| search_results: List of search results from search_and_extract_financial_report |
| user_request: The user's specific request |
| |
| Returns: |
| Dictionary with the most relevant download link and related information |
| """ |
| logger.info(f"Deep analyzing search results for user request: {user_request}") |
| |
| |
| |
| user_request_lower = user_request.lower() |
| |
| |
| quarters_requested = [] |
| |
| |
| import re |
| q_pattern = re.findall(r'\bq([1-4])\b', user_request_lower) |
| for q_num in q_pattern: |
| quarter_key = f'q{q_num}' |
| if quarter_key not in quarters_requested: |
| quarters_requested.append(quarter_key) |
| |
| |
| quarter_words = { |
| 'first': 'q1', |
| 'second': 'q2', |
| 'third': 'q3', |
| 'fourth': 'q4', |
| '1st': 'q1', |
| '2nd': 'q2', |
| '3rd': 'q3', |
| '4th': 'q4' |
| } |
| |
| for word, q_key in quarter_words.items(): |
| if word in user_request_lower and 'quarter' in user_request_lower: |
| if q_key not in quarters_requested: |
| quarters_requested.append(q_key) |
| |
| is_multiple_quarter_request = len(quarters_requested) > 1 |
| logger.info(f"[MULTI-QUARTER DETECTION] Quarters requested: {quarters_requested}, is_multiple: {is_multiple_quarter_request}") |
| |
| try: |
| |
| formatted_results = [] |
| for i, result in enumerate(search_results[:10]): |
| formatted_results.append({ |
| "index": i, |
| "title": result.get("title", ""), |
| "link": result.get("link", ""), |
| "snippet": result.get("snippet", "") |
| }) |
| |
| |
| prompt = f""" |
| You are a financial report analysis expert. Your task is to analyze search results and identify the most relevant download link for a user's specific request. |
| |
| User Request: {user_request} |
| |
| Search Results: |
| {json.dumps(formatted_results, indent=2)} |
| |
| Please analyze these search results and identify the most relevant financial report that matches the user's request. Consider factors such as: |
| 1. **CRITICAL: Prefer direct PDF download links (.pdf URLs) over web pages** - Users want downloadable files, not landing pages |
| 2. Relevance to the user's specific request (company name, report type, quarter/year, etc.) |
| 3. Source credibility (official company websites, SEC.gov, etc.) |
| 4. Match the exact period requested (e.g., if user asks for Q1 2025, prioritize Q1 2025 reports over annual reports) |
| 5. Avoid generic index pages or landing pages - look for specific report PDFs |
| |
| Priority Rules: |
| - Direct PDF link for the exact period requested = HIGHEST PRIORITY |
| - Direct PDF link for a related period = HIGH PRIORITY |
| - Web page or landing page = LOW PRIORITY (only if no PDF available) |
| |
| Respond with a JSON object in the following format: |
| {{ |
| "selected_index": 0, |
| "reasoning": "Explanation of why this result was selected", |
| "confidence": "high|medium|low" |
| }} |
| |
| If none of the results are relevant, respond with: |
| {{ |
| "selected_index": -1, |
| "reasoning": "Explanation of why no results are relevant", |
| "confidence": "low" |
| }} |
| """ |
| |
| |
| try: |
| import sys |
| import os |
| print(f"[LLM-DEBUG] About to initialize InferenceClient...", file=sys.stderr) |
| |
| |
| hf_token = os.getenv("HUGGING_FACE_HUB_TOKEN") |
| if hf_token: |
| print(f"[LLM-DEBUG] Found HUGGING_FACE_HUB_TOKEN (length: {len(hf_token)})", file=sys.stderr) |
| else: |
| print(f"[LLM-DEBUG] WARNING: No token found", file=sys.stderr) |
| |
| |
| from huggingface_hub import InferenceClient |
| client = InferenceClient( |
| token=hf_token, |
| base_url="https://api-inference.huggingface.co/models/Qwen/Qwen2.5-72B-Instruct" |
| ) |
| print(f"[LLM-DEBUG] InferenceClient initialized successfully", file=sys.stderr) |
| |
| |
| messages = [ |
| {"role": "system", "content": "You are a precise JSON generator that helps analyze financial report search results. You are also helpful in guiding users to find the most relevant financial reports. You should ONLY generate valid JSON responses in the specified format."}, |
| {"role": "user", "content": prompt} |
| ] |
| |
| |
| response = client.chat.completions.create( |
| model="Qwen/Qwen2.5-72B-Instruct", |
| messages=messages, |
| max_tokens=500, |
| temperature=0.3, |
| ) |
| |
| |
| if hasattr(response, 'choices') and len(response.choices) > 0: |
| content = response.choices[0].message.content if hasattr(response.choices[0].message, 'content') else str(response.choices[0].message) |
| else: |
| content = str(response) |
| |
| |
| try: |
| |
| json_match = re.search(r'\{.*\}', content, re.DOTALL) |
| if json_match: |
| json_str = json_match.group(0) |
| llm_result = json.loads(json_str) |
| |
| |
| selected_index = llm_result.get("selected_index", -1) |
| reasoning = llm_result.get("reasoning", "No reasoning provided") |
| confidence = llm_result.get("confidence", "low") |
| |
| |
| if 0 <= selected_index < len(formatted_results): |
| selected_result = formatted_results[selected_index] |
| original_result = search_results[selected_index] |
| |
| |
| link = selected_result["link"] |
| if not link.lower().endswith(".pdf"): |
| |
| if "investor" in link or "ir." in link or "press-release" in link or "earnings" in link: |
| logger.info(f"[LLM-SELECTED] Non-PDF link detected, attempting to extract PDF from page: {link}") |
| pdf_links = await extract_pdf_links_from_page(link, user_request) |
| if pdf_links: |
| |
| pdf_link = pdf_links[0] |
| logger.info(f"[LLM-SELECTED] Successfully extracted PDF: {pdf_link.get('title', 'PDF Report')}") |
| return { |
| "type": "download_link_extracted", |
| "title": f"{selected_result['title']} - {pdf_link.get('title', 'PDF Report')}", |
| "link": pdf_link["url"], |
| "snippet": pdf_link.get("snippet", selected_result["snippet"]), |
| "message": f"Found the most relevant financial report for your request: {pdf_link.get('title', 'PDF Report')}", |
| "confidence": confidence, |
| "reasoning": f"{reasoning}. Extracted PDF link from the selected page." |
| } |
| else: |
| logger.warning(f"[LLM-SELECTED] No PDF links found on page: {link}") |
| |
| return { |
| "type": "download_link_extracted", |
| "title": selected_result["title"], |
| "link": selected_result["link"], |
| "snippet": selected_result["snippet"], |
| "message": f"Found the most relevant financial report for your request: {selected_result['title']}", |
| "confidence": confidence, |
| "reasoning": reasoning |
| } |
| elif selected_index == -1: |
| |
| if search_results: |
| first_result = search_results[0] |
| return { |
| "type": "download_link_extracted", |
| "title": first_result.get("title", ""), |
| "link": first_result.get("link", ""), |
| "snippet": first_result.get("snippet", ""), |
| "message": "Found a potential financial report, but it may not exactly match your request.", |
| "confidence": "low", |
| "reasoning": reasoning |
| } |
| else: |
| return { |
| "type": "no_results", |
| "message": "No search results available to analyze.", |
| "suggestion": "Please try a different search or provide a direct URL.", |
| "reasoning": "No search results were provided for analysis." |
| } |
| else: |
| |
| raise ValueError("Invalid selected_index from LLM response") |
| else: |
| |
| raise ValueError("No valid JSON found in LLM response") |
| except (json.JSONDecodeError, ValueError) as e: |
| |
| logger.warning(f"LLM response parsing failed, falling back to heuristic analysis: {str(e)}") |
| pass |
| except Exception as llm_error: |
| |
| logger.warning(f"LLM call failed, falling back to heuristic analysis: {str(llm_error)}") |
| pass |
| |
| |
| logger.info("Using heuristic-based selection as fallback") |
| best_match_index = -1 |
| best_score = -1 |
| |
| user_request_lower = user_request.lower() |
| |
| |
| |
| |
| company_mentions = {} |
| domain_to_company = {} |
| |
| |
| for result in formatted_results: |
| title = result.get("title", "").lower() |
| link = result.get("link", "").lower() |
| |
| |
| domain_match = re.search(r'https?://(?:www\.)?([^/]+)', link) |
| if domain_match: |
| domain = domain_match.group(1) |
| |
| |
| |
| |
| |
| |
| domain_parts = domain.replace('www.', '').replace('ir.', '').replace('investor.', '').replace('investors.', '') |
| |
| |
| core_domain = domain_parts.split('.')[0] |
| |
| |
| company_key = core_domain |
| |
| |
| company_mentions[company_key] = company_mentions.get(company_key, 0) + 1 |
| domain_to_company[domain] = company_key |
| |
| |
| primary_company = None |
| if company_mentions: |
| primary_company = max(company_mentions.items(), key=lambda x: x[1])[0] |
| logger.info(f"[COMPANY DETECTION] Detected primary company: '{primary_company}' (mentioned in {company_mentions[primary_company]} results)") |
| logger.info(f"[COMPANY DETECTION] All companies found: {company_mentions}") |
| |
| for i, result in enumerate(formatted_results): |
| title = result.get("title", "").lower() |
| snippet = result.get("snippet", "").lower() |
| link = result.get("link", "") |
| |
| |
| original_result = search_results[i] if i < len(search_results) else {} |
| |
| |
| score = 0 |
| |
| |
| |
| if primary_company: |
| |
| domain_match = re.search(r'https?://(?:www\.)?([^/]+)', link) |
| if domain_match: |
| result_domain = domain_match.group(1) |
| result_company = domain_to_company.get(result_domain, None) |
| |
| if result_company == primary_company: |
| |
| score += 30 |
| logger.info(f"[SCORE] Result {i} from primary company '{primary_company}' (domain: {result_domain}) - score +30") |
| elif result_company and result_company != primary_company: |
| |
| score -= 100 |
| logger.info(f"[SCORE] Result {i} from WRONG company '{result_company}' (expected '{primary_company}') - score -100") |
| |
| |
| |
| is_pdf = False |
| if link.lower().endswith(".pdf"): |
| is_pdf = True |
| score += 10 |
| |
| |
| if original_result.get("mime") == "application/pdf" or original_result.get("fileFormat") == "PDF/Adobe Acrobat": |
| is_pdf = True |
| score += 12 |
| logger.info(f"[SCORE] Result {i} has PDF metadata (mime/fileFormat) - score +12") |
| |
| |
| |
| request_tokens = set(user_request_lower.split()) |
| title_tokens = set(title.split()) |
| snippet_tokens = set(snippet.split()) |
| |
| |
| title_overlap = len(request_tokens & title_tokens) |
| snippet_overlap = len(request_tokens & snippet_tokens) |
| |
| |
| if title_overlap > 0: |
| score += title_overlap * 2 |
| logger.info(f"[SCORE] Result {i} has {title_overlap} matching words in title - score +{title_overlap * 2}") |
| |
| if snippet_overlap > 0: |
| score += snippet_overlap |
| logger.info(f"[SCORE] Result {i} has {snippet_overlap} matching words in snippet - score +{snippet_overlap}") |
| |
| |
| year_patterns = re.findall(r'\b(19|20)\d{2}\b', user_request_lower) |
| for year in year_patterns: |
| if year in title or year in snippet or year in link: |
| score += 2 |
| logger.info(f"[SCORE] Result {i} matches year '{year}' - score +2") |
| |
| |
| |
| if not is_pdf: |
| |
| index_patterns = ['results', 'default', 'index', 'overview', 'main', 'performance'] |
| if any(pattern in link for pattern in index_patterns): |
| score -= 5 |
| logger.info(f"[SCORE] Result {i} is an index/landing page - score -5") |
| |
| |
| if 'press-release' in link or 'press_release' in link or 'webcast' in link: |
| score += 8 |
| logger.info(f"[SCORE] Result {i} is a press-release page - score +8") |
| |
| |
| |
| if is_pdf: |
| credible_indicators = ['.gov', 'investor', 'ir.', 'cdn'] |
| if any(indicator in link for indicator in credible_indicators): |
| score += 2 |
| |
| |
| if score > best_score: |
| best_score = score |
| best_match_index = i |
| |
| |
| if is_multiple_quarter_request and len(quarters_requested) > 1: |
| logger.info(f"[MULTI-QUARTER] User requested {len(quarters_requested)} quarters, returning multiple links") |
| |
| |
| quarter_results = {q: [] for q in quarters_requested} |
| |
| for i, result in enumerate(formatted_results): |
| title = result.get("title", "").lower() |
| snippet = result.get("snippet", "").lower() |
| link = result.get("link", "") |
| |
| |
| original_result = search_results[i] if i < len(search_results) else {} |
| |
| |
| is_pdf = link.lower().endswith('.pdf') |
| |
| |
| if original_result.get("mime") == "application/pdf" or original_result.get("fileFormat") == "PDF/Adobe Acrobat": |
| is_pdf = True |
| |
| |
| |
| quarter_scores = {} |
| for quarter in quarters_requested: |
| score = 0 |
| |
| |
| if primary_company: |
| domain_match = re.search(r'https?://(?:www\.)?([^/]+)', link) |
| if domain_match: |
| result_domain = domain_match.group(1) |
| result_company = domain_to_company.get(result_domain, None) |
| |
| if result_company == primary_company: |
| score += 30 |
| elif result_company and result_company != primary_company: |
| score -= 100 |
| |
| |
| if is_pdf: |
| score += 20 |
| |
| |
| if quarter in title or quarter in snippet or quarter in link.lower(): |
| score += 10 |
| |
| |
| quarter_num = quarter[1] |
| if f"q{quarter_num}" in title or f"q{quarter_num}" in snippet or f"q{quarter_num}" in link.lower(): |
| score += 5 |
| |
| |
| if not is_pdf: |
| index_indicators = ['default.aspx', 'investor-relations', '/overview/', 'index'] |
| if any(indicator in link.lower() for indicator in index_indicators): |
| score -= 15 |
| |
| quarter_scores[quarter] = score |
| |
| |
| if quarter_scores: |
| best_quarter = max(quarter_scores.items(), key=lambda x: x[1]) |
| if best_quarter[1] > 0: |
| quarter_results[best_quarter[0]].append({ |
| "index": i, |
| "title": result.get("title", ""), |
| "link": link, |
| "snippet": result.get("snippet", ""), |
| "score": best_quarter[1], |
| "is_pdf": is_pdf |
| }) |
| |
| |
| selected_links = [] |
| for quarter in quarters_requested: |
| if quarter_results[quarter]: |
| |
| sorted_results = sorted(quarter_results[quarter], key=lambda x: x.get("score", 0), reverse=True) |
| best_for_quarter = sorted_results[0] |
| selected_links.append({ |
| "quarter": quarter.upper(), |
| "title": best_for_quarter["title"], |
| "link": best_for_quarter["link"], |
| "snippet": best_for_quarter["snippet"] |
| }) |
| is_pdf_marker = "[PDF]" if best_for_quarter.get("is_pdf", False) else "[Web Page]" |
| logger.info(f"[MULTI-QUARTER] Found result for {quarter.upper()}: {is_pdf_marker} {best_for_quarter['title'][:50]} (score: {best_for_quarter['score']})") |
| else: |
| logger.warning(f"[MULTI-QUARTER] No result found for {quarter.upper()}") |
| |
| if selected_links: |
| return { |
| "type": "multiple_download_links", |
| "links": selected_links, |
| "message": f"Found {len(selected_links)} financial reports for the requested quarters: {', '.join([q.upper() for q in quarters_requested])}", |
| "confidence": "high" if len(selected_links) == len(quarters_requested) else "medium", |
| "reasoning": f"Selected best result for each requested quarter. Found {len(selected_links)} out of {len(quarters_requested)} quarters." |
| } |
| |
| |
| if best_match_index >= 0 and best_score > 0: |
| selected_result = formatted_results[best_match_index] |
| original_result = search_results[best_match_index] |
| |
| |
| link = selected_result["link"] |
| if not link.lower().endswith(".pdf") and ("investor" in link or "ir." in link or "financial-report" in link): |
| |
| pdf_links = await extract_pdf_links_from_page(link, user_request) |
| if pdf_links: |
| |
| if "2份" in user_request.lower() or "two" in user_request.lower() or "2" in user_request.lower(): |
| |
| relevant_links = pdf_links[:2] |
| return { |
| "type": "download_links_extracted", |
| "links": relevant_links, |
| "message": f"Found {len(relevant_links)} most relevant financial reports for your request", |
| "confidence": "high" if best_score >= 5 else ("medium" if best_score >= 2 else "low"), |
| "reasoning": f"Selected based on relevance scoring (score: {best_score}) and extracted {len(relevant_links)} PDF links from index page." |
| } |
| else: |
| |
| pdf_link = pdf_links[0] |
| return { |
| "type": "download_link_extracted", |
| "title": f"{selected_result['title']} - {pdf_link.get('title', 'PDF Report')}", |
| "link": pdf_link["url"], |
| "snippet": pdf_link.get("snippet", selected_result["snippet"]), |
| "message": f"Found the most relevant financial report for your request: {pdf_link.get('title', 'PDF Report')}", |
| "confidence": "high" if best_score >= 5 else ("medium" if best_score >= 2 else "low"), |
| "reasoning": f"Selected based on relevance scoring (score: {best_score}) and extracted PDF link from index page." |
| } |
| |
| return { |
| "type": "download_link_extracted", |
| "title": selected_result["title"], |
| "link": selected_result["link"], |
| "snippet": selected_result["snippet"], |
| "message": f"Found the most relevant financial report for your request: {selected_result['title']}", |
| "confidence": "high" if best_score >= 5 else ("medium" if best_score >= 2 else "low"), |
| "reasoning": f"Selected based on relevance scoring (score: {best_score}). This result matches key terms in your request." |
| } |
| else: |
| |
| if search_results: |
| first_result = search_results[0] |
| link = first_result.get("link", "") |
| |
| |
| if not link.lower().endswith(".pdf") and ("investor" in link or "ir." in link or "financial-report" in link): |
| |
| pdf_links = await extract_pdf_links_from_page(link, user_request) |
| if pdf_links: |
| |
| if "2份" in user_request.lower() or "two" in user_request.lower() or "2" in user_request.lower(): |
| |
| relevant_links = pdf_links[:2] |
| return { |
| "type": "download_links_extracted", |
| "links": relevant_links, |
| "message": f"Found {len(relevant_links)} most relevant financial reports for your request", |
| "confidence": "low", |
| "reasoning": f"Extracted {len(relevant_links)} PDF links from index page. No highly relevant results found using keyword matching." |
| } |
| else: |
| |
| pdf_link = pdf_links[0] |
| return { |
| "type": "download_link_extracted", |
| "title": pdf_link.get("title", f"{first_result.get('title', 'Financial Report')} - PDF"), |
| "link": pdf_link["url"], |
| "snippet": pdf_link.get("snippet", first_result.get("snippet", "")), |
| "message": f"Found a potential financial report: {pdf_link.get('title', 'PDF Report')}", |
| "confidence": "low", |
| "reasoning": "Extracted PDF link from index page. No highly relevant results found using keyword matching." |
| } |
| |
| return { |
| "type": "download_link_extracted", |
| "title": first_result.get("title", ""), |
| "link": first_result.get("link", ""), |
| "snippet": first_result.get("snippet", ""), |
| "message": "Found a potential financial report, but it may not exactly match your request.", |
| "confidence": "low", |
| "reasoning": "No highly relevant results found using keyword matching." |
| } |
| else: |
| return { |
| "type": "no_results", |
| "message": "No search results available to analyze.", |
| "suggestion": "Please try a different search or provide a direct URL.", |
| "reasoning": "No search results were provided for analysis." |
| } |
| except Exception as e: |
| logger.error(f"Error in deep analysis: {str(e)}") |
| return { |
| "type": "analysis_error", |
| "error": str(e), |
| "message": f"Error occurred while analyzing search results: {str(e)}", |
| "suggestion": "Please try again or provide a direct URL for the financial report." |
| } |
|
|
| |
| @mcp.resource("financial-report://{filename}") |
| def get_financial_report_content(filename: str) -> str: |
| """ |
| Get the content of an extracted financial report |
| |
| Args: |
| filename: Name of the extracted file |
| |
| Returns: |
| Content of the financial report |
| """ |
| |
| reports_dir = Path("financial_reports").absolute() |
| file_path = reports_dir / filename |
| |
| if not file_path.exists(): |
| |
| relative_path = Path("financial_reports") / filename |
| if relative_path.exists(): |
| file_path = relative_path |
| else: |
| raise Exception(f"File not found: {filename}. Searched in {reports_dir} and relative path {relative_path}") |
| |
| |
| if filename.lower().endswith('.pdf'): |
| try: |
| import pdfplumber |
| with pdfplumber.open(file_path) as pdf: |
| text = "" |
| for page in pdf.pages: |
| text += page.extract_text() or "" |
| return text |
| except Exception as e: |
| |
| logger.error(f"Error extracting text from PDF {filename}: {str(e)}") |
| return f"Error extracting text from PDF {filename}: {str(e)}" |
| else: |
| |
| with open(file_path, "r", encoding="utf-8") as f: |
| return f.read() |
|
|
| if __name__ == "__main__": |
| |
| |
| |
| import sys |
| print("MCP SDK imported successfully", file=sys.stderr) |
| mcp.run(transport="stdio") |