"""Gradio front end for the ODA Welfare member and contribution registers.

Data is read from MongoDB (collections ``members`` and ``contributions``) on
every search; the UI offers a member search tab, a contributions search tab
and a static "About" tab.
"""

import logging
import os
from difflib import SequenceMatcher

import gradio as gr
import pandas as pd
import pymongo
from dotenv import load_dotenv
from pymongo import MongoClient

load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Column layouts handed to the UI. The Gradio tables are positional, so the
# loaders always return exactly these columns in exactly this order.
MEMBER_COLUMNS = ['No', 'Name', 'Amount', 'Mission']
CONTRIBUTION_COLUMNS = ['Contribution_Type', 'Name', 'Amount', 'Date']

# Sentinel dropdown entry meaning "do not filter by contribution type".
ALL_CONTRIBUTIONS = "All Contributions"

# Similarity required when a search word is compared against *unfiltered*
# name parts (titles, initials, very short words). Stricter than the default
# threshold because those parts produce more accidental matches.
_UNFILTERED_THRESHOLD = 0.8

# Nickname <-> full-name variations. All entries are lower case because every
# lookup and comparison is done on lower-cased name parts.
_NICKNAME_MAP = {
    # Common African/Kenyan name variations
    'ben': ['benard', 'bernard', 'benjamin'],
    'benard': ['ben', 'benny'],
    'bernard': ['ben', 'benny'],
    'benjamin': ['ben', 'benny'],
    'chris': ['christopher', 'christian', 'chrispine'],
    'christopher': ['chris'],
    'christian': ['chris'],
    'mike': ['michael'],
    'michael': ['mike'],
    'joe': ['joseph'],
    'joseph': ['joe'],
    'dan': ['daniel'],
    'daniel': ['dan'],
    'pete': ['peter'],
    'peter': ['pete'],
    'abram': ['abraham'],
    'abraham': ['abram'],
    'vic': ['victor'],
    'victor': ['vic'],
    'sam': ['samson', 'samuel', 'samwel'],
    'samson': ['sam'],
    'samuel': ['sam'],
    'samwel': ['sam'],
    'fred': ['frederick', 'fredrick'],
    'frederick': ['fred'],
    'fredrick': ['fred'],
}

ABOUT_MARKDOWN = """
# ⛪ ODA Welfare

**Organisation of Deacons and Acolytes**
Nairobi Diocese

## 🎯 Mission
Welfare support for deacons and acolytes, fostering brotherhood and spiritual growth.

## 👁️ Vision
No member walks alone in times of need.

## 📜 Foundation
> *"Love one another."*
> **John 13:35**

## 🤝 Services
• Financial assistance
• Medical support
• Educational support
• Spiritual counseling
"""

REGISTRATION_MARKDOWN = """
### M-PESA Payment Instructions

To register as a new member, please follow these steps:

1. **GO TO MPESA MENU**
2. **CLICK PAYBILL**
3. **ENTER PAYBILL NUMBER:** `522533`
4. **ENTER ACCOUNT NUMBER:** `7973379`
5. **ENTER AMOUNT:** `200`
6. **ENTER PIN**
7. **FORWARD MPESA MESSAGE TO:** `0719769270`

*After completing payment and forwarding the M-PESA message, your membership will be processed.*
"""

MOBILE_CSS = """
.gradio-container {
    max-width: 100% !important;
    padding: 8px !important;
    margin: 0 !important;
}

/* Mobile-first responsive design */
@media (max-width: 768px) {
    .gr-button {
        width: 100% !important;
        margin: 5px 0 !important;
    }
    .gr-textbox, .gr-dropdown {
        margin-bottom: 10px !important;
    }
    .gr-dataframe {
        font-size: 12px !important;
    }
    h1, h2 {
        font-size: 1.2em !important;
        margin: 10px 0 !important;
    }
}

/* Improve table readability on mobile */
.gr-dataframe table {
    font-size: 14px;
    word-wrap: break-word;
}
"""


def connect_mongodb():
    """Connect to MongoDB and verify the connection with a ping.

    The connection string and database name come from the environment
    variables ``MONGODB_CONNECTION`` and ``DATABASE_NAME`` (with local
    defaults).

    Returns:
        ``(db, client)`` on success; the caller owns ``client`` and must
        close it. ``(None, None)`` if connecting or pinging fails (the error
        is logged and any half-opened client is closed).
    """
    client = None
    try:
        connection_string = os.getenv("MONGODB_CONNECTION", "mongodb://localhost:27017/")
        database_name = os.getenv("DATABASE_NAME", "oda_welfare")
        client = MongoClient(connection_string)
        db = client[database_name]
        client.admin.command('ping')
        return db, client
    except Exception as e:
        logger.error("Failed to connect to MongoDB: %s", e)
        # Do not leak the client when the ping (or anything after
        # construction) fails.
        if client is not None:
            client.close()
        return None, None


