xyzmin / app.py
ageraustine's picture
Update app.py
97b0724 verified
#!/usr/bin/env python3
"""
ODA Welfare Admin Panel
Separate admin interface for managing members, contribution types, and contributions in MongoDB
"""
import hmac
import logging
import os
import re
from datetime import datetime

import gradio as gr
import pandas as pd
from dotenv import load_dotenv
from pymongo import MongoClient
# No external plotting libraries needed - using Gradio's native components
# Load settings from a .env file (if one exists) before any os.getenv() lookup below.
load_dotenv()
# Emit INFO-and-above messages; `logger` is the module-wide logger used by every helper.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def connect_mongodb():
    """Open a MongoDB connection and return the configured database.

    The connection string is read from the ``MONGODB_CONNECTION`` environment
    variable (default ``mongodb://localhost:27017/``) and the database name
    from ``DATABASE_NAME`` (default ``oda_welfare``). The server is pinged
    before returning so that an unreachable server is reported here instead
    of on the caller's first query.

    Returns:
        ``(db, client)`` on success, or ``(None, None)`` when the connection
        could not be established. The caller owns ``client`` and must close it.
    """
    client = None
    try:
        connection_string = os.getenv("MONGODB_CONNECTION", "mongodb://localhost:27017/")
        database_name = os.getenv("DATABASE_NAME", "oda_welfare")
        client = MongoClient(connection_string)
        db = client[database_name]
        client.admin.command('ping')
        return db, client
    except Exception as e:
        logger.error(f"Failed to connect to MongoDB: {e}")
        if client is not None:
            # The caller only ever sees (None, None), so nobody else can
            # release a client whose ping failed.
            client.close()
        return None, None
def validate_api_key(api_key):
    """Return True when *api_key* matches the configured admin key.

    The expected key is read from the ``ADMIN_API_KEY`` environment variable.
    Validation fails closed: if the variable is unset or empty, or if
    *api_key* is empty or not a string, the result is False.

    Args:
        api_key: Key supplied by the user.

    Returns:
        bool: True only for an exact match.
    """
    admin_key = os.getenv("ADMIN_API_KEY")
    if not admin_key or not api_key or not isinstance(api_key, str):
        return False
    # Constant-time comparison avoids leaking how many leading characters
    # matched. Encoding to bytes lets compare_digest accept non-ASCII keys.
    return hmac.compare_digest(api_key.encode("utf-8"), admin_key.encode("utf-8"))
def add_new_member(member_id, name, amount, mission, api_key):
    """Insert a new member document into the ``members`` collection.

    Inputs are validated before the database is touched, and the insert is
    refused when a document with the same ``member_id`` already exists.

    Args:
        member_id: Desired member ID; must convert to a positive integer.
        name: Member's name (required, stored stripped).
        amount: Initial amount; empty or zero is stored as ``0.0``.
        mission: Optional text stored in the ``mission`` field (stripped).
        api_key: Admin API key checked with ``validate_api_key``.

    Returns:
        ``(success, message)`` where *success* is a bool and *message* is a
        user-facing status string.
    """
    # NOTE(review): the success message below starts with what looks like a
    # mis-encoded emoji; it is kept byte-for-byte. Confirm the file's encoding.
    client = None
    try:
        if not validate_api_key(api_key):
            return False, "❌ Invalid API key. Access denied."

        if not name or not name.strip():
            return False, "❌ Member name is required."

        try:
            member_id = int(member_id) if member_id else None
        except (ValueError, TypeError):
            return False, "❌ Invalid member ID format."
        if not member_id or member_id <= 0:
            return False, "❌ Member ID must be a positive number."

        try:
            amount = float(amount) if amount else 0.0
        except (ValueError, TypeError):
            return False, "❌ Invalid amount format."

        db, client = connect_mongodb()
        if db is None:
            return False, "❌ Database connection failed."
        members_collection = db.members

        if members_collection.find_one({"member_id": member_id}):
            return False, f"❌ Member ID {member_id} already exists. Please use a different ID."

        new_member = {
            "member_id": member_id,
            "name": name.strip(),
            "amount": amount,
            "mission": mission.strip() if mission else "",
            "indexed_at": datetime.now(),
            "status": "active",
        }
        result = members_collection.insert_one(new_member)

        if result.inserted_id:
            logger.info(f"Added new member: {name} (ID: {member_id})")
            return True, f"βœ… Member '{name}' added successfully with ID {member_id}"
        return False, "❌ Failed to add member to database."
    except Exception as e:
        logger.error(f"Error adding member: {e}")
        return False, f"❌ Error: {str(e)}"
    finally:
        # Close on every exit path, including a failed query or insert.
        if client is not None:
            client.close()
