SafeAIScan / tasks.py
TafadzwaTaps
fix: latest deployment
0d670df
"""
tasks.py — Background Scan Worker
===================================
Runs async repo scans as FastAPI BackgroundTasks.
No plan gating, no org logic — just clone → validate → scan → save.
State machine: QUEUED → CLONING → VALIDATING → SCANNING → DONE
                                                        ↘ FAILED
"""
import logging
import traceback
from datetime import datetime, timezone
import db
from scanner import safe_clone, validate_repo_size, scan_directory, build_result
# Module-level logger registered under the fixed name "secretscan.tasks".
logger = logging.getLogger("secretscan.tasks")
def run_repo_scan(task_id: str, repo_url: str, user_id: str, is_pro: bool) -> None:
    """
    Run the clone → validate → scan → save pipeline for one repository.

    Called by FastAPI's BackgroundTasks — runs in a thread after the
    HTTP response has already been sent to the client.
    The client polls GET /scan/status/{task_id} for progress, so every
    stage persists its state through ``db.update_scan_task``.

    Args:
        task_id: Identifier of the scan task whose state is updated.
        repo_url: Repository URL handed to ``safe_clone``; also stored as
            the scan source.
        user_id: Owner identifier passed to ``db.save_scan``.
        is_pro: Forwarded unchanged to ``build_result``.

    Returns:
        None. The outcome is written to the task record: ``DONE`` with the
        result payload (plus ``scan_id``), or ``FAILED`` with an ``error``
        message.

    Errors raised by the pipeline are logged and recorded as ``FAILED``
    instead of being re-raised. The clone directory, if one was created,
    is always removed.
    """
    import shutil

    clone_dir = None

    def _update(state: str, message: str, progress: int, result_json=None) -> None:
        """Helper: persist task state to the DB."""
        fields = {
            "state": state,
            "message": message,
            "progress": progress,
            "updated_at": datetime.now(timezone.utc).isoformat(),
        }
        if result_json is not None:
            fields["result_json"] = result_json
        db.update_scan_task(task_id, fields)

    def _mark_failed(message: str, error: str) -> None:
        """Helper: record the FAILED state without letting a DB error escape."""
        try:
            _update("FAILED", message, 0, result_json={"error": error})
        except Exception:
            # An exception raised inside an `except` handler is not caught by
            # sibling handlers, so guard it here; the original failure has
            # already been logged by the caller.
            logger.exception("[%s] Could not persist FAILED state", task_id)

    try:
        # ── 1. Clone ──────────────────────────────────────────
        logger.info("[%s] Starting repo scan: %s", task_id, repo_url)
        _update("CLONING", "Cloning repository…", 10)
        clone_dir = safe_clone(repo_url)

        # ── 2. Validate size ──────────────────────────────────
        _update("VALIDATING", "Checking repository size…", 30)
        validate_repo_size(clone_dir)

        # ── 3. Scan for secrets ───────────────────────────────
        _update("SCANNING", "Scanning for hardcoded secrets…", 60)
        findings = scan_directory(clone_dir)

        # ── 4. Build result and save ──────────────────────────
        _update("SCANNING", "Preparing report…", 90)
        result = build_result(findings, source=repo_url, is_pro=is_pro)

        # Persist to scans table so GET /report/{id} works
        scan_id = db.save_scan(user_id, repo_url, result)
        _update("DONE", "Scan complete", 100, result_json={
            **result,
            "scan_id": scan_id,
        })
        logger.info(
            "[%s] Done — %s secret(s), risk=%s",
            task_id,
            result["total_secrets"],
            result["risk_level"],
        )
    except (ValueError, RuntimeError) as exc:
        # Expected failures: bad URL, repo too large, clone failed
        logger.warning("[%s] Scan error: %s", task_id, exc)
        _mark_failed(str(exc), str(exc))
    except Exception as exc:
        # Unexpected failures — logger.exception attaches the full traceback
        # to the log record instead of printing it to stderr.
        logger.exception("[%s] Unexpected error: %s", task_id, exc)
        _mark_failed("An unexpected error occurred.", "Scan failed unexpectedly.")
    finally:
        # Always remove the clone, whether the scan succeeded or failed.
        if clone_dir:
            shutil.rmtree(clone_dir, ignore_errors=True)
# ── Backwards-compatibility alias ─────────────────────────────────────────────
# app.py imports `run_scan` (old name). The function was renamed to
# run_repo_scan but the alias keeps the import working without touching app.py.
run_scan = run_repo_scan