| from ragflow_sdk import RAGFlow,Agent |
| from common import HOST_ADDRESS |
| import pytest |
|
|
|
|
| def test_create_session_with_success(get_api_key_fixture): |
| API_KEY = get_api_key_fixture |
| rag = RAGFlow(API_KEY, HOST_ADDRESS) |
| kb = rag.create_dataset(name="test_create_session") |
| displayed_name = "ragflow.txt" |
| with open("test_data/ragflow.txt", "rb") as file: |
| blob = file.read() |
| document = {"displayed_name":displayed_name,"blob":blob} |
| documents = [] |
| documents.append(document) |
| docs= kb.upload_documents(documents) |
| for doc in docs: |
| doc.add_chunk("This is a test to add chunk") |
| assistant=rag.create_chat("test_create_session", dataset_ids=[kb.id]) |
| assistant.create_session() |
|
|
|
|
| def test_create_conversation_with_success(get_api_key_fixture): |
| API_KEY = get_api_key_fixture |
| rag = RAGFlow(API_KEY, HOST_ADDRESS) |
| kb = rag.create_dataset(name="test_create_conversation") |
| displayed_name = "ragflow.txt" |
| with open("test_data/ragflow.txt", "rb") as file: |
| blob = file.read() |
| document = {"displayed_name": displayed_name, "blob": blob} |
| documents = [] |
| documents.append(document) |
| docs = kb.upload_documents(documents) |
| for doc in docs: |
| doc.add_chunk("This is a test to add chunk") |
| assistant = rag.create_chat("test_create_conversation", dataset_ids=[kb.id]) |
| session = assistant.create_session() |
| question = "What is AI" |
| for ans in session.ask(question): |
| pass |
| |
| |
|
|
|
|
| def test_delete_sessions_with_success(get_api_key_fixture): |
| API_KEY = get_api_key_fixture |
| rag = RAGFlow(API_KEY, HOST_ADDRESS) |
| kb = rag.create_dataset(name="test_delete_session") |
| displayed_name = "ragflow.txt" |
| with open("test_data/ragflow.txt", "rb") as file: |
| blob = file.read() |
| document = {"displayed_name":displayed_name,"blob":blob} |
| documents = [] |
| documents.append(document) |
| docs= kb.upload_documents(documents) |
| for doc in docs: |
| doc.add_chunk("This is a test to add chunk") |
| assistant=rag.create_chat("test_delete_session", dataset_ids=[kb.id]) |
| session = assistant.create_session() |
| assistant.delete_sessions(ids=[session.id]) |
|
|
|
|
| def test_update_session_with_name(get_api_key_fixture): |
| API_KEY = get_api_key_fixture |
| rag = RAGFlow(API_KEY, HOST_ADDRESS) |
| kb = rag.create_dataset(name="test_update_session") |
| displayed_name = "ragflow.txt" |
| with open("test_data/ragflow.txt", "rb") as file: |
| blob = file.read() |
| document = {"displayed_name": displayed_name, "blob": blob} |
| documents = [] |
| documents.append(document) |
| docs = kb.upload_documents(documents) |
| for doc in docs: |
| doc.add_chunk("This is a test to add chunk") |
| assistant = rag.create_chat("test_update_session", dataset_ids=[kb.id]) |
| session = assistant.create_session(name="old session") |
| session.update({"name": "new session"}) |
|
|
|
|
| def test_list_sessions_with_success(get_api_key_fixture): |
| API_KEY = get_api_key_fixture |
| rag = RAGFlow(API_KEY, HOST_ADDRESS) |
| kb = rag.create_dataset(name="test_list_session") |
| displayed_name = "ragflow.txt" |
| with open("test_data/ragflow.txt", "rb") as file: |
| blob = file.read() |
| document = {"displayed_name":displayed_name,"blob":blob} |
| documents = [] |
| documents.append(document) |
| docs= kb.upload_documents(documents) |
| for doc in docs: |
| doc.add_chunk("This is a test to add chunk") |
| assistant=rag.create_chat("test_list_session", dataset_ids=[kb.id]) |
| assistant.create_session("test_1") |
| assistant.create_session("test_2") |
| assistant.list_sessions() |
|
|
| @pytest.mark.skip(reason="") |
| def test_create_agent_session_with_success(get_api_key_fixture): |
| API_KEY = "ragflow-BkOGNhYjIyN2JiODExZWY5MzVhMDI0Mm" |
| rag = RAGFlow(API_KEY,HOST_ADDRESS) |
| Agent.create_session("2e45b5209c1011efa3e90242ac120006", rag) |
|
|
| @pytest.mark.skip(reason="") |
| def test_create_agent_conversation_with_success(get_api_key_fixture): |
| API_KEY = "ragflow-BkOGNhYjIyN2JiODExZWY5MzVhMDI0Mm" |
| rag = RAGFlow(API_KEY,HOST_ADDRESS) |
| session = Agent.create_session("2e45b5209c1011efa3e90242ac120006", rag) |
| session.ask("What is this job") |
|
|
| @pytest.mark.skip(reason="") |
| def test_list_agent_sessions_with_success(get_api_key_fixture): |
| API_KEY = "ragflow-BkOGNhYjIyN2JiODExZWY5MzVhMDI0Mm" |
| agent_id = "2710f2269b4611ef8fdf0242ac120006" |
| rag = RAGFlow(API_KEY,HOST_ADDRESS) |
| Agent.list_sessions(agent_id,rag) |
|
|