"""
Financial Report MCP Server using the official MCP Python SDK
This server provides tools for downloading and processing financial reports.
"""
import asyncio
import logging
import os
import sys
from pathlib import Path
from typing import Optional, Dict, Any, List
from datetime import datetime
import aiohttp
import ssl
import pdfplumber
from bs4 import BeautifulSoup
import httpx
import json
import re
from huggingface_hub import InferenceClient
# Configure logging - write to stderr instead of stdout to avoid interfering with stdio communication
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
logger = logging.getLogger(__name__)
# Import the official MCP SDK
try:
from mcp.server.fastmcp import FastMCP, Context
from mcp.server.session import ServerSession
logger.info("MCP SDK imported successfully")
except ImportError as e:
logger.error(f"Failed to import MCP SDK: {e}")
raise
# Create the MCP server
mcp = FastMCP("Financial Report MCP Server", "1.0.0")
# Ensure the financial_reports directory exists
reports_dir = Path("financial_reports")
reports_dir.mkdir(exist_ok=True)
logger.info(f"Financial reports directory: {reports_dir.absolute()}")
@mcp.tool()
async def download_financial_report(url: str) -> Dict[str, Any]:
"""
Download a financial report from a URL
Args:
url: The URL of the financial report to download
Returns:
Dictionary with download information
"""
logger.info(f"Downloading financial report from {url}")
try:
# Decode URL if it contains encoded characters
import urllib.parse
decoded_url = urllib.parse.unquote(url)
logger.info(f"Decoded URL: {decoded_url}")
# Re-encode the URL properly to handle spaces and other special characters
encoded_url = urllib.parse.quote(decoded_url, safe=':/?#[]@!$&\'()*+,;=%')
logger.info(f"Re-encoded URL: {encoded_url}")
# Create SSL context that doesn't verify certificates (for testing)
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# Add timeout and headers for better reliability
timeout = aiohttp.ClientTimeout(total=30) # 30 second timeout
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(encoded_url, ssl=ssl_context, headers=headers) as response:
if response.status != 200:
raise Exception(f"HTTP {response.status} when downloading {encoded_url}")
# CRITICAL: Check if this is an HTML investor relations page
# If so, try to extract PDF links instead of downloading the HTML
content_type = response.headers.get('content-type', '').lower()
is_html = 'html' in content_type
is_investor_page = any(pattern in url.lower() for pattern in ['investor', 'ir.', 'press-release', 'earnings', 'financial'])
if is_html and is_investor_page:
logger.info(f"[DOWNLOAD] Detected HTML investor relations page, attempting to extract PDF links")
# Try to extract PDF links from this page
pdf_links = await extract_pdf_links_from_page(url, "")
if pdf_links:
# Found PDF link(s), download the first PDF instead
pdf_url = pdf_links[0]["url"]
logger.info(f"[DOWNLOAD] Found PDF link, redirecting download to: {pdf_url}")
# Recursively call ourselves with the PDF URL
return await download_financial_report(pdf_url)
else:
logger.warning(f"[DOWNLOAD] No PDF links found on investor page, downloading HTML anyway")
# Determine filename from decoded URL to preserve original filename
filename = decoded_url.split("/")[-1]
if not filename or "." not in filename:
if 'pdf' in content_type:
filename = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
elif 'html' in content_type:
filename = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
else:
filename = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.dat"
# Save file
file_path = Path("financial_reports") / filename
content = await response.read()
logger.info(f"Saving report to {file_path.absolute()}")
with open(file_path, "wb") as f:
f.write(content)
logger.info(f"Successfully downloaded report to {file_path}")
return {
"filename": filename,
"filepath": str(file_path),
"size": len(content),
"download_time": datetime.now().isoformat(),
"source_url": url # CRITICAL: Include original URL for analysis context
}
except aiohttp.ClientError as e:
logger.error(f"Network error downloading financial report: {str(e)}")
raise Exception(f"Network error downloading financial report: {str(e)}. This may be due to network restrictions in the execution environment.")
except Exception as e:
logger.error(f"Error downloading financial report: {str(e)}")
raise Exception(f"Error downloading financial report: {str(e)}")
@mcp.tool()
async def list_downloaded_reports() -> Dict[str, Any]:
"""
List all downloaded financial reports
Returns:
Dictionary with list of reports
"""
try:
reports = []
download_dir = Path("financial_reports")
if download_dir.exists():
for file_path in download_dir.iterdir():
if file_path.is_file():
stat = file_path.stat()
# urllib.parse is not imported at module level, so import it locally before use
import urllib.parse
reports.append({
"filename": file_path.name,
"filepath": str(file_path),
"size": stat.st_size,
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
"encoded_filename": urllib.parse.quote(file_path.name, safe=':/?#[]@!$&\'()*+,;=%')
})
return {
"reports": reports
}
except Exception as e:
logger.error(f"Error listing downloaded reports: {str(e)}")
raise Exception(f"Error listing downloaded reports: {str(e)}")
@mcp.tool()
async def analyze_financial_report_file(filename: str, source_url: str = "") -> Dict[str, Any]:
"""
Analyze a downloaded financial report file and provide investment insights
Args:
filename: Name of the financial report file to analyze
source_url: Optional original URL where the report was downloaded from
Returns:
Dictionary with analysis results and investment insights
"""
logger.info(f"Analyzing financial report file: {filename}")
if source_url:
logger.info(f"Source URL: {source_url}")
try:
# CRITICAL: If filename is empty, auto-detect the most recently downloaded file
if not filename or filename.strip() == "":
logger.info("[AUTO-DETECT] No filename provided, looking for most recent downloaded file")
reports_dir = Path("financial_reports")
if reports_dir.exists():
# Get all files in the directory
files = [(f, f.stat().st_mtime) for f in reports_dir.iterdir() if f.is_file()]
if files:
# Sort by modification time (most recent first)
files.sort(key=lambda x: x[1], reverse=True)
filename = files[0][0].name
logger.info(f"[AUTO-DETECT] Found most recent file: {filename}")
else:
raise Exception("No filename provided and no downloaded files found in financial_reports directory")
else:
raise Exception("No filename provided and financial_reports directory does not exist")
# Use absolute path to ensure correct file access in different environments
reports_dir = Path("financial_reports").absolute()
file_path = reports_dir / filename
if not file_path.exists():
# Also check with relative path as fallback
relative_path = Path("financial_reports") / filename
if relative_path.exists():
file_path = relative_path
else:
raise Exception(f"File not found: {filename}. Searched in {reports_dir} and relative path {relative_path}")
# Handle PDF files properly
file_content = ""
if filename.lower().endswith('.pdf'):
try:
import pdfplumber
with pdfplumber.open(file_path) as pdf:
text = ""
# Extract text from first few pages to avoid overwhelming the model
pages_to_extract = min(10, len(pdf.pages)) # Limit to first 10 pages
for i in range(pages_to_extract):
page = pdf.pages[i]
text += page.extract_text() or ""
file_content = text
except Exception as e:
# If PDF extraction fails, return error message
logger.error(f"Error extracting text from PDF {filename}: {str(e)}")
file_content = f"Error extracting text from PDF {filename}: {str(e)}"
else:
# For text-based files, read normally
with open(file_path, "r", encoding="utf-8") as f:
file_content = f.read()
# CRITICAL: If this is HTML content and we have source_url, extract clean text instead
is_html = (
filename.lower().endswith('.html') or
'<html' in file_content.lower()[:500] or
'<!doctype html' in file_content.lower()[:500] or
'<meta' in file_content.lower()[:500]
)
if is_html and source_url:
logger.info(f"[HTML EXTRACTION] Detected HTML content, extracting text from source URL: {source_url}")
try:
from bs4 import BeautifulSoup
# Re-fetch the page to get full content (not truncated)
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(source_url, headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
response.raise_for_status()
# Parse HTML and extract text
soup = BeautifulSoup(response.text, 'html.parser')
# Remove script, style, nav, header, footer
for element in soup(["script", "style", "nav", "header", "footer", "noscript"]):
element.decompose()
# Get text
text = soup.get_text(separator='\n', strip=True)
# Clean up whitespace
lines = [line.strip() for line in text.splitlines() if line.strip()]
clean_text = '\n'.join(lines)
if clean_text:
file_content = clean_text
logger.info(f"[HTML EXTRACTION] Successfully extracted {len(file_content)} characters of clean text")
else:
logger.warning(f"[HTML EXTRACTION] No text extracted, using original HTML")
except Exception as e:
logger.error(f"[HTML EXTRACTION] Failed to extract text: {str(e)}")
logger.info(f"[HTML EXTRACTION] Falling back to original HTML content")
# Keep using the original HTML file_content
# Truncate content if too long for the model
if len(file_content) > 15000:
file_content = file_content[:15000] + "... (truncated)"
# Return file analysis trigger with content for the main app to process
# This allows app.py to do streaming analysis which is better for UX
result = {
"type": "file_analysis_trigger",
"file_path": str(file_path),
"filename": filename,
"content": file_content, # Include full content for analysis
"content_preview": file_content[:500] + "... (preview truncated)" if len(file_content) > 500 else file_content
}
# CRITICAL: Include source URL if available for analysis context
if source_url:
result["source_url"] = source_url
logger.info(f"Including source URL in analysis result: {source_url}")
return result
except Exception as e:
logger.error(f"Error analyzing financial report file {filename}: {str(e)}")
raise Exception(f"Error analyzing financial report file {filename}: {str(e)}")
# New tool for searching financial reports online
@mcp.tool()
async def search_and_extract_financial_report(user_query: str) -> Dict[str, Any]:
"""
Search for financial reports online based on user's query and return raw search results for Agent analysis
Args:
user_query: The user's complete search query
Returns:
Dictionary with raw search results for Agent analysis
"""
search_base_url = 'https://www.googleapis.com/customsearch/v1'
# Read the Google Custom Search credentials from the environment instead of hardcoding
# them in source. The variable names GOOGLE_SEARCH_API_KEY / GOOGLE_SEARCH_CX are
# placeholders; use whatever names your deployment actually sets.
params = {
"key": os.getenv("GOOGLE_SEARCH_API_KEY", ""),
"cx": os.getenv("GOOGLE_SEARCH_CX", ""),
"q": user_query
}
logger.info(f"Searching for financial reports with query: {user_query}")
try:
async with httpx.AsyncClient() as client:
response = await client.get(search_base_url, params=params)
response.raise_for_status()
search_results = response.json()
# Check if we have search results
if "items" in search_results and search_results["items"]:
# Return search results with proper structure
return {
"type": "search_results",
"results": search_results["items"],
"message": f"Successfully found {len(search_results['items'])} search results for query: {user_query}"
}
else:
# No results found
return {
"type": "search_no_results",
"message": f"No financial reports found for query: {user_query}",
"suggestion": "Please provide a direct URL (or PDF format URL) for the financial report you're looking for."
}
except httpx.HTTPError as e:  # HTTPError also covers the HTTPStatusError raised by raise_for_status()
logger.error(f"Error performing web search: {str(e)}")
return {
"type": "search_error",
"error": str(e),
"message": f"Exception while searching for financial reports with query '{user_query}': {str(e)}",
"suggestion": "Please ask user to provide a direct URL (or PDF format URL) for the financial report due to search error."
}
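# Illustrative (trimmed) shape of a single Google Custom Search result item as consumed by
# the tools below; only the fields this server actually reads are shown, and the values are
# made up for the example:
#   {
#       "title": "Example Corp Reports First-Quarter 2025 Financial Results",
#       "link": "https://investor.example.com/q1-2025-earnings-release.pdf",
#       "snippet": "Example Corp today reported first-quarter 2025 financial results ...",
#       "mime": "application/pdf",
#       "fileFormat": "PDF/Adobe Acrobat"
#   }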
@mcp.tool()
def rank_pdf_links_by_relevance(pdf_links: List[Dict[str, str]], user_request: str) -> List[Dict[str, str]]:
"""
Rank PDF links by relevance to user request
Args:
pdf_links: List of PDF links to rank
user_request: User's specific request
Returns:
Ranked list of PDF links
"""
# Convert user request to lowercase for case-insensitive matching
user_request_lower = user_request.lower()
# Score each PDF link based on relevance using dynamic token matching
scored_links = []
for link in pdf_links:
title = link.get("title", "").lower()
snippet = link.get("snippet", "").lower()
score = 0
# Dynamic keyword matching - extract tokens from user request and compare
request_tokens = set(user_request_lower.split())
title_tokens = set(title.split())
snippet_tokens = set(snippet.split())
# Calculate token overlap
title_overlap = len(request_tokens & title_tokens)
snippet_overlap = len(request_tokens & snippet_tokens)
if title_overlap > 0:
score += title_overlap * 2 # Each matching word in title = +2 points
if snippet_overlap > 0:
score += snippet_overlap # Each matching word in snippet = +1 point
# Prefer more recent reports - dynamically check for year patterns
# Non-capturing group so findall() returns full years like "2024", not just "19"/"20"
year_matches = re.findall(r'\b(?:19|20)\d{2}\b', user_request_lower)
for year in year_matches:
if year in title or year in snippet:
score += 1
# Check for "recent" indicators dynamically
recent_indicators = ['最近', 'recent', 'latest', 'newest']  # '最近' is Chinese for "recent"/"latest"
if any(indicator in user_request_lower for indicator in recent_indicators):
# Prefer links with recent years in title
current_year = datetime.now().year
for i in range(3): # Check for current year and 2 previous years
year_str = str(current_year - i)
if year_str in title or year_str in snippet:
score += (3 - i) # Higher score for more recent years
scored_links.append((score, link))
# Sort by score (descending)
scored_links.sort(key=lambda x: x[0], reverse=True)
# Return links without scores
return [link for score, link in scored_links]
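# Illustrative example (made-up data): for user_request = "tesla q3 2024 quarterly report",
# a link titled "Tesla Q3 2024 Update" outranks one titled "2022 Annual Report" because it
# shares more tokens with the request ("tesla", "q3", "2024") and matches the requested year,
# while the second title only shares "report".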
async def extract_pdf_links_from_page(url: str, user_request: str = "") -> List[Dict[str, str]]:
"""
Extract PDF links from a financial report index page and rank them based on user request
Args:
url: URL of the index page to parse
user_request: User's specific request for filtering relevant PDFs
Returns:
List of dictionaries containing PDF link information, sorted by relevance
"""
logger.info(f"Extracting PDF links from page: {url}")
try:
# Create SSL context that doesn't verify certificates (for testing)
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# Add timeout and headers for better reliability
timeout = aiohttp.ClientTimeout(total=30) # 30 second timeout
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url, ssl=ssl_context, headers=headers) as response:
if response.status != 200:
logger.warning(f"HTTP {response.status} when fetching {url}")
return []
content = await response.text()
soup = BeautifulSoup(content, 'html.parser')
pdf_links = []
# Look for PDF links in the page
for link_elem in soup.find_all('a', href=True):
href = link_elem['href']
title = link_elem.get_text(strip=True)
# Check if this is a PDF link
if href.lower().endswith('.pdf'):
# Make absolute URL if needed
if href.startswith('//'):
href = 'https:' + href
elif href.startswith('/'):
# Construct absolute URL from base URL
from urllib.parse import urljoin
href = urljoin(url, href)
elif not href.startswith('http'):
# Relative URL, construct absolute URL
from urllib.parse import urljoin
href = urljoin(url, href)
pdf_links.append({
"url": href,
"title": title or "PDF Report",
"snippet": f"PDF document: {title}"
})
# Also look for links with potential PDF indicators in text
# Use dynamic matching instead of hardcoded keywords
for link_elem in soup.find_all('a', href=True):
href = link_elem['href']
title = link_elem.get_text(strip=True)
title_lower = title.lower()
# Dynamic check: if link text contains PDF-related terms from user request
# or common report indicators, consider it
request_tokens = set(user_request.lower().split()) if user_request else set()
title_tokens = set(title_lower.split())
# Check for overlap with user request OR common PDF indicators
has_request_match = len(request_tokens & title_tokens) > 0 if request_tokens else False
has_pdf_indicator = 'pdf' in title_lower or '.pdf' in href.lower()
if has_request_match or has_pdf_indicator:
# Make absolute URL if needed
if href.startswith('//'):
href = 'https:' + href
elif href.startswith('/'):
# Construct absolute URL from base URL
from urllib.parse import urljoin
href = urljoin(url, href)
elif not href.startswith('http'):
# Relative URL, construct absolute URL
from urllib.parse import urljoin
href = urljoin(url, href)
# If it's a PDF link that has not already been collected above, add it (avoids duplicates)
if href.lower().endswith('.pdf') and not any(existing["url"] == href for existing in pdf_links):
pdf_links.append({
"url": href,
"title": title or "PDF Report",
"snippet": f"PDF document: {title}"
})
# Rank PDF links based on user request
if user_request:
ranked_links = rank_pdf_links_by_relevance(pdf_links, user_request)
else:
ranked_links = pdf_links
logger.info(f"Found {len(ranked_links)} PDF links on page {url}")
return ranked_links
except Exception as e:
logger.error(f"Error extracting PDF links from {url}: {str(e)}")
return []
@mcp.tool()
async def deep_analyze_and_extract_download_link(search_results: List[Dict[str, Any]], user_request: str) -> Dict[str, Any]:
"""
Deep analyze search results using LLM and extract the most relevant download link based on user request
Args:
search_results: List of search results from search_and_extract_financial_report
user_request: The user's specific request
Returns:
Dictionary with the most relevant download link and related information
"""
logger.info(f"Deep analyzing search results for user request: {user_request}")
# CRITICAL: Detect if user is requesting MULTIPLE quarters/reports
# Use dynamic regex pattern matching instead of hardcoding quarter names
user_request_lower = user_request.lower()
# Detect quarter requests dynamically using regex
quarters_requested = []
# Pattern 1: Q1, Q2, Q3, Q4 (case insensitive)
import re
q_pattern = re.findall(r'\bq([1-4])\b', user_request_lower)
for q_num in q_pattern:
quarter_key = f'q{q_num}'
if quarter_key not in quarters_requested:
quarters_requested.append(quarter_key)
# Pattern 2: "first", "second", "third", "fourth" + "quarter"
quarter_words = {
'first': 'q1',
'second': 'q2',
'third': 'q3',
'fourth': 'q4',
'1st': 'q1',
'2nd': 'q2',
'3rd': 'q3',
'4th': 'q4'
}
for word, q_key in quarter_words.items():
if word in user_request_lower and 'quarter' in user_request_lower:
if q_key not in quarters_requested:
quarters_requested.append(q_key)
is_multiple_quarter_request = len(quarters_requested) > 1
logger.info(f"[MULTI-QUARTER DETECTION] Quarters requested: {quarters_requested}, is_multiple: {is_multiple_quarter_request}")
try:
# Convert search results to a more readable format for LLM analysis
formatted_results = []
for i, result in enumerate(search_results[:10]): # Limit to top 10 results
formatted_results.append({
"index": i,
"title": result.get("title", ""),
"link": result.get("link", ""),
"snippet": result.get("snippet", "")
})
# Create prompt for LLM to analyze search results
prompt = f"""
You are a financial report analysis expert. Your task is to analyze search results and identify the most relevant download link for a user's specific request.
User Request: {user_request}
Search Results:
{json.dumps(formatted_results, indent=2)}
Please analyze these search results and identify the most relevant financial report that matches the user's request. Consider factors such as:
1. **CRITICAL: Prefer direct PDF download links (.pdf URLs) over web pages** - Users want downloadable files, not landing pages
2. Relevance to the user's specific request (company name, report type, quarter/year, etc.)
3. Source credibility (official company websites, SEC.gov, etc.)
4. Match the exact period requested (e.g., if user asks for Q1 2025, prioritize Q1 2025 reports over annual reports)
5. Avoid generic index pages or landing pages - look for specific report PDFs
Priority Rules:
- Direct PDF link for the exact period requested = HIGHEST PRIORITY
- Direct PDF link for a related period = HIGH PRIORITY
- Web page or landing page = LOW PRIORITY (only if no PDF available)
Respond with a JSON object in the following format:
{{
"selected_index": 0,
"reasoning": "Explanation of why this result was selected",
"confidence": "high|medium|low"
}}
If none of the results are relevant, respond with:
{{
"selected_index": -1,
"reasoning": "Explanation of why no results are relevant",
"confidence": "low"
}}
"""
# Call LLM for analysis
try:
# sys, os, and InferenceClient are already imported at module level;
# logging is configured to write to stderr, so use the logger for debug output
logger.info("[LLM-DEBUG] About to initialize InferenceClient...")
# Get token from environment
hf_token = os.getenv("HUGGING_FACE_HUB_TOKEN")
if hf_token:
logger.info(f"[LLM-DEBUG] Found HUGGING_FACE_HUB_TOKEN (length: {len(hf_token)})")
else:
logger.warning("[LLM-DEBUG] No HUGGING_FACE_HUB_TOKEN found in environment")
# Initialize the Hugging Face Inference Client with explicit endpoint
client = InferenceClient(
token=hf_token,
base_url="https://api-inference.huggingface.co/models/Qwen/Qwen2.5-72B-Instruct"
)
logger.info("[LLM-DEBUG] InferenceClient initialized successfully")
messages = [
{"role": "system", "content": "You are a precise JSON generator that helps analyze financial report search results. You are also helpful in guiding users to find the most relevant financial reports. You should ONLY generate valid JSON responses in the specified format."},
{"role": "user", "content": prompt}
]
# Get response from LLM
response = client.chat.completions.create(
model="Qwen/Qwen2.5-72B-Instruct",
messages=messages,
max_tokens=500,
temperature=0.3,
)
# Extract the JSON response
if hasattr(response, 'choices') and len(response.choices) > 0:
content = response.choices[0].message.content if hasattr(response.choices[0].message, 'content') else str(response.choices[0].message)
else:
content = str(response)
# Try to parse as JSON
try:
# Extract JSON from the response if it's wrapped in other text
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
json_str = json_match.group(0)
llm_result = json.loads(json_str)
# Extract the selected index
selected_index = llm_result.get("selected_index", -1)
reasoning = llm_result.get("reasoning", "No reasoning provided")
confidence = llm_result.get("confidence", "low")
# If a valid index was selected, return that result
if 0 <= selected_index < len(formatted_results):
selected_result = formatted_results[selected_index]
original_result = search_results[selected_index]
# CRITICAL: If LLM selected a non-PDF link, try to extract PDF from the page first
link = selected_result["link"]
if not link.lower().endswith(".pdf"):
# Check if it looks like an investor relations page
if "investor" in link or "ir." in link or "press-release" in link or "earnings" in link:
logger.info(f"[LLM-SELECTED] Non-PDF link detected, attempting to extract PDF from page: {link}")
pdf_links = await extract_pdf_links_from_page(link, user_request)
if pdf_links:
# Return the first PDF link found
pdf_link = pdf_links[0]
logger.info(f"[LLM-SELECTED] Successfully extracted PDF: {pdf_link.get('title', 'PDF Report')}")
return {
"type": "download_link_extracted",
"title": f"{selected_result['title']} - {pdf_link.get('title', 'PDF Report')}",
"link": pdf_link["url"],
"snippet": pdf_link.get("snippet", selected_result["snippet"]),
"message": f"Found the most relevant financial report for your request: {pdf_link.get('title', 'PDF Report')}",
"confidence": confidence,
"reasoning": f"{reasoning}. Extracted PDF link from the selected page."
}
else:
logger.warning(f"[LLM-SELECTED] No PDF links found on page: {link}")
return {
"type": "download_link_extracted",
"title": selected_result["title"],
"link": selected_result["link"],
"snippet": selected_result["snippet"],
"message": f"Found the most relevant financial report for your request: {selected_result['title']}",
"confidence": confidence,
"reasoning": reasoning
}
elif selected_index == -1:
# No relevant results found
if search_results:
first_result = search_results[0]
return {
"type": "download_link_extracted",
"title": first_result.get("title", ""),
"link": first_result.get("link", ""),
"snippet": first_result.get("snippet", ""),
"message": "Found a potential financial report, but it may not exactly match your request.",
"confidence": "low",
"reasoning": reasoning
}
else:
return {
"type": "no_results",
"message": "No search results available to analyze.",
"suggestion": "Please try a different search or provide a direct URL.",
"reasoning": "No search results were provided for analysis."
}
else:
# Invalid index, fall back to heuristic-based selection
raise ValueError("Invalid selected_index from LLM response")
else:
# If no JSON found, fall back to heuristic-based selection
raise ValueError("No valid JSON found in LLM response")
except (json.JSONDecodeError, ValueError) as e:
# If JSON parsing fails, fall back to heuristic-based selection
logger.warning(f"LLM response parsing failed, falling back to heuristic analysis: {str(e)}")
pass
except Exception as llm_error:
# If LLM call fails, fall back to heuristic-based selection
logger.warning(f"LLM call failed, falling back to heuristic analysis: {str(llm_error)}")
pass
# Fallback: Simple heuristic-based selection
logger.info("Using heuristic-based selection as fallback")
best_match_index = -1
best_score = -1
user_request_lower = user_request.lower()
# CRITICAL: Dynamically extract company names from search results
# Strategy: Identify unique domains/companies that appear in results
# The company mentioned in MOST results is likely the requested company
company_mentions = {} # {company_identifier: count}
domain_to_company = {} # {domain: company_name}
# First pass: Learn which companies appear in the search results
for result in formatted_results:
title = result.get("title", "").lower()
link = result.get("link", "").lower()
# Extract domain
domain_match = re.search(r'https?://(?:www\.)?([^/]+)', link)
if domain_match:
domain = domain_match.group(1)
# Extract company identifier from domain dynamically
# Strategy: Use the main part of domain as company key
# e.g., "intc.com" -> "intc", "aboutamazon.com" -> "aboutamazon", "ir.tesla.com" -> "tesla"
# Remove common prefixes/suffixes
domain_parts = domain.replace('www.', '').replace('ir.', '').replace('investor.', '').replace('investors.', '')
# Get the core domain name (before .com/.net/etc)
core_domain = domain_parts.split('.')[0]
# Use core domain as company identifier
company_key = core_domain
# Track company mentions
company_mentions[company_key] = company_mentions.get(company_key, 0) + 1
domain_to_company[domain] = company_key
# Determine the PRIMARY requested company (most mentioned in results)
primary_company = None
if company_mentions:
primary_company = max(company_mentions.items(), key=lambda x: x[1])[0]
logger.info(f"[COMPANY DETECTION] Detected primary company: '{primary_company}' (mentioned in {company_mentions[primary_company]} results)")
logger.info(f"[COMPANY DETECTION] All companies found: {company_mentions}")
for i, result in enumerate(formatted_results):
title = result.get("title", "").lower()
snippet = result.get("snippet", "").lower()
link = result.get("link", "")
# Get original result for metadata access
original_result = search_results[i] if i < len(search_results) else {}
# Calculate relevance score
score = 0
# CRITICAL #1: Company matching (HIGHEST PRIORITY)
# If we detected a primary company from search results, prioritize results from that company
if primary_company:
# Extract domain from this result
domain_match = re.search(r'https?://(?:www\.)?([^/]+)', link)
if domain_match:
result_domain = domain_match.group(1)
result_company = domain_to_company.get(result_domain, None)
if result_company == primary_company:
# This result is from the primary company!
score += 30 # HUGE bonus for matching primary company
logger.info(f"[SCORE] Result {i} from primary company '{primary_company}' (domain: {result_domain}) - score +30")
elif result_company and result_company != primary_company:
# This result is from a DIFFERENT company
score -= 100 # MASSIVE penalty for wrong company
logger.info(f"[SCORE] Result {i} from WRONG company '{result_company}' (expected '{primary_company}') - score -100")
# CRITICAL #2: Heavily prefer direct PDF files
# Check both URL extension AND mime type metadata
is_pdf = False
if link.lower().endswith(".pdf"):
is_pdf = True
score += 10 # Base PDF score
# BONUS: Check for explicit PDF metadata (mime type and fileFormat)
if original_result.get("mime") == "application/pdf" or original_result.get("fileFormat") == "PDF/Adobe Acrobat":
is_pdf = True
score += 12 # Even higher score for confirmed PDFs with metadata!
logger.info(f"[SCORE] Result {i} has PDF metadata (mime/fileFormat) - score +12")
# Check for keywords/patterns matching between user request and result
# Extract key terms from user request dynamically
request_tokens = set(user_request_lower.split())
title_tokens = set(title.split())
snippet_tokens = set(snippet.split())
# Calculate token overlap (how many words match)
title_overlap = len(request_tokens & title_tokens)
snippet_overlap = len(request_tokens & snippet_tokens)
# Bonus for word matches
if title_overlap > 0:
score += title_overlap * 2 # Each matching word in title = +2 points
logger.info(f"[SCORE] Result {i} has {title_overlap} matching words in title - score +{title_overlap * 2}")
if snippet_overlap > 0:
score += snippet_overlap # Each matching word in snippet = +1 point
logger.info(f"[SCORE] Result {i} has {snippet_overlap} matching words in snippet - score +{snippet_overlap}")
# Check for year patterns in user request and result
year_patterns = re.findall(r'\b(?:19|20)\d{2}\b', user_request_lower)  # non-capturing group so full years are returned
for year in year_patterns:
if year in title or year in snippet or year in link:
score += 2
logger.info(f"[SCORE] Result {i} matches year '{year}' - score +2")
# Penalize landing/index pages if they're NOT PDFs
# Dynamic check: look for common index page patterns in URL
if not is_pdf:
# Check if URL looks like an index/landing page (contains common patterns)
index_patterns = ['results', 'default', 'index', 'overview', 'main', 'performance']
if any(pattern in link for pattern in index_patterns):
score -= 5 # Heavy penalty for index pages
logger.info(f"[SCORE] Result {i} is an index/landing page - score -5")
# Prefer press-release pages over performance/overview pages
if 'press-release' in link or 'press_release' in link or 'webcast' in link:
score += 8 # Bonus for press release pages (likely to have PDFs)
logger.info(f"[SCORE] Result {i} is a press-release page - score +8")
# Prefer official sources (but only if it's a PDF)
# Dynamic check: look for credible domain indicators
if is_pdf:
credible_indicators = ['.gov', 'investor', 'ir.', 'cdn']
if any(indicator in link for indicator in credible_indicators):
score += 2
# Update best match if this score is higher
if score > best_score:
best_score = score
best_match_index = i
# SPECIAL HANDLING: If user requested multiple quarters, return multiple links
if is_multiple_quarter_request and len(quarters_requested) > 1:
logger.info(f"[MULTI-QUARTER] User requested {len(quarters_requested)} quarters, returning multiple links")
# Group results by quarter using dynamic scoring
quarter_results = {q: [] for q in quarters_requested}
for i, result in enumerate(formatted_results):
title = result.get("title", "").lower()
snippet = result.get("snippet", "").lower()
link = result.get("link", "")
# Get original result for metadata access
original_result = search_results[i] if i < len(search_results) else {}
# CRITICAL: Check if this is a PDF link
is_pdf = link.lower().endswith('.pdf')
# Also check PDF metadata
if original_result.get("mime") == "application/pdf" or original_result.get("fileFormat") == "PDF/Adobe Acrobat":
is_pdf = True
# Calculate relevance score for each quarter dynamically
# This avoids hardcoding patterns
quarter_scores = {}
for quarter in quarters_requested:
score = 0
# PRIORITY #1: Company matching (if we detected primary company)
if primary_company:
domain_match = re.search(r'https?://(?:www\.)?([^/]+)', link)
if domain_match:
result_domain = domain_match.group(1)
result_company = domain_to_company.get(result_domain, None)
if result_company == primary_company:
score += 30 # HUGE bonus for matching primary company
elif result_company and result_company != primary_company:
score -= 100 # MASSIVE penalty for wrong company
# PRIORITY #2: HUGE bonus for PDF files - we want direct download links!
if is_pdf:
score += 20 # PDF links get massive priority
# PRIORITY #3: Check if quarter appears in title/snippet/link
if quarter in title or quarter in snippet or quarter in link.lower():
score += 10
# Also check spelled-out forms (e.g., "quarter 1" or "first quarter" for q1)
quarter_num = quarter[1] # Extract '1' from 'q1'
ordinal_word = {'1': 'first', '2': 'second', '3': 'third', '4': 'fourth'}[quarter_num]
if f"quarter {quarter_num}" in title or f"quarter {quarter_num}" in snippet or f"{ordinal_word} quarter" in title or f"{ordinal_word} quarter" in snippet:
score += 5
# Penalize index/landing pages
if not is_pdf:
index_indicators = ['default.aspx', 'investor-relations', '/overview/', 'index']
if any(indicator in link.lower() for indicator in index_indicators):
score -= 15 # Heavy penalty for index pages
quarter_scores[quarter] = score
# Assign to the quarter with highest score (if score > 0)
if quarter_scores:
best_quarter = max(quarter_scores.items(), key=lambda x: x[1])
if best_quarter[1] > 0: # Only assign if score > 0
quarter_results[best_quarter[0]].append({
"index": i,
"title": result.get("title", ""),
"link": link,
"snippet": result.get("snippet", ""),
"score": best_quarter[1],
"is_pdf": is_pdf
})
# Select best result for each requested quarter
selected_links = []
for quarter in quarters_requested:
if quarter_results[quarter]:
# Sort by score and get the best result (PDF links will rank highest)
sorted_results = sorted(quarter_results[quarter], key=lambda x: x.get("score", 0), reverse=True)
best_for_quarter = sorted_results[0]
selected_links.append({
"quarter": quarter.upper(),
"title": best_for_quarter["title"],
"link": best_for_quarter["link"],
"snippet": best_for_quarter["snippet"]
})
is_pdf_marker = "[PDF]" if best_for_quarter.get("is_pdf", False) else "[Web Page]"
logger.info(f"[MULTI-QUARTER] Found result for {quarter.upper()}: {is_pdf_marker} {best_for_quarter['title'][:50]} (score: {best_for_quarter['score']})")
else:
logger.warning(f"[MULTI-QUARTER] No result found for {quarter.upper()}")
if selected_links:
return {
"type": "multiple_download_links",
"links": selected_links,
"message": f"Found {len(selected_links)} financial reports for the requested quarters: {', '.join([q.upper() for q in quarters_requested])}",
"confidence": "high" if len(selected_links) == len(quarters_requested) else "medium",
"reasoning": f"Selected best result for each requested quarter. Found {len(selected_links)} out of {len(quarters_requested)} quarters."
}
# If we found a reasonable match (score > 0), return it
if best_match_index >= 0 and best_score > 0:
selected_result = formatted_results[best_match_index]
original_result = search_results[best_match_index]
# Check if the link is an index page that needs further parsing
link = selected_result["link"]
if not link.lower().endswith(".pdf") and ("investor" in link or "ir." in link or "financial-report" in link):
# Try to extract PDF links from the index page
pdf_links = await extract_pdf_links_from_page(link, user_request)
if pdf_links:
# For requests asking for multiple reports (like "2份", Chinese for "two copies", or "two"), return multiple links
# (match a standalone "2" only, so years like "2025" or tokens like "Q2" don't trigger the multi-report path)
if "2份" in user_request or "two" in user_request.lower() or re.search(r'\b2\b', user_request):
# Return up to 2 most relevant PDF links
relevant_links = pdf_links[:2]
return {
"type": "download_links_extracted",
"links": relevant_links,
"message": f"Found {len(relevant_links)} most relevant financial reports for your request",
"confidence": "high" if best_score >= 5 else ("medium" if best_score >= 2 else "low"),
"reasoning": f"Selected based on relevance scoring (score: {best_score}) and extracted {len(relevant_links)} PDF links from index page."
}
else:
# Return the first PDF link found
pdf_link = pdf_links[0]
return {
"type": "download_link_extracted",
"title": f"{selected_result['title']} - {pdf_link.get('title', 'PDF Report')}",
"link": pdf_link["url"],
"snippet": pdf_link.get("snippet", selected_result["snippet"]),
"message": f"Found the most relevant financial report for your request: {pdf_link.get('title', 'PDF Report')}",
"confidence": "high" if best_score >= 5 else ("medium" if best_score >= 2 else "low"),
"reasoning": f"Selected based on relevance scoring (score: {best_score}) and extracted PDF link from index page."
}
return {
"type": "download_link_extracted",
"title": selected_result["title"],
"link": selected_result["link"],
"snippet": selected_result["snippet"],
"message": f"Found the most relevant financial report for your request: {selected_result['title']}",
"confidence": "high" if best_score >= 5 else ("medium" if best_score >= 2 else "low"),
"reasoning": f"Selected based on relevance scoring (score: {best_score}). This result matches key terms in your request."
}
else:
# If no clearly relevant results, return the first result with low confidence
if search_results:
first_result = search_results[0]
link = first_result.get("link", "")
# Check if the link is an index page that needs further parsing
if not link.lower().endswith(".pdf") and ("investor" in link or "ir." in link or "financial-report" in link):
# Try to extract PDF links from the index page
pdf_links = await extract_pdf_links_from_page(link, user_request)
if pdf_links:
# For requests asking for multiple reports (like "2份", Chinese for "two copies", or "two"), return multiple links
# (match a standalone "2" only, so years like "2025" or tokens like "Q2" don't trigger the multi-report path)
if "2份" in user_request or "two" in user_request.lower() or re.search(r'\b2\b', user_request):
# Return up to 2 most relevant PDF links
relevant_links = pdf_links[:2]
return {
"type": "download_links_extracted",
"links": relevant_links,
"message": f"Found {len(relevant_links)} most relevant financial reports for your request",
"confidence": "low",
"reasoning": f"Extracted {len(relevant_links)} PDF links from index page. No highly relevant results found using keyword matching."
}
else:
# Return the first PDF link found
pdf_link = pdf_links[0]
return {
"type": "download_link_extracted",
"title": pdf_link.get("title", f"{first_result.get('title', 'Financial Report')} - PDF"),
"link": pdf_link["url"],
"snippet": pdf_link.get("snippet", first_result.get("snippet", "")),
"message": f"Found a potential financial report: {pdf_link.get('title', 'PDF Report')}",
"confidence": "low",
"reasoning": "Extracted PDF link from index page. No highly relevant results found using keyword matching."
}
return {
"type": "download_link_extracted",
"title": first_result.get("title", ""),
"link": first_result.get("link", ""),
"snippet": first_result.get("snippet", ""),
"message": "Found a potential financial report, but it may not exactly match your request.",
"confidence": "low",
"reasoning": "No highly relevant results found using keyword matching."
}
else:
return {
"type": "no_results",
"message": "No search results available to analyze.",
"suggestion": "Please try a different search or provide a direct URL.",
"reasoning": "No search results were provided for analysis."
}
except Exception as e:
logger.error(f"Error in deep analysis: {str(e)}")
return {
"type": "analysis_error",
"error": str(e),
"message": f"Error occurred while analyzing search results: {str(e)}",
"suggestion": "Please try again or provide a direct URL for the financial report."
}
# Resource for accessing extracted financial report content
@mcp.resource("financial-report://{filename}")
def get_financial_report_content(filename: str) -> str:
"""
Get the content of an extracted financial report
Args:
filename: Name of the extracted file
Returns:
Content of the financial report
"""
# Use absolute path to ensure correct file access in different environments
reports_dir = Path("financial_reports").absolute()
file_path = reports_dir / filename
if not file_path.exists():
# Also check with relative path as fallback
relative_path = Path("financial_reports") / filename
if relative_path.exists():
file_path = relative_path
else:
raise Exception(f"File not found: {filename}. Searched in {reports_dir} and relative path {relative_path}")
# Handle PDF files properly
if filename.lower().endswith('.pdf'):
try:
import pdfplumber
with pdfplumber.open(file_path) as pdf:
text = ""
for page in pdf.pages:
text += page.extract_text() or ""
return text
except Exception as e:
# If PDF extraction fails, return error message
logger.error(f"Error extracting text from PDF {filename}: {str(e)}")
return f"Error extracting text from PDF {filename}: {str(e)}"
else:
# For text-based files, read normally
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
if __name__ == "__main__":
# Run the server with stdio transport
# Note: We should avoid printing to stdout here as it interferes with stdio communication
# Log to stderr instead
logger.info("Starting Financial Report MCP Server with stdio transport")
mcp.run(transport="stdio")
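# Example of how an MCP client could be pointed at this server over stdio (illustrative only;
# the command and script name depend on how this file is saved and run in your environment):
#
#   {
#       "mcpServers": {
#           "financial-reports": {
#               "command": "python",
#               "args": ["financial_report_mcp_server.py"]
#           }
#       }
#   }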