Spaces:
Sleeping
Sleeping
| """ | |
| Blink Router - Admin endpoints for viewing and tracking data. | |
| Uses AuditLog for unified client/server event tracking. | |
| """ | |
| from fastapi import APIRouter, Depends, HTTPException, status, Query, Request, Response | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, func | |
| import ipaddress | |
| import logging | |
| from core.database import get_db | |
| from core.models import User, GeminiJob, Contact, ClientUser | |
| from services.encryption_service import decrypt_multiple_blocks | |
| from core.utils import get_geolocation | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| # User ID length constant | |
| USER_ID_LENGTH = 20 | |
| # ============================================================================= | |
| # Admin Data Endpoints | |
| # ============================================================================= | |
async def get_data(
    request: Request,
    page: int = Query(1, ge=1, description="Page number"),
    limit: int = Query(100, ge=1, le=500, description="Items per page"),
    log_type: str = Query(None, description="Filter by log type: client, server"),
    db: AsyncSession = Depends(get_db)
):
    """
    Get paginated audit log data for the authenticated user.

    Admins see all logs from all users.
    Auth handled by AuthMiddleware - user in request.state.user

    Args:
        request: Incoming request; ``request.state.user`` is read as the caller.
        page: 1-based page number.
        limit: Maximum number of items per page (1-500).
        log_type: Optional exact-match filter on ``AuditLog.log_type``.
        db: Async database session.

    Returns:
        A dict with ``items`` (serialized audit logs, newest first), ``total``
        (number of logs matching the filter), ``unique_users``, ``page`` and
        ``limit``. ``unique_users`` is 1 for non-admin callers; for admins it
        is the number of distinct non-null ``user_id`` values among the
        matching logs.

    Raises:
        HTTPException: 500 if anything fails while querying or serializing.
    """
    user = request.state.user
    # AuditLog is not part of the module-level imports, so bring it in here
    # (same function-local import pattern this module uses for QueryService).
    # NOTE(review): assumes AuditLog lives in core.models next to the other models.
    from core.models import AuditLog
    from services.db_service import QueryService

    try:
        qs = QueryService(user, db)
        offset = (page - 1) * limit

        # Build the filter once so the page, count and distinct-user queries
        # can never drift apart.
        filters = []
        if log_type:
            filters.append(AuditLog.log_type == log_type)

        # The queries go through QueryService; per the original comments it
        # scopes them to the caller unless the caller is an admin.
        query = select(AuditLog)
        for clause in filters:
            query = query.where(clause)
        query = query.order_by(AuditLog.timestamp.desc()).offset(offset).limit(limit)
        items = await qs.select().execute(query)

        count_query = select(AuditLog)
        for clause in filters:
            count_query = count_query.where(clause)
        total = await qs.select().count(count_query)

        if qs.select().is_admin:
            # Previously this reported the number of log rows as the number of
            # users. Count distinct user ids instead; COUNT(DISTINCT ...)
            # ignores NULLs, so anonymous logs are not counted as a user.
            users_query = select(func.count(func.distinct(AuditLog.user_id)))
            for clause in filters:
                users_query = users_query.where(clause)
            unique_users = (await db.execute(users_query)).scalar() or 0
        else:
            unique_users = 1

        return {
            "items": [
                {
                    "id": item.id,
                    "log_type": item.log_type,
                    "user_id": item.user_id,
                    "client_user_id": item.client_user_id,
                    "action": item.action,
                    "details": item.details,
                    "ip_address": item.ip_address,
                    "refer_url": item.refer_url,
                    "status": item.status,
                    "error_message": item.error_message,
                    "timestamp": item.timestamp.isoformat() if item.timestamp else None
                }
                for item in items
            ],
            "total": total,
            "unique_users": unique_users,
            "page": page,
            "limit": limit
        }
    except Exception as e:
        # logger.exception keeps the traceback; the client only sees a generic 500.
        logger.exception("Error fetching data: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error fetching data"
        ) from e
