Spaces:
Sleeping
Sleeping
| """ | |
| REPORT SERVICE | |
| Handles the generation of complex business intelligence reports. | |
| Supports: | |
| 1. SLA Compliance (Customer & Infrastructure) | |
| 2. User Performance (Aggregated from Timesheets) | |
| 3. Financial Summaries | |
| 4. CSV Export generation | |
| Differentiates itself from AnalyticsService by focusing on: | |
| - Large datasets | |
| - Complex Joins | |
| - Downloadable/Exportable formats | |
| """ | |
| from typing import List, Dict, Any, Optional | |
| from sqlalchemy.orm import Session, joinedload | |
| from sqlalchemy import func, and_, or_, case | |
| from datetime import datetime, date, timedelta | |
| from io import BytesIO, StringIO | |
| import csv | |
| import logging | |
| from pydantic import BaseModel | |
| from app.schemas.report import ( | |
| ReportFilter, | |
| ReportType, | |
| SLAReportRow, | |
| UserPerformanceRow, | |
| FinancialReportRow, | |
| InventoryUsageRow | |
| ) | |
| from app.models.ticket import Ticket | |
| from app.models.ticket_assignment import TicketAssignment | |
| from app.models.ticket_expense import TicketExpense | |
| from app.models.finance import ProjectFinance | |
| from app.models.timesheet import Timesheet | |
| from app.models.user import User | |
| from app.models.project import Project | |
| from app.models.inventory import InventoryAssignment, ProjectInventory | |
| from app.models.enums import TicketStatus | |
| logger = logging.getLogger(__name__) | |
class ReportService:
    """
    Core service for generating tabular business reports.

    All methods are static: the SQLAlchemy session is passed in explicitly
    and no state is kept on the class. `generate_report_data` is the entry
    point for building rows and `generate_csv_export` turns those rows into
    a downloadable CSV buffer.
    """

    @staticmethod
    def generate_report_data(db: Session, filters: ReportFilter, current_user: User) -> List[Any]:
        """
        Main entry point. Routes to the specific generator based on ReportType.

        Args:
            db: Active SQLAlchemy session.
            filters: Report type, date range and optional narrowing filters.
            current_user: Requesting user; only forwarded to custom reports.

        Returns:
            A list of report rows (Pydantic row models, or dicts for custom
            reports).

        Raises:
            ValueError: If the report type is unsupported, or a custom report
                is requested without `custom_config`.
        """
        if filters.report_type == ReportType.SLA_COMPLIANCE:
            return ReportService._generate_sla_report(db, filters)
        elif filters.report_type == ReportType.USER_PERFORMANCE:
            return ReportService._generate_performance_report(db, filters)
        elif filters.report_type == ReportType.FINANCIAL_SUMMARY:
            return ReportService._generate_financial_report(db, filters)
        elif filters.report_type == ReportType.INVENTORY_USAGE:
            return ReportService._generate_inventory_report(db, filters)
        elif filters.report_type == ReportType.CUSTOM:
            if not filters.custom_config:
                raise ValueError("custom_config is required for custom reports")
            return ReportService._generate_custom_report(db, filters, current_user)
        else:
            raise ValueError(f"Unsupported report type: {filters.report_type}")

    # =========================================================
    # 1. SLA COMPLIANCE REPORT
    # =========================================================
    @staticmethod
    def _generate_sla_report(db: Session, filters: ReportFilter) -> List[SLAReportRow]:
        """
        Generates SLA violation & compliance data, one row per ticket.

        Works for both Customer (SalesOrder) and Infrastructure (Task) tickets
        by unifying queries via the Ticket table. Tickets are selected by
        creation date and returned oldest first.

        `violation_margin_hours` is positive when the ticket finished (or is
        still open) after its due date and negative when it finished early.
        """
        query = db.query(Ticket).options(
            joinedload(Ticket.project),
            joinedload(Ticket.sales_order),  # For customer names
            joinedload(Ticket.assignments).joinedload(TicketAssignment.user),  # Agent name lookup
        ).filter(
            Ticket.deleted_at.is_(None),
            func.date(Ticket.created_at) >= filters.date_range.start_date,
            func.date(Ticket.created_at) <= filters.date_range.end_date,
        )

        # Apply filters
        if filters.project_id:
            query = query.filter(Ticket.project_id == filters.project_id)
        if filters.region_id:
            query = query.filter(Ticket.project_region_id == filters.region_id)
        if filters.status:
            # Only the incoming value is lower-cased; stored statuses are
            # presumably lower-case already.
            query = query.filter(Ticket.status == filters.status.lower())
        if filters.user_id:
            # Join assignments to find tickets assigned to this user.
            # distinct() avoids duplicates if the user was assigned several
            # times to the same ticket.
            query = query.join(TicketAssignment).filter(
                TicketAssignment.user_id == filters.user_id,
                TicketAssignment.deleted_at.is_(None),
            ).distinct()

        # OFFSET/LIMIT without ORDER BY gives unstable pages, so fix the order.
        query = query.order_by(Ticket.created_at, Ticket.id)
        if filters.limit is not None:
            query = query.offset(filters.skip).limit(filters.limit)

        def _naive(moment):
            # Drop tzinfo so aware and naive values can be subtracted.
            # NOTE(review): assumes both timestamps are stored in the same zone.
            return moment.replace(tzinfo=None)

        now = datetime.utcnow()
        rows = []
        for ticket in query.all():
            # Hours relative to the due date. Positive = late, negative = early.
            violation_margin = 0.0
            if ticket.completed_at and ticket.due_date:
                delta = _naive(ticket.completed_at) - _naive(ticket.due_date)
                violation_margin = delta.total_seconds() / 3600
            elif not ticket.completed_at and ticket.is_overdue and ticket.due_date:
                # Still open and overdue: measure against the current time.
                delta = now - _naive(ticket.due_date)
                violation_margin = delta.total_seconds() / 3600

            # Assigned agent: first active assignment, else the last one listed.
            agent_name = "Unassigned"
            if ticket.assignments:
                active = next((a for a in ticket.assignments if a.ended_at is None), None)
                if active and active.user:
                    agent_name = active.user.name
                else:
                    last_assignment = ticket.assignments[-1]
                    agent_name = last_assignment.user.name if last_assignment.user else "Unknown"

            # Tickets without a sales-order customer are reported as internal.
            customer_name = "Internal/Infrastructure"
            if ticket.sales_order and ticket.sales_order.customer:
                customer_name = ticket.sales_order.customer.customer_name

            rows.append(SLAReportRow(
                ticket_id=ticket.id,
                # Pseudo-number: last 8 chars of the dedup key, else the id.
                ticket_number=str(ticket.dedup_key[-8:] if ticket.dedup_key else ticket.id).upper(),
                project_name=ticket.project.title if ticket.project else "Unknown",
                ticket_type=ticket.ticket_type,
                priority=ticket.priority,
                status=ticket.status,
                created_at=ticket.created_at,
                due_date=ticket.due_date,
                completed_at=ticket.completed_at,
                is_violated=ticket.sla_violated,
                violation_margin_hours=round(violation_margin, 2),
                assigned_agent=agent_name,
                customer_name=customer_name,
            ))
        return rows

    # =========================================================
    # 2. USER PERFORMANCE REPORT
    # =========================================================
    @staticmethod
    def _generate_performance_report(db: Session, filters: ReportFilter) -> List[UserPerformanceRow]:
        """
        Generates per-user performance stats by aggregating TIMESHEETS.

        Assigned/completed counts and hours come from summed timesheet rows
        in the date range. SLA violation counts come from tickets, because
        timesheets do not carry them. Pagination applies to the user groups,
        ordered by user id.
        """
        start_date = filters.date_range.start_date
        end_date = filters.date_range.end_date

        stats_query = db.query(
            Timesheet.user_id,
            func.sum(Timesheet.tickets_completed).label('total_completed'),
            func.sum(Timesheet.tickets_assigned).label('total_assigned'),
            func.sum(Timesheet.hours_worked).label('total_hours'),
        ).filter(
            Timesheet.work_date >= start_date,
            Timesheet.work_date <= end_date,
            Timesheet.deleted_at.is_(None),
        )
        if filters.project_id:
            stats_query = stats_query.filter(Timesheet.project_id == filters.project_id)
        if filters.user_id:
            stats_query = stats_query.filter(Timesheet.user_id == filters.user_id)
        if filters.region_id:
            # TODO: Implement accurate region filtering for timesheets if columns allow.
            logger.warning(
                "region_id=%s ignored: user performance reports cannot filter by region yet",
                filters.region_id,
            )

        # GROUP BY must be added before LIMIT/OFFSET: SQLAlchemy's Query
        # refuses group_by() on a query that already has them. The ORDER BY
        # keeps pages stable.
        stats_query = stats_query.group_by(Timesheet.user_id).order_by(Timesheet.user_id)
        if filters.limit is not None:
            stats_query = stats_query.offset(filters.skip).limit(filters.limit)

        stats_results = stats_query.all()
        if not stats_results:
            return []

        # Batch fetch all users in one query.
        user_ids = [s.user_id for s in stats_results]
        users = db.query(User).filter(User.id.in_(user_ids)).all()
        user_map = {u.id: u for u in users}

        # One grouped query for SLA violations instead of one query per user.
        violation_rows = db.query(
            TicketAssignment.user_id,
            func.count(TicketAssignment.id),
        ).join(Ticket).filter(
            TicketAssignment.user_id.in_(user_ids),
            Ticket.sla_violated == True,  # noqa: E712 - SQL expression, not a Python bool
            func.date(Ticket.completed_at) >= start_date,
            func.date(Ticket.completed_at) <= end_date,
        ).group_by(TicketAssignment.user_id).all()
        violations_by_user = {user_id: count for user_id, count in violation_rows}

        rows = []
        for stat in stats_results:
            user = user_map.get(stat.user_id)
            if not user:
                # Timesheet references a user row that no longer exists.
                continue

            sla_violations = violations_by_user.get(user.id, 0) or 0

            # Derived metrics
            completed = stat.total_completed or 0
            assigned = stat.total_assigned or 0
            hours = float(stat.total_hours or 0)
            completion_rate = (completed / assigned * 100) if assigned > 0 else 0.0
            avg_res_time = (hours / completed) if completed > 0 else 0.0

            # On-time %: completed tickets that did not breach SLA. With
            # nothing completed it is reported as 100%.
            on_time_count = max(0, completed - sla_violations)
            on_time_pct = (on_time_count / completed * 100) if completed > 0 else 100.0

            rows.append(UserPerformanceRow(
                user_id=user.id,
                user_name=user.name,
                role=user.role,
                tickets_assigned=int(assigned),
                tickets_completed=int(completed),
                completion_rate=round(completion_rate, 2),
                total_hours_logged=round(hours, 2),
                avg_resolution_time_hours=round(avg_res_time, 2),
                sla_violations_count=sla_violations,
                on_time_percentage=round(on_time_pct, 2),
            ))
        return rows

    # =========================================================
    # 3. FINANCIAL SUMMARY REPORT
    # =========================================================
    @staticmethod
    def _generate_financial_report(db: Session, filters: ReportFilter) -> List[FinancialReportRow]:
        """
        Combines ProjectFinance (Invoices/Payments) and TicketExpenses
        (Reimbursements) into a unified ledger view sorted by date.

        Expenses default to approved-only. A status filter of 'approved' or
        'pending' selects on `is_approved`; any other status value applies
        no approval filter to expenses. Pagination is applied in Python
        after both sources are merged and sorted.
        """
        start_date = filters.date_range.start_date
        end_date = filters.date_range.end_date
        rows = []

        # Part A: Project Finance (Revenue & Overhead)
        # NOTE(review): region_id and user_id are not applied to this part.
        finance_query = db.query(ProjectFinance).filter(
            func.date(ProjectFinance.transaction_date) >= start_date,
            func.date(ProjectFinance.transaction_date) <= end_date,
            ProjectFinance.deleted_at.is_(None),
        )
        if filters.project_id:
            finance_query = finance_query.filter(ProjectFinance.project_id == filters.project_id)
        if filters.status:
            finance_query = finance_query.filter(ProjectFinance.status == filters.status)

        finance_records = finance_query.options(joinedload(ProjectFinance.project)).all()
        for rec in finance_records:
            rows.append(FinancialReportRow(
                category=rec.category or "General",
                description=rec.description or f"Transaction {rec.reference_code}",
                date=rec.transaction_date.date(),
                project_name=rec.project.title if rec.project else "Unknown",
                amount=float(rec.amount),
                status=rec.status,
                transaction_type="Credit" if rec.transaction_type == 'inflow' else "Debit",
            ))

        # Part B: Ticket Expenses (Reimbursements)
        expense_query = db.query(TicketExpense).join(Ticket).filter(
            TicketExpense.expense_date >= start_date,
            TicketExpense.expense_date <= end_date,
            TicketExpense.deleted_at.is_(None),
        )

        # TicketExpense has an `is_approved` boolean, not a status string, so
        # the generic status filter is mapped onto it where it maps cleanly.
        if filters.status:
            requested_status = filters.status.lower()
            if requested_status == 'approved':
                expense_query = expense_query.filter(TicketExpense.is_approved == True)  # noqa: E712
            elif requested_status == 'pending':
                expense_query = expense_query.filter(TicketExpense.is_approved == False)  # noqa: E712
        else:
            # A ledger implies real money movement, so default to approved only.
            expense_query = expense_query.filter(TicketExpense.is_approved == True)  # noqa: E712

        if filters.project_id:
            expense_query = expense_query.filter(Ticket.project_id == filters.project_id)
        if filters.region_id:
            expense_query = expense_query.filter(Ticket.project_region_id == filters.region_id)
        if filters.user_id:
            expense_query = expense_query.filter(TicketExpense.incurred_by_user_id == filters.user_id)

        expenses = expense_query.options(
            joinedload(TicketExpense.ticket).joinedload(Ticket.project)
        ).all()
        for exp in expenses:
            rows.append(FinancialReportRow(
                category="Field Expense",
                description=f"{exp.expense_type}: {exp.description or 'No desc'}",
                date=exp.expense_date,
                project_name=exp.ticket.project.title if exp.ticket.project else "Unknown",
                amount=float(exp.total_cost),
                # Report the expense's real state; pending expenses can be
                # selected above and must not be labelled as approved.
                status="Approved" if exp.is_approved else "Pending",
                transaction_type="Debit",
            ))

        # Sort by date
        sorted_rows = sorted(rows, key=lambda x: x.date)

        # Apply pagination (post-aggregation slicing)
        if filters.limit is not None:
            start = filters.skip or 0
            end = start + filters.limit
            sorted_rows = sorted_rows[start:end]
        return sorted_rows

    # =========================================================
    # 4. INVENTORY REPORT
    # =========================================================
    @staticmethod
    def _generate_inventory_report(db: Session, filters: ReportFilter) -> List[InventoryUsageRow]:
        """
        Tracks usage of inventory items across tickets.

        An assignment is included when its `installed_at` or `consumed_at`
        falls inside the date range (both ends inclusive). The reported
        `used_at` prefers `installed_at` over `consumed_at`.
        """
        start_date = filters.date_range.start_date
        end_date = filters.date_range.end_date

        # Compare on the calendar date, as the other generators do, so that
        # usage recorded during the final day of the range is included.
        # NOTE(review): assumes installed_at/consumed_at are timestamps - confirm.
        installed_day = func.date(InventoryAssignment.installed_at)
        consumed_day = func.date(InventoryAssignment.consumed_at)

        query = db.query(InventoryAssignment).join(
            ProjectInventory, InventoryAssignment.project_inventory_id == ProjectInventory.id
        ).filter(
            or_(
                and_(installed_day >= start_date, installed_day <= end_date),
                and_(consumed_day >= start_date, consumed_day <= end_date),
            ),
            InventoryAssignment.deleted_at.is_(None),
        )
        if filters.project_id:
            query = query.filter(ProjectInventory.project_id == filters.project_id)
        if filters.region_id:
            # Join via Ticket to get the region.
            query = query.join(
                Ticket, InventoryAssignment.ticket_id == Ticket.id
            ).filter(Ticket.project_region_id == filters.region_id)
        if filters.user_id:
            query = query.filter(InventoryAssignment.user_id == filters.user_id)

        # Apply pagination (DB level)
        if filters.limit is not None:
            query = query.offset(filters.skip).limit(filters.limit)

        results = query.options(
            joinedload(InventoryAssignment.ticket).joinedload(Ticket.project),
            joinedload(InventoryAssignment.user),
            joinedload(InventoryAssignment.project_inventory),
        ).all()

        rows = []
        for assign in results:
            used_date = assign.installed_at or assign.consumed_at
            if not used_date:
                continue
            inventory_item = assign.project_inventory
            has_project = assign.ticket and assign.ticket.project
            rows.append(InventoryUsageRow(
                item_name=inventory_item.item_name,
                serial_number=assign.serial_number,
                category=inventory_item.category or "General",
                used_at=used_date,
                ticket_id=assign.ticket_id,
                project_name=assign.ticket.project.title if has_project else "Unknown",
                used_by_user=assign.user.name if assign.user else "Unknown",
                unit_cost=float(inventory_item.unit_cost or 0.0),
            ))
        return rows

    # =========================================================
    # 5. CUSTOM REPORT
    # =========================================================
    @staticmethod
    def _generate_custom_report(db: Session, filters: ReportFilter, current_user: User) -> List[Dict[str, Any]]:
        """
        Generate a customizable report based on `filters.custom_config`.

        Supported entities: tickets, timesheets, payroll, expenses, users.
        When the config has both `aggregations` and `group_by`, an aggregate
        query is run; otherwise model rows are fetched and projected onto
        the requested columns. `current_user` is accepted but not used here.

        Raises:
            ValueError: If the entity, a group-by column or an aggregation
                is not supported.
        """
        config = filters.custom_config
        entity = config.entity.lower()

        # Imported here rather than at module level, as in the original.
        from app.models.user_payroll import UserPayroll

        # Map entity names to models
        entity_map = {
            "tickets": Ticket,
            "timesheets": Timesheet,
            "payroll": UserPayroll,
            "expenses": TicketExpense,
            "users": User,
        }
        if entity not in entity_map:
            raise ValueError(f"Unsupported entity for custom reports: {entity}")
        model = entity_map[entity]

        is_aggregated = bool(config.aggregations and config.group_by)
        if is_aggregated:
            query = ReportService._build_aggregation_query(db, model, config, filters)
            # Aggregation results are tuples
            return [
                ReportService._format_aggregation_result(result, config)
                for result in query.all()
            ]

        query = ReportService._build_custom_query(db, model, config, filters)
        # Regular results are model instances
        return [
            ReportService._format_custom_result(result, config, db)
            for result in query.all()
        ]

    @staticmethod
    def _apply_custom_filters(query, model, config, filters: ReportFilter):
        """
        Apply the date-range, common and additional filters of a custom
        report to `query`, using only attributes that `model` actually has.

        Shared by the regular and the aggregation query builders so both
        select the same rows.
        """
        start_date = filters.date_range.start_date
        end_date = filters.date_range.end_date

        # Date range filter (entity-specific column)
        if hasattr(model, 'created_at'):
            query = query.filter(
                func.date(model.created_at) >= start_date,
                func.date(model.created_at) <= end_date,
            )
        elif hasattr(model, 'work_date'):  # Timesheets
            query = query.filter(
                model.work_date >= start_date,
                model.work_date <= end_date,
            )
        elif hasattr(model, 'period_start_date'):  # Payroll
            # Only periods lying entirely inside the range are included.
            query = query.filter(
                model.period_start_date >= start_date,
                model.period_end_date <= end_date,
            )

        # Common filters
        if filters.project_id and hasattr(model, 'project_id'):
            query = query.filter(model.project_id == filters.project_id)
        if filters.user_id and hasattr(model, 'user_id'):
            query = query.filter(model.user_id == filters.user_id)
        if filters.region_id and hasattr(model, 'project_region_id'):
            query = query.filter(model.project_region_id == filters.region_id)
        if filters.status and hasattr(model, 'status'):
            query = query.filter(model.status == filters.status)

        # Additional entity-specific equality filters.
        # NOTE(review): field names come from the report config and are
        # matched against any model attribute, not only mapped columns.
        if config.additional_filters:
            for field, value in config.additional_filters.items():
                if hasattr(model, field):
                    query = query.filter(getattr(model, field) == value)
        return query

    @staticmethod
    def _build_custom_query(db: Session, model, config, filters: ReportFilter):
        """Build the (paginated) query for a custom report without aggregations."""
        query = ReportService._apply_custom_filters(db.query(model), model, config, filters)

        # Apply pagination; `is not None` matches the other generators.
        if filters.limit is not None:
            query = query.offset(filters.skip or 0).limit(filters.limit)
        return query

    @staticmethod
    def _build_aggregation_query(db: Session, model, config, filters: ReportFilter):
        """
        Build the aggregation query for a custom report.

        Selected columns are the group-by columns followed by one labelled
        aggregate per entry of `config.aggregations`, in config order.
        `_format_aggregation_result` relies on exactly that layout.

        Raises:
            ValueError: If a group-by column or aggregation field does not
                exist on the model, or the aggregation type is unsupported.
                Skipping such entries would shift every later value onto
                the wrong key when the result is formatted.
        """
        supported = (
            ("count", func.count),
            ("sum", func.sum),
            ("avg", func.avg),
            ("min", func.min),
            ("max", func.max),
        )

        group_cols = []
        for col in config.group_by:
            if not hasattr(model, col):
                raise ValueError(f"Unknown group_by column for custom report: {col}")
            group_cols.append(getattr(model, col))

        select_cols = list(group_cols)
        for agg in config.aggregations:
            field = getattr(model, agg.field, None)
            if field is None:
                raise ValueError(f"Unknown aggregation field for custom report: {agg.field}")
            # Compare with == (not a dict lookup) so an enum-typed agg.type
            # matches the same way the original if/elif chain did.
            match = next(((name, fn) for name, fn in supported if agg.type == name), None)
            if match is None:
                raise ValueError(f"Unsupported aggregation type for custom report: {agg.type}")
            kind, aggregate = match
            select_cols.append(aggregate(field).label(agg.alias or f"{agg.field}_{kind}"))

        query = db.query(*select_cols)
        # Same row selection as the regular query.
        query = ReportService._apply_custom_filters(query, model, config, filters)
        return query.group_by(*group_cols)

    @staticmethod
    def _format_custom_result(result, config, db: Session) -> Dict[str, Any]:
        """
        Format a single model instance as a dict for a custom report.

        Only columns listed in `config.columns` that exist on `result` are
        included. Dates/datetimes become ISO strings and UUIDs become
        strings. Calculated fields are added only when `result` supports
        them; `days_to_complete` is omitted for records not yet completed.
        `db` is accepted but not used.
        """
        # Imported locally: the module does not import UUID at the top.
        from uuid import UUID

        row = {}
        # Extract requested columns
        for col in config.columns:
            if not hasattr(result, col):
                continue
            value = getattr(result, col)
            # Convert special types
            if isinstance(value, (datetime, date)):
                value = value.isoformat()
            elif isinstance(value, UUID):
                value = str(value)
            row[col] = value

        # Add calculated fields
        if config.calculated_fields:
            for calc_field in config.calculated_fields:
                if calc_field == "sla_violated" and hasattr(result, 'sla_violated'):
                    row["sla_violated"] = result.sla_violated
                elif (
                    calc_field == "days_to_complete"
                    and hasattr(result, 'created_at')
                    and hasattr(result, 'completed_at')
                ):
                    if result.completed_at and result.created_at:
                        delta = result.completed_at - result.created_at
                        row["days_to_complete"] = delta.days
                elif calc_field == "is_overdue" and hasattr(result, 'is_overdue'):
                    row["is_overdue"] = result.is_overdue
        return row

    @staticmethod
    def _format_aggregation_result(result, config) -> Dict[str, Any]:
        """
        Format one aggregation result tuple as a dict.

        `result` is read positionally: group-by values first, then one value
        per aggregation, in config order. Float values are rounded to two
        decimals.
        """
        row = {}
        idx = 0
        # Add group by values
        for col in config.group_by:
            row[col] = result[idx]
            idx += 1
        # Add aggregation values
        for agg in config.aggregations:
            alias = agg.alias or f"{agg.field}_{agg.type}"
            value = result[idx]
            # Round floats
            if isinstance(value, float):
                value = round(value, 2)
            row[alias] = value
            idx += 1
        return row

    # =========================================================
    # EXPORT GENERATION (CSV)
    # =========================================================
    @staticmethod
    def generate_csv_export(data: List[Any]) -> BytesIO:
        """
        Converts a list of Pydantic models or dicts into a CSV file buffer.

        The header is the union of all row keys in first-seen order, so rows
        with differing keys (e.g. custom-report rows with optional calculated
        fields) export cleanly; missing values are written as empty cells.

        Returns:
            A BytesIO positioned at 0 holding UTF-8 CSV with a BOM, or a
            plain-text notice when `data` is empty.
        """
        if not data:
            return BytesIO(b"No data found for the selected criteria.")

        def _as_dict(item):
            if isinstance(item, dict):
                return item
            if hasattr(item, "model_dump"):
                return item.model_dump()
            return item.dict()  # Legacy Pydantic support

        records = [_as_dict(item) for item in data]
        # dict.fromkeys de-duplicates while preserving first-seen order.
        headers = list(dict.fromkeys(key for record in records for key in record))

        output = StringIO()
        writer = csv.DictWriter(output, fieldnames=headers, restval="")
        writer.writeheader()
        writer.writerows(records)

        # UTF-8 with BOM for Excel compatibility is critical
        return BytesIO(output.getvalue().encode('utf-8-sig'))