igortech committed on
Commit
2170185
·
verified ·
1 Parent(s): b2c8e1d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +250 -90
app.py CHANGED
@@ -1,7 +1,10 @@
1
  import json
2
- import difflib
3
- import csv
4
  import os
 
 
 
 
 
5
  import gradio as gr
6
 
7
  # -----------------------------
@@ -9,123 +12,280 @@ import gradio as gr
9
  # -----------------------------
10
  DATA_PATH = "quotes.json"
11
 
12
- if os.path.exists(DATA_PATH):
13
- with open(DATA_PATH, "r", encoding="utf-8") as f:
14
- dataset = json.load(f)
15
- else:
16
- dataset = {"staged_responses": []}
 
 
 
 
 
17
 
 
18
 
19
  # -----------------------------
20
- # Helpers
21
  # -----------------------------
22
- def find_best_matches(user_input, category=None, n=3, threshold=0.4):
 
 
 
 
 
 
 
 
 
 
23
  """
24
- Try to find best fuzzy matches in the dataset.
25
- If category is given and fails, fallback to all categories.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  """
27
- matches = []
28
- search_categories = [category] if category and category in dataset else dataset.keys()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
 
30
- # First pass: search within selected category
31
- for cat in search_categories:
 
32
  if cat == "staged_responses":
33
  continue
34
- for item in dataset.get(cat, []):
35
- text = item.get("quote", "")
36
- score = difflib.SequenceMatcher(None, user_input.lower(), text.lower()).ratio()
37
- if score >= threshold:
38
- matches.append((score, text, cat))
39
-
40
- # If nothing found and category was specified, search all categories
41
- if not matches and category and category in dataset:
42
- for cat in dataset.keys():
43
- if cat == "staged_responses":
44
- continue
45
- for item in dataset.get(cat, []):
46
- text = item.get("quote", "")
47
- score = difflib.SequenceMatcher(None, user_input.lower(), text.lower()).ratio()
48
- if score >= threshold:
49
- matches.append((score, text, cat))
50
-
51
- # Sort and return top n
52
- matches.sort(key=lambda x: x[0], reverse=True)
53
- return matches[:n]
54
-
55
-
56
- def chatbot_response(message, history, category):
57
- if not message.strip():
58
- return history + [("User", "Message is empty.")]
59
-
60
- best_matches = find_best_matches(message, category)
61
-
62
- if best_matches:
63
- responses = [f"[{cat}] {quote}" for _, quote, cat in best_matches]
64
- else:
65
- responses = [f"No data about {message}."]
66
-
67
- history.append(("User", message))
68
- for resp in responses:
69
- history.append(("Bot", resp))
70
 
71
- return history
 
72
 
 
 
 
 
 
 
 
 
 
73
 
74
- def stage_response(message, category):
75
- """Stage a message into a category in dataset."""
76
- if not message.strip():
77
- return "Message is empty."
78
 
79
- if category not in dataset:
80
- dataset[category] = []
 
81
 
82
- dataset[category].append({"quote": message})
83
- return f"Message staged to category '{category}'."
 
 
84
 
 
 
 
 
 
 
 
 
 
85
 
86
- def download_json():
87
- return json.dumps(dataset, indent=2, ensure_ascii=False)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88
 
 
 
 
 
 
 
 
 
 
 
89
 
90
- def download_csv():
91
- csv_file = "dataset.csv"
92
- with open(csv_file, "w", newline="", encoding="utf-8") as f:
 
 
 
 
93
  writer = csv.writer(f)
94
- writer.writerow(["Category", "Quote"])
95
- for cat, items in dataset.items():
96
- if cat == "staged_responses":
97
- continue
98
- for item in items:
99
- writer.writerow([cat, item.get("quote", "")])
100
- return csv_file
101
 
 
 
 
 
 
 
 
 
 
 
 
102
 
103
- def clear_history():
104
- return []
 
105
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
106
 
107
  # -----------------------------
108
- # UI
109
  # -----------------------------
110
  with gr.Blocks() as demo:
111
- gr.Markdown("# 🎓 Campus Experience Chatbot")
 
 
 
 
 
 
112
 
113
- chatbot = gr.Chatbot(label="Conversation", type="messages")
114
- msg = gr.Textbox(label="Type your question here...", placeholder="Ask me anything about campus life", lines=2)
115
- category = gr.Dropdown(choices=[c for c in dataset.keys() if c != "staged_responses"], label="Select Category")
116
  send = gr.Button("Send")
117
- stage_btn = gr.Button("Stage conversation to category")
118
- download_json_btn = gr.Button("Download JSON")
119
- download_csv_btn = gr.Button("Download CSV")
120
- clear = gr.Button("Clear Conversation")
 
 
 
 
 
 
 
 
 
