igortech committed on
Commit
92c739f
·
verified ·
1 Parent(s): 19601ea

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +84 -217
app.py CHANGED
@@ -3,8 +3,8 @@ import os
3
  import re
4
  import csv
5
  import tempfile
6
- from difflib import SequenceMatcher
7
  import datetime
 
8
  import gradio as gr
9
 
10
  # -----------------------------
@@ -12,260 +12,127 @@ import gradio as gr
12
  # -----------------------------
13
  DATA_PATH = "quotes.json"
14
 
 
15
  def load_dataset():
16
  if os.path.exists(DATA_PATH):
17
  with open(DATA_PATH, "r", encoding="utf-8") as f:
18
- data = json.load(f)
19
- if "staged_responses" not in data:
20
- data["staged_responses"] = []
21
- return data
22
  return {"staged_responses": []}
23
 
 
 
 
 
 
 
24
  dataset = load_dataset()
25
 
26
  # -----------------------------
27
- # Matching helpers
28
  # -----------------------------
29
- def normalize_text(s: str) -> str:
30
- return re.sub(r"\W+", " ", (s or "").lower()).strip()
31
-
32
- def tokens(s: str):
33
- return set(t for t in normalize_text(s).split() if t)
34
-
35
- def score_quote(user_input: str, quote_text: str):
36
- """
37
- Score a quote vs user input:
38
- - token overlap yields a boosted score
39
- - otherwise fallback to SequenceMatcher ratio
40
- """
41
- u_toks = tokens(user_input)
42
- q_toks = tokens(quote_text)
43
- overlap = len(u_toks & q_toks)
44
- if overlap > 0:
45
- return 1.0 + (overlap / max(1, len(q_toks)))
46
- return SequenceMatcher(None, user_input.lower(), quote_text.lower()).ratio()
47
-
48
- def find_best_quotes(category, user_input, top_n=3, threshold=0.15):
49
- """
50
- Find best matches:
51
- - try within `category` first (if provided)
52
- - if none above `threshold`, search across all categories
53
- - return list of tuples (score, quote, category)
54
- """
55
- if not user_input or not user_input.strip():
56
- return []
57
-
58
- def score_list_for_cat(cat):
59
- scored = []
60
- for item in dataset.get(cat, []):
61
- q = item.get("quote", "")
62
- s = score_quote(user_input, q)
63
- scored.append((s, q, cat))
64
- return scored
65
-
66
- # 1) search selected category first (if present)
67
- if category and category in dataset and category != "staged_responses":
68
- scored = score_list_for_cat(category)
69
- scored.sort(key=lambda x: x[0], reverse=True)
70
- if scored and scored[0][0] >= threshold:
71
- return scored[:top_n]
72
-
73
- # 2) fallback: search all categories
74
- all_scored = []
75
- for cat in dataset.keys():
76
- if cat == "staged_responses":
77
  continue
78
- all_scored.extend(score_list_for_cat(cat))
79
- all_scored.sort(key=lambda x: x[0], reverse=True)
80
- if all_scored and all_scored[0][0] >= threshold:
81
- return all_scored[:top_n]
 
 
82
 
83
- # 3) nothing matches well enough
84
- return []
85
 
86
- # -----------------------------
87
- # Response generation
88
- # -----------------------------
89
- def generate_three_fold(category, user_text):
90
- matches = find_best_quotes(category, user_text, top_n=3, threshold=0.15)
91
  if not matches:
92
- unknown_msg = f"No data about {user_text} (unknown)."
93
- return unknown_msg, unknown_msg, "Reference: None"
94
-
95
- top_quote = matches[0][1]
96
- first_sentence = top_quote.split(".")[0].strip()
97
- summary = f"Summary: {first_sentence}."
98
- fused = " ".join(dict.fromkeys([m[1] for m in matches])) # unique preserve order
99
- fusion = f"Fusion: {fused}"
100
- top_cat = matches[0][2]
101
- reference = f"Reference: Example search for '{category}' (top match from '{top_cat}')."
102
- return summary, fusion, reference
 
