Spaces:
Sleeping
Sleeping
import logging
import pprint
import time
from typing import List, Optional, Dict, Any

from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

# Import your existing neural searcher and the new multi-collection system
# from neural_searcher import NeuralSearcher
from chapter_retrieval_system_v2 import MultiCollectionChapterRetrieval
# Logging: root logger at INFO, plus a module-scoped logger used throughout this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The FastAPI application object; title, description and version are passed
# to the FastAPI constructor as API metadata.
app = FastAPI(
    title="ICD-10 Multi-Collection Search API",
    description="Advanced ICD-10 code search with intelligent chapter detection",
    version="2.0.0",
)
# Add CORS middleware for web frontend integration.
#
# Single place to configure which browser origins may call this API.
# ["*"] allows any origin; list explicit origins for production.
ALLOWED_ORIGINS = ["*"]

# A wildcard origin must not be combined with credentialed requests (cookies,
# Authorization headers): the original configuration enabled both at once.
# Credentials are therefore switched on only when explicit origins are listed.
# NOTE(review): if credentials are enabled, consider listing methods and
# headers explicitly as well instead of "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials="*" not in ALLOWED_ORIGINS,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize systems.
#
# The retriever is constructed once at import time. If construction fails the
# module still imports: `chapter_retriever` is set to None and the endpoints
# that need it respond with 503.
try:
    # Initialize the multi-collection chapter retrieval system
    chapter_retriever = MultiCollectionChapterRetrieval()
    # Keep your original neural searcher for backward compatibility
    # You might not need this if switching fully to multi-collection approach
    # neural_searcher = NeuralSearcher(collection_name="icd10_codes_chapter_3")
except Exception as e:
    # logger.exception keeps the original message and adds the traceback,
    # which is needed to diagnose a start-up failure.
    logger.exception("Failed to initialize search systems: %s", e)
    chapter_retriever = None
    # neural_searcher = None
else:
    # Logged outside the try body so only the constructor call is guarded.
    logger.info("Successfully initialized search systems")
| # Pydantic models for request/response validation | |
class SearchRequest(BaseModel):
    # Request body for a search. The numeric bounds below mirror the ones the
    # GET search endpoint declares on its query parameters, so both entry
    # points reject the same out-of-range values (e.g. a negative limit, which
    # would otherwise be used as a negative slice bound).

    # Free-text diagnostic query; blank queries are rejected by the search logic.
    query: str
    # Maximum number of results (applied per sentence and to the flat list).
    limit: Optional[int] = Field(10, ge=1, le=100)
    # Minimum score a hit must reach to be returned.
    score_threshold: Optional[float] = Field(0.3, ge=0.0, le=1.0)
    search_mode: Optional[str] = "smart"  # "smart", "all_chapters", "specific_chapters"
    # Forwarded to the retriever's targeted search as `target_chapters`.
    target_chapters: Optional[List[str]] = None
    # When true, the smart search also runs the chapter analysis.
    detailed_analysis: Optional[bool] = False
    chapters_per_sentence: Optional[int] = Field(2, ge=1, le=5)  # How many chapters to search per sentence
class ChapterInfo(BaseModel):
    # One chapter's entry in a chapter-relevance analysis.
    # In this module the numeric fields and `collection_name` are copied from
    # the per-chapter statistics returned by the retriever's chapter analysis,
    # and `description` is looked up in the retriever's `chapter_info` mapping.
    chapter_id: str
    collection_name: str
    relevance_score: float
    description: str
    match_count: int
    avg_score: float
    max_score: float
class SearchResult(BaseModel):
    # A single search hit as returned to API clients.
    # `code` and `title` are filled with 'N/A' by the search logic when the
    # hit's payload lacks them; `description` stays None in that case.
    code: str
    title: str
    description: Optional[str] = None
    score: float
    chapter_id: Optional[str] = None
    collection: str
    source_sentence: Optional[str] = None  # Sentence of the query that produced this hit
    sentence_key: Optional[str] = None  # Identifier of that sentence
class SentenceResults(BaseModel):
    # All hits kept for one sentence of the query.
    # `total_results` is set by the search logic to len(results).
    sentence_text: str
    sentence_key: str
    results: List[SearchResult]
    total_results: int
class SearchResponse(BaseModel):
    # Envelope returned by the search endpoints.
    query: str
    total_results: int
    search_time: float  # Elapsed wall-clock seconds (difference of two time.time() readings)
    search_mode: str
    relevant_chapters: List[ChapterInfo]
    results: List[SearchResult]  # Flat result list, kept for backward compatibility
    sentence_results: Optional[List[SentenceResults]] = None  # Same hits grouped by sentence
