Spaces:
Build error
Build error
import base64
import json
import logging
import os
import re
from typing import List

import requests
from bs4 import BeautifulSoup, Comment
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# Load variables from a .env file (if any) before anything below reads
# the environment.
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The ASGI application object; also what the __main__ block hands to uvicorn.
app = FastAPI(
    title="HackRx Mission API",
    version="1.0.0",
)
class ChallengeRequest(BaseModel):
    """Request body for a challenge: a page to inspect and questions about it."""

    # Address of the page that gets scraped.
    url: str
    # Questions to answer from that page; one answer is produced per question.
    questions: List[str]
class ChallengeResponse(BaseModel):
    """Response body for a challenge: the answers, in question order."""

    # answers[i] is the answer produced for the i-th submitted question.
    answers: List[str]
# LLM API configuration
# Endpoint that call_llm() posts chat requests to.
LLM_URL = "https://register.hackrx.in/llm/openai"
# Key sent in the 'x-subscription-key' header. Read from the environment;
# the fallback is only a masked placeholder, not a working credential.
SUBSCRIPTION_KEY = os.environ.get("SUBSCRIPTION_KEY", "sk-****")
def call_llm(messages: List[dict], max_tokens: int = 150, timeout: float = 30) -> str:
    """Send a chat request to the LLM endpoint and return the reply text.

    This is a best-effort call: every failure is logged and reported to the
    caller as an empty string rather than an exception.

    Args:
        messages: Chat messages, each a dict with "role" and "content" keys.
        max_tokens: Upper bound on the reply length, forwarded to the API.
        timeout: Seconds to wait for the HTTP call before giving up. Without
            a timeout, requests waits indefinitely on a stalled server.

    Returns:
        The content of the first choice's message, or "" if the request
        failed or the response did not contain a text reply.
    """
    headers = {
        'Content-Type': 'application/json',
        'x-subscription-key': SUBSCRIPTION_KEY,
    }
    payload = {
        "messages": messages,
        "model": "gpt-5-nano",
        "max_tokens": max_tokens,
        "temperature": 0.1,
    }

    try:
        response = requests.post(LLM_URL, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        result = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        logger.error(f"LLM API call failed: {e}")
        return ""

    # Walk the response defensively: an empty "choices" list or a null
    # "content" must yield "" instead of an IndexError or a None return.
    choices = result.get('choices') if isinstance(result, dict) else None
    if not isinstance(choices, list) or not choices or not isinstance(choices[0], dict):
        logger.error("LLM API call failed: response contained no choices")
        return ""

    message = choices[0].get('message')
    content = message.get('content') if isinstance(message, dict) else None
    return content if isinstance(content, str) else ""
def extract_hidden_elements(html_content: str) -> List[str]:
    """Collect content from an HTML document that is not shown on the page.

    Four sources are inspected, in this order: hidden form inputs, HTML
    comments, elements whose inline style sets ``display: none``, and
    ``data-*`` attributes.

    Args:
        html_content: Raw HTML markup.

    Returns:
        Human-readable "Label: value" strings, one per finding, in document
        order within each source.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    hidden_elements = []

    # Hidden inputs that actually carry a value.
    for inp in soup.find_all('input', {'type': 'hidden'}):
        value = inp.get('value')
        if value:
            hidden_elements.append(f"Hidden input: {inp.get('name', 'unnamed')} = {value}")

    # HTML comments. The parser exposes them as Comment nodes holding only
    # the inner text (no '<!--' / '-->'), so they must be matched by type;
    # a '<!--' prefix test never matches anything.
    for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
        clean_comment = comment.strip()
        if clean_comment:
            hidden_elements.append(f"Comment: {clean_comment}")

    # Elements hidden through an inline display:none style.
    for node in soup.find_all(attrs={'style': re.compile(r'display\s*:\s*none', re.I)}):
        text = node.get_text(strip=True)
        if text:
            hidden_elements.append(f"Hidden element: {text}")

    # Elements carrying data-* attributes. The filter is passed as a tag
    # predicate: a callable given as `attrs=` is applied to the class
    # attribute's values instead of the attribute dict, which breaks on any
    # tag that has a class.
    def has_data_attribute(tag):
        return any(name.startswith('data-') for name in tag.attrs)

    for elem in soup.find_all(has_data_attribute):
        for attr, value in elem.attrs.items():
            if attr.startswith('data-') and value:
                hidden_elements.append(f"Data attribute {attr}: {value}")

    return hidden_elements
def advanced_scrape(url: str) -> dict:
    """Download a page and summarise its title, text, hidden data, scripts and meta tags.

    Args:
        url: Address of the page to fetch.

    Returns:
        A dict with the keys 'title', 'visible_text' (first 2000 characters),
        'hidden_elements', 'script_data', 'meta_data' (first 5 entries) and
        'html' (the full markup). An empty dict is returned, and the error
        logged, if the page cannot be fetched or processed.
    """
    try:
        # The context manager closes the session's connection pool even when
        # the request raises; the body is fully read by then.
        with requests.Session() as session:
            session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive'
            })
            response = session.get(url, timeout=30)
            response.raise_for_status()

        html = response.text
        soup = BeautifulSoup(html, 'html.parser')

        title = soup.find('title')
        title_text = title.get_text().strip() if title else "No title"

        # All text nodes of the document, joined with single spaces.
        visible_text = soup.get_text(separator=' ', strip=True)

        hidden_elements = extract_hidden_elements(html)

        # Keep only scripts that mention something challenge-related, and
        # only their first 200 characters.
        keywords = ('challenge', 'code', 'answer', 'hidden')
        script_data = []
        for script in soup.find_all('script'):
            if not script.string:
                continue
            script_content = script.string.strip()
            lowered = script_content.lower()
            if any(keyword in lowered for keyword in keywords):
                script_data.append(f"Script data: {script_content[:200]}")

        meta_data = [
            f"Meta {meta.get('name', 'unknown')}: {meta.get('content')}"
            for meta in soup.find_all('meta')
            if meta.get('content')
        ]

        return {
            'title': title_text,
            'visible_text': visible_text[:2000],
            'hidden_elements': hidden_elements,
            'script_data': script_data,
            'meta_data': meta_data[:5],  # Limit meta data
            'html': html
        }
    except Exception as e:
        # Deliberate best-effort boundary: callers treat {} as "page unavailable".
        logger.error(f"Advanced scraping failed for {url}: {e}")
        return {}
def analyze_content_intelligently(content: dict, question: str) -> str:
    """Answer a question about a scraped page using three strategies in turn.

    1. For questions containing "challenge name": the page title, then a
       hidden element mentioning "challenge", then a regex over the text.
    2. Otherwise, or if step 1 finds nothing, ask the LLM with a compact
       summary of the page.
    3. If the LLM reply is empty or shorter than 3 characters, fall back to
       the value of the first hidden element.

    Args:
        content: Page summary as produced by advanced_scrape(); may be empty.
        question: The question to answer.

    Returns:
        The answer text, "Unable to access page content" when ``content`` is
        empty, or "Information not found" when no strategy yields anything.
    """
    if not content:
        return "Unable to access page content"

    hidden_elements = content.get('hidden_elements', [])

    # Strategy 1: Direct pattern matching for common questions
    if "challenge name" in question.lower():
        # Look in title first
        if content.get('title') and content['title'] != "No title":
            return content['title']

        # Look in hidden elements. Entries look like "Label: value"; split on
        # the FIRST colon only, so values that contain colons themselves
        # (URLs, times) are returned whole instead of cut at their last colon.
        for element in hidden_elements:
            if 'challenge' in element.lower():
                _label, sep, value = element.partition(':')
                if sep:
                    return value.strip().strip('"').strip("'")

        # Look in visible text for patterns
        visible = content.get('visible_text', '')
        challenge_patterns = [
            r'challenge[:\s]+([^.\n]+)',
            r'name[:\s]+([^.\n]+)',
            r'title[:\s]+([^.\n]+)'
        ]
        for pattern in challenge_patterns:
            match = re.search(pattern, visible, re.IGNORECASE)
            if match:
                return match.group(1).strip()

    # Strategy 2: Use LLM for complex analysis. The context is truncated
    # (800 chars of text, 3 hidden elements, 2 scripts) to keep the prompt small.
    context_parts = []
    if content.get('title'):
        context_parts.append(f"Title: {content['title']}")
    if content.get('visible_text'):
        context_parts.append(f"Text: {content['visible_text'][:800]}")
    if content.get('hidden_elements'):
        context_parts.append(f"Hidden: {'; '.join(content['hidden_elements'][:3])}")
    if content.get('script_data'):
        context_parts.append(f"Scripts: {'; '.join(content['script_data'][:2])}")
    context = "\n".join(context_parts)

    messages = [
        {
            "role": "system",
            "content": "Extract the specific answer from webpage content. Be direct and concise. Focus on challenge names, codes, or specific elements requested."
        },
        {
            "role": "user",
            "content": f"Question: {question}\n\nContent:\n{context}\n\nAnswer:"
        }
    ]
    llm_answer = call_llm(messages, max_tokens=50)

    # Strategy 3: Fallback to first meaningful hidden element
    if not llm_answer or len(llm_answer.strip()) < 3:
        for element in hidden_elements:
            _label, sep, value = element.partition(':')
            if sep:
                return value.strip()

    return llm_answer.strip() if llm_answer else "Information not found"
# Path taken from the endpoint listing returned by root().
@app.post("/challenge", response_model=ChallengeResponse)
async def solve_challenge(request: ChallengeRequest):
    """Main endpoint to solve HackRx challenges.

    Scrapes ``request.url`` once and answers every question in
    ``request.questions`` from that single snapshot.

    Returns:
        ChallengeResponse with one answer per question, in the same order.

    Raises:
        HTTPException: status 500 if processing raises unexpectedly.
    """
    logger.info(f"Received challenge request - URL: {request.url}")
    logger.info(f"Questions: {request.questions}")

    answers = []
    try:
        # The URL is the same for every question, so fetch and parse the
        # page once instead of once per question. Nothing is fetched when
        # there are no questions.
        page_content = advanced_scrape(request.url) if request.questions else {}

        for question in request.questions:
            logger.info(f"Processing question: {question}")
            answer = analyze_content_intelligently(page_content, question)
            answers.append(answer)
            logger.info(f"Answer found: {answer}")
    except Exception as e:
        logger.error(f"Error processing challenge: {e}")
        raise HTTPException(status_code=500, detail=f"Challenge processing failed: {str(e)}") from e

    return ChallengeResponse(answers=answers)
# Path taken from the endpoint listing returned by root().
@app.get("/health")
async def health_check():
    """Report that the service is up; always returns the same static payload."""
    return {"status": "healthy", "selenium_available": False}
# NOTE(review): "/" is assumed from the function name - confirm the intended path.
@app.get("/")
async def root():
    """Return a static banner describing the service and its endpoints."""
    return {
        "message": "HackRx Mission API - Ready for action!",
        "mode": "requests-only",
        "endpoints": {
            "challenge": "/challenge (POST)",
            "health": "/health (GET)"
        }
    }
if __name__ == "__main__":
    import uvicorn

    # The PORT environment variable overrides the default port 8000.
    listen_port = int(os.environ.get("PORT", 8000))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)