|
|
import os
|
|
|
import json
|
|
|
from sentence_transformers import SentenceTransformer
|
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
|
import numpy as np
|
|
|
from huggingface_hub import upload_file, hf_hub_download, InferenceClient
|
|
|
from flask import Flask, request, jsonify
|
|
|
import time
|
|
|
|
|
|
|
|
|
# Point the Hugging Face and sentence-transformers cache variables at
# /tmp/.cache, then make sure the cache and output directories exist.
# NOTE(review): these variables are assigned after the libraries above have
# already been imported -- confirm each library still reads them this late.
for _cache_env_var in ("HF_HOME", "HF_DATASETS_CACHE", "SENTENCE_TRANSFORMERS_HOME"):
    os.environ[_cache_env_var] = "/tmp/.cache"

for _runtime_dir in ("/tmp/.cache", "/tmp/outputs"):
    os.makedirs(_runtime_dir, exist_ok=True)
|
|
|
|
|
|
|
|
|
# Sentence encoder used for every similarity lookup in this module.
embedding_model = SentenceTransformer('paraphrase-mpnet-base-v2')

# Prefer HF_TOKEN; fall back to the project-specific variable when HF_TOKEN
# is unset or empty.
token = os.getenv("HF_TOKEN")
if not token:
    token = os.getenv("NEW_PUP_AI_Project")

# Inference client bound to the Mixtral instruct model.
inference_client = InferenceClient(model="mistralai/Mixtral-8x7B-Instruct-v0.1", token=token)
|
|
|
|
|
|
|
|
|
# Load the question/answer dataset stored next to this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATASET_PATH = os.path.join(BASE_DIR, "dataset.json")

# Decode explicitly as UTF-8 so the result does not depend on the platform's
# default locale encoding.
with open(DATASET_PATH, "r", encoding="utf-8") as f:
    dataset = json.load(f)

# Parallel lists of the dataset's questions and answers.
questions = [item["question"] for item in dataset]
answers = [item["answer"] for item in dataset]

# Encoded once at import time; chatbot_response compares each query to these.
question_embeddings = embedding_model.encode(questions, convert_to_tensor=True)
|
|
|
|
|
|
|
|
|
# In-memory feedback state: the recorded entries, their question texts, and
# the encoded questions (None until some feedback questions are encoded).
feedback_data, feedback_questions = [], []
feedback_embeddings = None

# Last dev-mode flag received by the chat endpoint.
dev_mode = {"enabled": False}

# Local files that hold the feedback entries and the free-text comments.
feedback_path = "/tmp/outputs/feedback.json"
COMMENTS_PATH = "/tmp/outputs/Comments.json"
|
|
|
|
|
|
# Seed the comments file with an empty JSON list when it does not exist yet.
_comments_file_missing = not os.path.exists(COMMENTS_PATH)
if _comments_file_missing:
    with open(COMMENTS_PATH, "w") as f:
        f.write(json.dumps([], indent=4))
|
|
|
|
|
|
# Fetch previously recorded feedback from the Hugging Face dataset repo,
# encode its questions, and mirror the file to the local feedback path.
try:
    hf_token = os.getenv("NEW_PUP_AI_Project")
    downloaded_path = hf_hub_download(
        repo_id="oceddyyy/University_Inquiries_Feedback",
        filename="feedback.json",
        repo_type="dataset",
        token=hf_token,
    )
    with open(downloaded_path, "r", encoding="utf-8") as f:
        feedback_data = json.load(f)

    feedback_questions = [item["question"] for item in feedback_data]
    if feedback_questions:
        feedback_embeddings = embedding_model.encode(feedback_questions, convert_to_tensor=True)

    with open(feedback_path, "w") as f_local:
        json.dump(feedback_data, f_local, indent=4)
