Spaces:
Sleeping
Sleeping
| import requests | |
| import base64 | |
| import logging | |
| from playwright.sync_api import sync_playwright | |
| import threading | |
| import speech_recognition as sr | |
| from pydub import AudioSegment | |
| import io | |
| import tempfile | |
| import sys | |
| import os | |
| import pandas as pd | |
| import numpy as np | |
| import speech_recognition as sr | |
| from bs4 import BeautifulSoup | |
| import pydub | |
| from pydub import AudioSegment | |
| import pypdf | |
| import zipfile | |
| import duckdb | |
| from PIL import Image | |
| import json | |
| logger = logging.getLogger(__name__) | |
def run_python_code(code: str) -> str:
    """
    Executes Python code and returns whatever it printed to stdout.

    If the input contains a Markdown code fence, only the contents of the
    first fenced block are executed. The code runs with these names already
    in scope: pd, np, requests, io, sr, pydub, sys, os, BeautifulSoup, pypdf,
    zipfile, duckdb, Image.

    Args:
        code (str): Python source, optionally wrapped in a Markdown code fence.

    Returns:
        str: The captured stdout; a reminder message when nothing was printed;
        "Error executing code: ..." when the code raised an exception; or
        "System error: ..." when the surrounding machinery failed.
    """
    try:
        logger.info("Executing Python code...")
        # Robustly extract code from markdown blocks if present
        if "```python" in code:
            code = code.split("```python")[1].split("```")[0].strip()
        elif "```" in code:
            code = code.split("```")[1].split("```")[0].strip()
        logger.info(f"Code:\n{code}")

        # Execution context: a single namespace used as the globals of the
        # executed code. Passing separate globals and locals makes exec treat
        # the code like a class body, so functions, lambdas and comprehensions
        # defined by the code could not see these names or the code's own
        # top-level variables.
        exec_scope = {
            "pd": pd,
            "np": np,
            "requests": requests,
            "io": io,
            "sr": sr,
            "pydub": pydub,
            "sys": sys,
            "os": os,
            "BeautifulSoup": BeautifulSoup,
            "pypdf": pypdf,
            "zipfile": zipfile,
            "duckdb": duckdb,
            "Image": Image,
        }

        # Capture stdout. The redirect starts immediately before the try so
        # that the finally clause always restores the real stream.
        old_stdout = sys.stdout
        redirected_output = io.StringIO()
        sys.stdout = redirected_output
        try:
            # NOTE(security): exec runs the supplied code with the full
            # privileges of this process; only pass code you are willing to run.
            exec(code, exec_scope)
        except Exception as exec_error:
            return f"Error executing code: {exec_error}"
        finally:
            sys.stdout = old_stdout

        output = redirected_output.getvalue()
        logger.info(f"Code Output:\n{output}")
        return output if output.strip() else "Code executed successfully but produced no output. Did you forget to print the result?"
    except Exception as e:
        logger.error(f"System error during code execution: {e}")
        return f"System error: {e}"
def execute_python(code: str) -> str:
    """
    Runs the given Python code and returns its output.
    Suitable for math, data analysis (pandas, numpy, duckdb), and file processing.

    Args:
        code (str): Python source to run; passed unchanged to run_python_code.

    Returns:
        str: Whatever run_python_code returns for this code.
    """
    execution_output = run_python_code(code)
    return execution_output
def read_pdf(url: str) -> str:
    """
    Downloads a PDF from a URL and extracts its text content.

    Args:
        url (str): Location of the PDF. Must start with "http".

    Returns:
        str: The text of all pages, one page per line break, truncated to the
        first 5000 characters; or a message starting with "Error" when the URL
        is rejected, the download fails, or the PDF cannot be parsed.
    """
    try:
        logger.info(f"Reading PDF from: {url}")
        if not url.startswith("http"):
            return f"Error: URL must be absolute. Received: {url}"
        # requests waits forever by default; bound the wait so an unresponsive
        # server cannot hang the caller.
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        try:
            # Parse straight from memory: no temporary file to create, clean
            # up, or leak if writing it fails.
            reader = pypdf.PdfReader(io.BytesIO(response.content))
            # "or ''" guards against a page whose extraction yields nothing.
            text = "".join((page.extract_text() or "") + "\n" for page in reader.pages)
        except Exception as e:
            return f"Error reading PDF: {e}"
        return text[:5000]  # Truncate if too long to avoid context overflow
    except Exception as e:
        logger.error(f"Error downloading PDF: {e}")
        return f"Error downloading PDF: {e}"