async def get_users(
    request: Request,
    page: int = Query(1, ge=1, description="Page number"),
    limit: int = Query(50, ge=1, le=500, description="Items per page"),
    db: AsyncSession = Depends(get_db)
):
    """
    Return user profile data for the caller.

    Admin callers receive one page of all users (highest id first) together
    with the overall user count. Any other caller receives the result of an
    unpaginated ``select(User)`` run through QueryService, which the comments
    in this module describe as restricted to the current user, and ``total``
    is reported as 1.

    Auth handled by AuthMiddleware - user in request.state.user

    Returns:
        A dict with ``items``, ``total``, ``page`` and ``limit``. Only the
        first 10 characters of ``google_id`` are exposed, followed by "...".

    Raises:
        HTTPException: 500 if anything fails while querying or serializing.
    """

    def _iso(moment):
        # Timestamps are optional; emit None instead of failing on a missing one.
        return moment.isoformat() if moment else None

    def _serialize(row):
        return {
            "id": row.id,
            "user_id": row.user_id,
            "email": row.email,
            "name": row.name,
            "google_id": (row.google_id[:10] + "...") if row.google_id else None,
            "credits": row.credits,
            "is_active": row.is_active,
            "created_at": _iso(row.created_at),
            "last_used_at": _iso(row.last_used_at)
        }

    current_user = request.state.user
    from services.db_service import QueryService

    try:
        service = QueryService(current_user, db)
        if not service.select().is_admin:
            # Non-admin path: no pagination, a single profile is expected.
            rows = await service.select().execute(select(User))
            total = 1
        else:
            skip = (page - 1) * limit
            listing = select(User).order_by(User.id.desc())
            listing = listing.offset(skip).limit(limit)
            rows = await service.select().execute(listing)
            total = await service.select().count(select(User))

        payload = {
            "items": [_serialize(row) for row in rows],
            "total": total,
            "page": page,
            "limit": limit
        }
        return payload
    except Exception as exc:
        logger.error(f"Error fetching users: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error fetching users"
        )
async def get_client_users(
    request: Request,
    page: int = Query(1, ge=1, description="Page number"),
    limit: int = Query(50, ge=1, le=500, description="Items per page"),
    user_id: str = Query(None, description="Filter by server user_id"),
    db: AsyncSession = Depends(get_db)
):
    """
    Return one page of ClientUser mappings, highest id first.

    Both the page query and the count go through QueryService, which the
    comments in this module describe as scoping results to the caller unless
    the caller is an admin.

    Auth handled by AuthMiddleware - user in request.state.user

    Returns:
        A dict with ``items``, ``total``, ``page`` and ``limit``.

    Raises:
        HTTPException: 500 if anything fails while querying or serializing.
    """
    # NOTE(review): the `user_id` query parameter is accepted but is not
    # applied to either query below, so it currently has no effect.

    def _iso(moment):
        return moment.isoformat() if moment else None

    def _serialize(row):
        return {
            "id": row.id,
            "user_id": row.user_id,
            "client_user_id": row.client_user_id,
            "ip_address": row.ip_address,
            "device_fingerprint": row.device_fingerprint,
            "device_info": row.device_info,
            "created_at": _iso(row.created_at),
            "last_seen_at": _iso(row.last_seen_at)
        }

    current_user = request.state.user
    from services.db_service import QueryService

    try:
        service = QueryService(current_user, db)
        skip = (page - 1) * limit

        listing = select(ClientUser).order_by(ClientUser.id.desc())
        listing = listing.offset(skip).limit(limit)
        rows = await service.select().execute(listing)

        total = await service.select().count(select(ClientUser))

        payload = {
            "items": [_serialize(row) for row in rows],
            "total": total,
            "page": page,
            "limit": limit
        }
        return payload
    except Exception as exc:
        logger.error(f"Error fetching client users: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error fetching client users"
        )
