|
|
"""
|
|
|
Quiz solver module - main logic for solving quizzes.
|
|
|
"""
|
|
|
import asyncio
|
|
|
import json
|
|
|
import logging
|
|
|
import re
|
|
|
from typing import Optional, Dict, Any, List
|
|
|
import requests
|
|
|
from bs4 import BeautifulSoup
|
|
|
import pandas as pd
|
|
|
import io
|
|
|
import base64
|
|
|
|
|
|
from app.browser import get_browser, cleanup_browser
|
|
|
from app.llm import ask_gpt, parse_question_with_llm, solve_with_llm, initialize_llm
|
|
|
from app.utils import extract_submit_url, clean_text, extract_json_from_text, is_valid_url
|
|
|
|
|
|
logger = logging.getLogger(__name__)


# NOTE(review): module-import side effect — the LLM client is configured
# the moment this module is imported, not when a solver is first used.
# Confirm initialize_llm() is idempotent/cheap before importing this
# module from contexts that never solve quizzes.
initialize_llm()
|
|
|
|
|
|
|
|
|
class QuizSolver:
    """Recursively solve a chain of web quizzes.

    Per quiz page the workflow is: load the page in a headless browser,
    locate the submit endpoint, extract the question, compute an answer
    (answer already on the page, linked data files, or LLM), submit it,
    and follow any ``url`` field in the response to the next quiz.
    """

    def __init__(self):
        # Browser handle; created lazily in solve_quiz().
        self.browser = None
        # Hard cap on chained quizzes to avoid infinite redirect loops.
        self.max_recursion = 10
        self.current_recursion = 0

    async def solve_quiz(self, url: str, email: str, secret: str) -> Dict[str, Any]:
        """
        Main entry point for solving a quiz.

        Args:
            url: Quiz page URL
            email: User email
            secret: Secret key

        Returns:
            Final response from quiz system
        """
        self.current_recursion = 0
        self.browser = await get_browser()

        try:
            return await self._solve_recursive(url, email, secret)
        finally:
            # Release the browser (previously `finally: pass` leaked it
            # even though cleanup_browser was imported for this purpose).
            # Best-effort: a cleanup failure must not mask the result.
            try:
                await cleanup_browser()
            except Exception as e:
                logger.warning(f"Browser cleanup failed: {e}")

    async def _solve_recursive(self, url: str, email: str, secret: str) -> Dict[str, Any]:
        """
        Recursively solve quizzes, following next-quiz URLs.

        Args:
            url: Current quiz URL
            email: User email
            secret: Secret key

        Returns:
            Response from quiz system (or an {"error": ...} dict)
        """
        if self.current_recursion >= self.max_recursion:
            logger.error("Maximum recursion depth reached")
            return {"error": "Maximum recursion depth reached"}

        self.current_recursion += 1
        logger.info(f"Solving quiz {self.current_recursion}: {url}")

        try:
            page_content = await self.browser.load_page(url, wait_time=3)

            # Find the endpoint to POST the answer to; fall back to the
            # BeautifulSoup-rendered text if the browser text missed it.
            submit_url = extract_submit_url(page_content['text'], url)
            if not submit_url:
                soup = BeautifulSoup(page_content['html'], 'html.parser')
                submit_url = extract_submit_url(soup.get_text(), url)

            if not submit_url:
                logger.error("Could not find submit URL")
                return {"error": "Submit URL not found"}

            question_text = self._extract_question(page_content)
            logger.info(f"Question extracted: {question_text[:200]}...")

            answer = await self._solve_question(question_text, page_content)
            answer = self._normalize_answer(answer)
            logger.info(f"Answer computed: {str(answer)[:200]}...")

            response = await self._submit_answer(
                submit_url, email, secret, url, answer
            )

            # Follow the chain when the grader points at the next quiz.
            if isinstance(response, dict) and 'url' in response:
                next_url = response['url']
                if next_url and next_url != url and is_valid_url(next_url):
                    logger.info(f"Next quiz found: {next_url}")
                    return await self._solve_recursive(next_url, email, secret)

            return response

        except Exception as e:
            logger.error(f"Error solving quiz: {e}", exc_info=True)
            return {"error": str(e)}

    def _extract_question(self, page_content: Dict[str, Any]) -> str:
        """
        Extract question text from page content.

        Args:
            page_content: Page content dictionary

        Returns:
            Question text (best-effort; falls back to the first long
            paragraph, then to the first 1000 characters of the page)
        """
        text = page_content.get('all_text', page_content.get('text', ''))

        # Look for an explicitly labeled question/problem/task first.
        question_patterns = [
            r'[Qq]uestion[:\s]+(.*?)(?:\n\n|\n[A-Z]|$)',
            r'[Pp]roblem[:\s]+(.*?)(?:\n\n|\n[A-Z]|$)',
            r'[Tt]ask[:\s]+(.*?)(?:\n\n|\n[A-Z]|$)',
        ]

        for pattern in question_patterns:
            match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
            if match:
                return clean_text(match.group(1))

        # Fall back to the first substantial paragraph.
        paragraphs = [p.strip() for p in text.split('\n\n') if len(p.strip()) > 50]
        if paragraphs:
            return paragraphs[0]

        return clean_text(text[:1000])

    async def _solve_question(self, question: str, page_content: Dict[str, Any]) -> Any:
        """
        Solve a quiz question using a cascade of strategies.

        Args:
            question: Question text
            page_content: Full page content

        Returns:
            Answer (can be dict, list, string, number, etc.)
        """
        logger.info("Analyzing question type...")

        available_data = self._extract_data_from_page(page_content)

        # Strategy 1: the page may already contain the answer.
        answer_in_page = self._find_answer_in_page(page_content, question)
        if answer_in_page:
            logger.info("Answer found in page content")
            return answer_in_page

        # Strategy 2: download and analyze any linked data files.
        data_files = self._find_data_files(page_content)
        if data_files:
            logger.info(f"Found data files: {data_files}")
            processed_data = await self._process_data_files(data_files)
            if processed_data:
                answer = await self._solve_with_data(question, processed_data)
                if answer:
                    return answer

        # Strategy 3: ask the LLM directly with the page data as context.
        logger.info("Attempting to solve with LLM...")
        llm_answer = await solve_with_llm(question, available_data)
        if llm_answer:
            # Prefer structured JSON embedded in the LLM reply.
            json_answer = extract_json_from_text(llm_answer)
            if json_answer:
                return json_answer
            return llm_answer

        # Strategy 4: simple pattern matching on question/page text.
        simple_answer = self._extract_simple_answer(question, page_content)
        if simple_answer:
            logger.info("Extracted simple answer from question")
            return simple_answer

        logger.warning("Could not solve question, using default answer")
        return "answer"

    def _extract_data_from_page(self, page_content: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract structured data (text, tables, embedded JSON) from a page.

        Args:
            page_content: Page content dictionary

        Returns:
            Dictionary of extracted data
        """
        data = {
            'text': page_content.get('text', ''),
            'html': page_content.get('html', ''),
            'links': page_content.get('links', []),
            'images': page_content.get('images', []),
        }

        # Parse any HTML tables into lists of row dicts.
        try:
            soup = BeautifulSoup(page_content.get('html', ''), 'html.parser')
            tables = soup.find_all('table')
            if tables:
                data['tables'] = []
                for table in tables:
                    try:
                        # StringIO wrapper: read_html on a literal string
                        # is deprecated in modern pandas.
                        df = pd.read_html(io.StringIO(str(table)))[0]
                        data['tables'].append(df.to_dict('records'))
                    except Exception:
                        # Unparseable table — skip it, keep the rest.
                        pass
        except Exception as e:
            logger.warning(f"Error extracting tables: {e}")

        json_data = extract_json_from_text(page_content.get('text', ''))
        if json_data:
            data['json'] = json_data

        return data

    def _find_answer_in_page(self, page_content: Dict[str, Any], question: str) -> Optional[Any]:
        """
        Check if the answer is already present in page content.

        Args:
            page_content: Page content
            question: Question text (currently unused; kept for symmetry
                with the other strategy helpers)

        Returns:
            Answer if found, None otherwise
        """
        text = page_content.get('all_text', page_content.get('text', ''))

        answer_patterns = [
            r'[Aa]nswer[:\s]+(.*?)(?:\n\n|$)',
            r'[Ss]olution[:\s]+(.*?)(?:\n\n|$)',
            r'[Rr]esult[:\s]+(.*?)(?:\n\n|$)',
        ]

        for pattern in answer_patterns:
            match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
            if match:
                answer_text = clean_text(match.group(1))
                # Prefer structured JSON when the answer embeds it.
                json_answer = extract_json_from_text(answer_text)
                if json_answer:
                    return json_answer
                return answer_text

        return None

    def _find_data_files(self, page_content: Dict[str, Any]) -> List[str]:
        """
        Find data files (CSV, JSON, PDF, etc.) linked in the page.

        Args:
            page_content: Page content

        Returns:
            List of file URLs (deduplicated, in discovery order)
        """
        files = []

        # Anchor tags whose href ends in a known data extension.
        for link in page_content.get('links', []):
            href = link.get('href', '')
            if any(href.lower().endswith(ext) for ext in ['.csv', '.json', '.pdf', '.xlsx', '.txt']):
                files.append(href)

        # Bare URLs in the page text. The extension alternation is a
        # NON-capturing group on purpose: with a capturing group,
        # re.findall returns only the group (the extension), so the
        # full URL was never collected.
        text = page_content.get('text', '')
        file_pattern = r'https?://[^\s<>"\'\)]+\.(?:csv|json|pdf|xlsx|txt)'
        for url in re.findall(file_pattern, text, re.IGNORECASE):
            if url not in files:
                files.append(url)

        return files

    @staticmethod
    def _extract_pdf_text(content: bytes) -> Optional[str]:
        """Extract text from PDF bytes, trying pdfplumber then PyPDF2.

        Returns the stripped text, or None if no backend is available
        or no text could be extracted.
        """
        try:
            import pdfplumber
            with pdfplumber.open(io.BytesIO(content)) as pdf:
                text = ""
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
            if text:
                return text.strip()
        except ImportError:
            logger.debug("pdfplumber not available")
        except Exception as e:
            logger.warning(f"Error reading PDF with pdfplumber: {e}")

        # Fallback backend.
        try:
            import PyPDF2
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(content))
            text = ""
            for page in pdf_reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text + "\n"
            if text:
                return text.strip()
        except ImportError:
            logger.warning("Neither pdfplumber nor PyPDF2 available for PDF processing")
        except Exception as e:
            logger.warning(f"Error reading PDF with PyPDF2: {e}")

        return None

    async def _process_data_files(self, file_urls: List[str]) -> Dict[str, Any]:
        """
        Download and process data files.

        Args:
            file_urls: List of file URLs

        Returns:
            Dictionary mapping filename -> parsed content; files that
            fail to download or parse are skipped.
        """
        processed = {}

        for url in file_urls:
            try:
                logger.info(f"Downloading file: {url}")
                response = requests.get(url, timeout=30)
                response.raise_for_status()

                content_type = response.headers.get('content-type', '').lower()
                filename = url.split('/')[-1]

                if 'csv' in content_type or filename.endswith('.csv'):
                    df = pd.read_csv(io.StringIO(response.text))
                    processed[filename] = df.to_dict('records')

                elif 'json' in content_type or filename.endswith('.json'):
                    processed[filename] = response.json()

                elif 'pdf' in content_type or filename.endswith('.pdf'):
                    pdf_text = self._extract_pdf_text(response.content)
                    if pdf_text:
                        processed[filename] = pdf_text

                elif 'spreadsheet' in content_type or filename.endswith('.xlsx'):
                    # .xlsx is discovered by _find_data_files, so it must
                    # be handled here too (it previously fell through
                    # unprocessed).
                    df = pd.read_excel(io.BytesIO(response.content))
                    processed[filename] = df.to_dict('records')

                elif filename.endswith('.txt'):
                    processed[filename] = response.text

            except Exception as e:
                logger.error(f"Error processing file {url}: {e}")
                continue

        return processed

    def _normalize_answer(self, answer: Any) -> Any:
        """
        Normalize answer to ensure it's JSON-serializable and in correct format.

        Args:
            answer: Raw answer (can be dict, list, string, etc.)

        Returns:
            Normalized answer (preferably string or simple JSON)
        """
        if answer is None:
            return "answer"

        if isinstance(answer, dict):
            # Unwrap an {"answer": ...} envelope recursively.
            if 'answer' in answer:
                return self._normalize_answer(answer['answer'])
            # A dict echoing the question carries no usable answer.
            if 'question' in answer and len(answer) > 1:
                return "answer"
            # Serialize the dict (the old separate len<=3 branch did the
            # exact same thing as this general case).
            try:
                return json.dumps(answer)
            except (TypeError, ValueError):
                return str(answer)

        if isinstance(answer, list):
            if len(answer) <= 10:
                try:
                    return json.dumps(answer)
                except (TypeError, ValueError):
                    return str(answer)
            return str(answer)

        if isinstance(answer, str):
            # Collapse all whitespace runs and cap the length.
            answer = ' '.join(answer.split())
            if len(answer) > 1000:
                answer = answer[:1000] + "..."
            return answer

        return str(answer)

    def _extract_simple_answer(self, question: str, page_content: Dict[str, Any]) -> Optional[str]:
        """
        Try to extract a simple answer from the question or page.

        Args:
            question: Question text
            page_content: Page content

        Returns:
            Simple answer string or None
        """
        text = page_content.get('all_text', page_content.get('text', ''))
        combined = question + "\n\n" + text

        # "Anything you want"-style quizzes accept any value.
        if re.search(r'"answer"\s*:\s*"anything\s+you\s+want"', combined, re.IGNORECASE):
            return "answer"
        if re.search(r'"answer"\s*:\s*"anything"', combined, re.IGNORECASE):
            return "answer"
        if re.search(r'anything\s+you\s+want|any\s+value|any\s+string|any\s+text|anything', question, re.IGNORECASE):
            return "answer"

        patterns = [
            r'"answer"\s*:\s*"([^"]+)"',
            r'[Aa]nswer[:\s]+["\']?([^"\'\n]+)["\']?',
            r'[Tt]he\s+[Aa]nswer\s+[Ii]s[:\s]+["\']?([^"\'\n]+)["\']?',
            r'[Yy]our\s+[Aa]nswer[:\s]+["\']?([^"\'\n]+)["\']?',
        ]

        for pattern in patterns:
            match = re.search(pattern, combined, re.IGNORECASE)
            if match:
                answer = match.group(1).strip()
                # Reject placeholders echoed from the instructions.
                if answer and len(answer) < 200 and answer.lower() not in ['your email', 'your secret', 'anything you want', 'anything']:
                    return answer

        return None

    async def _solve_with_data(self, question: str, data: Dict[str, Any]) -> Optional[Any]:
        """
        Solve question using processed data.

        Args:
            question: Question text
            data: Processed data dictionary

        Returns:
            Answer or None
        """
        prompt = f"""Solve this question using the provided data:

Question: {question}

Data:
{json.dumps(data, indent=2, default=str)}

Provide the answer. If JSON format is required, return valid JSON.
"""

        answer = await ask_gpt(prompt, max_tokens=3000)
        if answer:
            json_answer = extract_json_from_text(answer)
            if json_answer:
                return json_answer
            return answer

        return None

    async def _submit_answer(self, submit_url: str, email: str, secret: str,
                             quiz_url: str, answer: Any) -> Dict[str, Any]:
        """
        Submit answer to the quiz system.

        Args:
            submit_url: URL to submit answer to
            email: User email
            secret: Secret key
            quiz_url: Original quiz URL
            answer: Computed answer

        Returns:
            Response from submission endpoint
        """
        # Guarantee the payload is JSON-serializable before POSTing.
        try:
            json.dumps(answer)
        except (TypeError, ValueError) as e:
            logger.warning(f"Answer is not JSON-serializable, converting to string: {e}")
            if isinstance(answer, (dict, list)):
                answer = json.dumps(answer)
            else:
                answer = str(answer)

        payload = {
            "email": email,
            "secret": secret,
            "url": quiz_url,
            "answer": answer
        }

        try:
            logger.info(f"Submitting answer to: {submit_url}")
            logger.debug(f"Payload: {json.dumps(payload, indent=2, default=str)}")

            response = requests.post(
                submit_url,
                json=payload,
                headers={'Content-Type': 'application/json'},
                timeout=60
            )

            logger.info(f"Response status: {response.status_code}")
            logger.debug(f"Response headers: {dict(response.headers)}")

            response.raise_for_status()

            try:
                result = response.json()
                logger.info(f"Submission successful: {result}")
                return result
            except json.JSONDecodeError:
                logger.warning(f"Response is not JSON, returning text: {response.text[:500]}")
                return {"response": response.text, "status_code": response.status_code}

        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP error submitting answer: {e}")
            # Surface the server's error body when it is available.
            if hasattr(e, 'response') and e.response is not None:
                try:
                    error_response = e.response.json()
                    logger.error(f"Error response: {error_response}")
                    return error_response
                except Exception:
                    logger.error(f"Error response text: {e.response.text[:500]}")
                    return {"error": e.response.text, "status_code": e.response.status_code}
            return {"error": str(e)}
        except requests.exceptions.RequestException as e:
            logger.error(f"Error submitting answer: {e}", exc_info=True)
            return {"error": str(e)}
|
|
|
|
|
|
|
|
|
async def solve_quiz(url: str, email: str, secret: str) -> Dict[str, Any]:
    """
    Convenience function to solve a quiz.

    Creates a fresh :class:`QuizSolver` and delegates to its
    :meth:`QuizSolver.solve_quiz` method.

    Args:
        url: Quiz page URL
        email: User email
        secret: Secret key

    Returns:
        Final response from quiz system
    """
    return await QuizSolver().solve_quiz(url, email, secret)
|
|
|
|
|
|
|