Militaryint committed on
Commit
68ad3c2
·
verified ·
1 Parent(s): affafde

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +223 -68
app.py CHANGED
@@ -1,70 +1,225 @@
1
- import gradio as gr
2
- from huggingface_hub import InferenceClient
3
-
4
-
5
- def respond(
6
- message,
7
- history: list[dict[str, str]],
8
- system_message,
9
- max_tokens,
10
- temperature,
11
- top_p,
12
- hf_token: gr.OAuthToken,
13
- ):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
  """
15
- For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference
 
16
  """
17
- client = InferenceClient(token=hf_token.token, model="openai/gpt-oss-20b")
18
-
19
- messages = [{"role": "system", "content": system_message}]
20
-
21
- messages.extend(history)
22
-
23
- messages.append({"role": "user", "content": message})
24
-
25
- response = ""
26
-
27
- for message in client.chat_completion(
28
- messages,
29
- max_tokens=max_tokens,
30
- stream=True,
31
- temperature=temperature,
32
- top_p=top_p,
33
- ):
34
- choices = message.choices
35
- token = ""
36
- if len(choices) and choices[0].delta.content:
37
- token = choices[0].delta.content
38
-
39
- response += token
40
- yield response
41
-
42
-
43
- """
44
- For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface
45
- """
46
- chatbot = gr.ChatInterface(
47
- respond,
48
- type="messages",
49
- additional_inputs=[
50
- gr.Textbox(value="You are a friendly Chatbot.", label="System message"),
51
- gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
52
- gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
53
- gr.Slider(
54
- minimum=0.1,
55
- maximum=1.0,
56
- value=0.95,
57
- step=0.05,
58
- label="Top-p (nucleus sampling)",
59
- ),
60
- ],
61
- )
62
-
63
- with gr.Blocks() as demo:
64
- with gr.Sidebar():
65
- gr.LoginButton()
66
- chatbot.render()
67
-
68
-
69
- if __name__ == "__main__":
70
- demo.launch()
 
1
+ # ================================================================
2
+ # === INT STRATEGIC BRAIN v4 — Enemy Analysis & Tab Integration ===
3
+ # ================================================================
4
+ # Paste this block inside your app. For UI, insert the Gradio tab block
5
+ # within your existing `with gr.Blocks(...) as demo:` where other tabs appear.
6
+ # ================================================================
7
+
8
+ import math
9
+ import random
10
+ import datetime
11
+ import json
12
+ from collections import Counter
13
+
14
+ # -------------------------
15
+ # INTBrainV4 core extension
16
+ # -------------------------
17
class INTBrainV4:
    """Doctrinal enemy-analysis engine (non-operational heuristics).

    Combines keyword-based intent inference, a lexical order-of-battle
    extractor, and a Jaccard-similarity TTP clusterer into a single
    JSON-friendly assessment via `enemy_analysis`.
    """

    def __init__(self, base=None, memory_file="int_brain_v4.json"):
        # `base` is an optional previous-generation brain (e.g. v3) kept for
        # future fusion; it is stored but not used by this class directly.
        self.base = base
        self.memory_file = memory_file
        # Fixed seed for reproducibility. NOTE(review): this seeds the
        # *global* random module, which affects other callers too.
        self.seed = 42
        random.seed(self.seed)

    def _now(self):
        """Return the current UTC time as 'YYYY-MM-DD HH:MM:SS UTC'."""
        # Timezone-aware now() instead of the deprecated utcnow(); the
        # formatted string is identical.
        return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

    def infer_intent(self, text):
        """Lightweight keyword-based intent inference (doctrinal, non-operational)."""
        t = text.lower()
        intents = []
        if any(k in t for k in ["recon", "observe", "surveil", "watch"]):
            intents.append("Reconnaissance / Surveillance")
        if any(k in t for k in ["attack", "ambush", "assault", "blast", "gunfire", "strike"]):
            intents.append("Direct Attack / Ambush")
        if any(k in t for k in ["probe", "test", "drill", "feint"]):
            intents.append("Probe / Feint")
        if any(k in t for k in ["disrupt", "sabotage", "deny service", "block"]):
            intents.append("Disruption / Sabotage")
        if not intents:
            # No keyword matched: flag that more collection is needed.
            intents.append("Unknown / Recon Required")
        return intents

    def extract_order_of_battle(self, text):
        """Very simple OOB extractor: surfaces frequent long tokens as actor markers."""
        tokens = [w.strip(".,:;()[]\"'").lower() for w in text.split()]
        counts = Counter(tokens)
        # Heuristic features to present; tokens of <= 3 chars are treated as noise.
        features = {
            "actors_mentioned": sorted({w for w in tokens if len(w) > 3})[:12],
            "mention_count": sum(v for k, v in counts.items() if len(k) > 3),
        }
        return features

    def cluster_ttp(self, reports, threshold=0.33):
        """Group short TTP-like fragments into rough clusters (lexical heuristic).

        Jaccard similarity on word sets against each cluster's centroid; a
        fragment joins the first cluster whose similarity exceeds `threshold`,
        otherwise it starts a new cluster. Clusters are returned largest-first.
        """
        clusters = []
        for r in reports:
            r_words = set(r.split())  # hoisted: reused for every cluster comparison
            placed = False
            for c in clusters:
                c_words = set(c["centroid"].split())
                jaccard = len(r_words & c_words) / max(1, len(r_words | c_words))
                if jaccard > threshold:
                    c["members"].append(r)
                    # The longest member (in words) becomes the new centroid.
                    c["centroid"] = max(c["members"], key=lambda s: len(s.split()))
                    placed = True
                    break
            if not placed:
                clusters.append({"centroid": r, "members": [r]})
        clusters.sort(key=lambda c: len(c["members"]), reverse=True)
        return clusters

    def enemy_analysis(self, text, extra_reports=None):
        """
        High-level enemy analysis (doctrinal). Returns JSON-friendly dict:
        - inferred intent
        - OOB features
        - TTP clusters
        - quick doctrinal assessment (risk_level)
        """
        res = {"timestamp": self._now(), "input": text}
        res["intent"] = self.infer_intent(text)
        res["oob"] = self.extract_order_of_battle(text)

        ttp_fragments = []
        if extra_reports:
            for er in extra_reports:
                if isinstance(er, str):
                    ttp_fragments.extend([s.strip() for s in er.splitlines() if s.strip()])
        # Include short sentences from the main text as potential TTP fragments.
        ttp_fragments.extend([s.strip() for s in text.split(".") if s.strip()])
        # De-duplicate while preserving order, capped at 60 fragments.
        ttp_fragments = list(dict.fromkeys(ttp_fragments))[:60]
        res["ttp_clusters"] = self.cluster_ttp(ttp_fragments)

        # Simple, safe risk-level heuristic (highest severity checked first).
        t = text.lower()
        if any(k in t for k in ["mass", "large", "suicide", "23 terrorists", "dozen", "multiple attackers", "armored"]):
            risk = "High"
        elif any(k in t for k in ["attack", "ambush", "firefight", "assault", "blast", "gunfire"]):
            risk = "Medium-High"
        elif any(k in t for k in ["threat", "probe", "suspicious", "riot", "breach"]):
            risk = "Medium"
        else:
            risk = "Low"
        res["risk_level"] = risk

        # Human-friendly doctrinal blurb (non-operational).
        res["doctrinal_summary"] = (
            f"At {res['timestamp']}: Inferred intent(s): {', '.join(res['intent'])}. "
            f"Risk level (doctrinal heuristic): {res['risk_level']}. "
            f"Top OOB markers: {', '.join(res['oob'].get('actors_mentioned', [])[:6]) or '[none]'}. "
            "TTP clusters and reasoning provided for planning-level assessment only."
        )

        return res
