Spaces:
Sleeping
Sleeping
| """ | |
| Utility functions for working with different file formats in the resources directory | |
| """ | |
| import os | |
| import json | |
| import pandas as pd | |
| from typing import Dict, Any, Union, List, Optional | |
| import logging | |
| from PIL import Image | |
| import base64 | |
| from io import BytesIO | |
# Configure logging
# NOTE: basicConfig is called at import time, so importing this module
# configures the root logger (INFO level, timestamped format).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Constants
# Absolute path of the "resource" folder located next to this file.
# NOTE(review): the folder name is singular ("resource") while the docstrings
# in this module speak of the "resources directory" - confirm the on-disk name.
RESOURCE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resource")
def list_resources() -> List[str]:
    """Return the names of the regular files directly inside RESOURCE_DIR.

    Sub-directories are left out. If the directory cannot be listed, the
    error is logged and an empty list is returned.
    """
    try:
        file_names = []
        for entry in os.listdir(RESOURCE_DIR):
            candidate = os.path.join(RESOURCE_DIR, entry)
            if os.path.isfile(candidate):
                file_names.append(entry)
    except Exception as e:
        logger.error(f"Error listing resources: {e}")
        return []
    return file_names
def load_excel(file_path: str) -> Union[pd.DataFrame, None]:
    """Read an Excel workbook into a DataFrame.

    Delegates to ``pd.read_excel`` with its default arguments. Any failure
    is logged and reported to the caller as ``None`` rather than raised.
    """
    try:
        frame = pd.read_excel(file_path)
    except Exception as e:
        logger.error(f"Error reading Excel file {file_path}: {e}")
        return None
    else:
        return frame
def load_csv(file_path: str) -> Union[pd.DataFrame, None]:
    """Read a CSV file into a DataFrame.

    Delegates to ``pd.read_csv`` with its default arguments. Any failure is
    logged and reported to the caller as ``None`` rather than raised.
    """
    try:
        frame = pd.read_csv(file_path)
    except Exception as e:
        logger.error(f"Error reading CSV file {file_path}: {e}")
        return None
    else:
        return frame
def load_text(file_path: str) -> Union[str, None]:
    """Return the whole content of a UTF-8 text file as one string.

    Any failure while opening or decoding the file is logged and reported
    to the caller as ``None`` rather than raised.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            contents = handle.read()
    except Exception as e:
        logger.error(f"Error reading text file {file_path}: {e}")
        return None
    return contents
def load_json(file_path: str) -> Union[Dict, List, None]:
    """Parse a UTF-8 JSON file and return the decoded value.

    Any failure while opening, decoding or parsing the file is logged and
    reported to the caller as ``None`` rather than raised.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            parsed = json.load(handle)
    except Exception as e:
        logger.error(f"Error reading JSON file {file_path}: {e}")
        return None
    return parsed
def load_image(file_path: str) -> Union[str, None]:
    """Load an image file and return it as a base64 ``data:`` URI.

    Pillow is used only to confirm that the file is a recognisable image
    and to detect its format. The payload embedded in the URI is the file's
    own bytes: re-saving the decoded image would re-compress lossy formats
    such as JPEG and would keep only the first frame of an animated GIF.

    Args:
        file_path: Path of the image file to encode.

    Returns:
        A string of the form ``data:image/<format>;base64,<payload>``, or
        ``None`` when the file cannot be read or is not identified as an
        image (the error is logged).
    """
    try:
        # Opening is enough to identify the format; pixel data is not decoded.
        with Image.open(file_path) as img:
            image_format = img.format
        if not image_format:
            raise ValueError("could not determine image format")

        with open(file_path, 'rb') as f:
            img_str = base64.b64encode(f.read()).decode('ascii')
        return f"data:image/{image_format.lower()};base64,{img_str}"
    except Exception as e:
        logger.error(f"Error reading image file {file_path}: {e}")
        return None
