# swiftops-backend / src/app/services/report_service.py
# kamau1 — feat: add custom reports with aggregations and export parity (367d57f)
"""
REPORT SERVICE
Handles the generation of complex business intelligence reports.
Supports:
1. SLA Compliance (Customer & Infrastructure)
2. User Performance (Aggregated from Timesheets)
3. Financial Summaries
4. CSV Export generation
Differentiates itself from AnalyticsService by focusing on:
- Large datasets
- Complex Joins
- Downloadable/Exportable formats
"""
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import func, and_, or_, case
from datetime import datetime, date, timedelta
from io import BytesIO, StringIO
import csv
import logging
from pydantic import BaseModel
from app.schemas.report import (
ReportFilter,
ReportType,
SLAReportRow,
UserPerformanceRow,
FinancialReportRow,
InventoryUsageRow
)
from app.models.ticket import Ticket
from app.models.ticket_assignment import TicketAssignment
from app.models.ticket_expense import TicketExpense
from app.models.finance import ProjectFinance
from app.models.timesheet import Timesheet
from app.models.user import User
from app.models.project import Project
from app.models.inventory import InventoryAssignment, ProjectInventory
from app.models.enums import TicketStatus
logger = logging.getLogger(__name__)
class ReportService:
"""
Core service for generating tabular business reports
"""
@staticmethod
def generate_report_data(db: Session, filters: ReportFilter, current_user: User) -> List[Any]:
    """
    Main entry point. Routes to the specific generator for the requested ReportType.

    Raises:
        ValueError: if the report type is unsupported, or if a custom report
            is requested without a custom_config.
    """
    report_type = filters.report_type
    # Custom reports need extra validation and the requesting user.
    if report_type == ReportType.CUSTOM:
        if not filters.custom_config:
            raise ValueError("custom_config is required for custom reports")
        return ReportService._generate_custom_report(db, filters, current_user)
    # The standard generators all share the (db, filters) signature,
    # so a dispatch table replaces the if/elif chain.
    dispatch = {
        ReportType.SLA_COMPLIANCE: ReportService._generate_sla_report,
        ReportType.USER_PERFORMANCE: ReportService._generate_performance_report,
        ReportType.FINANCIAL_SUMMARY: ReportService._generate_financial_report,
        ReportType.INVENTORY_USAGE: ReportService._generate_inventory_report,
    }
    generator = dispatch.get(report_type)
    if generator is None:
        raise ValueError(f"Unsupported report type: {filters.report_type}")
    return generator(db, filters)
