Spaces:
Running
Running
| # ============================================================ | |
| # Site Backup & Error Checker - Backend v1.3.2 (Fixed) | |
| # ============================================================ | |
| import os | |
| import time | |
| import json | |
| import uuid | |
| import logging | |
| import base64 | |
| import io | |
| from datetime import datetime | |
| from typing import Optional, Dict | |
| from contextlib import asynccontextmanager | |
| from urllib.parse import urlparse | |
| from fastapi import FastAPI, HTTPException, Depends, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import StreamingResponse, JSONResponse | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from pydantic import BaseModel | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.chrome.service import Service | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.common.keys import Keys | |
| from selenium.webdriver.common.action_chains import ActionChains | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.common.exceptions import ( | |
| WebDriverException, NoSuchWindowException, | |
| InvalidSessionIdException, TimeoutException, | |
| StaleElementReferenceException, NoSuchElementException | |
| ) | |
| from utils.backup import SiteBackup | |
| from utils.error_checker import SiteErrorChecker | |
| # ============================================================ | |
| # CONFIGURAÇÃO DE LOG | |
| # ============================================================ | |
# Configure the root logger at INFO level and create this module's logger,
# which every helper and route below uses for its bracket-tagged messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| # ============================================================ | |
| # AUTENTICAÇÃO | |
| # ============================================================ | |
# Shared secret read once at import time from the environment. An empty
# value means verify_token accepts every request.
API_TOKEN = os.environ.get("API_TOKEN", "").strip()
if not API_TOKEN:
    logger.warning("[AUTH] API_TOKEN NÃO configurado - sistema sem autenticação")
else:
    # Only the length is logged, never the token itself.
    logger.info(f"[AUTH] API_TOKEN configurado ({len(API_TOKEN)} caracteres)")
# Bearer-token extractor used as a dependency by verify_token.
security = HTTPBearer(auto_error=False)
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the Bearer token of the current request.

    Returns:
        True when authentication is disabled (no API_TOKEN configured) or
        when the supplied token matches API_TOKEN.

    Raises:
        HTTPException: 401 when no credentials were supplied, 403 when the
            supplied token does not match.
    """
    import hmac  # local import: only this function needs it

    if not API_TOKEN:
        return True
    if not credentials:
        raise HTTPException(status_code=401, detail="Token não fornecido")
    # Constant-time comparison so response timing does not reveal how many
    # leading characters of the token were correct. Bytes are compared
    # because compare_digest rejects non-ASCII str arguments.
    supplied = credentials.credentials.encode("utf-8")
    expected = API_TOKEN.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=403, detail="Token inválido")
    return True
| # ============================================================ | |
| # HELPER: SAFE DRIVER OPERATIONS | |
| # ============================================================ | |
def safe_switch_to_active_window(driver):
    """Make sure the driver is focused on a window that still exists.

    If the current window handle is still among the open handles it is
    returned unchanged; otherwise the driver is switched to the last handle
    in ``driver.window_handles``.

    Args:
        driver: Selenium WebDriver (or any object exposing
            ``window_handles``, ``current_window_handle`` and
            ``switch_to.window``).

    Returns:
        The handle of the window the driver is focused on.

    Raises:
        NoSuchWindowException: when the driver reports no open windows.
        Exception: any other driver error, after it has been logged.
    """
    try:
        handles = driver.window_handles
        if not handles:
            raise NoSuchWindowException("Nenhuma janela disponível")
        try:
            current = driver.current_window_handle
            if current in handles:
                return current
        except Exception:
            # The focused window was closed; fall through and pick another.
            # Catching Exception (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate.
            pass
        driver.switch_to.window(handles[-1])
        logger.info(f"[WINDOW] Switched to window {handles[-1][:8]}... (total: {len(handles)})")
        return handles[-1]
    except Exception as e:
        logger.error(f"[WINDOW] Cannot recover window: {e}")
        raise
def safe_get_screenshot(driver):
    """Capture the current window as a base64-encoded PNG string.

    Returns:
        The base64 text of the PNG, or None when the window could not be
        recovered or the capture failed (the failure is logged as a warning).
    """
    try:
        safe_switch_to_active_window(driver)
        png_bytes = driver.get_screenshot_as_png()
        encoded = base64.b64encode(png_bytes)
        return encoded.decode('utf-8')
    except Exception as exc:
        logger.warning(f"[SCREENSHOT] Failed: {exc}")
    return None
def safe_get_url_title(driver, session):
    """Read the driver's current URL and title, caching them in ``session``.

    Args:
        driver: Selenium WebDriver to query.
        session: Session dict; its ``"url"`` and ``"title"`` entries are
            refreshed on success and used as the fallback on failure.

    Returns:
        A ``(url, title)`` tuple. When the driver cannot be queried, the
        last values stored in ``session`` are returned (``""`` and
        ``"Sem título"`` if nothing was stored).
    """
    try:
        safe_switch_to_active_window(driver)
        url = driver.current_url
        title = driver.title or "Sem título"
        session["url"] = url
        session["title"] = title
        return url, title
    except Exception:
        # Best-effort: any driver failure falls back to the cached values.
        # Exception (not a bare except) keeps KeyboardInterrupt/SystemExit
        # from being swallowed.
        return session.get("url", ""), session.get("title", "Sem título")
def safe_wait_page_load(driver, timeout=15):
    """Block until ``document.readyState`` is ``"complete"``.

    Args:
        driver: Selenium WebDriver whose page is loading.
        timeout: Maximum number of seconds to wait for the ready state.

    If the wait times out or the script cannot be executed, the function
    sleeps for 3 seconds instead of raising.
    """
    try:
        WebDriverWait(driver, timeout).until(
            lambda d: d.execute_script("return document.readyState") == "complete"
        )
    except Exception:
        # Best-effort fallback; Exception (not a bare except) lets
        # KeyboardInterrupt/SystemExit through.
        time.sleep(3)
def safe_wait_after_navigation(driver, original_url, timeout=8):
    """Wait for a redirect or page load after a navigation-triggering action.

    Steps:
        1. Wait up to ``timeout`` seconds for the URL to differ from
           ``original_url``; on timeout or driver error, sleep 2 seconds.
        2. If more than one window handle is open, switch to the last one.
        3. Wait up to 10 seconds for the page to finish loading.

    Args:
        driver: Selenium WebDriver.
        original_url: URL the driver was on before the action.
        timeout: Seconds to wait for the URL to change.
    """
    try:
        WebDriverWait(driver, timeout).until(
            lambda d: d.current_url != original_url
        )
    except Exception:
        # No URL change observed (or the window went away): give the page
        # a short grace period instead of failing.
        time.sleep(2)
    try:
        handles = driver.window_handles
        if len(handles) > 1:
            driver.switch_to.window(handles[-1])
            logger.info(f"[NAV] Switched to new tab, total handles: {len(handles)}")
    except Exception as exc:
        # Still best-effort, but no longer completely silent.
        logger.debug(f"[NAV] Could not inspect window handles: {exc}")
    safe_wait_page_load(driver, 10)
| # ============================================================ | |
| # SESSION MANAGER | |
| # ============================================================ | |
class SessionManager:
    """In-memory registry of headless Chrome sessions keyed by a UUID string.

    Each value in ``sessions`` is a dict with the keys ``driver``, ``url``,
    ``title``, ``created_at``, ``last_used`` and ``status``.
    """

    # Command-line flags applied to every Chrome instance (new or recovered).
    _CHROME_ARGUMENTS = (
        '--headless=new',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--window-size=1920,1080',
        '--disable-extensions',
        '--disable-plugins',
        '--disable-blink-features=AutomationControlled',
        '--disable-popup-blocking',
        '--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    )

    def __init__(self):
        self.sessions: Dict[str, dict] = {}
        # When this many sessions exist, the least recently used is evicted.
        self.max_sessions = 5
        # Idle seconds after which a session counts as expired.
        self.session_timeout = 1800

    @staticmethod
    def _quit_quietly(driver):
        """Quit a driver, ignoring (but debug-logging) any failure."""
        try:
            driver.quit()
        except Exception as exc:
            logger.debug(f"[SESSION] driver.quit() failed: {exc}")

    def _start_driver(self):
        """Launch Chrome with the shared options and the standard timeouts."""
        chrome_options = Options()
        for argument in self._CHROME_ARGUMENTS:
            chrome_options.add_argument(argument)
        chrome_options.binary_location = '/usr/bin/google-chrome'
        service = Service('/usr/local/bin/chromedriver')
        driver = webdriver.Chrome(service=service, options=chrome_options)
        try:
            driver.set_page_load_timeout(60)
            driver.implicitly_wait(10)
        except Exception:
            # Do not leave an orphaned browser process behind.
            self._quit_quietly(driver)
            raise
        return driver

    def create_session(self, url: str) -> dict:
        """Start a browser, load ``url`` and register the new session.

        Expired sessions are purged first; if the registry is still full,
        the session with the oldest ``last_used`` is closed.

        Returns:
            Dict with ``session_id``, ``url``, ``title`` and ``status``.

        Raises:
            HTTPException: 500 when the browser cannot be started or the
                CDP domains cannot be enabled; 408 when loading ``url`` fails.
        """
        self._cleanup_expired()
        if len(self.sessions) >= self.max_sessions:
            oldest_id = min(self.sessions, key=lambda sid: self.sessions[sid]['last_used'])
            self.close_session(oldest_id)
        session_id = str(uuid.uuid4())
        driver = None
        try:
            driver = self._start_driver()
            for cdp_command in ('Runtime.enable', 'Log.enable', 'Network.enable'):
                driver.execute_cdp_cmd(cdp_command, {})
        except Exception as e:
            # If Chrome started but CDP setup failed, the browser must be
            # shut down or it would leak.
            if driver is not None:
                self._quit_quietly(driver)
            logger.error(f"[SESSION] Erro ao criar driver: {e}")
            raise HTTPException(status_code=500, detail=f"Erro ao iniciar navegador: {str(e)}")
        try:
            driver.get(url)
            safe_wait_page_load(driver, 30)
        except Exception as e:
            self._quit_quietly(driver)
            logger.error(f"[SESSION] Erro ao carregar URL: {e}")
            raise HTTPException(status_code=408, detail=f"Timeout ao carregar: {str(e)}")
        title = driver.title or "Sem título"
        now = time.time()
        self.sessions[session_id] = {
            'driver': driver,
            'url': url,
            'title': title,
            'created_at': now,
            'last_used': now,
            'status': 'active'
        }
        logger.info(f"[SESSION] Criada: {session_id[:8]}... -> {url}")
        return {
            'session_id': session_id,
            'url': url,
            'title': title,
            'status': 'active'
        }

    def get_session(self, session_id: str) -> dict:
        """Return a live session, recreating its browser if it has died.

        Updates ``last_used``. When the driver no longer answers, a new
        Chrome is started on the session's stored URL and the session status
        becomes ``'recovered'``.
        NOTE(review): the recovered driver does not re-enable the CDP
        domains that create_session enables; confirm whether it should.

        Raises:
            HTTPException: 404 for an unknown id; 410 when the session has
                expired or could not be recovered.
        """
        if session_id not in self.sessions:
            raise HTTPException(status_code=404, detail="Sessão não encontrada")
        session = self.sessions[session_id]
        if time.time() - session['last_used'] > self.session_timeout:
            self.close_session(session_id)
            raise HTTPException(status_code=410, detail="Sessão expirada")
        session['last_used'] = time.time()
        try:
            safe_switch_to_active_window(session['driver'])
            return session
        except Exception:
            logger.warning(f"[SESSION] Driver dead for {session_id[:8]}, attempting recovery...")
        self._quit_quietly(session['driver'])
        new_driver = None
        try:
            new_driver = self._start_driver()
            new_driver.get(session['url'])
            safe_wait_page_load(new_driver, 30)
            new_title = new_driver.title or "Sem título"
        except Exception as recovery_error:
            # A half-started replacement browser must not be leaked.
            if new_driver is not None:
                self._quit_quietly(new_driver)
            logger.error(f"[SESSION] Recovery failed: {recovery_error}")
            self.close_session(session_id)
            raise HTTPException(status_code=410, detail="Sessão perdida e não foi possível recuperar. Abra o site novamente.")
        session['driver'] = new_driver
        session['title'] = new_title
        session['status'] = 'recovered'
        logger.info(f"[SESSION] Recovered: {session_id[:8]}...")
        return session

    def close_session(self, session_id: str):
        """Quit the session's browser and drop it; unknown ids are ignored."""
        session = self.sessions.pop(session_id, None)
        if session is None:
            return
        self._quit_quietly(session.get('driver'))
        logger.info(f"[SESSION] Fechada: {session_id[:8]}...")

    def _cleanup_expired(self):
        """Close every session idle for longer than ``session_timeout``."""
        now = time.time()
        expired = [
            sid for sid, s in self.sessions.items()
            if now - s['last_used'] > self.session_timeout
        ]
        for sid in expired:
            self.close_session(sid)

    def close_all(self):
        """Close every registered session."""
        # Iterate over a copy: close_session mutates the dict.
        for sid in list(self.sessions):
            self.close_session(sid)
