# derm-ai/app/services/tools.py
# muhammadnoman76 - "update" (commit 2c8665a)
import io
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from PIL import Image
from app.services.vector_database_search import VectorDatabaseSearch
from app.services.websearch import WebSearch
from MagicConvert import MagicConvert
from app.services.image_classification_vit import SkinDiseaseClassifier
# Optional HEIC/HEIF support. When pillow_heif can be imported its opener is
# registered with Pillow; any failure (package missing or registration error)
# only clears the flag so that importing this module never aborts.
try:
    from pillow_heif import register_heif_opener
    register_heif_opener()
    _HEIF_SUPPORTED = True
except Exception:
    _HEIF_SUPPORTED = False
# Module-level logger used by every tool function in this file.
logger = logging.getLogger(__name__)
def _clean_query(query: str) -> str:
    """Return *query* without surrounding whitespace; falsy input becomes ''."""
    if not query:
        return ''
    return query.strip()
# Directory that holds user uploads; every file-based tool refuses paths that
# resolve outside of it. DERMAI_UPLOAD_DIR overrides the default location
# (<three levels above this file>/uploads).
# A blank override is ignored on purpose: Path("") resolves to the current
# working directory, which would silently turn the whole CWD into the sandbox.
_UPLOADS_ROOT = Path(
    (os.getenv("DERMAI_UPLOAD_DIR") or "").strip()
    or Path(__file__).resolve().parent.parent.parent / "uploads"
).resolve()
# Single MagicConvert instance, built at import time and reused by
# convert_document_to_markdown().
_magic_converter = MagicConvert()
# Cached classifier instance; stays None until _get_classifier() builds it.
_skin_classifier: Optional[SkinDiseaseClassifier] = None
# Text of the classifier construction failure recorded by _get_classifier(),
# which re-raises it as RuntimeError on later calls instead of retrying.
_skin_classifier_error: Optional[str] = None
def get_web_search(query: str, num_results: int = 4) -> Dict[str, Any]:
    """Look up dermatology information on the public web.

    Args:
        query: Free-text search query; surrounding whitespace is ignored.
        num_results: Desired number of hits, clamped to the range 1..8
            (a falsy value is treated as 4).

    Returns:
        On success ``{"status": "success", "results": [...], "references": [...]}``
        where each result carries ``source_number``, ``title``, ``link`` and
        ``snippet`` and ``references`` lists the non-empty links. On failure
        ``{"status": "error", "error_message": ...}``; nothing is raised.
    """
    cleaned = _clean_query(query)
    if not cleaned:
        return {"status": "error", "error_message": "Query is required."}

    try:
        limit = max(1, min(num_results or 4, 8))
        hits = WebSearch(num_results=limit).search(cleaned) or []

        # Normalise each hit, tolerating either naming scheme for its fields.
        results: List[Dict[str, Any]] = [
            {
                "source_number": position,
                "title": hit.get('title') or '',
                "link": hit.get('link') or hit.get('url') or '',
                "snippet": hit.get('text') or hit.get('snippet') or '',
            }
            for position, hit in enumerate(hits, start=1)
        ]
        if not results:
            return {
                "status": "error",
                "error_message": f"No web results found for '{cleaned}'.",
            }

        links: List[str] = [row["link"] for row in results if row["link"]]
        return {
            "status": "success",
            "results": results,
            "references": links,
        }
    except Exception as exc:
        logger.exception("Web search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Web search failed: {exc}",
        }