def read_zip(url: str) -> str:
    """
    Downloads a ZIP file from a URL, lists its contents, and extracts text from small files.

    Args:
        url (str): Location of the ZIP archive. Must start with "http".

    Returns:
        str: A "ZIP Contents:" listing with one "- name (size bytes)" line per
        member. Members under 10000 bytes that are not directories or
        png/jpg/jpeg/gif images are followed by the first 500 characters of
        their content. On failure, a message starting with "Error".
    """
    try:
        logger.info(f"Reading ZIP from: {url}")
        if not url.startswith("http"):
            return f"Error: URL must be absolute. Received: {url}"
        # requests waits forever by default; bound the wait so an unresponsive
        # server cannot hang the caller.
        response = requests.get(url, timeout=60)
        response.raise_for_status()

        image_suffixes = ('.png', '.jpg', '.jpeg', '.gif')
        listing = ["ZIP Contents:"]
        try:
            # zipfile accepts any seekable file object, so the archive is read
            # from memory instead of a temporary file.
            with zipfile.ZipFile(io.BytesIO(response.content), 'r') as zip_ref:
                for file_info in zip_ref.infolist():
                    listing.append(f"- {file_info.filename} ({file_info.file_size} bytes)")
                    # Only small, non-image, non-directory members are previewed.
                    # The suffix check is case-insensitive so "PHOTO.PNG" is
                    # skipped as well.
                    if (
                        file_info.is_dir()
                        or file_info.file_size >= 10000
                        or file_info.filename.lower().endswith(image_suffixes)
                    ):
                        continue
                    try:
                        with zip_ref.open(file_info) as f:
                            content = f.read().decode('utf-8', errors='ignore')
                    except Exception as e:
                        # Best effort: an unreadable member (encrypted, corrupt,
                        # unsupported compression) must not abort the listing,
                        # but the failure is logged instead of being hidden.
                        logger.warning(f"Could not read {file_info.filename} from ZIP: {e}")
                        continue
                    listing.append(f" Content: {content[:500]}")
        except Exception as e:
            return f"Error reading ZIP: {e}"
        return "\n".join(listing) + "\n"
    except Exception as e:
        logger.error(f"Error downloading ZIP: {e}")
        return f"Error downloading ZIP: {e}"