# Single process-wide session registry shared by all route handlers.
session_manager = SessionManager()
| # ============================================================ | |
| # FASTAPI APP | |
| # ============================================================ | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: log a startup banner, close all sessions on exit.

    Decorated with ``asynccontextmanager`` so it can be passed as the
    ``lifespan=`` argument of ``FastAPI``. The shutdown part runs in a
    ``finally`` so browsers are closed even if the application exits
    through an error.
    """
    logger.info("=" * 50)
    logger.info("[SERVER] Site Backup & Error Checker v1.3.2")
    logger.info(f"[SERVER] Auth: {'ATIVO' if API_TOKEN else 'DESATIVADO'}")
    logger.info("=" * 50)
    try:
        yield
    finally:
        logger.info("[SERVER] Encerrando servidor...")
        session_manager.close_all()
# FastAPI application; `lifespan` handles the startup banner and the
# shutdown cleanup of browser sessions.
app = FastAPI(
    title="Site Backup & Error Checker",
    version="1.3.2",
    lifespan=lifespan
)
# CORS is fully open (any origin, method and header).
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation — confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    # Custom response headers that browser clients are allowed to read;
    # several of them are set by the backup and error-report routes below.
    expose_headers=[
        "Content-Disposition", "X-Backup-Errors",
        "X-Total-Errors", "X-Total-Warnings", "X-Report-Name"
    ]
)
| # ============================================================ | |
| # MODELOS PYDANTIC | |
| # ============================================================ | |
# Body of the open-site request: the URL to load in a new browser session.
# open_site prepends "https://" when no http/https scheme is present.
class OpenSiteRequest(BaseModel):
    url: str
# Body for routes that only need to identify an existing session
# (status, screenshot, close).
class SessionRequest(BaseModel):
    session_id: str
# Body of the backup request.
class BackupRequest(BaseModel):
    session_id: str
    # Used as the prefix of the downloaded ZIP file name.
    folder_name: Optional[str] = "backup"
# Body of the error-check requests (text and JSON variants).
class ErrorCheckRequest(BaseModel):
    session_id: str
    # Used as the prefix of the downloaded text report file name.
    folder_name: Optional[str] = "erros"
# Body of the navigate request: an existing session and the URL to open in it.
class NavigateRequest(BaseModel):
    session_id: str
    url: str
# Body of a site-search request: session, search term and output folder name.
# NOTE(review): no handler using this model is visible in this part of the
# file — confirm where it is consumed.
class SearchSiteRequest(BaseModel):
    session_id: str
    term: str
    folder_name: Optional[str] = "busca"
| # ============================================================ | |
| # ROTAS PÚBLICAS | |
| # ============================================================ | |
async def root():
    """Return the service identity and whether a token is required."""
    payload = {
        "status": "online",
        "service": "Site Backup & Error Checker",
        "version": "1.3.2",
    }
    payload["auth_required"] = bool(API_TOKEN)
    return payload
async def health():
    """Return a liveness report with the number of open browser sessions."""
    report = {"status": "healthy"}
    report["timestamp"] = time.time()
    report["active_sessions"] = len(session_manager.sessions)
    report["auth_required"] = bool(API_TOKEN)
    return report
| # ============================================================ | |
| # ROTAS PROTEGIDAS | |
| # ============================================================ | |
async def auth_verify(auth=Depends(verify_token)):
    """Confirm the caller's token; reaching the body means it was accepted."""
    confirmation = {"valid": True, "message": "Token válido"}
    return confirmation
async def open_site(request: OpenSiteRequest, auth=Depends(verify_token)):
    """Open ``request.url`` in a new browser session and return its summary.

    A URL without an ``http://`` or ``https://`` prefix gets ``https://``
    prepended. HTTPExceptions from the session manager pass through; any
    other failure is logged and reported as a 500.
    """
    target = request.url.strip()
    has_scheme = target.startswith('http://') or target.startswith('https://')
    if not has_scheme:
        target = 'https://' + target
    try:
        return session_manager.create_session(target)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"[OPEN] Erro: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
