# Synced from GitHub via GitHub Actions (commit 1d32142)
"""
FastAPI endpoints for Ollama chat and donor/volunteer recommendation system.
Endpoints:
- /chat: Chat with Ollama model using LangGraph with memory
- /donors/register: Register a donor and generate embedding
- /volunteers/register: Register a volunteer and generate embedding
- /donors/recommend: Find similar donors based on query
- /volunteers/recommend: Find similar volunteers based on query
- /forms/{form_id}: Get/Delete a stored form
- /forms/stats/summary: Get form counts by type
"""
import os
import sys
import asyncio
from contextlib import asynccontextmanager
from typing import Optional, List, Dict, Any

# Add app directory to path for local module imports
APP_DIR = os.path.dirname(os.path.abspath(__file__))
if APP_DIR not in sys.path:
    sys.path.insert(0, APP_DIR)

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

# Windows-specific fix for psycopg async compatibility
# (the default proactor loop on Windows does not work with psycopg async)
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

# Load .env file for local development; python-dotenv is optional in production
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

# Lazy imports for encoder/recommender (avoid import errors if deps missing).
# These globals are populated by init_services() at startup; None means the
# corresponding service is unavailable and dependent endpoints return 503.
encoder = None
vector_store = None
pool = None
gis_recommender = None
# ============================================================================
# Pydantic Models
# ============================================================================
class ChatResponse(BaseModel):
    """Non-streaming /chat response: the assistant's final message text."""
    response: str
class DonorFormRequest(BaseModel):
    """Donor registration form.

    Rendered to text by donor_form_to_text() and embedded for similarity
    search; unset optional fields are omitted from the rendering.
    """
    id: str = Field(..., description="Unique identifier for the donor")
    name: str = Field(..., description="Donor name")
    donor_type: str = Field(..., description="Type: individual, corporate, foundation")
    country: str = Field(..., description="ASEAN country code (SG, MY, TH, VN, ID, PH, etc.)")
    preferred_language: str = Field(..., description="Primary language code")
    causes: List[str] = Field(default_factory=list, description="Interested causes")
    donation_frequency: Optional[str] = Field(None, description="one-time, monthly, quarterly, annual")
    amount_range: Optional[str] = Field(None, description="Preferred donation range")
    bio: Optional[str] = Field(None, description="Donor background")
    motivation: Optional[str] = Field(None, description="Why they want to donate")
class VolunteerFormRequest(BaseModel):
    """Volunteer registration form.

    Rendered to text by volunteer_form_to_text() and embedded for similarity
    search; unset optional fields are omitted from the rendering.
    """
    id: str = Field(..., description="Unique identifier for the volunteer")
    name: str = Field(..., description="Volunteer name")
    volunteer_type: str = Field(..., description="Type: regular, event_based, skilled")
    country: str = Field(..., description="ASEAN country code")
    preferred_language: str = Field(..., description="Primary language code")
    languages_spoken: List[str] = Field(default_factory=list, description="All languages spoken")
    skills: List[str] = Field(default_factory=list, description="Professional/technical skills")
    availability: str = Field(..., description="weekends, evenings, flexible, full_time")
    causes: List[str] = Field(default_factory=list, description="Interested causes")
    experience: Optional[str] = Field(None, description="Prior volunteer experience")
    goals: Optional[str] = Field(None, description="What they hope to achieve")
class RecommendRequest(BaseModel):
    """Request for recommendations based on a query form."""
    # Either provide a form_id to use existing embedding, or provide form data
    form_id: Optional[str] = Field(None, description="Existing form ID to use as query")
    # Or provide inline form data
    country: Optional[str] = None
    preferred_language: Optional[str] = None
    causes: List[str] = Field(default_factory=list)
    bio: Optional[str] = None
    motivation: Optional[str] = None
    # Search options
    limit: int = Field(default=10, ge=1, le=50)
    country_filter: Optional[str] = None
    exclude_ids: List[str] = Field(default_factory=list)  # form IDs to omit from results
class FormResponse(BaseModel):
    """Response for form operations (register endpoints)."""
    id: str
    form_type: str  # "donor", "volunteer", or "client"
    message: str
    embedding_dimension: Optional[int] = None  # length of the stored embedding vector
class ClientProfileRequest(BaseModel):
    """Client profile with spatial and behavioral data."""
    user_id: str
    # Defaults to central Singapore (approx. 1.3521 N, 103.8198 E)
    coordinates: List[float] = Field(
        default=[1.3521, 103.8198], description="[lat, lng]"
    )
    planning_area: str = Field(default="central", description="Singapore planning area")
    housing_type: str = Field(
        default="hdb_4_room", description="Housing type for income proxy"
    )
    interests: List[str] = Field(default_factory=list)
    causes: List[str] = Field(default_factory=list)
    preferred_language: str = Field(default="en")
    # Donation-history fields, passed through to the GIS ClientProfile
    is_donor: bool = False
    total_donated: float = 0.0
    donation_count: int = 0
    age_range: Optional[str] = None
class LookalikeRequest(BaseModel):
    """Request for lookalike client search."""
    seed_causes: List[str] = Field(..., description="Causes to find lookalikes for")
    seed_interests: List[str] = Field(default_factory=list)
    planning_area_filter: Optional[str] = Field(
        None, description="Geo-fence by planning area"
    )
    housing_type_filter: Optional[List[str]] = Field(
        None, description="Filter by housing types"
    )
    limit: int = Field(default=50, ge=1, le=200)
    # Minimum tier score; the endpoint relaxes this by 0.1 for small datasets
    min_score: float = Field(default=0.0, ge=0.0, le=1.0)
    include_geojson: bool = Field(
        default=True, description="Include GeoJSON for mapping"
    )
class ScoredClientResponse(BaseModel):
    """Single scored client result."""
    user_id: str
    planning_area: str
    housing_type: str
    causes: List[str]
    interests: List[str]
    is_donor: bool
    # All scores are rounded to 3 decimal places by the endpoint
    final_score: float
    vector_similarity: float
    spatial_proxy: float
    proximity: float
    coordinates: Optional[List[float]] = None  # Only populated when GeoJSON is requested
class LookalikeResponse(BaseModel):
    """Response containing lookalike clients with optional GeoJSON."""
    seed_causes: List[str]
    total_found: int
    # Keys are "tier_1", "tier_2", "tier_3"
    tiers: Dict[str, List[ScoredClientResponse]]
    geojson: Optional[Dict[str, Any]] = None  # present only when include_geojson=True
class SingpassMockData(BaseModel):
    """Mock Singpass data for autofill."""
    name: str
    nric_masked: str  # e.g. "S****567A"
    email: str
    mobile: str
    registered_address: str
    planning_area: str
    # Organization fields populated for organization-linked profiles
    organization_name: Optional[str] = None
    organization_uen: Optional[str] = None
    organization_type: Optional[str] = None
class RecommendationResult(BaseModel):
    """Single recommendation result."""
    id: str
    form_type: str
    score: float     # similarity score from the vector store — TODO confirm scale/direction
    distance: float  # vector distance from the vector store — TODO confirm metric
    form_data: Dict[str, Any]  # original registration payload as stored
class RecommendResponse(BaseModel):
    """Response containing recommendations."""
    query_id: Optional[str]  # the form_id used as the query seed, if any
    results: List[RecommendationResult]
    total_found: int  # equals len(results)
