# Page header captured from the hosting site's file viewer (not part of the program):
#   botsi's picture
#   Update app.py
#   bb317cf verified
#   raw
#   history blame
#   6.22 kB
import gradio as gr
import time
import random
import json
import mysql.connector
import os
import csv
from datetime import datetime
from huggingface_hub import Repository, hf_hub_download
# Session identifier most recently read from the page URL. It stays None until
# the chat handler stores a value in it (on a "read query" message).
session_index = None
# JavaScript snippet that returns every query-string parameter of the current
# page as a plain object. It is passed to `demo.load` as its `_js` argument.
get_window_url_params = """
function() {
const params = new URLSearchParams(window.location.search);
const url_params = Object.fromEntries(params);
return url_params;
}
"""
# JavaScript snippet that returns only the `session_index` query-string
# parameter (null when the parameter is absent).
# NOTE(review): not referenced anywhere else in the visible source.
get_window_session_index = """
function() {
const urlParams = new URLSearchParams(window.location.search);
const session_index = urlParams.get('session_index');
return session_index;
}
"""
# Hugging Face dataset repository that stores the CSV log.
DATASET_REPO_URL = "https://huggingface.co/datasets/botsi/trust-game-llama-2-7b-chat"
DATASET_REPO_ID = "botsi/trust-game-llama-2-7b-chat"
DATA_FILENAME = "your_data.csv"
# Path of the CSV inside the local clone created below (local_dir="data").
DATA_FILE = os.path.join("data", DATA_FILENAME)
# Access token read from the environment; None when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")

# Initialize Hugging Face Dataset Repository
# Best-effort probe for the data file: a failure is reported, start-up goes on.
try:
    hf_hub_download(
        repo_id=DATASET_REPO_ID,
        filename=DATA_FILENAME,
        # The repository is a dataset (see DATASET_REPO_URL). Without an
        # explicit repo_type the hub client looks for a *model* repository
        # with this id, so the probe could never find the file.
        repo_type="dataset",
        # NOTE(review): `force_filename` appears to be deprecated in newer
        # huggingface_hub releases - confirm against the pinned version.
        force_filename=DATA_FILENAME,
    )
except Exception as err:
    # `Exception` instead of a bare `except` so Ctrl-C / SystemExit still
    # propagate; the error text is printed because not every failure here
    # means the file is missing.
    print(f"Could not download {DATA_FILENAME} from {DATASET_REPO_ID}: {err}")

# Clone the dataset repository into ./data so DATA_FILE can be appended to
# and pushed back.
repo = Repository(
    local_dir="data", clone_from=DATASET_REPO_URL, use_auth_token=HF_TOKEN
)
# Store Message Function
def store_message(name: str, message: str):
    """Append one message to the CSV log and push the change to the hub.

    Nothing is written or pushed when either ``name`` or ``message`` is empty.

    Args:
        name: Author of the message.
        message: Text of the message.

    Returns:
        Whatever ``generate_html()`` returns.
    """
    if name and message:
        # newline="" is what the csv module asks for so that row terminators
        # are not translated a second time; the explicit encoding keeps the
        # file independent of the platform default.
        with open(DATA_FILE, "a", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=["name", "message", "time"])
            writer.writerow(
                {"name": name, "message": message, "time": str(datetime.now())}
            )
        # Publish the appended row; the returned commit URL is not needed.
        repo.push_to_hub()
    # NOTE(review): `generate_html` is not defined in the visible part of this
    # file - confirm it exists, otherwise this line raises NameError.
    return generate_html()
