# apigateway/core/models.py
# jebin2's picture
# ref
# cfe2de7
"""
SQLAlchemy models for the APIGateway application.

Tables:
- User: Server-side users with credits
- ClientUser: Client identifiers mapping to server users
- AuditLog: Unified client/server audit logging
- GeminiJob: AI job queue
- PaymentTransaction: Credit purchases
- Contact: Support tickets
- RateLimit: Rate limiting
- ApiKeyUsage: API key load balancing
- CreditTransaction: Audit trail for all credit operations
"""
from sqlalchemy import Column, Integer, String, Text, DateTime, JSON, Boolean, ForeignKey, Index
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from core.database import Base
# =============================================================================
# User & Client Tracking
# =============================================================================
class User(Base):
    """
    Server-side account used by the credit system.

    Accounts are authenticated through Google OAuth. A single User may be
    linked to many ClientUser rows.
    """

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Backend-generated UUID; this is the public-facing identifier.
    user_id = Column(String(50), nullable=False, unique=True, index=True)
    email = Column(String(255), nullable=False, unique=True, index=True)

    # --- Google OAuth fields ---
    # Google "sub" claim.
    google_id = Column(String(255), nullable=True, unique=True, index=True)
    # Display name as reported by Google.
    name = Column(String(255), nullable=True)
    # URL of the Google profile picture.
    profile_picture = Column(Text, nullable=True)

    # --- JWT invalidation ---
    # Token version used for JWT invalidation.
    token_version = Column(Integer, nullable=False, default=1)

    # --- Credits and status ---
    credits = Column(Integer, default=0)
    created_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    updated_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )
    last_used_at = Column(DateTime(timezone=True), nullable=True)
    is_active = Column(Boolean, default=True)
    # Soft-delete timestamp.
    deleted_at = Column(DateTime(timezone=True), index=True, nullable=True)

    # --- Relationships (all loaded with lazy="dynamic") ---
    client_users = relationship(
        "ClientUser",
        lazy="dynamic",
        back_populates="user",
    )
    jobs = relationship(
        "GeminiJob",
        lazy="dynamic",
        back_populates="user",
    )
    payments = relationship(
        "PaymentTransaction",
        lazy="dynamic",
        back_populates="user",
    )
    contacts = relationship(
        "Contact",
        lazy="dynamic",
        back_populates="user",
    )
    audit_logs = relationship(
        "AuditLog",
        lazy="dynamic",
        back_populates="user",
    )

    def __repr__(self) -> str:
        """Return a short debug string with the row id and email."""
        return f"<User(id={self.id}, email={self.email})>"
class ClientUser(Base):
    """
    Link between a server user and one of its client-side identifiers.

    Allows a user to be followed across devices, IPs, and login states.

    Use cases:
    - Track the same user across multiple devices
    - Link anonymous activity to a logged-in user
    - Detect the same device before/after login
    - Track users by network/location
    """

    __tablename__ = "client_users"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # FK to users.id; nullable so anonymous clients can be recorded.
    user_id = Column(
        Integer,
        ForeignKey("users.id"),
        nullable=True,
        index=True,
    )
    # Temporary identifier generated on the client side.
    client_user_id = Column(String(100), nullable=True, index=True)

    # --- IP tracking for network/location correlation (single column) ---
    # Length 45 supports both IPv4 and IPv6.
    ip_address = Column(String(45), index=True, nullable=True)

    # --- Device identification ---
    # Browser fingerprint hash.
    device_fingerprint = Column(String(255), index=True, nullable=True)
    # Browser, OS, screen size, language, etc.
    device_info = Column(JSON, nullable=True)

    # --- Timestamps ---
    created_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    last_seen_at = Column(DateTime(timezone=True), nullable=True)
    # Soft-delete timestamp.
    deleted_at = Column(DateTime(timezone=True), index=True, nullable=True)

    # --- Relationship ---
    user = relationship("User", back_populates="client_users")

    def __repr__(self) -> str:
        """Return a short debug string with both user identifiers."""
        return f"<ClientUser(user_id={self.user_id}, client_user_id={self.client_user_id})>"