class StatsResponse(BaseModel):
    """Form statistics response: stored-form counts by type."""
    donor: int
    volunteer: int
    total: int
# ============================================================================
# Database & Encoder Setup
# ============================================================================
async def init_services():
    """Initialize the SeaLion encoder, DB pool, vector store, and GIS recommender.

    Populates the module-level globals (encoder, vector_store, pool,
    gis_recommender); on any failure the affected globals stay None and the
    dependent endpoints respond 503. Never raises.
    """
    global encoder, vector_store, pool, gis_recommender
    try:
        from encoders.sealion import SeaLionEncoder
        from recommender.vector_store import DonorVectorStore
        from recommender.gis_recommender import GISRecommender
        from psycopg_pool import AsyncConnectionPool
        # Initialize encoder (reads SEALION_ENDPOINT from env)
        encoder = SeaLionEncoder()
        # Build connection string from env vars
        db_host = os.getenv("SUPABASE_DB_HOST")
        db_port = os.getenv("SUPABASE_DB_PORT", "6543")
        db_name = os.getenv("SUPABASE_DB_NAME", "postgres")
        db_user = os.getenv("SUPABASE_DB_USER")
        db_password = os.getenv("SUPABASE_DB_PASSWORD")
        db_sslmode = os.getenv("SUPABASE_DB_SSLMODE", "require")
        if db_host and db_user and db_password:
            conn_string = (
                f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
                f"?sslmode={db_sslmode}"
            )
            # prepare_threshold=None disables server-side prepared statements
            # (presumably needed for a transaction-mode pooler — TODO confirm)
            pool = AsyncConnectionPool(
                conninfo=conn_string,
                max_size=10,
                kwargs={"autocommit": True, "prepare_threshold": None},
            )
            await pool.open()
            vector_store = DonorVectorStore(pool)
            gis_recommender = GISRecommender(vector_store=vector_store, encoder=encoder)
            print("[OK] Database connection pool initialized")
            print("[OK] GIS Recommender initialized")
        else:
            print("[WARN] Database credentials not configured, vector store disabled")
        print("[OK] SeaLion encoder initialized")
    except Exception as e:
        # Startup is best-effort: log and continue so the API can still serve
        # endpoints that do not need these services.
        print(f"[WARN] Service initialization error: {e}")
        print(" Some endpoints may not be available")
