murshid / murshid_backend /scripts /import_excel_templates.py
devorbit's picture
Initial deployment - secrets removed
26e1c2e
"""
استيراد قوالب WQL من Excel إلى قاعدة البيانات.
يُشغَّل مرة واحدة:
cd d:\GP\murshid_backend
.venv\Scripts\python.exe scripts\import_excel_templates.py
"""
import sys
import re
from pathlib import Path
# أضف مسار الباكند لاستيراد app
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
import openpyxl
from sqlalchemy.orm import Session
from app.config import settings
from app.db.session import SessionLocal
from app.models.query_template import QueryTemplate
from app.models.technique import Technique
EXCEL_PATH = Path(settings.murshid_models_dir) / "murshid_query_template_structure_clean_shared.xlsx"
# Fallback: same directory as project root
if not EXCEL_PATH.is_file():
EXCEL_PATH = Path(__file__).resolve().parent.parent.parent / "murshid_query_template_structure_clean_shared.xlsx"
def normalise_query(q: str | None) -> str:
"""Collapse whitespace/newlines in WQL query."""
if not q:
return ""
return re.sub(r"\s+", " ", q.strip())
def run(db: Session, replace: bool = False) -> dict:
if not EXCEL_PATH.is_file():
return {"error": f"Excel file not found: {EXCEL_PATH}"}
wb = openpyxl.load_workbook(EXCEL_PATH)
ws = wb.active
rows = list(ws.iter_rows(min_row=2, values_only=True))
inserted_techniques = 0
inserted_templates = 0
skipped = 0
errors = []
for idx, row in enumerate(rows, start=2):
technique_id = str(row[0] or "").strip()
technique_name = str(row[1] or "").strip()
template_id_str = str(row[2] or "").strip() # e.g. "T1484-1"
purpose = str(row[3] or "").strip() or None
wql_query = normalise_query(str(row[4] or ""))
note = str(row[5] or "").strip() or None
if not technique_id or not wql_query:
skipped += 1
continue
# 1. Upsert Technique
tech = db.get(Technique, technique_id)
if tech is None:
tech = Technique(
technique_id=technique_id,
technique_name=technique_name or technique_id,
tactic=None,
)
db.add(tech)
db.flush()
inserted_techniques += 1
elif technique_name and not tech.technique_name:
tech.technique_name = technique_name
# 2. Insert QueryTemplate (skip duplicate template_id_str unless replace=True)
# Check uniqueness by (technique_id + purpose) to avoid duplicates on re-run
existing = (
db.query(QueryTemplate)
.filter(
QueryTemplate.technique_id == technique_id,
QueryTemplate.purpose == purpose,
)
.first()
)
if existing:
if replace:
existing.wql_query = wql_query
existing.note = note
existing.is_active = True
inserted_templates += 1
else:
skipped += 1
continue
tpl = QueryTemplate(
technique_id=technique_id,
purpose=purpose,
wql_query=wql_query,
note=note,
is_active=True,
)
db.add(tpl)
inserted_templates += 1
db.commit()
return {
"excel_path": str(EXCEL_PATH),
"rows_processed": len(rows),
"techniques_inserted": inserted_techniques,
"templates_inserted": inserted_templates,
"skipped": skipped,
"errors": errors,
}
if __name__ == "__main__":
replace = "--replace" in sys.argv
db: Session = SessionLocal()
try:
result = run(db, replace=replace)
print("\n=== Import Result ===")
for k, v in result.items():
print(f" {k}: {v}")
finally:
db.close()