# =============================================================================
# Audit Logging
# =============================================================================
class AuditLog(Base):
    """
    Single audit log table shared by client and server events.

    Replaces BlinkData and captures all trackable events.

    log_type values:
    - "client": Login attempts, page views, API calls from client
    - "server": Job processing, credit changes, internal operations
    """

    __tablename__ = "audit_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Either "client" or "server".
    log_type = Column(String(20), nullable=False, index=True)

    # --- User tracking ---
    # FK to users.id.
    user_id = Column(
        Integer,
        ForeignKey("users.id"),
        index=True,
        nullable=True,
    )
    # Used for anonymous client logs.
    client_user_id = Column(String(100), index=True, nullable=True)

    # --- Event details ---
    # e.g. login, page_view, job_created.
    action = Column(String(50), nullable=False, index=True)
    # Flexible data storage.
    details = Column(JSON, nullable=True)

    # --- Request context ---
    ip_address = Column(String(45), nullable=True)
    user_agent = Column(String(500), nullable=True)
    refer_url = Column(Text, nullable=True)

    # --- Outcome ---
    # success, failure, or error.
    status = Column(String(20), nullable=False)
    error_message = Column(Text, nullable=True)

    # --- Timestamps ---
    timestamp = Column(
        DateTime(timezone=True),
        index=True,
        server_default=func.now(),
    )
    # Soft-delete timestamp.
    deleted_at = Column(DateTime(timezone=True), index=True, nullable=True)

    # --- Relationship ---
    user = relationship("User", back_populates="audit_logs")

    def __repr__(self) -> str:
        """Return a short debug string with type, action, and status."""
        return f"<AuditLog(type={self.log_type}, action={self.action}, status={self.status})>"
