Spaces:
Sleeping
Sleeping
| import io | |
| import logging | |
| import os | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| from PIL import Image | |
| from app.services.vector_database_search import VectorDatabaseSearch | |
| from app.services.websearch import WebSearch | |
| from MagicConvert import MagicConvert | |
| from app.services.image_classification_vit import SkinDiseaseClassifier | |
# Optional HEIF-family support. Importing and registering the pillow_heif
# opener is best effort: any failure (package missing, registration error)
# simply leaves the flag False. analyze_skin_image() reads the flag to decide
# whether to show a "convert to JPG/PNG" hint when an image cannot be opened.
try:
    from pillow_heif import register_heif_opener

    register_heif_opener()
except Exception:
    _HEIF_SUPPORTED = False
else:
    _HEIF_SUPPORTED = True

# Module-level logger shared by every tool function in this file.
logger = logging.getLogger(__name__)
def _clean_query(query: str) -> str:
    """Return *query* with surrounding whitespace removed.

    Falsy input (``None`` or an empty string) yields an empty string.
    """
    text = query if query else ''
    return text.strip()
# Directory that uploaded files must live under. The DERMAI_UPLOAD_DIR
# environment variable overrides the default, which is an "uploads" folder in
# the third ancestor directory of this file.
_UPLOADS_ROOT = Path(
    os.environ.get(
        "DERMAI_UPLOAD_DIR",
        Path(__file__).resolve().parent.parent.parent / "uploads",
    )
).resolve()

# Single document converter instance, created once at import time and reused
# by convert_document_to_markdown().
_magic_converter = MagicConvert()

# Cache managed by _get_classifier(): either the loaded classifier, or the
# message of the exception that prevented it from loading.
_skin_classifier: Optional[SkinDiseaseClassifier] = None
_skin_classifier_error: Optional[str] = None
def get_web_search(query: str, num_results: int = 4) -> Dict[str, Any]:
    """Look up dermatology information on the public web.

    Args:
        query: Free-text search query; surrounding whitespace is ignored.
        num_results: Requested number of results. Falsy values fall back to 4
            and the value is clamped to the range 1-8.

    Returns:
        On success, a dict with ``status`` ``"success"``, ``results`` (one
        entry per hit with ``source_number``, ``title``, ``link`` and
        ``snippet``) and ``references`` (the non-empty links, in order).
        Otherwise a dict with ``status`` ``"error"`` and ``error_message``.
        Exceptions raised by the search are logged and reported this way
        rather than propagated.
    """
    query = _clean_query(query)
    if not query:
        return {"status": "error", "error_message": "Query is required."}

    try:
        limit = max(1, min(num_results or 4, 8))
        hits = WebSearch(num_results=limit).search(query) or []

        results: List[Dict[str, Any]] = []
        for position, hit in enumerate(hits, start=1):
            # Hits may use either naming scheme for the URL and the excerpt.
            url = hit.get('link') or hit.get('url') or ''
            excerpt = hit.get('text') or hit.get('snippet') or ''
            heading = hit.get('title') or ''
            results.append(
                {
                    "source_number": position,
                    "title": heading,
                    "link": url,
                    "snippet": excerpt,
                }
            )

        if results:
            return {
                "status": "success",
                "results": results,
                "references": [row["link"] for row in results if row["link"]],
            }
        return {
            "status": "error",
            "error_message": f"No web results found for '{query}'.",
        }
    except Exception as exc:
        logger.exception("Web search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Web search failed: {exc}",
        }