def search_history(query: str) -> str:
    """
    Searches the history of solved quizzes for a given query (e.g., a previous URL).
    Returns the matching entries if found.

    The history is read from "history.jsonl" in the current working directory,
    one JSON value per line. An entry matches when the query is a substring of
    the entry's Python string form (str(entry)).

    Args:
        query (str): Text to look for.

    Returns:
        str: The matching entries joined by newlines; "No history found." when
        the file does not exist; "No matching history found." when nothing
        matches; or "Error searching history: ..." on any other failure.
    """
    try:
        history_file = "history.jsonl"
        if not os.path.exists(history_file):
            return "No history found."
        results = []
        # JSON text is UTF-8; do not depend on the platform's default encoding.
        with open(history_file, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    entry = json.loads(line)
                except ValueError:
                    # Blank or corrupt line (json.JSONDecodeError is a
                    # ValueError): skip it and keep scanning. Other errors are
                    # no longer silently swallowed here.
                    continue
                entry_text = str(entry)
                if query in entry_text:
                    results.append(entry_text)
        if results:
            return "\n".join(results)
        return "No matching history found."
    except Exception as e:
        return f"Error searching history: {e}"
def transcribe_audio(url: str) -> str:
    """
    Downloads an audio file from a URL and transcribes it with a local Whisper model ("tiny").
    The audio is first converted to WAV with pydub, so MP3, WAV, etc. are accepted.

    Args:
        url (str): Location of the audio file. Must start with "http".

    Returns:
        str: The transcribed text, or a message starting with "Error" when the
        URL is rejected, the download or conversion fails, or transcription fails.
    """
    try:
        logger.info(f"Transcribing audio from: {url}")
        if not url.startswith("http"):
            return f"Error: URL must be absolute. Received: {url}"
        # requests waits forever by default; bound the wait so an unresponsive
        # server cannot hang the caller.
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        # Reserve a temporary path for the WAV file. It is created with
        # delete=False, so it must be removed explicitly (see finally below).
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav:
            temp_wav_path = temp_wav.name
        try:
            # Convert to WAV if needed (using pydub)
            try:
                audio_content = io.BytesIO(response.content)
                audio = AudioSegment.from_file(audio_content)
                audio.export(temp_wav_path, format="wav")
            except Exception as e:
                logger.error(f"Error converting audio: {e}")
                return f"Error converting audio: {e}"
            # Transcribe using local Whisper (tiny model)
            # This runs entirely on-device (CPU/GPU) and does not make external API calls.
            import whisper
            # Load model (downloads once if not cached)
            model = whisper.load_model("tiny")
            result = model.transcribe(temp_wav_path)
            text = result["text"]
            logger.info(f"WHISPER OUTPUT: {text}")
            return text
        finally:
            # Clean up on every path: success, conversion failure, and
            # transcription failure. Previously only success removed the file.
            os.remove(temp_wav_path)
    except Exception as e:
        logger.error(f"Error transcribing audio: {e}")
        return f"Error transcribing audio: {e}"
def understand_image(url: str, prompt: str = "Describe this image in detail") -> str:
    """
    Analyzes an image using the agent's vision capabilities (via API).
    Returns a description of the image.

    Args:
        url (str): An http(s) URL, a data URI, or a path to an existing local
            file. A local file is read and sent as a base64 data URI.
        prompt (str): Instruction sent to the model together with the image.

    Returns:
        str: The model's reply, or a message starting with "Error" when the
        input is rejected, no API key is configured (AI_TOKEN or
        OPENROUTER_API_KEY), or the request fails.
    """
    try:
        logger.info(f"Analyzing image from: {url}")
        if os.path.exists(url):
            # It's a local file, convert to data URI
            try:
                with open(url, "rb") as image_file:
                    encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
                # Determine mime type based on extension, default to jpeg
                mime_by_suffix = {
                    ".png": "image/png",
                    ".gif": "image/gif",
                    ".webp": "image/webp",
                }
                lowered_path = url.lower()
                mime_type = "image/jpeg"
                for suffix, candidate in mime_by_suffix.items():
                    if lowered_path.endswith(suffix):
                        mime_type = candidate
                        break
                url = f"data:{mime_type};base64,{encoded_string}"
            except Exception as e:
                return f"Error reading local image file: {e}"
        if not url.startswith("http") and not url.startswith("data:"):
            return f"Error: URL must be absolute (http/https), a data URI, or a valid local file path. Received: {url[:50]}..."
        api_key = os.getenv("AI_TOKEN") or os.getenv("OPENROUTER_API_KEY")
        if not api_key:
            return "Error: AI_TOKEN not found."
        # Use OpenRouter API to analyze the image
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": "google/gemini-2.0-flash-lite-001",
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": url}}
                    ]
                }
            ]
        }
        # requests waits forever by default; bound the wait so a stalled API
        # call cannot hang the caller.
        response = requests.post(
            "https://aipipe.org/openrouter/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=120,
        )
        response.raise_for_status()
        result = response.json()
        description = result['choices'][0]['message']['content']
        logger.info(f"IMAGE ANALYSIS OUTPUT: {description}")
        return description
    except Exception as e:
        logger.error(f"Error analyzing image: {e}")
        return f"Error analyzing image: {e}"
def call_api(url: str, method: str = "GET", headers: dict = None, json_data: dict = None) -> str:
    """
    Makes an HTTP request to an external API.
    Useful for sourcing data from APIs as required by the quiz.

    Args:
        url (str): Endpoint to call. Must start with "http".
        method (str): HTTP method name passed to requests.
        headers (dict): Optional request headers.
        json_data (dict): Optional body, sent as JSON.

    Returns:
        str: The response body, pretty-printed with 2-space indentation when
        it is valid JSON and returned as raw text otherwise. HTTP error
        statuses are not raised; their body is returned the same way. On a
        rejected URL or a failed request, a message starting with "Error".
    """
    try:
        logger.info(f"Calling API: {method} {url}")
        if not url.startswith("http"):
            return f"Error: URL must be absolute. Received: {url}"
        # requests waits forever by default; bound the wait so an unresponsive
        # API cannot hang the caller.
        response = requests.request(method, url, headers=headers, json=json_data, timeout=60)
        try:
            return json.dumps(response.json(), indent=2)
        except ValueError:
            # Body is not JSON (requests' JSON decode errors are ValueErrors):
            # fall back to the raw text.
            return response.text
    except Exception as e:
        logger.error(f"Error calling API: {e}")
        return f"Error calling API: {e}"
