File size: 3,420 Bytes
bd2139a
 
2cfbf68
bd2139a
 
 
a9fe782
bd2139a
2b1a623
8557221
77770c5
ad90944
bd2139a
 
 
51966dd
bd2139a
92f1b25
2cfbf68
a9fe782
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2cfbf68
a9fe782
2cfbf68
 
 
92f1b25
2cfbf68
32d5829
b4321bf
 
 
 
 
32d5829
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7bb0628
 
 
 
32d5829
 
b4321bf
32d5829
 
2cfbf68
25797e0
bd2139a
 
 
2cfbf68
bd2139a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b1a623
 
 
 
 
bd2139a
 
 
2b1a623
 
25797e0
bd2139a
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from fastapi import FastAPI, Query
import csv
import os
from threading import Thread
import time
import pandas as pd
import tempfile
from google.cloud import storage
from pydantic import BaseModel

# FastAPI application instance; endpoints are registered on it below.
app = FastAPI()

# Shared mutable state for the background streaming worker.
data_location = None   # not read or written by the visible code — possibly vestigial
reading_thread = None  # Thread running read_csv, or None if never started
stop_reading = False   # cooperative stop flag polled by the worker loop
counter = 1            # number of source rows mirrored to GCS so far

# Destination GCS bucket for the mirrored CSV.
gcs_bucket_name = "ow-stu-us-ce1-ai-platform"

def get_credentials():
    """Materialize GCP service-account credentials into a temp file.

    The credentials JSON is stored as a string in the ``BOB`` environment
    variable; it is written to a temporary ``.json`` file whose path can be
    assigned to ``GOOGLE_APPLICATION_CREDENTIALS``.

    Returns:
        str: Path of the temporary JSON file holding the credentials.

    Raises:
        ValueError: If the ``BOB`` environment variable is not set.
    """
    creds_json_str = os.getenv("BOB")  # JSON credentials stored as a string
    if creds_json_str is None:
        # Fix: the previous message named GOOGLE_APPLICATION_CREDENTIALS_JSON,
        # but the variable actually read above is BOB.
        raise ValueError("BOB (credentials JSON) not found in environment")

    # delete=False: the file must outlive this function so the GCS client
    # can read it later via GOOGLE_APPLICATION_CREDENTIALS.
    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json") as temp:
        temp.write(creds_json_str)
        temp_filename = temp.name

    return temp_filename
    
# Point Google client libraries at the credentials file materialized above.
# NOTE: this runs at import time and will raise if BOB is unset.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]= get_credentials()

# Module-level GCS client/bucket shared by the functions below.
# bucket() only builds a reference — it does not verify the bucket exists.
gcs_client = storage.Client()
gcs_bucket = gcs_client.bucket(gcs_bucket_name)

# Object path of the mirrored CSV inside the bucket.
gcs_file_path = "deepak_6593/db.csv"

def create_empty_csv():
    """Reset the GCS CSV object to an empty file with only the header row.

    Builds an empty DataFrame with the sensor-data schema, serializes it to
    CSV, and uploads it to ``gcs_file_path`` in the module-level bucket,
    replacing any previous contents.
    """
    # Reuse the module-level client/bucket instead of constructing fresh
    # ones (the original re-created both here for no reason).
    blob = gcs_bucket.blob(gcs_file_path)

    # Column schema of the streamed sensor readings.
    columns = [
        "timestamp",
        "time",
        "Vibration (mm/s)",
        "Temperature (°C)",
        "Pressure (psi)",
        "Flow Rate (L/min)",
        "Level (%)",
        "Current (A)",
        "Humidity (%)",
        "Oil Quality (%)",
        "pump_status",
        "location",
    ]

    # Header-only CSV body.
    csv_data = pd.DataFrame(columns=columns).to_csv(index=False)

    # upload_from_string overwrites an existing object, so the previous
    # delete-then-upload sequence (which briefly left no object at all)
    # is unnecessary.
    blob.upload_from_string(csv_data, content_type="text/csv")

# Reset the GCS CSV at import time, before any endpoint is accessed.
create_empty_csv()

def read_csv(location="tambaram"):
    """Continuously mirror a growing prefix of a remote CSV into GCS.

    Each cycle: download the source CSV for *location*, take the first
    ``counter`` rows, upload that prefix to the GCS blob, then increment
    ``counter`` — simulating live data arriving roughly every 5 seconds.
    Runs until the module-level ``stop_reading`` flag is set.

    Args:
        location: Source dataset name (e.g. "tambaram" or "velachery").
    """
    # stop_reading is only read here, and the unused data_location global
    # declaration has been dropped; only counter is written.
    global counter
    blob = gcs_bucket.blob(gcs_file_path)
    # Loop-invariant: the source URL depends only on the location.
    file_path = f"https://storage.googleapis.com/dev.openweaver.com/demo/ai/{location}.csv"
    while not stop_reading:
        # Re-read the source each cycle in case the remote file changes.
        data = pd.read_csv(file_path)
        csv_data = data[0:counter].to_csv(index=False).encode('utf-8')
        blob.upload_from_string(csv_data, content_type='text/csv')
        counter += 1
        # Sleep in short slices so stop_reading_thread()'s join() returns
        # promptly instead of waiting out a full 5-second sleep.
        for _ in range(50):
            if stop_reading:
                break
            time.sleep(0.1)


def start_reading(location):
    """Launch the CSV-streaming worker for *location* in a background thread.

    Stores the thread in the module-level ``reading_thread`` so that
    ``stop_reading_thread`` can later join it.
    """
    global reading_thread
    reading_thread = Thread(target=read_csv, kwargs={"location": location})
    reading_thread.start()

def stop_reading_thread():
    """Signal the streaming worker to stop and wait for it to exit.

    Sets the cooperative ``stop_reading`` flag, then joins the worker
    thread (a no-op if no thread was ever started).
    """
    global stop_reading, reading_thread
    stop_reading = True
    worker = reading_thread
    if worker is not None:
        worker.join()

class LocationRequest(BaseModel):
    # Request body for POST /update_location/.
    # Expected values are "tambaram" or "velachery" (validated in the endpoint).
    location: str

@app.post("/update_location/")
async def update_location(location_request: LocationRequest):
    """Switch the streamed dataset to the requested location.

    Validates the location, stops any live streaming worker, clears the
    stop flag, and starts a fresh worker for the new location.

    Returns a message dict on success, or an error dict for an unknown
    location (HTTP status stays 200 in both cases).
    """
    global reading_thread, stop_reading

    location = location_request.location

    # Guard: only two known datasets exist.
    if location not in ("tambaram", "velachery"):
        return {"error": "Invalid location"}

    # Tear down a live worker before spawning its replacement.
    if reading_thread is not None and reading_thread.is_alive():
        stop_reading_thread()

    # Re-arm the cooperative flag, then launch the new worker.
    stop_reading = False
    start_reading(location)

    return {"message": f"Location updated to {location}"}