| # | |
| # Copyright 2024 The InfiniFlow Authors. All Rights Reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # | |
| import pathlib | |
| import datetime | |
| from api.db.services.dialog_service import keyword_extraction | |
| from rag.app.qa import rmPrefix, beAdoc | |
| from rag.nlp import rag_tokenizer | |
| from api.db import LLMType, ParserType | |
| from api.db.services.llm_service import TenantLLMService | |
| from api.settings import kg_retrievaler | |
| import hashlib | |
| import re | |
| from api.utils.api_utils import token_required | |
| from api.db.db_models import Task | |
| from api.db.services.task_service import TaskService, queue_tasks | |
| from api.utils.api_utils import server_error_response | |
| from api.utils.api_utils import get_result, get_error_data_result | |
| from io import BytesIO | |
| from flask import request, send_file | |
| from api.db import FileSource, TaskStatus, FileType | |
| from api.db.db_models import File | |
| from api.db.services.document_service import DocumentService | |
| from api.db.services.file2document_service import File2DocumentService | |
| from api.db.services.file_service import FileService | |
| from api.db.services.knowledgebase_service import KnowledgebaseService | |
| from api.settings import RetCode, retrievaler | |
| from api.utils.api_utils import construct_json_result, get_parser_config | |
| from rag.nlp import search | |
| from rag.utils import rmSpace | |
| from api.settings import docStoreConn | |
| from rag.utils.storage_factory import STORAGE_IMPL | |
| import os | |
| MAXIMUM_OF_UPLOADING_FILES = 256 | |
| def upload(dataset_id, tenant_id): | |
| """ | |
| Upload documents to a dataset. | |
| --- | |
| tags: | |
| - Documents | |
| security: | |
| - ApiKeyAuth: [] | |
| parameters: | |
| - in: path | |
| name: dataset_id | |
| type: string | |
| required: true | |
| description: ID of the dataset. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| - in: formData | |
| name: file | |
| type: file | |
| required: true | |
| description: Document files to upload. | |
| responses: | |
| 200: | |
| description: Successfully uploaded documents. | |
| schema: | |
| type: object | |
| properties: | |
| data: | |
| type: array | |
| items: | |
| type: object | |
| properties: | |
| id: | |
| type: string | |
| description: Document ID. | |
| name: | |
| type: string | |
| description: Document name. | |
| chunk_count: | |
| type: integer | |
| description: Number of chunks. | |
| token_count: | |
| type: integer | |
| description: Number of tokens. | |
| dataset_id: | |
| type: string | |
| description: ID of the dataset. | |
| chunk_method: | |
| type: string | |
| description: Chunking method used. | |
| run: | |
| type: string | |
| description: Processing status. | |
| """ | |
| if "file" not in request.files: | |
| return get_error_data_result( | |
| message="No file part!", code=RetCode.ARGUMENT_ERROR | |
| ) | |
| file_objs = request.files.getlist("file") | |
| for file_obj in file_objs: | |
| if file_obj.filename == "": | |
| return get_result( | |
| message="No file selected!", code=RetCode.ARGUMENT_ERROR | |
| ) | |
| # total size | |
| total_size = 0 | |
| for file_obj in file_objs: | |
| file_obj.seek(0, os.SEEK_END) | |
| total_size += file_obj.tell() | |
| file_obj.seek(0) | |
| MAX_TOTAL_FILE_SIZE = 10 * 1024 * 1024 | |
| if total_size > MAX_TOTAL_FILE_SIZE: | |
| return get_result( | |
| message=f"Total file size exceeds 10MB limit! ({total_size / (1024 * 1024):.2f} MB)", | |
| code=RetCode.ARGUMENT_ERROR, | |
| ) | |
| e, kb = KnowledgebaseService.get_by_id(dataset_id) | |
| if not e: | |
| raise LookupError(f"Can't find the dataset with ID {dataset_id}!") | |
| err, files = FileService.upload_document(kb, file_objs, tenant_id) | |
| if err: | |
| return get_result(message="\n".join(err), code=RetCode.SERVER_ERROR) | |
| # rename keys for the API response | |
| renamed_doc_list = [] | |
| for file in files: | |
| doc = file[0] | |
| key_mapping = { | |
| "chunk_num": "chunk_count", | |
| "kb_id": "dataset_id", | |
| "token_num": "token_count", | |
| "parser_id": "chunk_method", | |
| } | |
| renamed_doc = {} | |
| for key, value in doc.items(): | |
| new_key = key_mapping.get(key, key) | |
| renamed_doc[new_key] = value | |
| renamed_doc["run"] = "UNSTART" | |
| renamed_doc_list.append(renamed_doc) | |
| return get_result(data=renamed_doc_list) | |
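| # Illustrative client sketch. The upload handler reads multipart "file" parts, | |
| # enforces a 10 MB total-size cap, and returns the created documents with renamed | |
| # keys (chunk_count, token_count, dataset_id, chunk_method, run). The route | |
| # decorators are not shown in this file, so the URL path and HTTP method below are | |
| # assumptions used purely for illustration. | |
| def _example_upload(base_url, api_key, dataset_id, path="manual.pdf"): | |
|     import requests | |
|     with open(path, "rb") as fp: | |
|         resp = requests.post( | |
|             f"{base_url}/datasets/{dataset_id}/documents",  # assumed route | |
|             headers={"Authorization": f"Bearer {api_key}"}, | |
|             files=[("file", fp)], | |
|         ) | |
|     return resp.json()  # new documents come back with run set to "UNSTART" | |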
| def update_doc(tenant_id, dataset_id, document_id): | |
| """ | |
| Update a document within a dataset. | |
| --- | |
| tags: | |
| - Documents | |
| security: | |
| - ApiKeyAuth: [] | |
| parameters: | |
| - in: path | |
| name: dataset_id | |
| type: string | |
| required: true | |
| description: ID of the dataset. | |
| - in: path | |
| name: document_id | |
| type: string | |
| required: true | |
| description: ID of the document to update. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| - in: body | |
| name: body | |
| description: Document update parameters. | |
| required: true | |
| schema: | |
| type: object | |
| properties: | |
| name: | |
| type: string | |
| description: New name of the document. | |
| parser_config: | |
| type: object | |
| description: Parser configuration. | |
| chunk_method: | |
| type: string | |
| description: Chunking method. | |
| responses: | |
| 200: | |
| description: Document updated successfully. | |
| schema: | |
| type: object | |
| """ | |
| req = request.json | |
| if not KnowledgebaseService.query(id=dataset_id, tenant_id=tenant_id): | |
| return get_error_data_result(message="You don't own the dataset.") | |
| doc = DocumentService.query(kb_id=dataset_id, id=document_id) | |
| if not doc: | |
| return get_error_data_result(message="The dataset doesn't own the document.") | |
| doc = doc[0] | |
| if "chunk_count" in req: | |
| if req["chunk_count"] != doc.chunk_num: | |
| return get_error_data_result(message="Can't change `chunk_count`.") | |
| if "token_count" in req: | |
| if req["token_count"] != doc.token_num: | |
| return get_error_data_result(message="Can't change `token_count`.") | |
| if "progress" in req: | |
| if req["progress"] != doc.progress: | |
| return get_error_data_result(message="Can't change `progress`.") | |
| if "name" in req and req["name"] != doc.name: | |
| if ( | |
| pathlib.Path(req["name"].lower()).suffix | |
| != pathlib.Path(doc.name.lower()).suffix | |
| ): | |
| return get_result( | |
| message="The extension of file can't be changed", | |
| code=RetCode.ARGUMENT_ERROR, | |
| ) | |
| for d in DocumentService.query(name=req["name"], kb_id=doc.kb_id): | |
| if d.name == req["name"]: | |
| return get_error_data_result( | |
| message="Duplicated document name in the same dataset." | |
| ) | |
| if not DocumentService.update_by_id(document_id, {"name": req["name"]}): | |
| return get_error_data_result(message="Database error (Document rename)!") | |
| informs = File2DocumentService.get_by_document_id(document_id) | |
| if informs: | |
| e, file = FileService.get_by_id(informs[0].file_id) | |
| FileService.update_by_id(file.id, {"name": req["name"]}) | |
| if "parser_config" in req: | |
| DocumentService.update_parser_config(doc.id, req["parser_config"]) | |
| if "chunk_method" in req: | |
| valid_chunk_method = { | |
| "naive", | |
| "manual", | |
| "qa", | |
| "table", | |
| "paper", | |
| "book", | |
| "laws", | |
| "presentation", | |
| "picture", | |
| "one", | |
| "knowledge_graph", | |
| "email", | |
| } | |
| if req.get("chunk_method") not in valid_chunk_method: | |
| return get_error_data_result( | |
| f"`chunk_method` {req['chunk_method']} doesn't exist" | |
| ) | |
| if doc.parser_id.lower() == req["chunk_method"].lower(): | |
| return get_result() | |
| if doc.type == FileType.VISUAL or re.search(r"\.(ppt|pptx|pages)$", doc.name): | |
| return get_error_data_result(message="Not supported yet!") | |
| e = DocumentService.update_by_id( | |
| doc.id, | |
| { | |
| "parser_id": req["chunk_method"], | |
| "progress": 0, | |
| "progress_msg": "", | |
| "run": TaskStatus.UNSTART.value, | |
| }, | |
| ) | |
| if not e: | |
| return get_error_data_result(message="Document not found!") | |
| req["parser_config"] = get_parser_config( | |
| req["chunk_method"], req.get("parser_config") | |
| ) | |
| DocumentService.update_parser_config(doc.id, req["parser_config"]) | |
| if doc.token_num > 0: | |
| e = DocumentService.increment_chunk_num( | |
| doc.id, | |
| doc.kb_id, | |
| doc.token_num * -1, | |
| doc.chunk_num * -1, | |
| doc.process_duation * -1, | |
| ) | |
| if not e: | |
| return get_error_data_result(message="Document not found!") | |
| docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), dataset_id) | |
| return get_result() | |
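| # Illustrative request sketch. update_doc accepts a JSON body with "name", | |
| # "parser_config" and/or "chunk_method"; changing "chunk_method" resets progress and | |
| # drops existing chunks, as the code above shows. Only the body shape comes from | |
| # this handler; the path and method are assumptions. | |
| def _example_update_doc(base_url, api_key, dataset_id, document_id): | |
|     import requests | |
|     body = {"name": "manual_v2.pdf", "chunk_method": "naive", "parser_config": {}} | |
|     resp = requests.put( | |
|         f"{base_url}/datasets/{dataset_id}/documents/{document_id}",  # assumed route | |
|         headers={"Authorization": f"Bearer {api_key}"}, | |
|         json=body, | |
|     ) | |
|     return resp.json() | |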
| def download(tenant_id, dataset_id, document_id): | |
| """ | |
| Download a document from a dataset. | |
| --- | |
| tags: | |
| - Documents | |
| security: | |
| - ApiKeyAuth: [] | |
| produces: | |
| - application/octet-stream | |
| parameters: | |
| - in: path | |
| name: dataset_id | |
| type: string | |
| required: true | |
| description: ID of the dataset. | |
| - in: path | |
| name: document_id | |
| type: string | |
| required: true | |
| description: ID of the document to download. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| responses: | |
| 200: | |
| description: Document file stream. | |
| schema: | |
| type: file | |
| 400: | |
| description: Error message. | |
| schema: | |
| type: object | |
| """ | |
| if not KnowledgebaseService.query(id=dataset_id, tenant_id=tenant_id): | |
| return get_error_data_result(message=f"You do not own the dataset {dataset_id}.") | |
| doc = DocumentService.query(kb_id=dataset_id, id=document_id) | |
| if not doc: | |
| return get_error_data_result( | |
| message=f"The dataset not own the document {document_id}." | |
| ) | |
| # Resolve the file's storage location and stream it back to the caller | |
| doc_id, doc_location = File2DocumentService.get_storage_address( | |
| doc_id=document_id | |
| )  # storage bucket and object name (e.g., MinIO) | |
| file_stream = STORAGE_IMPL.get(doc_id, doc_location) | |
| if not file_stream: | |
| return construct_json_result( | |
| message="This file is empty.", code=RetCode.DATA_ERROR | |
| ) | |
| file = BytesIO(file_stream) | |
| # Use send_file with a proper filename and MIME type | |
| return send_file( | |
| file, | |
| as_attachment=True, | |
| download_name=doc[0].name, | |
| mimetype="application/octet-stream", # Set a default MIME type | |
| ) | |
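| # Illustrative download sketch. The handler streams the stored file back via | |
| # send_file with a default MIME type, so a client simply saves the raw response | |
| # bytes. The path and method are assumptions (route decorators not shown here). | |
| def _example_download(base_url, api_key, dataset_id, document_id, out_path): | |
|     import requests | |
|     resp = requests.get( | |
|         f"{base_url}/datasets/{dataset_id}/documents/{document_id}",  # assumed route | |
|         headers={"Authorization": f"Bearer {api_key}"}, | |
|     ) | |
|     with open(out_path, "wb") as fp: | |
|         fp.write(resp.content) | |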
| def list_docs(dataset_id, tenant_id): | |
| """ | |
| List documents in a dataset. | |
| --- | |
| tags: | |
| - Documents | |
| security: | |
| - ApiKeyAuth: [] | |
| parameters: | |
| - in: path | |
| name: dataset_id | |
| type: string | |
| required: true | |
| description: ID of the dataset. | |
| - in: query | |
| name: id | |
| type: string | |
| required: false | |
| description: Filter by document ID. | |
| - in: query | |
| name: page | |
| type: integer | |
| required: false | |
| default: 1 | |
| description: Page number. | |
| - in: query | |
| name: page_size | |
| type: integer | |
| required: false | |
| default: 30 | |
| description: Number of items per page. | |
| - in: query | |
| name: orderby | |
| type: string | |
| required: false | |
| default: "create_time" | |
| description: Field to order by. | |
| - in: query | |
| name: desc | |
| type: boolean | |
| required: false | |
| default: true | |
| description: Whether to sort in descending order. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| responses: | |
| 200: | |
| description: List of documents. | |
| schema: | |
| type: object | |
| properties: | |
| total: | |
| type: integer | |
| description: Total number of documents. | |
| docs: | |
| type: array | |
| items: | |
| type: object | |
| properties: | |
| id: | |
| type: string | |
| description: Document ID. | |
| name: | |
| type: string | |
| description: Document name. | |
| chunk_count: | |
| type: integer | |
| description: Number of chunks. | |
| token_count: | |
| type: integer | |
| description: Number of tokens. | |
| dataset_id: | |
| type: string | |
| description: ID of the dataset. | |
| chunk_method: | |
| type: string | |
| description: Chunking method used. | |
| run: | |
| type: string | |
| description: Processing status. | |
| """ | |
| if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): | |
| return get_error_data_result(message=f"You don't own the dataset {dataset_id}. ") | |
| id = request.args.get("id") | |
| name = request.args.get("name") | |
| if not DocumentService.query(id=id, kb_id=dataset_id): | |
| return get_error_data_result(message=f"You don't own the document {id}.") | |
| if not DocumentService.query(name=name, kb_id=dataset_id): | |
| return get_error_data_result(message=f"You don't own the document {name}.") | |
| page = int(request.args.get("page", 1)) | |
| keywords = request.args.get("keywords", "") | |
| page_size = int(request.args.get("page_size", 30)) | |
| orderby = request.args.get("orderby", "create_time") | |
| if request.args.get("desc") == "False": | |
| desc = False | |
| else: | |
| desc = True | |
| docs, tol = DocumentService.get_list( | |
| dataset_id, page, page_size, orderby, desc, keywords, id, name | |
| ) | |
| # rename keys for the API response | |
| renamed_doc_list = [] | |
| for doc in docs: | |
| key_mapping = { | |
| "chunk_num": "chunk_count", | |
| "kb_id": "dataset_id", | |
| "token_num": "token_count", | |
| "parser_id": "chunk_method", | |
| } | |
| run_mapping = { | |
| "0": "UNSTART", | |
| "1": "RUNNING", | |
| "2": "CANCEL", | |
| "3": "DONE", | |
| "4": "FAIL", | |
| } | |
| renamed_doc = {} | |
| for key, value in doc.items(): | |
| new_key = key_mapping.get(key, key) | |
| renamed_doc[new_key] = value | |
| if key == "run": | |
| renamed_doc["run"] = run_mapping.get(str(value), value) | |
| renamed_doc_list.append(renamed_doc) | |
| return get_result(data={"total": tol, "docs": renamed_doc_list}) | |
| def delete(tenant_id, dataset_id): | |
| """ | |
| Delete documents from a dataset. | |
| --- | |
| tags: | |
| - Documents | |
| security: | |
| - ApiKeyAuth: [] | |
| parameters: | |
| - in: path | |
| name: dataset_id | |
| type: string | |
| required: true | |
| description: ID of the dataset. | |
| - in: body | |
| name: body | |
| description: Document deletion parameters. | |
| required: true | |
| schema: | |
| type: object | |
| properties: | |
| ids: | |
| type: array | |
| items: | |
| type: string | |
| description: List of document IDs to delete. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| responses: | |
| 200: | |
| description: Documents deleted successfully. | |
| schema: | |
| type: object | |
| """ | |
| if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): | |
| return get_error_data_result(message=f"You don't own the dataset {dataset_id}. ") | |
| req = request.json | |
| if not req: | |
| doc_ids = None | |
| else: | |
| doc_ids = req.get("ids") | |
| if not doc_ids: | |
| doc_list = [] | |
| docs = DocumentService.query(kb_id=dataset_id) | |
| for doc in docs: | |
| doc_list.append(doc.id) | |
| else: | |
| doc_list = doc_ids | |
| root_folder = FileService.get_root_folder(tenant_id) | |
| pf_id = root_folder["id"] | |
| FileService.init_knowledgebase_docs(pf_id, tenant_id) | |
| errors = "" | |
| for doc_id in doc_list: | |
| try: | |
| e, doc = DocumentService.get_by_id(doc_id) | |
| if not e: | |
| return get_error_data_result(message="Document not found!") | |
| tenant_id = DocumentService.get_tenant_id(doc_id) | |
| if not tenant_id: | |
| return get_error_data_result(message="Tenant not found!") | |
| b, n = File2DocumentService.get_storage_address(doc_id=doc_id) | |
| if not DocumentService.remove_document(doc, tenant_id): | |
| return get_error_data_result( | |
| message="Database error (Document removal)!" | |
| ) | |
| f2d = File2DocumentService.get_by_document_id(doc_id) | |
| FileService.filter_delete( | |
| [ | |
| File.source_type == FileSource.KNOWLEDGEBASE, | |
| File.id == f2d[0].file_id, | |
| ] | |
| ) | |
| File2DocumentService.delete_by_document_id(doc_id) | |
| STORAGE_IMPL.rm(b, n) | |
| except Exception as e: | |
| errors += str(e) | |
| if errors: | |
| return get_result(message=errors, code=RetCode.SERVER_ERROR) | |
| return get_result() | |
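| # Illustrative deletion sketch. The handler expects {"ids": [...]}; when the body or | |
| # "ids" is omitted it deletes every document in the dataset, so passing an explicit | |
| # list is the safer call. The path and method are assumptions. | |
| def _example_delete_docs(base_url, api_key, dataset_id, document_ids): | |
|     import requests | |
|     resp = requests.delete( | |
|         f"{base_url}/datasets/{dataset_id}/documents",  # assumed route | |
|         headers={"Authorization": f"Bearer {api_key}"}, | |
|         json={"ids": document_ids}, | |
|     ) | |
|     return resp.json() | |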
| def parse(tenant_id, dataset_id): | |
| """ | |
| Start parsing documents into chunks. | |
| --- | |
| tags: | |
| - Chunks | |
| security: | |
| - ApiKeyAuth: [] | |
| parameters: | |
| - in: path | |
| name: dataset_id | |
| type: string | |
| required: true | |
| description: ID of the dataset. | |
| - in: body | |
| name: body | |
| description: Parsing parameters. | |
| required: true | |
| schema: | |
| type: object | |
| properties: | |
| document_ids: | |
| type: array | |
| items: | |
| type: string | |
| description: List of document IDs to parse. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| responses: | |
| 200: | |
| description: Parsing started successfully. | |
| schema: | |
| type: object | |
| """ | |
| if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): | |
| return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") | |
| req = request.json | |
| if not req.get("document_ids"): | |
| return get_error_data_result("`document_ids` is required") | |
| for id in req["document_ids"]: | |
| doc = DocumentService.query(id=id, kb_id=dataset_id) | |
| if not doc: | |
| return get_error_data_result(message=f"You don't own the document {id}.") | |
| if doc[0].progress != 0.0: | |
| return get_error_data_result( | |
| "Can't stop parsing document with progress at 0 or 100" | |
| ) | |
| info = {"run": "1", "progress": 0} | |
| info["progress_msg"] = "" | |
| info["chunk_num"] = 0 | |
| info["token_num"] = 0 | |
| DocumentService.update_by_id(id, info) | |
| docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), dataset_id) | |
| TaskService.filter_delete([Task.doc_id == id]) | |
| e, doc = DocumentService.get_by_id(id) | |
| doc = doc.to_dict() | |
| doc["tenant_id"] = tenant_id | |
| bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"]) | |
| queue_tasks(doc, bucket, name) | |
| return get_result() | |
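| # Illustrative parse request. parse requires {"document_ids": [...]}; each document | |
| # must still be at progress 0, and the handler clears old chunks before queueing | |
| # tasks via queue_tasks. The path and method are assumptions. | |
| def _example_start_parsing(base_url, api_key, dataset_id, document_ids): | |
|     import requests | |
|     resp = requests.post( | |
|         f"{base_url}/datasets/{dataset_id}/chunks",  # assumed route | |
|         headers={"Authorization": f"Bearer {api_key}"}, | |
|         json={"document_ids": document_ids}, | |
|     ) | |
|     return resp.json() | |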
| def stop_parsing(tenant_id, dataset_id): | |
| """ | |
| Stop parsing documents into chunks. | |
| --- | |
| tags: | |
| - Chunks | |
| security: | |
| - ApiKeyAuth: [] | |
| parameters: | |
| - in: path | |
| name: dataset_id | |
| type: string | |
| required: true | |
| description: ID of the dataset. | |
| - in: body | |
| name: body | |
| description: Stop parsing parameters. | |
| required: true | |
| schema: | |
| type: object | |
| properties: | |
| document_ids: | |
| type: array | |
| items: | |
| type: string | |
| description: List of document IDs to stop parsing. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| responses: | |
| 200: | |
| description: Parsing stopped successfully. | |
| schema: | |
| type: object | |
| """ | |
| if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): | |
| return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") | |
| req = request.json | |
| if not req.get("document_ids"): | |
| return get_error_data_result("`document_ids` is required") | |
| for id in req["document_ids"]: | |
| doc = DocumentService.query(id=id, kb_id=dataset_id) | |
| if not doc: | |
| return get_error_data_result(message=f"You don't own the document {id}.") | |
| if int(doc[0].progress) == 1 or int(doc[0].progress) == 0: | |
| return get_error_data_result( | |
| "Can't stop parsing document with progress at 0 or 1" | |
| ) | |
| info = {"run": "2", "progress": 0, "chunk_num": 0} | |
| DocumentService.update_by_id(id, info) | |
| docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), dataset_id) | |
| return get_result() | |
| def list_chunks(tenant_id, dataset_id, document_id): | |
| """ | |
| List chunks of a document. | |
| --- | |
| tags: | |
| - Chunks | |
| security: | |
| - ApiKeyAuth: [] | |
| parameters: | |
| - in: path | |
| name: dataset_id | |
| type: string | |
| required: true | |
| description: ID of the dataset. | |
| - in: path | |
| name: document_id | |
| type: string | |
| required: true | |
| description: ID of the document. | |
| - in: query | |
| name: page | |
| type: integer | |
| required: false | |
| default: 1 | |
| description: Page number. | |
| - in: query | |
| name: page_size | |
| type: integer | |
| required: false | |
| default: 30 | |
| description: Number of items per page. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| responses: | |
| 200: | |
| description: List of chunks. | |
| schema: | |
| type: object | |
| properties: | |
| total: | |
| type: integer | |
| description: Total number of chunks. | |
| chunks: | |
| type: array | |
| items: | |
| type: object | |
| properties: | |
| id: | |
| type: string | |
| description: Chunk ID. | |
| content: | |
| type: string | |
| description: Chunk content. | |
| document_id: | |
| type: string | |
| description: ID of the document. | |
| important_keywords: | |
| type: array | |
| items: | |
| type: string | |
| description: Important keywords. | |
| image_id: | |
| type: string | |
| description: Image ID associated with the chunk. | |
| doc: | |
| type: object | |
| description: Document details. | |
| """ | |
| if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): | |
| return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") | |
| doc = DocumentService.query(id=document_id, kb_id=dataset_id) | |
| if not doc: | |
| return get_error_data_result( | |
| message=f"You don't own the document {document_id}." | |
| ) | |
| doc = doc[0] | |
| req = request.args | |
| doc_id = document_id | |
| page = int(req.get("page", 1)) | |
| size = int(req.get("page_size", 30)) | |
| question = req.get("keywords", "") | |
| query = { | |
| "doc_ids": [doc_id], | |
| "page": page, | |
| "size": size, | |
| "question": question, | |
| "sort": True, | |
| } | |
| key_mapping = { | |
| "chunk_num": "chunk_count", | |
| "kb_id": "dataset_id", | |
| "token_num": "token_count", | |
| "parser_id": "chunk_method", | |
| } | |
| run_mapping = { | |
| "0": "UNSTART", | |
| "1": "RUNNING", | |
| "2": "CANCEL", | |
| "3": "DONE", | |
| "4": "FAIL", | |
| } | |
| doc = doc.to_dict() | |
| renamed_doc = {} | |
| for key, value in doc.items(): | |
| new_key = key_mapping.get(key, key) | |
| renamed_doc[new_key] = value | |
| if key == "run": | |
| renamed_doc["run"] = run_mapping.get(str(value)) | |
| res = {"total": 0, "chunks": [], "doc": renamed_doc} | |
| origin_chunks = [] | |
| if docStoreConn.indexExist(search.index_name(tenant_id), dataset_id): | |
| sres = retrievaler.search(query, search.index_name(tenant_id), [dataset_id], emb_mdl=None, highlight=True) | |
| res["total"] = sres.total | |
| sign = 0 | |
| for id in sres.ids: | |
| d = { | |
| "id": id, | |
| "content_with_weight": ( | |
| rmSpace(sres.highlight[id]) | |
| if question and id in sres.highlight | |
| else sres.field[id].get("content_with_weight", "") | |
| ), | |
| "doc_id": sres.field[id]["doc_id"], | |
| "docnm_kwd": sres.field[id]["docnm_kwd"], | |
| "important_kwd": sres.field[id].get("important_kwd", []), | |
| "img_id": sres.field[id].get("img_id", ""), | |
| "available_int": sres.field[id].get("available_int", 1), | |
| "positions": sres.field[id].get("position_int", "").split("\t"), | |
| } | |
| if len(d["positions"]) % 5 == 0: | |
| poss = [] | |
| for i in range(0, len(d["positions"]), 5): | |
| poss.append( | |
| [ | |
| float(d["positions"][i]), | |
| float(d["positions"][i + 1]), | |
| float(d["positions"][i + 2]), | |
| float(d["positions"][i + 3]), | |
| float(d["positions"][i + 4]), | |
| ] | |
| ) | |
| d["positions"] = poss | |
| origin_chunks.append(d) | |
| if req.get("id"): | |
| if req.get("id") == id: | |
| origin_chunks.clear() | |
| origin_chunks.append(d) | |
| sign = 1 | |
| break | |
| if req.get("id"): | |
| if sign == 0: | |
| return get_error_data_result(f"Can't find this chunk {req.get('id')}") | |
| for chunk in origin_chunks: | |
| key_mapping = { | |
| "id": "id", | |
| "content_with_weight": "content", | |
| "doc_id": "document_id", | |
| "important_kwd": "important_keywords", | |
| "img_id": "image_id", | |
| "available_int": "available", | |
| } | |
| renamed_chunk = {} | |
| for key, value in chunk.items(): | |
| new_key = key_mapping.get(key, key) | |
| renamed_chunk[new_key] = value | |
| if renamed_chunk["available"] == 0: | |
| renamed_chunk["available"] = False | |
| if renamed_chunk["available"] == 1: | |
| renamed_chunk["available"] = True | |
| res["chunks"].append(renamed_chunk) | |
| return get_result(data=res) | |
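| # Illustrative chunk-listing sketch. list_chunks accepts page, page_size, keywords | |
| # and an optional chunk "id" in the query string; in the response, "available" is | |
| # normalized to a boolean and the owning document is echoed back under "doc". | |
| # The path and method are assumptions. | |
| def _example_list_chunks(base_url, api_key, dataset_id, document_id): | |
|     import requests | |
|     resp = requests.get( | |
|         f"{base_url}/datasets/{dataset_id}/documents/{document_id}/chunks",  # assumed route | |
|         headers={"Authorization": f"Bearer {api_key}"}, | |
|         params={"page": 1, "page_size": 30, "keywords": "installation"}, | |
|     ) | |
|     return resp.json() | |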
| def add_chunk(tenant_id, dataset_id, document_id): | |
| """ | |
| Add a chunk to a document. | |
| --- | |
| tags: | |
| - Chunks | |
| security: | |
| - ApiKeyAuth: [] | |
| parameters: | |
| - in: path | |
| name: dataset_id | |
| type: string | |
| required: true | |
| description: ID of the dataset. | |
| - in: path | |
| name: document_id | |
| type: string | |
| required: true | |
| description: ID of the document. | |
| - in: body | |
| name: body | |
| description: Chunk data. | |
| required: true | |
| schema: | |
| type: object | |
| properties: | |
| content: | |
| type: string | |
| required: true | |
| description: Content of the chunk. | |
| important_keywords: | |
| type: array | |
| items: | |
| type: string | |
| description: Important keywords. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| responses: | |
| 200: | |
| description: Chunk added successfully. | |
| schema: | |
| type: object | |
| properties: | |
| chunk: | |
| type: object | |
| properties: | |
| id: | |
| type: string | |
| description: Chunk ID. | |
| content: | |
| type: string | |
| description: Chunk content. | |
| document_id: | |
| type: string | |
| description: ID of the document. | |
| important_keywords: | |
| type: array | |
| items: | |
| type: string | |
| description: Important keywords. | |
| """ | |
| if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): | |
| return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") | |
| doc = DocumentService.query(id=document_id, kb_id=dataset_id) | |
| if not doc: | |
| return get_error_data_result( | |
| message=f"You don't own the document {document_id}." | |
| ) | |
| doc = doc[0] | |
| req = request.json | |
| if not req.get("content"): | |
| return get_error_data_result(message="`content` is required") | |
| if "important_keywords" in req: | |
| if type(req["important_keywords"]) != list: | |
| return get_error_data_result( | |
| "`important_keywords` is required to be a list" | |
| ) | |
| md5 = hashlib.md5() | |
| md5.update((req["content"] + document_id).encode("utf-8")) | |
| chunk_id = md5.hexdigest() | |
| d = { | |
| "id": chunk_id, | |
| "content_ltks": rag_tokenizer.tokenize(req["content"]), | |
| "content_with_weight": req["content"], | |
| } | |
| d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"]) | |
| d["important_kwd"] = req.get("important_keywords", []) | |
| d["important_tks"] = rag_tokenizer.tokenize( | |
| " ".join(req.get("important_keywords", [])) | |
| ) | |
| d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19] | |
| d["create_timestamp_flt"] = datetime.datetime.now().timestamp() | |
| d["kb_id"] = dataset_id | |
| d["docnm_kwd"] = doc.name | |
| d["doc_id"] = document_id | |
| embd_id = DocumentService.get_embd_id(document_id) | |
| embd_mdl = TenantLLMService.model_instance( | |
| tenant_id, LLMType.EMBEDDING.value, embd_id | |
| ) | |
| v, c = embd_mdl.encode([doc.name, req["content"]]) | |
| v = 0.1 * v[0] + 0.9 * v[1] | |
| d["q_%d_vec" % len(v)] = v.tolist() | |
| docStoreConn.insert([d], search.index_name(tenant_id), dataset_id) | |
| DocumentService.increment_chunk_num(doc.id, doc.kb_id, c, 1, 0) | |
| # rename keys | |
| key_mapping = { | |
| "id": "id", | |
| "content_with_weight": "content", | |
| "doc_id": "document_id", | |
| "important_kwd": "important_keywords", | |
| "kb_id": "dataset_id", | |
| "create_timestamp_flt": "create_timestamp", | |
| "create_time": "create_time", | |
| "document_keyword": "document", | |
| } | |
| renamed_chunk = {} | |
| for key, value in d.items(): | |
| if key in key_mapping: | |
| new_key = key_mapping.get(key, key) | |
| renamed_chunk[new_key] = value | |
| return get_result(data={"chunk": renamed_chunk}) | |
| # return get_result(data={"chunk_id": chunk_id}) | |
| def rm_chunk(tenant_id, dataset_id, document_id): | |
| """ | |
| Remove chunks from a document. | |
| --- | |
| tags: | |
| - Chunks | |
| security: | |
| - ApiKeyAuth: [] | |
| parameters: | |
| - in: path | |
| name: dataset_id | |
| type: string | |
| required: true | |
| description: ID of the dataset. | |
| - in: path | |
| name: document_id | |
| type: string | |
| required: true | |
| description: ID of the document. | |
| - in: body | |
| name: body | |
| description: Chunk removal parameters. | |
| required: true | |
| schema: | |
| type: object | |
| properties: | |
| chunk_ids: | |
| type: array | |
| items: | |
| type: string | |
| description: List of chunk IDs to remove. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| responses: | |
| 200: | |
| description: Chunks removed successfully. | |
| schema: | |
| type: object | |
| """ | |
| if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): | |
| return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") | |
| req = request.json | |
| condition = {"doc_id": document_id} | |
| if "chunk_ids" in req: | |
| condition["id"] = req["chunk_ids"] | |
| chunk_number = docStoreConn.delete(condition, search.index_name(tenant_id), dataset_id) | |
| if chunk_number != 0: | |
| DocumentService.decrement_chunk_num(document_id, dataset_id, 1, chunk_number, 0) | |
| if "chunk_ids" in req and chunk_number != len(req["chunk_ids"]): | |
| return get_error_data_result(message=f"rm_chunk deleted chunks {chunk_number}, expect {len(req['chunk_ids'])}") | |
| return get_result(message=f"deleted {chunk_number} chunks") | |
| def update_chunk(tenant_id, dataset_id, document_id, chunk_id): | |
| """ | |
| Update a chunk within a document. | |
| --- | |
| tags: | |
| - Chunks | |
| security: | |
| - ApiKeyAuth: [] | |
| parameters: | |
| - in: path | |
| name: dataset_id | |
| type: string | |
| required: true | |
| description: ID of the dataset. | |
| - in: path | |
| name: document_id | |
| type: string | |
| required: true | |
| description: ID of the document. | |
| - in: path | |
| name: chunk_id | |
| type: string | |
| required: true | |
| description: ID of the chunk to update. | |
| - in: body | |
| name: body | |
| description: Chunk update parameters. | |
| required: true | |
| schema: | |
| type: object | |
| properties: | |
| content: | |
| type: string | |
| description: Updated content of the chunk. | |
| important_keywords: | |
| type: array | |
| items: | |
| type: string | |
| description: Updated important keywords. | |
| available: | |
| type: boolean | |
| description: Availability status of the chunk. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| responses: | |
| 200: | |
| description: Chunk updated successfully. | |
| schema: | |
| type: object | |
| """ | |
| chunk = docStoreConn.get(chunk_id, search.index_name(tenant_id), [dataset_id]) | |
| if chunk is None: | |
| return get_error_data_result(f"Can't find this chunk {chunk_id}") | |
| if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): | |
| return get_error_data_result(message=f"You don't own the dataset {dataset_id}.") | |
| doc = DocumentService.query(id=document_id, kb_id=dataset_id) | |
| if not doc: | |
| return get_error_data_result( | |
| message=f"You don't own the document {document_id}." | |
| ) | |
| doc = doc[0] | |
| req = request.json | |
| if "content" in req: | |
| content = req["content"] | |
| else: | |
| content = chunk.get("content_with_weight", "") | |
| d = {"id": chunk_id, "content_with_weight": content} | |
| d["content_ltks"] = rag_tokenizer.tokenize(d["content_with_weight"]) | |
| d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"]) | |
| if "important_keywords" in req: | |
| if not isinstance(req["important_keywords"], list): | |
| return get_error_data_result("`important_keywords` should be a list") | |
| d["important_kwd"] = req.get("important_keywords") | |
| d["important_tks"] = rag_tokenizer.tokenize(" ".join(req["important_keywords"])) | |
| if "available" in req: | |
| d["available_int"] = int(req["available"]) | |
| embd_id = DocumentService.get_embd_id(document_id) | |
| embd_mdl = TenantLLMService.model_instance( | |
| tenant_id, LLMType.EMBEDDING.value, embd_id | |
| ) | |
| if doc.parser_id == ParserType.QA: | |
| arr = [t for t in re.split(r"[\n\t]", d["content_with_weight"]) if len(t) > 1] | |
| if len(arr) != 2: | |
| return get_error_data_result( | |
| message="Q&A must be separated by TAB/ENTER key." | |
| ) | |
| q, a = rmPrefix(arr[0]), rmPrefix(arr[1]) | |
| d = beAdoc( | |
| d, arr[0], arr[1], not any([rag_tokenizer.is_chinese(t) for t in q + a]) | |
| ) | |
| v, c = embd_mdl.encode([doc.name, d["content_with_weight"]]) | |
| v = 0.1 * v[0] + 0.9 * v[1] if doc.parser_id != ParserType.QA else v[1] | |
| d["q_%d_vec" % len(v)] = v.tolist() | |
| docStoreConn.update({"id": chunk_id}, d, search.index_name(tenant_id), dataset_id) | |
| return get_result() | |
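| # Illustrative chunk-update sketch. update_chunk patches "content", | |
| # "important_keywords" and/or "available"; for documents parsed with the QA method | |
| # the new content must contain a question and an answer separated by a TAB or | |
| # newline, as enforced above. The path and method are assumptions. | |
| def _example_update_chunk(base_url, api_key, dataset_id, document_id, chunk_id): | |
|     import requests | |
|     body = {"content": "Updated chunk text.", "important_keywords": ["update"], "available": True} | |
|     resp = requests.put( | |
|         f"{base_url}/datasets/{dataset_id}/documents/{document_id}/chunks/{chunk_id}",  # assumed route | |
|         headers={"Authorization": f"Bearer {api_key}"}, | |
|         json=body, | |
|     ) | |
|     return resp.json() | |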
| def retrieval_test(tenant_id): | |
| """ | |
| Retrieve chunks based on a query. | |
| --- | |
| tags: | |
| - Retrieval | |
| security: | |
| - ApiKeyAuth: [] | |
| parameters: | |
| - in: body | |
| name: body | |
| description: Retrieval parameters. | |
| required: true | |
| schema: | |
| type: object | |
| properties: | |
| dataset_ids: | |
| type: array | |
| items: | |
| type: string | |
| required: true | |
| description: List of dataset IDs to search in. | |
| question: | |
| type: string | |
| required: true | |
| description: Query string. | |
| document_ids: | |
| type: array | |
| items: | |
| type: string | |
| description: List of document IDs to filter. | |
| similarity_threshold: | |
| type: number | |
| format: float | |
| description: Similarity threshold. | |
| vector_similarity_weight: | |
| type: number | |
| format: float | |
| description: Vector similarity weight. | |
| top_k: | |
| type: integer | |
| description: Maximum number of chunks to return. | |
| highlight: | |
| type: boolean | |
| description: Whether to highlight matched content. | |
| - in: header | |
| name: Authorization | |
| type: string | |
| required: true | |
| description: Bearer token for authentication. | |
| responses: | |
| 200: | |
| description: Retrieval results. | |
| schema: | |
| type: object | |
| properties: | |
| chunks: | |
| type: array | |
| items: | |
| type: object | |
| properties: | |
| id: | |
| type: string | |
| description: Chunk ID. | |
| content: | |
| type: string | |
| description: Chunk content. | |
| document_id: | |
| type: string | |
| description: ID of the document. | |
| dataset_id: | |
| type: string | |
| description: ID of the dataset. | |
| similarity: | |
| type: number | |
| format: float | |
| description: Similarity score. | |
| """ | |
| req = request.json | |
| if not req.get("dataset_ids"): | |
| return get_error_data_result("`dataset_ids` is required.") | |
| kb_ids = req["dataset_ids"] | |
| if not isinstance(kb_ids, list): | |
| return get_error_data_result("`dataset_ids` should be a list") | |
| kbs = KnowledgebaseService.get_by_ids(kb_ids) | |
| for id in kb_ids: | |
| if not KnowledgebaseService.accessible(kb_id=id, user_id=tenant_id): | |
| return get_error_data_result(f"You don't own the dataset {id}.") | |
| embd_nms = list(set([kb.embd_id for kb in kbs])) | |
| if len(embd_nms) != 1: | |
| return get_result( | |
| message="Datasets use different embedding models.", | |
| code=RetCode.AUTHENTICATION_ERROR, | |
| ) | |
| if "question" not in req: | |
| return get_error_data_result("`question` is required.") | |
| page = int(req.get("page", 1)) | |
| size = int(req.get("page_size", 30)) | |
| question = req["question"] | |
| doc_ids = req.get("document_ids", []) | |
| if not isinstance(doc_ids, list): | |
| return get_error_data_result("`documents` should be a list") | |
| doc_ids_list = KnowledgebaseService.list_documents_by_ids(kb_ids) | |
| for doc_id in doc_ids: | |
| if doc_id not in doc_ids_list: | |
| return get_error_data_result( | |
| f"The datasets don't own the document {doc_id}" | |
| ) | |
| similarity_threshold = float(req.get("similarity_threshold", 0.2)) | |
| vector_similarity_weight = float(req.get("vector_similarity_weight", 0.3)) | |
| top = int(req.get("top_k", 1024)) | |
| if req.get("highlight") == "False" or req.get("highlight") == "false": | |
| highlight = False | |
| else: | |
| highlight = True | |
| try: | |
| e, kb = KnowledgebaseService.get_by_id(kb_ids[0]) | |
| if not e: | |
| return get_error_data_result(message="Dataset not found!") | |
| embd_mdl = TenantLLMService.model_instance( | |
| kb.tenant_id, LLMType.EMBEDDING.value, llm_name=kb.embd_id | |
| ) | |
| rerank_mdl = None | |
| if req.get("rerank_id"): | |
| rerank_mdl = TenantLLMService.model_instance( | |
| kb.tenant_id, LLMType.RERANK.value, llm_name=req["rerank_id"] | |
| ) | |
| if req.get("keyword", False): | |
| chat_mdl = TenantLLMService.model_instance(kb.tenant_id, LLMType.CHAT) | |
| question += keyword_extraction(chat_mdl, question) | |
| retr = retrievaler if kb.parser_id != ParserType.KG else kg_retrievaler | |
| ranks = retr.retrieval( | |
| question, | |
| embd_mdl, | |
| kb.tenant_id, | |
| kb_ids, | |
| page, | |
| size, | |
| similarity_threshold, | |
| vector_similarity_weight, | |
| top, | |
| doc_ids, | |
| rerank_mdl=rerank_mdl, | |
| highlight=highlight, | |
| ) | |
| for c in ranks["chunks"]: | |
| if "vector" in c: | |
| del c["vector"] | |
| # rename keys for the API response | |
| renamed_chunks = [] | |
| for chunk in ranks["chunks"]: | |
| key_mapping = { | |
| "chunk_id": "id", | |
| "content_with_weight": "content", | |
| "doc_id": "document_id", | |
| "important_kwd": "important_keywords", | |
| "docnm_kwd": "document_keyword", | |
| } | |
| rename_chunk = {} | |
| for key, value in chunk.items(): | |
| new_key = key_mapping.get(key, key) | |
| rename_chunk[new_key] = value | |
| renamed_chunks.append(rename_chunk) | |
| ranks["chunks"] = renamed_chunks | |
| return get_result(data=ranks) | |
| except Exception as e: | |
| if str(e).find("not_found") > 0: | |
| return get_result( | |
| message="No chunk found! Check the chunk status please!", | |
| code=RetCode.DATA_ERROR, | |
| ) | |
| return server_error_response(e) |
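| # Illustrative retrieval sketch. retrieval_test needs "question" and "dataset_ids" | |
| # (all datasets must share one embedding model); the optional knobs mirror the code | |
| # above: document_ids, page, page_size, similarity_threshold, | |
| # vector_similarity_weight, top_k, rerank_id, keyword and highlight. The path and | |
| # method are assumptions. | |
| def _example_retrieval(base_url, api_key, dataset_ids, question): | |
|     import requests | |
|     body = { | |
|         "question": question, | |
|         "dataset_ids": dataset_ids, | |
|         "similarity_threshold": 0.2, | |
|         "vector_similarity_weight": 0.3, | |
|         "top_k": 1024, | |
|         "highlight": True, | |
|     } | |
|     resp = requests.post( | |
|         f"{base_url}/retrieval",  # assumed route | |
|         headers={"Authorization": f"Bearer {api_key}"}, | |
|         json=body, | |
|     ) | |
|     return resp.json()  # chunks come back with renamed keys (id, content, document_id, ...) | |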