Spaces:
Running
Running
| import streamlit as st | |
| import requests | |
| from pydantic import BaseModel | |
| from typing import Optional, List | |
| # Define the Customer model, mirroring the one in main.py | |
| class Customer(BaseModel): | |
| name: str | |
| email: str | |
| phone: Optional[int] = None | |
| address: Optional[str] = None | |
| id: int | |
| # FastAPI backend URL | |
| BASE_URL = "https://vishnugupta-enrollmentapi.hf.space" | |
| def get_customers(): | |
| """Fetches all customers from the backend.""" | |
| try: | |
| response = requests.get(f"{BASE_URL}/customers") | |
| response.raise_for_status() # Raise an exception for bad status codes | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| st.error(f"Error fetching customers: {e}") | |
| return [] | |
| def create_customer(customer: Customer): | |
| """Creates a new customer.""" | |
| try: | |
| response = requests.post(f"{BASE_URL}/customers", json=customer.dict()) | |
| response.raise_for_status() | |
| st.success("Customer created successfully!") | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| st.error(f"Error creating customer: {e}") | |
| return None | |
| def update_customer(customer_id: int, customer: Customer): | |
| """Updates an existing customer.""" | |
| try: | |
| response = requests.put(f"{BASE_URL}/customer/{customer_id}", json=customer.dict()) | |
| response.raise_for_status() | |
| st.success("Customer updated successfully!") | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| st.error(f"Error updating customer: {e}") | |
| return None | |
| def delete_customer(customer_id: int): | |
| """Deletes a customer.""" | |
| try: | |
| response = requests.delete(f"{BASE_URL}/customer/{customer_id}") | |
| response.raise_for_status() | |
| st.success("Customer deleted successfully!") | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| st.error(f"Error deleting customer: {e}") | |
| return None | |
| def main(): | |
| """Main function to run the Streamlit app.""" | |
| st.set_page_config(page_title="CRM Dashboard", layout="wide") | |
| st.title("Customer Relationship Management (CRM)") | |
| menu = ["View Customers", "Add Customer", "Update Customer", "Delete Customer"] | |
| choice = st.sidebar.selectbox("Menu", menu) | |
| if choice == "View Customers": | |
| st.subheader("All Customers") | |
| customers = get_customers() | |
| if customers: | |
| st.table(customers) | |
| else: | |
| st.info("No customers found.") | |
| elif choice == "Add Customer": | |
| st.subheader("Add a New Customer") | |
| with st.form("add_customer_form"): | |
| name = st.text_input("Name") | |
| email = st.text_input("Email") | |
| phone = st.number_input("Phone", value=0, format="%d") | |
| address = st.text_area("Address") | |
| customer_id = len(requests.get(f"{BASE_URL}/customers").json())+1 | |
| submitted = st.form_submit_button("Add Customer") | |
| if submitted: | |
| new_customer = Customer(name=name, email=email, phone=phone, address=address, id=customer_id) | |
| create_customer(new_customer) | |
| elif choice == "Update Customer": | |
| st.subheader("Update a Customer") | |
| customers = get_customers() | |
| if customers: | |
| customer_ids = [c["id"] for c in customers] | |
| selected_id = st.selectbox("Select Customer ID to Update", customer_ids) | |
| selected_customer = next((c for c in customers if c["id"] == selected_id), None) | |
| if selected_customer: | |
| with st.form("update_customer_form"): | |
| name = st.text_input("Name", value=selected_customer["name"]) | |
| email = st.text_input("Email", value=selected_customer["email"]) | |
| phone = st.number_input("Phone", value=selected_customer.get("phone") or 0, format="%d") | |
| address = st.text_area("Address", value=selected_customer.get("address", "")) | |
| updated = st.form_submit_button("Update Customer") | |
| if updated: | |
| updated_customer = Customer(name=name, email=email, phone=phone, address=address, id=selected_id) | |
| update_customer(selected_id, updated_customer) | |
| else: | |
| st.warning("Selected customer not found.") | |
| else: | |
| st.info("No customers to update.") | |
| elif choice == "Delete Customer": | |
| st.subheader("Delete a Customer") | |
| customers = get_customers() | |
| if customers: | |
| customer_ids = [c["id"] for c in customers] | |
| selected_id = st.selectbox("Select Customer ID to Delete", customer_ids) | |
| selected_customer = next((c for c in customers if c["id"] == selected_id), None) | |
| if st.button("Delete Customer"): | |
| delete_customer(selected_id) | |
| else: | |
| st.info("No customers to delete.") | |
| if __name__ == "__main__": | |
| main() | |