File size: 61,431 Bytes
5f43c7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
#!/usr/bin/env python3
"""Her · हेर — local API server. 100% LOCAL, 127.0.0.1 ONLY.

A thin HTTP transport over the deterministic engine. It does three jobs and no
more (the engine stays the product; this just carries its output to the UI):

  GET  /api/health           -> {ok, llama}                  liveness + model reachable?
  GET  /api/sessions         -> projects[] of real sessions  (discovery.py; cwd from inside files)
  GET  /api/analyze?path=..  -> enriched engine JSON          (cli/analyze, cached by mtime)
  POST /api/chat  {question, path}                            grounded Q&A over ONE session's trace
  GET  /  (and assets)       -> the built UI (ui/dist)        single origin, no CORS

Non-negotiables honoured:
  * NO model and NO network in the engine path; the ONLY model call is the chat,
    and it goes to the LOCAL llama-server via NarratorClient (localhost-guarded).
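  * Trace content never leaves the machine: bind 127.0.0.1, llama is localhost,
    and nothing in the request path makes outbound calls. The optional background
    enricher is the one exception: it looks up bare binary names only (never
    command text) and is disabled with HER_ENRICH=0.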
  * cwd is trusted from inside each file (discovery.py), never decoded from the
    lossy folder name.
  * Path safety: only .jsonl files under ~/.claude, this repo, or the optional
    HER_EXTRA_ROOT directory may be read.
"""
from __future__ import annotations

import json
import os
import re
import sys
import urllib.parse
from collections import Counter
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path

REPO = Path(__file__).resolve().parent.parent
if str(REPO) not in sys.path:
    sys.path.insert(0, str(REPO))

from engine.contract import to_jsonable                # noqa: E402
from engine.core.analyze import analyze_path           # noqa: E402
from engine.core.best_practices import practice_for    # noqa: E402
from engine.core.binaries_db import load_registry       # noqa: E402
from engine.core import impact                           # noqa: E402
from engine.loaders.jsonl_loader import load           # noqa: E402
from engine.entities import extract_entities, entity_totals  # noqa: E402
from engine.binaries import extract_binaries, unknown_binary_names  # noqa: E402
from engine import discovery                            # noqa: E402
from narrator.client import NarratorClient             # noqa: E402
from narrator.factory import get_narrator              # noqa: E402

# Loopback address (the module docstring states the server binds 127.0.0.1 only).
HOST = "127.0.0.1"
# Listening port: HER_PORT wins, then TRACE_PORT, then the default 8765.
PORT = int(os.environ.get("HER_PORT", os.environ.get("TRACE_PORT", "8765")))
# The built UI bundle (ui/dist under the repo root).
DIST = REPO / "ui" / "dist"
# ui/public under the repo root — presumably unbuilt static assets; confirm
# against the request handler, which is where this is consumed.
PUBLIC = REPO / "ui" / "public"
# The ONE bundled demo session (identity-sanitized). It is NOT a default: it loads
# only via the explicit "__demo__" sentinel below (the landing demo button), never as
# an auto-fallback for a missing/empty path.
DEMO = REPO / "fixtures" / "demo-session.jsonl"
# Fully resolved ~/.claude; one of the roots session files may be read from.
CLAUDE_DIR = (Path.home() / ".claude").resolve()
# An extra allowed root for session files. The ZeroGPU Space mounts an HF storage
# bucket at /data and sets HER_EXTRA_ROOT=/data; uploaded sessions live under it
# (namespaced per client). The local product leaves this unset → behavior unchanged.
_EXTRA_ROOT_ENV = os.environ.get("HER_EXTRA_ROOT")
# None when HER_EXTRA_ROOT is unset or empty; otherwise the resolved directory.
EXTRA_ROOT = Path(_EXTRA_ROOT_ENV).resolve() if _EXTRA_ROOT_ENV else None

# --------------------------------------------------------------------------- #
# analyze cache — keyed by (realpath, mtime) so editing/replacing a file busts it
# --------------------------------------------------------------------------- #
# Maps (path string, st_mtime_ns) -> enriched analyze payload. Filled and
# cleared by _analyze_cached; also cleared by the enricher daemon thread.
_CACHE: dict[tuple[str, int], dict] = {}

# Passive enricher work-queue: bare binary names discovered during analysis that
# the registry can't yet name. The background daemon (Phase B) drains this; until
# then it just accumulates (deduped, bounded) and nothing blocks the response.
# Being a set gives the dedupe; the bound (500) is enforced where names are added.
_ENRICH_QUEUE: "set[str]" = set()


def _enqueue_unknown_binaries(binaries: list) -> None:
    """Queue the names of binaries the registry could not identify.

    Only the bare ``name`` field of each unknown entry is stored — never any
    command text. The queue is capped at 500 names; once full, further names
    are silently ignored. Does nothing at all when ``HER_ENRICH`` is ``"0"``.
    """
    enrichment_enabled = os.environ.get("HER_ENRICH") != "0"
    if enrichment_enabled:
        for entry in unknown_binary_names(binaries):
            has_room = len(_ENRICH_QUEUE) < 500
            if has_room:
                _ENRICH_QUEUE.add(entry["name"])


# --------------------------------------------------------------------------- #
# consent — the first-run disclaimer's opt-in for sharing learnings (default on).
# Persisted to ~/.her-consent.json so the daemon knows whether to upload and the
# user is asked only once. The disclaimer + slider live in the UI (DisclaimerModal).
# --------------------------------------------------------------------------- #
# Where the consent record is persisted: a small JSON file in the user's home directory.
CONSENT_PATH = Path.home() / ".her-consent.json"
# In-memory consent state; rebound by _load_consent() and _save_consent().
_CONSENT: dict = {"accepted": False, "share": True}  # default share=on (per owner)


def _load_consent() -> None:
    """Refresh the module-level ``_CONSENT`` from ``CONSENT_PATH``.

    A missing/unreadable file, invalid JSON, or a JSON value that is not an
    object leaves the current ``_CONSENT`` untouched. ``accepted`` defaults to
    False and ``share`` defaults to True when the stored record omits them.
    """
    global _CONSENT
    try:
        raw = CONSENT_PATH.read_text(encoding="utf-8")
        stored = json.loads(raw)
    except (OSError, ValueError):
        return
    if not isinstance(stored, dict):
        return
    _CONSENT = {
        "accepted": bool(stored.get("accepted")),
        "share": bool(stored.get("share", True)),
    }


def _save_consent(accepted: bool, share: bool) -> None:
    """Record the user's consent choice in memory and on disk.

    The in-memory ``_CONSENT`` is always updated. Writing ``CONSENT_PATH`` is
    best-effort: an ``OSError`` is swallowed, so the choice then only lasts
    for this process.
    """
    global _CONSENT
    record = {"accepted": bool(accepted), "share": bool(share)}
    _CONSENT = record
    try:
        CONSENT_PATH.write_text(json.dumps(record), encoding="utf-8")
    except OSError:
        # Deliberate: a read-only home directory must not break the request.
        pass


# Import-time side effect: pick up any previously persisted consent record.
_load_consent()


def _enricher_daemon() -> None:
    """PASSIVE background worker: drain the unknown-binary queue and enrich it.

    Runs forever (intended as a daemon-thread target). Every 5 seconds it pops
    up to 8 names from ``_ENRICH_QUEUE`` and hands them to
    ``narrator.enricher.enrich_names`` (bare names only — the one
    owner-approved egress, NN#2). It never blocks a request.

    When a batch reports something learned, the analyze and brief caches are
    cleared so the new product name + logo appear on the next view. The
    learnings are then shared via ``share_learnings()`` ONLY when all three
    hold: the environment sets ``HER_SHARE=1`` (default "0"), the user accepted
    the disclaimer, and the user left sharing enabled.

    Failure handling is best-effort throughout: if the enricher module cannot
    be imported the function returns immediately; if ``enrich_names`` raises,
    the batch counts as "nothing learned" (its names have already been popped
    and are not re-queued); if ``share_learnings`` raises, it is ignored.

    Opt out of enrichment with HER_ENRICH=0; opt out of sharing in the disclaimer.
    """
    import time
    try:
        from narrator.enricher import enrich_names, share_learnings
    except Exception:
        return  # enricher not available -> stay silent, queue just accumulates
    while True:
        time.sleep(5)
        if not _ENRICH_QUEUE:
            continue
        # Take a small batch so a large backlog is worked off gradually.
        batch = []
        while _ENRICH_QUEUE and len(batch) < 8:
            batch.append(_ENRICH_QUEUE.pop())
        try:
            learned = enrich_names(batch)
        except Exception:
            # Best-effort: a failed batch is treated as having learned nothing.
            learned = 0
        if learned:
            # the registry mtime-cache auto-refreshes; bust the result caches so a
            # now-known binary stops showing as bare on the next analyze/project.
            _CACHE.clear()
            _BRIEF_CACHE.clear()
            # share the (scrubbed) learnings to R2 ONLY if the OWNER explicitly
            # enabled it (HER_SHARE=1) AND consent allows. DISABLED BY DEFAULT IN CODE:
            # HER_SHARE defaults to "0" here (and the hosted Space also sets it to 0), so
            # NO learnings ever egress unless someone deliberately opts in — a file
            # reader sees the phone-home is off in the default config. share_learnings()
            # re-checks the same flag itself, so this is defence-in-depth, not the only
            # gate.
            if (os.environ.get("HER_SHARE", "0") == "1"
                    and _CONSENT.get("accepted") and _CONSENT.get("share")):
                try:
                    share_learnings()
                except Exception:
                    pass


def _start_enricher() -> None:
    """Launch ``_enricher_daemon`` on a daemon thread named "her-enricher".

    Does nothing when the ``HER_ENRICH`` environment variable is ``"0"``.
    """
    if os.environ.get("HER_ENRICH") != "0":
        import threading

        worker = threading.Thread(
            target=_enricher_daemon, daemon=True, name="her-enricher"
        )
        worker.start()


def _serialize(result: dict) -> dict:
    """Convert an engine result into a JSON-ready dict.

    ``session`` and ``findings`` are passed through unchanged, each item of
    ``turns`` and ``events`` goes through ``to_jsonable``, and
    ``recommendations`` falls back to an empty list when the engine omitted it.
    """
    payload = {"session": result["session"]}
    for key in ("turns", "events"):
        payload[key] = [to_jsonable(item) for item in result[key]]
    payload["findings"] = result["findings"]
    payload["recommendations"] = result.get("recommendations", [])
    return payload


def _safe_session_path(raw: str | None) -> Path | None:
    """Resolve a requested session path. Only .jsonl files under ~/.claude or the
    repo are allowed; everything else is refused.

    The literal sentinel "__demo__" resolves to the bundled demo session — this is the
    ONLY way it loads (the landing demo button sends it). An empty/None path is NOT a
    session and returns None: there is deliberately no silent demo/fixture default."""
    if raw == "__demo__":
        return DEMO if DEMO.is_file() else None
    if not raw:
        return None
    try:
        p = Path(raw).expanduser().resolve()
    except (OSError, RuntimeError):
        return None
    if p.suffix != ".jsonl" or not p.is_file():
        return None
    # Real ancestor containment (not a raw string prefix, which would accept a sibling
    # like <repo>-evil/x.jsonl). Allows ~/.claude and anything under the repo (incl.
    # the Space's REPO/.uploads). is_relative_to is Py3.9+; the repo targets 3.10+.
    roots = [CLAUDE_DIR, REPO.resolve()] + ([EXTRA_ROOT] if EXTRA_ROOT else [])
    try:
        ok = any(p.is_relative_to(r) for r in roots)
    except AttributeError:  # pragma: no cover - Py<3.9 boundary-aware fallback
        ok = any((str(p) + os.sep).startswith(str(r) + os.sep) for r in roots)
    if not ok:
        return None
    return p


def _analyze_cached(path: Path) -> dict:
    """Return the enriched analysis payload for ``path``, computing it on a miss.

    The cache key is ``(str(path), st_mtime_ns)``, so editing or replacing the
    file produces a new key. Only one payload is kept at a time: a miss clears
    the cache before computing.

    The payload is always returned from a local variable, never re-read from
    ``_CACHE``. Other threads (the enricher daemon, concurrent requests) can
    clear the cache at any moment, and a membership test followed by
    ``_CACHE[key]`` could then raise ``KeyError``.

    Args:
        path: An already-validated session file.

    Returns:
        The serialized engine result plus ``entities``, ``binaries`` and
        ``impact`` keys.

    Raises:
        OSError: If ``path`` cannot be stat'ed.
    """
    key = (str(path), path.stat().st_mtime_ns)
    cached = _CACHE.get(key)  # single lookup: no gap between check and read
    if cached is not None:
        return cached
    _CACHE.clear()  # one session at a time is plenty; keep memory flat
    payload = _serialize(analyze_path(str(path)))
    # named entities (skills / sub-agents / MCP) for per-session tracing
    payload["entities"] = extract_entities(payload["turns"])
    # binaries run via Bash (npx remotion -> remotion, railway, …) — a separate
    # dimension from tool calls, enriched from the registry; unknowns queued for
    # the background enricher (passive — never blocks this response).
    payload["binaries"] = extract_binaries(payload["turns"], load_registry())
    # actions worth reviewing + risk level + outcome (deterministic, suggest-only)
    payload["impact"] = impact.detect_impact(payload["turns"], payload["binaries"])
    _enqueue_unknown_binaries(payload["binaries"])
    _CACHE[key] = payload
    return payload


# --------------------------------------------------------------------------- #
# sessions inventory for the browser (discovery + light file stats)
# --------------------------------------------------------------------------- #
def _sessions_payload(projects_dir: str | None = None) -> dict:
    """Build the session inventory shown in the browser, grouped by project cwd.

    Sessions whose cwd is empty are dropped. A file that cannot be stat'ed is
    still listed, with ``mtime`` and ``sizeBytes`` of 0. Within a project the
    sessions are newest-first; projects are ordered by session count
    (descending), with ties keeping alphabetical cwd order.

    Returns:
        ``{"projects": [...], "total": <session count>, "projectCount": <n>}``.
    """
    grouped: dict[str, list[dict]] = {}
    for ref in discovery.discover_sessions(projects_dir):
        if not ref.cwd:
            continue
        try:
            info = os.stat(ref.path)
        except OSError:
            modified, nbytes = 0, 0
        else:
            modified, nbytes = int(info.st_mtime), info.st_size
        entry = {
            "path": ref.path,
            "sessionId": ref.sessionId,
            "encodedDir": ref.encodedDir,
            "mtime": modified,
            "sizeBytes": nbytes,
            # Session start time as read from inside the file; getattr keeps
            # this working with a discovery module that predates the field.
            "startedAt": getattr(ref, "startedAt", None),
        }
        grouped.setdefault(ref.cwd, []).append(entry)

    projects = []
    for cwd in sorted(grouped):
        newest_first = sorted(
            grouped[cwd], key=lambda item: item["mtime"], reverse=True
        )
        projects.append(
            {"cwd": cwd, "count": len(newest_first), "sessions": newest_first}
        )
    # Stable sort: equal counts keep the alphabetical order established above.
    projects.sort(key=lambda proj: proj["count"], reverse=True)

    return {
        "projects": projects,
        "total": sum(proj["count"] for proj in projects),
        "projectCount": len(projects),
    }


# --------------------------------------------------------------------------- #
# grounded chat — deterministic retrieval over ONE session, model writes prose
# --------------------------------------------------------------------------- #
# Stop words dropped by _words(): common question/filler words that would
# otherwise match almost every turn and drown out the meaningful keywords.
_STOP = {"the", "and", "why", "did", "this", "that", "what", "how", "was", "were",
         "for", "with", "you", "are", "does", "doing", "happen", "happened",
         "show", "tell", "explain", "which", "where", "when", "who", "from",
         "into", "over", "about", "there", "here", "have", "has", "its"}


def _words(text: str) -> list[str]:
    out, cur = [], []
    for ch in (text or "").lower():
        if ch.isalnum() or ch in "._/-":
            cur.append(ch)
        else:
            if cur:
                out.append("".join(cur)); cur = []
    if cur:
        out.append("".join(cur))
    return [w for w in out if len(w) >= 3 and w not in _STOP]


def _turn_blob(t: dict) -> str:
    parts = [t.get("prompt", ""), t.get("reply", "")]
    for tc in t.get("tools", []):
        parts.append(tc.get("summary", ""))
        if tc.get("flowValue"):
            parts.append(str(tc["flowValue"]))
    if t.get("guide"):
        g = t["guide"]
        parts.append(f"{g.get('head','')} {g.get('body','')}")
    return " ".join(parts)


def _best_practice_block(analysis: dict) -> str:
    """A compact, cited 'what could be better' block, built from the SAME
    deterministic `recommendations` the UI renders (engine output). Each line pairs
    the observed pattern with its cited Anthropic fix. Empty `recommendations` ->
    '' (silence is a valid result, build rule #6). The model may teach ONLY from
    what's here; it cannot invent a best practice."""
    recs = analysis.get("recommendations", []) or []
    if not recs:
        return ""
    lines = [
        "WHAT COULD BE BETTER (deterministic signals + the cited Anthropic best "
        "practice each maps to; suggest-only, cite the turn):"
    ]
    source = None
    for r in recs:
        tstr = ", ".join(f"turn {i}" for i in r.get("turns", []))
        practice = r.get("practice")
        head = r.get("headline", "")
        advice = r.get("advice", "")
        if practice:
            lines.append(f"- {tstr}: {head} -> best practice \"{practice}\": {advice}")
            source = r.get("source") or source
        else:
            lines.append(f"- {tstr}: {head}{advice}")
    if source:
        lines.append(f"(Source: {source})")
    return "\n".join(lines)


def _retrieve(analysis: dict, question: str) -> tuple[int, list[int], str]:
    """Pick the turns most relevant to ``question`` and build the model context.

    Deterministic: every turn is scored by keyword overlap between the question
    and the turn's text (see ``_turn_blob``), plus boosts for explicit
    'turn N' / 'query N' references (+100), cost-flavoured questions on heavy
    or guided turns, error-flavoured questions on turns with an errored tool,
    and context-window questions on turns where a compaction happened.

    Args:
        analysis: The analyze payload; reads ``turns``, ``session`` and
            (via ``_best_practice_block``) ``recommendations``.
        question: The user's free-text question.

    Returns:
        ``(focus_turn_index, cited_turn_indices, context_text)``: the single
        best turn, the sorted turn numbers to cite (top picks plus any
        explicitly referenced turn that exists), and the text block handed to
        the model.

    NOTE(review): ``scored[0]`` raises IndexError when ``turns`` is empty —
    callers are presumably expected to pass a non-empty session; confirm.
    """
    turns = analysis["turns"]
    sess = analysis["session"]
    qwords = set(_words(question))
    ql = (question or "").lower()

    # explicit "turn N" / "query N" references
    explicit: set[int] = set()
    # '#' becomes a space so "turn #5" tokenizes as ["turn", "5"].
    toks = ql.replace("#", " ").split()
    for i, tok in enumerate(toks):
        if tok in ("turn", "query", "turns", "queries") and i + 1 < len(toks):
            # keep only the digits, so "5," or "5?" still parses as 5
            num = "".join(c for c in toks[i + 1] if c.isdigit())
            if num != "":
                explicit.add(int(num))

    # Intent flags are plain substring checks against the lower-cased question.
    cost_intent = any(w in ql for w in ("expensive", "cost", "slow", "heavy", "token",
                                        "loop", "re-read", "reread", "churn", "spend"))
    err_intent = any(w in ql for w in ("error", "fail", "failed", "broke", "broken", "wrong", "stuck"))
    # window intent: questions about the live context window / fill / compaction —
    # answered from the deterministic gauge (session.context), NOT the cumulative sums.
    ctx_intent = any(w in ql for w in ("context window", "window", "compact", "fill",
                                       "full", "fit", "1m", "overflow", "ran out", "gauge"))

    scored = []
    compact_turns = {c.get("atTurn") for c in (sess.get("context", {}) or {}).get("compactions", [])}
    for t in turns:
        blob = set(_words(_turn_blob(t)))
        score = len(qwords & blob)
        if t["i"] in explicit:
            score += 100
        if cost_intent and t.get("heavy"):
            score += 3
        if cost_intent and t.get("guide"):
            score += 2
        if err_intent and any(tc.get("errored") for tc in t.get("tools", [])):
            score += 3
        if ctx_intent and t["i"] in compact_turns:  # window question → surface compactions
            score += 3
        scored.append((score, -t["i"], t))  # tie-break: earlier turn first
    # Descending tuple sort: highest score first, then (via the negated index)
    # the earliest turn. NOTE(review): assumes turn numbers are unique —
    # otherwise the comparison would fall through to the turn dicts and raise.
    scored.sort(reverse=True)

    # focus = top turn (fall back to heaviest if the question matched nothing)
    if scored[0][0] <= 0:
        heavy = sess.get("heavyTurns") or [0]
        # NOTE(review): turns[i] indexes by list position, so this assumes a
        # turn's "i" equals its position in `turns` — confirm with the engine.
        focus = max(heavy, key=lambda i: turns[i]["tokens"]["cacheRead"])
        top = [focus]
    else:
        focus = scored[0][2]["i"]
        top = [s[2]["i"] for s in scored[:3] if s[0] > 0]
    if not top:
        top = [focus]

    # build a compact, faithful context from the chosen turns
    ctxw = sess.get("context", {}) or {}
    comps = ctxw.get("compactions", []) or []
    over = ctxw.get("overLimit", []) or []
    # CUMULATIVE token sums (no ceiling — re-paid every round-trip) vs the POINT-IN-TIME
    # window gauge (bounded by the model's window). Spell out both so the model never
    # conflates a multi-million cache-read total with the ≤1M context window.
    # NOTE(review): "real retry loops 0" below is a fixed literal, not a value
    # read from the analysis — confirm that is intended.
    lines = [
        f"SESSION: cwd={sess.get('cwd')} · {sess.get('turns')} turns "
        f"({sess.get('humanTurns')} human, {sess.get('systemTurns')} system) · "
        f"{sess.get('tools')} tool calls · cache re-reads {sess.get('tokens',{}).get('cacheRead'):,} "
        f"(CUMULATIVE across all round-trips, ~{round(sess.get('cacheReadOverOut',0))}x generated — NOT window size) · "
        f"agent-driven {round(100*sess.get('indirectRatio',0))}% "
        f"({sess.get('indirect')} indirect / {sess.get('direct')} direct) · "
        f"heavy turns {sess.get('heavyTurns')} · real retry loops 0.",
        f"CONTEXT WINDOW (point-in-time gauge, bounded by the model's window): "
        f"peak fill {ctxw.get('peak',0):,} / {ctxw.get('limit',1_000_000):,} "
        f"({round(100*ctxw.get('peakPct',0))}% of the window) · "
        f"compactions: {len(comps)}"
        + (f" (at turns {[c.get('atTurn') for c in comps]}, e.g. {comps[0].get('before'):,}->{comps[0].get('after'):,})" if comps else " (the window never had to be trimmed)")
        + (f" · WARNING: {len(over)} request(s) reported occupancy ABOVE the window (turns {over}) — the source data or parse is suspect" if over else "")
        + ". This gauge is point-in-time; the cache-read total above is cumulative — they are different quantities and the cumulative one is expected to exceed the window.",
    ]
    # Always include the cited best-practice block (when any signal fired) so
    # "what could I have done better?" is answerable even when keyword scoring
    # wouldn't surface the relevant turns.
    bp_block = _best_practice_block(analysis)
    if bp_block:
        lines.append("\n" + bp_block)
    for i in top:
        t = turns[i]
        tools = t.get("tools", [])
        toolbits = []
        # Cap at 14 tools per turn to keep the prompt compact; the remainder
        # is summarized as a "(+N more)" suffix.
        for tc in tools[:14]:
            tag = tc.get("provenance", "direct")
            if tc.get("flowValue"):
                tag += f"<-{tc.get('sourceTool')}:{tc['flowValue']}"
            if tc.get("errored"):
                tag += ",ERRORED"
            toolbits.append(f"{tc.get('summary','')[:70]} [{tag}]")
        more = f" (+{len(tools)-14} more)" if len(tools) > 14 else ""
        guide = ""
        if t.get("guide"):
            guide = f" GUIDE[{t['guide'].get('head')}]: {t['guide'].get('body')}"
        # Prompt and reply are truncated (300 / 240 chars) and shown via repr.
        lines.append(
            f"\nTURN {i} ({t.get('origin')}){' HEAVY' if t.get('heavy') else ''}: "
            f"prompt={t.get('prompt','')[:300]!r}\n"
            f"  reply={t.get('reply','')[:240]!r}\n"
            f"  tokens: cacheRead={t['tokens']['cacheRead']:,} out={t['tokens']['out']:,} "
            f"reqs={t.get('reqs')} · direct={t.get('direct')} indirect={t.get('indirect')}{guide}\n"
            f"  tools: " + " | ".join(toolbits) + more
        )
    # `&` binds tighter than `|`: explicit references are first limited to turn
    # numbers that exist, then unioned with the top picks.
    return focus, sorted(set(top) | explicit & {t['i'] for t in turns}), "\n".join(lines)


# System prompt for the grounded chat. It constrains the model to the supplied
# TRACE CONTEXT, requires 'turn N' citations, keeps the cumulative cache-read
# totals distinct from the point-in-time context-window gauge, and limits
# improvement advice to the 'WHAT COULD BE BETTER' block in the context.
_CHAT_SYSTEM = (
    "You are a forensic assistant for ONE coding-agent session (Claude Code). "
    "Answer ONLY from the TRACE CONTEXT provided — never invent files, tools, or "
    "numbers. Cite turns as 'turn N' using the turn numbers in the context. "
    "Numbers in the context are computed by a deterministic engine; quote them, "
    "do not recompute. Keep two quantities distinct and never conflate them: "
    "'cache re-reads' (and cost) are CUMULATIVE token sums across every round-trip "
    "and routinely reach the millions — they have no ceiling; the CONTEXT WINDOW "
    "gauge (peak fill / limit, e.g. 848k / 1M) is point-in-time and IS bounded by "
    "the window. A multi-million cache-read total does NOT mean the window overflowed. "
    "Only treat the window as over-full if the context explicitly flags a request above "
    "the limit. SUGGEST, never assert a fix ('looks like…', 'worth "
    "checking…', not 'the bug is X'). If the answer is not in the trace, say so "
    "plainly. Be concise: 2-4 sentences, plain English, no jargon dumps. "
    "If the user asks what they could have done better, use ONLY the items in the "
    "'WHAT COULD BE BETTER' block (each already carries the cited Anthropic best "
    "practice); cite the turn and phrase it as a gentle suggestion. Never introduce "
    "a best practice that is not in that block. If the block is absent, say the "
    "session looks clean and there's nothing notable to change."
)


def _relevant_tool(turn: dict, qwords: set, err_intent: bool) -> int | None:
    """The single tool in a turn most relevant to the question — so a citation can
    land on the exact tool, not just the turn. Error-flavoured questions point at
    the first errored tool; otherwise the best keyword/flowValue overlap; else the
    first errored or first proven value-flow tool. Deterministic."""
    tools = turn.get("tools", [])
    if not tools:
        return None
    if err_intent:
        for idx, tc in enumerate(tools):
            if tc.get("errored"):
                return idx
    best, best_score = None, 0
    for idx, tc in enumerate(tools):
        blob = set(_words(" ".join([
            tc.get("summary", ""), str(tc.get("flowValue") or ""),
            tc.get("name", ""), str(tc.get("sourceTool") or ""),
        ])))
        score = len(qwords & blob)
        if score > best_score:
            best, best_score = idx, score
    if best is not None and best_score > 0:
        return best
    for idx, tc in enumerate(tools):
        if tc.get("errored"):
            return idx
    for idx, tc in enumerate(tools):
        if tc.get("provenance") == "indirect" and tc.get("flowValue"):
            return idx
    return None


def _chip_label(turn: dict, tool_idx: int | None) -> str:
    """Friendly label for a citation chip: 'turn 5 · Bash ●err' / 'turn 9 · Read migrate.js'."""
    i = turn["i"]
    if tool_idx is None:
        return f"turn {i}"
    tc = turn["tools"][tool_idx]
    name = f"{tc['mcp']['server']}:{tc['mcp']['tool']}" if tc.get("mcp") else tc.get("name", "tool")
    return f"turn {i} · {name}{' ●err' if tc.get('errored') else ''}"


def _chat(question: str, path: Path) -> dict:
    """Answer one question about a single session, grounded in its analysed trace.

    Flow: retrieve context for the question, ask the narrator model, and fall
    back to a deterministic description of the focus turn when the model is
    unavailable or returns nothing. Turn numbers the answer mentions are merged
    with the retrieval picks, and each cited turn is paired with its most
    relevant tool so a chip can open the turn AND select the tool.

    Args:
        question: the user's question (may be empty).
        path: session file to analyse (via the cached analysis).

    Returns:
        dict with `answer`, `focusTurn`, `focusTool`, `citedTurns`,
        `citations` (turn / tool index / chip label), `model` (id or None)
        and `grounded`.
    """
    analysis = _analyze_cached(path)
    turns = analysis["turns"]
    qwords = set(_words(question))
    ql = (question or "").lower()
    err_intent = any(w in ql for w in ("error", "fail", "failed", "broke", "broken", "wrong", "stuck", "retry", "retries"))

    focus, cited, context = _retrieve(analysis, question)
    user = f"TRACE CONTEXT:\n{context}\n\nQUESTION: {question}\n\nAnswer from the trace above, citing turn numbers."
    model_used = None
    answer = None
    try:
        client = get_narrator()
        if client.wait_until_ready(max_wait=4.0, interval=1.0):
            model_used = client.model_id()
            answer = client.chat(_CHAT_SYSTEM, user, temperature=0.2, max_tokens=320)
    except Exception:
        # Best-effort: any narrator failure drops to the deterministic fallback.
        answer = None
    if not answer:
        # Deterministic fallback so the feature works even with the model off.
        # `or` guards keep a None prompt / tools list from crashing the fallback.
        t = turns[focus]
        answer = (
            f"(model offline — showing the trace) Turn {focus} is the most relevant: "
            f"{(t.get('prompt') or '')[:120]}… It made {len(t.get('tools') or [])} tool calls, "
            f"{t.get('indirect')} of them agent-driven, with "
            f"{t['tokens']['cacheRead']:,} context re-read tokens"
            + (f". Tip: {t['guide'].get('body')}" if t.get('guide') else ".")
        )

    # Union any 'turn N' the model cited with the retrieval picks. A regex (not
    # whitespace tokens) so "(turn 5)", "'turn 5'", "turn #5" and "turn **5**"
    # all count, "turn 5-7" reads as 5 rather than 57, and words that merely
    # start with "turn" ("turned 3 files") are not mistaken for citations.
    cited_set = set(cited)
    for match in re.finditer(r"\bturns?\b[^\w\n]{0,4}(\d+)", answer, re.IGNORECASE):
        num = int(match.group(1))
        if num < len(turns):
            cited_set.add(num)
    cited_turns = sorted(cited_set)

    # per-citation tool targeting -> the chip opens the turn AND selects the tool
    focus_tool = _relevant_tool(turns[focus], qwords, err_intent)
    citations = []
    for i in cited_turns:
        tool_idx = _relevant_tool(turns[i], qwords, err_intent)  # computed once, used twice
        citations.append({"turn": i, "tool": tool_idx, "label": _chip_label(turns[i], tool_idx)})
    return {
        "answer": answer,
        "focusTurn": focus,
        "focusTool": focus_tool,
        "citedTurns": cited_turns,
        "citations": citations,
        "model": model_used,
        "grounded": True,
    }


# --------------------------------------------------------------------------- #
# Session overview (plain-English narration of one session)
# --------------------------------------------------------------------------- #
# Cache for session overviews, keyed by a (str, int) tuple per the annotation —
# presumably (session path, mtime); it is filled outside _overview, TODO confirm.
_OVERVIEW_CACHE: dict[tuple[str, int], dict] = {}
# System prompt for the single-session overview that _overview sends to the narrator.
_OVERVIEW_SYSTEM = (
    "You explain what happened in ONE coding-agent session, in plain English for a "
    "non-expert. Read the ordered turns and write 3-5 calm sentences: what the user "
    "was trying to do, what the agent actually did, and how it ended. Name a few "
    "turns as 'turn N'. If something looks like a problem, SUGGEST ('looks like…'), "
    "never assert a fix. Do NOT dwell on token counts or cost — focus on the work "
    "and the outcome. No drama, no marketing; just what happened."
)


def _overview(analysis: dict) -> dict:
    """A plain-English 'what happened overall' for the session — narrator prose, the
    ONLY model call here. Grounded in the ordered turns (prompts + replies + flags)."""
    turns = analysis["turns"]
    sess = analysis["session"]
    lines = [
        f"SESSION: cwd={sess.get('cwd')} · {sess.get('turns')} turns "
        f"({sess.get('humanTurns')} human, {sess.get('systemTurns')} system) · "
        f"{sess.get('tools')} tool calls · heavy turns {sess.get('heavyTurns')}."
    ]
    for t in turns:
        tl = t.get("tools", [])
        err = sum(1 for tc in tl if tc.get("errored"))
        flags = []
        if t.get("heavy"):
            flags.append("heavy")
        if err:
            flags.append(f"{err} errored")
        if t.get("guide"):
            flags.append("flagged-" + str(t["guide"].get("kind")))
        lines.append(
            f"turn {t['i']} ({t.get('origin')}): {(t.get('prompt') or '')[:220]!r} "
            f"=> reply {(t.get('reply') or '')[:170]!r} "
            f"[{', '.join(flags) or 'clean'}; {len(tl)} tools]"
        )
    context = "\n".join(lines)[:6500]
    try:
        client = get_narrator()
        if client.wait_until_ready(max_wait=4.0, interval=1.0):
            text = client.chat(
                _OVERVIEW_SYSTEM,
                "SESSION TURNS:\n" + context + "\n\nWrite the plain-English overview now.",
                temperature=0.3, max_tokens=300,
            )
            return {"overview": text.strip(), "model": client.model_id()}
    except Exception:
        pass
    return {"overview": "", "model": None}


# --------------------------------------------------------------------------- #
# WHAT COULD HAVE BEEN BETTER — the engine DETECTS the fixable signals (proven,
# no model); the LOCAL model WRITES the advice, scoped to THIS session's objective
# and grounded in the cited Anthropic best practice. Model-for-prose-only: the
# finding is deterministic, only the wording is generated. Suggest, never assert.
# Falls back to the engine's transcribed fix text when the model is unreachable.
# --------------------------------------------------------------------------- #
# Cache for advice payloads, keyed by a (str, int) tuple per the annotation —
# presumably (session path, mtime); _advice itself does not touch it, TODO confirm.
_ADVICE_CACHE: dict[tuple[str, int], dict] = {}

# System prompt for _advice: the model only WORDS the suggestion for a pattern the
# engine already detected; it must suggest rather than assert and invent nothing.
_ADVICE_SYS = (
    "You advise someone learning to drive a coding agent (Claude Code). A "
    "DETERMINISTIC engine already detected ONE specific, fixable pattern in THIS "
    "session — you do not decide whether it happened, you only explain it well. "
    "Using (a) what the user set out to do, (b) what actually happened on the cited "
    "turn(s), and (c) the relevant Anthropic best practice given to you, write 2-3 "
    "sentences of advice that is SCOPED TO THIS SESSION: refer to what they were "
    "actually doing, name the turn ('on turn 9…'), and suggest a concrete better "
    "move grounded in the Anthropic practice. RULES: SUGGEST, never assert "
    "('you could', 'it would have helped' — never 'you must' or 'the bug is'). Do "
    "NOT give generic advice — tie it to this session's work. Do NOT invent files, "
    "tools, or facts not in the context. Plain English, no jargon. Prose only."
)


def _advice(analysis: dict) -> dict:
    """Per fired signal, ask the local model for session-scoped advice. Returns
    {recommendations:[{...rec, scoped}], model}. `scoped` is the model's prose, or
    None when the model is offline (the UI then falls back to the engine's cited
    fix text). The deterministic detection (which turns, which signal) is untouched."""
    recs = analysis.get("recommendations", []) or []
    if not recs:
        return {"recommendations": [], "model": None}

    turns = analysis.get("turns", [])
    humans = [t for t in turns if t.get("origin") == "human"]
    objective = ((humans[0]["prompt"] if humans else (turns[0]["prompt"] if turns else "")) or "")[:600]
    by_i = {t["i"]: t for t in turns}

    client = None
    try:
        c = get_narrator()
        if c.wait_until_ready(max_wait=4.0, interval=1.0):
            client = c
    except Exception:
        client = None
    model_used = client.model_id() if client else None

    out = []
    for r in recs:
        ctx_lines = []
        for i in r.get("turns", []):
            t = by_i.get(i)
            if not t:
                continue
            tl = t.get("tools", []) or []
            err = sum(1 for tc in tl if tc.get("errored"))
            mix = ", ".join(f"{c2} {n}" for n, c2 in Counter(tc.get("name") for tc in tl).most_common(4))
            ctx_lines.append(
                f"turn {i}: {((t.get('prompt') or '')[:160])!r} · ran {len(tl)} tools "
                f"({mix}){f', {err} errored' if err else ''}"
            )
        user = (
            f"SESSION OBJECTIVE (what the user set out to do):\n{objective}\n\n"
            f"WHAT HAPPENED ON THE FLAGGED TURN(S):\n" + "\n".join(ctx_lines) +
            f"\n\nDETECTED PATTERN (deterministic): {r.get('headline')}  (signal: {r.get('kind')})\n"
            f"RELEVANT ANTHROPIC BEST PRACTICE: {r.get('practice')}{r.get('advice')}\n\n"
            "Write the scoped suggestion now."
        )
        scoped = None
        if client:
            try:
                txt = client.chat(_ADVICE_SYS, user, temperature=0.3, max_tokens=210)
                scoped = txt.strip() if txt else None
            except Exception:
                scoped = None
        out.append({**r, "scoped": scoped})

    return {"recommendations": out, "model": model_used}


# --------------------------------------------------------------------------- #
# PROJECT level — many sessions under one cwd. A plain-English changelog, an
# entity inventory (skills / sub-agents / MCP servers, traceable to sessions),
# and a cross-session chat ("when did we add column X to sql?").
# --------------------------------------------------------------------------- #
# Per-session briefs, keyed by (path string, st_mtime_ns) — see _brief.
_BRIEF_CACHE: dict[tuple[str, int], dict] = {}
# Project changelog results, keyed by the "sessionId:mtime" pairs of the briefs
# joined with "|" — see _project_narrative.
_PROJECT_NARR_CACHE: dict[str, dict] = {}
_PROJECT_CAP = 24  # parse at most the N most-recent sessions, for responsiveness


def _brief(path: Path) -> dict:
    """Per-session facts via the LOADER only (no provenance, no model): counts, a
    title, named entities, and a search blob. Cached by (path, mtime in ns).

    The file is stat'ed exactly once, so the cache key and the reported `mtime`
    always describe the same version of the file. A missing/None prompt yields
    an empty title / firstPrompt instead of the text "None" or a TypeError.
    """
    st = path.stat()
    key = (str(path), st.st_mtime_ns)
    if key in _BRIEF_CACHE:
        return _BRIEF_CACHE[key]
    loaded = load(str(path))
    turns = [to_jsonable(t) for t in loaded["turns"]]
    sess = loaded["session"]
    humans = [t for t in turns if t.get("origin") == "human"]
    # Title = first human prompt, else the first turn's prompt, whitespace-collapsed.
    first = humans[0] if humans else (turns[0] if turns else None)
    title = (first.get("prompt") or "") if first is not None else "(empty session)"
    title = " ".join(str(title).split())[:100]
    ents = extract_entities(turns)
    bins = extract_binaries(turns, load_registry())
    imp = impact.detect_impact(turns, bins)
    parts = []
    edited: list[str] = []      # distinct files this session CHANGED — the most distinctive
    seen_edit: set[str] = set()  # cross-session signal, and what the changelog should report
    for t in turns:
        parts.append(t.get("prompt", "") or "")
        parts.append((t.get("reply", "") or "")[:200])
        for tc in t.get("tools", []) or []:
            s = tc.get("summary", "") or ""
            parts.append(s)
            if tc.get("flowValue"):
                parts.append(str(tc["flowValue"]))
            # _summary() renders only Edit/Write as "Edit <basename>" (Read is "Read …"),
            # so this prefix uniquely captures files the session wrote, not files it read.
            if s.startswith("Edit "):
                fn = s[5:].strip()
                if fn and fn not in seen_edit:
                    seen_edit.add(fn)
                    edited.append(fn)
    # Anthropic cost (the ranking key) + cacheRead (kept as a secondary metric), via
    # the per-turn token rollup the loader already produced. Pure summation, no model.
    cost = sum((t.get("tokens", {}) or {}).get("cost", 0) for t in turns)
    cache_read = sum((t.get("tokens", {}) or {}).get("cacheRead", 0) for t in turns)
    generated = sum((t.get("tokens", {}) or {}).get("out", 0) for t in turns)
    brief = {
        "path": str(path), "sessionId": sess.get("sessionId"),
        "cwd": sess.get("cwd"), "gitBranch": sess.get("gitBranch"),
        "turns": len(turns), "humanTurns": len(humans),
        "tools": sum(len(t.get("tools", []) or []) for t in turns),
        "cost": cost, "cacheRead": cache_read, "generated": generated,
        "title": title,
        "firstPrompt": ((humans[0].get("prompt") or "")[:300] if humans else ""),
        "mtime": int(st.st_mtime),
        # real session start/end timestamps (from inside the file) so the project
        # view can show WHEN each session ran, not just a file-mtime "age".
        "startedAt": sess.get("startedAt"), "endedAt": sess.get("endedAt"),
        "entities": ents, "entityTotals": entity_totals(ents),
        "binaries": bins,
        "impact": imp,
        "editedFiles": edited[:10],
        "blob": " ".join(parts)[:9000],
    }
    _BRIEF_CACHE[key] = brief
    return brief


def _project_sessions(cwd: str, projects_dir: str | None = None) -> list:
    """List the discovered sessions whose cwd equals the normalized `cwd`,
    newest file first (by mtime; a file that cannot be stat'ed sorts as 0)."""
    wanted = discovery._norm(cwd)

    def _mtime_or_zero(ref):
        try:
            return os.path.getmtime(ref.path)
        except OSError:
            return 0

    matching = [
        ref for ref in discovery.discover_sessions(projects_dir) if ref.cwd == wanted
    ]
    return sorted(matching, key=_mtime_or_zero, reverse=True)


def _aggregate_entities(briefs: list) -> dict:
    out = {"skills": {}, "subAgents": {}, "mcpServers": {}}
    for b in briefs:
        sid, path = b["sessionId"], b["path"]
        for kind in out:
            for e in b["entities"].get(kind, []):
                slot = out[kind].setdefault(e["name"], {"name": e["name"], "total": 0, "sessions": []})
                slot["total"] += e["count"]
                slot["sessions"].append({
                    "sessionId": sid, "path": path, "count": e["count"],
                    "turns": e.get("turns", []), "tools": e.get("tools"),
                })
    return {k: sorted(v.values(), key=lambda x: (-x["total"], x["name"])) for k, v in out.items()}


def _aggregate_binaries(briefs: list) -> list:
    """Roll every session's binaries up by name across the project, summing counts
    and recording which sessions/turns each appeared in (the cross-session
    traceback) — and carrying the registry metadata so the inventory shows the
    product name, blurb, logo and security note, not just the bare binary."""
    out: dict = {}
    META = ("product", "blurb", "homepage", "logo", "security", "source", "updated")
    for b in briefs:
        sid, path = b["sessionId"], b["path"]
        for e in b.get("binaries", []) or []:
            slot = out.setdefault(e["name"], {
                "name": e["name"], "binary": e["name"], "total": 0, "sessions": [],
                "via": e.get("via"), "identified": bool(e.get("identified")),
            })
            slot["total"] += e["count"]
            slot["sessions"].append({
                "sessionId": sid, "path": path, "count": e["count"],
                "turns": e.get("turns", []),
            })
            if e.get("identified"):  # first identified session wins the display metadata
                slot["identified"] = True
                for k in META:
                    if e.get(k) is not None and k not in slot:
                        slot[k] = e[k]
    return sorted(out.values(), key=lambda x: (-x["total"], x["name"]))


# Ordering of risk levels; _aggregate_impact keeps the highest-ranked one seen.
_RISK_RANK = {"None": 0, "Low": 1, "Medium": 2, "High": 3}
# Sort priority of action tags (lower sorts first); _aggregate_impact ranks
# tags missing from this table as 9.
_TAG_ORDER = {"PRODUCTION": 0, "SECURITY": 1, "NETWORK": 2, "CONFIG": 3}


# Holds the most recent _project_actions result, keyed by (normalized cwd, file
# signature); the function clears it before storing, so it keeps one project.
_PROJECT_ACTIONS_CACHE: dict = {}


def _project_actions(cwd: str, projects_dir: str | None = None) -> dict:
    """Whole-project 'actions worth reviewing' — scanned across ALL sessions, not
    just the parse-capped subset the changelog uses. This is the safety lens, so it
    must be COMPLETE: a deploy or DB role change in any session must show, even one
    the changelog cap dropped. Cheap: it only regex-scans Bash command strings (no
    full parse, no model). Each action traces back to the sessions it happened in.

    Returns {"riskLevel": ..., "actions": [...]}; the result is cached per
    (normalized cwd, signature of the session files' paths and mtimes).

    Robustness: a session file that disappears mid-scan, contains undecodable
    bytes, or has a JSON line that is not the expected object shape is skipped /
    tolerated instead of aborting the whole scan.
    """
    target = discovery._norm(cwd)
    refs = [s for s in discovery.discover_sessions(projects_dir) if s.cwd == target]

    def _stamp(file_path) -> int:
        # Nanosecond mtime so an append within the same second still invalidates
        # the cache; a vanished file stamps as 0 rather than raising.
        try:
            return os.stat(file_path).st_mtime_ns
        except OSError:
            return 0

    sig = tuple(sorted((s.path, _stamp(s.path)) for s in refs))
    key = (target, sig)
    if key in _PROJECT_ACTIONS_CACHE:
        return _PROJECT_ACTIONS_CACHE[key]

    def _bash_commands(line: str):
        """Yield the command string of every Bash tool_use in one transcript line."""
        if '"Bash"' not in line:  # cheap pre-filter before paying for json.loads
            return
        try:
            rec = json.loads(line)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return
        if not isinstance(rec, dict) or rec.get("type") != "assistant":
            return
        message = rec.get("message")
        content = message.get("content") if isinstance(message, dict) else None
        if not isinstance(content, list):
            return
        for part in content:
            if isinstance(part, dict) and part.get("type") == "tool_use" and part.get("name") == "Bash":
                args = part.get("input")
                command = args.get("command", "") if isinstance(args, dict) else ""
                yield str(command or "")

    agg: dict = {}
    for s in refs[:250]:  # backstop on pathological project sizes
        sid = s.sessionId
        try:
            # errors="replace": one bad byte must not hide the rest of a session.
            with open(s.path, "r", encoding="utf-8", errors="replace") as fh:
                for line in fh:
                    for cmd in _bash_commands(line):
                        for tag, title, detail in impact._scan_command(cmd):
                            slot = agg.setdefault((tag, title), {
                                "tag": tag, "title": title, "detail": detail,
                                "total": 0, "sessions": [], "_sids": set(),
                            })
                            slot["total"] += 1
                            if sid not in slot["_sids"]:
                                slot["_sids"].add(sid)
                                slot["sessions"].append({"sessionId": sid, "path": s.path})
        except OSError:
            continue

    actions = []
    for a in agg.values():
        a.pop("_sids", None)  # internal de-dup set; not part of the payload
        actions.append(a)
    actions.sort(key=lambda a: (impact._TAG_ORDER.get(a["tag"], 9), -a["total"], a["title"]))
    level, _reason = impact.risk_level(actions)
    result = {"riskLevel": level, "actions": actions}
    _PROJECT_ACTIONS_CACHE.clear()  # one project at a time is plenty
    _PROJECT_ACTIONS_CACHE[key] = result
    return result


def _aggregate_impact(briefs: list) -> dict:
    """Roll session impact up to the project: every 'action worth reviewing' across
    sessions (each traceable to the sessions/turns it happened in), and the highest
    risk level seen. Powers the project-level report's safety lens."""
    actions: dict = {}
    level = "None"
    for b in briefs:
        imp = b.get("impact") or {}
        if _RISK_RANK.get(imp.get("riskLevel", "None"), 0) > _RISK_RANK.get(level, 0):
            level = imp.get("riskLevel", "None")
        for a in imp.get("actions", []) or []:
            slot = actions.setdefault((a["tag"], a["title"]), {
                "tag": a["tag"], "title": a["title"], "detail": a.get("detail", ""),
                "total": 0, "sessions": [],
            })
            slot["total"] += 1
            slot["sessions"].append({
                "sessionId": b["sessionId"], "path": b["path"], "turns": a.get("turns", []),
            })
    out = sorted(
        actions.values(),
        key=lambda a: (_TAG_ORDER.get(a["tag"], 9), -a["total"], a["title"]),
    )
    return {"riskLevel": level, "actions": out}


# System prompt for the project changelog written by _project_narrative: flowing
# prose over the per-session digest lines, sessions cited as [id], nothing invented.
_PROJECT_NARR_SYSTEM = (
    "You write a plain-English changelog of what happened across the coding-agent "
    "sessions in ONE project, for a non-expert. For each session (oldest first) you are "
    "given its short id and what it ACTUALLY DID — the files it changed, the actions it "
    "took, the tools / sub-agents / skills it used. Write flowing prose, no headers, no "
    "bullet list:\n"
    "- Open with one sentence naming what this project is and the through-line across "
    "the sessions.\n"
    "- Then describe the notable work. GROUP sessions that did the same kind of thing "
    "into one statement instead of repeating a line each. Cite sessions as [id].\n"
    "- Report what was BUILT or CHANGED (the files, the actions) — do NOT restate the "
    "request text. If many sessions show the SAME request (e.g. an automated security "
    "or PR-review pipeline), say that ONCE and focus on what differed, never echo it "
    "per session.\n"
    "Concrete and calm; suggest, don't assert. 4 to 8 sentences. Ground ONLY in what "
    "you are given — never invent files, tools, or features."
)

# Auto-generated first prompts (a /security-review run, a slash-command preamble, a PR
# template) repeat VERBATIM across sessions, so the bare first prompt is a useless,
# identical "title" that makes the changelog parrot the same line N times (the screenshot
# of "[id] Review this change for security vulnerabilities…" x16). Detect them so the
# digest describes what the session DID rather than echoing the boilerplate ask.
# Case-insensitive; the last alternative matches a title that begins with a slash command.
_BOILERPLATE_TITLE_RX = re.compile(
    r"review this change for security"
    r"|changed files \(you may read"
    r"|caveat: the messages below were generated"
    r"|opened (the |a )?pull request"
    r"|<command-(name|message|args)>"
    r"|^\s*/[a-z][\w-]*",
    re.I,
)


def _session_digest(b: dict) -> str:
    """One DISTINCTIVE line per session for the changelog model: what it actually did
    (the request only if it's not boilerplate, plus impact actions, changed files, and
    named tools/agents/skills) — so the model has something to summarize beyond a first
    prompt that is identical across an automated-review project."""
    sid = (b.get("sessionId") or "?")[:8]
    title = " ".join(str(b.get("title") or "").split())
    bits: list[str] = []
    if title and _BOILERPLATE_TITLE_RX.search(title):
        bits.append("automated security/PR-review run")
    elif title:
        bits.append(f"asked {title[:130]!r}")
    acts = [a["title"] for a in (b.get("impact") or {}).get("actions", [])[:3]]
    if acts:
        bits.append("did: " + "; ".join(acts))
    edited = b.get("editedFiles") or []
    if edited:
        more = f" +{len(edited) - 6} more" if len(edited) > 6 else ""
        bits.append(f"changed {', '.join(edited[:6])}{more}")
    used: list[str] = []
    for kind, lbl in (("subAgents", "agents"), ("skills", "skills"), ("mcpServers", "mcp")):
        names = [e["name"] for e in b.get("entities", {}).get(kind, [])[:3]]
        if names:
            used.append(f"{lbl}:{','.join(names)}")
    tools = [x["name"] for x in (b.get("binaries") or [])[:3]]
    if tools:
        used.append("tools:" + ",".join(tools))
    if used:
        bits.append(" · ".join(used))
    body = " | ".join(bits) if bits else "(no notable activity)"
    return f"[{sid}] {b.get('turns', 0)} turns — {body}"


# Detail at most this many sessions in the changelog context; the rest are summarized by
# count so a big project can't overflow the model's output and get cut off mid-word.
# Read by _project_narrative when it builds the per-session digest lines.
_NARR_DETAIL_CAP = 20


def _project_narrative(cwd: str, briefs: list) -> dict:
    """Write (and cache) the plain-English changelog for a project's sessions.

    The context lists sessions oldest first. When there are more than
    `_NARR_DETAIL_CAP`, the most recent ones are detailed and the older
    remainder is summarized by count, so the "(+N older …)" line describes the
    sessions that were actually left out.

    Returns {"narrative": text, "model": id}, or {"narrative": "", "model": None}
    when the narrator is unavailable. Only a non-empty narrative is cached: a
    transient "model offline" result must not pin an empty changelog until the
    session files change.
    """
    mkey = "|".join(f"{b['sessionId']}:{b['mtime']}" for b in briefs)
    cached = _PROJECT_NARR_CACHE.get(mkey)
    if cached is not None:
        return cached

    ordered = sorted(briefs, key=lambda b: b["mtime"])
    older = max(0, len(ordered) - _NARR_DETAIL_CAP)
    lines = [f"PROJECT: {cwd} · {len(ordered)} session(s)."]
    if older:
        # Placed before the details so the list stays chronological and the
        # note survives the length cap below.
        lines.append(f"(+{older} older session(s), similar — summarize by count)")
    lines.extend(_session_digest(b) for b in ordered[older:])
    context = "\n".join(lines)[:8000]

    result = {"narrative": "", "model": None}
    try:
        client = get_narrator()
        if client.wait_until_ready(max_wait=4.0, interval=1.0):
            txt = client.chat(
                _PROJECT_NARR_SYSTEM,
                "SESSIONS (oldest first):\n" + context + "\n\nWrite the changelog now.",
                temperature=0.3, max_tokens=700,
            )
            result = {"narrative": (txt or "").strip(), "model": client.model_id()}
    except Exception:
        # Best-effort: a narrator failure leaves the empty result in place.
        pass
    if result["narrative"]:
        _PROJECT_NARR_CACHE[mkey] = result
    return result


def _project(cwd: str, with_narrative: bool = True, projects_dir: str | None = None) -> dict:
    """Assemble the project-level payload for every session under `cwd`.

    Briefs are built for at most `_PROJECT_CAP` of the most recent sessions
    (sessions whose brief fails to build are skipped). The payload carries the
    sessions ranked by cost, the entity and binary inventories, the
    whole-project impact scan, and — unless `with_narrative` is False — the
    model-written changelog.
    """
    refs = _project_sessions(cwd, projects_dir)
    briefs = []
    for ref in refs[:_PROJECT_CAP]:
        try:
            brief = _brief(Path(ref.path))
        except Exception:
            continue
        briefs.append(brief)

    # The narrative is the ONLY model call here. On the ZeroGPU Space it must be
    # invoked via the Gradio API (so auth headers forward for GPU quota), so the
    # plain-REST /api/project route passes with_narrative=False and the UI fetches
    # the prose separately through the `project_narrative` Gradio endpoint.
    if with_narrative:
        narr = _project_narrative(cwd, briefs)
    else:
        narr = {"narrative": "", "model": None}

    # Sessions are RANKED BY COST (Anthropic token consumption) — what the user pays
    # for — not by recency. (Parsing is still capped by recency above; ordering is
    # cost.) Tie-break by mtime so equal-cost sessions stay stable.
    by_cost = sorted(briefs, key=lambda item: (-item.get("cost", 0), -item.get("mtime", 0)))
    total_cost = sum(item.get("cost", 0) for item in briefs)
    # The search blob is internal; strip it from what goes to the client.
    session_rows = [
        {field: value for field, value in item.items() if field != "blob"}
        for item in by_cost
    ]
    entities = _aggregate_entities(briefs)
    binaries = _aggregate_binaries(briefs)
    # impact scans ALL sessions (not the parse-capped subset) — the safety lens
    # must be complete; an action in a dropped session must still show.
    impact_summary = _project_actions(cwd, projects_dir)
    return {
        "cwd": cwd, "sessionCount": len(refs), "shown": len(briefs),
        "totalCost": total_cost,
        "sessions": session_rows,
        "entities": entities,
        "binaries": binaries,
        "impact": impact_summary,
        "narrative": narr.get("narrative", ""), "model": narr.get("model"),
    }


# Anti-fabrication clause appended to every project-chat system prompt — the model
# may ONLY use facts present in the context (this is what stops it inventing a
# "smruti-deploy image" or a column that isn't in the trace).
_NO_INVENT = (
    " Use ONLY facts shown in the context. NEVER invent file names, image names, "
    "commands, columns, tables, or features that are not present. If the context "
    "doesn't say, reply that it isn't clearly in these sessions."
)
# System prompt for BROAD project questions (grounded on the changelog plus the
# session titles/entities).
_PROJECT_OVERVIEW_SYSTEM = (
    "You explain, for a non-expert, what a multi-session coding PROJECT is and what "
    "was built across it. Ground your answer ONLY in the project changelog and the "
    "session titles/entities given. Write 3-5 plain sentences: the project's purpose "
    "and the main things built or changed. You may cite a few sessions as [id]."
    + _NO_INVENT
)
# System prompt for SPECIFIC lookups (grounded on keyword-retrieved candidate
# sessions and their matched snippets).
_PROJECT_LOOKUP_SYSTEM = (
    "You locate WHICH session in a project something happened in. Given candidate "
    "sessions (short id, title, matched snippets), name the session(s) by short id "
    "[id] and say what happened there, quoting only what the snippets actually show. "
    "If nothing matches, say it isn't clearly in these sessions. SUGGEST, never "
    "assert. 2-4 sentences. Remind the user they can open a named session to go deeper."
    + _NO_INVENT
)

# Phrases / shape that mark a BROAD "tell me about the whole project" question
# (grounded on the full changelog) vs a SPECIFIC lookup (keyword-retrieved).
# _BROAD_HINTS entries are matched as substrings of the lower-cased question.
_BROAD_HINTS = (
    "what was built", "what did we build", "what is this project", "what's this project",
    "what is the project", "what was the project", "overall", "in general", "high level",
    "high-level", "summary", "summarize", "the gist", "purpose", "what happened in this project",
    "what are these sessions", "what was done", "tell me about the project", "what's the project",
)
# Question words that carry no lookup content; _is_broad ignores them when it
# counts how many content words a question has.
_BROAD_STOP = {
    "overall", "summary", "summarize", "built", "build", "building", "overview",
    "everything", "across", "project", "projects", "gist", "about", "point", "purpose",
    "goal", "goals", "session", "sessions", "these", "this", "general", "high", "level",
    "mean", "meant", "made", "thing", "things", "stuff",
}


def _is_broad(question: str, qwords: set, top_score: int) -> bool:
    """Decide whether a project question is BROAD (whole-project) or a specific lookup.

    Broad when the lower-cased question contains any `_BROAD_HINTS` phrase, when
    at most one of its words is not in `_BROAD_STOP`, or when the best
    keyword-overlap score over the sessions is 1 or less.
    """
    lowered = (question or "").lower()
    for hint in _BROAD_HINTS:
        if hint in lowered:
            return True
    content_count = sum(1 for word in qwords if word not in _BROAD_STOP)
    return content_count <= 1 or top_score <= 1


def _project_chat(question: str, cwd: str, projects_dir: str | None = None) -> dict:
    """Answer a question across all sessions of one project.

    BROAD questions are grounded on the project changelog plus every session's
    title/entities; SPECIFIC ones on keyword-retrieved candidate sessions with
    matched snippets. When the narrator is unavailable, a deterministic
    "closest match" answer is returned instead.

    Returns {"answer", "model", "sessionHits"}; `sessionHits` lists the sessions
    the answer cited by short id first, then the default candidates (max 5).
    """
    refs = _project_sessions(cwd, projects_dir)
    briefs = []
    for s in refs[:_PROJECT_CAP]:
        try:
            briefs.append(_brief(Path(s.path)))
        except Exception:
            continue
    if not briefs:
        return {"answer": "No sessions found in this project.", "model": None, "sessionHits": []}

    qwords = set(_words(question))
    scored = sorted(
        ((len(qwords & set(_words(b["title"] + " " + b["blob"]))), b) for b in briefs),
        key=lambda x: (-x[0], -x[1]["mtime"]),
    )
    top_score = scored[0][0] if scored else 0

    if _is_broad(question, qwords, top_score):
        # BROAD: ground on the whole project — the (already grounded) changelog plus
        # every session's title/entities. Synthesize; do not cherry-pick noisy hits.
        narr = _project_narrative(cwd, briefs).get("narrative", "")
        by_activity = sorted(briefs, key=lambda b: -b["turns"])  # sorted once, used twice
        lines = [f"PROJECT CHANGELOG (grounded):\n{narr}", "", "ALL SESSIONS (most active first):"]
        for b in by_activity:
            ents = []
            for kind in ("skills", "mcpServers", "subAgents"):
                ents += [e["name"] for e in b["entities"].get(kind, [])[:2]]
            lines.append(
                f"[{(b['sessionId'] or '?')[:8]}] {b['turns']} turns · {b['title']}"
                + (f" · uses {','.join(ents)}" if ents else "")
            )
        context = "\n".join(lines)[:7200]
        system = _PROJECT_OVERVIEW_SYSTEM
        default_hits = by_activity[:4]
    else:
        # SPECIFIC: keyword-retrieved candidate sessions with matched snippets.
        hits0 = [b for sc, b in scored if sc > 0][:4] or [b for sc, b in scored][:2]
        # Fixed word order (longest first, then alphabetical): set iteration order
        # varies between runs, which made the snippet choice non-reproducible.
        probe_words = sorted(qwords, key=lambda w: (-len(w), w))[:6]
        lines = []
        for b in hits0:
            low = b["blob"].lower()
            snip = []
            for w in probe_words:
                idx = low.find(w)
                if idx >= 0:
                    snip.append(b["blob"][max(0, idx - 50):idx + 70].replace("\n", " "))
            lines.append(f"[{(b['sessionId'] or '?')[:8]}] ({b['turns']} turns) title={b['title']!r} snippets={' … '.join(snip[:3])!r}")
        context = "\n".join(lines)[:6500]
        system = _PROJECT_LOOKUP_SYSTEM
        default_hits = hits0

    answer, model_used = None, None
    try:
        client = get_narrator()
        if client.wait_until_ready(max_wait=4.0, interval=1.0):
            model_used = client.model_id()
            answer = client.chat(system, "CONTEXT:\n" + context + f"\n\nQUESTION: {question}", temperature=0.1, max_tokens=320)
    except Exception:
        answer = None
    if not answer:
        b = default_hits[0]
        answer = f"(model offline) Closest match: session [{(b['sessionId'] or '?')[:8]}] — {b['title']}. Open it to go deeper."

    # chips = the sessions the answer actually cited (by short id), then the defaults.
    # Keys are lower-cased and a 6–7 character citation resolves by prefix: the
    # regex accepts 6–8 hex characters, so an exact 8-character lookup alone
    # could never match the shorter forms.
    by_short = {(b["sessionId"] or "")[:8].lower(): b for b in briefs if b.get("sessionId")}
    cited = []
    for tok in re.findall(r"\[([0-9a-fA-F]{6,8})\]", answer):
        short = tok.lower()
        b = by_short.get(short)
        if b is None:
            b = next((hit for sid, hit in by_short.items() if sid.startswith(short)), None)
        if b is not None and b not in cited:
            cited.append(b)
    hits = (cited + [b for b in default_hits if b not in cited])[:5]
    return {
        "answer": answer, "model": model_used,
        "sessionHits": [{"sessionId": b["sessionId"], "path": b["path"], "title": b["title"], "turns": b["turns"]} for b in hits],
    }


class Handler(BaseHTTPRequestHandler):
    """Request handler for the local server: the JSON API plus the built UI.

    GET serves the ``/api/*`` read endpoints and falls back to static files;
    POST serves ``/api/chat``, ``/api/project_chat`` and ``/api/consent``.
    Every response goes through ``_send`` so it always carries an explicit
    ``Content-Length`` and ``Cache-Control: no-store``.
    """

    server_version = "her/1.0"

    def _send(self, code: int, body: bytes, ctype: str):
        """Write a complete response (status line, headers and ``body``).

        Args:
            code: HTTP status code.
            body: Already-encoded response payload.
            ctype: Value for the ``Content-Type`` header.
        """
        self.send_response(code)
        self.send_header("Content-Type", ctype)
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Cache-Control", "no-store")
        try:
            # end_headers() flushes the buffered header block to the socket, so
            # it can hit a closed connection exactly like the body write can.
            self.end_headers()
            self.wfile.write(body)
        except (BrokenPipeError, ConnectionResetError):
            # The client went away mid-response; there is nobody left to tell.
            pass

    def _json(self, obj, code: int = 200):
        """Serialize ``obj`` as UTF-8 JSON and send it with status ``code``."""
        self._send(code, json.dumps(obj, ensure_ascii=False).encode("utf-8"), "application/json")

    def log_message(self, *args):  # quiet; this is a local tool
        """Suppress the base class's per-request logging."""
        pass

    # -- GET: api + static -------------------------------------------------- #
    def do_GET(self):
        """Dispatch a GET request to an API endpoint or to static file serving."""
        u = urllib.parse.urlparse(self.path)
        q = urllib.parse.parse_qs(u.query)

        if u.path == "/api/health":
            # Short readiness probe of the narrator; any failure reads as "not ready".
            llama = False
            try:
                llama = get_narrator().wait_until_ready(max_wait=0.1, interval=0.1)
            except Exception:
                llama = False
            return self._json({"ok": True, "llama": llama})

        if u.path == "/api/consent":
            return self._json(_CONSENT)

        if u.path == "/api/sessions":
            try:
                return self._json(_sessions_payload())
            except Exception as e:  # never 500 the browser
                return self._json({"error": str(e), "projects": [], "total": 0}, 200)

        if u.path == "/api/analyze":
            path = _safe_session_path((q.get("path") or [None])[0])
            if path is None:
                return self._json({"error": "path not allowed"}, 400)
            try:
                return self._json(_analyze_cached(path))
            except Exception as e:
                return self._json({"error": f"analyze failed: {e}"}, 500)

        if u.path == "/api/overview":
            path = _safe_session_path((q.get("path") or [None])[0])
            if path is None:
                return self._json({"error": "path not allowed"}, 400)
            try:
                # Keyed on (path, mtime) so an edited session is recomputed.
                key = (str(path), path.stat().st_mtime_ns)
                overview = _OVERVIEW_CACHE.get(key)
                if overview is None:
                    overview = _overview(_analyze_cached(path))
                    # Single-entry cache: drop whatever was there before.
                    _OVERVIEW_CACHE.clear()
                    _OVERVIEW_CACHE[key] = overview
                # Answer from the local value, not a second cache lookup: a
                # concurrent request thread may clear the cache in between.
                return self._json(overview)
            except Exception as e:
                return self._json({"overview": "", "error": str(e)}, 200)

        if u.path == "/api/advice":
            path = _safe_session_path((q.get("path") or [None])[0])
            if path is None:
                return self._json({"error": "path not allowed"}, 400)
            try:
                key = (str(path), path.stat().st_mtime_ns)
                cached = _ADVICE_CACHE.get(key)
                if cached is None:
                    result = _advice(_analyze_cached(path))
                    # Only cache once the model actually wrote prose, so an offline
                    # warm-up doesn't freeze the deterministic fallback in place.
                    if result.get("model"):
                        _ADVICE_CACHE.clear()
                        _ADVICE_CACHE[key] = result
                    cached = result
                return self._json(cached)
            except Exception as e:
                return self._json({"recommendations": [], "model": None, "error": str(e)}, 200)

        if u.path == "/api/project":
            cwd = (q.get("cwd") or [""])[0]
            if not cwd:
                return self._json({"error": "cwd required"}, 400)
            try:
                return self._json(_project(cwd))
            except Exception as e:
                return self._json({"error": f"project failed: {e}"}, 500)

        return self._serve_static(u.path)

    # -- POST: chat --------------------------------------------------------- #
    def do_POST(self):
        """Handle the JSON POST endpoints (chat, project chat, consent).

        The body must be a JSON object; anything else (unparseable JSON, a
        non-object value, or an invalid ``Content-Length``) is answered with
        400 ``bad json`` instead of raising inside the handler.
        """
        u = urllib.parse.urlparse(self.path)
        if u.path not in ("/api/chat", "/api/project_chat", "/api/consent"):
            return self._json({"error": "not found"}, 404)
        try:
            n = int(self.headers.get("Content-Length", "0"))
            if n < 0:
                # read(-1) would block until the client closes the connection.
                raise ValueError("negative Content-Length")
            body = json.loads(self.rfile.read(n) or "{}")
        except (ValueError, json.JSONDecodeError):
            return self._json({"error": "bad json"}, 400)
        if not isinstance(body, dict):
            # Valid JSON but not an object (list, null, number, ...): the
            # field lookups below need a mapping.
            return self._json({"error": "bad json"}, 400)

        # first-run disclaimer choice: {accepted, share}. Persisted; gates sharing.
        if u.path == "/api/consent":
            _save_consent(bool(body.get("accepted", True)), bool(body.get("share", True)))
            return self._json(_CONSENT)

        # Non-string values are treated the same as a missing field.
        question = body.get("question")
        question = question.strip() if isinstance(question, str) else ""
        if not question:
            return self._json({"error": "empty question"}, 400)

        if u.path == "/api/project_chat":
            cwd = body.get("cwd")
            cwd = cwd.strip() if isinstance(cwd, str) else ""
            if not cwd:
                return self._json({"error": "cwd required"}, 400)
            try:
                return self._json(_project_chat(question, cwd))
            except Exception as e:
                return self._json({"error": f"project chat failed: {e}"}, 500)

        path = _safe_session_path(body.get("path"))
        if path is None:
            return self._json({"error": "path not allowed"}, 400)
        try:
            return self._json(_chat(question, path))
        except Exception as e:
            return self._json({"error": f"chat failed: {e}"}, 500)

    # -- static file serving (the built UI) --------------------------------- #
    def _serve_static(self, path: str):
        """Serve ``path`` from DIST, then PUBLIC, else fall back to the SPA index.

        A candidate is served only if, after resolving symlinks and ``..``, it
        is a regular file located inside the root it was looked up in.
        """
        rel = path.lstrip("/") or "index.html"
        for root in (DIST, PUBLIC):
            base = root.resolve()
            cand = (root / rel).resolve()
            # Compare path components, not string prefixes: a plain
            # startswith() would also accept siblings such as "<root>-old/x".
            if base in cand.parents and cand.is_file():
                return self._send(200, cand.read_bytes(), _ctype(cand))
        # SPA fallback
        idx = DIST / "index.html"
        if idx.is_file():
            return self._send(200, idx.read_bytes(), "text/html")
        return self._send(
            404,
            b"UI not built. Run: cd ui && npm run build  (or use vite dev on :5173)",
            "text/plain",
        )


def _ctype(p: Path) -> str:
    return {
        ".html": "text/html", ".js": "text/javascript", ".css": "text/css",
        ".json": "application/json", ".svg": "image/svg+xml", ".png": "image/png",
        ".ico": "image/x-icon", ".woff2": "font/woff2", ".woff": "font/woff",
    }.get(p.suffix, "application/octet-stream")


def main():
    """Bind the HTTP server on (HOST, PORT) and serve until interrupted.

    Prints a short startup banner, starts the background enricher unless the
    ``HER_ENRICH`` environment variable is ``"0"``, then blocks in
    ``serve_forever()``. Ctrl-C ends the loop; the listening socket is closed
    on the way out.
    """
    # The context manager calls server_close() on exit, releasing the listening
    # socket even if startup or serving raises.
    with ThreadingHTTPServer((HOST, PORT), Handler) as httpd:
        print(f"Her · हेर — server on http://{HOST}:{PORT}  (UI + /api, 100% local)")
        print(f"  dist: {DIST}  ({'built' if (DIST/'index.html').exists() else 'NOT built — run npm run build'})")
        if os.environ.get("HER_ENRICH") == "0":
            print("  enricher: OFF (HER_ENRICH=0)")
        else:
            print("  enricher: passive background (bare binary names -> npm/brew/pypi; HER_ENRICH=0 to disable)")
            _start_enricher()
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # serve_forever() has already returned at this point, so there is
            # no loop left to stop; shutdown() is only for stopping it from
            # another thread.
            pass

# Script entry point: start the server only when this file is run directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()