# =========================================================
# 1. SLA COMPLIANCE REPORT
# =========================================================
@staticmethod
def _generate_sla_report(db: Session, filters: ReportFilter) -> List[SLAReportRow]:
    """
    Generate SLA violation & compliance rows, one per matching ticket.

    Works for both Customer (SalesOrder) and Infrastructure (Task) tickets
    by unifying queries via the Ticket table.

    Args:
        db: Active database session.
        filters: Date range plus optional project/region/status/user filters.

    Returns:
        List of SLAReportRow for non-deleted tickets created in the range.
    """
    # Base query with eager loads so the per-ticket loop below does not
    # trigger N+1 lookups for project, sales order, or assignee names.
    query = db.query(Ticket).options(
        joinedload(Ticket.project),
        joinedload(Ticket.sales_order),  # For customer names
        joinedload(Ticket.assignments).joinedload(TicketAssignment.user)
    ).filter(
        Ticket.deleted_at.is_(None),
        func.date(Ticket.created_at) >= filters.date_range.start_date,
        func.date(Ticket.created_at) <= filters.date_range.end_date
    )
    # Apply optional filters
    if filters.project_id:
        query = query.filter(Ticket.project_id == filters.project_id)
    if filters.region_id:
        query = query.filter(Ticket.project_region_id == filters.region_id)
    if filters.status:
        # Flexible status matching (case insensitive)
        query = query.filter(Ticket.status == filters.status.lower())
    if filters.user_id:
        # Join assignments to find tickets assigned to this user
        query = query.join(TicketAssignment).filter(
            TicketAssignment.user_id == filters.user_id,
            TicketAssignment.deleted_at.is_(None)
        ).distinct()  # Avoid duplicates if user was assigned multiple times to same ticket
    # Apply pagination (if limit is set)
    if filters.limit is not None:
        query = query.offset(filters.skip).limit(filters.limit)
    results = query.all()
    rows = []
    for ticket in results:
        # Violation margin in hours. Positive = late, negative = early.
        violation_margin = 0.0
        if ticket.completed_at and ticket.due_date:
            # Completed: compare completion time vs due date.
            # NOTE(review): naive comparison — assumes the DB returns a
            # consistent tz for both columns; confirm against the models.
            delta = ticket.completed_at - ticket.due_date.replace(tzinfo=None)
            violation_margin = delta.total_seconds() / 3600
        elif not ticket.completed_at and ticket.is_overdue and ticket.due_date:
            # Still open and overdue: measure lateness against "now".
            delta = datetime.utcnow() - ticket.due_date.replace(tzinfo=None)
            violation_margin = delta.total_seconds() / 3600
        # Resolve assigned agent name: first active assignment, otherwise
        # the most recent one in history.
        agent_name = "Unassigned"
        if ticket.assignments:
            active = next((a for a in ticket.assignments if a.ended_at is None), None)
            if active and active.user:
                agent_name = active.user.name
            else:
                # BUGFIX: was `elif ticket.assignments:`, which is always
                # true inside this branch — a plain else states the intent.
                agent_name = ticket.assignments[-1].user.name if ticket.assignments[-1].user else "Unknown"
        # Resolve customer name; infrastructure tickets have no sales order.
        customer_name = "Internal/Infrastructure"
        if ticket.sales_order and ticket.sales_order.customer:
            customer_name = ticket.sales_order.customer.customer_name
        row = SLAReportRow(
            ticket_id=ticket.id,
            # Pseudo ticket number: last 8 chars of the dedup key (or the id).
            ticket_number=str(ticket.dedup_key[-8:] if ticket.dedup_key else ticket.id).upper(),
            project_name=ticket.project.title if ticket.project else "Unknown",
            ticket_type=ticket.ticket_type,
            priority=ticket.priority,
            status=ticket.status,
            created_at=ticket.created_at,
            due_date=ticket.due_date,
            completed_at=ticket.completed_at,
            is_violated=ticket.sla_violated,
            violation_margin_hours=round(violation_margin, 2),
            assigned_agent=agent_name,
            customer_name=customer_name
        )
        rows.append(row)
    return rows
# =========================================================
# 2. USER PERFORMANCE REPORT
# =========================================================
@staticmethod
def _generate_performance_report(db: Session, filters: ReportFilter) -> List[UserPerformanceRow]:
    """
    Generate per-user performance stats by aggregating TIMESHEETS.

    Aggregating timesheets avoids expensive joins on the ticket table
    for date-range queries.

    Args:
        db: Active database session.
        filters: Date range plus optional project/user filters.

    Returns:
        One UserPerformanceRow per user with timesheets in the range.
    """
    # Aggregate timesheet stats per user over the date range.
    stats_query = db.query(
        Timesheet.user_id,
        func.sum(Timesheet.tickets_completed).label('total_completed'),
        func.sum(Timesheet.tickets_assigned).label('total_assigned'),
        func.sum(Timesheet.hours_worked).label('total_hours')
    ).filter(
        Timesheet.work_date >= filters.date_range.start_date,
        Timesheet.work_date <= filters.date_range.end_date,
        Timesheet.deleted_at.is_(None)
    )
    if filters.project_id:
        stats_query = stats_query.filter(Timesheet.project_id == filters.project_id)
    if filters.user_id:
        stats_query = stats_query.filter(Timesheet.user_id == filters.user_id)
    if filters.region_id:
        # TODO: Implement accurate region filtering for timesheets if columns allow.
        pass
    # BUGFIX: GROUP BY must be applied before LIMIT/OFFSET. SQLAlchemy
    # raises InvalidRequestError when Query.group_by() is called on a
    # query that already has LIMIT or OFFSET applied, so the previous
    # order (limit -> group_by) broke every paginated request.
    stats_query = stats_query.group_by(Timesheet.user_id)
    # Pagination limits the number of user groups returned.
    if filters.limit is not None:
        stats_query = stats_query.limit(filters.limit).offset(filters.skip)
    stats_results = stats_query.all()
    if not stats_results:
        return []
    # Optimization: batch-fetch all users in a single query.
    user_ids = [s.user_id for s in stats_results]
    users = db.query(User).filter(User.id.in_(user_ids)).all()
    user_map = {u.id: u for u in users}
    rows = []
    for stat in stats_results:
        user = user_map.get(stat.user_id)
        if not user:
            continue
        # SLA violations require querying tickets because Timesheet doesn't
        # track SLA breach counts explicitly yet.
        # Note: this is an N+1 query. For 50 users it's 50 efficient index
        # scans; accepted for now given the complexity of batching this
        # specific aggregate.
        sla_violations = db.query(func.count(TicketAssignment.id)).join(Ticket).filter(
            TicketAssignment.user_id == user.id,
            Ticket.sla_violated == True,
            func.date(Ticket.completed_at) >= filters.date_range.start_date,
            func.date(Ticket.completed_at) <= filters.date_range.end_date
        ).scalar() or 0
        # Derived metrics (guard every ratio against division by zero).
        completed = stat.total_completed or 0
        assigned = stat.total_assigned or 0
        hours = float(stat.total_hours or 0)
        completion_rate = (completed / assigned * 100) if assigned > 0 else 0.0
        avg_res_time = (hours / completed) if completed > 0 else 0.0
        # On-time % = completed work not flagged as an SLA violation.
        on_time_count = max(0, completed - sla_violations)
        on_time_pct = (on_time_count / completed * 100) if completed > 0 else 100.0
        row = UserPerformanceRow(
            user_id=user.id,
            user_name=user.name,
            role=user.role,
            tickets_assigned=int(assigned),
            tickets_completed=int(completed),
            completion_rate=round(completion_rate, 2),
            total_hours_logged=round(hours, 2),
            avg_resolution_time_hours=round(avg_res_time, 2),
            sla_violations_count=sla_violations,
            on_time_percentage=round(on_time_pct, 2)
        )
        rows.append(row)
    return rows