103
 
104
  # -----------------------------
105
- # Conversation & staging utilities
106
  # -----------------------------
107
- def append_user_assistant(history, user_text, assistant_text):
108
- history = history or []
109
- history.append({"role": "user", "content": user_text})
110
- history.append({"role": "assistant", "content": assistant_text})
111
- return history
112
-
113
- def get_last_user_and_assistant(history):
114
- last_user = None
115
- last_assistant = None
116
  if not history:
117
- return None, None
118
- # find last user and assistant after it
119
- # traverse backwards to find last user; then find next assistant after that index
120
- last_user_idx = None
121
- for i in range(len(history)-1, -1, -1):
122
- if history[i].get("role") == "user":
123
- last_user_idx = i
124
- last_user = history[i].get("content")
125
- break
126
- if last_user_idx is not None:
127
- # find assistant after user (forward from user index)
128
- for j in range(last_user_idx+1, len(history)):
129
- if history[j].get("role") == "assistant":
130
- last_assistant = history[j].get("content")
131
- break
132
- return last_user, last_assistant
133
 
134
  # -----------------------------
135
- # Temp file helpers
136
  # -----------------------------
137
- def write_temp_json(obj, suffix=".json"):
138
- tf = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
139
- path = tf.name
140
- tf.close()
141
- with open(path, "w", encoding="utf-8") as f:
142
- json.dump(obj, f, indent=2, ensure_ascii=False)
143
- return path
144
-
145
- def write_temp_csv_from_history(history, suffix=".csv"):
146
  if not history:
147
  return None
148
- tf = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
149
- path = tf.name
150
- tf.close()
151
- with open(path, "w", newline="", encoding="utf-8") as f:
152
- writer = csv.writer(f)
153
- writer.writerow(["role", "content"])
154
- for m in history:
155
- writer.writerow([m.get("role",""), m.get("content","")])
156
- return path
157
 
158
- # -----------------------------
159
- # Gradio callbacks (UI-safe)
160
- # -----------------------------
161
- def respond(message, state, category):
162
- """
163
- Called by Send button or Enter.
164
- Returns: cleared input, updated state, updated chatbot display (state replicated)
165
- """
166
- history = state or []
167
- if not (message and message.strip()):
168
- return "", history, history
169
-
170
- summary, fusion, reference = generate_three_fold(category, message)
171
- assistant_text = f"{summary}\n\n{fusion}\n\n{reference}"
172
- history = append_user_assistant(history, message, assistant_text)
173
- return "", history, history
174
-
175
- def clear_all():
176
- # clear textbox, state and chatbot
177
- return "", [], []
178
-
179
- def upload_json(filepath):
180
- """Load uploaded dataset file (filepath is local path inside container)"""
181
- global dataset, DATA_PATH
182
- try:
183
- with open(filepath, "r", encoding="utf-8") as f:
184
- data = json.load(f)
185
- if not isinstance(data, dict):
186
- return "Upload failed: root must be an object", gr.update(choices=sorted(list(dataset.keys())), value=None)
187
- if "staged_responses" not in data:
188
- data["staged_responses"] = []
189
- dataset = data
190
- DATA_PATH = os.path.basename(filepath)
191
- cats = sorted([k for k in dataset.keys() if k != "staged_responses"])
192
- status = f"Loaded {len(cats)} categories from {DATA_PATH}."
193
- return status, gr.update(choices=cats, value=(cats[0] if cats else None))
194
- except Exception as e:
195
- return f"Error loading file: {e}", gr.update(choices=sorted(list(dataset.keys())), value=None)
196
-
197
- def stage_last_conversation(state, target_category):
198
- """
199
- Stage the last user + assistant pair into dataset['staged_responses']
200
- (stored as {"question":..., "answer":..., "category":...})
201
- """
202
- if not state:
203
- return "No conversation in memory."
204
- last_user, last_assistant = get_last_user_and_assistant(state)
205
- if not last_user:
206
- return "No user message to stage."
207
- entry = {"question": last_user, "answer": last_assistant or "", "category": target_category}
208
- if "staged_responses" not in dataset:
209
- dataset["staged_responses"] = []
210
- dataset["staged_responses"].append(entry)
211
- return f"Staged last Q/A into '{target_category}'."
212
 
