import requests
import os
import tempfile
import requests
import json
import re
from pathlib import Path
from typing import Optional, Tuple
from langchain_openai import ChatOpenAI
from langchain_deepseek import ChatDeepSeek
from openai import OpenAI
# Directory that contains this module.
current_dir = Path(__file__).parent.absolute()
# Path of a ".env" file next to this module. It is only computed here;
# presumably consumed by environment-loading code elsewhere - verify.
env_path = current_dir / ".env"

# Load runtime settings from the JSON file named ".config". The relative path
# is resolved against the process's current working directory, so the
# application must be started from the directory that contains the file.
# JSON text is UTF-8 by specification, so decode it explicitly instead of
# relying on the platform's locale-dependent default encoding.
with open('.config', 'r', encoding='utf-8') as f:
    config = json.load(f)

# Prefix used to build the f"{BASE_URL}/files/{task_id}" download URL.
BASE_URL = config['BASE_URL']
# Debug flag taken verbatim from the config file.
DEBUG_MODE = config['DEBUG_MODE']
def check_api_keys():
    """Report whether every API key the agent needs is set in the environment.

    Returns True only when OPENAI_API_KEY, DEEPSEEK_API_KEY and TAVILY_API_KEY
    are all present with a non-empty value; otherwise returns False.
    """
    for key_name in ('OPENAI_API_KEY', 'DEEPSEEK_API_KEY', 'TAVILY_API_KEY'):
        # A variable that is unset or set to "" counts as missing.
        if not os.environ.get(key_name):
            return False
    return True
def setup_llm():
    """
    Create the language-model clients used by the agent.

    Returns:
        A 5-tuple, in this order: the agent-management LLM, the
        question-decomposition LLM, the tool-use LLM, the vision LLM and a
        plain OpenAI client.
    """
    def new_deepseek_chat():
        # The three text roles share one configuration.
        return ChatDeepSeek(model="deepseek-chat", temperature=0)

    management_llm = new_deepseek_chat()
    # "deepseek-reasoner" was noted by the author as an alternative model for
    # decomposition; Claude 3.7 Sonnet and Gemini 2.0 Flash were noted as
    # alternatives for question analysis.
    decomposition_llm = new_deepseek_chat()
    tool_llm = new_deepseek_chat()
    # Gemini 2.0 Flash was noted as an alternative vision model.
    vision_llm = ChatOpenAI(model="gpt-4o", temperature=0)
    raw_openai_client = OpenAI()

    return (
        management_llm,
        decomposition_llm,
        tool_llm,
        vision_llm,
        raw_openai_client,
    )
