"""HackRx Mission API: FastAPI service that scrapes a challenge URL (with console-log capture) and answers questions about it."""
import asyncio
import base64
import json
import logging
import os
import re
import time
from typing import List

import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# Module-wide logging at INFO so scraping/answer progress shows up in server logs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# FastAPI application instance; route handlers below are registered against it.
app = FastAPI(title="HackRx Mission API", version="1.0.0")
class ChallengeRequest(BaseModel):
    """Request payload: the challenge page URL and the questions to answer about it."""
    url: str  # URL of the challenge page to scrape
    questions: List[str]  # questions answered in order, one answer each
class ChallengeResponse(BaseModel):
    """Response payload: one answer string per input question, in the same order."""
    answers: List[str]
# Endpoint of the competition-provided OpenAI-compatible LLM proxy.
LLM_URL = "https://register.hackrx.in/llm/openai"
# Real key comes from the environment; the literal is only a masked placeholder fallback.
SUBSCRIPTION_KEY = os.getenv("SUBSCRIPTION_KEY", "sk-****")
def call_llm(messages: List[dict], max_tokens: int = 150) -> str:
    """Call the LLM API and return the assistant's reply text.

    Args:
        messages: Chat messages in OpenAI format ({"role": ..., "content": ...}).
        max_tokens: Response token cap (kept small to conserve quota).

    Returns:
        The assistant message content, or "" on any failure.
    """
    headers = {
        'Content-Type': 'application/json',
        'x-subscription-key': SUBSCRIPTION_KEY,
    }
    payload = {
        "messages": messages,
        "model": "gpt-5-nano",
        "max_tokens": max_tokens,
        "temperature": 0.1,
    }
    try:
        # Fix: the original call had no timeout, so a hung upstream request
        # would block the endpoint forever.
        response = requests.post(LLM_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        result = response.json()
        # Guard against an empty 'choices' list (the original indexed [0] blindly).
        choices = result.get('choices') or [{}]
        return choices[0].get('message', {}).get('content', '') or ''
    except Exception as e:
        logger.error(f"LLM API call failed: {e}")
        return ""
def get_chrome_driver():
    """Build a headless Chrome WebDriver with browser/performance logging enabled.

    Returns:
        A configured ``webdriver.Chrome`` instance, or None when setup fails
        (e.g. Chrome/chromedriver is not installed on the host).
    """
    # All launch flags in one place; the user-agent masks the headless browser.
    launch_flags = (
        "--headless",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--window-size=1920,1080",
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        # Logging flags so driver.get_log('browser') can return console output.
        "--enable-logging",
        "--log-level=0",
    )
    try:
        opts = Options()
        for flag in launch_flags:
            opts.add_argument(flag)
        opts.set_capability('goog:loggingPrefs', {'browser': 'ALL', 'performance': 'ALL'})
        return webdriver.Chrome(options=opts)
    except Exception as e:
        logger.error(f"Failed to setup Chrome driver: {e}")
        return None
def extract_console_logs_with_selenium(url: str) -> dict:
    """Load *url* in headless Chrome and collect the page source plus console output.

    Args:
        url: Page to load.

    Returns:
        {'page_source': str, 'console_logs': list[str]} on success,
        {} when the driver cannot be created or the page load fails.
    """
    driver = None
    try:
        driver = get_chrome_driver()
        if not driver:
            return {}
        logger.info(f"Loading page with Selenium: {url}")
        driver.get(url)
        # Give the page's own scripts time to emit their console output.
        time.sleep(3)
        console_logs = []
        try:
            for log in driver.get_log('browser'):
                if log['level'] in ['INFO', 'WARNING', 'SEVERE']:
                    console_logs.append(f"Console {log['level']}: {log['message']}")
        except Exception as log_error:
            logger.warning(f"Could not retrieve console logs: {log_error}")
        page_source = driver.page_source
        try:
            # Bug fix: the original injected a console.log hook and read the
            # captured output in the SAME execute_script call, before its
            # setTimeout fired, so the result was always undefined/empty.
            # Install the hook first, wait, then read the buffer back.
            driver.execute_script("""
                if (!window.capturedConsoleOutput) {
                    window.capturedConsoleOutput = [];
                    var originalLog = console.log;
                    console.log = function() {
                        window.capturedConsoleOutput.push(Array.from(arguments).join(' '));
                        originalLog.apply(console, arguments);
                    };
                }
            """)
            time.sleep(1)
            captured_output = driver.execute_script(
                "return window.capturedConsoleOutput || [];"
            )
            for output in captured_output or []:
                console_logs.append(f"Captured console: {output}")
        except Exception as js_error:
            logger.warning(f"JavaScript execution failed: {js_error}")
        return {
            'page_source': page_source,
            'console_logs': console_logs,
        }
    except Exception as e:
        logger.error(f"Selenium extraction failed: {e}")
        return {}
    finally:
        # Always release the browser, even on failure.
        if driver:
            driver.quit()
def extract_hidden_elements(html_content: str) -> List[str]:
    """Mine HTML for content a challenge page may have hidden from view.

    Collects hidden <input> values, HTML comments, display:none elements,
    and data-* attributes, each rendered as a "label: value" string.

    Args:
        html_content: Raw HTML markup.

    Returns:
        Human-readable descriptions of hidden content (possibly empty).
    """
    from bs4 import Comment  # local import: only this helper needs Comment

    soup = BeautifulSoup(html_content, 'html.parser')
    hidden_elements = []
    # Hidden form inputs frequently carry challenge codes.
    for inp in soup.find_all('input', {'type': 'hidden'}):
        if inp.get('value'):
            hidden_elements.append(f"Hidden input: {inp.get('name', 'unnamed')} = {inp.get('value')}")
    # Bug fix: BeautifulSoup strips the <!-- --> markers from comment nodes,
    # so the old startswith('<!--') check never matched anything. Match the
    # Comment node type directly instead.
    for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
        clean_comment = comment.strip()
        if clean_comment:
            hidden_elements.append(f"Comment: {clean_comment}")
    # Elements hidden via inline display:none styling.
    for elem in soup.find_all(attrs={'style': re.compile(r'display\s*:\s*none', re.I)}):
        text = elem.get_text(strip=True)
        if text:
            hidden_elements.append(f"Hidden element: {text}")
    # Bug fix: find_all(attrs=<callable>) treats a non-dict attrs as a CLASS
    # search in bs4 and crashes on .keys(); use the documented tag-callable
    # form to find elements carrying any data-* attribute.
    for elem in soup.find_all(lambda tag: any(key.startswith('data-') for key in tag.attrs)):
        for attr, value in elem.attrs.items():
            if attr.startswith('data-') and value:
                hidden_elements.append(f"Data attribute {attr}: {value}")
    return hidden_elements
def advanced_scrape_with_console(url: str) -> dict:
    """Scrape *url*, preferring Selenium (for console logs) with a requests fallback.

    Returns:
        A dict with title, truncated visible text, hidden elements, interesting
        script snippets, meta tags, console logs and raw HTML; {} if both
        strategies fail.
    """
    try:
        selenium_data = extract_console_logs_with_selenium(url)
        if selenium_data:
            html_content = selenium_data.get('page_source', '')
            console_logs = selenium_data.get('console_logs', [])
        else:
            # No browser available: fetch statically (console output is lost).
            logger.info("Selenium failed, falling back to requests")
            session = requests.Session()
            session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive'
            })
            response = session.get(url, timeout=30)
            response.raise_for_status()
            html_content = response.text
            console_logs = []

        soup = BeautifulSoup(html_content, 'html.parser')
        title_tag = soup.find('title')
        title_text = title_tag.get_text().strip() if title_tag else "No title"

        # Keep only inline scripts that look related to the challenge.
        interesting_words = ('challenge', 'code', 'answer', 'hidden', 'console.log')
        script_data = []
        for script in soup.find_all('script'):
            if not script.string:
                continue
            body = script.string.strip()
            if any(word in body.lower() for word in interesting_words):
                script_data.append(f"Script data: {body[:300]}")

        meta_data = [
            f"Meta {meta.get('name', 'unknown')}: {meta.get('content')}"
            for meta in soup.find_all('meta')
            if meta.get('content')
        ]

        return {
            'title': title_text,
            'visible_text': soup.get_text(separator=' ', strip=True)[:2000],
            'hidden_elements': extract_hidden_elements(html_content),
            'script_data': script_data,
            'meta_data': meta_data[:5],
            'console_logs': console_logs,
            'html': html_content
        }
    except Exception as e:
        logger.error(f"Advanced scraping with console failed for {url}: {e}")
        return {}