def get_vector_search(query: str, top_k: int = 5) -> Dict[str, Any]:
    """Look up dermatology knowledge in the curated vector database.

    Args:
        query: Free-text search query; surrounding whitespace is ignored.
        top_k: Requested number of matches. Falsy values fall back to 5 and
            the value is clamped to the range 1-10.

    Returns:
        On success, a dict with ``status`` ``"success"``, ``results`` (one
        entry per match with ``source_number``, ``source``, ``page``,
        ``content`` and ``confidence``) and ``references`` (human-readable
        source labels). Otherwise a dict with ``status`` ``"error"`` and
        ``error_message``; this covers an empty query, an unavailable
        database, no matches, and any exception raised while searching.
    """
    query = _clean_query(query)
    if not query:
        return {"status": "error", "error_message": "Query is required."}

    try:
        store = VectorDatabaseSearch()
        if not store.is_available():
            return {
                "status": "error",
                "error_message": "Vector database is not available.",
            }

        limit = max(1, min(top_k or 5, 10))
        matches = store.search(query, top_k=limit)
        if not matches:
            return {
                "status": "error",
                "error_message": f"No vector results found for '{query}'.",
            }

        results: List[Dict[str, Any]] = []
        references: List[str] = []
        for position, match in enumerate(matches, start=1):
            origin = match.get('source') or 'Unknown'
            page_number = match.get('page') or 0
            body = match.get('content') or ''
            score = match.get('confidence')
            results.append(
                {
                    "source_number": position,
                    "source": origin,
                    "page": page_number,
                    "content": body,
                    "confidence": score,
                }
            )
            # Only mention the page when one is known (non-zero).
            if page_number:
                references.append(f"{origin} (page {page_number})")
            else:
                references.append(origin)

        return {
            "status": "success",
            "results": results,
            "references": references,
        }
    except Exception as exc:
        logger.exception("Vector search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Vector search failed: {exc}",
        }
def get_image_search(query: str, max_images: int = 3) -> Dict[str, Any]:
    """Return dermatology-relevant image URLs for the given query.

    Args:
        query: Free-text search query; surrounding whitespace is ignored.
        max_images: Requested number of images. Falsy values (``None``, ``0``)
            fall back to 3 and the value is clamped to the range 1-8.

    Returns:
        On success, a dict with ``status`` ``"success"`` and ``images``: the
        de-duplicated URLs in the order the search returned them, at most the
        clamped limit. Otherwise a dict with ``status`` ``"error"`` and
        ``error_message``; exceptions raised by the search are logged and
        reported this way rather than propagated.
    """
    query = _clean_query(query)
    if not query:
        return {"status": "error", "error_message": "Query is required."}

    try:
        # Clamp once and use the same limit for the searcher and for the
        # de-duplication loop, so None/0/out-of-range inputs behave the same
        # in both places.
        limit = max(1, min(max_images or 3, 8))
        candidates = WebSearch(max_images=limit).search_images(query) or []

        unique_images: List[str] = []
        seen = set()
        for url in candidates:
            if not url or url in seen:
                continue
            seen.add(url)
            unique_images.append(url)
            if len(unique_images) >= limit:
                break

        if not unique_images:
            return {
                "status": "error",
                "error_message": f"No images found for '{query}'.",
            }
        return {"status": "success", "images": unique_images}
    except Exception as exc:
        logger.exception("Image search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Image search failed: {exc}",
        }
def _get_classifier() -> SkinDiseaseClassifier:
    """Return the shared skin-disease classifier, creating it on first use.

    The instance is cached in ``_skin_classifier``. If construction fails, the
    exception's message is stored in ``_skin_classifier_error`` and the
    original exception is re-raised; while a non-empty message is stored,
    later calls raise ``RuntimeError`` with that message instead of trying to
    construct the classifier again.
    """
    global _skin_classifier, _skin_classifier_error

    if _skin_classifier is None:
        if _skin_classifier_error:
            raise RuntimeError(_skin_classifier_error)
        try:
            _skin_classifier = SkinDiseaseClassifier()
        except Exception as exc:
            _skin_classifier_error = str(exc)
            raise
    return _skin_classifier
def _looks_like_heif(header: bytes) -> bool:
    """Return True when *header* begins with an ``ftyp`` box for HEIC/AVIF.

    Bytes 0-3 of the box hold its size, which varies between encoders, so
    only the box type (bytes 4-7) and the major brand (bytes 8-11) are
    inspected.
    """
    return header[4:8] == b"ftyp" and header[8:12] in (
        b"heic",
        b"heix",
        b"hevc",
        b"avif",
    )


def _analysis_message(
    user_language: str,
    disease_name: str,
    confidence_value: float,
    below_threshold: bool,
) -> str:
    """Build the short user-facing summary sentence in Urdu or English."""
    if user_language == "urdu":
        if below_threshold:
            return (
                "معذرت، میں اس وقت جلد کی بیماری کی درست شناخت نہیں کر پا رہا۔ "
                "براہ کرم بہتر روشنی میں ایک قریب کی تصویر اپ لوڈ کریں یا اپنی تشخیص کے لیے ڈاکٹر سے رجوع کریں۔"
            )
        return (
            f"مجھے لگتا ہے کہ یہ {disease_name} ہے اور میری اعتماد کی سطح {confidence_value:.2f}% ہے۔ "
            "براہ کرم حتمی تشخیص کے لیے ماہر ڈرماٹولوجسٹ سے مشورہ کریں۔"
        )
    if below_threshold:
        return (
            "I’m not confident enough to identify a condition from this photo. "
            "Please upload a clearer close-up image with good lighting, or consult a dermatologist for an in-person diagnosis."
        )
    return (
        f"I suspect this may be {disease_name} with a confidence of {confidence_value:.2f}%. "
        "Please consult a dermatologist for a professional evaluation and treatment plan."
    )