async def close_services():
    """Shut down the shared database connection pool, if one was opened."""
    global pool
    if not pool:
        # Nothing was initialized (or credentials were missing) — no-op.
        return
    await pool.close()
    print("[OK] Database connection pool closed")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context manager for startup/shutdown.

    Startup: initialize encoder/DB services, then the LangGraph chat graph.
    Shutdown: close the database connection pool.
    """
    await init_services()
    await init_langgraph()
    yield
    await close_services()
# ============================================================================
# FastAPI App
# ============================================================================
app = FastAPI(
    title="Donor Recommendation API",
    description="API for chat, donor/volunteer registration, and recommendations",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS middleware
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# very permissive (and browsers reject wildcard origins with credentials);
# consider an explicit origin list for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ============================================================================
# LangGraph Chat Setup
# ============================================================================
# Global graph instance (initialized at startup)
# Global graph instance (initialized at startup); None means /chat returns 503
langgraph_chat = None

async def init_langgraph():
    """Initialize LangGraph with memory.

    Sets the module-level ``langgraph_chat`` graph; on failure it stays None
    and /chat responds 503. Never raises.
    """
    global langgraph_chat
    try:
        from graph.builder import build_graph_with_memory
        graph, _, _ = await build_graph_with_memory()
        langgraph_chat = graph
        print("[OK] LangGraph chat with memory initialized")
    except Exception as e:
        import traceback
        print(f"[WARN] LangGraph initialization error: {e}")
        traceback.print_exc()
        print(" /chat endpoint may not be available")
# ============================================================================
# Helper Functions
# ============================================================================
def donor_form_to_text(form: DonorFormRequest) -> str:
    """Render a donor registration form as plain text for embedding.

    The required fields are always emitted; each optional field contributes
    one "Label: value" line only when it is set and non-empty.
    """
    lines = [
        f"Donor type: {form.donor_type}",
        f"Country: {form.country}",
        f"Preferred language: {form.preferred_language}",
    ]
    if form.causes:
        lines.append(f"Causes interested in: {', '.join(form.causes)}")
    for label, value in (
        ("Donation frequency", form.donation_frequency),
        ("Amount range", form.amount_range),
        ("Bio", form.bio),
        ("Motivation", form.motivation),
    ):
        if value:
            lines.append(f"{label}: {value}")
    return "\n".join(lines)
def volunteer_form_to_text(form: VolunteerFormRequest) -> str:
    """Render a volunteer registration form as plain text for embedding.

    Required fields (type, country, language, availability) always appear;
    list and free-text fields appear only when non-empty.
    """
    lines = [
        f"Volunteer type: {form.volunteer_type}",
        f"Country: {form.country}",
        f"Preferred language: {form.preferred_language}",
    ]
    if form.languages_spoken:
        lines.append(f"Languages spoken: {', '.join(form.languages_spoken)}")
    if form.skills:
        lines.append(f"Skills: {', '.join(form.skills)}")
    # Availability is mandatory, so it is emitted unconditionally
    lines.append(f"Availability: {form.availability}")
    if form.causes:
        lines.append(f"Causes interested in: {', '.join(form.causes)}")
    for label, value in (("Experience", form.experience), ("Goals", form.goals)):
        if value:
            lines.append(f"{label}: {value}")
    return "\n".join(lines)
def recommend_request_to_text(req: RecommendRequest) -> str:
    """Render a recommendation request as query text for the encoder.

    Each populated field contributes one "Label: value" line; a request with
    no descriptive fields yields the generic text "General query".
    """
    chunks = []
    if req.country:
        chunks.append(f"Country: {req.country}")
    if req.preferred_language:
        chunks.append(f"Preferred language: {req.preferred_language}")
    if req.causes:
        chunks.append(f"Causes interested in: {', '.join(req.causes)}")
    if req.bio:
        chunks.append(f"Bio: {req.bio}")
    if req.motivation:
        chunks.append(f"Motivation: {req.motivation}")
    return "\n".join(chunks) if chunks else "General query"
# ============================================================================
# Health Endpoints
# ============================================================================
@app.get("/")
def root():
    """Root endpoint: overall status plus per-service availability flags."""
    service_flags = {
        "langgraph_chat": langgraph_chat is not None,
        "encoder": encoder is not None,
        "database": vector_store is not None,
    }
    return {
        "status": "healthy",
        "message": "Donor Recommendation API is running",
        "services": service_flags,
    }
@app.get("/health")
def health():
    """Liveness probe: always reports healthy if the process is up."""
    return {"status": "healthy"}
# ============================================================================
# Chat Endpoints
# ============================================================================
class ChatRequestWithMemory(BaseModel):
    """Chat request carrying conversation-memory routing keys."""
    message: str
    # user_id + thread_id select the LangGraph conversation thread
    user_id: str = "default_user"
    thread_id: str = "default_thread"
    # When True, /chat streams the reply as text/event-stream
    stream: bool = False
@app.post("/chat")
async def chat(request: ChatRequestWithMemory):
    """Chat with LangGraph-powered chatbot with memory.

    Conversation state is routed by (thread_id, user_id) via the graph's
    ``configurable`` config. With ``stream=True`` the reply is streamed as
    text/event-stream; otherwise the final AI message is returned whole.
    Raises 503 when the graph failed to initialize, 500 on runtime errors.
    """
    if not langgraph_chat:
        raise HTTPException(
            status_code=503,
            detail="LangGraph chat not initialized. Check server logs."
        )
    # Per-request config routes the conversation to its memory thread
    config = {
        "configurable": {
            "thread_id": request.thread_id,
            "user_id": request.user_id,
        }
    }
    try:
        if request.stream:
            async def generate_stream():
                # stream_mode="values" emits the full graph state after each
                # step. NOTE(review): each yielded AI message is therefore
                # the cumulative content so far, so clients may see
                # overlapping text — confirm whether "messages" mode was
                # intended for token-level streaming.
                async for chunk in langgraph_chat.astream(
                    {"messages": [{"role": "user", "content": request.message}]},
                    config,
                    stream_mode="values",
                ):
                    if chunk.get("messages"):
                        last_msg = chunk["messages"][-1]
                        if hasattr(last_msg, 'content') and last_msg.type == 'ai':
                            yield last_msg.content
            return StreamingResponse(
                generate_stream(),
                media_type="text/event-stream"
            )
        else:
            # Non-streaming: keep only the latest AI message seen in the stream
            response_content = ""
            async for chunk in langgraph_chat.astream(
                {"messages": [{"role": "user", "content": request.message}]},
                config,
                stream_mode="values",
            ):
                if chunk.get("messages"):
                    last_msg = chunk["messages"][-1]
                    if hasattr(last_msg, 'content') and last_msg.type == 'ai':
                        response_content = last_msg.content
            return ChatResponse(response=response_content)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# Agentic RAG Endpoints
# ============================================================================
# Global agentic RAG agent instance
# Global agentic RAG agent instance (lazily created on first /rag/search call)
agentic_rag_agent = None

class AgenticRAGRequest(BaseModel):
    """Request for Agentic RAG search."""
    query: str = Field(..., description="Natural language query for donor/volunteer search")
    # NOTE(review): max_iterations is accepted here but /rag/search only
    # forwards the query to the agent — confirm whether the agent should
    # receive this limit.
    max_iterations: int = Field(default=10, ge=1, le=20, description="Max tool call iterations")

class AgenticRAGResponse(BaseModel):
    """Response from Agentic RAG search."""
    response: str
    tool_calls: List[Dict[str, Any]]
    message_count: int
async def init_agentic_rag():
    """Initialize the Agentic RAG agent.

    Requires the encoder and vector store created by init_services(); leaves
    ``agentic_rag_agent`` as None on failure. Never raises.
    """
    global agentic_rag_agent
    if encoder is None or vector_store is None:
        print("[WARN] Cannot initialize Agentic RAG: encoder or vector_store not available")
        return
    try:
        from agents.agentic_rag import AgenticRAGAgent
        from langchain_ollama import ChatOllama
        # Create LLM for the agent
        api_key = os.getenv('OLLAMA_API_KEY')
        if api_key:
            # Hosted Ollama: authenticate with a bearer-token header
            llm = ChatOllama(
                model="gpt-oss:120b",
                base_url="https://ollama.com",
                client_kwargs={
                    "headers": {"Authorization": f"Bearer {api_key}"}
                }
            )
        else:
            # No API key configured: use the cloud-tagged model alias
            llm = ChatOllama(model="gpt-oss:120b-cloud")
        agentic_rag_agent = AgenticRAGAgent(llm, encoder, vector_store)
        print("[OK] Agentic RAG agent initialized")
    except Exception as e:
        import traceback
        print(f"[WARN] Agentic RAG initialization error: {e}")
        traceback.print_exc()
@app.post("/rag/search", response_model=AgenticRAGResponse)
async def agentic_rag_search(request: AgenticRAGRequest):
    """
    Agentic RAG search - the agent autonomously explores the vector store.

    The agent analyzes the query, explores the database's categories, runs
    semantic and/or filtered searches, iteratively refines its results, and
    returns its findings with reasoning.

    Example queries:
    - "Find donors interested in education in Singapore"
    - "Show me corporate donors who focus on environmental causes"
    - "Find volunteers with tech skills available on weekends"
    """
    global agentic_rag_agent
    # Lazy initialization on first use; a second None check catches init failure
    if agentic_rag_agent is None:
        await init_agentic_rag()
        if agentic_rag_agent is None:
            raise HTTPException(
                status_code=503,
                detail="Agentic RAG not available. Ensure encoder and database are configured."
            )
    try:
        outcome = await agentic_rag_agent.search(request.query)
        return AgenticRAGResponse(
            response=outcome["response"],
            tool_calls=outcome["tool_calls"],
            message_count=outcome["message_count"],
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/rag/tools")
async def list_rag_tools():
    """List available RAG tools and their descriptions."""
    from tools.rag_tools import RAG_TOOLS
    catalog = [
        {"name": tool.name, "description": tool.description}
        for tool in RAG_TOOLS
    ]
    return {"tools": catalog, "total": len(catalog)}
@app.get("/rag/categories")
async def get_rag_categories():
    """Get available categories in the vector store for filtering.

    Raises 503 when the database is unavailable, 500 on tool failures.
    """
    if not vector_store:
        raise HTTPException(status_code=503, detail="Database not connected")
    from tools.rag_tools import list_available_categories, set_rag_dependencies
    # Ensure dependencies are set (the tools hold module-level references)
    if encoder and vector_store:
        set_rag_dependencies(encoder, vector_store)
    try:
        # The tool returns a JSON string; decode it before returning
        result = await list_available_categories.ainvoke({})
        import json
        return json.loads(result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# Donor Endpoints
# ============================================================================
@app.post("/donors/register", response_model=FormResponse)
async def register_donor(form: DonorFormRequest):
    """Register a donor: encode the form as text and persist its embedding.

    Raises 503 when the encoder or database is unavailable, 500 on any
    processing failure.
    """
    if not encoder:
        raise HTTPException(status_code=503, detail="Encoder not initialized")
    if not vector_store:
        raise HTTPException(status_code=503, detail="Database not connected")
    try:
        # Encode the textual rendering of the form into a vector
        embedding = await encoder.encode(donor_form_to_text(form))
        # Persist the embedding alongside the raw form payload
        await vector_store.store_embedding(
            form_id=form.id,
            form_type="donor",
            embedding=embedding,
            form_data=form.model_dump(),
        )
        return FormResponse(
            id=form.id,
            form_type="donor",
            message="Donor registered successfully",
            embedding_dimension=len(embedding),
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/donors/recommend", response_model=RecommendResponse)
async def recommend_donors(request: RecommendRequest):
    """Find similar donors based on query.

    When ``form_id`` is supplied it must exist (404 otherwise) and it is
    excluded from the results, so a seed form is never recommended back to
    itself. The query embedding is generated from the request's inline
    descriptive fields. Raises 503 when services are unavailable, 500 on
    processing failures.
    """
    if not encoder:
        raise HTTPException(status_code=503, detail="Encoder not initialized")
    if not vector_store:
        raise HTTPException(status_code=503, detail="Database not connected")
    try:
        if request.form_id:
            # Validate the referenced form exists before querying
            existing = await vector_store.get_embedding(request.form_id)
            if not existing:
                raise HTTPException(status_code=404, detail=f"Form {request.form_id} not found")
        # Generate the query embedding from the request's descriptive fields
        text = recommend_request_to_text(request)
        query_embedding = await encoder.encode(text)
        # Bug fix: exclude the seed form so it cannot appear in its own results
        exclude_ids = list(request.exclude_ids)
        if request.form_id and request.form_id not in exclude_ids:
            exclude_ids.append(request.form_id)
        # Find similar donors
        results = await vector_store.find_similar(
            query_embedding=query_embedding,
            form_type="donor",
            limit=request.limit,
            country_filter=request.country_filter,
            exclude_ids=exclude_ids if exclude_ids else None
        )
        return RecommendResponse(
            query_id=request.form_id,
            results=[
                RecommendationResult(
                    id=r.id,
                    form_type=r.form_type,
                    score=r.score,
                    distance=r.distance,
                    form_data=r.form_data
                )
                for r in results
            ],
            total_found=len(results)
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# Volunteer Endpoints
# ============================================================================
@app.post("/volunteers/register", response_model=FormResponse)
async def register_volunteer(form: VolunteerFormRequest):
    """Register a volunteer: encode the form as text and persist its embedding.

    Raises 503 when the encoder or database is unavailable, 500 on any
    processing failure.
    """
    if not encoder:
        raise HTTPException(status_code=503, detail="Encoder not initialized")
    if not vector_store:
        raise HTTPException(status_code=503, detail="Database not connected")
    try:
        # Encode the textual rendering of the form into a vector
        embedding = await encoder.encode(volunteer_form_to_text(form))
        # Persist the embedding alongside the raw form payload
        await vector_store.store_embedding(
            form_id=form.id,
            form_type="volunteer",
            embedding=embedding,
            form_data=form.model_dump(),
        )
        return FormResponse(
            id=form.id,
            form_type="volunteer",
            message="Volunteer registered successfully",
            embedding_dimension=len(embedding),
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/volunteers/recommend", response_model=RecommendResponse)
async def recommend_volunteers(request: RecommendRequest):
    """Find similar volunteers based on query.

    Mirrors /donors/recommend: the query embedding is generated from the
    request's inline descriptive fields, and a supplied ``form_id`` is
    excluded from the results so a seed form is never recommended back to
    itself. Raises 503 when services are unavailable, 500 on failures.
    """
    if not encoder:
        raise HTTPException(status_code=503, detail="Encoder not initialized")
    if not vector_store:
        raise HTTPException(status_code=503, detail="Database not connected")
    try:
        # Generate query embedding
        text = recommend_request_to_text(request)
        query_embedding = await encoder.encode(text)
        # Bug fix: exclude the seed form so it cannot appear in its own results
        exclude_ids = list(request.exclude_ids)
        if request.form_id and request.form_id not in exclude_ids:
            exclude_ids.append(request.form_id)
        # Find similar volunteers
        results = await vector_store.find_similar(
            query_embedding=query_embedding,
            form_type="volunteer",
            limit=request.limit,
            country_filter=request.country_filter,
            exclude_ids=exclude_ids if exclude_ids else None
        )
        return RecommendResponse(
            query_id=request.form_id,
            results=[
                RecommendationResult(
                    id=r.id,
                    form_type=r.form_type,
                    score=r.score,
                    distance=r.distance,
                    form_data=r.form_data
                )
                for r in results
            ],
            total_found=len(results)
        )
    except HTTPException:
        # Consistent with /donors/recommend: let explicit HTTP errors through
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# Form Management Endpoints
# ============================================================================
@app.get("/forms/{form_id}")
async def get_form(form_id: str):
    """Fetch a stored form by its ID (404 when absent)."""
    if not vector_store:
        raise HTTPException(status_code=503, detail="Database not connected")
    record = await vector_store.get_embedding(form_id)
    if not record:
        raise HTTPException(status_code=404, detail=f"Form {form_id} not found")
    return {
        "id": record.id,
        "form_type": record.form_type,
        "form_data": record.form_data,
    }
@app.delete("/forms/{form_id}")
async def delete_form(form_id: str):
    """Delete a stored form by its ID (404 when absent)."""
    if not vector_store:
        raise HTTPException(status_code=503, detail="Database not connected")
    was_deleted = await vector_store.delete_embedding(form_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail=f"Form {form_id} not found")
    return {"message": f"Form {form_id} deleted successfully"}
@app.get("/forms/stats/summary", response_model=StatsResponse)
async def get_form_stats():
    """Return stored-form counts broken down by type."""
    if not vector_store:
        raise HTTPException(status_code=503, detail="Database not connected")
    try:
        counts = await vector_store.count_by_type()
        # Missing keys default to zero so a partially-populated store still works
        return StatsResponse(
            donor=counts.get("donor", 0),
            volunteer=counts.get("volunteer", 0),
            total=counts.get("total", 0),
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# Cause-based Search Endpoint
# ============================================================================
@app.post("/forms/search/causes")
async def search_by_causes(
    causes: List[str],
    limit: int = 20
):
    """Search forms by causes, ranked by embedding similarity."""
    if not encoder:
        raise HTTPException(status_code=503, detail="Encoder not initialized")
    if not vector_store:
        raise HTTPException(status_code=503, detail="Database not connected")
    try:
        # Build a synthetic query embedding so the matches can be ranked
        query_embedding = await encoder.encode(
            f"Causes interested in: {', '.join(causes)}"
        )
        matches = await vector_store.find_by_causes(
            target_causes=causes,
            query_embedding=query_embedding,
            limit=limit,
        )
        payload = [
            {
                "id": m.id,
                "form_type": m.form_type,
                "score": m.score,
                "distance": m.distance,
                "form_data": m.form_data,
            }
            for m in matches
        ]
        return {"causes": causes, "results": payload, "total_found": len(payload)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# GIS & Client Targeting Endpoints
# ============================================================================
# Mock Singpass data for different organization profiles.
# Keyed by profile ID; served by the /singpass/mock endpoints for demo autofill.
MOCK_SINGPASS_PROFILES = {
    "org_001": SingpassMockData(
        name="Sarah Tan Wei Ling",
        nric_masked="S****567A",
        email="sarah.tan@example.org",
        mobile="+65 9123 4567",
        registered_address="123 Orchard Road, #12-01, Singapore 238867",
        planning_area="orchard",
        organization_name="Hearts of Hope Foundation",
        organization_uen="201912345K",
        organization_type="charity",
    ),
    "org_002": SingpassMockData(
        name="Ahmad bin Ibrahim",
        nric_masked="S****234B",
        email="ahmad.ibrahim@greensg.org",
        mobile="+65 9876 5432",
        registered_address="45 Jurong East Ave 1, #05-12, Singapore 609788",
        planning_area="jurong_east",
        organization_name="Green Singapore Initiative",
        organization_uen="201823456M",
        organization_type="ngo",
    ),
    "org_003": SingpassMockData(
        name="Lee Mei Hua",
        nric_masked="S****789C",
        email="meihua@eldercare.sg",
        mobile="+65 8765 4321",
        registered_address="78 Toa Payoh Lorong 1, #08-22, Singapore 310078",
        planning_area="toa_payoh",
        organization_name="ElderCare Singapore",
        organization_uen="200934567N",
        organization_type="social_enterprise",
    ),
}
@app.get("/singpass/mock/{profile_id}", response_model=SingpassMockData)
async def get_singpass_mock_data(profile_id: str):
    """
    Get mock Singpass data for autofill demonstration.
    Available profiles: org_001, org_002, org_003.
    Unknown profile IDs fall back to org_001.
    """
    if profile_id not in MOCK_SINGPASS_PROFILES:
        # Fall back to the default profile if not found
        # (comment previously said "random", but the fallback is fixed)
        profile_id = "org_001"
    return MOCK_SINGPASS_PROFILES[profile_id]
@app.get("/singpass/mock", response_model=Dict[str, SingpassMockData])
async def list_singpass_mock_profiles():
    """List all available mock Singpass profiles, keyed by profile ID."""
    return MOCK_SINGPASS_PROFILES
@app.get("/planning-areas")
async def get_planning_areas():
    """Get all Singapore planning areas with coordinates.

    Returns the PLANNING_AREAS mapping from the GIS recommender module as-is.
    """
    from recommender.gis_recommender import PLANNING_AREAS
    return PLANNING_AREAS
@app.get("/housing-types")
async def get_housing_types():
    """Get all housing types with income proxy scores.

    Returns the enum values plus the housing-type -> income-proxy mapping
    used by the GIS recommender.
    """
    from recommender.gis_recommender import HOUSING_INCOME_PROXY, HousingType
    return {
        "types": [h.value for h in HousingType],
        "income_proxy": {h.value: score for h, score in HOUSING_INCOME_PROXY.items()},
    }
@app.post("/clients/register", response_model=FormResponse)
async def register_client(profile: ClientProfileRequest):
    """
    Register a client profile with spatial and behavioral data.
    This creates an embedding combining interests/causes with spatial context.
    Raises 503 when the encoder or database is unavailable, 500 on any
    processing failure (including an unrecognized housing_type value).
    """
    if not encoder:
        raise HTTPException(status_code=503, detail="Encoder not initialized")
    if not vector_store:
        raise HTTPException(status_code=503, detail="Database not connected")
    try:
        from recommender.gis_recommender import ClientProfile, HousingType
        # Create client profile; HousingType(...) raises for unknown values,
        # which surfaces as a 500 below
        client = ClientProfile(
            user_id=profile.user_id,
            coordinates=tuple(profile.coordinates),
            planning_area=profile.planning_area,
            housing_type=HousingType(profile.housing_type),
            interests=profile.interests,
            causes=profile.causes,
            preferred_language=profile.preferred_language,
            is_donor=profile.is_donor,
            total_donated=profile.total_donated,
            donation_count=profile.donation_count,
            age_range=profile.age_range,
        )
        # Generate embedding from the profile's textual rendering
        text = client.to_embedding_text()
        embedding = await encoder.encode(text)
        # Store in database under the "client" form type
        form_data = client.to_dict()
        form_data["country"] = "SG"  # For existing filter compatibility
        await vector_store.store_embedding(
            form_id=profile.user_id,
            form_type="client",
            embedding=embedding,
            form_data=form_data,
        )
        return FormResponse(
            id=profile.user_id,
            form_type="client",
            message="Client profile registered successfully",
            embedding_dimension=len(embedding),
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/clients/lookalike", response_model=LookalikeResponse)
async def find_lookalike_clients(request: LookalikeRequest):
    """
    Find lookalike clients (potential donors) based on a seed profile.
    This uses the GIS recommender with hybrid semantic-spatial matching:
    1. Find registered donors from database via vector search
    2. Apply spatial/housing filters
    3. Score using tiered targeting (vector + spatial proxy + proximity)
    4. Fall back to mock data if database has insufficient results
    5. Return results with optional GeoJSON for mapping
    Note: Searches BOTH donors (from /donors/register) and clients
    (from /clients/register) to find potential matches.
    """
    try:
        from recommender.gis_recommender import (
            ClientProfile,
            HousingType,
            GISRecommender,
            generate_seed_donor_profile,
            generate_mock_clients,
        )
        # Create seed profile from request; the first cause seeds the template
        seed = generate_seed_donor_profile(
            cause=request.seed_causes[0] if request.seed_causes else "education"
        )
        seed.causes = request.seed_causes
        seed.interests = request.seed_interests
        # Update seed coordinates if planning area specified
        if request.planning_area_filter:
            from recommender.gis_recommender import PLANNING_AREAS
            if request.planning_area_filter in PLANNING_AREAS:
                area = PLANNING_AREAS[request.planning_area_filter]
                seed.coordinates = (area["lat"], area["lng"])
                seed.planning_area = request.planning_area_filter
        # Regenerate embeddings for updated seed
        seed.embedding = None  # Force regeneration
        local_recommender = GISRecommender()
        seed.embedding = local_recommender._generate_fallback_embedding(seed)
        seed.compute_reduced_embeddings()
        # Convert housing type filter strings to HousingType enum members
        housing_filter = None
        if request.housing_type_filter:
            housing_filter = [HousingType(h) for h in request.housing_type_filter]
        scored_clients = []
        db_results_count = 0
        # Try database search first if available
        if gis_recommender and encoder and vector_store:
            try:
                print(
                    f"Searching database for donors matching causes: {request.seed_causes}"
                )
                scored_clients = await gis_recommender.find_lookalikes(
                    seed_profile=seed,
                    k=request.limit * 2,  # Get more to allow for filtering
                    planning_area_filter=None,  # Remove strict filter for DB search
                    housing_type_filter=None,  # Filter after retrieval
                    use_hybrid=False,
                )
                db_results_count = len(scored_clients)
                print(f"Found {db_results_count} donors/clients from database")
                # Apply filters after retrieval for more flexible matching
                if request.planning_area_filter:
                    scored_clients = [
                        sc
                        for sc in scored_clients
                        if sc.client.planning_area == request.planning_area_filter
                    ]
                if housing_filter:
                    scored_clients = [
                        sc
                        for sc in scored_clients
                        if sc.client.housing_type in housing_filter
                    ]
            except Exception as e:
                # DB search is best-effort; fall through to the mock supplement
                print(f"Database search failed: {e}")
                import traceback
                traceback.print_exc()
        # If insufficient results from database, supplement with mock data
        min_results = max(request.limit // 2, 10)  # At least half the requested or 10
        if len(scored_clients) < min_results:
            print(f"Only {len(scored_clients)} from DB, supplementing with mock data")
            # Generate mock candidates
            fallback_candidates = generate_mock_clients(150)
            # Filter by causes for relevance; keep the narrowed pool only when
            # it still holds a workable number of candidates
            if request.seed_causes:
                cause_matched = [
                    c
                    for c in fallback_candidates
                    if any(cause in c.causes for cause in request.seed_causes)
                ]
                if len(cause_matched) >= 20:
                    fallback_candidates = cause_matched
            # Use hybrid matching on mock data
            mock_results = local_recommender.find_lookalikes_hybrid(
                seed_profile=seed,
                candidates=fallback_candidates,
                k=request.limit - len(scored_clients),
                planning_area_filter=request.planning_area_filter,
                housing_type_filter=housing_filter,
            )
            scored_clients.extend(mock_results)
            print(
                f"Added {len(mock_results)} mock results, total: {len(scored_clients)}"
            )
        # Sort combined results by score and truncate to the requested limit
        scored_clients.sort(key=lambda x: x.final_score, reverse=True)
        scored_clients = scored_clients[: request.limit]
        # Apply tiered targeting with relaxed min_score for small datasets
        effective_min_score = max(0, request.min_score - 0.1)  # Relax slightly
        tiered = local_recommender.apply_tiered_targeting(
            scored_clients, min_score=effective_min_score
        )
        # Convert a scored client to the API response model; coordinates are
        # included only when the caller asked for GeoJSON
        def to_response(sc):
            return ScoredClientResponse(
                user_id=sc.client.user_id,
                planning_area=sc.client.planning_area,
                housing_type=sc.client.housing_type.value,
                causes=sc.client.causes,
                interests=sc.client.interests,
                is_donor=sc.client.is_donor,
                final_score=round(sc.final_score, 3),
                vector_similarity=round(sc.vector_similarity_score, 3),
                spatial_proxy=round(sc.spatial_proxy_score, 3),
                proximity=round(sc.proximity_score, 3),
                coordinates=(
                    list(sc.client.coordinates) if request.include_geojson else None
                ),
            )
        tiers_response = {
            "tier_1": [to_response(sc) for sc in tiered["tier_1"]],
            "tier_2": [to_response(sc) for sc in tiered["tier_2"]],
            "tier_3": [to_response(sc) for sc in tiered["tier_3"]],
        }
        # Generate GeoJSON if requested
        geojson = None
        if request.include_geojson:
            all_clients = tiered["tier_1"] + tiered["tier_2"] + tiered["tier_3"]
            geojson = local_recommender.to_geojson(all_clients)
        total = sum(len(t) for t in tiered.values())
        return LookalikeResponse(
            seed_causes=request.seed_causes,
            total_found=total,
            tiers=tiers_response,
            geojson=geojson,
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
async def _get_mock_lookalike_response(request: LookalikeRequest) -> LookalikeResponse:
    """Generate mock lookalike response when GIS recommender unavailable.

    Builds 100 synthetic clients, keeps those sharing at least one seed cause
    (and matching the optional planning-area filter), scores the first
    ``request.limit`` of them with a fixed weighted blend, then splits the
    survivors above ``request.min_score`` into three tiers.
    """
    from recommender.gis_recommender import (
        generate_mock_clients,
        PLANNING_AREAS,
        HOUSING_INCOME_PROXY,
        HousingType,
    )

    seed_cause_set = set(request.seed_causes)

    # Candidate pool: synthetic clients that share at least one seed cause.
    candidates = [
        c
        for c in generate_mock_clients(100)
        if any(cause in c.causes for cause in request.seed_causes)
    ]
    if request.planning_area_filter:
        candidates = [
            c for c in candidates if c.planning_area == request.planning_area_filter
        ]

    scored = []
    for candidate in candidates[: request.limit]:
        # Fraction of seed causes this candidate shares (guard against empty seeds).
        overlap = len(set(candidate.causes) & seed_cause_set) / max(
            len(request.seed_causes), 1
        )
        income_proxy = HOUSING_INCOME_PROXY.get(candidate.housing_type, 0.5)
        # Fixed weights: 50% cause match, 30% housing/income proxy, 20% neutral proximity.
        blended = 0.5 * overlap + 0.3 * income_proxy + 0.2 * 0.5
        scored.append(
            {
                "client": candidate,
                "final_score": blended,
                "vector_similarity": overlap,
                "spatial_proxy": income_proxy,
                "proximity": 0.5,
            }
        )

    scored.sort(key=lambda item: item["final_score"], reverse=True)
    # Drop anything below the caller's score floor.
    scored = [item for item in scored if item["final_score"] >= request.min_score]

    tier_size = max(len(scored) // 3, 1)

    def as_response(item):
        profile = item["client"]
        return ScoredClientResponse(
            user_id=profile.user_id,
            planning_area=profile.planning_area,
            housing_type=profile.housing_type.value,
            causes=profile.causes,
            interests=profile.interests,
            is_donor=profile.is_donor,
            final_score=round(item["final_score"], 3),
            vector_similarity=round(item["vector_similarity"], 3),
            spatial_proxy=round(item["spatial_proxy"], 3),
            proximity=round(item["proximity"], 3),
            coordinates=list(profile.coordinates) if request.include_geojson else None,
        )

    tiers = {
        "tier_1": [as_response(item) for item in scored[:tier_size]],
        "tier_2": [as_response(item) for item in scored[tier_size : tier_size * 2]],
        "tier_3": [as_response(item) for item in scored[tier_size * 2 :]],
    }

    geojson = None
    if request.include_geojson:
        geojson = {
            "type": "FeatureCollection",
            "features": [
                {
                    "type": "Feature",
                    "geometry": {
                        "type": "Point",
                        # GeoJSON ordering is [longitude, latitude].
                        "coordinates": [
                            round(item["client"].coordinates[1], 3),
                            round(item["client"].coordinates[0], 3),
                        ],
                    },
                    "properties": {
                        "user_id": item["client"].user_id,
                        "planning_area": item["client"].planning_area,
                        "housing_type": item["client"].housing_type.value,
                        "causes": item["client"].causes,
                        "is_donor": item["client"].is_donor,
                        "final_score": round(item["final_score"], 3),
                    },
                }
                for item in scored
            ],
        }

    return LookalikeResponse(
        seed_causes=request.seed_causes,
        total_found=len(scored),
        tiers=tiers,
        geojson=geojson,
    )
@app.post("/clients/seed-mock-data")
async def seed_mock_client_data(count: int = 100):
    """
    Seed the database with mock client profiles for testing.
    This populates the vector store with realistic Singapore client data.
    """
    if not encoder:
        raise HTTPException(status_code=503, detail="Encoder not initialized")
    if not vector_store:
        raise HTTPException(status_code=503, detail="Database not connected")

    try:
        from recommender.gis_recommender import generate_mock_clients

        seeded = 0
        for profile in generate_mock_clients(count):
            # Embed each profile's text representation, then persist it
            # alongside the structured form data.
            vector = await encoder.encode(profile.to_embedding_text())
            payload = profile.to_dict()
            payload["country"] = "SG"  # mock profiles are Singapore-only
            await vector_store.store_embedding(
                form_id=profile.user_id,
                form_type="client",
                embedding=vector,
                form_data=payload,
            )
            seeded += 1

        return {
            "message": f"Seeded {seeded} mock client profiles",
            "count": seeded,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/debug/database-stats")
async def get_database_stats():
    """
    Debug endpoint to check what's stored in the vector database.
    Returns counts of donors, volunteers, and clients in the database.
    """
    if not vector_store:
        return {"error": "Database not connected", "stats": None}
    try:
        async with vector_store.pool.connection() as conn:
            async with conn.cursor() as cur:
                # Row counts grouped by form_type (donor / volunteer / client).
                await cur.execute(
                    """
                    SELECT
                        metadata->>'form_type' as form_type,
                        COUNT(*) as count
                    FROM my_embeddings
                    GROUP BY metadata->>'form_type'
                    ORDER BY count DESC
                    """
                )
                counts_by_type = await cur.fetchall()

                # Ten most recently inserted rows with a short text preview.
                await cur.execute(
                    """
                    SELECT source_id, metadata->>'form_type',
                           LEFT(text_content::text, 200) as preview
                    FROM my_embeddings
                    ORDER BY id DESC
                    LIMIT 10
                    """
                )
                latest_rows = await cur.fetchall()

        return {
            "connected": True,
            "form_type_counts": {ftype: n for ftype, n in counts_by_type},
            "total_entries": sum(n for _, n in counts_by_type),
            "recent_entries": [
                {"id": sid, "form_type": ftype, "preview": preview}
                for sid, ftype, preview in latest_rows
            ],
        }
    except Exception as e:
        return {"error": str(e), "stats": None}
@app.get("/clients/map-demographics")
async def get_map_demographics(
    causes: Optional[str] = None,  # Comma-separated causes
    include_donors: bool = True,
    include_clients: bool = True,
):
    """
    Get aggregated demographics data for Singapore map visualization.

    Query params:
        causes: optional comma-separated cause filter; entries matching none
            of the listed causes are skipped.
        include_donors / include_clients: toggle which form types are queried.

    Returns:
        - Planning area aggregates (donor counts, cause distribution, housing breakdown)
        - Individual donor/client points with coordinates
        - Demographics summary for clusters

    Falls back to mock data when the database is unavailable or the query fails.
    """
    import hashlib  # hoisted: was re-imported inside the per-entry loop

    from recommender.gis_recommender import (
        PLANNING_AREAS,
        HousingType,
        HOUSING_INCOME_PROXY,
    )

    if not vector_store:
        # Return mock data if database not available
        return await _generate_mock_map_demographics(causes)

    try:
        cause_list = causes.split(",") if causes else None

        # Query all donors and clients from database
        all_entries = []
        if include_donors:
            donor_results = await vector_store.find_by_form_type("donor", limit=500)
            all_entries.extend(donor_results)
        if include_clients:
            client_results = await vector_store.find_by_form_type("client", limit=500)
            all_entries.extend(client_results)

        # Aggregate by planning area
        area_stats = {}
        individual_points = []

        for entry in all_entries:
            # Entries may be attribute-style objects or plain dicts depending
            # on the store backend, so read both ways.
            form_data = (
                entry.form_data
                if hasattr(entry, "form_data")
                else entry.get("form_data", {})
            )
            entry_id = entry.id if hasattr(entry, "id") else entry.get("id", "")
            form_type = (
                entry.form_type
                if hasattr(entry, "form_type")
                else entry.get("form_type", "")
            )

            # Get planning area
            planning_area = form_data.get("planning_area", "unknown")
            if planning_area == "unknown" and form_data.get("country") == "SG":
                # Infer planning area from ID hash for donors without explicit
                # area. md5 gives a stable assignment across processes (the
                # builtin hash() is salted per run).
                area_list = list(PLANNING_AREAS.keys())
                idx = int(hashlib.md5(entry_id.encode()).hexdigest(), 16) % len(
                    area_list
                )
                planning_area = area_list[idx]

            # Get causes (normalize a bare string to a one-element list)
            entry_causes = form_data.get("causes", [])
            if isinstance(entry_causes, str):
                entry_causes = [entry_causes]

            # Filter by causes if specified
            if cause_list:
                if not any(c in entry_causes for c in cause_list):
                    continue

            # Get housing type; when missing/unknown, infer a rough proxy from
            # the donation amount range (bigger donations -> pricier housing).
            housing_type = form_data.get("housing_type", "hdb_4_room")
            amount_range = form_data.get("amount_range", "")
            if not housing_type or housing_type == "unknown":
                if "10000" in str(amount_range) or "5000" in str(amount_range):
                    housing_type = "landed"
                elif "1000" in str(amount_range):
                    housing_type = "condo"
                elif "500" in str(amount_range):
                    housing_type = "hdb_executive"
                else:
                    housing_type = "hdb_4_room"

            # Jitter coordinates around the area centroid so points don't stack.
            # NOTE(review): builtin hash() is salted per process, so jitter is
            # not stable across runs — acceptable for visualization only.
            if planning_area in PLANNING_AREAS:
                area_info = PLANNING_AREAS[planning_area]
                lat = area_info["lat"] + (hash(entry_id) % 100 - 50) * 0.0005
                lng = area_info["lng"] + (hash(entry_id[::-1]) % 100 - 50) * 0.0005
            else:
                lat, lng = 1.3521, 103.8198  # Singapore center

            # Aggregate by area
            if planning_area not in area_stats:
                area_stats[planning_area] = {
                    "name": PLANNING_AREAS.get(planning_area, {}).get(
                        "name", planning_area.replace("_", " ").title()
                    ),
                    "lat": PLANNING_AREAS.get(planning_area, {}).get("lat", 1.3521),
                    "lng": PLANNING_AREAS.get(planning_area, {}).get("lng", 103.8198),
                    "total_count": 0,
                    "donor_count": 0,
                    "client_count": 0,
                    "causes": {},
                    "housing_breakdown": {},
                    "avg_income_proxy": 0,
                    "income_proxies": [],
                }

            stats = area_stats[planning_area]
            stats["total_count"] += 1
            if form_type == "donor":
                stats["donor_count"] += 1
            else:
                stats["client_count"] += 1

            # Count causes
            for cause in entry_causes:
                stats["causes"][cause] = stats["causes"].get(cause, 0) + 1

            # Count housing
            stats["housing_breakdown"][housing_type] = (
                stats["housing_breakdown"].get(housing_type, 0) + 1
            )

            # Track income proxy. HousingType(housing_type) raises ValueError
            # for unrecognized strings; was a bare `except:` which also caught
            # KeyboardInterrupt/SystemExit.
            try:
                income_proxy = HOUSING_INCOME_PROXY.get(HousingType(housing_type), 0.5)
            except Exception:
                income_proxy = 0.5
            stats["income_proxies"].append(income_proxy)

            # Add individual point
            individual_points.append(
                {
                    "id": entry_id,
                    "type": form_type,
                    "lat": lat,
                    "lng": lng,
                    "planning_area": planning_area,
                    "housing_type": housing_type,
                    "causes": entry_causes[:5],  # Limit for performance
                    "is_donor": form_type == "donor",
                }
            )

        # Replace raw proxy lists with their average. Every area in area_stats
        # received at least one proxy, so the list is never empty here.
        for stats in area_stats.values():
            proxies = stats.pop("income_proxies")
            if proxies:
                stats["avg_income_proxy"] = round(sum(proxies) / len(proxies), 3)

        # Create GeoJSON for areas (polygons would need actual boundary data,
        # using centroid points instead)
        area_geojson = {
            "type": "FeatureCollection",
            "features": [
                {
                    "type": "Feature",
                    "geometry": {
                        "type": "Point",
                        "coordinates": [stats["lng"], stats["lat"]],
                    },
                    "properties": {
                        "planning_area": area,
                        "name": stats["name"],
                        **{k: v for k, v in stats.items() if k not in ["lat", "lng"]},
                    },
                }
                for area, stats in area_stats.items()
            ],
        }

        # Create GeoJSON for individual points
        points_geojson = {
            "type": "FeatureCollection",
            "features": [
                {
                    "type": "Feature",
                    "geometry": {
                        "type": "Point",
                        "coordinates": [p["lng"], p["lat"]],
                    },
                    "properties": {
                        "id": p["id"],
                        "type": p["type"],
                        "planning_area": p["planning_area"],
                        "housing_type": p["housing_type"],
                        "causes": p["causes"],
                        "is_donor": p["is_donor"],
                    },
                }
                for p in individual_points
            ],
        }

        # Summary statistics across all areas
        all_causes = {}
        all_housing = {}
        for stats in area_stats.values():
            for cause, count in stats["causes"].items():
                all_causes[cause] = all_causes.get(cause, 0) + count
            for housing, count in stats["housing_breakdown"].items():
                all_housing[housing] = all_housing.get(housing, 0) + count

        return {
            "total_donors": sum(s["donor_count"] for s in area_stats.values()),
            "total_clients": sum(s["client_count"] for s in area_stats.values()),
            "areas_with_data": len(area_stats),
            "summary": {
                "top_causes": sorted(
                    all_causes.items(), key=lambda x: x[1], reverse=True
                )[:10],
                "housing_distribution": all_housing,
            },
            "area_aggregates": area_geojson,
            "individual_points": points_geojson,
            "planning_areas": PLANNING_AREAS,
        }
    except Exception as e:
        import traceback

        traceback.print_exc()
        # Best-effort fallback: serve mock data rather than a 500.
        return await _generate_mock_map_demographics(causes)
async def _generate_mock_map_demographics(causes: Optional[str] = None):
    """Generate mock demographics data for map visualization.

    Used as a fallback by /clients/map-demographics when the database is
    unavailable. Produces randomized per-area aggregates and individual
    points matching the real endpoint's response shape.

    Args:
        causes: optional comma-separated cause filter; defaults to a fixed
            set of five causes when omitted.
    """
    # Fixed: previously also imported HOUSING_INCOME_PROXY and HousingType,
    # which were never used.
    from recommender.gis_recommender import PLANNING_AREAS
    import random

    cause_list = (
        causes.split(",")
        if causes
        else ["education", "animals", "poverty", "environment", "health"]
    )

    # Hoisted out of the per-point loop (was rebuilt on every iteration).
    housing_types = [
        "hdb_3_room",
        "hdb_4_room",
        "hdb_5_room",
        "hdb_executive",
        "condo",
        "landed",
    ]

    area_stats = {}
    individual_points = []

    for area_id, area_info in PLANNING_AREAS.items():
        count = random.randint(3, 25)
        donors = random.randint(1, count)
        area_stats[area_id] = {
            "name": area_info["name"],
            "lat": area_info["lat"],
            "lng": area_info["lng"],
            "total_count": count,
            "donor_count": donors,
            "client_count": count - donors,
            "causes": {
                cause: random.randint(1, count)
                for cause in random.sample(cause_list, min(3, len(cause_list)))
            },
            "housing_breakdown": {
                "hdb_4_room": random.randint(0, count // 2),
                "condo": random.randint(0, count // 3),
                "landed": random.randint(0, count // 4),
            },
            "avg_income_proxy": round(random.uniform(0.3, 0.8), 3),
        }

        # Generate individual points scattered around the area centroid.
        for i in range(count):
            lat = area_info["lat"] + (random.random() - 0.5) * 0.02
            lng = area_info["lng"] + (random.random() - 0.5) * 0.02
            individual_points.append(
                {
                    "id": f"mock_{area_id}_{i}",
                    # First `donors` points are donors, the rest clients.
                    "type": "donor" if i < donors else "client",
                    "lat": lat,
                    "lng": lng,
                    "planning_area": area_id,
                    "housing_type": random.choice(housing_types),
                    "causes": random.sample(cause_list, min(2, len(cause_list))),
                    "is_donor": i < donors,
                }
            )

    # Create GeoJSON for area aggregates (centroid points, not polygons).
    area_geojson = {
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "geometry": {
                    "type": "Point",
                    "coordinates": [stats["lng"], stats["lat"]],
                },
                "properties": {
                    "planning_area": area,
                    "name": stats["name"],
                    **{k: v for k, v in stats.items() if k not in ["lat", "lng"]},
                },
            }
            for area, stats in area_stats.items()
        ],
    }

    points_geojson = {
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "geometry": {"type": "Point", "coordinates": [p["lng"], p["lat"]]},
                "properties": {k: v for k, v in p.items() if k not in ["lat", "lng"]},
            }
            for p in individual_points
        ],
    }

    return {
        "total_donors": sum(s["donor_count"] for s in area_stats.values()),
        "total_clients": sum(s["client_count"] for s in area_stats.values()),
        "areas_with_data": len(area_stats),
        "summary": {
            "top_causes": [(c, random.randint(10, 50)) for c in cause_list[:5]],
            "housing_distribution": {
                "hdb_4_room": 120,
                "condo": 45,
                "landed": 20,
                "hdb_5_room": 30,
            },
        },
        "area_aggregates": area_geojson,
        "individual_points": points_geojson,
        "planning_areas": PLANNING_AREAS,
    }
@app.get("/debug/search-donors")
async def debug_search_donors(cause: str = "education", limit: int = 10):
    """
    Debug endpoint to directly search for donors in the database.
    This bypasses the GIS recommender to see raw database results.
    """
    if not encoder or not vector_store:
        return {"error": "Encoder or database not available"}
    try:
        # Embed a simple natural-language query for the given cause.
        query_text = f"Donor interested in {cause} causes, looking to support {cause} initiatives"
        query_embedding = await encoder.encode(query_text)

        donor_hits = await vector_store.find_similar(
            query_embedding=query_embedding,
            form_type="donor",
            limit=limit,
        )
        client_hits = await vector_store.find_similar(
            query_embedding=query_embedding,
            form_type="client",
            limit=limit,
        )

        def summarize(hit, extra_field):
            # Shared result shape; donors expose "country", clients
            # expose "planning_area" as the trailing field.
            row = {
                "id": hit.id,
                "form_type": hit.form_type,
                "score": round(hit.score, 4),
                "distance": round(hit.distance, 4),
                "causes": hit.form_data.get("causes", []),
            }
            row[extra_field] = hit.form_data.get(extra_field)
            return row

        return {
            "query_cause": cause,
            "donor_results": [summarize(h, "country") for h in donor_hits],
            "client_results": [summarize(h, "planning_area") for h in client_hits],
            "total_donors": len(donor_hits),
            "total_clients": len(client_hits),
        }
    except Exception as e:
        import traceback

        return {"error": str(e), "traceback": traceback.format_exc()}
# ============================================================================
# Main
# ============================================================================
# Script entry point: run the API with uvicorn when executed directly.
if __name__ == "__main__":
    import uvicorn

    # Windows-specific fix: must be set before uvicorn starts its event loop
    # (mirrors the module-level policy set above; uvicorn creates its own loop).
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    # Port 7860 is the conventional Hugging Face Spaces port — presumably this
    # is deployed there via the GitHub sync noted in the file header; confirm.
    uvicorn.run(app, host="0.0.0.0", port=7860)