async def get_audit_logs(
    request: Request,
    page: int = Query(1, ge=1, description="Page number"),
    limit: int = Query(50, ge=1, le=500, description="Items per page"),
    log_type: str = Query(None, description="Filter by log type: client, server"),
    action: str = Query(None, description="Filter by action"),
    db: AsyncSession = Depends(get_db)
):
    """
    Get current user's audit logs with optional filters.

    Admins see all logs from all users.
    Auth handled by AuthMiddleware - user in request.state.user

    Args:
        request: Incoming request; ``request.state.user`` is read as the caller.
        page: 1-based page number.
        limit: Maximum number of items per page (1-500).
        log_type: Optional exact-match filter on ``AuditLog.log_type``.
        action: Optional exact-match filter on ``AuditLog.action``.
        db: Async database session.

    Returns:
        A dict with ``items`` (serialized audit logs, newest first), ``total``
        (number of logs matching the filters), ``page`` and ``limit``.

    Raises:
        HTTPException: 500 if anything fails while querying or serializing.
    """
    user = request.state.user
    # AuditLog is not part of the module-level imports, so bring it in here
    # (same function-local import pattern this module uses for QueryService).
    # NOTE(review): assumes AuditLog lives in core.models next to the other models.
    from core.models import AuditLog
    from services.db_service import QueryService

    try:
        qs = QueryService(user, db)
        offset = (page - 1) * limit

        # Collect the filters once; the page query and the count query must
        # always apply exactly the same conditions.
        filters = []
        if log_type:
            filters.append(AuditLog.log_type == log_type)
        if action:
            filters.append(AuditLog.action == action)

        # Both queries go through QueryService; per the original comments it
        # scopes them to the caller unless the caller is an admin.
        query = select(AuditLog)
        for clause in filters:
            query = query.where(clause)
        query = query.order_by(AuditLog.timestamp.desc()).offset(offset).limit(limit)
        items = await qs.select().execute(query)

        count_query = select(AuditLog)
        for clause in filters:
            count_query = count_query.where(clause)
        total = await qs.select().count(count_query)

        return {
            "items": [
                {
                    "id": item.id,
                    "log_type": item.log_type,
                    "user_id": item.user_id,
                    "client_user_id": item.client_user_id,
                    "action": item.action,
                    "details": item.details,
                    "ip_address": item.ip_address,
                    "status": item.status,
                    "error_message": item.error_message,
                    "timestamp": item.timestamp.isoformat() if item.timestamp else None
                }
                for item in items
            ],
            "total": total,
            "page": page,
            "limit": limit
        }
    except Exception as e:
        # logger.exception keeps the traceback; the client only sees a generic 500.
        logger.exception("Error fetching audit logs: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error fetching audit logs"
        ) from e
async def get_gemini_jobs(
    request: Request,
    page: int = Query(1, ge=1, description="Page number"),
    limit: int = Query(50, ge=1, le=500, description="Items per page"),
    db: AsyncSession = Depends(get_db)
):
    """
    Return one page of GeminiJob records, highest id first.

    Both the page query and the count go through QueryService, which the
    comments in this module describe as scoping results to the caller unless
    the caller is an admin.

    Auth handled by AuthMiddleware - user in request.state.user

    Returns:
        A dict with ``items``, ``total``, ``page`` and ``limit``.

    Raises:
        HTTPException: 500 if anything fails while querying or serializing.
    """

    def _iso(moment):
        # Timestamps are optional; emit None instead of failing on a missing one.
        return moment.isoformat() if moment else None

    def _serialize(job):
        return {
            "id": job.id,
            "job_id": job.job_id,
            "user_id": job.user_id,
            "job_type": job.job_type,
            "status": job.status,
            "error_message": job.error_message,
            "created_at": _iso(job.created_at),
            "completed_at": _iso(job.completed_at)
        }

    current_user = request.state.user
    from services.db_service import QueryService

    try:
        service = QueryService(current_user, db)
        skip = (page - 1) * limit

        listing = select(GeminiJob).order_by(GeminiJob.id.desc())
        listing = listing.offset(skip).limit(limit)
        jobs = await service.select().execute(listing)

        total = await service.select().count(select(GeminiJob))

        payload = {
            "items": [_serialize(job) for job in jobs],
            "total": total,
            "page": page,
            "limit": limit
        }
        return payload
    except Exception as exc:
        logger.error(f"Error fetching gemini jobs: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error fetching gemini jobs"
        )
