Spaces:
Sleeping
Sleeping
File size: 3,907 Bytes
"""
File agent runthrough.

Creates sample files (CSV, TXT, HTML) in a temp directory, processes them
through the full pipeline (detect → parse → chunk → PII mask → embed → Qdrant),
then verifies the stored vectors via Qdrant scroll.

Run:
    python -m src.file_agent.test_run
"""
from __future__ import annotations
import hashlib
import logging
import os
import sys
import tempfile
from pathlib import Path
# Configure the root logger at import time: INFO and above, written to stdout
# (rather than the default stderr stream).
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
# Logger used by every message emitted from this script.
logger = logging.getLogger("file_test_run")
# Sample CSV fixture: a header row plus three people rows that contain
# e-mail addresses and phone numbers. Written to disk by _write_samples.
_SAMPLE_CSV = """Name,Role,Email,Phone
Alice Smith,Engineer,alice@example.com,555-111-2222
Bob Jones,Manager,bob@example.com,555-333-4444
Carol White,Designer,carol@example.com,555-555-6666
"""
# Sample plain-text fixture: a short onboarding guide that mentions a person's
# name and e-mail address. Written to disk by _write_samples.
_SAMPLE_TXT = """Engineering Onboarding Guide
Welcome to the engineering team. Your first point of contact is your manager, Alice Smith (alice@example.com).
Repository access:
- Clone the main repo from GitHub
- Set up your SSH key and add it to your profile
- Run `make dev` to start the local environment
The on-call rotation is managed via PagerDuty. Contact the SRE team for access.
"""
# Sample HTML fixture: headings, paragraphs (one with an e-mail address) and a
# small table. Written to disk by _write_samples.
_SAMPLE_HTML = """<!DOCTYPE html>
<html>
<head><title>System Architecture</title></head>
<body>
<h1>System Architecture</h1>
<p>The backend runs on Kubernetes. All secrets are managed by Vault. Contact admin@example.com for access.</p>
<h2>Services</h2>
<p>The API gateway handles all external traffic. Internal services communicate via gRPC.</p>
<table>
<tr><th>Service</th><th>Port</th><th>Team</th></tr>
<tr><td>auth-service</td><td>8080</td><td>Platform</td></tr>
<tr><td>data-service</td><td>8081</td><td>Data</td></tr>
</table>
<h2>Deployment</h2>
<p>Deploy via GitHub Actions. Each PR triggers a staging deploy automatically.</p>
</body>
</html>
"""
def _write_samples(tmpdir: str) -> list[str]:
    """Write the three sample fixtures into *tmpdir* and return their paths.

    Args:
        tmpdir: Existing directory that receives ``onboarding.csv``,
            ``guide.txt`` and ``architecture.html``.

    Returns:
        The full paths of the written files, in the order CSV, TXT, HTML.
    """
    fixtures = (
        ("onboarding.csv", _SAMPLE_CSV),
        ("guide.txt", _SAMPLE_TXT),
        ("architecture.html", _SAMPLE_HTML),
    )
    written: list[str] = []
    for filename, text in fixtures:
        # Build the path with os.path.join so the returned strings keep the
        # exact form callers received before.
        target = os.path.join(tmpdir, filename)
        Path(target).write_text(text, encoding="utf-8")
        written.append(target)
    return written
def main() -> None:
    """Run the sample files through the file pipeline and log what Qdrant holds.

    Steps:
      1. Call ``ensure_collection_exists()``.
      2. Write the sample files into a temporary directory.
      3. Call ``process_file(path, team_id="test_team")`` for each file and
         treat its return value as the number of chunks produced.
      4. Scroll the configured Qdrant collection for points whose ``doc_id``
         payload field matches the file, and log the count plus the first
         three points.

    A single Qdrant client is shared by all files and closed when the run
    finishes, including when a step raises.
    """
    # Project and Qdrant imports stay function-local so importing this module
    # does not require those packages to be importable.
    from ingestion.storage.qdrant_store import ensure_collection_exists
    from src.file_agent.pipeline import process_file
    from qdrant_client import QdrantClient
    from qdrant_client.http import models as qmodels
    from ingestion.config import settings

    logger.info("=== File agent test ===")
    ensure_collection_exists()

    total_chunks = 0
    # One client for the whole run instead of a new connection per file.
    client = QdrantClient(host=settings.qdrant_host, port=settings.qdrant_port)
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            for file_path in _write_samples(tmpdir):
                fname = Path(file_path).name
                count = process_file(file_path, team_id="test_team")
                logger.info("Processed %s → %d chunks", fname, count)
                total_chunks += count

                # Verify in Qdrant.
                # NOTE(review): assumes the pipeline derives doc_id as
                # sha256("file:<basename>") — confirm against process_file.
                doc_id = hashlib.sha256(f"file:{fname}".encode()).hexdigest()
                results, _ = client.scroll(
                    collection_name=settings.qdrant_collection,
                    scroll_filter=qmodels.Filter(
                        must=[
                            qmodels.FieldCondition(
                                key="doc_id",
                                match=qmodels.MatchValue(value=doc_id),
                            )
                        ]
                    ),
                    limit=50,
                    with_payload=True,
                    with_vectors=False,
                )
                logger.info(
                    " Qdrant verification: %d points for %s (doc_id=%s...)",
                    len(results),
                    fname,
                    doc_id[:12],
                )
                if count and not results:
                    # Chunks were reported but nothing matched the doc_id
                    # filter; surface it instead of silently logging zero.
                    logger.warning(
                        "No Qdrant points found for %s although %d chunks were reported",
                        fname,
                        count,
                    )
                for pt in results[:3]:
                    # payload is optional on returned records; guard against None.
                    payload = pt.payload or {}
                    logger.info(
                        " point %s | type=%s",
                        str(pt.id)[:8],
                        payload.get("block_type"),
                    )
    finally:
        client.close()

    logger.info("=== File agent test DONE — total chunks: %d ===", total_chunks)
# Script entry point: run the walkthrough only when this module is executed
# directly (e.g. `python -m src.file_agent.test_run`), not when it is imported.
if __name__ == "__main__":
    main()
|