async def session_status(request: SessionRequest, auth=Depends(verify_token)):
    """Report the current URL, title, status and age of a session."""
    sid = request.session_id
    session = session_manager.get_session(sid)
    current_url, current_title = safe_get_url_title(session['driver'], session)
    started_at = datetime.fromtimestamp(session['created_at'])
    return {
        "session_id": sid,
        "url": current_url,
        "title": current_title,
        "status": session['status'],
        "created_at": started_at.strftime("%d/%m/%Y %H:%M:%S"),
        # Seconds since the session was created, rounded to one decimal.
        "uptime": round(time.time() - session['created_at'], 1)
    }
async def take_screenshot(request: SessionRequest, auth=Depends(verify_token)):
    """Stream a PNG screenshot of the session's active window.

    Any failure while capturing is logged and reported as a 500.
    """
    session = session_manager.get_session(request.session_id)
    browser = session['driver']
    try:
        safe_switch_to_active_window(browser)
        png_bytes = browser.get_screenshot_as_png()
        response_headers = {"Content-Disposition": "inline; filename=screenshot.png"}
        return StreamingResponse(
            io.BytesIO(png_bytes),
            media_type="image/png",
            headers=response_headers,
        )
    except Exception as exc:
        logger.error(f"[SCREENSHOT] Erro: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
async def backup_site(request: BackupRequest, auth=Depends(verify_token)):
    """Build a ZIP backup of the session's site and stream it to the client.

    The session status is ``'backing_up'`` while the archive is generated
    and is reset to ``'active'`` afterwards, on success or failure.

    Response headers:
        Content-Disposition: attachment named ``<folder>_<timestamp>.zip``.
        X-Backup-Errors: ``len(backup.errors)``, or 0 when the backup object
            has no ``errors`` attribute.

    Raises:
        HTTPException: 500 with the error text when the backup fails.
    """
    session = session_manager.get_session(request.session_id)
    driver = session['driver']
    url = session['url']
    folder = request.folder_name or "backup"
    try:
        safe_switch_to_active_window(driver)
        session['status'] = 'backing_up'
        backup = SiteBackup(driver, url)
        zip_result = backup.generate_backup_zip(folder)
        # The generator may return the buffer directly or a tuple whose
        # first item is the buffer. Only the buffer is needed here: the
        # error count in the header is derived from backup.errors below.
        zip_buffer = zip_result[0] if isinstance(zip_result, tuple) else zip_result
        session['status'] = 'active'
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{folder}_{timestamp}.zip"
        errors_count = len(backup.errors) if hasattr(backup, 'errors') else 0
        return StreamingResponse(
            io.BytesIO(zip_buffer.getvalue()),
            media_type="application/zip",
            headers={
                "Content-Disposition": f"attachment; filename={filename}",
                "X-Backup-Errors": str(errors_count)
            }
        )
    except Exception as e:
        session['status'] = 'active'
        logger.error(f"[BACKUP] Erro: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def check_errors(request: ErrorCheckRequest, auth=Depends(verify_token)):
    """Run all site checks and stream the result as a UTF-8 text report.

    The session status is ``'checking_errors'`` while the checks run and
    ``'active'`` afterwards. Error and warning totals are also exposed in
    the ``X-Total-Errors`` / ``X-Total-Warnings`` headers. Failures are
    logged and reported as a 500.
    """
    session = session_manager.get_session(request.session_id)
    browser, site_url = session['driver'], session['url']
    try:
        safe_switch_to_active_window(browser)
        session['status'] = 'checking_errors'
        checker = SiteErrorChecker(browser, site_url)
        checker.run_all_checks()
        report_text = checker.generate_report_txt()
        session['status'] = 'active'
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        prefix = request.folder_name or "erros"
        filename = f"{prefix}_{stamp}.txt"
        body = io.BytesIO(report_text.encode('utf-8'))
        response_headers = {
            "Content-Disposition": f"attachment; filename={filename}",
            "X-Total-Errors": str(checker.total_errors),
            "X-Total-Warnings": str(checker.total_warnings),
        }
        return StreamingResponse(
            body,
            media_type="text/plain; charset=utf-8",
            headers=response_headers,
        )
    except Exception as exc:
        session['status'] = 'active'
        logger.error(f"[ERRORS] Erro: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
async def check_errors_json(request: ErrorCheckRequest, auth=Depends(verify_token)):
    """Run all site checks and return the report as JSON.

    Same flow as the text report variant, but the body is whatever
    ``generate_report_json()`` returns. Failures are logged and reported
    as a 500.
    """
    session = session_manager.get_session(request.session_id)
    browser, site_url = session['driver'], session['url']
    try:
        safe_switch_to_active_window(browser)
        session['status'] = 'checking_errors'
        checker = SiteErrorChecker(browser, site_url)
        checker.run_all_checks()
        report_payload = checker.generate_report_json()
        session['status'] = 'active'
        totals = {
            "X-Total-Errors": str(checker.total_errors),
            "X-Total-Warnings": str(checker.total_warnings),
        }
        return JSONResponse(content=report_payload, headers=totals)
    except Exception as exc:
        session['status'] = 'active'
        logger.error(f"[ERRORS-JSON] Erro: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
async def close_session(request: SessionRequest, auth=Depends(verify_token)):
    """Close the given session; an unknown id is silently accepted."""
    sid = request.session_id
    session_manager.close_session(sid)
    outcome = {"status": "closed", "message": "Sessão encerrada com sucesso"}
    return outcome
async def list_sessions(auth=Depends(verify_token)):
    """List every registered session with its URL, title, status and age."""

    def summarize(sid, entry):
        # One JSON-friendly row per session; the driver object is omitted.
        return {
            "session_id": sid,
            "url": entry['url'],
            "title": entry['title'],
            "status": entry['status'],
            "created_at": datetime.fromtimestamp(entry['created_at']).strftime("%d/%m/%Y %H:%M:%S"),
            "uptime": round(time.time() - entry['created_at'], 1),
        }

    rows = [summarize(sid, entry) for sid, entry in session_manager.sessions.items()]
    return {"sessions": rows, "total": len(rows)}
async def navigate(request: NavigateRequest, auth=Depends(verify_token)):
    """Load a URL in an existing session and return the resulting URL/title.

    URLs that do not start with ``http://``, ``https://`` or ``javascript:``
    get ``https://`` prepended. Failures are logged and reported as a 500.
    """
    session = session_manager.get_session(request.session_id)
    browser = session['driver']
    destination = request.url.strip()
    accepted_prefixes = ('http://', 'https://', 'javascript:')
    if not destination.startswith(accepted_prefixes):
        destination = 'https://' + destination
    try:
        safe_switch_to_active_window(browser)
        browser.get(destination)
        safe_wait_page_load(browser)
        landed_url, landed_title = safe_get_url_title(browser, session)
        return {"status": "navigated", "url": landed_url, "title": landed_title}
    except Exception as exc:
        logger.error(f"[NAVIGATE] Erro: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
| # ============================================================ | |
| # INJECT COOKIES | |
| # ============================================================ | |
async def inject_cookies(request: dict, token: str = Depends(verify_token)):
    """Inject cookies into a Selenium session, then navigate to a target URL.

    Request keys:
        session_id: required session identifier.
        cookies: required non-empty list of cookie dicts (``name``,
            ``value``, optional ``domain``, ``path``, ``secure``,
            ``expiry``/``expirationDate``, ``sameSite``).
        target_url: optional URL to open afterwards; defaults to the
            session's stored URL.

    Cookies are grouped by domain (leading dot removed; cookies without a
    domain use the target URL's host). For each domain the browser visits
    ``https://<domain>``, clears that page's cookies and adds the group.
    Per-cookie and per-domain failures are collected in ``errors`` instead
    of aborting the request.

    Returns:
        JSONResponse with the injection counts, collected errors, a
        heuristic ``login_success`` flag, the final URL/title and a base64
        screenshot (or null).

    Raises:
        HTTPException: 400 for missing/invalid input, 500 for unexpected
            failures; HTTPExceptions from the session manager pass through.
    """
    try:
        session_id = request.get("session_id")
        cookies = request.get("cookies", [])
        target_url = request.get("target_url", "")
        if not session_id:
            raise HTTPException(status_code=400, detail="session_id é obrigatório")
        if not cookies or not isinstance(cookies, list):
            raise HTTPException(status_code=400, detail="Lista de cookies é obrigatória")
        session = session_manager.get_session(session_id)
        driver = session["driver"]
        safe_switch_to_active_window(driver)
        if not target_url:
            target_url = session.get("url", driver.current_url)
        injected = 0
        errors = []

        # Group cookies by the domain that has to be visited before adding.
        fallback_host = urlparse(target_url).hostname or ""
        domain_cookies = {}
        for c in cookies:
            if not isinstance(c, dict):
                # A malformed entry is reported instead of failing the
                # whole request with a 500.
                errors.append(f"Cookie '?': {str(c)[:60]}")
                continue
            d = (c.get("domain") or "").lstrip(".") or fallback_host
            domain_cookies.setdefault(d, []).append(c)

        for domain, dcookies in domain_cookies.items():
            try:
                # A cookie can only be added for the domain currently
                # loaded, so visit it first; a failed visit is tolerated
                # and add_cookie reports the problem per cookie.
                try:
                    driver.get(f"https://{domain}")
                    time.sleep(1)
                except Exception:
                    pass
                try:
                    driver.delete_all_cookies()
                except Exception:
                    pass
                for cookie in dcookies:
                    try:
                        selenium_cookie = {
                            "name": cookie.get("name", ""),
                            "value": cookie.get("value", ""),
                            "path": cookie.get("path", "/"),
                        }
                        cookie_domain = cookie.get("domain", "")
                        if cookie_domain:
                            selenium_cookie["domain"] = cookie_domain
                        if cookie.get("secure"):
                            selenium_cookie["secure"] = True
                        # Accept both the Selenium key and the
                        # "expirationDate" key for the expiry time.
                        if cookie.get("expiry"):
                            selenium_cookie["expiry"] = int(cookie["expiry"])
                        elif cookie.get("expirationDate"):
                            selenium_cookie["expiry"] = int(cookie["expirationDate"])
                        same_site = cookie.get("sameSite", "")
                        if same_site and same_site != "unspecified":
                            # Normalize the capitalization to one of the
                            # three canonical values; others are dropped.
                            for v in ["Strict", "Lax", "None"]:
                                if same_site.lower() == v.lower():
                                    selenium_cookie["sameSite"] = v
                                    break
                        driver.add_cookie(selenium_cookie)
                        injected += 1
                    except Exception as ce:
                        errors.append(f"Cookie '{cookie.get('name', '?')}': {str(ce)[:60]}")
            except Exception as de:
                errors.append(f"Domain '{domain}': {str(de)[:60]}")

        try:
            driver.get(target_url)
            safe_wait_page_load(driver)
            time.sleep(2)
        except Exception as ne:
            errors.append(f"Navegação: {str(ne)[:60]}")
        url_now, title_now = safe_get_url_title(driver, session)

        # Heuristic: logged-in pages mention logout/dashboard/profile words
        # and do not show a password field.
        login_success = False
        try:
            check = driver.execute_script("""
                var b = document.body ? document.body.innerText.toLowerCase() : '';
                var hasLoginForm = !!document.querySelector('input[type="password"]:not([style*="display: none"])');
                return {
                    hasLogout: b.includes('logout') || b.includes('sair') || b.includes('desconectar'),
                    hasDash: b.includes('dashboard') || b.includes('painel') || b.includes('meus cursos'),
                    hasProfile: b.includes('perfil') || b.includes('minha conta'),
                    isLoginPage: hasLoginForm
                };
            """)
            login_success = bool(
                (check.get('hasLogout') or check.get('hasDash') or check.get('hasProfile'))
                and not check.get('isLoginPage')
            )
        except Exception:
            # The heuristic is optional; keep login_success False.
            pass
        session["status"] = "logged_in" if login_success else "cookies_injected"
        screenshot = safe_get_screenshot(driver)
        return JSONResponse(content={
            "status": "success",
            "injected_count": injected, "total_cookies": len(cookies),
            "errors": errors, "login_success": login_success,
            "final_url": url_now, "final_title": title_now,
            "url": url_now, "title": title_now,
            "screenshot": screenshot
        })
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Erro ao injetar cookies: {e}")
        raise HTTPException(status_code=500, detail=str(e))
| # ============================================================ | |
| # AUTO LOGIN | |
| # ============================================================ | |
async def auto_login(request: dict, token: str = Depends(verify_token)):
    """Locate a login form in the session's page, fill it in and submit it.

    Request keys: ``session_id``, ``email``, ``password`` (all required).

    Phases, each best-effort and recorded in ``steps_completed``:
        1. Detect a visible email or password field; if none is found,
           click a link/button whose text or href looks like a login entry.
        2. Fill the email/username field.
        3. If no password field is visible, click a "next"-style button
           (two-step login flows).
        4. Fill the password field.
        5. Click a submit-looking button, or press Enter in the password
           field.
        6. Wait for navigation and evaluate a text heuristic on the page.

    Returns:
        JSONResponse with the heuristic ``login_success`` flag, the step
        log, per-phase booleans, the original/final URL and a base64
        screenshot (or null).

    Raises:
        HTTPException: 400 for missing input, 500 for unexpected failures;
            HTTPExceptions from the session manager pass through.
    """
    try:
        session_id = request.get("session_id")
        email = request.get("email", "")
        password = request.get("password", "")
        if not session_id:
            raise HTTPException(status_code=400, detail="session_id é obrigatório")
        if not email or not password:
            raise HTTPException(status_code=400, detail="Email e senha são obrigatórios")
        session = session_manager.get_session(session_id)
        driver = session["driver"]
        safe_switch_to_active_window(driver)
        steps = []
        original_url = driver.current_url
        wait = WebDriverWait(driver, 8)

        # --- Phase 1: is a login form already on the page? ---------------
        has_login_form = False
        try:
            email_fields = driver.find_elements(By.CSS_SELECTOR,
                "input[type='email'], input[name='email'], input[name='username'], "
                "input[autocomplete='email'], input[autocomplete='username'], "
                "input[placeholder*='email' i], input[placeholder*='e-mail' i]")
            pw_fields = driver.find_elements(By.CSS_SELECTOR, "input[type='password']")
            visible_email = [e for e in email_fields if e.is_displayed()]
            visible_pw = [e for e in pw_fields if e.is_displayed()]
            has_login_form = len(visible_email) > 0 or len(visible_pw) > 0
        except Exception:
            # Detection failure is treated as "no form visible".
            pass

        login_clicked = False
        if not has_login_form:
            try:
                login_nav_keywords = ['login', 'entrar', 'sign in', 'acessar', 'área do aluno']
                clickable = driver.find_elements(By.CSS_SELECTOR, "a, button")
                for el in clickable:
                    try:
                        if not el.is_displayed() or not el.is_enabled():
                            continue
                        txt = (el.text or '').strip().lower()
                        href = (el.get_attribute('href') or '').lower()
                        is_submit = el.get_attribute('type') in ['submit', 'button']
                        is_link = el.tag_name.lower() == 'a'
                        # Only navigation elements: links, or buttons that
                        # are not form submit/button controls.
                        if is_link or (not is_submit):
                            if any(kw in txt or kw in href for kw in login_nav_keywords):
                                driver.execute_script("arguments[0].click();", el)
                                login_clicked = True
                                steps.append(f"Navegou para login: {txt[:50]}")
                                break
                    except Exception:
                        # Stale or detached element: try the next one.
                        continue
            except Exception as e:
                steps.append(f"Erro ao buscar link de login: {str(e)[:80]}")
            if login_clicked:
                try:
                    safe_wait_after_navigation(driver, original_url, 10)
                    safe_switch_to_active_window(driver)
                    steps.append(f"Página de login: {driver.current_url[:80]}")
                except Exception:
                    time.sleep(2)
        else:
            steps.append("Formulário de login já presente na página")

        # --- Phase 2: fill the email / username field ---------------------
        email_filled = False
        try:
            email_selectors = [
                "input[type='email']", "input[name='email']",
                "input[name='username']", "input[id='email']",
                "input[id='username']", "input[autocomplete='email']",
                "input[autocomplete='username']",
                "input[placeholder*='email' i]", "input[placeholder*='e-mail' i]",
                "input[placeholder*='usuário' i]",
            ]
            for sel in email_selectors:
                try:
                    els = driver.find_elements(By.CSS_SELECTOR, sel)
                    for el in els:
                        if el.is_displayed() and el.is_enabled():
                            el.click()
                            el.clear()
                            el.send_keys(email)
                            email_filled = True
                            steps.append(f"Email preenchido: {sel}")
                            break
                except Exception:
                    continue
                if email_filled:
                    break
            if not email_filled:
                # Fallback: the first visible text-like input.
                inputs = driver.find_elements(By.CSS_SELECTOR, "input[type='text'], input[type='email'], input:not([type])")
                for inp in inputs:
                    try:
                        if inp.is_displayed() and inp.is_enabled():
                            inp_type = inp.get_attribute('type') or ''
                            if inp_type not in ['hidden', 'submit', 'button', 'checkbox', 'radio', 'password']:
                                inp.click()
                                inp.clear()
                                inp.send_keys(email)
                                email_filled = True
                                steps.append("Email preenchido no primeiro input visível")
                                break
                    except Exception:
                        continue
        except Exception as e:
            steps.append(f"Erro no email: {str(e)[:80]}")

        # --- Phase 3: two-step flows need a "next" click first ------------
        try:
            pw_visible = [p for p in driver.find_elements(By.CSS_SELECTOR, "input[type='password']") if p.is_displayed()]
            if not pw_visible:
                next_btns = driver.find_elements(By.CSS_SELECTOR, "button[type='submit'], button")
                for btn in next_btns:
                    try:
                        txt = (btn.text or '').strip().lower()
                        if btn.is_displayed() and btn.is_enabled() and any(kw in txt for kw in ['próximo', 'next', 'continuar', 'avançar']):
                            driver.execute_script("arguments[0].click();", btn)
                            steps.append("Botão 'Próximo' clicado")
                            try:
                                wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "input[type='password']")))
                            except Exception:
                                time.sleep(2)
                            break
                    except Exception:
                        continue
        except Exception:
            # Optional phase; carry on to the password step regardless.
            pass

        # --- Phase 4: fill the password field -----------------------------
        password_filled = False
        try:
            pw_fields = driver.find_elements(By.CSS_SELECTOR, "input[type='password']")
            for pf in pw_fields:
                try:
                    if pf.is_displayed() and pf.is_enabled():
                        pf.click()
                        pf.clear()
                        pf.send_keys(password)
                        password_filled = True
                        steps.append("Senha preenchida")
                        break
                except Exception:
                    continue
        except Exception as e:
            steps.append(f"Erro na senha: {str(e)[:80]}")

        # --- Phase 5: submit ----------------------------------------------
        submit_clicked = False
        url_before_submit = driver.current_url
        try:
            submit_keywords = ['entrar', 'login', 'sign in', 'acessar', 'enviar', 'submit', 'log in']
            buttons = driver.find_elements(By.CSS_SELECTOR, "button[type='submit'], input[type='submit'], button")
            for btn in buttons:
                try:
                    txt = (btn.text or '').strip().lower()
                    if btn.is_displayed() and btn.is_enabled() and any(kw in txt for kw in submit_keywords):
                        driver.execute_script("arguments[0].click();", btn)
                        submit_clicked = True
                        steps.append(f"Submit clicado: {(btn.text or '').strip()[:30]}")
                        break
                except Exception:
                    continue
            if not submit_clicked and password_filled:
                # No recognizable button: submit with Enter instead.
                pw_fields = driver.find_elements(By.CSS_SELECTOR, "input[type='password']")
                for pf in pw_fields:
                    if pf.is_displayed():
                        pf.send_keys(Keys.ENTER)
                        submit_clicked = True
                        steps.append("Enter pressionado no campo de senha")
                        break
        except Exception as e:
            steps.append(f"Erro no submit: {str(e)[:80]}")

        # --- Phase 6: wait for the result ---------------------------------
        if submit_clicked:
            try:
                safe_wait_after_navigation(driver, url_before_submit, 10)
                safe_switch_to_active_window(driver)
                steps.append(f"Página após login: {driver.current_url[:80]}")
            except NoSuchWindowException:
                # The login may have closed a popup; try the remaining window.
                try:
                    safe_switch_to_active_window(driver)
                    steps.append(f"Recuperado para: {driver.current_url[:80]}")
                except Exception:
                    steps.append("Janela perdida após submit")
            except Exception as e:
                steps.append(f"Aguardando: {str(e)[:60]}")
                time.sleep(3)
                try:
                    safe_switch_to_active_window(driver)
                except Exception:
                    pass

        url_now, title_now = safe_get_url_title(driver, session)
        login_success = False
        try:
            # urlChanged compares against the URL right before the submit.
            # Comparing against original_url would count the earlier click
            # on a "login" link as a successful login by itself.
            check = driver.execute_script("""
                var b = document.body ? document.body.innerText.toLowerCase() : '';
                return {
                    hasLogout: b.includes('logout') || b.includes('sair') || b.includes('desconectar'),
                    hasDash: b.includes('dashboard') || b.includes('painel') || b.includes('meus cursos') || b.includes('meu conteúdo'),
                    hasProfile: b.includes('perfil') || b.includes('minha conta'),
                    hasError: b.includes('senha incorreta') || b.includes('credenciais') || b.includes('inválid') || b.includes('incorrect'),
                    urlChanged: arguments[0] !== arguments[1]
                };
            """, url_before_submit, url_now)
            login_success = bool(
                (check.get('hasLogout') or check.get('hasDash') or check.get('hasProfile') or check.get('urlChanged'))
                and not check.get('hasError')
            )
            if check.get('hasError'):
                steps.append("Possível erro de credenciais detectado")
        except Exception:
            # The heuristic is optional; keep login_success False.
            pass
        session["status"] = "logged_in" if login_success else "login_attempted"
        screenshot = safe_get_screenshot(driver)
        return JSONResponse(content={
            "status": "success",
            "login_success": login_success,
            "steps_completed": steps,
            "email_filled": email_filled,
            "password_filled": password_filled,
            "submit_clicked": submit_clicked,
            "login_button_clicked": login_clicked,
            "original_url": original_url,
            "final_url": url_now,
            "final_title": title_now,
            "url_changed": original_url != url_now,
            "screenshot": screenshot
        })
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Erro no auto-login: {e}")
        raise HTTPException(status_code=500, detail=str(e))
| # ============================================================ | |
| # SCROLL | |
| # ============================================================ | |
async def scroll_page(request: dict, token: str = Depends(verify_token)):
    """Scroll the session's active window vertically and report the new position.

    Request keys read:
        session_id: required identifier passed to ``session_manager.get_session``.
        direction: ``"up"`` scrolls up; any other value scrolls down
            (default ``"down"``).
        amount: scroll distance in pixels (default 400); must be convertible
            to ``int``.

    Returns:
        JSONResponse with ``status``, ``direction``, ``amount``, the scroll
        metrics (``scroll_top``, ``scroll_height``, ``client_height``), the
        current ``url``/``title`` and a ``screenshot``.

    Raises:
        HTTPException: 400 when ``session_id`` is missing or ``amount`` is not
            an integer-like value; 500 for any other failure.
    """
    try:
        session_id = request.get("session_id")
        direction = request.get("direction", "down")
        raw_amount = request.get("amount", 400)
        if not session_id:
            raise HTTPException(status_code=400, detail="session_id é obrigatório")

        # The amount comes straight from the request body. Coerce it to an int
        # so it can never be spliced into the script as arbitrary JavaScript.
        try:
            amount = int(raw_amount)
        except (TypeError, ValueError, OverflowError):
            raise HTTPException(
                status_code=400, detail="amount deve ser um número inteiro"
            ) from None

        session = session_manager.get_session(session_id)
        driver = session["driver"]
        safe_switch_to_active_window(driver)

        # Pass the delta as a script argument instead of formatting it into
        # the source: a negative amount with direction "up" used to produce
        # "--N", which is a JavaScript syntax error.
        delta = -amount if direction == "up" else amount
        driver.execute_script("window.scrollBy(0, arguments[0]);", delta)

        time.sleep(0.5)
        scroll_info = driver.execute_script("""
            return {
                scrollTop: window.pageYOffset || document.documentElement.scrollTop,
                scrollHeight: document.documentElement.scrollHeight,
                clientHeight: document.documentElement.clientHeight
            };
        """) or {}
        screenshot = safe_get_screenshot(driver)
        url_now, title_now = safe_get_url_title(driver, session)
        return JSONResponse(content={
            "status": "success", "direction": direction, "amount": amount,
            "scroll_top": scroll_info.get("scrollTop", 0),
            "scroll_height": scroll_info.get("scrollHeight", 0),
            "client_height": scroll_info.get("clientHeight", 0),
            "url": url_now, "title": title_now,
            "screenshot": screenshot
        })
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[SCROLL] Erro: {e}")
        raise HTTPException(status_code=500, detail=str(e))
| # ============================================================ | |
| # GET SELENIUM COOKIES | |
| # ============================================================ | |
async def get_selenium_cookies(request: dict, token: str = Depends(verify_token)):
    """Return all cookies of the session's browser with the page URL and host.

    Reads ``session_id`` from the request; responds 400 when it is missing
    and 500 when anything else fails.
    """
    try:
        sid = request.get("session_id")
        if not sid:
            raise HTTPException(status_code=400, detail="session_id é obrigatório")

        browser = session_manager.get_session(sid)["driver"]
        safe_switch_to_active_window(browser)
        jar = browser.get_cookies()

        payload = {"status": "success", "cookies": jar, "total": len(jar)}
        payload["url"] = browser.current_url
        # hostname is None for URLs without a network location (e.g. about:blank)
        payload["domain"] = urlparse(browser.current_url).hostname or ""
        return JSONResponse(content=payload)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"[COOKIES] Erro: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
| # ============================================================ | |
| # REFRESH PAGE | |
| # ============================================================ | |
async def refresh_page(request: dict, token: str = Depends(verify_token)):
    """Reload the page shown in the session's browser and return its new state.

    Reads ``session_id`` from the request; responds 400 when it is missing
    and 500 when anything else fails. The response carries the URL, title
    and a screenshot taken after the reload.
    """
    try:
        sid = request.get("session_id")
        if not sid:
            raise HTTPException(status_code=400, detail="session_id é obrigatório")

        sess = session_manager.get_session(sid)
        browser = sess["driver"]
        safe_switch_to_active_window(browser)

        browser.refresh()
        safe_wait_page_load(browser)
        time.sleep(1)  # short settle time after the load wait

        page_url, page_title = safe_get_url_title(browser, sess)
        shot = safe_get_screenshot(browser)
        body = {"status": "success", "url": page_url}
        body["title"] = page_title
        body["screenshot"] = shot
        return JSONResponse(content=body)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"[REFRESH] Erro: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
| # ============================================================ | |
| # CLICK ELEMENT (UNIFIED - robust window handling) | |
| # ============================================================ | |
async def click_element(request: dict, token: str = Depends(verify_token)):
    """Click the page at coordinates expressed relative to a preview image.

    Request keys read:
        session_id: identifier passed to ``session_manager.get_session``.
        x, y: click position in preview-image pixels (default 0).
        img_width, img_height: preview-image size used to scale the position
            to the browser viewport (defaults 1920x1080).

    The scaled position is clamped to the viewport, the element under it
    receives synthetic mousedown/mouseup/click events (descending into a
    same-origin iframe when possible), and the last window handle is selected
    if more than one window exists afterwards.

    Returns:
        dict with ``success``, ``message``, ``clicked``, ``element``, ``url``,
        ``current_url``, ``title``, ``url_changed`` and ``screenshot``. On an
        unexpected error: ``success=False``, ``message`` and a best-effort
        ``screenshot``.

    Raises:
        HTTPException: 404 when the session lookup returns a falsy value.
    """
    # Bound before the try so the error handler can test it instead of
    # relying on a swallowed NameError when the lookup itself failed.
    session = None
    try:
        session_id = request.get("session_id")
        x = request.get("x", 0)
        y = request.get("y", 0)
        img_width = request.get("img_width", 1920)
        img_height = request.get("img_height", 1080)
        session = session_manager.get_session(session_id)
        if not session:
            raise HTTPException(status_code=404, detail="Sessao nao encontrada")
        driver = session["driver"]
        safe_switch_to_active_window(driver)

        # Map preview-image coordinates onto the real viewport and clamp them
        # so elementFromPoint always receives an in-bounds position.
        viewport_width = driver.execute_script("return window.innerWidth;")
        viewport_height = driver.execute_script("return window.innerHeight;")
        actual_x = int(x * viewport_width / img_width) if img_width > 0 else int(x)
        actual_y = int(y * viewport_height / img_height) if img_height > 0 else int(y)
        actual_x = max(0, min(actual_x, viewport_width - 1))
        actual_y = max(0, min(actual_y, viewport_height - 1))
        logger.info(f"Click: preview({x},{y}) img({img_width}x{img_height}) -> real({actual_x},{actual_y}) viewport({viewport_width}x{viewport_height})")

        original_url = driver.current_url
        element_info = driver.execute_script("""
            var ax = arguments[0], ay = arguments[1];
            var elem = document.elementFromPoint(ax, ay);
            var info = null;
            if (elem) {
                info = {
                    tag: elem.tagName || '',
                    tagName: elem.tagName || '',
                    id: elem.id || '',
                    text: (elem.textContent || '').substring(0, 100).trim(),
                    type: elem.type || elem.getAttribute('type') || '',
                    href: elem.href || elem.getAttribute('href') || '',
                    className: (typeof elem.className === 'string' ? elem.className : '').substring(0, 100)
                };
                // Se o elemento é um iframe, tentar clicar dentro dele
                if (elem.tagName === 'IFRAME') {
                    try {
                        var iframeRect = elem.getBoundingClientRect();
                        var innerX = ax - iframeRect.left;
                        var innerY = ay - iframeRect.top;
                        var iframeDoc = elem.contentDocument || elem.contentWindow.document;
                        var innerElem = iframeDoc.elementFromPoint(innerX, innerY);
                        if (innerElem) {
                            innerElem.focus();
                            ['mousedown', 'mouseup', 'click'].forEach(function(evtName) {
                                innerElem.dispatchEvent(new MouseEvent(evtName, {
                                    bubbles: true, cancelable: true,
                                    clientX: innerX, clientY: innerY, view: elem.contentWindow
                                }));
                            });
                            return info;
                        }
                    } catch(e) {
                        // iframe cross-origin
                    }
                }
                // Focar no elemento primeiro
                elem.focus();
                // Disparar eventos de mouse completos
                ['mousedown', 'mouseup', 'click'].forEach(function(evtName) {
                    elem.dispatchEvent(new MouseEvent(evtName, {
                        bubbles: true, cancelable: true,
                        clientX: ax, clientY: ay, view: window
                    }));
                });
                // Se for input/textarea, colocar cursor no final
                if (elem.tagName === 'INPUT' || elem.tagName === 'TEXTAREA') {
                    elem.focus();
                    try { elem.setSelectionRange(elem.value.length, elem.value.length); } catch(e) {}
                }
            } else {
                document.body.dispatchEvent(new MouseEvent('click', {
                    bubbles: true, cancelable: true,
                    clientX: ax, clientY: ay, view: window
                }));
            }
            return info;
        """, actual_x, actual_y)

        time.sleep(2)

        # The click may have opened another window/tab; follow the newest handle.
        try:
            handles = driver.window_handles
            if len(handles) > 1:
                driver.switch_to.window(handles[-1])
                logger.info(f"[CLICK] Switched to new tab, total: {len(handles)}")
        except Exception as switch_err:
            # Best effort: the helper call below re-selects a usable window.
            logger.debug(f"[CLICK] Could not switch window: {switch_err}")

        safe_switch_to_active_window(driver)
        safe_wait_page_load(driver, 8)
        screenshot = safe_get_screenshot(driver)

        current_url = ""
        title = ""
        try:
            current_url = driver.current_url
            title = driver.title
        except Exception as state_err:
            # Keep the empty defaults when the window is no longer readable.
            logger.debug(f"[CLICK] Could not read url/title: {state_err}")

        session["last_url"] = current_url
        session["last_activity"] = time.time()
        url_changed = current_url != original_url
        return {
            "success": True,
            "message": f"Clique em ({actual_x}, {actual_y})",
            "clicked": {"x": actual_x, "y": actual_y},
            "element": element_info or {"tag": "none", "text": "Nenhum elemento"},
            "url": current_url,
            "current_url": current_url,
            "title": title,
            "url_changed": url_changed,
            "screenshot": screenshot
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[CLICK] Erro: {e}")
        screenshot = None
        if session:
            try:
                screenshot = safe_get_screenshot(session["driver"])
            except Exception as shot_err:
                logger.debug(f"[CLICK] Screenshot after error failed: {shot_err}")
        return {"success": False, "message": str(e), "screenshot": screenshot}
| # ============================================================ | |
| # TYPE TEXT (FIXED - robust window handling) | |
| # ============================================================ | |
async def type_text(request: dict, token: str = Depends(verify_token)):
    """Type text or press a special key in the session's browser.

    Request keys read:
        session_id: required identifier passed to ``session_manager.get_session``.
        text: text to type. The literal ``"SELECT_ALL"`` sends Ctrl+A; a single
            character found in the special-key table below is sent as that key.
        press_enter: when true, ENTER is sent after a successful type.
        clear_first: when true, the target is cleared before typing.
        selector: optional CSS selector of the element to type into.

    Target resolution for ordinary text: the selector match, then the
    browser's active element, then the first focused input/textarea/
    contenteditable. If typing into the target fails, or no target is found,
    the keys are sent through ``ActionChains`` instead.

    Returns:
        dict with ``success``, ``typed``, ``text_length``, ``element``, ``url``,
        ``title``, ``url_changed`` and ``screenshot``. The SELECT_ALL shortcut
        and the failure paths return reduced dicts.

    Raises:
        HTTPException: 400 when ``session_id`` is missing.
    """
    # Bound before the try so the error handler can test it instead of
    # relying on a swallowed NameError when the lookup itself failed.
    session = None
    try:
        session_id = request.get("session_id")
        text = request.get("text", "")
        press_enter = request.get("press_enter", False)
        clear_first = request.get("clear_first", False)
        selector = request.get("selector", None)
        if not session_id:
            raise HTTPException(status_code=400, detail="session_id required")
        session = session_manager.get_session(session_id)
        driver = session["driver"]
        safe_switch_to_active_window(driver)
        url_before = driver.current_url

        # Handle SELECT_ALL (Ctrl+A) as a dedicated shortcut.
        if text == "SELECT_ALL":
            actions = ActionChains(driver)
            actions.key_down(Keys.CONTROL).send_keys('a').key_up(Keys.CONTROL).perform()
            time.sleep(0.3)
            screenshot = safe_get_screenshot(driver)
            return {
                "success": True,
                "typed": True,
                "screenshot": screenshot,
                "url": driver.current_url,
                "title": driver.title
            }

        special_keys = {
            '\uE004': Keys.TAB, '\uE00C': Keys.ESCAPE,
            '\uE007': Keys.ENTER, '\uE003': Keys.BACKSPACE,
            '\uE006': Keys.RETURN,
            '\uE012': Keys.ARROW_LEFT, '\uE013': Keys.ARROW_UP,
            '\uE014': Keys.ARROW_RIGHT, '\uE015': Keys.ARROW_DOWN,
            '\uE010': Keys.END, '\uE011': Keys.HOME,
            '\uE00D': Keys.SPACE, '\uE017': Keys.DELETE,
        }
        key_names = {
            '\uE004': 'Tab', '\uE00C': 'Escape', '\uE007': 'Enter',
            '\uE003': 'Backspace', '\uE006': 'Return',
            '\uE012': 'ArrowLeft', '\uE013': 'ArrowUp',
            '\uE014': 'ArrowRight', '\uE015': 'ArrowDown',
            '\uE010': 'End', '\uE011': 'Home',
            '\uE00D': 'Space', '\uE017': 'Delete'
        }

        typed = False
        element_info = {"tagName": "none", "method": "none"}
        is_special = text in special_keys
        try:
            if is_special:
                actions = ActionChains(driver)
                actions.send_keys(special_keys[text]).perform()
                typed = True
                element_info = {"tagName": "body", "method": "ActionChains", "key": key_names.get(text, 'Special')}
            else:
                # Resolve the typing target; each lookup is best effort and
                # falls through to the next strategy on failure.
                target = None
                if selector:
                    try:
                        target = driver.find_element(By.CSS_SELECTOR, selector)
                    except Exception as find_err:
                        logger.debug(f"[TYPE] Selector lookup failed: {find_err}")
                if not target:
                    try:
                        target = driver.switch_to.active_element
                    except Exception as active_err:
                        logger.debug(f"[TYPE] No active element: {active_err}")
                if not target:
                    try:
                        focusable = driver.find_elements(
                            By.CSS_SELECTOR,
                            "input:focus, textarea:focus, [contenteditable='true']:focus")
                        if focusable:
                            target = focusable[0]
                    except Exception as focus_err:
                        logger.debug(f"[TYPE] Focused-element lookup failed: {focus_err}")

                if target:
                    try:
                        if clear_first:
                            target.clear()
                        target.send_keys(text)
                        typed = True
                        tag = "unknown"
                        try:
                            tag = target.tag_name
                        except Exception:
                            # The element may have gone stale after typing;
                            # keep the "unknown" placeholder.
                            pass
                        element_info = {"tagName": tag, "method": "send_keys"}
                    except Exception as e:
                        # Element-level typing failed: send the keys to
                        # whatever currently has focus instead.
                        logger.warning(f"[TYPE] send_keys failed: {e}")
                        actions = ActionChains(driver)
                        if clear_first:
                            actions.key_down(Keys.CONTROL).send_keys('a').key_up(Keys.CONTROL)
                            actions.send_keys(Keys.DELETE)
                        actions.send_keys(text)
                        actions.perform()
                        typed = True
                        element_info = {"tagName": "body", "method": "ActionChains_fallback"}
                else:
                    actions = ActionChains(driver)
                    actions.send_keys(text)
                    actions.perform()
                    typed = True
                    element_info = {"tagName": "body", "method": "ActionChains_no_target"}
        except Exception as e:
            logger.error(f"[TYPE] Error: {e}")
            return {"success": False, "typed": False, "message": str(e)}

        # After typing, press Enter when requested.
        if press_enter and typed:
            try:
                time.sleep(0.3)
                actions = ActionChains(driver)
                actions.send_keys(Keys.ENTER).perform()
            except Exception as enter_err:
                logger.debug(f"[TYPE] Enter key failed: {enter_err}")

        time.sleep(0.5)

        # The keystrokes may have triggered navigation or a new window.
        try:
            safe_switch_to_active_window(driver)
        except Exception as switch_err:
            logger.debug(f"[TYPE] Could not switch window: {switch_err}")
        safe_wait_page_load(driver, 5)

        url_now = driver.current_url
        title_now = driver.title
        url_changed = url_before != url_now
        screenshot = safe_get_screenshot(driver)
        return {
            "success": True,
            "typed": typed,
            "text_length": len(text),
            "element": element_info,
            "url": url_now,
            "title": title_now,
            "url_changed": url_changed,
            "screenshot": screenshot
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[TYPE-TEXT] Erro: {e}")
        screenshot = None
        if session:
            try:
                screenshot = safe_get_screenshot(session["driver"])
            except Exception as shot_err:
                logger.debug(f"[TYPE-TEXT] Screenshot after error failed: {shot_err}")
        return {"success": False, "typed": False, "message": str(e), "screenshot": screenshot}
| # ============================================================ | |
| # SEARCH SITE | |
| # ============================================================ | |
async def search_site(request: SearchSiteRequest, auth=Depends(verify_token)):
    """Search the current page for a term and return the matches as JSON.

    The term (``request.term``, stripped and lower-cased) is matched against
    visible text lines, links, images, inline and external scripts, meta tags,
    form fields and element class/id values (the last group capped at 20).
    When the term contains an API-related keyword, inline scripts are also
    scanned for fetch/axios calls and ``/api/`` URLs.

    Returns:
        JSONResponse with ``findings``, ``term``, ``url``, ``title`` and
        ``total_found``.

    Raises:
        HTTPException: 500 when the page search fails.
    """
    session = session_manager.get_session(request.session_id)
    driver = session['driver']
    safe_switch_to_active_window(driver)
    term = request.term.strip().lower()
    search_results = {"findings": [], "term": term, "url": driver.current_url, "title": driver.title}
    try:
        session['status'] = 'searching'
        # Normalise to a list: a null script result previously made the
        # extend() below fail silently and drop every API finding.
        findings = driver.execute_script("""
            var term = arguments[0];
            var results = [];
            var body = document.body ? document.body.innerText : '';
            var lines = body.split('\\n');
            for (var i = 0; i < lines.length; i++) {
                if (lines[i].toLowerCase().indexOf(term) >= 0) {
                    results.push({
                        type: 'Texto visível',
                        value: lines[i].trim().substring(0, 200),
                        category: 'text',
                        details: 'Linha ' + (i + 1)
                    });
                }
            }
            var links = document.querySelectorAll('a[href]');
            links.forEach(function(a) {
                var href = a.href || '';
                var text = (a.textContent || '').trim();
                if (href.toLowerCase().indexOf(term) >= 0 || text.toLowerCase().indexOf(term) >= 0) {
                    results.push({
                        type: 'Link',
                        value: text.substring(0, 100),
                        category: 'links',
                        details: href.substring(0, 200)
                    });
                }
            });
            var imgs = document.querySelectorAll('img');
            imgs.forEach(function(img) {
                var src = img.src || '';
                var alt = img.alt || '';
                if (src.toLowerCase().indexOf(term) >= 0 || alt.toLowerCase().indexOf(term) >= 0) {
                    results.push({
                        type: 'Imagem',
                        value: alt || src.substring(0, 100),
                        category: 'images',
                        details: src.substring(0, 200)
                    });
                }
            });
            var scripts = document.querySelectorAll('script:not([src])');
            scripts.forEach(function(s, idx) {
                var text = s.textContent || '';
                if (text.toLowerCase().indexOf(term) >= 0) {
                    var pos = text.toLowerCase().indexOf(term);
                    var snippet = text.substring(Math.max(0, pos - 50), pos + term.length + 50);
                    results.push({
                        type: 'Script inline',
                        value: snippet.trim(),
                        category: 'scripts',
                        details: 'Script #' + (idx + 1)
                    });
                }
            });
            var extScripts = document.querySelectorAll('script[src]');
            extScripts.forEach(function(s) {
                if (s.src.toLowerCase().indexOf(term) >= 0) {
                    results.push({
                        type: 'Script externo',
                        value: s.src,
                        category: 'scripts',
                        details: 'URL do script'
                    });
                }
            });
            var metas = document.querySelectorAll('meta');
            metas.forEach(function(m) {
                var content = m.content || '';
                var name = m.name || m.getAttribute('property') || '';
                if (content.toLowerCase().indexOf(term) >= 0 || name.toLowerCase().indexOf(term) >= 0) {
                    results.push({
                        type: 'Meta tag',
                        value: name + ': ' + content.substring(0, 150),
                        category: 'meta',
                        details: 'Meta tag'
                    });
                }
            });
            var inputs = document.querySelectorAll('input, textarea, select');
            inputs.forEach(function(inp) {
                var name = inp.name || '';
                var id = inp.id || '';
                var placeholder = inp.placeholder || '';
                var val = inp.value || '';
                var searchStr = (name + ' ' + id + ' ' + placeholder + ' ' + val).toLowerCase();
                if (searchStr.indexOf(term) >= 0) {
                    results.push({
                        type: 'Formulário',
                        value: '<' + inp.tagName + '> name=' + name + ' id=' + id,
                        category: 'forms',
                        details: 'placeholder: ' + placeholder
                    });
                }
            });
            var allEls = document.querySelectorAll('*');
            var cssResults = [];
            allEls.forEach(function(el) {
                var cls = (typeof el.className === 'string' ? el.className : '').toLowerCase();
                var id = (el.id || '').toLowerCase();
                if ((cls.indexOf(term) >= 0 || id.indexOf(term) >= 0) && cssResults.length < 20) {
                    cssResults.push({
                        type: 'Elemento CSS',
                        value: '<' + el.tagName + '> class="' + (typeof el.className === 'string' ? el.className : '').substring(0, 80) + '" id="' + (el.id || '') + '"',
                        category: 'css_elements',
                        details: el.tagName
                    });
                }
            });
            results = results.concat(cssResults);
            return results;
        """, term) or []

        # Extra pass for API-related terms: look for endpoint URLs in scripts.
        if any(kw in term for kw in ['api', 'endpoint', 'fetch', 'ajax', 'request', 'requisição', 'requisicao']):
            try:
                api_findings = driver.execute_script("""
                    var results = [];
                    var scripts = document.querySelectorAll('script');
                    scripts.forEach(function(s) {
                        var text = s.textContent || s.innerText || '';
                        var urlPattern = /(?:fetch|axios\\.(?:get|post|put|delete)|XMLHttpRequest)\\s*\\(\\s*['\"`]([^'\"`]+)['\"`]/g;
                        var match;
                        while ((match = urlPattern.exec(text)) !== null) {
                            results.push({type: 'API Endpoint', value: match[1], category: 'api', details: 'fetch/axios'});
                        }
                        var apiPattern = /['\"`]((?:https?:\\/\\/[^'\"`]*\\/api\\/[^'\"`]*)|(?:\\/api\\/[^'\"`]*))['\\"`]/g;
                        while ((match = apiPattern.exec(text)) !== null) {
                            results.push({type: 'API URL', value: match[1], category: 'api', details: 'URL pattern'});
                        }
                    });
                    return results;
                """)
                if api_findings:
                    findings.extend(api_findings)
            except Exception as api_err:
                # The API scan is optional; keep the main findings on failure.
                logger.warning(f"[SEARCH] API scan failed: {api_err}")

        search_results["findings"] = findings
        search_results["total_found"] = len(findings)
        return JSONResponse(content=search_results)
    except Exception as e:
        logger.error(f"[SEARCH] Erro: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Reset the status on both the success and the failure path.
        session['status'] = 'active'
async def search_site_txt(request: SearchSiteRequest, auth=Depends(verify_token)):
    """Search the current page for a term and return a downloadable TXT report.

    The term (``request.term``, stripped and lower-cased) is matched against
    visible text lines, links and inline scripts. The report is streamed as a
    UTF-8 attachment named ``<folder>_<timestamp>.txt``, where the folder part
    comes from ``request.folder_name`` (default ``"busca"``) reduced to
    characters that are safe in a Content-Disposition header.

    Raises:
        HTTPException: 500 when the search or report generation fails.
    """
    import re  # local import: only needed for filename sanitising

    session = session_manager.get_session(request.session_id)
    driver = session['driver']
    safe_switch_to_active_window(driver)
    term = request.term.strip().lower()
    try:
        session['status'] = 'searching'
        findings = driver.execute_script("""
            var term = arguments[0];
            var results = [];
            var body = document.body ? document.body.innerText : '';
            var lines = body.split('\\n');
            for (var i = 0; i < lines.length; i++) {
                if (lines[i].toLowerCase().indexOf(term) >= 0) {
                    results.push({type: 'Texto', value: lines[i].trim().substring(0, 200), details: 'Linha ' + (i+1)});
                }
            }
            var links = document.querySelectorAll('a[href]');
            links.forEach(function(a) {
                var href = a.href || '';
                var text = (a.textContent || '').trim();
                if (href.toLowerCase().indexOf(term) >= 0 || text.toLowerCase().indexOf(term) >= 0) {
                    results.push({type: 'Link', value: text.substring(0, 100), details: href.substring(0, 200)});
                }
            });
            var scripts = document.querySelectorAll('script:not([src])');
            scripts.forEach(function(s, idx) {
                var text = s.textContent || '';
                if (text.toLowerCase().indexOf(term) >= 0) {
                    var pos = text.toLowerCase().indexOf(term);
                    results.push({type: 'Script', value: text.substring(Math.max(0, pos-50), pos+term.length+50).trim(), details: 'Script #'+(idx+1)});
                }
            });
            return results;
        """, term) or []

        report = [
            "=" * 60,
            " RELATÓRIO DE BUSCA NO SITE",
            "=" * 60,
            f"\nTermo buscado: {request.term}",
            f"URL: {driver.current_url}",
            f"Título: {driver.title}",
            f"Data: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}",
            f"Total encontrado: {len(findings)}",
            f"\n{'─' * 40}",
        ]
        if findings:
            for i, item in enumerate(findings):
                report.append(f"\n[{i+1}] {item.get('type', 'Item')}:")
                report.append(f" Valor: {item.get('value', '')}")
                if item.get('details'):
                    report.append(f" Detalhes: {item['details']}")
        else:
            report.append("\nNenhum resultado encontrado.")
        report.append(f"\n{'=' * 60}")
        report_text = "\n".join(report)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # folder_name is caller-supplied: quotes, semicolons or spaces would
        # corrupt the header and non-latin-1 characters cannot be encoded in
        # it, so keep only a conservative character set.
        folder = str(request.folder_name or "busca")
        safe_folder = re.sub(r"[^A-Za-z0-9._-]+", "_", folder).strip("._-") or "busca"
        filename = f"{safe_folder}_{timestamp}.txt"
        return StreamingResponse(
            io.BytesIO(report_text.encode('utf-8')),
            media_type="text/plain; charset=utf-8",
            headers={"Content-Disposition": f'attachment; filename="{filename}"'}
        )
    except Exception as e:
        logger.error(f"[SEARCH-TXT] Erro: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Reset the status on both the success and the failure path.
        session['status'] = 'active'