import io
import logging
import os
import re
import subprocess
import sys

import pandas as pd
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

logger = logging.getLogger(__name__)

# One corpus entry: a "Question: ..." line followed by "Final answer: ..."
# whose text runs up to a closing double quote that is followed by a newline
# or sits at the end of the text.  re.S lets both captures span several lines.
_CORPUS_ENTRY_RE = re.compile(
    r'Question\s*:\s*(.*?)\n\s*Final answer\s*:\s*(.*?)(?="\n|"$)', re.S
)

# Attachment types that are recognised but cannot be processed; the file
# fallback answers with an empty string as soon as it downloads one of these.
_UNSUPPORTED_SUFFIXES = (
    ".mp3", ".wav", ".m4a", ".flac",
    ".png", ".jpg", ".jpeg", ".gif", ".webp",
    ".pdf", ".txt", ".csv", ".json",
)


def _parse_corpus(text):
    """Extract ``(question, answer)`` pairs from raw corpus text.

    Entries are located with ``_CORPUS_ENTRY_RE``; surrounding whitespace and
    double quotes are stripped from both the question and the answer.

    Args:
        text: Full contents of the corpus file.

    Returns:
        A list of ``(question, answer)`` tuples in order of appearance; empty
        when no entry matches.
    """
    return [
        (m.group(1).strip().strip('"'), m.group(2).strip().strip('"'))
        for m in _CORPUS_ENTRY_RE.finditer(text)
    ]


