# Kilos1 - "Update app.py" (commit d546965, verified)
# Note: sqlite3 is part of the Python standard library, so no pip install is required.
import shutil
import sqlite3
# Define the database file path
DB_FILE = "./reviews.db"

# Make sure the 'reviews' table exists before the app starts serving requests.
# CREATE TABLE IF NOT EXISTS is a no-op when the table is already present, so
# there is no need to probe with a SELECT (which would load every row) and
# infer "table missing" from an OperationalError.
db = sqlite3.connect(DB_FILE)
try:
    db.execute(
        '''
        CREATE TABLE IF NOT EXISTS reviews (id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
                              created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
                              name TEXT, review INTEGER, comments TEXT)
        '''
    )
    # Persist the schema change (if any) before releasing the connection.
    db.commit()
finally:
    # Always release the connection, even if table creation fails.
    db.close()
# Function to retrieve the latest reviews from the database
def get_latest_reviews(db: sqlite3.Connection):
    """Return the ten newest reviews together with the overall review count.

    Args:
        db: An open SQLite connection to a database containing the
            ``reviews`` table.

    Returns:
        A ``(reviews, total_reviews)`` tuple: ``reviews`` is a pandas
        DataFrame with the columns ``id``, ``date_created``, ``name``,
        ``review`` and ``comments`` holding at most ten rows ordered by
        descending id; ``total_reviews`` is the number of rows in the table.
    """
    column_names = ["id", "date_created", "name", "review", "comments"]
    # Newest rows first; ids grow with every insert, so id order is insert order.
    latest_rows = db.execute("SELECT * FROM reviews ORDER BY id DESC limit 10").fetchall()
    # COUNT always yields exactly one single-column row.
    (total_reviews,) = db.execute("Select COUNT(id) from reviews").fetchone()
    return pd.DataFrame(latest_rows, columns=column_names), total_reviews
# Function to add a new review to the database
def add_review(name: str, review: int, comments: str):
    """Store a new review and return the refreshed listing.

    Args:
        name: Name entered by the reviewer.
        review: Satisfaction rating chosen by the reviewer.
        comments: Free-text feedback.

    Returns:
        The ``(reviews, total_reviews)`` tuple produced by
        ``get_latest_reviews`` after the new row has been committed.

    Raises:
        sqlite3.Error: If the insert, commit or follow-up query fails. The
            connection is closed before the error propagates.
    """
    db = sqlite3.connect(DB_FILE)
    try:
        # Parameter binding keeps user-supplied text out of the SQL string.
        db.execute(
            "INSERT INTO reviews(name, review, comments) VALUES(?,?,?)",
            [name, review, comments],
        )
        db.commit()
        return get_latest_reviews(db)
    finally:
        # Release the connection on success and on failure alike.
        db.close()
def load_data():
    """Fetch the current review listing from the database.

    Returns:
        The ``(reviews, total_reviews)`` tuple produced by
        ``get_latest_reviews``.

    Raises:
        sqlite3.Error: If the query fails. The connection is closed before
            the error propagates.
    """
    db = sqlite3.connect(DB_FILE)
    try:
        return get_latest_reviews(db)
    finally:
        # Release the connection on success and on failure alike.
        db.close()
import gradio as gr
# Create a Gradio Blocks interface
with gr.Blocks() as demo:
    # One row holding two columns: the feedback form and the results view.
    with gr.Row():
        # Input column: reviewer name, 1-5 rating, free-text comments, submit.
        with gr.Column():
            name = gr.Textbox(label="Name", placeholder="What is your name?")
            review = gr.Radio(
                label="How satisfied are you with using gradio?",
                choices=[1, 2, 3, 4, 5],
            )
            comments = gr.Textbox(
                label="Comments",
                lines=10,
                placeholder="Do you have any feedback on gradio?",
            )
            submit = gr.Button(value="Submit Feedback")
        # Output column: latest reviews table and the running total.
        with gr.Column():
            data = gr.Dataframe(label="Most recently created 10 rows")
            count = gr.Number(label="Total number of reviews")
    # Submitting stores the review and refreshes both output components.
    submit.click(fn=add_review, inputs=[name, review, comments], outputs=[data, count])
    # Populate the outputs as soon as the page is loaded.
    demo.load(fn=load_data, inputs=None, outputs=[data, count])
# Retrieve the Hugging Face Hub token from environment variables
import huggingface_hub
import os
# NOTE(review): the variable is named "openai_api_key" but its value is passed
# to the Hugging Face Hub as the auth token below - confirm this is intended.
TOKEN = secret_value = os.getenv("openai_api_key")

# Clone the "Kilos1/my-reviews" dataset repository into ./data, authenticating
# with TOKEN.
repo = huggingface_hub.Repository(
    clone_from="Kilos1/my-reviews",
    repo_type="dataset",
    local_dir="data",
    use_auth_token=TOKEN,
)

# Bring the local clone up to date with the remote.
repo.git_pull()
# Check if the file exists in the expected location
import os
if not os.path.exists("./data/reviews.db"):
    # Nothing to restore: the cloned repository has no database at this path.
    # If it lives elsewhere in the repository (for example in a 'database'
    # subdirectory), adjust the path accordingly, e.g.
    # "./data/database/reviews.db".
    print("File 'reviews.db' not found in the repository. Please check the file path.")
else:
    # Overwrite the local working database with the copy from the repository.
    shutil.copyfile("./data/reviews.db", DB_FILE)
# Import the BackgroundScheduler from APScheduler library
from apscheduler.schedulers.background import BackgroundScheduler
import pandas as pd
import datetime
# Define a function to backup the database
def backup_db():
    """Back up the reviews database to the cloned dataset repository.

    Copies the database file into ``./data``, exports every row of the
    ``reviews`` table to ``./data/reviews.csv`` and pushes the repository to
    the Hugging Face Hub.
    """
    # Snapshot the raw database file into the repository working tree.
    shutil.copyfile(DB_FILE, "./data/reviews.db")
    db = sqlite3.connect(DB_FILE)
    try:
        reviews = db.execute("SELECT * FROM reviews").fetchall()
    finally:
        # This function runs on a repeating schedule; closing here keeps each
        # run from leaking a connection.
        db.close()
    # The CSV is written without column names, exactly as before.
    pd.DataFrame(reviews).to_csv("./data/reviews.csv", index=False)
    print("updating db")
    # blocking=False is passed so the call presumably returns before the push
    # completes - TODO confirm against the installed huggingface_hub version.
    repo.push_to_hub(blocking=False,
                     commit_message=f"Updating data at {datetime.datetime.now()}")
# Create a BackgroundScheduler instance
scheduler = BackgroundScheduler()
# Run backup_db on a fixed 60-second interval.
scheduler.add_job(func=backup_db, trigger="interval", seconds=60)
scheduler.start()

# Start the Gradio app once the scheduler has been started.
demo.launch()