Spaces:
Sleeping
Sleeping
| """ | |
| Invoice Pricing Service - Calculate invoice amounts based on project pricing rules | |
| Supports multiple pricing models: | |
| - Tiered: Different rates based on ticket count | |
| - Flat Rate: Same price per ticket | |
| - Time Based: Hourly/daily rates | |
| - Custom: Formula-based pricing | |
| """ | |
| import logging | |
| from typing import List, Tuple, Optional | |
| from decimal import Decimal | |
| from datetime import datetime | |
| from uuid import UUID | |
| from sqlalchemy.orm import Session | |
| from app.models.project import Project | |
| from app.models.ticket import Ticket | |
| from app.schemas.invoice_pricing import ( | |
| TieredPricingRules, | |
| FlatRatePricingRules, | |
| TimeBasedPricingRules, | |
| CustomPricingRules, | |
| PricingCalculationResult, | |
| TierCalculation | |
| ) | |
logger = logging.getLogger(__name__)


class InvoicePricingService:
    """Service for calculating invoice pricing based on project rules.

    The service is stateless: every method is a ``staticmethod`` and may be
    called on the class (``InvoicePricingService.calculate_invoice_pricing``)
    or on an instance.
    """

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_rules(rules_cls: type, pricing_rules: dict, label: str):
        """Build ``rules_cls(**pricing_rules)``; log and raise ValueError on failure.

        Args:
            rules_cls: Schema class to instantiate from the raw rules dict.
            pricing_rules: Raw pricing rules stored on the project.
            label: Human-readable model name used in the error log message.

        Raises:
            ValueError: If the schema class rejects the rules. The original
                exception is chained as ``__cause__``.
        """
        try:
            return rules_cls(**pricing_rules)
        except Exception as exc:
            # Broad on purpose: the concrete validation error type belongs to
            # the schema library, and callers only expect ValueError.
            logger.error("Invalid %s pricing rules: %s", label, exc)
            raise ValueError(f"Invalid pricing rules: {exc}") from exc

    @staticmethod
    def _compute_tax(subtotal: Decimal, tax_rate: Decimal) -> Decimal:
        """Return ``subtotal * tax_rate %`` rounded to two decimal places."""
        return (subtotal * tax_rate / Decimal('100')).quantize(Decimal('0.01'))

    @staticmethod
    def _now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string."""
        # NOTE: datetime.utcnow() is deprecated since Python 3.12. It is kept
        # so the emitted timestamp format (no UTC offset) stays unchanged.
        return datetime.utcnow().isoformat()

    @staticmethod
    def _allocate_tickets_to_tiers(
        tiers: list,
        first_ticket_number: int,
        ticket_count: int
    ) -> Tuple[List[Tuple[object, int, int, int]], int]:
        """Split a run of consecutively numbered tickets across pricing tiers.

        Args:
            tiers: Tier objects exposing ``from_ticket`` and ``to_ticket``
                (a falsy ``to_ticket`` means the tier is open-ended).
            first_ticket_number: Number of the first ticket being invoiced.
            ticket_count: How many tickets are being invoiced.

        Returns:
            ``(allocations, unallocated)`` where ``allocations`` is a list of
            ``(tier, range_start, range_end, tickets_in_tier)`` tuples and
            ``unallocated`` is the number of tickets no tier covered.
        """
        allocations = []
        remaining = ticket_count
        current = first_ticket_number

        # The walk below only works on ascending tiers, so do not rely on the
        # order in which they were configured.
        for tier in sorted(tiers, key=lambda t: t.from_ticket):
            if remaining <= 0:
                break

            upper_bound = tier.to_ticket if tier.to_ticket else None
            if upper_bound is not None and current > upper_bound:
                # The tickets being invoiced start beyond this tier.
                continue

            range_start = max(current, tier.from_ticket)
            range_end = current + remaining - 1
            if upper_bound is not None:
                range_end = min(range_end, upper_bound)

            count = int(range_end - range_start + 1)
            if count <= 0:
                continue

            allocations.append((tier, int(range_start), int(range_end), count))
            remaining -= count
            current += count

        return allocations, max(remaining, 0)

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------
    @staticmethod
    def calculate_invoice_pricing(
        project: Project,
        tickets: List[Ticket],
        db: Session
    ) -> Optional[PricingCalculationResult]:
        """
        Calculate invoice pricing based on project rules

        Args:
            project: Project with pricing rules
            tickets: List of tickets to invoice
            db: Database session

        Returns:
            PricingCalculationResult, or None if the project has no pricing
            rules or names an unknown pricing model

        Raises:
            ValueError: If the rules do not validate for their pricing model
        """
        pricing_rules = project.invoice_pricing_rules
        if not pricing_rules:
            logger.info(
                "No pricing rules for project %s. Manual pricing required.",
                project.id,
            )
            return None

        pricing_model = pricing_rules.get('pricing_model')

        if pricing_model == 'tiered':
            return InvoicePricingService._calculate_tiered_pricing(
                project, tickets, pricing_rules, db
            )
        if pricing_model == 'flat_rate':
            return InvoicePricingService._calculate_flat_rate_pricing(
                tickets, pricing_rules
            )
        if pricing_model == 'time_based':
            return InvoicePricingService._calculate_time_based_pricing(
                tickets, pricing_rules
            )
        if pricing_model == 'custom':
            return InvoicePricingService._calculate_custom_pricing(
                tickets, pricing_rules
            )

        logger.warning("Unknown pricing model: %s", pricing_model)
        return None

    # ------------------------------------------------------------------
    # Pricing models
    # ------------------------------------------------------------------
    @staticmethod
    def _calculate_tiered_pricing(
        project: Project,
        tickets: List[Ticket],
        pricing_rules: dict,
        db: Session
    ) -> PricingCalculationResult:
        """
        Calculate tiered pricing

        Example: First 500 @ 3200, next 300 @ 2800, rest @ 2500

        When ``apply_per_invoice`` is set, ticket numbering restarts at 1 for
        every invoice; otherwise it continues after
        ``project.total_tickets_invoiced`` (cumulative mode).

        Raises:
            ValueError: If the rules are not valid tiered pricing rules
        """
        rules = InvoicePricingService._parse_rules(
            TieredPricingRules, pricing_rules, "tiered"
        )

        total_tickets = len(tickets)

        if rules.apply_per_invoice:
            # Reset count per invoice - always start from 1
            starting_ticket_number = 1
        else:
            # Cumulative mode - continue from last invoiced ticket.
            # A counter that was never initialised is treated as zero.
            starting_ticket_number = (project.total_tickets_invoiced or 0) + 1

        allocations, unpriced = InvoicePricingService._allocate_tickets_to_tiers(
            rules.tiers, starting_ticket_number, total_tickets
        )
        if unpriced > 0:
            # Surface under-billing instead of dropping the tickets silently.
            logger.warning(
                "%s of %s tickets fall outside the configured tiers "
                "and were not priced",
                unpriced,
                total_tickets,
            )

        tiers_applied = []
        subtotal = Decimal('0')
        for tier, range_start, range_end, count in allocations:
            tier_subtotal = Decimal(count) * tier.rate_per_ticket
            subtotal += tier_subtotal
            tiers_applied.append(TierCalculation(
                tier_number=tier.tier_number,
                from_ticket=range_start,
                # Open-ended tiers keep an open upper bound in the record.
                to_ticket=range_end if tier.to_ticket else None,
                tickets_in_tier=count,
                rate_per_ticket=tier.rate_per_ticket,
                subtotal=tier_subtotal,
                description=tier.description
            ))

        tax_rate = rules.tax_rate
        tax_amount = InvoicePricingService._compute_tax(subtotal, tax_rate)

        return PricingCalculationResult(
            pricing_model='tiered',
            currency=rules.currency,
            total_tickets=total_tickets,
            tiers_applied=tiers_applied,
            subtotal=subtotal,
            tax_rate=tax_rate,
            tax_amount=tax_amount,
            discount_amount=Decimal('0'),
            total_amount=subtotal + tax_amount,
            calculation_notes=rules.notes,
            applied_at=InvoicePricingService._now_iso()
        )

    @staticmethod
    def _calculate_flat_rate_pricing(
        tickets: List[Ticket],
        pricing_rules: dict
    ) -> PricingCalculationResult:
        """Calculate flat rate pricing - same price per ticket

        Raises:
            ValueError: If the rules are not valid flat rate pricing rules
        """
        rules = InvoicePricingService._parse_rules(
            FlatRatePricingRules, pricing_rules, "flat rate"
        )

        total_tickets = len(tickets)
        subtotal = Decimal(total_tickets) * rules.rate_per_ticket
        tax_amount = InvoicePricingService._compute_tax(subtotal, rules.tax_rate)

        return PricingCalculationResult(
            pricing_model='flat_rate',
            currency=rules.currency,
            total_tickets=total_tickets,
            tiers_applied=None,
            subtotal=subtotal,
            tax_rate=rules.tax_rate,
            tax_amount=tax_amount,
            discount_amount=Decimal('0'),
            total_amount=subtotal + tax_amount,
            calculation_notes=rules.notes,
            applied_at=InvoicePricingService._now_iso()
        )

    @staticmethod
    def _calculate_time_based_pricing(
        tickets: List[Ticket],
        pricing_rules: dict
    ) -> PricingCalculationResult:
        """
        Calculate time-based pricing

        Note: This is a placeholder. Actual implementation would need
        to calculate hours/days from ticket data.

        Raises:
            ValueError: If the rules are not valid time-based pricing rules
        """
        rules = InvoicePricingService._parse_rules(
            TimeBasedPricingRules, pricing_rules, "time-based"
        )

        # TODO: Calculate actual time from tickets
        # For now, assume 1 unit per ticket
        total_units = len(tickets)
        subtotal = Decimal(total_units) * rules.rate_per_unit
        tax_amount = InvoicePricingService._compute_tax(subtotal, rules.tax_rate)

        # Leave the notes prefix out when there are no notes, so the result
        # does not read "None (5 hours)".
        units_note = f"({total_units} {rules.rate_period}s)"
        notes = f"{rules.notes} {units_note}" if rules.notes else units_note

        return PricingCalculationResult(
            pricing_model='time_based',
            currency=rules.currency,
            total_tickets=len(tickets),
            tiers_applied=None,
            subtotal=subtotal,
            tax_rate=rules.tax_rate,
            tax_amount=tax_amount,
            discount_amount=Decimal('0'),
            total_amount=subtotal + tax_amount,
            calculation_notes=notes,
            applied_at=InvoicePricingService._now_iso()
        )

    @staticmethod
    def _calculate_custom_pricing(
        tickets: List[Ticket],
        pricing_rules: dict
    ) -> PricingCalculationResult:
        """
        Calculate custom formula-based pricing

        Note: This is a placeholder. Actual implementation would need
        to safely evaluate the formula. Until then every amount is zero.

        Raises:
            ValueError: If the rules are not valid custom pricing rules
        """
        rules = InvoicePricingService._parse_rules(
            CustomPricingRules, pricing_rules, "custom"
        )

        # TODO: Implement safe formula evaluation
        # For now, return zero
        logger.warning("Custom pricing not fully implemented. Returning zero.")

        return PricingCalculationResult(
            pricing_model='custom',
            currency=rules.currency,
            total_tickets=len(tickets),
            tiers_applied=None,
            subtotal=Decimal('0'),
            tax_rate=rules.tax_rate,
            tax_amount=Decimal('0'),
            discount_amount=Decimal('0'),
            total_amount=Decimal('0'),
            calculation_notes=f"Custom formula: {rules.formula} (not yet implemented)",
            applied_at=InvoicePricingService._now_iso()
        )

    # ------------------------------------------------------------------
    # Invoice helpers
    # ------------------------------------------------------------------
    @staticmethod
    def generate_line_items_from_pricing(
        pricing_result: PricingCalculationResult,
        tickets: List[Ticket]
    ) -> List[dict]:
        """
        Generate invoice line items from pricing calculation

        For tiered pricing, creates one line item per tier.
        For other models, creates one line item for all tickets.

        Args:
            pricing_result: Result of a pricing calculation
            tickets: Tickets being invoiced (not read by this method)

        Returns:
            List of line item dicts with float amounts
        """
        if pricing_result.pricing_model == 'tiered' and pricing_result.tiers_applied:
            # Create line item per tier
            return [
                {
                    "type": "pricing_tier",
                    "description": tier_calc.description or f"Tickets {tier_calc.from_ticket}-{tier_calc.to_ticket or '∞'} @ {pricing_result.currency} {tier_calc.rate_per_ticket}",
                    "quantity": tier_calc.tickets_in_tier,
                    "unit_price": float(tier_calc.rate_per_ticket),
                    "total": float(tier_calc.subtotal),
                    "tier_number": tier_calc.tier_number
                }
                for tier_calc in pricing_result.tiers_applied
            ]

        # Single line item for all tickets. The unit price is the average per
        # ticket, and the zero-ticket case is guarded against division by zero.
        ticket_total = pricing_result.total_tickets
        if ticket_total > 0:
            unit_price = float(pricing_result.subtotal / Decimal(ticket_total))
        else:
            unit_price = 0.0

        return [{
            "type": "tickets",
            "description": f"{ticket_total} tickets @ {pricing_result.currency} {unit_price:.2f}",
            "quantity": ticket_total,
            "unit_price": unit_price,
            "total": float(pricing_result.subtotal)
        }]

    @staticmethod
    def update_project_ticket_count(
        project: Project,
        tickets_count: int,
        db: Session
    ):
        """
        Update project's total_tickets_invoiced counter
        Used for cumulative pricing mode

        Args:
            project: Project whose counter is incremented
            tickets_count: Number of newly invoiced tickets to add
            db: Database session; committed by this method

        Raises:
            Exception: Whatever ``db.commit()`` raises, re-raised after the
                session has been rolled back
        """
        # A counter that was never initialised is treated as zero.
        project.total_tickets_invoiced = (
            (project.total_tickets_invoiced or 0) + tickets_count
        )
        try:
            db.commit()
        except Exception:
            # Roll back so the session is usable again, then let the caller
            # see the original failure.
            db.rollback()
            raise
        logger.info(
            "Updated project %s ticket count: +%s = %s",
            project.id,
            tickets_count,
            project.total_tickets_invoiced,
        )