| from fastapi import FastAPI, HTTPException |
| from pydantic import BaseModel, validator |
| import requests |
| import uvicorn |
| import firebase_admin |
| from firebase_admin import credentials, firestore |
| import logging |
| from datetime import datetime |
| import pytz |
| import os |
| from fastapi import APIRouter, HTTPException |
| import requests |
|
|
# Module-wide logging: INFO and above is emitted through the root configuration.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Router that is attached to the application below; none of the routes in
# the code visible here are registered on it.
router = APIRouter()
|
|
# The application object that every endpoint in this module is registered on.
app = FastAPI()


# Liveness probe on the root path; excluded from the generated OpenAPI schema.
@app.get("/", include_in_schema=False)
async def _health_check():
    payload = {"status": "healthy"}
    return payload


# Attach the module-level router to the application.
app.include_router(router)
|
|
| |
# Firebase bootstrap: load the service-account certificate, make sure a default
# Firebase app exists, and expose a Firestore client as the module-level `db`.
# Any failure is logged and re-raised, so the module fails to import without it.
try:
    cred = credentials.Certificate("serviceAccountKey.json")
    try:
        # A ValueError from get_app() is treated as "no default app yet".
        firebase_admin.get_app()
    except ValueError:
        firebase_admin.initialize_app(cred)
    db = firestore.client()
except Exception as init_error:
    logger.error(f"Failed to initialize Firebase Admin SDK: {str(init_error)}")
    raise
|
|
# Request body for POST /update_medicine_status.
# (Documented with comments rather than a docstring so the generated schema
# is unchanged.)
class UpdateStatusRequest(BaseModel):
    # Id of the document in the "users" collection.
    userId: str
    # Name searched for in each medicine entry's "medicineNames" list.
    medicineName: str
    # Compared against the "%H:%M" rendering of each "medicineTimes" entry.
    medicineTime: str
|
|
# Request body shared by the ESP32 endpoints: where the device lives and which
# user's data it serves. (Documented with comments rather than a class
# docstring so the generated schema is unchanged.)
class Credentials(BaseModel):
    # Host (optionally host:port) of the ESP32; stored without a URL scheme.
    esp32_ip: str
    # Id of the document in the "users" collection.
    user: str

    # NOTE(review): `validator` is the Pydantic V1-style API; if the project
    # runs on Pydantic V2 it still works but emits a deprecation warning.
    @validator("esp32_ip")
    def validate_esp32_ip(cls, value):
        """Normalize the ESP32 address and reject unusable destinations.

        The scheme prefixes ``http://`` / ``https://`` are removed, surrounding
        whitespace and trailing slashes are dropped (callers build URLs as
        ``http://{esp32_ip}/...``, so a trailing slash would yield ``//``).

        Raises:
            ValueError: If the address is empty after normalization or its
                host part is exactly ``0.0.0.0``.
        """
        address = value.replace("http://", "").replace("https://", "")
        address = address.strip().rstrip("/")

        # Compare the host part exactly. A substring test would also reject
        # legitimate values such as "10.0.0.0" or "100.0.0.0:8080".
        host = address.partition("/")[0].partition(":")[0]
        if not host:
            raise ValueError("Invalid ESP32 IP address: value is empty")
        if host == "0.0.0.0":
            raise ValueError("Invalid ESP32 IP address: 0.0.0.0 is not a valid destination address")
        return address
|
|
def convert_to_serializable(obj):
    """Return a JSON-friendly copy of *obj*.

    Firestore document references become their path string, anything exposing
    ``isoformat`` (e.g. timestamps) becomes its ISO string, and dicts and lists
    are rebuilt with the same rule applied to every value. All other values
    are returned unchanged.
    """
    if isinstance(obj, firebase_admin.firestore.firestore.DocumentReference):
        return str(obj.path)

    if hasattr(obj, "isoformat"):
        return obj.isoformat()

    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            converted[key] = convert_to_serializable(value)
        return converted

    if isinstance(obj, list):
        return list(map(convert_to_serializable, obj))

    return obj
