# mnosouhi96's picture
# add rag
# 33a40c7
import io
import os
import re
import subprocess
import sys

import pandas as pd
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
def _parse_corpus(text):
    """Extract ``(question, answer)`` pairs from raw corpus text.

    A record is a ``Question: ...`` line followed by a ``Final answer: ...``
    part that runs up to a closing double quote at the end of a line (or at
    the end of the text).  Surrounding whitespace and double quotes are
    stripped from both the question and the answer.

    The closing quote may be followed by spaces, tabs or a carriage return
    before the line break, so CRLF text and lines with trailing blanks parse
    the same way as clean LF text instead of being dropped or merged into the
    following record.

    Args:
        text: Full corpus contents as a single string.

    Returns:
        A list of ``(question, answer)`` tuples in order of appearance; empty
        when no record matches.
    """
    pattern = re.compile(
        r'Question\s*:\s*(.*?)\n\s*Final answer\s*:\s*(.*?)'
        # Record terminator: closing quote, optional trailing blanks / CR,
        # then a newline or the very end of the text.
        r'(?="[ \t\r]*(?:\n|\Z))',
        re.S,
    )
    return [
        (m.group(1).strip().strip('"'), m.group(2).strip().strip('"'))
        for m in pattern.finditer(text)
    ]
class BasicAgent:
def __init__(self, api_url: str, corpus_path: str | None = None):
self.api_url = api_url.rstrip("/")
path = corpus_path or os.getenv("CORPUS_PATH", "corpus.txt")
with open(path, "r", encoding="utf-8") as f:
txt = f.read()
qa = _parse_corpus(txt)
if not qa:
raise ValueError("Corpus empty or malformed")
self.questions = [q for q, _ in qa]
self.answers = [a for _, a in qa]
self.vec = TfidfVectorizer(ngram_range=(1, 2), stop_words="english", min_df=1)
self.mat = self.vec.fit_transform([q.lower() for q in self.questions])
def _fetch_files(self, task_id: str):
try:
r = requests.get(f"{self.api_url}/files/{task_id}", timeout=30)
r.raise_for_status()
data = r.json()
if isinstance(data, dict) and "files" in data:
return data["files"]
if isinstance(data, dict) and "file_url" in data:
return [data]
return []
except Exception:
return []
def _solve_with_files(self, task_id: str):
files = self._fetch_files(task_id)
for f in files:
url = f.get("file_url") or f.get("url") or ""
name = (f.get("filename") or f.get("name") or "").lower()
if not url:
continue
try:
data = requests.get(url, timeout=60).content
except Exception:
continue
if name.endswith((".xlsx", ".xls")):
try:
df = pd.read_excel(io.BytesIO(data))
if "Category" in df.columns:
food = df[df["Category"].astype(str).str.lower().eq("food")]
if "Sales" in food.columns:
total = float(food["Sales"].sum())
else:
total = float(food.select_dtypes(include="number").sum().sum())
return f"{total:.2f}"
scols = df.select_dtypes(include="number")
total = float(scols.sum().sum())
return f"{total:.2f}"
except Exception:
pass
if name.endswith(".py"):
try:
p = subprocess.run(["python", "-"], input=data, capture_output=True, text=True, timeout=10)
out = (p.stdout or "").strip()
if out:
return out.splitlines()[-1].strip().strip('"').strip("'")
except Exception:
pass
if name.endswith((".mp3", ".wav", ".m4a", ".flac", ".png", ".jpg", ".jpeg", ".gif", ".webp", ".pdf", ".txt", ".csv", ".json")):
return ""
return None
def __call__(self, question: str, task_id: str | None = None) -> str:
if not question:
return ""
qv = self.vec.transform([question.lower()])
sims = cosine_similarity(qv, self.mat)[0]
idx = int(sims.argmax())
ans = self.answers[idx] if sims[idx] > 0 else ""
if ans or not task_id:
return ans
f = self._solve_with_files(task_id)
return f if f is not None else ""