def get_member_stats(target_members=450):
    """Return the current member count together with the membership target.

    Args:
        target_members: Membership goal to report alongside the count.
            Defaults to 450, the value previously hard-coded here.

    Returns:
        ``(member_count, target_members)``. The count is 0 when the database
        is unreachable or the query fails.
    """
    client = None
    try:
        db, client = connect_mongodb()
        if db is None:
            return 0, target_members
        member_count = db.members.count_documents({})
        return member_count, target_members
    except Exception as e:
        logger.error(f"Error getting stats: {e}")
        return 0, target_members
    finally:
        # Close the connection even when count_documents raises.
        if client is not None:
            client.close()
def get_next_member_id():
    """Return one more than the highest ``member_id`` in the ``members`` collection.

    The value is only a suggestion: it is not reserved, and uniqueness is
    checked again when the member is actually inserted.

    Returns:
        int: The next ID, or 1 when the collection is empty, the database is
        unreachable, or the lookup fails.
    """
    client = None
    try:
        db, client = connect_mongodb()
        if db is None:
            return 1
        # Sort descending and take a single document to find the highest ID.
        newest = list(db.members.find().sort("member_id", -1).limit(1))
        return (newest[0]["member_id"] + 1) if newest else 1
    except Exception as e:
        logger.error(f"Error getting next member ID: {e}")
        return 1
    finally:
        # Close the connection even when the query raises.
        if client is not None:
            client.close()
def get_contribution_types():
    """Return the sorted, de-duplicated list of known contribution types.

    Types are gathered from two places: the distinct ``contribution_type``
    values in the ``contributions`` collection, and the ``type`` field of
    active documents in the ``contribution_types`` collection.

    Returns:
        list[str]: Sorted unique type names. A built-in default list is
        returned when nothing is stored, the database is unreachable, or the
        lookup fails.
    """
    client = None
    try:
        db, client = connect_mongodb()
        if db is None:
            return ["Medical Support", "Education Fund", "Emergency Relief"]  # Default options

        contribution_types = list(db.contributions.distinct("contribution_type"))

        # The dedicated collection is optional: a failure here is logged and
        # the types already collected are still returned.
        try:
            stored_docs = db.contribution_types.find({"status": "active"}, {"type": 1})
            contribution_types.extend(doc.get("type") for doc in stored_docs)
        except Exception as e:
            logger.warning(f"Could not read contribution_types collection: {e}")

        # Keep only non-blank strings. None or mixed-type values would make
        # sorted() raise and throw away the valid entries.
        unique_types = sorted(
            {t for t in contribution_types if isinstance(t, str) and t.strip()}
        )
        if not unique_types:
            unique_types = ["Medical Support", "Education Fund", "Emergency Relief", "Memorial Fund"]
        return unique_types
    except Exception as e:
        logger.error(f"Error getting contribution types: {e}")
        return ["Medical Support", "Education Fund", "Emergency Relief"]
    finally:
        # Close the connection on every exit path.
        if client is not None:
            client.close()
