|
|
from sqlalchemy.orm import Session
|
|
|
from typing import List, Optional
|
|
|
import models
|
|
|
import schemas
|
|
|
|
|
|
|
|
|
def get_projects(db: Session, skip: int = 0, limit: int = 100):
|
|
|
return db.query(models.Project).offset(skip).limit(limit).all()
|
|
|
|
|
|
def get_project(db: Session, project_id: str):
|
|
|
return db.query(models.Project).filter(models.Project.project_id == project_id).first()
|
|
|
|
|
|
def create_project(db: Session, project: schemas.ProjectCreate):
|
|
|
db_project = models.Project(**project.model_dump())
|
|
|
db.add(db_project)
|
|
|
db.commit()
|
|
|
db.refresh(db_project)
|
|
|
return db_project
|
|
|
|
|
|
def update_project(db: Session, project_id: str, project: schemas.ProjectCreate):
|
|
|
db_project = get_project(db, project_id)
|
|
|
if db_project:
|
|
|
for key, value in project.model_dump().items():
|
|
|
setattr(db_project, key, value)
|
|
|
db.commit()
|
|
|
db.refresh(db_project)
|
|
|
return db_project
|
|
|
|
|
|
def delete_project(db: Session, project_id: str):
|
|
|
db_project = get_project(db, project_id)
|
|
|
if db_project:
|
|
|
db.delete(db_project)
|
|
|
db.commit()
|
|
|
return db_project
|
|
|
|
|
|
|
|
|
|
|
|
def get_employees(db: Session, skip: int = 0, limit: int = 100):
|
|
|
return db.query(models.Employee).offset(skip).limit(limit).all()
|
|
|
|
|
|
def get_employee(db: Session, employee_id: str):
|
|
|
return db.query(models.Employee).filter(models.Employee.employee_id == employee_id).first()
|
|
|
|
|
|
def create_employee(db: Session, employee: schemas.EmployeeCreate):
|
|
|
db_employee = models.Employee(**employee.model_dump())
|
|
|
db.add(db_employee)
|
|
|
db.commit()
|
|
|
db.refresh(db_employee)
|
|
|
return db_employee
|
|
|
|
|
|
def update_employee(db: Session, employee_id: str, employee: schemas.EmployeeCreate):
|
|
|
db_employee = get_employee(db, employee_id)
|
|
|
if db_employee:
|
|
|
for key, value in employee.model_dump().items():
|
|
|
setattr(db_employee, key, value)
|
|
|
db.commit()
|
|
|
db.refresh(db_employee)
|
|
|
return db_employee
|
|
|
|
|
|
def delete_employee(db: Session, employee_id: str):
|
|
|
db_employee = get_employee(db, employee_id)
|
|
|
if db_employee:
|
|
|
db.delete(db_employee)
|
|
|
db.commit()
|
|
|
return db_employee
|
|
|
|
|
|
|
|
|
|
|
|
def get_timesheets(db: Session, skip: int = 0, limit: int = 100):
|
|
|
return db.query(models.Timesheet).offset(skip).limit(limit).all()
|
|
|
|
|
|
def get_timesheet(db: Session, record_id: str):
|
|
|
return db.query(models.Timesheet).filter(models.Timesheet.record_id == record_id).first()
|
|
|
|
|
|
def create_timesheet(db: Session, timesheet: schemas.TimesheetCreate):
|
|
|
db_timesheet = models.Timesheet(**timesheet.model_dump())
|
|
|
db.add(db_timesheet)
|
|
|
db.commit()
|
|
|
db.refresh(db_timesheet)
|
|
|
return db_timesheet
|
|
|
|
|
|
def update_timesheet(db: Session, record_id: str, timesheet: schemas.TimesheetCreate):
|
|
|
db_timesheet = get_timesheet(db, record_id)
|
|
|
if db_timesheet:
|
|
|
for key, value in timesheet.model_dump().items():
|
|
|
setattr(db_timesheet, key, value)
|
|
|
db.commit()
|
|
|
db.refresh(db_timesheet)
|
|
|
return db_timesheet
|
|
|
|
|
|
def delete_timesheet(db: Session, record_id: str):
|
|
|
db_timesheet = get_timesheet(db, record_id)
|
|
|
if db_timesheet:
|
|
|
db.delete(db_timesheet)
|
|
|
db.commit()
|
|
|
return db_timesheet
|
|
|
|
|
|
|
|
|
|
|
|
def get_milestones(db: Session, skip: int = 0, limit: int = 100):
|
|
|
return db.query(models.Milestone).offset(skip).limit(limit).all()
|
|
|
|
|
|
def get_milestone(db: Session, milestone_id: int):
|
|
|
return db.query(models.Milestone).filter(models.Milestone.milestone_id == milestone_id).first()
|
|
|
|
|
|
def create_milestone(db: Session, milestone: schemas.MilestoneCreate):
|
|
|
db_milestone = models.Milestone(**milestone.model_dump())
|
|
|
db.add(db_milestone)
|
|
|
db.commit()
|
|
|
db.refresh(db_milestone)
|
|
|
return db_milestone
|
|
|
|
|
|
def update_milestone(db: Session, milestone_id: int, milestone: schemas.MilestoneCreate):
|
|
|
db_milestone = get_milestone(db, milestone_id)
|
|
|
if db_milestone:
|
|
|
for key, value in milestone.model_dump().items():
|
|
|
setattr(db_milestone, key, value)
|
|
|
db.commit()
|
|
|
db.refresh(db_milestone)
|
|
|
return db_milestone
|
|
|
|
|
|
def delete_milestone(db: Session, milestone_id: int):
|
|
|
db_milestone = get_milestone(db, milestone_id)
|
|
|
if db_milestone:
|
|
|
db.delete(db_milestone)
|
|
|
db.commit()
|
|
|
return db_milestone
|
|
|
|
|
|
|
|
|
|
|
|
def get_invoices(db: Session, skip: int = 0, limit: int = 100):
|
|
|
return db.query(models.Invoice).offset(skip).limit(limit).all()
|
|
|
|
|
|
def get_invoice(db: Session, invoice_id: str):
|
|
|
return db.query(models.Invoice).filter(models.Invoice.invoice_id == invoice_id).first()
|
|
|
|
|
|
def create_invoice(db: Session, invoice: schemas.InvoiceCreate):
|
|
|
db_invoice = models.Invoice(**invoice.model_dump())
|
|
|
db.add(db_invoice)
|
|
|
db.commit()
|
|
|
db.refresh(db_invoice)
|
|
|
return db_invoice
|
|
|
|
|
|
def update_invoice(db: Session, invoice_id: str, invoice: schemas.InvoiceCreate):
|
|
|
db_invoice = get_invoice(db, invoice_id)
|
|
|
if db_invoice:
|
|
|
for key, value in invoice.model_dump().items():
|
|
|
setattr(db_invoice, key, value)
|
|
|
db.commit()
|
|
|
db.refresh(db_invoice)
|
|
|
return db_invoice
|
|
|
|
|
|
def delete_invoice(db: Session, invoice_id: str):
|
|
|
db_invoice = get_invoice(db, invoice_id)
|
|
|
if db_invoice:
|
|
|
db.delete(db_invoice)
|
|
|
db.commit()
|
|
|
return db_invoice
|
|
|
|
|
|
|
|
|
|
|
|
def get_subcontractors(db: Session, skip: int = 0, limit: int = 100):
|
|
|
return db.query(models.Subcontractor).offset(skip).limit(limit).all()
|
|
|
|
|
|
def get_subcontractor(db: Session, subcontractor_id: int):
|
|
|
return db.query(models.Subcontractor).filter(models.Subcontractor.subcontractor_id == subcontractor_id).first()
|
|
|
|
|
|
def create_subcontractor(db: Session, subcontractor: schemas.SubcontractorCreate):
|
|
|
db_subcontractor = models.Subcontractor(**subcontractor.model_dump())
|
|
|
db.add(db_subcontractor)
|
|
|
db.commit()
|
|
|
db.refresh(db_subcontractor)
|
|
|
return db_subcontractor
|
|
|
|
|
|
def update_subcontractor(db: Session, subcontractor_id: int, subcontractor: schemas.SubcontractorCreate):
|
|
|
db_subcontractor = get_subcontractor(db, subcontractor_id)
|
|
|
if db_subcontractor:
|
|
|
for key, value in subcontractor.model_dump().items():
|
|
|
setattr(db_subcontractor, key, value)
|
|
|
db.commit()
|
|
|
db.refresh(db_subcontractor)
|
|
|
return db_subcontractor
|
|
|
|
|
|
def delete_subcontractor(db: Session, subcontractor_id: int):
|
|
|
db_subcontractor = get_subcontractor(db, subcontractor_id)
|
|
|
if db_subcontractor:
|
|
|
db.delete(db_subcontractor)
|
|
|
db.commit()
|
|
|
return db_subcontractor
|
|
|
|
|
|
|
|
|
|
|
|
def get_staff_allocations(db: Session, skip: int = 0, limit: int = 100):
|
|
|
return db.query(models.StaffAllocation).offset(skip).limit(limit).all()
|
|
|
|
|
|
def get_staff_allocation(db: Session, allocation_id: int):
|
|
|
return db.query(models.StaffAllocation).filter(models.StaffAllocation.allocation_id == allocation_id).first()
|
|
|
|
|
|
def create_staff_allocation(db: Session, allocation: schemas.StaffAllocationCreate):
|
|
|
db_allocation = models.StaffAllocation(**allocation.model_dump())
|
|
|
db.add(db_allocation)
|
|
|
db.commit()
|
|
|
db.refresh(db_allocation)
|
|
|
return db_allocation
|
|
|
|
|
|
def update_staff_allocation(db: Session, allocation_id: int, allocation: schemas.StaffAllocationCreate):
|
|
|
db_allocation = get_staff_allocation(db, allocation_id)
|
|
|
if db_allocation:
|
|
|
for key, value in allocation.model_dump().items():
|
|
|
setattr(db_allocation, key, value)
|
|
|
db.commit()
|
|
|
db.refresh(db_allocation)
|
|
|
return db_allocation
|
|
|
|
|
|
def delete_staff_allocation(db: Session, allocation_id: int):
|
|
|
db_allocation = get_staff_allocation(db, allocation_id)
|
|
|
if db_allocation:
|
|
|
db.delete(db_allocation)
|
|
|
db.commit()
|
|
|
return db_allocation |