# app/solver.py
"""Quiz-solving loop: scrape a quiz page, ask an LLM for code, run it, submit."""
import base64
import importlib.util
import itertools
import json
import logging
import mimetypes
import os
import re
import subprocess
import sys
import tempfile
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urljoin, urlparse, urlunparse

import cv2
import httpx
import pandas as pd
import requests
from bs4 import BeautifulSoup
from playwright.sync_api import sync_playwright

from config import STUDENT_SECRET, LLM_API_KEY
from models import QuizSubmitPayload, QuizSubmitResponse, QuizRequest

# Seconds allowed per quiz; also used as an upper bound for slow network/pip calls.
TIMEOUT_SECONDS = 180
# Endpoint used when the LLM gives no submission URL or the given one keeps failing.
DEFAULT_SUBMIT_URL = "https://tds-llm-analysis.s-anand.net/submit"
LLM_API_URL = "https://aipipe.org/openrouter/v1/responses"
LLM_MODEL = "openai/gpt-5-mini"
# Maximum number of scrape_web_page tool calls honoured for one question.
MAX_TOOL_ROUNDS = 3
# Total submission attempts, and the attempt after which the default URL is used.
SUBMIT_ATTEMPTS = 6
SUBMIT_FALLBACK_AFTER = 3
SUBMIT_TIMEOUT_SECONDS = 30
# Consecutive page-load failures tolerated before the loop gives up.
MAX_NAVIGATION_RETRIES = 2
# Answer submitted when code generation or execution fails.
FALLBACK_ANSWER = 100

IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".gif", ".bmp")
# Modules that ship with the interpreter and must never be handed to pip.
# sys.stdlib_module_names only exists on Python 3.10+, hence the getattr.
_STDLIB_MODULES = frozenset(getattr(sys, "stdlib_module_names", ())) | frozenset(
    sys.builtin_module_names
)
# Marker for "this file type has no preview" (None is a valid preview value).
_NO_PREVIEW = object()

logging.basicConfig(filename="app.log", level=logging.INFO)
console = logging.StreamHandler()  # no stream given, so this logs to stderr
console.setLevel(logging.INFO)
logging.getLogger().addHandler(console)

# Function-tool definition advertised to the LLM on every request.
SCRAPE_TOOL = {
    "type": "function",
    "name": "scrape_web_page",
    "description": "Fetches and extracts readable text from a given URL.",
    "parameters": {
        "type": "object",
        "properties": {"url": {"type": "string"}},
        "required": ["url"],
    },
}

SYSTEM_PROMPT = """
You are a quiz-solving assistant.

Your task:
Given the scraped text of a quiz page, you must extract:
1. The submission URL where the final answer must be Posted.
2. Accurate and executable Python code that computes ONLY the answer (a single value) and stores it in a variable that MUST be called answer.
3. All non-built in Python dependencies that the code requires as a list.

Return your output ONLY as JSON with this exact structure:
{
    "submit_url": "string",
    "python_code": ["line1", "line2", "..."],
    "dependencies": ["package1", "package2"],
    "explaination": "string" (Explain what you understood from the prompt)
}

Rules:
- The submission URL ALWAYS appears in the text. It typically looks like: "Post your answer to https://....".
- You MUST extract the submission URL.
- DO NOT guess. Extract it literally from the text.
- Do NOT include any explanation or commentary.
- Only produce valid JSON output.
- **Follow the instructions EXACTLY as GIVEN in the text or audio form to get the correct answer.**
- If audio is present, trascribe it first and then proceed with the solution.
- You are provided with a web scraping tool 'scrape_web_page'. If the question asks to scrape a url, you must use this tool.

Python rules:
- The python_code section MUST contain only the code needed to compute the answer and store it in a variable called "answer".
- Ignore any submission JSON shown in the page.
- DO NOT include code to send the submission.
- The backend will run your code inside a Python subprocess.
- Store the final answer in the variable named "answer".
- **Ensure that the final answer is a Python-native type (int, float, str, bool), NOT a NumPy or pandas dtype. If the result is a NumPy scalar, call .item() to convert it.**
- **ALWAYS ENSURE the variable "answer" is JSON-serialisable.**
- If your code imports libraries other than the standard library modules, then those libraries MUST be listed in "dependencies".
- DO NOT include standard library modules in "dependencies".
- DO NOT include any comments in the python code because the code will not run and the answer cannot be computed.
"""