def get_vector_search(query: str, top_k: int = 5) -> Dict[str, Any]:
    """Look up dermatology knowledge in the curated vector database.

    Args:
        query: Free-text search query; surrounding whitespace is ignored.
        top_k: Desired number of matches, clamped to the range 1..10
            (a falsy value is treated as 5).

    Returns:
        On success ``{"status": "success", "results": [...], "references": [...]}``
        where each result carries ``source_number``, ``source``, ``page``,
        ``content`` and ``confidence``. On failure
        ``{"status": "error", "error_message": ...}``; nothing is raised.
    """
    cleaned = _clean_query(query)
    if not cleaned:
        return {"status": "error", "error_message": "Query is required."}

    try:
        store = VectorDatabaseSearch()
        if not store.is_available():
            return {
                "status": "error",
                "error_message": "Vector database is not available.",
            }

        limit = max(1, min(top_k or 5, 10))
        matches = store.search(cleaned, top_k=limit)
        if not matches:
            return {
                "status": "error",
                "error_message": f"No vector results found for '{cleaned}'.",
            }

        results: List[Dict[str, Any]] = []
        citations: List[str] = []
        for position, match in enumerate(matches, start=1):
            source = match.get('source') or 'Unknown'
            page = match.get('page') or 0
            results.append(
                {
                    "source_number": position,
                    "source": source,
                    "page": page,
                    "content": match.get('content') or '',
                    "confidence": match.get('confidence'),
                }
            )
            # The page number is only mentioned when one is known (non-zero).
            citations.append(f"{source} (page {page})" if page else source)

        return {
            "status": "success",
            "results": results,
            "references": citations,
        }
    except Exception as exc:
        logger.exception("Vector search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Vector search failed: {exc}",
        }
def get_image_search(query: str, max_images: int = 3) -> Dict[str, Any]:
    """Return dermatology-relevant image URLs for the given query.

    Args:
        query: Free-text search query; surrounding whitespace is ignored.
        max_images: Desired number of images, clamped to the range 1..8
            (a falsy value is treated as 3).

    Returns:
        On success ``{"status": "success", "images": [...]}`` with unique,
        non-empty URLs in the order the search returned them. On failure
        ``{"status": "error", "error_message": ...}``; nothing is raised.
    """
    query = _clean_query(query)
    if not query:
        return {"status": "error", "error_message": "Query is required."}
    try:
        # Clamp once and use the same limit for the search and for the cap
        # below; comparing against the raw argument broke on None and ignored
        # the 1..8 bounds.
        limit = max(1, min(max_images or 3, 8))
        searcher = WebSearch(max_images=limit)
        images = searcher.search_images(query) or []

        unique_images: List[str] = []
        seen = set()
        for url in images:
            if not url or url in seen:
                continue
            seen.add(url)
            unique_images.append(url)
            if len(unique_images) >= limit:
                break

        if not unique_images:
            return {
                "status": "error",
                "error_message": f"No images found for '{query}'.",
            }
        return {"status": "success", "images": unique_images}
    except Exception as exc:
        logger.exception("Image search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Image search failed: {exc}",
        }
def _get_classifier() -> SkinDiseaseClassifier:
    """Return the shared skin-disease classifier, building it on first use.

    Raises:
        RuntimeError: When an earlier construction attempt failed with a
            non-empty message; that message is replayed without retrying.
        Exception: Whatever ``SkinDiseaseClassifier()`` raises on a failed
            attempt (re-raised after its message has been recorded).
    """
    global _skin_classifier, _skin_classifier_error

    if _skin_classifier is None:
        if _skin_classifier_error:
            raise RuntimeError(_skin_classifier_error)
        try:
            _skin_classifier = SkinDiseaseClassifier()
        except Exception as load_error:
            # Remember the failure so later calls fail fast.
            _skin_classifier_error = str(load_error)
            raise
    return _skin_classifier
# Major brands (bytes 8-12 of the leading ISO-BMFF ``ftyp`` box) that identify
# HEIF/HEIC and AVIF images.
_HEIF_BRANDS = frozenset(
    {
        b"heic", b"heix", b"hevc", b"hevx",
        b"heim", b"heis", b"hevm", b"hevs",
        b"mif1", b"msf1", b"avif", b"avis",
    }
)


def _looks_like_heif(header: bytes) -> bool:
    """Return True when *header* opens with an ``ftyp`` box of a HEIF/AVIF brand."""
    # Bytes 0-4 hold the box size, which differs between encoders, so only the
    # box type and the major brand are inspected.
    return header[4:8] == b"ftyp" and header[8:12] in _HEIF_BRANDS