114
+
115
# -------------------------
# Wrapper instance and runner
# -------------------------
# BUG FIX: the previous revision bound this instance to the name `INTBrainV4`,
# shadowing the class itself and making further instantiation impossible.
# A distinct name fixes that while keeping run_int_brain_v4's interface intact.
_INT_BRAIN_V4 = INTBrainV4(base=globals().get("INTStrategicBrainV3", None))


def run_int_brain_v4(text, extra_reports=None):
    """Run enemy analysis on `text`; never raises.

    Parameters
    ----------
    text : str
        Free-text enemy-related reporting.
    extra_reports : list[str] | None
        Optional supporting fragments fed to the TTP clusterer.

    Returns
    -------
    dict
        The analysis dict, or an {'error', 'timestamp'} dict on failure.
    """
    try:
        return _INT_BRAIN_V4.enemy_analysis(text, extra_reports=extra_reports or [])
    except Exception as e:  # UI boundary: report the failure, don't crash
        return {
            "error": str(e),
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        }
124
+
125
+ # -------------------------
126
+ # UI: Gradio Tab block (paste inside your build_interface demo)
127
+ # -------------------------
128
+ # Insert this block inside your with gr.Blocks(...) as demo: where other `with gr.Tab(...)` are defined.
129
+
130
# Enemy Analysis tab. NOTE(review): this `with gr.Tab(...)` is meant to be
# pasted inside an existing `with gr.Blocks(...) as demo:` — `gr` (gradio),
# `enrich_with_llm` and `update_shared_state` are assumed to exist in the
# host app's module; this snippet does not define or import them itself.
with gr.Tab("Enemy Analysis"):
    gr.Markdown("""
    ### Enemy Analysis (Doctrinal)
    Paste reports, sightings, or chatter. The system produces a doctrinal, non-operational
    Enemy Analysis: inferred intent, order-of-battle markers, TTP clusters, and a planning-level risk indicator.
    """)

    # Primary free-text input for enemy-related reporting.
    enemy_input = gr.Textbox(label="Paste Enemy-Related Text / Logs / Reports", lines=6,
                             placeholder="e.g., '23 attackers attacked convoy at 0300, RPGs observed, vehicles spotted near waypoint X'")

    # Optional supporting fragments, one per line; fed to the TTP clusterer.
    extra_reports = gr.Textbox(label="Optional: Additional short reports (one per line)", lines=4,
                               placeholder="Paste supporting notes or HUMINT fragments (optional).")

    enemy_run = gr.Button("Run Enemy Analysis")
    enemy_out_json = gr.JSON(label="Enemy Analysis (structured)")  # shows JSON
    enemy_out_text = gr.Textbox(label="Enemy Analysis (human-friendly)", lines=12, interactive=False)

    def _enemy_handler(text, extras):
        # Click handler: returns (structured_dict, human_readable_str) for the
        # JSON and Textbox outputs respectively. Never raises.
        txt = (text or "").strip()
        if not txt:
            return {"error":"No input provided."}, "No input provided."

        try:
            # One supporting report per non-empty line.
            extra_list = [s for s in (extras or "").splitlines() if s.strip()]
            analysis = run_int_brain_v4(txt, extra_reports=extra_list)

            # Derive a polished, non-actionable brief if enrich available
            brief = None
            try:
                prompt = (
                    "You are a doctrinal security planner. Produce a concise, non-operational planning brief "
                    "from this structured enemy analysis. Keep it high-level and non-actionable.\n\n"
                    + json.dumps(analysis, indent=2, ensure_ascii=False)
                )
                # `enrich_with_llm` is an optional helper defined elsewhere in the app.
                brief = enrich_with_llm(prompt, context_title="Enemy Analysis Brief") if "enrich_with_llm" in globals() else None
            except Exception:
                # Best-effort enrichment: fall back to the raw summary on any failure.
                brief = None

            human = brief if brief else analysis.get("doctrinal_summary", "[No summary available]")

            # Register with director shared state for later fusion
            try:
                # `update_shared_state` is defined elsewhere in the app; values
                # are truncated to keep the shared state compact.
                update_shared_state("Enemy Analysis", {
                    "input": txt[:800],
                    "summary": analysis.get("doctrinal_summary", "")[:800],
                    "risk_level": analysis.get("risk_level", "Unknown"),
                    "timestamp": analysis.get("timestamp", "")
                })
            except Exception:
                # Shared-state registration is optional; ignore failures.
                pass

            return analysis, human

        except Exception as e:
            # UI boundary: surface the error in both outputs rather than crash.
            return {"error": str(e)}, f"[Error] {e}"

    enemy_run.click(_enemy_handler, inputs=[enemy_input, extra_reports], outputs=[enemy_out_json, enemy_out_text])
