api / app.py
RishiXD's picture
Update app.py
aff4090 verified
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, validator
import requests
import uvicorn
import firebase_admin
from firebase_admin import credentials, firestore
import logging
from datetime import datetime
import pytz
import os
from fastapi import APIRouter, HTTPException
import requests
# Router instance; it is registered on the application later in this module.
router = APIRouter()

# Set up logging: INFO level on the root logger, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The FastAPI application object that every route below is attached to.
app = FastAPI()
@app.get("/", include_in_schema=False)
async def _health_check():
    """Root health probe; always answers with a fixed "healthy" payload.

    The route is excluded from the generated OpenAPI schema.
    """
    payload = {"status": "healthy"}
    return payload
app.include_router(router)

# Initialize Firebase Admin SDK
try:
    # The service-account key file is read relative to the working directory.
    cred = credentials.Certificate("serviceAccountKey.json")
    try:
        # Succeeds only when a default app has already been initialized.
        firebase_admin.get_app()
    except ValueError:
        # No app yet: initialize it exactly once.
        firebase_admin.initialize_app(cred)
    # Module-level Firestore client shared by the request handlers.
    db = firestore.client()
except Exception as init_error:
    # Log the reason, then let the original exception abort module import.
    logger.error(f"Failed to initialize Firebase Admin SDK: {str(init_error)}")
    raise
class UpdateStatusRequest(BaseModel):
    """Request body for the /update_medicine_status endpoint."""

    # ID of the document in the "users" collection.
    userId: str
    # Name that is searched for in each medicine entry's "medicineNames" list.
    medicineName: str
    # Expected format: "HH:MM"
    medicineTime: str
class Credentials(BaseModel):
    """Request body naming the target ESP32 and the Firestore user."""

    # Host (optionally with a port) of the ESP32; a leading scheme is stripped on validation.
    esp32_ip: str
    # ID of the document in the "users" collection (changed from email to user).
    user: str

    @validator("esp32_ip")
    def validate_esp32_ip(cls, value):
        """Strip any http(s) scheme and reject unusable destination hosts.

        Args:
            value: Raw ``esp32_ip`` from the request, e.g. "http://192.168.1.20".

        Returns:
            The address without scheme or surrounding whitespace.

        Raises:
            ValueError: If no host remains, or the host is exactly 0.0.0.0.
        """
        from urllib.parse import urlsplit

        # Remove http:// or https:// if present
        value = value.replace("http://", "").replace("https://", "").strip()

        # Isolate the host so the check below is an exact comparison. A plain
        # substring test would also reject legitimate hosts such as
        # "10.0.0.0" or "100.0.0.0:8080", which merely contain "0.0.0.0".
        host = urlsplit(f"//{value}").hostname
        if not host:
            raise ValueError("Invalid ESP32 IP address: no host was provided")
        if host == "0.0.0.0":
            raise ValueError("Invalid ESP32 IP address: 0.0.0.0 is not a valid destination address")
        return value
def convert_to_serializable(obj):
    """Recursively replace values that JSON cannot encode.

    Document references become their path string, anything exposing
    ``isoformat`` (e.g. Firestore timestamps) becomes that ISO string, and
    dicts/lists are rebuilt with their members converted the same way.
    Every other value is returned unchanged.
    """
    document_reference = firebase_admin.firestore.firestore.DocumentReference
    if isinstance(obj, document_reference):
        return str(obj.path)

    # Handles DatetimeWithNanoseconds and other datetime-like values.
    if hasattr(obj, 'isoformat'):
        return obj.isoformat()

    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            converted[key] = convert_to_serializable(value)
        return converted

    if isinstance(obj, list):
        return list(map(convert_to_serializable, obj))

    return obj
