# Hugging Face Space (status: Sleeping)
| import os | |
| import re | |
| import json | |
| import base64 | |
| import logging | |
| import time | |
| from typing import List, Dict, Any | |
| from fastapi import FastAPI | |
| from pydantic import BaseModel | |
| from selenium import webdriver | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.chrome.service import Service | |
| # ------------------------- | |
| # Logging | |
| # ------------------------- | |
# The level name comes from the LOG_LEVEL environment variable (default INFO).
# logging.basicConfig only accepts the canonical upper-case level names and
# raises ValueError for spellings such as "debug", so normalise the value and
# fall back to INFO when the variable is set but empty.
logging.basicConfig(level=(os.getenv("LOG_LEVEL", "").strip() or "INFO").upper())
logger = logging.getLogger("hackrx-round5")
| # ------------------------- | |
| # FastAPI app | |
| # ------------------------- | |
# Application instance for this module.
app = FastAPI(
    title="HackRx Round 5 API",
    version="1.0.0",
)
| # ------------------------- | |
| # Models | |
| # ------------------------- | |
class ChallengeRequest(BaseModel):
    """Request body for the challenge endpoint."""

    # Address of the page to scrape.
    url: str
    # Questions to answer about that page.
    questions: List[str]
class ChallengeResponse(BaseModel):
    """Response body for the challenge endpoint."""

    # Answers as plain strings.
    answers: List[str]
| # ------------------------- | |
| # Helpers | |
| # ------------------------- | |
def try_decode_jwt(token: str) -> Dict[str, Any]:
    """Decode the payload segment of a JWT without verifying its signature.

    Args:
        token: Candidate token of the form ``header.payload.signature``.

    Returns:
        The decoded payload when it is a JSON object; an empty dict when the
        token does not have three segments, cannot be decoded, or decodes to
        something other than a JSON object. This function never raises for
        malformed input.
    """
    try:
        segments = token.split(".")
        if len(segments) != 3:
            return {}
        # JWT segments are base64url without padding; restore the "=" padding
        # that the decoder expects.
        payload_b64 = segments[1] + "=" * (-len(segments[1]) % 4)
        payload_json = base64.urlsafe_b64decode(payload_b64).decode("utf-8")
        decoded_payload = json.loads(payload_json)
    except (AttributeError, TypeError, ValueError, RecursionError) as e:
        # ValueError covers binascii.Error, UnicodeDecodeError and
        # json.JSONDecodeError; the others cover non-string or pathological input.
        logger.error(f"JWT decode error: {e}")
        return {}

    # A payload such as "[1]" or "123" is valid JSON but not a claims object.
    # Callers iterate .items() on the result, so only ever hand back a dict.
    if not isinstance(decoded_payload, dict):
        logger.error(
            f"JWT decode error: payload is {type(decoded_payload).__name__}, not a JSON object"
        )
        return {}

    logger.info(f"Decoded JWT payload: {decoded_payload}")
    return decoded_payload
def setup_chrome_driver():
    """Create a headless Chrome WebDriver configured for scraping.

    Returns:
        A ``webdriver.Chrome`` instance, or ``None`` when the driver could not
        be started (the failure is logged rather than raised).
    """
    chrome_options = Options()
    launch_flags = (
        "--headless",  # Run in background
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--window-size=1920,1080",
        "--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0 Safari/537.36",
        # Enable logging to capture console messages
        "--enable-logging",
        "--log-level=0",
    )
    for flag in launch_flags:
        chrome_options.add_argument(flag)

    # Ask ChromeDriver to record every browser console entry so that
    # driver.get_log("browser") also returns console.log/info output,
    # not only warnings and errors.
    chrome_options.set_capability("goog:loggingPrefs", {"browser": "ALL"})

    try:
        return webdriver.Chrome(options=chrome_options)
    except Exception as e:
        # Deliberately broad: any start-up failure (missing binary, version
        # mismatch, ...) is reported to the caller as None.
        logger.error(f"Failed to create Chrome driver: {e}")
        return None
