File size: 8,666 Bytes
aacd162
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""Integration tests for the full ingestion pipeline: upload β†’ extract β†’ chunk β†’ embed β†’ store."""
import sys
import pathlib
import json
import tempfile
import shutil
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock

# Ensure `src` is on sys.path
ROOT = pathlib.Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))

import ingestion.storage as storage
import ingestion.extractors as extractors
import ingestion.chunker as chunker
import ingestion.embeddings as embeddings
import ingestion.vectorstore as vectorstore


def test_txt_upload_extract_ingest(tmp_path):
    """End-to-end TXT pipeline: upload → extract → chunk → embed → store.

    The embedding model is mocked so no model download occurs; storage,
    extraction, chunking and the vector store all run for real under
    ``tmp_path``.
    """
    # Create a test text file.
    test_file = tmp_path / "test.txt"
    test_content = "Sentence one. Sentence two. Sentence three. " * 20
    test_file.write_text(test_content, encoding="utf-8")

    # Initialize the local storage adapter rooted in the temp dir.
    storage_dir = tmp_path / "storage"
    adapter = storage.LocalStorageAdapter(base_dir=str(storage_dir))

    # Step 1: Upload the raw file.
    source_id = "test-source-001"
    user, notebook = "testuser", "test-notebook"
    dest = adapter.save_raw_file(user, notebook, source_id, test_file)
    assert dest.exists()

    # Step 2: Extract text and verify it round-trips unchanged.
    result = extractors.extract_text_from_txt(test_file)
    assert result["text"] == test_content
    assert result["pages"] == 1

    # Persist the extracted text and check the on-disk layout.
    adapter.save_extracted_text(user, notebook, source_id, "content", result["text"])
    extracted_path = (
        storage_dir / "users" / user / "notebooks" / notebook
        / "files_extracted" / source_id / "content.txt"
    )
    assert extracted_path.exists()
    assert extracted_path.read_text(encoding="utf-8") == test_content

    # Step 3: Chunk.  A dummy tokenizer (one token per whitespace word)
    # avoids downloading a real tokenizer model.
    class DummyTokenizer:
        def encode(self, s, add_special_tokens=False):
            return [0] * max(1, len(s.split()))

    with patch.object(chunker, "get_tokenizer", lambda model_name=None: DummyTokenizer()):
        chunks = chunker.chunk_text(result["text"], model_name="dummy", chunk_size_tokens=50)
    assert len(chunks) > 1
    assert all("chunk_id" in c and "text" in c for c in chunks)

    # Attach source metadata to every chunk.
    for c in chunks:
        c["source_id"] = source_id

    # Step 4: Embed (mocked to avoid model download).  Fix: the mock embedder
    # was previously constructed but never invoked, so the embed step was dead
    # code — now the embeddings flow through its interface and the call is
    # asserted.
    mock_embedder = MagicMock()
    mock_embedder.embed_texts.return_value = [
        [0.1 * i for _ in range(384)] for i in range(len(chunks))
    ]
    mock_embeddings = mock_embedder.embed_texts([c["text"] for c in chunks])
    mock_embedder.embed_texts.assert_called_once()

    # Step 5: Store in Chroma.
    chroma_dir = str((storage_dir / user / notebook / "chroma").resolve())
    store = vectorstore.ChromaAdapter(persist_directory=chroma_dir)
    store.upsert_chunks(user, notebook, chunks, mock_embeddings)

    # Verify every chunk landed in the (user, notebook) collection.
    collection = store.get_or_create_collection(user, notebook)
    assert collection.count() == len(chunks)


def test_url_extraction_with_fallback(tmp_path):
    """Extract text from a URL with DNS resolution and HTTP GET fully mocked."""
    page = """
    <html>
        <body>
            <article>
                <p>This is the main content of the article.</p>
                <p>It should be extracted correctly.</p>
            </article>
            <footer>Footer text (should be filtered out)</footer>
        </body>
    </html>
    """
    url = "https://example.com/article"

    dns_patch = patch("ingestion.extractors.socket.getaddrinfo")
    http_patch = patch("ingestion.extractors.requests.get")
    with dns_patch as mock_getaddrinfo, http_patch as mock_get:
        # Resolve the host to a public IP so any SSRF guard passes.
        mock_getaddrinfo.return_value = [(2, 1, 6, "", ("93.184.216.34", 0))]

        response = MagicMock()
        response.status_code = 200
        response.headers = {"Content-Type": "text/html; charset=utf-8"}
        response.iter_content.return_value = [page.encode("utf-8")]
        response.encoding = "utf-8"
        response.apparent_encoding = "utf-8"
        response.raise_for_status = MagicMock()
        response.close = MagicMock()
        mock_get.return_value = response

        result = extractors.extract_text_from_url(url)

        lowered = result["text"].lower()
        assert "main content" in lowered or "article" in lowered
        assert "source" in result
        assert result["source"] == url


def test_url_extraction_blocks_localhost():
    """Loopback/local hosts should be blocked to reduce SSRF risk."""
    loopback_url = "http://127.0.0.1:8000/health"
    with pytest.raises(extractors.URLValidationError):
        extractors.extract_text_from_url(loopback_url)


def test_pdf_extraction_fallback(tmp_path):
    """Test PDF extraction with embedded text (no-OCR path).

    Skips when PyMuPDF (``fitz``) is unavailable.  Fix: the ``return`` after
    ``pytest.skip`` was unreachable (skip raises) and has been removed; the
    ``try`` now covers only the import that can actually raise ImportError.
    """
    try:
        import fitz
    except ImportError:
        pytest.skip("fitz/pymupdf not available")

    # Build a one-page PDF containing a known text string.
    doc = fitz.open()
    page = doc.new_page()
    page.insert_text((50, 50), "PDF test content")
    pdf_path = tmp_path / "test.pdf"
    # str() for compatibility with older PyMuPDF releases that reject Path.
    doc.save(str(pdf_path))
    doc.close()

    result = extractors.extract_text_from_pdf(pdf_path, use_ocr=False)
    assert "PDF test content" in result["text"]
    assert result["pages"] >= 1
    assert "source" in result


def test_pptx_extraction():
    """Test PPTX extraction with a minimal generated presentation.

    Skips when python-pptx is unavailable.  Fixes: the ``return`` after
    ``pytest.skip`` was unreachable (skip raises) and the function-local
    ``import tempfile`` duplicated the module-level import.
    """
    try:
        from pptx import Presentation
    except ImportError:
        pytest.skip("python-pptx not available")

    with tempfile.TemporaryDirectory() as tmpdir:
        # Build a one-slide deck with a known title.
        prs = Presentation()
        slide = prs.slides.add_slide(prs.slide_layouts[0])
        slide.shapes.title.text = "Test Slide"

        pptx_path = Path(tmpdir) / "test.pptx"
        prs.save(pptx_path)

        # Extract and verify the slide title made it into the text.
        result = extractors.extract_text_from_pptx(pptx_path)
        assert "Test Slide" in result["text"]
        assert result["slides"] >= 1


def test_embedding_adapter_local_provider():
    """Test the embedding adapter with the local (SentenceTransformer) provider.

    The model class is mocked so no weights are downloaded.  Fix: removed the
    ``MockTokenizer`` class that was defined but never used (dead code).
    """
    with patch("ingestion.embeddings.SentenceTransformer") as MockSentenceTransformer:
        import numpy as np

        mock_model = MagicMock()
        # encode() returns a 2x2 array standing in for real embeddings.
        mock_model.encode.return_value = np.array([[0.1, 0.2], [0.3, 0.4]])
        MockSentenceTransformer.return_value = mock_model

        adapter = embeddings.EmbeddingAdapter(model_name="test-model", provider="local")
        result = adapter.embed_texts(["text1", "text2"])

        # Contract: list of per-text embeddings, each a plain list of floats.
        assert len(result) == 2
        assert isinstance(result[0], list)
        assert len(result[0]) == 2


def test_embedding_adapter_openai_provider_missing_key():
    """The OpenAI provider must fail fast without the openai package or API key.

    Only meaningful when the openai package is NOT installed, so the test
    skips itself otherwise.  Fix: replaced the ``try``/``assert False``/
    ``except`` pattern with the idiomatic ``pytest.raises``.
    """
    try:
        import openai  # noqa: F401
    except ImportError:
        pass
    else:
        pytest.skip("openai package is installed; skipping test")

    # A cleared environment guarantees no API key is present either.
    with patch.dict("os.environ", {}, clear=True):
        with pytest.raises((ImportError, ValueError)) as excinfo:
            embeddings.EmbeddingAdapter(model_name="text-embedding-3-small", provider="openai")
        # Either the missing package or the missing key must be mentioned.
        message = str(excinfo.value).lower()
        assert "openai" in message or "api_key" in message


def test_chroma_isolation_by_user_notebook():
    """Chroma collections must be isolated per (user_id, notebook_id).

    Uses an in-memory (ephemeral) client to avoid persistence/file-locking
    issues on Windows.  Fix: the local variable ``embeddings`` shadowed the
    module-level ``ingestion.embeddings`` import; renamed to ``vectors``.
    """
    store = vectorstore.ChromaAdapter(persist_directory=None)

    # Create collections for two users across two notebooks.
    col1 = store.get_or_create_collection("alice", "nb1")
    col2 = store.get_or_create_collection("alice", "nb2")
    col3 = store.get_or_create_collection("bob", "nb1")

    # Each (user, notebook) pair maps to a distinct collection name.
    assert col1.name == "alice_nb1"
    assert col2.name == "alice_nb2"
    assert col3.name == "bob_nb1"

    # Upsert two chunks into alice/nb1 only.
    chunks = [
        {"chunk_id": f"c{i}", "text": f"text{i}", "text_preview": "...", "source_id": "s1"}
        for i in range(2)
    ]
    vectors = [[0.1 * j for _ in range(10)] for j in range(len(chunks))]
    store.upsert_chunks("alice", "nb1", chunks, vectors)

    # Data must land only in the targeted collection.
    assert col1.count() == 2
    assert col2.count() == 0
    assert col3.count() == 0