121
 
122
- send.click(chatbot_response, inputs=[msg, chatbot, category], outputs=chatbot)
123
- msg.submit(chatbot_response, inputs=[msg, chatbot, category], outputs=chatbot)
 
 
124
 
125
- stage_btn.click(stage_response, inputs=[msg, category], outputs=None)
126
- download_json_btn.click(download_json, outputs=gr.File())
127
- download_csv_btn.click(download_csv, outputs=gr.File())
128
- clear.click(clear_history, outputs=chatbot)
129
 
 
 
 
 
 
 
 
 
 
 
130
  if __name__ == "__main__":
131
- demo.launch()
 
1
  import json
 
 
2
  import os
3
+ import re
4
+ import csv
5
+ import tempfile
6
+ from difflib import SequenceMatcher
7
+ import datetime
8
  import gradio as gr
9
 
10
# -----------------------------
# Dataset loading
# -----------------------------
DATA_PATH = "quotes.json"

def load_dataset():
    """Load the quotes dataset from DATA_PATH.

    Returns a dict mapping category name -> list of {"quote": ...} items,
    always containing a "staged_responses" staging bucket.

    Robustness: a missing, unreadable, corrupt, or non-object JSON file
    yields the empty default instead of crashing at import time (the
    original raised on malformed JSON and on a non-dict root).
    """
    if os.path.exists(DATA_PATH):
        try:
            with open(DATA_PATH, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, json.JSONDecodeError):
            # Corrupt or unreadable file: fall back to the empty dataset.
            return {"staged_responses": []}
        if isinstance(data, dict):
            # ensure the staging bucket always exists
            data.setdefault("staged_responses", [])
            return data
    # default empty dataset with staged bucket
    return {"staged_responses": []}

dataset = load_dataset()
27
 
28
# -----------------------------
# Matching helpers
# -----------------------------
def normalize_text(s: str) -> str:
    """Lowercase *s* and collapse every run of non-word characters into a
    single space; None is treated as the empty string."""
    return re.sub(r"\W+", " ", (s or "").lower()).strip()

def tokens(s: str):
    """Return the set of normalized word tokens contained in *s*."""
    return {tok for tok in normalize_text(s).split() if tok}

def score_quote(user_input: str, quote_text: str):
    """Score how well *quote_text* matches *user_input*.

    Any shared token is a strong signal: the score is 1.0 plus the
    fraction of the quote's tokens that are shared (so always > 1.0).
    With no shared tokens, fall back to the plain SequenceMatcher
    ratio, which is at most 1.0.
    """
    shared = tokens(user_input) & tokens(quote_text)
    if shared:
        quote_token_count = len(tokens(quote_text))
        return 1.0 + len(shared) / max(1, quote_token_count)
    return SequenceMatcher(None, user_input.lower(), quote_text.lower()).ratio()
51
+
52
def find_best_quotes(category, user_input, top_n=3, threshold=0.15, data=None):
    """Find the best-matching quotes for *user_input*.

    Searches *category* first (when given and valid); if its best score
    falls below *threshold*, searches every category.  The special
    "staged_responses" bucket is never searched.

    Parameters:
        category: preferred category name, or None.
        user_input: free-text query; blank input yields [].
        top_n: maximum number of matches to return.
        threshold: minimum score the best match must reach.
        data: dataset dict to search.  Defaults to the module-level
            `dataset`.  (New optional parameter — generalizes the search
            beyond the mutable global and makes the function testable;
            existing callers see unchanged behavior.)

    Returns a list of (score, quote, category) tuples sorted by score
    descending, or [] when nothing reaches *threshold*.
    """
    source = dataset if data is None else data
    if not user_input or not user_input.strip():
        return []

    def scored_for(cat):
        # score every quote in one category
        return [
            (score_quote(user_input, item.get("quote", "")), item.get("quote", ""), cat)
            for item in source.get(cat, [])
        ]

    # 1) try the selected category first
    if category and category in source and category != "staged_responses":
        ranked = sorted(scored_for(category), key=lambda t: t[0], reverse=True)
        if ranked and ranked[0][0] >= threshold:
            return ranked[:top_n]

    # 2) fallback: search all (non-staged) categories
    ranked = sorted(
        (entry
         for cat in source
         if cat != "staged_responses"
         for entry in scored_for(cat)),
        key=lambda t: t[0],
        reverse=True,
    )
    if ranked and ranked[0][0] >= threshold:
        return ranked[:top_n]

    # 3) nothing passed the threshold
    return []
91
 
92
# -----------------------------
# Response generation
# -----------------------------
def generate_three_fold(category, user_text):
    """Build the three-part (summary, fusion, reference) reply for
    *user_text*; all three fall back to an "unknown" message when no
    quote matches."""
    hits = find_best_quotes(category, user_text, top_n=3, threshold=0.15)
    if not hits:
        # Unknown fallback
        fallback = f"No data about {user_text} (unknown)."
        return fallback, fallback, "Reference: None"

    best_quote = hits[0][1]
    best_cat = hits[0][2]

    # Summary: first sentence of the top-scoring quote.
    summary = "Summary: {}.".format(best_quote.split(".")[0].strip())

    # Fusion: the matched quotes joined, duplicates dropped, order kept.
    fusion = "Fusion: " + " ".join(dict.fromkeys(hit[1] for hit in hits))

    # Reference: placeholder naming the requested and matched categories.
    reference = f"Reference: Example search for '{category}' (top match from '{best_cat}')."
    return summary, fusion, reference
115
 
116
# -----------------------------
# Conversation & staging utilities
# -----------------------------
def append_user_assistant(history, user_text, assistant_text):
    """Append a user/assistant exchange to *history* in place.

    *history* is a list of {"role": ..., "content": ...} dicts; a falsy
    value is replaced by a fresh list.  Returns the updated list.
    """
    if not history:
        history = []
    history.extend((
        {"role": "user", "content": user_text},
        {"role": "assistant", "content": assistant_text},
    ))
    return history
125
 
126
def get_last_user_and_assistant(history):
    """Return (newest user message, its assistant reply) from *history*.

    Walks backwards, picking up the newest assistant content seen before
    the newest user message is reached.  If the history ends with an
    unanswered user message, falls back to the newest assistant content
    anywhere in the history.  Returns (None, None) for a falsy history.
    """
    if not history:
        return None, None

    user_text = None
    assistant_text = None
    for msg in reversed(history):
        if assistant_text is None and msg["role"] == "assistant":
            assistant_text = msg["content"]
        if msg["role"] == "user":
            user_text = msg["content"]
            break  # stop at the most recent user message

    # Fallback: user message with no assistant reply after it — reuse the
    # newest assistant message anywhere in the history.
    if user_text and not assistant_text:
        for msg in reversed(history):
            if msg["role"] == "assistant":
                assistant_text = msg["content"]
                break
    return user_text, assistant_text
148
 
