"""Simulated vulnerability scanner that records every scan in a SQLite database."""

import logging
import time

import requests
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

from database.models import DocumentAnalysis

DATABASE_URL = "sqlite:///document_analysis.db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Configure logging
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Total number of scan attempts made before giving up.
_MAX_ATTEMPTS = 3
# Pause between attempts after a database failure.
_RETRY_DELAY_SECONDS = 2
# Upper bound on how long one HTTP request may block; without it an
# unresponsive host would hang the scan indefinitely.
_REQUEST_TIMEOUT_SECONDS = 10


def _empty_result():
    """Return a fresh "nothing found" result so callers never share one dict."""
    return {"vulnerabilities_found": 0, "critical_issues": []}


def _record_scan(session, app_url, links=None, error=None):
    """Add one ``DocumentAnalysis`` row for ``app_url`` and commit it."""
    session.add(
        DocumentAnalysis(
            source=app_url,
            title="Vulnerability Scan",
            links=links,
            error=error,
        )
    )
    session.commit()


def _record_failure(session, app_url, error):
    """Store ``error`` as the outcome of a failed scan of ``app_url``."""
    # The failure may have come from a flush/commit on this very session; roll
    # back first so the session is usable again for writing the error row.
    session.rollback()
    _record_scan(session, app_url, error=str(error))


def scan_application(app_url):
    """Scan ``app_url`` and persist the outcome.

    The URL is fetched over HTTP and a ``DocumentAnalysis`` row is written
    with either the (simulated) findings or the error that occurred. Up to
    ``_MAX_ATTEMPTS`` attempts are made: a failed request is recorded and the
    next attempt starts immediately, while a database failure waits
    ``_RETRY_DELAY_SECONDS`` before the next attempt.

    Args:
        app_url: URL to scan; must be a ``str`` starting with ``"http"``.

    Returns:
        A dict with the keys ``"vulnerabilities_found"`` (int) and
        ``"critical_issues"`` (list of str). An empty result (0 / ``[]``) is
        returned for invalid input and when every attempt fails.
    """
    print(f"Scanning application for vulnerabilities: {app_url}")

    # Input validation for app_url
    if not isinstance(app_url, str) or not app_url.startswith("http"):
        logging.error("Invalid app_url provided.")
        return _empty_result()

    for attempt in range(_MAX_ATTEMPTS):
        try:
            session = SessionLocal()
            try:
                response = requests.get(
                    app_url, timeout=_REQUEST_TIMEOUT_SECONDS
                )
                response.raise_for_status()

                # A URL carrying the marker parameter is reported as an
                # injection attempt; nothing is written to the database.
                if "vulnerable_param" in app_url:
                    logging.error("Potential SQL injection attempt detected.")
                    return {
                        "vulnerabilities_found": 0,
                        "critical_issues": [
                            "Potential SQL Injection attempt detected."
                        ],
                    }

                vulnerabilities = {
                    "vulnerabilities_found": 2,
                    "critical_issues": ["SQL Injection", "XSS"],
                }

                # Save scan results to the database
                _record_scan(
                    session,
                    app_url,
                    links=str(vulnerabilities["critical_issues"]),
                )
                return vulnerabilities
            except requests.exceptions.HTTPError as http_err:
                logging.error("HTTP error occurred: %s", http_err)
                _record_failure(session, app_url, http_err)
            except Exception as err:
                logging.error("Other error occurred: %s", err)
                _record_failure(session, app_url, err)
            finally:
                session.close()
        except Exception as db_err:
            # Reached when the session cannot be created or when recording
            # the failure itself fails.
            logging.error("Database connection error: %s", db_err)
            if attempt < _MAX_ATTEMPTS - 1:
                logging.error("Retrying database connection...")
                time.sleep(_RETRY_DELAY_SECONDS)
            else:
                logging.error(
                    "Failed to connect to the database after multiple attempts."
                )
                return _empty_result()

    return _empty_result()


def verify_database_connection():
    """Run ``SELECT 1`` against the database and log whether it succeeded.

    Failures are logged, never raised. The success message is logged at INFO
    level, which the ERROR-level configuration in this module does not emit.
    """
    try:
        session = SessionLocal()
        try:
            # Raw SQL strings must be wrapped in text(); SQLAlchemy 2.0
            # rejects a plain str passed to Session.execute().
            session.execute(text("SELECT 1"))
        finally:
            # Release the connection even when the probe query fails.
            session.close()
        logging.info("Database connection verified.")
    except Exception as e:
        logging.error("Database connection verification failed: %s", e)


if __name__ == "__main__":
    verify_database_connection()
    vulnerabilities = scan_application("http://example.com")
    print(f"Vulnerability Scan Results: {vulnerabilities}")