Spaces:
Sleeping
Sleeping
| # app/solver.py | |
| import time | |
| import requests | |
| import tempfile | |
| import subprocess | |
| import httpx | |
| import sys | |
| import json | |
| import mimetypes | |
| import importlib.util | |
| import itertools | |
| import base64 | |
| import re | |
| import os | |
| import cv2 | |
| from bs4 import BeautifulSoup | |
| import pandas as pd | |
| from playwright.sync_api import sync_playwright | |
| from typing import Any, Dict, Optional | |
| from datetime import datetime, timedelta | |
| from urllib.parse import urlparse, urlunparse | |
| from models import QuizSubmitPayload, QuizSubmitResponse, QuizRequest | |
| from config import STUDENT_SECRET, LLM_API_KEY | |
import logging

# Time budget, in seconds, allowed for a single quiz (the 3-minute deadline).
TIMEOUT_SECONDS = 180

# Persist INFO-and-above records of the root logger to app.log.
logging.basicConfig(level=logging.INFO, filename="app.log")

# Mirror the same records on the console. StreamHandler() without an explicit
# stream writes to sys.stderr.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger().addHandler(console)
class QuizSolver:
    """Scrape a quiz page, ask an LLM for solving code, run it and submit.

    One instance follows a chain of quiz URLs: every submission response may
    carry the URL of the next quiz. Files linked from the page are downloaded
    into a per-round temporary directory that is recreated after each
    submission.
    """

    # Endpoint used when the LLM gives no submission URL, or when the
    # extracted one keeps failing.
    FALLBACK_SUBMIT_URL = "https://tds-llm-analysis.s-anand.net/submit"
    # Placeholder answer submitted when code generation or execution fails.
    FALLBACK_ANSWER = 100

    LLM_API_URL = "https://aipipe.org/openrouter/v1/responses"
    LLM_MODEL = "openai/gpt-5-mini"
    LLM_TIMEOUT_SECONDS = 120
    # Maximum number of scrape_web_page tool calls honoured per question.
    MAX_SCRAPE_ROUNDS = 3

    PIP_TIMEOUT_SECONDS = 120
    DOWNLOAD_TIMEOUT_SECONDS = 20
    SUBMIT_TIMEOUT_SECONDS = 30
    SUBMIT_ATTEMPTS_PER_URL = 3

    IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".gif", ".bmp")

    # Function-tool description advertised to the LLM.
    SCRAPE_TOOL = {
        "type": "function",
        "name": "scrape_web_page",
        "description": "Fetches and extracts readable text from a given URL.",
        "parameters": {
            "type": "object",
            "properties": {"url": {"type": "string"}},
            "required": ["url"],
        },
    }

    SYSTEM_PROMPT = """
    You are a quiz-solving assistant.
    Your task: Given the scraped text of a quiz page, you must extract:
    1. The submission URL where the final answer must be Posted.
    2. Accurate and executable Python code that computes ONLY the answer (a single value) and stores it in a variable that MUST be called answer.
    3. All non-built in Python dependencies that the code requires as a list.
    Return your output ONLY as JSON with this exact structure:
    {
    "submit_url": "string",
    "python_code": ["line1", "line2", "..."],
    "dependencies": ["package1", "package2"],
    "explaination": "string" (Explain what you understood from the prompt)
    }
    Rules:
    - The submission URL ALWAYS appears in the text. It typically looks like: "Post your answer to https://....".
    - You MUST extract the submission URL.
    - DO NOT guess. Extract it literally from the text.
    - Do NOT include any explanation or commentary.
    - Only produce valid JSON output.
    - **Follow the instructions EXACTLY as GIVEN in the text or audio form to get the correct answer.**
    - If audio is present, trascribe it first and then proceed with the solution.
    - You are provided with a web scraping tool 'scrape_web_page'. If the question asks to scrape a url, you must use this tool.
    Python rules:
    - The python_code section MUST contain only the code needed to compute the answer and store it in a variable called "answer".
    - Ignore any submission JSON shown in the page.
    - DO NOT include code to send the submission.
    - The backend will run your code inside a Python subprocess.
    - Store the final answer in the variable named "answer".
    - **Ensure that the final answer is a Python-native type (int, float, str, bool), NOT a NumPy or pandas dtype. If the result is a NumPy scalar, call .item() to convert it.**
    - **ALWAYS ENSURE the variable "answer" is JSON-serialisable.**
    - If your code imports libraries other than the standard library modules, then those libraries MUST be listed in "dependencies".
    - DO NOT include standard library modules in "dependencies".
    - DO NOT include any comments in the python code because the code will not run and the answer cannot be computed.
    """

    def __init__(self, request_payload: "QuizRequest", start_time: datetime):
        """Store the credentials and first quiz URL and create a scratch directory.

        Args:
            request_payload: Object exposing ``email``, ``secret`` and ``url``.
            start_time: Moment from which the quiz deadline is measured.
        """
        self.email = request_payload.email
        self.secret = request_payload.secret
        self.current_url = request_payload.url
        self.start_time = start_time
        # Scratch space for downloaded files; recreated after every submission.
        self.temp_dir = tempfile.TemporaryDirectory()

    def _get_time_remaining(self) -> float:
        """Return the seconds left before the deadline, never below 0.0."""
        elapsed = (datetime.now() - self.start_time).total_seconds()
        return max(0.0, TIMEOUT_SECONDS - elapsed)

    def _absolute_url(self, url: str) -> str:
        """Resolve ``url`` against the current quiz URL.

        Absolute URLs (``http`` or ``https``) are returned unchanged; relative
        ones are joined onto the current quiz page.
        """
        # Local import: the module header only imports urlparse/urlunparse.
        from urllib.parse import urljoin

        return urljoin(str(self.current_url), str(url))

    @staticmethod
    def _preview_files(paths) -> list:
        """Build a small, prompt-friendly preview of each downloaded file.

        Args:
            paths: Iterable of local file paths (may be ``None`` or empty).

        Returns:
            A list with one ``{path: preview}`` dict per file whose extension
            is recognised and which could be read. A file that cannot be
            sampled is logged and skipped without affecting the others.
        """
        previews = []
        for path in paths or []:
            ext = os.path.splitext(path)[1].lower()
            try:
                if ext == ".csv":
                    with open(path, "r", encoding="utf-8") as f:
                        preview = "".join(itertools.islice(f, 5))
                elif ext == ".json":
                    with open(path, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    if isinstance(data, dict):
                        preview = list(data)
                    elif isinstance(data, list):
                        preview = data[:2]
                    else:
                        preview = str(data)[:50]
                elif ext in (".txt", ".md"):
                    with open(path, "r", encoding="utf-8") as f:
                        preview = f.read(50)
                elif ext in (".xls", ".xlsx"):
                    preview = pd.read_excel(path, header=None).head(3)
                elif ext in QuizSolver.IMAGE_EXTENSIONS:
                    # Only describe the image; dumping the pixel array into
                    # the prompt is useless to the model.
                    image = cv2.imread(path)
                    if image is None:
                        preview = "unreadable image"
                    else:
                        preview = f"image array of shape {image.shape}"
                else:
                    continue
            except Exception:
                logging.exception("Could not build a preview for %s", path)
                continue
            previews.append({path: preview})
        return previews

    @staticmethod
    def _build_user_prompt(quiz_content_text, file_paths, sample_data) -> str:
        """Render the user prompt from the page content, file paths and previews."""
        return f"""
        You are a highly skilled Question Solver. Your task is to analyze the quiz question
        provided below and generate the executable Python code to solve it.
        1. **Quiz Content:** (The main question and submission text scraped from the website)
        ---
        {quiz_content_text}
        ---
        2. **Files Available:** (A list of paths of available files)
        ---
        {file_paths}
        ---
        3. **Sample of the Files Available** to be used for understanding the structure of the data only.
        ---
        {sample_data}
        ---
        The code must adhere to these rules:
        1. All dependencies that your generated code requires must appear in "dependencies" key of json output structure.
        2. ONLY include dependencies that are actually imported.
        3. If no extra libraries are required, return an empty list.
        4. Do NOT send or execute the submission request. Only generate the code that computes the answer.
        5. The final answer must be assigned to a variable named **'answer'**.
        6. **Follow the instructions exactly as given in the text or audio form to get the correct answer.**
        7. If the question asks you to scrape a url, use the scrape_web_page tool to retrieve the content of the url.
        You must also extract the submission url (link to which the answer should be sent to) from the question content.
        Respond ONLY with a single JSON object that strictly adheres to the schema below.
        JSON Schema:
        ```json
        {{
        "submit_url": "The submission URL extracted from the content (e.g., [https://example.com/submit](https://example.com/submit)).",
        "python_code": [
        "import ...",
        "# ... generated code to process data and set final_answer ...",
        "answer = ..."
        ],
        "dependencies":[...] (eg. ["pandas","numpy",...]),
        "explaination": Explain what you understood from the prompt
        }}
        ```
        """

    @staticmethod
    def _find_tool_call(output_items) -> Optional[Dict[str, Any]]:
        """Return the first ``scrape_web_page`` function call in an LLM output, if any."""
        for item in output_items or []:
            if (
                isinstance(item, dict)
                and item.get("type") == "function_call"
                and item.get("name") == "scrape_web_page"
            ):
                return item
        return None

    @staticmethod
    def _extract_message_text(output_items) -> str:
        """Return the text of the first message item in an LLM output.

        Raises:
            ValueError: If no message item carries a text part.
        """
        for item in output_items or []:
            if not isinstance(item, dict) or item.get("type") != "message":
                continue
            for part in item.get("content") or []:
                if isinstance(part, dict) and "text" in part:
                    return part["text"]
        raise ValueError("LLM response contains no message text")

    def _request_llm(self, user_prompt: str, allow_tools: bool) -> list:
        """POST one request to the LLM endpoint and return its ``output`` items.

        Raises:
            requests.exceptions.RequestException: On transport or HTTP errors.
            KeyError: If the JSON body has no ``output`` field.
        """
        headers = {
            "Content-Type": "application/json",
            # The bearer scheme requires a space between "Bearer" and the token.
            "Authorization": f"Bearer {LLM_API_KEY}",
        }
        payload = {
            "model": self.LLM_MODEL,
            "input": [
                {
                    "role": "system",
                    "content": [{"type": "input_text", "text": self.SYSTEM_PROMPT}],
                },
                {
                    "role": "user",
                    "content": [{"type": "input_text", "text": user_prompt}],
                },
            ],
        }
        if allow_tools:
            payload["tools"] = [self.SCRAPE_TOOL]
        response = requests.post(
            self.LLM_API_URL,
            headers=headers,
            json=payload,
            timeout=self.LLM_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        return response.json()["output"]

    def _scrape_web_page(self, p, url: str) -> str:
        """Render ``url`` in headless Chromium and return its text without scripts.

        Args:
            p: A started Playwright instance (from ``sync_playwright()``).
            url: Absolute URL, or one relative to the current quiz page.
        """
        browser = p.chromium.launch(headless=True)
        try:
            page = browser.new_page()
            page.goto(self._absolute_url(url), wait_until="networkidle")
            soup = BeautifulSoup(page.content(), "html.parser")
        finally:
            # Always release the browser, even when navigation fails.
            browser.close()
        for script in soup.find_all("script"):
            script.decompose()
        return soup.get_text()

    def _llm_analyze_and_generate_code(self, quiz_content_text: Any, link_data_str: Optional[list], p) -> Optional[Dict[str, Any]]:
        """Ask the LLM for a solving plan for the current quiz.

        Args:
            quiz_content_text: Scraped page content; interpolated into the
                prompt via ``str()``.
            link_data_str: List of local paths of the downloaded files.
            p: A started Playwright instance, used if the LLM asks to scrape
                another page through the ``scrape_web_page`` tool.

        Returns:
            A dict with ``submit_url``, ``python_code`` and ``dependencies``
            (plus whatever else the model adds). If anything fails, a fallback
            plan is returned that targets ``/submit`` on the quiz host and
            computes the placeholder answer.
        """
        sample_data = self._preview_files(link_data_str)
        logging.info("-> LLM: Analyzing question and generating code...")
        user_prompt = self._build_user_prompt(quiz_content_text, link_data_str, sample_data)
        try:
            for round_index in range(self.MAX_SCRAPE_ROUNDS + 1):
                # The final round withholds the tool so the model has to answer.
                allow_tools = round_index < self.MAX_SCRAPE_ROUNDS
                output = self._request_llm(user_prompt, allow_tools)
                tool_call = self._find_tool_call(output) if allow_tools else None
                if tool_call is None:
                    break
                args = tool_call.get("arguments") or {}
                if isinstance(args, str):
                    args = json.loads(args)
                scraped = self._scrape_web_page(p, args["url"])
                user_prompt += f"Scraped content: {scraped}"
            llm_output = self._extract_message_text(output)
            # Strip an optional ```json ... ``` fence around the JSON object.
            cleaned = re.sub(r"^```json\s*|\s*```$", "", llm_output.strip(), flags=re.MULTILINE)
            llm_plan = json.loads(cleaned)
            if not isinstance(llm_plan, dict):
                raise ValueError("LLM plan is not a JSON object")
            logging.info(f"plan: ,{llm_plan}")
            return llm_plan
        except Exception:
            logging.exception("LLM analysis failed; using the fallback plan")
            return {
                "submit_url": self._absolute_url("/submit"),
                "python_code": ["answer=100"],
                "dependencies": [],
            }

    def _download_file(self, file_list):
        """Download the linked files one after another into the temp directory.

        Args:
            file_list: List of dicts shaped like ``{"text": ..., "href": ...}``.

        Returns:
            List of local paths of the files that were downloaded. Entries
            without an ``href`` and failed downloads are logged and skipped.
        """
        downloaded_paths = []
        for item in file_list or []:
            url = item.get("href")
            if not url:
                logging.info(f"Skipping link without href: {item}")
                continue
            # Take the name from the URL path so a query string cannot end up
            # in the file extension.
            filename = os.path.basename(urlparse(url).path)
            if not filename:
                name = (item.get("text") or "file").replace(" ", "_")
                filename = f"{name}.html"  # fallback
            save_path = os.path.join(self.temp_dir.name, filename)
            try:
                response = requests.get(url, timeout=self.DOWNLOAD_TIMEOUT_SECONDS)
                response.raise_for_status()
                with open(save_path, "wb") as f:
                    f.write(response.content)
            except Exception as e:
                logging.info(f"Download failed for {url}: {e}")
                continue
            downloaded_paths.append(save_path)
        return downloaded_paths

    @staticmethod
    def _is_importable(module_name: str) -> bool:
        """Return True when ``module_name`` can already be imported here."""
        try:
            return importlib.util.find_spec(module_name) is not None
        except (ImportError, ValueError, AttributeError, TypeError):
            # Raised for dotted names with a missing parent, empty names, or
            # non-string entries.
            return False

    def _execute_generated_code(self, code_lines: list, dependencies: list) -> Any:
        """Install missing dependencies, run the generated code, return ``answer``.

        SECURITY NOTE: the code is run with ``exec`` inside this process. It is
        NOT sandboxed or isolated, so LLM-generated code has the full
        privileges of the server. Move it to a restricted subprocess or
        container if the quiz content is not trusted.

        Args:
            code_lines: Source lines (or one source string) that assign the
                result to a variable named ``answer``.
            dependencies: Package names the code needs. The list is not
                modified; only names that cannot be imported yet are passed to
                ``pip install``.

        Returns:
            The value of ``answer``, or ``FALLBACK_ANSWER`` (100) if the
            installation or the execution fails.
        """
        logging.info("-> Executing generated code locally...")
        if isinstance(code_lines, str):
            code_block = code_lines
        else:
            code_block = "\n".join(code_lines or [])

        missing = [name for name in (dependencies or []) if not self._is_importable(name)]
        if missing:
            try:
                subprocess.run(
                    [sys.executable, "-m", "pip", "install", *missing],
                    check=True,
                    text=True,
                    capture_output=True,
                    timeout=self.PIP_TIMEOUT_SECONDS,
                )
            except (subprocess.SubprocessError, OSError) as e:
                logging.info(f"Dependency installation failed for {missing}: {e}")
                return self.FALLBACK_ANSWER
            # Let the import system see the freshly installed packages.
            importlib.invalidate_caches()

        namespace: Dict[str, Any] = {}
        try:
            exec(compile(code_block, "<generated>", "exec"), namespace)
            return namespace["answer"]
        except (Exception, SystemExit):
            # SystemExit is included so generated code calling exit() cannot
            # terminate the server.
            logging.exception("Generated code failed")
            return self.FALLBACK_ANSWER

    def _solve_and_submit(self, submission_url: str, answer: Any) -> "QuizSubmitResponse":
        """POST the answer and return the parsed server response.

        The given URL is tried ``SUBMIT_ATTEMPTS_PER_URL`` times, then the
        fallback endpoint the same number of times. If every attempt fails
        with a request error, a ``correct=False`` response carrying the last
        error is returned.
        """
        payload = QuizSubmitPayload(
            email=self.email,
            secret=self.secret,
            url=self.current_url,
            answer=answer,
        ).model_dump()
        logging.info(f"Request to submit: {payload}")

        last_error = None
        for target_url in (submission_url, self.FALLBACK_SUBMIT_URL):
            for _ in range(self.SUBMIT_ATTEMPTS_PER_URL):
                try:
                    response = requests.post(
                        target_url, json=payload, timeout=self.SUBMIT_TIMEOUT_SECONDS
                    )
                    response.raise_for_status()  # Raise exception for bad status codes
                    logging.info(response)
                    submit_response = QuizSubmitResponse(**response.json())
                    logging.info(submit_response)
                    return submit_response
                except requests.exceptions.RequestException as e:
                    last_error = e
                    logging.info(f"Submission failed for {target_url}: {e}")
        return QuizSubmitResponse(correct=False, reason=f"Submission failed: {last_error}")

    def _scrape_quiz_page(self, p):
        """Load the current quiz page and collect what the LLM needs.

        Args:
            p: A started Playwright instance.

        Returns:
            A tuple ``(content, file_links, canvas_path)``: the parsed page
            (empty string if loading failed), the list of
            ``{"text", "href"}`` dicts for downloadable files (including an
            ``<audio>`` source, if present), and the path of the saved
            ``<canvas>`` snapshot or ``None``.
        """
        quiz_content_text: Any = ""
        file_links: list = []
        canvas_path = None
        # Running headless to save resources
        browser = p.chromium.launch(headless=True)
        try:
            page = browser.new_page()
            page.goto(self.current_url, wait_until="networkidle")
            quiz_content_text = BeautifulSoup(page.content(), "html.parser")
            # Keep only anchors whose URL ends with a known file extension.
            file_links = page.evaluate('''() => {
                const links = Array.from(document.querySelectorAll('a'));
                const fileExtensions = [
                    '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.csv', '.zip', '.rar',
                    '.tar', '.gz', '.jpg', '.jpeg', '.png', '.gif', '.svg', '.mp3',
                    '.mp4', '.avi', '.mov', '.json', '.xml', '.txt', '.opus', '.flac'
                ];
                return links.map(link => ({
                    text: link.innerText.trim(),
                    href: link.href
                }))
                .filter(link => {
                    const href = link.href.toLowerCase();
                    return fileExtensions.some(ext => href.endsWith(ext));
                });
            }''')
            audio_element = page.query_selector("audio")
            if audio_element:
                # currentSrc also covers <audio> elements fed by <source> children.
                audio_src = page.evaluate("(el) => el.currentSrc || el.src", audio_element)
                if audio_src:
                    file_links.append({"text": "Audio file", "href": audio_src})
            if page.query_selector("canvas"):
                data_url = page.evaluate("""
                    () => {
                        const canvas = document.querySelector("canvas");
                        return canvas.toDataURL("image/png");
                    }
                """)
                # Strip off the "data:image/png;base64," prefix and decode.
                _, encoded = data_url.split(",", 1)
                target_path = os.path.join(self.temp_dir.name, "image.png")
                with open(target_path, "wb") as f:
                    f.write(base64.b64decode(encoded))
                canvas_path = target_path
        except Exception as e:
            logging.info(f"Navigation/Scraping failed: {e}")
        finally:
            browser.close()
        return quiz_content_text, file_links, canvas_path

    def _advance_to(self, next_url: str) -> None:
        """Switch to the next quiz and restart its deadline clock."""
        self.current_url = next_url
        self.start_time = datetime.now()

    def run_quiz_loop(self) -> str:
        """Solve quizzes and follow the returned URLs until the chain ends.

        A wrong answer is retried once on the same quiz. After that, or when
        10 seconds or less remain, the loop moves on to the next URL if the
        server supplied one.

        Returns:
            A short status string describing how the loop ended.
        """
        repeats = 0
        while self.current_url:
            logging.info(f"\n--- Solving Quiz: {self.current_url} ---")
            # The LLM call stays inside the Playwright context because its
            # scrape tool needs the live instance.
            with sync_playwright() as p:
                quiz_content_text, file_links, canvas_path = self._scrape_quiz_page(p)
                download = self._download_file(file_links)
                if canvas_path:
                    download.append(canvas_path)
                llm_plan = self._llm_analyze_and_generate_code(quiz_content_text, download, p)

            submission_url = llm_plan.get("submit_url") or self.FALLBACK_SUBMIT_URL
            final_answer = self._execute_generated_code(
                llm_plan.get("python_code", []), llm_plan.get("dependencies", [])
            )
            logging.info(f"Received result of python code: {final_answer}")

            submission_url = self._absolute_url(submission_url)
            logging.info(f"Submission url used: {submission_url}")
            submit_response = self._solve_and_submit(submission_url, final_answer)

            # Start the next round with an empty scratch directory.
            self.temp_dir.cleanup()
            self.temp_dir = tempfile.TemporaryDirectory()
            logging.info(submit_response)

            next_url = submit_response.url
            if submit_response.correct:
                if not next_url:
                    logging.info("Quiz sequence complete!")
                    return "Success: Quiz sequence completed."
                self._advance_to(next_url)
                logging.info(f" ✅ Correct! Proceeding to new quiz: {self.current_url}")
                repeats = 0
                continue

            logging.info("Incorrect")
            if next_url and self._get_time_remaining() <= 10:
                self._advance_to(next_url)
                logging.info(f"Skipping to new quiz because no time to redo: {self.current_url}")
                repeats = 0
                continue

            repeats += 1
            if repeats <= 1:
                logging.info("Repeating question")
                continue
            if not next_url:
                return "Done"
            self._advance_to(next_url)
            logging.info(f"Skipping to new quiz after repeating: {self.current_url}")
            repeats = 0
        return "Failed to complete quiz sequence."
| # Else: Loop continues for re-submission on the same URL if time remains. | |
| # if self._get_time_remaining() <= 10: | |
| # return "Timeout: Did not complete quiz within 3 minutes." | |
| # return "Failed to complete quiz sequence." |