149
# -----------------------------
# File helpers (use temp files)
# -----------------------------
def write_temp_json(obj, suffix=".json"):
    """Serialize *obj* as pretty-printed UTF-8 JSON into a fresh temp
    file and return its path (the caller owns cleanup)."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)  # we reopen by path with an explicit encoding
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(obj, fh, indent=2, ensure_ascii=False)
    return path

def write_temp_csv_from_history(history, suffix=".csv"):
    """Write *history* (list of role/content dicts) into a temp CSV with
    a header row; return its path, or None when there is nothing to
    export."""
    if not history:
        return None
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    with open(path, "w", newline="", encoding="utf-8") as fh:
        rows = csv.writer(fh)
        rows.writerow(["role", "content"])
        rows.writerows(
            [msg.get("role", ""), msg.get("content", "")] for msg in history
        )
    return path
 
 
 
172
 
173
# -----------------------------
# Gradio callbacks (UI-safe)
# -----------------------------
def respond(message, state, category):
    """Handle Send / Enter.

    Returns (cleared textbox value, updated state, chatbot messages) —
    the state list is mirrored into the chatbot display.  Blank or
    missing input is a no-op that just clears the textbox.
    """
    history = state or []
    if not (message or "").strip():
        return "", history, history

    # assemble the 3-fold reply into one assistant message
    summary, fusion, reference = generate_three_fold(category, message)
    reply = "\n\n".join((summary, fusion, reference))

    history = append_user_assistant(history, message, reply)
    return "", history, history
191
+
192
def clear_all():
    """Reset the textbox, the conversation state, and the chatbot."""
    empty_history = []
    return "", empty_history, empty_history
195
+
196
def upload_json(filepath):
    """Load an uploaded dataset JSON file and swap it into memory.

    *filepath* is the local path Gradio wrote the upload to.  Returns
    (status message, dropdown update) — the dropdown update carries the
    new category choices.

    Fix: the error/fallback paths previously built their choices from
    ``dataset.keys()`` unfiltered, leaking the internal
    "staged_responses" bucket into the dropdown; every path now excludes
    it, consistent with the rest of the UI.
    """
    global dataset, DATA_PATH

    def visible_cats():
        # Dropdown choices never include the internal staging bucket.
        return sorted(k for k in dataset.keys() if k != "staged_responses")

    try:
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            return ("Upload failed: root must be an object",
                    gr.update(choices=visible_cats(), value=None))
        # ensure staged_responses exists
        data.setdefault("staged_responses", [])
        dataset = data
        DATA_PATH = os.path.basename(filepath)
        cats = visible_cats()
        status = f"Loaded {len(cats)} categories from {DATA_PATH}."
        return status, gr.update(choices=cats, value=(cats[0] if cats else None))
    except Exception as e:
        # UI boundary: surface the failure in the status box, keep the
        # previous dataset, and never crash the app.
        return f"Error loading file: {e}", gr.update(choices=visible_cats(), value=None)
214
+
215
def stage_last_conversation(state, target_category):
    """Stage the newest user/assistant pair from *state* into
    dataset["staged_responses"] as {"question", "answer", "category"}.

    Returns a human-readable status string for the UI.
    """
    if not state:
        return "No conversation in memory."
    question, answer = get_last_user_and_assistant(state)
    if not question:
        return "No user message to stage."
    # setdefault guarantees the staging bucket exists before appending
    dataset.setdefault("staged_responses", []).append({
        "question": question,
        "answer": answer or "",
        "category": target_category,
    })
    return f"Staged last Q/A into '{target_category}'."
230
+
231
def download_conversation_csv(state):
    """Export the current conversation as a CSV temp file.

    Returns the file path for the gr.File output, or None to clear it.

    Fix: the original returned ``gr.File.update(...)``, but component
    ``.update`` classmethods were removed in Gradio 4 (this file already
    uses the Gradio 4+ ``gr.Chatbot(type="messages")`` API), so the call
    raised AttributeError.  Returning the value directly is the
    supported form.
    """
    return write_temp_csv_from_history(state or [])

def download_current_dataset():
    """Export the in-memory dataset (staged_responses included) as a
    JSON temp file and return its path for the gr.File output.

    Same ``gr.File.update`` → plain-value fix as
    download_conversation_csv above.
    """
    return write_temp_json(dataset, suffix=".json")
242
 
243
# -----------------------------
# Gradio UI (components + wiring)
# -----------------------------
with gr.Blocks() as demo:
    gr.Markdown("## Campus Life — 3-fold responses, staging, CSV/JSON downloads")

    # dropdown choices exclude staged_responses; default to the first category
    category_choices = sorted([k for k in dataset.keys() if k != "staged_responses"])
    with gr.Row():
        category = gr.Dropdown(label="Category", choices=category_choices,
                               value=(category_choices[0] if category_choices else None))

    # type="messages" means history is a list of {"role", "content"} dicts,
    # matching the shape kept in `state`
    chatbot = gr.Chatbot(label="Conversation", height=360, type="messages")
    state = gr.State([])  # holds list of {"role":..,"content":..}
    msg = gr.Textbox(label="Your message", placeholder="Type and press Enter (or click Send)", autofocus=True)
    send = gr.Button("Send")
    clear = gr.Button("Clear")

    with gr.Row():
        stage_btn = gr.Button("Stage last Q/A to category")
        stage_status = gr.Textbox(label="Stage status", interactive=False, value="")

    with gr.Row():
        # type="filepath" hands callbacks a local path string, as upload_json expects
        upload = gr.File(label="Upload dataset (.json)", file_types=[".json"], type="filepath")
        upload_status = gr.Textbox(label="Upload status", interactive=False, value="")
        download_json_btn = gr.Button("Download current dataset (JSON)")
        download_json_file = gr.File(label="Download JSON", interactive=False)
        download_csv_btn = gr.Button("Download conversation (CSV)")
        download_csv_file = gr.File(label="Download CSV", interactive=False)

    # events: respond clears the textbox and mirrors state into the chatbot
    msg.submit(respond, [msg, state, category], [msg, state, chatbot])
    send.click(respond, [msg, state, category], [msg, state, chatbot])
    clear.click(clear_all, [], [msg, state, chatbot])

    # staging reads only state + category; status goes to the textbox
    stage_btn.click(stage_last_conversation, [state, category], stage_status)

    # uploading a dataset refreshes the category dropdown choices
    upload.upload(upload_json, upload, [upload_status, category])

    download_csv_btn.click(download_conversation_csv, state, download_csv_file)
    download_json_btn.click(download_current_dataset, None, download_json_file)
284
+
285
# -----------------------------
# Startup log
# -----------------------------
print("===== Application startup =====")
print(f"Dataset categories: {list(dataset)}")

if __name__ == "__main__":
    # Bind to all interfaces on the conventional Gradio/Spaces port.
    demo.launch(server_name="0.0.0.0", server_port=7860)