File size: 3,981 Bytes
c9ab476
 
 
 
f844f16
fe36046
 
 
 
 
 
 
 
f844f16
 
 
fe36046
 
 
 
c9ab476
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f844f16
 
fe36046
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f844f16
 
fe36046
f844f16
 
 
 
 
fe36046
 
 
 
 
 
 
 
 
 
 
 
f844f16
 
fe36046
 
 
c9ab476
 
 
f844f16
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import os
import sys
import tempfile
import requests
import asyncio
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Add the current directory to Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Import the latest agent system
from langgraph_agent_system import run_agent_system
from observability import flush_traces, shutdown_observability

# Default API URL and Task ID - Using the same URL as the original basic_agent.py
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
DEFAULT_TASK_ID = "f918266a-b3e0-4914-865d-4faa564f1aef"

def fetch_question_by_id(task_id: str, api_base: str = DEFAULT_API_URL):
    """Return JSON of a question for a given task_id.

    The scoring API does not (yet) expose an explicit /question/{id} endpoint,
    so we fetch the full /questions list and filter locally. This works fine
    because the list is small (<100 items).
    """
    try:
        resp = requests.get(f"{api_base}/questions", timeout=30)
        resp.raise_for_status()
        questions = resp.json()
    except Exception as e:
        raise RuntimeError(f"Failed to fetch questions list: {e}") from e

    for q in questions:
        if str(q.get("task_id")) == str(task_id):
            return q

    raise ValueError(f"Task ID {task_id} not found in /questions list.")


def maybe_download_file(task_id: str, api_base: str = DEFAULT_API_URL) -> str | None:
    """Try to download the file associated with a given task id. Returns local path or None."""
    url = f"{api_base}/files/{task_id}"
    try:
        resp = requests.get(url, timeout=60)
        if resp.status_code != 200:
            print(f"No file associated with task {task_id} (status {resp.status_code}).")
            return None
        # Create temp file with same name from headers if available
        filename = resp.headers.get("content-disposition", "").split("filename=")[-1].strip("\"") or f"{task_id}_attachment"
        tmp_path = os.path.join(tempfile.gettempdir(), filename)
        with open(tmp_path, "wb") as f:
            f.write(resp.content)
        print(f"Downloaded attachment to {tmp_path}")
        return tmp_path
    except requests.HTTPError as e:
        print(f"Could not download file for task {task_id}: {e}")
    except Exception as e:
        print(f"Error downloading file: {e}")
    return None


async def main():
    print("Specific Agent Test - Latest LangGraph Multi-Agent System")
    print("=" * 60)
    
    try:
        # Determine the task ID (CLI arg > env var > default)
        task_id = (
            sys.argv[1] if len(sys.argv) > 1 else os.environ.get("TASK_ID", DEFAULT_TASK_ID)
        )
        print(f"Using task ID: {task_id}")

        # Fetch specific question
        q = fetch_question_by_id(task_id)
        question_text = q["question"]

        print("\n=== Specific Question ===")
        print(f"Task ID : {task_id}")
        print(f"Question: {question_text}")

        # Attempt to get attachment if any
        attachment_path = maybe_download_file(task_id)
        if attachment_path:
            question_text += f"\n\nAttachment available at: {attachment_path}"

        # Run the latest agent system
        print("\n=== Running Latest LangGraph Multi-Agent System ===")
        
        result = await run_agent_system(
            query=question_text,
            user_id="test_user",
            session_id=f"session_{task_id}"
        )
        
        print("\n=== Agent Answer ===")
        print(result)
        
    except Exception as e:
        print(f"Error in main execution: {e}")
        import traceback
        traceback.print_exc()
    
    finally:
        # Cleanup
        try:
            flush_traces(background=False)
            shutdown_observability()
            print("\n✅ Agent cleanup completed")
        except Exception as e:
            print(f"⚠️ Cleanup warning: {e}")


if __name__ == "__main__":
    asyncio.run(main())