# icd_10_coding_assistant / chapter_retrieval_system_v2.py
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
import numpy as np
from typing import List, Dict, Optional, Tuple
from sentence_transformers import SentenceTransformer
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import re
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
class MultiCollectionChapterRetrieval:
    def __init__(self, use_cloud: bool = True):
        """
        Initialize with a Qdrant Cloud or local connection.
        Args:
            use_cloud: If True, connect to Qdrant Cloud using the QDRANT_URL
                and QDRANT_API_KEY environment variables; otherwise connect
                to a local instance at http://localhost:6333.
        """
        if use_cloud:
            self.client = self._create_cloud_client()
        else:
            self.client = QdrantClient("http://localhost:6333")
        self.encoder = None
# ICD-10 Chapter mapping (all 22 chapters)
self.chapter_info = {
"chapter_1_I": "Certain infectious and parasitic diseases",
"chapter_2_II": "Neoplasms",
"chapter_3_III": "Diseases of the blood and blood-forming organs and certain disorders involving the immune mechanism",
"chapter_4_IV": "Endocrine, nutritional and metabolic diseases",
"chapter_5_V": "Mental and behavioural disorders",
"chapter_6_VI": "Diseases of the nervous system",
"chapter_7_VII": "Diseases of the eye and adnexa",
"chapter_8_VIII": "Diseases of the ear and mastoid process",
"chapter_9_IX": "Diseases of the circulatory system",
"chapter_10_X": "Diseases of the respiratory system",
"chapter_11_XI": "Diseases of the digestive system",
"chapter_12_XII": "Diseases of the skin and subcutaneous tissue",
"chapter_13_XIII": "Diseases of the musculoskeletal system and connective tissue",
"chapter_14_XIV": "Diseases of the genitourinary system",
"chapter_15_XV": "Pregnancy, childbirth and the puerperium",
"chapter_16_XVI": "Certain conditions originating in the perinatal period",
"chapter_17_XVII": "Congenital malformations, deformations and chromosomal abnormalities",
"chapter_18_XVIII": "Symptoms, signs and abnormal clinical and laboratory findings, not elsewhere classified",
"chapter_19_XIX": "Injury, poisoning and certain other consequences of external causes",
"chapter_20_XX": "External causes of morbidity and mortality",
"chapter_21_XXI": "Factors influencing health status and contact with health services",
"chapter_22_XXII": "Codes for special purposes"
}
# Cache for collection names
self._chapter_collections = None
def _create_cloud_client(self) -> QdrantClient:
"""Create Qdrant Cloud client with authentication"""
qdrant_url = os.getenv('QDRANT_URL')
qdrant_api_key = os.getenv('QDRANT_API_KEY')
if not qdrant_url or not qdrant_api_key:
raise ValueError(
"Qdrant Cloud credentials not found in environment variables.\n"
"Please set QDRANT_URL and QDRANT_API_KEY in your .env file:\n"
"QDRANT_URL=https://your-cluster-id.region.aws.cloud.qdrant.io:6333\n"
"QDRANT_API_KEY=your-api-key-here"
)
print(f"πŸ”— Connecting to Qdrant Cloud: {qdrant_url}")
try:
client = QdrantClient(
url=qdrant_url,
api_key=qdrant_api_key,
timeout=60, # Increased timeout for cloud
# Optional: Add additional cloud-specific settings
prefer_grpc=True, # Use gRPC for better performance
)
# Test connection
collections = client.get_collections()
print(f"βœ… Connected successfully! Found {len(collections.collections)} collections")
return client
except Exception as e:
print(f"❌ Failed to connect to Qdrant Cloud: {e}")
print("Please check your QDRANT_URL and QDRANT_API_KEY in the .env file")
raise
    def split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences using simple punctuation rules."""
        # Simple regex-based splitting; swap in nltk or spacy if you need
        # more accurate sentence boundaries.
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if s.strip()]
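    # Illustrative behaviour (a sketch; output follows directly from the regex above):
    #   split_into_sentences("Chest pain at rest. Shortness of breath!")
    #   -> ['Chest pain at rest', 'Shortness of breath']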
def load_encoder(self, model_name: str = "all-MiniLM-L6-v2"):
"""Load the sentence transformer model"""
if self.encoder is None:
print(f"πŸ“₯ Loading encoder: {model_name}")
self.encoder = SentenceTransformer(model_name)
print(f"βœ… Encoder loaded successfully")
def encode_query(self, query: str) -> List[float]:
"""Encode diagnostic string to vector"""
if self.encoder is None:
self.load_encoder()
return self.encoder.encode([query])[0].tolist()
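    # Note: with the default all-MiniLM-L6-v2 model, encode_query returns a
    # plain Python list of 384 floats:
    #   vec = retriever.encode_query("acute chest pain")  # retriever: hypothetical instance
    #   len(vec)  # -> 384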
def get_chapter_collections(self) -> Dict[str, str]:
"""
Get mapping of chapter_id -> collection_name
Discovers collections automatically based on naming patterns
"""
if self._chapter_collections is not None:
return self._chapter_collections
try:
collections = self.client.get_collections()
chapter_collections = {}
print("πŸ” Discovering chapter collections...")
for collection in collections.collections:
collection_name = collection.name
# Try to match collection names to chapters
chapter_match = None
# Pattern 1: icd10_chapter_X_Y or chapter_X_Y
pattern1 = re.search(r'chapter[_-]?(\d+)[_-]?([IVX]+)', collection_name, re.IGNORECASE)
if pattern1:
chapter_num = pattern1.group(1)
roman = pattern1.group(2)
chapter_match = f"chapter_{chapter_num}_{roman}"
# Pattern 2: Single collection with all chapters (e.g., icd10_codes_all_chapters)
elif 'all' in collection_name.lower() and ('chapter' in collection_name.lower() or 'icd' in collection_name.lower()):
print(f" πŸ“š Found unified collection: {collection_name}")
# For unified collections, we'll handle this differently
chapter_collections['unified_collection'] = collection_name
continue
# Pattern 3: Just the chapter part (chapter1, chapterI, etc.)
elif 'chapter' in collection_name.lower():
numbers = re.findall(r'\d+', collection_name)
romans = re.findall(r'[IVX]+', collection_name)
if numbers and romans:
chapter_match = f"chapter_{numbers[0]}_{romans[0]}"
elif numbers:
# Try to convert number to roman numeral
num = int(numbers[0])
roman_map = {1: 'I', 2: 'II', 3: 'III', 4: 'IV', 5: 'V', 6: 'VI', 7: 'VII',
8: 'VIII', 9: 'IX', 10: 'X', 11: 'XI', 12: 'XII', 13: 'XIII',
14: 'XIV', 15: 'XV', 16: 'XVI', 17: 'XVII', 18: 'XVIII', 19: 'XIX',
20: 'XX', 21: 'XXI', 22: 'XXII'}
if num in roman_map:
chapter_match = f"chapter_{num}_{roman_map[num]}"
if chapter_match:
chapter_collections[chapter_match] = collection_name
print(f" βœ“ {chapter_match} -> {collection_name}")
print(f"πŸ“Š Found {len(chapter_collections)} chapter collections")
# If we only found a unified collection, we'll need to handle searches differently
if len(chapter_collections) == 1 and 'unified_collection' in chapter_collections:
print("⚠️ Only unified collection found. Searches will use chapter filtering.")
self._chapter_collections = chapter_collections
return chapter_collections
except Exception as e:
print(f"❌ Error discovering collections: {e}")
return {}
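    # Example return value (collection names here are hypothetical; actual names
    # depend on how the Qdrant collections were created):
    #   {'chapter_9_IX': 'icd10_chapter_9_IX',
    #    'chapter_10_X': 'icd10_chapter_10_X',
    #    ...}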
    def search_single_collection(
        self,
        collection_name: str,
        query_vector: List[float],
        limit: int = 20,
        score_threshold: float = 0.3,
        chapter_filter: Optional[str] = None
    ) -> List[Dict]:
        """Search a single collection and return formatted results"""

        def format_results(raw_results) -> List[Dict]:
            return [{
                'collection': collection_name,
                'score': result.score,
                'id': result.id,
                'payload': result.payload
            } for result in raw_results]

        # Build search parameters
        search_params = {
            "collection_name": collection_name,
            "query_vector": query_vector,
            "limit": limit,
            "score_threshold": score_threshold
        }
        # Restrict results to a single chapter when requested (used for unified
        # collections). This assumes each point's payload stores its chapter id
        # under a 'chapter' key; adjust the field name to match your schema.
        if chapter_filter:
            search_params["query_filter"] = Filter(
                must=[FieldCondition(key="chapter", match=MatchValue(value=chapter_filter))]
            )
        try:
            return format_results(self.client.search(**search_params))
        except Exception as e:
            print(f"❌ Error searching {collection_name}: {e}")
            if "timeout" in str(e).lower():
                print("   This might be due to network issues. Retrying with lower limit...")
                try:
                    # Retry once with a smaller result set and a stricter threshold
                    search_params["limit"] = min(limit, 10)
                    search_params["score_threshold"] = max(score_threshold, 0.5)
                    return format_results(self.client.search(**search_params))
                except Exception:
                    pass
            return []
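    # Shape of each formatted result (values illustrative; payload keys such as
    # 'code' and 'title' are assumptions consistent with how this module reads
    # them in format_chapter_analysis):
    #   {'collection': 'icd10_chapter_9_IX', 'score': 0.81, 'id': 123,
    #    'payload': {'code': 'I21.9', 'title': 'Acute myocardial infarction, unspecified'}}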
def analyze_chapters_parallel(
self,
diagnostic_string: str,
sample_size_per_chapter: int = 15,
score_threshold: float = 0.3,
max_workers: int = 4 # Reduced for cloud stability
) -> Dict[str, Dict]:
"""
Analyze all chapter collections in parallel to determine relevance
Optimized for cloud performance
"""
query_vector = self.encode_query(diagnostic_string)
chapter_collections = self.get_chapter_collections()
if not chapter_collections:
print("❌ No chapter collections found!")
return {}
print(f"\nπŸ” Analyzing diagnostic: '{diagnostic_string}'")
        # Route unified collections through per-chapter filtered searches
        if 'unified_collection' in chapter_collections:
            return self._analyze_unified_collection(
                diagnostic_string, query_vector,
                chapter_collections['unified_collection'],
                sample_size_per_chapter, score_threshold
            )
print(f"πŸ”„ Searching {len(chapter_collections)} collections in parallel...")
chapter_analysis = {}
def search_chapter(chapter_id: str, collection_name: str) -> Tuple[str, List[Dict]]:
"""Search function for parallel execution with retry logic"""
max_retries = 2
for attempt in range(max_retries):
try:
results = self.search_single_collection(
collection_name, query_vector, sample_size_per_chapter, score_threshold
)
return chapter_id, results
except Exception as e:
if attempt < max_retries - 1:
print(f" ⚠️ Retry {attempt + 1} for {chapter_id}: {e}")
time.sleep(1) # Brief delay before retry
else:
print(f" ❌ Failed {chapter_id} after {max_retries} attempts: {e}")
return chapter_id, []
# Execute searches in parallel
start_time = time.time()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all search tasks
future_to_chapter = {
executor.submit(search_chapter, chapter_id, collection_name): chapter_id
for chapter_id, collection_name in chapter_collections.items()
if chapter_id != 'unified_collection'
}
# Collect results as they complete
for future in as_completed(future_to_chapter):
chapter_id = future_to_chapter[future]
try:
chapter_id, results = future.result(timeout=30) # 30 second timeout per search
if results:
scores = [r['score'] for r in results]
# Calculate chapter statistics
chapter_analysis[chapter_id] = {
'collection_name': chapter_collections[chapter_id],
'match_count': len(results),
'max_score': max(scores),
'avg_score': np.mean(scores),
'median_score': np.median(scores),
'min_score': min(scores),
'score_std': np.std(scores),
'top_matches': sorted(results, key=lambda x: x['score'], reverse=True)[:5],
'all_results': results
}
# Calculate relevance score (weighted combination of metrics)
relevance = (
chapter_analysis[chapter_id]['avg_score'] * 0.4 +
chapter_analysis[chapter_id]['max_score'] * 0.3 +
min(len(results) / sample_size_per_chapter, 1.0) * 0.2 +
(1.0 / (1.0 + chapter_analysis[chapter_id]['score_std'])) * 0.1
)
chapter_analysis[chapter_id]['relevance_score'] = relevance
# print(f" βœ… {chapter_id}: {len(results)} matches, relevance: {relevance:.4f}")
# else:
# print(f" βž– {chapter_id}: No matches above threshold")
except Exception as e:
print(f" ❌ {chapter_id}: Error - {e}")
elapsed = time.time() - start_time
print(f"⏱️ Parallel analysis completed in {elapsed:.2f} seconds")
# Sort by relevance score
sorted_analysis = dict(sorted(
chapter_analysis.items(),
key=lambda x: x[1]['relevance_score'],
reverse=True
))
return sorted_analysis
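    # Worked example of the relevance formula above (illustrative numbers only):
    #   avg_score=0.55, max_score=0.72, match_count=15, sample_size_per_chapter=15,
    #   score_std=0.08
    #   relevance = 0.55*0.4 + 0.72*0.3 + min(15/15, 1.0)*0.2 + (1/(1+0.08))*0.1
    #             = 0.2200 + 0.2160 + 0.2000 + 0.0926 ≈ 0.7286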
def _analyze_unified_collection(
self,
diagnostic_string: str,
query_vector: List[float],
collection_name: str,
sample_size_per_chapter: int,
score_threshold: float
) -> Dict[str, Dict]:
"""Analyze unified collection by searching with chapter filters"""
print(f"πŸ”„ Analyzing unified collection: {collection_name}")
chapter_analysis = {}
# Search each chapter in the unified collection
for chapter_id in self.chapter_info.keys():
try:
results = self.search_single_collection(
collection_name, query_vector, sample_size_per_chapter,
score_threshold, chapter_filter=chapter_id
)
if results:
scores = [r['score'] for r in results]
chapter_analysis[chapter_id] = {
'collection_name': collection_name,
'match_count': len(results),
'max_score': max(scores),
'avg_score': np.mean(scores),
'median_score': np.median(scores),
'min_score': min(scores),
'score_std': np.std(scores),
'top_matches': sorted(results, key=lambda x: x['score'], reverse=True)[:5],
'all_results': results
}
# Calculate relevance score
relevance = (
chapter_analysis[chapter_id]['avg_score'] * 0.4 +
chapter_analysis[chapter_id]['max_score'] * 0.3 +
min(len(results) / sample_size_per_chapter, 1.0) * 0.2 +
(1.0 / (1.0 + chapter_analysis[chapter_id]['score_std'])) * 0.1
)
chapter_analysis[chapter_id]['relevance_score'] = relevance
print(f" βœ… {chapter_id}: {len(results)} matches, relevance: {relevance:.4f}")
else:
print(f" βž– {chapter_id}: No matches above threshold")
# Small delay to avoid overwhelming the cloud service
time.sleep(0.1)
except Exception as e:
print(f" ❌ {chapter_id}: Error - {e}")
# Sort by relevance score
return dict(sorted(
chapter_analysis.items(),
key=lambda x: x[1]['relevance_score'],
reverse=True
))
def get_top_chapters(
self,
diagnostic_string: str,
top_n: int = 5,
min_relevance: float = 0.1
) -> List[Tuple[str, float, str]]:
"""
Get top N most relevant chapters for a diagnostic string
Returns: [(chapter_id, relevance_score, description)]
"""
analysis = self.analyze_chapters_parallel(diagnostic_string)
top_chapters = []
for chapter_id, stats in analysis.items():
relevance = stats['relevance_score']
if relevance >= min_relevance and len(top_chapters) < top_n:
description = self.chapter_info.get(chapter_id, "Unknown chapter")
top_chapters.append((chapter_id, relevance, description))
return top_chapters
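    # Typical call (chapter ids and scores are illustrative, not real output):
    #   retriever.get_top_chapters("severe chest pain", top_n=3)
    #   -> [('chapter_9_IX', 0.61, 'Diseases of the circulatory system'),
    #       ('chapter_10_X', 0.44, 'Diseases of the respiratory system'), ...]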
    def search_targeted_chapters(
        self,
        diagnostic_string: str,
        target_chapters: List[str] = None,
        results_per_chapter: int = 10,  # Kept for backward compatibility; unused
        results_per_sentence: int = 3,
        chapters_per_sentence: int = 2  # How many top chapters to search per sentence
    ) -> Dict[str, Dict[str, Dict]]:
        """
        Search specific chapters, or auto-identify the top chapters for each
        sentence individually. In auto mode, only the most relevant chapters
        for each specific sentence are searched.
        """
print(f"\n=== STARTING search_targeted_chapters ===")
print(f"Input parameters:")
print(f" diagnostic_string: '{diagnostic_string[:100]}{'...' if len(diagnostic_string) > 100 else ''}'")
print(f" target_chapters: {target_chapters}")
print(f" results_per_sentence: {results_per_sentence}")
print(f" chapters_per_sentence: {chapters_per_sentence}")
# Split input into sentences first
print(f"\n--- SENTENCE SPLITTING ---")
sentences = self.split_into_sentences(diagnostic_string)
print(f"Split into {len(sentences)} sentences:")
for i, sentence in enumerate(sentences):
print(f" [{i+1}]: '{sentence}'")
print(f"\n--- GETTING CHAPTER COLLECTIONS ---")
chapter_collections = self.get_chapter_collections()
print(f"Available chapter collections: {len(chapter_collections)} total")
print(f"Chapter IDs: {list(chapter_collections.keys())}")
results = {}
if target_chapters is None:
print(f"\n=== AUTO-IDENTIFICATION MODE ===")
print("Auto-identifying most relevant chapters for each sentence individually...")
for i, sentence in enumerate(sentences):
if sentence.strip(): # Skip empty sentences
sentence_key = f"sentence_{i+1}"
print(f"\n--- Processing sentence {i+1} ---")
print(f"Sentence: '{sentence}'")
print(f"Sentence key: {sentence_key}")
# Get top chapters specifically for THIS sentence
print(f"Getting top {chapters_per_sentence} chapters for this sentence...")
try:
sentence_top_chapters = self.get_top_chapters(
sentence,
top_n=chapters_per_sentence,
min_relevance=0.05
)
print(f"Found {len(sentence_top_chapters)} relevant chapters:")
for j, (ch_id, rel, desc) in enumerate(sentence_top_chapters):
print(f" [{j+1}] {ch_id}: {rel:.4f} - {desc}")
except Exception as e:
print(f"ERROR in get_top_chapters: {e}")
sentence_top_chapters = []
# Search only the relevant chapters for this specific sentence
print(f"Searching in {len(sentence_top_chapters)} selected chapters...")
for chapter_id, relevance, description in sentence_top_chapters:
print(f"\n >> Searching chapter: {chapter_id} (relevance: {relevance:.4f})")
if chapter_id in chapter_collections:
collection_name = chapter_collections[chapter_id]
print(f" Collection name: {collection_name}")
# Initialize chapter in results if not exists
if chapter_id not in results:
results[chapter_id] = {}
print(f" Initialized results dict for chapter {chapter_id}")
# Search this sentence in this specific chapter
try:
print(f" Encoding query for sentence...")
query_vector = self.encode_query(sentence)
print(f" Query vector shape: {getattr(query_vector, 'shape', 'N/A')}")
print(f" Searching collection '{collection_name}' for top {results_per_sentence} results...")
sentence_results = self.search_single_collection(
collection_name, query_vector, results_per_sentence
)
print(f" Raw search returned {len(sentence_results) if sentence_results else 0} results")
except Exception as e:
print(f" ERROR during search: {e}")
sentence_results = []
if sentence_results:
results[chapter_id][sentence_key] = {
'text': sentence,
'chapter_relevance': relevance,
'results': sentence_results
}
print(f" βœ“ Stored {len(sentence_results)} results for {chapter_id}[{sentence_key}]")
# Debug: show top result scores
if sentence_results:
top_scores = [r.get('score', 'N/A') for r in sentence_results[:3]]
print(f" Top 3 scores: {top_scores}")
else:
print(f" βœ— No results above threshold for {chapter_id}")
else:
print(f" ERROR: Chapter {chapter_id} collection not found in available collections")
else:
print(f"\n--- Skipping empty sentence {i+1} ---")
else:
print(f"\n=== PRE-SPECIFIED CHAPTERS MODE ===")
print(f"Using pre-specified chapters: {target_chapters}")
# Validate chapters exist
valid_chapters = []
invalid_chapters = []
for chapter_id in target_chapters:
if chapter_id in chapter_collections:
valid_chapters.append(chapter_id)
else:
invalid_chapters.append(chapter_id)
print(f"Valid chapters: {valid_chapters}")
if invalid_chapters:
print(f"WARNING: Invalid chapters (will be skipped): {invalid_chapters}")
for chapter_id in valid_chapters:
collection_name = chapter_collections[chapter_id]
print(f"\n--- Searching chapter: {chapter_id} ---")
print(f"Collection name: {collection_name}")
chapter_results = {}
# Search each sentence in this chapter
for i, sentence in enumerate(sentences):
if sentence.strip(): # Skip empty sentences
sentence_key = f"sentence_{i+1}"
print(f"\n >> Processing sentence {i+1} in {chapter_id}")
print(f" Sentence: '{sentence}'")
try:
print(f" Encoding query...")
query_vector = self.encode_query(sentence)
print(f" Query vector shape: {getattr(query_vector, 'shape', 'N/A')}")
print(f" Searching for top {results_per_sentence} results...")
sentence_results = self.search_single_collection(
collection_name, query_vector, results_per_sentence
)
print(f" Found {len(sentence_results) if sentence_results else 0} results")
except Exception as e:
print(f" ERROR during search: {e}")
sentence_results = []
if sentence_results:
chapter_results[sentence_key] = {
'text': sentence,
'chapter_relevance': None, # Not calculated for pre-specified chapters
'results': sentence_results
}
print(f" βœ“ Stored results for sentence {i+1}")
# Debug: show top result scores
top_scores = [r.get('score', 'N/A') for r in sentence_results[:3]]
print(f" Top 3 scores: {top_scores}")
else:
print(f" βœ— No results found for sentence {i+1}")
else:
print(f" >> Skipping empty sentence {i+1}")
                if chapter_results:
                    results[chapter_id] = chapter_results
                    print(f"\n ✓ Chapter {chapter_id}: Stored results for {len(chapter_results)} sentences")
                else:
                    print(f"\n ✗ Chapter {chapter_id}: No results found")
# Final summary
print(f"\n=== SEARCH COMPLETE ===")
print(f"Results summary:")
total_results = 0
for chapter_id, chapter_data in results.items():
sentence_count = len(chapter_data)
result_count = sum(len(sent_data.get('results', [])) for sent_data in chapter_data.values())
total_results += result_count
print(f" {chapter_id}: {sentence_count} sentences, {result_count} total results")
print(f"Grand total: {len(results)} chapters, {total_results} results")
print(f"=== END search_targeted_chapters ===\n")
return results
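    # Shape of the returned structure (values illustrative):
    #   {'chapter_9_IX': {
    #       'sentence_1': {
    #           'text': 'severe chest pain',
    #           'chapter_relevance': 0.61,   # None in pre-specified mode
    #           'results': [...]             # as produced by search_single_collection
    #       }}}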
def format_chapter_analysis(self, diagnostic_string: str, detailed: bool = True) -> str:
"""Format comprehensive chapter analysis"""
analysis = self.analyze_chapters_parallel(diagnostic_string)
if not analysis:
return "❌ No relevant chapters found."
output = []
output.append(f"\n{'='*90}")
output.append(f"πŸ“Š CHAPTER RELEVANCE ANALYSIS")
output.append(f"πŸ” Diagnostic: '{diagnostic_string}'")
output.append(f"{'='*90}")
for i, (chapter_id, stats) in enumerate(analysis.items(), 1):
if stats['relevance_score'] < 0.05: # Skip very low relevance
continue
description = self.chapter_info.get(chapter_id, "Unknown chapter")
output.append(f"\n{i}. πŸ“š {chapter_id.upper()}")
output.append(f" 🏷️ Collection: {stats['collection_name']}")
output.append(f" πŸ“– Description: {description}")
output.append(f" ⭐ Relevance Score: {stats['relevance_score']:.4f}")
output.append(f" πŸ“Š Statistics:")
output.append(f" β€’ Matches: {stats['match_count']}")
output.append(f" β€’ Max Score: {stats['max_score']:.4f}")
output.append(f" β€’ Avg Score: {stats['avg_score']:.4f}")
output.append(f" β€’ Score Range: {stats['min_score']:.4f} - {stats['max_score']:.4f}")
if detailed:
output.append(f"\n 🎯 Top Matches:")
for j, match in enumerate(stats['top_matches'][:3], 1):
code = match['payload'].get('code', 'N/A')
title = match['payload'].get('title', 'N/A')
score = match['score']
output.append(f" {j}. {code} - {title}")
output.append(f" πŸ’― Similarity: {score:.4f}")
output.append("-" * 90)
return "\n".join(output)
# Convenience functions for multi-collection setup
def analyze_diagnostic_chapters(diagnostic_string: str, detailed: bool = True, use_cloud: bool = True) -> str:
"""
Main function to analyze which chapters are most relevant for a diagnostic
"""
retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)
return retriever.format_chapter_analysis(diagnostic_string, detailed)
def get_relevant_chapters(diagnostic_string: str, top_n: int = 5, use_cloud: bool = True) -> List[str]:
"""
Get list of most relevant chapter IDs for a diagnostic string
Returns: ['chapter_9_IX', 'chapter_10_X', ...]
"""
retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)
top_chapters = retriever.get_top_chapters(diagnostic_string, top_n)
return [chapter_id for chapter_id, _, _ in top_chapters]
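# Minimal usage sketch, assuming the chapter collections already exist in Qdrant
# (returned ids are illustrative):
#   chapters = get_relevant_chapters("type 2 diabetes with kidney complications", top_n=3)
#   # -> e.g. ['chapter_4_IV', 'chapter_14_XIV', 'chapter_9_IX']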
def smart_diagnostic_search(
    diagnostic_string: str,
    auto_select_chapters: bool = True,
    target_chapters: List[str] = None,
    results_per_sentence: int = 3,
    use_cloud: bool = True
) -> Dict[str, Dict[str, Dict]]:
    """
    Intelligent diagnostic search that processes each sentence separately.
    Optimized for Qdrant Cloud.
    """
    retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)
    if auto_select_chapters:
        # Auto mode: passing target_chapters=None makes search_targeted_chapters
        # identify the most relevant chapters for each sentence.
        target_chapters = None
    return retriever.search_targeted_chapters(
        diagnostic_string, target_chapters, results_per_sentence=results_per_sentence
    )
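# End-to-end sketch combining search and formatting (assumes a reachable Qdrant
# instance with chapter collections):
#   results = smart_diagnostic_search("chest pain. shortness of breath", use_cloud=False)
#   print(format_smart_search_results("chest pain. shortness of breath", results, use_cloud=False))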
def format_smart_search_results(
    diagnostic_string: str,
    search_results: Dict[str, Dict[str, Dict]],
    use_cloud: bool = True
) -> str:
    """Format the results from a sentence-based smart_diagnostic_search"""
if not search_results:
return "❌ No results found."
retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)
output = []
output.append(f"\n{'='*90}")
output.append(f"πŸ” SENTENCE-BASED DIAGNOSTIC SEARCH RESULTS")
output.append(f"🎯 Query: '{diagnostic_string}'")
output.append(f"{'='*90}")
# Count total results
total_results = 0
total_sentences = 0
for chapter_results in search_results.values():
total_sentences += len(chapter_results)
for sentence_data in chapter_results.values():
total_results += len(sentence_data['results'])
output.append(f"πŸ“Š Total results: {total_results} across {len(search_results)} chapters and {total_sentences} sentences")
for chapter_id, chapter_data in search_results.items():
description = retriever.chapter_info.get(chapter_id, "Unknown chapter")
output.append(f"\nπŸ“š {chapter_id.upper()}")
output.append(f" πŸ“– {description}")
output.append(f" πŸ“ {len(chapter_data)} sentences processed")
output.append("-" * 60)
for sentence_key, sentence_data in chapter_data.items():
sentence_text = sentence_data['text']
results = sentence_data['results']
output.append(f"\n πŸ” {sentence_key.replace('_', ' ').title()}: \"{sentence_text}\"")
output.append(f" 🎯 Top {len(results)} matches:")
output.append("")
for i, result in enumerate(results, 1):
payload = result['payload']
code = payload.get('code', 'N/A')
title = payload.get('title', 'N/A')
score = result['score']
output.append(f" {i}. {code} - {title}")
output.append(f" πŸ’― Score: {score:.4f}")
# Show description if available
desc = payload.get('description', '')
if desc:
desc_preview = desc[:100] + "..." if len(desc) > 100 else desc
output.append(f" πŸ“„ {desc_preview}")
output.append("")
output.append("=" * 90)
return "\n".join(output)
# Example usage
def example_multi_collection_analysis(use_cloud: bool = True):
"""Example of using the multi-collection chapter analysis"""
test_cases = [
"severe chest pain with shortness of breath",
"type 2 diabetes with kidney complications",
"depression and anxiety disorder",
"broken wrist from falling",
"acute appendicitis with fever",
"skin cancer melanoma",
"pregnancy complications in third trimester"
]
for diagnostic in test_cases:
print(f"\n{'='*100}")
print(f"πŸ” ANALYZING: {diagnostic}")
print(f"{'='*100}")
try:
# Step 1: Analyze chapter relevance
analysis = analyze_diagnostic_chapters(diagnostic, detailed=False, use_cloud=use_cloud)
print(analysis)
# Step 2: Get top relevant chapters
top_chapters = get_relevant_chapters(diagnostic, top_n=3, use_cloud=use_cloud)
print(f"\nπŸ† Top 3 relevant chapters: {top_chapters}")
# Step 3: Smart search in those chapters
search_results = smart_diagnostic_search(
diagnostic,
results_per_sentence=5,
use_cloud=use_cloud
)
formatted_results = format_smart_search_results(
diagnostic,
search_results,
use_cloud=use_cloud
)
print(formatted_results)
except Exception as e:
print(f"❌ Error processing '{diagnostic}': {e}")
continue
def test_cloud_connection():
"""Test Qdrant Cloud connection and basic functionality"""
print("πŸ§ͺ Testing Qdrant Cloud Connection...")
try:
retriever = MultiCollectionChapterRetrieval(use_cloud=True)
# Test basic search
test_query = "heart disease"
print(f"\nπŸ”¬ Testing with query: '{test_query}'")
# Get collections
collections = retriever.get_chapter_collections()
print(f"πŸ“Š Available collections: {len(collections)}")
if collections:
# Test search
top_chapters = retriever.get_top_chapters(test_query, top_n=3)
print(f"🎯 Top chapters for '{test_query}': {[ch[0] for ch in top_chapters]}")
print("βœ… Cloud connection test successful!")
return True
else:
print("⚠️ No collections found")
return False
except Exception as e:
print(f"❌ Cloud connection test failed: {e}")
return False
if __name__ == "__main__":
# Test cloud connection first
if test_cloud_connection():
print("\n" + "="*100)
print("πŸš€ Running example analysis with Qdrant Cloud...")
print("="*100)
# Run examples with cloud
example_multi_collection_analysis(use_cloud=True)
else:
print("❌ Skipping examples due to connection issues")
# Or use directly:
# chapters = get_relevant_chapters("heart attack symptoms", use_cloud=True)
# results = smart_diagnostic_search("heart attack symptoms", use_cloud=True)
# print(format_smart_search_results("heart attack symptoms", results, use_cloud=True))