async def get_payment_transactions(
    request: Request,
    page: int = Query(1, ge=1, description="Page number"),
    limit: int = Query(50, ge=1, le=500, description="Items per page"),
    db: AsyncSession = Depends(get_db)
):
    """
    Return one page of payment transactions plus revenue totals.

    Three queries are issued through QueryService, in this order: the overall
    transaction count, the sum of ``amount_paise`` over transactions whose
    status is "paid", and the requested page (highest id first). The comments
    in this module describe QueryService as scoping results to the caller
    unless the caller is an admin.

    Auth handled by AuthMiddleware - user in request.state.user

    Returns:
        A dict with ``items``, ``total``, ``total_revenue_paise``,
        ``total_revenue_rupees`` (paise divided by 100), ``page`` and
        ``limit``. A missing revenue sum is reported as 0.

    Raises:
        HTTPException: 500 if anything fails while querying or serializing.
    """

    def _iso(moment):
        return moment.isoformat() if moment else None

    def _serialize(txn):
        return {
            "id": txn.id,
            "transaction_id": txn.transaction_id,
            "user_id": txn.user_id,
            "gateway": txn.gateway,
            "gateway_order_id": txn.gateway_order_id,
            "gateway_payment_id": txn.gateway_payment_id,
            "package_id": txn.package_id,
            "credits_amount": txn.credits_amount,
            "amount_paise": txn.amount_paise,
            "amount_rupees": txn.amount_paise / 100,
            "currency": txn.currency,
            "status": txn.status,
            "verified_by": txn.verified_by,
            "error_message": txn.error_message,
            "created_at": _iso(txn.created_at),
            "paid_at": _iso(txn.paid_at)
        }

    current_user = request.state.user
    from core.models import PaymentTransaction
    from services.db_service import QueryService

    try:
        service = QueryService(current_user, db)
        skip = (page - 1) * limit

        total = await service.select().count(select(PaymentTransaction))

        # Revenue only counts transactions marked as paid.
        paid_only = PaymentTransaction.status == "paid"
        revenue_stmt = select(func.sum(PaymentTransaction.amount_paise)).where(paid_only)
        revenue_paise = await service.execute_one(revenue_stmt)
        if not revenue_paise:
            # No paid transactions (or a zero sum): report 0 rather than None.
            revenue_paise = 0

        listing = select(PaymentTransaction).order_by(PaymentTransaction.id.desc())
        listing = listing.offset(skip).limit(limit)
        transactions = await service.select().execute(listing)

        payload = {
            "items": [_serialize(txn) for txn in transactions],
            "total": total,
            "total_revenue_paise": revenue_paise,
            "total_revenue_rupees": revenue_paise / 100,
            "page": page,
            "limit": limit
        }
        return payload
    except Exception as exc:
        logger.error(f"Error fetching payment transactions: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error fetching payment transactions"
        )
async def get_contacts(
    request: Request,
    page: int = Query(1, ge=1, description="Page number"),
    limit: int = Query(50, ge=1, le=500, description="Items per page"),
    db: AsyncSession = Depends(get_db)
):
    """
    Return one page of contact form submissions, highest id first.

    Both the page query and the count go through QueryService, which the
    comments in this module describe as scoping results to the caller unless
    the caller is an admin.

    Auth handled by AuthMiddleware - user in request.state.user

    Returns:
        A dict with ``items``, ``total``, ``page`` and ``limit``.

    Raises:
        HTTPException: 500 if anything fails while querying or serializing.
    """

    def _serialize(entry):
        submitted_at = entry.created_at
        return {
            "id": entry.id,
            "user_id": entry.user_id,
            "email": entry.email,
            "subject": entry.subject,
            "message": entry.message,
            "ip_address": entry.ip_address,
            "created_at": submitted_at.isoformat() if submitted_at else None
        }

    current_user = request.state.user
    from services.db_service import QueryService

    try:
        service = QueryService(current_user, db)
        skip = (page - 1) * limit

        listing = select(Contact).order_by(Contact.id.desc())
        listing = listing.offset(skip).limit(limit)
        entries = await service.select().execute(listing)

        total = await service.select().count(select(Contact))

        payload = {
            "items": [_serialize(entry) for entry in entries],
            "total": total,
            "page": page,
            "limit": limit
        }
        return payload
    except Exception as exc:
        logger.error(f"Error fetching contacts: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error fetching contacts"
        )
| # ============================================================================= | |
| # Client Tracking Endpoint | |
| # ============================================================================= | |
def _first_forwarded_ip(forwarded_for):
    """Return the first X-Forwarded-For hop if it is a valid IP literal, else None."""
    if not forwarded_for:
        return None
    candidate = forwarded_for.split(",")[0].strip()
    try:
        ipaddress.ip_address(candidate)
    except ValueError:
        return None
    return candidate


