#!pip install sqlite3 # Install the sqlite3 library if not already installed import shutil import sqlite3 # Define the database file path DB_FILE = "./reviews.db" # Connect to the SQLite database db = sqlite3.connect(DB_FILE) # Attempt to create the 'reviews' table if it doesn't exist try: # Try to select all rows from the 'reviews' table db.execute("SELECT * FROM reviews").fetchall() # Close the database connection if the table exists db.close() except sqlite3.OperationalError: # If the table doesn't exist, create it db.execute( ''' CREATE TABLE reviews (id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, name TEXT, review INTEGER, comments TEXT) ''' ) # Commit the changes to the database db.commit() # Close the database connection db.close() # Function to retrieve the latest reviews from the database def get_latest_reviews(db: sqlite3.Connection): # Execute a query to get the 10 most recent reviews, ordered by id descending reviews = db.execute("SELECT * FROM reviews ORDER BY id DESC limit 10").fetchall() # Get the total number of reviews in the database total_reviews = db.execute("Select COUNT(id) from reviews").fetchone()[0] # Convert the reviews to a pandas DataFrame for easier handling reviews = pd.DataFrame( reviews, columns=["id", "date_created", "name", "review", "comments"] ) # Return the reviews DataFrame and the total number of reviews return reviews, total_reviews # Function to add a new review to the database def add_review(name: str, review: int, comments: str): # Connect to the database db = sqlite3.connect(DB_FILE) # Create a cursor object to execute SQL commands cursor = db.cursor() # Insert the new review into the database cursor.execute("INSERT INTO reviews(name, review, comments) VALUES(?,?,?)", [name, review, comments]) # Commit the changes to the database db.commit() # Retrieve the updated list of reviews and total count reviews, total_reviews = get_latest_reviews(db) # Close the database connection db.close() # Return the updated reviews and total count return reviews, total_reviews def load_data(): db = sqlite3.connect(DB_FILE) reviews, total_reviews = get_latest_reviews(db) db.close() return reviews, total_reviews import gradio as gr # Create a Gradio Blocks interface with gr.Blocks() as demo: # Create a row to organize elements horizontally with gr.Row(): # Create a column for input elements with gr.Column(): # Create a text input for the user's name name = gr.Textbox(label="Name", placeholder="What is your name?") # Create a radio button group for rating satisfaction review = gr.Radio(label="How satisfied are you with using gradio?", choices=[1, 2, 3, 4, 5]) # Create a multi-line text input for comments comments = gr.Textbox( label="Comments", lines=10, placeholder="Do you have any feedback on gradio?" ) # Create a submit button submit = gr.Button(value="Submit Feedback") # Create a column for output elements with gr.Column(): # Create a dataframe to display the most recent 10 reviews data = gr.Dataframe(label="Most recently created 10 rows") # Create a number display for the total review count count = gr.Number(label="Total number of reviews") # Define the action when the submit button is clicked submit.click(add_review, [name, review, comments], [data, count]) # Define the action when the demo is loaded demo.load(load_data, None, [data, count]) # Retrieve the Hugging Face Hub token from environment variables import huggingface_hub import os secret_value = os.getenv("openai_api_key") TOKEN = secret_value # Create a Repository object for interacting with a Hugging Face dataset repo = huggingface_hub.Repository( # Specify the local directory where the repository will be cloned local_dir="data", # Set the repository type to "dataset" repo_type="dataset", # Specify the name of the dataset to clone from Hugging Face Hub clone_from="Kilos1/my-reviews", # Use the authentication token for accessing the repository use_auth_token=TOKEN ) # Pull the latest changes from the remote repository repo.git_pull() # Check if the file exists in the expected location import os if os.path.exists("./data/reviews.db"): # Copy the reviews database file from the cloned repository to the local DB_FILE location shutil.copyfile("./data/reviews.db", DB_FILE) else: print("File 'reviews.db' not found in the repository. Please check the file path.") # If the file is not in the expected location, you may need to adjust the path # based on its actual location in the repository. # For example, if the file is in a subdirectory called 'database', you would use: # shutil.copyfile("./data/database/reviews.db", DB_FILE) # Import the BackgroundScheduler from APScheduler library from apscheduler.schedulers.background import BackgroundScheduler import pandas as pd import datetime # Define a function to backup the database def backup_db(): # Copy the current database file to the data directory shutil.copyfile(DB_FILE, "./data/reviews.db") # Connect to the database db = sqlite3.connect(DB_FILE) # Fetch all reviews from the database reviews = db.execute("SELECT * FROM reviews").fetchall() # Convert the reviews to a pandas DataFrame and save as CSV pd.DataFrame(reviews).to_csv("./data/reviews.csv", index=False) # Print a message indicating the update is in progress print("updating db") # Push the updated data to the Hugging Face Hub repo.push_to_hub(blocking=False, commit_message=f"Updating data at {datetime.datetime.now()}") # Create a BackgroundScheduler instance scheduler = BackgroundScheduler() # Add a job to run the backup_db function every 60 seconds scheduler.add_job(func=backup_db, trigger="interval", seconds=60) # Start the scheduler scheduler.start() demo.launch()