# =============================================================================
# Job Queue
# =============================================================================
class GeminiJob(Base):
    """
    Generic queue entry for Gemini operations (video, image, text).

    Worker assignment is driven by a priority-tier system.
    """

    __tablename__ = "gemini_jobs"

    # Composite index for efficient queue polling:
    #   WHERE status = X AND next_process_at <= NOW() ORDER BY priority
    __table_args__ = (
        Index(
            "ix_jobs_queue_poll",
            "status",
            "next_process_at",
            "priority",
        ),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Our own job identifier, handed to the client.
    job_id = Column(String(100), nullable=False, unique=True, index=True)
    # FK to users.id.
    user_id = Column(
        Integer,
        ForeignKey("users.id"),
        nullable=False,
        index=True,
    )
    # video, image, text, analyze.
    job_type = Column(String(20), nullable=False, index=True)
    # Gemini operation name (for video).
    third_party_id = Column(String(255), nullable=True)
    # queued, processing, completed, failed, cancelled.
    status = Column(String(20), default="queued")

    # --- Priority-tier worker system (covered by the composite index) ---
    # fast (5s), medium (30s), slow (60s).
    priority = Column(String(10), default="fast")
    # When a worker should pick the job up again.
    next_process_at = Column(DateTime(timezone=True), nullable=True)
    # Number of status-check retries.
    retry_count = Column(Integer, default=0)

    # --- Payloads ---
    # Request details (prompt, settings, etc.).
    input_data = Column(JSON, nullable=True)
    # Result (filename, text, etc.).
    output_data = Column(JSON, nullable=True)
    # Raw response from the third-party API (success or error).
    api_response = Column(JSON, nullable=True)
    error_message = Column(Text, nullable=True)

    # --- Timestamps ---
    created_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    started_at = Column(DateTime(timezone=True), nullable=True)
    completed_at = Column(DateTime(timezone=True), nullable=True)
    # Soft-delete timestamp.
    deleted_at = Column(DateTime(timezone=True), index=True, nullable=True)

    # --- Credit tracking for the reservation pattern ---
    # Credits reserved for this job.
    credits_reserved = Column(Integer, default=0)
    # Whether the reserved credits were refunded.
    credits_refunded = Column(Boolean, default=False)

    # --- Relationship ---
    user = relationship("User", back_populates="jobs")

    def __repr__(self) -> str:
        """Return a short debug string with job id, type, status, and priority."""
        return f"<GeminiJob(job_id={self.job_id}, type={self.job_type}, status={self.status}, priority={self.priority})>"
# =============================================================================
# Payments
# =============================================================================
class PaymentTransaction(Base):
    """
    Record of a payment made to purchase credits.

    Designed for multiple payment gateways (Razorpay now, Stripe later).
    """

    __tablename__ = "payment_transactions"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Our internal transaction identifier.
    transaction_id = Column(String(50), nullable=False, unique=True, index=True)
    # FK to users.id.
    user_id = Column(
        Integer,
        ForeignKey("users.id"),
        nullable=False,
        index=True,
    )

    # --- Order details ---
    # razorpay, stripe.
    gateway = Column(String(20), nullable=False)
    # Razorpay order_id.
    gateway_order_id = Column(String(100), nullable=True, index=True)
    # Razorpay payment_id.
    gateway_payment_id = Column(String(100), nullable=True, index=True)

    # --- Package details ---
    # starter, standard, pro.
    package_id = Column(String(50), nullable=False)
    # Credits to add.
    credits_amount = Column(Integer, nullable=False)
    # Amount in paise (INR * 100).
    amount_paise = Column(Integer, nullable=False)
    currency = Column(String(3), default="INR")

    # --- Status tracking ---
    # created, paid, failed, refunded.
    status = Column(String(20), index=True, default="created")
    # client, webhook, or both.
    verified_by = Column(String(20), nullable=True)

    # --- Timestamps ---
    created_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    paid_at = Column(DateTime(timezone=True), nullable=True)
    # Soft-delete timestamp.
    deleted_at = Column(DateTime(timezone=True), index=True, nullable=True)

    # --- Metadata ---
    # Kept for verification audit.
    razorpay_signature = Column(String(255), nullable=True)
    error_message = Column(Text, nullable=True)
    # Additional gateway-specific data.
    extra_data = Column(JSON, nullable=True)

    # --- Relationship ---
    user = relationship("User", back_populates="payments")

    def __repr__(self) -> str:
        """Return a short debug string with transaction id, status, and amount."""
        return f"<PaymentTransaction(id={self.transaction_id}, status={self.status}, amount={self.amount_paise})>"
# =============================================================================
# Support & Utilities
# =============================================================================
class Contact(Base):
    """
    Contact-form submission from an authenticated user.

    Used for customer support inquiries.
    """

    __tablename__ = "contacts"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # FK to users.id.
    user_id = Column(
        Integer,
        ForeignKey("users.id"),
        nullable=False,
        index=True,
    )
    # User's email.
    email = Column(String(255), index=True, nullable=False)
    subject = Column(String(500), nullable=True)
    message = Column(Text, nullable=False)
    # IPv6 can be up to 45 chars.
    ip_address = Column(String(45), nullable=True)
    created_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    # Soft-delete timestamp.
    deleted_at = Column(DateTime(timezone=True), index=True, nullable=True)

    # --- Relationship ---
    user = relationship("User", back_populates="contacts")

    def __repr__(self) -> str:
        """Return a short debug string with row id, user id, and email."""
        return f"<Contact(id={self.id}, user_id={self.user_id}, email={self.email})>"
class RateLimit(Base):
    """
    Rate limit tracking table.

    Each row stores an attempt counter for one identifier/endpoint pair
    together with the start and expiry of its window.

    NOTE(review): no uniqueness constraint on (identifier, endpoint) is
    declared here -- confirm whether callers rely on one row per pair.
    """

    __tablename__ = "rate_limits"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # IP or email.
    identifier = Column(String(255), index=True, nullable=False)
    endpoint = Column(String(255), index=True, nullable=False)
    attempts = Column(Integer, default=0)
    window_start = Column(DateTime(timezone=True), nullable=False)
    expires_at = Column(DateTime(timezone=True), nullable=False)
    # Soft-delete timestamp.
    deleted_at = Column(DateTime(timezone=True), nullable=True, index=True)

    def __repr__(self) -> str:
        """Return a short debug string, matching the other models in this module."""
        return (
            f"<RateLimit(identifier={self.identifier}, "
            f"endpoint={self.endpoint}, attempts={self.attempts})>"
        )
class ApiKeyUsage(Base):
    """
    Per-key usage counters used for round-robin load balancing.

    For security, only the index of the key is stored, never the key itself.
    """

    __tablename__ = "api_key_usage"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Index of the key in GEMINI_API_KEYS.
    key_index = Column(Integer, nullable=False, unique=True, index=True)

    # --- Counters ---
    total_requests = Column(Integer, default=0)
    success_count = Column(Integer, default=0)
    failure_count = Column(Integer, default=0)

    # Last error message, stored for review.
    last_error = Column(Text, nullable=True)

    # --- Timestamps ---
    last_used_at = Column(DateTime(timezone=True), nullable=True)
    created_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
    )

    def __repr__(self) -> str:
        """Return a short debug string with the key index and its counters."""
        return f"<ApiKeyUsage(index={self.key_index}, total={self.total_requests}, success={self.success_count}, failed={self.failure_count})>"