def _analysis_advice(
    user_language: str,
    disease_name: str,
    confidence_value: float,
    below_threshold: bool,
) -> str:
    """Build the Markdown advice section (heading plus bullets)."""
    if user_language == "urdu":
        advice_lines = ["## تصویری تجزیہ کی بنیاد پر"]
        if not below_threshold:
            advice_lines.append(
                f"- مشتبہ بیماری: {disease_name} (اعتماد {confidence_value:.2f}%)."
            )
            advice_lines.append(
                "- یہ نتیجہ تخمینی ہے، حتمی تشخیص کے لئے ماہر امراض جلد سے رجوع کریں۔"
            )
        else:
            advice_lines.append(
                "- ماڈل کا اعتماد 50٪ سے کم ہے، اس لئے قابل اعتماد تشخیص ممکن نہیں۔"
            )
        advice_lines.append(
            "- متاثرہ جلد کی واضح اور روشنی میں تصویر لیں اور فلٹرز سے پرہیز کریں۔"
        )
        advice_lines.append(
            "- اگر علامات بگڑتی یا پھیلتی ہیں تو فوری طبی معائنہ کروائیں۔"
        )
    else:
        advice_lines = ["## Based on the Image Analysis"]
        if not below_threshold:
            advice_lines.append(
                f"- Suspected condition: {disease_name} (confidence {confidence_value:.2f}%)."
            )
            advice_lines.append(
                "- This prediction is probabilistic; please obtain confirmation from a dermatologist."
            )
        else:
            advice_lines.append(
                "- The model's confidence is below 50%, so no reliable diagnosis is available."
            )
        advice_lines.append(
            "- Capture well-lit close-up photos of the affected area and avoid heavy filters."
        )
        advice_lines.append(
            "- Seek urgent in-person care if symptoms worsen or spread rapidly."
        )
    return "\n".join(advice_lines)


