from fastapi import FastAPI, Query import csv import os from threading import Thread import time import pandas as pd import tempfile from google.cloud import storage from pydantic import BaseModel app = FastAPI() data_location = None reading_thread = None stop_reading = False counter = 1 gcs_bucket_name = "ow-stu-us-ce1-ai-platform" # process of getting credentials def get_credentials(): creds_json_str = os.getenv("BOB") # get json credentials stored as a string if creds_json_str is None: raise ValueError("GOOGLE_APPLICATION_CREDENTIALS_JSON not found in environment") # create a temporary file with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json") as temp: temp.write(creds_json_str) # write in json format temp_filename = temp.name return temp_filename # pass os.environ["GOOGLE_APPLICATION_CREDENTIALS"]= get_credentials() # Ensure the GCS bucket exists gcs_client = storage.Client() gcs_bucket = gcs_client.bucket(gcs_bucket_name) # File path in GCS bucket gcs_file_path = "deepak_6593/db.csv" def create_empty_csv(): # Initialize GCS client gcs_client = storage.Client() gcs_bucket = gcs_client.bucket(gcs_bucket_name) blob = gcs_bucket.blob(gcs_file_path) # Define columns for the empty DataFrame columns = [ "timestamp", "time", "Vibration (mm/s)", "Temperature (°C)", "Pressure (psi)", "Flow Rate (L/min)", "Level (%)", "Current (A)", "Humidity (%)", "Oil Quality (%)", "pump_status", "location" ] # Create an empty DataFrame with the specified columns empty_df = pd.DataFrame(columns=columns) # Convert the empty DataFrame to CSV format csv_data = empty_df.to_csv(index=False) # Delete file if it exists if blob.exists(): blob.delete() # Upload the CSV data to the GCS bucket blob.upload_from_string(csv_data, content_type="text/csv") # Create an empty CSV file before endpoint is accessed create_empty_csv() def read_csv(location="tambaram"): global stop_reading global data_location global counter blob = gcs_bucket.blob(gcs_file_path) while not stop_reading: file_path = f"https://storage.googleapis.com/dev.openweaver.com/demo/ai/{location}.csv" data = pd.read_csv(file_path) csv_data = data[0:counter].to_csv(index=False).encode('utf-8') blob.upload_from_string(csv_data, content_type='text/csv') time.sleep(5) counter += 1 def start_reading(location): global reading_thread reading_thread = Thread(target=read_csv, args=(location,)) reading_thread.start() def stop_reading_thread(): global stop_reading global reading_thread stop_reading = True if reading_thread is not None: reading_thread.join() class LocationRequest(BaseModel): location: str @app.post("/update_location/") async def update_location(location_request: LocationRequest): global reading_thread global stop_reading location = location_request.location if location not in ["tambaram", "velachery"]: return {"error": "Invalid location"} if reading_thread is not None and reading_thread.is_alive(): stop_reading_thread() stop_reading = False start_reading(location) return {"message": f"Location updated to {location}"}