def create_membership_progress_display(registered_count, target_count):
    """Render the membership progress card as an HTML string.

    Args:
        registered_count: Number of members currently registered.
        target_count: Membership goal.

    Returns:
        str: HTML showing the percentage, a progress bar and the registered,
        remaining and target counters. If rendering fails, a short red error
        ``<div>`` is returned instead.
    """
    # NOTE(review): the rocket emoji in the footer line looks mis-encoded; it
    # is kept byte-for-byte. Confirm the file's encoding.
    try:
        # A zero or negative target would divide by zero; treat it as 0%.
        if target_count > 0:
            progress_percentage = (registered_count / target_count) * 100
        else:
            progress_percentage = 0.0
        # The text shows the real percentage, but the bar width must stay
        # within 0-100% to be valid CSS for the container.
        bar_width = min(max(progress_percentage, 0.0), 100.0)
        # Never report a negative number of members still needed.
        remaining_count = max(target_count - registered_count, 0)

        html_display = f"""
        <div style="padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 15px; color: white; margin: 10px 0;">
        <h3 style="text-align: center; margin: 0 0 20px 0;">🎯 ODA Welfare Membership Progress</h3>
        <div style="background: rgba(255,255,255,0.2); border-radius: 10px; padding: 20px; margin: 15px 0;">
        <div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 10px;">
        <span style="font-size: 18px; font-weight: bold;">Progress:</span>
        <span style="font-size: 24px; font-weight: bold;">{progress_percentage:.1f}%</span>
        </div>
        <div style="background: rgba(255,255,255,0.3); border-radius: 25px; height: 40px; overflow: hidden; margin: 15px 0;">
        <div style="background: linear-gradient(45deg, #28a745, #20c997); height: 100%; width: {bar_width:.1f}%; border-radius: 25px; display: flex; align-items: center; justify-content: center; transition: width 0.5s ease;">
        <span style="color: white; font-weight: bold; font-size: 14px;">{registered_count} members</span>
        </div>
        </div>
        <div style="display: grid; grid-template-columns: 1fr 1fr 1fr; gap: 15px; margin-top: 20px; text-align: center;">
        <div style="background: rgba(40, 167, 69, 0.8); padding: 15px; border-radius: 10px;">
        <div style="font-size: 28px; font-weight: bold;">{registered_count}</div>
        <div style="font-size: 14px; opacity: 0.9;">Registered</div>
        </div>
        <div style="background: rgba(255, 193, 7, 0.8); padding: 15px; border-radius: 10px;">
        <div style="font-size: 28px; font-weight: bold;">{remaining_count}</div>
        <div style="font-size: 14px; opacity: 0.9;">Remaining</div>
        </div>
        <div style="background: rgba(108, 117, 125, 0.8); padding: 15px; border-radius: 10px;">
        <div style="font-size: 28px; font-weight: bold;">{target_count}</div>
        <div style="font-size: 14px; opacity: 0.9;">Target</div>
        </div>
        </div>
        </div>
        <div style="text-align: center; margin-top: 15px; font-size: 16px;">
        <strong>πŸš€ {remaining_count} more members needed to reach our goal!</strong>
        </div>
        </div>
        """
        return html_display
    except Exception as e:
        logger.error(f"Error creating progress display: {e}")
        return f"<div style='color: red;'>Error creating progress display: {str(e)}</div>"
def search_member_for_edit(search_term):
    """Find members whose name contains, or whose ID equals, *search_term*.

    Names are matched as a case-insensitive literal substring. When the term
    is purely numeric it is also matched exactly against ``member_id``.

    Args:
        search_term: Text typed by the user (a name fragment or a member ID).

    Returns:
        ``(members, message)`` where *members* is a list of member documents
        (without ``_id``), or ``None`` when nothing matched or an error
        occurred, and *message* is a status string.
    """
    client = None
    try:
        if not search_term or not search_term.strip():
            return None, "Please enter a search term"

        db, client = connect_mongodb()
        if db is None:
            return None, "Database connection failed"

        search_term_clean = search_term.strip()
        # Escape the input so characters such as "(" or "*" are matched
        # literally instead of being read as regex syntax.
        conditions = [
            {"name": {"$regex": re.escape(search_term_clean), "$options": "i"}}
        ]
        # member_id is stored as an integer and $regex only matches strings,
        # so an ID lookup has to be an exact integer comparison.
        if search_term_clean.isdecimal():
            conditions.append({"member_id": int(search_term_clean)})

        members = list(db.members.find({"$or": conditions}, {"_id": 0}))
        if not members:
            return None, f"No member found matching '{search_term}'"
        return members, f"Found {len(members)} member(s)"
    except Exception as e:
        logger.error(f"Error searching member: {e}")
        return None, f"Error: {str(e)}"
    finally:
        # Close the connection even when the query raises.
        if client is not None:
            client.close()
def update_member(member_id, name, amount, mission, api_key):
    """Update the name, amount and mission of an existing member.

    Args:
        member_id: ID of the member to update; must convert to an integer.
        name: New name (required, stored stripped).
        amount: New amount; empty or zero is stored as ``0.0``.
        mission: New text for the ``mission`` field (stored stripped).
        api_key: Admin API key checked with ``validate_api_key``.

    Returns:
        ``(success, message)``. *success* is True only when the update
        reported at least one modified document.
    """
    # NOTE(review): the success message below starts with what looks like a
    # mis-encoded emoji; it is kept byte-for-byte. Confirm the file's encoding.
    client = None
    try:
        if not validate_api_key(api_key):
            return False, "❌ Invalid API key. Access denied."

        if not name or not name.strip():
            return False, "❌ Member name is required."

        try:
            amount = float(amount) if amount else 0.0
        except (ValueError, TypeError):
            return False, "❌ Invalid amount format."

        try:
            member_id = int(member_id)
        except (ValueError, TypeError):
            # TypeError covers member_id being None, which happens when
            # Update is clicked before a member has been selected.
            return False, "❌ Invalid member ID."

        db, client = connect_mongodb()
        if db is None:
            return False, "❌ Database connection failed."

        update_data = {
            "name": name.strip(),
            "amount": amount,
            "mission": mission.strip() if mission else "",
            "updated_at": datetime.now(),
        }
        result = db.members.update_one(
            {"member_id": member_id},
            {"$set": update_data},
        )

        if result.modified_count > 0:
            logger.info(f"Updated member ID {member_id}: {name}")
            return True, f"βœ… Member '{name}' updated successfully"
        return False, "❌ Member not found or no changes made"
    except Exception as e:
        logger.error(f"Error updating member: {e}")
        return False, f"❌ Error: {str(e)}"
    finally:
        # Close the connection even when the update raises.
        if client is not None:
            client.close()
def create_contribution_type(contribution_type, description, api_key):
    """Create a new entry in the ``contribution_types`` collection.

    The insert is refused when a document with the same (stripped) ``type``
    already exists.

    Args:
        contribution_type: Name of the type (required, stored stripped).
        description: Optional description (stored stripped).
        api_key: Admin API key checked with ``validate_api_key``.

    Returns:
        ``(success, message)`` where *message* is a user-facing status string.
    """
    # NOTE(review): the success message below starts with what looks like a
    # mis-encoded emoji; it is kept byte-for-byte. Confirm the file's encoding.
    client = None
    try:
        if not validate_api_key(api_key):
            return False, "❌ Invalid API key. Access denied."

        if not contribution_type or not contribution_type.strip():
            return False, "❌ Contribution type name is required."

        db, client = connect_mongodb()
        if db is None:
            return False, "❌ Database connection failed."
        contribution_types_collection = db.contribution_types

        type_name = contribution_type.strip()
        if contribution_types_collection.find_one({"type": type_name}):
            return False, f"❌ Contribution type '{contribution_type}' already exists."

        new_type = {
            "type": type_name,
            "description": description.strip() if description else "",
            "created_at": datetime.now(),
            "status": "active",
        }
        result = contribution_types_collection.insert_one(new_type)

        if result.inserted_id:
            logger.info(f"Created new contribution type: {contribution_type}")
            return True, f"βœ… Contribution type '{contribution_type}' created successfully"
        return False, "❌ Failed to create contribution type"
    except Exception as e:
        logger.error(f"Error creating contribution type: {e}")
        return False, f"❌ Error: {str(e)}"
    finally:
        # Close the connection even when the lookup or insert raises.
        if client is not None:
            client.close()
def add_member_contribution(member_name, contribution_type, amount, date, api_key):
    """Record a contribution in the ``contributions`` collection.

    Args:
        member_name: Name of the contributing member (required, stored stripped).
        contribution_type: Contribution type name (required, stored stripped).
        amount: Contribution amount; empty or zero is stored as ``0.0``.
        date: Contribution date as ``YYYY-MM-DD`` text. Empty means today.
            A non-string value is stored unchanged.
        api_key: Admin API key checked with ``validate_api_key``.

    Returns:
        ``(success, message)`` where *message* is a user-facing status string.
    """
    # NOTE(review): the success message below starts with what looks like a
    # mis-encoded emoji; it is kept byte-for-byte. Confirm the file's encoding.
    client = None
    try:
        if not validate_api_key(api_key):
            return False, "❌ Invalid API key. Access denied."

        if not member_name or not member_name.strip():
            return False, "❌ Member name is required."
        if not contribution_type or not contribution_type.strip():
            return False, "❌ Contribution type is required."

        try:
            amount = float(amount) if amount else 0.0
        except (ValueError, TypeError):
            return False, "❌ Invalid amount format."

        # The date arrives as free text; check it before storing so malformed
        # values never reach the database. The stored form is zero-padded.
        if isinstance(date, str):
            date_text = date.strip()
            if date_text:
                try:
                    date_value = datetime.strptime(date_text, "%Y-%m-%d").strftime("%Y-%m-%d")
                except ValueError:
                    return False, "❌ Invalid date format. Use YYYY-MM-DD."
            else:
                date_value = datetime.now().strftime("%Y-%m-%d")
        else:
            date_value = date if date else datetime.now().strftime("%Y-%m-%d")

        db, client = connect_mongodb()
        if db is None:
            return False, "❌ Database connection failed."

        new_contribution = {
            "name": member_name.strip(),
            "contribution_type": contribution_type.strip(),
            "amount": amount,
            "date": date_value,
            "created_at": datetime.now(),
        }
        result = db.contributions.insert_one(new_contribution)

        if result.inserted_id:
            logger.info(f"Added contribution for {member_name}: {contribution_type} - {amount}")
            return True, f"βœ… Contribution added for '{member_name}'"
        return False, "❌ Failed to add contribution"
    except Exception as e:
        logger.error(f"Error adding contribution: {e}")
        return False, f"❌ Error: {str(e)}"
    finally:
        # Close the connection even when the insert raises.
        if client is not None:
            client.close()
