# Prj2 / solver.py
# (Hugging Face upload metadata: uploader iitmbs24f, "Upload 37 files", commit 2f95553 verified)
"""
Quiz solver module - main logic for solving quizzes.
"""
import asyncio
import json
import logging
import re
from typing import Optional, Dict, Any, List
import requests
from bs4 import BeautifulSoup
import pandas as pd
import io
import base64
from app.browser import get_browser, cleanup_browser
from app.llm import ask_gpt, parse_question_with_llm, solve_with_llm, initialize_llm
from app.utils import extract_submit_url, clean_text, extract_json_from_text, is_valid_url
logger = logging.getLogger(__name__)
# Initialize LLM on module load
initialize_llm()
class QuizSolver:
    """Main quiz solver class.

    Orchestrates the full solve loop: load the quiz page in a browser,
    extract the question, compute an answer via a cascade of strategies
    (answer already in page, linked data files, LLM, heuristics), submit
    it, and follow any chained quiz URL returned by the server.
    """

    def __init__(self):
        # Browser handle is acquired lazily in solve_quiz() and may be reused.
        self.browser = None
        # Hard cap on chained quizzes so a looping chain cannot recurse forever.
        self.max_recursion = 10
        self.current_recursion = 0

    async def solve_quiz(self, url: str, email: str, secret: str) -> Dict[str, Any]:
        """
        Main entry point for solving a quiz.

        Args:
            url: Quiz page URL
            email: User email
            secret: Secret key

        Returns:
            Final response from quiz system
        """
        self.current_recursion = 0
        self.browser = await get_browser()
        try:
            return await self._solve_recursive(url, email, secret)
        finally:
            # Don't close browser here as it might be reused
            pass

    async def _solve_recursive(self, url: str, email: str, secret: str) -> Dict[str, Any]:
        """
        Recursively solve quizzes, following the 'url' key in responses.

        Args:
            url: Current quiz URL
            email: User email
            secret: Secret key

        Returns:
            Response from quiz system, or an {"error": ...} dict on failure
        """
        if self.current_recursion >= self.max_recursion:
            logger.error("Maximum recursion depth reached")
            return {"error": "Maximum recursion depth reached"}
        self.current_recursion += 1
        logger.info(f"Solving quiz {self.current_recursion}: {url}")
        try:
            # Load the quiz page
            page_content = await self.browser.load_page(url, wait_time=3)
            # Extract submit URL from rendered text, falling back to raw HTML text
            submit_url = extract_submit_url(page_content['text'], url)
            if not submit_url:
                # Try from HTML
                soup = BeautifulSoup(page_content['html'], 'html.parser')
                submit_url = extract_submit_url(soup.get_text(), url)
            if not submit_url:
                logger.error("Could not find submit URL")
                return {"error": "Submit URL not found"}
            # Extract question and solve
            question_text = self._extract_question(page_content)
            logger.info(f"Question extracted: {question_text[:200]}...")
            # Solve the question
            answer = await self._solve_question(question_text, page_content)
            # Ensure answer is in the correct format (string or simple JSON-serializable)
            answer = self._normalize_answer(answer)
            logger.info(f"Answer computed: {str(answer)[:200]}...")
            # Submit answer
            response = await self._submit_answer(
                submit_url, email, secret, url, answer
            )
            # Check if there's a next quiz; guard against self-links and junk URLs
            if isinstance(response, dict) and 'url' in response:
                next_url = response['url']
                if next_url and next_url != url and is_valid_url(next_url):
                    logger.info(f"Next quiz found: {next_url}")
                    # Recursively solve next quiz
                    return await self._solve_recursive(next_url, email, secret)
            return response
        except Exception as e:
            # Top-level boundary for a single quiz: log with traceback and
            # return a structured error instead of propagating.
            logger.error(f"Error solving quiz: {e}", exc_info=True)
            return {"error": str(e)}

    def _extract_question(self, page_content: Dict[str, Any]) -> str:
        """
        Extract question text from page content.

        Args:
            page_content: Page content dictionary

        Returns:
            Question text (best-effort; falls back to the first substantial
            paragraph, then to the first 1000 characters)
        """
        text = page_content.get('all_text', page_content.get('text', ''))
        # Try to find question markers
        question_patterns = [
            r'[Qq]uestion[:\s]+(.*?)(?:\n\n|\n[A-Z]|$)',
            r'[Pp]roblem[:\s]+(.*?)(?:\n\n|\n[A-Z]|$)',
            r'[Tt]ask[:\s]+(.*?)(?:\n\n|\n[A-Z]|$)',
        ]
        for pattern in question_patterns:
            match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
            if match:
                return clean_text(match.group(1))
        # If no pattern matches, return first substantial paragraph
        paragraphs = [p.strip() for p in text.split('\n\n') if len(p.strip()) > 50]
        if paragraphs:
            return paragraphs[0]
        return clean_text(text[:1000])  # Return first 1000 chars

    async def _solve_question(self, question: str, page_content: Dict[str, Any]) -> Any:
        """
        Solve a quiz question using various strategies, in priority order.

        Args:
            question: Question text
            page_content: Full page content

        Returns:
            Answer (can be dict, list, string, number, etc.)
        """
        logger.info("Analyzing question type...")
        # Try to parse question with LLM first
        parsed = await parse_question_with_llm(question, page_content.get('text', ''))
        # Extract data from page
        available_data = self._extract_data_from_page(page_content)
        # Strategy 1: Check if answer is already in the page
        answer_in_page = self._find_answer_in_page(page_content, question)
        if answer_in_page:
            logger.info("Answer found in page content")
            return answer_in_page
        # Strategy 2: Check for data files/links to download
        data_files = self._find_data_files(page_content)
        if data_files:
            logger.info(f"Found data files: {data_files}")
            processed_data = await self._process_data_files(data_files)
            if processed_data:
                answer = await self._solve_with_data(question, processed_data)
                if answer:
                    return answer
        # Strategy 3: Use LLM to solve
        logger.info("Attempting to solve with LLM...")
        llm_answer = await solve_with_llm(question, available_data)
        if llm_answer:
            # Try to parse as JSON if it looks like JSON
            json_answer = extract_json_from_text(llm_answer)
            if json_answer:
                return json_answer
            return llm_answer
        # Strategy 4: Fallback - try to extract a simple answer from the question
        # Many quiz pages have the answer in the question itself
        simple_answer = self._extract_simple_answer(question, page_content)
        if simple_answer:
            logger.info("Extracted simple answer from question")
            return simple_answer
        # Strategy 5: Last resort - return a default answer
        logger.warning("Could not solve question, using default answer")
        return "answer"

    def _extract_data_from_page(self, page_content: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract structured data from page.

        Args:
            page_content: Page content dictionary

        Returns:
            Dictionary with 'text', 'html', 'links', 'images' and optionally
            'tables' (list of record-dicts per table) and 'json'
        """
        data = {
            'text': page_content.get('text', ''),
            'html': page_content.get('html', ''),
            'links': page_content.get('links', []),
            'images': page_content.get('images', []),
        }
        # Try to extract tables
        try:
            soup = BeautifulSoup(page_content.get('html', ''), 'html.parser')
            tables = soup.find_all('table')
            if tables:
                data['tables'] = []
                for table in tables:
                    try:
                        # Wrap in StringIO: passing literal HTML to read_html
                        # is deprecated since pandas 2.1.
                        df = pd.read_html(io.StringIO(str(table)))[0]
                        data['tables'].append(df.to_dict('records'))
                    except Exception:
                        # Best-effort: skip tables pandas cannot parse.
                        pass
        except Exception as e:
            logger.warning(f"Error extracting tables: {e}")
        # Try to extract JSON from page
        json_data = extract_json_from_text(page_content.get('text', ''))
        if json_data:
            data['json'] = json_data
        return data

    def _find_answer_in_page(self, page_content: Dict[str, Any], question: str) -> Optional[Any]:
        """
        Check if answer is already present in page content.

        Args:
            page_content: Page content
            question: Question text

        Returns:
            Answer if found, None otherwise
        """
        text = page_content.get('all_text', page_content.get('text', ''))
        # Look for answer patterns
        answer_patterns = [
            r'[Aa]nswer[:\s]+(.*?)(?:\n\n|$)',
            r'[Ss]olution[:\s]+(.*?)(?:\n\n|$)',
            r'[Rr]esult[:\s]+(.*?)(?:\n\n|$)',
        ]
        for pattern in answer_patterns:
            match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
            if match:
                answer_text = clean_text(match.group(1))
                # Try to parse as JSON
                json_answer = extract_json_from_text(answer_text)
                if json_answer:
                    return json_answer
                return answer_text
        return None

    def _find_data_files(self, page_content: Dict[str, Any]) -> List[str]:
        """
        Find data files (CSV, JSON, PDF, etc.) linked in the page.

        Args:
            page_content: Page content

        Returns:
            List of file URLs, deduplicated, in discovery order
        """
        files = []
        # Check links
        for link in page_content.get('links', []):
            href = link.get('href', '')
            if any(href.lower().endswith(ext) for ext in ['.csv', '.json', '.pdf', '.xlsx', '.txt']):
                files.append(href)
        # Check text for file URLs. Use finditer + group(0) for the full URL:
        # with a single capturing group in the pattern, re.findall would
        # return only the extension (e.g. 'csv'), not the URL.
        text = page_content.get('text', '')
        file_pattern = r'https?://[^\s<>"\'\)]+\.(csv|json|pdf|xlsx|txt)'
        for match in re.finditer(file_pattern, text, re.IGNORECASE):
            file_url = match.group(0)
            if file_url not in files:
                files.append(file_url)
        return files

    async def _process_data_files(self, file_urls: List[str]) -> Dict[str, Any]:
        """
        Download and process data files.

        Args:
            file_urls: List of file URLs

        Returns:
            Dictionary mapping filename -> parsed content (records for
            CSV/XLSX, parsed JSON, extracted text for PDF/TXT). Files that
            fail to download or parse are skipped.
        """
        processed = {}
        for url in file_urls:
            try:
                logger.info(f"Downloading file: {url}")
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                content_type = response.headers.get('content-type', '').lower()
                filename = url.split('/')[-1]
                if 'csv' in content_type or filename.endswith('.csv'):
                    df = pd.read_csv(io.StringIO(response.text))
                    processed[filename] = df.to_dict('records')
                elif 'json' in content_type or filename.endswith('.json'):
                    processed[filename] = response.json()
                elif 'excel' in content_type or filename.endswith('.xlsx'):
                    # XLSX is advertised by _find_data_files, so handle it here
                    # too (requires an engine such as openpyxl; failures are
                    # caught by the outer handler and the file is skipped).
                    df = pd.read_excel(io.BytesIO(response.content))
                    processed[filename] = df.to_dict('records')
                elif 'pdf' in content_type or filename.endswith('.pdf'):
                    # PDF processing - try pdfplumber first, then PyPDF2
                    text = None
                    # Try pdfplumber
                    try:
                        import pdfplumber
                        with pdfplumber.open(io.BytesIO(response.content)) as pdf:
                            text = ""
                            for page in pdf.pages:
                                page_text = page.extract_text()
                                if page_text:
                                    text += page_text + "\n"
                        if text:
                            processed[filename] = text.strip()
                    except ImportError:
                        logger.debug("pdfplumber not available")
                    except Exception as e:
                        logger.warning(f"Error reading PDF with pdfplumber (unknown): {e}")
                    # Fallback to PyPDF2 when pdfplumber failed or got no text
                    if not text or filename not in processed:
                        try:
                            import PyPDF2
                            pdf_file = io.BytesIO(response.content)
                            pdf_reader = PyPDF2.PdfReader(pdf_file)
                            text = ""
                            for page in pdf_reader.pages:
                                page_text = page.extract_text()
                                if page_text:
                                    text += page_text + "\n"
                            if text:
                                processed[filename] = text.strip()
                        except ImportError:
                            logger.warning("Neither pdfplumber nor PyPDF2 available for PDF processing")
                        except Exception as e:
                            logger.warning(f"Error reading PDF with PyPDF2 (unknown): {e}")
                elif filename.endswith('.txt'):
                    processed[filename] = response.text
            except Exception as e:
                # Best-effort per file: log and move on to the next URL.
                logger.error(f"Error processing file {url}: {e}")
                continue
        return processed

    def _normalize_answer(self, answer: Any) -> Any:
        """
        Normalize answer to ensure it's JSON-serializable and in correct format.

        Args:
            answer: Raw answer (can be dict, list, string, etc.)

        Returns:
            Normalized answer (preferably string or simple JSON)
        """
        if answer is None:
            return "answer"
        if isinstance(answer, dict):
            # If it contains an 'answer' key, use that
            if 'answer' in answer:
                return self._normalize_answer(answer['answer'])
            # If it's an analysis dict (question + extra keys), don't submit
            # the whole analysis - fall back to a simple string.
            if 'question' in answer and len(answer) > 1:
                return "answer"
            # Any other dict: serialize to a JSON string, falling back to
            # str() for non-serializable contents.
            try:
                return json.dumps(answer)
            except (TypeError, ValueError):
                return str(answer)
        # If it's a list, convert to JSON string if small, otherwise string
        if isinstance(answer, list):
            if len(answer) <= 10:
                try:
                    return json.dumps(answer)
                except (TypeError, ValueError):
                    return str(answer)
            return str(answer)
        # For strings, return as-is (but clean up)
        if isinstance(answer, str):
            # Remove excessive whitespace
            answer = ' '.join(answer.split())
            # If it's very long, truncate
            if len(answer) > 1000:
                answer = answer[:1000] + "..."
            return answer
        # For other types (int, float, bool, ...), convert to string
        return str(answer)

    def _extract_simple_answer(self, question: str, page_content: Dict[str, Any]) -> Optional[str]:
        """
        Try to extract a simple answer from the question or page.

        Args:
            question: Question text
            page_content: Page content

        Returns:
            Simple answer string or None
        """
        text = page_content.get('all_text', page_content.get('text', ''))
        combined = question + "\n\n" + text
        # Check if question says "anything" or similar - very common in demo quizzes
        if re.search(r'"answer"\s*:\s*"anything\s+you\s+want"', combined, re.IGNORECASE):
            return "answer"
        if re.search(r'"answer"\s*:\s*"anything"', combined, re.IGNORECASE):
            return "answer"
        if re.search(r'anything\s+you\s+want|any\s+value|any\s+string|any\s+text|anything', question, re.IGNORECASE):
            return "answer"
        # Look for patterns like "answer: X" or "the answer is X"
        patterns = [
            r'"answer"\s*:\s*"([^"]+)"',  # JSON format: "answer": "value"
            r'[Aa]nswer[:\s]+["\']?([^"\'\n]+)["\']?',
            r'[Tt]he\s+[Aa]nswer\s+[Ii]s[:\s]+["\']?([^"\'\n]+)["\']?',
            r'[Yy]our\s+[Aa]nswer[:\s]+["\']?([^"\'\n]+)["\']?',
        ]
        for pattern in patterns:
            match = re.search(pattern, combined, re.IGNORECASE)
            if match:
                answer = match.group(1).strip()
                # Skip if it's a placeholder or instruction
                if answer and len(answer) < 200 and answer.lower() not in ['your email', 'your secret', 'anything you want', 'anything']:
                    return answer
        return None

    async def _solve_with_data(self, question: str, data: Dict[str, Any]) -> Optional[Any]:
        """
        Solve question using processed data.

        Args:
            question: Question text
            data: Processed data dictionary

        Returns:
            Answer (parsed JSON when possible, else raw text) or None
        """
        # Use LLM to solve with data
        prompt = f"""Solve this question using the provided data:
Question: {question}
Data:
{json.dumps(data, indent=2, default=str)}
Provide the answer. If JSON format is required, return valid JSON.
"""
        answer = await ask_gpt(prompt, max_tokens=3000)
        if answer:
            json_answer = extract_json_from_text(answer)
            if json_answer:
                return json_answer
            return answer
        return None

    async def _submit_answer(self, submit_url: str, email: str, secret: str,
                             quiz_url: str, answer: Any) -> Dict[str, Any]:
        """
        Submit answer to the quiz system.

        Args:
            submit_url: URL to submit answer to
            email: User email
            secret: Secret key
            quiz_url: Original quiz URL
            answer: Computed answer

        Returns:
            Response from submission endpoint (parsed JSON when available,
            otherwise a dict wrapping the raw text / error)
        """
        # Ensure answer is JSON-serializable
        try:
            # Try to serialize answer to check if it's valid JSON
            json.dumps(answer)
        except (TypeError, ValueError) as e:
            logger.warning(f"Answer is not JSON-serializable, converting to string: {e}")
            # Convert complex objects to string representation
            if isinstance(answer, (dict, list)):
                answer = json.dumps(answer)
            else:
                answer = str(answer)
        payload = {
            "email": email,
            "secret": secret,
            "url": quiz_url,
            "answer": answer
        }
        try:
            logger.info(f"Submitting answer to: {submit_url}")
            logger.debug(f"Payload: {json.dumps(payload, indent=2, default=str)}")
            response = requests.post(
                submit_url,
                json=payload,
                headers={'Content-Type': 'application/json'},
                timeout=60
            )
            # Log response details
            logger.info(f"Response status: {response.status_code}")
            logger.debug(f"Response headers: {dict(response.headers)}")
            response.raise_for_status()
            try:
                result = response.json()
                logger.info(f"Submission successful: {result}")
                return result
            except json.JSONDecodeError:
                logger.warning(f"Response is not JSON, returning text: {response.text[:500]}")
                return {"response": response.text, "status_code": response.status_code}
        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP error submitting answer: {e}")
            if hasattr(e, 'response') and e.response is not None:
                try:
                    error_response = e.response.json()
                    logger.error(f"Error response: {error_response}")
                    return error_response
                except ValueError:
                    # Error body is not JSON; return the raw text instead.
                    logger.error(f"Error response text: {e.response.text[:500]}")
                    return {"error": e.response.text, "status_code": e.response.status_code}
            return {"error": str(e)}
        except requests.exceptions.RequestException as e:
            logger.error(f"Error submitting answer: {e}", exc_info=True)
            return {"error": str(e)}
async def solve_quiz(url: str, email: str, secret: str) -> Dict[str, Any]:
    """Module-level convenience wrapper around :class:`QuizSolver`.

    Builds a fresh solver instance and runs the full solve loop for one
    quiz chain.

    Args:
        url: Quiz page URL
        email: User email
        secret: Secret key

    Returns:
        Final response from quiz system
    """
    quiz_solver = QuizSolver()
    result = await quiz_solver.solve_quiz(url, email, secret)
    return result