"""
def determine_file_type(file_data: bytes) -> str:
try:
magika = Magika()
result = magika.identify_bytes(file_data)
# Ensure the extension starts with a dot
label = result.output.label
if label:
return f".{label}" if not label.startswith('.') else label
else:
return ".bin" # Default binary extension
except Exception as e:
print(f"File type identification failed: {str(e)}")
return ".unknown"
"""
def download_and_save_task_file(task_id: str, original_filename: str) -> Optional[str]:
    """
    Download the file attached to a task and save it in the system temp directory.

    The saved file is named ``task_id`` plus the extension taken from
    ``original_filename``. When no usable extension can be derived, the file
    is saved under ``task_id`` alone.

    Args:
        task_id: The ID of the task to download the file for.
        original_filename: The original filename from the task metadata.
            Only its extension is used.

    Returns:
        The full path to the saved temporary file, or None if any step fails
        (download error, empty response body, unusable file name, write error).
        The path can be used as an input for the tools.
    """
    try:
        # 1. Download the file data
        url = f"{BASE_URL}/files/{task_id}"
        file_response = requests.get(url, timeout=20)
        file_response.raise_for_status()
        file_data = file_response.content

        if not file_data:
            print(f"No file data downloaded for task {task_id}")
            return None
        print(f"Downloaded associated file for task {task_id}")

        # 2. Determine the file extension solely from original_filename.
        # os.path.splitext returns either "" or a string that already starts
        # with ".", so no dot needs to be prepended afterwards.
        chosen_extension = ""
        if original_filename and isinstance(original_filename, str):
            candidate_extension = os.path.splitext(original_filename)[1]
            if candidate_extension and candidate_extension != ".":
                chosen_extension = candidate_extension
            else:
                print(f"Warning: No valid extension found in original_filename ('{original_filename}') for task {task_id}. File will be saved without an extension in its name if task_id part also lacks one.")
        else:
            print(f"Warning: original_filename was not a valid string for task {task_id}. File may be saved without a proper extension.")

        # 3. Construct the temporary file path.
        # Keep only the final path component so that a task_id containing
        # path separators cannot place the file outside the temp directory.
        temp_file_name = Path(f"{task_id}{chosen_extension}").name
        if temp_file_name in ("", ".", ".."):
            print(f"Error processing or saving file for task {task_id}: could not derive a safe file name")
            return None
        temp_file_path = os.path.join(tempfile.gettempdir(), temp_file_name)

        # 4. Save the file
        with open(temp_file_path, 'wb') as f:
            f.write(file_data)
        print(f"Saved remote file for task {task_id} to {temp_file_path}")
        return temp_file_path

    except requests.RequestException as e:
        print(f"Error downloading file for task {task_id}: {str(e)}")
        return None
    except Exception as e:
        # Deliberately broad: the contract is "None on any failure", so
        # filesystem or path errors must not propagate to the caller.
        print(f"Error processing or saving file for task {task_id}: {str(e)}")
        return None
def cleanup_temp_files(temp_file_path) -> None:
    """
    Delete a temporary file, but only if it lies inside the system temp directory.

    Args:
        temp_file_path: Path of the file to delete, as a ``str`` or a
            ``pathlib.Path``. Any other type (including None) is ignored.

    The function never raises: a missing file is ignored silently and any
    other failure is printed.
    """
    if not isinstance(temp_file_path, (str, Path)):
        return

    try:
        # Compare normalised path components rather than raw string prefixes.
        # A prefix test would accept a sibling directory such as
        # "/tmp_other/x" and a traversal such as "/tmp/../etc/x" as being
        # "inside /tmp". normpath collapses ".." segments lexically and does
        # not follow symlinks.
        temp_root = Path(os.path.normpath(tempfile.gettempdir()))
        candidate = Path(os.path.normpath(temp_file_path))
        if temp_root not in candidate.parents:
            return

        try:
            os.remove(temp_file_path)
        except FileNotFoundError:
            # Already gone - nothing to clean up.
            return
        print(f"Cleaned up temporary file: {temp_file_path}")
    except Exception as e:
        # Cleanup is best-effort; report the problem instead of raising.
        print(f"Error cleaning up temp file {temp_file_path}: {str(e)}")
# Extensions used when the server-provided filename has none, keyed by the
# lower-cased MIME type from the Content-Type header.
_CONTENT_TYPE_EXTENSIONS = {
    'image/jpeg': '.jpg',
    'image/png': '.png',
    'application/pdf': '.pdf',
    'text/plain': '.txt',
    'application/json': '.json',
    'text/csv': '.csv',
}


def _filename_from_content_disposition(content_disposition: str) -> str:
    """Return the ``filename`` parameter of a Content-Disposition value, or ""."""
    # Accept either a quoted value or an unquoted token. An unquoted token
    # stops at ";" or whitespace so trailing parameters are not swallowed.
    # The extended "filename*=" form is intentionally not matched.
    match = re.search(
        r'(?:^|[;\s])filename\s*=\s*(?:"([^"]*)"|([^;\s"]+))',
        content_disposition,
        re.IGNORECASE,
    )
    if not match:
        return ""
    return match.group(1) or match.group(2) or ""