# Build the chat UI. The page's URL parameters are read by a JavaScript
# snippet on load and handed to the chat callbacks; `bot` stores the
# session_index so that fetch_personalized_data() can use it.
with gr.Blocks() as demo:
    gr.Markdown("""## Gradio send queryparam to chatbot
type `read query`
""")
    url_params = gr.JSON({}, visible=False, label="URL Params")
    chatbot = gr.Chatbot().style(height=500)
    msg = gr.Textbox()
    clear = gr.Button("Clear")

    def user(user_message, url_params, history):
        """Clear the textbox and append the message as an unanswered turn."""
        pending_turn = [user_message, None]
        return "", history + [pending_turn]

    def bot(history, url_params):
        """Write the reply for the latest turn into ``history`` and return it."""
        global session_index
        latest_text = history[-1][0]
        if "read query" in latest_text:
            # Remember the session index for later database look-ups.
            session_index = url_params.get('session_index')
            reply = f"""
here your URL params:
{json.dumps(url_params, indent=4)}
"""
            print(url_params)
        elif "print data" in latest_text:
            # The data only goes to the server log; the user gets a notice.
            fetched = fetch_personalized_data()
            print(fetched)
            reply = "Data printed."
        else:
            reply = random.choice(["Yes", "No"])
        history[-1][1] = reply
        time.sleep(1)
        return history

    # Submitting first records the user turn, then lets the bot answer it.
    msg.submit(
        fn=user,
        inputs=[msg, url_params, chatbot],
        outputs=[msg, chatbot],
        queue=False,
    ).then(fn=bot, inputs=[chatbot, url_params], outputs=[chatbot])
    clear.click(fn=lambda: None, inputs=None, outputs=chatbot, queue=False)
    # On page load the JavaScript snippet supplies the URL parameters, which
    # are passed through unchanged into the hidden JSON component.
    demo.load(
        fn=lambda params: params,
        inputs=[url_params],
        outputs=[url_params],
        _js=get_window_url_params,
        queue=False,
    )
def fetch_personalized_data():
    """Fetch the game rows for the current ``session_index`` as a JSON string.

    Reads the module-level ``session_index``, joins the core and decisions
    tables on player, group and subject number, and serialises every matching
    row as a ``{column_name: value}`` object.

    Returns:
        str: JSON array with one object per matching row (``"[]"`` when
        nothing matches).

    Raises:
        Any exception raised by ``mysql.connector`` while connecting or
        querying is propagated; the cursor and connection are closed first.
    """
    # Connection settings can be overridden through the environment; the
    # previous literals remain as fallbacks so existing deployments keep
    # working.
    # SECURITY NOTE(review): a root password committed to source control
    # should be rotated and the fallback removed once DB_PASSWORD is set.
    conn = mysql.connector.connect(
        host=os.environ.get("DB_HOST", "18.153.94.89"),
        user=os.environ.get("DB_USER", "root"),
        password=os.environ.get("DB_PASSWORD", "N12RXMKtKxRj"),
        database=os.environ.get("DB_NAME", "lionessdb"),
    )
    # Table names are fixed constants, so interpolating them is safe.
    core_table = "e5390g37096_core"
    decisions_table = "e5390g37096_decisions"
    # session_index originates from the page URL, i.e. untrusted input, so it
    # is bound through a %s placeholder instead of being formatted into the
    # SQL text. A value of None is sent as NULL and therefore matches no row.
    query = f"""
SELECT
{core_table}.playerNr,
{core_table}.subjectNr,
{core_table}.onPage,
{decisions_table}.transfer1,
{decisions_table}.tripledAmount1,
{decisions_table}.keptForSelf1,
{decisions_table}.returned1,
{decisions_table}.newCreditRound2,
{decisions_table}.transfer2,
{decisions_table}.tripledAmount2,
{decisions_table}.keptForSelf2,
{decisions_table}.returned2,
{decisions_table}.resultsWithoutAI
FROM {core_table}
JOIN {decisions_table} ON
{core_table}.playerNr = {decisions_table}.playerNr AND
{core_table}.groupNr = {decisions_table}.groupNr AND
{core_table}.subjectNr = {decisions_table}.subjectNr
WHERE {decisions_table}.session_index = %s
"""
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query, (session_index,))
            # cursor.description holds one entry per selected column; its
            # first field is the column name.
            column_names = [column[0] for column in cursor.description]
            result = [dict(zip(column_names, row)) for row in cursor.fetchall()]
        finally:
            cursor.close()
    finally:
        # Always release the connection, including when the query fails.
        conn.close()
    # Convert the list of dictionaries to JSON
    return json.dumps(result)
# Run the query once at start-up and keep the JSON string it returns.
# NOTE(review): `session_index` is still None here (it is only assigned by the
# chat handler, which cannot run before the app is launched) - confirm this
# start-up call is still wanted.
personalized_data = fetch_personalized_data()
# Write the JSON string to stdout.
print(personalized_data)
# Start the Gradio app.
demo.launch()
# NOTE: two bare triple-quoted string literals that followed `demo.launch()`
# were removed. They contained dead code only (an earlier draft of `bot`
# without the "print data" branch, and a leftover fragment of the query /
# clean-up logic) and had no effect at runtime; the repository history still
# has them if they are ever needed.