"""Dermatology helper tools.

Each public function returns a plain dictionary with a ``"status"`` key
(``"success"`` or ``"error"``) instead of raising, so callers can relay the
outcome directly. The tools cover web search, vector-database search, image
search, skin-image classification and document-to-Markdown conversion.
"""

import io
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from PIL import Image

from app.services.vector_database_search import VectorDatabaseSearch
from app.services.websearch import WebSearch
from MagicConvert import MagicConvert
from app.services.image_classification_vit import SkinDiseaseClassifier

# HEIF/HEIC decoding is optional: when pillow_heif is missing (or fails to
# register) the module still loads and such images are rejected with a
# dedicated error message in analyze_skin_image.
try:
    from pillow_heif import register_heif_opener

    register_heif_opener()
    _HEIF_SUPPORTED = True
except Exception:
    _HEIF_SUPPORTED = False

logger = logging.getLogger(__name__)


def _clean_query(query: str) -> str:
    """Return *query* stripped of surrounding whitespace ('' for None/empty)."""
    return (query or '').strip()


# Root directory that uploaded files must live in. Overridable through the
# DERMAI_UPLOAD_DIR environment variable; defaults to "<three levels up>/uploads".
_UPLOADS_ROOT = Path(
    os.getenv(
        "DERMAI_UPLOAD_DIR",
        Path(__file__).resolve().parent.parent.parent / "uploads",
    )
).resolve()

_magic_converter = MagicConvert()

# Lazily created classifier plus the error text of a failed creation attempt
# (see _get_classifier).
_skin_classifier: Optional[SkinDiseaseClassifier] = None
_skin_classifier_error: Optional[str] = None

# Confidence (in percent) below which analyze_skin_image withholds a diagnosis.
# NOTE: the user-facing advice strings mention "50%" literally; keep in sync.
_CONFIDENCE_THRESHOLD = 50.0

# ISO-BMFF major brands treated as HEIF/AVIF when sniffing a file header.
_HEIF_BRANDS = (
    b"heic", b"heix", b"hevc", b"hevx", b"heim", b"heis",
    b"mif1", b"msf1", b"avif", b"avis",
)


def _resolve_upload_path(file_path: str) -> Optional[Tuple[Path, Path]]:
    """Resolve *file_path* and confine it to the uploads directory.

    Relative paths are interpreted relative to the uploads root. The uploads
    directory is created when missing.

    Returns:
        ``(candidate, uploads_root)`` with both paths resolved, or ``None``
        when the resolved candidate lies outside the uploads directory.
    """
    candidate = Path(file_path)
    if not candidate.is_absolute():
        candidate = _UPLOADS_ROOT / candidate
    # resolve() collapses ".." segments and symlinks, so the containment
    # check below cannot be bypassed with path traversal.
    candidate = candidate.resolve()

    _UPLOADS_ROOT.mkdir(parents=True, exist_ok=True)
    uploads_root = _UPLOADS_ROOT.resolve()

    if candidate != uploads_root and uploads_root not in candidate.parents:
        return None
    return candidate, uploads_root


def _looks_like_heif(header: bytes) -> bool:
    """Return True when *header* (first >= 12 bytes) carries a HEIF/AVIF brand.

    The first four bytes are the ``ftyp`` box size, which varies between
    files, so only the box type (bytes 4-8) and major brand (bytes 8-12)
    are inspected.
    """
    return header[4:8] == b"ftyp" and header[8:12] in _HEIF_BRANDS


def get_web_search(query: str, num_results: int = 4) -> Dict[str, Any]:
    """Return up-to-date dermatology information from the public web.

    Args:
        query: Search text; surrounding whitespace is ignored.
        num_results: Desired number of results, clamped to 1..8
            (falsy values fall back to 4).

    Returns:
        On success ``{"status": "success", "results": [...], "references": [...]}``
        where each result has ``source_number``, ``title``, ``link`` and
        ``snippet``; otherwise ``{"status": "error", "error_message": ...}``.
    """
    query = _clean_query(query)
    if not query:
        return {"status": "error", "error_message": "Query is required."}

    try:
        web = WebSearch(num_results=max(1, min(num_results or 4, 8)))
        raw_results = web.search(query) or []

        formatted: List[Dict[str, Any]] = []
        references: List[str] = []
        for idx, item in enumerate(raw_results, start=1):
            # Result items may use either naming scheme for link and text.
            link = item.get('link') or item.get('url') or ''
            snippet = item.get('text') or item.get('snippet') or ''
            title = item.get('title') or ''
            formatted.append(
                {
                    "source_number": idx,
                    "title": title,
                    "link": link,
                    "snippet": snippet,
                }
            )
            if link:
                references.append(link)

        if not formatted:
            return {
                "status": "error",
                "error_message": f"No web results found for '{query}'.",
            }

        return {
            "status": "success",
            "results": formatted,
            "references": references,
        }
    except Exception as exc:
        logger.exception("Web search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Web search failed: {exc}",
        }


