| import streamlit as st | |
| import pandas as pd | |
| import os | |
| import re | |
| import json | |
| from PIL import Image | |
| from datetime import datetime | |
| from google.cloud import vision | |
| from google.oauth2 import service_account | |
| st.set_page_config(layout="wide") | |
| import database as db | |
| GCP_SERVICE_ACCOUNT_JSON = os.getenv("GCP_SERVICE_ACCOUNT_JSON") | |
| def get_vision_client(): | |
| if not GCP_SERVICE_ACCOUNT_JSON: | |
| raise Exception("GCP service account JSON is missing. Check your environment variables.") | |
| credentials_dict = json.loads(GCP_SERVICE_ACCOUNT_JSON) | |
| credentials = service_account.Credentials.from_service_account_info(credentials_dict) | |
| return vision.ImageAnnotatorClient(credentials=credentials) | |
| client = get_vision_client() | |
| def parse_description(description): | |
| """ | |
| A naive regex approach that grabs lines containing $XX.XX | |
| and pairs them with the line above as the item name. | |
| """ | |
| lines = description.split("\n") | |
| items = [] | |
| price_pattern = r"\$(\d+\.\d{2})" | |
| for i in range(1, len(lines)): | |
| line = lines[i].strip() | |
| prev_line = lines[i - 1].strip() | |
| match = re.search(price_pattern, line) | |
| if match: | |
| item_name = prev_line | |
| price = float(match.group(1)) | |
| items.append({"name": item_name, "price": price}) | |
| return items | |
| def extract_invoice_data(image_data: bytes): | |
| """ | |
| Calls Google Vision to detect text from the uploaded image bytes, | |
| then uses parse_description to produce a list of item dicts. | |
| """ | |
| image = vision.Image(content=image_data) | |
| response = client.text_detection(image=image) | |
| if response.error.message: | |
| raise Exception(f"Vision API Error: {response.error.message}") | |
| response_dict = vision.AnnotateImageResponse.to_dict(response) | |
| annotations = response_dict.get("text_annotations", []) | |
| if not annotations: | |
| return [] | |
| description = annotations[0]["description"] | |
| return parse_description(description) | |
| def load_existing_data(): | |
| if 'logged_in' not in st.session_state: | |
| st.session_state['logged_in'] = False | |
| if 'email' not in st.session_state: | |
| st.session_state['email'] = None | |
| if 'section' not in st.session_state: | |
| st.session_state['section'] = None | |
| if 'participants' not in st.session_state: | |
| st.session_state['participants'] = [] | |
| if 'items' not in st.session_state: | |
| st.session_state['items'] = [] | |
| if 'selected_items' not in st.session_state: | |
| st.session_state['selected_items'] = [] | |
| if 'submitted_items' not in st.session_state: | |
| st.session_state['submitted_items'] = [] | |
| if 'submitted' not in st.session_state: | |
| st.session_state['submitted'] = False | |
| if 'sections_loaded' not in st.session_state: | |
| st.session_state['sections_loaded'] = False | |
| if 'existing_sections' not in st.session_state: | |
| st.session_state['existing_sections'] = [] | |
| def load_section_from_db(section_name): | |
| owner_email = st.session_state['email'] | |
| section_doc = db.get_section(owner_email, section_name) | |
| if section_doc: | |
| st.session_state['section'] = section_name | |
| st.session_state['participants'] = section_doc['participants'] | |
| st.session_state['items'] = [] | |
| st.session_state['selected_items'] = [] | |
| st.session_state['submitted_items'] = [] | |
| st.session_state['submitted'] = False | |
| def save_section_to_db(section_name, participants): | |
| owner_email = st.session_state['email'] | |
| db.update_section(owner_email, section_name, participants) | |
| def update_submitted_items(): | |
| owner_email = st.session_state['email'] | |
| section_name = st.session_state['section'] | |
| st.session_state['submitted_items'] = db.get_submitted_items(owner_email, section_name) | |
| def get_most_bought_item(): | |
| owner_email = st.session_state['email'] | |
| section_name = st.session_state['section'] | |
| return db.get_most_bought_item(owner_email, section_name) | |
| def main(): | |
| load_existing_data() | |
| st.title("EzSplit - Scan. Split. Quit Arguing.") | |
| if not st.session_state['logged_in']: | |
| tab1, tab2 = st.tabs(["Login", "Sign Up"]) | |
| with tab1: | |
| st.header("Login") | |
| email = st.text_input("Email") | |
| password = st.text_input("Password", type="password") | |
| if st.button("Login"): | |
| user_doc = db.get_user_by_email_and_password(email, password) | |
| if user_doc: | |
| st.session_state['logged_in'] = True | |
| st.session_state['email'] = email | |
| st.rerun() | |
| else: | |
| st.error("Invalid email or password") | |
| with tab2: | |
| st.header("Sign Up") | |
| signup_email = st.text_input("New Email") | |
| signup_password = st.text_input("New Password", type="password") | |
| confirm_password = st.text_input("Confirm Password", type="password") | |
| if st.button("Sign Up"): | |
| if signup_password == confirm_password: | |
| try: | |
| db.create_user(signup_email, signup_password) | |
| st.success("Sign up successful! Please log in.") | |
| except ValueError: | |
| st.error("Email already exists") | |
| else: | |
| st.error("Passwords do not match") | |
| else: | |
| with st.sidebar: | |
| st.write(f"Welcome, {st.session_state['email']}") | |
| if st.button("Sign Out"): | |
| st.session_state.clear() | |
| st.rerun() | |
| st.header("Billing Sections") | |
| if not st.session_state['sections_loaded']: | |
| owner_email = st.session_state['email'] | |
| existing_sections = db.get_all_sections(owner_email) | |
| st.session_state['sections_loaded'] = True | |
| st.session_state['existing_sections'] = existing_sections | |
| section_name = st.radio("Select a section", | |
| ["Create New"] + st.session_state['existing_sections']) | |
| if section_name == "Create New": | |
| new_section_name = st.text_input("New Section Name") | |
| participants_str = st.text_area("Participants (comma-separated)") | |
| if st.button("Create Section"): | |
| try: | |
| participants_list = [p.strip() for p in participants_str.split(",") if p.strip()] | |
| db.create_section(st.session_state['email'], new_section_name, participants_list) | |
| st.session_state['section'] = new_section_name | |
| st.session_state['participants'] = participants_list | |
| st.session_state['items'] = [] | |
| st.session_state['selected_items'] = [] | |
| st.session_state['submitted_items'] = [] | |
| st.session_state['submitted'] = False | |
| st.session_state['existing_sections'].append(new_section_name) | |
| st.rerun() | |
| except ValueError as e: | |
| st.error(str(e)) | |
| else: | |
| if st.button("Load Section"): | |
| load_section_from_db(section_name) | |
| update_submitted_items() | |
| st.rerun() | |
| if st.button("Delete Section"): | |
| db.delete_section(st.session_state['email'], section_name) | |
| st.session_state['existing_sections'].remove(section_name) | |
| if st.session_state['section'] == section_name: | |
| st.session_state['section'] = None | |
| st.session_state['participants'] = [] | |
| st.session_state['items'] = [] | |
| st.session_state['selected_items'] = [] | |
| st.session_state['submitted_items'] = [] | |
| st.session_state['submitted'] = False | |
| st.rerun() | |
| if st.session_state['section'] and st.session_state['participants']: | |
| col1, col2 = st.columns([3, 1]) | |
| with col1: | |
| st.subheader(f"Section: {st.session_state['section']}") | |
| st.write("Participants: " + ", ".join(st.session_state['participants'])) | |
| st.subheader("Bill Image (OCR)") | |
| uploaded_file = st.file_uploader("Upload a bill image (jpg, jpeg, png)", | |
| type=["jpg", "jpeg", "png"]) | |
| if uploaded_file is not None: | |
| if st.button("Extract Items via OCR"): | |
| try: | |
| image_data = uploaded_file.read() | |
| ocr_items = extract_invoice_data(image_data) | |
| if ocr_items: | |
| st.session_state['items'].extend(ocr_items) | |
| st.success("OCR extraction successful. Items appended.") | |
| else: | |
| st.warning("No items found in the extracted text.") | |
| except Exception as e: | |
| st.error(f"OCR failed: {e}") | |
| st.subheader("Manual Item Entry") | |
| manual_item_name = st.text_input("Item Name", key="manual_item_name") | |
| manual_item_price = st.number_input("Item Price", min_value=0.0, step=0.01, key="manual_item_price") | |
| if st.button("Add Item Manually"): | |
| if manual_item_name.strip(): | |
| st.session_state['items'].append({ | |
| "name": manual_item_name.strip(), | |
| "price": manual_item_price | |
| }) | |
| st.success(f"Added '{manual_item_name}' at ${manual_item_price:.2f}") | |
| else: | |
| st.warning("Please provide a non-empty item name.") | |
| st.subheader("Available Items") | |
| if st.session_state['items']: | |
| st.write([f"{itm['name']} (${itm['price']})" for itm in st.session_state['items']]) | |
| else: | |
| st.write("No items found yet.") | |
| if st.session_state['items']: | |
| not_submitted = [ | |
| itm["name"] for itm in st.session_state['items'] | |
| if itm["name"] not in st.session_state['submitted_items'] | |
| ] | |
| selected_items = st.multiselect("Select Items to Tag", not_submitted, key="item_select") | |
| if st.button("Tag Selected Items"): | |
| st.session_state['selected_items'] = selected_items | |
| st.session_state['submitted'] = True | |
| if st.session_state['submitted'] and st.session_state['selected_items']: | |
| chosen_participant = st.selectbox("Participant to Assign Items", | |
| st.session_state['participants'], | |
| key="participant_select") | |
| if st.button("Confirm Assignment"): | |
| for it in st.session_state['items']: | |
| if it['name'] in st.session_state['selected_items']: | |
| db.create_bill( | |
| owner_email=st.session_state['email'], | |
| section_name=st.session_state['section'], | |
| participant=chosen_participant, | |
| item=it['name'], | |
| price=it['price'] | |
| ) | |
| st.session_state['submitted_items'].append(it['name']) | |
| st.success("Items assigned successfully!") | |
| st.session_state['submitted'] = False | |
| st.session_state['selected_items'] = [] | |
| st.subheader("Billing History") | |
| history = db.get_billing_history(st.session_state['email'], st.session_state['section']) | |
| if history: | |
| data_list = [] | |
| for row in history: | |
| data_list.append({ | |
| "Participant": row["_id"], | |
| "Total Price": row["total_price"], | |
| "Last Updated": row["last_updated"] | |
| }) | |
| df = pd.DataFrame(data_list) | |
| st.dataframe(df) | |
| else: | |
| st.write("No bills recorded yet.") | |
| with col2: | |
| st.subheader("Most Bought Item") | |
| top_item = get_most_bought_item() | |
| if top_item: | |
| item_name, count_val, price_val = top_item | |
| st.write(f"Item: {item_name}, Count: {count_val}, Price: ${price_val}") | |
| else: | |
| st.write("No items purchased yet.") | |
| st.subheader("Manage Participants") | |
| new_participant = st.text_input("Add Participant") | |
| if st.button("Add Participant"): | |
| if new_participant.strip(): | |
| st.session_state['participants'].append(new_participant.strip()) | |
| save_section_to_db(st.session_state['section'], st.session_state['participants']) | |
| st.rerun() | |
| remove_part = st.selectbox("Remove Participant", st.session_state['participants'], key="remove_participant") | |
| if st.button("Remove Participant"): | |
| if remove_part: | |
| st.session_state['participants'].remove(remove_part) | |
| save_section_to_db(st.session_state['section'], st.session_state['participants']) | |
| db.remove_items( | |
| st.session_state['email'], | |
| st.session_state['section'], | |
| remove_part, | |
| items_to_remove=None | |
| ) | |
| st.rerun() | |
| st.subheader("Remove Wrongly Tagged Items") | |
| part_to_remove_from = st.selectbox("Select Participant", | |
| st.session_state['participants'], | |
| key="remove_participant_select") | |
| if part_to_remove_from: | |
| pipeline = [ | |
| {"$match": { | |
| "owner_email": st.session_state['email'], | |
| "section_name": st.session_state['section'], | |
| "participant": part_to_remove_from | |
| }} | |
| ] | |
| results = list(db.bills_coll.aggregate(pipeline)) | |
| if results: | |
| tagged_items = [doc["item"] for doc in results] | |
| remove_items_select = st.multiselect("Select Items to Remove", | |
| tagged_items, | |
| key="remove_item_select") | |
| if st.button("Remove Items"): | |
| db.remove_items( | |
| st.session_state['email'], | |
| st.session_state['section'], | |
| part_to_remove_from, | |
| remove_items_select | |
| ) | |
| for itm in remove_items_select: | |
| if itm in st.session_state['submitted_items']: | |
| st.session_state['submitted_items'].remove(itm) | |
| st.success("Items removed successfully!") | |
| st.rerun() | |
| else: | |
| st.write("No items found for this participant.") | |
| if __name__ == "__main__": | |
| main() | |