def _empty_frame(columns):
    """Return an empty DataFrame with the given column names."""
    return pd.DataFrame({column: [] for column in columns})


def _fetch_documents(collection_name, projection):
    """Return all documents of a collection as a list of dicts.

    Returns an empty list when the database is unreachable. The client is
    always closed, including when the query itself raises.
    """
    db, client = connect_mongodb()
    if db is None:
        return []
    try:
        return list(db[collection_name].find({}, projection))
    finally:
        client.close()


def load_members_data() -> pd.DataFrame:
    """Load the ODA Welfare Department members from MongoDB.

    Returns:
        DataFrame with columns ``No``, ``Name``, ``Amount``, ``Mission`` (in
        that order). Empty when the database is unreachable, the collection
        is empty, or any error occurs (errors are logged, not raised).
        Missing ``mission`` values are returned as ``''``.
    """
    try:
        documents = _fetch_documents(
            'members',
            {'_id': 0, 'member_id': 1, 'name': 1, 'amount': 1, 'mission': 1},
        )
        if not documents:
            return _empty_frame(MEMBER_COLUMNS)

        df = pd.DataFrame(documents).rename(columns={
            'member_id': 'No',
            'name': 'Name',
            'amount': 'Amount',
            'mission': 'Mission',
        })
        # Guarantee every expected column exists (absent fields become NaN)
        # and that the order matches the positional headers used by the UI.
        df = df.reindex(columns=MEMBER_COLUMNS)
        df['Mission'] = df['Mission'].fillna('')
        return df
    except Exception as e:
        logger.error("Error loading members from MongoDB: %s", e)
        return _empty_frame(MEMBER_COLUMNS)


def load_contributions_data() -> pd.DataFrame:
    """Load the ODA Welfare Department contributions from MongoDB.

    Returns:
        DataFrame with columns ``Contribution_Type``, ``Name``, ``Amount``,
        ``Date`` (in that order). Empty when the database is unreachable,
        the collection is empty, or any error occurs (errors are logged, not
        raised).
    """
    try:
        documents = _fetch_documents(
            'contributions',
            {'_id': 0, 'contribution_type': 1, 'name': 1, 'amount': 1, 'date': 1},
        )
        if not documents:
            return _empty_frame(CONTRIBUTION_COLUMNS)

        df = pd.DataFrame(documents).rename(columns={
            'contribution_type': 'Contribution_Type',
            'name': 'Name',
            'amount': 'Amount',
            'date': 'Date',
        })
        # Same guarantee as for members: fixed columns, fixed order.
        return df.reindex(columns=CONTRIBUTION_COLUMNS)
    except Exception as e:
        logger.error("Error loading contributions from MongoDB: %s", e)
        return _empty_frame(CONTRIBUTION_COLUMNS)


def search_member(search_term):
    """Search for members by name.

    A case-insensitive literal substring match is tried first. If that finds
    nothing and the search term has more than one word, every member name is
    checked with :func:`fuzzy_match_name` instead.

    Args:
        search_term: Text typed by the user.

    Returns:
        A DataFrame of matching member rows, or a ``str`` status message
        when there is no data, no search term, or no match.
    """
    df = load_members_data()
    if df.empty:
        return "❌ No data found. Please ensure MongoDB is properly configured and indexed."

    query = (search_term or "").strip().lower()
    if not query:
        return "ℹ️ Please enter a name to search for."

    # regex=False: the user's text is a literal, not a pattern. Otherwise
    # input such as "(" raises re.error and "." matches everything.
    names = df['Name'].fillna('').astype(str).str.lower()
    matches = df[names.str.contains(query, regex=False, na=False)]

    # Fuzzy fallback only for multi-word queries.
    if matches.empty and len(query.split()) > 1:
        fuzzy_mask = df['Name'].map(lambda name: fuzzy_match_name(query, name))
        matches = df[fuzzy_mask.astype(bool)]

    if matches.empty:
        return f"🔍 No members found matching '{search_term}'"
    return matches


def get_nickname_matches(name):
    """Return the lower-cased name followed by its known nickname variants.

    Args:
        name: A single name part, in any case.

    Returns:
        A new list whose first element is ``name.lower()``; further elements
        are the variants listed for it in the nickname map, if any.
    """
    name_lower = name.lower()
    return [name_lower] + _NICKNAME_MAP.get(name_lower, [])