# =============================================================================
# Credit Transactions
# =============================================================================
class CreditTransaction(Base):
    """
    Audit trail covering every credit operation.

    Each credit change (reserve, refund, confirm, purchase) is recorded
    here, which enables balance verification, transaction history, and
    debugging.

    Transaction types:
    - reserve: Credits deducted when request arrives (negative amount)
    - refund: Credits returned due to failure (positive amount)
    - confirm: Credits confirmed as used (amount = 0, marks finalization)
    - purchase: Credits added via payment (positive amount)
    - adjustment: Manual admin adjustment (positive or negative)
    """

    __tablename__ = "credit_transactions"

    # Composite indexes for efficient queries.
    __table_args__ = (
        Index("ix_user_transactions", "user_id", "created_at"),
        Index("ix_transaction_type", "transaction_type"),
        Index("ix_reference", "reference_type", "reference_id"),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    # ctx_<uuid>, ref_<uuid>, etc.
    transaction_id = Column(String(100), nullable=False, unique=True, index=True)
    # FK to users.id.
    user_id = Column(
        Integer,
        ForeignKey("users.id"),
        index=True,
        nullable=False,
    )

    # --- Transaction details ---
    # reserve, refund, confirm, purchase, adjustment.
    transaction_type = Column(String(20), nullable=False)
    # Positive for credits added, negative for credits removed.
    amount = Column(Integer, nullable=False)
    # User balance before the transaction.
    balance_before = Column(Integer, nullable=False)
    # User balance after the transaction.
    balance_after = Column(Integer, nullable=False)

    # --- Context ---
    # middleware, payment, job_completion, manual, admin.
    source = Column(String(50), nullable=False)
    # job, payment, request, or NULL.
    reference_type = Column(String(30), nullable=True)
    # job_id, transaction_id, request endpoint, etc.
    reference_id = Column(String(100), nullable=True)

    # --- Request/response metadata (for middleware transactions) ---
    request_path = Column(String(500), nullable=True)
    request_method = Column(String(10), nullable=True)
    response_status = Column(Integer, nullable=True)

    # --- Additional details ---
    # Human-readable reason.
    reason = Column(Text, nullable=True)
    # Additional context (endpoint_type, error_message, etc.).
    extra_data = Column(JSON, nullable=True)

    # --- Timestamps ---
    created_at = Column(
        DateTime(timezone=True),
        index=True,
        server_default=func.now(),
    )
    # Soft-delete timestamp.
    deleted_at = Column(DateTime(timezone=True), index=True, nullable=True)

    # --- Relationship ---
    # Declared with backref, so the reverse "credit_transactions" attribute
    # on User is created from this side.
    user = relationship("User", backref="credit_transactions")

    def __repr__(self) -> str:
        """Return a short debug string with id, type, amount, and user."""
        return f"<CreditTransaction(id={self.transaction_id}, type={self.transaction_type}, amount={self.amount}, user={self.user_id})>"