mlAPI_test / test.py
deepak6593's picture
new code
03dc0ec
from fastapi import FastAPI, Query
import csv
import os
from threading import Thread
import time
import pandas as pd
import tempfile
from google.cloud import storage
# FastAPI application object; the route handler below is registered on it.
app = FastAPI()
# --- Module-level state shared by the request handler and the reader thread ---
# Declared `global` inside read_csv but never assigned in the visible code.
data_location = None
# Thread object of the most recently started background reader (None until the
# first start_reading call).
reading_thread = None
# Stop flag polled by read_csv's loop; stop_reading_thread sets it to True and
# update_location resets it to False before starting a new reader.
stop_reading = False
# Number of leading CSV rows uploaded on the next cycle; read_csv increments it
# once per cycle and nothing in the visible code resets it.
counter = 0
# Name of the GCS bucket that receives the uploaded CSV.
gcs_bucket_name = "ow-stu-us-ce1-ai-platform"
# process of getting credentials
def get_credentials():
    """Write the credentials JSON held in the ``BOB`` env var to a temp file.

    The returned path is intended to be assigned to
    ``GOOGLE_APPLICATION_CREDENTIALS``, so the file must outlive this call: it
    is created with ``delete=False`` and removed by an ``atexit`` hook when the
    interpreter shuts down, rather than being left on disk indefinitely.

    Returns:
        str: Path of the temporary ``.json`` file containing the credentials.

    Raises:
        ValueError: If ``BOB`` is unset or empty.
    """
    import atexit  # local import: only this function needs it

    creds_json_str = os.getenv("BOB")  # JSON credentials stored as a string
    if not creds_json_str:
        # Name the variable that is actually read so the failure is actionable;
        # an empty value is rejected too, since it cannot be valid credentials.
        raise ValueError("BOB not found in environment")

    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json") as temp:
        temp.write(creds_json_str)
        temp_filename = temp.name

    def _remove_credentials_file():
        # Best-effort cleanup at exit; the file may already have been removed.
        try:
            os.remove(temp_filename)
        except OSError:
            pass

    atexit.register(_remove_credentials_file)
    return temp_filename
# Runs at import time: materialise the credentials file and expose its path via
# GOOGLE_APPLICATION_CREDENTIALS (presumably so storage.Client() below picks it
# up through default credential discovery — confirm).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]= get_credentials()
# Create the GCS client and a handle for the target bucket.
# NOTE(review): bucket() appears to only construct a local handle without
# contacting GCS, so this does not verify that the bucket exists — confirm.
gcs_client = storage.Client()
gcs_bucket = gcs_client.bucket(gcs_bucket_name)
# Object path inside the bucket that read_csv overwrites on every cycle
gcs_file_path = "deepak_6593/db.csv"
def read_csv(location="chennai"):
    """Repeatedly publish a growing prefix of a location's CSV to GCS.

    Each cycle downloads ``{location}.csv`` from the demo URL, uploads its
    first ``counter`` rows as CSV text to ``gcs_file_path`` in ``gcs_bucket``,
    waits about five seconds and then increments the module-level ``counter``.
    The loop ends once the module-level ``stop_reading`` flag is true.

    Intended to run as the target of a background thread.

    Args:
        location: Base name of the CSV file to fetch from the demo URL.
    """
    global counter  # the only module-level name this function rebinds

    cycle_delay = 5  # seconds between uploads
    poll_interval = 0.1  # seconds between stop-flag checks while waiting

    blob = gcs_bucket.blob(gcs_file_path)
    # The URL depends only on `location`, so build it once.
    file_path = f"https://storage.googleapis.com/dev.openweaver.com/demo/ai/{location}.csv"

    while not stop_reading:
        # The source is deliberately re-read every cycle, as the original did;
        # caching it would change behaviour if the remote file is updated.
        data = pd.read_csv(file_path)
        csv_data = data[0:counter].to_csv(index=False).encode('utf-8')
        blob.upload_from_string(csv_data, content_type='text/csv')

        # Wait in short slices rather than one blocking sleep so that a stop
        # request (and the join() waiting on this thread) is honoured promptly.
        deadline = time.monotonic() + cycle_delay
        while not stop_reading:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(poll_interval, remaining))

        counter += 1
def start_reading(location):
    """Start a background thread that runs ``read_csv`` for ``location``.

    The new thread replaces whatever was stored in the module-level
    ``reading_thread``; stopping a previous reader first is the caller's job.

    Args:
        location: Passed through to ``read_csv`` as its ``location`` argument.
    """
    global reading_thread
    # daemon=True: read_csv loops until stop_reading is set, and nothing sets
    # that flag at interpreter shutdown, so a non-daemon thread would keep the
    # process from exiting.
    reading_thread = Thread(target=read_csv, args=(location,), daemon=True)
    reading_thread.start()
def stop_reading_thread():
    """Ask the background reader to stop and wait until it has finished.

    Sets the module-level ``stop_reading`` flag to True, then joins the thread
    stored in ``reading_thread`` when there is one. The flag is left set; the
    caller resets it before starting a new reader.
    """
    global stop_reading
    stop_reading = True

    worker = reading_thread
    if worker is None:
        # No reader has been started yet, so there is nothing to wait for.
        return
    worker.join()
@app.get("/update_location/")
async def update_location(location: str = Query("tambaram")):
    """Switch the background reader to a different location.

    Rejects anything other than "tambaram" or "velachery" with an error
    payload. Otherwise it stops the current reader thread if one is alive,
    clears the stop flag and starts a new reader for ``location``.

    Args:
        location: Query parameter naming the location; defaults to "tambaram".

    Returns:
        dict: ``{"error": ...}`` for an unknown location, else ``{"message": ...}``.
    """
    global stop_reading

    allowed_locations = ("tambaram", "velachery")
    if location not in allowed_locations:
        return {"error": "Invalid location"}

    current = reading_thread
    reader_is_running = current is not None and current.is_alive()
    if reader_is_running:
        # Blocks until the previous reader has left its loop.
        stop_reading_thread()

    # Clear the flag before launching so the new reader enters its loop.
    stop_reading = False
    start_reading(location)
    return {"message": f"Location updated to {location}"}
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8080)