Spaces:
Sleeping
Sleeping
| """ | |
| Utilities for analyzing and understanding questions. | |
| """ | |
| import re | |
| import json | |
| import os | |
| from typing import Dict, Any, List, Optional, Tuple, Set | |
class QuestionAnalyzer:
    """
    Class for analyzing and understanding questions.

    The analyzer combines two sources of information:

    * a JSON Lines metadata file (one JSON object per line) whose entries are
      indexed by their 'task_id' field, and
    * the files stored in a resource directory.

    It is used to locate the file a question refers to and to look up the
    expected answer recorded in the metadata, when there is one.
    """

    # Metadata fields that may hold the expected answer, in priority order.
    _ANSWER_FIELDS = ('answer', 'Final answer', 'expected_answer')

    # Metadata fields that may hold the question text, in priority order.
    _QUESTION_FIELDS = ('Question', 'question')

    # File extensions searched for in question text, in priority order.
    _FILE_EXTENSIONS = (
        '.xlsx', '.xls', '.csv', '.txt', '.pdf', '.jpg', '.jpeg',
        '.png', '.docx', '.pptx', '.json', '.jsonld', '.zip', '.pdb', '.py',
    )

    # "attached <kind> [named |called |"]<token>": group 1 is the optional
    # explicit marker, group 2 the candidate file name.
    _ATTACHED_PATTERN = re.compile(
        r'attached (?:file|spreadsheet|document|image|picture|pdf|excel|csv|text file|zip|archive) '
        r'(named |called |")?([\w\.-]+)',
        re.IGNORECASE,
    )

    # One pattern per extension. The negative lookahead stops a short
    # extension from matching the prefix of a longer one ('.json' inside
    # '.jsonld', '.py' inside '.pyc').
    _EXTENSION_PATTERNS = tuple(
        re.compile(r'(\w+(?:-\w+)*' + re.escape(ext) + r')(?!\w)', re.IGNORECASE)
        for ext in _FILE_EXTENSIONS
    )

    _UUID_PATTERN = re.compile(
        r'([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})',
        re.IGNORECASE,
    )

    # Two questions count as similar when their Jaccard index exceeds this.
    _SIMILARITY_THRESHOLD = 0.5

    # Common words ignored when extracting keywords.
    _STOP_WORDS = frozenset({
        'a', 'an', 'the', 'and', 'or', 'but', 'if', 'then', 'else', 'when',
        'at', 'from', 'by', 'for', 'with', 'about', 'against', 'between',
        'into', 'through', 'during', 'before', 'after', 'above', 'below',
        'to', 'of', 'in', 'on', 'is', 'are', 'was', 'were', 'be', 'been',
        'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did',
        'doing', 'would', 'should', 'could', 'might', 'will', 'shall',
        'can', 'may', 'must', 'ought',
    })

    def __init__(self, resource_dir: str, metadata_path: Optional[str] = None):
        """
        Initialize the question analyzer.

        Args:
            resource_dir: Directory containing resource files
            metadata_path: Path to the metadata file (optional); defaults to
                'metadata.jsonl' inside resource_dir
        """
        self.resource_dir = resource_dir
        self.metadata_path = metadata_path or os.path.join(resource_dir, 'metadata.jsonl')
        self.metadata = self._load_metadata()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _iter_metadata_entries(self, report_errors: bool = False):
        """
        Iterate over the JSON objects stored in the metadata file.

        Blank lines, lines that are not valid JSON and JSON values that are
        not objects are skipped, so one bad line never hides the entries
        that follow it. Nothing is yielded when the file does not exist.

        Args:
            report_errors: Print a message for every malformed line

        Yields:
            One dictionary per valid metadata line

        Raises:
            OSError: If the file exists but cannot be read
            UnicodeDecodeError: If the file is not valid UTF-8
        """
        if not os.path.isfile(self.metadata_path):
            return
        with open(self.metadata_path, 'r', encoding='utf-8') as f:
            for line_number, line in enumerate(f, start=1):
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError as e:
                    if report_errors:
                        print(f"Skipping malformed metadata line {line_number}: {e}")
                    continue
                if isinstance(entry, dict):
                    yield entry

    def _existing_file(self, file_name: Any) -> Optional[str]:
        """
        Resolve a file name inside the resource directory.

        Args:
            file_name: Name of the file, relative to the resource directory

        Returns:
            The joined path if it is an existing regular file, None otherwise
            (including when file_name is empty, blank or not a string)
        """
        if not isinstance(file_name, str) or not file_name.strip():
            # An empty name would join to the directory itself, which exists.
            return None
        file_path = os.path.join(self.resource_dir, file_name)
        return file_path if os.path.isfile(file_path) else None

    def _list_resource_files(self) -> List[str]:
        """
        List the candidate files in the resource directory.

        Returns:
            Sorted names of the regular files in the resource directory,
            excluding the metadata file; an empty list if the directory
            cannot be listed
        """
        try:
            # Sorted so that "first match wins" searches are deterministic.
            names = sorted(os.listdir(self.resource_dir))
        except OSError as e:
            print(f"Error listing resource directory: {e}")
            return []
        metadata_path = os.path.normcase(os.path.abspath(self.metadata_path))
        files = []
        for name in names:
            path = os.path.join(self.resource_dir, name)
            if name == 'metadata.jsonl':
                continue
            if os.path.normcase(os.path.abspath(path)) == metadata_path:
                continue
            if os.path.isfile(path):
                files.append(name)
        return files

    def _question_text(self, entry: Dict[str, Any]) -> Optional[str]:
        """
        Get the question text stored in a metadata entry.

        Args:
            entry: A metadata entry

        Returns:
            The first non-empty string found under 'Question' or 'question',
            or None if there is none
        """
        for field in self._QUESTION_FIELDS:
            text = entry.get(field)
            if isinstance(text, str) and text:
                return text
        return None

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True if value is None or an empty string."""
        return value is None or value == ''

    def _answer_from_entry(self, entry: Dict[str, Any]) -> Any:
        """
        Get the expected answer stored in a metadata entry.

        Args:
            entry: A metadata entry

        Returns:
            The first non-empty value among the answer fields; if every
            present field is empty, the value of the first present field;
            None if no answer field is present
        """
        present = [entry[field] for field in self._ANSWER_FIELDS if field in entry]
        for value in present:
            if not self._is_missing(value):
                return value
        return present[0] if present else None

    def _load_metadata(self) -> Dict[str, Dict[str, Any]]:
        """
        Load metadata from the metadata file.

        Entries without a 'task_id' are ignored. When several entries share
        a task ID, the last one in the file wins. Read errors are reported
        and whatever was loaded before the error is kept.

        Returns:
            Dictionary mapping task IDs to metadata
        """
        metadata = {}
        try:
            for entry in self._iter_metadata_entries(report_errors=True):
                task_id = entry.get('task_id')
                if task_id:
                    metadata[task_id] = entry
        except (OSError, ValueError, TypeError) as e:
            # ValueError covers UnicodeDecodeError; TypeError an unhashable id.
            print(f"Error loading metadata: {e}")
        return metadata

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def extract_file_mention(self, question: str) -> Optional[str]:
        """
        Extract mentioned file name from the question.

        Two strategies are tried in order:

        1. An "attached file/spreadsheet/..." phrase. The word that follows
           is accepted only if it is introduced by 'named', 'called' or a
           double quote, or if it contains a dot (looks like 'name.ext').
           This keeps ordinary words such as "attached file shows" from
           being reported as file names. Trailing '.' and '-' characters
           (sentence punctuation) are removed.
        2. Any word ending in one of the known file extensions, trying the
           extensions in priority order.

        Args:
            question: The question to analyze

        Returns:
            Mentioned file name, or None if no file is mentioned
        """
        if not question:
            return None

        match = self._ATTACHED_PATTERN.search(question)
        if match:
            explicit_marker = match.group(1)
            candidate = match.group(2).rstrip('.-')
            if candidate and (explicit_marker or '.' in candidate):
                return candidate

        for pattern in self._EXTENSION_PATTERNS:
            match = pattern.search(question)
            if match:
                return match.group(1)
        return None

    def find_relevant_file(self, question: str, task_id: Optional[str] = None) -> Optional[str]:
        """
        Find the relevant file for a question.

        The following strategies are tried in order; the first hit wins:

        1. The 'file_name' recorded in the metadata for task_id.
        2. When no task_id is given, a metadata entry whose question text is
           identical to the question.
        3. A file name mentioned in the question (exact name, then
           case-insensitive partial name).
        4. A UUID in the question that appears in a file name.
        5. When task_id is known to the metadata, the file whose name
           contains the most question keywords.
        6. The file of a metadata entry whose question is similar to the
           question. NOTE(review): the nesting of this step was not
           recoverable from the original formatting; it is applied
           regardless of task_id here.

        Only regular files inside the resource directory are returned; the
        metadata file itself is never returned.

        Args:
            question: The question to analyze
            task_id: The task ID (optional)

        Returns:
            Path to the relevant file, or None if no file is found
        """
        # 1. Metadata lookup by task ID.
        if task_id and task_id in self.metadata:
            file_path = self._existing_file(self.metadata[task_id].get('file_name'))
            if file_path:
                print(f"Found file in metadata for task_id {task_id}: {file_path}")
                return file_path

        # 2. Metadata lookup by exact question text.
        if not task_id:
            for entry in self.metadata.values():
                if self._question_text(entry) == question:
                    file_path = self._existing_file(entry.get('file_name'))
                    if file_path:
                        print(f"Found file in metadata by matching question: {file_path}")
                        return file_path

        resource_files = self._list_resource_files()

        # 3. File name mentioned in the question.
        file_mention = self.extract_file_mention(question)
        if file_mention:
            file_path = self._existing_file(file_mention)
            if file_path:
                print(f"Found file by direct mention: {file_path}")
                return file_path
            mention = file_mention.lower()
            for file_name in resource_files:
                if mention in file_name.lower():
                    file_path = os.path.join(self.resource_dir, file_name)
                    print(f"Found file by partial name match: {file_path}")
                    return file_path

        # 4. UUID in the question, which might be a file name without extension.
        uuid_match = self._UUID_PATTERN.search(question or '')
        if uuid_match:
            # The pattern ignores case, so the comparison must as well.
            uuid = uuid_match.group(1).lower()
            for file_name in resource_files:
                if uuid in file_name.lower():
                    file_path = os.path.join(self.resource_dir, file_name)
                    print(f"Found file by UUID match: {file_path}")
                    return file_path

        # 5. Keyword overlap between the question and the file names.
        if task_id and task_id in self.metadata:
            keywords = self._extract_keywords(question)
            best_match = None
            best_score = 0
            for file_name in resource_files:
                lowered = file_name.lower()
                score = sum(1 for keyword in keywords if keyword in lowered)
                if score > best_score:
                    best_score = score
                    best_match = file_name
            if best_match:
                file_path = os.path.join(self.resource_dir, best_match)
                print(f"Found file by keyword matching: {file_path}")
                return file_path

        # 6. File of a metadata entry with a similar question.
        try:
            for entry in self._iter_metadata_entries():
                entry_question = self._question_text(entry)
                if not entry_question or not entry.get('file_name'):
                    continue
                if self._questions_are_similar(question, entry_question):
                    file_path = self._existing_file(entry['file_name'])
                    if file_path:
                        print(f"Found file by similar question in metadata: {file_path}")
                        return file_path
        except (OSError, ValueError) as e:
            print(f"Error searching metadata for similar questions: {e}")
        return None

    def _questions_are_similar(self, q1: str, q2: str) -> bool:
        """
        Check if two questions are similar.

        Both questions are lower-cased, stripped of punctuation and split
        into sets of words; they are similar when the Jaccard index of the
        two sets (intersection size / union size) is greater than 0.5.

        Args:
            q1: First question
            q2: Second question

        Returns:
            True if the questions are similar, False otherwise (including
            when either argument is not a string or both are empty)
        """
        if not isinstance(q1, str) or not isinstance(q2, str):
            return False
        words1 = set(re.sub(r'[^\w\s]', '', q1.lower()).split())
        words2 = set(re.sub(r'[^\w\s]', '', q2.lower()).split())
        union = len(words1 | words2)
        if union == 0:
            return False
        similarity = len(words1 & words2) / union
        return similarity > self._SIMILARITY_THRESHOLD

    def _extract_keywords(self, text: str) -> Set[str]:
        """
        Extract keywords from text.

        Args:
            text: The text to analyze

        Returns:
            Set of lower-cased words that are longer than two characters
            and are not stop words
        """
        words = re.findall(r'\b\w+\b', text.lower())
        return {word for word in words if len(word) > 2 and word not in self._STOP_WORDS}

    def analyze_question(self, question: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Analyze a question to understand what it's asking.

        When task_id is not given, it is extracted from a
        'task_id: <id>' fragment of the question if there is one. The
        expected answer is looked up only when a task ID is known: first in
        the loaded metadata, then by rescanning the metadata file for an
        entry with that task ID or whose question text contains it.

        Args:
            question: The question to analyze
            task_id: The task ID (optional)

        Returns:
            Dictionary containing analysis results with the keys 'question',
            'task_id', 'file_path', 'keywords' and 'expected_answer'
        """
        result = {
            'question': question,
            'task_id': task_id,
            'file_path': None,
            'keywords': list(self._extract_keywords(question)),
            'expected_answer': None,
        }

        # Try to extract task_id from the question if not provided.
        if not task_id:
            task_id_match = re.search(r'task_id[: ]+([\w\-]+)', question, re.IGNORECASE)
            if task_id_match:
                task_id = task_id_match.group(1)
                result['task_id'] = task_id

        file_path = self.find_relevant_file(question, task_id)
        if file_path:
            result['file_path'] = file_path

        # Without a task ID there is nothing to key the answer lookup on;
        # comparing against None would match every entry lacking a task_id.
        if not task_id:
            return result

        if task_id in self.metadata:
            result['expected_answer'] = self._answer_from_entry(self.metadata[task_id])

        # If we still don't have an expected answer, search the metadata
        # file again (duplicate task IDs, or the ID quoted in a question).
        if self._is_missing(result['expected_answer']):
            task_id_text = str(task_id)
            try:
                for entry in self._iter_metadata_entries():
                    question_text = self._question_text(entry)
                    mentions_task = bool(question_text) and task_id_text in question_text
                    if entry.get('task_id') != task_id and not mentions_task:
                        continue
                    answer = self._answer_from_entry(entry)
                    if not self._is_missing(answer):
                        result['expected_answer'] = answer
                        break
            except (OSError, ValueError) as e:
                print(f"Error searching metadata for expected answer: {e}")
        return result

    def find_file_by_task_id(self, task_id: str) -> Optional[str]:
        """
        Find a file path by task_id in metadata.

        The loaded metadata is consulted first. The metadata file is then
        rescanned; for a matching entry that has a 'file_name' field whose
        file does not exist, the resource directory is searched for a file
        whose name contains the task ID.

        Args:
            task_id: The task ID

        Returns:
            File path if found, None otherwise
        """
        if not task_id:
            return None

        # Check if we have this task_id in our metadata.
        if task_id in self.metadata:
            file_path = self._existing_file(self.metadata[task_id].get('file_name'))
            if file_path:
                print(f"Found file in metadata for task_id {task_id}: {file_path}")
                return file_path

        # Search through metadata file again to find the task_id.
        task_id_text = str(task_id)
        try:
            for entry in self._iter_metadata_entries():
                if entry.get('task_id') != task_id or 'file_name' not in entry:
                    continue
                file_path = self._existing_file(entry['file_name'])
                if file_path:
                    print(f"Found file in metadata for task_id {task_id}: {file_path}")
                    return file_path
                # The recorded name is empty or missing on disk: look for a
                # file named after the task ID instead.
                for existing_file in self._list_resource_files():
                    if task_id_text in existing_file:
                        file_path = os.path.join(self.resource_dir, existing_file)
                        print(f"Found file matching task_id {task_id}: {file_path}")
                        return file_path
        except (OSError, ValueError) as e:
            print(f"Error searching metadata for file by task_id: {e}")
        return None