def convert_times_to_ist(medicine_times):
    """Convert medicine times to "HH:MM" strings in the Asia/Kolkata timezone.

    Args:
        medicine_times: Iterable of ``datetime`` objects and/or ISO-8601
            strings (a ``Z`` suffix is accepted). ``None`` is treated as an
            empty collection.

    Returns:
        A list of "HH:MM" strings, one per entry that could be converted.
        Entries of an unsupported type, or that fail to parse, are logged
        and skipped rather than aborting the whole conversion.
    """
    utc = pytz.utc
    ist = pytz.timezone("Asia/Kolkata")
    converted_times = []
    # `or []` keeps a stored null from raising TypeError on iteration.
    for t in medicine_times or []:
        try:
            if isinstance(t, datetime):
                moment = t
            elif isinstance(t, str):
                # fromisoformat() on older Pythons rejects the "Z" suffix.
                moment = datetime.fromisoformat(t.replace("Z", "+00:00"))
            else:
                logger.warning(f"[convert_times_to_ist] Unsupported type for medicineTime: {type(t)}")
                continue

            # Naive values are taken to be UTC, matching how endDate is
            # handled elsewhere in this module. Calling astimezone() on a
            # naive datetime would instead assume the server's local zone,
            # so the result would depend on where the service is deployed.
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=utc)

            ist_time = moment.astimezone(ist)
            converted_times.append(ist_time.strftime("%H:%M"))  # HH:MM format
        except Exception as e:
            # Best effort: one bad entry must not drop the remaining times.
            logger.warning(f"[convert_times_to_ist] Error converting time '{t}': {e}")
    return converted_times
def _parse_end_date(end_date_raw):
    """Normalize a medicine ``endDate`` value to a timezone-aware UTC datetime.

    Args:
        end_date_raw: A ``datetime`` (naive values are assumed to be UTC) or
            a string such as "April 17, 2025 at 12:00:00 AM UTC+5:30".

    Returns:
        The aware UTC ``datetime``, or ``None`` when the value is neither a
        ``datetime`` nor a string.

    Raises:
        ValueError: If a string value does not match the expected layout.
    """
    import re

    if isinstance(end_date_raw, datetime):
        if end_date_raw.tzinfo is None:  # If no timezone info, assume UTC
            return end_date_raw.replace(tzinfo=pytz.UTC)
        return end_date_raw.astimezone(pytz.UTC)

    if not isinstance(end_date_raw, str):
        return None

    text = end_date_raw.replace("at ", "").replace("\u202f", " ").strip()

    # strptime's %z needs a two-digit hour (+0530 / +05:30), so offsets
    # written as "UTC+5:30", "UTC-8" or a bare "UTC" are rewritten to the
    # canonical "UTC+HHMM" form before parsing.
    match = re.search(r"UTC(?:([+-])(\d{1,2})(?::?(\d{2}))?)?$", text)
    if match:
        sign, hours, minutes = match.groups()
        offset = f"{sign or '+'}{int(hours or 0):02d}{minutes or '00'}"
        text = f"{text[:match.start()]}UTC{offset}"

    parsed = datetime.strptime(text, "%B %d, %Y %I:%M:%S %p UTC%z")
    return parsed.astimezone(pytz.UTC)


