| | """ |
| | Cache manager for storing and retrieving agent answers. |
| | """ |
| |
|
| | import os |
| | import json |
| | import hashlib |
| | from typing import Optional, Dict, Any, List |
| | from datetime import datetime |
| |
|
class CacheManager:
    """Manages caching of agent answers to avoid redundant processing.

    Each question is stored as one JSON file in ``cache_dir``, named by a
    short hash of the question text. A file holds the question, a history of
    answer attempts, a ``cache_valid`` flag (whether the latest answer passed
    content validation), and an optional associated ``file_name``.
    """

    def __init__(self, cache_dir: str = "cache"):
        # Directory that holds one JSON file per cached question.
        self.cache_dir = cache_dir
        self.ensure_cache_dir()

    def ensure_cache_dir(self) -> None:
        """Create the cache directory if it doesn't exist (race-safe)."""
        # exist_ok avoids the check-then-create race of the naive
        # os.path.exists() / os.makedirs() pair.
        os.makedirs(self.cache_dir, exist_ok=True)

    def _get_question_hash(self, question: str) -> str:
        """Generate a short, stable hash of the question for use as a filename.

        MD5 is used for speed and filename stability, not security. The
        12-character prefix is kept as-is for compatibility with cache files
        written by earlier versions.
        """
        return hashlib.md5(question.encode('utf-8')).hexdigest()[:12]

    def _get_cache_path(self, question: str) -> str:
        """Get the JSON cache file path for a question."""
        question_hash = self._get_question_hash(question)
        return os.path.join(self.cache_dir, f"question_{question_hash}.json")

    def get_cached_answer(self, question: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve the most recent cached answer for a question.

        Args:
            question: The question to look up

        Returns:
            Dictionary with answer, iterations, and metadata if cached, None otherwise
        """
        cache_path = self._get_cache_path(question)
        if not os.path.exists(cache_path):
            return None
        try:
            with open(cache_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            answers = data.get('answers', [])
            if not answers:
                # File exists but no attempt was ever recorded: treat as a miss.
                return None
            # Only the newest attempt is surfaced; history stays on disk.
            last_answer = answers[-1]
            return {
                'answer': last_answer.get('answer', ''),
                'iterations': last_answer.get('iterations', 0),
                'timestamp': last_answer.get('timestamp', ''),
                'cache_valid': data.get('cache_valid', False),
                'file_name': data.get('file_name', None)
            }
        except Exception as e:
            # Corrupt or unreadable cache files degrade to a cache miss.
            print(f"Error reading cache: {e}")
            return None

    def cache_answer(self, question: str, answer: Optional[str], iterations: int = 1, file_name: Optional[str] = None) -> bool:
        """
        Cache an answer for a question with iteration count.

        Args:
            question: The question that was asked
            answer: The answer to cache (may be None/empty; recorded but marked invalid)
            iterations: Number of iterations/steps used (should be 1-10 typically)
            file_name: Optional associated file name to record on the cache entry

        Returns:
            True if cached successfully, False otherwise
        """
        cache_path = self._get_cache_path(question)
        # Validation gates the cache_valid flag, not whether we record the
        # attempt: error-looking answers are kept for debugging but flagged.
        cache_valid = bool(answer and self.validate_answer_content(answer))
        now = datetime.now().isoformat()
        try:
            if os.path.exists(cache_path):
                with open(cache_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            else:
                data = {
                    'question': question,
                    'answers': [],
                    'cache_valid': False,
                    'file_name': file_name
                }

            if file_name:
                data['file_name'] = file_name
                print(f"[CacheManager] file_name submitted: {file_name}")

            # A valid answer is always truthy, so `answer or ""` is exactly
            # the original two-branch behavior (raw answer when valid,
            # empty-string fallback when None/empty) without duplication.
            data['answers'].append({
                'answer': answer or "",
                'iterations': iterations,
                'timestamp': now
            })
            # cache_valid reflects the trustworthiness of the latest answer.
            data['cache_valid'] = cache_valid
            with open(cache_path, 'w', encoding='utf-8') as f:
                # ensure_ascii=False keeps non-ASCII answers readable on disk.
                json.dump(data, f, indent=2, ensure_ascii=False)
            return True
        except Exception as e:
            print(f"Error caching answer: {e}")
            return False

    def validate_answer_content(self, answer: str) -> bool:
        """
        Validate that answer content is reasonable to cache.
        Error messages and corrupted responses should NOT be cached as valid.

        Args:
            answer: The answer content to validate

        Returns:
            True if answer is valid to cache, False otherwise
        """
        if not answer or not isinstance(answer, str):
            return False

        clean_answer = answer.strip()
        if len(clean_answer) < 3:
            return False

        # Substrings that indicate an LLM/tooling failure rather than a
        # real answer; matched case-insensitively anywhere in the text.
        error_patterns = [
            'error calling llm',
            'error running agent',
            'error in',
            'error processing',
            'litellm.badrequest',
            'litellm.exception',
            'vertexaiexception',
            'badrequest',
            'invalid_argument',
            'authentication',
            'credentials',
            'api key',
            'traceback',
            'exception occurred',
            'failed to',
            'unable to submit',
            'mimetype parameter',
            'not supported'
        ]

        lower_answer = clean_answer.lower()
        for pattern in error_patterns:
            if pattern in lower_answer:
                print(f"[CacheManager] Rejecting answer containing error pattern: '{pattern}'")
                return False

        # Whole-answer junk values (exact match only, not substrings).
        corrupt_patterns = [']', '[', '{}', '()', '""', "''", 'null', 'undefined']
        if clean_answer in corrupt_patterns:
            return False

        # Answers consisting solely of bracket characters are noise.
        if all(c in '[]{}()' for c in clean_answer):
            return False

        return True

    def clear_cache(self) -> None:
        """Clear all cached answers (removes every file in the cache dir)."""
        try:
            for filename in os.listdir(self.cache_dir):
                file_path = os.path.join(self.cache_dir, filename)
                if os.path.isfile(file_path):
                    os.remove(file_path)
            print("Cache cleared successfully")
        except Exception as e:
            print(f"Error clearing cache: {e}")

    def list_cached_questions(self) -> List[Dict[str, Any]]:
        """List all cached questions with metadata, newest first."""
        cached_questions: List[Dict[str, Any]] = []
        try:
            for filename in os.listdir(self.cache_dir):
                if not (filename.startswith('question_') and filename.endswith('.json')):
                    continue
                cache_path = os.path.join(self.cache_dir, filename)
                try:
                    with open(cache_path, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                except Exception as e:
                    # One corrupt file must not abort the whole listing.
                    print(f"Error listing cached questions: {e}")
                    continue
                cached_questions.append({
                    'question': data.get('question', ''),
                    'cache_valid': data.get('cache_valid', False),
                    'file_name': data.get('file_name', None),
                    'last_timestamp': data['answers'][-1]['timestamp'] if data.get('answers') else None
                })
        except Exception as e:
            print(f"Error listing cached questions: {e}")
        # `or ''` coalesces None timestamps (entries with no answers) so the
        # sort never compares None with str, which raises TypeError.
        return sorted(cached_questions, key=lambda x: x.get('last_timestamp') or '', reverse=True)

    def cleanup_invalid_cache_entries(self) -> int:
        """
        Clean up cache entries that contain error messages or invalid content.

        Entries are not deleted; they are re-flagged with cache_valid=False so
        the bad answer is no longer served while the history is preserved.

        Returns:
            Number of entries cleaned up
        """
        cleaned_count = 0
        try:
            for filename in os.listdir(self.cache_dir):
                if filename.startswith('question_') and filename.endswith('.json'):
                    cache_path = os.path.join(self.cache_dir, filename)

                    try:
                        with open(cache_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)

                        should_cleanup = False

                        # Only entries currently marked valid need re-checking.
                        if data.get('cache_valid', False):
                            answers = data.get('answers', [])
                            for answer_entry in answers:
                                answer_text = answer_entry.get('answer', '')
                                if not self.validate_answer_content(answer_text):
                                    print(f"Found invalid cached answer in {filename}: {answer_text[:100]}...")
                                    should_cleanup = True
                                    break

                        if should_cleanup:
                            data['cache_valid'] = False
                            with open(cache_path, 'w', encoding='utf-8') as f:
                                json.dump(data, f, indent=2, ensure_ascii=False)
                            cleaned_count += 1
                            print(f"Cleaned up invalid cache entry: {filename}")

                    except Exception as e:
                        print(f"Error processing cache file {filename}: {e}")
                        continue

        except Exception as e:
            print(f"Error during cache cleanup: {e}")

        print(f"Cache cleanup completed. {cleaned_count} entries cleaned up.")
        return cleaned_count