lmcuong199 commited on
Commit
c8020a7
Β·
verified Β·
1 Parent(s): 1569ead

Update early_warning_module.py

Browse files
Files changed (1) hide show
  1. early_warning_module.py +45 -72
early_warning_module.py CHANGED
@@ -2,8 +2,8 @@ from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
2
  from collections import Counter
3
  from datetime import datetime, timedelta
4
  import random
 
5
 
6
- # ── Trigger keywords theo tα»«ng loαΊ‘i vαΊ₯n đề ───────────────────────────
7
  WARNING_RULES = {
8
  "waste_overflow": {
9
  "keywords": ["overflow", "overflowing", "full bin", "no bin", "trash everywhere", "garbage everywhere"],
@@ -37,95 +37,71 @@ WARNING_RULES = {
37
  },
38
  }
39
 
40
- SEVERITY_EMOJI = {
41
- "CRITICAL": "🚨",
42
- "HIGH": "πŸ”΄",
43
- "MEDIUM": "🟑",
44
- "LOW": "🟒"
45
- }
46
 
47
- # ── Simulate historical reviews (để demo trend) ───────────────────────
48
- def simulate_review_history(current_reviews: str, risk_score: float):
49
- """TαΊ‘o lα»‹ch sα»­ 7 ngΓ y để vαΊ½ trend β€” dα»±a trΓͺn risk score hiện tαΊ‘i."""
50
- base = risk_score
51
- history = []
52
  for i in range(6, 0, -1):
53
- date = (datetime.now() - timedelta(days=i)).strftime("%b %d")
54
- # GiαΊ£ lαΊ­p fluctuation quanh base
55
  noise = random.uniform(-0.15, 0.15)
56
- score = round(max(0.0, min(1.0, base + noise - 0.05 * i)), 2)
57
- history.append({"date": date, "score": score})
58
- # NgΓ y hΓ΄m nay = score thαΊ­t
59
- history.append({"date": "Today", "score": round(risk_score, 2)})
60
- return history
61
-
62
- # ── Main function ─────────────────────────────────────────────────────
63
- def check_early_warning(reviews_text: str, risk_score: float) -> str:
 
 
 
 
 
 
64
  if not reviews_text.strip():
65
- return "No reviews provided for early warning analysis."
66
 
67
  reviews = [r.strip().lower() for r in reviews_text.strip().split("\n") if r.strip()]
68
- analyzer = SentimentIntensityAnalyzer()
69
 
70
- # ── Step 1: Trigger-based detection ──────────────────────────────
71
  triggered = []
72
  for rule_name, rule in WARNING_RULES.items():
73
  hits = []
74
  for review in reviews:
75
  found = [kw for kw in rule["keywords"] if kw in review]
76
- if found:
77
- hits.extend(found)
78
  if hits:
79
- freq = Counter(hits)
80
  triggered.append({
81
  "rule": rule_name,
82
  "message": rule["message"],
83
  "severity": rule["severity"],
84
- "hits": dict(freq),
85
  "count": len(hits)
86
  })
87
 
88
- # Sort by severity
89
  severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
90
  triggered.sort(key=lambda x: severity_order.get(x["severity"], 99))
91
 
92
- # ── Step 2: Trend simulation ──────────────────────────────────────
93
- history = simulate_review_history(reviews_text, risk_score)
94
- trend_values = [h["score"] for h in history]
95
-
96
- # Detect spike: nαΊΏu 2 ngΓ y gαΊ§n nhαΊ₯t tΔƒng > 0.15 = spike
97
- spike = False
98
- if len(trend_values) >= 3:
99
- recent_change = trend_values[-1] - trend_values[-3]
100
- spike = recent_change > 0.15
101
-
102
- # Draw trend chart (text-based)
103
- trend_rows = ""
104
- for h in history:
105
- bar_len = int(h["score"] * 20)
106
- bar = "β–ˆ" * bar_len
107
- flag = " ⚠️" if h["score"] > 0.6 else ""
108
- trend_rows += f"| {h['date']} | {bar:<20} | {h['score']:.2f}{flag} |\n"
109
-
110
- # ── Step 3: Format output ─────────────────────────────────────────
111
  if not triggered:
112
  warning_section = "βœ… **No warning triggers detected.** Reviews appear safe.\n"
113
  else:
114
  warning_section = ""
115
  for t in triggered:
116
- emoji = SEVERITY_EMOJI.get(t["severity"], "⚠️")
117
- kw_str = ", ".join([f"'{k}' x{v}" for k, v in t["hits"].items()])
118
  warning_section += f"""
119
  #### {emoji} {t["severity"]} β€” {t["message"]}
120
  - **Keywords found:** {kw_str}
121
- - **Frequency:** {t["count"]} mention(s) across reviews
122
  """
123
 
124
- spike_alert = ""
125
- if spike:
126
- spike_alert = "\n> 🚨 **SPIKE DETECTED:** Risk score increased significantly in the last 48 hours β€” possible emerging issue!\n"
127
-
128
- # Overall alert level
129
  if any(t["severity"] == "CRITICAL" for t in triggered):
130
  overall = "🚨 CRITICAL β€” Immediate intervention required"
131
  elif any(t["severity"] == "HIGH" for t in triggered):
@@ -135,31 +111,28 @@ def check_early_warning(reviews_text: str, risk_score: float) -> str:
135
  else:
136
  overall = "🟒 ALL CLEAR β€” No immediate warnings"
137
 
138
- output = f"""## ⚑ Early Warning System
 
 
 
 
 
 
139
 
140
  ### Overall Status: {overall}
141
  {spike_alert}
142
 
143
  ---
144
 
145
- ### πŸ” Warning Triggers Detected
146
  {warning_section}
147
 
148
  ---
149
 
150
- ### πŸ“ˆ Risk Score Trend (Last 7 Days)
151
-
152
- | Date | Risk Level | Score |
153
- |------|-----------|-------|
154
- {trend_rows}
155
-
156
- {'⚠️ **Trend is RISING** β€” investigate before peak season.' if spike else 'πŸ“‰ Trend is stable or improving.'}
157
-
158
- ---
159
-
160
  ### πŸ“¬ Auto-Alert Summary
161
- If this were a live system, the following alerts would be sent:
162
 
163
- {chr(10).join([f"- **{t['severity']}** alert to operator: {t['message']}" for t in triggered]) if triggered else "- No alerts triggered."}
 
164
  """
165
- return output
 
2
  from collections import Counter
3
  from datetime import datetime, timedelta
4
  import random
5
+ import pandas as pd
6
 
 
7
  WARNING_RULES = {
8
  "waste_overflow": {
9
  "keywords": ["overflow", "overflowing", "full bin", "no bin", "trash everywhere", "garbage everywhere"],
 
37
  },
38
  }
39
 
40
+ SEVERITY_EMOJI = {"CRITICAL": "🚨", "HIGH": "πŸ”΄", "MEDIUM": "🟑", "LOW": "🟒"}
 
 
 
 
 
41
 
42
+ def build_trend_df(risk_score: float) -> pd.DataFrame:
43
+ """TαΊ‘o DataFrame 7 ngΓ y để vαΊ½ line chart thαΊ­t bαΊ±ng gr.LinePlot."""
44
+ rows = []
 
 
45
  for i in range(6, 0, -1):
46
+ date = datetime.now() - timedelta(days=i)
 
47
  noise = random.uniform(-0.15, 0.15)
48
+ score = round(max(0.0, min(1.0, risk_score + noise - 0.04 * i)), 3)
49
+ rows.append({"date": date, "risk_score": score, "period": "Historical"})
50
+ # HΓ΄m nay = Δ‘iểm thαΊ­t
51
+ rows.append({"date": datetime.now(), "risk_score": round(risk_score, 3), "period": "Today"})
52
+ return pd.DataFrame(rows)
53
+
54
+ def check_early_warning(reviews_text: str, risk_score: float):
55
+ """
56
+ Returns:
57
+ warning_text (str): markdown output
58
+ trend_df (pd.DataFrame): dΓΉng cho gr.LinePlot
59
+ """
60
+ empty_df = pd.DataFrame({"date": [], "risk_score": []})
61
+
62
  if not reviews_text.strip():
63
+ return "⚠️ Run Text Analysis first (Tab 1).", empty_df
64
 
65
  reviews = [r.strip().lower() for r in reviews_text.strip().split("\n") if r.strip()]
 
66
 
67
+ # ── Trigger detection ─────────────────────────────────────────────
68
  triggered = []
69
  for rule_name, rule in WARNING_RULES.items():
70
  hits = []
71
  for review in reviews:
72
  found = [kw for kw in rule["keywords"] if kw in review]
73
+ hits.extend(found)
 
74
  if hits:
 
75
  triggered.append({
76
  "rule": rule_name,
77
  "message": rule["message"],
78
  "severity": rule["severity"],
79
+ "hits": dict(Counter(hits)),
80
  "count": len(hits)
81
  })
82
 
 
83
  severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
84
  triggered.sort(key=lambda x: severity_order.get(x["severity"], 99))
85
 
86
+ # ── Trend ─────────────────────────────────────────────���───────────
87
+ trend_df = build_trend_df(risk_score)
88
+ trend_values = trend_df["risk_score"].tolist()
89
+ spike = len(trend_values) >= 3 and (trend_values[-1] - trend_values[-3]) > 0.15
90
+
91
+ # ── Format warnings ───────────────────────────────────────────────
 
 
 
 
 
 
 
 
 
 
 
 
 
92
  if not triggered:
93
  warning_section = "βœ… **No warning triggers detected.** Reviews appear safe.\n"
94
  else:
95
  warning_section = ""
96
  for t in triggered:
97
+ emoji = SEVERITY_EMOJI.get(t["severity"], "⚠️")
98
+ kw_str = ", ".join([f"'{k}' Γ—{v}" for k, v in t["hits"].items()])
99
  warning_section += f"""
100
  #### {emoji} {t["severity"]} β€” {t["message"]}
101
  - **Keywords found:** {kw_str}
102
+ - **Frequency:** {t["count"]} mention(s)
103
  """
104
 
 
 
 
 
 
105
  if any(t["severity"] == "CRITICAL" for t in triggered):
106
  overall = "🚨 CRITICAL β€” Immediate intervention required"
107
  elif any(t["severity"] == "HIGH" for t in triggered):
 
111
  else:
112
  overall = "🟒 ALL CLEAR β€” No immediate warnings"
113
 
114
+ spike_alert = "\n> 🚨 **SPIKE DETECTED:** Risk increased significantly in last 48h!\n" if spike else ""
115
+
116
+ alert_list = "\n".join([
117
+ f"- **{t['severity']}** β†’ {t['message']}" for t in triggered
118
+ ]) if triggered else "- No alerts triggered."
119
+
120
+ warning_text = f"""## ⚑ Early Warning System
121
 
122
  ### Overall Status: {overall}
123
  {spike_alert}
124
 
125
  ---
126
 
127
+ ### πŸ” Warning Triggers
128
  {warning_section}
129
 
130
  ---
131
 
 
 
 
 
 
 
 
 
 
 
132
  ### πŸ“¬ Auto-Alert Summary
133
+ {alert_list}
134
 
135
+ ---
136
+ *(Risk trend chart shown below)*
137
  """
138
+ return warning_text, trend_df