"""CRUD helpers for the project-tracking models.

Each entity (Project, Employee, Timesheet, Milestone, Invoice, Subcontractor,
StaffAllocation) exposes the same five operations: list, get, create, update
and delete.  The shared mechanics live in the private ``_*`` helpers below so
that all entities behave identically.
"""

from typing import Any, List, Optional

from sqlalchemy.orm import Session

import models
import schemas


# Shared helpers
def _fetch_page(db: Session, model: Any, skip: int, limit: int) -> List[Any]:
    """Return up to ``limit`` rows of ``model`` after skipping ``skip`` rows."""
    # NOTE(review): no ORDER BY is applied, so which rows land on which page
    # is left to the database -- confirm callers do not rely on stable paging.
    return db.query(model).offset(skip).limit(limit).all()


def _fetch_one(db: Session, model: Any, key_column: Any, key_value: Any) -> Optional[Any]:
    """Return the first ``model`` row whose ``key_column`` equals ``key_value``.

    Returns ``None`` when no row matches.
    """
    return db.query(model).filter(key_column == key_value).first()


def _commit_or_rollback(db: Session) -> None:
    """Commit the session; on any failure roll it back and re-raise.

    Rolling back keeps the session usable by the caller after a failed write;
    the original exception is propagated unchanged.
    """
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise


def _insert(db: Session, model: Any, payload: Any) -> Any:
    """Build a ``model`` row from ``payload.model_dump()``, persist and return it."""
    row = model(**payload.model_dump())
    db.add(row)
    _commit_or_rollback(db)
    # Reload the row so values assigned during the insert are populated.
    db.refresh(row)
    return row


def _apply_update(db: Session, row: Optional[Any], payload: Any) -> Optional[Any]:
    """Copy every field of ``payload.model_dump()`` onto ``row`` and persist it.

    ``row`` is the result of a prior lookup; when it is falsy (not found)
    nothing is written and it is returned as-is.  Every key in the dump is
    assigned, so this is a full overwrite rather than a partial patch.
    """
    if not row:
        return row
    for field_name, field_value in payload.model_dump().items():
        setattr(row, field_name, field_value)
    _commit_or_rollback(db)
    db.refresh(row)
    return row


def _remove(db: Session, row: Optional[Any]) -> Optional[Any]:
    """Delete ``row`` if it was found and return it (``None`` if it was not)."""
    if row:
        db.delete(row)
        _commit_or_rollback(db)
    return row


# Projects CRUD
def get_projects(db: Session, skip: int = 0, limit: int = 100) -> List[models.Project]:
    """Return a page of projects (``skip`` rows skipped, at most ``limit`` returned)."""
    return _fetch_page(db, models.Project, skip, limit)


def get_project(db: Session, project_id: str) -> Optional[models.Project]:
    """Return the project with ``project_id``, or ``None`` if there is none."""
    return _fetch_one(db, models.Project, models.Project.project_id, project_id)


def create_project(db: Session, project: schemas.ProjectCreate) -> models.Project:
    """Insert a new project built from ``project`` and return the stored row."""
    return _insert(db, models.Project, project)


def update_project(
    db: Session, project_id: str, project: schemas.ProjectCreate
) -> Optional[models.Project]:
    """Overwrite the project ``project_id`` with ``project``; ``None`` if not found."""
    return _apply_update(db, get_project(db, project_id), project)


def delete_project(db: Session, project_id: str) -> Optional[models.Project]:
    """Delete the project ``project_id`` and return it; ``None`` if not found."""
    return _remove(db, get_project(db, project_id))


# Employees CRUD
def get_employees(db: Session, skip: int = 0, limit: int = 100) -> List[models.Employee]:
    """Return a page of employees (``skip`` rows skipped, at most ``limit`` returned)."""
    return _fetch_page(db, models.Employee, skip, limit)