def analyze_skin_image(
    file_path: str,
    language: Optional[str] = None,
) -> Dict[str, Any]:
    """Assess an uploaded image for dermatology analysis.

    The tool verifies the file exists inside the uploads directory and runs
    the skin disease classifier directly (there is no skin-vs-non-skin
    pre-check). When confidence is below the 50% threshold it reports the
    uncertainty instead of a diagnosis and nudges the user toward alternative
    options.

    Args:
        file_path: Absolute path, or path relative to the uploads directory,
            of the image to analyse.
        language: ``"urdu"`` (case-insensitive) selects Urdu output; any other
            value, including ``None``, selects English.

    Returns:
        On success, a dict with ``status`` ``"success"``, ``is_skin``,
        ``diagnosis`` (``None`` when below the threshold), ``confidence``,
        ``confidence_below_threshold``, ``message``, ``advice`` and
        ``image_path`` (relative to the uploads directory, forward slashes).
        On any failure, a dict with ``status`` ``"error"`` and
        ``error_message`` (plus ``details`` when the classifier cannot be
        loaded). Exceptions are logged and reported, never propagated.
    """
    if not file_path or not str(file_path).strip():
        return {"status": "error", "error_message": "file_path is required."}

    try:
        candidate = Path(file_path)
        if not candidate.is_absolute():
            candidate = _UPLOADS_ROOT / candidate
        candidate = candidate.resolve()

        uploads_root = _UPLOADS_ROOT
        uploads_root.mkdir(parents=True, exist_ok=True)
        uploads_root = uploads_root.resolve()

        # Refuse anything that resolves outside the uploads directory.
        if uploads_root not in candidate.parents and candidate != uploads_root:
            return {
                "status": "error",
                "error_message": "Access to the requested file path is not permitted.",
            }

        if not candidate.exists() or not candidate.is_file():
            return {
                "status": "error",
                "error_message": f"Image not found at '{candidate}'.",
            }

        # Pre-initialised so the error handler can sniff the header even when
        # reading the file itself is what failed.
        image_bytes = b""
        try:
            image_bytes = candidate.read_bytes()
            pil_image = Image.open(io.BytesIO(image_bytes))
            pil_image.load()  # force a full decode so corrupt files fail here
            pil_image = pil_image.convert("RGB")
        except Exception as exc:
            logger.exception("Unable to open image for analysis: %s", exc)
            if _looks_like_heif(image_bytes[:12]) and not _HEIF_SUPPORTED:
                return {
                    "status": "error",
                    "error_message": (
                        "The uploaded image appears to be in HEIC/AVIF format, which is not supported. "
                        "Please convert the photo to JPG or PNG and try again."
                    ),
                }
            return {
                "status": "error",
                "error_message": f"Unable to open the image: {exc}",
            }

        user_language = (language or "english").strip().lower()

        try:
            classifier = _get_classifier()
        except Exception as exc:
            logger.error("Skin classifier unavailable: %s", exc)
            return {
                "status": "error",
                "error_message": (
                    "Skin analysis is temporarily unavailable. "
                    "Ensure the classifier weights are accessible (set SKIN_CLASSIFIER_WEIGHTS to a local file "
                    "or configure HUGGINGFACE_TOKEN with network access) and try again."
                ),
                "details": str(exc),
            }

        disease_name, confidence = classifier.predict(pil_image, 5)
        confidence_value = float(confidence)
        # Confidence is compared on a 0-100 scale, matching the "%" in the
        # user-facing text.
        below_threshold = confidence_value < 50.0

        message = _analysis_message(
            user_language, disease_name, confidence_value, below_threshold
        )
        advice = _analysis_advice(
            user_language, disease_name, confidence_value, below_threshold
        )

        return {
            "status": "success",
            "is_skin": True,
            "diagnosis": None if below_threshold else disease_name,
            "confidence": confidence_value,
            "confidence_below_threshold": below_threshold,
            "message": message,
            "advice": advice,
            "image_path": str(candidate.relative_to(uploads_root)).replace("\\", "/"),
        }
    except Exception as exc:
        logger.exception("Unexpected error during image analysis: %s", exc)
        return {
            "status": "error",
            "error_message": f"Unexpected error: {exc}",
        }
def convert_document_to_markdown(
    file_path: str,
    file_extension: Optional[str] = None,
) -> Dict[str, Any]:
    """Turn an uploaded document into Markdown text for downstream analysis.

    Args:
        file_path: Location of the uploaded file, either absolute or relative
            to the backend uploads directory. The resolved path must lie
            inside that directory.
        file_extension: Optional hint such as ".pdf" or "docx" for cases where
            the filename does not reveal the type; a missing leading dot is
            added. Defaults to the file's own suffix.

    Returns:
        On success, a dict with ``status`` ``"success"``, ``text_content``
        (the Markdown), ``title``, ``character_count`` and ``source_path``.
        On failure, a dict with ``status`` ``"error"`` and a descriptive
        ``error_message``; exceptions are logged and reported, not raised.
    """
    if not file_path or not str(file_path).strip():
        return {"status": "error", "error_message": "file_path is required."}

    try:
        target = Path(file_path)
        if not target.is_absolute():
            target = _UPLOADS_ROOT / target
        target = target.resolve()

        root = _UPLOADS_ROOT
        root.mkdir(parents=True, exist_ok=True)
        root = root.resolve()

        # Only files located under the uploads directory may be converted.
        inside_root = root in target.parents or target == root
        if not inside_root:
            return {
                "status": "error",
                "error_message": "Access to the requested file path is not permitted.",
            }

        if not (target.exists() and target.is_file()):
            return {
                "status": "error",
                "error_message": f"File not found at '{target}'.",
            }

        suffix = file_extension or target.suffix
        if suffix and not suffix.startswith('.'):
            suffix = f".{suffix}"

        converted = _magic_converter.magic(str(target), file_extension=suffix)
        markdown = converted.text_content if converted else ""

        return {
            "status": "success",
            "text_content": markdown,
            "title": getattr(converted, "title", None),
            "character_count": len(markdown),
            "source_path": str(target),
        }
    except Exception as exc:
        logger.exception("Unexpected error during document conversion: %s", exc)
        return {
            "status": "error",
            "error_message": f"Unexpected error: {exc}",
        }