Spaces:
Running
Running
File size: 2,874 Bytes
a066e5a e5b256c a066e5a e5b256c a066e5a e5b256c a066e5a e5b256c a066e5a e5b256c a066e5a e5b256c a066e5a e5b256c a066e5a e5b256c a066e5a e5b256c a066e5a e5b256c a066e5a e5b256c a066e5a e5b256c a066e5a e5b256c a066e5a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 | import uuid
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.models.smart_models import KnowledgeBase
from app.schemas.knowledge_base import KBEntryCreate, KBEntryResponse, KBEntryUpdate
from app.models.database import get_db
# Router on which the knowledge-base entry endpoints in this module register.
router = APIRouter()
@router.post("/kb/entries", status_code=201, response_model=KBEntryResponse)
def create_entry(
    request: Request, entry: KBEntryCreate, db: Session = Depends(get_db)
) -> KBEntryResponse:
    # Create a knowledge-base entry owned by the requesting tenant.
    #
    # The owning tenant is taken from ``request.state.tenant_id`` (set by the
    # middleware, per the original author's note); only ``entry.entry`` is
    # read from the request payload. Returns the persisted row, which is
    # serialised through ``KBEntryResponse`` with a 201 status.
    new_row = KnowledgeBase()
    new_row.tenant_id = request.state.tenant_id
    new_row.entry = entry.entry  # type: ignore[truthy-bool]

    db.add(new_row)
    db.commit()
    # Reload the row after the commit -- presumably so database-generated
    # columns (such as the id) are populated before serialisation.
    db.refresh(new_row)
    return new_row
@router.get("/kb/entries/{entry_id}", status_code=200, response_model=KBEntryResponse)
def read_entry(
    request: Request, entry_id: uuid.UUID, db: Session = Depends(get_db)
) -> KBEntryResponse:
    # Fetch a single knowledge-base entry by id for the requesting tenant.
    #
    # The lookup filters on both the entry id and the tenant id read from
    # ``request.state.tenant_id``, so an id that exists under a different
    # tenant is reported the same way as a missing one.
    #
    # Raises HTTPException(404, "Entry not found") when no row matches.
    owner = request.state.tenant_id
    lookup = select(KnowledgeBase).where(
        KnowledgeBase.id == entry_id, KnowledgeBase.tenant_id == owner
    )
    found = db.scalars(lookup).first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Entry not found")
@router.get("/kb/entries", status_code=200, response_model=list[KBEntryResponse])
def read_entries(
    request: Request, db: Session = Depends(get_db)
) -> list[KBEntryResponse]:
    # List every knowledge-base entry belonging to the requesting tenant.
    #
    # The tenant id comes from ``request.state.tenant_id``. No ordering or
    # paging is applied; an empty collection is returned when the tenant has
    # no entries.
    owner = request.state.tenant_id
    owned_by_tenant = KnowledgeBase.tenant_id == owner
    listing = select(KnowledgeBase).where(owned_by_tenant)
    rows = db.scalars(listing).all()
    return rows  # type: ignore[truthy-bool]
@router.put("/kb/entries/{entry_id}", status_code=200, response_model=KBEntryResponse)
def update_entry(
    request: Request,
    entry_id: uuid.UUID,
    entry: KBEntryUpdate,
    db: Session = Depends(get_db),
) -> KBEntryResponse:
    # Replace the ``entry`` value of an existing knowledge-base row.
    #
    # The row is looked up by tenant id (from ``request.state.tenant_id``)
    # and entry id together, so a tenant can only modify its own rows.
    # Returns the refreshed row after the change is committed.
    #
    # Raises HTTPException(404, "Entry not found") when no row matches.
    owner = request.state.tenant_id
    lookup = select(KnowledgeBase).where(
        KnowledgeBase.tenant_id == owner, KnowledgeBase.id == entry_id
    )
    target = db.scalars(lookup).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Entry not found")

    # NOTE(review): the payload value is copied unconditionally; if
    # KBEntryUpdate.entry is optional, a missing value would overwrite the
    # stored one with None -- confirm against the schema.
    target.entry = entry.entry  # type: ignore[truthy-bool]
    db.commit()
    db.refresh(target)
    return target
@router.delete("/kb/entries/{entry_id}", status_code=204)
def delete_entry(
    request: Request, entry_id: uuid.UUID, db: Session = Depends(get_db)
) -> None:
    # Delete one knowledge-base entry owned by the requesting tenant.
    #
    # The row must match both the entry id and the tenant id read from
    # ``request.state.tenant_id``. Nothing is returned on success; the route
    # is declared with a 204 status.
    #
    # Raises HTTPException(404, "Entry not found") when no row matches.
    owner = request.state.tenant_id
    lookup = select(KnowledgeBase).where(
        KnowledgeBase.id == entry_id, KnowledgeBase.tenant_id == owner
    )
    doomed = db.scalars(lookup).first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Entry not found")

    db.delete(doomed)
    db.commit()
|