def get_vector_search(query: str, top_k: int = 5) -> Dict[str, Any]:
    """Return dermatology knowledge from the curated vector database.

    Args:
        query: Search text; surrounding whitespace is ignored.
        top_k: Desired number of hits, clamped to 1..10
            (falsy values fall back to 5).

    Returns:
        On success ``{"status": "success", "results": [...], "references": [...]}``
        where each result has ``source_number``, ``source``, ``page``,
        ``content`` and ``confidence``; otherwise
        ``{"status": "error", "error_message": ...}``.
    """
    query = _clean_query(query)
    if not query:
        return {"status": "error", "error_message": "Query is required."}

    try:
        vector = VectorDatabaseSearch()
        if not vector.is_available():
            return {
                "status": "error",
                "error_message": "Vector database is not available.",
            }

        raw_results = vector.search(query, top_k=max(1, min(top_k or 5, 10)))
        if not raw_results:
            return {
                "status": "error",
                "error_message": f"No vector results found for '{query}'.",
            }

        formatted: List[Dict[str, Any]] = []
        references: List[str] = []
        for idx, item in enumerate(raw_results, start=1):
            source = item.get('source') or 'Unknown'
            page = item.get('page') or 0
            content = item.get('content') or ''
            confidence = item.get('confidence')
            formatted.append(
                {
                    "source_number": idx,
                    "source": source,
                    "page": page,
                    "content": content,
                    "confidence": confidence,
                }
            )
            # Only mention the page when one is known (0 means "unknown").
            ref_label = f"{source} (page {page})" if page else source
            references.append(ref_label)

        return {
            "status": "success",
            "results": formatted,
            "references": references,
        }
    except Exception as exc:
        logger.exception("Vector search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Vector search failed: {exc}",
        }


def get_image_search(query: str, max_images: int = 3) -> Dict[str, Any]:
    """Return dermatology-relevant image URLs for the given query.

    Args:
        query: Search text; surrounding whitespace is ignored.
        max_images: Desired number of images, clamped to 1..8
            (falsy values fall back to 3).

    Returns:
        On success ``{"status": "success", "images": [...]}`` with unique,
        non-empty URLs in the order the searcher returned them; otherwise
        ``{"status": "error", "error_message": ...}``.
    """
    query = _clean_query(query)
    if not query:
        return {"status": "error", "error_message": "Query is required."}

    try:
        # Clamp once and use the same limit for the searcher and for the
        # de-duplication loop, so None/0/oversized values behave consistently.
        limit = max(1, min(max_images or 3, 8))
        searcher = WebSearch(max_images=limit)
        images = searcher.search_images(query) or []

        unique_images: List[str] = []
        seen = set()
        for url in images:
            if url and url not in seen:
                seen.add(url)
                unique_images.append(url)
                if len(unique_images) >= limit:
                    break

        if not unique_images:
            return {
                "status": "error",
                "error_message": f"No images found for '{query}'.",
            }

        return {"status": "success", "images": unique_images}
    except Exception as exc:
        logger.exception("Image search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Image search failed: {exc}",
        }


def _get_classifier() -> SkinDiseaseClassifier:
    """Return the shared classifier, creating it on first use.

    A failed construction is remembered: later calls raise ``RuntimeError``
    with the stored message instead of retrying the construction.

    Raises:
        Exception: Whatever ``SkinDiseaseClassifier()`` raised on the first
            attempt.
        RuntimeError: On every call after a failed first attempt.
    """
    global _skin_classifier, _skin_classifier_error

    if _skin_classifier is not None:
        return _skin_classifier
    if _skin_classifier_error:
        raise RuntimeError(_skin_classifier_error)

    try:
        _skin_classifier = SkinDiseaseClassifier()
        return _skin_classifier
    except Exception as exc:
        _skin_classifier_error = str(exc)
        raise


def analyze_skin_image(
    file_path: str,
    language: Optional[str] = None,
) -> Dict[str, Any]:
    """Assess an uploaded image for dermatology analysis.

    The tool verifies the file exists in the uploads directory and runs the
    skin disease classifier directly. When confidence is below the 50%
    threshold it reports the uncertainty instead of a diagnosis and nudges the
    user toward alternative options.

    Args:
        file_path: Absolute path, or path relative to the uploads directory.
        language: ``"urdu"`` (case-insensitive) selects Urdu messages; any
            other value, including ``None``, selects English.

    Returns:
        On success a dictionary with ``status``, ``is_skin``, ``diagnosis``
        (``None`` below the threshold), ``confidence``,
        ``confidence_below_threshold``, ``message``, ``advice`` and
        ``image_path`` (relative to the uploads directory, forward slashes).
        On failure ``{"status": "error", "error_message": ...}``, plus
        ``details`` when the classifier could not be loaded.
    """
    if not file_path or not str(file_path).strip():
        return {"status": "error", "error_message": "file_path is required."}

    try:
        resolved = _resolve_upload_path(file_path)
        if resolved is None:
            return {
                "status": "error",
                "error_message": "Access to the requested file path is not permitted.",
            }
        candidate, uploads_root = resolved

        if not candidate.exists() or not candidate.is_file():
            return {
                "status": "error",
                "error_message": f"Image not found at '{candidate}'.",
            }

        # Bound before the try so the handler can inspect whatever was read.
        image_bytes = b""
        try:
            with candidate.open("rb") as fh:
                image_bytes = fh.read()
            pil_image = Image.open(io.BytesIO(image_bytes))
            pil_image.load()  # force full decode so corrupt files fail here
            pil_image = pil_image.convert("RGB")
        except Exception as exc:
            logger.exception("Unable to open image for analysis: %s", exc)
            if _looks_like_heif(image_bytes[:12]) and not _HEIF_SUPPORTED:
                return {
                    "status": "error",
                    "error_message": (
                        "The uploaded image appears to be in HEIC/AVIF format, which is not supported. "
                        "Please convert the photo to JPG or PNG and try again."
                    ),
                }
            return {
                "status": "error",
                "error_message": f"Unable to open the image: {exc}",
            }

        user_language = (language or "english").strip().lower()

        # Skip skin-vs-non-skin classification and directly proceed to disease classification
        try:
            classifier = _get_classifier()
        except Exception as exc:
            logger.error("Skin classifier unavailable: %s", exc)
            return {
                "status": "error",
                "error_message": (
                    "Skin analysis is temporarily unavailable. "
                    "Ensure the classifier weights are accessible (set SKIN_CLASSIFIER_WEIGHTS to a local file "
                    "or configure HUGGINGFACE_TOKEN with network access) and try again."
                ),
                "details": str(exc),
            }

        # NOTE(review): meaning of the second positional argument (5) is
        # defined by SkinDiseaseClassifier.predict — confirm there.
        disease_name, confidence = classifier.predict(pil_image, 5)
        confidence_value = float(confidence)
        below_threshold = confidence_value < _CONFIDENCE_THRESHOLD

        # NOTE(review): the nesting of the trailing advice bullets was
        # reconstructed from whitespace-flattened source; the photo tip is
        # assumed to belong to the low-confidence branch and the urgent-care
        # line to apply in every case — confirm against the original layout.
        if user_language == "urdu":
            unable_message = (
                "معذرت، میں اس وقت جلد کی بیماری کی درست شناخت نہیں کر پا رہا۔ "
                "براہ کرم بہتر روشنی میں ایک قریب کی تصویر اپ لوڈ کریں یا اپنی تشخیص کے لیے ڈاکٹر سے رجوع کریں۔"
            )
            diagnosis_message = (
                f"مجھے لگتا ہے کہ یہ {disease_name} ہے اور میری اعتماد کی سطح {confidence_value:.2f}% ہے۔ "
                "براہ کرم حتمی تشخیص کے لیے ماہر ڈرماٹولوجسٹ سے مشورہ کریں۔"
            )
            advice_lines = ["## تصویری تجزیہ کی بنیاد پر"]
            if not below_threshold:
                advice_lines.append(
                    f"- مشتبہ بیماری: {disease_name} (اعتماد {confidence_value:.2f}%)."
                )
                advice_lines.append(
                    "- یہ نتیجہ تخمینی ہے، حتمی تشخیص کے لئے ماہر امراض جلد سے رجوع کریں۔"
                )
            else:
                advice_lines.append(
                    "- ماڈل کا اعتماد 50٪ سے کم ہے، اس لئے قابل اعتماد تشخیص ممکن نہیں۔"
                )
                advice_lines.append(
                    "- متاثرہ جلد کی واضح اور روشنی میں تصویر لیں اور فلٹرز سے پرہیز کریں۔"
                )
            advice_lines.append(
                "- اگر علامات بگڑتی یا پھیلتی ہیں تو فوری طبی معائنہ کروائیں۔"
            )
        else:
            unable_message = (
                "I’m not confident enough to identify a condition from this photo. "
                "Please upload a clearer close-up image with good lighting, or consult a dermatologist for an in-person diagnosis."
            )
            diagnosis_message = (
                f"I suspect this may be {disease_name} with a confidence of {confidence_value:.2f}%. "
                "Please consult a dermatologist for a professional evaluation and treatment plan."
            )
            advice_lines = ["## Based on the Image Analysis"]
            if not below_threshold:
                advice_lines.append(
                    f"- Suspected condition: {disease_name} (confidence {confidence_value:.2f}%)."
                )
                advice_lines.append(
                    "- This prediction is probabilistic; please obtain confirmation from a dermatologist."
                )
            else:
                advice_lines.append(
                    "- The model's confidence is below 50%, so no reliable diagnosis is available."
                )
                advice_lines.append(
                    "- Capture well-lit close-up photos of the affected area and avoid heavy filters."
                )
            advice_lines.append(
                "- Seek urgent in-person care if symptoms worsen or spread rapidly."
            )

        message = unable_message if below_threshold else diagnosis_message
        advice = "\n".join(advice_lines)

        return {
            "status": "success",
            "is_skin": True,
            "diagnosis": None if below_threshold else disease_name,
            "confidence": confidence_value,
            "confidence_below_threshold": below_threshold,
            "message": message,
            "advice": advice,
            # Normalise Windows separators so the path is stable across platforms.
            "image_path": str(candidate.relative_to(uploads_root)).replace("\\", "/"),
        }
    except Exception as exc:
        logger.exception("Unexpected error during image analysis: %s", exc)
        return {
            "status": "error",
            "error_message": f"Unexpected error: {exc}",
        }


def convert_document_to_markdown(
    file_path: str,
    file_extension: Optional[str] = None,
) -> Dict[str, Any]:
    """Convert an uploaded document into Markdown text for downstream analysis.

    Args:
        file_path: Path to the uploaded file. Accepts absolute paths or paths
            relative to the backend uploads directory.
        file_extension: Optional hint such as ".pdf" or ".docx" when the
            extension cannot be inferred from the filename.

    Returns:
        A dictionary containing the Markdown representation (`text_content`),
        detected title, and basic metadata. On failure the dictionary includes
        a descriptive error message instead of raising.
    """
    if not file_path or not str(file_path).strip():
        return {"status": "error", "error_message": "file_path is required."}

    try:
        resolved = _resolve_upload_path(file_path)
        if resolved is None:
            return {
                "status": "error",
                "error_message": "Access to the requested file path is not permitted.",
            }
        candidate, _ = resolved

        if not candidate.exists() or not candidate.is_file():
            return {
                "status": "error",
                "error_message": f"File not found at '{candidate}'.",
            }

        # Accept hints given with or without the leading dot ("pdf" / ".pdf").
        extension_hint = file_extension or candidate.suffix
        if extension_hint and not extension_hint.startswith('.'):
            extension_hint = f".{extension_hint}"

        conversion = _magic_converter.magic(
            str(candidate),
            file_extension=extension_hint,
        )

        # Treat a missing result or a None text_content as empty text so the
        # length computation below cannot fail.
        text_content = (conversion.text_content or "") if conversion else ""
        character_count = len(text_content)

        return {
            "status": "success",
            "text_content": text_content,
            "title": getattr(conversion, "title", None),
            "character_count": character_count,
            "source_path": str(candidate),
        }
    except Exception as exc:
        logger.exception("Unexpected error during document conversion: %s", exc)
        return {
            "status": "error",
            "error_message": f"Unexpected error: {exc}",
        }