# TDS-P2-Alt / solver.py
# maya-ur's picture
# Create solver.py
# d8b8c22 verified
# app/solver.py
import time
import requests
import tempfile
import subprocess
import httpx
import sys
import json
import mimetypes
import importlib.util
import itertools
import base64
import re
import os
import cv2
from bs4 import BeautifulSoup
import pandas as pd
from playwright.sync_api import sync_playwright
from typing import Any, Dict, Optional
from datetime import datetime, timedelta
from urllib.parse import urlparse, urlunparse
from models import QuizSubmitPayload, QuizSubmitResponse, QuizRequest
from config import STUDENT_SECRET, LLM_API_KEY
# Per-quiz time budget in seconds; QuizSolver._get_time_remaining() measures
# the elapsed time against this value.
TIMEOUT_SECONDS = 180
import logging
# Root logger: INFO and above are written to the file app.log.
logging.basicConfig(filename="app.log", level=logging.INFO)
# NOTE: StreamHandler() without a stream argument writes to sys.stderr, not
# sys.stdout.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# Attach the console handler to the root logger so records go to both the
# file and the console stream.
logging.getLogger().addHandler(console)
class QuizSolver:
    """Solve a chain of quiz pages: scrape, plan with an LLM, execute, submit.

    One round of ``run_quiz_loop`` renders ``current_url`` with Playwright,
    downloads the files linked from the page, asks the LLM for a plan
    (submission URL, Python code, dependencies), runs the generated code and
    posts the resulting answer. The response decides whether to move on to a
    next quiz URL, retry the same quiz once, or stop.

    SECURITY NOTE: the generated code is executed with ``exec`` inside this
    process and its dependencies are installed with pip. Nothing here
    sandboxes that code, so only run this class where that is acceptable.
    """

    # Used when the plan has no usable submission URL and as the retry target
    # after three failed submissions.
    _FALLBACK_SUBMIT_URL = "https://tds-llm-analysis.s-anand.net/submit"
    _LLM_API_URL = "https://aipipe.org/openrouter/v1/responses"
    _LLM_MODEL = "openai/gpt-5-mini"
    # Network timeouts (seconds) so a stalled connection cannot block forever.
    _LLM_TIMEOUT_SECONDS = 120
    _SUBMIT_TIMEOUT_SECONDS = 30
    # Sentinel answer returned whenever the generated code cannot be run.
    _FAILED_ANSWER = 100

    # Function tool advertised to the LLM; calls are served by _scrape_web_page.
    _SCRAPE_TOOL = {
        "type": "function",
        "name": "scrape_web_page",
        "description": "Fetches and extracts readable text from a given URL.",
        "parameters": {
            "type": "object",
            "properties": {"url": {"type": "string"}},
            "required": ["url"],
        },
    }

    _SYSTEM_PROMPT = """
You are a quiz-solving assistant.
Your task: Given the scraped text of a quiz page, you must extract:
1. The submission URL where the final answer must be Posted.
2. Accurate and executable Python code that computes ONLY the answer (a single value) and stores it in a variable that MUST be called answer.
3. All non-built in Python dependencies that the code requires as a list.
Return your output ONLY as JSON with this exact structure:
{
"submit_url": "string",
"python_code": ["line1", "line2", "..."],
"dependencies": ["package1", "package2"],
"explaination": "string" (Explain what you understood from the prompt)
}
Rules:
- The submission URL ALWAYS appears in the text. It typically looks like: "Post your answer to https://....".
- You MUST extract the submission URL.
- DO NOT guess. Extract it literally from the text.
- Do NOT include any explanation or commentary.
- Only produce valid JSON output.
- **Follow the instructions EXACTLY as GIVEN in the text or audio form to get the correct answer.**
- If audio is present, trascribe it first and then proceed with the solution.
- You are provided with a web scraping tool 'scrape_web_page'. If the question asks to scrape a url, you must use this tool.
Python rules:
- The python_code section MUST contain only the code needed to compute the answer and store it in a variable called "answer".
- Ignore any submission JSON shown in the page.
- DO NOT include code to send the submission.
- The backend will run your code inside a Python subprocess.
- Store the final answer in the variable named "answer".
- **Ensure that the final answer is a Python-native type (int, float, str, bool), NOT a NumPy or pandas dtype. If the result is a NumPy scalar, call .item() to convert it.**
- **ALWAYS ENSURE the variable "answer" is JSON-serialisable.**
- If your code imports libraries other than the standard library modules, then those libraries MUST be listed in "dependencies".
- DO NOT include standard library modules in "dependencies".
- DO NOT include any comments in the python code because the code will not run and the answer cannot be computed.
"""

    # Runs in the page: every <a> whose href ends with a known file extension.
    _FILE_LINKS_JS = '''() => {
const links = Array.from(document.querySelectorAll('a'));
const fileExtensions = [
'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.csv', '.zip', '.rar',
'.tar', '.gz', '.jpg', '.jpeg', '.png', '.gif', '.svg', '.mp3',
'.mp4', '.avi', '.mov', '.json', '.xml', '.txt', '.opus', '.flac'
];
return links.map(link => ({
text: link.innerText.trim(),
href: link.href
}))
.filter(link => {
const href = link.href.toLowerCase();
return fileExtensions.some(ext => href.endsWith(ext));
});
}'''

    # Runs in the page: the first <canvas> serialised as a PNG data URL.
    _CANVAS_PNG_JS = """
() => {
const canvas = document.querySelector("canvas");
return canvas.toDataURL("image/png");
}
"""

    def __init__(self, request_payload: "QuizRequest", start_time: datetime):
        """Store the request fields and create a scratch directory.

        Args:
            request_payload: Object exposing ``email``, ``secret`` and ``url``.
            start_time: Moment the time budget for the first quiz started.
        """
        self.email = request_payload.email
        self.secret = request_payload.secret
        self.current_url = request_payload.url
        self.start_time = start_time
        # Holds downloaded files and the generated script; recreated per round.
        self.temp_dir = tempfile.TemporaryDirectory()

    def _get_time_remaining(self) -> float:
        """Return the seconds left of TIMEOUT_SECONDS, never below 0.0."""
        elapsed = (datetime.now() - self.start_time).total_seconds()
        return max(0.0, TIMEOUT_SECONDS - elapsed)

    def _advance_to(self, next_url) -> None:
        """Switch to the next quiz URL and restart its time budget."""
        self.current_url = next_url
        self.start_time = datetime.now()

    @staticmethod
    def _sample_files(paths) -> list:
        """Build a short preview of each downloaded file for the LLM prompt.

        Returns a list with one ``{path: preview}`` dict per file whose
        extension is recognised. A file that cannot be read is skipped and
        logged; it does not discard the previews of the other files.
        """
        samples = []
        for path in paths or []:
            ext = os.path.splitext(path)[1].lower()
            try:
                if ext == ".csv":
                    with open(path, "r", encoding="utf-8") as f:
                        preview = "".join(itertools.islice(f, 5))
                elif ext == ".json":
                    with open(path, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    if isinstance(data, dict):
                        preview = list(data)
                    elif isinstance(data, list):
                        preview = data[:2]
                    else:
                        preview = data
                elif ext in (".txt", ".md"):
                    with open(path, "r", encoding="utf-8") as f:
                        preview = f.read(50)
                elif ext in (".xls", ".xlsx"):
                    preview = pd.read_excel(path, header=None).head(3)
                elif ext in (".jpg", ".jpeg", ".png", ".gif", ".bmp"):
                    preview = cv2.imread(path)
                else:
                    continue
            except Exception as e:
                # Best effort: a preview is optional context, not a requirement.
                logging.info("Could not sample %s: %s", path, e)
                continue
            samples.append({path: preview})
        return samples

    @staticmethod
    def _build_user_prompt(quiz_content_text, link_data_str, sample_data) -> str:
        """Render the user prompt with the page content, file list and samples."""
        return f"""
You are a highly skilled Question Solver. Your task is to analyze the quiz question
provided below and generate the executable Python code to solve it.
1. **Quiz Content:** (The main question and submission text scraped from the website)
---
{quiz_content_text}
---
2. **Files Available:** (A list of paths of available files)
---
{link_data_str}
---
3. **Sample of the Files Available** to be used for understanding the structure of the data only.
---
{sample_data}
---
The code must adhere to these rules:
1. All dependencies that your generated code requires must appear in "dependencies" key of json output structure.
2. ONLY include dependencies that are actually imported.
3. If no extra libraries are required, return an empty list.
4. Do NOT send or execute the submission request. Only generate the code that computes the answer.
5. The final answer must be assigned to a variable named **'answer'**.
6. **Follow the instructions exactly as given in the text or audio form to get the correct answer.**
7. If the question asks you to scrape a url, use the scrape_web_page tool to retrieve the content of the url.
You must also extract the submission url (link to which the answer should be sent to) from the question content.
Respond ONLY with a single JSON object that strictly adheres to the schema below.
JSON Schema:
```json
{{
"submit_url": "The submission URL extracted from the content (e.g., [https://example.com/submit](https://example.com/submit)).",
"python_code": [
"import ...",
"# ... generated code to process data and set final_answer ...",
"answer = ..."
],
"dependencies":[...] (eg. ["pandas","numpy",...]),
"explaination": Explain what you understood from the prompt
}}
```
"""

    def _call_llm(self, user_prompt: str) -> list:
        """POST one request to the LLM endpoint and return its ``output`` list.

        Raises whatever ``requests`` raises on network/HTTP failure, and
        ``KeyError`` when the response body has no ``output`` field.
        """
        headers = {
            "Content-Type": "application/json",
            # The scheme and the token must be separated by a space.
            "Authorization": f"Bearer {LLM_API_KEY}",
        }
        payload = {
            "model": self._LLM_MODEL,
            "input": [
                {
                    "role": "system",
                    "content": [{"type": "input_text", "text": self._SYSTEM_PROMPT}],
                },
                {
                    "role": "user",
                    "content": [{"type": "input_text", "text": user_prompt}],
                },
            ],
            "tools": [self._SCRAPE_TOOL],
        }
        response = requests.post(
            self._LLM_API_URL,
            headers=headers,
            json=payload,
            timeout=self._LLM_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        return response.json()["output"]

    @staticmethod
    def _find_scrape_call(output):
        """Return the first ``scrape_web_page`` function call in *output*, or None."""
        for item in output:
            if (
                isinstance(item, dict)
                and item.get("type") == "function_call"
                and item.get("name") == "scrape_web_page"
            ):
                return item
        return None

    @staticmethod
    def _message_text(output) -> str:
        """Return the text of the first message item in *output*.

        Raises:
            ValueError: If no message item carries a text part.
        """
        for item in output:
            if not isinstance(item, dict) or item.get("type") != "message":
                continue
            for part in item.get("content") or []:
                if isinstance(part, dict) and isinstance(part.get("text"), str):
                    return part["text"]
        raise ValueError("LLM response contained no message text")

    def _scrape_web_page(self, p, url: str) -> str:
        """Render *url* with Playwright and return its text without scripts.

        Relative URLs are resolved against ``self.current_url``. The browser
        is closed even when navigation fails.
        """
        from urllib.parse import urljoin

        target = urljoin(self.current_url, url)
        browser = p.chromium.launch(headless=True)
        try:
            page = browser.new_page()
            page.goto(target, wait_until="networkidle")
            html = page.content()
        finally:
            browser.close()
        soup = BeautifulSoup(html, "html.parser")
        for script in soup.find_all("script"):
            script.decompose()
        return soup.get_text()

    def _fallback_plan(self) -> Dict[str, Any]:
        """Return the plan used when the LLM step fails.

        It submits the sentinel answer to ``/submit`` on the current quiz host.
        """
        parsed = urlparse(self.current_url)
        base_url = urlunparse((parsed.scheme, parsed.netloc, "", "", "", ""))
        return {
            "submit_url": base_url + "/submit",
            "python_code": ["answer=100"],
            "dependencies": [],
        }

    def _llm_analyze_and_generate_code(self, quiz_content_text: str, link_data_str: str, p) -> Optional[Dict[str, Any]]:
        """Ask the LLM for a plan: submission URL, Python code and dependencies.

        Args:
            quiz_content_text: Scraped quiz page content.
            link_data_str: Despite the name, a list of local paths of the
                downloaded files (may be empty or None).
            p: Playwright handle used to serve a ``scrape_web_page`` tool call.

        Returns:
            The parsed plan dict. If anything fails (network, tool call,
            malformed or non-object JSON), the fallback plan from
            ``_fallback_plan`` is returned instead, so callers always get a
            dict with ``submit_url``, ``python_code`` and ``dependencies``.
        """
        sample_data = self._sample_files(link_data_str)
        logging.info("-> LLM: Analyzing question and generating code...")
        user_prompt = self._build_user_prompt(quiz_content_text, link_data_str, sample_data)
        try:
            output = self._call_llm(user_prompt)
            tool_call = self._find_scrape_call(output)
            if tool_call is not None:
                args = tool_call["arguments"]
                if isinstance(args, str):
                    args = json.loads(args)
                scraped = self._scrape_web_page(p, args["url"])
                # The scraped text is fed back by extending the user prompt.
                user_prompt += f"Scraped content: {scraped}"
                output = self._call_llm(user_prompt)
            llm_output = self._message_text(output)
            # Strip an optional ```json ... ``` fence around the JSON object.
            cleaned = re.sub(r"^```json\s*|\s*```$", "", llm_output.strip(), flags=re.MULTILINE)
            llm_plan = json.loads(cleaned)
            if not isinstance(llm_plan, dict):
                raise ValueError("LLM plan is not a JSON object")
            logging.info(f"plan: ,{llm_plan}")
            return llm_plan
        except Exception as e:
            # Boundary of the LLM step: degrade to the fallback plan.
            logging.info(e)
            return self._fallback_plan()

    def _download_file(self, file_list):
        """Download the linked files one after another into the temp directory.

        Args:
            file_list: List of dicts with ``{"text": ..., "href": ...}``.

        Returns:
            List of local paths of the files that were downloaded. Entries
            without an ``href`` and downloads that fail are skipped and logged.
        """
        downloaded_paths = []
        for item in file_list:
            url = item.get("href")
            if not url:
                logging.info("Skipping link without href: %s", item)
                continue
            name = (item.get("text") or "file").replace(" ", "_")
            # Use only the URL path so a query string or fragment does not end
            # up in the filename and hide the real extension.
            filename = os.path.basename(urlparse(url).path) or f"{name}.html"
            save_path = os.path.join(self.temp_dir.name, filename)
            try:
                response = requests.get(url, timeout=20)
                response.raise_for_status()
                with open(save_path, "wb") as f:
                    f.write(response.content)
            except Exception as e:
                # Best effort: one broken link must not stop the others.
                logging.info("Download failed for %s: %s", url, e)
                continue
            downloaded_paths.append(save_path)
        return downloaded_paths

    @staticmethod
    def _is_importable(module_name: str) -> bool:
        """Return True when *module_name* can be located by the import system."""
        try:
            return importlib.util.find_spec(module_name) is not None
        except (ImportError, ValueError, AttributeError):
            return False

    def _execute_generated_code(self, code_lines: list, dependencies: list) -> Any:
        """Install missing dependencies, run the generated code, return ``answer``.

        SECURITY NOTE: the code runs with ``exec`` in this process, not in a
        sandbox or subprocess, and the dependency names are passed to pip.
        Both come from the LLM.

        Args:
            code_lines: The generated code as a list of lines (a single string
                is accepted too).
            dependencies: Package names the code needs. Names that are already
                importable are not installed. The list is not modified.

        Returns:
            The value of the ``answer`` variable set by the code, or 100 when
            installation or execution fails.
        """
        logging.info("-> Executing generated code locally...")
        if isinstance(code_lines, str):
            code_block = code_lines
        else:
            code_block = "\n".join(str(line) for line in code_lines)

        missing = [
            name
            for name in (dependencies or [])
            if isinstance(name, str) and name and not self._is_importable(name)
        ]
        if missing:
            try:
                subprocess.run(
                    [sys.executable, "-m", "pip", "install", *missing],
                    check=True,
                    text=True,
                    capture_output=True,
                )
            except (subprocess.CalledProcessError, OSError) as e:
                logging.info("Dependency installation failed: %s", e)
                return self._FAILED_ANSWER
            # Let this process see the packages that were just installed.
            importlib.invalidate_caches()

        file_path = os.path.join(self.temp_dir.name, "script.py")
        namespace = {}
        try:
            # The script is also written to disk so it can be inspected.
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(code_block)
            exec(compile(code_block, file_path, "exec"), namespace)
            return namespace["answer"]
        except (Exception, SystemExit):
            # SystemExit is caught too: generated code calling sys.exit() must
            # not terminate the solver.
            logging.exception("Generated code failed")
            return self._FAILED_ANSWER

    def _solve_and_submit(self, submission_url: str, answer: Any) -> "QuizSubmitResponse":
        """POST the answer, retrying up to six times.

        The first three attempts go to *submission_url*; after the third
        failure the remaining attempts go to the fallback submit URL.

        Returns:
            The parsed response, or a response with ``correct=False`` and the
            last error as ``reason`` when every attempt failed.
        """
        payload = QuizSubmitPayload(
            email=self.email,
            secret=self.secret,
            url=self.current_url,
            answer=answer,
        ).model_dump()
        logging.info(f"Request to submit: {payload}")
        last_error = None
        for attempt in range(1, 7):
            try:
                response = requests.post(
                    submission_url, json=payload, timeout=self._SUBMIT_TIMEOUT_SECONDS
                )
                response.raise_for_status()  # Raise exception for bad status codes
                logging.info(response)
                submit_response = QuizSubmitResponse(**response.json())
                logging.info(submit_response)
                return submit_response
            except (requests.exceptions.RequestException, ValueError, TypeError) as e:
                # ValueError/TypeError: body that is not the expected JSON object.
                last_error = e
                logging.info(f"Submission failed for {submission_url}: {e}")
                if attempt == 3:
                    submission_url = self._FALLBACK_SUBMIT_URL
        return QuizSubmitResponse(correct=False, reason=f"Submission failed: {last_error}")

    def _scrape_quiz_page(self, p):
        """Render ``self.current_url`` and collect what the LLM step needs.

        Returns:
            ``(quiz_content, file_links, canvas_path)``. ``quiz_content`` is
            the parsed page, ``file_links`` a list of ``{"text", "href"}``
            dicts and ``canvas_path`` the path of the saved canvas PNG or
            None. When navigation or scraping fails, the values collected up
            to that point are returned (initially ``""``, ``[]``, ``None``).
        """
        quiz_content_text = ""
        all_links = []
        canvas_path = None
        browser = p.chromium.launch(headless=True)
        try:
            page = browser.new_page()
            page.goto(self.current_url, wait_until="networkidle")
            # Scripts are deliberately kept in the page content.
            quiz_content_text = BeautifulSoup(page.content(), "html.parser")
            all_links = page.evaluate(self._FILE_LINKS_JS)

            audio_element = page.query_selector("audio")
            if audio_element:
                # el.src yields the absolute URL.
                audio_src = page.evaluate("(el) => el.src", audio_element)
                if audio_src:
                    all_links.append({"text": "Audio file", "href": audio_src})

            if page.query_selector("canvas"):
                data_url = page.evaluate(self._CANVAS_PNG_JS)
                # Drop the "data:image/png;base64" prefix and decode the rest.
                _, encoded = data_url.split(",", 1)
                target = os.path.join(self.temp_dir.name, "image.png")
                with open(target, "wb") as f:
                    f.write(base64.b64decode(encoded))
                canvas_path = target
        except Exception as e:
            logging.info(f"Navigation/Scraping failed: {e}")
        finally:
            browser.close()
        return quiz_content_text, all_links, canvas_path

    def run_quiz_loop(self) -> str:
        """Solve quizzes and follow the returned URLs until the chain ends.

        After an incorrect answer the same quiz is retried once. If a next URL
        was returned and 10 seconds or less remain, the retry is skipped and
        the loop moves on directly.

        Returns:
            ``"Success: Quiz sequence completed."`` after a correct answer with
            no next URL, ``"Done"`` when an incorrect answer was already
            retried and no next URL exists, and
            ``"Failed to complete quiz sequence."`` when there is no URL to
            solve.
        """
        from urllib.parse import urljoin

        repeats = 0
        while self.current_url:
            logging.info(f"\n--- Solving Quiz: {self.current_url} ---")
            # The Playwright handle must stay open for the LLM step, which may
            # launch another browser for a scrape_web_page tool call.
            with sync_playwright() as p:
                quiz_content_text, all_links, canvas_path = self._scrape_quiz_page(p)
                download = self._download_file(all_links) if all_links else []
                if canvas_path:
                    download.append(canvas_path)
                llm_plan = self._llm_analyze_and_generate_code(quiz_content_text, download, p)

            if not isinstance(llm_plan, dict):
                llm_plan = {}
            planned_url = llm_plan.get("submit_url")
            if isinstance(planned_url, str) and planned_url:
                submission_url = planned_url
            else:
                submission_url = self._FALLBACK_SUBMIT_URL

            final_answer = self._execute_generated_code(
                llm_plan.get("python_code") or [],
                llm_plan.get("dependencies") or [],
            )
            logging.info(f"Received result of python code: {final_answer}")

            # Resolve a relative submission path against the quiz URL.
            submission_url = urljoin(self.current_url, submission_url)
            logging.info(f"Submission url used: {submission_url}")
            submit_response = self._solve_and_submit(submission_url, final_answer)

            # Start every round with an empty scratch directory.
            self.temp_dir.cleanup()
            self.temp_dir = tempfile.TemporaryDirectory()
            logging.info(submit_response)
            next_url = submit_response.url

            if submit_response.correct:
                if not next_url:
                    logging.info("Quiz sequence complete!")
                    return "Success: Quiz sequence completed."
                self._advance_to(next_url)
                logging.info(f" ✅ Correct! Proceeding to new quiz: {self.current_url}")
                repeats = 0
                continue

            logging.info("Incorrect")
            if next_url and self._get_time_remaining() <= 10:
                self._advance_to(next_url)
                logging.info(f"Skipping to new quiz because no time to redo: {self.current_url}")
                repeats = 0
                continue

            repeats += 1
            if repeats <= 1:
                logging.info("Repeating question")
                continue
            if not next_url:
                return "Done"
            self._advance_to(next_url)
            logging.info(f"Skipping to new quiz after repeating: {self.current_url}")
            repeats = 0

        return "Failed to complete quiz sequence."