|
|
def convert_times_to_ist(medicine_times):
    """Render medicine times as "HH:MM" strings in the Asia/Kolkata zone.

    Args:
        medicine_times: Iterable of ``datetime`` objects or ISO-8601 strings
            (a "Z" suffix is accepted). ``None`` or an empty container
            yields an empty list.

    Returns:
        A list with one ``"%H:%M"`` string per entry that could be converted,
        in input order. Entries of an unsupported type or that fail to parse
        are logged as warnings and skipped.
    """
    ist = pytz.timezone("Asia/Kolkata")
    converted_times = []

    for t in medicine_times or ():
        try:
            if isinstance(t, datetime):
                moment = t
            elif isinstance(t, str):
                # Replacing "Z" is a no-op when the string has no "Z".
                moment = datetime.fromisoformat(t.replace("Z", "+00:00"))
            else:
                logger.warning(f"[convert_times_to_ist] Unsupported type for medicineTime: {type(t)}")
                continue

            if moment.tzinfo is None:
                # astimezone() on a naive value would assume the server's
                # local zone; treat naive values as UTC instead, the same way
                # send_medicineData treats a naive endDate.
                moment = moment.replace(tzinfo=pytz.utc)

            converted_times.append(moment.astimezone(ist).strftime("%H:%M"))
        except Exception as e:
            # Best effort: one bad entry must not discard the others.
            logger.warning(f"[convert_times_to_ist] Error converting time '{t}': {e}")
    return converted_times
|
|
|
|
def _parse_end_date(end_date_raw):
    """Return *end_date_raw* as a timezone-aware UTC datetime.

    Returns ``None`` when the value is neither a ``datetime`` nor a string.

    Raises:
        ValueError: If a string value does not match the expected
            ``"%B %d, %Y %I:%M:%S %p UTC%z"`` layout.
    """
    if isinstance(end_date_raw, datetime):
        if end_date_raw.tzinfo is None:
            # Naive values are taken to be UTC.
            return end_date_raw.replace(tzinfo=pytz.UTC)
        return end_date_raw.astimezone(pytz.UTC)

    if isinstance(end_date_raw, str):
        # Drop the "at " separator and normalize the narrow no-break space
        # (U+202F) so the value fits the strptime layout below.
        cleaned = end_date_raw.replace("at ", "").replace("\u202f", " ")
        parsed = datetime.strptime(cleaned, "%B %d, %Y %I:%M:%S %p UTC%z")
        return parsed.astimezone(pytz.UTC)

    return None


def _select_active_medicines(medicines_data, current_date):
    """Return the payload entries for medicines whose endDate is after *current_date*."""
    active = []
    for medicine in medicines_data:
        end_date_raw = medicine.get("endDate")
        if not end_date_raw:
            logger.warning(f"Medicine missing endDate: {medicine}")
            continue

        try:
            end_date = _parse_end_date(end_date_raw)
        except ValueError as e:
            logger.warning(f"Failed to parse endDate: {end_date_raw}, error: {str(e)}")
            continue

        if end_date is None:
            logger.warning(f"Unsupported endDate type: {type(end_date_raw)}, value: {end_date_raw}")
            continue

        if end_date <= current_date:
            logger.info(f"Skipping medicine with expired endDate: {end_date_raw}")
            continue

        active.append({
            "doseFrequency": medicine.get("doseFrequency"),
            "endDate": medicine.get("endDate"),
            "medicineNames": medicine.get("medicineNames", []),
            # `or []` also covers a stored null, which cannot be iterated.
            "medicineTimes": convert_times_to_ist(medicine.get("medicineTimes") or []),
            "selectedDays": medicine.get("selectedDays", []),
        })
    return active