class BasicAgent:
    """Answer questions by nearest-neighbour lookup in a question/answer corpus.

    Corpus questions are indexed with TF-IDF (unigrams and bigrams, English
    stop words removed).  Calling the agent returns the answer stored for the
    corpus question with the highest cosine similarity to the query.  When
    that yields no answer and a task id is supplied, the agent falls back to
    the files attached to the task (spreadsheets and Python scripts).
    """

    def __init__(self, api_url: str, corpus_path: str | None = None):
        """Load the corpus and build the TF-IDF index.

        Args:
            api_url: Base URL of the service exposing ``/files/{task_id}``;
                trailing slashes are removed.
            corpus_path: Path to the corpus file.  When falsy, the
                ``CORPUS_PATH`` environment variable is used, defaulting to
                ``corpus.txt``.

        Raises:
            OSError: If the corpus file cannot be opened.
            ValueError: If no question/answer pair could be parsed from it.
        """
        self.api_url = api_url.rstrip("/")
        path = corpus_path or os.getenv("CORPUS_PATH", "corpus.txt")
        with open(path, "r", encoding="utf-8") as fh:
            pairs = _parse_corpus(fh.read())
        if not pairs:
            raise ValueError("Corpus empty or malformed")
        self.questions = [q for q, _ in pairs]
        self.answers = [a for _, a in pairs]
        self.vec = TfidfVectorizer(
            ngram_range=(1, 2), stop_words="english", min_df=1
        )
        self.mat = self.vec.fit_transform([q.lower() for q in self.questions])

    def _fetch_files(self, task_id: str) -> list[dict]:
        """Return the file descriptors attached to ``task_id``.

        The endpoint may answer with ``{"files": [...]}`` or with a single
        descriptor containing ``"file_url"``.  Any network, HTTP or JSON
        failure, and any other payload shape, yields an empty list.

        Only ``dict`` entries are returned so the caller can safely use
        ``.get`` on every item.
        """
        try:
            resp = requests.get(f"{self.api_url}/files/{task_id}", timeout=30)
            resp.raise_for_status()
            payload = resp.json()
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers JSON decoding errors on every requests version.
            logger.warning(
                "Could not fetch file list for task %s: %s", task_id, exc
            )
            return []

        if not isinstance(payload, dict):
            return []
        if "files" in payload:
            entries = payload["files"]
            if not isinstance(entries, list):
                return []
            return [entry for entry in entries if isinstance(entry, dict)]
        if "file_url" in payload:
            return [payload]
        return []

    @staticmethod
    def _solve_spreadsheet(data: bytes) -> str | None:
        """Compute a two-decimal total from an Excel workbook.

        If a ``Category`` column exists, only rows whose category equals
        ``"food"`` (case-insensitive) are considered: their ``Sales`` column is
        summed when present, otherwise all of their numeric columns.  Without
        a ``Category`` column every numeric cell of the sheet is summed.

        Returns:
            The total formatted as ``"{total:.2f}"``, or ``None`` when the
            workbook cannot be read or summed.
        """
        try:
            df = pd.read_excel(io.BytesIO(data))
            if "Category" in df.columns:
                is_food = df["Category"].astype(str).str.lower().eq("food")
                food = df[is_food]
                if "Sales" in food.columns:
                    total = float(food["Sales"].sum())
                else:
                    numeric = food.select_dtypes(include="number")
                    total = float(numeric.sum().sum())
            else:
                total = float(df.select_dtypes(include="number").sum().sum())
        except Exception as exc:  # error types depend on the Excel engine
            logger.warning("Could not evaluate spreadsheet: %s", exc)
            return None
        return f"{total:.2f}"

    @staticmethod
    def _run_python(source: bytes) -> str | None:
        """Run a Python script and return the last line it prints.

        The script is fed to the interpreter on stdin with a 10 second
        timeout.  Surrounding whitespace and quote characters are stripped
        from the returned line.  The exit status is not inspected.

        Returns:
            The cleaned last stdout line, or ``None`` when the interpreter
            could not be run, timed out, or printed nothing.
        """
        # SECURITY(review): this executes code downloaded from the network
        # with the privileges of the current process.  Only use it against a
        # trusted endpoint, or run it inside a sandbox.
        try:
            # Bytes in, bytes out: the downloaded payload is bytes, which
            # text mode would reject.  sys.executable avoids depending on a
            # "python" binary being on PATH.
            proc = subprocess.run(
                [sys.executable or "python", "-"],
                input=source,
                capture_output=True,
                timeout=10,
                check=False,
            )
        except (OSError, subprocess.SubprocessError) as exc:
            logger.warning("Could not run attached script: %s", exc)
            return None

        out = (proc.stdout or b"").decode("utf-8", errors="replace").strip()
        if not out:
            return None
        return out.splitlines()[-1].strip().strip('"').strip("'")

    def _solve_with_files(self, task_id: str) -> str | None:
        """Try to derive an answer from the files attached to ``task_id``.

        Files are processed in the order returned by the API.  Spreadsheets
        (``.xlsx``/``.xls``) and Python scripts (``.py``) may produce an
        answer; a downloaded file with a recognised but unsupported extension
        ends the search with an empty string.

        Returns:
            The answer string (possibly empty), or ``None`` when no file
            produced a result.
        """
        for entry in self._fetch_files(task_id):
            url = entry.get("file_url") or entry.get("url") or ""
            name = str(entry.get("filename") or entry.get("name") or "").lower()
            if not url:
                continue

            try:
                resp = requests.get(url, timeout=60)
                # Without this an HTTP error page would be parsed or executed
                # as if it were the attachment.
                resp.raise_for_status()
            except requests.RequestException as exc:
                logger.warning("Could not download %s: %s", url, exc)
                continue
            data = resp.content

            if name.endswith((".xlsx", ".xls")):
                answer = self._solve_spreadsheet(data)
                if answer is not None:
                    return answer

            if name.endswith(".py"):
                answer = self._run_python(data)
                if answer is not None:
                    return answer

            if name.endswith(_UNSUPPORTED_SUFFIXES):
                return ""

        return None

    def __call__(self, question: str, task_id: str | None = None) -> str:
        """Answer ``question``, optionally using the files of ``task_id``.

        Args:
            question: The question text; a falsy value returns ``""``.
            task_id: Identifier used to look up attached files when the
                corpus lookup gives no answer.

        Returns:
            The corpus answer of the most similar question if its cosine
            similarity is positive; otherwise the file-based answer, or
            ``""`` when neither is available.
        """
        if not question:
            return ""

        query_vec = self.vec.transform([question.lower()])
        sims = cosine_similarity(query_vec, self.mat)[0]
        best = int(sims.argmax())
        answer = self.answers[best] if sims[best] > 0 else ""
        if answer or not task_id:
            return answer

        from_files = self._solve_with_files(task_id)
        return from_files if from_files is not None else ""