def _part_matches(search_part, filtered_name_parts, name_parts, threshold):
    """Return True if one search word matches any part of a name.

    Checks, in order: nickname equivalence against the filtered parts,
    similarity >= ``threshold`` against the filtered parts, and similarity
    >= ``_UNFILTERED_THRESHOLD`` against all parts.
    """
    search_nicknames = set(get_nickname_matches(search_part))
    for name_part in filtered_name_parts:
        if not search_nicknames.isdisjoint(get_nickname_matches(name_part)):
            return True

    for name_part in filtered_name_parts:
        if SequenceMatcher(None, search_part, name_part).ratio() >= threshold:
            return True

    for name_part in name_parts:
        if SequenceMatcher(None, search_part, name_part).ratio() >= _UNFILTERED_THRESHOLD:
            return True

    return False


def fuzzy_match_name(search_term, full_name, threshold=0.7) -> bool:
    """Match a search term against a full name, tolerating typos and nicknames.

    A case-insensitive substring hit matches immediately. Otherwise the
    search term is split into words and *every* word must match some part of
    the name, either as a known nickname variant or by ``difflib``
    similarity.

    Args:
        search_term: Text typed by the user.
        full_name: The stored name to compare against. Non-string values
            (e.g. ``NaN`` from a missing field) never match.
        threshold: Minimum ``SequenceMatcher`` ratio against name parts that
            are longer than two characters and do not end in ``'.'``.

    Returns:
        True if the name is considered a match, else False.
    """
    if not isinstance(search_term, str) or not isinstance(full_name, str):
        return False

    search_term = search_term.lower().strip()
    full_name = full_name.lower().strip()

    # Direct substring match (highest priority). Note: an empty search term
    # is a substring of everything and therefore matches.
    if search_term in full_name:
        return True

    search_parts = search_term.split()
    name_parts = full_name.split()

    # Drop titles ("rev.", "dr.") and very short words from the primary
    # comparison set; they are still consulted with a stricter threshold.
    filtered_name_parts = [
        part for part in name_parts
        if len(part) > 2 and not part.endswith('.')
    ]

    return all(
        _part_matches(search_part, filtered_name_parts, name_parts, threshold)
        for search_part in search_parts
    )


def get_contribution_types():
    """Return the dropdown choices for contribution types.

    Returns:
        A list starting with ``"All Contributions"`` followed by the
        distinct contribution types found in the database, sorted. Missing
        (NaN) types are ignored.
    """
    df = load_contributions_data()
    if df.empty:
        return [ALL_CONTRIBUTIONS]

    # dropna: a NaN among strings would make sorted() raise TypeError.
    distinct_types = df['Contribution_Type'].dropna().unique().tolist()
    return [ALL_CONTRIBUTIONS] + sorted(distinct_types)


def _format_contributions(df):
    """Build the display table (UI column names, ``KSH n/=`` amounts)."""
    return pd.DataFrame({
        'Contribution Type': df['Contribution_Type'].tolist(),
        'Name': df['Name'].tolist(),
        'Amount': [f"KSH {amount}/=" for amount in df['Amount']],
        'Date': df['Date'].tolist(),
    })


def search_contributions(search_term, contribution_type="All Contributions"):
    """Search contributions by member name, optionally filtered by type.

    Args:
        search_term: Name typed by the user. When blank and a specific
            ``contribution_type`` is selected, all contributions of that
            type are returned.
        contribution_type: A contribution type, or ``"All Contributions"``
            for no filtering.

    Returns:
        A DataFrame with columns ``Contribution Type``, ``Name``, ``Amount``
        (formatted ``KSH <amount>/=``) and ``Date``, or a ``str`` status
        message when there is no data, nothing to search for, or no match.
    """
    df = load_contributions_data()
    if df.empty:
        return "❌ No contribution data found. Please ensure MongoDB is properly configured and indexed."

    if contribution_type != ALL_CONTRIBUTIONS:
        df = df[df['Contribution_Type'] == contribution_type]

    search_term = (search_term or "").strip()

    if not search_term:
        if contribution_type == ALL_CONTRIBUTIONS:
            return "ℹ️ Please enter a name to search for contributions."
        # No name given: list everyone for the selected contribution type.
        if df.empty:
            return f"No contributors found for {contribution_type}"
        return _format_contributions(df)

    name_mask = df['Name'].map(lambda name: fuzzy_match_name(search_term, name))
    matches = df[name_mask.astype(bool)]

    if matches.empty:
        filter_msg = f" in {contribution_type}" if contribution_type != ALL_CONTRIBUTIONS else ""
        return f"🔍 No contributions found for '{search_term}'{filter_msg}"

    return _format_contributions(matches)