def get_employee(db: Session, employee_id: str) -> Optional[models.Employee]:
    """Return the employee with ``employee_id``, or ``None`` if there is none."""
    return _fetch_one(db, models.Employee, models.Employee.employee_id, employee_id)


def create_employee(db: Session, employee: schemas.EmployeeCreate) -> models.Employee:
    """Insert a new employee built from ``employee`` and return the stored row."""
    return _insert(db, models.Employee, employee)


def update_employee(
    db: Session, employee_id: str, employee: schemas.EmployeeCreate
) -> Optional[models.Employee]:
    """Overwrite the employee ``employee_id`` with ``employee``; ``None`` if not found."""
    return _apply_update(db, get_employee(db, employee_id), employee)


def delete_employee(db: Session, employee_id: str) -> Optional[models.Employee]:
    """Delete the employee ``employee_id`` and return it; ``None`` if not found."""
    return _remove(db, get_employee(db, employee_id))


# Timesheets CRUD
def get_timesheets(db: Session, skip: int = 0, limit: int = 100) -> List[models.Timesheet]:
    """Return a page of timesheets (``skip`` rows skipped, at most ``limit`` returned)."""
    return _fetch_page(db, models.Timesheet, skip, limit)


def get_timesheet(db: Session, record_id: str) -> Optional[models.Timesheet]:
    """Return the timesheet with ``record_id``, or ``None`` if there is none."""
    return _fetch_one(db, models.Timesheet, models.Timesheet.record_id, record_id)


def create_timesheet(db: Session, timesheet: schemas.TimesheetCreate) -> models.Timesheet:
    """Insert a new timesheet built from ``timesheet`` and return the stored row."""
    return _insert(db, models.Timesheet, timesheet)


def update_timesheet(
    db: Session, record_id: str, timesheet: schemas.TimesheetCreate
) -> Optional[models.Timesheet]:
    """Overwrite the timesheet ``record_id`` with ``timesheet``; ``None`` if not found."""
    return _apply_update(db, get_timesheet(db, record_id), timesheet)


def delete_timesheet(db: Session, record_id: str) -> Optional[models.Timesheet]:
    """Delete the timesheet ``record_id`` and return it; ``None`` if not found."""
    return _remove(db, get_timesheet(db, record_id))


# Milestones CRUD
def get_milestones(db: Session, skip: int = 0, limit: int = 100) -> List[models.Milestone]:
    """Return a page of milestones (``skip`` rows skipped, at most ``limit`` returned)."""
    return _fetch_page(db, models.Milestone, skip, limit)


def get_milestone(db: Session, milestone_id: int) -> Optional[models.Milestone]:
    """Return the milestone with ``milestone_id``, or ``None`` if there is none."""
    return _fetch_one(db, models.Milestone, models.Milestone.milestone_id, milestone_id)


def create_milestone(db: Session, milestone: schemas.MilestoneCreate) -> models.Milestone:
    """Insert a new milestone built from ``milestone`` and return the stored row."""
    return _insert(db, models.Milestone, milestone)


def update_milestone(
    db: Session, milestone_id: int, milestone: schemas.MilestoneCreate
) -> Optional[models.Milestone]:
    """Overwrite the milestone ``milestone_id`` with ``milestone``; ``None`` if not found."""
    return _apply_update(db, get_milestone(db, milestone_id), milestone)


def delete_milestone(db: Session, milestone_id: int) -> Optional[models.Milestone]:
    """Delete the milestone ``milestone_id`` and return it; ``None`` if not found."""
    return _remove(db, get_milestone(db, milestone_id))


# Invoices CRUD
def get_invoices(db: Session, skip: int = 0, limit: int = 100) -> List[models.Invoice]:
    """Return a page of invoices (``skip`` rows skipped, at most ``limit`` returned)."""
    return _fetch_page(db, models.Invoice, skip, limit)


def get_invoice(db: Session, invoice_id: str) -> Optional[models.Invoice]:
    """Return the invoice with ``invoice_id``, or ``None`` if there is none."""
    return _fetch_one(db, models.Invoice, models.Invoice.invoice_id, invoice_id)


