Spaces:
Runtime error
Runtime error
| from datetime import datetime | |
| from io import StringIO | |
| import csv | |
| from fastapi import APIRouter, HTTPException, status, Query | |
| from fastapi.responses import StreamingResponse | |
| from sqlalchemy import or_, cast, String | |
| from backend.core.dependencies import DbSession, AdminUser | |
| from backend.schemas.admin import ( | |
| AuditLogResponse, SystemStatsResponse, SiteCreate, SiteUpdate | |
| ) | |
| from backend.schemas.site import SiteResponse | |
| from backend.models import ( | |
| User, Group, Site, Crop, AuditLog, SensorData, | |
| EquipmentGroup, Parameter | |
| ) | |
| router = APIRouter() | |
| def get_system_stats(db: DbSession, admin: AdminUser): | |
| return SystemStatsResponse( | |
| total_users=db.query(User).count(), | |
| active_users=db.query(User).filter(User.is_active == True).count(), | |
| total_groups=db.query(Group).count(), | |
| total_sites=db.query(Site).count(), | |
| active_sites=db.query(Site).filter(Site.is_active == True).count(), | |
| total_sensor_records=db.query(SensorData).count(), | |
| total_parameters=db.query(Parameter).count(), | |
| total_equipment_groups=db.query(EquipmentGroup).count() | |
| ) | |
| def get_audit_logs( | |
| db: DbSession, | |
| admin: AdminUser, | |
| limit: int = Query(default=100, le=1000), | |
| offset: int = Query(default=0), | |
| user_id: str | None = None, | |
| action: str | None = None, | |
| search: str | None = Query(default=None, description="Search in user email, IP address, resource type/id, and details"), | |
| start_date: datetime | None = Query(default=None, description="Filter logs from this date (ISO format)"), | |
| end_date: datetime | None = Query(default=None, description="Filter logs until this date (ISO format)") | |
| ): | |
| """Get audit logs with optional filtering and search.""" | |
| query = db.query(AuditLog).order_by(AuditLog.created_at.desc()) | |
| if user_id: | |
| query = query.filter(AuditLog.user_id == user_id) | |
| if action: | |
| query = query.filter(AuditLog.action == action) | |
| if start_date: | |
| query = query.filter(AuditLog.created_at >= start_date) | |
| if end_date: | |
| query = query.filter(AuditLog.created_at <= end_date) | |
| # Get all user emails for search matching | |
| all_users = db.query(User).all() | |
| user_map = {u.id: u.email for u in all_users} | |
| email_to_id = {u.email.lower(): u.id for u in all_users} | |
| # Apply search filter | |
| if search: | |
| search_lower = search.lower() | |
| # First check if search matches a user email | |
| matching_user_ids = [uid for email, uid in email_to_id.items() if search_lower in email] | |
| # Build OR conditions for search | |
| search_conditions = [ | |
| AuditLog.ip_address.ilike(f"%{search}%"), | |
| AuditLog.resource_type.ilike(f"%{search}%"), | |
| AuditLog.resource_id.ilike(f"%{search}%"), | |
| AuditLog.action.ilike(f"%{search}%"), | |
| cast(AuditLog.details, String).ilike(f"%{search}%"), | |
| ] | |
| # Add user_id matches if any emails matched | |
| if matching_user_ids: | |
| search_conditions.append(AuditLog.user_id.in_(matching_user_ids)) | |
| query = query.filter(or_(*search_conditions)) | |
| logs = query.offset(offset).limit(limit).all() | |
| return [ | |
| AuditLogResponse( | |
| id=log.id, | |
| user_id=log.user_id, | |
| user_email=user_map.get(log.user_id) if log.user_id else None, | |
| action=log.action, | |
| resource_type=log.resource_type, | |
| resource_id=log.resource_id, | |
| details=log.details, | |
| ip_address=log.ip_address, | |
| created_at=log.created_at | |
| ) | |
| for log in logs | |
| ] | |
| def export_audit_logs( | |
| db: DbSession, | |
| admin: AdminUser, | |
| user_id: str | None = None, | |
| action: str | None = None, | |
| search: str | None = None, | |
| start_date: datetime | None = None, | |
| end_date: datetime | None = None | |
| ): | |
| """Export audit logs as CSV file.""" | |
| query = db.query(AuditLog).order_by(AuditLog.created_at.desc()) | |
| if user_id: | |
| query = query.filter(AuditLog.user_id == user_id) | |
| if action: | |
| query = query.filter(AuditLog.action == action) | |
| if start_date: | |
| query = query.filter(AuditLog.created_at >= start_date) | |
| if end_date: | |
| query = query.filter(AuditLog.created_at <= end_date) | |
| # Get all user emails for mapping | |
| all_users = db.query(User).all() | |
| user_map = {u.id: u.email for u in all_users} | |
| email_to_id = {u.email.lower(): u.id for u in all_users} | |
| # Apply search filter | |
| if search: | |
| search_lower = search.lower() | |
| matching_user_ids = [uid for email, uid in email_to_id.items() if search_lower in email] | |
| search_conditions = [ | |
| AuditLog.ip_address.ilike(f"%{search}%"), | |
| AuditLog.resource_type.ilike(f"%{search}%"), | |
| AuditLog.resource_id.ilike(f"%{search}%"), | |
| AuditLog.action.ilike(f"%{search}%"), | |
| cast(AuditLog.details, String).ilike(f"%{search}%"), | |
| ] | |
| if matching_user_ids: | |
| search_conditions.append(AuditLog.user_id.in_(matching_user_ids)) | |
| query = query.filter(or_(*search_conditions)) | |
| logs = query.all() | |
| # Create CSV in memory | |
| output = StringIO() | |
| writer = csv.writer(output) | |
| # Write header | |
| writer.writerow([ | |
| "ID", "Timestamp (UTC)", "User Email", "Action", "Resource Type", | |
| "Resource ID", "IP Address", "Details" | |
| ]) | |
| # Write data rows | |
| for log in logs: | |
| writer.writerow([ | |
| log.id, | |
| log.created_at.isoformat() if log.created_at else "", | |
| user_map.get(log.user_id, "") if log.user_id else "", | |
| log.action, | |
| log.resource_type or "", | |
| log.resource_id or "", | |
| log.ip_address or "", | |
| str(log.details) if log.details else "" | |
| ]) | |
| output.seek(0) | |
| # Generate filename with timestamp | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| filename = f"audit_log_export_{timestamp}.csv" | |
| return StreamingResponse( | |
| iter([output.getvalue()]), | |
| media_type="text/csv", | |
| headers={"Content-Disposition": f"attachment; filename={filename}"} | |
| ) | |
| def list_all_sites(db: DbSession, admin: AdminUser): | |
| """List all sites (including inactive) for admin management.""" | |
| sites = db.query(Site).all() | |
| return [ | |
| SiteResponse( | |
| id=s.id, | |
| site_code=s.site_code, | |
| name=s.name, | |
| crop_id=s.crop_id, | |
| crop_name=s.crop.display_name if s.crop else None, | |
| latitude=s.latitude, | |
| longitude=s.longitude, | |
| is_active=s.is_active | |
| ) | |
| for s in sites | |
| ] | |
| def create_site(site_data: SiteCreate, db: DbSession, admin: AdminUser): | |
| existing = db.query(Site).filter(Site.site_code == site_data.site_code).first() | |
| if existing: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Site with this code already exists" | |
| ) | |
| crop = db.query(Crop).filter(Crop.id == site_data.crop_id).first() | |
| if not crop: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid crop_id" | |
| ) | |
| site = Site( | |
| site_code=site_data.site_code, | |
| name=site_data.name, | |
| crop_id=site_data.crop_id, | |
| latitude=site_data.latitude, | |
| longitude=site_data.longitude | |
| ) | |
| db.add(site) | |
| db.commit() | |
| db.refresh(site) | |
| # Log the action | |
| log = AuditLog( | |
| user_id=admin.id, | |
| action="site_created", | |
| resource_type="site", | |
| resource_id=site.id, | |
| details={"site_code": site.site_code, "name": site.name} | |
| ) | |
| db.add(log) | |
| db.commit() | |
| return SiteResponse( | |
| id=site.id, | |
| site_code=site.site_code, | |
| name=site.name, | |
| crop_id=site.crop_id, | |
| crop_name=crop.display_name, | |
| latitude=site.latitude, | |
| longitude=site.longitude, | |
| is_active=site.is_active | |
| ) | |
| def update_site(site_id: str, site_data: SiteUpdate, db: DbSession, admin: AdminUser): | |
| site = db.query(Site).filter(Site.id == site_id).first() | |
| if not site: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="Site not found" | |
| ) | |
| if site_data.site_code is not None: | |
| existing = db.query(Site).filter(Site.site_code == site_data.site_code, Site.id != site_id).first() | |
| if existing: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Site with this code already exists" | |
| ) | |
| site.site_code = site_data.site_code | |
| if site_data.name is not None: | |
| site.name = site_data.name | |
| if site_data.crop_id is not None: | |
| crop = db.query(Crop).filter(Crop.id == site_data.crop_id).first() | |
| if not crop: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid crop_id") | |
| site.crop_id = site_data.crop_id | |
| if site_data.latitude is not None: | |
| site.latitude = site_data.latitude | |
| if site_data.longitude is not None: | |
| site.longitude = site_data.longitude | |
| if site_data.is_active is not None: | |
| site.is_active = site_data.is_active | |
| db.commit() | |
| db.refresh(site) | |
| # Log the action | |
| log = AuditLog( | |
| user_id=admin.id, | |
| action="site_updated", | |
| resource_type="site", | |
| resource_id=site.id, | |
| details={"site_code": site.site_code} | |
| ) | |
| db.add(log) | |
| db.commit() | |
| return SiteResponse( | |
| id=site.id, | |
| site_code=site.site_code, | |
| name=site.name, | |
| crop_id=site.crop_id, | |
| crop_name=site.crop.display_name if site.crop else None, | |
| latitude=site.latitude, | |
| longitude=site.longitude, | |
| is_active=site.is_active | |
| ) | |
| def delete_site(site_id: str, db: DbSession, admin: AdminUser): | |
| site = db.query(Site).filter(Site.id == site_id).first() | |
| if not site: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="Site not found" | |
| ) | |
| # Log before deletion | |
| log = AuditLog( | |
| user_id=admin.id, | |
| action="site_deleted", | |
| resource_type="site", | |
| resource_id=site.id, | |
| details={"site_code": site.site_code, "name": site.name} | |
| ) | |
| db.add(log) | |
| db.delete(site) | |
| db.commit() | |
| def list_crops_admin(db: DbSession, admin: AdminUser): | |
| """List all crops with site counts for admin.""" | |
| crops = db.query(Crop).all() | |
| return [ | |
| { | |
| "id": c.id, | |
| "name": c.name, | |
| "display_name": c.display_name, | |
| "color": c.color, | |
| "site_count": db.query(Site).filter(Site.crop_id == c.id).count() | |
| } | |
| for c in crops | |
| ] | |