class QuizSolver:
    """Solve a chain of quiz pages.

    For each page the solver renders it with Playwright, downloads linked
    files, asks an LLM for a submission URL plus Python code, runs that code,
    and posts the resulting answer. The submission response decides whether to
    move on to another quiz URL, retry, or stop.
    """

    def __init__(self, request_payload: QuizRequest, start_time: datetime):
        """Store the request's email, secret and first URL, and create a scratch directory."""
        self.email = request_payload.email
        self.secret = request_payload.secret
        self.current_url = request_payload.url
        self.start_time = start_time
        # Scratch space for downloaded files and the generated script.
        self.temp_dir = tempfile.TemporaryDirectory()

    def _get_time_remaining(self) -> float:
        """Return the seconds left before TIMEOUT_SECONDS elapses, never below 0."""
        elapsed = (datetime.now() - self.start_time).total_seconds()
        return max(0.0, TIMEOUT_SECONDS - elapsed)

    # ------------------------------------------------------------------
    # LLM analysis
    # ------------------------------------------------------------------
    @staticmethod
    def _preview_file(path: str) -> Any:
        """Return a small sample of one file, or _NO_PREVIEW for other file types."""
        ext = os.path.splitext(path)[1].lower()
        if ext == ".csv":
            with open(path, "r", encoding="utf-8") as f:
                return "".join(itertools.islice(f, 5))
        if ext == ".json":
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, dict):
                return list(data)
            if isinstance(data, list):
                return data[:2]
            return data
        if ext in (".txt", ".md"):
            with open(path, "r", encoding="utf-8") as f:
                return f.read()[:50]
        if ext in (".xls", ".xlsx"):
            return pd.read_excel(path, header=None).head(3)
        if ext in IMAGE_EXTENSIONS:
            image = cv2.imread(path)
            # Only the dimensions go into the prompt; the pixel data itself
            # says nothing useful about the file's structure.
            return None if image is None else f"image array with shape {image.shape}"
        return _NO_PREVIEW

    def _collect_file_previews(self, paths) -> List[Dict[str, Any]]:
        """Build one ``{path: preview}`` entry per previewable file.

        A file that cannot be read is logged and skipped so that it does not
        discard the previews of the other files.
        """
        previews: List[Dict[str, Any]] = []
        for path in paths or []:
            try:
                preview = self._preview_file(path)
            except Exception as e:  # best effort: previews are optional context
                logging.info(f"Could not preview {path}: {e}")
                continue
            if preview is not _NO_PREVIEW:
                previews.append({path: preview})
        return previews

    @staticmethod
    def _build_user_prompt(quiz_content_text, link_data_str, sample_data) -> str:
        """Render the user prompt from the page content, file paths and file previews."""
        return f"""
You are a highly skilled Question Solver. Your task is to analyze the quiz question provided below and generate the executable Python code to solve it.

1. **Quiz Content:** (The main question and submission text scraped from the website)
---
{quiz_content_text}
---

2. **Files Available:** (A list of paths of available files)
---
{link_data_str}
---

3. **Sample of the Files Available** to be used for understanding the structure of the data only.
---
{sample_data}
---

The code must adhere to these rules:
1. All dependencies that your generated code requires must appear in "dependencies" key of json output structure.
2. ONLY include dependencies that are actually imported.
3. If no extra libraries are required, return an empty list.
4. Do NOT send or execute the submission request. Only generate the code that computes the answer.
5. The final answer must be assigned to a variable named **'answer'**.
6. **Follow the instructions exactly as given in the text or audio form to get the correct answer.**
7. If the question asks you to scrape a url, use the scrape_web_page tool to retrieve the content of the url.

You must also extract the submission url (link to which the answer should be sent to) from the question content.

Respond ONLY with a single JSON object that strictly adheres to the schema below.

JSON Schema:
```json
{{
    "submit_url": "The submission URL extracted from the content (e.g., [https://example.com/submit](https://example.com/submit)).",
    "python_code": [
        "import ...",
        "# ... generated code to process data and set final_answer ...",
        "answer = ..."
    ],
    "dependencies":[...] (eg. ["pandas","numpy",...]),
    "explaination": Explain what you understood from the prompt
}}
```
"""

    @staticmethod
    def _build_llm_request(user_prompt: str) -> Dict[str, Any]:
        """Return the request body for one LLM call."""
        return {
            "model": LLM_MODEL,
            "input": [
                {
                    "role": "system",
                    "content": [{"type": "input_text", "text": SYSTEM_PROMPT}],
                },
                {
                    "role": "user",
                    "content": [{"type": "input_text", "text": user_prompt}],
                },
            ],
            "tools": [SCRAPE_TOOL],
        }

    @staticmethod
    def _find_scrape_call(output_items) -> Optional[Dict[str, Any]]:
        """Return the first scrape_web_page function call in the output, if any."""
        for item in output_items:
            if (
                isinstance(item, dict)
                and item.get("type") == "function_call"
                and item.get("name") == "scrape_web_page"
            ):
                return item
        return None

    @staticmethod
    def _extract_output_text(output_items) -> str:
        """Return the text of the first message item in the output.

        Falls back to the positional lookup ``output[1]["content"][0]["text"]``
        when no item is tagged as a message; that lookup raises if the shape
        does not match, which the caller treats as an LLM failure.
        """
        for item in output_items:
            if not isinstance(item, dict) or item.get("type") != "message":
                continue
            for part in item.get("content") or []:
                if isinstance(part, dict) and part.get("text"):
                    return part["text"]
        return output_items[1]["content"][0]["text"]

    def _scrape_web_page(self, p, url: str) -> str:
        """Render ``url`` with Playwright instance ``p`` and return its text without scripts.

        A relative ``url`` is resolved against the current quiz URL.
        """
        target = urljoin(self.current_url, url)
        browser = p.chromium.launch(headless=True)
        try:
            page = browser.new_page()
            page.goto(target, wait_until="networkidle")
            soup = BeautifulSoup(page.content(), "html.parser")
        finally:
            browser.close()
        for script in soup.find_all("script"):
            script.decompose()
        return soup.get_text()

    def _fallback_plan(self) -> Dict[str, Any]:
        """Return the plan used when the LLM fails: post 100 to ``<origin>/submit``."""
        parsed = urlparse(self.current_url)
        base_url = urlunparse((parsed.scheme, parsed.netloc, "", "", "", ""))
        return {
            "submit_url": base_url + "/submit",
            "python_code": [f"answer={FALLBACK_ANSWER}"],
            "dependencies": [],
        }

    def _llm_analyze_and_generate_code(self, quiz_content_text: str, link_data_str: str, p) -> Optional[Dict[str, Any]]:
        """Ask the LLM for a submission URL and the Python code that computes the answer.

        Args:
            quiz_content_text: Content of the quiz page, embedded in the prompt.
            link_data_str: Iterable of local paths of the files available for
                the question (the name is historical; it is not a string).
            p: Active Playwright instance, used when the LLM calls the
                ``scrape_web_page`` tool.

        Returns:
            The parsed JSON plan as a dict. On any failure (HTTP error,
            unexpected response shape, invalid JSON) the fallback plan from
            ``_fallback_plan`` is returned instead, so callers always get a dict.
        """
        sample_data = self._collect_file_previews(link_data_str)
        logging.info("-> LLM: Analyzing question and generating code...")
        user_prompt = self._build_user_prompt(quiz_content_text, link_data_str, sample_data)
        # NOTE: the system prompt mentions audio, but no audio or image bytes
        # are attached to the request; only file paths and previews are sent.
        try:
            headers = {
                "Content-Type": "application/json",
                # The scheme and the token must be separated by a space.
                "Authorization": f"Bearer {LLM_API_KEY}",
            }
            output_items: list = []
            for round_index in range(MAX_TOOL_ROUNDS + 1):
                response = requests.post(
                    LLM_API_URL,
                    headers=headers,
                    json=self._build_llm_request(user_prompt),
                    timeout=TIMEOUT_SECONDS,
                )
                response.raise_for_status()
                output_items = response.json()["output"]
                tool_call = self._find_scrape_call(output_items)
                if tool_call is None or round_index == MAX_TOOL_ROUNDS:
                    break
                args = tool_call["arguments"]
                if isinstance(args, str):
                    args = json.loads(args)
                scraped = self._scrape_web_page(p, args["url"])
                # The tool result is fed back by extending the user prompt
                # and asking again.
                user_prompt += f"Scraped content: {scraped}"

            llm_output = self._extract_output_text(output_items)
            # Strip a surrounding ```json ... ``` fence if the model added one.
            cleaned = re.sub(r"^```json\s*|\s*```$", "", llm_output.strip(), flags=re.MULTILINE)
            llm_plan = json.loads(cleaned)
            if not isinstance(llm_plan, dict):
                raise ValueError("LLM plan is not a JSON object")
            logging.info(f"plan: ,{llm_plan}")
            return llm_plan
        except Exception:  # boundary: any LLM failure degrades to the fallback plan
            logging.exception("LLM analysis failed; using fallback plan")
            return self._fallback_plan()

    # ------------------------------------------------------------------
    # File download
    # ------------------------------------------------------------------
    def _download_file(self, file_list):
        """Download files one after another into the scratch directory.

        Parameters:
            file_list (list): List of dicts with {"text": ..., "href": ...}.

        Returns:
            List of local paths of the files that were downloaded. Entries
            without an ``href`` and downloads that fail are logged and skipped.
        """
        downloaded_paths = []
        for item in file_list or []:
            url = item.get("href")
            if not url:
                logging.info(f"Skipping link without href: {item}")
                continue
            try:
                name = (item.get("text") or "file").replace(" ", "_")
                # Use the URL path so a query string does not end up in the name.
                filename = os.path.basename(urlparse(url).path) or f"{name}.html"
                save_path = os.path.join(self.temp_dir.name, filename)
                response = requests.get(url, timeout=20)
                response.raise_for_status()
                with open(save_path, "wb") as f:
                    f.write(response.content)
            except (requests.exceptions.RequestException, OSError, ValueError) as e:
                logging.info(f"Download failed for {url}: {e}")
                continue
            downloaded_paths.append(save_path)
        return downloaded_paths

    # ------------------------------------------------------------------
    # Code execution
    # ------------------------------------------------------------------
    @staticmethod
    def _needs_install(package: str) -> bool:
        """Return True when ``package`` is neither stdlib nor already importable."""
        if package.split(".")[0] in _STDLIB_MODULES:
            return False
        try:
            # find_spec returns None (it does not raise) for a missing
            # top-level module.
            return importlib.util.find_spec(package) is None
        except (ImportError, ValueError, AttributeError):
            # Raised for dotted names with a missing parent, or for names that
            # are not valid module names (e.g. "pkg>=1.0").
            return True

    @staticmethod
    def _to_json_safe(value: Any) -> Any:
        """Return ``value`` unchanged if JSON-serialisable, else a converted form.

        Tries ``.item()`` and ``.tolist()`` (offered by NumPy/pandas objects)
        before falling back to ``str(value)``.
        """
        try:
            json.dumps(value)
            return value
        except (TypeError, ValueError):
            pass
        for converter in ("item", "tolist"):
            method = getattr(value, converter, None)
            if not callable(method):
                continue
            try:
                converted = method()
                json.dumps(converted)
                return converted
            except (TypeError, ValueError):
                continue
        return str(value)

    def _execute_generated_code(self, code_lines: list, dependencies: list) -> Any:
        """Install missing dependencies, run the generated code, and return ``answer``.

        NOTE(security): the code is produced by the LLM from untrusted page
        content and is executed with ``exec`` inside this process, not in a
        sandbox or subprocess. The dependency names are likewise passed to
        ``pip install`` as given. Review before exposing this to untrusted use.

        Args:
            code_lines: Lines of Python code (a single string is also accepted).
            dependencies: Package names the code needs; not modified.

        Returns:
            The value of the ``answer`` variable set by the code, made
            JSON-serialisable, or ``FALLBACK_ANSWER`` if installation or
            execution fails or ``answer`` is never set.
        """
        logging.info("-> Executing generated code locally...")
        if isinstance(code_lines, str):
            code_block = code_lines
        else:
            code_block = "\n".join(str(line) for line in code_lines or [])

        to_install = [
            dep.strip()
            for dep in dependencies or []
            if isinstance(dep, str) and dep.strip() and self._needs_install(dep.strip())
        ]
        if to_install:
            try:
                subprocess.run(
                    [sys.executable, "-m", "pip", "install"] + to_install,
                    check=True,
                    text=True,
                    capture_output=True,
                    timeout=TIMEOUT_SECONDS,
                )
            except (subprocess.SubprocessError, OSError) as e:
                logging.info(f"Dependency installation failed: {e}")
                return FALLBACK_ANSWER
            # Let the import system see packages installed after startup.
            importlib.invalidate_caches()

        file_path = os.path.join(self.temp_dir.name, "script.py")
        try:
            # The script is written to disk so it can be inspected afterwards;
            # execution uses the in-memory text.
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(code_block)
            namespace: Dict[str, Any] = {}
            exec(compile(code_block, file_path, "exec"), namespace)
            return self._to_json_safe(namespace["answer"])
        except (Exception, SystemExit) as e:
            # SystemExit is included so generated code calling sys.exit()
            # cannot terminate the solver.
            logging.info(f"Generated code failed: {e!r}")
            return FALLBACK_ANSWER

    # ------------------------------------------------------------------
    # Submission
    # ------------------------------------------------------------------
    def _solve_and_submit(self, submission_url: str, answer: Any) -> QuizSubmitResponse:
        """Post the answer and return the parsed response.

        Makes up to SUBMIT_ATTEMPTS attempts. After SUBMIT_FALLBACK_AFTER
        failed attempts the remaining ones go to DEFAULT_SUBMIT_URL. If every
        attempt fails, a ``QuizSubmitResponse`` with ``correct=False`` and the
        last error in ``reason`` is returned.
        """
        payload = QuizSubmitPayload(
            email=self.email,
            secret=self.secret,
            url=self.current_url,
            answer=answer,
        ).model_dump()
        logging.info(f"Request to submit: {payload}")

        last_error: Optional[Exception] = None
        for attempt in range(1, SUBMIT_ATTEMPTS + 1):
            try:
                response = requests.post(
                    submission_url, json=payload, timeout=SUBMIT_TIMEOUT_SECONDS
                )
                response.raise_for_status()  # Raise exception for bad status codes
                logging.info(response)
                submit_response = QuizSubmitResponse(**response.json())
                logging.info(submit_response)
                return submit_response
            except (requests.exceptions.RequestException, ValueError, TypeError) as e:
                # ValueError/TypeError cover a body that is not the expected
                # JSON object, so a malformed reply is retried like a network error.
                last_error = e
                logging.info(f"Submission failed for {submission_url}: {e}")
                if attempt == SUBMIT_FALLBACK_AFTER:
                    submission_url = DEFAULT_SUBMIT_URL
        return QuizSubmitResponse(correct=False, reason=f"Submission failed: {last_error}")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------
    def _scrape_quiz_page(self, p) -> Tuple[str, List[Dict[str, Any]], Optional[str]]:
        """Render the current quiz page and collect what the LLM needs.

        Returns:
            A tuple ``(html, links, canvas_path)``: the rendered page HTML, the
            anchors whose href ends in a known file extension (plus the audio
            source, if any), and the path of a PNG dump of the first canvas or
            None when the page has no canvas.
        """
        # Running headless to save resources
        browser = p.chromium.launch(headless=True)
        try:
            page = browser.new_page()
            page.goto(self.current_url, wait_until="networkidle")
            # Script tags are kept on purpose: the full HTML goes to the LLM.
            html = str(BeautifulSoup(page.content(), "html.parser"))

            all_links = page.evaluate('''() => {
                const links = Array.from(document.querySelectorAll('a'));
                const fileExtensions = [
                    '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.csv',
                    '.zip', '.rar', '.tar', '.gz',
                    '.jpg', '.jpeg', '.png', '.gif', '.svg',
                    '.mp3', '.mp4', '.avi', '.mov', '.json', '.xml', '.txt', '.opus', '.flac'
                ];
                return links.map(link => ({
                    text: link.innerText.trim(),
                    href: link.href
                }))
                .filter(link => {
                    const href = link.href.toLowerCase();
                    return fileExtensions.some(ext => href.endsWith(ext));
                });
            }''')

            audio_element = page.query_selector("audio")
            if audio_element:
                # el.src yields the absolute URL, unlike the raw attribute.
                audio_src = page.evaluate("(el) => el.src", audio_element)
                if audio_src:
                    all_links.append({"text": "Audio file", "href": audio_src})

            canvas_path = None
            if page.query_selector("canvas"):
                data_url = page.evaluate("""
                    () => {
                        const canvas = document.querySelector("canvas");
                        return canvas.toDataURL("image/png");
                    }
                """)
                # Strip off the "data:image/png;base64," prefix and decode.
                _header, encoded = data_url.split(",", 1)
                canvas_path = os.path.join(self.temp_dir.name, "image.png")
                with open(canvas_path, "wb") as f:
                    f.write(base64.b64decode(encoded))
            return html, all_links, canvas_path
        finally:
            browser.close()

    def _advance_to(self, next_url: str) -> None:
        """Switch to ``next_url`` and restart the per-quiz clock."""
        self.current_url = next_url
        self.start_time = datetime.now()

    def run_quiz_loop(self) -> str:
        """Solve quizzes, following new URLs until the sequence ends.

        Returns:
            A status string: "Success: Quiz sequence completed." when a correct
            answer comes back without a next URL, "Done" when an incorrect
            answer has been retried once and there is no next URL, or a
            "Failed: ..." message when the page cannot be loaded or there is
            no URL to solve.
        """
        repeats = 0
        nav_failures = 0
        while self.current_url:
            with sync_playwright() as p:
                logging.info(f"\n--- Solving Quiz: {self.current_url} ---")

                # 1. Visit the URL and collect content, links and canvas image.
                try:
                    quiz_content_text, all_links, canvas_path = self._scrape_quiz_page(p)
                except Exception as e:
                    logging.info(f"Navigation/Scraping failed: {e}")
                    nav_failures += 1
                    if nav_failures > MAX_NAVIGATION_RETRIES:
                        return "Failed: could not load quiz page."
                    continue
                nav_failures = 0

                # 2. Download linked files.
                download = self._download_file(all_links) if all_links else []
                if canvas_path:
                    download.append(canvas_path)

                # 3. LLM analysis and code generation.
                llm_plan = self._llm_analyze_and_generate_code(quiz_content_text, download, p) or {}
                submission_url = str(llm_plan.get("submit_url") or DEFAULT_SUBMIT_URL)

                # 4. Run the generated code.
                final_answer = self._execute_generated_code(
                    llm_plan.get("python_code") or [],
                    llm_plan.get("dependencies") or [],
                )
                logging.info(f"Received result of python code: {final_answer}")

                # 5. Submit the answer; relative URLs resolve against the quiz URL.
                submission_url = urljoin(self.current_url, submission_url)
                logging.info(f"Submission url used: {submission_url}")
                submit_response = self._solve_and_submit(submission_url, final_answer)

                # Start the next round with an empty scratch directory.
                self.temp_dir.cleanup()
                self.temp_dir = tempfile.TemporaryDirectory()
                logging.info(submit_response)

                next_url = submit_response.url
                if submit_response.correct:
                    if not next_url:
                        logging.info("Quiz sequence complete!")
                        return "Success: Quiz sequence completed."
                    self._advance_to(next_url)
                    logging.info(f" ✅ Correct! Proceeding to new quiz: {self.current_url}")
                    repeats = 0
                    continue

                logging.info("Incorrect")
                if next_url and self._get_time_remaining() <= 10:
                    self._advance_to(next_url)
                    logging.info(f"Skipping to new quiz because no time to redo: {self.current_url}")
                    repeats = 0
                    continue

                # One retry of the same question before moving on or stopping.
                repeats += 1
                if repeats <= 1:
                    logging.info("Repeating question")
                    continue
                if not next_url:
                    return "Done"
                self._advance_to(next_url)
                logging.info(f"Skipping to new quiz after repeating: {self.current_url}")
                repeats = 0
        return "Failed to complete quiz sequence."