@app.post("/send_medicineData")
async def send_medicineData(data: Credentials):
    """Send a user's not-yet-expired medicines from Firestore to the ESP32.

    The user's "medicines" list is read from the "users" collection, entries
    whose ``endDate`` is missing, unparseable or in the past are dropped,
    medicine times are converted to IST "HH:MM" strings, and the result is
    POSTed as JSON to ``http://<esp32_ip>/credentials``.

    Args:
        data: Target ESP32 address and Firestore user ID.

    Returns:
        A dict with a message and the medicines that were sent (names and
        times only) plus the ESP32 response text; or a message with an empty
        ``medicines`` list when nothing is still active.

    Raises:
        HTTPException: 404 when the user or their medicines are missing;
            500 when the ESP32 request fails or on any unexpected error.
    """
    # NOTE(review): the Firestore read and requests.post below are blocking
    # calls inside an async handler, so they hold the event loop while they
    # run (up to the 20 s timeout). Consider a plain `def` route.
    try:
        logger.info(f"Received request: {data}")

        # Step 1: Retrieve the medicines array from Firestore using user ID
        logger.info(f"Fetching medicines data for user: {data.user}")
        doc_ref = db.collection("users").document(data.user)
        doc = doc_ref.get()
        if not doc.exists:
            logger.error("User data not found")
            raise HTTPException(status_code=404, detail={"error": "User data not found"})

        user_data = doc.to_dict()
        medicines_data = user_data.get("medicines", [])  # Fetch medicines list safely
        if not medicines_data:
            logger.error("Medicines data not found for this user")
            raise HTTPException(status_code=404, detail={"error": "Medicines data not found for this user"})

        # Step 2: Extract and filter medicine details based on endDate
        current_date = datetime.now(pytz.UTC)  # Current date in UTC
        logger.info(f"Current date (UTC): {current_date}")
        extracted_medicines = []
        for medicine in medicines_data:
            if not isinstance(medicine, dict):
                # A malformed entry must not fail the whole request.
                logger.warning(f"Skipping malformed medicine entry: {medicine}")
                continue

            end_date_raw = medicine.get("endDate")
            if not end_date_raw:
                logger.warning(f"Medicine missing endDate: {medicine}")
                continue

            try:
                end_date = _parse_end_date(end_date_raw)
            except ValueError as e:
                logger.warning(f"Failed to parse endDate: {end_date_raw}, error: {str(e)}")
                continue
            if end_date is None:
                logger.warning(f"Unsupported endDate type: {type(end_date_raw)}, value: {end_date_raw}")
                continue

            # Only include medicines with endDate in the future
            if end_date <= current_date:
                logger.info(f"Skipping medicine with expired endDate: {end_date_raw}")
                continue

            extracted_medicines.append({
                "doseFrequency": medicine.get("doseFrequency"),
                "endDate": medicine.get("endDate"),
                "medicineNames": medicine.get("medicineNames", []),
                # `or []` also covers a key that is present but stored as null.
                "medicineTimes": convert_times_to_ist(medicine.get("medicineTimes") or []),
                "selectedDays": medicine.get("selectedDays", []),
            })

        # Step 3: Check if there are any valid medicines to send
        if not extracted_medicines:
            logger.info("No medicines with endDate in the future found.")
            return {
                "message": "No medicines with endDate in the future found.",
                "medicines": []
            }

        # Convert any non-serializable objects (e.g., timestamps)
        extracted_medicines = convert_to_serializable(extracted_medicines)

        # Step 4: Send the medicines data to ESP32
        esp32_ip = data.esp32_ip.replace("http://", "").replace("https://", "")
        esp32_url = f"http://{esp32_ip}/credentials"
        logger.info(f"Sending medicines data to ESP32 at {esp32_url}")
        esp32_response = requests.post(esp32_url, json={"medicines": extracted_medicines}, timeout=20)
        esp32_response.raise_for_status()
        logger.info("Medicines data sent successfully")
        return {
            "message": "Medicines data sent successfully",
            "medicines": [
                {
                    "medicineNames": med.get("medicineNames", []),
                    "medicineTimes": med.get("medicineTimes", [])
                }
                for med in extracted_medicines
            ],
            "esp32_response": esp32_response.text
        }
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error: {str(e)}")
        raise HTTPException(status_code=500, detail={"error": f"Failed to connect to ESP32: {str(e)}"})
    except HTTPException as e:
        # The 404s raised above pass through untouched.
        raise e
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail={"error": f"Unexpected error: {str(e)}"})
def send_command(data: Credentials, command: str):
    """Issue ``GET http://<esp32_ip>/command/<command>`` and relay the reply.

    Args:
        data: Request body carrying the ESP32 address.
        command: Command name appended to the ``/command/`` path.

    Returns:
        A dict with a success message and the ESP32 response text.

    Raises:
        HTTPException: 500 when the HTTP request fails or returns an error status.
    """
    # Tolerate addresses that arrive with a scheme already attached.
    host = data.esp32_ip.replace("http://", "").replace("https://", "")
    url = f"http://{host}/command/{command}"
    try:
        logger.info(f"Sending command: {command} to {url}")
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return {
            "message": f"{command.capitalize()} command sent successfully",
            "esp32_response": response.text,
        }
    except requests.exceptions.RequestException as request_error:
        raise HTTPException(
            status_code=500,
            detail={"error": f"Failed to send {command} command: {str(request_error)}"},
        )
