# app.py (FIXED - Rule-based Level 2 Agent using Wikipedia + file reading + heuristics) import os import re import io import time import json import requests import pandas as pd import gradio as gr # optional imports; agent works without them but will use if available try: from bs4 import BeautifulSoup except Exception: BeautifulSoup = None try: import PyPDF2 except Exception: PyPDF2 = None try: from PIL import Image import pytesseract except Exception: Image = None pytesseract = None # --- # Constants # --- DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" WIKIPEDIA_API = "https://en.wikipedia.org/w/api.php" USER_AGENT = {"User-Agent": "HF-GAIA-Agent/1.0 (contact: you@example.com)"} # --- # Utility functions # --- def extract_numbers(text): """Return list of numeric strings found in text (integers or floats).""" if not text: return [] # Fixed regex pattern with proper OR operator nums = re.findall(r"\d{1,4}(?:,\d{3})*(?:\.\d+)?|\d+\.\d+|\d+", text.replace("\xa0", " ")) # normalize commas clean = [n.replace(",", "") for n in nums] return clean def simple_normalize(s): return re.sub(r"\s+", " ", (s or "").strip()).lower() def wikipedia_search_first_page(query): """Search wikipedia and return first page title or None.""" params = { "action": "query", "list": "search", "srsearch": query, "format": "json", "srlimit": 3, } try: r = requests.get(WIKIPEDIA_API, params=params, headers=USER_AGENT, timeout=10) r.raise_for_status() data = r.json() hits = data.get("query", {}).get("search", []) if hits: return hits[0].get("title") except Exception: return None return None def wikipedia_get_extract(title): """Return extract (plain text) for a page title.""" params = { "action": "query", "prop": "extracts", "explaintext": 1, "titles": title, "format": "json", "redirects": 1, } try: r = requests.get(WIKIPEDIA_API, params=params, headers=USER_AGENT, timeout=10) r.raise_for_status() data = r.json() pages = data.get("query", {}).get("pages", {}) for pid, page in pages.items(): return page.get("extract", "") except Exception: return "" return "" def wiki_try_find_number(question): """ Heuristic: attempt to craft a search query from question and find numeric answers in page extracts. Returns a candidate numeric string or None. """ q = question # remove leading patterns to get search hint - FIXED with proper OR operator search_hint = q search_hint = re.sub(r"(?i)how many|between.*from.*to.*|included|in the video", "", search_hint) search_hint = search_hint.strip() # fallback use whole q title = wikipedia_search_first_page(search_hint if search_hint else q) if not title: # try full question title = wikipedia_search_first_page(q) if not title: return None extract = wikipedia_get_extract(title) if not extract: return None # first try: context windows where words from question appear words = re.findall(r"[A-Za-z]{3,}", q) words = [w.lower() for w in words][:6] best_context = extract # find sentences containing relevant keywords sentences = re.split(r'(?<=[\.\?\!])\s+', extract) candidate_nums = [] for s in sentences: s_low = s.lower() # prefer sentences that contain several words from question or the phrase 'studio album(s)' etc score = sum(1 for w in words if w in s_low) if score >= 1 or any(k in s_low for k in ["studio album", "album", "species", "population", "released", "released in"]): nums = extract_numbers(s) for n in nums: candidate_nums.append((n, score, s.strip())) if candidate_nums: # sort by score and choose top numeric candidate_nums.sort(key=lambda x: (x[1], len(x[2])), reverse=True) return candidate_nums[0][0] # fallback: any number in extract all_nums = extract_numbers(extract) if all_nums: return all_nums[0] return None def fetch_file_text(api_url, task_id): """Call GET /files/{task_id} to fetch file content if present. Returns text or None. """ try: files_url = f"{api_url}/files/{task_id}" r = requests.get(files_url, headers=USER_AGENT, timeout=15) if r.status_code == 200: content_type = r.headers.get("Content-Type", "") # some endpoints may return raw text or JSON with 'content' and 'filename' if "application/json" in content_type: j = r.json() # expecting {'filename': ..., 'content': '...'} maybe if isinstance(j, dict): if j.get("content"): return j.get("content") # else maybe direct text in 'text' field if j.get("text"): return j.get("text") # else if it's list, return aggregated if isinstance(j, list): texts = [] for it in j: if isinstance(it, dict) and "content" in it: texts.append(it.get("content", "")) return "\n".join(texts) if texts else None # if raw PDF or binary raw = r.content # try to interpret as text try: text = raw.decode("utf-8") # if readable, return if len(text.strip()) > 20: return text except Exception: pass # try pdf via PyPDF2 if available if PyPDF2 is not None: try: reader = PyPDF2.PdfReader(io.BytesIO(raw)) pages = [] for p in reader.pages: try: pages.append(p.extract_text() or "") except Exception: continue return "\n".join(pages).strip() or None except Exception: pass # lastly if image and pytesseract available if Image is not None and pytesseract is not None: try: img = Image.open(io.BytesIO(raw)) txt = pytesseract.image_to_string(img) return txt except Exception: pass except Exception: pass return None def youtube_oembed_title_desc(url): """Try to get title/description using oembed """ try: oembed_url = "https://www.youtube.com/oembed" r = requests.get(oembed_url, params={"url": url, "format": "json"}, headers=USER_AGENT, timeout=10) if r.status_code == 200: j = r.json() title = j.get("title", "") # description often not present in oembed; return title return title except Exception: pass # try noembed try: r = requests.get("https://noembed.com/embed", params={"url": url}, headers=USER_AGENT, timeout=10) if r.status_code == 200: j = r.json() return j.get("title", "") + " " + (j.get("description") or "") except Exception: pass return "" # --- # Agent # --- # Replace the existing BasicAgent with this improved version # ---------- Replace BasicAgent with this v3 ---------- class BasicAgent: """ BasicAgent v3: - Improved Wikipedia discography parser (BeautifulSoup if available) - YouTube metadata/captions heuristics (oEmbed + page scrape + optional transcript lib) - Excel/MP3/PDF file reading via fetch_file_text() helper (already in app) - Reversed-text handler improved - Chess-from-image: fallback to "unknown" unless PGN/FEN provided in files """ def __init__(self): print("BasicAgent v3 initialized.") self.api_url = DEFAULT_API_URL # ---------- helper: normalize numeric string ---------- def norm_num_str(self, s): if s is None: return s s = str(s).strip() # remove commas and .0 s = s.replace(",", "") if re.match(r"^\d+\.0+$", s): return str(int(float(s))) return s # ---------- improved wiki discography parser ---------- def parse_wiki_discography_count(self, artist, y_min, y_max): # search for page title = wikipedia_search_first_page(artist) if not title: return None # try HTML page fetch try: url = f"https://en.wikipedia.org/wiki/{title.replace(' ', '_')}" r = requests.get(url, headers=USER_AGENT, timeout=10) r.raise_for_status() html = r.text except Exception: html = wikipedia_get_extract(title) # fallback to text if not html: return None # if BeautifulSoup available, parse tables/lists if BeautifulSoup is not None: try: soup = BeautifulSoup(html, "html.parser") # First: look for tables with header 'Studio album' or 'Studio albums' # Many pages have a discography table with class "wikitable" tables = soup.find_all("table", {"class": "wikitable"}) candidate_years = [] for tbl in tables: # try to detect if this table is about albums ths = " ".join([th.get_text(" ") for th in tbl.find_all("th")]).lower() if "studio" in ths or "album" in ths or "released" in ths: # gather year-like tokens from table cells for cell in tbl.find_all(["td","th"]): text = cell.get_text(" ").strip() yrs = re.findall(r"\b(?:19|20)\d{2}\b", text) for y in yrs: candidate_years.append(int(y)) # Additionally check lists under headings "Studio albums" or "Discography" headers = soup.find_all(['h2','h3','h4']) for h in headers: htext = h.get_text(" ").lower() if "studio album" in htext or ("discography" in htext and "studio" in htext): # collect subsequent list items sib = h.find_next_sibling() steps = 0 while sib and steps < 30: if getattr(sib, 'name', None) in ['h2','h3','h4']: break # find li entries for li in sib.find_all("li"): txt = li.get_text(" ") yrs = re.findall(r"\b(?:19|20)\d{2}\b", txt) for y in yrs: candidate_years.append(int(y)) sib = sib.next_sibling steps += 1 if candidate_years: count = sum(1 for y in candidate_years if y_min <= y <= y_max) if count > 0: return str(count) except Exception: pass # fallback: analyze plaintext extract extract = wikipedia_get_extract(title) if extract: yrs = re.findall(r"\b(?:19|20)\d{2}\b", extract) yrs = [int(x) for x in yrs] cnt = sum(1 for y in yrs if y_min <= y <= y_max) if cnt: return str(cnt) return None # ---------- improved parse year range ---------- def extract_year_range(self, question): yrs = re.findall(r"\b(?:19|20)\d{2}\b", question) if len(yrs) >= 2: y1 = int(yrs[0]); y2 = int(yrs[1]) return min(y1,y2), max(y1,y2) return None # ---------- improved parse artist ---------- def extract_artist(self, question): # try "by X between" pattern m = re.search(r"by\s+(.+?)\s+between", question, re.I) if m: return m.group(1).strip().strip('"\'.') m2 = re.search(r"by\s+(.+?)\s*\(", question, re.I) if m2: return m2.group(1).strip().strip('"\'.') m3 = re.search(r"published by (.+?) between", question, re.I) if m3: return m3.group(1).strip().strip('"\'.') # last fallback: after 'by' to end m4 = re.search(r"by\s+(.+)", question, re.I) if m4: t = m4.group(1) t = re.sub(r"\s+between.*", "", t, flags=re.I) return t.strip().strip('"\'.') return None # ---------- youtube heuristics: try oembed + page scrape + transcript lib (optional) ---------- def youtube_try_extract_number(self, url): # try oembed/title txt = youtube_oembed_title_desc(url) if txt: nums = extract_numbers(txt) if nums: return nums[0] # try fetching page and scraping numbers around 'species' or 'on camera' try: r = requests.get(url, headers=USER_AGENT, timeout=10) r.raise_for_status() page = r.text.lower() # try to find patterns like 'x species', 'species: x', 'x bird species' m = re.findall(r"(\d{1,3}(?:,\d{3})?(?:\.\d+)?)\s+(?:species|bird species|birds on camera|birds)", page) if m: return m[0].replace(",", "") # fallback: any number in description meta m2 = re.search(r' str: q = (question or "").strip() print("BasicAgent v3 solving:", q[:120].replace("\n"," ") + "...") # 0) reversed-text r = self.detect_and_reverse(q) if r: # cleaned return r.strip() # 1) studio albums between years if "studio album" in q.lower() and ("between" in q.lower() or re.search(r"\b(?:19|20)\d{2}\b", q)): yr = self.extract_year_range(q) if yr: artist = self.extract_artist(q) or "" if artist: try: ans = self.parse_wiki_discography_count(artist, yr[0], yr[1]) if ans: return self.norm_num_str(ans) except Exception: pass # 2) youtube video numeric heuristics if "youtube.com" in q or "youtu.be" in q: m = re.search(r'https?://[^\s"]+', q) if m: url = m.group(0).strip('",') yt_ans = self.youtube_try_extract_number(url) if yt_ans: return self.norm_num_str(yt_ans) # 3) simple math / counting ans = self.solve_math(q) if ans: return self.norm_num_str(ans) ans = self.solve_counting(q) if ans: return self.norm_num_str(ans) # 4) file-based (Excel/audio) if task_id provided if task_id: f_ans = self.handle_file_based_question(task_id) if f_ans: return self.norm_num_str(f_ans) # 5) fallback previous heuristics (simple facts / wiki) ans = self.solve_simple_facts(q) if ans: return ans ans = self.solve_with_wikipedia(q, task_id=task_id) if ans: return self.norm_num_str(ans) # 6) chess/image questions cannot be solved reliably without vision+engine → return unknown if "chess" in q.lower() or "image" in q.lower() or "fen" in q.lower() or "position" in q.lower(): return "unknown" return "unknown" # ---------- end BasicAgent v3 ---------- # Submission runner # --- def run_and_submit_all(profile: gr.OAuthProfile | None): """ Fetches all questions, runs the BasicAgent on them, submits all answers, and displays the results. """ space_id = os.getenv("SPACE_ID") or "unknown-space" if profile: username = f"{profile.username}" else: return "Please Login to Hugging Face with the button.", None api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" submit_url = f"{api_url}/submit" # Instantiate Agent try: agent = BasicAgent() except Exception as e: return f"Error initializing agent: {e}", None agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" # Fetch Questions try: r = requests.get(questions_url, headers=USER_AGENT, timeout=15) r.raise_for_status() questions_data = r.json() if not isinstance(questions_data, list): return "Questions endpoint returned invalid format.", None except Exception as e: return f"Error fetching questions: {e}", None results_log = [] answers_payload = [] for item in questions_data: task_id = item.get("task_id") question_text = item.get("question") if not task_id or question_text is None: continue try: ans = agent(question_text, task_id=task_id) # ensure answers are strings submitted_answer = str(ans) if ans is not None else "unknown" answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) time.sleep(0.2) # polite pause to avoid hammering external services except Exception as e: results except Exception as e: results_log.append({ "Task ID": task_id, "Question": question_text, "Submitted Answer": f"ERROR: {e}" }) if not answers_payload: return "Agent did not produce any answers.", pd.DataFrame(results_log) submission_data = { "username": username.strip(), "agent_code": agent_code, "answers": answers_payload } try: resp = requests.post( submit_url, json=submission_data, headers=USER_AGENT, timeout=60 ) resp.raise_for_status() result = resp.json() final_status = ( f"Submission Successful!\n" f"User: {result.get('username')}\n" f"Overall Score: {result.get('score', 'N/A')}% " f"({result.get('correct_count', '?')}/{result.get('total_attempted', '?')} correct)\n" f"Message: {result.get('message', '')}" ) return final_status, pd.DataFrame(results_log) except requests.exceptions.HTTPError as e: try: body = e.response.json() detail = body.get("detail") or json.dumps(body)[:400] except Exception: detail = e.response.text[:400] return f"Submission Failed: HTTP {e.response.status_code} - {detail}", pd.DataFrame(results_log) except Exception as e: return f"Submission Failed: {e}", pd.DataFrame(results_log) # ------------------------------ # Gradio UI # ------------------------------ with gr.Blocks() as demo: gr.Markdown("# Level-2 Agent (Rule-based + Wiki/File Tools)") gr.Markdown("Duplicate this space, make it public, then login and press **Run Evaluation & Submit All Answers**.") gr.LoginButton() run_button = gr.Button("Run Evaluation & Submit All Answers") status_output = gr.Textbox( label="Run Status / Submission Result", lines=6, interactive=False ) results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) run_button.click( fn=run_and_submit_all, inputs=[], outputs=[status_output, results_table] ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))