def create_about_tab():
    """Create the About tab content - mobile optimized."""
    with gr.Column():
        gr.Markdown(ABOUT_MARKDOWN)


def create_search_tab():
    """Create the member Search tab content - mobile optimized.

    Must be called inside an active ``gr.Blocks`` context.
    """
    with gr.Column():
        gr.Markdown("## 🔍 Search Members")

        # Registration Process Section
        with gr.Accordion("📝 New Member Registration", open=False):
            gr.Markdown(REGISTRATION_MARKDOWN)

        search_input = gr.Textbox(
            placeholder="Enter name (e.g. Abraham)...",
            label="Member Name",
            container=False
        )
        search_btn = gr.Button("🔍 Search", variant="primary", size="lg", scale=1)

        status_output = gr.Markdown()
        results_output = gr.Dataframe(
            headers=["#", "Name", "Amount", "Mission"],
            datatype=["number", "str", "number", "str"],
            interactive=False,
            wrap=True
        )

        def search_and_display(search_term):
            """Return ``(table, status message)`` for the given search term."""
            result = search_member(search_term)
            if isinstance(result, str):
                # Error or no results - empty table plus the status message
                return pd.DataFrame(columns=["No", "Name", "Amount", "Mission"]), result
            status_msg = f"✅ Found {len(result)} member(s) matching '{search_term}'"
            return result, status_msg

        # Event handlers: button click and Enter in the textbox
        search_btn.click(
            search_and_display,
            inputs=search_input,
            outputs=[results_output, status_output]
        )
        search_input.submit(
            search_and_display,
            inputs=search_input,
            outputs=[results_output, status_output]
        )


def create_contributions_tab():
    """Create the Contributions tab content - mobile optimized.

    Must be called inside an active ``gr.Blocks`` context. The dropdown
    choices are read from the database once, when the tab is built.
    """
    with gr.Column():
        gr.Markdown("## 💰 Search Contributions")
        gr.Markdown("*Search for member contributions across all welfare activities*")

        # Stack vertically on mobile for better usability
        search_input = gr.Textbox(
            placeholder="Enter name (e.g. Lucas)...",
            label="Member Name",
            container=False
        )
        contribution_dropdown = gr.Dropdown(
            choices=get_contribution_types(),
            value="All Contributions",
            label="Contribution Type",
            container=False
        )
        search_btn = gr.Button("🔍 Search Contributions", variant="primary", size="lg", scale=1)

        status_output = gr.Markdown()
        results_output = gr.Dataframe(
            headers=["Contribution Type", "Name", "Amount", "Date"],
            datatype=["str", "str", "str", "str"],
            interactive=False,
            wrap=True
        )

        def search_and_display_contributions(search_term, contribution_type):
            """Return ``(table, status message)`` for the given search."""
            result = search_contributions(search_term, contribution_type)
            if isinstance(result, str):
                # Error or no results - empty table plus the status message
                empty = pd.DataFrame(columns=["Contribution Type", "Name", "Amount", "Date"])
                return empty, result

            if (search_term or "").strip():
                status_msg = f"✅ Found {len(result)} contribution(s) for '{search_term}'"
                if contribution_type != "All Contributions":
                    status_msg += f" in {contribution_type}"
            else:
                status_msg = f"✅ Showing all {len(result)} contributors for {contribution_type}"
            return result, status_msg

        # Event handlers: button click, Enter in the textbox, dropdown change
        search_btn.click(
            search_and_display_contributions,
            inputs=[search_input, contribution_dropdown],
            outputs=[results_output, status_output]
        )
        search_input.submit(
            search_and_display_contributions,
            inputs=[search_input, contribution_dropdown],
            outputs=[results_output, status_output]
        )
        contribution_dropdown.change(
            search_and_display_contributions,
            inputs=[search_input, contribution_dropdown],
            outputs=[results_output, status_output]
        )


def create_app():
    """Create the mobile-optimized Gradio interface.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    mobile_css = MOBILE_CSS

    with gr.Blocks(
        title="ODA Welfare",
        theme=gr.themes.Soft(),
        css=mobile_css
    ) as demo:
        gr.Markdown("# ⛪ ODA Welfare")
        gr.Markdown("*Nairobi Diocese*")

        with gr.Tabs():
            with gr.TabItem("🔍 Search"):
                create_search_tab()
            with gr.TabItem("💰 Contributions"):
                create_contributions_tab()
            with gr.TabItem("📖 About"):
                create_about_tab()

    return demo


if __name__ == "__main__":
    app = create_app()
    app.launch(
        share=False,
        show_api=False,
        favicon_path=None,
    )