def ping_device(ip: str) -> bool:
    """Report whether *ip* answers a single ICMP echo request.

    The address originates from a request body, so it is validated and then
    passed to ``ping`` as a discrete argument; it never reaches a shell.

    Args:
        ip: Host name or IP address. A single trailing ``:port`` is ignored.

    Returns:
        True when ``ping`` exits successfully and its output contains neither
        of the failure phrases checked below; False otherwise, including for
        malformed input, a missing ``ping`` binary, or a timeout.
    """
    import platform
    import re
    import subprocess

    host = (ip or "").strip()
    # "host:port" (exactly one colon) -> keep only the host; ping takes no port.
    if host.count(":") == 1:
        host = host.split(":", 1)[0]

    # Accept only host/IP characters and refuse a leading "-", which ping
    # would otherwise interpret as a command-line option.
    if host.startswith("-") or not re.fullmatch(r"[A-Za-z0-9._:-]+", host):
        return False

    # Send one probe so the command terminates; the count flag differs by OS.
    count_flag = "-n" if platform.system().lower() == "windows" else "-c"
    try:
        completed = subprocess.run(
            ["ping", count_flag, "1", host],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError) as e:
        logger.warning(f"Error while pinging: {e}")
        return False

    # The exit status can be 0 even when only an intermediate hop replied,
    # so the textual failure markers are still checked.
    output = completed.stdout or ""
    if "Request timed out" in output or "Destination host unreachable" in output:
        return False
    return completed.returncode == 0
# Endpoint to check the microcontroller's availability
@app.post("/command/microcontroller")
def check_microcontroller(data: Credentials):
    """Ping the ESP32 and report reachability; no command is sent to the device."""
    reachable = ping_device(data.esp32_ip)
    if reachable:
        return {"status": "success", "message": f"Microcontroller at {data.esp32_ip} is reachable."}
    return {"status": "error", "message": f"Microcontroller at {data.esp32_ip} is not reachable."}
@app.post("/command/reset")
def reset_device(data: Credentials):
    """Forward the "reset" command to the ESP32 named in the request."""
    outcome = send_command(data, command="reset")
    return outcome
@app.post("/command/camera")
def check_camera(data: Credentials):
    """Forward the "camera" command to the ESP32 named in the request."""
    outcome = send_command(data, command="camera")
    return outcome
@app.post("/command/lcd")
def check_lcd(data: Credentials):
    """Forward the "lcd" command to the ESP32 named in the request."""
    outcome = send_command(data, command="lcd")
    return outcome
@app.post("/command/motor")
def check_motor(data: Credentials):
    """Forward the "motor" command to the ESP32 named in the request."""
    outcome = send_command(data, command="motor")
    return outcome
@app.post("/command/battery")
def check_battery(data: Credentials):
    """Forward the "battery" command to the ESP32 named in the request."""
    outcome = send_command(data, command="battery")
    return outcome
