| |
|
|
| import time |
| import requests |
| import tempfile |
| import subprocess |
| import httpx |
| import sys |
| import json |
| import mimetypes |
| import importlib.util |
| import itertools |
| import base64 |
| import re |
| import os |
| import cv2 |
| from bs4 import BeautifulSoup |
| import pandas as pd |
| from playwright.sync_api import sync_playwright |
| from typing import Any, Dict, Optional |
| from datetime import datetime, timedelta |
| from urllib.parse import urlparse, urlunparse |
|
|
|
|
| from models import QuizSubmitPayload, QuizSubmitResponse, QuizRequest |
| from config import STUDENT_SECRET, LLM_API_KEY |
|
|
# Time budget for a single quiz, in seconds (three minutes).
TIMEOUT_SECONDS = 3 * 60


import logging


# Send INFO-and-above records from the root logger to app.log.
logging.basicConfig(level=logging.INFO, filename="app.log")


# Additionally echo INFO-and-above records through a stream handler attached
# to the root logger, so they are visible on the console as well as in the file.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.root.addHandler(console)
|
|
|
|
|
|
class QuizSolver:
    """Solve a chain of quiz pages and submit the answers.

    For each quiz URL the solver renders the page with Playwright, downloads
    linked files, asks an LLM for a plan (submission URL + Python code), runs
    that code to obtain ``answer`` and POSTs the answer to the submission URL.
    The submit response decides whether to move on, retry once, or stop.
    """

    # Submission endpoint used when the plan has no URL and as the retry target
    # after repeated submission failures.
    _DEFAULT_SUBMIT_URL = "https://tds-llm-analysis.s-anand.net/submit"
    # Value submitted when the answer could not be computed.
    _FALLBACK_ANSWER = 100

    _LLM_API_URL = "https://aipipe.org/openrouter/v1/responses"
    _LLM_MODEL = "openai/gpt-5-mini"
    _LLM_REQUEST_TIMEOUT = 120  # seconds; the original request had no timeout
    _MAX_SCRAPE_ROUNDS = 2  # how many scrape tool calls are honoured per quiz

    _SUBMIT_REQUEST_TIMEOUT = 30  # seconds
    _MAX_SUBMIT_ATTEMPTS = 6
    _SWITCH_TO_DEFAULT_AFTER = 3  # failed attempts before using the default URL

    # Below this many seconds left, a wrong answer is not retried.
    _MIN_SECONDS_TO_RETRY = 10

    _IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".gif", ".bmp")

    # Tool definition advertised to the LLM on every request.
    _SCRAPE_TOOL = {
        "type": "function",
        "name": "scrape_web_page",
        "description": "Fetches and extracts readable text from a given URL.",
        "parameters": {
            "type": "object",
            "properties": {"url": {"type": "string"}},
            "required": ["url"],
        },
    }

    # Runs in the page: every <a> whose href ends with a known file extension.
    _FILE_LINKS_JS = '''() => {
        const links = Array.from(document.querySelectorAll('a'));
        const fileExtensions = [
            '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.csv', '.zip', '.rar',
            '.tar', '.gz', '.jpg', '.jpeg', '.png', '.gif', '.svg', '.mp3',
            '.mp4', '.avi', '.mov', '.json', '.xml', '.txt', '.opus', '.flac'
        ];
        return links.map(link => ({
            text: link.innerText.trim(),
            href: link.href
        }))
        .filter(link => {
            const href = link.href.toLowerCase();
            return fileExtensions.some(ext => href.endsWith(ext));
        });
    }'''

    # Runs in the page: the first <canvas> serialised as a PNG data URL.
    _CANVAS_JS = """
        () => {
            const canvas = document.querySelector("canvas");
            return canvas.toDataURL("image/png");
        }
    """

    _SYSTEM_PROMPT = """
You are a quiz-solving assistant.

Your task: Given the scraped text of a quiz page, you must extract:
1. The submission URL where the final answer must be Posted.
2. Accurate and executable Python code that computes ONLY the answer (a single value) and stores it in a variable that MUST be called answer.
3. All non-built in Python dependencies that the code requires as a list.

Return your output ONLY as JSON with this exact structure:

{
"submit_url": "string",
"python_code": ["line1", "line2", "..."],
"dependencies": ["package1", "package2"],
"explaination": "string" (Explain what you understood from the prompt)
}

Rules:
- The submission URL ALWAYS appears in the text. It typically looks like: "Post your answer to https://....".
- You MUST extract the submission URL.
- DO NOT guess. Extract it literally from the text.
- Do NOT include any explanation or commentary.
- Only produce valid JSON output.
- **Follow the instructions EXACTLY as GIVEN in the text or audio form to get the correct answer.**
- If audio is present, trascribe it first and then proceed with the solution.
- You are provided with a web scraping tool 'scrape_web_page'. If the question asks to scrape a url, you must use this tool.



Python rules:
- The python_code section MUST contain only the code needed to compute the answer and store it in a variable called "answer".
- Ignore any submission JSON shown in the page.
- DO NOT include code to send the submission.
- The backend will run your code inside a Python subprocess.
- Store the final answer in the variable named "answer".
- **Ensure that the final answer is a Python-native type (int, float, str, bool), NOT a NumPy or pandas dtype. If the result is a NumPy scalar, call .item() to convert it.**
- **ALWAYS ENSURE the variable "answer" is JSON-serialisable.**
- If your code imports libraries other than the standard library modules, then those libraries MUST be listed in "dependencies".
- DO NOT include standard library modules in "dependencies".
- DO NOT include any comments in the python code because the code will not run and the answer cannot be computed.



"""

    def __init__(self, request_payload: "QuizRequest", start_time: datetime):
        """Store the credentials, first quiz URL and start time.

        Args:
            request_payload: Object providing ``email``, ``secret`` and ``url``.
            start_time: Moment the current quiz's time budget started.
        """
        self.email = request_payload.email
        self.secret = request_payload.secret
        self.current_url = request_payload.url
        self.start_time = start_time
        # Scratch directory for downloads and the generated script; it is
        # replaced after every submission in run_quiz_loop.
        self.temp_dir = tempfile.TemporaryDirectory()

    def _get_time_remaining(self) -> float:
        """Return the seconds left before TIMEOUT_SECONDS elapses (never negative)."""
        elapsed = (datetime.now() - self.start_time).total_seconds()
        return max(0.0, TIMEOUT_SECONDS - elapsed)

    def _absolute_url(self, url: str) -> str:
        """Resolve ``url`` against the current quiz URL.

        Absolute URLs of any scheme are returned unchanged; relative ones such
        as ``/submit`` are joined onto the current quiz URL.
        """
        from urllib.parse import urljoin

        return urljoin(self.current_url, url)

    def _advance_to(self, next_url: str) -> None:
        """Make ``next_url`` the current quiz and restart its time budget."""
        self.current_url = next_url
        self.start_time = datetime.now()

    # ------------------------------------------------------------------ LLM

    def _sample_file(self, path: str) -> Any:
        """Return a small preview of one file, or None for unsupported types."""
        ext = os.path.splitext(path)[1].lower()

        if ext == ".csv":
            with open(path, "r", encoding="utf-8") as f:
                return "".join(itertools.islice(f, 5))

        if ext == ".json":
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, dict):
                return list(data)
            if isinstance(data, list):
                return data[:2]
            return data

        if ext in (".txt", ".md"):
            with open(path, "r", encoding="utf-8") as f:
                return f.read(50)

        if ext in (".xls", ".xlsx"):
            return pd.read_excel(path, header=None).head(3)

        if ext in self._IMAGE_EXTENSIONS:
            image = cv2.imread(path)
            if image is None:
                return "unreadable image"
            # Describe the array instead of dumping pixel values into the prompt.
            return f"image array, shape={image.shape}, dtype={image.dtype}"

        return None

    def _collect_samples(self, paths: Optional[list]) -> Dict[str, Any]:
        """Map each readable file path to a short preview of its content.

        A file that cannot be read is logged and skipped so that it does not
        discard the previews of the other files.
        """
        samples: Dict[str, Any] = {}
        for path in paths or []:
            try:
                sample = self._sample_file(path)
            except Exception as exc:
                logging.info(f"Could not sample {path}: {exc}")
                continue
            if sample is not None:
                samples[path] = sample
        return samples

    @staticmethod
    def _build_user_prompt(quiz_content_text: Any, link_data_str: Any, sample_data: Any) -> str:
        """Fill the user prompt with the page content, file list and previews."""
        return f"""
You are a highly skilled Question Solver. Your task is to analyze the quiz question
provided below and generate the executable Python code to solve it.

1. **Quiz Content:** (The main question and submission text scraped from the website)
---
{quiz_content_text}
---

2. **Files Available:** (A list of paths of available files)
---
{link_data_str}
---
3. **Sample of the Files Available** to be used for understanding the structure of the data only.
---
{sample_data}
---

The code must adhere to these rules:
1. All dependencies that your generated code requires must appear in "dependencies" key of json output structure.
2. ONLY include dependencies that are actually imported.
3. If no extra libraries are required, return an empty list.
4. Do NOT send or execute the submission request. Only generate the code that computes the answer.
5. The final answer must be assigned to a variable named **'answer'**.
6. **Follow the instructions exactly as given in the text or audio form to get the correct answer.**
7. If the question asks you to scrape a url, use the scrape_web_page tool to retrieve the content of the url.

You must also extract the submission url (link to which the answer should be sent to) from the question content.

Respond ONLY with a single JSON object that strictly adheres to the schema below.

JSON Schema:
```json
{{
"submit_url": "The submission URL extracted from the content (e.g., [https://example.com/submit](https://example.com/submit)).",
"python_code": [
"import ...",
"# ... generated code to process data and set final_answer ...",
"answer = ..."
],
"dependencies":[...] (eg. ["pandas","numpy",...]),
"explaination": Explain what you understood from the prompt
}}
```
"""

    def _request_llm(self, user_prompt: str) -> list:
        """POST one request to the LLM endpoint and return its ``output`` items."""
        headers = {
            "Content-Type": "application/json",
            # The scheme and the token must be separated by a space.
            "Authorization": f"Bearer {LLM_API_KEY}",
        }
        body = {
            "model": self._LLM_MODEL,
            "input": [
                {
                    "role": "system",
                    "content": [{"type": "input_text", "text": self._SYSTEM_PROMPT}],
                },
                {
                    "role": "user",
                    "content": [{"type": "input_text", "text": user_prompt}],
                },
            ],
            "tools": [self._SCRAPE_TOOL],
        }
        response = requests.post(
            self._LLM_API_URL,
            headers=headers,
            json=body,
            timeout=self._LLM_REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()["output"]

    @staticmethod
    def _find_scrape_call(output_items: list) -> Optional[Dict[str, Any]]:
        """Return the first ``scrape_web_page`` function call in the output, if any."""
        for item in output_items:
            if item.get("type") == "function_call" and item.get("name") == "scrape_web_page":
                return item
        return None

    @staticmethod
    def _extract_message_text(output_items: list) -> str:
        """Return the text of the first message item; raise ValueError if none."""
        for item in output_items:
            if item.get("type") != "message":
                continue
            for part in item.get("content") or []:
                text = part.get("text")
                if text:
                    return text
        raise ValueError("LLM response contained no message text")

    @staticmethod
    def _parse_plan(llm_output: str) -> Any:
        """Strip an optional ```json fence from the reply and decode the JSON."""
        cleaned = re.sub(r"^```json\s*|\s*```$", "", llm_output.strip(), flags=re.MULTILINE)
        return json.loads(cleaned)

    def _fallback_plan(self) -> Dict[str, Any]:
        """Plan used when the LLM step fails: submit 100 to ``/submit`` on the quiz host."""
        return {
            "submit_url": self._absolute_url("/submit"),
            "python_code": ["answer=100"],
            "dependencies": [],
        }

    def _scrape_web_page(self, p, url: str) -> str:
        """Render ``url`` in headless Chromium and return its text without scripts.

        Args:
            p: Active Playwright handle (the value bound by ``sync_playwright()``).
            url: Absolute URL, or one relative to the current quiz URL.
        """
        target = self._absolute_url(url)
        browser = p.chromium.launch(headless=True)
        try:
            page = browser.new_page()
            page.goto(target, wait_until="networkidle")
            html = page.content()
        finally:
            # Close the browser even when navigation fails.
            browser.close()

        soup = BeautifulSoup(html, "html.parser")
        for script in soup.find_all("script"):
            script.decompose()
        return soup.get_text()

    def _llm_analyze_and_generate_code(self, quiz_content_text: str, link_data_str: Optional[list], p) -> Optional[Dict[str, Any]]:
        """Ask the LLM for a plan: submission URL, Python code and dependencies.

        Args:
            quiz_content_text: Content of the quiz page to put in the prompt.
            link_data_str: Despite the name, a list of local paths of the
                downloaded files (may be empty or None).
            p: Active Playwright handle, used if the LLM asks to scrape a URL.

        Returns:
            The decoded plan dict. If anything in the exchange fails (request,
            scraping, parsing), the fallback plan from ``_fallback_plan``.
        """
        sample_data = self._collect_samples(link_data_str)

        logging.info("-> LLM: Analyzing question and generating code...")
        user_prompt = self._build_user_prompt(quiz_content_text, link_data_str, sample_data)

        try:
            output = self._request_llm(user_prompt)

            # Honour a bounded number of scrape requests, feeding each result
            # back to the model as extra prompt text.
            for _ in range(self._MAX_SCRAPE_ROUNDS):
                tool_call = self._find_scrape_call(output)
                if tool_call is None:
                    break
                args = tool_call["arguments"]
                if isinstance(args, str):
                    args = json.loads(args)
                scraped = self._scrape_web_page(p, args["url"])
                user_prompt += f"Scraped content: {scraped}"
                output = self._request_llm(user_prompt)

            llm_plan = self._parse_plan(self._extract_message_text(output))
            if not isinstance(llm_plan, dict):
                raise ValueError("LLM plan is not a JSON object")
        except Exception:
            logging.exception("LLM analysis failed; using the fallback plan")
            return self._fallback_plan()

        logging.info(f"plan: ,{llm_plan}")
        return llm_plan

    # ------------------------------------------------------------ downloads

    def _download_file(self, file_list):
        """Download files one after another into the solver's temp directory.

        Parameters:
            file_list (list): List of dicts with {"text": ..., "href": ...}

        Returns:
            List of file paths to the downloaded files. Entries without an
            href and downloads that fail are logged and left out.
        """
        downloaded_paths = []

        for item in file_list:
            url = item.get("href")
            if not url:
                continue

            # Take the name from the URL path so a query string does not end up
            # in the filename (and in the extension).
            filename = os.path.basename(urlparse(url).path)
            if not filename:
                name = (item.get("text") or "file").replace(" ", "_")
                filename = f"{name}.html"

            save_path = os.path.join(self.temp_dir.name, filename)

            try:
                response = requests.get(url, timeout=20)
                response.raise_for_status()
                with open(save_path, "wb") as f:
                    f.write(response.content)
            except Exception as e:
                logging.info(f"Download failed for {url}: {e}")
                continue

            downloaded_paths.append(save_path)

        return downloaded_paths

    # ------------------------------------------------------------ execution

    @staticmethod
    def _missing_dependencies(dependencies: Optional[list]) -> list:
        """Return the dependency names that still need installing.

        Standard-library modules and anything already importable are dropped.
        The input list is not modified.
        """
        stdlib_names = getattr(sys, "stdlib_module_names", frozenset())
        missing = []
        for name in dependencies or []:
            if not isinstance(name, str) or not name.strip():
                continue
            name = name.strip()
            if name in stdlib_names or name in sys.builtin_module_names:
                continue
            try:
                spec = importlib.util.find_spec(name)
            except (ImportError, ValueError):
                # Raised e.g. for dotted names whose parent package is absent.
                spec = None
            if spec is None:
                missing.append(name)
        return missing

    def _execute_generated_code(self, code_lines: list, dependencies: list) -> Any:
        """Run the LLM-generated code and return the value it assigns to ``answer``.

        Missing dependencies are installed with pip first. The code is also
        written to ``script.py`` in the temp directory.

        NOTE(security): the code runs through ``exec`` inside this process, so
        it is NOT isolated or sandboxed; it has the full privileges of the
        service. Consider moving execution to a restricted subprocess.

        Args:
            code_lines: Lines of Python code (a single string is also accepted).
            dependencies: Package names the code declares it needs.

        Returns:
            The value of ``answer``, or 100 if the code fails or never sets it.
        """
        logging.info("-> Executing generated code locally...")

        if isinstance(code_lines, str):
            code_block = code_lines
        else:
            code_block = "\n".join(code_lines)

        missing = self._missing_dependencies(dependencies)
        if missing:
            try:
                subprocess.run(
                    [sys.executable, "-m", "pip", "install", *missing],
                    check=True,
                    text=True,
                    capture_output=True,
                )
                # Let the import system see the freshly installed packages.
                importlib.invalidate_caches()
            except (subprocess.CalledProcessError, OSError) as exc:
                # Keep going: if the package is really needed, the import in
                # the generated code fails below and the fallback is returned.
                logging.info(f"Could not install {missing}: {exc}")

        file_path = os.path.join(self.temp_dir.name, "script.py")

        try:
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(code_block)

            # "__main__" lets generated `if __name__ == "__main__":` guards run.
            namespace = {"__name__": "__main__"}
            exec(compile(code_block, file_path, "exec"), namespace)
            return namespace["answer"]
        except (Exception, SystemExit) as exc:
            # SystemExit is included so generated sys.exit() cannot stop the service.
            logging.info(f"Generated code failed: {exc!r}")
            return self._FALLBACK_ANSWER

    # ----------------------------------------------------------- submission

    def _solve_and_submit(self, submission_url: str, answer: Any) -> "QuizSubmitResponse":
        """POST the answer and return the parsed submit response.

        Up to six attempts are made; after the third failure the default
        submission URL is used instead of ``submission_url``. If every attempt
        fails, a response with ``correct=False`` and the last error as
        ``reason`` is returned.
        """
        payload = QuizSubmitPayload(
            email=self.email,
            secret=self.secret,
            url=self.current_url,
            answer=answer,
        ).model_dump()

        # Never write the secret to the log.
        logging.info(f"Request to submit: { {**payload, 'secret': '***'} }")

        last_error = None
        for attempt in range(1, self._MAX_SUBMIT_ATTEMPTS + 1):
            try:
                response = requests.post(
                    submission_url,
                    json=payload,
                    timeout=self._SUBMIT_REQUEST_TIMEOUT,
                )
                response.raise_for_status()
                logging.info(response)

                submit_response = QuizSubmitResponse(**response.json())
                logging.info(submit_response)
                return submit_response
            except (requests.exceptions.RequestException, ValueError, TypeError) as e:
                # ValueError/TypeError: body is not JSON or does not fit the model.
                last_error = e
                logging.info(f"Submission failed for {submission_url}: {e}")
                if attempt == self._SWITCH_TO_DEFAULT_AFTER:
                    submission_url = self._DEFAULT_SUBMIT_URL

        return QuizSubmitResponse(correct=False, reason=f"Submission failed: {last_error}")

    # ------------------------------------------------------------ main loop

    def _scrape_quiz_page(self, p):
        """Render the current quiz page and gather what the LLM step needs.

        Returns:
            Tuple ``(page_html, file_links, canvas_path)``. On a navigation or
            scraping error the values gathered so far are returned (empty
            string, empty list and None when nothing was gathered).
        """
        quiz_content_text = ""
        all_links = []
        canvas_path = None

        browser = p.chromium.launch(headless=True)
        try:
            page = browser.new_page()
            page.goto(self.current_url, wait_until="networkidle")

            quiz_content_text = str(BeautifulSoup(page.content(), "html.parser"))
            all_links = page.evaluate(self._FILE_LINKS_JS)

            audio_element = page.query_selector("audio")
            if audio_element:
                # currentSrc also covers <audio> elements that use <source> children.
                audio_src = page.evaluate("(el) => el.currentSrc || el.src", audio_element)
                if audio_src:
                    all_links.append({"text": "Audio file", "href": audio_src})

            if page.query_selector("canvas"):
                data_url = page.evaluate(self._CANVAS_JS)
                _, encoded = data_url.split(",", 1)
                target = os.path.join(self.temp_dir.name, "image.png")
                with open(target, "wb") as f:
                    f.write(base64.b64decode(encoded))
                canvas_path = target
        except Exception as e:
            logging.info(f"Navigation/Scraping failed: {e}")
        finally:
            browser.close()

        return quiz_content_text, all_links, canvas_path

    def run_quiz_loop(self) -> str:
        """
        Main loop to solve the quiz and follow new URLs until completion.

        A wrong answer is retried once, unless a next URL is offered and 10
        seconds or less remain; after that the loop moves to the next URL if
        there is one.

        Returns:
            "Success: Quiz sequence completed." after a correct answer with no
            next URL, otherwise "Done".
        """
        repeats = 0

        while self.current_url:
            with sync_playwright() as p:
                logging.info(f"\n--- Solving Quiz: {self.current_url} ---")

                quiz_content_text, all_links, canvas_path = self._scrape_quiz_page(p)

                download = self._download_file(all_links) if all_links else []
                if canvas_path:
                    download.append(canvas_path)

                # Needs the live Playwright handle for the scrape tool.
                llm_plan = self._llm_analyze_and_generate_code(quiz_content_text, download, p)

            submission_url = llm_plan.get("submit_url") or self._DEFAULT_SUBMIT_URL

            final_answer = self._execute_generated_code(
                llm_plan.get("python_code") or [],
                llm_plan.get("dependencies") or [],
            )
            logging.info(f"Received result of python code: {final_answer}")

            submission_url = self._absolute_url(submission_url)
            logging.info(f"Submission url used: {submission_url}")

            submit_response = self._solve_and_submit(submission_url, final_answer)

            # Start the next attempt with an empty scratch directory.
            self.temp_dir.cleanup()
            self.temp_dir = tempfile.TemporaryDirectory()

            logging.info(submit_response)
            next_url = submit_response.url

            if submit_response.correct:
                if not next_url:
                    logging.info("Quiz sequence complete!")
                    return "Success: Quiz sequence completed."
                self._advance_to(next_url)
                logging.info(f" ✅ Correct! Proceeding to new quiz: {self.current_url}")
                repeats = 0
                continue

            logging.info("Incorrect")

            if next_url and self._get_time_remaining() <= self._MIN_SECONDS_TO_RETRY:
                self._advance_to(next_url)
                logging.info(f"Skipping to new quiz because no time to redo: {self.current_url}")
                repeats = 0
                continue

            repeats += 1
            if repeats <= 1:
                logging.info("Repeating question")
                continue

            if not next_url:
                return "Done"

            self._advance_to(next_url)
            logging.info(f"Skipping to new quiz after repeating: {self.current_url}")
            repeats = 0

        return "Done"
| |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |