rairo's picture
25 apr
f86f5a8 verified
# -*- coding: utf-8 -*-
import streamlit as st
import json
# import os # Not needed
import graphviz
from datetime import datetime
import re
import uuid # For generating unique IDs
import random # For quiz options shuffling
import google.generativeai as genai # Gemini API
from collections import defaultdict # For timeline and sibling grouping
import firebase_admin
from firebase_admin import credentials, db
from copy import deepcopy # For comparing data states
from dictdiffer import diff # Using dictdiffer for easier comparison
# --- Configuration & Constants ---
DEFAULT_PROFILE = {
"profile": {"name": "", "dob": "", "gender": "Unknown", "phone": ""},
"family_members": [],
"relationships": [],
"settings": {"theme": "Default", "privacy": "Private"}, # Removed collaborators from here
"metadata": {"owner_phone": "", "tree_name": "My Family Tree", "created_at": ""}
}
DEFAULT_MEMBER_STRUCTURE = {
"id": "", "name": "", "dob": "", "dod": "", "gender": "Unknown",
"phone": "", "stories": [], "totem": "",
"created_at": "", "created_by": "",
"last_edited_at": "", "last_edited_by": ""
}
DEFAULT_STORY_STRUCTURE = {
"timestamp": "", "text": "", "added_by": ""
}
# DO NOT CHANGE THIS TITLE/NAME AS PER USER INSTRUCTION
GEMINI_MODEL_NAME = "gemini-2.0-flash"
# --- Firebase Initialization ---
Firebase_DB_URL = st.secrets.get("Firebase_DB")
Firebase_Credentials_JSON = st.secrets.get("FIREBASE")
firebase_app = None
firebase_db_ref = None
firebase_error = None
try:
if Firebase_Credentials_JSON and Firebase_DB_URL:
credentials_json = json.loads(Firebase_Credentials_JSON)
if not firebase_admin._apps:
cred = credentials.Certificate(credentials_json)
firebase_app = firebase_admin.initialize_app(cred, {'databaseURL': Firebase_DB_URL})
firebase_db_ref = db.reference('/') # Root reference
print("Firebase Admin SDK initialized successfully.")
else:
firebase_app = firebase_admin.get_app()
firebase_db_ref = db.reference('/')
print("Firebase Admin SDK already initialized.")
else:
firebase_error = "Firebase secrets (Firebase_DB, FIREBASE) missing."
print(firebase_error)
except Exception as e:
firebase_error = f"Error initializing Firebase: {e}"
print(firebase_error)
# --- Gemini API Client Initialization ---
genai_client = None
api_key_error = False
try:
api_key = st.secrets.get("GOOGLE_API_KEY")
if not api_key: api_key_error = True
else: genai.configure(api_key=api_key); genai_client = genai
except Exception as e: api_key_error = True
# --- Helper Functions ---
def normalize_phone(phone):
"""Normalizes phone to E.164-like format (e.g., +12223334444). Returns None if invalid."""
if not phone: return None
phone_str = str(phone).strip()
# Basic check: starts with +, followed by digits
if not re.match(r"^\+\d{5,}$", phone_str):
return None
digits = "".join(filter(str.isdigit, phone_str))
return f"+{digits}"
def get_user_db_path(owner_phone):
"""Gets the Firebase path for a user's primary tree data."""
normalized = normalize_phone(owner_phone)
if not normalized: return None
# Replace invalid Firebase path characters (like '+') if necessary,
# but E.164 is generally safe. Let's assume '+' is okay for now.
# If issues arise, replace '+' with something else or use a different key structure.
return f'users/{normalized}'
def get_phone_index_path(member_phone):
"""Gets the Firebase path for the phone index entry."""
normalized = normalize_phone(member_phone)
if not normalized: return None
return f'phone_index/{normalized}'
def get_pending_changes_path(owner_phone, proposer_phone=None):
"""Gets the Firebase path for pending changes."""
norm_owner = normalize_phone(owner_phone)
if not norm_owner: return None
base_path = f'pending_changes/{norm_owner}'
if proposer_phone:
norm_proposer = normalize_phone(proposer_phone)
if not norm_proposer: return None
return f'{base_path}/{norm_proposer}'
return base_path
# --- Data Loading ---
def load_tree_data(owner_phone):
"""Loads a specific tree's data from Firebase."""
if firebase_error or not firebase_db_ref: return None
user_path = get_user_db_path(owner_phone)
if not user_path: return None
try:
data = firebase_db_ref.child(user_path).get()
if data and isinstance(data, dict):
merged_data = deepcopy(DEFAULT_PROFILE)
if "metadata" not in data or not isinstance(data["metadata"], dict): data["metadata"] = {}
merged_data.update(data)
merged_data["settings"] = {**DEFAULT_PROFILE["settings"], **data.get("settings", {})}
merged_data["metadata"] = {**DEFAULT_PROFILE["metadata"], **data.get("metadata", {})}
merged_data["metadata"]["owner_phone"] = owner_phone
updated_members = []
for member_data in merged_data.get("family_members", []):
if isinstance(member_data, dict):
complete_member = deepcopy(DEFAULT_MEMBER_STRUCTURE)
member_data.pop('photo_path', None) # Remove legacy field if present
complete_member.update(member_data)
updated_members.append(complete_member)
merged_data["family_members"] = updated_members
if not isinstance(merged_data.get("relationships"), list): merged_data["relationships"] = []
for member in merged_data["family_members"]:
if "stories" not in member or not isinstance(member["stories"], list): member["stories"] = []
cleaned_stories = []
for story in member.get("stories", []):
if isinstance(story, dict):
complete_story = deepcopy(DEFAULT_STORY_STRUCTURE)
complete_story.update(story)
cleaned_stories.append(complete_story)
member["stories"] = cleaned_stories
return merged_data
elif data is None:
new_tree = deepcopy(DEFAULT_PROFILE)
new_tree["metadata"]["owner_phone"] = owner_phone
new_tree["metadata"]["created_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
owner_me_node = deepcopy(DEFAULT_MEMBER_STRUCTURE)
owner_me_node.update({
"id": "Me", "name": "Me",
"phone": owner_phone,
"created_at": new_tree["metadata"]["created_at"],
"created_by": owner_phone
})
new_tree["family_members"].append(owner_me_node)
new_tree["profile"]["phone"] = owner_phone
return new_tree
else:
st.error(f"Invalid data format found for tree owner {owner_phone}.")
return None
except Exception as e:
st.error(f"Error loading tree data for {owner_phone}: {e}")
return None
def find_linked_trees(member_phone):
"""Queries the phone_index to find trees containing the member_phone."""
if firebase_error or not firebase_db_ref: return {}
index_path = get_phone_index_path(member_phone)
if not index_path: return {}
try:
owner_phones_dict = firebase_db_ref.child(index_path).child("trees").get()
return owner_phones_dict if owner_phones_dict and isinstance(owner_phones_dict, dict) else {}
except Exception as e:
st.error(f"Error querying phone index for {member_phone}: {e}")
return {}
# --- Index Update Logic ---
def update_phone_index(owner_phone, previous_members, current_members):
"""Updates the phone_index based on member changes."""
if firebase_error or not firebase_db_ref: return False
norm_owner_phone = normalize_phone(owner_phone)
if not norm_owner_phone: return False
prev_phones = {normalize_phone(p.get('phone')) for p in previous_members if isinstance(p, dict) and normalize_phone(p.get('phone'))}
curr_phones = {normalize_phone(p.get('phone')) for p in current_members if isinstance(p, dict) and normalize_phone(p.get('phone'))}
phones_added = curr_phones - prev_phones
phones_removed = prev_phones - curr_phones
updates = {}
for phone in phones_added:
if phone:
index_entry_path = f"{get_phone_index_path(phone)}/trees/{norm_owner_phone}"
updates[index_entry_path] = True
for phone in phones_removed:
if phone:
index_entry_path = f"{get_phone_index_path(phone)}/trees/{norm_owner_phone}"
updates[index_entry_path] = None
if not updates: return True
try:
firebase_db_ref.update(updates)
return True
except Exception as e:
st.error(f"Error updating phone index: {e}")
st.error(f"Failed index updates: {json.dumps(updates)}")
return False
# --- Data Saving (Owner) ---
def save_tree_data(owner_phone, current_data, previous_data):
"""Saves owner's tree data and updates the phone index."""
if firebase_error or not firebase_db_ref:
st.error(f"Firebase error: {firebase_error or 'Not initialized'}. Cannot save.")
return False
user_path = get_user_db_path(owner_phone)
if not user_path:
st.error("Invalid owner phone format for saving.")
return False
# Prepare tree data
update_totems(current_data)
current_data["metadata"]["owner_phone"] = owner_phone
if "family_members" in current_data:
for member in current_data["family_members"]:
if isinstance(member, dict): member.pop('photo_path', None)
# Save main tree data
try:
firebase_db_ref.child(user_path).set(current_data)
except Exception as e:
st.error(f"Error saving tree data to {user_path}: {e}")
return False
# Update phone index
prev_members = previous_data.get("family_members", []) if previous_data else []
curr_members = current_data.get("family_members", [])
if not update_phone_index(owner_phone, prev_members, curr_members):
st.warning("Tree data saved, but failed to update phone index. Index may be inconsistent.")
return False # Indicate partial failure
return True
# --- Collaboration Functions ---
def propose_changes(owner_phone, proposer_phone, proposed_data):
"""Saves the collaborator's proposed tree state."""
if firebase_error or not firebase_db_ref:
st.error(f"Firebase error: {firebase_error or 'Not initialized'}. Cannot propose changes.")
return False
pending_path = get_pending_changes_path(owner_phone, proposer_phone)
if not pending_path:
st.error("Invalid owner or proposer phone format for proposing changes.")
return False
# Add metadata to the proposal
proposal_payload = {
"proposed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"proposer_phone": proposer_phone,
"tree_data": proposed_data # Store the entire proposed tree
}
try:
firebase_db_ref.child(pending_path).set(proposal_payload)
st.success(f"Changes proposed successfully to owner (...{owner_phone[-4:]}).")
return True
except Exception as e:
st.error(f"Error proposing changes to {pending_path}: {e}")
return False
def load_pending_changes(owner_phone):
"""Loads all pending change proposals for a given owner."""
if firebase_error or not firebase_db_ref: return {}
pending_base_path = get_pending_changes_path(owner_phone)
if not pending_base_path: return {}
try:
data = firebase_db_ref.child(pending_base_path).get()
return data if data and isinstance(data, dict) else {}
except Exception as e:
st.error(f"Error loading pending changes for {owner_phone}: {e}")
return {}
def accept_changes(owner_phone, proposer_phone):
"""Accepts changes: loads proposal, saves it as main tree, updates index, deletes proposal."""
if firebase_error or not firebase_db_ref: return False
pending_path = get_pending_changes_path(owner_phone, proposer_phone)
if not pending_path: return False
try:
# 1. Load the proposal
proposal_payload = firebase_db_ref.child(pending_path).get()
if not proposal_payload or "tree_data" not in proposal_payload:
st.error(f"Proposal from {proposer_phone} not found or invalid.")
return False
accepted_data = proposal_payload["tree_data"]
# 2. Load the owner's current data (for index comparison)
previous_data = load_tree_data(owner_phone)
if previous_data is None:
st.error("Failed to load current owner data before accepting changes.")
return False # Avoid overwriting if current state is unknown
# 3. Save the accepted data as the main tree data (using save_tree_data for index update)
if save_tree_data(owner_phone, accepted_data, previous_data):
# 4. Delete the pending proposal
firebase_db_ref.child(pending_path).delete()
st.success(f"Changes from {proposer_phone} accepted and merged.")
return True
else:
# save_tree_data already showed an error
st.error("Failed to save accepted changes or update index. Proposal not deleted.")
return False
except Exception as e:
st.error(f"Error accepting changes from {proposer_phone}: {e}")
return False
def reject_changes(owner_phone, proposer_phone):
"""Rejects and deletes a pending change proposal."""
if firebase_error or not firebase_db_ref: return False
pending_path = get_pending_changes_path(owner_phone, proposer_phone)
if not pending_path: return False
try:
firebase_db_ref.child(pending_path).delete()
st.success(f"Changes from {proposer_phone} rejected.")
return True
except Exception as e:
st.error(f"Error rejecting changes from {proposer_phone}: {e}")
return False
def generate_diff_summary(current_data, proposed_data):
"""Generates a human-readable summary of differences between two tree data dicts."""
summary = []
try:
# Use dictdiffer to find changes
result = list(diff(current_data, proposed_data, ignore={'last_edited_at', 'last_edited_by', 'created_at', 'created_by'})) # Ignore timestamps/editors for diff summary
# Process members
current_members = {m['id']: m for m in current_data.get('family_members', []) if isinstance(m, dict)}
proposed_members = {m['id']: m for m in proposed_data.get('family_members', []) if isinstance(m, dict)}
added_members = set(proposed_members.keys()) - set(current_members.keys())
deleted_members = set(current_members.keys()) - set(proposed_members.keys())
common_members = set(current_members.keys()) & set(proposed_members.keys())
if added_members:
summary.append("**Members Added:**")
for mid in added_members:
summary.append(f"- {proposed_members[mid].get('name', 'Unnamed')} (ID: ...{mid[-6:]})")
if deleted_members:
summary.append("**Members Deleted:**")
for mid in deleted_members:
summary.append(f"- {current_members[mid].get('name', 'Unnamed')} (ID: ...{mid[-6:]})")
modified_member_summary = []
for mid in common_members:
member_diff = list(diff(current_members[mid], proposed_members[mid], ignore={'last_edited_at', 'last_edited_by', 'created_at', 'created_by', 'stories'})) # Ignore stories here, handle separately
if member_diff:
changes = []
for change_type, path, values in member_diff:
field = path if isinstance(path, str) else path[0] # Get field name
if change_type == 'change': changes.append(f"{field}: '{values[0]}' -> '{values[1]}'")
elif change_type == 'add': changes.append(f"{field}: added '{values}'") # Should be rare at top level
elif change_type == 'remove': changes.append(f"{field}: removed '{values}'") # Should be rare at top level
if changes:
modified_member_summary.append(f"- **{current_members[mid].get('name', 'Unnamed')}**: {'; '.join(changes)}")
if modified_member_summary:
summary.append("**Members Modified:**")
summary.extend(modified_member_summary)
# Process Relationships (Simplified comparison)
current_rels = {(r.get('from_id'), r.get('to_id'), r.get('type')) for r in current_data.get('relationships', []) if isinstance(r, dict)}
proposed_rels = {(r.get('from_id'), r.get('to_id'), r.get('type')) for r in proposed_data.get('relationships', []) if isinstance(r, dict)}
added_rels = proposed_rels - current_rels
deleted_rels = current_rels - proposed_rels
id_to_name_current = {m['id']: m.get('name', 'Unknown') for m in current_data.get('family_members', [])}
id_to_name_proposed = {m['id']: m.get('name', 'Unknown') for m in proposed_data.get('family_members', [])}
def format_rel(rel_tuple, names):
f_id, t_id, r_type = rel_tuple
f_name = names.get(f_id, f"...{f_id[-6:]}")
t_name = names.get(t_id, f"...{t_id[-6:]}")
return f"{f_name} {r_type} {t_name}"
if added_rels:
summary.append("**Relationships Added:**")
for rel in added_rels: summary.append(f"- {format_rel(rel, id_to_name_proposed)}")
if deleted_rels:
summary.append("**Relationships Deleted:**")
for rel in deleted_rels: summary.append(f"- {format_rel(rel, id_to_name_current)}")
# Process Stories (Simplified: check if stories list changed for any common member)
story_changes = []
for mid in common_members:
current_stories = current_members[mid].get('stories', [])
proposed_stories = proposed_members[mid].get('stories', [])
# Simple check: compare lengths or string representations (not perfect but gives an idea)
if json.dumps(current_stories, sort_keys=True) != json.dumps(proposed_stories, sort_keys=True):
story_changes.append(f"- Stories for **{current_members[mid].get('name', 'Unnamed')}**")
if story_changes:
summary.append("**Stories Modified:**")
summary.extend(story_changes)
# Check top-level profile/settings/metadata changes (optional)
# ... add checks for profile, settings, metadata if needed ...
except Exception as e:
summary.append(f"Error generating diff: {e}")
return "\n".join(summary) if summary else "No significant changes detected."
# --- find_person_by_id, find_person_by_name, generate_unique_id (remain same) ---
def find_person_by_id(data, person_id):
for person in data.get("family_members", []):
if isinstance(person, dict) and person.get("id") == person_id: return person
return None
def find_person_by_name(data, name):
if not name: return None
normalized_name = name.strip().lower()
matching_members = [
person for person in data.get("family_members", [])
if isinstance(person, dict) and person.get("name", "").strip().lower() == normalized_name
]
# Return list of matches for name detection, or first match if only one expected
return matching_members if matching_members else []
def generate_unique_id(data=None): return uuid.uuid4().hex
# --- Graphviz, AI Functions (remain same logic) ---
# (generate_graphviz, format_node_label, get_father_id, update_totems)
# (safe_json_loads, call_gemini, generate_tree_from_description_gemini, generate_quiz_questions_gemini)
# --- [Include the full code for these functions here - they are unchanged from the previous version] ---
def format_node_label(person):
label = f"{person.get('name', 'Unknown')}"
details = []
if person.get('dob'): details.append(f"b. {person.get('dob')}")
if person.get('dod'): details.append(f"d. {person.get('dod')}")
if details: label += f"\n({' / '.join(details)})"
if person.get('totem'): label += f"\nTotem: {person.get('totem')}"
return label
def get_father_id(person_id, relationships, members):
id_to_person = {p['id']: p for p in members if isinstance(p, dict)}
for rel in relationships:
if isinstance(rel, dict) and rel.get('type') == 'parent' and rel.get('to_id') == person_id:
parent_id = rel.get('from_id')
parent = id_to_person.get(parent_id)
if parent and parent.get('gender', '').lower() == 'male':
return parent_id
return None
def update_totems(user_data):
members = user_data.get("family_members", [])
relationships = user_data.get("relationships", [])
if not members or not relationships: return
id_to_person = {p['id']: p for p in members if isinstance(p, dict)}
visited = set(); processed_in_queue = set()
all_child_ids = {r['to_id'] for r in relationships if isinstance(r, dict) and r.get('type') == 'parent'}
# Start BFS from root nodes (those with no parents in this tree)
queue = [p['id'] for p in members if isinstance(p, dict) and p['id'] not in all_child_ids]
# Add remaining nodes in case of cycles or disconnected components
queue.extend([p['id'] for p in members if isinstance(p, dict) and p['id'] not in queue])
processed_count = 0 # Safety break for potential infinite loops
max_process = len(members) * 2 + 5
while queue and processed_count < max_process:
person_id = queue.pop(0)
processed_count += 1
if person_id in visited: continue # Already fully processed
# processed_in_queue is not strictly needed with the visited check
person = id_to_person.get(person_id)
if not person: continue
totem_updated = False
current_totem = person.get('totem')
# Only inherit if totem is explicitly empty or None
if current_totem is None or current_totem == "":
father_id = get_father_id(person_id, relationships, members)
if father_id:
father = id_to_person.get(father_id)
# Check if father has a totem AND if father has been processed or is a root
if father and father.get('totem') and (father_id in visited or father_id not in all_child_ids):
person['totem'] = father.get('totem')
totem_updated = True
# print(f"DEBUG: Inherited totem '{person['totem']}' for {person['name']} from father {father['name']}") # Debug
# Mark as visited *after* processing potential inheritance based on parents
visited.add(person_id)
# Add children to queue if not already visited
for rel in relationships:
if isinstance(rel, dict) and rel.get('type') == 'parent' and rel.get('from_id') == person_id:
child_id = rel.get('to_id')
if child_id and child_id not in visited and child_id not in queue:
queue.append(child_id)
# If totem was updated, re-add children to potentially update their totems later
if totem_updated:
for rel in relationships:
if isinstance(rel, dict) and rel.get('type') == 'parent' and rel.get('from_id') == person_id:
child_id = rel.get('to_id')
if child_id and child_id in visited: # If child was already visited, needs re-check
visited.remove(child_id)
if child_id not in queue: queue.append(child_id) # Re-add to queue
elif child_id and child_id not in queue: # If not visited and not in queue, add it
queue.append(child_id)
if processed_count >= max_process:
print("WARN: Totem update BFS reached max iterations, potential loop?")
def generate_graphviz(data):
dot = graphviz.Digraph(comment='Family Tree', format='svg')
dot.attr(rankdir='TB', splines='ortho', nodesep='0.6', ranksep='0.8')
theme = data.get("settings", {}).get("theme", "Default")
node_style = {'shape': 'box', 'style': 'filled', 'margin': '0.1'}
edge_color, marriage_node_color = 'gray50', 'gray50'
if theme == "Dark":
dot.attr(bgcolor='black', fontcolor='white'); node_style.update(fillcolor='grey30', fontcolor='white', color='white'); edge_color, marriage_node_color = 'white', 'white'
else: dot.attr(bgcolor='transparent'); node_style.update(fillcolor='lightblue', fontcolor='black', color='black')
members = data.get("family_members", []); relationships = data.get("relationships", [])
if not members: return dot
member_ids = {m['id'] for m in members if isinstance(m, dict)}; id_to_person = {p['id']: p for p in members if isinstance(p, dict)}
marriage_nodes = {}; parents_of_child = defaultdict(list); children_by_parent_pair = defaultdict(list)
# Add nodes first
for person in members:
if isinstance(person, dict): dot.node(person['id'], label=format_node_label(person), **node_style)
processed_spouses = set()
# Process relationships to build structures
for rel in relationships:
if not isinstance(rel, dict): continue
from_id, to_id, rel_type = rel.get("from_id"), rel.get("to_id"), rel.get("type")
if from_id not in member_ids or to_id not in member_ids: continue # Skip dangling relationships
if rel_type == 'parent': parents_of_child[to_id].append(from_id)
elif rel_type == 'spouse':
pair = frozenset([from_id, to_id])
if pair not in processed_spouses:
m_id = f"m_{uuid.uuid4().hex[:8]}"; marriage_nodes[pair] = m_id # Use UUID for marriage node ID
dot.node(m_id, shape='point', width='0.1', height='0.1', label='', color=marriage_node_color)
# Link spouses to marriage node invisibly for ranking
dot.edge(from_id, m_id, style='invis', dir='none', weight='10')
dot.edge(to_id, m_id, style='invis', dir='none', weight='10')
# Ensure spouses are ranked together
with dot.subgraph(name=f"cluster_m_{m_id}") as sub:
sub.attr(rank='same', style='invis')
sub.node(from_id); sub.node(to_id) # Add spouses to subgraph
processed_spouses.add(pair)
# Group children by parent set
for child_id, parent_ids_list in parents_of_child.items():
valid_p_ids = sorted([p_id for p_id in parent_ids_list if p_id in member_ids]);
if not valid_p_ids: continue
parent_key = frozenset(valid_p_ids); children_by_parent_pair[parent_key].append(child_id)
# Add edges from parents/marriage node to children
processed_child_links = set()
for parent_key, children_list in children_by_parent_pair.items():
p_ids = list(parent_key); source_id = None
if len(p_ids) == 1: source_id = p_ids[0] # Single parent
elif len(p_ids) > 1: source_id = marriage_nodes.get(parent_key) # Find marriage node for couple
if source_id:
for child_id in children_list:
if child_id in member_ids and (source_id, child_id) not in processed_child_links:
dot.edge(source_id, child_id, arrowhead='none', color=edge_color); processed_child_links.add((source_id, child_id))
elif len(p_ids) > 1 and not source_id: # Fallback if marriage node wasn't created (shouldn't happen with spouse logic)
print(f"WARN: Missing marriage node for parents {p_ids}, linking child {children_list} from first parent.")
source_id = p_ids[0]
for child_id in children_list:
if child_id in member_ids and (source_id, child_id) not in processed_child_links:
dot.edge(source_id, child_id, arrowhead='none', color=edge_color); processed_child_links.add((source_id, child_id))
# Rank siblings together
for parent_key, siblings in children_by_parent_pair.items():
valid_sibs = [sid for sid in siblings if sid in member_ids]
if len(valid_sibs) > 1:
with dot.subgraph(name=f"cluster_s_{uuid.uuid4().hex[:8]}") as sub: # Unique cluster name
sub.attr(rank='same', style='invis')
for sid in sorted(valid_sibs): sub.node(sid) # Add nodes to subgraph
return dot
def safe_json_loads(text):
# Attempts to extract and parse JSON from potentially messy AI responses
# Find ```json ... ``` blocks
match = re.search(r"```(?:json)?\s*(\{.*\}|\[.*\])\s*```", text, re.DOTALL | re.IGNORECASE)
if match:
json_text = match.group(1)
else:
# Fallback: Find first '{' or '[' and last '}' or ']'
start_brace = text.find('{')
start_bracket = text.find('[')
end_brace = text.rfind('}')
end_bracket = text.rfind(']')
start = -1
if start_brace != -1 and start_bracket != -1:
start = min(start_brace, start_bracket)
elif start_brace != -1:
start = start_brace
else:
start = start_bracket # Might be -1 if neither found
end = -1
# Ensure end matches start type if possible
if start == start_brace and end_brace != -1:
end = end_brace
elif start == start_bracket and end_bracket != -1:
end = end_bracket
# Fallback if start/end types mismatch or only one end type exists
elif end_brace != -1 and end_bracket != -1:
end = max(end_brace, end_bracket)
elif end_brace != -1:
end = end_brace
elif end_bracket != -1:
end = end_bracket
if start != -1 and end != -1 and end > start:
json_text = text[start:end+1]
else:
# Final fallback: assume the whole text is JSON (strip whitespace)
json_text = text.strip()
# Basic check if it looks like JSON
if not ((json_text.startswith('{') and json_text.endswith('}')) or \
(json_text.startswith('[') and json_text.endswith(']'))):
st.error(f"🔴 Could not find JSON structure. Response:\n```\n{text[:500]}...\n```")
return None
# Clean trailing commas before closing brackets/braces
json_text = re.sub(r",\s*(\]|\})", r"\1", json_text)
try:
return json.loads(json_text)
except json.JSONDecodeError as e:
st.error(f"🔴 Error parsing JSON: {e}\nAttempted JSON Text:\n```\n{json_text[:500]}...\n```")
return None
except Exception as e: # Catch other potential errors
st.error(f"🔴 Unexpected error during JSON parsing: {e}\nResponse Text:\n```\n{text[:500]}...\n```")
return None
def call_gemini(prompt_text, is_json_output_expected=True):
if not genai_client or api_key_error:
st.error("🔴 Gemini API not ready.")
return None
try:
model = genai.GenerativeModel(GEMINI_MODEL_NAME)
# Define safety settings to block harmful content
safety_settings = [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
]
# Configure for JSON output if expected
gen_config = genai.types.GenerationConfig(
# candidate_count=1, # Default is 1
response_mime_type="application/json"
) if is_json_output_expected else None
response = model.generate_content(
prompt_text,
generation_config=gen_config,
safety_settings=safety_settings
)
# Check for empty or blocked response
if not response.parts:
reason = "Unknown"
try:
# Attempt to get block reason
reason = response.prompt_feedback.block_reason.name if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason else None
if not reason and response.candidates:
reason = response.candidates[0].finish_reason.name # e.g., SAFETY, RECITATION
except Exception:
pass # Ignore errors trying to get the reason
st.warning(f"⚠️ AI response empty or blocked. Reason: {reason}")
# print(f"DEBUG: Blocked Response Details: {response}") # Optional detailed debug
return None
if not hasattr(response, 'text') or not response.text:
st.warning("⚠️ AI response text empty.")
return None
# Parse if JSON expected, otherwise return text
if is_json_output_expected:
# Use safe_json_loads for robust parsing
parsed_json = safe_json_loads(response.text)
if parsed_json is None:
st.error(f"🔴 AI response received, but failed to parse as JSON.\nRaw Response:\n```\n{response.text[:500]}...\n```")
return parsed_json
else:
return response.text
except Exception as e:
st.error(f"🔴 Gemini API Call Error: {e} (Type: {type(e).__name__})")
# print(f"DEBUG: Gemini Error Details: {e}") # Optional detailed debug
return None
def generate_tree_from_description_gemini(description):
prompt = f"""Please analyze the following family description. Extract individuals mentioned, including their name, and if available, their date of birth (dob), date of death (dod), gender (Male/Female/Other/Unknown), and totem. Also, identify direct relationships like parent_of, spouse_of, or sibling_of between these individuals.
Format the output as a single JSON object containing two keys:
1. "people": A list of objects, where each object represents an individual and has keys for "name", "dob", "dod", "gender", and "totem" (use null or omit if information is not present). Use the name "Me" if the description uses first-person references without providing a name.
2. "relationships": A list of objects, where each object represents a relationship. Each relationship object should have "person1_name", "person2_name", and "type" (e.g., "parent_of", "spouse_of", "sibling_of"). Ensure the names match exactly those extracted in the "people" list.
Strictly output only the JSON object, without any introductory text, explanations, or markdown formatting like ```json.
Description:
"{description}"
JSON Output:
"""
return call_gemini(prompt, is_json_output_expected=True)
def generate_quiz_questions_gemini(members, relationships, num_questions=3):
if not members or len(members) < 2: return None
id_to_name = {p["id"]: p.get("name", "Unknown") for p in members if isinstance(p, dict)}
rel_strings = []
processed_pairs = set() # To avoid duplicate spouse/sibling entries
for rel in relationships:
if not isinstance(rel, dict): continue
p1_id, p2_id, rt = rel.get("from_id"), rel.get("to_id"), rel.get("type")
p1n, p2n = id_to_name.get(p1_id), id_to_name.get(p2_id)
# Ensure both names exist and are different
if p1n and p2n and p1n != p2n:
pair = frozenset([p1_id, p2_id])
if rt == "parent":
rel_strings.append(f"'{p1n}' is a Parent of '{p2n}'")
elif rt == "spouse" and pair not in processed_pairs:
rel_strings.append(f"'{p1n}' is a Spouse of '{p2n}'")
processed_pairs.add(pair)
elif rt == "sibling" and pair not in processed_pairs:
# Siblings might be implicitly defined via parents, but add explicit ones too
rel_strings.append(f"'{p1n}' is a Sibling of '{p2n}'")
processed_pairs.add(pair)
if not rel_strings:
st.warning("No relationships found to base quiz questions on.")
return None
unique_names = set(id_to_name.values())
min_required_names = 4 # Need at least 4 unique names for good multiple choice
actual_num_questions = num_questions
# Adjust question count or warn if not enough unique names for options
if len(unique_names) < min_required_names:
if len(unique_names) >= 2:
actual_num_questions = max(1, len(unique_names) - 1) # Reduce questions if few names
st.info(f"Note: Reduced quiz questions to {actual_num_questions} due to fewer than {min_required_names} unique names.")
else:
st.warning("Need at least 2 unique names in the tree to generate a quiz.")
return None
prompt = f"""Based *only* on the provided list of family relationships and names, generate {actual_num_questions} unique multiple-choice quiz questions.
Each question should test knowledge about one specific relationship.
The output format must be a single JSON list, where each element is an object representing a question:
`{{"text": "Question text?", "options": ["Correct Answer", "Wrong Option 1", "Wrong Option 2", "Wrong Option 3"], "correct": "Correct Answer"}}`
- The "options" list must contain the correct answer and 3 incorrect answers chosen from the provided 'Names' list.
- All options within a single question must be unique names.
- If fewer than 4 unique names are available in the 'Names' list, use all available names as options, ensuring the correct answer is included.
- Do not invent relationships or information not present in the 'Relationships' list.
- Ensure the 'correct' value exactly matches one of the strings in the 'options' list.
- Strictly output only the JSON list, without any other text or markdown formatting.
Names:
{chr(10).join(f"- {name}" for name in sorted(list(unique_names)))}
Relationships:
{chr(10).join(f"- {rel}" for rel in rel_strings)}
JSON Output:
"""
return call_gemini(prompt, is_json_output_expected=True)
# --- End of unchanged functions ---
# --- Streamlit App UI ---
st.set_page_config(layout="wide", page_title="Family Tree Collab")
# --- State Management ---
# User/Login State
if 'logged_in' not in st.session_state: st.session_state.logged_in = False
if 'current_user_phone' not in st.session_state: st.session_state.current_user_phone = None
# Tree Data State
if 'primary_tree_data' not in st.session_state: st.session_state.primary_tree_data = None
if 'linked_trees_data' not in st.session_state: st.session_state.linked_trees_data = {} # Dict: {owner_phone: tree_data}
if 'active_tree_owner_phone' not in st.session_state: st.session_state.active_tree_owner_phone = None # Phone of the owner of the tree being viewed
# Collaboration State
if 'pending_changes_for_review' not in st.session_state: st.session_state.pending_changes_for_review = {} # Loaded pending changes for owner
if 'selected_proposal_to_review' not in st.session_state: st.session_state.selected_proposal_to_review = None # Key (proposer_phone) of proposal being reviewed
# UI/Feature State
if 'ai_quiz_questions' not in st.session_state: st.session_state.ai_quiz_questions = None
if 'current_quiz_question_index' not in st.session_state: st.session_state.current_quiz_question_index = -1
if 'quiz_score' not in st.session_state: st.session_state.quiz_score = 0
if 'quiz_answered' not in st.session_state: st.session_state.quiz_answered = False
if 'ai_description_result' not in st.session_state: st.session_state.ai_description_result = None
if 'ai_analyze_button_clicked' not in st.session_state: st.session_state.ai_analyze_button_clicked = False
# Add state to hold temporary edits for collaborators before proposing
if 'collaborator_edited_data' not in st.session_state: st.session_state.collaborator_edited_data = None
# State for Name Conflict Detection
if 'show_add_name_conflict_warning' not in st.session_state: st.session_state.show_add_name_conflict_warning = False
if 'add_conflicting_members' not in st.session_state: st.session_state.add_conflicting_members = []
if 'add_new_member_data_staged' not in st.session_state: st.session_state.add_new_member_data_staged = None # Temp storage for member data during conflict resolution
if 'confirm_add_different_person' not in st.session_state: st.session_state.confirm_add_different_person = False
if 'show_edit_name_conflict_warning' not in st.session_state: st.session_state.show_edit_name_conflict_warning = False
if 'edit_conflicting_members' not in st.session_state: st.session_state.edit_conflicting_members = []
if 'edit_member_data_staged' not in st.session_state: st.session_state.edit_member_data_staged = None # Temp storage for member data during conflict resolution
if 'confirm_edit_name_change' not in st.session_state: st.session_state.confirm_edit_name_change = False
if 'editing_member_id_conflict' not in st.session_state: st.session_state.editing_member_id_conflict = None # ID of member being edited during conflict
# --- Login/Register ---
if not st.session_state.logged_in:
st.header("🌴Welcome to the Misheck's Linked Family Tree App!")
st.markdown("*(Login with your phone number to see your tree and trees you are part of 🔥)*")
if firebase_error: st.error(f"🔴 Firebase Error: {firebase_error}")
else: st.success("✅ Firebase Database Connected")
if api_key_error: st.warning("⚠️ Gemini API Key Missing/Invalid.")
else: st.success("✅ Gemini AI Is Ready")
phone_input = st.text_input("Enter Phone Number (e.g., +263777777777):", key="login_phone")
login_button = st.button("Login / Register")
if login_button and phone_input:
normalized_phone = normalize_phone(phone_input)
if not normalized_phone:
st.error("Invalid phone format. Use international format starting with '+'. (e.g., +263777777777)")
else:
st.session_state.current_user_phone = normalized_phone
st.session_state.active_tree_owner_phone = normalized_phone # Default to viewing own tree
# Load primary tree
with st.spinner(f"Loading primary tree for {normalized_phone}..."):
primary_tree = load_tree_data(normalized_phone)
if primary_tree is None:
st.error("Failed to load or initialize primary tree data. Cannot log in.")
st.stop()
st.session_state.primary_tree_data = primary_tree
# If it was a new tree, save it immediately
# Check if 'Me' node exists and has creation info - indicates it might be new
me_node = find_person_by_id(primary_tree, "Me")
is_potentially_new = not me_node or not me_node.get("created_at")
if is_potentially_new and primary_tree["metadata"]["created_at"]:
print(f"Attempting to save initial tree structure for {normalized_phone}")
# Pass None as previous_data for initial save
if not save_tree_data(normalized_phone, primary_tree, None):
st.error("Failed to save initial tree structure. Cannot log in.")
st.stop()
else:
print(f"Saved initial tree structure for {normalized_phone}")
# Reload data after initial save to ensure consistency? Optional.
# st.session_state.primary_tree_data = load_tree_data(normalized_phone)
# Find and load linked trees
st.session_state.linked_trees_data = {}
with st.spinner(f"Checking for linked trees containing {normalized_phone}..."):
linked_owner_phones = find_linked_trees(normalized_phone)
if linked_owner_phones:
st.info(f"You appear in {len(linked_owner_phones)} other tree(s). Loading them...")
for owner_phone in linked_owner_phones.keys():
if owner_phone != normalized_phone: # Don't reload own tree
print(f"Loading linked tree owned by: {owner_phone}") # Debug
linked_tree = load_tree_data(owner_phone)
if linked_tree:
st.session_state.linked_trees_data[owner_phone] = linked_tree
else:
st.warning(f"Could not load linked tree owned by {owner_phone}.")
else:
print(f"No linked trees found for {normalized_phone}") # Debug
st.session_state.logged_in = True
# Reset other states
st.session_state.ai_quiz_questions = None; st.session_state.current_quiz_question_index = -1;
st.session_state.pending_changes_for_review = {}; st.session_state.selected_proposal_to_review = None;
st.session_state.collaborator_edited_data = None;
# Reset name conflict states
st.session_state.show_add_name_conflict_warning = False; st.session_state.add_conflicting_members = []; st.session_state.add_new_member_data_staged = None; st.session_state.confirm_add_different_person = False
st.session_state.show_edit_name_conflict_warning = False; st.session_state.edit_conflicting_members = []; st.session_state.edit_member_data_staged = None; st.session_state.confirm_edit_name_change = False; st.session_state.editing_member_id_conflict = None
st.rerun()
elif login_button: st.error("Phone number required.")
# --- Main App Interface (After Login) ---
else:
current_user = st.session_state.current_user_phone
primary_data = st.session_state.primary_tree_data
linked_data = st.session_state.linked_trees_data
# --- Sidebar ---
st.sidebar.header(f"Logged in: {current_user}")
profile_name_display = primary_data.get('profile', {}).get('name', '') if primary_data else '(No Profile)'
st.sidebar.markdown(f"**My Name:** {profile_name_display or '(Set in Profile)'}")
st.sidebar.divider()
# --- Tree Selector ---
tree_options = {}
if primary_data: # Ensure primary data loaded before adding to options
tree_options[current_user] = primary_data.get("metadata", {}).get("tree_name", "My Tree") + " (Yours)"
for owner_phone, tree_data in linked_data.items():
tree_name = tree_data.get("metadata", {}).get("tree_name", f"Tree by ...{owner_phone[-4:]}")
tree_options[owner_phone] = tree_name
# Ensure active tree is valid, default to primary if not or if no options exist
if not tree_options:
st.error("Error: No trees available to view.")
st.stop()
if st.session_state.active_tree_owner_phone not in tree_options:
st.session_state.active_tree_owner_phone = current_user # Default to own tree
active_owner_phone = st.sidebar.selectbox(
"Select Tree to View:",
options=list(tree_options.keys()),
format_func=lambda phone: tree_options[phone],
key="active_tree_selector",
index=list(tree_options.keys()).index(st.session_state.active_tree_owner_phone) # Set initial index
)
# Update state if selection changes
if active_owner_phone != st.session_state.active_tree_owner_phone:
st.session_state.active_tree_owner_phone = active_owner_phone
st.session_state.collaborator_edited_data = None # Clear collaborator edits when switching trees
# Clear name conflict states when switching trees
st.session_state.show_add_name_conflict_warning = False; st.session_state.add_conflicting_members = []; st.session_state.add_new_member_data_staged = None; st.session_state.confirm_add_different_person = False
st.session_state.show_edit_name_conflict_warning = False; st.session_state.edit_conflicting_members = []; st.session_state.edit_member_data_staged = None; st.session_state.confirm_edit_name_change = False; st.session_state.editing_member_id_conflict = None
st.rerun() # Rerun to load the correct tree's data into view
st.sidebar.divider()
# --- Get Active Tree Data ---
is_viewing_own_tree = (active_owner_phone == current_user)
active_tree_data_source = None # Where the data comes from (primary, linked, or collaborator edit cache)
if is_viewing_own_tree:
active_tree_data_source = primary_data
active_tree_name = tree_options.get(current_user, "My Tree")
elif active_owner_phone in linked_data:
# If collaborator has started editing, use their cached version
if st.session_state.collaborator_edited_data and st.session_state.collaborator_edited_data.get("owner_phone") == active_owner_phone:
active_tree_data_source = st.session_state.collaborator_edited_data["data"]
st.info("You have unsaved proposed changes for this tree.")
else:
# Otherwise, load the read-only version from linked_data
active_tree_data_source = linked_data[active_owner_phone]
active_tree_name = tree_options.get(active_owner_phone, f"Tree by ...{active_owner_phone[-4:]}")
else:
st.error("Error: Could not find data for the selected tree. Defaulting to 'My Tree'.")
active_tree_data_source = primary_data
active_tree_name = tree_options.get(current_user, "My Tree")
st.session_state.active_tree_owner_phone = current_user # Reset state
is_viewing_own_tree = True
# Make a deep copy to work with, especially important for collaborators
active_tree_data = deepcopy(active_tree_data_source) if active_tree_data_source else None
if active_tree_data is None:
st.error(f"Failed to load data for tree: {active_tree_name}. Please try reloading or contacting support.")
st.stop()
# Display whose tree is being viewed
if not is_viewing_own_tree:
st.info(f"Viewing: **{active_tree_name}** (Owned by ...{active_owner_phone[-4:]}). You can propose changes.")
else:
st.success(f"Viewing: **{active_tree_name}** (Your primary tree).")
# Sidebar Status & Logout
if firebase_error: st.sidebar.error(f"🔴 Firebase Error")
else: st.sidebar.success("✅ Firebase Connected")
if api_key_error: st.sidebar.error("⚠️ Gemini API Key Error")
else: st.sidebar.success("✅ Gemini AI Ready")
st.sidebar.divider()
if st.sidebar.button("Logout"):
# Clear session state
keys_to_clear = list(st.session_state.keys()) # Get all keys
for key in keys_to_clear:
del st.session_state[key]
st.rerun()
# --- Navigation ---
nav_options = ["🏠 My Profile", "🌳 View Family Tree"]
# Editing/Proposing allowed for own tree OR linked trees
nav_options.extend(["➕ Add/Edit Family", "💬 Family Stories"])
# Review changes only for own tree
if is_viewing_own_tree:
nav_options.append("🔄 Review Changes") # New page for owner
nav_options.extend(["📜 Family Timeline", "🧩 Family Quiz (AI)"]) # Always available
if is_viewing_own_tree: # AI builder and Settings only for own tree
# DO NOT CHANGE THIS SUBHEADER TITLE AS PER USER INSTRUCTION
nav_options.extend(["🤖 AI Tree Builder (Gemini)", "🎨 Settings"])
page = st.sidebar.radio("Navigate", nav_options, key="navigation")
st.sidebar.divider()
# --- Central Propose Button for Collaborators ---
if not is_viewing_own_tree:
st.sidebar.markdown("---")
st.sidebar.markdown("**Collaboration**")
if st.sidebar.button("Propose My Changes to Owner", key="propose_all_changes"):
# Check if there are actually changes stored in the collaborator cache
if st.session_state.collaborator_edited_data and st.session_state.collaborator_edited_data.get("owner_phone") == active_owner_phone:
proposed_data = st.session_state.collaborator_edited_data["data"]
if propose_changes(active_owner_phone, current_user, proposed_data):
# Clear the cache after successful proposal
st.session_state.collaborator_edited_data = None
# Clear name conflict states as changes are proposed
st.session_state.show_add_name_conflict_warning = False; st.session_state.add_conflicting_members = []; st.session_state.add_new_member_data_staged = None; st.session_state.confirm_add_different_person = False
st.session_state.show_edit_name_conflict_warning = False; st.session_state.edit_conflicting_members = []; st.session_state.edit_member_data_staged = None; st.session_state.confirm_edit_name_change = False; st.session_state.editing_member_id_conflict = None
st.rerun() # Rerun to show the read-only view again
# Error handled within propose_changes
else:
st.sidebar.warning("No changes made to propose.")
st.sidebar.markdown("*(Make edits in Add/Edit or Stories, then click above to submit)*")
st.sidebar.markdown("---")
st.header(f"{page} - {active_tree_name}") # Show page title and tree name
# --- Function to handle saving/proposing ---
def handle_save_or_propose(action_description="change"):
"""Saves data for owner or caches it for collaborator."""
if is_viewing_own_tree:
# Owner saves directly
previous_data = st.session_state.primary_tree_data # Get previous state for diff
if save_tree_data(current_user, active_tree_data, previous_data):
st.success(f"Data saved successfully: {action_description}")
st.session_state.primary_tree_data = deepcopy(active_tree_data) # Update state
return True
else:
st.error(f"Failed to save data: {action_description}")
# Optionally reload to revert UI changes?
# st.session_state.primary_tree_data = load_tree_data(current_user)
return False
else:
# Collaborator caches the change locally in session state
st.session_state.collaborator_edited_data = {
"owner_phone": active_owner_phone,
"data": deepcopy(active_tree_data) # Store the entire modified tree state
}
st.info(f"Change '{action_description}' staged. Click 'Propose My Changes to Owner' in the sidebar to submit.")
return True # Indicate the change was staged
# --- Page Implementations (Operate on `active_tree_data`) ---
# --- My Profile (Only available & editable for own tree) ---
if page == "🏠 My Profile":
if not is_viewing_own_tree:
st.warning("Profile editing is only available for your own tree ('My Tree').")
else:
# Ensure profile exists
active_tree_data.setdefault("profile", deepcopy(DEFAULT_PROFILE["profile"]))
profile = active_tree_data["profile"]
st.subheader("Your Information")
with st.form("profile_form"):
name = st.text_input("Name", value=profile.get("name", ""))
dob = st.text_input("DoB (YYYY-MM-DD)", value=profile.get("dob", ""))
gender_options = ["Male", "Female", "Other", "Unknown"]
try:
gender_index = gender_options.index(profile.get("gender", "Unknown"))
except ValueError:
gender_index = 3 # Default to Unknown if value is invalid
gender = st.selectbox("Gender", gender_options, index=gender_index)
submitted = st.form_submit_button("Update Profile")
if submitted:
# Add validation if needed (e.g., date format)
profile_updated = False
if profile.get("name") != name: profile["name"] = name; profile_updated = True
if profile.get("dob") != dob: profile["dob"] = dob; profile_updated = True
if profile.get("gender") != gender: profile["gender"] = gender; profile_updated = True
# Sync profile changes to the 'Me' node in family_members
me_node = find_person_by_id(active_tree_data, "Me")
me_node_updated = False
if me_node:
me_node_name_update = name if name else "Me" # Use entered name or default 'Me'
if me_node.get("name") != me_node_name_update: me_node["name"] = me_node_name_update; me_node_updated = True
if me_node.get("dob") != dob: me_node["dob"] = dob; me_node_updated = True
if me_node.get("gender") != gender: me_node["gender"] = gender; me_node_updated = True
if me_node_updated:
me_node["last_edited_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
me_node["last_edited_by"] = current_user
else:
# This case should ideally not happen if the 'Me' node is created on registration
st.warning("Could not find the 'Me' node in family members to sync profile information.")
if profile_updated or me_node_updated:
if handle_save_or_propose("Profile Update"):
st.rerun() # Rerun to reflect changes immediately
else:
st.info("No changes made to profile.")
# --- View Family Tree (Available for all trees) ---
elif page == "🌳 View Family Tree":
members = active_tree_data.get("family_members", [])
if not members:
st.info("This tree is empty. Go to 'Add/Edit Family' to add members.")
else:
st.subheader("Family Tree Visualization")
# --- FIX: Ensure no other st.write or print before graphviz ---
# Remove any potential print statements here that might have caused the NULL output
try:
# Generate the graph using the potentially modified active_tree_data
graph_dot = generate_graphviz(active_tree_data)
st.graphviz_chart(graph_dot.source) # Display the graph
# Download Button
try:
# Use graphviz library to render SVG directly
svg_data = graph_dot.pipe(format='svg')
st.download_button(
label="Download SVG",
data=svg_data,
file_name=f"family_tree_{active_owner_phone}.svg",
mime="image/svg+xml"
)
except Exception as e:
st.warning(f"Could not generate SVG for download: {e}")
except ImportError as ie:
st.error(f"Graphviz library error: {ie}. Make sure Graphviz is installed and in the system PATH if needed.")
except graphviz.backend.execute.ExecutableNotFound:
st.error("Graphviz executable not found. Please install Graphviz (https://graphviz.org/download/) and ensure it's in your system's PATH.")
except Exception as e:
st.error(f"An error occurred while rendering the family tree: {e}")
st.exception(e) # Show traceback for debugging
st.divider()
st.subheader("Family Members List")
col_widths = [3, 2, 2, 1, 2, 2] # Added Provenance
cols = st.columns(col_widths)
cols[0].markdown("**Name**"); cols[1].markdown("**DoB**"); cols[2].markdown("**DoD**")
cols[3].markdown("**Gender**"); cols[4].markdown("**Totem**"); cols[5].markdown("**Provenance**")
st.divider()
# Sort members alphabetically by name for consistent display
sorted_members = sorted(members, key=lambda x: x.get('name', 'zzzzzz').lower() if isinstance(x, dict) else 'zzzzzz')
for person in sorted_members:
if not isinstance(person, dict): continue # Skip invalid entries
cols = st.columns(col_widths)
cols[0].write(person.get('name', 'N/A'))
cols[1].write(person.get('dob', '-'))
cols[2].write(person.get('dod', '-'))
cols[3].write(person.get('gender', '-'))
cols[4].write(person.get('totem', '-'))
# Display provenance subtly
prov_info = []
if person.get('created_by'): prov_info.append(f"C: ...{person['created_by'][-4:]} ({person.get('created_at','')[0:10]})") # Show date only
if person.get('last_edited_by'): prov_info.append(f"E: ...{person['last_edited_by'][-4:]} ({person.get('last_edited_at','')[0:10]})")
cols[5].caption(" / ".join(prov_info) if prov_info else "-")
st.divider()
# --- Add/Edit Family (Available for own tree and linked trees) ---
elif page == "➕ Add/Edit Family":
# This page modifies active_tree_data. Changes are saved/proposed via handle_save_or_propose.
members = active_tree_data.setdefault("family_members", [])
relationships = active_tree_data.setdefault("relationships", [])
if not isinstance(relationships, list): relationships = [] # Ensure it's a list
member_options = {f"{p.get('name', 'Unnamed')} (ID: ...{p.get('id', '')[-6:]})": p.get('id') for p in members if isinstance(p, dict)}
id_to_name = {p['id']: p.get('name', 'Unknown') for p in members if isinstance(p, dict)}
tab1, tab2, tab3 = st.tabs(["Add New Member", "Edit Existing Member", "Define Relationships"])
with tab1: # Add New Member
st.subheader("Add a New Family Member")
# Check if a name conflict warning is active
if st.session_state.show_add_name_conflict_warning:
st.warning(f"A member named '{st.session_state.add_new_member_data_staged.get('name')}' already exists.")
st.write("Did you mean one of these existing members?")
for member in st.session_state.add_conflicting_members:
st.write(f"- **{member.get('name')}** (ID: ...{member.get('id')[-6:]})")
st.session_state.confirm_add_different_person = st.checkbox(
"This is a different person, add anyway.",
value=st.session_state.confirm_add_different_person,
key="confirm_add_different_person_checkbox"
)
if st.button("Confirm Add New Person", disabled=not st.session_state.confirm_add_different_person, key="confirm_add_button"):
# Proceed with adding the staged member data
if st.session_state.add_new_member_data_staged:
active_tree_data["family_members"].append(st.session_state.add_new_member_data_staged)
if handle_save_or_propose(f"Add member {st.session_state.add_new_member_data_staged.get('name')} (Confirmed Duplicate Name)"):
# Clear conflict state and staged data on success
st.session_state.show_add_name_conflict_warning = False
st.session_state.add_conflicting_members = []
st.session_state.add_new_member_data_staged = None
st.session_state.confirm_add_different_person = False
st.rerun()
else:
st.error("Error: Staged member data not found.")
# Clear conflict state
st.session_state.show_add_name_conflict_warning = False
st.session_state.add_conflicting_members = []
st.session_state.add_new_member_data_staged = None
st.session_state.confirm_add_different_person = False
# Always show the form, but clear inputs if conflict is resolved/handled
form_key = "add_member_form"
if not st.session_state.show_add_name_conflict_warning:
# Only clear on submit if no conflict was found, or if conflict was confirmed and handled
clear_on_submit_flag = True
else:
# Don't clear if conflict is currently being displayed/resolved
clear_on_submit_flag = False
with st.form(form_key, clear_on_submit=clear_on_submit_flag):
# Use session state values if conflict is active, otherwise use empty/default
initial_name = st.session_state.add_new_member_data_staged.get('name', '') if st.session_state.show_add_name_conflict_warning else ''
initial_dob = st.session_state.add_new_member_data_staged.get('dob', '') if st.session_state.show_add_name_conflict_warning else ''
initial_dod = st.session_state.add_new_member_data_staged.get('dod', '') if st.session_state.show_add_name_conflict_warning else ''
initial_gender = st.session_state.add_new_member_data_staged.get('gender', 'Unknown') if st.session_state.show_add_name_conflict_warning else 'Unknown'
initial_totem = st.session_state.add_new_member_data_staged.get('totem', '') if st.session_state.show_add_name_conflict_warning else ''
initial_phone = st.session_state.add_new_member_data_staged.get('phone', '') if st.session_state.show_add_name_conflict_warning else ''
new_name = st.text_input("Full Name*", value=initial_name, key="add_name_input")
new_dob = st.text_input("DoB (YYYY-MM-DD)", value=initial_dob, key="add_dob_input")
new_dod = st.text_input("DoD (YYYY-MM-DD)", value=initial_dod, key="add_dod_input")
gender_options = ["Male", "Female", "Other", "Unknown"]
try: gender_index = gender_options.index(initial_gender)
except ValueError: gender_index = 3
new_gender = st.selectbox("Gender", gender_options, index=gender_index, key="add_gender_select")
new_totem = st.text_input("Totem (Optional)", value=initial_totem, key="add_totem_input")
new_phone = st.text_input("Phone Number (Optional, e.g., +1...)", value=initial_phone, help="If added, this person might see this tree when they log in.", key="add_phone_input")
st.markdown("*(Photo upload not implemented)*")
submit_label = "Save Member" if is_viewing_own_tree else "Stage Member Addition"
submitted = st.form_submit_button(submit_label, disabled=st.session_state.show_add_name_conflict_warning) # Disable if conflict is active
if submitted:
# Reset conflict state on new submission attempt
st.session_state.show_add_name_conflict_warning = False
st.session_state.add_conflicting_members = []
st.session_state.add_new_member_data_staged = None
st.session_state.confirm_add_different_person = False
norm_new_phone = normalize_phone(new_phone)
if not new_name: st.error("Name required."); st.stop() # Stop execution on error
if new_phone and not norm_new_phone: st.error("Invalid phone format. Use international format starting with '+'."); st.stop() # Stop execution on error
# --- Name Conflict Detection ---
conflicting_members = find_person_by_name(active_tree_data, new_name)
if conflicting_members:
# Conflict found, stage data and show warning
st.session_state.show_add_name_conflict_warning = True
st.session_state.add_conflicting_members = conflicting_members
# Stage the data that was just entered
staged_data = deepcopy(DEFAULT_MEMBER_STRUCTURE)
staged_data.update({
"id": generate_unique_id(), # Generate ID now, but only use if confirmed
"name": new_name.strip(), "dob": new_dob.strip(),
"dod": new_dod.strip(), "gender": new_gender,
"phone": norm_new_phone or "",
"totem": new_totem.strip(),
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"created_by": current_user,
"last_edited_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"last_edited_by": current_user
})
st.session_state.add_new_member_data_staged = staged_data
st.rerun() # Rerun to show the warning UI
else:
# No conflict, proceed with adding
new_member_id = generate_unique_id()
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
new_member_data = deepcopy(DEFAULT_MEMBER_STRUCTURE)
new_member_data.update({
"id": new_member_id, "name": new_name.strip(), "dob": new_dob.strip(),
"dod": new_dod.strip(), "gender": new_gender,
"phone": norm_new_phone or "", # Store normalized or empty string
"totem": new_totem.strip(),
"created_at": current_time, "created_by": current_user, # Track who added it
"last_edited_at": current_time, "last_edited_by": current_user
})
# Modify the active_tree_data directly
active_tree_data["family_members"].append(new_member_data)
# Save (owner) or stage (collaborator)
if handle_save_or_propose(f"Add member {new_name}"):
# Don't rerun here, let the form clear and info message show
pass # Success/Info message handled by the function
with tab2: # Edit Existing Member
st.subheader("Edit an Existing Member")
current_members_edit = active_tree_data.get("family_members", [])
if not current_members_edit: st.info("No members exist yet.")
else:
member_options_edit = {f"{p.get('name', 'Unnamed')} (ID: ...{p.get('id', '')[-6:]})": p.get('id') for p in current_members_edit if isinstance(p, dict)}
# Sort options alphabetically for dropdown
sorted_options = [""] + sorted(list(member_options_edit.keys()))
selected_member_display = st.selectbox("Select Member to Edit", options=sorted_options, index=0, key="edit_select", placeholder="Choose a member...")
if selected_member_display:
member_id_to_edit = member_options_edit[selected_member_display]
# Find the index in the *current* active_tree_data list
member_index = next((i for i, p in enumerate(active_tree_data["family_members"]) if isinstance(p, dict) and p.get('id') == member_id_to_edit), -1)
if member_index != -1:
# Get a reference to the member data within active_tree_data
member_data = active_tree_data["family_members"][member_index]
# Check if a name conflict warning is active for THIS member
is_editing_conflict_member = (st.session_state.show_edit_name_conflict_warning and st.session_state.editing_member_id_conflict == member_id_to_edit)
if is_editing_conflict_member:
st.warning(f"Changing name to '{st.session_state.edit_member_data_staged.get('name')}' would match an existing member.")
st.write("Existing members with this name:")
for member in st.session_state.edit_conflicting_members:
st.write(f"- **{member.get('name')}** (ID: ...{member.get('id')[-6:]})")
st.session_state.confirm_edit_name_change = st.checkbox(
"Yes, change name anyway (these are different people).",
value=st.session_state.confirm_edit_name_change,
key=f"confirm_edit_name_change_checkbox_{member_id_to_edit}"
)
if st.button("Confirm Name Change", disabled=not st.session_state.confirm_edit_name_change, key=f"confirm_edit_name_button_{member_id_to_edit}"):
# Apply the staged name change and proceed with save/propose
if st.session_state.edit_member_data_staged:
# Find the member again in case list order changed (less likely in edit tab)
current_member_ref = find_person_by_id(active_tree_data, member_id_to_edit)
if current_member_ref:
current_member_ref["name"] = st.session_state.edit_member_data_staged.get("name").strip()
current_member_ref["last_edited_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_member_ref["last_edited_by"] = current_user
if handle_save_or_propose(f"Update member {current_member_ref.get('name')} (Confirmed Name Change)"):
# Clear conflict state and staged data on success
st.session_state.show_edit_name_conflict_warning = False
st.session_state.edit_conflicting_members = []
st.session_state.edit_member_data_staged = None
st.session_state.confirm_edit_name_change = False
st.session_state.editing_member_id_conflict = None
st.rerun()
# Error handled by handle_save_or_propose
else:
st.error("Error: Member not found during confirmation.")
# Clear conflict state
st.session_state.show_edit_name_conflict_warning = False
st.session_state.edit_conflicting_members = []
st.session_state.edit_member_data_staged = None
st.session_state.confirm_edit_name_change = False
st.session_state.editing_member_id_conflict = None
else:
st.error("Error: Staged edit data not found.")
# Clear conflict state
st.session_state.show_edit_name_conflict_warning = False
st.session_state.edit_conflicting_members = []
st.session_state.edit_member_data_staged = None
st.session_state.confirm_edit_name_change = False
st.session_state.editing_member_id_conflict = None
with st.form(f"edit_member_form_{member_id_to_edit}", clear_on_submit=False): # Don't clear form on submit if conflict is active
st.write(f"Editing: **{member_data.get('name')}** (ID: ...{member_id_to_edit[-6:]})")
# Display provenance
creator = member_data.get('created_by'); editor = member_data.get('last_edited_by')
prov_text = []
if creator: prov_text.append(f"Created by ...{creator[-4:]} ({member_data.get('created_at','')})")
if editor: prov_text.append(f"Last edit by ...{editor[-4:]} ({member_data.get('last_edited_at','')})")
if prov_text: st.caption(" | ".join(prov_text))
is_me_node = (member_id_to_edit == "Me" and is_viewing_own_tree) # Can only be 'Me' if owner is editing own tree
# Input fields - disable name/phone for 'Me' node
# Use staged value if conflict is active for this member, otherwise use current data
initial_edit_name = st.session_state.edit_member_data_staged.get('name', member_data.get("name", "")) if is_editing_conflict_member else member_data.get("name", "")
edit_name = st.text_input("Name", value=initial_edit_name, disabled=is_me_node or is_editing_conflict_member, key=f"edit_name_{member_id_to_edit}")
edit_dob = st.text_input("DoB", value=member_data.get("dob", ""), key=f"edit_dob_{member_id_to_edit}", disabled=is_editing_conflict_member)
edit_dod = st.text_input("DoD", value=member_data.get("dod", ""), key=f"edit_dod_{member_id_to_edit}", disabled=is_editing_conflict_member)
gender_options = ["Male", "Female", "Other", "Unknown"]
try: gender_index = gender_options.index(member_data.get("gender", "Unknown"))
except ValueError: gender_index = 3
edit_gender = st.selectbox("Gender", gender_options, index=gender_index, key=f"edit_gender_{member_id_to_edit}", disabled=is_editing_conflict_member)
edit_totem = st.text_input("Totem", value=member_data.get("totem", ""), key=f"edit_totem_{member_id_to_edit}", disabled=is_editing_conflict_member)
edit_phone = st.text_input("Phone", value=member_data.get("phone", ""), disabled=is_me_node or is_editing_conflict_member, help="Changing phone affects tree visibility for that user.", key=f"edit_phone_{member_id_to_edit}")
st.markdown("*(Photo management not implemented)*"); st.divider()
# --- Deletion ---
# Allow deletion only if NOT the 'Me' node
delete_member = False
if not is_me_node:
delete_member = st.checkbox(f"⚠️ Delete Member '{member_data.get('name')}'?", key=f"delete_{member_id_to_edit}", disabled=is_editing_conflict_member)
else:
st.markdown("*(Cannot delete the 'Me' user node)*")
submit_label = "Save Changes" if is_viewing_own_tree else "Stage Changes"
submitted = st.form_submit_button(submit_label, disabled=is_editing_conflict_member) # Disable if conflict is active
if submitted:
# Reset conflict state on new submission attempt for this member
st.session_state.show_edit_name_conflict_warning = False
st.session_state.edit_conflicting_members = []
st.session_state.edit_member_data_staged = None
st.session_state.confirm_edit_name_change = False
st.session_state.editing_member_id_conflict = None
norm_edit_phone = normalize_phone(edit_phone)
if edit_phone and not norm_edit_phone:
st.error("Invalid phone format. Use international format starting with '+'."); st.stop()
elif delete_member and not is_me_node:
# --- Deletion Logic ---
try:
deleted_name = active_tree_data["family_members"][member_index].get('name')
# Modify active_tree_data state
del active_tree_data["family_members"][member_index]
# Remove relationships involving the deleted member
active_tree_data["relationships"] = [
r for r in active_tree_data.get("relationships", [])
if isinstance(r,dict) and r.get("from_id") != member_id_to_edit and r.get("to_id") != member_id_to_edit
]
if handle_save_or_propose(f"Delete member {deleted_name}"):
st.rerun() # Rerun to refresh lists/view
except Exception as e:
st.error(f"Error during delete operation: {e}")
# Consider reloading data to revert state if save/propose failed implicitly
st.rerun()
else:
# --- Update Logic ---
name_changed = (not is_me_node and member_data.get("name", "").strip().lower() != edit_name.strip().lower())
phone_changed = (not is_me_node and normalize_phone(member_data.get("phone","")) != norm_edit_phone)
# --- Name Conflict Detection on Edit ---
if name_changed and edit_name:
# Check if the new name exists *among other members*
conflicting_members = [
p for p in active_tree_data["family_members"]
if isinstance(p,dict) and p['id'] != member_id_to_edit and p.get('name', '').strip().lower() == edit_name.strip().lower()
]
if conflicting_members:
# Conflict found, stage data and show warning
st.session_state.show_edit_name_conflict_warning = True
st.session_state.edit_conflicting_members = conflicting_members
st.session_state.editing_member_id_conflict = member_id_to_edit
# Stage the *potential* changes (only name is needed for conflict check)
st.session_state.edit_member_data_staged = {"name": edit_name.strip()}
st.rerun() # Rerun to show the warning UI
st.stop() # Stop processing this submission
# If no name conflict (or name didn't change), proceed with saving other fields
if not is_me_node and not edit_name: st.error("Name cannot be empty."); st.stop()
# Apply changes to the member_data reference within active_tree_data
something_changed = False
if name_changed: member_data["name"] = edit_name.strip(); something_changed = True
if member_data.get("dob") != edit_dob.strip(): member_data["dob"] = edit_dob.strip(); something_changed = True
if member_data.get("dod") != edit_dod.strip(): member_data["dod"] = edit_dod.strip(); something_changed = True
if member_data.get("gender") != edit_gender: member_data["gender"] = edit_gender; something_changed = True
if member_data.get("totem") != edit_totem.strip(): member_data["totem"] = edit_totem.strip(); something_changed = True
if phone_changed: member_data["phone"] = norm_edit_phone or ""; something_changed = True
if something_changed:
member_data["last_edited_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
member_data["last_edited_by"] = current_user
if handle_save_or_propose(f"Update member {member_data['name']}"):
st.rerun() # Refresh view
else:
st.info("No changes detected.")
elif selected_member_display:
# This might happen if the list changed between render and selection
st.error("Could not find the selected member data. The list might have refreshed. Please select again.")
st.rerun()
with tab3: # Define Relationships
st.subheader("Define Family Relationships")
current_members_rel = active_tree_data.get("family_members", [])
relationships = active_tree_data.setdefault("relationships", []) # Ensure list exists
member_options_rel = {f"{p.get('name', 'Unnamed')} (ID: ...{p.get('id', '')[-6:]})": p.get('id') for p in current_members_rel if isinstance(p, dict)}
if len(member_options_rel) < 2: st.info("Need at least two members in the tree to define relationships.")
else:
id_to_name_rel = {p['id']: p.get('name', 'Unknown') for p in current_members_rel if isinstance(p, dict)}
with st.form("add_relationship_form"):
member_list = sorted(list(member_options_rel.keys()))
col1, col2, col3 = st.columns([2, 1, 2])
with col1: p1_disp = st.selectbox("Person 1", member_list, key="rel_p1", index=None, placeholder="Choose...")
with col2: rel_type = st.selectbox("Relationship", ["Parent Of", "Spouse Of", "Sibling Of"], key="rel_type", index=None, placeholder="Choose...")
with col3: p2_disp = st.selectbox("Person 2", member_list, key="rel_p2", index=None, placeholder="Choose...")
submit_label = "Save Relationship" if is_viewing_own_tree else "Stage Relationship Addition"
submitted = st.form_submit_button(submit_label)
if submitted:
if not p1_disp or not p2_disp or not rel_type: st.error("Please select both people and the relationship type.")
else:
id1, id2 = member_options_rel.get(p1_disp), member_options_rel.get(p2_disp)
if not id1 or not id2: st.error("Internal error: Could not map selected names to IDs.")
elif id1 == id2: st.error("Cannot define a relationship between a person and themselves.")
else:
rel_map = {"Parent Of": 'parent', "Spouse Of": 'spouse', "Sibling Of": 'sibling'}
rti = rel_map.get(rel_type)
p1n, p2n = id_to_name_rel.get(id1, p1_disp), id_to_name_rel.get(id2, p2_disp)
# Check for existing relationship
exists = False
for rel in relationships:
if not isinstance(rel, dict): continue
rf, rt, rtype = rel.get('from_id'), rel.get('to_id'), rel.get('type')
# Parent check (directional)
if rti == 'parent' and rtype == 'parent' and rf == id1 and rt == id2: exists = True; break
# Spouse/Sibling check (undirectional)
if rti in ['spouse', 'sibling'] and rtype == rti and frozenset([rf, rt]) == frozenset([id1, id2]): exists = True; break
if exists:
st.warning(f"This relationship ({p1n} {rel_type} {p2n}) already exists.")
else:
# Ensure correct order for spouse/sibling (optional, for consistency)
# id1s, id2s = (id1, id2) if rti == 'parent' else tuple(sorted([id1, id2]))
# Keep original direction for parent, use selected for others
new_rel = {"from_id": id1, "to_id": id2, "type": rti}
# Add to active_tree_data
active_tree_data["relationships"].append(new_rel)
if handle_save_or_propose(f"Add relationship {p1n}-{p2n}"):
st.rerun() # Refresh view
st.divider()
st.write("**Existing Relationships**")
# Refresh relationships from potentially modified active_tree_data
relationships = active_tree_data.get("relationships", [])
if not relationships: st.write("No relationships defined yet.")
else:
indices_to_delete = []
displayed_undirected_pairs = set() # Track spouse/sibling pairs to avoid duplicates
for i, rel in enumerate(relationships):
if not isinstance(rel, dict): continue
from_id, to_id, rel_type = rel.get('from_id'), rel.get('to_id'), rel.get('type')
# Check if IDs exist in the current member list (robustness)
if from_id not in id_to_name_rel or to_id not in id_to_name_rel:
st.caption(f"- Relationship with missing member(s): {rel}")
continue
from_name, to_name = id_to_name_rel.get(from_id), id_to_name_rel.get(to_id)
rel_text, display_this = "", True
if rel_type == 'parent':
rel_text = f"'{from_name}' **Parent** of '{to_name}'"
elif rel_type in ['spouse', 'sibling']:
# Normalize pair for display check (undirected)
pair_key = (frozenset([from_id, to_id]), rel_type)
if pair_key in displayed_undirected_pairs:
display_this = False # Don't show the reverse pair
else:
displayed_undirected_pairs.add(pair_key)
# Display consistently (e.g., alphabetical)
n1, n2 = sorted([from_name, to_name])
type_disp = "Spouse" if rel_type == 'spouse' else "Sibling"
rel_text = f"'{n1}' **{type_disp}** of '{n2}'"
else:
rel_text = f"'{from_name}' **Unknown ({rel_type})** '{to_name}'"
if display_this and rel_text:
col1, col2 = st.columns([4, 1])
col1.markdown(f"- {rel_text}")
delete_label = "Delete" if is_viewing_own_tree else "Stage Deletion"
if col2.button(delete_label, key=f"del_rel_{i}_{from_id}_{to_id}"):
indices_to_delete.append(i)
if indices_to_delete:
indices_to_delete.sort(reverse=True) # Delete from end to avoid index shifts
deleted_rels_desc = []
for index in indices_to_delete:
try:
deleted_rel_info = relationships[index]
deleted_rels_desc.append(f"{id_to_name_rel.get(deleted_rel_info['from_id'])} {deleted_rel_info['type']} {id_to_name_rel.get(deleted_rel_info['to_id'])}")
del active_tree_data["relationships"][index]
except IndexError:
st.warning(f"Could not delete relationship at index {index} - list may have changed.")
if deleted_rels_desc:
if handle_save_or_propose(f"Delete relationships: {', '.join(deleted_rels_desc)}"):
st.rerun() # Refresh view
# --- Family Stories (Available for all trees, proposing changes for linked trees) ---
elif page == "💬 Family Stories":
st.subheader("Record and View Family Stories")
members = active_tree_data.get("family_members", [])
if not members: st.info("No members in this tree.")
else:
member_options_story = {f"{p.get('name', 'Unnamed')} (ID: ...{p.get('id', '')[-6:]})": p.get('id') for p in members if isinstance(p, dict)}
sorted_options = [""] + sorted(list(member_options_story.keys()))
selected_member_display = st.selectbox("Select Family Member", options=sorted_options, index=0, key="story_select", placeholder="Choose...")
if selected_member_display:
member_id = member_options_story[selected_member_display]
# Find index in the current active_tree_data
member_index = next((i for i, p in enumerate(active_tree_data["family_members"]) if isinstance(p,dict) and p.get('id') == member_id), -1)
if member_index != -1:
# Get reference to member data in active_tree_data
member_data = active_tree_data["family_members"][member_index]
st.markdown(f"### Stories about **{member_data.get('name')}**")
# Add Story Form
with st.form(f"story_form_{member_id}", clear_on_submit=True):
new_story_text = st.text_area("Add a new story:", key=f"story_text_{member_id}", height=100)
submit_label = "Save Story" if is_viewing_own_tree else "Stage Story Addition"
submitted = st.form_submit_button(submit_label)
if submitted and new_story_text.strip():
# Ensure 'stories' list exists
if "stories" not in member_data or not isinstance(member_data["stories"], list):
member_data["stories"] = []
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
new_story = deepcopy(DEFAULT_STORY_STRUCTURE)
new_story.update({"timestamp": timestamp, "text": new_story_text.strip(), "added_by": current_user})
# Modify active_tree_data
member_data["stories"].append(new_story)
if handle_save_or_propose(f"Add story for {member_data['name']}"):
# Don't rerun, let form clear
pass
elif submitted:
st.warning("Story text cannot be empty.")
st.divider(); st.markdown("**Saved Stories:**")
# Read stories from potentially modified active_tree_data
stories = member_data.get("stories", [])
if not stories: st.write("No stories recorded yet.")
else:
indices_to_delete_story = []
# Display stories in reverse chronological order (newest first)
for i, story in enumerate(reversed(stories)):
if not isinstance(story, dict): continue
original_index = len(stories) - 1 - i # Index in the original list
added_info = f"Added: {story.get('timestamp', 'N/A')}"
if story.get('added_by'): added_info += f" by ...{story['added_by'][-4:]}"
st.markdown(f"**Story {len(stories)-i}** ({added_info})")
st.markdown(f"> {story.get('text', '')}")
# Delete button
delete_label = "Delete Story" if is_viewing_own_tree else "Stage Story Deletion"
if st.button(delete_label, key=f"del_story_{member_id}_{original_index}"):
indices_to_delete_story.append(original_index)
st.markdown("---")
if indices_to_delete_story:
indices_to_delete_story.sort(reverse=True) # Delete from end
deleted_stories_desc = []
for index in indices_to_delete_story:
try:
deleted_story_text = member_data["stories"][index].get('text', 'story')[:30] + "..."
deleted_stories_desc.append(f"'{deleted_story_text}'")
del member_data["stories"][index] # Modify active_tree_data
except IndexError:
st.warning(f"Could not delete story at index {index} - list may have changed.")
if deleted_stories_desc:
if handle_save_or_propose(f"Delete stories for {member_data['name']}: {', '.join(deleted_stories_desc)}"):
st.rerun() # Refresh view
elif selected_member_display:
st.error("Could not find member data. Refresh?")
st.rerun()
# --- Review Changes (Owner Only) ---
elif page == "🔄 Review Changes":
if not is_viewing_own_tree:
st.error("Access denied. This page is only for the tree owner.")
st.stop()
st.subheader("Review Proposed Changes from Collaborators")
# Load pending changes
if st.button("Refresh Pending Changes"):
with st.spinner("Loading proposals..."):
st.session_state.pending_changes_for_review = load_pending_changes(current_user)
st.session_state.selected_proposal_to_review = None # Reset selection
st.rerun()
# Load initially if not already loaded
if not st.session_state.pending_changes_for_review:
st.session_state.pending_changes_for_review = load_pending_changes(current_user)
pending_proposals = st.session_state.pending_changes_for_review
if not pending_proposals:
st.info("No pending changes to review at this time.")
else:
st.write(f"Found {len(pending_proposals)} proposal(s):")
proposer_phones = list(pending_proposals.keys())
selected_proposer = st.selectbox(
"Select proposal to review:",
options=[""] + proposer_phones,
format_func=lambda p: f"From ...{p[-4:]} ({pending_proposals[p].get('proposed_at', 'N/A')})" if p else "Choose...",
key="review_selection"
)
if selected_proposer:
st.session_state.selected_proposal_to_review = selected_proposer
proposal_data = pending_proposals[selected_proposer].get("tree_data")
if not proposal_data:
st.error("Selected proposal data is missing or invalid.")
else:
st.markdown("---")
st.markdown(f"#### Changes Proposed by ...{selected_proposer[-4:]}")
# Generate and display diff summary
st.markdown("**Summary of Changes:**")
current_owner_data = st.session_state.primary_tree_data # Use owner's current data
diff_summary = generate_diff_summary(current_owner_data, proposal_data)
st.text_area("Differences", value=diff_summary, height=250, disabled=True)
# Accept / Reject Buttons
col1, col2 = st.columns(2)
with col1:
if st.button("✅ Accept Changes", key=f"accept_{selected_proposer}"):
with st.spinner("Accepting and merging changes..."):
if accept_changes(current_user, selected_proposer):
# Reload primary data and clear pending state
st.session_state.primary_tree_data = load_tree_data(current_user)
st.session_state.pending_changes_for_review = load_pending_changes(current_user) # Refresh list
st.session_state.selected_proposal_to_review = None
st.rerun()
# Error handled within accept_changes
with col2:
if st.button("❌ Reject Changes", key=f"reject_{selected_proposer}"):
with st.spinner("Rejecting changes..."):
if reject_changes(current_user, selected_proposer):
# Clear pending state
st.session_state.pending_changes_for_review = load_pending_changes(current_user) # Refresh list
st.session_state.selected_proposal_to_review = None
st.rerun()
# Error handled within reject_changes
st.markdown("---")
# --- Family Timeline (Available for all trees) ---
elif page == "📜 Family Timeline":
# (Operates on active_tree_data - code remains same logic)
st.subheader("Significant Family Events")
members = active_tree_data.get("family_members", [])
events = []
valid_date_format = "%Y-%m-%d" # Assumes this format
for person in members:
if not isinstance(person, dict): continue
name = person.get("name", "Unknown")
dob_str, dod_str = person.get("dob"), person.get("dod")
try:
# Attempt to parse dates, skip if invalid format
if dob_str:
dob_dt = datetime.strptime(dob_str, valid_date_format)
events.append({"date": dob_dt, "type": "Birth", "desc": f"{name} born."})
if dod_str:
dod_dt = datetime.strptime(dod_str, valid_date_format)
events.append({"date": dod_dt, "type": "Death", "desc": f"{name} passed."})
except (ValueError, TypeError):
# Silently ignore invalid date formats for the timeline
pass
try:
# Sort events chronologically
events.sort(key=lambda x: x.get("date", datetime.min))
except Exception as e:
st.warning(f"Timeline sort error: {e}.")
if not events:
st.info("No valid Birth/Death dates found in YYYY-MM-DD format to display on the timeline.")
else:
st.write("Chronological Events:")
last_year = None
for event in events:
event_date = event.get("date")
if event_date:
current_year = event_date.year
# Group by year
if current_year != last_year:
if last_year is not None: st.divider() # Add divider between years
st.markdown(f"#### {current_year}"); last_year = current_year
# Display event
st.markdown(f"- **{event_date.strftime(valid_date_format)}**: {event.get('desc','')}")
# --- Family Quiz (AI) (Available for all trees) ---
elif page == "🧩 Family Quiz (AI)":
# (Operates on active_tree_data - code remains same logic)
st.subheader("Test Your Family Knowledge!")
if api_key_error: st.error("🔴 AI features disabled due to API key error."); st.stop()
if firebase_error: st.warning(f"🔴 Firebase connection error: {firebase_error}.")
members = active_tree_data.get("family_members", [])
relationships = active_tree_data.get("relationships", [])
# Quiz generation and display logic
if st.button("Generate New Quiz Questions", key="generate_quiz"):
# Reset quiz state
st.session_state.ai_quiz_questions = None; st.session_state.current_quiz_question_index = -1; st.session_state.quiz_score = 0; st.session_state.quiz_answered = False
if 'current_options_order' in st.session_state: del st.session_state['current_options_order']
# Clear previous answer choices
for k in list(st.session_state.keys()):
if k.startswith("quiz_q_") and k.endswith("_choice"): del st.session_state[k]
if len(members) < 2 or not relationships:
st.warning("Need at least 2 members and some defined relationships to generate a quiz.")
else:
with st.spinner("🧠 Generating quiz questions with AI..."):
questions = generate_quiz_questions_gemini(members, relationships, num_questions=5) # Pass active data
if questions and isinstance(questions, list) and len(questions) > 0:
# Basic validation of question structure
valid_qs = [q for q in questions if isinstance(q, dict) and all(k in q for k in ['text','options','correct']) and isinstance(q['options'],list) and len(q['options']) > 1 and q['correct'] in q['options']]
if valid_qs:
st.session_state.ai_quiz_questions = valid_qs
st.session_state.current_quiz_question_index = 0
st.success(f"Generated {len(valid_qs)} quiz questions!")
st.rerun()
else:
st.error("AI returned questions in an unexpected format. Please try again.")
print(f"DEBUG: Invalid AI Quiz Response: {questions}") # Debug log
elif questions is None:
# Error likely displayed by call_gemini
pass
else:
st.error("Failed to generate quiz questions or AI returned an empty list.")
# Display Quiz
questions = st.session_state.ai_quiz_questions
q_index = st.session_state.current_quiz_question_index
if questions and 0 <= q_index < len(questions):
q_data = questions[q_index]
st.info(f"Question {q_index + 1} of {len(questions)} (Current Score: {st.session_state.quiz_score})")
st.markdown(f"**{q_data.get('text', 'Error: Question text missing')}**")
options = q_data.get('options', [])
correct = q_data.get('correct')
if not options or not correct:
st.error("Error: Invalid question data encountered.")
else:
# Shuffle options per question view
opt_key = f'current_options_order_{q_index}' # Make key specific to question index
if opt_key not in st.session_state:
shuffled_options = list(options)
random.shuffle(shuffled_options)
st.session_state[opt_key] = shuffled_options
display_opts = st.session_state[opt_key]
# Radio button for answer selection
choice_key = f"quiz_q_{q_index}_choice"
radio_key = f"quiz_q_{q_index}_radio"
# Function to update choice state on radio change
def update_choice():
st.session_state[choice_key] = st.session_state[radio_key]
# Get current selection or None
current_selection = st.session_state.get(choice_key)
try:
current_index = display_opts.index(current_selection) if current_selection in display_opts else None
except ValueError:
current_index = None # Handle case where saved choice is no longer valid (shouldn't happen often)
user_choice = st.radio(
"Select your answer:",
options=display_opts,
key=radio_key,
index=current_index,
disabled=st.session_state.quiz_answered,
on_change=update_choice # Update state variable when radio changes
)
# Submit Button
submit_button = st.button("Submit Answer", key=f"submit_{q_index}", disabled=st.session_state.quiz_answered or st.session_state.get(choice_key) is None)
if submit_button and st.session_state.get(choice_key):
st.session_state.quiz_answered = True
if st.session_state[choice_key] == correct:
st.session_state.quiz_score += 1
st.rerun() # Rerun to show feedback and Next button
# Feedback and Next Button
if st.session_state.quiz_answered:
last_choice = st.session_state.get(choice_key)
if last_choice == correct:
st.success(f"Correct! 🎉 The answer is **{correct}**.")
else:
st.error(f"Incorrect. The correct answer was **{correct}**.")
# Check if it's the last question
if q_index < len(questions) - 1:
if st.button("Next Question →", key=f"next_{q_index}"):
st.session_state.current_quiz_question_index += 1
st.session_state.quiz_answered = False # Reset answered flag for next question
# Don't need to clear choice state here as new keys are used per question
st.rerun()
else:
# Last question answered
st.balloons()
st.success(f"Quiz Finished! Your final score: {st.session_state.quiz_score} out of {len(questions)}")
if st.button("Start New Quiz", key="restart_quiz"):
# Reset all quiz state variables
st.session_state.ai_quiz_questions = None
st.session_state.current_quiz_question_index = -1
st.session_state.quiz_score = 0
st.session_state.quiz_answered = False
# Clear options order and choices from session state
keys_to_delete = [k for k in st.session_state if k.startswith('current_options_order_') or (k.startswith('quiz_q_') and k.endswith('_choice'))]
for key in keys_to_delete:
del st.session_state[key]
st.rerun()
elif questions and q_index >= len(questions): # Should be handled by the logic above, but as a fallback
st.success(f"Quiz Finished! Final Score: {st.session_state.quiz_score}/{len(questions)}")
if st.button("Start New Quiz", key="restart_quiz_end"):
# Reset all quiz state variables (same as above)
st.session_state.ai_quiz_questions = None; st.session_state.current_quiz_question_index = -1; st.session_state.quiz_score = 0; st.session_state.quiz_answered = False
keys_to_delete = [k for k in st.session_state if k.startswith('current_options_order_') or (k.startswith('quiz_q_') and k.endswith('_choice'))]
for key in keys_to_delete: del st.session_state[key]
st.rerun()
# --- AI Tree Builder (Only available & editable for own tree) ---
elif page == "🤖 AI Tree Builder (Gemini)":
if not is_viewing_own_tree:
st.warning("AI Tree Builder is only available for your own tree ('My Tree').")
else:
# (Operates on active_tree_data / primary_data - code remains same logic, uses new save function)
# DO NOT CHANGE THIS SUBHEADER TITLE AS PER USER INSTRUCTION
st.subheader("Build Tree from Text Description (Experimental)")
st.markdown("Describe family members and their relationships (e.g., 'John Smith is my father. His wife is Jane Doe. They have two children, me and my sister Sarah.')")
if api_key_error: st.error("🔴 AI features disabled due to API key error."); st.stop()
if firebase_error: st.warning(f"🔴 Firebase connection error: {firebase_error}.")
desc_input = st.text_area("Enter description:", height=150, key="ai_desc_input")
if st.button("Analyze Description", key="ai_analyze"):
st.session_state.ai_analyze_button_clicked = True; st.session_state['ai_description_result'] = None # Reset result
if desc_input.strip():
with st.spinner("🧠 AI is analyzing the description..."):
st.session_state['ai_description_result'] = generate_tree_from_description_gemini(desc_input)
# Don't rerun here, let the result display below
else:
st.warning("Please enter a description first.")
st.session_state.ai_analyze_button_clicked = False # Reset flag if input was empty
ai_result = st.session_state.get('ai_description_result')
analysis_attempted = st.session_state.get('ai_analyze_button_clicked', False)
if ai_result:
st.divider(); st.markdown("#### AI Analysis Result:")
if isinstance(ai_result, dict) and "people" in ai_result and "relationships" in ai_result:
ai_people = ai_result.get("people", [])
ai_rels = ai_result.get("relationships", [])
# Validate AI output structure before displaying/merging
valid_people = [p for p in ai_people if isinstance(p, dict) and p.get("name")]
valid_rel_structs = [r for r in ai_rels if isinstance(r, dict) and all(k in r for k in ["person1_name", "person2_name", "type"])]
ai_people_names = {p.get('name') for p in valid_people}
# Filter relationships to only include those between valid extracted people
valid_rels_final = [r for r in valid_rel_structs if r.get('person1_name') in ai_people_names and r.get('person2_name') in ai_people_names]
st.write("**People Extracted:**")
if valid_people:
for p in valid_people:
details = ", ".join(f'{k}: {v}' for k,v in p.items() if k != 'name' and v)
st.write(f"- **{p.get('name')}**" + (f" ({details})" if details else ""))
else: st.write("None found.")
st.write("**Relationships Extracted:**")
if valid_rels_final:
for r in valid_rels_final:
st.write(f"- **{r.get('person1_name')}** {r.get('type','').replace('_',' ')} **{r.get('person2_name')}**")
else: st.write("None found.")
st.divider()
if st.button("Merge AI Suggestions into Tree", key="ai_merge"):
if not valid_people and not valid_rels_final:
st.warning("No valid people or relationships extracted by AI to merge.")
else:
# Use handle_save_or_propose after modifying active_tree_data
# Keep track of changes for summary
added_p_count, added_r_count, updated_p_names, skipped_p_names, skipped_r_desc = 0, 0, [], [], []
id_map = {} # Map AI name to existing or new ID
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# --- Merge People ---
for person_ai in valid_people:
ai_name = person_ai.get("name")
if not ai_name: continue # Should be filtered by valid_people, but double check
# Use the updated find_person_by_name which returns a list
existing_people_with_name = find_person_by_name(active_tree_data, ai_name)
existing_person = existing_people_with_name[0] if existing_people_with_name else None # Take the first one if multiple exist (simplification for AI merge)
person_updated = False
if existing_person:
# Person exists, map name to existing ID
id_map[ai_name] = existing_person['id']
# Update existing person only if AI provides new info
# Example: Update DoB if missing and AI provides it
if not existing_person.get('dob') and person_ai.get('dob'):
existing_person['dob'] = person_ai.get('dob'); person_updated = True
if not existing_person.get('dod') and person_ai.get('dod'):
existing_person['dod'] = person_ai.get('dod'); person_updated = True
# Update gender only if current is Unknown and AI provides something else
if existing_person.get('gender', 'Unknown') == 'Unknown' and person_ai.get('gender') and person_ai.get('gender') != 'Unknown':
existing_person['gender'] = person_ai.get('gender'); person_updated = True
if not existing_person.get('totem') and person_ai.get('totem'):
existing_person['totem'] = person_ai.get('totem'); person_updated = True
# Add more update conditions as needed
if person_updated:
existing_person["last_edited_at"] = current_time
existing_person["last_edited_by"] = current_user # Mark AI merge as edit by current user
updated_p_names.append(ai_name)
else:
skipped_p_names.append(ai_name + " (exists, no new info)")
else:
# Person doesn't exist, add them
new_id = generate_unique_id()
id_map[ai_name] = new_id
new_data = deepcopy(DEFAULT_MEMBER_STRUCTURE)
# Populate with AI data, ensuring keys are valid
valid_keys = ['name', 'dob', 'dod', 'gender', 'totem']
new_data.update({k: v for k, v in person_ai.items() if k in valid_keys and v})
# Add mandatory fields and provenance
new_data.update({
"id": new_id,
"created_at": current_time,
"created_by": current_user, # Mark AI merge as creation by current user
"last_edited_at": current_time,
"last_edited_by": current_user
})
active_tree_data.setdefault("family_members", []).append(new_data)
added_p_count += 1
# --- Merge Relationships ---
current_rels_list = active_tree_data.setdefault("relationships", [])
# Create a set for quick lookup of existing relationships
existing_rel_set = set()
for r in current_rels_list:
if not isinstance(r, dict): continue
from_id, to_id, r_type = r.get('from_id'), r.get('to_id'), r.get('type')
if not from_id or not to_id or not r_type: continue
if r_type == 'parent':
existing_rel_set.add( (from_id, to_id, r_type) )
elif r_type in ['spouse', 'sibling']:
# Store undirected relationships consistently (e.g., sorted IDs)
existing_rel_set.add( (tuple(sorted((from_id, to_id))), r_type) )
rel_type_map = {"parent_of":'parent', "spouse_of":'spouse', "sibling_of":'sibling'}
for rel_ai in valid_rels_final:
p1n, p2n, rta = rel_ai.get("person1_name"), rel_ai.get("person2_name"), rel_ai.get("type")
id1, id2 = id_map.get(p1n), id_map.get(p2n) # Get IDs mapped from names
rti = rel_type_map.get(rta) # Map AI type to internal type
if id1 and id2 and rti:
# Check if relationship already exists
exists = False
rel_key = None
if rti == 'parent':
rel_key = (id1, id2, rti)
elif rti in ['spouse', 'sibling']:
rel_key = (tuple(sorted((id1, id2))), rti)
if rel_key and rel_key in existing_rel_set:
exists = True
if exists:
skipped_r_desc.append(f"{p1n} {rta.replace('_',' ')} {p2n} (exists)")
else:
# Add the new relationship
new_rel = {"from_id": id1, "to_id": id2, "type": rti}
current_rels_list.append(new_rel)
added_r_count += 1
# Add to set to prevent duplicates within this merge batch
if rel_key: existing_rel_set.add(rel_key)
else:
skipped_r_desc.append(f"{p1n} {rta.replace('_',' ')} {p2n} (missing person ID)")
# --- Save or Propose the merged data ---
if added_p_count > 0 or added_r_count > 0 or updated_p_names:
merge_summary = f"AI Merge: Added {added_p_count} people, {added_r_count} relationships. Updated {len(updated_p_names)} people."
if handle_save_or_propose(merge_summary):
# Display skipped info after successful save/stage
if skipped_p_names: st.info(f"Skipped {len(skipped_p_names)} people: {', '.join(skipped_p_names)}")
if skipped_r_desc: st.info(f"Skipped {len(skipped_r_desc)} relationships: {', '.join(skipped_r_desc)}")
# Clear AI results after merge
st.session_state['ai_description_result'] = None
st.session_state['ai_analyze_button_clicked'] = False
st.rerun()
else:
st.info("No new information found by AI to merge into the tree.")
if skipped_p_names: st.info(f"Skipped {len(skipped_p_names)} people: {', '.join(skipped_p_names)}")
if skipped_r_desc: st.info(f"Skipped {len(skipped_r_desc)} relationships: {', '.join(skipped_r_desc)}")
elif ai_result is None and analysis_attempted:
# Error message was likely shown by call_gemini or safe_json_loads
st.warning("AI analysis did not return a valid result. Please check the description or try again.")
elif analysis_attempted: # Handle cases where AI returned something unexpected
st.error("AI returned data in an unexpected format.")
st.write("Raw AI Response:")
st.code(str(ai_result))
# --- Settings (Only available & editable for own tree) ---
elif page == "🎨 Settings":
if not is_viewing_own_tree:
st.warning("Settings are only available for your own tree ('My Tree').")
else:
# Operates on active_tree_data / primary_data
st.subheader("Application Settings")
# Ensure settings and metadata dicts exist
settings = active_tree_data.setdefault("settings", deepcopy(DEFAULT_PROFILE["settings"]))
metadata = active_tree_data.setdefault("metadata", deepcopy(DEFAULT_PROFILE["metadata"]))
# Tree Name
new_tree_name = st.text_input("Tree Name", value=metadata.get("tree_name", "My Family Tree"))
# Theme
current_theme = settings.get("theme", "Default"); theme_opts = ["Default", "Dark"]
try: theme_idx = theme_opts.index(current_theme)
except ValueError: theme_idx = 0 # Default to 'Default' if invalid value stored
new_theme = st.selectbox("Display Theme", theme_opts, index=theme_idx)
# Privacy (Conceptual - No backend enforcement implemented)
st.markdown("---"); st.markdown("#### Privacy (Conceptual Only)")
current_privacy = settings.get("privacy", "Private"); privacy_opts = ["Private", "Collaborators Only", "Public (Not Recommended)"]
try: privacy_idx = privacy_opts.index(current_privacy)
except ValueError: privacy_idx = 0
new_privacy = st.selectbox("Tree Visibility (Conceptual)", privacy_opts, index=privacy_idx)
st.caption("Note: Privacy settings are currently for display only and are not enforced.")
# Collaborators (Conceptual - Replaced by phone index linking)
# st.markdown("#### Collaborators (Managed Automatically)")
# st.caption("Collaborators are automatically determined by who is included in the tree via their phone number.")
st.markdown("---")
save_label = "Save Settings" # Only owner can save settings
if st.button(save_label):
changed = False
if metadata.get("tree_name") != new_tree_name: metadata["tree_name"] = new_tree_name; changed = True
if settings.get("theme") != new_theme: settings["theme"] = new_theme; changed = True
if settings.get("privacy") != new_privacy: settings["privacy"] = new_privacy; changed = True
if changed:
# Add edit provenance to metadata? Optional for settings.
# metadata["last_edited_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# metadata["last_edited_by"] = current_user
if handle_save_or_propose("Settings Update"): # Should always save for owner
st.rerun()
else:
st.info("No changes made to settings.")