# =========================================================
# 3. FINANCIAL SUMMARY REPORT
# =========================================================
@staticmethod
def _generate_financial_report(db: Session, filters: ReportFilter) -> List[FinancialReportRow]:
    """
    Combine ProjectFinance (invoices/payments) and TicketExpenses
    (reimbursements) into a unified, date-sorted ledger view.

    Args:
        db: Active database session.
        filters: Date range plus optional project/region/user/status filters.

    Returns:
        FinancialReportRow entries sorted by date. Pagination is applied
        in memory because rows are merged from two different tables.
    """
    rows = []
    # Part A: Project finance (revenue & overhead).
    finance_query = db.query(ProjectFinance).filter(
        func.date(ProjectFinance.transaction_date) >= filters.date_range.start_date,
        func.date(ProjectFinance.transaction_date) <= filters.date_range.end_date,
        ProjectFinance.deleted_at.is_(None)
    )
    if filters.project_id:
        finance_query = finance_query.filter(ProjectFinance.project_id == filters.project_id)
    if filters.status:
        finance_query = finance_query.filter(ProjectFinance.status == filters.status)
    finance_records = finance_query.options(joinedload(ProjectFinance.project)).all()
    for rec in finance_records:
        rows.append(FinancialReportRow(
            category=rec.category or "General",
            description=rec.description or f"Transaction {rec.reference_code}",
            date=rec.transaction_date.date(),
            project_name=rec.project.title if rec.project else "Unknown",
            amount=float(rec.amount),
            status=rec.status,
            transaction_type="Credit" if rec.transaction_type == 'inflow' else "Debit"
        ))
    # Part B: Ticket expenses (reimbursements).
    expense_query = db.query(TicketExpense).join(Ticket).filter(
        TicketExpense.expense_date >= filters.date_range.start_date,
        TicketExpense.expense_date <= filters.date_range.end_date,
        TicketExpense.deleted_at.is_(None)
    )
    # TicketExpense has an is_approved boolean rather than a status string,
    # so the generic status filter is mapped onto it where it maps cleanly.
    # Default (no filter): approved only — a ledger should reflect real
    # money movement, so Approved is the safer default.
    if filters.status:
        if filters.status.lower() == 'approved':
            expense_query = expense_query.filter(TicketExpense.is_approved == True)
        elif filters.status.lower() == 'pending':
            expense_query = expense_query.filter(TicketExpense.is_approved == False)
    else:
        expense_query = expense_query.filter(TicketExpense.is_approved == True)
    if filters.project_id:
        expense_query = expense_query.filter(Ticket.project_id == filters.project_id)
    if filters.region_id:
        expense_query = expense_query.filter(Ticket.project_region_id == filters.region_id)
    if filters.user_id:
        expense_query = expense_query.filter(TicketExpense.incurred_by_user_id == filters.user_id)
    expenses = expense_query.options(joinedload(TicketExpense.ticket).joinedload(Ticket.project)).all()
    for exp in expenses:
        rows.append(FinancialReportRow(
            category="Field Expense",
            description=f"{exp.expense_type}: {exp.description or 'No desc'}",
            date=exp.expense_date,
            project_name=exp.ticket.project.title if exp.ticket.project else "Unknown",
            amount=float(exp.total_cost),
            # BUGFIX: status was hardcoded to "Approved", which mislabeled
            # rows whenever the 'pending' filter selected unapproved
            # expenses. Reflect the record's actual approval state.
            status="Approved" if exp.is_approved else "Pending",
            transaction_type="Debit"
        ))
    # Sort the merged ledger by date, then paginate in memory
    # (DB-level pagination cannot span the two source tables).
    sorted_rows = sorted(rows, key=lambda x: x.date)
    if filters.limit is not None:
        start = filters.skip or 0
        end = start + filters.limit
        sorted_rows = sorted_rows[start:end]
    return sorted_rows