def fetch_page_text(url: str) -> str:
    """
    Fetches the text content of a web page using Playwright.
    Also extracts links, audio sources, and takes a screenshot if visual elements are present.

    Args:
        url (str): The URL of the page to fetch. Must start with "http".

    Returns:
        str: The page text followed by an "Extracted Links & Media" section
        listing links, audio sources and, when the page contains canvas or img
        elements, the path of a saved screenshot. On a rejected URL or a failed
        fetch, a message starting with "Error".
    """
    # Filled in by the worker thread: "content" on success, "error" on failure.
    result = {}
    try:
        logger.info(f"Fetching page text from: {url}")
        if not url.startswith("http"):
            # Return the message as a string, like every other path of this
            # function (and like fetch_page_scripts). Previously this path
            # alone returned a dict.
            return f"Error: URL must be absolute. Received: {url}"

        def run_playwright():
            try:
                with sync_playwright() as p:
                    browser = p.chromium.launch(headless=True)
                    page = browser.new_page()
                    page.goto(url)
                    page.wait_for_load_state("networkidle")
                    html_content = page.content()
                    # Screenshot Logic
                    images = []
                    try:
                        has_visuals = page.evaluate("() => document.querySelectorAll('canvas, img').length > 0")
                        if has_visuals:
                            import uuid
                            unique_id = str(uuid.uuid4())[:8]
                            screenshot_path = f"/tmp/screenshot_{unique_id}.jpg"
                            page.screenshot(path=screenshot_path, full_page=True, type='jpeg', quality=50)
                            images.append(f"Image: [Page Screenshot]({screenshot_path})")
                            logger.info(f"Visual elements detected. Saved screenshot to {screenshot_path}")
                    except Exception as e:
                        # A failed screenshot must not fail the whole fetch.
                        logger.warning(f"Error handling screenshot in fetch_page_text: {e}")
                    browser.close()
                    # Parse with BS4
                    soup = BeautifulSoup(html_content, 'html.parser')
                    text_content = soup.get_text(separator='\n', strip=True)
                    links = []
                    for a in soup.find_all('a', href=True):
                        href = a['href']
                        links.append(f"Link: [{a.get_text(strip=True)}]({href})")
                    audio_sources = []
                    for audio in soup.find_all('audio'):
                        if audio.get('src'):
                            audio_sources.append(f"Audio: {audio['src']}")
                        for source in audio.find_all('source', src=True):
                            audio_sources.append(f"Audio: {source['src']}")
                    result["content"] = text_content + "\n\n--- Extracted Links & Media ---\n" + "\n".join(links + audio_sources + images)
            except Exception as e:
                result["error"] = str(e)

        # Playwright runs in its own thread and is awaited immediately.
        # NOTE(review): presumably so the sync API works even when the calling
        # thread already runs an asyncio event loop — confirm.
        thread = threading.Thread(target=run_playwright)
        thread.start()
        thread.join()
        if "error" in result:
            logger.error(f"Error fetching page: {result['error']}")
            return f"Error fetching page: {result['error']}"
        content = result.get("content", "")
        logger.info(f"Fetched content (first 100 chars): {content[:100]}")
        return content
    except Exception as e:
        logger.error(f"Error fetching page: {e}")
        return f"Error fetching page: {e}"
def fetch_page_scripts(url: str) -> str:
    """
    Returns just the scripts of a web page: the src of external scripts and
    the (truncated) body of inline ones.
    Handy when a page hints at embedded logic or hidden code.

    Args:
        url (str): The page to load. Must start with "http".

    Returns:
        str: An "Extracted Scripts" section with one line per script (inline
        bodies cut to 2000 characters), or a message starting with "Error".
    """
    # Shared with the worker thread: "content" on success, "error" on failure.
    outcome = {}
    try:
        logger.info(f"Fetching page scripts from: {url}")
        if not url.startswith("http"):
            return f"Error: URL must be absolute. Received: {url}"

        def collect_scripts():
            try:
                with sync_playwright() as playwright:
                    chromium = playwright.chromium.launch(headless=True)
                    tab = chromium.new_page()
                    tab.goto(url)
                    tab.wait_for_load_state("networkidle")
                    markup = tab.content()
                    chromium.close()
                    document = BeautifulSoup(markup, 'html.parser')
                    found = []
                    for tag in document.find_all('script'):
                        if tag.get('src'):
                            # External script: report where it is loaded from.
                            found.append(f"Script Source: {tag['src']}")
                            continue
                        inline_code = tag.string
                        if inline_code and inline_code.strip():
                            found.append(f"Inline Script: {inline_code.strip()[:2000]}")
                    outcome["content"] = "--- Extracted Scripts ---\n" + "\n".join(found)
            except Exception as e:
                outcome["error"] = str(e)

        # Run the browser work in a separate thread and wait for it to finish.
        worker = threading.Thread(target=collect_scripts)
        worker.start()
        worker.join()
        if "error" not in outcome:
            return outcome.get("content", "No scripts found.")
        return f"Error fetching scripts: {outcome['error']}"
    except Exception as e:
        return f"Error fetching scripts: {e}"