class ChapterAnalysisResponse(BaseModel):
    # Envelope returned by the chapter-analysis endpoint.
    query: str
    analysis_time: float  # Elapsed wall-clock seconds spent in the analysis call
    chapters: List[ChapterInfo]
| # Health check endpoint | |
# NOTE(review): no route decorator is visible on this function in this view;
# confirm it is registered on `app`.
def health_check():
    """Report whether the search system is ready.

    Returns a dict with ``status`` set to "healthy" and the current Unix
    timestamp. Raises a 503 HTTPException when the retriever was not
    initialized.
    """
    retriever_ready = chapter_retriever is not None
    if retriever_ready:
        return {"status": "healthy", "timestamp": time.time()}
    raise HTTPException(status_code=503, detail="Search system not initialized")
| # Chapter analysis endpoint | |
# NOTE(review): no route decorator is visible on this function in this view;
# confirm it is registered on `app`.
def analyze_chapters(
    q: str = Query(..., description="Diagnostic query string"),
    detailed: bool = Query(False, description="Include detailed chapter statistics"),
):
    """
    Rank ICD-10 chapters by relevance to a diagnostic query.

    Returns a ChapterAnalysisResponse holding every chapter whose relevance
    score exceeds 0.05, together with the time the analysis took.

    Raises HTTPException 503 when the retriever is unavailable, 400 for a
    blank query, and 500 when the analysis itself fails.
    """
    # `detailed` is accepted but not consulted anywhere in this function.
    if not chapter_retriever:
        raise HTTPException(status_code=503, detail="Chapter retrieval system not available")

    cleaned_query = q.strip() if q else ""
    if not cleaned_query:
        raise HTTPException(status_code=400, detail="Query parameter 'q' is required")

    try:
        started = time.time()
        chapter_stats = chapter_retriever.analyze_chapters_parallel(
            cleaned_query,
            sample_size_per_chapter=15,
            score_threshold=0.2,
        )
        elapsed = time.time() - started

        # Drop chapters with very low relevance and convert the rest.
        relevant = [
            ChapterInfo(
                chapter_id=cid,
                collection_name=info['collection_name'],
                relevance_score=info['relevance_score'],
                description=chapter_retriever.chapter_info.get(cid, "Unknown chapter"),
                match_count=info['match_count'],
                avg_score=info['avg_score'],
                max_score=info['max_score'],
            )
            for cid, info in chapter_stats.items()
            if info['relevance_score'] > 0.05
        ]

        # The response echoes the query exactly as received (not stripped).
        return ChapterAnalysisResponse(
            query=q,
            analysis_time=elapsed,
            chapters=relevant,
        )
    except Exception as e:
        logger.error(f"Error in chapter analysis: {e}")
        raise HTTPException(status_code=500, detail=f"Chapter analysis failed: {str(e)}")
| # Smart search endpoint (main search functionality) | |
# NOTE(review): no route decorator is visible on this function in this view;
# confirm it is registered on `app`.
def search_smart(request: SearchRequest):
    """
    Run a search described by a request body; delegates to the shared search logic.
    """
    response = _perform_search(request)
    return response
# NOTE(review): no route decorator is visible on this function in this view;
# confirm it is registered on `app`.
def search_smart_get(
    q: str = Query(..., description="Diagnostic query string"),
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results"),
    score_threshold: float = Query(0.3, ge=0.0, le=1.0, description="Minimum similarity score"),
    search_mode: str = Query("smart", description="Search mode: smart, all_chapters, specific_chapters"),
    target_chapters: Optional[str] = Query(None, description="Comma-separated list of target chapters (for specific_chapters mode)"),
    detailed_analysis: bool = Query(False, description="Include detailed chapter analysis"),
    chapters_per_sentence: int = Query(2, ge=1, le=5, description="Number of chapters to search per sentence"),
):
    """
    Query-string variant of the search: packs the parameters into a
    SearchRequest and delegates to the shared search logic.
    """
    # Split the comma-separated chapter list, dropping blank entries.
    # A missing or empty parameter leaves the filter as None.
    chapter_filter = None
    if target_chapters:
        stripped = (piece.strip() for piece in target_chapters.split(","))
        chapter_filter = [piece for piece in stripped if piece]

    return _perform_search(
        SearchRequest(
            query=q,
            limit=limit,
            score_threshold=score_threshold,
            search_mode=search_mode,
            target_chapters=chapter_filter,
            detailed_analysis=detailed_analysis,
            chapters_per_sentence=chapters_per_sentence,
        )
    )
# Modes the request model advertises but for which this module contains no
# search implementation yet.
_UNIMPLEMENTED_SEARCH_MODES = ("all_chapters", "specific_chapters")