# =========================================================
# 4. INVENTORY REPORT
# =========================================================
@staticmethod
def _generate_inventory_report(db: Session, filters: ReportFilter) -> List[InventoryUsageRow]:
    """
    Track usage of inventory items across tickets within the date range.
    """
    window_start = filters.date_range.start_date
    window_end = filters.date_range.end_date
    # An item counts as "used" in the period if it was either installed
    # or consumed inside the window.
    used_in_window = or_(
        and_(InventoryAssignment.installed_at >= window_start,
             InventoryAssignment.installed_at <= window_end),
        and_(InventoryAssignment.consumed_at >= window_start,
             InventoryAssignment.consumed_at <= window_end)
    )
    usage_query = db.query(InventoryAssignment).join(
        ProjectInventory, InventoryAssignment.project_inventory_id == ProjectInventory.id
    ).filter(
        used_in_window,
        InventoryAssignment.deleted_at.is_(None)
    )
    if filters.project_id:
        usage_query = usage_query.filter(ProjectInventory.project_id == filters.project_id)
    if filters.region_id:
        # Region lives on the ticket, so join through it.
        usage_query = usage_query.join(Ticket, InventoryAssignment.ticket_id == Ticket.id)
        usage_query = usage_query.filter(Ticket.project_region_id == filters.region_id)
    if filters.user_id:
        usage_query = usage_query.filter(InventoryAssignment.user_id == filters.user_id)
    # DB-level pagination.
    if filters.limit is not None:
        usage_query = usage_query.offset(filters.skip).limit(filters.limit)
    # Eager-load related rows so the formatting loop avoids N+1 queries.
    records = usage_query.options(
        joinedload(InventoryAssignment.ticket).joinedload(Ticket.project),
        joinedload(InventoryAssignment.user),
        joinedload(InventoryAssignment.project_inventory)
    ).all()
    report_rows = []
    for record in records:
        usage_ts = record.installed_at or record.consumed_at
        if not usage_ts:
            continue
        stock_item = record.project_inventory
        has_project = record.ticket and record.ticket.project
        report_rows.append(InventoryUsageRow(
            item_name=stock_item.item_name,
            serial_number=record.serial_number,
            category=stock_item.category or "General",
            used_at=usage_ts,
            ticket_id=record.ticket_id,
            project_name=record.ticket.project.title if has_project else "Unknown",
            used_by_user=record.user.name if record.user else "Unknown",
            unit_cost=float(stock_item.unit_cost or 0.0)
        ))
    return report_rows