def create_invoice(db: Session, invoice: schemas.InvoiceCreate) -> models.Invoice:
    """Insert a new invoice built from ``invoice`` and return the stored row."""
    return _insert(db, models.Invoice, invoice)


def update_invoice(
    db: Session, invoice_id: str, invoice: schemas.InvoiceCreate
) -> Optional[models.Invoice]:
    """Overwrite the invoice ``invoice_id`` with ``invoice``; ``None`` if not found."""
    return _apply_update(db, get_invoice(db, invoice_id), invoice)


def delete_invoice(db: Session, invoice_id: str) -> Optional[models.Invoice]:
    """Delete the invoice ``invoice_id`` and return it; ``None`` if not found."""
    return _remove(db, get_invoice(db, invoice_id))


# Subcontractors CRUD
def get_subcontractors(
    db: Session, skip: int = 0, limit: int = 100
) -> List[models.Subcontractor]:
    """Return a page of subcontractors (``skip`` rows skipped, at most ``limit`` returned)."""
    return _fetch_page(db, models.Subcontractor, skip, limit)


def get_subcontractor(db: Session, subcontractor_id: int) -> Optional[models.Subcontractor]:
    """Return the subcontractor with ``subcontractor_id``, or ``None`` if there is none."""
    return _fetch_one(
        db,
        models.Subcontractor,
        models.Subcontractor.subcontractor_id,
        subcontractor_id,
    )


def create_subcontractor(
    db: Session, subcontractor: schemas.SubcontractorCreate
) -> models.Subcontractor:
    """Insert a new subcontractor built from ``subcontractor`` and return the stored row."""
    return _insert(db, models.Subcontractor, subcontractor)


def update_subcontractor(
    db: Session, subcontractor_id: int, subcontractor: schemas.SubcontractorCreate
) -> Optional[models.Subcontractor]:
    """Overwrite the subcontractor ``subcontractor_id``; ``None`` if not found."""
    return _apply_update(db, get_subcontractor(db, subcontractor_id), subcontractor)


def delete_subcontractor(db: Session, subcontractor_id: int) -> Optional[models.Subcontractor]:
    """Delete the subcontractor ``subcontractor_id`` and return it; ``None`` if not found."""
    return _remove(db, get_subcontractor(db, subcontractor_id))


# Staff Allocations CRUD
def get_staff_allocations(
    db: Session, skip: int = 0, limit: int = 100
) -> List[models.StaffAllocation]:
    """Return a page of staff allocations (``skip`` rows skipped, at most ``limit`` returned)."""
    return _fetch_page(db, models.StaffAllocation, skip, limit)


def get_staff_allocation(db: Session, allocation_id: int) -> Optional[models.StaffAllocation]:
    """Return the staff allocation with ``allocation_id``, or ``None`` if there is none."""
    return _fetch_one(
        db,
        models.StaffAllocation,
        models.StaffAllocation.allocation_id,
        allocation_id,
    )


def create_staff_allocation(
    db: Session, allocation: schemas.StaffAllocationCreate
) -> models.StaffAllocation:
    """Insert a new staff allocation built from ``allocation`` and return the stored row."""
    return _insert(db, models.StaffAllocation, allocation)


def update_staff_allocation(
    db: Session, allocation_id: int, allocation: schemas.StaffAllocationCreate
) -> Optional[models.StaffAllocation]:
    """Overwrite the staff allocation ``allocation_id``; ``None`` if not found."""
    return _apply_update(db, get_staff_allocation(db, allocation_id), allocation)


def delete_staff_allocation(
    db: Session, allocation_id: int
) -> Optional[models.StaffAllocation]:
    """Delete the staff allocation ``allocation_id`` and return it; ``None`` if not found."""
    return _remove(db, get_staff_allocation(db, allocation_id))