botsi's picture
Update app.py
d35bd86 verified
raw
history blame
7.16 kB
import gradio as gr
import time
import random
import json
import mysql.connector
import os
import csv
from datetime import datetime
from huggingface_hub import Repository, hf_hub_download
DATASET_REPO_URL = "https://huggingface.co/datasets/botsi/trust-game-llama-2-7b-chat"
DATASET_REPO_ID = "botsi/trust-game-llama-2-7b-chat"
DATA_FILENAME = "history_trust-game-llama-2-7b-chat.csv"
DATA_FILE = os.path.join("data", DATA_FILENAME)
HF_TOKEN = os.environ.get("HF_TOKEN")
# Initialize Hugging Face Dataset Repository
try:
hf_hub_download(
repo_id=DATASET_REPO_ID,
filename=DATA_FILENAME,
force_filename=DATA_FILENAME
)
except:
print("File not found")
repo = Repository(
local_dir="data", clone_from=DATASET_REPO_URL, use_auth_token=HF_TOKEN
)
# Store Message Function
def store_message(name: str, message: str):
if name and message:
with open(DATA_FILE, "a") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=["name", "message", "time"])
writer.writerow(
{"name": name, "message": message, "time": str(datetime.now())}
)
commit_url = repo.push_to_hub()
return generate_html()
session_index = None
get_window_url_params = """
function() {
const params = new URLSearchParams(window.location.search);
const url_params = Object.fromEntries(params);
return url_params;
}
"""
get_window_session_index = """
function() {
const urlParams = new URLSearchParams(window.location.search);
const session_index = urlParams.get('session_index');
return session_index;
}
"""
with gr.Blocks() as demo:
gr.Markdown("""## Gradio send queryparam to chatbot
type `read query`
""")
url_params = gr.JSON({}, visible=False, label="URL Params")
chatbot = gr.Chatbot().style(height=500)
msg = gr.Textbox()
clear = gr.Button("Clear")
def user(user_message, url_params, history):
return "", history + [[user_message, None]]
def bot(history, url_params):
if "read query" in history[-1][0]:
session_index = url_params.get('session_index')
bot_message = f"""
here your URL params:
{json.dumps(url_params, indent=4)}
"""
# Connect to the database
conn = mysql.connector.connect(
host="18.153.94.89",
user="root",
password="N12RXMKtKxRj",
database="lionessdb"
)
# Create a cursor object
cursor = conn.cursor()
# Replace the placeholders with your actual database and table names
core_table = "e5390g37096_core"
decisions_table = "e5390g37096_decisions"
session_index = session_index
#print(session_index)
playerNr = "1"
# Query to fetch relevant data from both tables based on session_index
query = f"""
SELECT
{core_table}.playerNr,
{core_table}.subjectNr,
{core_table}.onPage,
{decisions_table}.transfer1,
{decisions_table}.tripledAmount1,
{decisions_table}.keptForSelf1,
{decisions_table}.returned1,
{decisions_table}.newCreditRound2,
{decisions_table}.transfer2,
{decisions_table}.tripledAmount2,
{decisions_table}.keptForSelf2,
{decisions_table}.returned2,
{decisions_table}.resultsWithoutAI
FROM {core_table}
JOIN {decisions_table} ON
{core_table}.playerNr = {decisions_table}.playerNr AND
{core_table}.groupNr = {decisions_table}.groupNr AND
{core_table}.subjectNr = {decisions_table}.subjectNr
WHERE {decisions_table}.session_index = '{session_index}'
"""
cursor.execute(query)
# Fetch all rows and convert to a list of dictionaries
rows = cursor.fetchall()
result = []
for row in rows:
d = {}
for i, col in enumerate(cursor.description):
d[col[0]] = row[i]
result.append(d)
# Convert the list of dictionaries to JSON
personalized_result = json.dumps(result)
print(personalized_result)
# Close the database connection
conn.close()
elif "print data" in history[-1][0]:
personalized_data = fetch_personalized_data()
print(personalized_data)
bot_message = "Data printed."
else:
bot_message = random.choice(["Yes", "No"])
history[-1][1] = bot_message
time.sleep(1)
return history
msg.submit(user, inputs=[msg, url_params, chatbot], outputs=[msg, chatbot], queue=False).then(
fn=bot, inputs=[chatbot, url_params], outputs=[chatbot]
)
clear.click(lambda: None, None, chatbot, queue=False)
demo.load(
fn=lambda x: x,
inputs=[url_params],
outputs=[url_params],
_js=get_window_url_params,
queue=False
)
# Connect to the database
conn = mysql.connector.connect(
host="18.153.94.89",
user="root",
password="N12RXMKtKxRj",
database="lionessdb"
)
# Create a cursor object
cursor = conn.cursor()
# Replace the placeholders with your actual database and table names
core_table = "e5390g37096_core"
decisions_table = "e5390g37096_decisions"
session_index = eb3636167d3a63fbeee32934610e5b2f
print(session_index)
playerNr = "1"
# Query to fetch relevant data from both tables based on session_index
query = f"""
SELECT
{core_table}.playerNr,
{core_table}.subjectNr,
{core_table}.onPage,
{decisions_table}.transfer1,
{decisions_table}.tripledAmount1,
{decisions_table}.keptForSelf1,
{decisions_table}.returned1,
{decisions_table}.newCreditRound2,
{decisions_table}.transfer2,
{decisions_table}.tripledAmount2,
{decisions_table}.keptForSelf2,
{decisions_table}.returned2,
{decisions_table}.resultsWithoutAI
FROM {core_table}
JOIN {decisions_table} ON
{core_table}.playerNr = {decisions_table}.playerNr AND
{core_table}.groupNr = {decisions_table}.groupNr AND
{core_table}.subjectNr = {decisions_table}.subjectNr
WHERE {decisions_table}.session_index = '{session_index}'
"""
cursor.execute(query)
# Fetch all rows and convert to a list of dictionaries
rows = cursor.fetchall()
result = []
for row in rows:
d = {}
for i, col in enumerate(cursor.description):
d[col[0]] = row[i]
result.append(d)
# Convert the list of dictionaries to JSON
personalized_result = json.dumps(result)
print(personalized_result)
# Close the database connection
conn.close()
demo.launch()