def _compose_analysis_text(
    user_language: str,
    disease_name: Any,
    confidence_value: float,
    below_threshold: bool,
) -> Dict[str, str]:
    """Build the user-facing ``message`` and markdown ``advice`` (Urdu or English)."""
    if user_language == "urdu":
        unable_message = (
            "معذرت، میں اس وقت جلد کی بیماری کی درست شناخت نہیں کر پا رہا۔ "
            "براہ کرم بہتر روشنی میں ایک قریب کی تصویر اپ لوڈ کریں یا اپنی تشخیص کے لیے ڈاکٹر سے رجوع کریں۔"
        )
        diagnosis_message = (
            f"مجھے لگتا ہے کہ یہ {disease_name} ہے اور میری اعتماد کی سطح {confidence_value:.2f}% ہے۔ "
            "براہ کرم حتمی تشخیص کے لیے ماہر ڈرماٹولوجسٹ سے مشورہ کریں۔"
        )
        advice_lines = ["## تصویری تجزیہ کی بنیاد پر"]
        if not below_threshold:
            advice_lines.append(
                f"- مشتبہ بیماری: {disease_name} (اعتماد {confidence_value:.2f}%)."
            )
            advice_lines.append(
                "- یہ نتیجہ تخمینی ہے، حتمی تشخیص کے لئے ماہر امراض جلد سے رجوع کریں۔"
            )
        else:
            advice_lines.append(
                "- ماڈل کا اعتماد 50٪ سے کم ہے، اس لئے قابل اعتماد تشخیص ممکن نہیں۔"
            )
            advice_lines.append(
                "- متاثرہ جلد کی واضح اور روشنی میں تصویر لیں اور فلٹرز سے پرہیز کریں۔"
            )
        advice_lines.append(
            "- اگر علامات بگڑتی یا پھیلتی ہیں تو فوری طبی معائنہ کروائیں۔"
        )
    else:
        unable_message = (
            "I’m not confident enough to identify a condition from this photo. "
            "Please upload a clearer close-up image with good lighting, or consult a dermatologist for an in-person diagnosis."
        )
        diagnosis_message = (
            f"I suspect this may be {disease_name} with a confidence of {confidence_value:.2f}%. "
            "Please consult a dermatologist for a professional evaluation and treatment plan."
        )
        advice_lines = ["## Based on the Image Analysis"]
        if not below_threshold:
            advice_lines.append(
                f"- Suspected condition: {disease_name} (confidence {confidence_value:.2f}%)."
            )
            advice_lines.append(
                "- This prediction is probabilistic; please obtain confirmation from a dermatologist."
            )
        else:
            advice_lines.append(
                "- The model's confidence is below 50%, so no reliable diagnosis is available."
            )
            advice_lines.append(
                "- Capture well-lit close-up photos of the affected area and avoid heavy filters."
            )
        advice_lines.append(
            "- Seek urgent in-person care if symptoms worsen or spread rapidly."
        )
    return {
        "message": unable_message if below_threshold else diagnosis_message,
        "advice": "\n".join(advice_lines),
    }