def analyze_content_intelligently(content: dict, question: str) -> str:
    """Answer *question* from scraped page content using layered heuristics.

    Strategy order (first hit wins):
      1. Console logs containing challenge-ish keywords — take text after ':'.
      2. For "challenge name" questions: title, then console logs, then hidden
         elements, then regex patterns over visible text.
      3. Ask the LLM with a compact context built from the scraped fields.
      4. Fall back to the first "label: value" console log or hidden element.

    Args:
        content: Dict produced by advanced_scrape_with_console (may be {}).
        question: The question to answer.

    Returns:
        Best-effort answer string; never raises.
    """
    if not content:
        return "Unable to access page content"
    # Strategy 1: console logs often print the answer directly.
    console_logs = content.get('console_logs', [])
    if console_logs:
        logger.info(f"Found {len(console_logs)} console logs")
        for log in console_logs:
            if any(keyword in log.lower() for keyword in ['challenge', 'answer', 'code', 'name']):
                # Assume "label: value" shape; take the text after the last ':'.
                parts = log.split(':')
                if len(parts) > 1:
                    potential_answer = parts[-1].strip().strip('"').strip("'")
                    # Ignore trivially short fragments (likely punctuation/noise).
                    if len(potential_answer) > 2:
                        return potential_answer
    # Strategy 2: direct pattern matching for the common "challenge name" question.
    if "challenge name" in question.lower():
        # The page title is usually the challenge name.
        if content.get('title') and content['title'] != "No title":
            return content['title']
        # Otherwise look through console logs...
        for log in console_logs:
            if 'challenge' in log.lower() or 'name' in log.lower():
                parts = log.split(':')
                if len(parts) > 1:
                    return parts[-1].strip().strip('"').strip("'")
        # ...then hidden elements...
        for element in content.get('hidden_elements', []):
            if 'challenge' in element.lower():
                parts = element.split(':')
                if len(parts) > 1:
                    return parts[-1].strip().strip('"').strip("'")
        # ...then "challenge/name/title: ..." patterns in the visible text.
        visible = content.get('visible_text', '')
        challenge_patterns = [
            r'challenge[:\s]+([^.\n]+)',
            r'name[:\s]+([^.\n]+)',
            r'title[:\s]+([^.\n]+)'
        ]
        for pattern in challenge_patterns:
            match = re.search(pattern, visible, re.IGNORECASE)
            if match:
                return match.group(1).strip()
    # Strategy 3: LLM analysis over a compact context (fields truncated to
    # keep token usage low).
    context_parts = []
    if content.get('title'):
        context_parts.append(f"Title: {content['title']}")
    if content.get('visible_text'):
        context_parts.append(f"Text: {content['visible_text'][:800]}")
    if console_logs:
        context_parts.append(f"Console Logs: {'; '.join(console_logs[:5])}")
    if content.get('hidden_elements'):
        context_parts.append(f"Hidden: {'; '.join(content['hidden_elements'][:3])}")
    if content.get('script_data'):
        context_parts.append(f"Scripts: {'; '.join(content['script_data'][:2])}")
    context = "\n".join(context_parts)
    messages = [
        {
            "role": "system",
            "content": "Extract the specific answer from webpage content including console logs. Be direct and concise. Focus on challenge names, codes, or specific elements requested. Console logs often contain the answer."
        },
        {
            "role": "user",
            "content": f"Question: {question}\n\nContent:\n{context}\n\nAnswer:"
        }
    ]
    llm_answer = call_llm(messages, max_tokens=50)
    # Strategy 4: if the LLM returned nothing useful, fall back to the first
    # "label: value" console log or hidden element.
    if not llm_answer or len(llm_answer.strip()) < 3:
        for log in console_logs:
            if len(log.split(':')) > 1:
                return log.split(':')[-1].strip()
        for element in content.get('hidden_elements', []):
            if len(element.split(':')) > 1:
                return element.split(':')[-1].strip()
    return llm_answer.strip() if llm_answer else "Information not found"
