# Spaces: Sleeping  (web-page status banner captured with the source; not part of the module)
"""
Data processor for processing extracted data.
"""
import json
import os
import re
from typing import Any, Dict, List, Optional, Tuple, Union

import pandas as pd
class DataProcessor:
    """
    Rule-based question answering over data extracted from files.

    Every public ``process_*`` method receives already-extracted content and a
    free-text question, and returns the answer as a string. Questions are
    interpreted with keyword and regular-expression matching only; when no
    rule applies, a short summary of the data is returned instead.
    """

    # Words that commonly name a column; used when a question does not spell
    # out the column explicitly.
    _COMMON_FIELDS = (
        'year', 'date', 'time', 'name', 'title', 'price', 'cost',
        'amount', 'quantity', 'total', 'value', 'age', 'rating',
        'score', 'grade', 'salary', 'income', 'revenue', 'profit',
        'loss', 'height', 'weight', 'length', 'width', 'depth',
        'area', 'volume',
    )

    # Field words that usually describe the operation ("total revenue",
    # "highest value of price") rather than the column itself. They are only
    # used as a column when no other field word is mentioned.
    _GENERIC_FIELDS = ('total', 'value')

    # A condition value: a standalone number, otherwise a single word.
    _VALUE = r'(-?\d+(?:\.\d+)?\b|\w+)'
    _NUMBER = re.compile(r'-?\d+(?:\.\d+)?')
    _BETWEEN = re.compile(r'(\w+) between (\d+(?:\.\d+)?) and (\d+(?:\.\d+)?)')
    _COMPARISON = re.compile(
        r'(\w+) (?:is )?(>=|<=|>|<|=|equals|equal to|contains) ' + _VALUE
    )
    _EQUALITY = re.compile(r'(?:with|where) (\w+) (?:is|=) ' + _VALUE)

    # Comparison operator -> name of the pandas Series method implementing it.
    _COMPARISONS = {'>': 'gt', '<': 'lt', '>=': 'ge', '<=': 'le'}

    # Sentence boundary: whitespace that follows '.', '!' or '?'.
    _SENTENCE_SPLIT = re.compile(r'(?<=[.!?])\s+')

    # "what is X?" style requests; group 1 is the phrase to search for.
    _INFO_PATTERN = re.compile(
        r'(?:what|who|where|when|why|how) (?:is|are|was|were|does|do|did) ([^?]+)'
    )

    def __init__(self):
        """Initialize the data processor (it keeps no state)."""

    # ------------------------------------------------------------------
    # Small text helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _mentions(text: str, *terms: str) -> bool:
        """Return True if any of *terms* occurs in *text* as a whole word/phrase."""
        pattern = r'\b(?:' + '|'.join(re.escape(term) for term in terms) + r')\b'
        return re.search(pattern, text) is not None

    @staticmethod
    def _strip_tail(text: str) -> str:
        """Remove surrounding whitespace and trailing sentence punctuation."""
        return text.strip().rstrip('?.!').strip()

    @classmethod
    def _quoted_or_plain(cls, text: str) -> str:
        """Return the first quoted phrase in *text*, else *text* without its tail.

        Quotes that sit inside a word (the apostrophe in "don't") are not
        treated as delimiters.
        """
        quoted = re.search(r'(?<!\w)["\']([^"\']+)["\'](?!\w)', text)
        if quoted:
            return quoted.group(1)
        return cls._strip_tail(text)

    # ------------------------------------------------------------------
    # Tabular data (Excel / CSV)
    # ------------------------------------------------------------------

    def process_excel_data(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """
        Process data extracted from an Excel file.

        The question is lower-cased and routed to the first handler whose
        keyword appears in it as a whole word (so "country" does not count as
        "count"). Questions without a keyword go to the generic lookup.

        Args:
            data: Dictionary mapping sheet names to DataFrames
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()
        handlers = (
            (('oldest',), self._find_oldest_item),
            (('count', 'counts', 'how many'), self._count_items),
            (('average', 'mean'), self._calculate_average),
            (('total', 'sum'), self._calculate_total),
            (('maximum', 'highest'), self._find_maximum),
            (('minimum', 'lowest'), self._find_minimum),
        )
        for keywords, handler in handlers:
            if self._mentions(question_lower, *keywords):
                return handler(data, question_lower)
        return self._extract_specific_info(data, question_lower)

    @staticmethod
    def _find_column_by_terms(df: pd.DataFrame, terms, exclude=None):
        """Return the first column whose name contains one of *terms*, or None.

        Column labels are converted with ``str`` first so that non-string
        labels (for example integers) do not raise.
        """
        for col in df.columns:
            if exclude is not None and col == exclude:
                continue
            lowered = str(col).lower()
            if any(term in lowered for term in terms):
                return col
        return None

    @staticmethod
    def _guess_year_column(df: pd.DataFrame):
        """Return the first numeric column whose values all look like years."""
        current_year = pd.Timestamp.now().year
        for col in df.columns:
            try:
                values = df[col]
                if not pd.api.types.is_numeric_dtype(values):
                    continue
                if values.min() >= 1900 and values.max() <= current_year:
                    return col
            except (TypeError, ValueError):
                continue
        return None

    @staticmethod
    def _as_years(series: pd.Series) -> pd.Series:
        """Convert a year/date column to floats that can be ordered by age.

        Datetime columns become fractional years (so dates within one year
        keep their order); everything else is parsed as a number, with
        unparseable cells turned into NaN.
        """
        if pd.api.types.is_datetime64_any_dtype(series):
            return series.dt.year + series.dt.dayofyear / 366.0
        return pd.to_numeric(series, errors='coerce').astype(float)

    def _find_oldest_item(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the title of the item with the smallest year/date in any sheet.

        Args:
            data: Dictionary mapping sheet names to DataFrames
            question: Lower-cased question; may name an item type to filter on

        Returns:
            The title of the oldest item, or an explanatory message
        """
        year_terms = ('year', 'date', 'time', 'created', 'modified', 'release')
        title_terms = ('title', 'name', 'item', 'product', 'description')
        type_terms = ('type', 'category', 'format', 'medium', 'platform')
        # "video game" is listed before "game" so the longer phrase wins.
        item_types = (
            'movie', 'film', 'book', 'song', 'album', 'video game', 'game',
            'dvd', 'cd', 'blu-ray', 'blu ray', 'record', 'cassette', 'vhs',
        )
        item_type = next(
            (item for item in item_types
             if re.search(rf'\b{re.escape(item)}s?\b', question)),
            None,
        )

        oldest_year = float('inf')
        oldest_item = None
        for df in data.values():
            if df.empty:
                continue

            year_col = self._find_column_by_terms(df, year_terms)
            if year_col is None:
                year_col = self._guess_year_column(df)
            if year_col is None:
                continue

            title_col = self._find_column_by_terms(df, title_terms, exclude=year_col)
            if title_col is None:
                # No obvious title column: fall back to the first other column.
                title_col = next((col for col in df.columns if col != year_col), None)
            if title_col is None:
                continue

            candidates = df
            if item_type:
                type_col = self._find_column_by_terms(df, type_terms)
                if type_col is not None:
                    labels = df[type_col].astype(str).str.lower()
                    candidates = df[labels.str.contains(item_type, regex=False)]
            if candidates.empty:
                continue

            try:
                years = self._as_years(candidates[year_col])
            except (TypeError, ValueError):
                continue
            usable = years.notna().to_numpy()
            if not usable.any():
                continue
            # Work positionally so duplicate index labels cannot confuse the lookup.
            rows = candidates[usable]
            year_values = years.to_numpy()[usable]
            position = int(year_values.argmin())
            if year_values[position] < oldest_year:
                oldest_year = year_values[position]
                oldest_item = rows.iloc[position][title_col]

        if oldest_item is not None:
            return str(oldest_item)
        return "Could not determine the oldest item from the data."

    @staticmethod
    def _numeric_view(series: pd.Series) -> pd.Series:
        """Return *series* as numbers for comparison with a numeric value.

        Datetime columns are reduced to their year; other columns are parsed
        as numbers, with unparseable cells turned into NaN (never matching).
        """
        if pd.api.types.is_datetime64_any_dtype(series):
            return series.dt.year
        return pd.to_numeric(series, errors='coerce')

    @staticmethod
    def _as_text(value: Any) -> str:
        """Lower-cased text form of a condition value (30.0 becomes '30')."""
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return str(value).lower()

    def _condition_mask(self, series: pd.Series, operator: str, value: Any):
        """Build the boolean row mask for one condition, or None if unsupported."""
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if operator == 'contains':
            return series.astype(str).str.lower().str.contains(
                self._as_text(value), regex=False
            )
        if operator == '=':
            if is_number:
                # Compare as numbers: the text "30" must equal the parsed 30.0.
                return self._numeric_view(series) == value
            return series.astype(str).str.lower() == str(value).lower()
        if operator == 'between':
            if not (isinstance(value, list) and len(value) == 2):
                return None
            numbers = self._numeric_view(series)
            return (numbers >= value[0]) & (numbers <= value[1])
        method = self._COMPARISONS.get(operator)
        if method is None:
            return None
        comparable = self._numeric_view(series) if is_number else series
        return getattr(comparable, method)(value)

    def _apply_conditions(self, df: pd.DataFrame,
                          conditions: List[Dict[str, Any]]) -> pd.DataFrame:
        """Filter *df* by every condition that can be applied to it.

        Conditions naming an unknown column, or whose comparison is not valid
        for the column's type, are skipped rather than treated as errors.
        """
        filtered = df
        for condition in conditions:
            column = condition.get('column')
            value = condition.get('value')
            operator = condition.get('operator', '=')
            if not column or value is None:
                continue
            best_col = self._find_best_matching_column(df, column)
            if best_col is None:
                continue
            try:
                mask = self._condition_mask(filtered[best_col], operator, value)
            except (TypeError, ValueError):
                continue
            if mask is not None:
                filtered = filtered[mask]
        return filtered

    def _count_items(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Count the rows, across all sheets, that satisfy the question's conditions."""
        conditions = self._extract_conditions(question)
        total_count = sum(
            len(self._apply_conditions(df, conditions))
            for df in data.values()
            if not df.empty
        )
        return str(total_count)

    def _aggregate_column(self, data: Dict[str, pd.DataFrame], question: str,
                          reducer: str, numeric_only: bool,
                          no_column_msg: str, failure_msg: str) -> str:
        """Apply a Series reduction to the column named in the question.

        Args:
            data: Dictionary mapping sheet names to DataFrames
            question: Lower-cased question
            reducer: Name of the Series method to call ('mean', 'sum', ...)
            numeric_only: Skip matching columns that are not numeric
            no_column_msg: Returned when no column can be identified
            failure_msg: Returned when no sheet yields a result

        Returns:
            The reduction of the first suitable column, as a string
        """
        column_name = self._extract_column_name(question)
        if not column_name:
            return no_column_msg
        for df in data.values():
            if df.empty:
                continue
            best_col = self._find_best_matching_column(df, column_name)
            if best_col is None:
                continue
            series = df[best_col]
            if numeric_only and not pd.api.types.is_numeric_dtype(series):
                continue
            try:
                return str(getattr(series, reducer)())
            except (TypeError, ValueError):
                continue
        return failure_msg

    def _calculate_average(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Calculate the average of a column."""
        return self._aggregate_column(
            data, question, 'mean', True,
            "Could not determine which column to calculate the average for.",
            "Could not calculate the average from the data.",
        )

    def _calculate_total(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Calculate the total of a column."""
        return self._aggregate_column(
            data, question, 'sum', True,
            "Could not determine which column to calculate the total for.",
            "Could not calculate the total from the data.",
        )

    def _find_maximum(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the maximum value in a column."""
        return self._aggregate_column(
            data, question, 'max', False,
            "Could not determine which column to find the maximum for.",
            "Could not find the maximum value from the data.",
        )

    def _find_minimum(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Find the minimum value in a column."""
        return self._aggregate_column(
            data, question, 'min', False,
            "Could not determine which column to find the minimum for.",
            "Could not find the minimum value from the data.",
        )

    def _extract_specific_info(self, data: Dict[str, pd.DataFrame], question: str) -> str:
        """Return the requested field of the first row matching the conditions.

        Falls back to a row/column summary of the first non-empty sheet when
        the requested field cannot be identified or nothing matches.
        """
        looking_for = self._extract_looking_for(question)
        conditions = self._extract_conditions(question)
        for df in data.values():
            if df.empty:
                continue
            filtered = self._apply_conditions(df, conditions)
            if filtered.empty or not looking_for:
                continue
            best_col = self._find_best_matching_column(df, looking_for)
            if best_col is not None:
                return str(filtered.iloc[0][best_col])

        for df in data.values():
            if not df.empty:
                return f"The sheet contains {len(df)} rows and {len(df.columns)} columns."
        return "Could not extract the requested information from the data."

    @classmethod
    def _coerce_value(cls, text: str) -> Any:
        """Turn plain decimal text into a float; leave any other text unchanged.

        Only digit strings are converted, so words such as "nan" or "inf"
        stay text instead of becoming special float values.
        """
        return float(text) if cls._NUMBER.fullmatch(text) else text

    def _extract_conditions(self, question: str) -> List[Dict[str, Any]]:
        """Extract filter conditions from the question.

        Recognizes "<col> between A and B", "<col> [is] <op> <value>" and
        "with/where <col> is <value>". A phrase matched by more than one
        pattern yields a single condition.

        Returns:
            List of dicts with 'column', 'operator' and 'value' keys
        """
        conditions: List[Dict[str, Any]] = []

        def add(column: str, operator: str, value: Any) -> None:
            condition = {'column': column, 'operator': operator, 'value': value}
            if condition not in conditions:
                conditions.append(condition)

        def number(text: str):
            return float(text) if '.' in text else int(text)

        for match in self._BETWEEN.finditer(question):
            add(match.group(1), 'between',
                [number(match.group(2)), number(match.group(3))])

        for match in self._COMPARISON.finditer(question):
            operator = match.group(2)
            if operator in ('equals', 'equal to'):
                operator = '='
            add(match.group(1), operator, self._coerce_value(match.group(3)))

        for match in self._EQUALITY.finditer(question):
            add(match.group(1), '=', self._coerce_value(match.group(2)))

        return conditions

    def _first_field_mentioned(self, question: str) -> Optional[str]:
        """Return the common field word that appears first in the question.

        Words are matched whole (with an optional plural ending), so "age" is
        not found inside "average". Generic words such as "total" are only
        returned when no other field word is present.
        """
        hits = []
        for field in self._COMMON_FIELDS:
            found = re.search(rf'\b{field}(?:s|es)?\b', question)
            if found:
                hits.append((found.start(), field))
        if not hits:
            return None
        hits.sort()
        for _, field in hits:
            if field not in self._GENERIC_FIELDS:
                return field
        return hits[0][1]

    def _extract_column_name(self, question: str) -> Optional[str]:
        """Extract the column an aggregate question refers to, if any."""
        explicit = re.search(
            r'(?:column|field) (?:named|called) ["\']?(\w+)["\']?', question
        )
        if explicit:
            return explicit.group(1)
        return self._first_field_mentioned(question)

    def _extract_looking_for(self, question: str) -> Optional[str]:
        """Extract the field whose value the question asks for, if any."""
        direct = re.search(
            r'\b(?:what is|what are|find|get|return) the (\w+)', question
        )
        if direct:
            return direct.group(1)
        return self._first_field_mentioned(question)

    def _find_best_matching_column(self, df: pd.DataFrame, column_name: str) -> Optional[str]:
        """Find the best matching column in a DataFrame.

        Tries an exact match, then a case-insensitive match, then a
        case-insensitive substring match.

        Returns:
            The matching column label, or None when nothing matches
        """
        if column_name in df.columns:
            return column_name
        wanted = str(column_name).lower()
        # str() keeps non-string column labels from raising on .lower().
        lowered = [(col, str(col).lower()) for col in df.columns]
        for col, name in lowered:
            if name == wanted:
                return col
        for col, name in lowered:
            if wanted in name:
                return col
        return None

    def process_csv_data(self, data: pd.DataFrame, question: str) -> str:
        """
        Process data extracted from a CSV file.

        Args:
            data: DataFrame containing the CSV data
            question: The question to answer

        Returns:
            Answer to the question
        """
        # A CSV is handled as a workbook with a single sheet.
        return self.process_excel_data({'Sheet1': data}, question)

    # ------------------------------------------------------------------
    # Plain text and paged documents
    # ------------------------------------------------------------------

    def process_text_data(self, data: str, question: str) -> str:
        """
        Process data extracted from a text file.

        Supports counting occurrences of a term, returning a 1-based line or
        paragraph range, and returning the first sentence that contains the
        phrase asked about. Anything else yields a word/sentence summary.

        Args:
            data: Text content of the file
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        counted = re.search(
            r'(?:count|how many) (?:the )?(?:occurrences of|instances of|times) (.+)',
            question_lower,
        )
        if counted:
            term = self._quoted_or_plain(counted.group(1))
            # An empty term would "occur" between every pair of characters.
            if term:
                return str(data.lower().count(term))

        line_request = re.search(
            r'(?:what is|what does|what are|show|return) (?:the|on) (?:line|lines) '
            r'(\d+)(?:\s*(?:to|-)\s*(\d+))?',
            question_lower,
        )
        if line_request:
            first = int(line_request.group(1))
            last = int(line_request.group(2)) if line_request.group(2) else first
            lines = data.splitlines()
            if 1 <= first <= last <= len(lines):
                return '\n'.join(lines[first - 1:last])

        para_request = re.search(
            r'(?:what is|what does|what are|show|return) (?:the|in) paragraph '
            r'(\d+)(?:\s*(?:to|-)\s*(\d+))?',
            question_lower,
        )
        if para_request:
            first = int(para_request.group(1))
            last = int(para_request.group(2)) if para_request.group(2) else first
            paragraphs = re.split(r'\n\s*\n', data)
            if 1 <= first <= last <= len(paragraphs):
                return '\n\n'.join(paragraphs[first - 1:last])

        sentences = [s for s in self._SENTENCE_SPLIT.split(data) if s.strip()]
        info_request = self._INFO_PATTERN.search(question_lower)
        if info_request:
            info = info_request.group(1).strip()
            if info:
                for sentence in sentences:
                    if info in sentence.lower():
                        return sentence.strip()

        words = data.split()
        return f"The text contains {len(words)} words and {len(sentences)} sentences."

    def process_pdf_data(self, data: Dict[int, str], question: str) -> str:
        """
        Process data extracted from a PDF file.

        Args:
            data: Dictionary mapping page numbers to text content
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        page_request = re.search(
            r'(?:what is|what does|what are|show|return) (?:on|in) page (\d+)',
            question_lower,
        )
        if page_request:
            page_num = int(page_request.group(1))
            if page_num in data:
                return data[page_num]
            return f"Page {page_num} not found in the PDF."

        info_request = self._INFO_PATTERN.search(question_lower)
        if info_request:
            info = info_request.group(1).strip()
            if info:
                for content in data.values():
                    # Pages without extractable text may be empty or None.
                    for sentence in self._SENTENCE_SPLIT.split(content or ''):
                        if info in sentence.lower():
                            return sentence.strip()

        all_text = ' '.join(content for content in data.values() if content)
        words = all_text.split()
        return f"The PDF contains {len(data)} pages and approximately {len(words)} words."

    def process_image_metadata(self, metadata: Dict[str, Any], question: str) -> str:
        """
        Process metadata extracted from an image file.

        Args:
            metadata: Dictionary containing image metadata
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()
        width = metadata.get('width', 0)
        height = metadata.get('height', 0)

        if 'format' in question_lower or 'type' in question_lower:
            # The stored value may be None; always answer with text.
            return str(metadata.get('format') or 'Unknown format')
        if 'size' in question_lower or 'resolution' in question_lower:
            return f"{width}x{height}"
        if 'width' in question_lower:
            return str(width)
        if 'height' in question_lower:
            return str(height)
        if 'mode' in question_lower or 'color' in question_lower:
            return str(metadata.get('mode') or 'Unknown mode')
        if 'exif' in question_lower:
            exif = metadata.get('exif', {})
            return str(exif) if exif else "No EXIF data found."

        return (
            f"Image format: {metadata.get('format', 'Unknown')}, "
            f"Size: {width}x{height}, "
            f"Mode: {metadata.get('mode', 'Unknown')}"
        )

    def process_docx_data(self, data: str, question: str) -> str:
        """
        Process data extracted from a Word document.

        Args:
            data: Text content of the document
            question: The question to answer

        Returns:
            Answer to the question
        """
        # Document text is handled exactly like plain text.
        return self.process_text_data(data, question)

    def process_pptx_data(self, data: Dict[int, str], question: str) -> str:
        """
        Process data extracted from a PowerPoint presentation.

        Args:
            data: Dictionary mapping slide numbers to text content
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        slide_request = re.search(
            r'(?:what is|what does|what are|show|return) (?:on|in) slide (\d+)',
            question_lower,
        )
        if slide_request:
            slide_num = int(slide_request.group(1))
            if slide_num in data:
                return data[slide_num]
            return f"Slide {slide_num} not found in the presentation."

        info_request = self._INFO_PATTERN.search(question_lower)
        if info_request:
            info = info_request.group(1).strip()
            if info:
                # A slide is short, so the whole slide is returned on a hit.
                for content in data.values():
                    if content and info in content.lower():
                        return content.strip()

        return f"The presentation contains {len(data)} slides."

    # ------------------------------------------------------------------
    # Structured data
    # ------------------------------------------------------------------

    @staticmethod
    def _find_json_key(node: Any, key: str) -> Tuple[bool, Any]:
        """Search nested JSON for *key*, shallowest level first.

        Matching ignores case because the key is taken from a lower-cased
        question; an exact-case match within a dict is preferred.

        Returns:
            (True, value) when found, otherwise (False, None)
        """
        wanted = key.lower()
        level = [node]
        while level:
            deeper = []
            for item in level:
                if isinstance(item, dict):
                    if key in item:
                        return True, item[key]
                    for name, value in item.items():
                        if str(name).lower() == wanted:
                            return True, value
                    deeper.extend(item.values())
                elif isinstance(item, list):
                    deeper.extend(item)
            level = deeper
        return False, None

    def process_json_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a JSON file.

        Args:
            data: Parsed JSON content (normally a dict; a top-level list is
                also accepted)
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        key_request = re.search(
            r'(?:what is|what are|show|return) (?:the|in) ["\']?(\w+)["\']?',
            question_lower,
        )
        if key_request:
            found, value = self._find_json_key(data, key_request.group(1))
            if found:
                return str(value)

        if isinstance(data, dict):
            keys = ', '.join(str(key) for key in data)
            return f"The JSON contains {len(data)} top-level keys: {keys}"
        if isinstance(data, list):
            return f"The JSON contains a list of {len(data)} items."
        return f"The JSON contains the value: {data}"

    def process_zip_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a ZIP archive.

        Args:
            data: Dictionary containing information about the archive; its
                'files' entry is read as a list of dicts with 'filename' and
                'size' keys
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()
        files = data.get('files', [])

        if self._mentions(question_lower, 'how many', 'count') and 'files' in question_lower:
            return str(len(files))

        file_request = re.search(
            r'(?:does it contain|is there) (?:a file named|a file called) '
            r'["\']?([^"\']+)["\']?',
            question_lower,
        )
        if file_request:
            # Without this, "foo.txt?" would be searched with its question mark.
            filename = self._strip_tail(file_request.group(1))
            if filename:
                for file_info in files:
                    stored_name = str(file_info.get('filename') or '')
                    if filename in stored_name.lower():
                        size = file_info.get('size', 'unknown')
                        return f"Yes, the archive contains {stored_name} ({size} bytes)"
                return f"No, the archive does not contain a file named {filename}."

        return f"The ZIP archive contains {len(files)} files."

    def process_pdb_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a PDB file.

        Args:
            data: Dictionary containing information about the PDB file
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()

        if 'title' in question_lower:
            return str(data.get('title', 'No title found.'))
        if 'header' in question_lower:
            return str(data.get('header', 'No header found.'))
        if 'compound' in question_lower:
            compounds = data.get('compounds', [])
            if compounds:
                return '\n'.join(str(compound) for compound in compounds)
            return 'No compounds found.'
        if 'author' in question_lower:
            authors = data.get('authors', [])
            if authors:
                return '\n'.join(str(author) for author in authors)
            return 'No authors found.'
        if 'atoms' in question_lower or 'atom count' in question_lower:
            return str(data.get('atoms_count', 0))

        return (
            f"PDB file with title: {data.get('title', 'No title')}, "
            f"containing {data.get('atoms_count', 0)} atoms."
        )

    @staticmethod
    def _extract_entity_source(content: str, entity_name: str) -> str:
        """Return the source lines of a class or function found in *content*.

        The name is matched case-insensitively because it comes from a
        lower-cased question. The body is every following line indented
        deeper than the definition; a line starting with ')' is kept so that
        multi-line signatures are not cut short.

        Returns:
            The definition's source, or '' when it is not found
        """
        header = re.compile(
            rf'^\s*(?:async\s+def|def|class)\s+{re.escape(entity_name)}\s*[(:]',
            re.IGNORECASE,
        )
        collected: List[str] = []
        indent = 0
        for line in content.split('\n'):
            stripped = line.strip()
            if not collected:
                if header.match(line):
                    collected.append(line)
                    indent = len(line) - len(line.lstrip())
                continue
            dedented = len(line) - len(line.lstrip()) <= indent
            if stripped and dedented and not stripped.startswith(')'):
                break
            collected.append(line)
        # Blank lines between this definition and the next are not part of it.
        while collected and not collected[-1].strip():
            collected.pop()
        return '\n'.join(collected)

    def process_python_data(self, data: Dict[str, Any], question: str) -> str:
        """
        Process data extracted from a Python file.

        Requests about one named class or function (its source, its parent,
        its parameters) are checked before the generic "list all classes /
        functions / imports" keywords; those keywords also occur in the
        specific requests and would otherwise always win.

        Args:
            data: Dictionary containing information about the Python file
                ('classes', 'functions', 'imports' lists and 'content' text)
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()
        classes = data.get('classes', [])
        functions = data.get('functions', [])

        code_request = re.search(
            r'(?:show|return) (?:the code for|code of) (?:the )?(?:function|class) '
            r'["\']?(\w+)["\']?',
            question_lower,
        )
        if code_request:
            source = self._extract_entity_source(
                data.get('content', ''), code_request.group(1)
            )
            if source:
                return source

        class_request = re.search(
            r'(?:what is|what does) (?:the class|class) ["\']?(\w+)["\']?',
            question_lower,
        )
        if class_request:
            wanted = class_request.group(1)
            for cls in classes:
                if cls['name'].lower() == wanted:
                    parent = cls.get('parent')
                    suffix = f", inherits from {parent}" if parent else ""
                    return f"Class {cls['name']}{suffix}"

        func_request = re.search(
            r'(?:what is|what does) (?:the function|function) ["\']?(\w+)["\']?',
            question_lower,
        )
        if func_request:
            wanted = func_request.group(1)
            for func in functions:
                if func['name'].lower() == wanted:
                    params = func.get('params', '')
                    if isinstance(params, (list, tuple)):
                        params = ', '.join(str(param) for param in params)
                    return f"Function {func['name']}({params})"

        if 'class' in question_lower:
            if classes:
                return ', '.join(cls['name'] for cls in classes)
            return 'No classes found in the file.'
        if 'function' in question_lower:
            if functions:
                return ', '.join(func['name'] for func in functions)
            return 'No functions found in the file.'
        if 'import' in question_lower:
            imports = data.get('imports', [])
            if not imports:
                return 'No imports found in the file.'
            statements = []
            for imp in imports:
                if imp.get('from'):
                    statements.append(f"from {imp['from']} import {imp['import']}")
                else:
                    statements.append(f"import {imp['import']}")
            return '\n'.join(statements)

        return f"Python file with {len(classes)} classes and {len(functions)} functions."

    @staticmethod
    def _entry_matches(entry: Dict[str, Any], key: str, operator: str, value: str) -> bool:
        """Check one JSONL entry against a lower-cased key/value condition."""
        for name, actual in entry.items():
            if str(name).lower() == key:
                text = str(actual).lower()
                return value in text if operator == 'contains' else text == value
        return False

    def process_jsonl_data(self, data: List[Dict[str, Any]], question: str) -> str:
        """
        Process data extracted from a JSONL file.

        A "where <key> is/contains <value>" filter is applied first, so a
        count question with a filter counts only the matching entries.
        Entry numbers in "entry N" requests are zero-based.

        Args:
            data: List of parsed JSON objects
            question: The question to answer

        Returns:
            Answer to the question
        """
        question_lower = question.lower()
        wants_count = self._mentions(question_lower, 'how many', 'count')

        filter_request = re.search(
            r'(?:entries|items) where ["\']?(\w+)["\']? (is|=|equals|contains) '
            r'["\']?([^"\']+)["\']?',
            question_lower,
        )
        if filter_request:
            key = filter_request.group(1)
            operator = filter_request.group(2)
            value = self._strip_tail(filter_request.group(3))
            matching_entries = [
                entry for entry in data
                if isinstance(entry, dict)
                and self._entry_matches(entry, key, operator, value)
            ]
            if wants_count:
                return str(len(matching_entries))
            if matching_entries:
                return str(matching_entries)
            relation = 'contains' if operator == 'contains' else '='
            return f"No entries found where {key} {relation} {value}."

        if wants_count:
            return str(len(data))

        entry_request = re.search(
            r'(?:what is|what are|show|return) (?:the|in) entry (\d+)',
            question_lower,
        )
        if entry_request:
            entry_num = int(entry_request.group(1))
            if 0 <= entry_num < len(data):
                return str(data[entry_num])
            return f"Entry {entry_num} not found in the data."

        if data and isinstance(data[0], dict):
            keys = ', '.join(str(key) for key in data[0])
            return f"The data contains {len(data)} entries with keys: {keys}"
        return f"The data contains {len(data)} entries."