def _extension_for_content_type(content_type_header: str) -> str:
    """Return the mapped extension for a Content-Type header value, or ""."""
    # Drop parameters such as "; charset=utf-8". MIME types are
    # case-insensitive, so compare in lower case.
    mime_type = content_type_header.split(';')[0].strip().lower()
    return _CONTENT_TYPE_EXTENSIONS.get(mime_type, "")


def process_file_for_task_v2(task_id: str, question_text: str, api_url: str) -> Tuple[str, Optional[Path]]:
    """
    Attempts to download a file for a task and appends its path to the question.

    The file is fetched from ``{api_url}/files/{task_id}`` and stored in the
    "hf_space_agent_files" folder inside the system temp directory. The file
    name comes from the Content-Disposition header when available, otherwise
    from ``task_id``. A missing extension is filled in from the Content-Type
    header, or ".dat" when the type is not recognised.

    Args:
        task_id: Identifier of the task whose file should be fetched.
        question_text: The question to which the file location is appended.
        api_url: Base URL of the API that serves the files.

    Returns:
        ``(amended_question, local_path)`` on success. When there is no file
        (HTTP 404), the download fails, or the file cannot be saved, returns
        ``(question_text, None)`` with the question unchanged.
    """
    file_download_url = f"{api_url}/files/{task_id}"
    print(f"Attempting to download file for task {task_id} from {file_download_url}")

    try:
        response = requests.get(file_download_url, timeout=30)
        if response.status_code == 404:
            print(f"No file found for task {task_id} (404). Proceeding without file.")
            return question_text, None
        response.raise_for_status()  # Raise an exception for other bad status codes (4xx, 5xx)
    except requests.exceptions.RequestException as exc:
        print(f"Error downloading file for task {task_id}: {exc}. Proceeding without file.")
        return question_text, None

    # Determine the filename from the 'Content-Disposition' header.
    filename_from_header = _filename_from_content_disposition(
        response.headers.get("content-disposition", "")
    )

    if filename_from_header:
        # Replace every character that is not alphanumeric, '.', '_' or '-'
        # with '_'. This also removes path separators and whitespace.
        filename = "".join(
            c if c.isalnum() or c in ('.', '_', '-') else '_'
            for c in filename_from_header
        )
    else:
        print(f"Could not determine filename from Content-Disposition for task {task_id}. Using task_id as filename base.")
        filename = task_id

    # Ensure a reasonable default extension if none is apparent.
    if not Path(filename).suffix:
        content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
        extension = _extension_for_content_type(content_type)
        if extension:
            filename += extension
        else:
            print(f"Warning: Could not determine extension for task {task_id} from Content-Type '{content_type}'. Using '.dat'.")
            filename += '.dat'  # Generic data extension if type is unknown or unmapped

    temp_storage_dir = Path(tempfile.gettempdir()) / "hf_space_agent_files"
    # Use only the final path component so the file stays inside the storage
    # directory even when the name came from an unsanitised task_id.
    local_file_path = temp_storage_dir / Path(filename).name

    try:
        # Directory creation is inside the try block so that a filesystem
        # failure here also falls back to "no file" instead of raising.
        temp_storage_dir.mkdir(parents=True, exist_ok=True)
        with open(local_file_path, 'wb') as f:
            f.write(response.content)
    except OSError as e:
        print(f"Error saving file {local_file_path} for task {task_id}: {e}")
        return question_text, None  # Saving failed

    print(f"File for task {task_id} saved to: {local_file_path}")

    amended_question = (
        f"{question_text}\n\n"
        f"--- Technical Information ---\n"
        f"A file relevant to this task was downloaded and is available to your tools at the following local path. "
        f"Your tools that can read local files (like read_file, extract_text_from_image, etc.) should use this path:\n"
        f"Local file path: {str(local_file_path)}\n"
        f"--- End Technical Information ---\n\n"
    )
    return amended_question, local_file_path
|