File size: 3,954 Bytes
26e1c2e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
استيراد قوالب WQL من Excel إلى قاعدة البيانات.

يُشغَّل مرة واحدة:
    cd d:\GP\murshid_backend
    .venv\Scripts\python.exe scripts\import_excel_templates.py
"""

import sys
import re
from pathlib import Path

# أضف مسار الباكند لاستيراد app
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

import openpyxl
from sqlalchemy.orm import Session

from app.config import settings
from app.db.session import SessionLocal
from app.models.query_template import QueryTemplate
from app.models.technique import Technique

EXCEL_PATH = Path(settings.murshid_models_dir) / "murshid_query_template_structure_clean_shared.xlsx"

# Fallback: same directory as project root
if not EXCEL_PATH.is_file():
    EXCEL_PATH = Path(__file__).resolve().parent.parent.parent / "murshid_query_template_structure_clean_shared.xlsx"

def normalise_query(q: str | None) -> str:
    """Collapse whitespace/newlines in WQL query."""
    if not q:
        return ""
    return re.sub(r"\s+", " ", q.strip())


def run(db: Session, replace: bool = False) -> dict:
    if not EXCEL_PATH.is_file():
        return {"error": f"Excel file not found: {EXCEL_PATH}"}

    wb = openpyxl.load_workbook(EXCEL_PATH)
    ws = wb.active

    rows = list(ws.iter_rows(min_row=2, values_only=True))

    inserted_techniques = 0
    inserted_templates  = 0
    skipped             = 0
    errors              = []

    for idx, row in enumerate(rows, start=2):
        technique_id   = str(row[0] or "").strip()
        technique_name = str(row[1] or "").strip()
        template_id_str = str(row[2] or "").strip()  # e.g. "T1484-1"
        purpose        = str(row[3] or "").strip() or None
        wql_query      = normalise_query(str(row[4] or ""))
        note           = str(row[5] or "").strip() or None

        if not technique_id or not wql_query:
            skipped += 1
            continue

        # 1. Upsert Technique
        tech = db.get(Technique, technique_id)
        if tech is None:
            tech = Technique(
                technique_id=technique_id,
                technique_name=technique_name or technique_id,
                tactic=None,
            )
            db.add(tech)
            db.flush()
            inserted_techniques += 1
        elif technique_name and not tech.technique_name:
            tech.technique_name = technique_name

        # 2. Insert QueryTemplate (skip duplicate template_id_str unless replace=True)
        # Check uniqueness by (technique_id + purpose) to avoid duplicates on re-run
        existing = (
            db.query(QueryTemplate)
            .filter(
                QueryTemplate.technique_id == technique_id,
                QueryTemplate.purpose == purpose,
            )
            .first()
        )

        if existing:
            if replace:
                existing.wql_query = wql_query
                existing.note      = note
                existing.is_active = True
                inserted_templates += 1
            else:
                skipped += 1
            continue

        tpl = QueryTemplate(
            technique_id=technique_id,
            purpose=purpose,
            wql_query=wql_query,
            note=note,
            is_active=True,
        )
        db.add(tpl)
        inserted_templates += 1

    db.commit()

    return {
        "excel_path":          str(EXCEL_PATH),
        "rows_processed":      len(rows),
        "techniques_inserted": inserted_techniques,
        "templates_inserted":  inserted_templates,
        "skipped":             skipped,
        "errors":              errors,
    }


if __name__ == "__main__":
    replace = "--replace" in sys.argv

    db: Session = SessionLocal()
    try:
        result = run(db, replace=replace)
        print("\n=== Import Result ===")
        for k, v in result.items():
            print(f"  {k}: {v}")
    finally:
        db.close()