async def blink(
    request: Request,
    userid: str = Query(..., description="User ID (20 chars) + encrypted data"),
    db: AsyncSession = Depends(get_db)
):
    """
    Process blink request with encrypted user data.

    Logs to AuditLog with log_type='client'.

    Auth is optional (handled by AuthMiddleware):
    - If authenticated: user in request.state.user
    - If not authenticated: request.state.user is None

    A ClientUser row is always added, linking ``client_user_id`` to the server
    user's id (``None`` for anonymous callers). One audit event is written per
    decrypted block; if decryption produced no blocks but encrypted data was
    supplied, a single event recording the encrypted length is written instead.

    Args:
        request: Incoming request; headers, client address and
            ``request.state.user`` are read from it.
        userid: The first ``USER_ID_LENGTH`` characters are the client user
            id; everything after that is the encrypted payload.
        db: Async database session; committed on success, rolled back on error.

    Returns:
        An empty 204 No Content response.

    Raises:
        HTTPException: 400 if ``userid`` is shorter than ``USER_ID_LENGTH``;
            500 on any other failure.
    """
    # Optional auth - may be None
    current_user = request.state.user

    try:
        if len(userid) < USER_ID_LENGTH:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Parameter 'userid' must be at least {USER_ID_LENGTH} characters"
            )

        client_user_id = userid[:USER_ID_LENGTH]
        encrypted_data = userid[USER_ID_LENGTH:]

        if not encrypted_data:
            logger.warning(f"No encrypted data received for client: {client_user_id}")
            decrypted_results = []
        else:
            try:
                decrypted_results = decrypt_multiple_blocks(encrypted_data)
            except Exception as e:
                # Best effort: keep the request alive and record the failure
                # (plus a bounded prefix of the payload) as the event data.
                # NOTE(review): this event is still logged with
                # status="success" below - confirm that is intended.
                logger.error(f"Decryption failed for client {client_user_id}: {e}")
                decrypted_results = [{"error": str(e), "raw_encrypted": encrypted_data[:100]}]

        user_agent = request.headers.get("user-agent")

        # X-Forwarded-For is client-controlled, so only accept its first hop
        # when it parses as an IP address; otherwise fall back to the socket
        # peer instead of persisting an arbitrary header string.
        ip_address = _first_forwarded_ip(request.headers.get("x-forwarded-for"))
        if ip_address is None:
            ip_address = request.client.host if request.client else None

        country, region = await get_geolocation(ip_address)

        # Integer FK to the server user, or None for anonymous callers.
        server_user_id = current_user.id if current_user else None
        auth_status = "authenticated" if current_user else "anonymous"

        new_client_user = ClientUser(
            user_id=server_user_id,
            client_user_id=client_user_id,
            ip_address=ip_address,
            device_info={"user_agent": user_agent} if user_agent else None
        )
        db.add(new_client_user)
        logger.info(f"Created ClientUser entry: user_id={server_user_id}, client_user_id={client_user_id}, auth: {auth_status}")

        # One audit event per decrypted block; if nothing was decrypted but a
        # payload was present, a single event recording its length.
        if decrypted_results:
            event_details = [
                {"data": json_data, "country": country, "region": region}
                for json_data in decrypted_results
            ]
        elif encrypted_data:
            event_details = [
                {"encrypted_length": len(encrypted_data), "country": country, "region": region}
            ]
        else:
            event_details = []

        # Imported lazily, as in the rest of this module.
        from services.audit_service import AuditService

        for details in event_details:
            await AuditService.log_event(
                db=db,
                action="blink",
                status="success",
                user_id=server_user_id,
                client_user_id=client_user_id,
                details=details,
                request=request,
                log_type="client"
            )
        records_created = len(event_details)

        await db.commit()
        logger.info(f"Successfully processed blink for client: {client_user_id}, records: {records_created}, auth: {auth_status}")

        # Return 204 No Content for silent tracking
        return Response(status_code=status.HTTP_204_NO_CONTENT)
    except HTTPException:
        # Validation errors raised above must reach the client unchanged.
        raise
    except Exception as e:
        logger.exception("Error processing blink request: %s", e)
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error processing request"
        ) from e