| # ------------------------- | |
| # Interactive Scraper | |
| # ------------------------- | |
# Selectors tried, in order, to locate the "Start Challenge" control. Entries
# containing ":contains" are not valid CSS; they trigger the XPath text search.
_START_BUTTON_SELECTORS = (
    "button:contains('Start Challenge')",
    "button[id*='start']",
    "button[class*='start']",
    "input[value*='Start']",
    "a[href*='start']",
    ".btn:contains('Start')",
    "[onclick*='start']",
)

# XPath used for the text-based ("contains") selectors above.
_START_TEXT_XPATH = (
    "//button[contains(text(), 'Start Challenge')] | "
    "//button[contains(text(), 'Start')] | "
    "//input[contains(@value, 'Start')]"
)

# Patterns for JWT-looking strings: the first requires an "eyJ" prefix, the
# second accepts any three dot-separated segments of 20+ base64url characters.
_JWT_PATTERNS = (
    r"eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+",
    r"[A-Za-z0-9_-]{20,}\.[A-Za-z0-9_-]{20,}\.[A-Za-z0-9_-]{20,}",
)

_COMPLETION_CODE_RE = re.compile(
    r"completion[_\s]*code[:\s]*([A-Za-z0-9\-_]{6,})", flags=re.I
)

_JS_GLOBALS_SCRIPT = """
var data = {};
if (window.challengeData) data.challengeData = window.challengeData;
if (window.challenge) data.challenge = window.challenge;
if (window.completionCode) data.completionCode = window.completionCode;
return data;
"""


def _click_start_button(driver) -> bool:
    """Click the first element matching a start-button selector; return True if one was clicked."""
    for selector in _START_BUTTON_SELECTORS:
        try:
            if "contains" in selector:
                # Use XPath for text-based selection
                elements = driver.find_elements(By.XPATH, _START_TEXT_XPATH)
            else:
                elements = driver.find_elements(By.CSS_SELECTOR, selector)
            if elements:
                logger.info(f"Found start button with selector: {selector}")
                elements[0].click()
                time.sleep(3)  # Wait for challenge to start
                return True
        except Exception as e:
            # A selector that fails (or an element that cannot be clicked)
            # just moves the search on to the next selector.
            logger.debug(f"Selector {selector} failed: {e}")
    return False


def _read_console_logs(driver) -> List[str]:
    """Return the browser console messages collected so far (best effort)."""
    messages: List[str] = []
    try:
        for entry in driver.get_log("browser"):
            messages.append(entry["message"])
            logger.info(f"Console log: {entry['message']}")
    except Exception as e:
        logger.warning(f"Could not get console logs: {e}")
    return messages


def _extract_jwt_values(text: str):
    """Decode every distinct JWT-looking token in *text*; return (hidden entries, merged claims)."""
    entries: List[str] = []
    claims: Dict[str, Any] = {}
    seen = set()
    for pattern in _JWT_PATTERNS:
        for token in re.findall(pattern, text):
            # The two patterns overlap, so the same token can match twice;
            # decode and record each distinct token only once.
            if token in seen:
                continue
            seen.add(token)
            logger.info(f"Found JWT token: {token[:50]}...")
            data = try_decode_jwt(token)
            # Only a non-empty JSON object carries claims; anything else
            # would break dict.update()/.items() below.
            if not isinstance(data, dict) or not data:
                continue
            claims.update(data)
            entries.extend(f"jwt {k}={v}" for k, v in data.items())
    return entries, claims


def _extract_console_values(console_logs: List[str]) -> List[str]:
    """Pull completion codes and challenge-completion messages out of console logs."""
    entries: List[str] = []
    for message in console_logs:
        for code in _COMPLETION_CODE_RE.findall(message):
            entries.append(f"completion_code {code}")
        lowered = message.lower()
        if "challenge" in lowered and ("complete" in lowered or "finished" in lowered):
            entries.append(f"console_message {message}")
    return entries


