# Source: gradio_app_test / app.py (uploaded by deepak6593; commit 2b2b70e, "new code")
import gradio as gr
import pandas as pd
from google.cloud import storage
import io
import os
import tempfile
# Name of the Google Cloud Storage bucket the dashboard reads from; it is
# turned into a bucket handle once the storage client has been created.
gcs_bucket_name = "ow-stu-us-ce1-ai-platform"
# Object path of the CSV file inside that bucket (downloaded by get_data()).
gcs_file_path = "deepak_6593/db.csv"
def get_credentials():
    """Write the JSON credentials from the environment to a temporary file.

    The credentials are read as a JSON string from the ``BOB`` environment
    variable and written verbatim to a ``.json`` temporary file, whose path
    is returned so it can be used as ``GOOGLE_APPLICATION_CREDENTIALS``.

    The file is kept on disk for the lifetime of the process (``delete=False``)
    and removed on a best-effort basis at interpreter exit so the secret does
    not accumulate in the temp directory across restarts.

    Returns:
        The path of the temporary credentials file.

    Raises:
        ValueError: If ``BOB`` is unset or empty.
    """
    import atexit

    creds_json_str = os.getenv("BOB")  # JSON credentials stored as a string
    # Treat an empty value like a missing one: an empty key file would only
    # fail later, inside the client, with a far less helpful error.
    if not creds_json_str:
        raise ValueError(
            'Environment variable "BOB" (JSON credentials) is not set or is empty'
        )

    # Explicit UTF-8 so the write does not depend on the platform's locale.
    with tempfile.NamedTemporaryFile(
        mode="w+", delete=False, suffix=".json", encoding="utf-8"
    ) as temp:
        temp.write(creds_json_str)
        temp_filename = temp.name

    atexit.register(_remove_file_quietly, temp_filename)
    return temp_filename


def _remove_file_quietly(path):
    """Delete *path*, tolerating a file that is already gone or unremovable."""
    import contextlib

    # Best-effort cleanup at exit: an OS-level failure here is deliberately
    # ignored so it cannot mask the process's real exit status.
    with contextlib.suppress(OSError):
        os.remove(path)
# Point GOOGLE_APPLICATION_CREDENTIALS at the temporary key file returned by
# get_credentials(). This runs before the client is constructed below;
# presumably storage.Client() resolves its credentials from this variable
# (TODO confirm), so the order of these statements matters.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]= get_credentials()
# Create the storage client and a handle for the configured bucket.
# NOTE(review): no existence check is visible here; `bucket()` appears to only
# build a local handle without contacting the service - confirm against the
# google-cloud-storage documentation before relying on it as validation.
gcs_client = storage.Client()
gcs_bucket = gcs_client.bucket(gcs_bucket_name)
def get_data():
    """Download the CSV object from the bucket and return it as a DataFrame."""
    csv_blob = gcs_bucket.blob(gcs_file_path)
    raw_bytes = csv_blob.download_as_bytes()
    # Wrap the downloaded bytes in a file-like buffer for pandas to parse.
    buffer = io.BytesIO(raw_bytes)
    return pd.read_csv(buffer)
def _latest_value(column):
    """Return the last-row value of *column*, or None when the CSV has no rows.

    The data is downloaded exactly once per call, so the emptiness check and
    the lookup operate on the same snapshot (and each refresh costs a single
    download instead of two).
    """
    frame = get_data()
    if frame.empty:
        return None
    return frame[column].iloc[-1]


# Dashboard layout: a title, then one row with two columns. Every data-bound
# component is given `get_data` as its value together with `every=5`, i.e. it
# is re-evaluated on a 5-second interval (per Gradio's `every` parameter).
# NOTE(review): the nesting below was reconstructed from a copy of the file
# whose indentation was lost - confirm it matches the intended layout.
# NOTE(review): emoji and degree signs were restored from mis-decoded UTF-8;
# the "Temperature (°C)" column name must match the CSV header exactly.
with gr.Blocks() as demo:
    gr.Markdown("# 📈 DASHBOARD")
    with gr.Row():
        # Left column: location, raw table, pressure/vibration plot.
        with gr.Column():
            c_time2 = gr.Textbox(label="Location")
            gr.Markdown("# 🗒️ PUMP SENSOR DATA")
            gr.DataFrame(get_data, every=5)
            gr.Markdown("# 📈 PRESSURE VS VIBRATION")
            gr.LinePlot(
                get_data,
                every=5,
                x="Pressure (psi)",
                y="Vibration (mm/s)",
                x_title="PRESSURE (PSI)",
                y_title="VIBRATION (MM/S)",
                overlay_point=True,
                width=500,
                height=500,
                color_legend_position="bottom",
            )
        # Right column: health status, temperature and flow/level plots.
        with gr.Column():
            c_time3 = gr.Textbox(label="Health Status")
            gr.Markdown("# 📈 TIME VS TEMPERATURE")
            gr.LinePlot(
                get_data,
                every=5,
                x="time",
                y="Temperature (°C)",
                y_title="TEMPERATURE(°C)",
                overlay_point=True,
                width=500,
                height=500,
                color_legend_position="bottom",
            )
            gr.Markdown("# 📈 FLOW RATE VS LEVEL")
            gr.LinePlot(
                get_data,
                every=5,
                x="Flow Rate (L/min)",
                y="Level (%)",
                x_title="FLOW RATE (L/MIN)",
                y_title="LEVEL (%)",
                overlay_point=True,
                width=500,
                height=500,
                color_legend_position="bottom",
            )
    # Keep the two text boxes in sync with the newest CSV row.
    demo.load(lambda: _latest_value("location"), None, c_time2, every=5)
    demo.load(lambda: _latest_value("pump_status"), None, c_time3, every=5)

# Queueing is enabled before launch, as in the original script.
demo.queue().launch()