# search-web-MCP-server — src/analyzer/openai_analyzer.py
# OpenAI-backed analyzer for scraped web-search results.
# (Origin: Hugging Face repo, commit 78b82dc by Xenobd.)
from typing import List
from openai import OpenAI
import tiktoken
from src.core.types import DEFAULT_OPENAI_ANALYZER, DEFAULT_SYSTEM_PROMPT, DEFAULT_USER_PROMPT
from src.models.analyzer_models import AnalyzerResult
from src.models.scrape_models import ScrapeResult
from src.core.interface.analyzer_interface import AnalyzerInterface
class OpenaiAnalyzer(AnalyzerInterface):
    """Analyze web-scrape search results with the OpenAI chat API.

    Scraped page content is summarized and/or token-truncated (via tiktoken)
    so the final analysis prompt fits within the model's context window, then
    the results are sent to the model with structured (parsed) output.
    """

    def __init__(self, api_key: str, model_name: str = DEFAULT_OPENAI_ANALYZER):
        """
        Args:
            api_key: OpenAI API key.
            model_name: Chat model used for both summarization and analysis.
        """
        self.client = OpenAI(api_key=api_key)
        self.model_name = model_name
        # Initialize tokenizer for the model. "o1" is unknown to older
        # tiktoken releases, so it is mapped to gpt-4's encoding; any other
        # model tiktoken cannot map falls back to cl100k_base rather than
        # raising KeyError at construction time.
        try:
            self.tokenizer = tiktoken.encoding_for_model(
                model_name if model_name != "o1" else "gpt-4"
            )
        except KeyError:
            self.tokenizer = tiktoken.get_encoding("cl100k_base")

    def _count_tokens(self, text: str) -> int:
        """Count tokens in text using tiktoken."""
        return len(self.tokenizer.encode(text))

    def _truncate_content(self, content: str, max_tokens: int = 2000) -> str:
        """
        Truncate content to a maximum token count intelligently.

        Args:
            content: The text content to truncate.
            max_tokens: Maximum number of tokens allowed.

        Returns:
            The content unchanged if it is already within the limit,
            otherwise the first ``max_tokens`` tokens decoded back to text
            with a "... [truncated]" marker appended.
        """
        tokens = self.tokenizer.encode(content)
        if len(tokens) <= max_tokens:
            return content
        # Truncate to max_tokens and add ellipsis
        return self.tokenizer.decode(tokens[:max_tokens]) + "... [truncated]"

    def _copy_with_content(self, result: ScrapeResult, content: str) -> ScrapeResult:
        """Return a copy of ``result`` with its ``content`` field replaced.

        All other attributes are carried over from the original object.
        NOTE(review): assumes ScrapeResult exposes its fields via __dict__
        and accepts them as keyword arguments — confirm against the model.
        """
        extras = {
            k: v
            for k, v in result.__dict__.items()
            if k not in ['url', 'title', 'content']
        }
        return ScrapeResult(
            url=result.url,
            title=result.title,
            content=content,
            **extras,
        )

    def _summarize_scrape_result(self, result: ScrapeResult) -> ScrapeResult:
        """
        Summarize a single ScrapeResult's content to reduce token usage.

        Args:
            result: Original ScrapeResult object.

        Returns:
            A new ScrapeResult with summarized content; if the content is
            already short (<= 500 tokens) the original is returned as-is,
            and if the summarization call fails a token-truncated copy is
            returned instead (best-effort, never raises).
        """
        try:
            # Short content is kept verbatim — summarizing it would cost
            # an API call for no token savings.
            content_tokens = self._count_tokens(result.content)
            if content_tokens <= 500:  # Adjust threshold as needed
                return result
            # Cap the text fed to the summarizer so the summary request
            # itself cannot blow the context window.
            summary_prompt = f"""
Summarize the key information from this text in 3-4 concise bullet points.
Focus on the most important facts, findings, or arguments.
Keep the summary under 150 words.
Text to summarize: {self._truncate_content(result.content, 3000)}
"""
            completion = self.client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": "You are a helpful summarizer. Create concise, informative summaries."},
                    {"role": "user", "content": summary_prompt},
                ],
                max_tokens=300,
                temperature=0.3,
            )
            summary = completion.choices[0].message.content
            return self._copy_with_content(result, summary)
        except Exception as e:
            # Best-effort fallback: keep a truncated version rather than
            # dropping the result or propagating the failure.
            print(f"Summarization failed for {result.url}: {str(e)}")
            return self._copy_with_content(
                result, self._truncate_content(result.content, 500)
            )

    def _filter_and_summarize_results(
        self,
        search_results: List[ScrapeResult],
        max_total_tokens: int = 6000,
    ) -> List[ScrapeResult]:
        """
        Filter and summarize search results to fit within token limits.

        Args:
            search_results: Original list of ScrapeResult objects.
            max_total_tokens: Maximum total tokens allowed for all results.

        Returns:
            Processed list of ScrapeResult objects (at most 10), whose
            combined string representation stays under the token budget.
        """
        processed_results: List[ScrapeResult] = []
        current_tokens = 0
        # Results are taken in input order; could sort by a relevance score
        # here if ScrapeResult ever grows one.
        for result in search_results:
            summarized_result = self._summarize_scrape_result(result)
            # Token cost is estimated on the same string representation
            # that ends up interpolated into the final prompt.
            result_tokens = self._count_tokens(f"{summarized_result}")
            # Stop once the budget would be exceeded — remaining (less
            # relevant) results are dropped.
            if current_tokens + result_tokens > max_total_tokens:
                break
            processed_results.append(summarized_result)
            current_tokens += result_tokens
            # Hard cap on result count regardless of token budget.
            if len(processed_results) >= 10:  # Adjust as needed
                break
        return processed_results

    def analyze_search_result(self, query: str, search_results: List[ScrapeResult]) -> AnalyzerResult:
        """
        Analyzes the provided search results based on the given query.

        Args:
            query (str): The search query string.
            search_results (List[ScrapeResult]): A list of search results to be analyzed.

        Returns:
            AnalyzerResult: The result of the analysis.

        Raises:
            Exception: If an error occurs during analysis (original error
            is chained as __cause__).
        """
        try:
            # Step 1: Filter and summarize results to fit within token limits
            processed_results = self._filter_and_summarize_results(
                search_results,
                max_total_tokens=6000,  # Leave room for prompt and response
            )
            # Step 2: Check total token count before proceeding.
            # NOTE(review): .replace("query", ...) substitutes a bare word,
            # not a {query} placeholder — any occurrence of the literal word
            # "query" in the template is rewritten. Confirm the placeholder
            # convention in DEFAULT_USER_PROMPT before changing this.
            results_str = f"{processed_results}"
            prompt_without_results = DEFAULT_USER_PROMPT.replace("query", query).replace("scrape_results", "")
            total_prompt_tokens = self._count_tokens(
                DEFAULT_SYSTEM_PROMPT + prompt_without_results + results_str
            )
            print(f"Total prompt tokens: {total_prompt_tokens}")
            if total_prompt_tokens > 8000:  # Safety margin
                # Still too large after summarization — aggressively truncate
                # each result's content in place.
                for result in processed_results:
                    result.content = self._truncate_content(result.content, 200)
            # Step 3: Construct final prompt
            user_prompt = DEFAULT_USER_PROMPT.replace("query", query).replace("scrape_results", f"{processed_results}")
            # Step 4: Call OpenAI API with structured output parsed directly
            # into the AnalyzerResult model.
            completion = self.client.beta.chat.completions.parse(
                model=self.model_name,
                messages=[
                    {
                        "role": "system",
                        "content": DEFAULT_SYSTEM_PROMPT
                    },
                    {
                        "role": "user",
                        "content": user_prompt
                    }
                ],
                response_format=AnalyzerResult
            )
            return completion.choices[0].message.parsed
        except Exception as e:
            # Chain the original exception so callers keep the root cause.
            raise Exception(f"Error while analyzing search result: {str(e)}") from e