def _read_js_globals(driver) -> List[str]:
    """Read the challenge-related window globals exposed by the page (best effort)."""
    entries: List[str] = []
    try:
        js_result = driver.execute_script(_JS_GLOBALS_SCRIPT)
        if js_result:
            for k, v in js_result.items():
                entries.append(f"js_global {k}={v}")
                logger.info(f"Found JS global: {k} = {v}")
    except Exception as e:
        logger.debug(f"JS execution failed: {e}")
    return entries


def _read_local_storage(driver) -> List[str]:
    """Return localStorage items whose key mentions challenge/code/completion (best effort)."""
    entries: List[str] = []
    try:
        local_storage = driver.execute_script("return window.localStorage;")
        if local_storage:
            for k, v in local_storage.items():
                if any(keyword in k.lower() for keyword in ['challenge', 'code', 'completion']):
                    entries.append(f"localStorage {k}={v}")
    except Exception as e:
        logger.debug(f"LocalStorage check failed: {e}")
    return entries


def scrape_with_selenium(url: str) -> Dict[str, Any]:
    """Scrape a page with Selenium, click Start Challenge, and extract data.

    Args:
        url: Address of the page to load.

    Returns:
        A dict with the keys ``title``, ``visible_text`` (first 6000
        characters of the body text), ``hidden_values``, ``jwt_data``,
        ``console_logs`` and ``button_clicked``. An empty dict is returned
        when the driver cannot be created or any step of the scrape fails.
    """
    driver = None
    try:
        driver = setup_chrome_driver()
        if not driver:
            return {}

        logger.info(f"Loading URL: {url}")
        driver.get(url)

        # Wait for page to load
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )
        time.sleep(2)

        button_clicked = _click_start_button(driver)
        if not button_clicked:
            logger.warning("Could not find Start Challenge button, proceeding with current page")

        # Get page source after interaction
        html = driver.page_source
        console_logs = _read_console_logs(driver)

        # Search the HTML and the console output together for JWT tokens.
        jwt_entries, jwt_data = _extract_jwt_values(" ".join([html, *console_logs]))

        # Order matters to consumers that take the first match:
        # JWT claims, console findings, JS globals, then localStorage.
        hidden_values: List[str] = []
        hidden_values.extend(jwt_entries)
        hidden_values.extend(_extract_console_values(console_logs))
        hidden_values.extend(_read_js_globals(driver))
        hidden_values.extend(_read_local_storage(driver))

        logger.info(f"Found {len(hidden_values)} hidden values")
        logger.info(f"JWT data: {jwt_data}")

        return {
            "title": driver.title,
            "visible_text": driver.find_element(By.TAG_NAME, "body").text[:6000],
            "hidden_values": hidden_values,
            "jwt_data": jwt_data,
            "console_logs": console_logs,
            "button_clicked": button_clicked,
        }
    except Exception as e:
        logger.error(f"Selenium scraping failed for {url}: {e}")
        return {}
    finally:
        if driver:
            try:
                driver.quit()
            except Exception as e:
                # A failing shutdown must not replace the scrape result.
                logger.warning(f"Failed to quit Chrome driver: {e}")
| # ------------------------- | |
| # Answer extractor | |
| # ------------------------- | |
def _hidden_entry_value(entry: str) -> str:
    """Return the value part of a hidden-value entry.

    Entries look like ``"<source> <key>=<value>"``, except completion codes
    found in console output, which are stored as ``"completion_code <value>"``
    with no ``=`` separator.
    """
    label = "completion_code "
    if entry.lower().startswith(label):
        return entry[len(label):].strip()
    return entry.split("=", 1)[-1].strip()


