File size: 3,773 Bytes
33a40c7
 
 
4f7d0fd
33a40c7
 
 
 
 
 
 
 
4f7d0fd
 
33a40c7
 
 
 
 
 
 
 
 
 
 
 
4f7d0fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33a40c7
4f7d0fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33a40c7
4f7d0fd
 
33a40c7
4f7d0fd
 
33a40c7
4f7d0fd
 
 
 
33a40c7
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import io
import os
import re
import subprocess
import sys

import pandas as pd
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

def _parse_corpus(text):
    """Extract ``(question, answer)`` pairs from raw corpus text.

    An entry is a ``Question: ...`` field followed, after a newline, by a
    ``Final answer: ...`` field. The answer only matches when it is
    terminated by a double quote that is followed by a newline or ends the
    text. Surrounding whitespace and double quotes are trimmed from both
    fields.

    Returns a list of ``(question, answer)`` tuples in order of appearance;
    the list is empty when nothing matches.
    """
    entry_re = re.compile(r'Question\s*:\s*(.*?)\n\s*Final answer\s*:\s*(.*?)(?="\n|"$)', re.S)

    def _clean(field):
        # Drop padding first, then any quote characters wrapping the field.
        return field.strip().strip('"')

    return [
        (_clean(hit.group(1)), _clean(hit.group(2)))
        for hit in entry_re.finditer(text)
    ]

class BasicAgent:
    def __init__(self, api_url: str, corpus_path: str | None = None):
        self.api_url = api_url.rstrip("/")
        path = corpus_path or os.getenv("CORPUS_PATH", "corpus.txt")
        with open(path, "r", encoding="utf-8") as f:
            txt = f.read()
        qa = _parse_corpus(txt)
        if not qa:
            raise ValueError("Corpus empty or malformed")
        self.questions = [q for q, _ in qa]
        self.answers = [a for _, a in qa]
        self.vec = TfidfVectorizer(ngram_range=(1, 2), stop_words="english", min_df=1)
        self.mat = self.vec.fit_transform([q.lower() for q in self.questions])

    def _fetch_files(self, task_id: str):
        try:
            r = requests.get(f"{self.api_url}/files/{task_id}", timeout=30)
            r.raise_for_status()
            data = r.json()
            if isinstance(data, dict) and "files" in data:
                return data["files"]
            if isinstance(data, dict) and "file_url" in data:
                return [data]
            return []
        except Exception:
            return []

    def _solve_with_files(self, task_id: str):
        files = self._fetch_files(task_id)
        for f in files:
            url = f.get("file_url") or f.get("url") or ""
            name = (f.get("filename") or f.get("name") or "").lower()
            if not url:
                continue
            try:
                data = requests.get(url, timeout=60).content
            except Exception:
                continue
            if name.endswith((".xlsx", ".xls")):
                try:
                    df = pd.read_excel(io.BytesIO(data))
                    if "Category" in df.columns:
                        food = df[df["Category"].astype(str).str.lower().eq("food")]
                        if "Sales" in food.columns:
                            total = float(food["Sales"].sum())
                        else:
                            total = float(food.select_dtypes(include="number").sum().sum())
                        return f"{total:.2f}"
                    scols = df.select_dtypes(include="number")
                    total = float(scols.sum().sum())
                    return f"{total:.2f}"
                except Exception:
                    pass
            if name.endswith(".py"):
                try:
                    p = subprocess.run(["python", "-"], input=data, capture_output=True, text=True, timeout=10)
                    out = (p.stdout or "").strip()
                    if out:
                        return out.splitlines()[-1].strip().strip('"').strip("'")
                except Exception:
                    pass
            if name.endswith((".mp3", ".wav", ".m4a", ".flac", ".png", ".jpg", ".jpeg", ".gif", ".webp", ".pdf", ".txt", ".csv", ".json")):
                return ""
        return None

    def __call__(self, question: str, task_id: str | None = None) -> str:
        if not question:
            return ""
        qv = self.vec.transform([question.lower()])
        sims = cosine_similarity(qv, self.mat)[0]
        idx = int(sims.argmax())
        ans = self.answers[idx] if sims[idx] > 0 else ""
        if ans or not task_id:
            return ans
        f = self._solve_with_files(task_id)
        return f if f is not None else ""