File size: 12,640 Bytes
a51a1a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
"""Generalisation Model Tuning — Stage 3 of the Generalisation pipeline."""

import streamlit as st
import cv2
import numpy as np
import time
import plotly.graph_objects as go

from src.detectors.rce.features import REGISTRY
from src.models import BACKBONES, RecognitionHead
from src.utils import build_rce_vector


def render() -> None:
    """Render the "Model Tuning" page: train and compare RCE, CNN and ORB.

    The page reads the pipeline dict stored in
    ``st.session_state["gen_pipeline"]`` (it must contain at least ``"crop"``
    and ``"train_image"``; otherwise an error is shown and the script run is
    stopped).  It then:

    * previews the positive training crops,
    * offers three columns to train an RCE head, a CNN-backbone head and an
      ORB reference set,
    * writes every trained artefact back into the pipeline dict
      (``rce_head``, ``rce_train_acc``, ``cnn_head_<name>``,
      ``cnn_acc_<name>``, ``orb_detector``, ``orb_refs``,
      ``orb_dist_thresh``, ``orb_min_matches``),
    * shows a comparison table of whatever has been trained so far.
    """
    st.title("⚙️ Model Tuning: Train & Compare")

    pipe = st.session_state.get("gen_pipeline")
    if not pipe or "crop" not in pipe:
        st.error("Please complete the **Data Lab** first.")
        st.stop()

    crop      = pipe["crop"]
    crop_aug  = pipe.get("crop_aug", crop)
    train_img = pipe["train_image"]
    # Bounding boxes are (x0, y0, x1, y1); default to the full crop extent.
    bbox      = pipe.get("crop_bbox", (0, 0, crop.shape[1], crop.shape[0]))
    # Single-object pipelines have no "rois" entry: synthesise one ROI.
    rois      = pipe.get("rois", [{"label": "object", "bbox": bbox,
                                    "crop": crop, "crop_aug": crop_aug}])
    active_modules = pipe.get("active_modules", {k: True for k in REGISTRY})
    is_multi = len(rois) > 1

    def build_training_set():
        """Assemble the labelled image set used by the RCE and CNN heads.

        Positives are each ROI's ``crop`` and ``crop_aug`` under the ROI's
        label.  Negatives (label ``"background"``) are random patches of
        ``train_img``, sized like the first ROI's bbox, that do not overlap
        any ROI bbox.  Up to twice as many negatives as positives are
        sampled, giving up after 300 attempts.  The RNG is seeded with 42,
        so the same inputs always produce the same negatives.

        Returns:
            ``(images, labels, neg_short)`` where ``neg_short`` is True when
            fewer than half of the targeted negatives could be collected.
        """
        images, labels = [], []
        for roi in rois:
            images.append(roi["crop"])
            labels.append(roi["label"])
            images.append(roi["crop_aug"])
            labels.append(roi["label"])

        all_bboxes = [roi["bbox"] for roi in rois]
        H, W = train_img.shape[:2]
        x0r, y0r, x1r, y1r = rois[0]["bbox"]
        ch, cw = y1r - y0r, x1r - x0r

        rng = np.random.default_rng(42)
        n_neg_target = len(images) * 2
        attempts, negatives = 0, []
        while len(negatives) < n_neg_target and attempts < 300:
            attempts += 1
            rx = rng.integers(0, max(W - cw, 1))
            ry = rng.integers(0, max(H - ch, 1))
            # Axis-aligned rectangle intersection against every ROI.
            overlaps = any(rx < bx1 and rx + cw > bx0 and ry < by1 and ry + ch > by0
                           for bx0, by0, bx1, by1 in all_bboxes)
            if overlaps:
                continue
            patch = train_img[ry:ry+ch, rx:rx+cw]
            if patch.shape[0] > 0 and patch.shape[1] > 0:
                negatives.append(patch)

        images.extend(negatives)
        labels.extend(["background"] * len(negatives))
        return images, labels, len(negatives) < n_neg_target // 2

    # Show training data
    st.subheader("Training Data (from Data Lab)")
    if is_multi:
        st.caption(f"**{len(rois)} classes** defined.")
        roi_cols = st.columns(min(len(rois), 4))
        for i, roi in enumerate(rois):
            # Wrap around so more than four ROIs still get a column.
            with roi_cols[i % len(roi_cols)]:
                st.image(cv2.cvtColor(roi["crop"], cv2.COLOR_BGR2RGB),
                         caption=f"✅ {roi['label']}", width=140)
    else:
        td1, td2 = st.columns(2)
        td1.image(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB),
                  caption="Original Crop (positive)", width=180)
        td2.image(cv2.cvtColor(crop_aug, cv2.COLOR_BGR2RGB),
                  caption="Augmented Crop (positive)", width=180)
    st.divider()

    col_rce, col_cnn, col_orb = st.columns(3)

    # -------------------------------------------------------------------
    # RCE Training
    # -------------------------------------------------------------------
    with col_rce:
        st.header("🧬 RCE Training")
        active_names = [REGISTRY[k]["label"] for k in active_modules if active_modules[k]]
        if not active_names:
            st.error("No RCE modules selected.")
        else:
            st.write(f"**Active modules:** {', '.join(active_names)}")
            rce_C = st.slider("Regularization (C)", 0.01, 10.0, 1.0, step=0.01, key="gen_rce_c")
            rce_max_iter = st.slider("Max Iterations", 100, 5000, 1000, step=100, key="gen_rce_iter")

            if st.button("🚀 Train RCE Head", key="gen_train_rce"):
                images, labels, neg_short = build_training_set()
                if neg_short:
                    st.warning("⚠️ Few negatives collected.")
                from sklearn.metrics import accuracy_score
                from sklearn.model_selection import cross_val_score
                progress = st.progress(0, text="Extracting RCE features...")
                n = len(images)
                # Single pass over the images: extract features and advance
                # the progress bar together.
                X = []
                for i, img in enumerate(images):
                    X.append(build_rce_vector(img, active_modules))
                    progress.progress((i + 1) / n, text=f"Feature extraction: {i+1}/{n}")
                X = np.array(X)
                progress.progress(1.0, text="Fitting Logistic Regression...")

                t0 = time.perf_counter()
                try:
                    head = RecognitionHead(C=rce_C, max_iter=rce_max_iter).fit(X, labels)
                except ValueError as e:
                    st.error(f"Training failed: {e}")
                    st.stop()
                train_time = time.perf_counter() - t0
                progress.progress(1.0, text="✅ Training complete!")

                train_acc = accuracy_score(labels, head.model.predict(X))
                st.success(f"Trained in **{train_time:.2f}s**")
                m1, m2, m3, m4 = st.columns(4)
                m1.metric("Train Accuracy", f"{train_acc:.1%}")

                # Stratified CV cannot use more folds than the smallest class
                # has members (each ROI contributes only two positives), so
                # cap the fold count by the least-populated class.
                classes = set(labels)
                min_class_count = min(labels.count(c) for c in classes)
                n_folds = min(3, min_class_count)
                if len(images) >= 6 and len(classes) >= 2 and n_folds >= 2:
                    cv_scores = cross_val_score(head.model, X, labels, cv=n_folds)
                    m2.metric("CV Accuracy", f"{cv_scores.mean():.1%}", delta=f"±{cv_scores.std():.1%}")
                else:
                    m2.metric("CV Accuracy", "N/A")
                m3.metric("Vector Size", f"{X.shape[1]} floats")
                m4.metric("Samples", f"{len(images)}")

                # Feature importance
                coefs = head.model.coef_
                feat_names = []
                for key, meta_r in REGISTRY.items():
                    if active_modules.get(key, False):
                        # NOTE(review): assumes each active module contributes
                        # 10 values to the RCE vector - confirm against
                        # build_rce_vector.
                        for b in range(10):
                            feat_names.append(f"{meta_r['label']}[{b}]")
                if len(feat_names) != coefs.shape[1]:
                    # Label/width mismatch: fall back to positional names so
                    # every coefficient is still plotted.
                    feat_names = [f"feature[{j}]" for j in range(coefs.shape[1])]
                if coefs.shape[0] == 1:
                    # Binary problem: a single coefficient row.
                    fig_imp = go.Figure(go.Bar(x=feat_names, y=np.abs(coefs[0])))
                    fig_imp.update_layout(title="LogReg Coefficient Magnitude",
                                          template="plotly_dark", height=300)
                else:
                    fig_imp = go.Figure()
                    for ci, cls in enumerate(head.classes_):
                        if cls == "background":
                            continue
                        fig_imp.add_trace(go.Bar(x=feat_names, y=np.abs(coefs[ci]),
                                                  name=cls, opacity=0.8))
                    fig_imp.update_layout(title="Coefficients per Class",
                                          template="plotly_dark", height=300, barmode="group")
                st.plotly_chart(fig_imp, use_container_width=True)

                pipe["rce_head"] = head
                pipe["rce_train_acc"] = train_acc
                st.session_state["gen_pipeline"] = pipe

            if pipe.get("rce_head"):
                st.divider()
                st.subheader("Quick Predict")
                head = pipe["rce_head"]
                vec = build_rce_vector(crop_aug, active_modules)
                label, conf = head.predict(vec)
                st.write(f"**{label}** — {conf:.1%} confidence")

    # -------------------------------------------------------------------
    # CNN Fine-Tuning
    # -------------------------------------------------------------------
    with col_cnn:
        st.header("🧠 CNN Fine-Tuning")
        selected = st.selectbox("Select Model", list(BACKBONES.keys()), key="gen_mt_cnn")
        meta = BACKBONES[selected]
        st.caption(f"Backbone: **{meta['dim']}D** → Logistic Regression")
        # Trailing spaces keep these labels distinct from the RCE sliders.
        cnn_C = st.slider("Regularization (C) ", 0.01, 10.0, 1.0, step=0.01, key="gen_cnn_c")
        cnn_max_iter = st.slider("Max Iterations ", 100, 5000, 1000, step=100, key="gen_cnn_iter")

        if st.button(f"🚀 Train {selected} Head", key="gen_train_cnn"):
            images, labels, neg_short = build_training_set()
            if neg_short:
                # Same warning as the RCE branch: the set is shared.
                st.warning("⚠️ Few negatives collected.")
            backbone = meta["loader"]()
            from sklearn.metrics import accuracy_score
            progress = st.progress(0, text=f"Extracting {selected} features...")
            n = len(images)
            X = []
            for i, img in enumerate(images):
                X.append(backbone.get_features(img))
                progress.progress((i + 1) / n, text=f"Feature extraction: {i+1}/{n}")
            X = np.array(X)
            progress.progress(1.0, text="Fitting...")

            t0 = time.perf_counter()
            try:
                head = RecognitionHead(C=cnn_C, max_iter=cnn_max_iter).fit(X, labels)
            except ValueError as e:
                st.error(f"Training failed: {e}")
                st.stop()
            train_time = time.perf_counter() - t0
            progress.progress(1.0, text="✅ Done!")

            train_acc = accuracy_score(labels, head.model.predict(X))
            st.success(f"Trained in **{train_time:.2f}s**")
            m1, m2 = st.columns(2)
            m1.metric("Train Accuracy", f"{train_acc:.1%}")
            m2.metric("Samples", f"{len(images)}")

            # Heads are stored per backbone so several can be compared.
            pipe[f"cnn_head_{selected}"] = head
            pipe[f"cnn_acc_{selected}"] = train_acc
            st.session_state["gen_pipeline"] = pipe

        if pipe.get(f"cnn_head_{selected}"):
            st.divider()
            st.subheader("Quick Predict")
            backbone = meta["loader"]()
            head = pipe[f"cnn_head_{selected}"]
            feats = backbone.get_features(crop_aug)
            label, conf = head.predict(feats)
            st.write(f"**{label}** — {conf:.1%} confidence")

    # -------------------------------------------------------------------
    # ORB Training
    # -------------------------------------------------------------------
    with col_orb:
        st.header("🏛️ ORB Matching")
        from src.detectors.orb import ORBDetector
        orb_dist_thresh = st.slider("Match Distance Threshold", 10, 100, 70, key="gen_orb_dist")
        orb_min_matches = st.slider("Min Good Matches", 1, 20, 5, key="gen_orb_min")

        if st.button("🚀 Train ORB Reference", key="gen_train_orb"):
            orb = ORBDetector()
            # One CLAHE instance is enough; it is reused for every ROI.
            clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
            orb_refs = {}
            for roi in rois:
                gray = cv2.cvtColor(roi["crop_aug"], cv2.COLOR_BGR2GRAY)
                gray = clahe.apply(gray)
                kp, des = orb.orb.detectAndCompute(gray, None)
                # detectAndCompute yields des=None when nothing is found.
                n_feat = 0 if des is None else len(des)
                orb_refs[roi["label"]] = {"descriptors": des, "n_features": n_feat,
                                           "keypoints": kp, "crop": roi["crop_aug"]}

            for lbl, ref in orb_refs.items():
                if ref["keypoints"]:
                    vis = cv2.drawKeypoints(ref["crop"], ref["keypoints"], None, color=(0, 255, 0))
                    st.image(cv2.cvtColor(vis, cv2.COLOR_BGR2RGB),
                             caption=f"{lbl}: {ref['n_features']} keypoints",
                             use_container_width=True)

            pipe["orb_detector"] = orb
            pipe["orb_refs"] = orb_refs
            pipe["orb_dist_thresh"] = orb_dist_thresh
            pipe["orb_min_matches"] = orb_min_matches
            st.session_state["gen_pipeline"] = pipe
            st.success("ORB references stored!")

    # Comparison Table
    st.divider()
    st.subheader("📊 Training Comparison")
    rows = []
    if pipe.get("rce_train_acc") is not None:
        rows.append({"Model": "RCE", "Type": "Feature Engineering",
                     "Train Accuracy": f"{pipe['rce_train_acc']:.1%}"})
    for name in BACKBONES:
        acc = pipe.get(f"cnn_acc_{name}")
        if acc is not None:
            rows.append({"Model": name, "Type": "CNN Backbone",
                         "Train Accuracy": f"{acc:.1%}"})
    if pipe.get("orb_refs"):
        # ORB is a matcher, not a classifier: no training accuracy exists.
        rows.append({"Model": "ORB", "Type": "Keypoint Matching",
                     "Train Accuracy": "N/A"})
    if rows:
        import pandas as pd
        st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
    else:
        st.info("Train at least one model to see the comparison.")