Spaces:
Sleeping
Sleeping
| """ | |
| Robustness tests for Auto Tagging RAG System | |
| Tests error handling, edge cases, and data integrity | |
| """ | |
| import pytest | |
| from pathlib import Path | |
| import sys | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
class TestErrorHandling:
    """Verify that unusual or malformed inputs still produce a result object."""

    def test_empty_query_handling(self, populated_rag_manager):
        """Retrieve with an empty query string and expect a non-None result."""
        # An empty result set is acceptable here; only a crash or None is not.
        outcome = populated_rag_manager.base_rag.retrieve("", k=3)
        assert outcome is not None

    def test_invalid_k_value(self, populated_rag_manager):
        """Retrieve with k=0 and then k=-1, expecting a non-None result each time."""
        text = "test query"
        for bad_k in (0, -1):
            outcome = populated_rag_manager.base_rag.retrieve(text, k=bad_k)
            assert outcome is not None

    def test_missing_tags_in_tag_filter(self, populated_rag_manager):
        """Filter on a tag that is not expected to exist in the index."""
        unknown_tags = ["nonexistent-tag-xyz"]
        # The result may be empty; the call itself must not fail.
        outcome = populated_rag_manager.tag_filter_rag.retrieve(
            "test query", k=3, tags=unknown_tags, tag_operator="OR"
        )
        assert outcome is not None

    def test_invalid_tag_operator(self, populated_rag_manager):
        """Pass an unrecognised tag operator and expect a non-None result."""
        # The pipeline is expected to fall back to OR or otherwise cope gracefully.
        outcome = populated_rag_manager.tag_filter_rag.retrieve(
            "test query", k=3, tags=["emergency"], tag_operator="INVALID"
        )
        assert outcome is not None

    def test_empty_document_handling(self, rag_manager):
        """Chunk an empty document and expect a list (possibly empty) back."""
        from core.ingest import FlatTagChunker

        produced = FlatTagChunker().chunk_document("", language="en", user_tags=None)
        assert produced is not None
        assert isinstance(produced, list)
class TestEdgeCases:
    """Exercise boundary conditions of chunking and retrieval."""

    def test_very_short_document(self, rag_manager):
        """Chunk a one-word document and expect a list back."""
        from core.ingest import FlatTagChunker

        produced = FlatTagChunker().chunk_document(
            "Emergency!", language="en", user_tags=None
        )
        assert produced is not None
        assert isinstance(produced, list)

    def test_special_characters_in_document(self, rag_manager):
        """Chunk text containing an emoji and punctuation; expect at least one chunk."""
        from core.ingest import FlatTagChunker

        splitter = FlatTagChunker()
        sample = "Emergency! 🚨 Fire safety (protocol #1) requires: 1) Alert 2) Evacuate"
        produced = splitter.chunk_document(sample, language="en", user_tags=None)
        assert produced is not None
        assert len(produced) > 0

    def test_large_k_value(self, populated_rag_manager):
        """Request 100 results; fewer may come back, but the call must succeed."""
        outcome = populated_rag_manager.base_rag.retrieve("test query", k=100)
        assert outcome is not None

    def test_many_tags(self, populated_rag_manager):
        """Filter with fifty tags combined with OR and expect a non-None result."""
        tag_names = ["tag_{}".format(index) for index in range(50)]
        outcome = populated_rag_manager.tag_filter_rag.retrieve(
            "test query", k=3, tags=tag_names, tag_operator="OR"
        )
        assert outcome is not None
class TestDataIntegrity:
    """Test data integrity and consistency."""

    def test_document_count_accuracy(self, rag_manager, sample_documents):
        """Index one document's chunks twice and check a document count is reported.

        The "emergency" sample document is chunked two times, all chunks are
        added to the "documents" collection, and the collection stats must
        report a positive ``document_count``.
        """
        from core.ingest import FlatTagChunker

        doc_data = sample_documents["emergency"]
        chunker = FlatTagChunker()

        # Chunk the same document twice so the collection receives duplicate
        # chunks originating from a single source document.
        all_chunks = []
        for _ in range(2):
            all_chunks.extend(
                chunker.chunk_document(
                    doc_data["content"],
                    language=doc_data["language"],
                    user_tags=None,
                )
            )

        # Fail loudly instead of passing vacuously: without chunks nothing
        # below would be indexed or verified.
        assert all_chunks, "chunker produced no chunks for the 'emergency' sample document"

        rag_manager.vector_store.add_documents("documents", all_chunks)
        stats = rag_manager.vector_store.get_collection_stats("documents")
        # NOTE(review): the intent is that unique documents are counted rather
        # than chunks, but only a non-zero count is checked here — tighten once
        # the expected value is confirmed.
        assert stats["document_count"] > 0

    def test_tag_consistency(self, rag_manager):
        """Generate tags twice for the same text and check both runs yield tags."""
        from core.tag_generator import TagGenerator

        text = "Emergency procedures for fire safety and evacuation protocols."
        generator = TagGenerator()
        tags1 = generator.generate_tags(text, method="yake", language="en", max_tags=5)
        tags2 = generator.generate_tags(text, method="yake", language="en", max_tags=5)
        # NOTE(review): the two runs are not compared with each other; only
        # non-emptiness is asserted, so differing tag sets would still pass.
        assert len(tags1) > 0
        assert len(tags2) > 0

    def test_session_isolation(self, session_manager):
        """Sessions created for different users get distinct ids and collections."""
        session1 = session_manager.create_session(user_id="user1")
        session2 = session_manager.create_session(user_id="user2")
        assert session1.session_id != session2.session_id
        assert session1.collection_name != session2.collection_name
class TestPerformance:
    """Test performance and resource usage."""

    def test_retrieval_latency(self, populated_rag_manager):
        """Check the latency reported by a single retrieval is positive and under 10 s."""
        query = "What are emergency procedures?"
        result = populated_rag_manager.base_rag.retrieve(query, k=3)
        # Bounds are applied to the latency value reported on the result itself.
        assert result.latency < 10.0
        assert result.latency > 0

    def test_evaluation_performance(self, evaluator, populated_rag_manager, sample_queries):
        """Check that evaluating two queries on one pipeline finishes within 60 s."""
        import time

        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by wall-clock adjustments the way time.time() can.
        start = time.perf_counter()
        df, _summary, _results = evaluator.batch_evaluate(
            sample_queries[:2],  # keep the run short: two queries only
            output_file=None,
            pipelines=['base_rag'],
        )
        elapsed = time.perf_counter() - start

        assert elapsed < 60.0
        assert df is not None