r"""
Import WQL query templates from an Excel workbook into the database.

Run once:
    cd d:\GP\murshid_backend
    .venv\Scripts\python.exe scripts\import_excel_templates.py

Pass --replace to overwrite templates that already exist.
"""
|
|
| import sys |
| import re |
| from pathlib import Path |
|
|
| |
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) |
|
|
| import openpyxl |
| from sqlalchemy.orm import Session |
|
|
| from app.config import settings |
| from app.db.session import SessionLocal |
| from app.models.query_template import QueryTemplate |
| from app.models.technique import Technique |
|
|
# File name of the workbook that holds the query templates.
_TEMPLATE_WORKBOOK = "murshid_query_template_structure_clean_shared.xlsx"

# Look in the configured models directory first.
EXCEL_PATH = Path(settings.murshid_models_dir) / _TEMPLATE_WORKBOOK

# Otherwise fall back to the directory three levels above this script.
if not EXCEL_PATH.is_file():
    EXCEL_PATH = Path(__file__).resolve().parents[2] / _TEMPLATE_WORKBOOK
|
|
| def normalise_query(q: str | None) -> str: |
| """Collapse whitespace/newlines in WQL query.""" |
| if not q: |
| return "" |
| return re.sub(r"\s+", " ", q.strip()) |
|
|
|
|
def _cell_text(row: tuple, index: int) -> str:
    """Return the stripped text of ``row[index]``.

    Empty (falsy) cells and columns missing from a short row both give ``""``.
    """
    value = row[index] if index < len(row) else None
    return str(value or "").strip()


def run(db: Session, replace: bool = False) -> dict:
    """Import the query templates listed in the workbook at ``EXCEL_PATH``.

    The active worksheet is read from row 2 onward. Columns are taken by
    position: 0 = technique id, 1 = technique name, 3 = purpose,
    4 = WQL query, 5 = note. Column 2 is not stored. A row without a
    technique id or without a query is skipped.

    For every remaining row the technique is created if it does not exist
    (or given a name if it has none), then a template is matched on
    ``(technique_id, purpose)``:

    * no match: a new active template is added;
    * match and ``replace`` is true: its query and note are overwritten and
      it is re-activated;
    * match and ``replace`` is false: the row is skipped.

    Args:
        db: SQLAlchemy session used for all reads and writes. It is
            committed on success and rolled back if the import fails.
        replace: Overwrite templates that already exist instead of skipping
            them.

    Returns:
        ``{"error": ...}`` when the workbook is missing; otherwise a summary
        with the keys ``excel_path``, ``rows_processed``,
        ``techniques_inserted``, ``templates_inserted`` (which also counts
        templates overwritten in replace mode), ``skipped`` and ``errors``.

    Raises:
        Any exception raised by openpyxl or SQLAlchemy is propagated
        unchanged; database errors trigger a rollback first.
    """
    if not EXCEL_PATH.is_file():
        return {"error": f"Excel file not found: {EXCEL_PATH}"}

    wb = openpyxl.load_workbook(EXCEL_PATH)
    ws = wb.active

    # min_row=2 skips the first worksheet row.
    rows = list(ws.iter_rows(min_row=2, values_only=True))

    inserted_techniques = 0
    inserted_templates = 0
    skipped = 0
    # Kept in the result for interface compatibility; nothing populates it.
    errors: list = []

    try:
        for row in rows:
            # _cell_text tolerates sheets narrower than six columns.
            technique_id = _cell_text(row, 0)
            technique_name = _cell_text(row, 1)
            purpose = _cell_text(row, 3) or None
            wql_query = normalise_query(_cell_text(row, 4))
            note = _cell_text(row, 5) or None

            if not technique_id or not wql_query:
                skipped += 1
                continue

            # Make sure the parent technique exists before attaching a template.
            tech = db.get(Technique, technique_id)
            if tech is None:
                tech = Technique(
                    technique_id=technique_id,
                    technique_name=technique_name or technique_id,
                    tactic=None,
                )
                db.add(tech)
                db.flush()
                inserted_techniques += 1
            elif technique_name and not tech.technique_name:
                tech.technique_name = technique_name

            existing = (
                db.query(QueryTemplate)
                .filter(
                    QueryTemplate.technique_id == technique_id,
                    QueryTemplate.purpose == purpose,
                )
                .first()
            )

            if existing:
                if replace:
                    existing.wql_query = wql_query
                    existing.note = note
                    existing.is_active = True
                    inserted_templates += 1
                else:
                    skipped += 1
                continue

            db.add(
                QueryTemplate(
                    technique_id=technique_id,
                    purpose=purpose,
                    wql_query=wql_query,
                    note=note,
                    is_active=True,
                )
            )
            # Flush so that a later row with the same (technique, purpose) is
            # found by the lookup above even when the session has autoflush
            # disabled; otherwise it would be inserted a second time.
            db.flush()
            inserted_templates += 1

        db.commit()
    except Exception:
        # Leave the caller's session usable instead of in a failed transaction.
        db.rollback()
        raise

    return {
        "excel_path": str(EXCEL_PATH),
        "rows_processed": len(rows),
        "techniques_inserted": inserted_techniques,
        "templates_inserted": inserted_templates,
        "skipped": skipped,
        "errors": errors,
    }
|
|
|
|
if __name__ == "__main__":
    # "--replace" anywhere on the command line turns on overwrite mode.
    replace_existing = "--replace" in sys.argv

    # The session is closed when the block exits, whether or not run() raises.
    with SessionLocal() as session:
        summary = run(session, replace=replace_existing)
        print("\n=== Import Result ===")
        for key, value in summary.items():
            print(f"  {key}: {value}")
|
|