except Exception as e:
    print(f"[Startup] Feedback not loaded from Hugging Face. Using local only. Reason: {e}")
    # Reset all three pieces of state together. A failure after the
    # embeddings were computed (e.g. while writing the local copy) would
    # otherwise leave embeddings that point at entries no longer present in
    # feedback_data.
    feedback_data = []
    feedback_questions = []
    feedback_embeddings = None
|
|
|
|
|
|
|
|
|
def upload_file_to_hf(local_path, remote_filename):
    """Upload a local file to the feedback dataset repo on Hugging Face.

    Args:
        local_path: Path of the file to upload.
        remote_filename: Name the file receives inside the dataset repo.

    Raises:
        ValueError: If the NEW_PUP_AI_Project environment variable is unset
            or empty.

    Errors raised by the upload itself are printed, not propagated.
    """
    hf_token = os.environ.get("NEW_PUP_AI_Project")
    if not hf_token:
        raise ValueError("Hugging Face token not found in environment variables!")

    upload_arguments = {
        "path_or_fileobj": local_path,
        "path_in_repo": remote_filename,
        "repo_id": "oceddyyy/University_Inquiries_Feedback",
        "repo_type": "dataset",
        "token": hf_token,
    }
    try:
        upload_file(**upload_arguments)
        print(f"{remote_filename} uploaded to Hugging Face successfully.")
    except Exception as e:
        print(f"Error uploading {remote_filename} to HF: {e}")
|
|
|
|
|
|
|
|
|
def _attribute_answer(matched_item, matched_a):
    """Prefix a dataset answer with its "as of" date or the default handbook attribution."""
    if "month" in matched_item and "year" in matched_item:
        return f"As of {matched_item['month']}, {matched_item['year']}, {matched_a}"
    return f"According to 2019 Proposed PUP Handbook, {matched_a}"


def _extract_llm_text(llm_response):
    """Return the generated text carried by an inference result, or "" when none is found."""
    # A plain string result is already the generated text.
    if isinstance(llm_response, str):
        return llm_response
    if isinstance(llm_response, dict):
        if llm_response.get("choices"):
            return llm_response["choices"][0]["message"]["content"] or ""
        if llm_response.get("generated_text"):
            return llm_response["generated_text"]
    # Attribute-style result objects.
    choices = getattr(llm_response, "choices", None)
    if choices:
        message = getattr(choices[0], "message", None)
        return getattr(message, "content", None) or ""
    return getattr(llm_response, "generated_text", None) or ""


