File size: 3,954 Bytes
26e1c2e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | """
استيراد قوالب WQL من Excel إلى قاعدة البيانات.
يُشغَّل مرة واحدة:
cd d:\GP\murshid_backend
.venv\Scripts\python.exe scripts\import_excel_templates.py
"""
import sys
import re
from pathlib import Path
# أضف مسار الباكند لاستيراد app
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
import openpyxl
from sqlalchemy.orm import Session
from app.config import settings
from app.db.session import SessionLocal
from app.models.query_template import QueryTemplate
from app.models.technique import Technique
# Preferred location: the models directory named in the application settings.
EXCEL_PATH = Path(settings.murshid_models_dir).joinpath(
    "murshid_query_template_structure_clean_shared.xlsx"
)
# Fallback when the workbook is not there: look three directory levels above
# this script file.
if not EXCEL_PATH.is_file():
    EXCEL_PATH = Path(__file__).resolve().parent.parent.parent.joinpath(
        "murshid_query_template_structure_clean_shared.xlsx"
    )
def normalise_query(q: str | None) -> str:
"""Collapse whitespace/newlines in WQL query."""
if not q:
return ""
return re.sub(r"\s+", " ", q.strip())
def run(db: Session, replace: bool = False) -> dict:
    """Import WQL query templates from the Excel workbook into the database.

    Every row below the header of the workbook's active sheet is read
    positionally: column 0 technique id, column 1 technique name, column 2
    template id (read by nobody, not stored), column 3 purpose, column 4 WQL
    query, column 5 note.  Missing trailing cells are treated as empty.

    For each row that has both a technique id and a query:

    * the ``Technique`` with that id is created if ``db.get`` finds none, or
      given the row's name if its stored name is empty;
    * a ``QueryTemplate`` is inserted, unless one with the same
      ``technique_id`` and ``purpose`` already exists.  An existing template
      is overwritten (query, note, reactivated) when *replace* is true and
      skipped otherwise.

    Args:
        db: SQLAlchemy session to write through.  It is committed once after
            all rows were handled and rolled back if anything raises.
        replace: Update already-present templates instead of skipping them.

    Returns:
        ``{"error": "..."}`` when the workbook file does not exist.  Otherwise
        a summary with the keys ``excel_path``, ``rows_processed``,
        ``techniques_inserted``, ``templates_inserted`` (newly inserted plus
        replaced templates), ``skipped`` and ``errors`` (always an empty
        list; kept so the result shape stays stable).

    Raises:
        Whatever openpyxl or SQLAlchemy raise; the session is rolled back
        before the exception propagates.
    """
    if not EXCEL_PATH.is_file():
        return {"error": f"Excel file not found: {EXCEL_PATH}"}

    wb = openpyxl.load_workbook(EXCEL_PATH)
    ws = wb.active
    # Row 1 is the header, data starts on row 2.
    rows = list(ws.iter_rows(min_row=2, values_only=True))

    def cell_text(row: tuple, col: int) -> str:
        """Return cell *col* of *row* as stripped text, '' if absent or empty."""
        # Rows narrower than six cells must not abort the import with an
        # IndexError, so a missing cell counts as an empty one.
        value = row[col] if col < len(row) else None
        return str(value or "").strip()

    inserted_techniques = 0
    inserted_templates = 0
    skipped = 0
    errors: list[str] = []

    try:
        for row in rows:
            technique_id = cell_text(row, 0)
            technique_name = cell_text(row, 1)
            purpose = cell_text(row, 3) or None
            wql_query = normalise_query(cell_text(row, 4))
            note = cell_text(row, 5) or None

            # Both the technique id and the query are mandatory.
            if not technique_id or not wql_query:
                skipped += 1
                continue

            # 1. Make sure the technique exists.
            tech = db.get(Technique, technique_id)
            if tech is None:
                tech = Technique(
                    technique_id=technique_id,
                    technique_name=technique_name or technique_id,
                    tactic=None,
                )
                db.add(tech)
                # Flush right away so the template added below can refer to it.
                db.flush()
                inserted_techniques += 1
            elif technique_name and not tech.technique_name:
                # Only fill in a missing name; never overwrite an existing one.
                tech.technique_name = technique_name

            # 2. Insert or update the template.  (technique_id, purpose) is the
            #    identity used to recognise a template from an earlier run.
            existing = (
                db.query(QueryTemplate)
                .filter(
                    QueryTemplate.technique_id == technique_id,
                    QueryTemplate.purpose == purpose,
                )
                .first()
            )
            if existing:
                if replace:
                    existing.wql_query = wql_query
                    existing.note = note
                    existing.is_active = True
                    # Replaced templates are reported in the same counter as
                    # new ones.
                    inserted_templates += 1
                else:
                    skipped += 1
                continue

            db.add(
                QueryTemplate(
                    technique_id=technique_id,
                    purpose=purpose,
                    wql_query=wql_query,
                    note=note,
                    is_active=True,
                )
            )
            inserted_templates += 1

        db.commit()
    except Exception:
        # Do not leave a half-applied import pending on the caller's session.
        db.rollback()
        raise

    return {
        "excel_path": str(EXCEL_PATH),
        "rows_processed": len(rows),
        "techniques_inserted": inserted_techniques,
        "templates_inserted": inserted_templates,
        "skipped": skipped,
        "errors": errors,
    }
if __name__ == "__main__":
    # Passing `--replace` anywhere on the command line makes the import
    # overwrite templates that already exist instead of skipping them.
    overwrite = "--replace" in sys.argv
    session: Session = SessionLocal()
    try:
        summary = run(session, replace=overwrite)
        print("\n=== Import Result ===")
        for key in summary:
            print(f" {key}: {summary[key]}")
    finally:
        # Release the session whether or not the import succeeded.
        session.close()
|