File size: 4,694 Bytes
c19c7bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
Usage tracking service.
Tracks token usage and costs for each LLM request.
"""
from sqlalchemy.orm import Session
from sqlalchemy import func, and_
from datetime import datetime, timedelta
from typing import Optional
import uuid
import logging

from app.db.models import UsageEvent, UsageDaily, UsageMonthly, Tenant
from app.billing.pricing import calculate_cost
from app.billing.quota import ensure_tenant_exists

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


def track_usage(
    db: Session,
    tenant_id: str,
    user_id: str,
    kb_id: str,
    provider: str,
    model: str,
    prompt_tokens: int,
    completion_tokens: int,
    request_timestamp: Optional[datetime] = None
) -> UsageEvent:
    """
    Track a single usage event.

    Adds a UsageEvent row, folds the same request into the tenant's daily
    and monthly aggregates, and commits all of it with a single
    ``db.commit()``.

    Args:
        db: Database session
        tenant_id: Tenant ID
        user_id: User ID
        kb_id: Knowledge base ID
        provider: "gemini" or "openai"
        model: Model name
        prompt_tokens: Input tokens
        completion_tokens: Output tokens
        request_timestamp: Request timestamp (defaults to now)

    Returns:
        Created UsageEvent, refreshed from the database after the commit

    Raises:
        Exception: Any error raised while adding, aggregating, committing or
            refreshing is re-raised unchanged after the session has been
            rolled back.
    """
    # Ensure tenant exists
    ensure_tenant_exists(db, tenant_id)

    # Calculate cost
    total_tokens = prompt_tokens + completion_tokens
    estimated_cost = calculate_cost(provider, model, prompt_tokens, completion_tokens)

    # Create usage event
    request_id = f"req_{uuid.uuid4().hex[:16]}"
    # NOTE(review): the default is a naive UTC datetime; a caller-supplied
    # timezone-aware timestamp would be bucketed by its own local date/month
    # in the aggregates below -- confirm callers pass naive UTC values.
    timestamp = request_timestamp or datetime.utcnow()

    usage_event = UsageEvent(
        request_id=request_id,
        tenant_id=tenant_id,
        user_id=user_id,
        kb_id=kb_id,
        provider=provider,
        model=model,
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        total_tokens=total_tokens,
        estimated_cost_usd=estimated_cost,
        request_timestamp=timestamp
    )

    try:
        db.add(usage_event)

        # Update daily aggregation
        _update_daily_usage(db, tenant_id, timestamp, provider, total_tokens, estimated_cost)

        # Update monthly aggregation
        _update_monthly_usage(db, tenant_id, timestamp, provider, total_tokens, estimated_cost)

        db.commit()
        db.refresh(usage_event)
    except Exception:
        # A failed flush/commit leaves the session unusable until it is
        # rolled back, so reset it before handing the error to the caller.
        db.rollback()
        logger.exception(
            "Failed to track usage: tenant=%s, provider=%s",
            tenant_id,
            provider,
        )
        raise

    # Lazy %-style arguments: the message is only formatted if INFO is enabled.
    logger.info(
        "Tracked usage: tenant=%s, provider=%s, tokens=%s, cost=$%.6f",
        tenant_id,
        provider,
        total_tokens,
        estimated_cost,
    )

    return usage_event


def _update_daily_usage(
    db: Session,
    tenant_id: str,
    timestamp: datetime,
    provider: str,
    tokens: int,
    cost: float
):
    """
    Fold one request into the tenant's per-day usage row.

    The row is looked up by tenant and by midnight of ``timestamp``'s
    calendar date. If it exists its counters are incremented in place;
    otherwise a fresh row seeded with this request is added to the session.
    Nothing is committed here.
    """
    day_start = datetime.combine(timestamp.date(), datetime.min.time())
    same_tenant_and_day = and_(
        UsageDaily.tenant_id == tenant_id,
        UsageDaily.date == day_start
    )
    row = db.query(UsageDaily).filter(same_tenant_and_day).first()

    # First request of the day for this tenant: start a new aggregate row.
    if not row:
        db.add(
            UsageDaily(
                tenant_id=tenant_id,
                date=day_start,
                total_requests=1,
                total_tokens=tokens,
                total_cost_usd=cost,
                gemini_requests=int(provider == "gemini"),
                openai_requests=int(provider == "openai")
            )
        )
        return

    row.total_requests += 1
    row.total_tokens += tokens
    row.total_cost_usd += cost

    # Only the two known providers have a dedicated request counter.
    provider_counter = {
        "gemini": "gemini_requests",
        "openai": "openai_requests",
    }.get(provider)
    if provider_counter:
        setattr(row, provider_counter, getattr(row, provider_counter) + 1)

    row.updated_at = datetime.utcnow()

def _update_monthly_usage(
    db: Session,
    tenant_id: str,
    timestamp: datetime,
    provider: str,
    tokens: int,
    cost: float
):
    """
    Fold one request into the tenant's per-month usage row.

    The row is looked up by tenant plus the year and month of ``timestamp``.
    If it exists its counters are incremented in place; otherwise a fresh
    row seeded with this request is added to the session. Nothing is
    committed here.
    """
    same_tenant_and_month = and_(
        UsageMonthly.tenant_id == tenant_id,
        UsageMonthly.year == timestamp.year,
        UsageMonthly.month == timestamp.month
    )
    row = db.query(UsageMonthly).filter(same_tenant_and_month).first()

    # First request of the month for this tenant: start a new aggregate row.
    if not row:
        db.add(
            UsageMonthly(
                tenant_id=tenant_id,
                year=timestamp.year,
                month=timestamp.month,
                total_requests=1,
                total_tokens=tokens,
                total_cost_usd=cost,
                gemini_requests=int(provider == "gemini"),
                openai_requests=int(provider == "openai")
            )
        )
        return

    row.total_requests += 1
    row.total_tokens += tokens
    row.total_cost_usd += cost

    # Only the two known providers have a dedicated request counter.
    provider_counter = {
        "gemini": "gemini_requests",
        "openai": "openai_requests",
    }.get(provider)
    if provider_counter:
        setattr(row, provider_counter, getattr(row, provider_counter) + 1)

    row.updated_at = datetime.utcnow()