from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
import numpy as np
from typing import List, Dict, Optional, Tuple
from sentence_transformers import SentenceTransformer
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import re
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
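# Expected .env contents (placeholder values; the real URL and key come
# from your Qdrant Cloud cluster dashboard):
#
#   QDRANT_URL=https://your-cluster-id.region.aws.cloud.qdrant.io:6333
#   QDRANT_API_KEY=your-api-key-here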
class MultiCollectionChapterRetrieval:
    def __init__(self, use_cloud: bool = True):
        """
        Initialize with Qdrant Cloud or local connection

        Args:
            use_cloud: If True, connects to Qdrant Cloud using environment variables
        """
        if use_cloud:
            self.client = self._create_cloud_client()
        else:
            self.client = QdrantClient("http://localhost:6333")
        self.encoder = None

        # ICD-10 chapter mapping (all 22 chapters)
        self.chapter_info = {
            "chapter_1_I": "Certain infectious and parasitic diseases",
            "chapter_2_II": "Neoplasms",
            "chapter_3_III": "Diseases of the blood and blood-forming organs and certain disorders involving the immune mechanism",
            "chapter_4_IV": "Endocrine, nutritional and metabolic diseases",
            "chapter_5_V": "Mental and behavioural disorders",
            "chapter_6_VI": "Diseases of the nervous system",
            "chapter_7_VII": "Diseases of the eye and adnexa",
            "chapter_8_VIII": "Diseases of the ear and mastoid process",
            "chapter_9_IX": "Diseases of the circulatory system",
            "chapter_10_X": "Diseases of the respiratory system",
            "chapter_11_XI": "Diseases of the digestive system",
            "chapter_12_XII": "Diseases of the skin and subcutaneous tissue",
            "chapter_13_XIII": "Diseases of the musculoskeletal system and connective tissue",
            "chapter_14_XIV": "Diseases of the genitourinary system",
            "chapter_15_XV": "Pregnancy, childbirth and the puerperium",
            "chapter_16_XVI": "Certain conditions originating in the perinatal period",
            "chapter_17_XVII": "Congenital malformations, deformations and chromosomal abnormalities",
            "chapter_18_XVIII": "Symptoms, signs and abnormal clinical and laboratory findings, not elsewhere classified",
            "chapter_19_XIX": "Injury, poisoning and certain other consequences of external causes",
            "chapter_20_XX": "External causes of morbidity and mortality",
            "chapter_21_XXI": "Factors influencing health status and contact with health services",
            "chapter_22_XXII": "Codes for special purposes"
        }

        # Cache for collection names
        self._chapter_collections = None
    def _create_cloud_client(self) -> QdrantClient:
        """Create Qdrant Cloud client with authentication"""
        qdrant_url = os.getenv('QDRANT_URL')
        qdrant_api_key = os.getenv('QDRANT_API_KEY')
        if not qdrant_url or not qdrant_api_key:
            raise ValueError(
                "Qdrant Cloud credentials not found in environment variables.\n"
                "Please set QDRANT_URL and QDRANT_API_KEY in your .env file:\n"
                "QDRANT_URL=https://your-cluster-id.region.aws.cloud.qdrant.io:6333\n"
                "QDRANT_API_KEY=your-api-key-here"
            )
        print(f"🔌 Connecting to Qdrant Cloud: {qdrant_url}")
        try:
            client = QdrantClient(
                url=qdrant_url,
                api_key=qdrant_api_key,
                timeout=60,  # Increased timeout for cloud
                # Optional: Add additional cloud-specific settings
                prefer_grpc=True,  # Use gRPC for better performance
            )
            # Test connection
            collections = client.get_collections()
            print(f"✅ Connected successfully! Found {len(collections.collections)} collections")
            return client
        except Exception as e:
            print(f"❌ Failed to connect to Qdrant Cloud: {e}")
            print("Please check your QDRANT_URL and QDRANT_API_KEY in the .env file")
            raise
    def split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences using simple rules"""
        # Simple sentence splitting - enhance with nltk or spacy if needed
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        return sentences
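    # Illustration of the splitter's behavior (not executed; the regex
    # splits on runs of '.', '!' and '?', then strips whitespace):
    #
    #   split_into_sentences("Chest pain. Shortness of breath!")
    #   -> ["Chest pain", "Shortness of breath"]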
    def load_encoder(self, model_name: str = "all-MiniLM-L6-v2"):
        """Load the sentence transformer model"""
        if self.encoder is None:
            print(f"📥 Loading encoder: {model_name}")
            self.encoder = SentenceTransformer(model_name)
            print("✅ Encoder loaded successfully")

    def encode_query(self, query: str) -> List[float]:
        """Encode diagnostic string to vector"""
        if self.encoder is None:
            self.load_encoder()
        return self.encoder.encode([query])[0].tolist()
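    # Sanity check (all-MiniLM-L6-v2 produces 384-dimensional embeddings,
    # so the chapter collections must have been created with size=384):
    #
    #   retriever = MultiCollectionChapterRetrieval(use_cloud=False)
    #   assert len(retriever.encode_query("chest pain")) == 384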
    def get_chapter_collections(self) -> Dict[str, str]:
        """
        Get mapping of chapter_id -> collection_name
        Discovers collections automatically based on naming patterns
        """
        if self._chapter_collections is not None:
            return self._chapter_collections
        try:
            collections = self.client.get_collections()
            chapter_collections = {}
            print("🔍 Discovering chapter collections...")
            for collection in collections.collections:
                collection_name = collection.name
                # Try to match collection names to chapters
                chapter_match = None
                # Pattern 1: icd10_chapter_X_Y or chapter_X_Y
                pattern1 = re.search(r'chapter[_-]?(\d+)[_-]?([IVX]+)', collection_name, re.IGNORECASE)
                if pattern1:
                    chapter_num = pattern1.group(1)
                    roman = pattern1.group(2)
                    chapter_match = f"chapter_{chapter_num}_{roman}"
                # Pattern 2: Single collection with all chapters (e.g., icd10_codes_all_chapters)
                elif 'all' in collection_name.lower() and ('chapter' in collection_name.lower() or 'icd' in collection_name.lower()):
                    print(f"  📁 Found unified collection: {collection_name}")
                    # Unified collections are handled separately via payload filtering
                    chapter_collections['unified_collection'] = collection_name
                    continue
                # Pattern 3: Just the chapter part (chapter1, chapterI, etc.)
                elif 'chapter' in collection_name.lower():
                    numbers = re.findall(r'\d+', collection_name)
                    romans = re.findall(r'[IVX]+', collection_name)
                    if numbers and romans:
                        chapter_match = f"chapter_{numbers[0]}_{romans[0]}"
                    elif numbers:
                        # Convert the chapter number to its Roman numeral
                        num = int(numbers[0])
                        roman_map = {1: 'I', 2: 'II', 3: 'III', 4: 'IV', 5: 'V', 6: 'VI', 7: 'VII',
                                     8: 'VIII', 9: 'IX', 10: 'X', 11: 'XI', 12: 'XII', 13: 'XIII',
                                     14: 'XIV', 15: 'XV', 16: 'XVI', 17: 'XVII', 18: 'XVIII', 19: 'XIX',
                                     20: 'XX', 21: 'XXI', 22: 'XXII'}
                        if num in roman_map:
                            chapter_match = f"chapter_{num}_{roman_map[num]}"
                if chapter_match:
                    chapter_collections[chapter_match] = collection_name
                    print(f"  ✅ {chapter_match} -> {collection_name}")
            print(f"📊 Found {len(chapter_collections)} chapter collections")
            # If only a unified collection exists, searches fall back to chapter filtering
            if len(chapter_collections) == 1 and 'unified_collection' in chapter_collections:
                print("⚠️ Only unified collection found. Searches will use chapter filtering.")
            self._chapter_collections = chapter_collections
            return chapter_collections
        except Exception as e:
            print(f"❌ Error discovering collections: {e}")
            return {}
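    # Examples of how the discovery patterns resolve (hypothetical
    # collection names; only the naming shape matters):
    #
    #   "icd10_chapter_9_IX"       -> chapter_9_IX        (Pattern 1)
    #   "icd10_codes_all_chapters" -> unified_collection  (Pattern 2)
    #   "chapter9"                 -> chapter_9_IX        (Pattern 3, via roman_map)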
    def search_single_collection(
        self,
        collection_name: str,
        query_vector: List[float],
        limit: int = 20,
        score_threshold: float = 0.3,
        chapter_filter: Optional[str] = None
    ) -> List[Dict]:
        """Search a single collection and return formatted results"""
        try:
            # Build search parameters
            search_params = {
                "collection_name": collection_name,
                "query_vector": query_vector,
                "limit": limit,
                "score_threshold": score_threshold
            }
            # Restrict a unified collection to one chapter via a payload filter.
            # NOTE: this assumes each point stores its chapter id under a
            # 'chapter' payload key; adjust the key to match your indexing schema.
            if chapter_filter:
                search_params["query_filter"] = Filter(
                    must=[FieldCondition(key="chapter", match=MatchValue(value=chapter_filter))]
                )
            results = self.client.search(**search_params)
            formatted_results = []
            for result in results:
                formatted_results.append({
                    'collection': collection_name,
                    'score': result.score,
                    'id': result.id,
                    'payload': result.payload
                })
            return formatted_results
        except Exception as e:
            print(f"❌ Error searching {collection_name}: {e}")
            if "timeout" in str(e).lower():
                print("   This might be due to network issues. Retrying with lower limit...")
                try:
                    # Retry with reduced parameters
                    search_params["limit"] = min(limit, 10)
                    search_params["score_threshold"] = max(score_threshold, 0.5)
                    results = self.client.search(**search_params)
                    return [{
                        'collection': collection_name,
                        'score': result.score,
                        'id': result.id,
                        'payload': result.payload
                    } for result in results]
                except Exception:
                    pass
            return []
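    # Shape of each returned item (illustrative values; the payload keys
    # depend on how the collections were indexed, but the formatters below
    # expect at least 'code' and 'title'):
    #
    #   {'collection': 'icd10_chapter_9_IX', 'score': 0.83, 'id': 1234,
    #    'payload': {'code': 'I21.9', 'title': 'Acute myocardial infarction, unspecified'}}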
    def analyze_chapters_parallel(
        self,
        diagnostic_string: str,
        sample_size_per_chapter: int = 15,
        score_threshold: float = 0.3,
        max_workers: int = 4  # Reduced for cloud stability
    ) -> Dict[str, Dict]:
        """
        Analyze all chapter collections in parallel to determine relevance
        Optimized for cloud performance
        """
        query_vector = self.encode_query(diagnostic_string)
        chapter_collections = self.get_chapter_collections()
        if not chapter_collections:
            print("❌ No chapter collections found!")
            return {}
        print(f"\n🔍 Analyzing diagnostic: '{diagnostic_string}'")
        # A unified collection is analyzed via per-chapter payload filtering
        if 'unified_collection' in chapter_collections:
            return self._analyze_unified_collection(
                diagnostic_string, query_vector,
                chapter_collections['unified_collection'],
                sample_size_per_chapter, score_threshold
            )
| print(f"π Searching {len(chapter_collections)} collections in parallel...") | |
| chapter_analysis = {} | |
| def search_chapter(chapter_id: str, collection_name: str) -> Tuple[str, List[Dict]]: | |
| """Search function for parallel execution with retry logic""" | |
| max_retries = 2 | |
| for attempt in range(max_retries): | |
| try: | |
| results = self.search_single_collection( | |
| collection_name, query_vector, sample_size_per_chapter, score_threshold | |
| ) | |
| return chapter_id, results | |
| except Exception as e: | |
| if attempt < max_retries - 1: | |
| print(f" β οΈ Retry {attempt + 1} for {chapter_id}: {e}") | |
| time.sleep(1) # Brief delay before retry | |
| else: | |
| print(f" β Failed {chapter_id} after {max_retries} attempts: {e}") | |
| return chapter_id, [] | |
| # Execute searches in parallel | |
| start_time = time.time() | |
| with ThreadPoolExecutor(max_workers=max_workers) as executor: | |
| # Submit all search tasks | |
| future_to_chapter = { | |
| executor.submit(search_chapter, chapter_id, collection_name): chapter_id | |
| for chapter_id, collection_name in chapter_collections.items() | |
| if chapter_id != 'unified_collection' | |
| } | |
| # Collect results as they complete | |
| for future in as_completed(future_to_chapter): | |
| chapter_id = future_to_chapter[future] | |
| try: | |
| chapter_id, results = future.result(timeout=30) # 30 second timeout per search | |
| if results: | |
| scores = [r['score'] for r in results] | |
| # Calculate chapter statistics | |
| chapter_analysis[chapter_id] = { | |
| 'collection_name': chapter_collections[chapter_id], | |
| 'match_count': len(results), | |
| 'max_score': max(scores), | |
| 'avg_score': np.mean(scores), | |
| 'median_score': np.median(scores), | |
| 'min_score': min(scores), | |
| 'score_std': np.std(scores), | |
| 'top_matches': sorted(results, key=lambda x: x['score'], reverse=True)[:5], | |
| 'all_results': results | |
| } | |
| # Calculate relevance score (weighted combination of metrics) | |
| relevance = ( | |
| chapter_analysis[chapter_id]['avg_score'] * 0.4 + | |
| chapter_analysis[chapter_id]['max_score'] * 0.3 + | |
| min(len(results) / sample_size_per_chapter, 1.0) * 0.2 + | |
| (1.0 / (1.0 + chapter_analysis[chapter_id]['score_std'])) * 0.1 | |
| ) | |
| chapter_analysis[chapter_id]['relevance_score'] = relevance | |
| # print(f" β {chapter_id}: {len(results)} matches, relevance: {relevance:.4f}") | |
| # else: | |
| # print(f" β {chapter_id}: No matches above threshold") | |
| except Exception as e: | |
| print(f" β {chapter_id}: Error - {e}") | |
| elapsed = time.time() - start_time | |
| print(f"β±οΈ Parallel analysis completed in {elapsed:.2f} seconds") | |
| # Sort by relevance score | |
| sorted_analysis = dict(sorted( | |
| chapter_analysis.items(), | |
| key=lambda x: x[1]['relevance_score'], | |
| reverse=True | |
| )) | |
| return sorted_analysis | |
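    # Worked example of the relevance formula (illustrative numbers): with
    # avg_score=0.60, max_score=0.80, 15/15 matches and score_std=0.10,
    #
    #   relevance = 0.60*0.4 + 0.80*0.3 + 1.0*0.2 + (1/(1+0.10))*0.1
    #             = 0.24 + 0.24 + 0.20 + 0.0909 ≈ 0.77
    #
    # The std term mildly rewards chapters whose matches score consistently.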
    def _analyze_unified_collection(
        self,
        diagnostic_string: str,
        query_vector: List[float],
        collection_name: str,
        sample_size_per_chapter: int,
        score_threshold: float
    ) -> Dict[str, Dict]:
        """Analyze unified collection by searching with chapter filters"""
        print(f"🔍 Analyzing unified collection: {collection_name}")
        chapter_analysis = {}
        # Search each chapter in the unified collection
        for chapter_id in self.chapter_info.keys():
            try:
                results = self.search_single_collection(
                    collection_name, query_vector, sample_size_per_chapter,
                    score_threshold, chapter_filter=chapter_id
                )
                if results:
                    scores = [r['score'] for r in results]
                    chapter_analysis[chapter_id] = {
                        'collection_name': collection_name,
                        'match_count': len(results),
                        'max_score': max(scores),
                        'avg_score': np.mean(scores),
                        'median_score': np.median(scores),
                        'min_score': min(scores),
                        'score_std': np.std(scores),
                        'top_matches': sorted(results, key=lambda x: x['score'], reverse=True)[:5],
                        'all_results': results
                    }
                    # Calculate relevance score
                    relevance = (
                        chapter_analysis[chapter_id]['avg_score'] * 0.4 +
                        chapter_analysis[chapter_id]['max_score'] * 0.3 +
                        min(len(results) / sample_size_per_chapter, 1.0) * 0.2 +
                        (1.0 / (1.0 + chapter_analysis[chapter_id]['score_std'])) * 0.1
                    )
                    chapter_analysis[chapter_id]['relevance_score'] = relevance
                    print(f"  ✅ {chapter_id}: {len(results)} matches, relevance: {relevance:.4f}")
                else:
                    print(f"  ❌ {chapter_id}: No matches above threshold")
                # Small delay to avoid overwhelming the cloud service
                time.sleep(0.1)
            except Exception as e:
                print(f"  ❌ {chapter_id}: Error - {e}")
        # Sort by relevance score
        return dict(sorted(
            chapter_analysis.items(),
            key=lambda x: x[1]['relevance_score'],
            reverse=True
        ))
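    # For the payload filter above to stay fast on a large unified
    # collection, the 'chapter' payload key should carry a keyword index.
    # One-time setup sketch (assumes the same hypothetical 'chapter' key
    # and collection name as above):
    #
    #   client.create_payload_index(
    #       collection_name="icd10_codes_all_chapters",
    #       field_name="chapter",
    #       field_schema="keyword",
    #   )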
    def get_top_chapters(
        self,
        diagnostic_string: str,
        top_n: int = 5,
        min_relevance: float = 0.1
    ) -> List[Tuple[str, float, str]]:
        """
        Get top N most relevant chapters for a diagnostic string
        Returns: [(chapter_id, relevance_score, description)]
        """
        analysis = self.analyze_chapters_parallel(diagnostic_string)
        top_chapters = []
        for chapter_id, stats in analysis.items():
            relevance = stats['relevance_score']
            if relevance >= min_relevance and len(top_chapters) < top_n:
                description = self.chapter_info.get(chapter_id, "Unknown chapter")
                top_chapters.append((chapter_id, relevance, description))
        return top_chapters
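    # Example return value (illustrative scores):
    #
    #   [('chapter_9_IX', 0.77, 'Diseases of the circulatory system'),
    #    ('chapter_10_X', 0.64, 'Diseases of the respiratory system')]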
    def search_targeted_chapters(
        self,
        diagnostic_string: str,
        target_chapters: List[str] = None,
        results_per_chapter: int = 10,  # Unused; kept for backward compatibility
        results_per_sentence: int = 3,
        chapters_per_sentence: int = 2  # How many top chapters to search per sentence
    ) -> Dict[str, Dict[str, Dict]]:
        """
        Search only specific chapters, or auto-identify the top chapters for
        each sentence individually. Only the most relevant chapters are
        searched for each specific sentence.
        """
        print(f"\n=== STARTING search_targeted_chapters ===")
        print(f"Input parameters:")
        print(f"  diagnostic_string: '{diagnostic_string[:100]}{'...' if len(diagnostic_string) > 100 else ''}'")
        print(f"  target_chapters: {target_chapters}")
        print(f"  results_per_sentence: {results_per_sentence}")
        print(f"  chapters_per_sentence: {chapters_per_sentence}")
        # Split input into sentences first
        print(f"\n--- SENTENCE SPLITTING ---")
        sentences = self.split_into_sentences(diagnostic_string)
        print(f"Split into {len(sentences)} sentences:")
        for i, sentence in enumerate(sentences):
            print(f"  [{i+1}]: '{sentence}'")
        print(f"\n--- GETTING CHAPTER COLLECTIONS ---")
        chapter_collections = self.get_chapter_collections()
        print(f"Available chapter collections: {len(chapter_collections)} total")
        print(f"Chapter IDs: {list(chapter_collections.keys())}")
        results = {}
        if target_chapters is None:
            print(f"\n=== AUTO-IDENTIFICATION MODE ===")
            print("Auto-identifying most relevant chapters for each sentence individually...")
            for i, sentence in enumerate(sentences):
                if sentence.strip():  # Skip empty sentences
                    sentence_key = f"sentence_{i+1}"
                    print(f"\n--- Processing sentence {i+1} ---")
                    print(f"Sentence: '{sentence}'")
                    print(f"Sentence key: {sentence_key}")
                    # Get top chapters specifically for THIS sentence
                    print(f"Getting top {chapters_per_sentence} chapters for this sentence...")
                    try:
                        sentence_top_chapters = self.get_top_chapters(
                            sentence,
                            top_n=chapters_per_sentence,
                            min_relevance=0.05
                        )
                        print(f"Found {len(sentence_top_chapters)} relevant chapters:")
                        for j, (ch_id, rel, desc) in enumerate(sentence_top_chapters):
                            print(f"  [{j+1}] {ch_id}: {rel:.4f} - {desc}")
                    except Exception as e:
                        print(f"ERROR in get_top_chapters: {e}")
                        sentence_top_chapters = []
                    # Search only the relevant chapters for this specific sentence
                    print(f"Searching in {len(sentence_top_chapters)} selected chapters...")
                    for chapter_id, relevance, description in sentence_top_chapters:
                        print(f"\n  >> Searching chapter: {chapter_id} (relevance: {relevance:.4f})")
                        if chapter_id in chapter_collections:
                            collection_name = chapter_collections[chapter_id]
                            print(f"  Collection name: {collection_name}")
                            # Initialize chapter in results if not exists
                            if chapter_id not in results:
                                results[chapter_id] = {}
                                print(f"  Initialized results dict for chapter {chapter_id}")
                            # Search this sentence in this specific chapter
                            try:
                                print(f"  Encoding query for sentence...")
                                query_vector = self.encode_query(sentence)
                                # encode_query returns a plain list, so report its length
                                print(f"  Query vector length: {len(query_vector)}")
| print(f" Searching collection '{collection_name}' for top {results_per_sentence} results...") | |
| sentence_results = self.search_single_collection( | |
| collection_name, query_vector, results_per_sentence | |
| ) | |
| print(f" Raw search returned {len(sentence_results) if sentence_results else 0} results") | |
| except Exception as e: | |
| print(f" ERROR during search: {e}") | |
| sentence_results = [] | |
| if sentence_results: | |
| results[chapter_id][sentence_key] = { | |
| 'text': sentence, | |
| 'chapter_relevance': relevance, | |
| 'results': sentence_results | |
| } | |
| print(f" β Stored {len(sentence_results)} results for {chapter_id}[{sentence_key}]") | |
| # Debug: show top result scores | |
| if sentence_results: | |
| top_scores = [r.get('score', 'N/A') for r in sentence_results[:3]] | |
| print(f" Top 3 scores: {top_scores}") | |
| else: | |
| print(f" β No results above threshold for {chapter_id}") | |
| else: | |
| print(f" ERROR: Chapter {chapter_id} collection not found in available collections") | |
| else: | |
| print(f"\n--- Skipping empty sentence {i+1} ---") | |
        else:
            print(f"\n=== PRE-SPECIFIED CHAPTERS MODE ===")
            print(f"Using pre-specified chapters: {target_chapters}")
            # Validate chapters exist
            valid_chapters = []
            invalid_chapters = []
            for chapter_id in target_chapters:
                if chapter_id in chapter_collections:
                    valid_chapters.append(chapter_id)
                else:
                    invalid_chapters.append(chapter_id)
            print(f"Valid chapters: {valid_chapters}")
            if invalid_chapters:
                print(f"WARNING: Invalid chapters (will be skipped): {invalid_chapters}")
            for chapter_id in valid_chapters:
                collection_name = chapter_collections[chapter_id]
                print(f"\n--- Searching chapter: {chapter_id} ---")
                print(f"Collection name: {collection_name}")
                chapter_results = {}
                # Search each sentence in this chapter
                for i, sentence in enumerate(sentences):
                    if sentence.strip():  # Skip empty sentences
                        sentence_key = f"sentence_{i+1}"
                        print(f"\n  >> Processing sentence {i+1} in {chapter_id}")
                        print(f"  Sentence: '{sentence}'")
                        try:
                            print(f"  Encoding query...")
                            query_vector = self.encode_query(sentence)
                            # encode_query returns a plain list, so report its length
                            print(f"  Query vector length: {len(query_vector)}")
| print(f" Searching for top {results_per_sentence} results...") | |
| sentence_results = self.search_single_collection( | |
| collection_name, query_vector, results_per_sentence | |
| ) | |
| print(f" Found {len(sentence_results) if sentence_results else 0} results") | |
| except Exception as e: | |
| print(f" ERROR during search: {e}") | |
| sentence_results = [] | |
| if sentence_results: | |
| chapter_results[sentence_key] = { | |
| 'text': sentence, | |
| 'chapter_relevance': None, # Not calculated for pre-specified chapters | |
| 'results': sentence_results | |
| } | |
| print(f" β Stored results for sentence {i+1}") | |
| # Debug: show top result scores | |
| top_scores = [r.get('score', 'N/A') for r in sentence_results[:3]] | |
| print(f" Top 3 scores: {top_scores}") | |
| else: | |
| print(f" β No results found for sentence {i+1}") | |
| else: | |
| print(f" >> Skipping empty sentence {i+1}") | |
| if chapter_results: | |
| results[chapter_id] = chapter_results | |
| print(f"\n β Chapter {chapter_id}: Stored results for {len(chapter_results)} sentences") | |
| else: | |
| print(f"\n β Chapter {chapter_id}: No results found") | |
| # Final summary | |
| print(f"\n=== SEARCH COMPLETE ===") | |
| print(f"Results summary:") | |
| total_results = 0 | |
| for chapter_id, chapter_data in results.items(): | |
| sentence_count = len(chapter_data) | |
| result_count = sum(len(sent_data.get('results', [])) for sent_data in chapter_data.values()) | |
| total_results += result_count | |
| print(f" {chapter_id}: {sentence_count} sentences, {result_count} total results") | |
| print(f"Grand total: {len(results)} chapters, {total_results} results") | |
| print(f"=== END search_targeted_chapters ===\n") | |
| return results | |
    def format_chapter_analysis(self, diagnostic_string: str, detailed: bool = True) -> str:
        """Format comprehensive chapter analysis"""
        analysis = self.analyze_chapters_parallel(diagnostic_string)
        if not analysis:
            return "❌ No relevant chapters found."
        output = []
        output.append(f"\n{'='*90}")
        output.append("📊 CHAPTER RELEVANCE ANALYSIS")
        output.append(f"📝 Diagnostic: '{diagnostic_string}'")
        output.append(f"{'='*90}")
        for i, (chapter_id, stats) in enumerate(analysis.items(), 1):
            if stats['relevance_score'] < 0.05:  # Skip very low relevance
                continue
            description = self.chapter_info.get(chapter_id, "Unknown chapter")
            output.append(f"\n{i}. 📚 {chapter_id.upper()}")
            output.append(f"   🏷️ Collection: {stats['collection_name']}")
            output.append(f"   📋 Description: {description}")
            output.append(f"   ⭐ Relevance Score: {stats['relevance_score']:.4f}")
            output.append(f"   📊 Statistics:")
            output.append(f"      • Matches: {stats['match_count']}")
            output.append(f"      • Max Score: {stats['max_score']:.4f}")
            output.append(f"      • Avg Score: {stats['avg_score']:.4f}")
            output.append(f"      • Score Range: {stats['min_score']:.4f} - {stats['max_score']:.4f}")
            if detailed:
                output.append(f"\n   🎯 Top Matches:")
                for j, match in enumerate(stats['top_matches'][:3], 1):
                    code = match['payload'].get('code', 'N/A')
                    title = match['payload'].get('title', 'N/A')
                    score = match['score']
                    output.append(f"      {j}. {code} - {title}")
                    output.append(f"         🎯 Similarity: {score:.4f}")
            output.append("-" * 90)
        return "\n".join(output)
# Convenience functions for multi-collection setup

def analyze_diagnostic_chapters(diagnostic_string: str, detailed: bool = True, use_cloud: bool = True) -> str:
    """
    Main function to analyze which chapters are most relevant for a diagnostic
    """
    retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)
    return retriever.format_chapter_analysis(diagnostic_string, detailed)


def get_relevant_chapters(diagnostic_string: str, top_n: int = 5, use_cloud: bool = True) -> List[str]:
    """
    Get list of most relevant chapter IDs for a diagnostic string
    Returns: ['chapter_9_IX', 'chapter_10_X', ...]
    """
    retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)
    top_chapters = retriever.get_top_chapters(diagnostic_string, top_n)
    return [chapter_id for chapter_id, _, _ in top_chapters]
def smart_diagnostic_search(
    diagnostic_string: str,
    auto_select_chapters: bool = True,
    target_chapters: List[str] = None,
    results_per_sentence: int = 3,
    use_cloud: bool = True
) -> Dict[str, Dict[str, Dict]]:
    """
    Intelligent diagnostic search that processes each sentence separately
    Optimized for Qdrant Cloud
    """
    retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)
    if auto_select_chapters:
        # Auto-identify the best chapters per sentence (target_chapters is ignored)
        return retriever.search_targeted_chapters(
            diagnostic_string, None, results_per_sentence=results_per_sentence
        )
    else:
        return retriever.search_targeted_chapters(
            diagnostic_string, target_chapters, results_per_sentence=results_per_sentence
        )
def format_smart_search_results(
    diagnostic_string: str,
    search_results: Dict[str, Dict[str, Dict]],
    use_cloud: bool = True
) -> str:
    """Format the results from sentence-based smart_diagnostic_search"""
    if not search_results:
        return "❌ No results found."
    retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)
    output = []
    output.append(f"\n{'='*90}")
    output.append("🔍 SENTENCE-BASED DIAGNOSTIC SEARCH RESULTS")
    output.append(f"🎯 Query: '{diagnostic_string}'")
    output.append(f"{'='*90}")
    # Count total results
    total_results = 0
    total_sentences = 0
    for chapter_results in search_results.values():
        total_sentences += len(chapter_results)
        for sentence_data in chapter_results.values():
            total_results += len(sentence_data['results'])
    output.append(f"📊 Total results: {total_results} across {len(search_results)} chapters and {total_sentences} sentences")
    for chapter_id, chapter_data in search_results.items():
        description = retriever.chapter_info.get(chapter_id, "Unknown chapter")
        output.append(f"\n📚 {chapter_id.upper()}")
        output.append(f"   📋 {description}")
        output.append(f"   📝 {len(chapter_data)} sentences processed")
        output.append("-" * 60)
        for sentence_key, sentence_data in chapter_data.items():
            sentence_text = sentence_data['text']
            results = sentence_data['results']
            output.append(f"\n   📝 {sentence_key.replace('_', ' ').title()}: \"{sentence_text}\"")
            output.append(f"   🎯 Top {len(results)} matches:")
            output.append("")
            for i, result in enumerate(results, 1):
                payload = result['payload']
                code = payload.get('code', 'N/A')
                title = payload.get('title', 'N/A')
                score = result['score']
                output.append(f"      {i}. {code} - {title}")
                output.append(f"         🎯 Score: {score:.4f}")
                # Show description if available
                desc = payload.get('description', '')
                if desc:
                    desc_preview = desc[:100] + "..." if len(desc) > 100 else desc
                    output.append(f"         📄 {desc_preview}")
                output.append("")
    output.append("=" * 90)
    return "\n".join(output)
# Example usage

def example_multi_collection_analysis(use_cloud: bool = True):
    """Example of using the multi-collection chapter analysis"""
    test_cases = [
        "severe chest pain with shortness of breath",
        "type 2 diabetes with kidney complications",
        "depression and anxiety disorder",
        "broken wrist from falling",
        "acute appendicitis with fever",
        "skin cancer melanoma",
        "pregnancy complications in third trimester"
    ]
    for diagnostic in test_cases:
        print(f"\n{'='*100}")
        print(f"🔍 ANALYZING: {diagnostic}")
        print(f"{'='*100}")
        try:
            # Step 1: Analyze chapter relevance
            analysis = analyze_diagnostic_chapters(diagnostic, detailed=False, use_cloud=use_cloud)
            print(analysis)
            # Step 2: Get top relevant chapters
            top_chapters = get_relevant_chapters(diagnostic, top_n=3, use_cloud=use_cloud)
            print(f"\n📋 Top 3 relevant chapters: {top_chapters}")
            # Step 3: Smart search in those chapters
            search_results = smart_diagnostic_search(
                diagnostic,
                results_per_sentence=5,
                use_cloud=use_cloud
            )
            formatted_results = format_smart_search_results(
                diagnostic,
                search_results,
                use_cloud=use_cloud
            )
            print(formatted_results)
        except Exception as e:
            print(f"❌ Error processing '{diagnostic}': {e}")
            continue
def test_cloud_connection():
    """Test Qdrant Cloud connection and basic functionality"""
    print("🧪 Testing Qdrant Cloud Connection...")
    try:
        retriever = MultiCollectionChapterRetrieval(use_cloud=True)
        # Test basic search
        test_query = "heart disease"
        print(f"\n🧪 Testing with query: '{test_query}'")
        # Get collections
        collections = retriever.get_chapter_collections()
        print(f"📊 Available collections: {len(collections)}")
        if collections:
            # Test search
            top_chapters = retriever.get_top_chapters(test_query, top_n=3)
            print(f"🎯 Top chapters for '{test_query}': {[ch[0] for ch in top_chapters]}")
            print("✅ Cloud connection test successful!")
            return True
        else:
            print("⚠️ No collections found")
            return False
    except Exception as e:
        print(f"❌ Cloud connection test failed: {e}")
        return False
| if __name__ == "__main__": | |
| # Test cloud connection first | |
| if test_cloud_connection(): | |
| print("\n" + "="*100) | |
| print("π Running example analysis with Qdrant Cloud...") | |
| print("="*100) | |
| # Run examples with cloud | |
| example_multi_collection_analysis(use_cloud=True) | |
| else: | |
| print("β Skipping examples due to connection issues") | |
| # Or use directly: | |
| # chapters = get_relevant_chapters("heart attack symptoms", use_cloud=True) | |
| # results = smart_diagnostic_search("heart attack symptoms", use_cloud=True) | |
| # print(format_smart_search_results("heart attack symptoms", results, use_cloud=True)) |