File size: 10,293 Bytes
1560d7e
 
 
 
 
515e162
1560d7e
515e162
1560d7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ce927b1
1560d7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
515e162
 
 
 
1560d7e
 
515e162
 
 
1560d7e
 
 
515e162
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import requests
import os
import tempfile
import requests
import json
import re
from pathlib import Path
from typing import Optional, Tuple

from langchain_openai import ChatOpenAI
from langchain_deepseek import ChatDeepSeek
from openai import OpenAI

current_dir = Path(__file__).parent.absolute()
env_path = current_dir / ".env"

# read .config file
with open('.config', 'r') as f:
    config = json.load(f)

BASE_URL = config['BASE_URL']
DEBUG_MODE = config['DEBUG_MODE']

def check_api_keys():
    """Check for the presence of required API keys."""
    required_keys = ['OPENAI_API_KEY', 'DEEPSEEK_API_KEY', 'TAVILY_API_KEY']
    missing_keys = [key for key in required_keys if not os.environ.get(key)]

    if missing_keys:
        return False
    else:
        return True

def setup_llm():
    """
    Setup the LLMs for the agent.
    """
    llm_agent_management = ChatDeepSeek(model="deepseek-chat", temperature=0)
    llm_question_decomposition = ChatDeepSeek(model="deepseek-chat", temperature=0)  # "deepseek-chat" / "deepseek-reasoner"
    # llm_question_analysis = ChatAnthropic(model="claude-3-7-sonnet-20250219", temperature=0)
    # llm_question_analysis = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
    llm_tool_use = ChatDeepSeek(model="deepseek-chat", temperature=0)
    llm_vision = ChatOpenAI(model="gpt-4o", temperature=0)  # gemini-2.0-flash
    # llm_vision = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
    openai_client = OpenAI()
    return llm_agent_management, llm_question_decomposition, llm_tool_use, llm_vision, openai_client
"""
def determine_file_type(file_data: bytes) -> str:
    try:
        magika = Magika()
        result = magika.identify_bytes(file_data)
        # Ensure the extension starts with a dot
        label = result.output.label
        if label:
            return f".{label}" if not label.startswith('.') else label
        else:
            return ".bin"  # Default binary extension
    except Exception as e:
        print(f"File type identification failed: {str(e)}")
        return ".unknown"
"""
def download_and_save_task_file(task_id: str, original_filename: str) -> Optional[str]:
    """
    Downloads a file associated with a task_id, uses the extension from
    original_filename, and saves it to a temporary directory.
    The saved filename will be task_id + extension_from_original_filename.

    Args:
        task_id: The ID of the task to download the file for.
        original_filename: The original filename from the task metadata.
                           The extension from this name will be used.

    Returns:
        The full path to the saved temporary file, or None if any step fails.
        The path to the file can be used as an input for the tools.
    """
    try:
        # 1. Download the file data
        url = f"{BASE_URL}/files/{task_id}"
        file_response = requests.get(url, timeout=20)
        file_response.raise_for_status()
        file_data = file_response.content
        if not file_data:
            print(f"No file data downloaded for task {task_id}")
            return None
        print(f"Downloaded associated file for task {task_id}")

        # 2. Determine the file extension solely from original_filename
        chosen_extension = ""
        if original_filename and isinstance(original_filename, str):
            name, ext = os.path.splitext(original_filename)
            if ext and ext != ".":  # Check if extension from original filename is valid
                chosen_extension = ext
            else:
                print(f"Warning: No valid extension found in original_filename ('{original_filename}') for task {task_id}. File will be saved without an extension in its name if task_id part also lacks one.")
        else:
            print(f"Warning: original_filename was not a valid string for task {task_id}. File may be saved without a proper extension.")
            
        # Ensure chosen_extension starts with a dot if it's not empty and doesn't already
        if chosen_extension and not chosen_extension.startswith('.'):
            chosen_extension = '.' + chosen_extension
        # If chosen_extension is still empty here, the file will be saved as 'task_id' (no explicit extension part added)

        # 3. Construct temporary file path
        temp_dir = tempfile.gettempdir()
        # The filename is task_id + the derived extension.
        temp_file_name = f"{task_id}{chosen_extension}"
        temp_file_path = os.path.join(temp_dir, temp_file_name)

        # 4. Save the file
        with open(temp_file_path, 'wb') as f:
            f.write(file_data)
        print(f"Saved remote file for task {task_id} to {temp_file_path}")
        return temp_file_path

    except requests.RequestException as e:
        print(f"Error downloading file for task {task_id}: {str(e)}")
        return None
    except Exception as e: # Catch other potential errors like issues with os.path.splitext if original_filename is weird
        print(f"Error processing or saving file for task {task_id}: {str(e)}")
        return None