def chatbot_response(query, dev_mode_flag):
    """Answer a query from recorded feedback, the dataset, or the LLM.

    The query is first compared with the encoded feedback questions; a
    sufficiently similar feedback entry is returned directly. Otherwise the
    three most similar dataset entries are retrieved. With ``dev_mode_flag``
    truthy they are passed to the inference client as context; without it the
    best dataset answer is returned with an attribution prefix.

    Args:
        query: The user's question.
        dev_mode_flag: Truthy to generate the answer through the inference
            client instead of returning the dataset answer.

    Returns:
        A ``(response_text, source_label, elapsed_seconds)`` tuple.
        ``elapsed_seconds`` is 0.0 on every path except a successful
        inference call.
    """
    query_vector = embedding_model.encode([query], convert_to_tensor=True).cpu().numpy()

    if feedback_embeddings is not None and feedback_data:
        feedback_scores = cosine_similarity(query_vector, feedback_embeddings.cpu().numpy())[0]
        best_idx = int(np.argmax(feedback_scores))
        # The embeddings and feedback_data are updated separately; skip the
        # lookup when the best index does not exist in feedback_data.
        if best_idx < len(feedback_data):
            best_score = feedback_scores[best_idx]
            matched_feedback = feedback_data[best_idx]

            # Each upvote lowers the required similarity by 0.01 and each
            # downvote raises it by 0.01, clamped to the range [0.4, 1.0].
            base_threshold = 0.8
            upvotes = matched_feedback.get("upvotes", 0)
            downvotes = matched_feedback.get("downvotes", 0)
            adjusted_threshold = base_threshold - (0.01 * upvotes) + (0.01 * downvotes)
            dynamic_threshold = min(max(adjusted_threshold, 0.4), 1.0)

            if best_score >= dynamic_threshold:
                return matched_feedback["response"], "Feedback", 0.0

    similarity_scores = cosine_similarity(query_vector, question_embeddings.cpu().numpy())[0]
    top_k = 3
    # argsort is ascending: take the last top_k indices and reverse them so
    # the best match comes first.
    top_k_indices = np.argsort(similarity_scores)[-top_k:][::-1]
    top_k_items = [dataset[idx] for idx in top_k_indices]
    top_k_scores = [similarity_scores[idx] for idx in top_k_indices]

    matched_item = top_k_items[0]
    matched_a = matched_item.get("answer", "")
    matched_source = matched_item.get("source", "PUP Handbook")
    best_score = top_k_scores[0]

    if dev_mode_flag:
        context = "".join(
            f"Relevant info #{rank} (score: {score:.2f}):\n\"{item.get('answer', '')}\"\n\n"
            for rank, (item, score) in enumerate(zip(top_k_items, top_k_scores), start=1)
        )

        prompt = (
            f"You are an expert university assistant. "
            f"A student asked: \"{query}\"\n"
            f"Here are the most relevant handbook information snippets:\n{context}"
            f"Using only the information above, answer the student's question in your own words. "
            f"If the handbook info is not relevant, say you don't know."
        )

        try:
            start_time = time.time()

            if hasattr(inference_client, "chat_completion"):
                conversation = [
                    {"role": "system", "content": "You are an expert university assistant."},
                    {"role": "user", "content": prompt},
                ]
                llm_response = inference_client.chat_completion(
                    messages=conversation,
                    model="mistralai/Mixtral-8x7B-Instruct-v0.1",
                    max_tokens=200,
                    temperature=0.7,
                )
            else:
                llm_response = inference_client.text_generation(
                    prompt,
                    max_new_tokens=200,
                    temperature=0.7,
                )

            response = _extract_llm_text(llm_response)
            elapsed = time.time() - start_time

            # Fall back to the attributed dataset answer when the model
            # returned nothing or merely repeated the dataset answer.
            if not response.strip() or response.strip() == matched_a.strip():
                response = _attribute_answer(matched_item, matched_a)
            return response.strip(), matched_source, elapsed

        except Exception as e:
            error_msg = f"[ERROR] HF inference failed: {e}"
            return f"(UnivAI+++ error: {error_msg})", matched_source, 0.0

    if best_score < 0.4:
        response = "Sorry, but the PUP handbook does not contain such information."
    else:
        response = _attribute_answer(matched_item, matched_a)
    return response.strip(), matched_source, 0.0