@app.post("/send_medicineData")
async def send_medicineData(data: Credentials):
    """Send a user's non-expired medicines to the ESP32.

    Reads the user's document from the "users" collection, keeps the medicines
    whose endDate lies in the future, and POSTs them to the device's
    /credentials endpoint. Responds with 404 when the user or its medicines are
    missing, and 500 when the device cannot be reached or anything unexpected
    happens. When no medicine is still active, nothing is sent and an empty
    list is returned.
    """
    # Function-scope import keeps this block self-contained.
    from fastapi.concurrency import run_in_threadpool

    try:
        logger.info(f"Received request: {data}")

        logger.info(f"Fetching medicines data for user: {data.user}")
        doc_ref = db.collection("users").document(data.user)
        # The Firestore client is synchronous; run it in the threadpool so the
        # event loop keeps serving other requests.
        doc = await run_in_threadpool(doc_ref.get)

        if not doc.exists:
            logger.error("User data not found")
            raise HTTPException(status_code=404, detail={"error": "User data not found"})

        user_data = doc.to_dict()
        medicines_data = user_data.get("medicines", [])

        if not medicines_data:
            logger.error("Medicines data not found for this user")
            raise HTTPException(status_code=404, detail={"error": "Medicines data not found for this user"})

        current_date = datetime.now(pytz.UTC)
        logger.info(f"Current date (UTC): {current_date}")

        extracted_medicines = _select_active_medicines(medicines_data, current_date)

        if not extracted_medicines:
            logger.info("No medicines with endDate in the future found.")
            return {
                "message": "No medicines with endDate in the future found.",
                "medicines": []
            }

        # Timestamps and document references are not JSON-encodable as-is.
        extracted_medicines = convert_to_serializable(extracted_medicines)

        esp32_ip = data.esp32_ip.replace("http://", "").replace("https://", "")
        esp32_url = f"http://{esp32_ip}/credentials"
        logger.info(f"Sending medicines data to ESP32 at {esp32_url}")
        # requests is blocking (up to the 20 s timeout), so it is offloaded too.
        esp32_response = await run_in_threadpool(
            requests.post,
            esp32_url,
            json={"medicines": extracted_medicines},
            timeout=20,
        )
        esp32_response.raise_for_status()

        logger.info("Medicines data sent successfully")
        return {
            "message": "Medicines data sent successfully",
            "medicines": [
                {
                    "medicineNames": med.get("medicineNames", []),
                    "medicineTimes": med.get("medicineTimes", [])
                }
                for med in extracted_medicines
            ],
            "esp32_response": esp32_response.text
        }

    except HTTPException:
        # Deliberate 404s above must reach the client unchanged.
        raise
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error: {str(e)}")
        raise HTTPException(status_code=500, detail={"error": f"Failed to connect to ESP32: {str(e)}"}) from e
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail={"error": f"Unexpected error: {str(e)}"}) from e
|
|
def send_command(data: Credentials, command: str):
    """Forward *command* to the ESP32 and relay its reply.

    Performs ``GET http://{esp32_ip}/command/{command}`` with a 10 second
    timeout and returns a dict holding a confirmation message and the raw
    response text.

    Raises:
        HTTPException: Status 500 when the request fails or the device answers
            with an error status.
    """
    host = data.esp32_ip.replace("http://", "").replace("https://", "")
    url = f"http://{host}/command/{command}"
    logger.info(f"Sending command: {command} to {url}")

    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail={"error": f"Failed to send {command} command: {str(e)}"})

    return {
        "message": f"{command.capitalize()} command sent successfully",
        "esp32_response": response.text,
    }
