import asyncio
import json
import logging
import requests
from urllib.parse import urljoin
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
from agent import get_agent
logger = logging.getLogger(__name__)
async def solve_quiz(initial_url: str, email: str, secret: str):
import uuid
task_id = str(uuid.uuid4())[:8]
logger.info(f"[{task_id}] Starting quiz solver workflow for {email}")
current_url = initial_url
last_url = None
url_attempts = 0
failure_details = None
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
try:
while current_url:
# Generate a NEW session ID for each task/URL to keep memory clean
import uuid
# Generate a NEW session ID for each task/URL to keep memory clean
import uuid
session_id = str(uuid.uuid4())
# Update attempt counter
if current_url == last_url:
url_attempts += 1
else:
last_url = current_url
url_attempts = 1
if url_attempts > 60:
logger.error(f"Max attempts (60) reached for {current_url}. Stopping loop.")
break
logger.info(f"Started new agent session for {current_url} (Attempt {url_attempts}): {session_id}")
logger.info(f"Navigating to {current_url}")
await page.goto(current_url)
# Wait for content
await page.wait_for_selector("body")
# Check for email input and fill it if present
# Many quizzes require entering the email to see the question
try:
email_input = await page.query_selector("input[type='email'], input[name='email'], input[placeholder*='email']")
if email_input:
logger.info(f"Found email input, filling with {email}")
await email_input.fill(email)
await email_input.press("Enter")
# Wait for potential update/navigation
await page.wait_for_load_state("networkidle")
await asyncio.sleep(2) # Extra buffer for JS updates
except Exception as e:
logger.warning(f"Error handling email input: {e}")
# Extract content
# Extract content
# Get full HTML to parse links and media
html_content = await page.content()
soup = BeautifulSoup(html_content, 'html.parser')
# Extract text
text_content = soup.get_text(separator='\n', strip=True)
# Check for <base> tag
base_url = current_url
base_tag = soup.find('base', href=True)
if base_tag:
base_url = urljoin(current_url, base_tag['href'])
logger.info(f"Found <base> tag, using base URL: {base_url}")
# Extract links and media to append to context
links = []
for a in soup.find_all('a', href=True):
href = a['href']
full_url = urljoin(base_url, href)
links.append(f"Link: [{a.get_text(strip=True)}]({full_url})")
audio_sources = []
for audio in soup.find_all('audio'):
if audio.get('src'):
src = audio['src']
full_src = urljoin(base_url, src)
audio_sources.append(f"Audio: {full_src}")
for source in audio.find_all('source', src=True):
src = source['src']
full_src = urljoin(base_url, src)
audio_sources.append(f"Audio: {full_src}")
images = []
for img in soup.find_all('img'):
src = img.get('src')
if src:
full_src = urljoin(base_url, src)
alt = img.get('alt', 'No description')
images.append(f"Image: [{alt}]({full_src})")
# Conditional Screenshot Logic
# If there are visual elements (canvas or images), capture the page state to a file.
# This allows the agent to "see" the page if needed, without cluttering context with base64.
try:
has_visuals = await page.evaluate("() => document.querySelectorAll('canvas, img').length > 0")
if has_visuals:
screenshot_path = f"/tmp/screenshot_{session_id}.jpg"
await page.screenshot(path=screenshot_path, full_page=True, type='jpeg', quality=50)
images.append(f"Image: [Page Screenshot]({screenshot_path})")
logger.info(f"Visual elements detected. Saved screenshot to {screenshot_path}")
else:
logger.info("No significant visual elements detected. Skipping screenshot.")
except Exception as e:
logger.warning(f"Error handling screenshot: {e}")
# Combine into a rich context
content = text_content + "\n\n--- Extracted Links & Media ---\n" + "\n".join(links + audio_sources + images)
# If the content is empty or loading, wait a bit
if not content.strip():
await asyncio.sleep(1)
content = await page.evaluate("document.body.innerText")
logger.info(f"Extracted content (first 100 chars): {content[:100]}")
# Use agent to solve (initialize here if needed, but we use get_agent() outside if we wanted persistent agent object,
# but we want fresh memory per task, so we rely on session_id)
agent = get_agent()
failure_info = ""
if failure_details:
failure_info = f"""
***PREVIOUS FAILED SUBMISSION***
- The previous answer you submitted was INCORRECT.
- **Submitted Answer**: {json.dumps(failure_details['answer'].get('answer', 'UNKNOWN'))}
- **Error/Reason**: {failure_details['reason']}
- **GUIDANCE**: Analyze the error message carefully. Do NOT repeat the same answer. Try a different approach or format.
***
"""
prompt = f"""
{failure_info}
You are a highly capable Quiz Solver Agent.
Current Page URL: {current_url}
Page Content:
---
{content.replace("{", "{{").replace("}", "}}")}
---
**GOAL**
Solve the task on the current page.
**GUIDELINES**
- **Conciseness**: Plan and explain in **2-3 lines maximum**.
- **Action**: Respond **IMMEDIATELY** with a tool call or the final JSON. **DO NOT** output conversational text or plans like "I need to...". Just run the code.
- Have a dummy answer for the intial request
**TOOL USAGE**
- **Secret Codes**: Return exactly as requested (no extra spaces).
- **Media**: (this is the order in which you should understand the contents of the page)
- Audio: Use `transcribe_audio(url)`.
- Images: Use `understand_image(url, prompt)`. (if /tmp is involved its stored locally and not in the url)
- PDF/ZIP: Use `read_pdf(url)` or `read_zip(url)`.
- History: Use `search_history(query)` with the previous quiz URL.
- API: Use `call_api(url)` only if explicitly mentioned to call an api and not otherwise.
- **Files**:
- **Local**: Paths starting with `/tmp/` are local. Pass them directly to tools.
- **Relative URLs**: Resolve against the Current Page URL using `urllib.parse.urljoin`.
- **Scripts**: Use `fetch_page_scripts(url)` **ONLY** if the page explicitly asks to "run javascript" or "embedded javascript".
- **Stuck?**: If content is unchanged or ambiguous, **FIRST** **CRITICAL** verify visually with `understand_image`. Only then consider scripts.
**DATA ANALYSIS**
- **Process**:
1. **Inspect**: Download/read the file and print `df.head()` and `df.columns`. **STOP**.
2. **Analyze**: Read the inspection output, *then* write new code to solve.
- **Downloads**: Always check `status_code == 200`. Save to `/tmp`.
- **CSV**: Use `header=None` for number lists.
- **Output**: Calculate the final answer in Python. Print **ONLY** the result. Do not print large dataframes.
**DIRECT ANSWER**
If the answer is in the text, provide it directly.
6. If sometimes the answer is wrong when previously you submitted it in some format (integer/string/float) try submitting it in another format.
7. If the task asks for email or uses an example email, then the email address is 22f3000730@ds.study.iitm.ac.in. You can use it if needed. But DO NOT confuse it for anything else based on your assumptions.
8. Email number for '22f3000730@ds.study.iitm.ac.in' is 45519 using sha1 hash. You can use it if needed. But DO NOT confuse it for anything else based on your assumptions. DO NOT assume that this is the answer unless the page explicitly states to submit it.
9. If the tasks ask for the secret, it is vjay@15
10. Screenshots are mentioned in the prompt like this: Image: [Page Screenshot](/tmp/screenshot_12345.jpg) use understand_image(/tmp/screenshot_12345.jpg, "") to understand the screenshot.
**OUTPUT**
When you have the final answer, return a JSON object with the following structure:
{{
"answer_payload": {{"email": "...", "secret": "...", "url": "...", "answer": "..."}},
"submit_url": "...",
"reasoning": "..."
}}
If submission url is not available, use https://tds-llm-analysis.s-anand.net/submit to submit.
"""
# Run agent with session_id for memory
max_retries = 3
for attempt in range(max_retries):
response = agent.run(prompt, session_id=session_id)
logger.info(f"LLM Response: {response.content}")
# Parse response
try:
response_text = response.content
logger.info(f"Raw LLM Response: {response_text}")
# Robust JSON extraction using regex
import re
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if json_match:
response_text = json_match.group(0)
result = json.loads(response_text)
# Check if agent returned python_code instead of final answer
if "python_code" in result and "answer_payload" not in result:
python_code = result.get("python_code")
logger.info(f"Agent provided Python code to execute")
# Execute the code
from tools import execute_python
code_output = execute_python(python_code)
logger.info(f"Python code executed, output: {code_output[:200]}...")
# Ask agent to format final JSON with code output
followup_prompt = f"""
The Python code executed successfully. Output:
{code_output.replace("{", "{{").replace("}", "}}")}
Now return the final JSON for submission:
{{
"answer_payload": {{"email": "{email}", "secret": "{secret}", "url": "{current_url}", "answer": <extract from output above>}},
"submit_url": <submit URL from original page>,
"reasoning": <brief explanation>
}}
"""
response = agent.run(followup_prompt, session_id=session_id)
logger.info(f"LLM Follow-up Response: {response.content}")
# Parse follow-up response
response_text = response.content
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if json_match:
response_text = json_match.group(0)
result = json.loads(response_text)
answer_payload = result.get("answer_payload")
submit_url = result.get("submit_url")
if not answer_payload or not submit_url:
logger.error("Agent failed to provide answer_payload or submit_url")
if attempt < max_retries - 1:
prompt = "Error: You must return a JSON object with 'answer_payload' and 'submit_url'. Do not return conversational text."
continue
break
if answer_payload:
# Trust the LLM's payload
pass
# Resolve relative URL
submit_url = urljoin(current_url, submit_url)
logger.info(f"Solved. Submitting to {submit_url}")
# Submit answer
submission_response = submit_answer(submit_url, answer_payload)
logger.info(f"Submission Response: {json.dumps(submission_response, indent=2)}")
# Check for next URL first (priority over correctness for navigation)
next_url = submission_response.get("url")
is_correct = submission_response.get("correct")
if next_url:
logger.info(f"Received next URL: {next_url}")
if not is_correct:
logger.warning(f"Answer was incorrect, but moving to next URL as instructed.")
else:
logger.info("Answer correct! Moving to next URL.")
current_url = next_url
failure_details = None # Reset on success
break # Break retry loop to process new URL
# No new URL provided
if is_correct:
logger.info("Answer correct! No new URL provided. Quiz completed!")
current_url = None # Break outer loop
break # Break retry loop
else:
logger.warning(f"Answer incorrect: {submission_response.get('reason')}")
logger.info("No new URL provided. Retrying same URL in 20 seconds...")
current_url = current_url # Just to be explicit, logic loops anyway
failure_details = {"answer": answer_payload, "reason": submission_response.get("reason")}
await asyncio.sleep(20)
# Break inner loop to refresh page and try again
break
except json.JSONDecodeError:
logger.error(f"Failed to parse agent response: {response.content}")
if attempt < max_retries - 1:
prompt = "Error: Your response was not valid JSON. Please return ONLY a JSON object. Do not include any conversational text."
continue
break
except Exception as e:
logger.error(f"Error in solver loop: {e}")
finally:
await browser.close()
def submit_answer(submit_url, payload):
try:
logger.info(f"Submitting answer to {submit_url} with payload: {json.dumps(payload, indent=2)}")
response = requests.post(submit_url, json=payload)
return response.json()
except Exception as e:
logger.error(f"Submission failed: {e}")
return {"correct": False, "reason": str(e)}
|