# mlAPI_test / main.py
import csv
import os
import tempfile
import time
from threading import Thread

import pandas as pd
from fastapi import FastAPI, HTTPException, Query
from google.cloud import storage
from pydantic import BaseModel
app = FastAPI()
# Mutable module-level state shared between the endpoint and the reader thread.
data_location = None   # appears unused below; presumably reserved for the active location — TODO confirm
reading_thread = None  # background Thread streaming rows into GCS (None until first start)
stop_reading = False   # flag polled by read_csv's loop to request shutdown
counter = 1            # number of source rows uploaded so far; grows by 1 per cycle
gcs_bucket_name = "ow-stu-us-ce1-ai-platform"
# process of getting credentials
def get_credentials():
    """Write the service-account JSON held in the ``BOB`` env var to a temp file.

    Returns:
        str: Path of the temporary ``.json`` file (not deleted automatically;
        it must live as long as the Google SDK needs it).

    Raises:
        ValueError: If the ``BOB`` environment variable is not set.
    """
    creds_json_str = os.getenv("BOB")  # JSON credentials stored as a string
    if creds_json_str is None:
        # Fix: the old message named GOOGLE_APPLICATION_CREDENTIALS_JSON,
        # but the variable actually read is BOB — report the real one.
        raise ValueError("BOB (service-account credentials JSON) not found in environment")
    # Persist to a named temp file so the Google SDK can read it by path.
    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json") as temp:
        temp.write(creds_json_str)
        temp_filename = temp.name
    return temp_filename
# Materialize the JSON credentials to a temp file and point the Google SDK at it.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]= get_credentials()
# Client and bucket handles are created at import time, so the app fails fast
# on startup if the credentials are bad.
gcs_client = storage.Client()
gcs_bucket = gcs_client.bucket(gcs_bucket_name)
# Object path inside the bucket that the reader thread repeatedly overwrites.
gcs_file_path = "deepak_6593/db.csv"
def create_empty_csv():
    """Create (or overwrite) the target GCS object with a header-only CSV.

    Reuses the module-level ``gcs_bucket`` instead of constructing a second
    ``storage.Client`` as before.  ``upload_from_string`` replaces any
    existing object in one step, so the previous delete-then-upload sequence
    (which left a window where the object did not exist) is unnecessary.
    """
    # Column schema of the sensor stream that read_csv() copies into GCS.
    columns = [
        "timestamp",
        "time",
        "Vibration (mm/s)",
        "Temperature (°C)",
        "Pressure (psi)",
        "Flow Rate (L/min)",
        "Level (%)",
        "Current (A)",
        "Humidity (%)",
        "Oil Quality (%)",
        "pump_status",
        "location",
    ]
    # Header-only CSV body (empty DataFrame with the schema above).
    csv_data = pd.DataFrame(columns=columns).to_csv(index=False)
    blob = gcs_bucket.blob(gcs_file_path)
    blob.upload_from_string(csv_data, content_type="text/csv")
# Reset the destination CSV in GCS every time the app process starts,
# before any endpoint is accessed.
create_empty_csv()
def read_csv(location="tambaram"):
    """Stream a public demo CSV into the GCS object, one extra row per cycle.

    Every 5 seconds the first ``counter`` rows of the source file are uploaded
    to ``gcs_file_path`` and the module-level ``counter`` is incremented,
    simulating a live sensor feed.  Loops until ``stop_reading`` is set.

    Args:
        location: Dataset name; resolves to ``<location>.csv`` under the
            public demo URL.
    """
    global stop_reading
    global data_location
    global counter
    blob = gcs_bucket.blob(gcs_file_path)
    file_path = f"https://storage.googleapis.com/dev.openweaver.com/demo/ai/{location}.csv"
    # The source URL is fixed for this call, so fetch it once instead of
    # re-downloading the whole file on every 5-second iteration (old behavior).
    data = pd.read_csv(file_path)
    while not stop_reading:
        # Upload a growing prefix of the source rows; slicing past the end
        # of the DataFrame simply yields the full frame.
        csv_data = data[0:counter].to_csv(index=False).encode('utf-8')
        blob.upload_from_string(csv_data, content_type='text/csv')
        time.sleep(5)
        counter += 1
def start_reading(location):
    """Launch the background thread that streams ``location``'s CSV into GCS.

    Args:
        location: Dataset name passed through to ``read_csv``.
    """
    global reading_thread
    global counter
    # Fix: restart the stream from the first row for a new location; previously
    # `counter` kept its old value, so a location switch resumed mid-file.
    counter = 1
    # Daemon thread so a lingering reader cannot keep the process alive on exit.
    reading_thread = Thread(target=read_csv, args=(location,), daemon=True)
    reading_thread.start()
def stop_reading_thread():
    """Signal the background reader to stop and block until it has exited."""
    global stop_reading
    global reading_thread
    stop_reading = True
    worker = reading_thread
    if worker is not None:
        # Wait for the current 5-second cycle to finish before returning.
        worker.join()
class LocationRequest(BaseModel):
    """Request body for POST /update_location/."""
    # Pump location to stream; the endpoint accepts only "tambaram" or "velachery".
    location: str
@app.post("/update_location/")
async def update_location(location_request: LocationRequest):
    """Switch the background CSV stream to a new pump location.

    Stops any running reader thread, clears the stop flag, and starts a
    fresh reader for the requested location.

    Raises:
        HTTPException: 400 if the location is not a known dataset.
    """
    global reading_thread
    global stop_reading
    location = location_request.location
    if location not in ["tambaram", "velachery"]:
        # Fix: signal invalid input with a proper 400 status instead of
        # returning HTTP 200 with an {"error": ...} body.
        raise HTTPException(status_code=400, detail="Invalid location")
    # Tear down the previous stream (if any) before starting the new one.
    if reading_thread is not None and reading_thread.is_alive():
        stop_reading_thread()
    stop_reading = False
    start_reading(location)
    return {"message": f"Location updated to {location}"}