def analyze_skin_image(
    file_path: str,
    language: Optional[str] = None,
) -> Dict[str, Any]:
    """Assess an uploaded image for dermatology analysis.

    The tool verifies the file lives inside the uploads directory, decodes it
    with Pillow and runs the skin disease classifier directly. When the
    reported confidence is below 50 it withholds the diagnosis and nudges the
    user toward alternative options.

    Args:
        file_path: Absolute path, or path relative to the uploads directory.
        language: ``"urdu"`` (case-insensitive) selects Urdu texts; any other
            value, including ``None``, yields English.

    Returns:
        On success a dict with ``status``, ``is_skin``, ``diagnosis`` (``None``
        below the threshold), ``confidence``, ``confidence_below_threshold``,
        ``message``, ``advice`` and ``image_path`` (relative to the uploads
        directory, forward slashes). On failure
        ``{"status": "error", "error_message": ...}``; nothing is raised.
    """
    if not file_path or not str(file_path).strip():
        return {"status": "error", "error_message": "file_path is required."}
    try:
        candidate = Path(file_path)
        if not candidate.is_absolute():
            candidate = _UPLOADS_ROOT / candidate
        candidate = candidate.resolve()

        uploads_root = _UPLOADS_ROOT
        uploads_root.mkdir(parents=True, exist_ok=True)
        uploads_root = uploads_root.resolve()

        # Reject anything that resolves outside the uploads directory
        # (absolute paths elsewhere, "..", symlinks).
        if uploads_root not in candidate.parents and candidate != uploads_root:
            return {
                "status": "error",
                "error_message": "Access to the requested file path is not permitted.",
            }
        if not candidate.is_file():
            return {
                "status": "error",
                "error_message": f"Image not found at '{candidate}'.",
            }

        # Pre-initialised so the error handler can inspect whatever was read,
        # even when reading itself is what failed.
        image_bytes = b""
        try:
            image_bytes = candidate.read_bytes()
            pil_image = Image.open(io.BytesIO(image_bytes))
            pil_image.load()
            pil_image = pil_image.convert("RGB")
        except Exception as exc:
            logger.exception("Unable to open image for analysis: %s", exc)
            if _looks_like_heif(image_bytes[:12]) and not _HEIF_SUPPORTED:
                return {
                    "status": "error",
                    "error_message": (
                        "The uploaded image appears to be in HEIC/AVIF format, which is not supported. "
                        "Please convert the photo to JPG or PNG and try again."
                    ),
                }
            return {
                "status": "error",
                "error_message": f"Unable to open the image: {exc}",
            }

        user_language = (language or "english").strip().lower()

        # Skip skin-vs-non-skin classification and directly proceed to disease classification
        try:
            classifier = _get_classifier()
        except Exception as exc:
            logger.error("Skin classifier unavailable: %s", exc)
            return {
                "status": "error",
                "error_message": (
                    "Skin analysis is temporarily unavailable. "
                    "Ensure the classifier weights are accessible (set SKIN_CLASSIFIER_WEIGHTS to a local file "
                    "or configure HUGGINGFACE_TOKEN with network access) and try again."
                ),
                "details": str(exc),
            }

        disease_name, confidence = classifier.predict(pil_image, 5)
        confidence_value = float(confidence)
        below_threshold = confidence_value < 50.0

        texts = _compose_analysis_text(
            user_language, disease_name, confidence_value, below_threshold
        )
        return {
            "status": "success",
            "is_skin": True,
            "diagnosis": None if below_threshold else disease_name,
            "confidence": confidence_value,
            "confidence_below_threshold": below_threshold,
            "message": texts["message"],
            "advice": texts["advice"],
            "image_path": candidate.relative_to(uploads_root).as_posix(),
        }
    except Exception as exc:
        logger.exception("Unexpected error during image analysis: %s", exc)
        return {
            "status": "error",
            "error_message": f"Unexpected error: {exc}",
        }
def convert_document_to_markdown(
    file_path: str,
    file_extension: Optional[str] = None,
) -> Dict[str, Any]:
    """Turn an uploaded document into Markdown text for downstream analysis.

    Args:
        file_path: Location of the uploaded file, either absolute or relative
            to the backend uploads directory.
        file_extension: Optional hint such as ".pdf" or "docx" for files whose
            name does not reveal the format; a leading dot is added if missing.

    Returns:
        On success a dictionary with the Markdown (`text_content`), the
        detected `title`, `character_count` and the resolved `source_path`.
        On failure a dictionary with a descriptive `error_message`; the
        function does not raise.
    """
    if not (file_path and str(file_path).strip()):
        return {"status": "error", "error_message": "file_path is required."}

    try:
        requested = Path(file_path)
        target = (
            requested if requested.is_absolute() else _UPLOADS_ROOT / requested
        ).resolve()

        sandbox = _UPLOADS_ROOT
        sandbox.mkdir(parents=True, exist_ok=True)
        sandbox = sandbox.resolve()

        # Only files located inside the uploads directory may be converted.
        inside_sandbox = sandbox in target.parents or target == sandbox
        if not inside_sandbox:
            return {
                "status": "error",
                "error_message": "Access to the requested file path is not permitted.",
            }

        if not (target.exists() and target.is_file()):
            return {
                "status": "error",
                "error_message": f"File not found at '{target}'.",
            }

        # Prefer the caller's hint, otherwise use the suffix of the file name.
        suffix = file_extension or target.suffix
        if suffix and not suffix.startswith('.'):
            suffix = f".{suffix}"

        converted = _magic_converter.magic(str(target), file_extension=suffix)
        markdown = converted.text_content if converted else ""
        length = len(markdown)

        return {
            "status": "success",
            "text_content": markdown,
            "title": getattr(converted, "title", None),
            "character_count": length,
            "source_path": str(target),
        }
    except Exception as exc:
        logger.exception("Unexpected error during document conversion: %s", exc)
        return {
            "status": "error",
            "error_message": f"Unexpected error: {exc}",
        }