@app.post("/challenge", response_model=ChallengeResponse)
async def solve_challenge(request: ChallengeRequest):
    """Solve a HackRx challenge: scrape ``request.url`` once and answer each question.

    Args:
        request: URL to scrape and the list of questions.

    Returns:
        ChallengeResponse with one answer per question, in order.

    Raises:
        HTTPException(500): when scraping or analysis fails unexpectedly.
    """
    logger.info(f"Received challenge request - URL: {request.url}")
    logger.info(f"Questions: {request.questions}")
    answers = []
    try:
        # Fix: the URL is the same for every question, so scrape it once instead
        # of re-launching a headless browser per question as the old loop did.
        page_content = advanced_scrape_with_console(request.url)
        # Log console output for debugging.
        if page_content.get('console_logs'):
            logger.info(f"Console logs found: {page_content['console_logs']}")
        for question in request.questions:
            logger.info(f"Processing question: {question}")
            answer = analyze_content_intelligently(page_content, question)
            answers.append(answer)
            logger.info(f"Answer found: {answer}")
    except Exception as e:
        logger.error(f"Error processing challenge: {e}")
        raise HTTPException(status_code=500, detail=f"Challenge processing failed: {str(e)}")
    return ChallengeResponse(answers=answers)
@app.get("/health")
async def health_check():
    """Report service liveness and whether a Chrome driver can be created.

    Returns:
        {"status": "healthy", "selenium_available": bool}
    """
    selenium_available = False
    try:
        driver = get_chrome_driver()
        if driver:
            selenium_available = True
            driver.quit()
    except Exception as e:
        # Fix: bare `except:` also swallowed SystemExit/KeyboardInterrupt.
        # A driver probe failure must never break the health endpoint, but
        # it should at least be logged.
        logger.warning(f"Selenium availability check failed: {e}")
    return {"status": "healthy", "selenium_available": selenium_available}
@app.get("/")
async def root():
    """Landing endpoint: service banner, feature list, and available routes.

    Fix: this handler (like the others) was never registered on the app,
    so the documented endpoints did not exist; the decorator restores them.
    """
    return {
        "message": "HackRx Mission API - Ready for action with Console Log Support!",
        "mode": "selenium-enhanced",
        "features": [
            "Console log extraction",
            "3-second wait for dynamic content",
            "Hidden element detection",
            "JavaScript execution"
        ],
        "endpoints": {
            "challenge": "/challenge (POST)",
            "health": "/health (GET)"
        }
    }
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", 8000))) |