def _to_search_result(hit: Dict[str, Any]) -> SearchResult:
    """Convert one enriched raw hit into a SearchResult ('N/A' for a missing code/title)."""
    payload = hit['payload']
    return SearchResult(
        code=payload.get('code', 'N/A'),
        title=payload.get('title', 'N/A'),
        description=payload.get('description'),
        score=hit['score'],
        chapter_id=hit.get('chapter_id'),
        collection=hit['collection'],
        source_sentence=hit.get('source_sentence'),
        sentence_key=hit.get('sentence_key'),
    )


def _collect_relevant_chapters(query: str) -> List[ChapterInfo]:
    """Run the chapter analysis for *query* and keep chapters scoring above 0.1."""
    analysis = chapter_retriever.analyze_chapters_parallel(query)
    return [
        ChapterInfo(
            chapter_id=chapter_id,
            collection_name=stats['collection_name'],
            relevance_score=stats['relevance_score'],
            description=chapter_retriever.chapter_info.get(chapter_id, "Unknown"),
            match_count=stats['match_count'],
            avg_score=stats['avg_score'],
            max_score=stats['max_score'],
        )
        for chapter_id, stats in analysis.items()
        if stats['relevance_score'] > 0.1
    ]


def _group_hits_by_sentence(search_results: Dict[str, Any]):
    """Tag every hit with its chapter/sentence; return (hits grouped by sentence key, flat hit list)."""
    grouped = {}
    flat = []
    for chapter_id, chapter_data in search_results.items():
        for sentence_key, sentence_data in chapter_data.items():
            sentence_text = sentence_data['text']
            # The first text seen for a sentence key labels the whole group.
            entry = grouped.setdefault(sentence_key, {'text': sentence_text, 'results': []})
            for raw_hit in sentence_data['results']:
                enriched = {
                    **raw_hit,
                    'chapter_id': chapter_id,
                    'source_sentence': sentence_text,
                    'sentence_key': sentence_key,
                }
                entry['results'].append(enriched)
                flat.append(enriched)
    return grouped, flat


