TreeTrack / app.py
RoyAalekh's picture
Security: remove auto-login demo endpoint, require password for demo_user
2a49dd7
raw
history blame
42.5 kB
"""
TreeTrack FastAPI Application - Supabase Edition
Clean implementation using Supabase Postgres + Storage
"""
import json
import logging
import time
from datetime import datetime
from typing import Any, Optional, List, Dict
import uvicorn
from fastapi import FastAPI, HTTPException, Request, status, File, UploadFile, Form, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field, field_validator
import os
# Import our Supabase components
from supabase_database import SupabaseDatabase
from supabase_storage import SupabaseFileStorage
from config import get_settings
from master_tree_database import create_master_tree_database, get_tree_suggestions, get_all_tree_codes
from auth import auth_manager
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("app.log"), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
# Log startup
build_time = os.environ.get('BUILD_TIME', 'unknown')
logger.info(f"TreeTrack Supabase Edition starting - Build time: {build_time}")
# Get configuration settings
settings = get_settings()
# Initialize FastAPI app
app = FastAPI(
title="TreeTrack - Supabase Edition",
description="Tree mapping and tracking with persistent cloud storage",
version="3.0.0",
docs_url="/docs",
redoc_url="/redoc",
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.security.cors_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
# Serve static files
app.mount("/static", StaticFiles(directory="static"), name="static")
# Initialize Supabase components
db = SupabaseDatabase()
storage = SupabaseFileStorage()
# Authentication models
class LoginRequest(BaseModel):
username: str
password: str
class LoginResponse(BaseModel):
token: str
user: Dict[str, Any]
class UserInfo(BaseModel):
username: str
role: str
full_name: str
permissions: List[str]
class DemoLoginResponse(BaseModel):
token: str
user: Dict[str, Any]
is_demo_mode: bool = True
# Helper function for authentication
def get_current_user(request: Request) -> Optional[Dict[str, Any]]:
"""Extract user info from request headers or cookies"""
# Try Authorization header first (for API calls)
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
return auth_manager.validate_session(token)
# Try cookie for web page requests
auth_cookie = request.cookies.get('auth_token')
if auth_cookie:
return auth_manager.validate_session(auth_cookie)
return None
def require_auth(request: Request) -> Dict[str, Any]:
"""Dependency that requires authentication"""
user = get_current_user(request)
if not user:
# Server-side auth telemetry for unauthorized access
try:
_record_server_telemetry(
request=request,
event_type='auth',
status='unauthorized',
metadata={
'path': str(request.url),
'method': request.method,
'has_auth_header': bool(request.headers.get('Authorization'))
},
user=None
)
except Exception:
pass
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required",
headers={"WWW-Authenticate": "Bearer"},
)
return user
def require_permission(permission: str):
"""Dependency factory for specific permissions"""
def check_permission(request: Request) -> Dict[str, Any]:
user = require_auth(request)
if permission not in user.get('permissions', []):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission '{permission}' required"
)
return user
return check_permission
# Pydantic models (same as before)
class Tree(BaseModel):
"""Complete tree model with all 12 fields"""
id: int
latitude: float
longitude: float
location_name: Optional[str] = None
local_name: Optional[str] = None
scientific_name: Optional[str] = None
common_name: Optional[str] = None
tree_code: Optional[str] = None
height: Optional[float] = None
width: Optional[float] = None
utility: Optional[List[str]] = None
storytelling_text: Optional[str] = None
storytelling_audio: Optional[str] = None
phenology_stages: Optional[List[str]] = None
photographs: Optional[Dict[str, str]] = None
notes: Optional[str] = None
created_at: str
updated_at: Optional[str] = None
created_by: str = "system"
class TreeCreate(BaseModel):
"""Model for creating new tree records"""
latitude: float = Field(..., ge=-90, le=90, description="Latitude in decimal degrees")
longitude: float = Field(..., ge=-180, le=180, description="Longitude in decimal degrees")
location_name: Optional[str] = Field(None, max_length=200, description="Human-readable location (e.g., landmark)")
local_name: Optional[str] = Field(None, max_length=200, description="Local Assamese name")
scientific_name: Optional[str] = Field(None, max_length=200, description="Scientific name")
common_name: Optional[str] = Field(None, max_length=200, description="Common name")
tree_code: Optional[str] = Field(None, max_length=20, description="Tree reference code")
height: Optional[float] = Field(None, gt=0, le=1000, description="Height in feet (ft)")
width: Optional[float] = Field(None, gt=0, le=200, description="Girth/DBH in feet (ft)")
utility: Optional[List[str]] = Field(None, description="Ecological/cultural utilities")
storytelling_text: Optional[str] = Field(None, max_length=5000, description="Stories and narratives")
storytelling_audio: Optional[str] = Field(None, description="Audio file path")
phenology_stages: Optional[List[str]] = Field(None, description="Current development stages")
photographs: Optional[Dict[str, str]] = Field(None, description="Photo categories and paths")
notes: Optional[str] = Field(None, max_length=2000, description="Additional observations")
@field_validator("utility", mode='before')
@classmethod
def validate_utility(cls, v):
if isinstance(v, str):
try:
v = json.loads(v)
except json.JSONDecodeError:
raise ValueError(f"Invalid JSON string for utility: {v}")
if v is not None:
valid_utilities = [
"Religious", "Timber", "Biodiversity", "Hydrological benefit",
"Faunal interaction", "Food", "Medicine", "Shelter", "Cultural"
]
for item in v:
if item not in valid_utilities:
raise ValueError(f"Invalid utility: {item}")
return v
@field_validator("phenology_stages", mode='before')
@classmethod
def validate_phenology(cls, v):
if isinstance(v, str):
try:
v = json.loads(v)
except json.JSONDecodeError:
raise ValueError(f"Invalid JSON string for phenology_stages: {v}")
if v is not None:
valid_stages = [
"New leaves", "Old leaves", "Open flowers", "Fruiting",
"Ripe fruit", "Recent fruit drop", "Other"
]
for stage in v:
if stage not in valid_stages:
raise ValueError(f"Invalid phenology stage: {stage}")
return v
@field_validator("photographs", mode='before')
@classmethod
def validate_photographs(cls, v):
if isinstance(v, str):
try:
v = json.loads(v)
except json.JSONDecodeError:
raise ValueError(f"Invalid JSON string for photographs: {v}")
if v is not None:
valid_categories = ["Leaf", "Bark", "Fruit", "Seed", "Flower", "Full tree"]
for category in v.keys():
if category not in valid_categories:
raise ValueError(f"Invalid photo category: {category}")
return v
class TreeUpdate(BaseModel):
"""Model for updating tree records"""
latitude: Optional[float] = Field(None, ge=-90, le=90)
longitude: Optional[float] = Field(None, ge=-180, le=180)
location_name: Optional[str] = Field(None, max_length=200)
local_name: Optional[str] = Field(None, max_length=200)
scientific_name: Optional[str] = Field(None, max_length=200)
common_name: Optional[str] = Field(None, max_length=200)
tree_code: Optional[str] = Field(None, max_length=20)
height: Optional[float] = Field(None, gt=0, le=1000)
width: Optional[float] = Field(None, gt=0, le=200)
utility: Optional[List[str]] = None
storytelling_text: Optional[str] = Field(None, max_length=5000)
storytelling_audio: Optional[str] = None
phenology_stages: Optional[List[str]] = None
photographs: Optional[Dict[str, str]] = None
notes: Optional[str] = Field(None, max_length=2000)
# Application startup
@app.on_event("startup")
async def startup_event():
"""Initialize application"""
try:
# Initialize master tree database (always works - local SQLite)
create_master_tree_database()
logger.info("Master tree database initialized with 146 species")
# Test Supabase connection (non-blocking)
try:
if db.test_connection():
# Initialize database schema if connection works
db.initialize_database()
# Log success
tree_count = db.get_tree_count()
logger.info(f"TreeTrack Supabase Edition initialized - {tree_count} trees in database")
else:
logger.warning("Supabase connection failed - running in limited mode")
logger.warning("Database operations will fail until Supabase is configured")
except Exception as db_error:
logger.error(f"Supabase connection error: {db_error}")
logger.warning("App starting in limited mode - only master tree database available")
logger.info("TreeTrack application startup complete")
except Exception as e:
logger.error(f"Critical application startup failed: {e}")
# Only fail if master database fails (shouldn't happen)
raise
# Health check
@app.get("/health", tags=["Health"])
async def health_check():
"""Health check endpoint"""
try:
connection_ok = db.test_connection()
tree_count = db.get_tree_count() if connection_ok else 0
# Include environment debug info
import os
supabase_url = os.getenv("SUPABASE_URL", "NOT_SET")
supabase_anon_key = os.getenv("SUPABASE_ANON_KEY", "NOT_SET")
return {
"status": "healthy" if connection_ok else "limited",
"database": "connected" if connection_ok else "disconnected",
"trees": tree_count,
"master_database": "available",
"timestamp": datetime.now().isoformat(),
"version": "3.0.0",
"environment": {
"supabase_url": supabase_url[:50] + "..." if len(supabase_url) > 50 else supabase_url,
"supabase_anon_key": "SET" if supabase_anon_key != "NOT_SET" else "NOT_SET",
"port": os.getenv("PORT", "7860")
}
}
except Exception as e:
logger.error(f"Health check failed: {e}")
return {
"status": "unhealthy",
"error": str(e),
"timestamp": datetime.now().isoformat(),
}
# Authentication routes
@app.post("/api/auth/login", response_model=LoginResponse, tags=["Authentication"])
async def login(login_data: LoginRequest, response: Response):
"""Authenticate user and create session"""
result = auth_manager.authenticate(login_data.username, login_data.password)
if not result:
# Telemetry: login failure
try:
# Construct a minimal request-like object for _record_server_telemetry if needed
from fastapi import Request as _Req
except Exception:
pass
# We have the real request inside FastAPI dependency; emulate via middleware not needed here
# Instead, log via logger for this path
# Note: We cannot access Request here directly, so we skip client context
# Use file-based fallback
_write_telemetry({
'event_type': 'auth',
'status': 'login_failed',
'metadata': {'username': login_data.username},
'timestamp': datetime.now().isoformat()
})
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password"
)
# Set authentication cookie for web page requests
response.set_cookie(
key="auth_token",
value=result["token"],
max_age=8*60*60, # 8 hours (same as session timeout)
httponly=True, # Prevent JavaScript access for security
secure=True, # HTTPS required for HuggingFace Spaces
samesite="lax" # CSRF protection
)
# Telemetry: login success (server-side)
try:
# We cannot access Request here directly; emit without client metadata
_write_telemetry({
'event_type': 'auth',
'status': 'login_success',
'metadata': {'username': login_data.username},
'timestamp': datetime.now().isoformat(),
'user': {'username': result['user']['username'], 'role': result['user']['role']}
})
except Exception:
pass
return result
@app.get("/api/auth/validate", tags=["Authentication"])
async def validate_session(user: Dict[str, Any] = Depends(require_auth)):
"""Validate current session"""
return {
"valid": True,
"user": user
}
@app.post("/api/auth/logout", tags=["Authentication"])
async def logout(request: Request, response: Response):
"""Logout user and invalidate session"""
# Get token from header or cookie
token = None
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
else:
token = request.cookies.get('auth_token')
if token:
# Telemetry: logout server-side
try:
session = auth_manager.validate_session(token)
_record_server_telemetry(
request=request,
event_type='auth',
status='logout',
metadata={'path': str(request.url)},
user=session or None
)
except Exception:
pass
auth_manager.logout(token)
# Clear the authentication cookie (must match creation parameters)
response.delete_cookie(
key="auth_token",
secure=True,
samesite="lax"
)
return {"message": "Logged out successfully"}
@app.get("/api/auth/user", response_model=UserInfo, tags=["Authentication"])
async def get_user_info(user: Dict[str, Any] = Depends(require_auth)):
"""Get current user information"""
return UserInfo(
username=user["username"],
role=user["role"],
full_name=user["full_name"],
permissions=user["permissions"]
)
# Frontend routes
@app.get("/welcome", response_class=HTMLResponse, tags=["Frontend"])
async def serve_welcome(request: Request):
"""Serve the demo welcome screen for authenticated users"""
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
try:
with open("static/welcome.html", encoding="utf-8") as f:
content = f.read()
return HTMLResponse(content=content)
except FileNotFoundError:
logger.error("welcome.html not found")
raise HTTPException(status_code=404, detail="Welcome page not found")
@app.get("/login", response_class=HTMLResponse, tags=["Frontend"])
async def serve_login():
"""Serve the login page"""
try:
with open("static/login.html", encoding="utf-8") as f:
content = f.read()
return HTMLResponse(content=content)
except FileNotFoundError:
logger.error("login.html not found")
raise HTTPException(status_code=404, detail="Login page not found")
@app.get("/", response_class=HTMLResponse, tags=["Frontend"])
async def read_root(request: Request):
"""Serve main app or redirect based on user type"""
user = get_current_user(request)
# No user? Redirect to login
if not user:
return RedirectResponse(url="/login")
# Demo users see welcome screen after login
if user.get('role') == 'demo_user':
return RedirectResponse(url="/welcome")
# Regular authenticated users get the app directly
try:
with open("static/index.html", encoding="utf-8") as f:
content = f.read()
return HTMLResponse(content=content)
except FileNotFoundError:
raise HTTPException(status_code=404, detail="App not found")
@app.get("/form", response_class=HTMLResponse, tags=["Frontend"])
async def serve_form(request: Request):
"""Direct access to form for demo users"""
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
try:
with open("static/index.html", encoding="utf-8") as f:
content = f.read()
return HTMLResponse(content=content)
except FileNotFoundError:
raise HTTPException(status_code=404, detail="Form not found")
@app.get("/map", response_class=HTMLResponse, tags=["Frontend"])
async def serve_map(request: Request):
"""Serve the map page with auth check"""
# Check if user is authenticated
user = get_current_user(request)
# Redirect to login if not authenticated
if not user:
return RedirectResponse(url="/login")
return RedirectResponse(url="/static/map.html")
@app.get("/contributors", response_class=HTMLResponse, tags=["Frontend"])
async def serve_contributors(request: Request):
"""Serve the contributors page with auth check"""
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
try:
with open("static/contributors.html", encoding="utf-8") as f:
content = f.read()
return HTMLResponse(content=content)
except FileNotFoundError:
logger.error("contributors.html not found")
raise HTTPException(status_code=404, detail="Contributors page not found")
@app.get("/api/dashboard/stats", tags=["Dashboard"])
async def get_dashboard_stats(user: Dict[str, Any] = Depends(require_auth)):
"""Get dashboard statistics"""
try:
# Get basic tree statistics
total_trees = db.get_tree_count() # Remove await - method is not async
# Get unique species count
trees = await db.get_trees(limit=10000, offset=0) # Get all for counting
unique_species = set()
for tree in trees:
if hasattr(tree, 'scientific_name') and tree.scientific_name:
unique_species.add(tree.scientific_name)
elif hasattr(tree, 'common_name') and tree.common_name:
unique_species.add(tree.common_name)
# Get last update time
last_updated = None
if trees:
last_updated = max(tree.updated_at or tree.created_at for tree in trees)
return {
"totalTrees": total_trees,
"totalSpecies": len(unique_species),
"totalUsers": 4, # Static count for now
"lastUpdated": last_updated
}
except Exception as e:
logger.error(f"Error getting dashboard stats: {e}")
# Return basic fallback stats
return {
"totalTrees": 0,
"totalSpecies": 0,
"totalUsers": 4,
"lastUpdated": None
}
# Tree CRUD Operations
@app.get("/api/trees", response_model=List[Tree], tags=["Trees"])
async def get_trees(
limit: int = 100,
offset: int = 0,
species: str = None,
health_status: str = None,
user: Dict[str, Any] = Depends(require_auth)
):
"""Get trees with pagination and filters"""
if limit < 1 or limit > settings.server.max_trees_per_request:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Limit must be between 1 and {settings.server.max_trees_per_request}",
)
if offset < 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Offset must be non-negative",
)
try:
trees = await db.get_trees(limit=limit, offset=offset, species=species, health_status=health_status)
# For map performance: Skip image processing for bulk tree requests
# Images are not needed for map markers and clustering
return trees
except RuntimeError as e:
if "Database not connected" in str(e):
raise HTTPException(status_code=503, detail="Database not configured")
raise HTTPException(status_code=500, detail="Database error")
except Exception as e:
logger.error(f"Error retrieving trees: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve trees")
@app.post("/api/trees", response_model=Tree, status_code=status.HTTP_201_CREATED, tags=["Trees"])
async def create_tree(tree: TreeCreate, user: Dict[str, Any] = Depends(require_auth)):
"""Create a new tree record (demo mode aware)"""
try:
# Check if user is in demo mode (conference participant)
is_demo_user = "demo_interact" in user.get("permissions", [])
if is_demo_user:
# Demo mode: Return mock success response without saving
tree_data = tree.model_dump(exclude_unset=True)
tree_data['created_by'] = user['username']
# Create a mock tree response
mock_tree = {
"id": 9999, # Mock ID
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
**tree_data
}
logger.info(f"Demo mode: Mock tree creation for conference participant")
return mock_tree
# Check write permission for regular users
if "write" not in user.get("permissions", []):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions to create trees"
)
# Convert to dict for database insertion
tree_data = tree.model_dump(exclude_unset=True)
# Add created_by field
tree_data['created_by'] = user['username']
# Create tree in database
created_tree = await db.create_tree(tree_data)
# Process files and return with URLs
processed_tree = storage.process_tree_files(created_tree)
logger.info(f"Created tree with ID: {created_tree.get('id')}")
return processed_tree
except HTTPException:
raise
except Exception as e:
logger.error(f"Error creating tree: {e}")
raise HTTPException(status_code=500, detail="Failed to create tree")
@app.get("/api/trees/{tree_id}", response_model=Tree, tags=["Trees"])
async def get_tree(tree_id: int, user: Dict[str, Any] = Depends(require_auth)):
"""Get a specific tree by ID"""
try:
tree = await db.get_tree(tree_id)
if tree is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Tree with ID {tree_id} not found",
)
# Process files and return with URLs
processed_tree = storage.process_tree_files(tree)
return processed_tree
except HTTPException:
raise
except Exception as e:
logger.error(f"Error retrieving tree {tree_id}: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve tree")
@app.put("/api/trees/{tree_id}", response_model=Tree, tags=["Trees"])
async def update_tree(tree_id: int, tree_update: TreeUpdate, request: Request):
"""Update a tree record"""
try:
# Get current user
user = require_auth(request)
# Get existing tree to check permissions
existing_tree = await db.get_tree(tree_id)
if not existing_tree:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Tree with ID {tree_id} not found",
)
# Get token from header or cookie for permission checking
token = None
auth_header = request.headers.get('Authorization', '')
if auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
else:
token = request.cookies.get('auth_token')
if not token or not auth_manager.can_edit_tree(token, existing_tree.get('created_by', '')):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to edit this tree",
)
# Convert to dict for database update
update_data = tree_update.model_dump(exclude_unset=True)
if not update_data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No update data provided",
)
# Update tree in database
logger.info(f"Updating tree {tree_id} with fields: {list(update_data.keys())}")
updated_tree = await db.update_tree(tree_id, update_data)
# Process files and return with URLs
processed_tree = storage.process_tree_files(updated_tree)
logger.info(f"Updated tree with ID: {tree_id}")
return processed_tree
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating tree {tree_id}: {e}")
raise HTTPException(status_code=500, detail="Failed to update tree")
@app.delete("/api/trees/{tree_id}", tags=["Trees"])
async def delete_tree(tree_id: int, request: Request):
"""Delete a tree record"""
try:
# Get current user
user = require_auth(request)
# Get tree data first to clean up files
tree = await db.get_tree(tree_id)
if tree is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Tree with ID {tree_id} not found",
)
# Get token from header or cookie for permission checking
token = None
auth_header = request.headers.get('Authorization', '')
if auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
else:
token = request.cookies.get('auth_token')
if not token or not auth_manager.can_delete_tree(token, tree.get('created_by', '')):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to delete this tree",
)
# Delete tree from database
await db.delete_tree(tree_id)
# Clean up associated files
try:
if tree.get('photographs'):
for file_path in tree['photographs'].values():
if file_path:
storage.delete_image(file_path)
if tree.get('storytelling_audio'):
storage.delete_audio(tree['storytelling_audio'])
except Exception as e:
logger.warning(f"Failed to clean up files for tree {tree_id}: {e}")
logger.info(f"Deleted tree with ID: {tree_id}")
return {"message": f"Tree {tree_id} deleted successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting tree {tree_id}: {e}")
raise HTTPException(status_code=500, detail="Failed to delete tree")
# File Upload Endpoints
@app.post("/api/upload/image", tags=["Files"])
async def upload_image(
file: UploadFile = File(...),
category: str = Form(...),
user: Dict[str, Any] = Depends(require_auth)
):
"""Upload an image file with cloud persistence (demo mode aware)"""
# Validate file type
if not file.content_type or not file.content_type.startswith('image/'):
raise HTTPException(status_code=400, detail="File must be an image")
# Validate category
valid_categories = ["Leaf", "Bark", "Fruit", "Seed", "Flower", "Full tree"]
if category not in valid_categories:
raise HTTPException(
status_code=400,
detail=f"Category must be one of: {valid_categories}"
)
try:
# Check if user is in demo mode
is_demo_user = "demo_interact" in user.get("permissions", [])
if is_demo_user:
# Demo mode: Return mock success without uploading
await file.read() # Consume the file to prevent issues
logger.info(f"Demo mode: Mock image upload for conference participant")
return {
"filename": f"demo_{file.filename}",
"category": category,
"size": 0,
"content_type": file.content_type,
"bucket": "demo-bucket",
"success": True
}
# Check write permission for regular users
if "write" not in user.get("permissions", []):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions to upload files"
)
# Read file content
content = await file.read()
# Upload to Supabase Storage
result = storage.upload_image(content, file.filename, category)
logger.info(f"Image uploaded successfully: {result['filename']}")
return {
"filename": result['filename'],
"category": category,
"size": result['size'],
"content_type": file.content_type,
"bucket": result['bucket'],
"success": True
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error uploading image: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/upload/audio", tags=["Files"])
async def upload_audio(file: UploadFile = File(...), user: Dict[str, Any] = Depends(require_auth)):
"""Upload an audio file with cloud persistence (demo mode aware)"""
# Validate file type
if not file.content_type or not file.content_type.startswith('audio/'):
raise HTTPException(status_code=400, detail="File must be an audio file")
try:
# Check if user is in demo mode
is_demo_user = "demo_interact" in user.get("permissions", [])
if is_demo_user:
# Demo mode: Return mock success without uploading
await file.read() # Consume the file to prevent issues
logger.info(f"Demo mode: Mock audio upload for conference participant")
return {
"filename": f"demo_{file.filename}",
"size": 0,
"content_type": file.content_type,
"bucket": "demo-bucket",
"success": True
}
# Check write permission for regular users
if "write" not in user.get("permissions", []):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions to upload files"
)
# Read file content
content = await file.read()
# Upload to Supabase Storage
result = storage.upload_audio(content, file.filename)
logger.info(f"Audio uploaded successfully: {result['filename']}")
return {
"filename": result['filename'],
"size": result['size'],
"content_type": file.content_type,
"bucket": result['bucket'],
"success": True
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error uploading audio: {e}")
raise HTTPException(status_code=500, detail=str(e))
# File serving - generate signed URLs on demand
@app.get("/api/files/image/{file_path:path}", tags=["Files"])
async def get_image(file_path: str):
"""Get signed URL for image file"""
try:
signed_url = storage.get_image_url(file_path, expires_in=3600) # 1 hour
return RedirectResponse(url=signed_url)
except Exception as e:
logger.error(f"Error getting image URL: {e}")
raise HTTPException(status_code=404, detail="Image not found")
@app.get("/api/files/audio/{file_path:path}", tags=["Files"])
async def get_audio(file_path: str):
"""Get signed URL for audio file"""
try:
signed_url = storage.get_audio_url(file_path, expires_in=3600) # 1 hour
return RedirectResponse(url=signed_url)
except Exception as e:
logger.error(f"Error getting audio URL: {e}")
raise HTTPException(status_code=404, detail="Audio not found")
# Utility endpoints
@app.get("/api/utilities", tags=["Data"])
async def get_utilities():
"""Get list of valid utility options"""
return {
"utilities": [
"Religious", "Timber", "Biodiversity", "Hydrological benefit",
"Faunal interaction", "Food", "Medicine", "Shelter", "Cultural"
]
}
@app.get("/api/phenology-stages", tags=["Data"])
async def get_phenology_stages():
"""Get list of valid phenology stages"""
return {
"stages": [
"New leaves", "Old leaves", "Open flowers", "Fruiting",
"Ripe fruit", "Recent fruit drop", "Other"
]
}
@app.get("/api/photo-categories", tags=["Data"])
async def get_photo_categories():
"""Get list of valid photo categories"""
return {
"categories": ["Leaf", "Bark", "Fruit", "Seed", "Flower", "Full tree"]
}
# Statistics
@app.get("/api/stats", tags=["Statistics"])
async def get_stats():
"""Get comprehensive tree statistics"""
try:
total_trees = db.get_tree_count()
species_distribution = db.get_species_distribution(limit=20)
health_distribution = db.get_health_distribution() # Will be empty for now
measurements = db.get_average_measurements()
return {
"total_trees": total_trees,
"species_distribution": species_distribution,
"health_distribution": health_distribution,
"average_height_ft": measurements["average_height"],
"average_girth_ft": measurements["average_diameter"],
"units": {"height": "ft", "girth": "ft"},
"last_updated": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Error retrieving statistics: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve statistics")
# Master tree database suggestions
@app.get("/api/tree-suggestions", tags=["Data"])
async def get_tree_suggestions_api(query: str = "", limit: int = 10):
"""Get auto-suggestions for tree names from master database"""
if not query or len(query.strip()) == 0:
return {"suggestions": []}
try:
create_master_tree_database()
suggestions = get_tree_suggestions(query.strip(), limit)
return {
"query": query,
"suggestions": suggestions,
"count": len(suggestions)
}
except Exception as e:
logger.error(f"Error getting tree suggestions: {e}")
return {"suggestions": [], "error": str(e)}
@app.get("/api/tree-codes", tags=["Data"])
async def get_tree_codes_api():
"""Get all available tree codes from master database"""
try:
create_master_tree_database()
tree_codes = get_all_tree_codes()
return {
"tree_codes": tree_codes,
"count": len(tree_codes)
}
except Exception as e:
logger.error(f"Error getting tree codes: {e}")
return {"tree_codes": [], "error": str(e)}
# Telemetry logging
# Internal helper to record server-side telemetry without requiring client call
def _record_server_telemetry(request: Request, event_type: str, status: str = None, metadata: Dict[str, Any] = None, user: Dict[str, Any] = None):
evt = {
'event_type': event_type,
'status': status,
'metadata': metadata or {},
'timestamp': datetime.now().isoformat(),
'user': None,
'client': {
'ip': request.client.host if request.client else None,
'user_agent': request.headers.get('user-agent')
}
}
if user:
evt['user'] = { 'username': user.get('username'), 'role': user.get('role') }
if getattr(db, 'connected', False):
if not db.log_telemetry(evt):
_write_telemetry(evt)
else:
_write_telemetry(evt)
class TelemetryEvent(BaseModel):
event_type: str = Field(..., description="Type of event, e.g., 'upload', 'ui', 'error'")
status: Optional[str] = Field(None, description="Status such as success/error")
metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
timestamp: Optional[str] = Field(default=None)
def _write_telemetry(event: Dict[str, Any]):
try:
# Ensure timestamp
if 'timestamp' not in event or not event['timestamp']:
event['timestamp'] = datetime.now().isoformat()
# Append as JSON line
with open("telemetry.log", "a", encoding="utf-8") as f:
f.write(json.dumps(event) + "\n")
except Exception as e:
logger.warning(f"Failed to write telemetry: {e}")
@app.post("/api/telemetry", tags=["System"])
async def telemetry(event: TelemetryEvent, request: Request, user: Dict[str, Any] = Depends(require_auth)):
"""Accept telemetry/observability events from the client for troubleshooting."""
try:
evt = event.model_dump()
# Enrich with user and request context
evt['user'] = {
"username": user.get("username"),
"role": user.get("role")
}
evt['client'] = {
"ip": request.client.host if request.client else None,
"user_agent": request.headers.get('user-agent')
}
# Prefer Supabase persistent storage; fallback to file if not configured
if getattr(db, 'connected', False):
ok = db.log_telemetry(evt)
if ok:
logger.info(f"Telemetry stored: {evt.get('event_type')} status={evt.get('status')}")
else:
logger.warning("Telemetry DB insert failed, writing to file")
_write_telemetry(evt)
else:
logger.info("DB not connected, writing telemetry to file")
_write_telemetry(evt)
return {"ok": True}
except Exception as e:
logger.error(f"Telemetry error: {e}")
raise HTTPException(status_code=500, detail="Failed to record telemetry")
# Telemetry query (admin-only)
@app.get("/api/telemetry", tags=["System"])
async def get_telemetry(limit: int = 100, user: Dict[str, Any] = Depends(require_permission("admin"))):
"""Return recent telemetry events. Uses Supabase if connected, else reads telemetry.log."""
limit = max(1, min(1000, limit))
try:
if getattr(db, 'connected', False):
events = db.get_recent_telemetry(limit)
# Normalize shape to always include evt.user = {username, role}
norm_events: List[Dict[str, Any]] = []
for e in (events or []):
if 'user' not in e or not e.get('user'):
username = e.get('username')
role = e.get('role')
if username or role:
e = {**e, 'user': {'username': username, 'role': role}}
norm_events.append(e)
return {"events": norm_events, "source": "supabase", "count": len(norm_events)}
# Fallback to file
events: List[Dict[str, Any]] = []
try:
with open("telemetry.log", "r", encoding="utf-8") as f:
lines = f.readlines()
for line in lines[-limit:]:
line = line.strip()
if not line:
continue
try:
events.append(json.loads(line))
except Exception:
continue
except FileNotFoundError:
events = []
return {"events": events, "source": "file", "count": len(events)}
except Exception as e:
logger.error(f"Get telemetry failed: {e}")
raise HTTPException(status_code=500, detail="Failed to fetch telemetry")
# Version info
@app.get("/api/version", tags=["System"])
async def get_version():
"""Get current application version"""
return {
"version": "3.0.0",
"backend": "supabase",
"database": "postgres",
"storage": "supabase-storage",
"timestamp": int(time.time()),
"build": build_time,
"server_time": datetime.now().isoformat()
}
if __name__ == "__main__":
uvicorn.run(
"app:app", host="127.0.0.1", port=8000, reload=True, log_level="info"
)