187
+
188
+ # -------------------------
189
+ # Director hook snippet (small) - safe to paste before your Director button handler
190
+ # -------------------------
191
def director_collect_and_summarize(extra_context=""):
    """
    Lightweight wrapper: aggregates shared_reports_state, calls LLM if available,
    and returns a doctrinal consolidated brief (non-operational).

    Parameters
    ----------
    extra_context : str
        Optional free text appended to the LLM polishing prompt.

    Returns
    -------
    str
        The consolidated brief, a warning when no reports exist, or an
        error string — this function never raises.
    """
    try:
        # `shared_reports_state` is a module-level dict maintained elsewhere in
        # the app (populated e.g. by the Enemy Analysis tab's
        # update_shared_state call) — a missing/odd value degrades gracefully.
        reports = shared_reports_state.copy() if isinstance(shared_reports_state, dict) else {}
        if not reports:
            return "⚠️ No departmental reports available yet."

        # Timezone-aware now() instead of the deprecated utcnow(); the
        # formatted header is identical.
        stamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
        compiled = "DIRECTOR CONSOLIDATED (DOCTRINAL)\nGenerated: " + stamp + "\n\n"
        for dept, r in reports.items():
            compiled += f"--- {dept} ---\n"
            if isinstance(r, dict):
                # Structured departmental report: show risk + truncated summary.
                compiled += f"Risk: {r.get('risk_level','Unknown')}\nSummary: {r.get('summary','')[:600]}\n\n"
            else:
                # Free-form report: include as-is, truncated.
                compiled += str(r)[:800] + "\n\n"

        # Optional LLM polish (safe, non-operational); fall back to the raw
        # compilation on any failure or when the helper is absent.
        try:
            if "enrich_with_llm" in globals():
                prompt = "Polish the following doctrinal consolidated report. Keep it non-operational.\n\n" + compiled + "\n\nContext: " + (extra_context or "")
                polished = enrich_with_llm(prompt, context_title="Director Consolidated Brief")
                return polished or compiled
        except Exception:
            pass

        return compiled

    except Exception as e:  # top-level boundary: report rather than crash the UI
        return f"[Director aggregation error] {e}"
222
+
223
+ # ================================================================
224
+ # === END OF INT v4 Enemy Analysis block
225
+ # ================================================================