def cleanup_temp_files(temp_file_path) -> None:
    """ Clean up temporary files created during processing. """
    try:
        # To be safer, ensure temp_file_path is indeed a Path object if Path.unlink() is to be used.
        # Or, if it's a string, os.remove(temp_file_path) is fine.
        # Assuming os.path.exists and os.remove for string paths as per original.
        if isinstance(temp_file_path, str) and temp_file_path.startswith(tempfile.gettempdir()) and os.path.exists(temp_file_path):
            os.remove(temp_file_path)
            print(f"Cleaned up temporary file: {temp_file_path}")
        elif isinstance(temp_file_path, Path) and str(temp_file_path).startswith(tempfile.gettempdir()) and temp_file_path.exists():
            temp_file_path.unlink()
            print(f"Cleaned up temporary file: {temp_file_path}")
    except Exception as e:
        print(f"Error cleaning up temp file {temp_file_path}: {str(e)}")

def process_file_for_task_v2(task_id: str, question_text: str, api_url: str) -> Tuple[str, Optional[Path]]:
    """
    Attempts to download a file for a task and appends its path to the question.
    Returns: (potentially modified question_text, path_to_downloaded_file or None)
    """
    file_download_url = f"{api_url}/files/{task_id}" 
    print(f"Attempting to download file for task {task_id} from {file_download_url}")
    local_file_path = None 

    try:
        response = requests.get(file_download_url, timeout=30)
        if response.status_code == 404:
            print(f"No file found for task {task_id} (404). Proceeding without file.")
            return question_text, None
        response.raise_for_status() # Raise an exception for other bad status codes (4xx, 5xx)
    except requests.exceptions.RequestException as exc:
        print(f"Error downloading file for task {task_id}: {exc}. Proceeding without file.")
        return question_text, None

    # Determine filename from 'Content-Disposition' header
    content_disposition = response.headers.get("content-disposition", "")
    # Adjusted regex to be more robust for quoted and unquoted filenames
    filename_match = re.search(r'filename="?([^"]+)"?', content_disposition) 
    
    filename_from_header = ""
    if filename_match:
        filename_from_header = filename_match.group(1)
    
    # Sanitize and ensure filename is not empty
    if filename_from_header:
        # A more robust sanitization might be needed depending on expected filenames
        # For now, replace non-alphanumeric (excluding ., _, -) with _
        filename = "".join(c if c.isalnum() or c in ('.', '_', '-') else '_' for c in filename_from_header).strip()
        if not filename: # If sanitization results in empty string or just spaces
            print(f"Warning: Sanitized filename from header for task {task_id} is empty. Using task_id as filename base.")
            filename = task_id
    else:
        print(f"Could not determine filename from Content-Disposition for task {task_id}. Using task_id as filename base.")
        filename = task_id

    # Ensure a reasonable default extension if none is apparent
    if '.' not in Path(filename).suffix: # Check if there's an extension part
        content_type = response.headers.get('Content-Type', '').split(';')[0].strip() # Get MIME type part
        extension = ""
        if content_type == 'image/jpeg': extension = '.jpg'
        elif content_type == 'image/png': extension = '.png'
        elif content_type == 'application/pdf': extension = '.pdf'
        elif content_type == 'text/plain': extension = '.txt'
        elif content_type == 'application/json': extension = '.json'
        elif content_type == 'text/csv': extension = '.csv'
        # Add more mime-type to extension mappings as needed
        
        if extension:
            filename += extension
        else:
            print(f"Warning: Could not determine extension for task {task_id} from Content-Type '{content_type}'. Using '.dat'.")
            filename += '.dat' # Generic data extension if type is unknown or unmapped

    temp_storage_dir = Path(tempfile.gettempdir()) / "hf_space_agent_files"
    temp_storage_dir.mkdir(parents=True, exist_ok=True)
    local_file_path = temp_storage_dir / Path(filename).name # Use Path(filename).name to ensure it's just the filename part

    try:
        with open(local_file_path, 'wb') as f:
            f.write(response.content)
        print(f"File for task {task_id} saved to: {local_file_path}")
        amended_question = (
            f"{question_text}\n\n"
            f"--- Technical Information ---\n"
            f"A file relevant to this task was downloaded and is available to your tools at the following local path. "
            f"Your tools that can read local files (like read_file, extract_text_from_image, etc.) should use this path:\n"
            f"Local file path: {str(local_file_path)}\n"
            f"--- End Technical Information ---\n\n"
        )
        return amended_question, local_file_path
    except IOError as e:
        print(f"Error saving file {local_file_path} for task {task_id}: {e}")
        return question_text, None # Saving failed