def answer_question(question: str, content: Dict[str, Any]) -> str:
    """Answer a Round 5 question from scraped content using keyword rules.

    Args:
        question: The question text; matching is case-insensitive.
        content: Scrape result. The keys ``hidden_values``, ``jwt_data`` and
            ``console_logs`` are read; missing keys are treated as empty.

    Returns:
        The extracted answer, or ``"Challenge information not found"`` when
        no rule produces one.
    """
    ql = question.lower()
    hidden = content.get("hidden_values", [])
    jwt_data = content.get("jwt_data", {})
    console_logs = content.get("console_logs", [])

    logger.info(f"Answering question: {question}")
    logger.info(f"Available JWT data: {jwt_data}")
    logger.info(f"Hidden values count: {len(hidden)}")

    # Challenge ID extraction
    if "challenge id" in ql or "challengeid" in ql:
        # First check JWT data directly
        if "challengeID" in jwt_data:
            result = str(jwt_data["challengeID"])
            logger.info(f"Found challengeID in JWT: {result}")
            return result
        # Check hidden values
        for h in hidden:
            if "challengeid" in h.lower():
                result = _hidden_entry_value(h)
                logger.info(f"Found challengeID in hidden values: {result}")
                return result

    # Completion code extraction
    if "completion" in ql and "code" in ql:
        # Look for explicit completion codes. The helper strips the
        # "completion_code " label so only the code itself is returned.
        for h in hidden:
            if "completion_code" in h.lower():
                result = _hidden_entry_value(h)
                logger.info(f"Found completion code: {result}")
                return result
        # Look in console logs for completion codes
        for log in console_logs:
            completion_matches = re.findall(
                r"completion[_\s]*code[:\s]*([A-Za-z0-9\-_]{6,})", log, flags=re.I
            )
            if completion_matches:
                result = completion_matches[0]
                logger.info(f"Found completion code in console: {result}")
                return result
        # Look for any long tokens that might be completion codes
        for h in hidden:
            if "token" in h.lower() or "code" in h.lower():
                token = _hidden_entry_value(h)
                if len(token) > 15:  # Assuming completion codes are reasonably long
                    logger.info(f"Found potential completion code: {token}")
                    return token

    # Challenge name extraction
    if "challenge name" in ql:
        if "coolGuy" in jwt_data:
            result = str(jwt_data["coolGuy"])
            logger.info(f"Found challenge name in JWT: {result}")
            return result

    # Fallback: return the first string-valued JWT field other than the
    # issued-at / expiry timestamps.
    if jwt_data:
        for key, value in jwt_data.items():
            if key not in ["iat", "exp"] and isinstance(value, str):
                logger.info(f"Fallback: returning JWT field {key}: {value}")
                return str(value)

    logger.warning("No matching data found for question")
    return "Challenge information not found"
| # ------------------------- | |
| # Routes | |
| # ------------------------- | |
# Register the handler on the application; without the decorator the function
# is defined but never served.
@app.get("/")
def root():
    """Return a short service banner listing the available endpoints."""
    return {
        "message": "HackRx Round 5 API - Ready (with Selenium support)",
        "endpoints": {"challenge": "POST /challenge", "health": "GET /health"},
    }
# Register the handler on the application; without the decorator the function
# is defined but never served.
@app.get("/health")
def health():
    """Return a static health-check payload."""
    return {"status": "healthy"}
# Register the handler on the application; without the decorator the function
# is defined but never served.
@app.post("/challenge", response_model=ChallengeResponse)
def challenge(req: ChallengeRequest):
    """Scrape the requested page and answer each question about it.

    Args:
        req: Request carrying the page URL and the list of questions.

    Returns:
        A ``ChallengeResponse`` with one answer per question, in order. When
        the scrape yields nothing, every answer is
        ``"Challenge information not found"``.
    """
    logger.info(f"Round 5 request: url={req.url}, questions={req.questions}")

    content = scrape_with_selenium(req.url)
    if not content:
        return ChallengeResponse(answers=["Challenge information not found" for _ in req.questions])

    answers = []
    for q in req.questions:
        ans = answer_question(q, content)
        answers.append(ans)
        logger.info(f"Q: {q} → A: {ans}")

    logger.info(f"Final answers: {answers}")
    return ChallengeResponse(answers=answers)