|
|
|
|
|
|
|
|
|
def record_feedback(feedback, query, response, comment=None):
    """Record a vote, and optionally a free-text comment, for an answer.

    If a stored entry has a question at least 0.8 cosine-similar to ``query``
    and exactly the same ``response``, its vote counter is incremented;
    otherwise a new entry is appended. The feedback list is written to
    ``feedback_path``, the cached question embeddings are rebuilt, and the
    file is uploaded. A non-blank ``comment`` is appended to the comments
    file, which is then uploaded as well.

    Args:
        feedback: "positive" or "negative". Any other value is stored on a
            new entry but increments no vote counter.
        query: The question the user asked.
        response: The answer the user is rating.
        comment: Optional free-text comment; ignored when blank or not a
            string.
    """
    global feedback_embeddings, feedback_questions

    vote_fields = {"positive": "upvotes", "negative": "downvotes"}
    new_embedding = embedding_model.encode([query], convert_to_tensor=True)

    matched = False
    if feedback_data:
        # Reuse the cached embeddings when they line up with feedback_data;
        # otherwise encode all stored questions in a single batch instead of
        # one model call per entry.
        if feedback_embeddings is not None and len(feedback_embeddings) == len(feedback_data):
            stored_embeddings = feedback_embeddings
        else:
            stored_embeddings = embedding_model.encode(
                [item["question"] for item in feedback_data], convert_to_tensor=True
            )
        similarities = cosine_similarity(
            stored_embeddings.cpu().numpy(), new_embedding.cpu().numpy()
        )[:, 0]

        for item, similarity in zip(feedback_data, similarities):
            if similarity >= 0.8 and item["response"] == response:
                matched = True
                # An unrecognised feedback label leaves the counters
                # untouched instead of raising KeyError.
                vote_field = vote_fields.get(feedback)
                if vote_field is not None:
                    item[vote_field] = item.get(vote_field, 0) + 1
                break

    if not matched:
        entry = {
            "question": query,
            "response": response,
            "feedback": feedback,
            "upvotes": 1 if feedback == "positive" else 0,
            "downvotes": 1 if feedback == "negative" else 0,
        }
        feedback_data.append(entry)

    with open(feedback_path, "w") as f:
        json.dump(feedback_data, f, indent=4)

    # Rebuild the cache so the new or updated entry is visible to lookups.
    feedback_questions = [item["question"] for item in feedback_data]
    if feedback_questions:
        feedback_embeddings = embedding_model.encode(feedback_questions, convert_to_tensor=True)

    upload_file_to_hf(feedback_path, "feedback.json")

    if isinstance(comment, str) and comment.strip():
        try:
            with open(COMMENTS_PATH, "r") as f:
                comments_list = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # A missing or corrupt comments file starts a fresh list.
            comments_list = []

        comment_entry = {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            "question": query,
            "response": response,
            "feedback": feedback,
            "comment": comment.strip(),
        }
        comments_list.append(comment_entry)

        with open(COMMENTS_PATH, "w") as f:
            json.dump(comments_list, f, indent=4)

        upload_file_to_hf(COMMENTS_PATH, "Comments.json")
|
|
|
|
|
|
# Flask application object; the route decorators below register handlers on it.
app = Flask(__name__)
|
|
|
|
|
|
@app.route("/api/chat", methods=["POST"])
def chat():
    """Handle POST /api/chat.

    Expects a JSON object with an optional ``query`` string and an optional
    ``dev_mode`` flag. Responds with JSON holding ``response``, ``source``
    and ``response_time``. A missing, unparseable or non-object body yields
    a JSON error with status 400.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    query = data.get("query", "")
    dev = data.get("dev_mode", False)
    # Remember the most recent dev-mode flag in the module-level state.
    dev_mode["enabled"] = dev

    response, source, elapsed = chatbot_response(query, dev)
    return jsonify({"response": response, "source": source, "response_time": elapsed})
|
|
|
|
|
|
@app.route("/api/feedback", methods=["POST"])
def feedback():
    """Handle POST /api/feedback.

    Expects a JSON object with optional ``query``, ``response``, ``feedback``
    and ``comment`` fields and passes them to ``record_feedback``. Responds
    with ``{"status": "success"}``. A missing, unparseable or non-object body
    yields a JSON error with status 400.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    query = data.get("query", "")
    response = data.get("response", "")
    feedback_type = data.get("feedback", "")
    comment = data.get("comment", None)

    record_feedback(feedback_type, query, response, comment)
    return jsonify({"status": "success"})
|
|
|
|
|
|
@app.route("/", methods=["GET"])
def index():
    """Return a plain-text usage hint with status 200 for GET /."""
    usage_hint = "University Inquiries AI Chatbot API. Use POST /api/chat or /api/feedback."
    return usage_hint, 200
|
|
|
|
|
|
if __name__ == "__main__":
    # When run as a script, serve on all interfaces on port 7861.
    app.run(port=7861, host="0.0.0.0")
|
|
|
|