213
- def download_conversation_csv(state):
214
- path = write_temp_csv_from_history(state or [])
215
- if not path:
216
- return gr.File.update(value=None)
217
- return gr.File.update(value=path)
218
 
219
- def download_current_dataset():
220
- path = write_temp_json(dataset, suffix=".json")
221
- return gr.File.update(value=path)
222
 
223
  # -----------------------------
224
- # Gradio UI (components + wiring)
225
  # -----------------------------
226
  with gr.Blocks() as demo:
227
- gr.Markdown("## Campus Life — 3-fold responses, staging, CSV/JSON downloads")
228
-
229
- # dropdown choices exclude staged_responses
230
- category_choices = sorted([k for k in dataset.keys() if k != "staged_responses"])
231
- with gr.Row():
232
- category = gr.Dropdown(label="Category", choices=category_choices,
233
- value=(category_choices[0] if category_choices else None))
234
 
235
  chatbot = gr.Chatbot(label="Conversation", height=360, type="messages")
236
- conversation_state = gr.State([]) # holds list of {"role":..,"content":..}
237
- msg = gr.Textbox(label="Your message", placeholder="Type and press Enter (or click Send)", autofocus=True)
238
- send = gr.Button("Send")
239
- clear = gr.Button("Clear")
240
 
241
  with gr.Row():
242
- stage_btn = gr.Button("Stage last Q/A to category")
243
- stage_status = gr.Textbox(label="Stage status", interactive=False, value="")
 
244
 
245
  with gr.Row():
246
- upload = gr.File(label="Upload dataset (.json)", file_types=[".json"], type="filepath")
247
- upload_status = gr.Textbox(label="Upload status", interactive=False, value="")
248
- download_json_btn = gr.Button("Download current dataset (JSON)")
249
- download_json_file = gr.File(label="Download JSON", interactive=True)
250
- download_csv_btn = gr.Button("Download conversation (CSV)")
251
- download_csv_file = gr.File(label="Download CSV", interactive=True)
252
 
253
- # events
254
- msg.submit(respond, [msg, conversation_state, category], [msg, conversation_state, chatbot])
255
- send.click(respond, [msg, conversation_state, category], [msg, conversation_state, chatbot])
256
- clear.click(clear_all, [], [msg, conversation_state, chatbot])
257
 
258
- stage_btn.click(stage_last_conversation, [conversation_state, category], stage_status)
 
259
 
260
- upload.upload(upload_json, upload, [upload_status, category])
 
 
 
261
 
262
- download_csv_btn.click(download_conversation_csv, [conversation_state], download_csv_file)
263
- download_json_btn.click(download_current_dataset, None, download_json_file)
264
 
265
- # -----------------------------
266
- # Startup log
267
- # -----------------------------
268
- print("===== Application startup =====")
269
- print(f"Dataset categories: {[k for k in dataset.keys()]}")
270
  if __name__ == "__main__":
271
- demo.launch(server_name="0.0.0.0", server_port=7860)
 
3
  import re
4
  import csv
5
  import tempfile
 
6
  import datetime
7
+ from difflib import SequenceMatcher
8
  import gradio as gr
9
 
10
  # -----------------------------
 
12
  # -----------------------------
13
  DATA_PATH = "quotes.json"
14
 
