Spaces:
Sleeping
Sleeping
| from typing import List | |
| from openai import OpenAI | |
| import tiktoken | |
| from src.core.types import DEFAULT_OPENAI_ANALYZER, DEFAULT_SYSTEM_PROMPT, DEFAULT_USER_PROMPT | |
| from src.models.analyzer_models import AnalyzerResult | |
| from src.models.scrape_models import ScrapeResult | |
| from src.core.interface.analyzer_interface import AnalyzerInterface | |
class OpenaiAnalyzer(AnalyzerInterface):
    """Analyzer backed by the OpenAI chat-completions API.

    Scraped pages are first summarized/truncated so the final analysis
    prompt fits within the model's context window, then the model is
    asked for a structured ``AnalyzerResult`` via the parse endpoint.
    """

    def __init__(self, api_key, model_name=DEFAULT_OPENAI_ANALYZER):
        """
        Args:
            api_key: OpenAI API key used to construct the client.
            model_name: Chat model name (defaults to DEFAULT_OPENAI_ANALYZER).
        """
        self.client = OpenAI(api_key=api_key)
        self.model_name = model_name
        # Initialize tokenizer for the model. tiktoken has no mapping for
        # "o1", so we borrow the gpt-4 encoding there; for any other model
        # tiktoken does not recognize, fall back to cl100k_base instead of
        # letting encoding_for_model raise KeyError at construction time.
        try:
            self.tokenizer = tiktoken.encoding_for_model(
                model_name if model_name != "o1" else "gpt-4"
            )
        except KeyError:
            self.tokenizer = tiktoken.get_encoding("cl100k_base")

    def _count_tokens(self, text: str) -> int:
        """Count tokens in text using tiktoken."""
        return len(self.tokenizer.encode(text))

    def _truncate_content(self, content: str, max_tokens: int = 2000) -> str:
        """
        Truncate content to a maximum token count intelligently.

        Args:
            content: The text content to truncate.
            max_tokens: Maximum number of tokens allowed.

        Returns:
            The content unchanged if it fits, otherwise the first
            ``max_tokens`` tokens decoded back to text with a
            "... [truncated]" marker appended.
        """
        tokens = self.tokenizer.encode(content)
        if len(tokens) <= max_tokens:
            return content
        # Truncate to max_tokens and add ellipsis so downstream readers
        # (human or model) can tell the text was cut.
        truncated_tokens = tokens[:max_tokens]
        return self.tokenizer.decode(truncated_tokens) + "... [truncated]"

    def _summarize_scrape_result(self, result: ScrapeResult) -> ScrapeResult:
        """
        Summarize a single ScrapeResult's content to reduce token usage.

        Args:
            result: Original ScrapeResult object.

        Returns:
            A new ScrapeResult with summarized content, or the original
            object unchanged when its content is already short. On any
            failure (API error, etc.) a truncated copy is returned
            instead of raising.
        """
        try:
            # NOTE(review): assumes ScrapeResult exposes 'content', 'url' and
            # 'title' attributes and a plain __dict__ — confirm against the
            # actual model definition in src.models.scrape_models.
            # If content is already short enough, return as-is.
            content_tokens = self._count_tokens(result.content)
            if content_tokens <= 500:  # Adjust threshold as needed
                return result
            # Create a summary prompt; source text is capped at 3000 tokens
            # so the summarization call itself cannot blow the context.
            summary_prompt = f"""
Summarize the key information from this text in 3-4 concise bullet points.
Focus on the most important facts, findings, or arguments.
Keep the summary under 150 words.
Text to summarize: {self._truncate_content(result.content, 3000)}
"""
            # Get summary from OpenAI; low temperature keeps summaries stable.
            completion = self.client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": "You are a helpful summarizer. Create concise, informative summaries."},
                    {"role": "user", "content": summary_prompt},
                ],
                max_tokens=300,
                temperature=0.3,
            )
            summary = completion.choices[0].message.content
            # Build a new ScrapeResult with the summary in place of the full
            # content, copying every other attribute from the original.
            return ScrapeResult(
                url=result.url,
                title=result.title,
                content=summary,  # Use summary instead of full content
                **{k: v for k, v in result.__dict__.items()
                   if k not in ['url', 'title', 'content']}
            )
        except Exception as e:
            # If summarization fails, fall back to a hard-truncated copy so
            # the pipeline keeps going (deliberate best-effort behavior).
            print(f"Summarization failed for {result.url}: {str(e)}")
            return ScrapeResult(
                url=result.url,
                title=result.title,
                content=self._truncate_content(result.content, 500),
                **{k: v for k, v in result.__dict__.items()
                   if k not in ['url', 'title', 'content']}
            )

    def _filter_and_summarize_results(
        self,
        search_results: List[ScrapeResult],
        max_total_tokens: int = 6000,
    ) -> List[ScrapeResult]:
        """
        Filter and summarize search results to fit within token limits.

        Args:
            search_results: Original list of ScrapeResult objects.
            max_total_tokens: Maximum total tokens allowed for all results.

        Returns:
            Processed list of ScrapeResult objects (at most 10, and only
            as many as fit under ``max_total_tokens``).
        """
        processed_results = []
        current_tokens = 0
        # Results are taken in their given order; if a relevance score is
        # ever available, sort by it here before the budget loop.
        sorted_results = search_results[:]
        for result in sorted_results:
            # Summarize the result, then measure what its string
            # representation would add to the final prompt.
            summarized_result = self._summarize_scrape_result(result)
            result_str = f"{summarized_result}"
            result_tokens = self._count_tokens(result_str)
            # Stop once the next result would exceed the token budget.
            if current_tokens + result_tokens > max_total_tokens:
                break
            processed_results.append(summarized_result)
            current_tokens += result_tokens
            # Hard cap on the number of results regardless of budget.
            if len(processed_results) >= 10:  # Adjust as needed
                break
        return processed_results

    def analyze_search_result(self, query: str, search_results: List[ScrapeResult]) -> AnalyzerResult:
        """
        Analyzes the provided search results based on the given query.

        Args:
            query (str): The search query string.
            search_results (List[ScrapeResult]): A list of search results to be analyzed.

        Returns:
            AnalyzerResult: The result of the analysis.

        Raises:
            Exception: If an error occurs during analysis (original
                exception preserved as ``__cause__``).
        """
        try:
            # Step 1: Filter and summarize results to fit within token limits,
            # leaving room for the prompt scaffolding and the response.
            processed_results = self._filter_and_summarize_results(
                search_results,
                max_total_tokens=6000,
            )
            # Step 2: Check total token count before proceeding.
            # NOTE(review): .replace("query", ...) substitutes the literal
            # substring "query" in the prompt template — confirm the template
            # uses these exact placeholder strings.
            results_str = f"{processed_results}"
            prompt_without_results = DEFAULT_USER_PROMPT.replace("query", query).replace("scrape_results", "")
            total_prompt_tokens = self._count_tokens(
                DEFAULT_SYSTEM_PROMPT + prompt_without_results + results_str
            )
            print(f"Total prompt tokens: {total_prompt_tokens}")
            if total_prompt_tokens > 8000:  # Safety margin
                # Still too large: clamp each result's content in place.
                for result in processed_results:
                    result.content = self._truncate_content(result.content, 200)
            # Step 3: Construct final prompt.
            user_prompt = DEFAULT_USER_PROMPT.replace("query", query).replace("scrape_results", f"{processed_results}")
            # Step 4: Call OpenAI API with structured output parsed directly
            # into the AnalyzerResult schema.
            completion = self.client.beta.chat.completions.parse(
                model=self.model_name,
                messages=[
                    {
                        "role": "system",
                        "content": DEFAULT_SYSTEM_PROMPT,
                    },
                    {
                        "role": "user",
                        "content": user_prompt,
                    },
                ],
                response_format=AnalyzerResult,
            )
            return completion.choices[0].message.parsed
        except Exception as e:
            # Re-raise with context; chaining keeps the original traceback.
            raise Exception(f"Error while analyzing search result: {str(e)}") from e