def get_file_handler(file_path: str) -> Union[Any, None]:
    """Load a file with the loader that matches its (case-insensitive) extension.

    Returns whatever the selected loader returns. ``None`` is returned, after
    logging, when the path does not exist or no loader handles the extension.
    """
    if not os.path.exists(file_path):
        logger.error(f"File not found: {file_path}")
        return None

    # Extension groups paired with the loader responsible for them.
    loaders_by_extension = (
        (('.xlsx', '.xls'), load_excel),
        (('.csv',), load_csv),
        (('.txt', '.md', '.py'), load_text),
        (('.json', '.jsonld'), load_json),
        (('.jpg', '.jpeg', '.png', '.gif'), load_image),
    )

    _, suffix = os.path.splitext(file_path)
    suffix = suffix.lower()
    for extensions, loader in loaders_by_extension:
        if suffix in extensions:
            return loader(file_path)

    logger.warning(f"No handler for file type {suffix}")
    return None
def search_metadata_by_question(question: str, metadata_path: Optional[str] = None) -> List[Dict]:
    """Search a metadata JSONL file for entries that match a given question.

    A record is returned when either

    * its ``Question`` text contains the query, or is contained in it
      (case-insensitive), or
    * the query mentions ``attached`` or ``spreadsheet`` and the record has
      a non-empty ``file_name``.

    Each record is returned at most once, in file order. Records whose
    ``Question`` is missing or empty never count as a text match, and a
    blank query returns no results (an empty string is a substring of every
    string, so both cases would otherwise match everything).

    Args:
        question: The question text to look for.
        metadata_path: Path of the JSONL file to scan. Defaults to
            ``metadata.jsonl`` inside ``RESOURCE_DIR``.

    Returns:
        The matching records as dictionaries. The list is empty when nothing
        matches or the file cannot be read (the error is logged).
    """
    results: List[Dict] = []
    if metadata_path is None:
        metadata_path = os.path.join(RESOURCE_DIR, "metadata.jsonl")

    # Normalise the query once instead of on every line.
    needle = question.lower()
    if not needle.strip():
        return results
    wants_file = 'attached' in needle or 'spreadsheet' in needle

    try:
        with open(metadata_path, 'r', encoding='utf-8') as f:
            for line_no, line in enumerate(f, start=1):
                line = line.strip()
                if not line:
                    # Blank lines (e.g. a trailing newline) are not records.
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError as e:
                    # One bad record should not abort the whole search.
                    logger.warning(f"Skipping malformed metadata line {line_no}: {e}")
                    continue
                if not isinstance(data, dict):
                    continue

                raw_question = data.get('Question')
                metadata_question = raw_question.lower() if isinstance(raw_question, str) else ''

                text_match = bool(metadata_question) and (
                    needle in metadata_question or metadata_question in needle
                )
                file_match = wants_file and bool(data.get('file_name'))

                # A single append keeps a record from being listed twice
                # when both conditions hold.
                if text_match or file_match:
                    results.append(data)
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeDecodeError raised while reading the file.
        logger.error(f"Error searching metadata: {e}")
    return results
def get_metadata_answer(task_id: str, metadata_path: Optional[str] = None) -> str:
    """Get the answer for a specific task ID from a metadata JSONL file.

    The file is scanned line by line and the ``Final answer`` value of the
    first record whose ``task_id`` equals ``task_id`` is returned. Blank
    lines are ignored and malformed lines are skipped with a warning, so a
    damaged record earlier in the file does not hide a valid one later on.

    Args:
        task_id: Identifier of the task to look up.
        metadata_path: Path of the JSONL file to scan. Defaults to
            ``metadata.jsonl`` inside ``RESOURCE_DIR``.

    Returns:
        The record's ``Final answer`` value, or ``""`` when the record has
        no such key, no record matches, or the file cannot be read (the
        error is logged).
    """
    if metadata_path is None:
        metadata_path = os.path.join(RESOURCE_DIR, "metadata.jsonl")

    try:
        with open(metadata_path, 'r', encoding='utf-8') as f:
            for line_no, line in enumerate(f, start=1):
                line = line.strip()
                if not line:
                    # Blank lines (e.g. a trailing newline) are not records.
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError as e:
                    # Keep scanning: the wanted record may come later.
                    logger.warning(f"Skipping malformed metadata line {line_no}: {e}")
                    continue
                if isinstance(data, dict) and data.get('task_id') == task_id:
                    return data.get('Final answer', '')
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeDecodeError raised while reading the file.
        logger.error(f"Error getting metadata answer: {e}")
    return ""