|
|
|
|
|
""" |
|
|
E2E Tests for Complete User Workflows |
|
|
|
|
|
Comprehensive end-to-end tests that simulate complete user journeys: |
|
|
- Data room setup and processing |
|
|
- Company analysis generation |
|
|
- Checklist matching workflow |
|
|
- Questions processing workflow |
|
|
- Q&A session workflow |
|
|
- Export workflow |
|
|
- Knowledge graph workflow |
|
|
""" |
|
|
|
|
|
import pytest |
|
|
import os |
|
|
from playwright.sync_api import Page, expect |
|
|
from .conftest import StreamlitPageHelpers |
|
|
|
|
|
|
|
|
class TestCompleteWorkflows: |
|
|
"""Test complete user workflows from start to finish""" |
|
|
|
|
|
def test_complete_data_room_to_analysis_workflow(self, page: Page, streamlit_helpers: StreamlitPageHelpers, sample_test_data): |
|
|
"""Test complete workflow: data room setup -> processing -> analysis generation""" |
|
|
streamlit_helpers.wait_for_streamlit_load() |
|
|
|
|
|
|
|
|
sidebar = page.locator("[data-testid='stSidebar']") |
|
|
|
|
|
|
|
|
path_inputs = sidebar.locator("input[placeholder*='path'], input[aria-label*='path'], input[type='text']") |
|
|
|
|
|
if path_inputs.count() > 0 and sample_test_data["vdr_path"].exists(): |
|
|
|
|
|
path_inputs.first.fill(str(sample_test_data["vdr_path"])) |
|
|
|
|
|
|
|
|
process_buttons = sidebar.locator("button:has-text(/.*[Pp]rocess.*|.*[Bb]uild.*|.*[Ll]oad.*/)") |
|
|
|
|
|
if process_buttons.count() > 0: |
|
|
|
|
|
process_buttons.first.click() |
|
|
|
|
|
|
|
|
page.wait_for_timeout(10000) |
|
|
|
|
|
|
|
|
analysis_tab = page.locator("button:has-text('Company Analysis'), text='Company Analysis'").first |
|
|
if analysis_tab.count() > 0: |
|
|
analysis_tab.click() |
|
|
page.wait_for_timeout(2000) |
|
|
|
|
|
|
|
|
generate_buttons = page.locator("button:has-text(/.*[Gg]enerate.*|.*[Aa]nalysis.*/)") |
|
|
|
|
|
if generate_buttons.count() > 0: |
|
|
generate_buttons.first.click() |
|
|
|
|
|
|
|
|
page.wait_for_timeout(5000) |
|
|
|
|
|
|
|
|
analysis_content = page.locator("text=/.*[Aa]nalysis.*|.*[Ee]rror.*|.*API.*key.*/") |
|
|
|
|
|
|
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
|
|
|
def test_complete_checklist_workflow(self, page: Page, streamlit_helpers: StreamlitPageHelpers): |
|
|
"""Test complete checklist matching workflow""" |
|
|
streamlit_helpers.wait_for_streamlit_load() |
|
|
|
|
|
|
|
|
checklist_tab = page.locator("button:has-text('Checklist'), text='Checklist'").first |
|
|
|
|
|
if checklist_tab.count() > 0: |
|
|
checklist_tab.click() |
|
|
page.wait_for_timeout(1000) |
|
|
|
|
|
|
|
|
checklist_content = page.locator("text=/.*[Cc]hecklist.*|.*[Dd]ue.*[Dd]iligence.*|.*[Mm]atching.*/") |
|
|
|
|
|
|
|
|
process_buttons = page.locator("button:has-text(/.*[Pp]rocess.*|.*[Aa]nalyze.*|.*[Mm]atch.*/)") |
|
|
|
|
|
if process_buttons.count() > 0: |
|
|
process_buttons.first.click() |
|
|
|
|
|
|
|
|
page.wait_for_timeout(3000) |
|
|
|
|
|
|
|
|
results_indicators = page.locator("text=/.*[Rr]esults.*|.*[Cc]ompleted.*|.*[Ff]ound.*|.*[Pp]rocessing.*/") |
|
|
|
|
|
|
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
|
|
|
def test_complete_questions_workflow(self, page: Page, streamlit_helpers: StreamlitPageHelpers): |
|
|
"""Test complete due diligence questions workflow""" |
|
|
streamlit_helpers.wait_for_streamlit_load() |
|
|
|
|
|
|
|
|
questions_tab = page.locator("button:has-text('Questions'), text='Questions'").first |
|
|
|
|
|
if questions_tab.count() > 0: |
|
|
questions_tab.click() |
|
|
page.wait_for_timeout(1000) |
|
|
|
|
|
|
|
|
questions_content = page.locator("text=/.*[Qq]uestions.*|.*[Dd]ue.*[Dd]iligence.*/") |
|
|
|
|
|
|
|
|
process_buttons = page.locator("button:has-text(/.*[Pp]rocess.*|.*[Aa]nalyze.*|.*[Qq]uestions.*/)") |
|
|
|
|
|
if process_buttons.count() > 0: |
|
|
process_buttons.first.click() |
|
|
|
|
|
|
|
|
page.wait_for_timeout(5000) |
|
|
|
|
|
|
|
|
question_results = page.locator("text=/.*[Qq]uestion.*|.*[Aa]nswer.*|.*[Pp]rocessing.*/") |
|
|
|
|
|
|
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
|
|
|
def test_complete_qa_session_workflow(self, page: Page, streamlit_helpers: StreamlitPageHelpers): |
|
|
"""Test complete Q&A session workflow""" |
|
|
streamlit_helpers.wait_for_streamlit_load() |
|
|
|
|
|
|
|
|
qa_tab = page.locator("button:has-text('Q&A'), text='Q&A'").first |
|
|
|
|
|
if qa_tab.count() > 0: |
|
|
qa_tab.click() |
|
|
page.wait_for_timeout(1000) |
|
|
|
|
|
|
|
|
question_inputs = page.locator("input[placeholder*='question'], textarea[placeholder*='question']") |
|
|
|
|
|
if question_inputs.count() > 0: |
|
|
|
|
|
test_question = "What is the company's revenue?" |
|
|
question_inputs.first.fill(test_question) |
|
|
|
|
|
|
|
|
ask_buttons = page.locator("button:has-text(/.*[Aa]sk.*|.*[Ss]ubmit.*|.*[Ss]earch.*/)") |
|
|
|
|
|
if ask_buttons.count() > 0: |
|
|
ask_buttons.first.click() |
|
|
|
|
|
|
|
|
page.wait_for_timeout(5000) |
|
|
|
|
|
|
|
|
response_content = page.locator("text=/.*[Aa]nswer.*|.*[Rr]esponse.*|.*API.*key.*|.*[Ee]rror.*/") |
|
|
|
|
|
|
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
|
|
|
def test_complete_export_workflow(self, page: Page, streamlit_helpers: StreamlitPageHelpers): |
|
|
"""Test complete export workflow across multiple tabs""" |
|
|
streamlit_helpers.wait_for_streamlit_load() |
|
|
|
|
|
|
|
|
tabs = page.locator("[data-testid='stTabs'] button, .stTabs button") |
|
|
|
|
|
export_found = False |
|
|
|
|
|
if tabs.count() > 0: |
|
|
for i in range(min(tabs.count(), 5)): |
|
|
tabs.nth(i).click() |
|
|
page.wait_for_timeout(1000) |
|
|
|
|
|
|
|
|
export_buttons = page.locator("button:has-text(/.*[Ee]xport.*|.*[Dd]ownload.*|.*[Ss]ave.*|.*PDF.*/)") |
|
|
download_links = page.locator("a[download], a[href*='download']") |
|
|
|
|
|
if export_buttons.count() > 0: |
|
|
export_buttons.first.click() |
|
|
page.wait_for_timeout(2000) |
|
|
|
|
|
|
|
|
export_success = page.locator("text=/.*[Ee]xported.*|.*[Dd]ownloaded.*|.*[Ss]aved.*/") |
|
|
|
|
|
export_found = True |
|
|
break |
|
|
|
|
|
elif download_links.count() > 0: |
|
|
|
|
|
expect(download_links.first).to_be_visible() |
|
|
export_found = True |
|
|
break |
|
|
|
|
|
|
|
|
|
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
|
|
|
def test_complete_knowledge_graph_workflow(self, page: Page, streamlit_helpers: StreamlitPageHelpers): |
|
|
"""Test complete knowledge graph workflow""" |
|
|
streamlit_helpers.wait_for_streamlit_load() |
|
|
|
|
|
|
|
|
graph_tab = page.locator("button:has-text('Graph'), text='Graph'").first |
|
|
|
|
|
if graph_tab.count() > 0: |
|
|
graph_tab.click() |
|
|
page.wait_for_timeout(1000) |
|
|
|
|
|
|
|
|
graph_content = page.locator("text=/.*[Gg]raph.*|.*[Kk]nowledge.*|.*[Ee]ntities.*|.*[Rr]elationships.*/") |
|
|
|
|
|
|
|
|
graph_buttons = page.locator("button:has-text(/.*[Gg]enerate.*|.*[Bb]uild.*|.*[Ss]how.*/)") |
|
|
|
|
|
if graph_buttons.count() > 0: |
|
|
graph_buttons.first.click() |
|
|
|
|
|
|
|
|
page.wait_for_timeout(5000) |
|
|
|
|
|
|
|
|
graph_viz = page.locator("canvas, svg, .plotly, [data-testid='stPlotlyChart']") |
|
|
|
|
|
|
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
|
|
|
@pytest.mark.slow |
|
|
def test_complete_end_to_end_workflow(self, page_slow: Page, streamlit_helpers: StreamlitPageHelpers, sample_test_data): |
|
|
"""Test complete end-to-end workflow covering all major features""" |
|
|
page = page_slow |
|
|
streamlit_helpers.wait_for_streamlit_load() |
|
|
|
|
|
|
|
|
workflow_steps = [ |
|
|
"Data Room Setup", |
|
|
"Company Analysis", |
|
|
"Checklist Processing", |
|
|
"Questions Analysis", |
|
|
"Q&A Session", |
|
|
"Export Results" |
|
|
] |
|
|
|
|
|
|
|
|
sidebar = page.locator("[data-testid='stSidebar']") |
|
|
path_inputs = sidebar.locator("input[type='text']") |
|
|
|
|
|
if path_inputs.count() > 0 and sample_test_data["vdr_path"].exists(): |
|
|
path_inputs.first.fill(str(sample_test_data["vdr_path"])) |
|
|
|
|
|
process_buttons = sidebar.locator("button:has-text(/.*[Pp]rocess.*/)") |
|
|
if process_buttons.count() > 0: |
|
|
process_buttons.first.click() |
|
|
page.wait_for_timeout(5000) |
|
|
|
|
|
|
|
|
main_tabs = page.locator("[data-testid='stTabs'] button, .stTabs button") |
|
|
|
|
|
if main_tabs.count() > 0: |
|
|
for i in range(min(main_tabs.count(), 5)): |
|
|
main_tabs.nth(i).click() |
|
|
page.wait_for_timeout(2000) |
|
|
|
|
|
|
|
|
action_buttons = page.locator("button:has-text(/.*[Gg]enerate.*|.*[Pp]rocess.*|.*[Aa]nalyze.*/)") |
|
|
|
|
|
if action_buttons.count() > 0: |
|
|
|
|
|
action_buttons.first.click() |
|
|
page.wait_for_timeout(3000) |
|
|
|
|
|
|
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
|
|
|
|
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
expect(page.locator("[data-testid='stSidebar']")).to_be_visible() |
|
|
|
|
|
def test_error_recovery_across_workflows(self, page: Page, streamlit_helpers: StreamlitPageHelpers): |
|
|
"""Test that errors in one workflow don't break others""" |
|
|
streamlit_helpers.wait_for_streamlit_load() |
|
|
|
|
|
|
|
|
error_scenarios = [ |
|
|
|
|
|
lambda: self._trigger_invalid_path_error(page), |
|
|
|
|
|
lambda: self._trigger_ai_error(page), |
|
|
|
|
|
lambda: self._trigger_file_error(page) |
|
|
] |
|
|
|
|
|
for i, scenario in enumerate(error_scenarios): |
|
|
try: |
|
|
scenario() |
|
|
page.wait_for_timeout(3000) |
|
|
|
|
|
|
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
|
|
|
|
|
|
tabs = page.locator("[data-testid='stTabs'] button, .stTabs button") |
|
|
if tabs.count() > i: |
|
|
tabs.nth(i).click() |
|
|
page.wait_for_timeout(1000) |
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
|
|
|
def _trigger_invalid_path_error(self, page: Page): |
|
|
"""Helper to trigger invalid path error""" |
|
|
path_inputs = page.locator("input[type='text']") |
|
|
if path_inputs.count() > 0: |
|
|
path_inputs.first.fill("/invalid/nonexistent/path") |
|
|
|
|
|
process_buttons = page.locator("button:has-text(/.*[Pp]rocess.*/)") |
|
|
if process_buttons.count() > 0: |
|
|
process_buttons.first.click() |
|
|
|
|
|
def _trigger_ai_error(self, page: Page): |
|
|
"""Helper to trigger AI operation error""" |
|
|
ai_buttons = page.locator("button:has-text(/.*[Gg]enerate.*|.*[Aa]nalyze.*/)") |
|
|
if ai_buttons.count() > 0: |
|
|
ai_buttons.first.click() |
|
|
|
|
|
def _trigger_file_error(self, page: Page): |
|
|
"""Helper to trigger file operation error""" |
|
|
file_inputs = page.locator("input[type='file']") |
|
|
if file_inputs.count() > 0: |
|
|
|
|
|
try: |
|
|
file_inputs.first.set_input_files("nonexistent_file.pdf") |
|
|
except: |
|
|
pass |
|
|
|
|
|
def test_session_persistence_across_workflows(self, page: Page, streamlit_helpers: StreamlitPageHelpers): |
|
|
"""Test that session state persists correctly across different workflows""" |
|
|
streamlit_helpers.wait_for_streamlit_load() |
|
|
|
|
|
|
|
|
tabs = page.locator("[data-testid='stTabs'] button, .stTabs button") |
|
|
|
|
|
if tabs.count() > 1: |
|
|
|
|
|
tabs.nth(0).click() |
|
|
page.wait_for_timeout(1000) |
|
|
|
|
|
text_inputs = page.locator("input[type='text'], textarea") |
|
|
if text_inputs.count() > 0: |
|
|
test_value = "Session persistence test" |
|
|
text_inputs.first.fill(test_value) |
|
|
|
|
|
|
|
|
for i in range(1, min(tabs.count(), 4)): |
|
|
tabs.nth(i).click() |
|
|
page.wait_for_timeout(1000) |
|
|
|
|
|
|
|
|
buttons = page.locator("button") |
|
|
if buttons.count() > 0: |
|
|
try: |
|
|
buttons.first.click(timeout=2000) |
|
|
except: |
|
|
pass |
|
|
|
|
|
page.wait_for_timeout(1000) |
|
|
|
|
|
|
|
|
tabs.nth(0).click() |
|
|
page.wait_for_timeout(1000) |
|
|
|
|
|
|
|
|
expect(page.locator("[data-testid='stApp']")).to_be_visible() |
|
|
|