@app.post("/update_medicine_status")
def update_medicine_status(req: UpdateStatusRequest):
    """Mark one of a user's medicines as "taken" for today's date.

    The entry is located by name in the user's "medicines" list. When the
    name matches exactly one entry that entry is used; otherwise the entry
    whose ``medicineTimes`` contains ``req.medicineTime`` ("HH:MM") is used.
    The entry's ``status`` map gets ``{<dd-mm-YYYY>: "taken"}`` and the whole
    list is written back to Firestore.

    Args:
        req: User ID, medicine name and time of the dose.

    Returns:
        A dict with a message naming the updated list index.

    Raises:
        HTTPException: 404 when the user or a matching medicine is not found;
            500 on any other failure.
    """
    try:
        # The date key uses the server's local clock (naive datetime.now()).
        current_date = datetime.now().strftime('%d-%m-%Y')
        print(f"[INFO] Processing for user={req.userId}, medicine={req.medicineName}, time={req.medicineTime}")

        user_ref = db.collection("users").document(req.userId)
        user_doc = user_ref.get()
        if not user_doc.exists:
            raise HTTPException(status_code=404, detail="User not found")

        user_data = user_doc.to_dict()
        medicines = user_data.get("medicines") or []
        print(f"[DEBUG] Total medicine entries: {len(medicines)}")

        # First, find all matches by name
        matched_indices = [
            idx for idx, med in enumerate(medicines)
            if isinstance(med, dict) and req.medicineName in (med.get("medicineNames") or [])
        ]
        print(f"[DEBUG] Found {len(matched_indices)} medicine(s) with name match")

        target_index = None
        if len(matched_indices) == 1:
            target_index = matched_indices[0]
        else:
            # Multiple matches, match by time
            # NOTE(review): stored timestamps are formatted in their own
            # timezone here, while the times sent to the device are converted
            # to IST elsewhere in this module - confirm which zone
            # req.medicineTime is expressed in.
            for idx in matched_indices:
                for ts in medicines[idx].get("medicineTimes") or []:
                    try:
                        if hasattr(ts, "strftime"):
                            time_str = ts.strftime("%H:%M")
                            print(f"[TRACE] Comparing time: {time_str} == {req.medicineTime}")
                            if time_str == req.medicineTime:
                                target_index = idx
                                break
                    except Exception as e:
                        print(f"[ERROR] Timestamp parsing failed: {e}")
                if target_index is not None:
                    break

        if target_index is None:
            raise HTTPException(status_code=404, detail="Matching medicine name and time not found.")

        # Update the status
        print(f"[INFO] Updating index {target_index}")
        medicines[target_index].setdefault("status", {})[current_date] = "taken"

        # Push the full list back
        user_ref.update({"medicines": medicines})
        print(f"[SUCCESS] Updated medicine status at index {target_index}")
        return {"message": f"Medicine status updated at index {target_index}"}
    except HTTPException:
        # The 404s raised above must reach the client as 404s; without this
        # clause the generic handler below rewrapped them as 500s.
        raise
    except Exception as e:
        print(f"[ERROR] {e}")
        raise HTTPException(status_code=500, detail=str(e))
class BuzzerRequest(BaseModel):
    """Request body for the /buzzer/play endpoint."""

    # Address of the ESP32 that should play the sound.
    esp32_ip: str
    # Options: relaxing, soft_melody, gentle_chime, calm_tune, beep_beep
    sound_type: str
@app.post("/buzzer/play")
def play_buzzer(data: BuzzerRequest):
    """Ask the ESP32 to play one of the supported buzzer sounds.

    Args:
        data: ESP32 address and the requested sound name.

    Returns:
        A dict with status, a message and the ESP32 response text.

    Raises:
        HTTPException: 400 for an unknown sound type; 500 when the request to
            the ESP32 fails or returns an error status.
    """
    valid_sounds = ("relaxing", "soft_melody", "gentle_chime", "calm_tune", "beep_beep")
    if data.sound_type not in valid_sounds:
        raise HTTPException(status_code=400, detail="Invalid sound type selected.")

    # Accept addresses given with a scheme, as the other ESP32 endpoints do;
    # otherwise "http://1.2.3.4" would produce "http://http://1.2.3.4/...".
    esp32_ip = data.esp32_ip.replace("http://", "").replace("https://", "")
    try:
        url = f"http://{esp32_ip}/play_buzzer?sound={data.sound_type}"
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        return {
            "status": "success",
            "message": f"Playing {data.sound_type} on ESP32",
            "esp32_response": response.text
        }
    except requests.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Failed to send buzzer command: {e}")
if __name__ == "__main__":
    import os

    # Listen on $PORT when it is set, otherwise on 7860, on all interfaces.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run("app:app", host="0.0.0.0", port=listen_port)