def create_admin_app():
    """Build and return the Gradio Blocks admin interface for ODA Welfare.

    The interface has one shared API-key field and four tabs: add member,
    edit member, contribution types, and manage contributions. While the UI
    is being built the database is queried for the suggested next member ID,
    the membership statistics, and the available contribution types.

    Returns:
        gr.Blocks: The assembled (not yet launched) application.
    """
    # NOTE(review): several emoji in the labels below look mis-encoded (for
    # example "βž•"); they are kept byte-for-byte. Confirm the file's encoding.
    result_columns = ["ID", "Name", "Amount", "Parish", "Status"]

    with gr.Blocks(title="ODA Welfare - Admin Panel") as demo:
        gr.Markdown("# πŸ” ODA Welfare Admin Panel")
        gr.Markdown("*Protected administrative interface for member and contribution management*")

        # Global API key input, passed to every handler that changes or reads data
        global_api_key = gr.Textbox(
            placeholder="Enter admin API key...",
            label="πŸ”‘ Master API Key",
            type="password",
            container=True
        )

        with gr.Tabs():
            # Add New Member Tab
            with gr.TabItem("βž• Add Member"):
                with gr.Row():
                    with gr.Column(scale=2):
                        gr.Markdown("## Add New Member")
                        # Suggested ID is computed once, when the UI is built
                        suggested_id = get_next_member_id()
                        member_id_input = gr.Number(
                            label=f"πŸ”’ Member ID (Suggested: {suggested_id})",
                            container=True,
                            minimum=1,
                            precision=0,
                            value=suggested_id,
                            placeholder="Enter unique member ID..."
                        )
                        name_input = gr.Textbox(
                            placeholder="Enter member full name...",
                            label="πŸ‘€ Member Name",
                            container=True
                        )
                        amount_input = gr.Number(
                            label="πŸ’° Initial Amount (KSH)",
                            value=0,
                            container=True,
                            minimum=0
                        )
                        mission_input = gr.Textbox(
                            placeholder="Enter member's parish...",
                            label="β›ͺ Parish",
                            container=True
                        )
                        add_btn = gr.Button("βž• Add Member", variant="primary", size="lg")
                        add_status = gr.Markdown()
                    with gr.Column(scale=1):
                        gr.Markdown("## πŸ“Š Membership Progress")
                        refresh_btn = gr.Button("πŸ”„ Refresh Progress", variant="secondary")
                        # Initial stats for the progress card
                        member_count, target_count = get_member_stats()
                        initial_progress = create_membership_progress_display(member_count, target_count)
                        progress_display = gr.HTML(value=initial_progress)

            # Edit Member Tab
            with gr.TabItem("✏️ Edit Member"):
                gr.Markdown("## Search and Edit Members")
                search_input = gr.Textbox(
                    placeholder="Enter member name or ID...",
                    label="πŸ” Search Member",
                    container=True
                )
                search_btn = gr.Button("πŸ” Search", variant="secondary")
                search_status = gr.Markdown()
                search_results = gr.Dataframe(
                    headers=["ID", "Name", "Amount", "Parish", "Status"],
                    datatype=["number", "str", "number", "str", "str"],
                    interactive=True,
                    wrap=True
                )
                gr.Markdown("*πŸ’‘ Click on a row above to select a member for editing*")
                with gr.Row():
                    with gr.Column():
                        edit_id_input = gr.Number(
                            label="Member ID",
                            container=True
                        )
                        edit_name_input = gr.Textbox(
                            label="πŸ‘€ Name",
                            container=True
                        )
                        edit_amount_input = gr.Number(
                            label="πŸ’° Amount (KSH)",
                            container=True,
                            minimum=0
                        )
                        edit_mission_input = gr.Textbox(
                            label="β›ͺ Parish",
                            container=True
                        )
                        update_btn = gr.Button("πŸ’Ύ Update Member", variant="primary")
                        edit_status = gr.Markdown()
                        selected_member_info = gr.Markdown("*No member selected*")

            # Contribution Types Tab
            with gr.TabItem("πŸ“‹ Contribution Types"):
                gr.Markdown("## Manage Contribution Types")
                with gr.Row():
                    with gr.Column():
                        contrib_type_input = gr.Textbox(
                            placeholder="e.g. Medical Support, Education Fund...",
                            label="🏷️ Contribution Type Name",
                            container=True
                        )
                        contrib_desc_input = gr.Textbox(
                            placeholder="Description of this contribution type...",
                            label="πŸ“ Description",
                            lines=3,
                            container=True
                        )
                        create_type_btn = gr.Button("βž• Create Type", variant="primary")
                        type_status = gr.Markdown()

            # Manage Contributions Tab
            with gr.TabItem("πŸ’° Manage Contributions"):
                gr.Markdown("## Add Member Contributions")
                with gr.Row():
                    with gr.Column():
                        contrib_member_input = gr.Textbox(
                            placeholder="Enter member name...",
                            label="πŸ‘€ Member Name",
                            container=True
                        )
                        # Dropdown choices are loaded once, when the UI is built
                        available_contrib_types = get_contribution_types()
                        contrib_type_dropdown = gr.Dropdown(
                            choices=available_contrib_types,
                            label="🏷️ Contribution Type",
                            container=True,
                            allow_custom_value=True,  # Allow adding new types
                            info="Select existing type or type a new one"
                        )
                        contrib_amount_input = gr.Number(
                            label="πŸ’° Amount (KSH)",
                            container=True,
                            minimum=0
                        )
                        contrib_date_input = gr.Textbox(
                            placeholder="YYYY-MM-DD (leave empty for today)",
                            label="πŸ“… Date",
                            container=True
                        )
                        add_contrib_btn = gr.Button("βž• Add Contribution", variant="primary")
                        refresh_contrib_types_btn = gr.Button("πŸ”„ Refresh Types", variant="secondary", size="sm")
                        contrib_status = gr.Markdown()

        # Event handlers
        def handle_add_member(member_id, name, amount, mission, api_key):
            """Add a member; clear the form and refresh progress on success."""
            success, message = add_new_member(member_id, name, amount, mission, api_key)
            if success:
                member_count, target_count = get_member_stats()
                new_progress = create_membership_progress_display(member_count, target_count)
                return None, "", 0, "", message, new_progress
            # gr.update() leaves the progress card as it is. Returning
            # progress_display.value would restore the build-time snapshot.
            return member_id, name, amount, mission, message, gr.update()

        def handle_search_member(search_term, api_key):
            """Search members and return them as a table plus a status message."""
            empty_results = pd.DataFrame(columns=result_columns)
            if not validate_api_key(api_key):
                return empty_results, "❌ Invalid API key"
            members, message = search_member_for_edit(search_term)
            if not members:
                return empty_results, message
            # "or" guards against fields stored as None, which int()/float()
            # would reject.
            rows = [
                [
                    int(member.get('member_id') or 0),
                    str(member.get('name', '')),
                    float(member.get('amount') or 0),
                    str(member.get('mission', '')),
                    str(member.get('status', 'active')),
                ]
                for member in members
            ]
            df = pd.DataFrame(rows, columns=result_columns)
            df['ID'] = df['ID'].astype(int)
            df['Amount'] = df['Amount'].astype(float)
            return df, message

        def handle_update_member(member_id, name, amount, mission, api_key):
            """Update a member; clear the edit form on success."""
            success, message = update_member(member_id, name, amount, mission, api_key)
            if success:
                # Number components are cleared with None, text boxes with "".
                return None, "", None, "", message
            return member_id, name, amount, mission, message

        def handle_create_type(contrib_type, description, api_key):
            """Create a contribution type; clear the form on success."""
            success, message = create_contribution_type(contrib_type, description, api_key)
            if success:
                return "", "", message
            return contrib_type, description, message

        def handle_add_contribution(member_name, contrib_type, amount, date, api_key):
            """Record a contribution; clear the form on success."""
            success, message = add_member_contribution(member_name, contrib_type, amount, date, api_key)
            if success:
                # None clears the dropdown and the Number field.
                return "", None, None, "", message
            return member_name, contrib_type, amount, date, message

        def refresh_progress():
            """Re-query the member count and re-render the progress card."""
            member_count, target_count = get_member_stats()
            return create_membership_progress_display(member_count, target_count)

        def handle_member_selection(df, evt: gr.SelectData):
            """Copy the clicked search-result row into the edit form."""
            try:
                if df is None or len(df) == 0:
                    return None, "", None, "", "*No member selected*"
                # evt.index is indexed at position 0 to get the clicked row
                row_index = evt.index[0] if evt.index else 0
                selected_row = df.iloc[row_index]
                # Convert each cell, substituting a default for missing values
                member_id = int(selected_row['ID']) if pd.notna(selected_row['ID']) else None
                member_name = str(selected_row['Name']) if pd.notna(selected_row['Name']) else ""
                member_amount = float(selected_row['Amount']) if pd.notna(selected_row['Amount']) else 0.0
                member_mission = str(selected_row['Parish']) if pd.notna(selected_row['Parish']) else ""
                member_status = str(selected_row['Status']) if pd.notna(selected_row['Status']) else "active"
                info_msg = f"**βœ… Selected Member:** {member_name} (ID: {member_id}) - Status: {member_status}"
                return member_id, member_name, member_amount, member_mission, info_msg
            except Exception as e:
                logger.error(f"Error handling member selection: {e}")
                return None, "", None, "", "*Error selecting member*"

        def refresh_contribution_types():
            """Reload the dropdown choices from the database."""
            return gr.Dropdown(choices=get_contribution_types())

        # Wire up events
        add_btn.click(
            handle_add_member,
            inputs=[member_id_input, name_input, amount_input, mission_input, global_api_key],
            outputs=[member_id_input, name_input, amount_input, mission_input, add_status, progress_display]
        )
        search_btn.click(
            handle_search_member,
            inputs=[search_input, global_api_key],
            outputs=[search_results, search_status]
        )
        # Handle row selection in search results
        search_results.select(
            handle_member_selection,
            inputs=[search_results],
            outputs=[edit_id_input, edit_name_input, edit_amount_input, edit_mission_input, selected_member_info]
        )
        update_btn.click(
            handle_update_member,
            inputs=[edit_id_input, edit_name_input, edit_amount_input, edit_mission_input, global_api_key],
            outputs=[edit_id_input, edit_name_input, edit_amount_input, edit_mission_input, edit_status]
        )
        create_type_btn.click(
            handle_create_type,
            inputs=[contrib_type_input, contrib_desc_input, global_api_key],
            outputs=[contrib_type_input, contrib_desc_input, type_status]
        )
        add_contrib_btn.click(
            handle_add_contribution,
            inputs=[contrib_member_input, contrib_type_dropdown, contrib_amount_input, contrib_date_input, global_api_key],
            outputs=[contrib_member_input, contrib_type_dropdown, contrib_amount_input, contrib_date_input, contrib_status]
        )
        refresh_contrib_types_btn.click(
            refresh_contribution_types,
            outputs=contrib_type_dropdown
        )
        refresh_btn.click(refresh_progress, outputs=progress_display)
        # Refresh the progress card on every page load so it is not frozen
        # at the build-time value.
        demo.load(refresh_progress, outputs=progress_display)

        gr.Markdown("---")
        gr.Markdown("**Security:** This interface requires valid API key authentication for all operations.")
        gr.Markdown("**Note:** All operations are logged and timestamped in the database.")

    return demo
if __name__ == "__main__":
    # Script entry point: build the admin UI and serve it without a public share link.
    admin_app = create_admin_app()
    launch_options = {"share": False}
    admin_app.launch(**launch_options)