swiftops-backend / src /app /services /invoice_pricing_service.py
kamau1's picture
feat: add flexible invoice pricing system with tiered, flat rate, time-based, and custom pricing models
f18d2f1
"""
Invoice Pricing Service - Calculate invoice amounts based on project pricing rules
Supports multiple pricing models:
- Tiered: Different rates based on ticket count
- Flat Rate: Same price per ticket
- Time Based: Hourly/daily rates
- Custom: Formula-based pricing
"""
import logging
from typing import List, Tuple, Optional
from decimal import Decimal
from datetime import datetime
from uuid import UUID
from sqlalchemy.orm import Session
from app.models.project import Project
from app.models.ticket import Ticket
from app.schemas.invoice_pricing import (
TieredPricingRules,
FlatRatePricingRules,
TimeBasedPricingRules,
CustomPricingRules,
PricingCalculationResult,
TierCalculation
)
logger = logging.getLogger(__name__)
class InvoicePricingService:
"""Service for calculating invoice pricing based on project rules"""
@staticmethod
def calculate_invoice_pricing(
project: Project,
tickets: List[Ticket],
db: Session
) -> Optional[PricingCalculationResult]:
"""
Calculate invoice pricing based on project rules
Args:
project: Project with pricing rules
tickets: List of tickets to invoice
db: Database session
Returns:
PricingCalculationResult or None if no pricing rules
"""
if not project.invoice_pricing_rules:
logger.info(f"No pricing rules for project {project.id}. Manual pricing required.")
return None
pricing_rules = project.invoice_pricing_rules
pricing_model = pricing_rules.get('pricing_model')
if pricing_model == 'tiered':
return InvoicePricingService._calculate_tiered_pricing(
project, tickets, pricing_rules, db
)
elif pricing_model == 'flat_rate':
return InvoicePricingService._calculate_flat_rate_pricing(
tickets, pricing_rules
)
elif pricing_model == 'time_based':
return InvoicePricingService._calculate_time_based_pricing(
tickets, pricing_rules
)
elif pricing_model == 'custom':
return InvoicePricingService._calculate_custom_pricing(
tickets, pricing_rules
)
else:
logger.warning(f"Unknown pricing model: {pricing_model}")
return None
@staticmethod
def _calculate_tiered_pricing(
project: Project,
tickets: List[Ticket],
pricing_rules: dict,
db: Session
) -> PricingCalculationResult:
"""
Calculate tiered pricing
Example: First 500 @ 3200, next 300 @ 2800, rest @ 2500
"""
# Parse pricing rules
try:
rules = TieredPricingRules(**pricing_rules)
except Exception as e:
logger.error(f"Invalid tiered pricing rules: {e}")
raise ValueError(f"Invalid pricing rules: {e}")
total_tickets = len(tickets)
apply_per_invoice = rules.apply_per_invoice
# Determine starting ticket number
if apply_per_invoice:
# Reset count per invoice - always start from 1
starting_ticket_number = 1
else:
# Cumulative mode - continue from last invoiced ticket
starting_ticket_number = project.total_tickets_invoiced + 1
# Calculate pricing for each tier
tiers_applied = []
remaining_tickets = total_tickets
current_ticket_number = starting_ticket_number
subtotal = Decimal('0')
for tier in rules.tiers:
if remaining_tickets <= 0:
break
# Determine how many tickets fall in this tier
tier_start = tier.from_ticket
tier_end = tier.to_ticket if tier.to_ticket else float('inf')
# Calculate overlap between current tickets and this tier
if current_ticket_number > tier_end:
# Current tickets are beyond this tier
continue
# Calculate tickets in this tier
tickets_in_tier_start = max(current_ticket_number, tier_start)
tickets_in_tier_end = min(current_ticket_number + remaining_tickets - 1, tier_end)
tickets_in_tier = int(tickets_in_tier_end - tickets_in_tier_start + 1)
if tickets_in_tier <= 0:
continue
# Calculate tier subtotal
tier_subtotal = Decimal(str(tickets_in_tier)) * tier.rate_per_ticket
subtotal += tier_subtotal
# Record tier calculation
tiers_applied.append(TierCalculation(
tier_number=tier.tier_number,
from_ticket=int(tickets_in_tier_start),
to_ticket=int(tickets_in_tier_end) if tier.to_ticket else None,
tickets_in_tier=tickets_in_tier,
rate_per_ticket=tier.rate_per_ticket,
subtotal=tier_subtotal,
description=tier.description
))
# Update counters
remaining_tickets -= tickets_in_tier
current_ticket_number += tickets_in_tier
# Calculate tax
tax_rate = rules.tax_rate
tax_amount = (subtotal * tax_rate / Decimal('100')).quantize(Decimal('0.01'))
total_amount = subtotal + tax_amount
return PricingCalculationResult(
pricing_model='tiered',
currency=rules.currency,
total_tickets=total_tickets,
tiers_applied=tiers_applied,
subtotal=subtotal,
tax_rate=tax_rate,
tax_amount=tax_amount,
discount_amount=Decimal('0'),
total_amount=total_amount,
calculation_notes=rules.notes,
applied_at=datetime.utcnow().isoformat()
)
@staticmethod
def _calculate_flat_rate_pricing(
tickets: List[Ticket],
pricing_rules: dict
) -> PricingCalculationResult:
"""Calculate flat rate pricing - same price per ticket"""
try:
rules = FlatRatePricingRules(**pricing_rules)
except Exception as e:
logger.error(f"Invalid flat rate pricing rules: {e}")
raise ValueError(f"Invalid pricing rules: {e}")
total_tickets = len(tickets)
subtotal = Decimal(str(total_tickets)) * rules.rate_per_ticket
# Calculate tax
tax_amount = (subtotal * rules.tax_rate / Decimal('100')).quantize(Decimal('0.01'))
total_amount = subtotal + tax_amount
return PricingCalculationResult(
pricing_model='flat_rate',
currency=rules.currency,
total_tickets=total_tickets,
tiers_applied=None,
subtotal=subtotal,
tax_rate=rules.tax_rate,
tax_amount=tax_amount,
discount_amount=Decimal('0'),
total_amount=total_amount,
calculation_notes=rules.notes,
applied_at=datetime.utcnow().isoformat()
)
@staticmethod
def _calculate_time_based_pricing(
tickets: List[Ticket],
pricing_rules: dict
) -> PricingCalculationResult:
"""
Calculate time-based pricing
Note: This is a placeholder. Actual implementation would need
to calculate hours/days from ticket data.
"""
try:
rules = TimeBasedPricingRules(**pricing_rules)
except Exception as e:
logger.error(f"Invalid time-based pricing rules: {e}")
raise ValueError(f"Invalid pricing rules: {e}")
# TODO: Calculate actual time from tickets
# For now, assume 1 unit per ticket
total_units = len(tickets)
subtotal = Decimal(str(total_units)) * rules.rate_per_unit
# Calculate tax
tax_amount = (subtotal * rules.tax_rate / Decimal('100')).quantize(Decimal('0.01'))
total_amount = subtotal + tax_amount
return PricingCalculationResult(
pricing_model='time_based',
currency=rules.currency,
total_tickets=len(tickets),
tiers_applied=None,
subtotal=subtotal,
tax_rate=rules.tax_rate,
tax_amount=tax_amount,
discount_amount=Decimal('0'),
total_amount=total_amount,
calculation_notes=f"{rules.notes} ({total_units} {rules.rate_period}s)",
applied_at=datetime.utcnow().isoformat()
)
@staticmethod
def _calculate_custom_pricing(
tickets: List[Ticket],
pricing_rules: dict
) -> PricingCalculationResult:
"""
Calculate custom formula-based pricing
Note: This is a placeholder. Actual implementation would need
to safely evaluate the formula.
"""
try:
rules = CustomPricingRules(**pricing_rules)
except Exception as e:
logger.error(f"Invalid custom pricing rules: {e}")
raise ValueError(f"Invalid pricing rules: {e}")
# TODO: Implement safe formula evaluation
# For now, return zero
logger.warning("Custom pricing not fully implemented. Returning zero.")
return PricingCalculationResult(
pricing_model='custom',
currency=rules.currency,
total_tickets=len(tickets),
tiers_applied=None,
subtotal=Decimal('0'),
tax_rate=rules.tax_rate,
tax_amount=Decimal('0'),
discount_amount=Decimal('0'),
total_amount=Decimal('0'),
calculation_notes=f"Custom formula: {rules.formula} (not yet implemented)",
applied_at=datetime.utcnow().isoformat()
)
@staticmethod
def generate_line_items_from_pricing(
pricing_result: PricingCalculationResult,
tickets: List[Ticket]
) -> List[dict]:
"""
Generate invoice line items from pricing calculation
For tiered pricing, creates one line item per tier.
For other models, creates one line item for all tickets.
"""
line_items = []
if pricing_result.pricing_model == 'tiered' and pricing_result.tiers_applied:
# Create line item per tier
for tier_calc in pricing_result.tiers_applied:
line_items.append({
"type": "pricing_tier",
"description": tier_calc.description or f"Tickets {tier_calc.from_ticket}-{tier_calc.to_ticket or '∞'} @ {pricing_result.currency} {tier_calc.rate_per_ticket}",
"quantity": tier_calc.tickets_in_tier,
"unit_price": float(tier_calc.rate_per_ticket),
"total": float(tier_calc.subtotal),
"tier_number": tier_calc.tier_number
})
else:
# Single line item for all tickets
unit_price = float(pricing_result.subtotal / Decimal(str(pricing_result.total_tickets))) if pricing_result.total_tickets > 0 else 0
line_items.append({
"type": "tickets",
"description": f"{pricing_result.total_tickets} tickets @ {pricing_result.currency} {unit_price:.2f}",
"quantity": pricing_result.total_tickets,
"unit_price": unit_price,
"total": float(pricing_result.subtotal)
})
return line_items
@staticmethod
def update_project_ticket_count(
project: Project,
tickets_count: int,
db: Session
):
"""
Update project's total_tickets_invoiced counter
Used for cumulative pricing mode
"""
project.total_tickets_invoiced += tickets_count
db.commit()
logger.info(f"Updated project {project.id} ticket count: +{tickets_count} = {project.total_tickets_invoiced}")