15
def load_dataset(path=None):
    """Load the quotes dataset from disk.

    Args:
        path: Optional JSON file to read. Defaults to the module-level
            ``DATA_PATH`` when omitted.

    Returns:
        A dict mapping category names to lists of entries. The
        ``"staged_responses"`` key is always present so callers can append
        to it without checking first.

    Raises:
        ValueError: If the file parses but its root is not a JSON object.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    source = DATA_PATH if path is None else path
    try:
        # EAFP: opening directly avoids the exists()/open() race.
        with open(source, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return {"staged_responses": []}

    if not isinstance(data, dict):
        # The rest of the app iterates with .items(); fail early and clearly.
        raise ValueError(f"{source}: dataset root must be a JSON object")

    data.setdefault("staged_responses", [])
    return data
21
 
22
+
23
def save_dataset(data, path=None):
    """Persist the dataset as pretty-printed UTF-8 JSON.

    The data is written to a temporary file in the destination directory
    and then moved over the target, so a failure part-way through
    serialization never leaves a truncated dataset on disk.

    Args:
        data: JSON-serializable object to store.
        path: Optional destination file. Defaults to the module-level
            ``DATA_PATH`` when omitted.

    Raises:
        TypeError: If ``data`` contains values json cannot serialize.
        OSError: If the file cannot be written or replaced.
    """
    target = DATA_PATH if path is None else path
    directory = os.path.dirname(os.path.abspath(target))
    # Same directory as the target so os.replace stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, target)
    except BaseException:
        # Do not leave the half-written temp file behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
26
+
27
+
28
  dataset = load_dataset()
29
 
30
  # -----------------------------
31
+ # Core logic
32
  # -----------------------------
33
def find_best_matches(user_input, dataset, top_n=3, threshold=0.3):
    """Rank dataset quotes by textual similarity to the user's input.

    Similarity is ``difflib.SequenceMatcher(...).ratio()`` computed on the
    lower-cased strings, with the user input as the first sequence.

    Args:
        user_input: Text typed by the user. ``None`` or blank yields no
            matches.
        dataset: Mapping of category name to a list of ``{"quote": str}``
            entries. The ``"staged_responses"`` bucket is never searched.
        top_n: Maximum number of matches to return.
        threshold: Minimum ratio (inclusive) a quote must reach.

    Returns:
        Up to ``top_n`` ``(score, category, quote)`` tuples, best first.
    """
    if not user_input or not user_input.strip():
        return []

    needle = user_input.lower()  # hoisted: invariant across all quotes
    matches = []
    for category, quotes in dataset.items():
        if category == "staged_responses" or not isinstance(quotes, list):
            continue
        for entry in quotes:
            # Tolerate malformed entries instead of raising KeyError.
            quote = entry.get("quote") if isinstance(entry, dict) else None
            if not isinstance(quote, str):
                continue
            score = SequenceMatcher(None, needle, quote.lower()).ratio()
            if score >= threshold:
                matches.append((score, category, quote))

    # Stable sort keeps dataset order among equally scored quotes.
    matches.sort(key=lambda m: m[0], reverse=True)
    return matches[:top_n]
44
 
 
 
45
 
46
def generate_response(message, history):
    """Answer a user message and return the updated chat history.

    Args:
        message: Text submitted from the textbox.
        history: Current chat history as a list of
            ``{"role": ..., "content": ...}`` dicts, or ``None``.

    Returns:
        A new history list. For a non-blank message it gains the user's
        turn followed by the assistant's reply; a blank message returns
        the history unchanged.
    """
    # Copy so the caller's list is never mutated; tolerate a None history.
    updated = list(history or [])
    if not message or not message.strip():
        return updated

    # The chat display only shows what is in the history, so the user's
    # own turn has to be recorded here as well.
    updated.append({"role": "user", "content": message})

    matches = find_best_matches(message, dataset)
    if matches:
        reply = "\n\n".join(
            f"Category: {category}\nWhat real people say:\n{quote}"
            for _score, category, quote in matches
        )
    else:
        reply = f"No data about {message}."

    updated.append({"role": "assistant", "content": reply})
    return updated
61
+
62
 
63
  # -----------------------------
64
+ # Conversation & staging
65
  # -----------------------------
66
def stage_conversation(history, category):
    """Stage the whole conversation into the dataset and save it.

    The conversation is flattened to ``"role: content"`` lines and
    appended to ``dataset["staged_responses"]`` together with the chosen
    category, then the dataset is written to disk.

    Args:
        history: Chat history as a list of ``{"role": ..., "content": ...}``
            dicts.
        category: Category selected for the staged entry.

    Returns:
        A human-readable status message.
    """
    if not history:
        return "No conversation to stage."
    if not category:
        # Avoid recording an entry under a missing category.
        return "Choose a category before staging."

    convo_text = "\n".join(
        f"{turn.get('role', '')}: {turn.get('content', '')}" for turn in history
    )

    # Record the category too; otherwise the status message would claim
    # something the stored entry does not reflect.
    new_entry = {"quote": convo_text, "category": category}
    dataset.setdefault("staged_responses", []).append(new_entry)

    save_dataset(dataset)
    return f"Conversation staged under {category}."
80
+
 
 
 
81
 
82
  # -----------------------------
83
+ # Download helpers
84
  # -----------------------------
85
def download_conversation_csv(history):
    """Write the conversation to a temporary CSV file.

    Args:
        history: Chat history as a list of ``{"role": ..., "content": ...}``
            dicts.

    Returns:
        Path of the CSV file (header row ``role,content`` followed by one
        row per message), or ``None`` when there is no conversation.
    """
    if not history:
        return None

    # delete=False: the file must outlive this function so it can be served.
    # The with-block guarantees the handle is closed even if a row fails.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=".csv", mode="w", newline="", encoding="utf-8"
    ) as tmpfile:
        writer = csv.writer(tmpfile)
        writer.writerow(["role", "content"])
        writer.writerows(
            [turn.get("role", ""), turn.get("content", "")] for turn in history
        )
    return tmpfile.name
96
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
 
98
def download_dataset(data=None):
    """Write the dataset to a temporary JSON file.

    Args:
        data: Optional object to export. Defaults to the module-level
            ``dataset`` when omitted.

    Returns:
        Path of the temporary ``.json`` file.

    Raises:
        TypeError: If the data contains values json cannot serialize.
    """
    payload = dataset if data is None else data
    # Serialize first so a failure cannot leave a partial temp file behind.
    text = json.dumps(payload, indent=2, ensure_ascii=False)

    # delete=False: the file must outlive this function so it can be served.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=".json", mode="w", encoding="utf-8"
    ) as tmpfile:
        tmpfile.write(text)
    return tmpfile.name
103
 
 
 
 
104
 
105
  # -----------------------------
106
# Gradio UI
# -----------------------------
# Layout: chat display and input box, a row of utility buttons, a row for
# staging the conversation, then the file outputs used for downloads.
with gr.Blocks() as demo:
    gr.Markdown("# Campus Conversation Bot")

    chatbot = gr.Chatbot(label="Conversation", height=360, type="messages")
    msg = gr.Textbox(label="Type your question", placeholder="Ask me something...", container=True)

    with gr.Row():
        clear_btn = gr.Button("Clear")
        export_csv_btn = gr.Button("Export Conversation to CSV")
        download_json_btn = gr.Button("Download Current Dataset")

    with gr.Row():
        # "staged_responses" is the internal staging bucket, not a category
        # the user should be able to pick.
        category_dropdown = gr.Dropdown(
            choices=[name for name in dataset if name != "staged_responses"],
            label="Choose category to stage",
            interactive=True,
        )
        stage_btn = gr.Button("Stage Conversation to Category")
        stage_status = gr.Textbox(label="Stage status", interactive=False)

    # Events
    msg.submit(generate_response, [msg, chatbot], chatbot)
    msg.submit(lambda: "", None, msg)  # clear textbox on Enter
    clear_btn.click(lambda: [], None, chatbot)

    export_csv_file = gr.File(label="Download Conversation CSV")
    export_csv_btn.click(download_conversation_csv, chatbot, export_csv_file)

    download_json_file = gr.File(label="Download Dataset JSON")
    download_json_btn.click(download_dataset, None, download_json_file)

    # Route the status message to a visible component; with no output the
    # result of staging was silently discarded.
    stage_btn.click(stage_conversation, [chatbot, category_dropdown], stage_status)


if __name__ == "__main__":
    demo.launch()