apigateway / core /models.py
jebin2's picture
zero def
40628e2
raw
history blame
12.3 kB
"""
SQLAlchemy models for the APIGateway application.
Tables:
- User: Server-side users with credits
- ClientUser: Client identifiers mapping to server users
- AuditLog: Unified client/server audit logging
- GeminiJob: AI job queue
- PaymentTransaction: Credit purchases
- Contact: Support tickets
- ApiKeyUsage: API key load balancing
- RateLimit: Rate limiting
"""
from sqlalchemy import Column, Integer, String, Text, DateTime, JSON, Boolean, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from core.database import Base
# =============================================================================
# User & Client Tracking
# =============================================================================
class User(Base):
"""
User model for credit system.
Supports Google OAuth authentication.
One User can have many ClientUser mappings.
"""
__tablename__ = "users"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
user_id = Column(String(50), unique=True, index=True, nullable=False) # Backend generated UUID
email = Column(String(255), unique=True, index=True, nullable=False)
# Google OAuth fields
google_id = Column(String(255), unique=True, index=True, nullable=True) # Google sub claim
name = Column(String(255), nullable=True) # Display name from Google
profile_picture = Column(Text, nullable=True) # Google profile picture URL
# Token versioning for JWT invalidation
token_version = Column(Integer, default=1, nullable=False)
# Credits and status
credits = Column(Integer, default=0)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
last_used_at = Column(DateTime(timezone=True), nullable=True)
is_active = Column(Boolean, default=True)
# Relationships
client_users = relationship("ClientUser", back_populates="user", lazy="dynamic")
jobs = relationship("GeminiJob", back_populates="user", lazy="dynamic")
payments = relationship("PaymentTransaction", back_populates="user", lazy="dynamic")
contacts = relationship("Contact", back_populates="user", lazy="dynamic")
audit_logs = relationship("AuditLog", back_populates="user", lazy="dynamic")
def __repr__(self):
return f"<User(id={self.id}, email={self.email})>"
class ClientUser(Base):
"""
Map server users to multiple client identifiers.
Enables tracking users across devices, IPs, and login states.
Use cases:
- Track same user across multiple devices
- Link anonymous activity to logged-in user
- Detect same device before/after login
- Track users by network/location
"""
__tablename__ = "client_users"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
user_id = Column(String(50), ForeignKey("users.user_id"), index=True, nullable=True) # Nullable for anonymous tracking
client_user_id = Column(String(100), index=True, nullable=True) # Client-side temp identifier
# IP tracking for network/location correlation
ipv4_address = Column(String(15), nullable=True, index=True)
ipv6_address = Column(String(45), nullable=True, index=True)
# Device identification
device_fingerprint = Column(String(255), nullable=True, index=True) # Browser fingerprint hash
device_info = Column(JSON, nullable=True) # Browser, OS, screen size, language, etc.
# Timestamps
created_at = Column(DateTime(timezone=True), server_default=func.now())
last_seen_at = Column(DateTime(timezone=True), nullable=True)
# Relationship
user = relationship("User", back_populates="client_users")
def __repr__(self):
return f"<ClientUser(user_id={self.user_id}, client_user_id={self.client_user_id})>"
# =============================================================================
# Audit Logging
# =============================================================================
class AuditLog(Base):
"""
Unified audit log for client and server events.
Replaces BlinkData - captures all trackable events.
log_type:
- "client": Login attempts, page views, API calls from client
- "server": Job processing, credit changes, internal operations
"""
__tablename__ = "audit_logs"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
log_type = Column(String(20), index=True, nullable=False) # "client" or "server"
# User tracking
user_id = Column(String(50), ForeignKey("users.user_id"), nullable=True, index=True)
client_user_id = Column(String(100), nullable=True, index=True) # For anonymous client logs
# Event details
action = Column(String(50), index=True, nullable=False) # login, page_view, job_created, etc.
details = Column(JSON, nullable=True) # Flexible data storage
# Request context
ip_address = Column(String(45), nullable=True)
user_agent = Column(String(500), nullable=True)
refer_url = Column(Text, nullable=True)
# Outcome
status = Column(String(20), nullable=False) # success, failure, error
error_message = Column(Text, nullable=True)
# Timestamp
timestamp = Column(DateTime(timezone=True), server_default=func.now(), index=True)
# Relationship
user = relationship("User", back_populates="audit_logs")
def __repr__(self):
return f"<AuditLog(type={self.log_type}, action={self.action}, status={self.status})>"
# =============================================================================
# Job Queue
# =============================================================================
class GeminiJob(Base):
"""
Generic job queue for Gemini operations (video, image, text).
Uses priority-tier system for worker assignment.
"""
__tablename__ = "gemini_jobs"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
job_id = Column(String(100), unique=True, index=True, nullable=False) # Our ID for client
user_id = Column(String(50), ForeignKey("users.user_id"), index=True, nullable=False)
job_type = Column(String(20), index=True, nullable=False) # video, image, text, analyze
third_party_id = Column(String(255), nullable=True) # Gemini operation name (for video)
status = Column(String(20), default="queued", index=True) # queued, processing, completed, failed, cancelled
# Priority-tier worker system
priority = Column(String(10), default="fast", index=True) # fast (5s), medium (30s), slow (60s)
next_process_at = Column(DateTime(timezone=True), nullable=True, index=True) # When worker should pick up again
retry_count = Column(Integer, default=0) # Number of status check retries
input_data = Column(JSON, nullable=True) # Request details (prompt, settings, etc.)
output_data = Column(JSON, nullable=True) # Result (filename, text, etc.)
api_response = Column(JSON, nullable=True) # Raw response from third-party API (success or error)
error_message = Column(Text, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
started_at = Column(DateTime(timezone=True), nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
# Credit tracking for reservation pattern
credits_reserved = Column(Integer, default=0) # Credits reserved for this job
credits_refunded = Column(Boolean, default=False) # Whether credits were refunded
# Relationship
user = relationship("User", back_populates="jobs")
def __repr__(self):
return f"<GeminiJob(job_id={self.job_id}, type={self.job_type}, status={self.status}, priority={self.priority})>"
# =============================================================================
# Payments
# =============================================================================
class PaymentTransaction(Base):
"""
Track payment transactions for credit purchases.
Supports multiple payment gateways (Razorpay now, Stripe later).
"""
__tablename__ = "payment_transactions"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
transaction_id = Column(String(50), unique=True, index=True, nullable=False) # Our internal ID
user_id = Column(String(50), ForeignKey("users.user_id"), index=True, nullable=False)
# Order details
gateway = Column(String(20), nullable=False) # razorpay, stripe
gateway_order_id = Column(String(100), index=True, nullable=True) # Razorpay order_id
gateway_payment_id = Column(String(100), index=True, nullable=True) # Razorpay payment_id
# Package details
package_id = Column(String(50), nullable=False) # starter, standard, pro
credits_amount = Column(Integer, nullable=False) # Credits to add
amount_paise = Column(Integer, nullable=False) # Amount in paise (INR * 100)
currency = Column(String(3), default="INR")
# Status tracking
status = Column(String(20), default="created", index=True) # created, paid, failed, refunded
verified_by = Column(String(20), nullable=True) # client, webhook, or both
# Timestamps
created_at = Column(DateTime(timezone=True), server_default=func.now())
paid_at = Column(DateTime(timezone=True), nullable=True)
# Metadata
razorpay_signature = Column(String(255), nullable=True) # For verification audit
error_message = Column(Text, nullable=True)
extra_data = Column(JSON, nullable=True) # Additional gateway-specific data
# Relationship
user = relationship("User", back_populates="payments")
def __repr__(self):
return f"<PaymentTransaction(id={self.transaction_id}, status={self.status}, amount={self.amount_paise})>"
# =============================================================================
# Support & Utilities
# =============================================================================
class Contact(Base):
"""
Contact form submissions from authenticated users.
For customer support inquiries.
"""
__tablename__ = "contacts"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
user_id = Column(String(50), ForeignKey("users.user_id"), index=True, nullable=False)
email = Column(String(255), nullable=False, index=True) # User's email
subject = Column(String(500), nullable=True)
message = Column(Text, nullable=False)
ip_address = Column(String(45), nullable=True) # IPv6 can be up to 45 chars
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationship
user = relationship("User", back_populates="contacts")
def __repr__(self):
return f"<Contact(id={self.id}, user_id={self.user_id}, email={self.email})>"
class RateLimit(Base):
"""
Rate limit tracking table.
"""
__tablename__ = "rate_limits"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
identifier = Column(String(255), index=True, nullable=False) # IP or email
endpoint = Column(String(255), index=True, nullable=False)
attempts = Column(Integer, default=0)
window_start = Column(DateTime(timezone=True), nullable=False)
expires_at = Column(DateTime(timezone=True), nullable=False)
class ApiKeyUsage(Base):
"""
Track API key usage for round-robin load balancing.
Only stores the key index, not the actual key for security.
"""
__tablename__ = "api_key_usage"
id = Column(Integer, primary_key=True, autoincrement=True)
key_index = Column(Integer, unique=True, index=True, nullable=False) # Index of key in GEMINI_API_KEYS
total_requests = Column(Integer, default=0)
success_count = Column(Integer, default=0)
failure_count = Column(Integer, default=0)
last_error = Column(Text, nullable=True) # Stores the last error message for review
last_used_at = Column(DateTime(timezone=True), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
def __repr__(self):
return f"<ApiKeyUsage(index={self.key_index}, total={self.total_requests}, success={self.success_count}, failed={self.failure_count})>"