| |
|
|
|
|
def ping_device(ip: str, *, count: int = 1, timeout: float = 10.0) -> bool:
    """Return True when *ip* answers an ICMP ping.

    The address comes from a request body, so it is never handed to a shell:
    ``ping`` is executed with an argument list, and values that are empty,
    contain whitespace, or start with "-" (which ``ping`` would read as an
    option) are rejected without running anything.

    Args:
        ip: Host name or IP address to probe.
        count: Number of echo requests to send. Without a count flag, ``ping``
            runs until interrupted on Unix-like systems.
        timeout: Upper bound in seconds for the whole ``ping`` run.

    Returns:
        True if ``ping`` exits with status 0 and its output contains none of
        the failure phrases; False otherwise, including when ``ping`` cannot
        be run or exceeds *timeout*.
    """
    # Function-scope imports keep this block self-contained.
    import platform
    import subprocess

    target = (ip or "").strip()
    if not target or target.startswith("-") or any(ch.isspace() for ch in target):
        return False

    # Windows spells the count option "-n"; other platforms use "-c".
    count_flag = "-n" if platform.system().lower() == "windows" else "-c"

    try:
        result = subprocess.run(
            ["ping", count_flag, str(count), target],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except (OSError, subprocess.SubprocessError) as e:
        # Covers a missing ping binary and subprocess.TimeoutExpired.
        logger.warning(f"Error while pinging: {e}")
        return False

    # The exit status alone is not trusted: these phrases, which the previous
    # implementation keyed on, are treated as failure even when it is 0.
    output = result.stdout or ""
    failure_markers = ("Request timed out", "Destination host unreachable")
    if any(marker in output for marker in failure_markers):
        return False
    return result.returncode == 0
|
|
| |
# POST /command/microcontroller: reports whether the ESP32 named in the body
# answers a ping. Always replies with a status/message pair rather than an
# HTTP error.
@app.post("/command/microcontroller")
def check_microcontroller(data: Credentials):
    reachable = ping_device(data.esp32_ip)
    if reachable:
        return {"status": "success", "message": f"Microcontroller at {data.esp32_ip} is reachable."}
    return {"status": "error", "message": f"Microcontroller at {data.esp32_ip} is not reachable."}
|
|
|
|
# POST /command/reset: forwards the "reset" command to the ESP32 named in the body.
@app.post("/command/reset")
def reset_device(data: Credentials):
    # All request and error handling lives in send_command.
    return send_command(data, command="reset")
|
|
# POST /command/camera: forwards the "camera" command to the ESP32 named in the body.
@app.post("/command/camera")
def check_camera(data: Credentials):
    # All request and error handling lives in send_command.
    return send_command(data, command="camera")
|
|
# POST /command/lcd: forwards the "lcd" command to the ESP32 named in the body.
@app.post("/command/lcd")
def check_lcd(data: Credentials):
    # All request and error handling lives in send_command.
    return send_command(data, command="lcd")
|
|
# POST /command/motor: forwards the "motor" command to the ESP32 named in the body.
@app.post("/command/motor")
def check_motor(data: Credentials):
    # All request and error handling lives in send_command.
    return send_command(data, command="motor")
|
|
# POST /command/battery: forwards the "battery" command to the ESP32 named in the body.
@app.post("/command/battery")
def check_battery(data: Credentials):
    # All request and error handling lives in send_command.
    return send_command(data, command="battery")
|
|
def _find_medicine_index(medicines, medicine_name, medicine_time):
    """Return the index of the medicine entry to update, or None if nothing matches.

    Entries are first filtered by *medicine_name*. A single name match wins
    outright; otherwise the first name match having a time whose "%H:%M"
    rendering equals *medicine_time* is chosen.
    """
    matched_indices = [
        idx for idx, med in enumerate(medicines)
        # `or []` covers a stored null, which does not support `in`.
        if medicine_name in (med.get("medicineNames") or [])
    ]
    print(f"[DEBUG] Found {len(matched_indices)} medicine(s) with name match")

    if len(matched_indices) == 1:
        return matched_indices[0]

    for idx in matched_indices:
        for ts in medicines[idx].get("medicineTimes") or []:
            if not hasattr(ts, "strftime"):
                continue
            try:
                # NOTE(review): the time is formatted in whatever zone the
                # stored value carries, while the times sent to the ESP32 are
                # converted to IST first; confirm which zone callers use.
                time_str = ts.strftime("%H:%M")
            except Exception as e:
                print(f"[ERROR] Timestamp parsing failed: {e}")
                continue
            print(f"[TRACE] Comparing time: {time_str} == {medicine_time}")
            if time_str == medicine_time:
                return idx
    return None


@app.post("/update_medicine_status")
def update_medicine_status(req: UpdateStatusRequest):
    """Mark a medicine as taken for today in the user's document.

    Locates the medicine entry by name (and by time when several entries share
    the name), sets its status for the current date to "taken", and writes the
    medicines list back. Responds with 404 when the user or a matching medicine
    is not found, and 500 on any other failure.
    """
    try:
        # NOTE(review): datetime.now() is server-local time; confirm that is
        # the calendar day intended for the status key.
        current_date = datetime.now().strftime('%d-%m-%Y')
        print(f"[INFO] Processing for user={req.userId}, medicine={req.medicineName}, time={req.medicineTime}")

        user_ref = db.collection("users").document(req.userId)
        user_doc = user_ref.get()

        if not user_doc.exists:
            raise HTTPException(status_code=404, detail="User not found")

        user_data = user_doc.to_dict()
        medicines = user_data.get("medicines", [])

        print(f"[DEBUG] Total medicine entries: {len(medicines)}")

        target_index = _find_medicine_index(medicines, req.medicineName, req.medicineTime)

        if target_index is None:
            raise HTTPException(status_code=404, detail="Matching medicine name and time not found.")

        print(f"[INFO] Updating index {target_index}")
        medicines[target_index].setdefault("status", {})[current_date] = "taken"

        # The whole list is written back with the one entry modified.
        user_ref.update({"medicines": medicines})
        print(f"[SUCCESS] Updated medicine status at index {target_index}")

        return {"message": f"Medicine status updated at index {target_index}"}

    except HTTPException:
        # Without this clause the 404s raised above were caught by the generic
        # handler below and reported to the client as 500.
        raise
    except Exception as e:
        print(f"[ERROR] {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
| |
# Request body for POST /buzzer/play.
# (Documented with comments rather than a docstring so the generated schema
# is unchanged.)
class BuzzerRequest(BaseModel):
    # Host of the ESP32 that should play the sound.
    esp32_ip: str
    # Must be one of the sound names accepted by play_buzzer.
    sound_type: str
|
|
@app.post("/buzzer/play")
def play_buzzer(data: BuzzerRequest):
    """Ask the ESP32 to play one of the supported buzzer sounds.

    Responds with 400 when the sound type is not supported and 500 when the
    device cannot be reached or answers with an error status.
    """
    valid_sounds = ("relaxing", "soft_melody", "gentle_chime", "calm_tune", "beep_beep")

    if data.sound_type not in valid_sounds:
        raise HTTPException(status_code=400, detail="Invalid sound type selected.")

    # BuzzerRequest has no validator, so strip a scheme here the same way
    # send_command does; otherwise "http://host" would become
    # "http://http://host/...".
    esp32_ip = data.esp32_ip.replace("http://", "").replace("https://", "")

    try:
        # sound_type was checked against the fixed list above, so it is safe
        # to place in the query string as-is.
        url = f"http://{esp32_ip}/play_buzzer?sound={data.sound_type}"
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        return {
            "status": "success",
            "message": f"Playing {data.sound_type} on ESP32",
            "esp32_response": response.text
        }
    except requests.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Failed to send buzzer command: {e}") from e
|
|
# Script entry point: serve the app on all interfaces, on the port given by
# the PORT environment variable (7860 when unset).
if __name__ == "__main__":
    # `os` is already imported at the top of the module.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run("app:app", host="0.0.0.0", port=listen_port)