# =========================================================
# 5. CUSTOM REPORT
# =========================================================
@staticmethod
def _generate_custom_report(db: Session, filters: ReportFilter, current_user: User) -> List[Dict[str, Any]]:
    """
    Generate a fully customizable report based on user configuration.

    Supports any mapped entity with flexible column selection, calculated
    fields, and aggregations.

    Raises:
        ValueError: if the configured entity is not supported.
    """
    from app.models.user_payroll import UserPayroll

    config = filters.custom_config
    # Supported entity name -> SQLAlchemy model.
    entity_models = {
        "tickets": Ticket,
        "timesheets": Timesheet,
        "payroll": UserPayroll,
        "expenses": TicketExpense,
        "users": User,
    }
    entity = config.entity.lower()
    model = entity_models.get(entity)
    if model is None:
        raise ValueError(f"Unsupported entity for custom reports: {entity}")
    # Aggregated and row-level reports use different query builders and
    # produce different result shapes (tuples vs model instances).
    if config.aggregations and config.group_by:
        agg_query = ReportService._build_aggregation_query(db, model, config, filters)
        return [
            ReportService._format_aggregation_result(record, config)
            for record in agg_query.all()
        ]
    row_query = ReportService._build_custom_query(db, model, config, filters)
    return [
        ReportService._format_custom_result(record, config, db)
        for record in row_query.all()
    ]
@staticmethod
def _build_custom_query(db: Session, model, config, filters: ReportFilter):
    """
    Build a row-level (non-aggregated) query for a custom report.

    Args:
        db: Active database session.
        model: SQLAlchemy model class resolved from config.entity.
        config: Custom report config (may carry additional_filters).
        filters: Standard report filters (date range, project, etc.).

    Returns:
        SQLAlchemy query over model instances, filtered and paginated.
    """
    query = db.query(model)
    # Date-range filter keyed on whichever date column the entity has.
    if hasattr(model, 'created_at'):
        query = query.filter(
            func.date(model.created_at) >= filters.date_range.start_date,
            func.date(model.created_at) <= filters.date_range.end_date
        )
    elif hasattr(model, 'work_date'):  # Timesheets
        query = query.filter(
            model.work_date >= filters.date_range.start_date,
            model.work_date <= filters.date_range.end_date
        )
    elif hasattr(model, 'period_start_date'):  # Payroll
        query = query.filter(
            model.period_start_date >= filters.date_range.start_date,
            model.period_end_date <= filters.date_range.end_date
        )
    # Common optional filters, applied only when the entity has the column.
    if filters.project_id and hasattr(model, 'project_id'):
        query = query.filter(model.project_id == filters.project_id)
    if filters.user_id and hasattr(model, 'user_id'):
        query = query.filter(model.user_id == filters.user_id)
    if filters.region_id and hasattr(model, 'project_region_id'):
        query = query.filter(model.project_region_id == filters.region_id)
    if filters.status and hasattr(model, 'status'):
        query = query.filter(model.status == filters.status)
    # Entity-specific equality filters supplied by the caller.
    if config.additional_filters:
        for field, value in config.additional_filters.items():
            if hasattr(model, field):
                query = query.filter(getattr(model, field) == value)
    # Pagination. BUGFIX: `if filters.limit:` treated limit=0 as "no limit"
    # and returned every row; use an explicit None check, consistent with
    # the other report builders in this service.
    if filters.limit is not None:
        query = query.offset(filters.skip or 0).limit(filters.limit)
    return query
