r"""Import WQL query templates from an Excel workbook into the database.

Intended to be run once:

    cd d:\GP\murshid_backend
    .venv\Scripts\python.exe scripts\import_excel_templates.py

Pass ``--replace`` on the command line to overwrite templates that already
exist instead of skipping them.
"""
import sys
import re
from pathlib import Path

# Add the backend root to sys.path so that the `app` package can be imported.
# This must happen before the `app.*` imports below.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

import openpyxl
from sqlalchemy.orm import Session

from app.config import settings
from app.db.session import SessionLocal
from app.models.query_template import QueryTemplate
from app.models.technique import Technique

_EXCEL_FILENAME = "murshid_query_template_structure_clean_shared.xlsx"

# Number of leading columns read from each data row:
# technique id, technique name, template id, purpose, WQL query, note.
_EXPECTED_COLUMNS = 6

EXCEL_PATH = Path(settings.murshid_models_dir) / _EXCEL_FILENAME
# Fallback: the parent of the backend root (two levels above this script's
# own directory).
if not EXCEL_PATH.is_file():
    EXCEL_PATH = Path(__file__).resolve().parent.parent.parent / _EXCEL_FILENAME


def normalise_query(q: str | None) -> str:
    """Collapse every run of whitespace (including newlines) to one space.

    Args:
        q: Raw query text, or ``None``.

    Returns:
        The stripped, single-spaced query; ``""`` when *q* is empty or ``None``.
    """
    if not q:
        return ""
    return re.sub(r"\s+", " ", q.strip())


def _cell_text(value: object) -> str:
    """Return a cell value as stripped text; falsy cells become ``""``."""
    return str(value or "").strip()


def run(db: Session, replace: bool = False) -> dict:
    """Load templates from ``EXCEL_PATH`` and store them through *db*.

    Data rows are read from the workbook's active sheet starting at row 2
    (row 1 is treated as a header). For each row the technique is created if
    it does not exist yet, then a ``QueryTemplate`` is inserted unless one
    with the same ``(technique_id, purpose)`` already exists.

    Args:
        db: Open SQLAlchemy session. It is committed on success and rolled
            back (with the exception re-raised) on failure; it is never
            closed here.
        replace: When true, an existing template's query, note and active
            flag are overwritten; when false, the row is counted as skipped.

    Returns:
        ``{"error": ...}`` if the Excel file is missing; otherwise a summary
        dict with the keys ``excel_path``, ``rows_processed``,
        ``techniques_inserted``, ``templates_inserted``, ``skipped`` and
        ``errors``.
    """
    if not EXCEL_PATH.is_file():
        return {"error": f"Excel file not found: {EXCEL_PATH}"}

    wb = openpyxl.load_workbook(EXCEL_PATH)
    try:
        ws = wb.active
        rows = list(ws.iter_rows(min_row=2, values_only=True))
    finally:
        wb.close()

    inserted_techniques = 0
    inserted_templates = 0  # also counts templates updated via replace=True
    skipped = 0
    # Never populated at present; kept so the result shape stays stable.
    errors: list[str] = []

    try:
        for row in rows:
            # Pad short rows so a sheet with fewer than six columns reads as
            # empty cells instead of raising IndexError.
            cells = tuple(row) + (None,) * (_EXPECTED_COLUMNS - len(row))

            technique_id = _cell_text(cells[0])
            technique_name = _cell_text(cells[1])
            # cells[2] is the sheet's template id (e.g. "T1484-1"); it is not
            # stored and not used for duplicate detection.
            purpose = _cell_text(cells[3]) or None
            wql_query = normalise_query(str(cells[4] or ""))
            note = _cell_text(cells[5]) or None

            if not technique_id or not wql_query:
                skipped += 1
                continue

            # 1. Upsert Technique
            tech = db.get(Technique, technique_id)
            if tech is None:
                tech = Technique(
                    technique_id=technique_id,
                    technique_name=technique_name or technique_id,
                    tactic=None,
                )
                db.add(tech)
                # Flush so the technique row exists before templates
                # referencing it are added.
                db.flush()
                inserted_techniques += 1
            elif technique_name and not tech.technique_name:
                # Only fill in a missing name; never overwrite an existing one.
                tech.technique_name = technique_name

            # 2. Insert QueryTemplate. Uniqueness is checked on
            #    (technique_id, purpose) to avoid duplicates on re-run.
            existing = (
                db.query(QueryTemplate)
                .filter(
                    QueryTemplate.technique_id == technique_id,
                    QueryTemplate.purpose == purpose,
                )
                .first()
            )
            if existing:
                if replace:
                    existing.wql_query = wql_query
                    existing.note = note
                    existing.is_active = True
                    inserted_templates += 1
                else:
                    skipped += 1
                # Either way the row is handled; do not insert a duplicate.
                continue

            tpl = QueryTemplate(
                technique_id=technique_id,
                purpose=purpose,
                wql_query=wql_query,
                note=note,
                is_active=True,
            )
            db.add(tpl)
            inserted_templates += 1

        db.commit()
    except Exception:
        # Leave the caller's session clean instead of holding half an import.
        db.rollback()
        raise

    return {
        "excel_path": str(EXCEL_PATH),
        "rows_processed": len(rows),
        "techniques_inserted": inserted_techniques,
        "templates_inserted": inserted_templates,
        "skipped": skipped,
        "errors": errors,
    }


if __name__ == "__main__":
    replace = "--replace" in sys.argv
    db: Session = SessionLocal()
    try:
        result = run(db, replace=replace)
        print("\n=== Import Result ===")
        for k, v in result.items():
            print(f" {k}: {v}")
    finally:
        db.close()
    # Signal failure to the shell when the workbook could not be found.
    if "error" in result:
        sys.exit(1)