def _perform_search(request: SearchRequest) -> SearchResponse:
    """Shared search logic: run a sentence-based smart search and build the response.

    The query is searched sentence by sentence through the retriever's
    targeted chapter search. The response carries the hits twice: grouped per
    sentence (each group sorted by score, filtered by the score threshold and
    capped at ``limit``; groups ordered by average score) and as one flat,
    score-sorted list capped at ``limit`` for backward compatibility.

    Raises:
        HTTPException 503: the retriever was not initialized.
        HTTPException 400: blank query or unknown search mode.
        HTTPException 501: a known search mode that is not implemented here.
        HTTPException 500: any unexpected failure during the search.
    """
    if not chapter_retriever:
        raise HTTPException(status_code=503, detail="Search system not available")
    if not request.query or not request.query.strip():
        raise HTTPException(status_code=400, detail="Query is required")

    # Validate the mode before entering the try block. Previously the 400 for
    # an unknown mode was raised inside it and re-wrapped as a 500, and the
    # placeholder modes crashed on an undefined variable.
    if request.search_mode in _UNIMPLEMENTED_SEARCH_MODES:
        raise HTTPException(
            status_code=501,
            detail=f"Search mode not implemented: {request.search_mode}",
        )
    if request.search_mode != "smart":
        raise HTTPException(status_code=400, detail=f"Unknown search mode: {request.search_mode}")

    try:
        start_time = time.time()
        query = request.query.strip()
        logger.info("Performing sentence-based smart search for: '%s'", query)

        # Chapter analysis is an extra retriever call, so it only runs on request.
        relevant_chapters = _collect_relevant_chapters(query) if request.detailed_analysis else []

        search_results = chapter_retriever.search_targeted_chapters(
            query,
            target_chapters=request.target_chapters,
            results_per_sentence=request.limit,  # Use full limit per sentence
            chapters_per_sentence=request.chapters_per_sentence,
        )
        grouped_hits, all_hits = _group_hits_by_sentence(search_results)

        # Per-sentence view: best hits first, thresholded, capped at the limit.
        sentence_results = []
        for sentence_key, sentence_data in grouped_hits.items():
            ranked = sorted(sentence_data['results'], key=lambda hit: hit['score'], reverse=True)
            kept = [hit for hit in ranked if hit['score'] >= request.score_threshold][:request.limit]
            if not kept:
                continue  # Only include sentences with results
            converted = [_to_search_result(hit) for hit in kept]
            sentence_results.append(
                SentenceResults(
                    sentence_text=sentence_data['text'],
                    sentence_key=sentence_key,
                    results=converted,
                    total_results=len(converted),
                )
            )
        # Every group is non-empty here, so the average is always defined.
        sentence_results.sort(
            key=lambda group: sum(r.score for r in group.results) / len(group.results),
            reverse=True,
        )

        # Flat view for backward compatibility: global top `limit`, thresholded.
        all_hits.sort(key=lambda hit: hit['score'], reverse=True)
        results = [
            _to_search_result(hit)
            for hit in all_hits[:request.limit]
            if hit['score'] >= request.score_threshold
        ]

        search_time = time.time() - start_time
        logger.info(
            "Sentence-based search completed: %d total results, %d sentences in %.3fs",
            len(results), len(sentence_results), search_time,
        )
        # Debug output
        logger.info("Sentence results breakdown:")
        for sent_result in sentence_results:
            logger.info("  '%s': %d results", sent_result.sentence_text, sent_result.total_results)

        return SearchResponse(
            query=query,
            total_results=len(results),
            search_time=search_time,
            search_mode=request.search_mode,
            relevant_chapters=relevant_chapters,
            results=results,  # Flattened results for backward compatibility
            sentence_results=sentence_results,  # Results organized by sentence
        )
    except HTTPException:
        # Deliberate HTTP errors must reach the client with their own status.
        raise
    except Exception as e:
        logger.exception("Search error: %s", e)
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}") from e
| # Backward compatibility endpoint (your original endpoint) | |
| # @app.get("/api/search/legacy") | |
| # def search_legacy(q: str): | |
| # """ | |
| # Legacy search endpoint for backward compatibility | |
| # Uses your original neural searcher | |
| # """ | |
| # # if not neural_searcher: | |
| # # raise HTTPException(status_code=503, detail="Legacy search system not available") | |
| # if not q or not q.strip(): | |
| # raise HTTPException(status_code=400, detail="Query parameter 'q' is required") | |
| # try: | |
| # result = neural_searcher.search(text=q.strip()) | |
| # return {"result": result} | |
| # except Exception as e: | |
| # logger.error(f"Legacy search error: {e}") | |
| # raise HTTPException(status_code=500, detail=f"Legacy search failed: {str(e)}") | |
| # Get available chapters | |
# NOTE(review): no route decorator is visible on this function in this view;
# confirm it is registered on `app`.
def get_available_chapters():
    """
    List the chapter collections known to the retriever, with descriptions.

    Returns a dict with ``total_chapters`` and a ``chapters`` list of
    ``chapter_id`` / ``collection_name`` / ``description`` entries.
    Raises HTTPException 503 when the retriever is unavailable and 500 when
    the lookup fails.
    """
    if not chapter_retriever:
        raise HTTPException(status_code=503, detail="Chapter system not available")

    try:
        collections_by_chapter = chapter_retriever.get_chapter_collections()
        # Chapters missing from the description mapping get a placeholder text.
        chapters = [
            {
                "chapter_id": cid,
                "collection_name": cname,
                "description": chapter_retriever.chapter_info.get(cid, "Unknown chapter"),
            }
            for cid, cname in collections_by_chapter.items()
        ]
        return {"total_chapters": len(chapters), "chapters": chapters}
    except Exception as e:
        logger.error(f"Error getting chapters: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get chapters: {str(e)}")
| # Get search suggestions/autocomplete (optional enhancement) | |
# NOTE(review): no route decorator is visible on this function in this view;
# confirm it is registered on `app`.
def get_search_suggestions(
    q: str = Query(..., min_length=2, description="Partial query for suggestions"),
    limit: int = Query(5, ge=1, le=20, description="Maximum number of suggestions"),
):
    """
    Suggest common search terms that contain the partial query.

    Matching is a case-insensitive substring test against a fixed list of
    terms; at most ``limit`` suggestions are returned, in list order.
    A query that is blank after trimming yields no suggestions.
    """
    # Simple keyword-based suggestions; all entries are lowercase.
    common_terms = (
        "chest pain", "shortness of breath", "diabetes", "hypertension",
        "pneumonia", "fracture", "depression", "anxiety", "fever",
        "headache", "abdominal pain", "nausea", "vomiting", "infection",
        "cancer", "tumor", "heart attack", "stroke", "asthma",
    )

    query_lower = q.lower().strip()
    # A whitespace-only query satisfies min_length but strips to "", and the
    # empty string is a substring of every term, so it would match everything.
    if not query_lower:
        return {"suggestions": []}

    suggestions = [term for term in common_terms if query_lower in term]
    return {"suggestions": suggestions[:limit]}
if __name__ == "__main__":
    import uvicorn

    # Settings for running this module directly: listen on all interfaces,
    # port 8000, info-level logging with the access log enabled.
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "log_level": "info",
        "access_log": True,
    }
    uvicorn.run(app, **server_options)