@staticmethod
def _build_aggregation_query(db: Session, model, config, filters: ReportFilter):
    """
    Build a GROUP BY aggregation query for a custom report.

    Args:
        db: Active database session.
        model: SQLAlchemy model class resolved from config.entity.
        config: Custom report config (group_by columns + aggregations).
        filters: Standard report filters (date range, project, etc.).

    Returns:
        SQLAlchemy query selecting the group-by columns followed by one
        labeled column per aggregation, in config order — the positional
        order that _format_aggregation_result relies on.

    Raises:
        ValueError: on an unknown group_by column, aggregation field, or
            aggregation type. (BUGFIX: previously these were silently
            skipped, which misaligned the positional result tuples
            consumed by _format_aggregation_result and corrupted output.)
    """
    agg_funcs = {
        "count": func.count,
        "sum": func.sum,
        "avg": func.avg,
        "min": func.min,
        "max": func.max,
    }
    select_cols = []
    group_cols = []
    for col in config.group_by:
        if not hasattr(model, col):
            raise ValueError(f"Unknown group_by column: {col}")
        attr = getattr(model, col)
        select_cols.append(attr)
        group_cols.append(attr)
    for agg in config.aggregations:
        if not hasattr(model, agg.field):
            raise ValueError(f"Unknown aggregation field: {agg.field}")
        agg_func = agg_funcs.get(agg.type)
        if agg_func is None:
            raise ValueError(f"Unsupported aggregation type: {agg.type}")
        # Default label matches the alias convention used when formatting.
        label = agg.alias or f"{agg.field}_{agg.type}"
        select_cols.append(agg_func(getattr(model, agg.field)).label(label))
    query = db.query(*select_cols)
    # Apply the same filters as the regular (row-level) query builder.
    # BUGFIX: only created_at date filtering and project_id were applied
    # before, despite the "same as regular query" intent.
    if hasattr(model, 'created_at'):
        query = query.filter(
            func.date(model.created_at) >= filters.date_range.start_date,
            func.date(model.created_at) <= filters.date_range.end_date
        )
    elif hasattr(model, 'work_date'):  # Timesheets
        query = query.filter(
            model.work_date >= filters.date_range.start_date,
            model.work_date <= filters.date_range.end_date
        )
    elif hasattr(model, 'period_start_date'):  # Payroll
        query = query.filter(
            model.period_start_date >= filters.date_range.start_date,
            model.period_end_date <= filters.date_range.end_date
        )
    if filters.project_id and hasattr(model, 'project_id'):
        query = query.filter(model.project_id == filters.project_id)
    if filters.user_id and hasattr(model, 'user_id'):
        query = query.filter(model.user_id == filters.user_id)
    if filters.region_id and hasattr(model, 'project_region_id'):
        query = query.filter(model.project_region_id == filters.region_id)
    if filters.status and hasattr(model, 'status'):
        query = query.filter(model.status == filters.status)
    # Group by the validated columns.
    for gcol in group_cols:
        query = query.group_by(gcol)
    return query
@staticmethod
def _format_custom_result(result, config, db: Session) -> Dict[str, Any]:
"""Format a single result row for custom report"""
row = {}
# Extract requested columns
for col in config.columns:
if hasattr(result, col):
value = getattr(result, col)
# Convert special types
if isinstance(value, (datetime, date)):
value = value.isoformat()
elif isinstance(value, UUID):
value = str(value)
row[col] = value
# Add calculated fields
if config.calculated_fields:
for calc_field in config.calculated_fields:
if calc_field == "sla_violated" and hasattr(result, 'sla_violated'):
row["sla_violated"] = result.sla_violated
elif calc_field == "days_to_complete" and hasattr(result, 'created_at') and hasattr(result, 'completed_at'):
if result.completed_at and result.created_at:
delta = result.completed_at - result.created_at
row["days_to_complete"] = delta.days
elif calc_field == "is_overdue" and hasattr(result, 'is_overdue'):
row["is_overdue"] = result.is_overdue
return row
@staticmethod
def _format_aggregation_result(result, config) -> Dict[str, Any]:
"""Format aggregation result tuple"""
row = {}
idx = 0
# Add group by values
for col in config.group_by:
row[col] = result[idx]
idx += 1
# Add aggregation values
for agg in config.aggregations:
alias = agg.alias or f"{agg.field}_{agg.type}"
value = result[idx]
# Round floats
if isinstance(value, float):
value = round(value, 2)
row[alias] = value
idx += 1
return row
# =========================================================
# EXPORT GENERATION (CSV)
# =========================================================
@staticmethod
def generate_csv_export(data: List[Any]) -> BytesIO:
"""
Converts a list of Pydantic models or dicts into a CSV file buffer.
"""
if not data:
return BytesIO(b"No data found for the selected criteria.")
output = StringIO()
# Extract headers from the first item
if isinstance(data[0], dict):
first_row = data[0]
elif hasattr(data[0], "model_dump"):
first_row = data[0].model_dump()
else:
first_row = data[0].dict() # Legacy Pydantic support
headers = list(first_row.keys())
writer = csv.DictWriter(output, fieldnames=headers)
writer.writeheader()
for item in data:
if isinstance(item, dict):
row_dict = item
elif hasattr(item, "model_dump"):
row_dict = item.model_dump()
else:
row_dict = item.dict()
writer.writerow(row_dict)
# Encode to bytes
bytes_output = BytesIO()
# UTF-8 with BOM for Excel compatibility is critical
bytes_output.write(output.getvalue().encode('utf-8-sig'))
bytes_output.seek(0)
return bytes_output