Spaces:
Sleeping
Sleeping
| # derm-ai-backend | |
| Generated on: C:\Work\derm-ai\derm-ai-backend | |
| ## Project Structure | |
| ``` | |
| derm-ai-backend/ | |
| ├── app | |
| │ ├── config | |
| │ │ ├── __init__.py | |
| │ │ └── config.py | |
| │ ├── database | |
| │ │ ├── __init__.py | |
| │ │ ├── database_query.py | |
| │ │ └── db.py | |
| │ ├── middleware | |
| │ │ └── auth.py | |
| │ ├── routers | |
| │ │ ├── admin.py | |
| │ │ ├── agent_chat.py | |
| │ │ ├── auth.py | |
| │ │ ├── chat.py | |
| │ │ ├── chat_session.py | |
| │ │ ├── language.py | |
| │ │ ├── location.py | |
| │ │ ├── preferences.py | |
| │ │ ├── profile.py | |
| │ │ └── questionnaire.py | |
| │ ├── services | |
| │ │ ├── RAG_evaluation.py | |
| │ │ ├── __init__.py | |
| │ │ ├── agentic_prompt.py | |
| │ │ ├── chathistory.py | |
| │ │ ├── environmental_condition.py | |
| │ │ ├── google_agent_service.py | |
| │ │ ├── image_classification_vit.py | |
| │ │ ├── llm_model.py | |
| │ │ ├── prompts.py | |
| │ │ ├── skincare_scheduler.py | |
| │ │ ├── tools.py | |
| │ │ ├── vector_database_search.py | |
| │ │ ├── websearch.py | |
| │ │ └── wheel.py | |
| │ ├── __init__.py | |
| │ └── main.py | |
| ├── Dockerfile | |
| ├── LICENSE | |
| ├── Makefile | |
| ├── README.md | |
| ├── app.py | |
| ├── docker-compose.yml | |
| ├── document_code.py | |
| └── pyproject.toml | |
| ``` | |
| ## Source Code | |
| ### app\__init__.py | |
| ```python | |
| # app/__init__.py | |
| from app.main import app | |
| __all__ = [ | |
| "app", | |
| ] | |
| ``` | |
| ### app\config\__init__.py | |
| ```python | |
| from app.config.config import Config | |
| config = Config() | |
| ``` | |
| ### app\config\config.py | |
| ```python | |
| import os | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
class Config:
    """Application settings resolved from the process environment at import time."""

    # Secret used to sign JWTs; None when JWT_SECRET_KEY is not set.
    JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY')
    # Access-token lifetime. An unset/empty variable used to crash the import
    # with int(None); fall back to a default instead.
    # NOTE(review): 3600 was chosen as the fallback — confirm the desired value.
    JWT_ACCESS_TOKEN_EXPIRES = int(os.getenv('JWT_ACCESS_TOKEN_EXPIRES') or 3600)
    # Origins intended for the CORS configuration.
    CORS_ORIGINS = ["http://localhost:3000"]
    # "temp" directory located next to this module.
    UPLOAD_FOLDER = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'temp')
| ``` | |
| ### app\database\__init__.py | |
| ```python | |
| from app.database.db import get_db, db | |
| from app.database.database_query import DatabaseQuery | |
| __all__ = ["get_db", "db", "DatabaseQuery"] | |
| ``` | |
| ### app\database\database_query.py | |
| ```python | |
| from app.database.db import db | |
| import re | |
| from bson import ObjectId | |
| from datetime import datetime, timezone, timedelta | |
| from pymongo import DESCENDING | |
| from typing import Optional | |
| class DatabaseQuery: | |
def __init__(self):
    """Create a query helper; there is no per-instance state to set up."""
    # Every method of this class goes through the module-level ``db`` handle.
    pass
def create_chat_session(self, chat_session):
    """Insert ``chat_session`` into the ``chat_sessions`` collection."""
    try:
        db.chat_sessions.insert_one(chat_session)
    except Exception as exc:
        # Failures are surfaced as a generic Exception with a prefixed message.
        raise Exception(f"Error creating chat session: {str(exc)}")
def get_user_chat_sessions(self, user_id):
    """Return the user's chat sessions, newest ``last_accessed`` first, without ``_id``."""
    try:
        cursor = db.chat_sessions.find({"user_id": user_id}, {"_id": 0})
        return list(cursor.sort("last_accessed", -1))
    except Exception as exc:
        raise Exception(f"Error retrieving user chat sessions: {str(exc)}")
def create_chat(self, chat_data):
    """Insert ``chat_data`` into the ``chats`` collection and return True."""
    try:
        db.chats.insert_one(chat_data)
    except Exception as exc:
        raise Exception(f"Error creating chat: {str(exc)}")
    else:
        return True
def update_last_accessed_time(self, session_id):
    """Stamp the session's ``last_accessed`` field with the current UTC time."""
    try:
        selector = {"session_id": session_id}
        change = {"$set": {"last_accessed": datetime.now(timezone.utc)}}
        db.chat_sessions.update_one(selector, change)
    except Exception as exc:
        raise Exception(f"Error updating last accessed time: {str(exc)}")
def get_session_chats(self, session_id, user_id):
    """Return the chats of one session owned by ``user_id``, oldest first, without ``_id``."""
    try:
        selector = {"session_id": session_id, "user_id": user_id}
        cursor = db.chats.find(selector, {"_id": 0})
        return list(cursor.sort("timestamp", 1))
    except Exception as exc:
        raise Exception(f"Error retrieving session chats: {str(exc)}")
def get_user_by_identifier(self, identifier):
    """Return the user whose username or email equals ``identifier`` (or None)."""
    try:
        either_field = [{'username': identifier}, {'email': identifier}]
        return db.users.find_one({'$or': either_field})
    except Exception as exc:
        raise Exception(f"Error retrieving user by identifier: {str(exc)}")
def add_token_to_blacklist(self, jti):
    """Record the token id ``jti`` in the ``blacklist`` collection."""
    try:
        entry = {'jti': jti}
        db.blacklist.insert_one(entry)
    except Exception as exc:
        raise Exception(f"Error adding token to blacklist: {str(exc)}")
def create_indexes(self):
    """Create the indexes used by the chat session and chat lookups."""
    try:
        session_index_specs = (
            [("user_id", 1), ("last_accessed", -1)],
            [("session_id", 1)],
        )
        chat_index_specs = (
            [("session_id", 1), ("timestamp", 1)],
            [("user_id", 1)],
        )
        for spec in session_index_specs:
            db.chat_sessions.create_index(spec)
        for spec in chat_index_specs:
            db.chats.create_index(spec)
    except Exception as exc:
        raise Exception(f"Error creating indexes: {str(exc)}")
def check_chat_session(self, session_id):
    """Return True when a chat session with ``session_id`` exists."""
    try:
        return db.chat_sessions.find_one({'session_id': session_id}) is not None
    except Exception as exc:
        raise Exception(f"Error checking chat session: {str(exc)}")
def get_user_profile(self, username):
    """Return the user document for ``username`` with the ``password`` field excluded."""
    try:
        hide_password = {'password': 0}
        return db.users.find_one({'username': username}, hide_password)
    except Exception as exc:
        raise Exception(f"Error getting user profile: {str(exc)}")
def update_user_profile(self, username, update_fields):
    """Apply ``update_fields`` to the user; return True if a document was modified."""
    try:
        outcome = db.users.update_one({'username': username}, {'$set': update_fields})
        return outcome.modified_count > 0
    except Exception as exc:
        raise Exception(f"Error updating user profile: {str(exc)}")
def delete_user_account(self, username):
    """Delete the user document; return True if one was removed."""
    try:
        outcome = db.users.delete_one({'username': username})
        return outcome.deleted_count > 0
    except Exception as exc:
        raise Exception(f"Error deleting user account: {str(exc)}")
def is_username_or_email_exists(self, username, email):
    """Return True when any user already has this username or this email."""
    try:
        clash = {'$or': [{'username': username}, {'email': email}]}
        return db.users.find_one(clash) is not None
    except Exception as exc:
        raise Exception(f"Error checking if username or email exists: {str(exc)}")
def create_or_update_temp_user(self, username, email, temp_user_data):
    """Upsert ``temp_user_data`` into ``temp_users``, matching on username or email."""
    try:
        selector = {'$or': [{'username': username}, {'email': email}]}
        db.temp_users.update_one(selector, {'$set': temp_user_data}, upsert=True)
    except Exception as exc:
        raise Exception(f"Error creating/updating temp user: {str(exc)}")
def get_temp_user_by_username(self, username):
    """Return the pending (temp) user document for ``username`` or None."""
    try:
        return db.temp_users.find_one({'username': username})
    except Exception as exc:
        raise Exception(f"Error retrieving temp user by username: {str(exc)}")
def delete_temp_user(self, username):
    """Remove the temp-user document for ``username``."""
    try:
        selector = {'username': username}
        db.temp_users.delete_one(selector)
    except Exception as exc:
        raise Exception(f"Error deleting temp user: {str(exc)}")
def create_user_from_data(self, user_data):
    """Insert ``user_data`` into ``users`` and hand the same dict back."""
    try:
        db.users.insert_one(user_data)
    except Exception as exc:
        raise Exception(f"Error creating user from data: {str(exc)}")
    else:
        return user_data
def create_user(self, username, email, hashed_password, name, age, created_at,
                is_verified=False, verification_code=None, code_expiration=None):
    """Build a user document from the arguments, insert it, and return it.

    The verification fields are stored only when both a code and an
    expiration are supplied.
    """
    try:
        record = dict([
            ('username', username),
            ('email', email),
            ('password', hashed_password),
            ('name', name),
            ('age', age),
            ('created_at', created_at),
            ('is_verified', is_verified),
        ])
        if verification_code and code_expiration:
            record.update({
                'verification_code': verification_code,
                'code_expiration': code_expiration,
            })
        db.users.insert_one(record)
        return record
    except Exception as exc:
        raise Exception(f"Error creating user: {str(exc)}")
def get_user_by_username(self, username):
    """Return the full user document for ``username`` or None."""
    try:
        return db.users.find_one({'username': username})
    except Exception as exc:
        raise Exception(f"Error retrieving user by username: {str(exc)}")
def verify_user_email(self, username):
    """Mark the user verified and drop the stored verification code fields.

    Returns True if the document was modified.
    """
    try:
        change = {
            '$set': {'is_verified': True},
            '$unset': {'verification_code': '', 'code_expiration': ''},
        }
        outcome = db.users.update_one({'username': username}, change)
        return outcome.modified_count > 0
    except Exception as exc:
        raise Exception(f"Error verifying user email: {str(exc)}")
def update_verification_code(self, username, verification_code, code_expiration):
    """Store a new verification code and expiration; return True if modified."""
    try:
        new_values = {'verification_code': verification_code, 'code_expiration': code_expiration}
        outcome = db.users.update_one({'username': username}, {'$set': new_values})
        return outcome.modified_count > 0
    except Exception as exc:
        raise Exception(f"Error updating verification code: {str(exc)}")
def is_valid_email(self, email):
    """Return True when ``email`` looks like a syntactically valid address.

    The entire string has to match, so text trailing a valid-looking
    address is rejected, and the top-level domain may be any run of two
    or more letters (e.g. ``.museum``). Non-string input is reported as
    invalid instead of raising.
    """
    if not isinstance(email, str):
        return False
    # First character must be a word character, mirroring the old leading \b.
    email_regex = r'[A-Za-z0-9_][A-Za-z0-9._%+-]*@[A-Za-z0-9.-]+\.[A-Za-z]{2,}'
    return re.fullmatch(email_regex, email) is not None
def add_or_update_location(self, username, location):
    """Upsert the user's location together with a UTC ``updated_at`` stamp."""
    try:
        fields = {'location': location, 'updated_at': datetime.now(timezone.utc)}
        db.locations.update_one({'username': username}, {'$set': fields}, upsert=True)
    except Exception as exc:
        raise Exception(f"Error adding/updating location: {str(exc)}")
def get_location(self, username):
    """Return the stored location document for ``username`` or None."""
    try:
        return db.locations.find_one({'username': username})
    except Exception as exc:
        raise Exception(f"Error retrieving location: {str(exc)}")
def submit_questionnaire(self, user_id, answers):
    """Insert a questionnaire for ``user_id`` and return the new id as a string."""
    try:
        document = {'user_id': user_id, 'answers': answers}
        # Two separate clock reads, one per timestamp field.
        document['created_at'] = datetime.now(timezone.utc)
        document['updated_at'] = datetime.now(timezone.utc)
        inserted = db.questionnaires.insert_one(document)
        return str(inserted.inserted_id)
    except Exception as exc:
        raise Exception(f"Error submitting questionnaire: {str(exc)}")
def get_latest_questionnaire(self, user_id):
    """Return the user's most recently created questionnaire, ``_id`` as a string."""
    try:
        latest = db.questionnaires.find_one({'user_id': user_id}, sort=[('created_at', -1)])
        if not latest:
            return latest
        latest['_id'] = str(latest['_id'])
        return latest
    except Exception as exc:
        raise Exception(f"Error getting latest questionnaire: {str(exc)}")
def update_questionnaire(self, questionnaire_id, user_id, answers):
    """Replace the answers of the user's questionnaire; return True if modified."""
    try:
        selector = {'_id': ObjectId(questionnaire_id), 'user_id': user_id}
        new_values = {'answers': answers, 'updated_at': datetime.now(timezone.utc)}
        outcome = db.questionnaires.update_one(selector, {'$set': new_values})
        return outcome.modified_count > 0
    except Exception as exc:
        raise Exception(f"Error updating questionnaire: {str(exc)}")
def delete_questionnaire(self, questionnaire_id, user_id):
    """Delete the user's questionnaire with the given id; return True if removed."""
    try:
        selector = {'_id': ObjectId(questionnaire_id), 'user_id': user_id}
        outcome = db.questionnaires.delete_one(selector)
        return outcome.deleted_count > 0
    except Exception as exc:
        raise Exception(f"Error deleting questionnaire: {str(exc)}")
def count_answered_questions(self, username):
    """Count the user's ``questions`` documents whose ``answer`` is not None."""
    try:
        answered = {'username': username, 'answer': {'$ne': None}}
        return db.questions.count_documents(answered)
    except Exception as exc:
        raise Exception(f"Error counting answered questions: {str(exc)}")
def get_user_preferences(self, username):
    """Return the user's preference flags; every missing flag reads as False."""
    flag_names = (
        'keywords',
        'references',
        'websearch',
        'personalized_recommendations',
        'environmental_recommendations',
    )
    try:
        stored = db.preferences.find_one({'username': username})
        if not stored:
            return {flag: False for flag in flag_names}
        return {flag: stored.get(flag, False) for flag in flag_names}
    except Exception as exc:
        raise Exception(f"Error getting user preferences: {str(exc)}")
def set_user_preferences(self, username, preferences):
    """Upsert the user's preference flags (coerced to bool) and return the stored dict."""
    flag_names = (
        'keywords',
        'references',
        'websearch',
        'personalized_recommendations',
        'environmental_recommendations',
    )
    try:
        preferences_data = {'username': username}
        for flag in flag_names:
            preferences_data[flag] = bool(preferences.get(flag, False))
        preferences_data['updated_at'] = datetime.now(timezone.utc)
        db.preferences.update_one(
            {'username': username},
            {'$set': preferences_data},
            upsert=True,
        )
        return preferences_data
    except Exception as exc:
        raise Exception(f"Error setting user preferences: {str(exc)}")
def get_user_theme(self, username):
    """Return the user's stored theme name, defaulting to 'light'."""
    try:
        stored = db.user_themes.find_one({'username': username})
        return stored.get('theme', 'light') if stored else 'light'
    except Exception as exc:
        raise Exception(f"Error getting user theme: {str(exc)}")
def set_user_theme(self, username, theme):
    """Upsert the user's theme and return the stored document fields.

    ``theme`` is interpreted by truthiness (truthy -> "dark", falsy ->
    "light"), with one exception: a string spelling "light" selects the
    light theme. Plain truthiness would otherwise store "dark" for it,
    because any non-empty string is truthy.

    Raises:
        Exception: when the database update fails.
    """
    try:
        names_light = isinstance(theme, str) and theme.strip().lower() == "light"
        wants_dark = bool(theme) and not names_light
        theme_data = {
            'username': username,
            'theme': "dark" if wants_dark else "light",
            'updated_at': datetime.now(timezone.utc)
        }
        db.user_themes.update_one(
            {'username': username},
            {'$set': theme_data},
            upsert=True
        )
        return theme_data
    except Exception as e:
        raise Exception(f"Error setting user theme: {str(e)}")
def verify_session(self, session_id, user_id):
    """Return True when the session exists and belongs to ``user_id``."""
    try:
        selector = {"session_id": session_id, "user_id": user_id}
        return db.chat_sessions.find_one(selector) is not None
    except Exception as exc:
        raise Exception(f"Error verifying session: {str(exc)}")
def update_chat_session_title(self, session_id, new_title):
    """Set the session title; return True if the stored value changed.

    A session id that matches nothing raises, and that error is wrapped
    by the same handler as database failures.
    """
    try:
        outcome = db.chat_sessions.update_one(
            {"session_id": session_id},
            {"$set": {"title": new_title}},
        )
        if not outcome.matched_count:
            raise Exception("Chat session not found")
        return outcome.modified_count > 0
    except Exception as exc:
        raise Exception(f"Error updating chat session title: {str(exc)}")
def delete_chat_session(self, session_id, user_id):
    """Delete one session of ``user_id`` plus its chats and report what was removed."""
    try:
        owned = {"session_id": session_id, "user_id": user_id}
        # The session goes first, then its chats, each with its own filter dict.
        removed_session = db.chat_sessions.delete_one(dict(owned))
        removed_chats = db.chats.delete_many(dict(owned))
        summary = {}
        summary["session_deleted"] = removed_session.deleted_count > 0
        summary["chats_deleted"] = removed_chats.deleted_count
        return summary
    except Exception as exc:
        raise Exception(f"Error deleting chat session and chats: {str(exc)}")
def delete_all_user_sessions_and_chats(self, user_id):
    """Delete every chat and chat session of ``user_id``; return both counts."""
    try:
        removed_chats = db.chats.delete_many({"user_id": user_id})
        removed_sessions = db.chat_sessions.delete_many({"user_id": user_id})
        counts = {"deleted_chats": removed_chats.deleted_count}
        counts["deleted_sessions"] = removed_sessions.deleted_count
        return counts
    except Exception as exc:
        raise Exception(f"Error deleting user sessions and chats: {str(exc)}")
def get_all_user_chats(self, user_id):
    """Return every session of the user, each bundled with its chats.

    Sessions come newest ``last_accessed`` first; chats inside a session
    are ordered by ascending ``timestamp``. One chat query is issued per
    session.
    """
    try:
        session_cursor = db.chat_sessions.find({"user_id": user_id}, {"_id": 0})
        sessions = list(session_cursor.sort("last_accessed", -1))

        def bundle(session):
            chat_cursor = db.chats.find(
                {"session_id": session["session_id"], "user_id": user_id},
                {"_id": 0},
            )
            chats = list(chat_cursor.sort("timestamp", 1))
            return {
                "session_id": session["session_id"],
                "title": session.get("title", "New Chat"),
                "created_at": session.get("created_at"),
                "last_accessed": session.get("last_accessed"),
                "chats": chats,
            }

        return [bundle(session) for session in sessions]
    except Exception as exc:
        raise Exception(f"Error retrieving all user chats: {str(exc)}")
def store_reset_token(self, email, token, expiration):
    """Upsert the password-reset token and its expiration for ``email``.

    NOTE(review): a method with this same name is defined again further
    down in the class; the later definition is the one bound at runtime.
    """
    try:
        token_fields = {'token': token, 'expiration': expiration}
        db.password_resets.update_one({'email': email}, {'$set': token_fields}, upsert=True)
    except Exception as exc:
        raise Exception(f"Error storing reset token: {str(exc)}")
def verify_reset_token(self, token):
    """Return the reset record for ``token`` if its expiration is still in the future.

    NOTE(review): a method with this same name is defined again further
    down in the class; the later definition is the one bound at runtime.
    """
    try:
        still_valid = {'$gt': datetime.now(timezone.utc)}
        return db.password_resets.find_one({'token': token, 'expiration': still_valid})
    except Exception as exc:
        raise Exception(f"Error verifying reset token: {str(exc)}")
def update_password(self, email, hashed_password):
    """Store ``hashed_password`` as the password of the user with ``email``.

    NOTE(review): a method with this same name (second parameter called
    ``new_password``) is defined again further down in the class; the
    later definition is the one bound at runtime.
    """
    try:
        change = {'$set': {'password': hashed_password}}
        db.users.update_one({'email': email}, change)
    except Exception as exc:
        raise Exception(f"Error updating password: {str(exc)}")
def delete_reset_token(self, token):
    """Remove the password-reset record holding ``token``."""
    try:
        selector = {'token': token}
        db.password_resets.delete_one(selector)
    except Exception as exc:
        raise Exception(f"Error deleting reset token: {str(exc)}")
def delete_account_permanently(self, username):
    """Delete the user and the related records, returning per-collection counts.

    Removal order: chats and chat sessions, preferences, theme, location,
    questionnaires, then the user document itself. ``username`` is also
    used as the ``user_id`` key for chats and questionnaires.

    NOTE(review): languages, skin schedules, image-upload logs, RAG
    interactions and password-reset records are not touched here —
    confirm whether that is intended.
    """
    try:
        by_username = {'username': username}
        chat_counts = self.delete_all_user_sessions_and_chats(username)
        removed_preferences = db.preferences.delete_one(dict(by_username))
        removed_theme = db.user_themes.delete_one(dict(by_username))
        removed_location = db.locations.delete_one(dict(by_username))
        removed_questionnaires = db.questionnaires.delete_many({'user_id': username})
        removed_user = db.users.delete_one(dict(by_username))
        deleted_data = {
            'chats': chat_counts['deleted_chats'],
            'chat_sessions': chat_counts['deleted_sessions'],
            'preferences': removed_preferences.deleted_count,
            'theme': removed_theme.deleted_count,
            'location': removed_location.deleted_count,
            'questionnaires': removed_questionnaires.deleted_count,
            'user_account': removed_user.deleted_count,
        }
        return {'success': True, 'deleted_data': deleted_data}
    except Exception as exc:
        raise Exception(f"Error deleting account permanently: {str(exc)}")
def store_reset_token(self, email, token, expiration):
    """Upsert the password-reset token and its expiration for ``email``.

    NOTE(review): this repeats an earlier definition of the same name in
    the class; being the later one, this is the definition bound at runtime.
    """
    try:
        selector = {'email': email}
        change = {'$set': {'token': token, 'expiration': expiration}}
        db.password_resets.update_one(selector, change, upsert=True)
    except Exception as exc:
        raise Exception(f"Error storing reset token: {str(exc)}")
def verify_reset_token(self, token):
    """Return the reset record for ``token`` while its expiration lies in the future.

    NOTE(review): this repeats an earlier definition of the same name in
    the class; being the later one, this is the definition bound at runtime.
    """
    try:
        now_utc = datetime.now(timezone.utc)
        selector = {'token': token, 'expiration': {'$gt': now_utc}}
        return db.password_resets.find_one(selector)
    except Exception as exc:
        raise Exception(f"Error verifying reset token: {str(exc)}")
def update_password(self, email, new_password):
    """Store ``new_password`` as the password of the user with ``email``.

    The value is written as given.

    NOTE(review): this repeats an earlier definition of the same name in
    the class; being the later one, this is the definition bound at runtime.
    """
    try:
        selector = {'email': email}
        db.users.update_one(selector, {'$set': {'password': new_password}})
    except Exception as exc:
        raise Exception(f"Error updating password: {str(exc)}")
def get_user_language(self, user_id):
    """Return the user's stored language value, or None when nothing is stored."""
    try:
        record = db.languages.find_one({'user_id': user_id})
        if not record:
            return None
        return record.get('language')
    except Exception as exc:
        raise Exception(f"Error retrieving user language: {str(exc)}")
def set_user_language(self, user_id, language):
    """Upsert the user's language with a UTC ``updated_at`` stamp; return the stored fields."""
    try:
        language_data = dict(
            user_id=user_id,
            language=language,
            updated_at=datetime.now(timezone.utc),
        )
        db.languages.update_one({'user_id': user_id}, {'$set': language_data}, upsert=True)
        return language_data
    except Exception as exc:
        raise Exception(f"Error setting user language: {str(exc)}")
def delete_user_language(self, user_id):
    """Delete the user's language record; return True if one was removed."""
    try:
        outcome = db.languages.delete_one({'user_id': user_id})
        return outcome.deleted_count > 0
    except Exception as exc:
        raise Exception(f"Error deleting user language: {str(exc)}")
def get_today_schedule(self, user_id):
    """Return a schedule created for ``user_id`` during the current UTC day, or None.

    The day is the half-open interval [midnight today, midnight tomorrow).
    The previous upper bound of 23:59:59.000000 with ``$lte`` skipped any
    document created in the final second of the day.

    Raises:
        Exception: when the database query fails.
    """
    try:
        day_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        next_day_start = day_start + timedelta(days=1)
        schedule = db.skin_schedules.find_one({
            "user_id": user_id,
            "created_at": {
                "$gte": day_start,
                "$lt": next_day_start
            }
        })
        return schedule
    except Exception as e:
        raise Exception(f"Error retrieving today's schedule: {str(e)}")
def save_schedule(self, user_id, schedule_data):
    """Store today's schedule unless one already exists; return the schedule id as a string."""
    try:
        already_saved = self.get_today_schedule(user_id)
        if already_saved:
            # Keep the existing schedule for today and just report its id.
            return str(already_saved["_id"])
        inserted = db.skin_schedules.insert_one({
            "user_id": user_id,
            "schedule_data": schedule_data,
            "created_at": datetime.now(timezone.utc),
        })
        return str(inserted.inserted_id)
    except Exception as exc:
        raise Exception(f"Error saving schedule: {str(exc)}")
def get_last_seven_days_schedules(self, user_id):
    """Return the user's schedules created within the past seven days, newest first."""
    try:
        cutoff = datetime.now(timezone.utc) - timedelta(days=7)
        recent = {"user_id": user_id, "created_at": {"$gte": cutoff}}
        return list(db.skin_schedules.find(recent).sort("created_at", -1))
    except Exception as exc:
        raise Exception(f"Error fetching last 7 days schedules: {str(exc)}")
def save_rag_interaction(self, user_id: str, session_id: str, context: str, query: str,
                         response: str, rag_start_time: datetime, rag_end_time: datetime):
    """Persist one RAG exchange and return its generated ``interaction_id``.

    Both timing values are converted to UTC before being stored.
    """
    try:
        interaction_id = str(ObjectId())
        record = {
            "interaction_id": interaction_id,
            "user_id": user_id,
            "session_id": session_id,
            "context": context,
            "query": query,
            "response": response,
        }
        record["rag_start_time"] = rag_start_time.astimezone(timezone.utc)
        record["rag_end_time"] = rag_end_time.astimezone(timezone.utc)
        record["created_at"] = datetime.now(timezone.utc)
        db.rag_interactions.insert_one(record)
        return record["interaction_id"]
    except Exception as exc:
        raise Exception(f"Error saving RAG interaction: {str(exc)}")
def get_rag_interactions(
    self,
    user_id: Optional[str] = None,
    page: int = 1,
    page_size: int = 5
) -> dict:
    """Return one page of stored RAG interactions, newest first.

    Args:
        user_id: when given, only this user's interactions are returned.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: items per page; values below 1 are treated as 1.

    Returns:
        A dict with ``total_interactions``, ``page``, ``page_size``,
        ``total_pages`` and ``results`` (timestamps rendered as ISO strings).

    Raises:
        Exception: when the database query fails.
    """
    try:
        # Clamp paging input: page < 1 produced a negative skip and
        # page_size == 0 a division by zero in the total_pages computation.
        page = max(1, page)
        page_size = max(1, page_size)
        query_filter = {}
        if user_id:
            query_filter["user_id"] = user_id
        skip = (page - 1) * page_size
        total = db.rag_interactions.count_documents(query_filter)
        interactions = db.rag_interactions.find(
            query_filter,
            {"_id": 0}
        ).sort("created_at", DESCENDING).skip(skip).limit(page_size)
        result_list = []
        for interaction in interactions:
            # datetimes are converted so the page can be serialised directly
            for field in ("rag_start_time", "rag_end_time", "created_at"):
                interaction[field] = interaction[field].isoformat()
            result_list.append(interaction)
        return {
            "total_interactions": total,
            "page": page,
            "page_size": page_size,
            "total_pages": (total + page_size - 1) // page_size,  # ceiling division
            "results": result_list
        }
    except Exception as e:
        raise Exception(f"Error retrieving RAG interactions: {str(e)}")
def log_image_upload(self, user_id):
    """Record an image upload by ``user_id`` with a timezone-aware UTC timestamp."""
    try:
        entry = {"user_id": user_id, "timestamp": datetime.now(timezone.utc)}
        db.image_uploads.insert_one(entry)
    except Exception as exc:
        raise Exception(f"Error logging image upload: {str(exc)}")
    else:
        return True
def get_user_daily_uploads(self, user_id):
    """Count the images ``user_id`` uploaded during the past 24 hours."""
    try:
        window_start = datetime.now(timezone.utc) - timedelta(days=1)
        recent_uploads = {"user_id": user_id, "timestamp": {"$gte": window_start}}
        return db.image_uploads.count_documents(recent_uploads)
    except Exception as exc:
        raise Exception(f"Error retrieving user daily uploads: {str(exc)}")
def get_user_last_upload_time(self, user_id):
    """Return the timestamp of the user's newest image upload, or None if there is none."""
    try:
        newest = db.image_uploads.find_one({"user_id": user_id}, sort=[("timestamp", DESCENDING)])
        if not newest:
            return None
        return newest["timestamp"]
    except Exception as exc:
        raise Exception(f"Error retrieving last upload time: {str(exc)}")
| ``` | |
| ### app\database\db.py | |
| ```python | |
| import os | |
| from pymongo.mongo_client import MongoClient | |
| from pymongo.server_api import ServerApi | |
# MongoDB connection string, read from the environment once at import time.
# NOTE(review): `uri` and `mongo_uri` hold the same value; get_db() connects
# with `uri`, while the guard below checks `mongo_uri`.
uri = os.getenv('MONGO_URI')
mongo_uri = os.getenv('MONGO_URI')
# Fail at import when the connection string is missing.
if not mongo_uri:
    raise ValueError("MONGO_URI environment variable is not set")
def get_db():
    """Open a MongoDB client and return its ``dermai`` database handle.

    A ``ping`` command is issued first; if it fails the error is printed
    and the handle is returned regardless.
    """
    mongo_client = MongoClient(uri, server_api=ServerApi('1'))
    try:
        mongo_client.admin.command('ping')
    except Exception as ping_error:
        print(ping_error)
    database = mongo_client.get_database("dermai")
    return database
| db = get_db() | |
| ``` | |
| ### app\main.py | |
| ```python | |
| # app/main.py | |
| from fastapi import FastAPI | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| import os | |
| from dotenv import load_dotenv | |
| from app.config.config import Config | |
| from app.routers import admin, auth, chat, location, preferences, profile, questionnaire, language, chat_session | |
| from app.routers import agent_chat | |
| load_dotenv() | |
# Application object that the routers below are attached to.
app = FastAPI(title="Skin AI API")
# Configure CORS
# NOTE(review): every origin is allowed together with credentials, and
# Config.CORS_ORIGINS is not consulted here — confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Mount static files for uploads
# The folder is created first so that mounting does not depend on it pre-existing.
os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)
app.mount("/uploads", StaticFiles(directory=Config.UPLOAD_FOLDER), name="uploads")
# Register routers
# Every router shares the "/api" prefix; the tag groups its routes.
app.include_router(admin.router, prefix="/api", tags=["admin"])
app.include_router(auth.router, prefix="/api", tags=["auth"])
app.include_router(chat.router, prefix="/api", tags=["chat"])
app.include_router(location.router, prefix="/api", tags=["location"])
app.include_router(preferences.router, prefix="/api", tags=["preferences"])
app.include_router(profile.router, prefix="/api", tags=["profile"])
app.include_router(questionnaire.router, prefix="/api", tags=["questionnaire"])
app.include_router(language.router, prefix="/api", tags=["language"])
app.include_router(chat_session.router, prefix="/api", tags=["chat_session"])
app.include_router(agent_chat.router, prefix="/api", tags=["agent_chat"])
@app.get("/")
async def root():
    """Report that the API is up."""
    health = {"message": "API is running", "status": "healthy"}
    return health
| ``` | |
| ### app\middleware\auth.py | |
| ```python | |
| # app/middleware/auth.py | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| import jwt | |
| from datetime import datetime, timedelta | |
| import os | |
# Bearer-token scheme used as a dependency by the functions below.
security = HTTPBearer()
# Signing secret; None when JWT_SECRET_KEY is not set.
JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY')
# Token lifetime in seconds (it is passed to timedelta(seconds=...) below).
# An unset/empty variable used to crash the import with int(None); fall back
# to a default instead.
# NOTE(review): 3600 was chosen as the fallback — confirm the desired value.
JWT_ACCESS_TOKEN_EXPIRES = int(os.getenv('JWT_ACCESS_TOKEN_EXPIRES') or 3600)
def create_access_token(data: dict):
    """Return an HS256-signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: claims to embed; the dict is copied, not mutated.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Local import: the module header only brings in datetime and timedelta.
    from datetime import timezone

    claims = data.copy()
    # datetime.utcnow() is deprecated and yields a naive value; use an
    # explicit timezone-aware UTC "now" for the expiry instead.
    expires_at = datetime.now(timezone.utc) + timedelta(seconds=JWT_ACCESS_TOKEN_EXPIRES)
    claims["exp"] = expires_at
    return jwt.encode(claims, JWT_SECRET_KEY, algorithm="HS256")
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Decode the bearer token and return its ``sub`` claim.

    Raises a 401 ``HTTPException`` when the token cannot be decoded or
    carries no ``sub`` claim.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(credentials.credentials, JWT_SECRET_KEY, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise unauthorized
    subject: str = claims.get("sub")
    if subject is None:
        raise unauthorized
    return subject
def get_current_user(username: str = Depends(verify_token)):
    """Dependency that hands back the username resolved by ``verify_token``."""
    current_user = username
    return current_user
# For optional JWT authentication (some endpoints allow unauthenticated access)
# auto_error=False makes the bearer scheme yield None when no token is sent,
# instead of rejecting the request before this dependency can run.
_optional_security = HTTPBearer(auto_error=False)


def get_optional_user(authorization: HTTPAuthorizationCredentials = Depends(_optional_security)):
    """Return the token's ``sub`` claim, or None when no usable token was sent.

    A missing Authorization header and a token that fails to decode both
    yield None; this dependency does not reject the request itself.
    """
    if authorization is None:
        return None
    try:
        payload = jwt.decode(authorization.credentials, JWT_SECRET_KEY, algorithms=["HS256"])
    except Exception:
        # Best effort by design: any decode problem means "anonymous".
        # (Exception rather than a bare except, so interpreter-exit signals propagate.)
        return None
    return payload.get("sub")
| ``` | |
| ### app\routers\admin.py | |
| ```python | |
| # app/routers/admin.py | |
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File | |
| from typing import List | |
| import os | |
| from app.database.database_query import DatabaseQuery | |
| from app.services.vector_database_search import VectorDatabaseSearch | |
| from app.middleware.auth import get_current_user | |
| from pydantic import BaseModel | |
# Router that the endpoints in this module are registered on.
router = APIRouter()
# Service objects created once at import time.
vector_db = VectorDatabaseSearch()
query = DatabaseQuery()
# "temp" directory next to this module, used for uploaded PDFs; created on import if missing.
TEMP_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'temp')
os.makedirs(TEMP_DIR, exist_ok=True)
class SearchQuery(BaseModel):
    """Request body for the book search endpoint."""
    # Text to search for.
    query: str
    # Forwarded to the vector search as ``top_k``; defaults to 5.
    k: int = 5
@router.get('/books')
async def get_books(username: str = Depends(get_current_user)):
    """Return the book information reported by the vector database (requires auth)."""
    try:
        books = vector_db.get_book_info()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {'status': 'success', 'data': books}
@router.post('/books', status_code=201)
async def add_books(files: List[UploadFile] = File(...), username: str = Depends(get_current_user)):
    """Store uploaded PDFs temporarily and add each one to the vector database.

    Files whose name does not end in ``.pdf`` (checked case-insensitively)
    are ignored, as are parts without a filename. Temporary copies are
    removed in every outcome.

    Raises:
        HTTPException: 400 when no PDF was supplied, 500 on any other failure.
    """
    pdf_paths = []
    try:
        for file in files:
            # A multipart part may carry no filename; treat it as "not a PDF".
            filename = file.filename or ""
            if not filename.lower().endswith('.pdf'):
                continue
            # basename() drops any directory components sent by the client.
            temp_path = os.path.join(TEMP_DIR, os.path.basename(filename))
            # Register the path before writing so a failed write is cleaned up too.
            pdf_paths.append(temp_path)
            with open(temp_path, "wb") as buffer:
                buffer.write(await file.read())
        if not pdf_paths:
            raise HTTPException(status_code=400, detail="No valid PDF files provided")
        success_count = sum(1 for pdf_path in pdf_paths if vector_db.add_pdf(pdf_path))
        return {
            'status': 'success',
            'message': f'Successfully added {success_count} of {len(pdf_paths)} books'
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Best-effort cleanup of temporary files, on success and on error alike.
        for path in pdf_paths:
            try:
                os.remove(path)
            except OSError:
                pass
@router.post('/search')
async def search_books(search_data: SearchQuery, username: str = Depends(get_current_user)):
    """Run a vector search for the submitted query and return the hits (requires auth)."""
    try:
        hits = vector_db.search(query=search_data.query, top_k=search_data.k)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {'status': 'success', 'data': hits}
| ``` | |
| ### app\routers\agent_chat.py | |
| ```python | |
| from typing import Optional | |
| import asyncio | |
| import json | |
| import logging | |
| from fastapi import APIRouter, Depends, Header, HTTPException | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| from app.middleware.auth import get_current_user | |
| from app.services.google_agent_service import ( | |
| DEFAULT_MODEL_NAME, | |
| GoogleAgentService, | |
| ) | |
| router = APIRouter() | |
| logger = logging.getLogger(__name__) | |
class AgentChatDocument(BaseModel):
    """Optional document attached to an agent chat request.

    Converted to a plain dict and handed to ``GoogleAgentService``.
    """

    # presumably the relative path returned by an upload endpoint — TODO confirm
    path: str
    name: Optional[str] = None
    type: Optional[str] = None
    extension: Optional[str] = None
class AgentChatImage(BaseModel):
    """Optional image attached to an agent chat request.

    Converted to a plain dict and handed to ``GoogleAgentService``.
    """

    # presumably the relative path returned by an upload endpoint — TODO confirm
    path: str
    name: Optional[str] = None
    type: Optional[str] = None
    extension: Optional[str] = None
    # NOTE(review): looks like an image-specific prompt; confirm with the service
    prompt: Optional[str] = None
class AgentChatRequest(BaseModel):
    """Request body of the ``/agent-chat`` endpoint."""

    # Passed to GoogleAgentService as ``session_id``; may be omitted.
    session_id: Optional[str] = None
    # The user message streamed through ``process_message_async``.
    query: str
    document: Optional[AgentChatDocument] = None
    image: Optional[AgentChatImage] = None
async def stream_agent_response(agent_service: GoogleAgentService, query: str):
    """Yield the agent's events for ``query`` as Server-Sent Event lines.

    Every event produced by ``agent_service.process_message_async`` is
    reduced to a JSON payload and emitted as ``data: <json>\\n\\n``. A final
    ``done`` event follows the last one. If anything raises while streaming,
    the error is logged and a single ``error`` event is emitted instead.
    """

    def to_payload(evt):
        # Map one service event to the wire format expected by the client.
        kind = evt.get("type")
        if kind == "chunk":
            return {"type": "chunk", "content": evt.get("content", "")}
        if kind == "tool_call":
            return {
                "type": "tool_call",
                "tool_name": evt.get("tool_name"),
                "arguments": evt.get("arguments", {}),
            }
        if kind == "tool_result":
            return {
                "type": "tool_result",
                "tool_name": evt.get("tool_name"),
                "result": evt.get("result", {}),
            }
        if kind == "completed":
            return {
                "type": "completed",
                "saved": evt.get("saved"),
                "session_id": evt.get("session_id"),
                "response": evt.get("response", ""),
                "keywords": evt.get("keywords", []),
                "references": evt.get("references", []),
                "images": evt.get("images", []),
            }
        if kind == "error":
            return {"type": "error", "message": evt.get("content", "")}
        # Unrecognised events are passed through untouched.
        return {"type": kind or "unknown", "data": evt}

    try:
        async for event in agent_service.process_message_async(query):
            yield f"data: {json.dumps(to_payload(event))}\n\n"
            # Hand control back to the event loop between events.
            await asyncio.sleep(0.001)
        yield "data: {\"type\": \"done\"}\n\n"
    except Exception as exc:
        logger.error("Streaming error: %s", exc, exc_info=True)
        yield f"data: {json.dumps({'type': 'error', 'message': str(exc)})}\n\n"
@router.post("/agent-chat")
async def agent_chat(
    request: AgentChatRequest,
    authorization: str = Header(None),
    username: str = Depends(get_current_user),
):
    """Start an agent conversation turn and stream the reply as SSE.

    The bearer token from the ``Authorization`` header is handed to
    ``GoogleAgentService`` together with the optional session id, document
    and image from the request body.

    Raises:
        HTTPException: 401 when the header is missing or not a bearer token,
            500 when the agent service cannot be constructed.
    """
    has_bearer = bool(authorization) and authorization.startswith("Bearer ")
    if not has_bearer:
        raise HTTPException(status_code=401, detail="Invalid authorization header")
    # Everything after the first space is the token itself.
    token = authorization.partition(" ")[2]

    try:
        document_payload = request.document.dict() if request.document else None
        image_payload = request.image.dict() if request.image else None
        agent_service = GoogleAgentService(
            token=token,
            session_id=request.session_id,
            document=document_payload,
            image=image_payload,
        )
    except Exception as exc:
        logger.error("Failed to initialise agent service: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail="Unable to initialise agent")

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "Content-Type": "text/event-stream",
        "X-Accel-Buffering": "no",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Headers": "*",
    }
    event_stream = stream_agent_response(agent_service, request.query)
    return StreamingResponse(
        event_stream,
        media_type="text/event-stream",
        headers=sse_headers,
    )
@router.get("/agent-status")
async def agent_status(username: str = Depends(get_current_user)):
    """Report the agent's model name and the features it advertises.

    Requires an authenticated user. The payload is static, so building it
    cannot fail and no exception handling is needed here.
    """
    return {
        "status": "available",
        "model": DEFAULT_MODEL_NAME,
        "features": [
            "web_search",
            "vector_search",
            "image_search",
            "streaming",
            "tool_calls",
        ],
    }
| ``` | |
| ### app\routers\auth.py | |
| ```python | |
| # app/routers/auth.py | |
| import os | |
| import random | |
| import smtplib | |
| import ssl | |
| import string | |
| from datetime import datetime, timedelta | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.text import MIMEText | |
| from email.utils import formataddr | |
| from fastapi import APIRouter, HTTPException, Depends | |
| from pydantic import BaseModel, EmailStr | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| from app.database.database_query import DatabaseQuery | |
| from app.middleware.auth import create_access_token, get_current_user | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
# SMTP settings are read once at import time from the environment.
SMTP_SERVER = os.getenv("SMTP_SERVER")  # optional alias
_raw_host = os.getenv("SMTP_HOST")
_raw_port = os.getenv("SMTP_PORT")

# Be forgiving if env is swapped (e.g., SMTP_HOST=587 and SMTP_SERVER=smtp.gmail.com)
if not (_raw_host and _raw_host.strip().isdigit() and not _raw_port):
    # Normal case: host and port come from their own variables, with defaults.
    SMTP_HOST = _raw_host or SMTP_SERVER or "smtp.gmail.com"
    try:
        SMTP_PORT = int(_raw_port) if _raw_port else 587
    except Exception:
        # An unparsable port falls back to the submission port.
        SMTP_PORT = 587
else:
    # SMTP_HOST holds a number and no port was given: treat it as the port.
    SMTP_HOST = SMTP_SERVER or "smtp.gmail.com"
    SMTP_PORT = int(_raw_host.strip())

SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
# Empty strings are normalised to None for the optional sender settings.
EMAILS_FROM_EMAIL = os.getenv("EMAILS_FROM_EMAIL") or None
EMAILS_FROM_NAME = os.getenv("EMAILS_FROM_NAME") or None
def send_email(to_email: str, subject: str, html_content: str, failure_message: str = "Failed to send email") -> None:
    """Send an HTML e-mail through the configured SMTP account.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        html_content: Message body, sent as ``text/html``.
        failure_message: Prefix of the error detail reported on failure.

    Raises:
        HTTPException: 500 when the SMTP settings are incomplete or when the
            message cannot be delivered.
    """
    if not all([SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASSWORD]):
        raise HTTPException(status_code=500, detail="Email service is not configured properly")

    message = MIMEMultipart("alternative")
    from_email = EMAILS_FROM_EMAIL or SMTP_USER
    from_header = formataddr((EMAILS_FROM_NAME, from_email)) if EMAILS_FROM_NAME else from_email
    message["Subject"] = subject
    message["From"] = from_header
    message["To"] = to_email
    if EMAILS_FROM_EMAIL and EMAILS_FROM_EMAIL != SMTP_USER:
        message["Reply-To"] = EMAILS_FROM_EMAIL
    message.attach(MIMEText(html_content, "html"))

    # Gmail app passwords are displayed in space-separated groups; strip the
    # spaces so a copy-pasted value still authenticates.
    auth_password = SMTP_PASSWORD
    uses_gmail = ("gmail" in (SMTP_HOST or "")) or (SMTP_USER or "").lower().endswith("@gmail.com")
    if uses_gmail and isinstance(SMTP_PASSWORD, str) and " " in SMTP_PASSWORD:
        auth_password = SMTP_PASSWORD.replace(" ", "")

    try:
        context = ssl.create_default_context()
        # Port 465 expects TLS from the first byte; other ports upgrade a
        # plain connection with STARTTLS.
        implicit_tls = SMTP_PORT == 465
        # A timeout keeps an unresponsive mail server from blocking the
        # request forever.
        if implicit_tls:
            connection = smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=30, context=context)
        else:
            connection = smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30)
        with connection as server:
            if not implicit_tls:
                server.starttls(context=context)
            server.login(SMTP_USER, auth_password)
            server.sendmail(SMTP_USER, [to_email], message.as_string())
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"{failure_message}: {exc}") from exc
| router = APIRouter() | |
| query = DatabaseQuery() | |
class LoginRequest(BaseModel):
    """Credentials submitted to ``/login``."""

    # Looked up with ``query.get_user_by_identifier`` (username or e-mail).
    identifier: str
    password: str
class LoginResponse(BaseModel):
    """Successful ``/login`` response."""

    message: str
    # Access token created by ``create_access_token``.
    token: str
class RegisterRequest(BaseModel):
    """Data submitted to ``/register`` to create a pending account."""

    username: str
    email: EmailStr
    # Plain-text password; hashed before it is stored.
    password: str
    name: str
    age: int
class VerifyEmailRequest(BaseModel):
    """Body of ``/verify-email``: the pending user and the code they received."""

    username: str
    code: str
class ResendCodeRequest(BaseModel):
    """Body of ``/resend-code``: the pending user who needs a new code."""

    username: str
class ForgotPasswordRequest(BaseModel):
    """Body of ``/forgot-password``: the account's e-mail address."""

    email: EmailStr
class ResetPasswordRequest(BaseModel):
    """Body of ``/reset-password``."""

    # Reset token previously e-mailed by ``/forgot-password``.
    token: str
    # New plain-text password; hashed before it is stored.
    password: str
class ChatSessionCheck(BaseModel):
    """Body of ``/checkChatsession``: the session id to look up."""

    session_id: str
@router.post('/login', response_model=LoginResponse)
async def login(login_data: LoginRequest):
    """Authenticate a user and return an access token.

    Raises:
        HTTPException: 401 for unknown users, wrong passwords or accounts
            whose e-mail is not verified yet; 500 on unexpected errors.
    """
    try:
        user = query.get_user_by_identifier(login_data.identifier)

        # Check the password before looking at the verification flag, so the
        # verification state of an account is only disclosed to someone who
        # actually knows its password.
        if not user or not check_password_hash(user['password'], login_data.password):
            raise HTTPException(status_code=401, detail="Invalid username/email or password")
        if not user.get('is_verified'):
            raise HTTPException(status_code=401, detail="Please verify your email before logging in")

        access_token = create_access_token({"sub": user['username']})
        return {"message": "Login successful", "token": access_token}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post('/register', status_code=201)
async def register(register_data: RegisterRequest):
    """Create a pending (unverified) account and e-mail a verification code.

    The account is stored as a temporary user together with a six-digit code
    that expires after ten minutes.

    Raises:
        HTTPException: 409 when the username or e-mail is already taken,
            500 when the e-mail cannot be sent or on unexpected errors.
    """
    # Local imports keep this block self-contained.
    import html
    import secrets

    try:
        username = register_data.username
        email = register_data.email
        name = register_data.name

        if query.is_username_or_email_exists(username, email):
            raise HTTPException(status_code=409, detail="Username or email already exists")

        # The code guards account creation, so draw it from the OS CSPRNG
        # rather than the predictable ``random`` generator.
        verification_code = ''.join(secrets.choice(string.digits) for _ in range(6))
        temp_user = {
            'username': username,
            'email': email,
            'password': generate_password_hash(register_data.password),
            'name': name,
            'age': register_data.age,
            'created_at': datetime.utcnow(),
            'verification_code': verification_code,
            'code_expiration': datetime.utcnow() + timedelta(minutes=10)
        }
        query.create_or_update_temp_user(username, email, temp_user)

        try:
            send_email(
                to_email=email,
                subject='Verify your email address',
                # ``name`` is user input: escape it before embedding in HTML.
                html_content=f'''
                <p>Hi {html.escape(name)},</p>
                <p>Thank you for registering. Please use the following code to verify your email address:</p>
                <h2>{verification_code}</h2>
                <p>This code will expire in 10 minutes.</p>
                ''',
                failure_message="Failed to send verification email",
            )
        except Exception:
            # Report a fixed message instead of SMTP internals.
            raise HTTPException(status_code=500, detail="Failed to send verification email")

        return {"message": "Registration successful. A verification code has been sent to your email."}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post('/verify-email')
async def verify_email(verify_data: VerifyEmailRequest):
    """Promote a pending account to a real one once its code is confirmed.

    On success the temporary record is turned into a verified user, the
    temporary record is deleted, and default language, theme and preferences
    are stored for the user.

    Raises:
        HTTPException: 404 when no pending user exists, 400 when the code is
            wrong or expired, 500 on unexpected errors.
    """
    try:
        username = verify_data.username
        pending = query.get_temp_user_by_username(username)

        if not pending:
            raise HTTPException(status_code=404, detail="User not found or already verified")
        if verify_data.code != pending['verification_code']:
            raise HTTPException(status_code=400, detail="Invalid verification code")
        if pending['code_expiration'] < datetime.utcnow():
            raise HTTPException(status_code=400, detail="Verification code has expired")

        # Carry over everything except the verification bookkeeping and the
        # temporary record's own id.
        transient_keys = ('verification_code', 'code_expiration', '_id')
        user_data = {key: value for key, value in pending.items() if key not in transient_keys}
        user_data['is_verified'] = True

        query.create_user_from_data(user_data)
        query.delete_temp_user(username)

        # Defaults: English, light theme (False = not dark).
        query.set_user_language(username, "English")
        query.set_user_theme(username, False)
        query.set_user_preferences(
            username,
            {
                'keywords': True,
                'references': True,
                'websearch': False,
                'personalized_recommendations': True,
                'environmental_recommendations': True
            },
        )
        return {"message": "Email verification successful"}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post('/resend-code')
async def resend_code(resend_data: ResendCodeRequest):
    """Issue a fresh verification code for a pending account and e-mail it.

    The new six-digit code replaces the old one and expires after ten
    minutes.

    Raises:
        HTTPException: 404 when no pending user exists, 500 when the e-mail
            cannot be sent or on unexpected errors.
    """
    # Local imports keep this block self-contained.
    import html
    import secrets

    try:
        username = resend_data.username
        temp_user = query.get_temp_user_by_username(username)
        if not temp_user:
            raise HTTPException(status_code=404, detail="User not found or already verified")

        # Security-sensitive value: use the OS CSPRNG, not ``random``.
        verification_code = ''.join(secrets.choice(string.digits) for _ in range(6))
        temp_user['verification_code'] = verification_code
        temp_user['code_expiration'] = datetime.utcnow() + timedelta(minutes=10)
        query.create_or_update_temp_user(username, temp_user['email'], temp_user)

        try:
            send_email(
                to_email=temp_user['email'],
                subject='Your new verification code',
                # The stored name is user input: escape it before embedding in HTML.
                html_content=f'''
                <p>Hi {html.escape(str(temp_user['name']))},</p>
                <p>You requested a new verification code. Please use the following code to verify your email address:</p>
                <h2>{verification_code}</h2>
                <p>This code will expire in 10 minutes.</p>
                ''',
                failure_message="Failed to send verification email",
            )
        except Exception:
            # Report a fixed message instead of SMTP internals.
            raise HTTPException(status_code=500, detail="Failed to send verification email")

        return {"message": "A new verification code has been sent to your email."}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post('/checkChatsession')
async def check_chatsession(data: ChatSessionCheck, username: str = Depends(get_current_user)):
    """Return the result of ``query.check_chat_session`` for the given id.

    The result is reported under the ``ischatexit`` key (spelling kept for
    client compatibility).
    """
    return {"ischatexit": query.check_chat_session(data.session_id)}
@router.get('/check-token')
async def check_token(username: str = Depends(get_current_user)):
    """Confirm that the caller's token is valid and echo the user name.

    Token validation happens in the ``get_current_user`` dependency before
    this body runs; building the response cannot fail, so no exception
    handling is needed here.
    """
    return {'valid': True, 'user': username}
@router.post('/forgot-password')
async def forgot_password(data: ForgotPasswordRequest):
    """Create a one-hour password-reset token and e-mail the reset link.

    The link points at ``FRONTEND_URL`` (environment variable), falling back
    to ``http://localhost:3000`` when it is not set.

    Raises:
        HTTPException: 404 when the e-mail is unknown, 500 when the e-mail
            cannot be sent or on unexpected errors.
    """
    # Local import keeps this block self-contained.
    import secrets

    try:
        email = data.email
        user = query.get_user_by_identifier(email)
        if not user:
            raise HTTPException(status_code=404, detail="Email not found")

        # A reset token is a bearer credential: it must come from the OS
        # CSPRNG, not the predictable ``random`` generator. Same format as
        # before: 32 ASCII letters/digits.
        alphabet = string.ascii_letters + string.digits
        reset_token = ''.join(secrets.choice(alphabet) for _ in range(32))
        expiration = datetime.utcnow() + timedelta(hours=1)
        query.store_reset_token(email, reset_token, expiration)

        # Allow deployments to point the link at the real frontend.
        frontend_url = (os.getenv("FRONTEND_URL") or "http://localhost:3000").rstrip("/")
        reset_link = f"{frontend_url}/reset-password?token={reset_token}"

        try:
            send_email(
                to_email=email,
                subject='Reset Your Password',
                html_content=f'''
                <p>Hi,</p>
                <p>You requested to reset your password. Click the link below to reset it:</p>
                <p><a href="{reset_link}">Reset Password</a></p>
                <p>This link will expire in 1 hour.</p>
                <p>If you didn't request this, please ignore this email.</p>
                ''',
                failure_message="Failed to send password reset email",
            )
        except Exception:
            # Report a fixed message instead of SMTP internals.
            raise HTTPException(status_code=500, detail="Failed to send password reset email")

        return {"message": "Password reset instructions sent to email"}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post('/reset-password')
async def reset_password(data: ResetPasswordRequest):
    """Replace a user's password after validating the reset token.

    Raises:
        HTTPException: 400 when the token or password is empty or the token
            is rejected by ``query.verify_reset_token``; 500 on unexpected
            errors.
    """
    try:
        if not (data.token and data.password):
            raise HTTPException(status_code=400, detail="Token and new password are required")

        reset_info = query.verify_reset_token(data.token)
        if not reset_info:
            raise HTTPException(status_code=400, detail="Invalid or expired reset token")

        query.update_password(reset_info['email'], generate_password_hash(data.password))
        return {"message": "Password successfully reset"}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
| ``` | |
| ### app\routers\chat.py | |
| ```python | |
| # app/routers/chat.py | |
| import logging | |
| import os | |
| import json | |
| from datetime import datetime | |
| from bson import ObjectId | |
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Header | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from pydantic import BaseModel | |
| from app.database.database_query import DatabaseQuery | |
| from app.middleware.auth import get_current_user, get_optional_user | |
| from app.services.skincare_scheduler import SkinCareScheduler | |
| from app.services.wheel import EnvironmentalConditions | |
| from app.services.RAG_evaluation import RAGEvaluation | |
| router = APIRouter() | |
| query = DatabaseQuery() | |
class ChatSessionTitleUpdate(BaseModel):
    """Body of the chat-session title update endpoint."""

    # New title to store for the session.
    title: str
@router.get('/image/{filename}')
async def serve_image(filename: str):
    """Serve a file from the ``uploads`` directory.

    Raises:
        HTTPException: 404 when the file does not exist or the requested
            name would resolve outside the uploads directory.
    """
    upload_dir = os.path.abspath('uploads')
    file_path = os.path.abspath(os.path.join(upload_dir, filename))

    # ``filename`` is client-controlled: refuse anything (``..``, absolute
    # paths, backslash tricks on Windows) that escapes the uploads directory.
    inside_uploads = file_path.startswith(upload_dir + os.sep)
    if not inside_uploads or not os.path.isfile(file_path):
        logging.warning("Image not found or not allowed: %s", file_path)
        raise HTTPException(status_code=404, detail="Image not found")

    return FileResponse(file_path)
@router.post('/chat-sessions', status_code=201)
async def create_chat_session(username: str = Depends(get_current_user)):
    """Create an empty chat session titled "New Chat" for the caller.

    Returns the generated session id. Any failure is reported as HTTP 500.
    """
    try:
        new_id = str(ObjectId())
        query.create_chat_session(
            dict(
                user_id=username,
                session_id=new_id,
                created_at=datetime.utcnow(),
                last_accessed=datetime.utcnow(),
                title="New Chat",
            )
        )
        return {"message": "Chat session created", "session_id": new_id}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get('/chat-sessions')
async def get_user_chat_sessions(username: str = Depends(get_current_user)):
    """Return the caller's chat sessions as provided by the database layer.

    Any failure is reported as HTTP 500 carrying the error text.
    """
    try:
        return query.get_user_chat_sessions(username)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# NOTE: FastAPI matches path operations in registration order. The fixed path
# '/chat-sessions/all' must therefore be declared before the parameterised
# '/chat-sessions/{session_id}', otherwise "all" is captured as a session id
# and this endpoint is never reached.
@router.delete('/chat-sessions/all')
async def delete_all_sessions_and_chats(username: str = Depends(get_current_user)):
    """Delete every chat session and chat belonging to the caller.

    Returns the deletion counts reported by the database layer. Any failure
    is reported as HTTP 500.
    """
    try:
        result = query.delete_all_user_sessions_and_chats(username)
        return {
            "message": "Successfully deleted all chat sessions and chats",
            "deleted_chats": result["deleted_chats"],
            "deleted_sessions": result["deleted_sessions"]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.delete('/chat-sessions/{session_id}')
async def delete_chat_session(session_id: str, username: str = Depends(get_current_user)):
    """Delete one of the caller's chat sessions together with its chats.

    Raises:
        HTTPException: 404 when the session does not exist or is not owned
            by the caller, 500 on unexpected errors.
    """
    try:
        result = query.delete_chat_session(session_id, username)
        if result["session_deleted"]:
            return {
                "message": "Chat session and associated chats deleted successfully",
                "chats_deleted": result["chats_deleted"]
            }
        raise HTTPException(status_code=404, detail="Chat session not found or unauthorized")
    except HTTPException:
        # Let the 404 through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.put('/chat-sessions/{session_id}/title')
async def update_chat_title(
    session_id: str,
    title_data: ChatSessionTitleUpdate,
    username: str = Depends(get_current_user)
):
    """Rename one of the caller's chat sessions.

    Raises:
        HTTPException: 404 when the session does not exist or is not owned
            by the caller, 500 when the update fails.
    """
    try:
        new_title = title_data.title
        if not query.verify_session(session_id, username):
            raise HTTPException(status_code=404, detail="Chat session not found or unauthorized")
        if query.update_chat_session_title(session_id, new_title):
            return {
                'message': 'Chat session title updated successfully',
                'session_id': session_id,
                'new_title': new_title
            }
        raise HTTPException(status_code=500, detail="Failed to update chat session title")
    except HTTPException:
        # Preserve the intended status code (404/500) and message.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get('/chats/session/{session_id}')
async def get_session_chats(session_id: str, username: str = Depends(get_current_user)):
    """Return the chats of one session as provided by the database layer.

    Any failure is reported as HTTP 500 carrying the error text.
    """
    try:
        return query.get_session_chats(session_id, username)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get('/export-chat/{session_id}')
async def export_chat(session_id: str, username: str = Depends(get_current_user)):
    """Export the chats of one session in a flat, client-friendly shape.

    Raises:
        HTTPException: 404 when the session does not exist or is not owned
            by the caller, 500 on unexpected errors.
    """
    try:
        if not query.verify_session(session_id, username):
            raise HTTPException(status_code=404, detail="Chat session not found or unauthorized")

        chats = query.get_session_chats(session_id, username)
        formatted_chats = [
            {
                'query': chat.get('query', ''),
                'response': chat.get('response', ''),
                'references': chat.get('references', []),
                'page_no': chat.get('page_no', ''),
                # Exposed as 'date' in the single-session export.
                'date': chat.get('timestamp', ''),
                'chat_id': chat.get('chat_id', '')
            }
            for chat in chats
        ]
        return {
            'session_id': session_id,
            'export_date': datetime.utcnow().isoformat(),
            'chats': formatted_chats
        }
    except HTTPException:
        # Let the 404 through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get('/export-all-chats')
async def export_all_chats(username: str = Depends(get_current_user)):
    """Export every session of the caller together with its chats.

    Any failure is reported as HTTP 500 carrying the error text.
    """

    def shape_chat(entry):
        # Keep only the exported fields, with defaults for missing ones.
        return {
            'query': entry.get('query', ''),
            'response': entry.get('response', ''),
            'references': entry.get('references', []),
            'page_no': entry.get('page_no', ''),
            'timestamp': entry.get('timestamp', ''),
            'chat_id': entry.get('chat_id', '')
        }

    try:
        sessions_out = []
        for record in query.get_all_user_chats(username):
            shaped_chats = [shape_chat(item) for item in record['chats']]
            sessions_out.append({
                'session_id': record['session_id'],
                'title': record['title'],
                'created_at': record['created_at'],
                'last_accessed': record['last_accessed'],
                'chats': shaped_chats
            })
        return {
            'user': username,
            'export_date': datetime.utcnow().isoformat(),
            'sessions': sessions_out
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post('/report-analysis')
async def upload_report(
    file: UploadFile = File(...),
    session_id: str = Form(...),
    authorization: str = Header(None),
    username: str = Depends(get_current_user)
):
    """Store an uploaded report under the session's upload directory.

    The file is written to ``<uploads root>/<session_id>/<timestamp>_<name>``
    where the uploads root is ``DERMAI_UPLOAD_DIR`` or the project's
    ``uploads`` folder. The response describes the stored file; its ``path``
    is relative to the uploads root.

    Returns a 400/401 JSON error for bad input and raises HTTP 500 on
    unexpected failures.
    """
    try:
        # A missing or malformed header must not surface as an AttributeError.
        if not authorization or len(authorization.split(" ")) < 2:
            return JSONResponse(
                status_code=401,
                content={"status": "error", "error": "Invalid authorization header"}
            )

        # The file name is client-controlled: drop any directory components
        # (either separator) so it cannot point outside the session folder.
        sanitized_name = os.path.basename((file.filename or '').replace('\\', '/')).replace(' ', '_')
        if not sanitized_name:
            return JSONResponse(
                status_code=400,
                content={"status": "error", "error": "Empty file provided"}
            )

        file_extension = sanitized_name.rsplit('.', 1)[1].lower() if '.' in sanitized_name else ''
        allowed_extensions = {
            'pdf', 'xlsx', 'xls', 'csv', 'jpg', 'jpeg', 'png',
            'doc', 'docx', 'ppt', 'pptx', 'txt', 'html'
        }
        if file_extension not in allowed_extensions:
            return JSONResponse(
                status_code=400,
                content={
                    "status": "error",
                    "error": f"Unsupported file type. Allowed types: {', '.join(sorted(allowed_extensions))}",
                }
            )

        default_upload_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..", "uploads")
        )
        uploads_root = os.path.abspath(os.getenv('DERMAI_UPLOAD_DIR', default_upload_root))
        session_upload_dir = os.path.abspath(os.path.join(uploads_root, session_id))
        # ``session_id`` is a form field: make sure it cannot move the target
        # directory outside the uploads root (e.g. "../../etc").
        if not session_upload_dir.startswith(uploads_root + os.sep):
            return JSONResponse(
                status_code=400,
                content={"status": "error", "error": "Invalid session id"}
            )
        os.makedirs(session_upload_dir, exist_ok=True)

        # The timestamp prefix keeps repeated uploads of the same name apart.
        timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
        stored_path = os.path.join(session_upload_dir, f"{timestamp}_{sanitized_name}")
        content = await file.read()
        with open(stored_path, 'wb') as f:
            f.write(content)

        relative_path = os.path.relpath(os.path.abspath(stored_path), uploads_root)
        return {
            "status": "success",
            "message": "File uploaded successfully",
            "file": {
                "path": relative_path.replace('\\', '/'),
                "name": file.filename,
                "content_type": file.content_type,
                "size": len(content),
                "extension": file_extension,
            }
        }
    except Exception as e:
        logging.error(f"Error in upload_report: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail={
                "status": "error",
                "error": "Internal server error",
                "details": str(e)
            }
        )
@router.get('/skin-care-schedule')
async def get_skin_care_schedule(
    authorization: str = Header(None),
    username: str = Depends(get_current_user)
):
    """Build a skin-care schedule for the caller and return it as JSON.

    The bearer token from the ``Authorization`` header is passed to
    ``SkinCareScheduler``; its ``createTable`` output is parsed as JSON.
    Any failure is logged and reported as HTTP 500.
    """
    try:
        bearer = authorization.split(" ")[1]
        raw_schedule = SkinCareScheduler(bearer, "session_id").createTable()
        return json.loads(raw_schedule)
    except Exception as e:
        logging.error(f"Error generating skin care schedule: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail={"error": "Failed to generate skin care schedule"}
        )
@router.get('/skin-care-wheel')
async def get_skin_care_wheel(
    authorization: str = Header(...),
    username: str = Depends(get_current_user)
):
    """Return the environmental-condition data for the skin-care wheel.

    The bearer token is handed to ``EnvironmentalConditions`` as its
    ``session_id`` argument. Any failure is logged and reported as HTTP 500.
    """
    try:
        bearer = authorization.split(" ")[1]
        return EnvironmentalConditions(session_id=bearer).get_conditon()
    except Exception as e:
        logging.error(f"Error generating skin care wheel: {str(e)}")
        failure_detail = {
            "error": "Failed to generate skin care wheel",
            "message": "An unexpected error occurred"
        }
        raise HTTPException(status_code=500, detail=failure_detail)
@router.post('/image_disease_search')
async def upload_skin_image(
    image: UploadFile = File(...),
    session_id: str = Form(...),
    query: str = Form(""),
    num_results: int = Form(3),
    num_images: int = Form(3),
    authorization: str = Header(...),
    username: str = Depends(get_current_user)
):
    """Store an uploaded skin image under the session's upload directory.

    The image is written to ``<uploads root>/<session_id>/<timestamp>_<name>``
    where the uploads root is ``DERMAI_UPLOAD_DIR`` or the project's
    ``uploads`` folder. The response describes the stored file and echoes
    the accompanying ``query``. ``num_results`` and ``num_images`` are
    accepted but not used here.

    Returns a 400/401 JSON error for bad input and raises HTTP 500 on
    unexpected failures.
    """
    try:
        # A malformed header must not surface as an IndexError.
        if not authorization or len(authorization.split(" ")) < 2:
            return JSONResponse(
                status_code=401,
                content={"status": "error", "error": "Invalid authorization header"},
            )

        # The file name is client-controlled: drop any directory components
        # (either separator) so it cannot point outside the session folder.
        sanitized_name = os.path.basename((image.filename or '').replace('\\', '/')).replace(' ', '_')
        if not sanitized_name:
            return JSONResponse(
                status_code=400,
                content={"status": "error", "error": "Empty image file provided"},
            )

        allowed_extensions = {
            "jpg", "jpeg", "png", "bmp", "webp", "avif", "avifs", "heic", "heif",
        }
        file_extension = sanitized_name.rsplit('.', 1)[1].lower() if '.' in sanitized_name else ''
        if file_extension not in allowed_extensions:
            return JSONResponse(
                status_code=400,
                content={
                    "status": "error",
                    "error": (
                        "Unsupported image type. Allowed types: "
                        f"{', '.join(sorted(allowed_extensions))}"
                    ),
                },
            )

        default_upload_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..", "uploads")
        )
        uploads_root = os.path.abspath(os.getenv('DERMAI_UPLOAD_DIR', default_upload_root))
        session_upload_dir = os.path.abspath(os.path.join(uploads_root, session_id))
        # ``session_id`` is a form field: make sure it cannot move the target
        # directory outside the uploads root (e.g. "../../etc").
        if not session_upload_dir.startswith(uploads_root + os.sep):
            return JSONResponse(
                status_code=400,
                content={"status": "error", "error": "Invalid session id"},
            )
        os.makedirs(session_upload_dir, exist_ok=True)

        # The timestamp prefix keeps repeated uploads of the same name apart.
        timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
        stored_path = os.path.join(session_upload_dir, f"{timestamp}_{sanitized_name}")
        content = await image.read()
        with open(stored_path, 'wb') as f:
            f.write(content)

        relative_path = os.path.relpath(os.path.abspath(stored_path), uploads_root)
        return {
            "status": "success",
            "message": "Image uploaded successfully",
            "file": {
                "path": relative_path.replace('\\', '/'),
                "name": image.filename,
                "content_type": image.content_type,
                "size": len(content),
                "extension": file_extension,
                "original_query": query,
            },
        }
    except Exception as e:
        logging.error(f"Error in upload_skin_image: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail={
                "status": "error",
                "error": "Internal server error",
                "details": str(e),
            },
        )
@router.post('/get_rag_evaluation')
async def rag_evaluation(
    page: int = Form(3),
    page_size: int = Form(3),
    authorization: str = Header(...),
    username: str = Depends(get_current_user)
):
    """Generate a RAG evaluation report for the requested page.

    The bearer token, ``page`` and ``page_size`` are passed to
    ``RAGEvaluation``; the generated report is returned under ``response``.
    Any failure is reported as HTTP 500 carrying the error text.
    """
    try:
        bearer = authorization.split(" ")[1]
        report = RAGEvaluation(
            token=bearer, page=page, page_size=page_size
        ).generate_evaluation_report()
        return {"response": report}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
| ``` | |
| ### app\routers\chat_session.py | |
| ```python | |
| # app/routers/chat_session.py | |
| from datetime import datetime | |
| from bson import ObjectId | |
| from fastapi import APIRouter, Depends, HTTPException | |
| from pydantic import BaseModel | |
| from app.database.database_query import DatabaseQuery | |
| from app.middleware.auth import get_current_user | |
| router = APIRouter() | |
| query = DatabaseQuery() | |
class ChatSessionTitleUpdate(BaseModel):
    """Body of the chat-session title update endpoint."""

    # New title to store for the session.
    title: str
@router.post('/chat-sessions', status_code=201)
async def create_chat_session(username: str = Depends(get_current_user)):
    """Create an empty chat session titled "New Chat" for the caller.

    Returns the generated session id. Any failure is reported as HTTP 500.
    """
    try:
        new_id = str(ObjectId())
        query.create_chat_session(
            dict(
                user_id=username,
                session_id=new_id,
                created_at=datetime.utcnow(),
                last_accessed=datetime.utcnow(),
                title="New Chat",
            )
        )
        return {"message": "Chat session created", "session_id": new_id}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get('/chat-sessions')
async def get_user_chat_sessions(username: str = Depends(get_current_user)):
    """Return the caller's chat sessions as provided by the database layer.

    Any failure is reported as HTTP 500 carrying the error text.
    """
    try:
        return query.get_user_chat_sessions(username)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# NOTE: FastAPI matches path operations in registration order. The fixed path
# '/chat-sessions/all' must therefore be declared before the parameterised
# '/chat-sessions/{session_id}', otherwise "all" is captured as a session id
# and this endpoint is never reached.
@router.delete('/chat-sessions/all')
async def delete_all_sessions_and_chats(username: str = Depends(get_current_user)):
    """Delete every chat session and chat belonging to the caller.

    Returns the deletion counts reported by the database layer. Any failure
    is reported as HTTP 500.
    """
    try:
        result = query.delete_all_user_sessions_and_chats(username)
        return {
            "message": "Successfully deleted all chat sessions and chats",
            "deleted_chats": result["deleted_chats"],
            "deleted_sessions": result["deleted_sessions"]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.delete('/chat-sessions/{session_id}')
async def delete_chat_session(session_id: str, username: str = Depends(get_current_user)):
    """Delete one of the caller's chat sessions together with its chats.

    Raises:
        HTTPException: 404 when the session does not exist or is not owned
            by the caller, 500 on unexpected errors.
    """
    try:
        result = query.delete_chat_session(session_id, username)
        if result["session_deleted"]:
            return {
                "message": "Chat session and associated chats deleted successfully",
                "chats_deleted": result["chats_deleted"]
            }
        raise HTTPException(status_code=404, detail="Chat session not found or unauthorized")
    except HTTPException:
        # Let the 404 through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.put('/chat-sessions/{session_id}/title')
async def update_chat_title(
    session_id: str,
    title_data: ChatSessionTitleUpdate,
    username: str = Depends(get_current_user)
):
    """Rename one of the caller's chat sessions.

    Raises:
        HTTPException: 404 when the session does not exist or is not owned
            by the caller, 500 when the update fails.
    """
    try:
        new_title = title_data.title
        if not query.verify_session(session_id, username):
            raise HTTPException(status_code=404, detail="Chat session not found or unauthorized")
        if query.update_chat_session_title(session_id, new_title):
            return {
                'message': 'Chat session title updated successfully',
                'session_id': session_id,
                'new_title': new_title
            }
        raise HTTPException(status_code=500, detail="Failed to update chat session title")
    except HTTPException:
        # Preserve the intended status code (404/500) and message.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get('/chats/session/{session_id}')
async def get_session_chats(session_id: str, username: str = Depends(get_current_user)):
    """Return the result of ``query.get_session_chats`` for this session and caller.

    Raises:
        HTTPException: 500 if the lookup raises.
    """
    try:
        return query.get_session_chats(session_id, username)
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
| ``` | |
| ### app\routers\language.py | |
| ```python | |
| from fastapi import APIRouter, Depends, HTTPException | |
| from pydantic import BaseModel | |
| from app.middleware.auth import get_current_user | |
| from app.database.database_query import DatabaseQuery | |
| router = APIRouter() | |
| query = DatabaseQuery() | |
class LanguageSettings(BaseModel):
    """Request body accepted by ``POST /language``."""

    # Language value to store for the user.
    language: str
@router.post('/language', status_code=201)
async def set_language(
    language_data: LanguageSettings,
    username: str = Depends(get_current_user)
):
    """Store the caller's language setting.

    Args:
        language_data: Request body carrying the language.
        username: Value supplied by the ``get_current_user`` dependency.

    Returns:
        A dict with a message and the ``language`` value returned by the query.

    Raises:
        HTTPException: 400 if the language is empty, 500 if the query raises.
    """
    try:
        language = language_data.language
        if not language:
            raise HTTPException(status_code=400, detail="Language is required")
        result = query.set_user_language(username, language)
        return {
            "message": "Language set successfully",
            "language": result["language"],
        }
    except HTTPException:
        # Keep the 400; the generic handler below used to turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get('/language')
async def get_language(username: str = Depends(get_current_user)):
    """Return the caller's stored language.

    Returns:
        A dict with a message and the stored ``language``.

    Raises:
        HTTPException: 404 if the query returns ``None``, 500 if it raises.
    """
    try:
        language = query.get_user_language(username)
        if language is None:
            raise HTTPException(status_code=404, detail="Language not set")
        return {
            "message": "Language retrieved successfully",
            "language": language,
        }
    except HTTPException:
        # Keep the 404; the generic handler below used to turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.delete('/language')
async def delete_language(username: str = Depends(get_current_user)):
    """Remove the caller's stored language.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 404 if the query reports nothing was deleted,
            500 if it raises.
    """
    try:
        result = query.delete_user_language(username)
        if not result:
            raise HTTPException(status_code=404, detail="Language not found or already deleted")
        return {
            "message": "Language deleted successfully"
        }
    except HTTPException:
        # Keep the 404; the generic handler below used to turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
| ``` | |
| ### app\routers\location.py | |
| ```python | |
| # app/routers/location.py | |
| from fastapi import APIRouter, Depends, HTTPException | |
| from pydantic import BaseModel | |
| from app.database.database_query import DatabaseQuery | |
| from app.middleware.auth import get_current_user | |
| router = APIRouter() | |
| query = DatabaseQuery() | |
class LocationData(BaseModel):
    """Request body accepted by ``POST /location``."""

    # Location value to store for the user.
    location: str
@router.post('/location', status_code=201)
async def add_location(location_data: LocationData, username: str = Depends(get_current_user)):
    """Add or update the caller's location.

    Args:
        location_data: Request body carrying the location.
        username: Value supplied by the ``get_current_user`` dependency.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 400 if the location is empty, 500 if the query raises.
    """
    try:
        location = location_data.location
        if not location:
            raise HTTPException(status_code=400, detail="Location is required")
        query.add_or_update_location(username, location)
        return {'message': 'Location added/updated successfully'}
    except HTTPException:
        # Keep the 400; the generic handler below used to turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get('/location')
async def get_location(username: str = Depends(get_current_user)):
    """Return the caller's stored location.

    Returns:
        A dict holding the ``location`` field of the stored record.

    Raises:
        HTTPException: 404 if the query returns nothing, 500 if it raises.
    """
    try:
        location_data = query.get_location(username)
        if not location_data:
            raise HTTPException(status_code=404, detail="No location found for this user")
        return {'location': location_data['location']}
    except HTTPException:
        # Keep the 404; the generic handler below used to turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
| ``` | |
| ### app\routers\preferences.py | |
| ```python | |
| # app/routers/preferences.py | |
| from fastapi import APIRouter, Depends, HTTPException | |
| from pydantic import BaseModel | |
| from typing import Dict, Any | |
| from app.database.database_query import DatabaseQuery | |
| from app.middleware.auth import get_current_user | |
| router = APIRouter() | |
| query = DatabaseQuery() | |
class ThemeSettings(BaseModel):
    """Request body accepted by ``POST /theme``."""

    # Boolean theme flag; NOTE(review): which value means dark/light is not
    # visible here - confirm against the client.
    theme: bool
@router.get('/preferences')
async def get_preferences(username: str = Depends(get_current_user)):
    """Return the caller's preferences wrapped under the ``preferences`` key.

    Raises:
        HTTPException: 500 if the lookup raises.
    """
    try:
        return {'preferences': query.get_user_preferences(username)}
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.post('/preferences')
async def set_preferences(preferences: Dict[str, Any], username: str = Depends(get_current_user)):
    """Store the submitted preferences and echo back what the query returns.

    Args:
        preferences: Arbitrary JSON object from the request body.
        username: Value supplied by the ``get_current_user`` dependency.

    Raises:
        HTTPException: 500 if the update raises.
    """
    try:
        stored = query.set_user_preferences(username, preferences)
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
    return {
        'message': 'Preferences updated successfully',
        'preferences': stored
    }
@router.get('/theme')
async def get_theme(username: str = Depends(get_current_user)):
    """Return the caller's theme wrapped under the ``theme`` key.

    Raises:
        HTTPException: 500 if the lookup raises.
    """
    try:
        return {'theme': query.get_user_theme(username)}
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.post('/theme')
async def set_theme(theme_data: ThemeSettings, username: str = Depends(get_current_user)):
    """Store the caller's theme flag and return the stored value.

    Args:
        theme_data: Request body carrying the boolean theme flag.
        username: Value supplied by the ``get_current_user`` dependency.

    Raises:
        HTTPException: 500 if the update raises or its result has no ``theme``.
    """
    try:
        # A separate name keeps the request model and the query result apart.
        saved = query.set_user_theme(username, theme_data.theme)
        return {
            'message': 'Theme updated successfully',
            'theme': saved['theme']
        }
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
| ``` | |
| ### app\routers\profile.py | |
| ```python | |
| from fastapi import APIRouter, Depends, HTTPException, Body | |
| from pydantic import BaseModel, EmailStr, validator | |
| from typing import Optional | |
| from werkzeug.security import generate_password_hash | |
| from app.database.database_query import DatabaseQuery | |
| from app.middleware.auth import get_current_user | |
| router = APIRouter() | |
| query = DatabaseQuery() | |
class ProfileUpdateRequest(BaseModel):
    """Partial profile update; every field is optional and defaults to None."""

    email: Optional[EmailStr] = None
    password: Optional[str] = None
    name: Optional[str] = None
    age: Optional[int] = None

    @validator('password')
    def password_length(cls, v):
        """Accept ``None`` or a password of at least 6 characters."""
        if v is None or len(v) >= 6:
            return v
        raise ValueError('Password must be at least 6 characters')

    @validator('age')
    def age_range(cls, v):
        """Accept ``None`` or an age within 13..120 inclusive."""
        if v is None or 13 <= v <= 120:
            return v
        raise ValueError('Age must be between 13 and 120')
@router.get('/profile')
async def get_profile(username: str = Depends(get_current_user)):
    """Return the public profile fields of the caller.

    Returns:
        A dict with ``username``, ``email``, ``name``, ``age`` and
        ``created_at`` copied from the stored user record.

    Raises:
        HTTPException: 404 if no user record is found, 500 on any other error.
    """
    exposed_fields = ('username', 'email', 'name', 'age', 'created_at')
    try:
        record = query.get_user_profile(username)
        if not record:
            raise HTTPException(status_code=404, detail="User not found")
        return {field: record[field] for field in exposed_fields}
    except HTTPException:
        raise
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.put('/profile')
async def update_profile(
    update_data: ProfileUpdateRequest = Body(...),
    username: str = Depends(get_current_user)
):
    """Apply the supplied profile fields to the caller's account.

    Only truthy ``email``/``password``/``name`` values and a non-None ``age``
    are written. The password is stored as the result of
    ``generate_password_hash``.

    Returns:
        ``{"message": "Profile updated successfully"}`` when the query
        reports success, otherwise ``{"message": "No changes made"}``.

    Raises:
        HTTPException: 400 if ``query.is_valid_email`` rejects the email,
            500 on any other error.
    """
    try:
        changes = {}
        if update_data.email:
            if not query.is_valid_email(update_data.email):
                raise HTTPException(status_code=400, detail="Invalid email format")
            changes['email'] = update_data.email
        if update_data.password:
            changes['password'] = generate_password_hash(update_data.password)
        if update_data.name:
            changes['name'] = update_data.name
        if update_data.age is not None:
            changes['age'] = update_data.age
        # Short-circuit: the query is only hit when there is something to write.
        if changes and query.update_user_profile(username, changes):
            return {"message": "Profile updated successfully"}
        return {"message": "No changes made"}
    except HTTPException:
        raise
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.delete('/profile')
async def delete_account(username: str = Depends(get_current_user)):
    """Delete the caller's account via ``query.delete_user_account``.

    Raises:
        HTTPException: 404 if the query reports nothing was deleted,
            500 on any other error.
    """
    try:
        removed = query.delete_user_account(username)
        if not removed:
            raise HTTPException(status_code=404, detail="User not found")
        return {"message": "Account deleted successfully"}
    except HTTPException:
        raise
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.delete('/delete-account-permanently')
async def delete_account_permanently(username: str = Depends(get_current_user)):
    """Delete the caller's account via ``query.delete_account_permanently``.

    Returns:
        A dict with a message and the ``deleted_data`` value from the query.

    Raises:
        HTTPException: 500 if the query reports failure or raises.
    """
    try:
        outcome = query.delete_account_permanently(username)
        if not outcome['success']:
            raise HTTPException(status_code=500, detail="Failed to delete account")
        return {
            'message': 'Account and all associated data deleted successfully',
            'details': outcome['deleted_data']
        }
    except HTTPException:
        raise
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
| ``` | |
| ### app\routers\questionnaire.py | |
| ```python | |
| from fastapi import APIRouter, Depends, HTTPException | |
| from pydantic import BaseModel | |
| from typing import Dict, Any | |
| from app.database.database_query import DatabaseQuery | |
| from app.middleware.auth import get_current_user | |
| router = APIRouter() | |
| query = DatabaseQuery() | |
class QuestionnaireSubmission(BaseModel):
    """Request body used when submitting or updating a questionnaire."""

    # Mapping of string keys to arbitrary answer values.
    answers: Dict[str, Any]
@router.post('/questionnaires', status_code=201)
async def submit_questionnaire(
    submission: QuestionnaireSubmission,
    username: str = Depends(get_current_user)
):
    """Store a new questionnaire for the caller.

    Returns:
        A dict with a message and the ``questionnaire_id`` returned by the query.

    Raises:
        HTTPException: 400 if ``answers`` is empty, 500 on any other error.
    """
    try:
        if not submission.answers:
            raise HTTPException(status_code=400, detail="Answers are required")
        new_id = query.submit_questionnaire(username, submission.answers)
        return {
            'message': 'Questionnaire submitted successfully',
            'questionnaire_id': new_id
        }
    except HTTPException:
        raise
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.get('/questionnaires')
async def get_questionnaire(username: str = Depends(get_current_user)):
    """Return the caller's latest questionnaire, or ``data: None`` if there is none.

    Raises:
        HTTPException: 500 if the lookup raises.
    """
    try:
        latest = query.get_latest_questionnaire(username)
        if latest:
            return {'message': 'Success', 'data': latest}
        return {'message': 'No questionnaire found', 'data': None}
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.put('/questionnaires/{questionnaire_id}')
async def update_questionnaire(
    questionnaire_id: str,
    submission: QuestionnaireSubmission,
    username: str = Depends(get_current_user)
):
    """Replace the answers of one questionnaire of the caller.

    Raises:
        HTTPException: 400 if ``answers`` is empty, 404 if the query reports
            no update, 500 on any other error.
    """
    try:
        if not submission.answers:
            raise HTTPException(status_code=400, detail="Answers are required")
        updated = query.update_questionnaire(questionnaire_id, username, submission.answers)
        if not updated:
            raise HTTPException(
                status_code=404,
                detail='Questionnaire not found or unauthorized'
            )
        return {'message': 'Questionnaire updated successfully'}
    except HTTPException:
        raise
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.delete('/questionnaires/{questionnaire_id}')
async def delete_questionnaire(
    questionnaire_id: str,
    username: str = Depends(get_current_user)
):
    """Delete one questionnaire of the caller.

    Raises:
        HTTPException: 404 if the query reports nothing was deleted,
            500 on any other error.
    """
    try:
        removed = query.delete_questionnaire(questionnaire_id, username)
        if not removed:
            raise HTTPException(
                status_code=404,
                detail='Questionnaire not found or unauthorized'
            )
        return {'message': 'Questionnaire deleted successfully'}
    except HTTPException:
        raise
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.get('/check-answers')
async def check_answers(username: str = Depends(get_current_user)):
    """Report whether ``query.count_answered_questions`` returns 2 or more.

    Raises:
        HTTPException: 500 if the count raises.
    """
    try:
        enough = query.count_answered_questions(username) >= 2
        return {'has_at_least_two_answers': enough}
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@router.get('/check-questionnaire')
async def check_questionnaire_submission(username: str = Depends(get_current_user)):
    """Report whether the latest-questionnaire lookup returns something other than None.

    Raises:
        HTTPException: 500 if the lookup raises.
    """
    try:
        latest = query.get_latest_questionnaire(username)
        return {'has_questionnaire': latest is not None}
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
| ``` | |
| ### app\services\__init__.py | |
| ```python | |
| # app/services/__init__.py | |
| from app.services.image_classification_vit import SkinDiseaseClassifier | |
| from app.services.llm_model import Model | |
| from app.services.chathistory import ChatSession | |
| from app.services.environmental_condition import EnvironmentalData | |
| from app.services.prompts import * | |
| from app.services.RAG_evaluation import RAGEvaluation | |
| from app.services.skincare_scheduler import SkinCareScheduler | |
| from app.services.vector_database_search import VectorDatabaseSearch | |
| from app.services.websearch import WebSearch | |
| from app.services.wheel import EnvironmentalConditions | |
# Public API of app.services. Every entry must be a name this module actually
# binds: a missing one makes `from app.services import *` raise
# AttributeError. "AISkinDetector" was listed but never imported, so it is
# removed here.
__all__ = [
    "SkinDiseaseClassifier",
    "Model",
    "ChatSession",
    "EnvironmentalData",
    "RAGEvaluation",
    "SkinCareScheduler",
    "VectorDatabaseSearch",
    "WebSearch",
    "EnvironmentalConditions",
]
| ``` | |
| ### app\services\agentic_prompt.py | |
| ```python | |
| from typing import Dict | |
def _append_personalization(prompt: str, user_data: Dict) -> str:
    """Append personalization, environment and language guidance to ``prompt``.

    Args:
        prompt: Prompt text built so far.
        user_data: Request context. Keys read here: ``has_personalized_data``,
            ``personalized_tool_name``, ``has_environmental_data``,
            ``environmental_tool_name`` and ``language``.

    Returns:
        ``prompt`` followed by either tool-usage instructions or an "omit the
        section" notice for each data source, plus a "Respond in <language>"
        line when the language is set and is not English.
    """
    personalized_tool = user_data.get('personalized_tool_name')
    if user_data.get('has_personalized_data') and personalized_tool:
        prompt += (
            "\n\n## Personalized Data Access:\n"
            f"Call `{personalized_tool}` exactly once after search tools to retrieve patient context. "
            "IMPORTANT: Analyze how the personalized data relates to the user's current query. "
            "Don't just list the data - explain how their specific conditions, medications, or history "
            "impacts the topic they're asking about. Make connections between their profile and the condition."
        )
    else:
        prompt += (
            "\n\nNo personalization data is available; omit the `## Personalization Recommendation` section."
        )

    environmental_tool = user_data.get('environmental_tool_name')
    if user_data.get('has_environmental_data') and environmental_tool:
        prompt += (
            "\n\n## Environmental Data Access:\n"
            f"Call `{environmental_tool}` exactly once when relevant. "
            "CRITICAL: Don't just list environmental statistics. Instead, analyze how the specific "
            "environmental factors (UV index, humidity, pollution, temperature) affect the skin condition "
            "the user is asking about. Explain the mechanisms and provide actionable advice based on their location."
        )
    else:
        prompt += (
            "\n\nEnvironmental conditions are unavailable; omit the `## Environmental Condition` section."
        )

    # `.get('language', 'english')` only covers a *missing* key. A stored
    # None crashed on `.lower()` and an empty string produced
    # "Respond in  while ...", so both are treated as English here.
    language = str(user_data.get('language') or 'english').strip()
    if language and language.lower() != 'english':
        prompt += (
            f"\n\nRespond in {language} while keeping the JSON structure intact."
        )
    return prompt
def _append_document_guidance(prompt: str, user_data: Dict) -> str:
    """Append instructions for an uploaded document, if one is fully described.

    Nothing is added unless ``user_data`` has a non-empty ``document_info``
    with a ``path`` and a ``document_tool_name``.

    Args:
        prompt: Prompt text built so far.
        user_data: Request context holding the document details.

    Returns:
        The prompt, extended with an "Uploaded Document" section when applicable.
    """
    doc = user_data.get('document_info') or {}
    converter = user_data.get('document_tool_name')
    location = doc.get('path')
    if not (doc and converter and location):
        return prompt

    label = doc.get('name') or 'Uploaded document'
    section = "".join([
        "\n\n## Uploaded Document\n",
        f"The user has provided `{label}` located at `{location}`. ",
        f"Call `{converter}` exactly once to convert it to Markdown. ",
        "Extract medically relevant findings and incorporate them into your response. ",
        "DO NOT include the document path or name in the references array - only web/vector search results belong there.",
    ])
    return prompt + section
def _append_image_guidance(prompt: str, user_data: Dict) -> str:
    """Append instructions for an uploaded image, if one is fully described.

    Nothing is added unless ``user_data`` has a non-empty ``image_info`` with
    a ``path`` and an ``image_tool_name``.

    Args:
        prompt: Prompt text built so far.
        user_data: Request context holding the image details.

    Returns:
        The prompt, extended with an "Uploaded Image" section when applicable.
    """
    image = user_data.get('image_info') or {}
    analyzer = user_data.get('image_tool_name')
    location = image.get('path')
    if not (image and analyzer and location):
        return prompt

    label = image.get('name') or 'Uploaded image'
    section = "".join([
        "\n\n## Uploaded Image\n",
        f"The user shared `{label}` located at `{location}`. ",
        f"Call `{analyzer}` exactly once to analyze the skin photo. ",
        "Incorporate the analysis results into your response but DO NOT add image paths to references array. ",
        "References should only contain search result URLs/sources.",
    ])
    return prompt + section
def _format_json_guidance(user_data: Dict) -> str:
    """Build the block that tells the model which JSON structure to return.

    Args:
        user_data: Request context. Keys read here: ``include_references``,
            ``include_keywords``, ``include_images`` (each defaults to True),
            ``has_personalized_data`` and ``has_environmental_data``.

    Returns:
        Text starting with a newline that holds the JSON template, one
        instruction line per field/section and the critical rules.
    """
    if user_data.get('include_references', True):
        # The original sentence was garbled by missing spaces and typos;
        # this is the same instruction in readable form.
        references_instruction = (
            "Populate `references` ONLY with URLs/sources from get_web_search or get_vector_search results. "
            "NEVER include uploaded document paths, image paths, or tool names in references. "
            "Use exactly the same citation in the response text and in `references`. "
            "If web search and vector search return the same source (same book and page number), "
            "treat it as a single citation and reuse it everywhere."
        )
    else:
        references_instruction = "Set `references` to an empty array."

    keywords_instruction = (
        "Provide 4-6 concise medical keywords related to the skin condition discussed."
        if user_data.get('include_keywords', True)
        else "Set `keywords` to an empty array."
    )
    images_instruction = (
        "Include up to 3 image URLs from get_image_search when they help visualize the condition. "
        "If more than one image search is needed, request them together rather than one by one."
        if user_data.get('include_images', True)
        else "Set `images` to an empty array."
    )

    has_personalization = user_data.get('has_personalized_data')
    has_environmental = user_data.get('has_environmental_data')
    personalization_instruction = (
        "In `## Personalization Recommendation`: Analyze how the user's specific medical history, "
        "current medications, allergies, and conditions relate to their query. Don't just list their data - "
        "explain the interactions and provide tailored advice based on their profile."
        if has_personalization
        else "Omit the `## Personalization Recommendation` section."
    )
    environmental_instruction = (
        "In `## Environmental Condition`: Explain how current environmental factors in their location "
        "specifically impact the skin condition they're asking about. Provide actionable advice for "
        "protection and management based on UV levels, humidity, pollution, and temperature."
        if has_environmental
        else "Omit the `## Environmental Condition` section."
    )

    # Only advertise a section in the template when its data exists;
    # otherwise the template said "Add ..." while the instruction further
    # down said "Omit ...", which is contradictory guidance.
    response_parts = [
        "Start with `## Response from References` containing evidence-based information with citations [1], [2].",
    ]
    if has_personalization:
        response_parts.append(
            "Add `## Personalization Recommendation` analyzing how their profile affects this condition."
        )
    if has_environmental:
        response_parts.append(
            "Add `## Environmental Condition` explaining environmental impact on their skin."
        )
    response_template = " ".join(response_parts)

    return (
        "\nYour final response MUST be valid JSON Strictly follow below structure:\n"
        "{\n"
        f"  \"response\": \"{response_template}\",\n"
        "  \"references\": [\"ONLY search result URLs/sources, NO file paths\"],\n"
        "  \"keywords\": [\"keyword1\", \"keyword2\", ...],\n"
        "  \"images\": [\"image_urls_from_search\"]\n"
        "}\n\n"
        f"{references_instruction}\n"
        f"{keywords_instruction}\n"
        f"{images_instruction}\n"
        f"{personalization_instruction}\n"
        f"{environmental_instruction}\n\n"
        "CRITICAL RULES:\n"
        "- Citations [1], [2] should ONLY reference search results, not uploaded files\n"
        "- Personalization and Environmental sections must analyze impact, not list data\n"
        "- Always call get_image_search for visual conditions unless non-medical query\n"
    )
def get_web_search_prompt(user_data: Dict) -> str:
    """Build the system prompt for the web-search driven agent.

    The prompt is assembled from a fixed preamble, a numbered tool execution
    order that depends on which optional tools/data are present in
    ``user_data``, static guidelines, the optional ``recent_history`` and the
    sections contributed by the module's ``_append_*`` / ``_format_*`` helpers.

    Args:
        user_data: Request context (tool names, availability flags, uploaded
            file info, ``recent_history``, output options).

    Returns:
        The complete prompt with surrounding whitespace stripped.
    """
    # Each list element is one prompt line. Multi-part strings are wrapped in
    # parentheses so concatenation is explicit: missing commas previously
    # fused two instructions and the "## TOOL EXECUTION ORDER" heading into a
    # single line.
    prompt_lines = [
        (
            "You are Dr. DermAI, an evidence-based dermatology consultant. "
            "Do not hallucinate: if you do not get context and do not have a valid answer, "
            "just say I do not know."
        ),
        "",
        "## QUERY ASSESSMENT:",
        "First determine if the query is medical/dermatological. If not, politely decline.",
        (
            "Please invoke each tool one at a time, waiting for the response before invoking the next. "
            "If you need to use the same tool multiple times—for example, for different topics—then call "
            "that tool multiple times (vector search and web search tools only), but still one at a time."
        ),
        "If there are multiple topics, call web search once for each topic.",
        (
            "If the user's query requires more than one web search, run them all but combine the findings "
            "into a single answer that uses the same structure; never return two separate JSON objects."
        ),
        "",
        "## TOOL EXECUTION ORDER FOR MEDICAL QUERIES:",
    ]

    step_num = 1
    image_tool = user_data.get('image_tool_name')
    if user_data.get('image_info') and image_tool:
        prompt_lines.append(
            f"{step_num}. Call `{image_tool}` FIRST if skin image uploaded"
        )
        step_num += 1

    prompt_lines.extend([
        f"{step_num}. Call `get_web_search` with the user's query to gather current medical knowledge",
        (
            f"{step_num + 1}. Call `get_image_search` with relevant terms (e.g., 'eczema rash', 'psoriasis patches') "
            "to provide visual references UNLESS the condition is internal or non-visual"
        ),
    ])
    step_num += 2

    document_tool = user_data.get('document_tool_name')
    if user_data.get('document_info') and document_tool:
        prompt_lines.append(
            f"{step_num}. Call `{document_tool}` to analyze uploaded documents"
        )
        step_num += 1

    # Require the tool name as well as the flag (as _append_personalization
    # does) so the prompt can never contain "Call `None`".
    personalized_tool = user_data.get('personalized_tool_name')
    if user_data.get('has_personalized_data') and personalized_tool:
        prompt_lines.append(
            f"{step_num}. Call `{personalized_tool}` to get user's medical profile"
        )
        step_num += 1

    environmental_tool = user_data.get('environmental_tool_name')
    if user_data.get('has_environmental_data') and environmental_tool:
        prompt_lines.append(
            f"{step_num}. Call `{environmental_tool}` to get environmental factors"
        )
        step_num += 1

    prompt_lines.extend([
        f"{step_num}. Synthesize all information into structured JSON response",
        "",
        "## IMAGE SEARCH GUIDELINES:",
        "ALWAYS use get_image_search for:",
        "- Rashes, lesions, spots, patches, bumps",
        "- Skin discoloration, texture changes",
        "- Hair/nail conditions with visible symptoms",
        "- Any condition where visual reference aids understanding",
        "",
        "SKIP get_image_search only for:",
        "- Non-medical queries",
        "- Internal conditions without skin manifestations",
        "- General health questions without visual components",
        "",
        "## IMPORTANT NOTES:",
        # Reconciled with the multi-topic rule above, which explicitly allows
        # repeated search calls.
        "- Maximum 1 call per tool type, except search tools, which may be called once per distinct topic",
        "- Web search should focus on recent, evidence-based sources",
        "- Image search should use specific descriptive terms",
        "- Personalization must analyze relevance to query, not list profile data",
        "- Environmental section must explain impact on the specific condition",
    ])

    prompt = "\n".join(prompt_lines)

    recent_history = user_data.get('recent_history')
    if recent_history:
        prompt += (
            "\n\n## Recent Conversation:\n"
            f"{recent_history}\n"
        )

    prompt = _append_document_guidance(prompt, user_data)
    prompt = _append_image_guidance(prompt, user_data)
    prompt += _format_json_guidance(user_data)
    prompt = _append_personalization(prompt, user_data)
    return prompt.strip()
def get_vector_search_prompt(user_data: Dict) -> str:
    """Build the system prompt for the vector-search (knowledge base) agent.

    The prompt is assembled from a fixed preamble, a numbered tool execution
    order that depends on which optional tools/data are present in
    ``user_data``, static guidelines, the optional ``recent_history`` and the
    sections contributed by the module's ``_append_*`` / ``_format_*`` helpers.

    Args:
        user_data: Request context (tool names, availability flags, uploaded
            file info, ``recent_history``, output options).

    Returns:
        The complete prompt with surrounding whitespace stripped.
    """
    # Each list element is one prompt line. Multi-part strings are wrapped in
    # parentheses so concatenation is explicit: a missing comma previously
    # fused the "## TOOL EXECUTION ORDER" heading into the preceding sentence.
    prompt_lines = [
        (
            "You are Dr. DermAI with access to a curated dermatology knowledge base. "
            "Do not hallucinate: if you do not get context and do not have a valid answer, "
            "just say I do not know."
        ),
        "",
        "## QUERY ASSESSMENT:",
        "Determine if medical/dermatological. If not, politely decline.",
        (
            "Please invoke each tool one at a time, waiting for the response before invoking the next. "
            "If you need to use the same tool multiple times—for example, for different topics—then call "
            "that tool multiple times (vector search and web search tools only), but still one at a time."
        ),
        # This line was copied from the web-search prompt and named the wrong
        # tool; this prompt's execution order uses get_vector_search.
        "If there are multiple topics, call `get_vector_search` once for each topic.",
        (
            "If the user's query requires more than one vector search, run them all but combine the findings "
            "into a single answer that uses the same structure; never return two separate JSON objects."
        ),
        "",
        "## TOOL EXECUTION ORDER FOR MEDICAL QUERIES:",
    ]

    step_num = 1
    image_tool = user_data.get('image_tool_name')
    if user_data.get('image_info') and image_tool:
        prompt_lines.append(
            f"{step_num}. Call `{image_tool}` FIRST if skin image uploaded"
        )
        step_num += 1

    prompt_lines.extend([
        f"{step_num}. Call `get_vector_search` to retrieve authoritative medical passages",
        (
            f"{step_num + 1}. Call `get_image_search` with condition-specific terms "
            "(e.g., 'melanoma ABCDE', 'atopic dermatitis flexural') for visual aids"
        ),
    ])
    step_num += 2

    document_tool = user_data.get('document_tool_name')
    if user_data.get('document_info') and document_tool:
        prompt_lines.append(
            f"{step_num}. Call `{document_tool}` for document analysis"
        )
        step_num += 1

    # Require the tool name as well as the flag (as _append_personalization
    # does) so the prompt can never contain "Call `None`".
    personalized_tool = user_data.get('personalized_tool_name')
    if user_data.get('has_personalized_data') and personalized_tool:
        prompt_lines.append(
            f"{step_num}. Call `{personalized_tool}` for user profile"
        )
        step_num += 1

    environmental_tool = user_data.get('environmental_tool_name')
    if user_data.get('has_environmental_data') and environmental_tool:
        prompt_lines.append(
            f"{step_num}. Call `{environmental_tool}` for environmental data"
        )
        step_num += 1

    prompt_lines.extend([
        f"{step_num}. Create comprehensive JSON response",
        "",
        "## IMAGE SEARCH REQUIREMENTS:",
        "MANDATORY for these conditions (use specific medical terms):",
        "- Inflammatory: psoriasis, eczema, dermatitis, rosacea",
        "- Infectious: fungal infections, bacterial infections, viral exanthems",
        "- Neoplastic: melanoma, basal cell, squamous cell, keratosis",
        "- Pigmentary: melasma, vitiligo, post-inflammatory changes",
        "- Hair/Nail: alopecia patterns, onychomycosis, nail psoriasis",
        "",
        "Use descriptive search terms like:",
        "- 'plaque psoriasis elbows knees'",
        "- 'atopic dermatitis infant face'",
        "- 'tinea corporis ring rash'",
        "",
        "## PERSONALIZATION ANALYSIS:",
        "When using personalized data:",
        "- Identify drug interactions with current medications",
        "- Consider contraindications based on health conditions",
        "- Adjust recommendations for age/pregnancy/allergies",
        "- Explain how their specific profile affects treatment options",
        "",
        "## ENVIRONMENTAL IMPACT ANALYSIS:",
        "When using environmental data:",
        "- High UV: Explain photosensitivity risks, sun protection needs",
        "- Low humidity: Impact on barrier function, moisturization needs",
        "- High pollution: Oxidative stress, cleansing requirements",
        "- Temperature extremes: Vasodilation/constriction effects",
        "",
        "Never just list the data - explain the mechanisms and provide targeted advice.",
    ])

    prompt = "\n".join(prompt_lines)

    recent_history = user_data.get('recent_history')
    if recent_history:
        prompt += (
            "\n\n## Recent Conversation:\n"
            f"{recent_history}\n"
        )

    prompt = _append_document_guidance(prompt, user_data)
    prompt = _append_image_guidance(prompt, user_data)
    prompt += _format_json_guidance(user_data)
    prompt = _append_personalization(prompt, user_data)
    return prompt.strip()
| ``` | |
| ### app\services\chathistory.py | |
| ```python | |
| from app.database.database_query import DatabaseQuery | |
| import os | |
| import jwt | |
| from dotenv import load_dotenv | |
| from typing import Optional, Dict, List | |
| from bson import ObjectId | |
| from datetime import datetime | |
| load_dotenv() | |
| jwt_secret_key = os.getenv('JWT_SECRET_KEY') | |
| query = DatabaseQuery() | |
| class ChatSession: | |
| def __init__(self, token: str , session_id: str): | |
| self.token = token | |
| self.session_id = session_id | |
| self.chats = [] | |
| self.identity = self._decode_token(token) | |
| self.query = query | |
| def _decode_token(self, token: str) -> str: | |
| try: | |
| decoded_token = jwt.decode(token, jwt_secret_key, algorithms=["HS256"]) | |
| identity = decoded_token['sub'] | |
| return identity | |
| except jwt.ExpiredSignatureError: | |
| raise ValueError("The token has expired.") | |
| except jwt.InvalidTokenError: | |
| raise ValueError("Invalid token.") | |
| except Exception as e: | |
| raise ValueError(f"Failed to decode token: {e}") | |
| def get_user_preferences(self) -> dict: | |
| current_user = self.identity | |
| preferences = self.query.get_user_preferences(current_user) | |
| if preferences is not None: | |
| return preferences | |
| raise ValueError("Failed to fetch user preferences.") | |
def get_personalized_recommendation(self) -> Optional[str]:
    """Summarise the user's latest questionnaire as prompt-ready text.

    Returns:
        A string starting with the user's name and age (from
        ``self.get_profile()``) followed by one ``question`` / ``User answer``
        pair per usable answer, or ``None`` when there is no questionnaire,
        no answers, or no answer survives filtering.
    """
    response = self.query.get_latest_questionnaire(self.identity)
    if not response:
        return None
    answers = response.get('answers', {})
    if not answers:
        return None

    def is_meaningful(text: str) -> bool:
        # Placeholders ("none", blank) and anything shorter than three
        # characters are treated as "no answer".
        cleaned = text.strip().lower()
        return cleaned != 'none' and len(cleaned) >= 3

    def format_answer(answer):
        if answer is None:
            return None
        if isinstance(answer, str):
            return answer.strip().lower() if is_meaningful(answer) else None
        if isinstance(answer, list):
            # Non-string entries are ignored instead of raising TypeError on
            # the substring test; "Other..." options carry no information.
            kept = [
                item for item in answer
                if isinstance(item, str) and "Other" not in item and is_meaningful(item)
            ]
            return ", ".join(kept) if kept else None
        return answer

    questions = {
        "skinType": "How would you describe your skin type?",
        "currentConditions": "Do you currently have any skin conditions?",
        "autoImmuneConditions": "Do you have a history of autoimmune or hormonal conditions?",
        "allergies": "Do you have any known allergies to skincare ingredients?",
        "medications": "Are you currently taking any medications that might affect your skin?",
        "hormonal": "Do you experience hormonal changes that affect your skin?",
        "diet": "Have you noticed any foods that trigger skin reactions?",
        "diabetes": "Do you have diabetes?",
        "outdoorTime": "How much time do you spend outdoors during the day?",
        "sleep": "How many hours of sleep do you get on average?",
        "familyHistory": "Do you have a family history of skin conditions?",
        "products": "What skincare products are you currently using?",
    }

    # Format each answer exactly once, keeping the questionnaire order.
    formatted_response = []
    for key, question in questions.items():
        formatted = format_answer(answers.get(key))
        if formatted is not None:
            formatted_response.append(f"question: {question}\nUser answer: {formatted}")
    if not formatted_response:
        return None

    profile = self.get_profile()
    name = profile.get('name', 'Unknown')
    age = profile.get('age', 'Unknown')
    return f"user name: {name}\nuser age: {age}\n\n" + "\n\n".join(formatted_response)
def create_new_session(self, title: Optional[str] = None) -> bool:
    """Create a chat session for the current user and make it active.

    Args:
        title: Session title; ``"New Chat"`` is used when empty or omitted.

    Returns:
        ``True`` once the session has been stored and ``self.session_id``
        points at it.

    Raises:
        Exception: If the query layer fails; the original error is chained
            as ``__cause__``.
    """
    session_id = str(ObjectId())
    # One timestamp for both fields so a new session starts with
    # created_at == last_accessed. Kept naive-UTC like the rest of the file.
    now = datetime.utcnow()
    chat_session = {
        "user_id": self.identity,
        "session_id": session_id,
        "created_at": now,
        "last_accessed": now,
        "title": title if title else "New Chat",
    }
    try:
        self.query.create_chat_session(chat_session)
    except Exception as e:
        raise Exception(f"Failed to create session: {str(e)}") from e
    self.session_id = session_id
    return True
def verify_session_exists(self, session_id: str) -> bool:
    """Ask the query layer whether ``session_id`` is valid for the current user."""
    owner = self.identity
    exists = self.query.verify_session(session_id, owner)
    return exists
def validate_session(self, session_id: Optional[str] = None, title: str = None) -> bool:
    """Activate ``session_id`` if it is usable, otherwise start a new session.

    A non-blank id that passes ``verify_session_exists`` becomes the active
    session and its chat history is loaded; in every other case a new session
    is created with ``title``.
    """
    usable_id = bool(session_id) and bool(session_id.strip())
    if usable_id and self.verify_session_exists(session_id):
        self.session_id = session_id
        return self.load_chat_history()
    return self.create_new_session(title=title)
def load_session(self, session_id: str) -> bool:
    """Delegate to ``validate_session`` for ``session_id`` without a title."""
    outcome = self.validate_session(session_id)
    return outcome
def load_chat_history(self) -> bool:
    """Load the active session's chats into ``self.chats``.

    Returns:
        ``True`` after ``self.chats`` has been replaced.

    Raises:
        ValueError: If no session is active.
        Exception: If the query layer fails; the original error is chained
            as ``__cause__`` so the root traceback is preserved.
    """
    if not self.session_id:
        raise ValueError("No session ID provided.")
    current_user = self.identity
    try:
        self.chats = self.query.get_session_chats(self.session_id, current_user)
    except Exception as e:
        raise Exception(f"Failed to load chat history: {str(e)}") from e
    return True
def get_chat_history(self) -> List[Dict]:
    """Return ``self.chats`` itself (the live list, not a copy)."""
    history = self.chats
    return history
def format_history(self) -> str:
    """Render the loaded chats as alternating user / assistant lines.

    Only exchanges with both a non-blank query and a non-blank response are
    included. A field stored as ``None`` is treated like a missing field
    instead of raising ``AttributeError``.

    Returns:
        Newline-joined transcript, or ``""`` when nothing qualifies.
    """
    lines = []
    for chat in self.chats:
        # ``or ''`` also covers keys that exist with a None value, which
        # ``dict.get(key, '')`` alone does not.
        query = (chat.get('query') or '').strip()
        response = (chat.get('response') or '').strip()
        if query and response:
            lines.append(f"User: {query}")
            lines.append(f"dermatologist Dr DermAI: {response}")
    return "\n".join(lines)
def save_chat(self, chat_data: Dict) -> bool:
    """Persist one query/response exchange in the active session.

    Only the keys copied below are stored; any other key in ``chat_data`` is
    ignored. ``query`` and ``response`` given as ``None`` are stored as ``""``
    instead of raising ``AttributeError``.

    Returns:
        ``True`` when the chat was stored (the session's last-accessed time
        is then updated and the record appended to ``self.chats``), ``False``
        when the query layer reports failure.

    Raises:
        ValueError: If no session is active.
        Exception: If the query layer raises; the original error is chained.
    """
    if not self.session_id:
        raise ValueError("No active session to save chat")
    data = {
        "user_id": self.identity,
        "session_id": self.session_id,
        "query": (chat_data.get("query") or "").strip(),
        "response": (chat_data.get("response") or "").strip(),
        "references": chat_data.get("references", []),
        "page_no": chat_data.get("page_no", []),
        "keywords": chat_data.get("keywords", []),
        "images": chat_data.get("images", []),
        "context": chat_data.get("context", ""),
        "timestamp": datetime.utcnow(),
        "chat_id": str(ObjectId()),
    }
    try:
        if not self.query.create_chat(data):
            return False
        self.query.update_last_accessed_time(self.session_id)
        self.chats.append(data)
        return True
    except Exception as e:
        raise Exception(f"Failed to save chat: {str(e)}") from e
def get_name_and_age(self):
    """Return the current user's profile record from the query layer.

    Despite the name, the record is returned unfiltered.

    Raises:
        Exception: If the lookup fails; the original error is chained as
            ``__cause__``.
    """
    current_user = self.identity
    try:
        return self.query.get_user_profile(current_user)
    except Exception as e:
        raise Exception(f"Failed to get user name and age: {str(e)}") from e
def get_profile(self):
    """Return the public profile fields of the current user.

    Uses ``self.query`` like every sibling method; the previous bare ``query``
    depended on a module-level name and, when that was absent, the resulting
    ``NameError`` was silently turned into an error dict.

    Returns:
        A dict with ``username``, ``email``, ``name``, ``age`` and
        ``created_at``; or ``{'error': <message>}`` when the user is not found
        or the lookup fails (this method never raises).
    """
    current_user = self.identity
    try:
        user = self.query.get_user_profile(current_user)
        if not user:
            return {'error': 'User not found'}
        public_fields = ('username', 'email', 'name', 'age', 'created_at')
        # Indexing (not .get) keeps the original contract: a record missing
        # one of these fields is reported as an error.
        return {field: user[field] for field in public_fields}
    except Exception as e:
        return {'error': str(e)}
def update_title(self, sessionId, new_title):
    """Rename a chat session through this instance's query layer.

    Uses ``self.query`` for consistency with the other methods rather than a
    bare module-level ``query`` name.

    Args:
        sessionId: Identifier of the session to rename.
        new_title: Replacement title.
    """
    self.query.update_chat_session_title(sessionId, new_title)
def get_city(self) -> Optional[str]:
    """Return the current user's stored location, if any.

    Returns:
        The ``'location'`` value of the record returned by the query layer, or
        ``None`` when there is no record or it has no ``'location'`` key.

    Raises:
        Exception: If the lookup fails; the original error is chained.
    """
    current_user = self.identity
    try:
        location_data = self.query.get_location(current_user)
        if location_data and 'location' in location_data:
            return location_data['location']
        return None
    except Exception as e:
        raise Exception(f"Failed to get user city: {str(e)}") from e
def get_language(self) -> Optional[str]:
    """Return the current user's preferred language, defaulting to English.

    This replaces two identical back-to-back definitions (the second silently
    overrode the first), drops an unreachable ``return None`` and corrects
    the error message, which previously said "user city".

    Returns:
        The stored language, or ``"english"`` when none is stored.

    Raises:
        Exception: If the lookup fails; the original error is chained.
    """
    current_user = self.identity
    try:
        language = self.query.get_user_language(current_user)
    except Exception as e:
        raise Exception(f"Failed to get user language: {str(e)}") from e
    return language or "english"
def get_today_schedule(self):
    """Return today's schedule for the current user, or ``""`` if the query layer returns nothing."""
    todays = self.query.get_today_schedule(user_id=self.identity)
    return todays if todays else ""
def save_schedule(self, schedule_data):
    """Store ``schedule_data`` for the current user and return the query layer's result."""
    store = self.query.save_schedule
    outcome = store(user_id=self.identity, schedule_data=schedule_data)
    return outcome
def get_last_seven_days_schedules(self):
    """Return the current user's recent schedules from the query layer, or ``""`` if there are none."""
    recent = self.query.get_last_seven_days_schedules(user_id=self.identity)
    return recent if recent else ""
def save_details(self, session_id, context, query, response, rag_start_time, rag_end_time):
    """Record one RAG interaction and return the query layer's result.

    The record is always filed under the fixed ``"admin"`` user id, not the
    authenticated user.
    """
    record = {
        "user_id": "admin",
        "session_id": session_id,
        "context": context,
        "query": query,
        "response": response,
        "rag_start_time": rag_start_time,
        "rag_end_time": rag_end_time,
    }
    return self.query.save_rag_interaction(**record)
def get_save_details(self, page: int, page_size: int) -> dict:
    """Fetch one page of RAG interactions stored under the fixed ``"admin"`` user id."""
    paging = {"user_id": "admin", "page": page, "page_size": page_size}
    return self.query.get_rag_interactions(**paging)
def log_user_image_upload(self):
    """Log an image upload for the current user.

    Returns:
        Whatever the query layer's ``log_image_upload`` returns.

    Raises:
        ValueError: If logging fails; the original error is chained as
            ``__cause__``.
    """
    try:
        return self.query.log_image_upload(self.identity)
    except Exception as e:
        raise ValueError(f"Failed to log image upload: {e}") from e
def get_user_daily_uploads(self):
    """Get number of images uploaded by current user in the last 24 hours.

    The counting window is defined by the query layer, not here.

    Raises:
        ValueError: If the lookup fails; the original error is chained as
            ``__cause__``.
    """
    try:
        return self.query.get_user_daily_uploads(self.identity)
    except Exception as e:
        raise ValueError(f"Failed to get user daily uploads: {e}") from e
def get_user_last_upload_time(self):
    """Get the timestamp of current user's most recent image upload.

    Raises:
        ValueError: If the lookup fails; the original error is chained as
            ``__cause__``.
    """
    try:
        return self.query.get_user_last_upload_time(self.identity)
    except Exception as e:
        raise ValueError(f"Failed to get user's last upload time: {e}") from e
| ``` | |
| ### app\services\environmental_condition.py | |
| ```python | |
| import requests | |
| from bs4 import BeautifulSoup | |
class EnvironmentalData:
    """Best-effort air-quality and UV readings for one city.

    Air-quality values come from the WAQI JSON feed and the UV index is
    scraped from a weatheronline.co.uk page. Every fetch is bounded by
    ``REQUEST_TIMEOUT`` and falls back to ``"N/A"`` on any failure, so callers
    never see an exception from this class.
    """

    # NOTE(review): this credential is committed to source control. It is kept
    # only as a backward-compatible fallback; set WAQI_API_TOKEN and rotate it.
    DEFAULT_AQI_TOKEN = "466cde4d55e7c5d6cc658ad9c391214b593f46b9"
    # Seconds allowed per HTTP request; without a timeout requests can block
    # the caller indefinitely.
    REQUEST_TIMEOUT = 10
    # Output label -> key inside the feed's "iaqi" object.
    _IAQI_FIELDS = {
        "Temperature": "t",
        "Humidity": "h",
        "Wind Speed": "w",
        "Pressure": "p",
    }

    def __init__(self, city):
        """Store ``city`` and build the two source URLs for it.

        Args:
            city: City name as stored for the user. It is percent-encoded
                before being placed in the URL paths so characters such as
                ``/``, ``?`` or ``#`` cannot change the request target.
        """
        import os
        from urllib.parse import quote

        self.city = city
        # ':', ';', '@' and ',' stay literal — presumably harmless for both
        # sites; TODO confirm if such identifiers are ever passed as a city.
        city_segment = quote(str(city), safe=":;@,")
        token = os.getenv("WAQI_API_TOKEN") or self.DEFAULT_AQI_TOKEN
        self.aqi_url = f"https://api.waqi.info/feed/{city_segment}/?token={token}"
        # NOTE(review): the country segment is fixed to Pakistan.
        self.uv_url = f"https://www.weatheronline.co.uk/Pakistan/{city_segment}/UVindex.html"

    def fetch_aqi_data(self):
        """Return the air-quality readings, or the all-``"N/A"`` default.

        The default is returned when the request fails, times out, returns
        something that is not the expected JSON, or reports a status other
        than ``"ok"``.
        """
        try:
            response = requests.get(self.aqi_url, timeout=self.REQUEST_TIMEOUT)
            data = response.json()
            if data["status"] != "ok":
                return self.get_default_aqi_data()
            payload = data["data"]
            iaqi = payload["iaqi"]
            readings = {
                label: iaqi.get(code, {}).get("v", "N/A")
                for label, code in self._IAQI_FIELDS.items()
            }
            readings["AQI"] = payload.get("aqi", "N/A")
            # "dominentpol" is the spelling used by the feed itself.
            readings["Dominant Pollutant"] = payload.get("dominentpol", "N/A")
            return readings
        except Exception:
            # Deliberate best-effort: environmental data is optional context.
            return self.get_default_aqi_data()

    def get_default_aqi_data(self):
        """Return the placeholder readings used when the feed is unavailable."""
        labels = list(self._IAQI_FIELDS) + ["AQI", "Dominant Pollutant"]
        return {label: "N/A" for label in labels}

    def fetch_uv_data(self):
        """Return the UV index as an ``int``, or ``"N/A"`` if it cannot be read.

        The value is taken from the second cell of the second row of the first
        element with class ``gr1`` on the page.
        """
        try:
            response = requests.get(self.uv_url, timeout=self.REQUEST_TIMEOUT)
            soup = BeautifulSoup(response.text, 'html.parser')
            tables = soup.find_all(class_='gr1')
            if not tables:
                return "N/A"
            rows = tables[0].find_all('tr')
            if len(rows) < 2:
                return "N/A"
            cells = rows[1].find_all('td')
            if len(cells) < 2:
                return "N/A"
            return int(cells[1].text.strip())
        except Exception:
            # Covers network errors and non-numeric cell text alike.
            return "N/A"

    def get_environmental_data(self):
        """Return all readings in one dict, with units attached where known.

        Temperature, humidity, wind speed and pressure become strings with a
        unit suffix unless they are ``"N/A"``; the air-quality index, dominant
        pollutant and UV index are passed through unchanged.
        """
        aqi_data = self.fetch_aqi_data()
        uv_index = self.fetch_uv_data()

        def with_unit(label, unit):
            value = aqi_data[label]
            return f"{value} {unit}" if value != "N/A" else "N/A"

        return {
            "Temperature": with_unit("Temperature", "°C"),
            "Humidity": with_unit("Humidity", "%"),
            "Wind Speed": with_unit("Wind Speed", "m/s"),
            "Pressure": with_unit("Pressure", "hPa"),
            "Air Quality Index": aqi_data['AQI'],
            "Dominant Pollutant": aqi_data["Dominant Pollutant"],
            "UV_Index": uv_index,
        }
| ``` | |
| ### app\services\google_agent_service.py | |
| ```python | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import re | |
| from datetime import datetime, timezone | |
| from typing import Any, AsyncGenerator, Callable, Dict, List, Optional, Tuple | |
| from google.adk.agents import Agent | |
| from google.adk.agents.run_config import RunConfig, StreamingMode | |
| from google.adk.runners import InMemoryRunner | |
| from google.adk.tools import FunctionTool | |
| from google.genai import types | |
| from app.services.agentic_prompt import ( | |
| get_vector_search_prompt, | |
| get_web_search_prompt, | |
| ) | |
| from app.services.chathistory import ChatSession | |
| from app.services.environmental_condition import EnvironmentalData | |
| from app.services.tools import ( | |
| analyze_skin_image, | |
| convert_document_to_markdown, | |
| get_image_search, | |
| get_vector_search, | |
| get_web_search, | |
| ) | |
logger = logging.getLogger(__name__)

# Names under which the dynamically built tools are exposed to the agent.
PERSONALIZED_TOOL_NAME = "get_personalized_context"
ENVIRONMENT_TOOL_NAME = "get_environmental_context"
DOCUMENT_CONVERSION_TOOL_NAME = "convert_uploaded_document"
IMAGE_ANALYSIS_TOOL_NAME = "analyze_skin_image"

# Model and credentials come from the environment; GEMINI_API_KEY is accepted
# as an alternative to GOOGLE_API_KEY.
DEFAULT_MODEL_NAME = os.getenv("GEMINI_MODEL", "gemini-2.0-flash-exp")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY")

# Mirror a key supplied only as GEMINI_API_KEY into GOOGLE_API_KEY —
# presumably so the Google SDK finds it under the name it reads; confirm.
if GOOGLE_API_KEY and not os.getenv("GOOGLE_API_KEY"):
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

# On Windows, switch asyncio to the proactor event loop policy. Failure is
# ignored on purpose: this is a best-effort tweak, not a requirement.
if os.name == "nt":
    try:
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    except Exception:
        pass
class GoogleAgentService:
    """Chat orchestrator that streams responses from a Google ADK agent.

    Construction loads the user's preferences, language, profile, city and
    (when enabled) environmental data through ``ChatSession``. Each call to
    ``process_message_async`` builds a fresh agent with the tools that apply,
    streams its output as event dicts and saves the finished exchange.
    """

    def __init__(
        self,
        token: str,
        session_id: Optional[str] = None,
        document: Optional[Dict[str, Any]] = None,
        image: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Bind the service to a user token and optional uploads.

        Args:
            token: Token handed to ``ChatSession``.
            session_id: Existing chat session to continue, if any.
            document: Uploaded-document record; ``path``, ``name``, ``type``
                and ``extension`` keys are read from it.
            image: Uploaded-image record; the same keys plus ``prompt`` are
                read from it.
        """
        self.token = token
        self.session_id = session_id
        self.chat_session = ChatSession(token, session_id)
        # Order matters: environmental data depends on preferences and city.
        self.user_preferences = self._load_user_preferences()
        self.language = self.chat_session.get_language() or "english"
        self.user_profile = self._load_user_profile()
        self.user_city = self.chat_session.get_city()
        self.environment_data = self._load_environmental_data()
        self.document = document
        self.image = image

    async def process_message_async(
        self, query: str
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream the agent's answer to ``query`` as event dicts.

        Yields dicts whose ``type`` is ``"tool_call"``, ``"tool_result"``,
        ``"chunk"`` (text split after each whitespace character),
        ``"completed"`` (once, after the exchange was passed to
        ``ChatSession.save_chat``) or ``"error"`` (after which the stream
        ends). Exceptions are reported as an ``"error"`` event, not raised.
        """
        if not GOOGLE_API_KEY:
            error = "Google API key is not configured."
            logger.error(error)
            yield {"type": "error", "content": error}
            return

        try:
            # The query doubles as the title of a newly created session.
            session_id = self._ensure_valid_session(query)
            agent_mode = "web" if self.user_preferences.get("websearch") else "vector"
            user_data = self._prepare_user_data()
            agent = self._build_agent(agent_mode, user_data)

            runner = InMemoryRunner(agent=agent)
            await runner.session_service.create_session(
                app_name=runner.app_name,
                user_id=self.chat_session.identity,
                session_id=session_id,
            )
            user_message = types.Content(
                role="user",
                parts=[types.Part(text=query)],
            )
            run_config = RunConfig(streaming_mode=StreamingMode.SSE)

            tool_calls: List[Dict[str, Any]] = []
            tool_call_map: Dict[str, Dict[str, Any]] = {}
            collected_images: List[str] = []
            collected_references: List[str] = []
            # Partial text streamed since the last complete (non-partial)
            # event; reset at every turn boundary.
            turn_text = ""
            final_text = ""
            pending_token_buffer = ""

            def emit_word_chunks(delta: str, *, final: bool = False) -> List[str]:
                """Cut buffered text after each whitespace character.

                Text after the last whitespace stays buffered until more text
                arrives or ``final`` forces a flush.
                """
                nonlocal pending_token_buffer
                buffered = pending_token_buffer + delta
                chunks: List[str] = []
                consumed = 0
                for boundary in re.finditer(r'\s', buffered):
                    chunks.append(buffered[consumed:boundary.end()])
                    consumed = boundary.end()
                pending_token_buffer = buffered[consumed:]
                if final and pending_token_buffer:
                    chunks.append(pending_token_buffer)
                    pending_token_buffer = ""
                return chunks

            async for event in runner.run_async(
                user_id=self.chat_session.identity,
                session_id=session_id,
                new_message=user_message,
                run_config=run_config,
            ):
                if event.error_message:
                    logger.error("Agent error: %s", event.error_message)
                    yield {"type": "error", "content": event.error_message}
                    return

                for function_call in event.get_function_calls():
                    call_entry = {
                        "id": function_call.id,
                        "tool_name": function_call.name,
                        "arguments": function_call.args or {},
                    }
                    tool_call_map[function_call.id] = call_entry
                    tool_calls.append(call_entry)
                    yield {"type": "tool_call", **call_entry}

                for function_response in event.get_function_responses():
                    response_payload = function_response.response or {}
                    call_entry = tool_call_map.get(function_response.id)
                    if call_entry is not None:
                        call_entry["result"] = response_payload
                    if isinstance(response_payload, dict):
                        if function_response.name == "get_image_search":
                            collected_images.extend(
                                self._as_list(response_payload.get("images"))
                            )
                        collected_references.extend(
                            self._as_list(response_payload.get("references"))
                        )
                    yield {
                        "type": "tool_result",
                        "id": function_response.id,
                        "tool_name": function_response.name,
                        "result": response_payload,
                    }

                text_segment = self._extract_text(event)
                if not text_segment:
                    continue

                if event.partial:
                    turn_text += text_segment
                    for token in emit_word_chunks(text_segment):
                        yield {"type": "chunk", "content": token}
                    continue

                # Complete event: emit only what the partials have not
                # already delivered.
                final_text = text_segment
                if turn_text and text_segment.startswith(turn_text):
                    delta = text_segment[len(turn_text):]
                else:
                    delta = text_segment
                # Start the next turn with an empty buffer. Carrying this
                # turn's text forward made the prefix test above fail for the
                # following turn, re-emitting its whole text a second time.
                turn_text = ""
                for token in emit_word_chunks(delta, final=True):
                    yield {"type": "chunk", "content": token}

            # Flush a trailing word if the stream ended on partial events.
            for leftover in emit_word_chunks("", final=True):
                yield {"type": "chunk", "content": leftover}

            # Prefer text still pending from an unfinished turn; otherwise
            # the last complete event's text.
            parsed_response = self._parse_agent_response(turn_text or final_text)
            response_text, keywords, response_images, response_refs = parsed_response
            merged_images = self._dedupe_list(collected_images + response_images)
            merged_references = self._dedupe_list(collected_references + response_refs)

            context_chunks: List[str] = []
            if self.document and self.document.get("path"):
                context_chunks.append(f"document:{self.document.get('path')}")
            if self.image and self.image.get("path"):
                context_chunks.append(f"image:{self.image.get('path')}")
            context_payload = " ".join(context_chunks)

            chat_payload = {
                "query": query,
                "response": response_text,
                "references": merged_references,
                "keywords": keywords,
                "images": merged_images,
                "context": context_payload,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "session_id": session_id,
                "tool_calls": tool_calls,
            }
            saved = self.chat_session.save_chat(chat_payload)

            yield {
                "type": "completed",
                "saved": saved,
                "session_id": session_id,
                "response": response_text,
                "keywords": keywords,
                "references": merged_references,
                "images": merged_images,
                "tool_calls": tool_calls,
            }
        except Exception as exc:
            logger.error("Agent streaming failure: %s", exc, exc_info=True)
            yield {"type": "error", "content": f"Generation failed: {exc}"}

    def _build_agent(self, mode: str, user_data: Dict[str, Any]) -> "Agent":
        """Create the agent for ``mode`` (``"web"`` or vector search).

        Tools are added in a fixed order: image analysis (if an image was
        uploaded), the search tool for the mode, image search, document
        conversion (if a document was uploaded), then the personalised and
        environmental context tools when ``user_data`` flags them.
        """
        use_web = mode == "web"
        prompt = (
            get_web_search_prompt(user_data)
            if use_web
            else get_vector_search_prompt(user_data)
        )
        search_tool = get_web_search if use_web else get_vector_search

        tools: List[FunctionTool] = []
        if self.image and self.image.get("path"):
            tools.append(FunctionTool(self._create_image_tool()))
        tools.append(FunctionTool(search_tool))
        tools.append(FunctionTool(get_image_search))
        if self.document and self.document.get("path"):
            tools.append(FunctionTool(self._create_document_tool()))
        if user_data.get("has_personalized_data"):
            personalized_tool = self._create_personalized_data_tool(
                user_data.get("personalized_data", "")
            )
            tools.append(FunctionTool(personalized_tool))
        if user_data.get("has_environmental_data"):
            environmental_tool = self._create_environmental_data_tool(
                user_data.get("environmental_payload") or {}
            )
            tools.append(FunctionTool(environmental_tool))

        return Agent(
            name="DermAI",
            model=DEFAULT_MODEL_NAME,
            instruction=prompt,
            tools=tools,
        )

    @staticmethod
    def _check_tool_path(
        file_path: Optional[str],
        allowed_path: str,
        missing_message: str,
        mismatch_message: str,
    ) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
        """Validate a tool's ``file_path`` against the uploaded file's path.

        Backslashes are normalised to forward slashes on both sides before
        comparing. Returns ``(path, None)`` when the path may be used and
        ``(None, error_payload)`` otherwise.
        """
        target_path = (file_path or allowed_path or "").replace("\\", "/").strip()
        if not target_path:
            return None, {"status": "error", "error_message": missing_message}
        allowed_normalized = allowed_path.replace("\\", "/").strip()
        if allowed_normalized and target_path != allowed_normalized:
            return None, {"status": "error", "error_message": mismatch_message}
        return target_path, None

    def _create_document_tool(self) -> Callable[..., Dict[str, Any]]:
        """Build the document-conversion tool bound to the uploaded document."""
        document_record = self.document or {}
        allowed_path = (document_record.get("path") or "").strip()

        def run_document_conversion(
            file_path: Optional[str] = None,
            file_extension: Optional[str] = None,
        ) -> Dict[str, Any]:
            target_path, error = self._check_tool_path(
                file_path,
                allowed_path,
                "file_path is required to convert a document.",
                "The provided file_path does not match the uploaded document for this session.",
            )
            if error is not None:
                return error
            result = convert_document_to_markdown(
                file_path=target_path,
                file_extension=file_extension or document_record.get("extension"),
            )
            if result.get("status") == "success":
                text_content = result.get("text_content") or ""
                result["preview"] = text_content[:1000]
                result["character_count"] = len(text_content)
                if allowed_path and result.get("source_path"):
                    # Report the path as it was uploaded rather than whatever
                    # the converter resolved it to.
                    result["source_path"] = allowed_path
            return result

        run_document_conversion.__name__ = DOCUMENT_CONVERSION_TOOL_NAME
        run_document_conversion.__doc__ = (
            "Convert the user's uploaded dermatology document into Markdown text. "
            "Provide the `file_path` exactly as supplied in the conversation context."
        )
        return run_document_conversion

    def _create_image_tool(self) -> Callable[..., Dict[str, Any]]:
        """Build the image-analysis tool bound to the uploaded image."""
        image_record = self.image or {}
        allowed_path = (image_record.get("path") or "").strip()

        def run_image_analysis(
            file_path: Optional[str] = None,
            language: Optional[str] = None,
        ) -> Dict[str, Any]:
            target_path, error = self._check_tool_path(
                file_path,
                allowed_path,
                "file_path is required to analyse the image.",
                "The provided file_path does not match the uploaded image for this session.",
            )
            if error is not None:
                return error
            result = analyze_skin_image(
                file_path=target_path,
                language=language or self.language,
            )
            if result.get("status") == "success" and allowed_path:
                result["image_path"] = allowed_path
            return result

        run_image_analysis.__name__ = IMAGE_ANALYSIS_TOOL_NAME
        run_image_analysis.__doc__ = (
            "Analyse the user's uploaded skin image. Provide the `file_path` exactly as supplied "
            "in the conversation context to run the classifier."
        )
        return run_image_analysis

    def _create_personalized_data_tool(self, data: str) -> Callable[[], Dict[str, Any]]:
        """Build a zero-argument tool that returns the questionnaire summary."""
        sanitized = (data or "").strip()

        def personalized_tool() -> Dict[str, Any]:
            return {
                "status": "success",
                "generated_at": datetime.now(timezone.utc).isoformat(),
                "personalized_data": sanitized,
            }

        personalized_tool.__name__ = PERSONALIZED_TOOL_NAME
        personalized_tool.__doc__ = (
            "Return questionnaire-derived personalization details for the current user."
        )
        return personalized_tool

    def _create_environmental_data_tool(
        self, data: Dict[str, Any]
    ) -> Callable[[], Dict[str, Any]]:
        """Build a zero-argument tool that returns a copy of ``data`` taken now."""
        snapshot = dict(data) if isinstance(data, dict) else {}
        city = self.user_city

        def environmental_tool() -> Dict[str, Any]:
            return {
                "status": "success",
                "city": city,
                "retrieved_at": datetime.now(timezone.utc).isoformat(),
                "environmental_data": snapshot,
            }

        environmental_tool.__name__ = ENVIRONMENT_TOOL_NAME
        environmental_tool.__doc__ = (
            "Return the cached environmental conditions for the user's location."
        )
        return environmental_tool

    def _load_user_preferences(self) -> Dict[str, Any]:
        """Return the stored preferences, or conservative defaults on failure."""
        try:
            return self.chat_session.get_user_preferences()
        except Exception as exc:
            logger.warning("Failed to load user preferences: %s", exc)
            return {
                "websearch": False,
                "keywords": True,
                "references": True,
                "personalized_recommendations": False,
                "environmental_recommendations": False,
            }

    def _load_user_profile(self) -> Dict[str, Any]:
        """Return ``name`` and ``age``, with placeholders when unavailable."""
        try:
            profile = self.chat_session.get_name_and_age() or {}
            return {
                "name": profile.get("name", "Patient"),
                "age": profile.get("age", "Unknown"),
            }
        except Exception as exc:
            logger.warning("Failed to load profile: %s", exc)
            return {"name": "Patient", "age": "Unknown"}

    def _load_environmental_data(self) -> Optional[Dict[str, Any]]:
        """Fetch environmental data when the user enabled it and has a city."""
        try:
            wanted = self.user_preferences.get("environmental_recommendations")
            if wanted and self.user_city:
                data = EnvironmentalData(self.user_city).get_environmental_data()
                if data:
                    return data
        except Exception as exc:
            logger.warning("Failed to load environmental data: %s", exc)
        return None

    def _load_personalized_data(self) -> str:
        """Return the questionnaire summary, or ``""`` when disabled or missing."""
        try:
            if self.user_preferences.get("personalized_recommendations"):
                data = self.chat_session.get_personalized_recommendation()
                return data or ""
        except Exception as exc:
            logger.warning("Failed to load personalized data: %s", exc)
        return ""

    def _prepare_user_data(self) -> Dict[str, Any]:
        """Assemble the dict consumed by the prompt builders and ``_build_agent``."""
        personalized_data = self._load_personalized_data()
        environmental_payload = (
            dict(self.environment_data)
            if isinstance(self.environment_data, dict)
            else {}
        )
        has_personalized_data = bool(personalized_data)
        has_environmental_data = bool(environmental_payload)

        document_info = None
        if self.document and self.document.get("path"):
            document_info = {
                "path": self.document.get("path"),
                "name": self.document.get("name") or "Uploaded document",
                "type": self.document.get("type"),
                "extension": self.document.get("extension"),
            }

        image_info = None
        if self.image and self.image.get("path"):
            image_info = {
                "path": self.image.get("path"),
                "name": self.image.get("name") or "Uploaded image",
                "type": self.image.get("type"),
                "extension": self.image.get("extension"),
                "prompt": self.image.get("prompt"),
            }

        return {
            "name": self.user_profile.get("name"),
            "age": self.user_profile.get("age"),
            "language": self.language,
            "personalized_recommendations": self.user_preferences.get(
                "personalized_recommendations"
            ),
            "environmental_recommendations": self.user_preferences.get(
                "environmental_recommendations"
            ),
            "personalized_data": personalized_data,
            "environmental_data": json.dumps(environmental_payload)
            if has_environmental_data
            else "",
            "has_personalized_data": has_personalized_data,
            "has_environmental_data": has_environmental_data,
            # Tool names are only advertised when the tool will be registered.
            "personalized_tool_name": PERSONALIZED_TOOL_NAME
            if has_personalized_data
            else None,
            "environmental_tool_name": ENVIRONMENT_TOOL_NAME
            if has_environmental_data
            else None,
            "environmental_payload": environmental_payload,
            "include_keywords": self.user_preferences.get("keywords", True),
            "include_references": self.user_preferences.get("references", True),
            "include_images": True,
            "recent_history": self._get_recent_history(),
            "document_info": document_info,
            "document_tool_name": DOCUMENT_CONVERSION_TOOL_NAME
            if document_info
            else None,
            "image_info": image_info,
            "image_tool_name": IMAGE_ANALYSIS_TOOL_NAME if image_info else None,
        }

    def _get_recent_history(self, limit: int = 10) -> str:
        """Format the last ``limit`` exchanges of the active session.

        Returns ``""`` when there is no session, no history, ``limit`` is not
        positive, or loading fails (logged as a warning).
        """
        # Without this guard ``items[-0:]`` would return the whole history.
        if limit <= 0:
            return ""
        try:
            if not self.session_id:
                return ""
            self.chat_session.load_chat_history()
            history_items = self.chat_session.get_chat_history() or []
            formatted = []
            for entry in history_items[-limit:]:
                user_q = entry.get("query") or ""
                bot_a = entry.get("response") or ""
                if user_q:
                    formatted.append(f"User: {user_q}")
                if bot_a:
                    formatted.append(f"Dr DermAI: {bot_a}")
            return "\n".join(formatted)
        except Exception as exc:
            logger.warning("Failed to load recent history: %s", exc)
            return ""

    def _ensure_valid_session(self, title: Optional[str] = None) -> str:
        """Make sure a usable chat session is active and return its id.

        ``ChatSession.validate_session`` may itself replace an unknown id with
        a newly created session, so the id is always re-read from
        ``self.chat_session`` afterwards. Previously the stale client id was
        returned in that case and reported back to the client.
        """
        if not self.session_id or not self.session_id.strip():
            self.chat_session.create_new_session(title=title)
        else:
            try:
                if not self.chat_session.validate_session(self.session_id, title=title):
                    self.chat_session.create_new_session(title=title)
            except Exception:
                logger.warning(
                    "Session validation failed; starting a new session.",
                    exc_info=True,
                )
                self.chat_session.create_new_session(title=title)
        self.session_id = self.chat_session.session_id
        return self.session_id

    @staticmethod
    def _extract_text(event) -> str:
        """Concatenate the text of every part of ``event.content``."""
        if not event.content or not event.content.parts:
            return ""
        return "".join(part.text for part in event.content.parts if part.text)

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Remove a surrounding backtick fence and a leading ``json`` tag."""
        stripped = text.strip()
        if stripped.startswith("```") and stripped.endswith("```"):
            body = stripped.strip("`")
            if body.lower().startswith("json"):
                body = body[4:]
            stripped = body
        return stripped.strip()

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Return ``value`` as a list: ``[]`` if falsy, wrapped if scalar."""
        if not value:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    @staticmethod
    def _clean_str_list(raw: Any) -> List[str]:
        """Normalise a JSON field to a list of non-blank, stripped strings."""
        if isinstance(raw, list):
            candidates = raw
        elif raw:
            candidates = [raw]
        else:
            return []
        cleaned = (str(item).strip() for item in candidates)
        return [item for item in cleaned if item]

    def _parse_agent_response(self, text: str) -> Tuple[str, List[str], List[str], List[str]]:
        """Split the agent's final text into response, keywords, images, refs.

        The text is expected to be a JSON object, optionally inside a code
        fence. Anything that is not a JSON object is returned verbatim as the
        response with empty lists.
        """
        cleaned = self._strip_code_fence(text)
        if not cleaned:
            return "", [], [], []
        try:
            payload = json.loads(cleaned)
        except json.JSONDecodeError:
            logger.warning("Unable to parse agent JSON response; returning raw text.")
            return cleaned, [], [], []
        if not isinstance(payload, dict):
            return cleaned, [], [], []

        response_text = payload.get("response") or cleaned
        if not isinstance(response_text, str):
            # Downstream code strips and stores this as text.
            response_text = str(response_text)
        return (
            response_text,
            self._clean_str_list(payload.get("keywords", [])),
            self._clean_str_list(payload.get("images", [])),
            self._clean_str_list(payload.get("references", [])),
        )

    @staticmethod
    def _dedupe_list(items: List[str]) -> List[str]:
        """Drop falsy entries and duplicates, keeping first-seen order."""
        return list(dict.fromkeys(item for item in items if item))
| ``` | |
| ### app\services\image_classification_vit.py | |
| ```python | |
| import torch | |
| from PIL import Image | |
| import torch.nn.functional as F | |
| from torchvision import transforms | |
| from transformers import AutoModelForImageClassification, AutoConfig | |
| import requests | |
| from io import BytesIO | |
| import os | |
| from huggingface_hub import hf_hub_download | |
| from dotenv import load_dotenv | |
# Load variables from a .env file (python-dotenv) before reading the token.
load_dotenv()
HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")

# Network-related settings for the Hugging Face libraries.
# NOTE(review): these are assigned after huggingface_hub / transformers have
# already been imported above; if those libraries read the variables at
# import time the assignments may have no effect — confirm.
os.environ.update(
    {
        'HF_HUB_DOWNLOAD_TIMEOUT': '300',  # download timeout raised to 5 minutes
        'TRANSFORMERS_OFFLINE': '0',  # keep online mode enabled
    }
)
class SkinDiseaseClassifier:
    """Skin-condition image classifier built on a fine-tuned ViT checkpoint.

    On construction the checkpoint ``healthy.pth`` is downloaded from the
    configured Hugging Face repository, a ViT backbone with a custom
    two-layer classification head is built, and the checkpoint weights are
    loaded into it. ``predict`` then maps an image to one of ``CLASS_NAMES``.
    """

    # Label for each output index of the classification head.
    CLASS_NAMES = [
        "Acne", "Basal Cell Carcinoma", "Benign Keratosis-like Lesions", "Chickenpox", "Eczema", "Healthy Skin",
        "Measles", "Melanocytic Nevi", "Melanoma", "Monkeypox", "Psoriasis Lichen Planus and related diseases",
        "Seborrheic Keratoses and other Benign Tumors", "Tinea Ringworm Candidiasis and other Fungal Infections",
        "Vitiligo", "Warts Molluscum and other Viral Infections"
    ]

    # Seconds to wait for a remote image before giving up, so a stalled
    # server cannot block a request indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, repo_id="muhammadnoman76/skin-disease-classifier", cache_dir=None):
        """Download the weights and prepare the model for inference.

        Args:
            repo_id: Hugging Face repository holding ``healthy.pth``.
            cache_dir: Optional directory for the downloaded checkpoint;
                ``None`` uses the hub's default cache location.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.repo_id = repo_id
        self.cache_dir = cache_dir
        self.model = self.load_trained_model()
        self.transform = self.get_inference_transform()

    def load_trained_model(self):
        """Fetch the checkpoint and return the model in eval mode on ``self.device``."""
        model_path = hf_hub_download(
            repo_id=self.repo_id,
            filename="healthy.pth",
            token=HUGGINGFACE_TOKEN,
            cache_dir=self.cache_dir,
        )
        checkpoint = torch.load(model_path, map_location=self.device, weights_only=True)
        # The number of classes is taken from the final linear layer stored in
        # the checkpoint rather than from CLASS_NAMES.
        classifier_weight = checkpoint['model_state_dict']['classifier.3.weight']
        num_classes = classifier_weight.size(0)
        config = AutoConfig.from_pretrained("google/vit-base-patch16-224-in21k", num_labels=num_classes)
        model = AutoModelForImageClassification.from_pretrained(
            "google/vit-base-patch16-224-in21k",
            config=config,
            ignore_mismatched_sizes=True
        )
        # Replace the stock head with the head layout the checkpoint was saved with.
        in_features = model.classifier.in_features
        model.classifier = torch.nn.Sequential(
            torch.nn.Linear(in_features, 512),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(512, num_classes)
        )
        model.load_state_dict(checkpoint['model_state_dict'])
        model = model.to(self.device)
        if self.device.type == 'cuda':
            # Half precision on GPU; predict() casts its input to match.
            model = model.half()
        model.eval()
        return model

    def get_inference_transform(self):
        """Return the resize / center-crop / normalize pipeline applied before inference."""
        return transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def load_image(self, image_input):
        """Return ``image_input`` as an RGB PIL image.

        Accepts a PIL image, an ``http(s)://`` URL, a local file path, or a
        file-like object exposing ``read``.

        Raises:
            Exception: wrapping the underlying error (missing file, HTTP
                failure, unsupported type, undecodable data).
        """
        try:
            if isinstance(image_input, Image.Image):
                image = image_input
            elif isinstance(image_input, str):
                if image_input.startswith(('http://', 'https://')):
                    response = requests.get(image_input, timeout=self.REQUEST_TIMEOUT)
                    # Fail on 4xx/5xx instead of trying to decode an error page.
                    response.raise_for_status()
                    image = Image.open(BytesIO(response.content))
                else:
                    if not os.path.exists(image_input):
                        raise FileNotFoundError(f"Image file not found: {image_input}")
                    image = Image.open(image_input)
            elif hasattr(image_input, 'read'):
                image = Image.open(image_input)
            else:
                raise ValueError("Unsupported image input type")
            return image.convert('RGB')
        except Exception as e:
            raise Exception(f"Error loading image: {str(e)}") from e

    def predict(self, image_input, confidence_threshold=0.3):
        """Classify an image.

        Args:
            image_input: Anything accepted by ``load_image``.
            confidence_threshold: Accepted for backward compatibility; it is
                not applied here, so callers must do their own thresholding
                on the returned confidence.

        Returns:
            ``(class_name, confidence_percentage)`` where the confidence is
            the top softmax probability expressed in percent, rounded to two
            decimals.

        Raises:
            Exception: wrapping any failure during loading or inference.
        """
        try:
            image = self.load_image(image_input)
            image_tensor = self.transform(image).unsqueeze(0)
            if self.device.type == 'cuda':
                image_tensor = image_tensor.half()
            image_tensor = image_tensor.to(self.device)
            with torch.inference_mode():
                outputs = self.model(pixel_values=image_tensor).logits
                probabilities = F.softmax(outputs, dim=1)
                confidence, predicted = torch.max(probabilities, 1)
            confidence = confidence.item()
            predicted_class_idx = predicted.item()
            # The head size comes from the checkpoint, so guard against a
            # checkpoint with more classes than CLASS_NAMES describes.
            if not 0 <= predicted_class_idx < len(self.CLASS_NAMES):
                raise IndexError(
                    f"Predicted class index {predicted_class_idx} has no entry in CLASS_NAMES"
                )
            confidence_percentage = round(confidence * 100, 2)
            predicted_class_name = self.CLASS_NAMES[predicted_class_idx]
            return predicted_class_name, confidence_percentage
        except Exception as e:
            raise Exception(f"Error during prediction: {str(e)}") from e
| ``` | |
| ### app\services\llm_model.py | |
| ```python | |
| import json | |
| from dotenv import load_dotenv | |
| import os | |
| import re | |
| from g4f.client import Client | |
| load_dotenv() | |
class Model:
    """LLM facade that currently routes every request to the g4f fallback model."""

    def __init__(self):
        """Read the Gemini settings from the environment (kept for configuration parity)."""
        self.gemini_api_key = os.getenv("GEMINI_API_KEY")
        self.gemini_model = os.getenv("GEMINI_MODEL")
        # Removed genai client initialization since it's not properly imported

    def fall_back_llm(self, prompt):
        """Fallback method using gpt-4o-mini when Gemini fails.

        Returns the model's text, or an explanatory error string if the call
        raises; this method itself never raises.
        """
        try:
            response = Client().chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                web_search=False
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Both primary and fallback models failed. Error: {str(e)}"

    def send_message_openrouter(self, prompt):
        """Send ``prompt`` to the fallback model and return its reply."""
        # Since genai client is not available, use fallback model directly
        return self.fall_back_llm(prompt)

    def llm(self, prompt, query):
        """Send ``prompt`` and ``query`` (joined by a blank line) to the fallback model."""
        # Since genai client is not available, use fallback model directly
        combined_content = f"{prompt}\n\n{query}"
        return self.fall_back_llm(combined_content)

    def llm_image(self, text, image):
        """Report that image prompts are unsupported; always returns an error string."""
        # Image processing with LLM is not available without genai client
        print("Error in llm_image: genai client not available")
        return "Error: Image processing not available - genai client not configured"

    def clean_json_response(self, response_text):
        """Extract the outermost ``[...]`` span from a model reply.

        Trailing commas directly before a closing ``]`` or ``}`` are removed
        so the span is parseable JSON. If the text contains no well-formed
        bracket pair it is returned unchanged.
        """
        start = response_text.find('[')
        end = response_text.rfind(']')
        # Both brackets must exist and be in the right order; otherwise there
        # is no array to extract.
        if start == -1 or end < start:
            return response_text
        return re.sub(r",\s*([\]}])", r"\1", response_text[start:end + 1])

    def skinScheduler(self, prompt, max_retries=3):
        """Generate a skincare schedule with retries and cleaning.

        The model is queried up to ``max_retries`` times (at least once)
        until its cleaned reply parses as JSON.

        Returns:
            The parsed JSON value, or ``{"error": ...}`` when every attempt
            yields invalid JSON or an unexpected exception occurs.
        """
        # Since genai client is not available, use fallback model directly
        try:
            attempts = max(1, int(max_retries or 1))
            for _ in range(attempts):
                fallback_response = self.fall_back_llm(prompt)
                cleaned_fallback = self.clean_json_response(fallback_response)
                try:
                    return json.loads(cleaned_fallback)
                except json.JSONDecodeError:
                    # Malformed reply: ask again while attempts remain.
                    continue
            return {"error": "Failed to produce valid JSON"}
        except Exception as e:
            return {"error": f"Model failed: {str(e)}"}
| ``` | |
| ### app\services\prompts.py | |
| ```python | |
# Prompt template for generating the daily skincare schedule. It is filled with
# str.format using the placeholders personalized_condition,
# environmental_values and historical_data; the doubled braces are literal JSON
# braces escaped for str.format.
SKIN_CARE_SCHEDULER = """As a skincare expert, generate a daily schedule based on:
- User's skin profile: {personalized_condition}
- Current environmental conditions: {environmental_values}
- Historical routines: {historical_data}
Create EXACTLY 5 entries in this JSON format:
[
{{
"time": "6:00 AM - 8:00 AM",
"recommendation": "Cleanse with [Product Name]",
"icon": "💧",
"category": "morning"
}},
{{
"time": "8:00 AM - 10:00 AM",
"recommendation": "Apply [Sunscreen Name] SPF 50",
"icon": "☀️",
"category": "morning"
}},
{{
"time": "12:00 PM - 2:00 PM",
"recommendation": "Reapply sunscreen",
"icon": "🌤️",
"category": "afternoon"
}},
{{
"time": "6:00 PM - 8:00 PM",
"recommendation": "Evening cleansing routine",
"icon": "🌙",
"category": "evening"
}},
{{
"time": "9:00 PM - 11:00 PM",
"recommendation": "Night serum application",
"icon": "✨",
"category": "night"
}}
]
Important rules:
1. Use only double quotes
2. Maintain category order: morning, morning, afternoon, evening, night
3. Include specific product names from historical data when available
4. Never add comments or text outside the JSON array
5. Time ranges must follow "HH:MM AM/PM - HH:MM AM/PM" format
6. Use appropriate emojis for each activity
"""
# Static five-entry schedule used when no valid model-generated schedule is
# available. Categories follow the order required by the scheduler prompt:
# morning, morning, afternoon, evening, night.
DEFAULT_SCHEDULE = [
    {
        "time": "6:00 AM - 8:00 AM",
        "recommendation": "Cleanse with a gentle cleanser",
        "icon": "💧",
        "category": "morning"
    },
    {
        "time": "8:00 AM - 10:00 AM",
        "recommendation": "Apply sunscreen SPF 30+",
        "icon": "☀️",
        "category": "morning"
    },
    {
        "time": "12:00 PM - 2:00 PM",
        "recommendation": "Reapply sunscreen if needed",
        "icon": "🌤️",
        "category": "afternoon"
    },
    {
        "time": "6:00 PM - 8:00 PM",
        "recommendation": "Evening cleansing routine",
        "icon": "🌙",
        "category": "evening"
    },
    {
        "time": "9:00 PM - 11:00 PM",
        "recommendation": "Apply night cream or serum",
        "icon": "✨",
        "category": "night"
    }
]
# English Markdown report template. Placeholders: {diseases_name},
# {diseases_detection_confidence} and {response}.
ADVICE_REPORT_SUGGESTION = """
## Based on your Image Analysis:
We have identified the presence of {diseases_name} with a confidence level of {diseases_detection_confidence}.
{response}
"""
# Urdu counterpart of the report template above; same placeholders.
URDU_ADVICE_REPORT_SUGGESTION = """
## آپ کی تصویر کے تجزیے کی بنیاد پر:
ہم نے {diseases_detection_confidence} کی اعتماد کی سطح کے ساتھ {diseases_name} کی موجودگی کی شناخت کی ہے۔
{response}
"""
# Prompt asking a vision model whether an image shows human skin; the expected
# answer is the token <YES> or <NO>.
# NOTE(review): the text asks for a "two-word response" while the answer format
# is a single token — confirm the intended wording.
SKIN_NON_SKIN_PROMPT = """
You are an expert at analyzing whether an image shows human skin or not.
Your task is to determine if the given image should be processed by a skin disease model.
Examine the image carefully and provide a clear two-word response:
answer <YES> if the image shows human skin, otherwise answer <NO>.
"""
| ``` | |
| ### app\services\RAG_evaluation.py | |
| ```python | |
| from typing import Dict, Any | |
| import re | |
| from datetime import datetime | |
| import nltk | |
| from nltk.corpus import stopwords | |
| from nltk.stem import WordNetLemmatizer | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from app.services.chathistory import ChatSession | |
| import os | |
| # # Set NLTK data path to a writable location | |
| # nltk_data_dir = os.path.join(os.getcwd(), "nltk_data") | |
| # os.makedirs(nltk_data_dir, exist_ok=True) | |
| # nltk.data.path.append(nltk_data_dir) | |
| # # Download NLTK resources to the specified directory | |
| # nltk.download('stopwords', download_dir=nltk_data_dir) | |
| # nltk.download('wordnet', download_dir=nltk_data_dir) | |
class RAGEvaluation:
    """Scores one page of stored RAG interactions for accuracy and latency."""

    def __init__(self, token: str, page: int = 1, page_size: int = 5):
        """Bind the chat session for ``token`` and set up the text-normalization tools."""
        self.chat_session = ChatSession(token, "session_id")
        self.page = page
        self.page_size = page_size
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english'))

    def _preprocess_text(self, text: str) -> str:
        """Lowercase, strip punctuation, lemmatize, drop stop words and repeated tokens."""
        normalized = re.sub(r'[^a-zA-Z0-9\s]', '', text.lower())
        # A dict doubles as an ordered set: first occurrence wins.
        kept: Dict[str, None] = {}
        for token in normalized.split():
            lemma = self.lemmatizer.lemmatize(token)
            if lemma in self.stop_words:
                continue
            kept.setdefault(lemma, None)
        return ' '.join(kept)

    def _calculate_cosine_similarity(self, context: str, response: str) -> float:
        """Return the TF-IDF cosine similarity of ``response`` against ``context``.

        The vocabulary is restricted to the context's tokens; ``0.0`` is
        returned when the vectorizer raises ``ValueError``.
        """
        context_text = self._preprocess_text(context)
        response_text = self._preprocess_text(response)
        tfidf = TfidfVectorizer(vocabulary=context_text.split())
        try:
            context_matrix = tfidf.fit_transform([context_text])
            response_matrix = tfidf.transform([response_text])
            similarity = cosine_similarity(context_matrix, response_matrix)
            return similarity[0][0]
        except ValueError:
            return 0.0

    def _calculate_time_difference(self, start_time: str, end_time: str) -> float:
        """Return the seconds elapsed between two ISO-8601 timestamps."""
        began = datetime.fromisoformat(start_time)
        finished = datetime.fromisoformat(end_time)
        elapsed = finished - began
        return elapsed.total_seconds()

    def _process_interaction(self, interaction: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``interaction`` extended with ``accuracy`` and ``overall_time``."""
        enriched = interaction.copy()
        accuracy = self._calculate_cosine_similarity(
            interaction['context'], interaction['response']
        )
        enriched['accuracy'] = accuracy
        duration = self._calculate_time_difference(
            interaction['rag_start_time'], interaction['rag_end_time']
        )
        enriched['overall_time'] = duration
        return enriched

    def generate_evaluation_report(self) -> Dict[str, Any]:
        """Fetch the configured page of saved interactions and score each one."""
        page_data = self.chat_session.get_save_details(
            page=self.page,
            page_size=self.page_size
        )
        passthrough_keys = ('total_interactions', 'page', 'page_size', 'total_pages')
        report: Dict[str, Any] = {key: page_data[key] for key in passthrough_keys}
        report['results'] = [
            self._process_interaction(record) for record in page_data['results']
        ]
        return report
| ``` | |
| ### app\services\skincare_scheduler.py | |
| ```python | |
| import json | |
| import logging | |
| from app.services.chathistory import ChatSession | |
| from app.services.llm_model import Model | |
| from app.services.environmental_condition import EnvironmentalData | |
| from app.services.prompts import SKIN_CARE_SCHEDULER, DEFAULT_SCHEDULE | |
class SkinCareScheduler:
    """Builds (or reuses) the user's daily skincare schedule."""

    def __init__(self, token, session_id):
        """Open the chat session and prepare environmental data for the user's city."""
        self.token = token
        self.session_id = session_id
        self.chat_session = ChatSession(token, session_id)
        self.user_city = self.chat_session.get_city() or ''
        self.environment_data = EnvironmentalData(self.user_city)

    def get_historical_data(self):
        """Retrieve the last 7 days of schedules."""
        history = []
        for record in self.chat_session.get_last_seven_days_schedules():
            history.append(record["schedule_data"])
        return history

    def createTable(self):
        """Generate and return a daily skincare schedule.

        Returns a JSON string: today's stored schedule when one exists,
        otherwise a freshly generated one (which is saved), falling back to
        ``DEFAULT_SCHEDULE`` whenever generation or validation fails.
        """
        try:
            # Reuse today's schedule when one with list data is already stored.
            todays = self.chat_session.get_today_schedule()
            if todays and isinstance(todays.get("schedule_data"), list):
                return json.dumps(todays["schedule_data"], indent=2)

            # Collect the prompt inputs.
            past_schedules = self.get_historical_data()
            skin_profile = (
                self.chat_session.get_personalized_recommendation()
                or "No specific skin conditions provided"
            )
            conditions = self.environment_data.get_environmental_data()

            prompt_text = SKIN_CARE_SCHEDULER.format(
                personalized_condition=skin_profile,
                environmental_values=json.dumps(conditions, indent=2),
                historical_data=json.dumps(past_schedules, indent=2)
            )

            schedule = Model().skinScheduler(prompt_text)

            # The model signals failure with a dict carrying an "error" key.
            model_failed = isinstance(schedule, dict) and "error" in schedule
            if model_failed:
                logging.error(f"Model error: {schedule['error']}")
                schedule = DEFAULT_SCHEDULE

            # Anything other than a list of exactly five entries is rejected.
            well_formed = isinstance(schedule, list) and len(schedule) == 5
            if not well_formed:
                logging.warning("Generated schedule invalid; using default.")
                schedule = DEFAULT_SCHEDULE

            self.chat_session.save_schedule(schedule)
            return json.dumps(schedule, indent=2)
        except Exception as e:
            logging.error(f"Schedule generation failed: {str(e)}")
            return json.dumps(DEFAULT_SCHEDULE, indent=2)
| ``` | |
| ### app\services\tools.py | |
| ```python | |
| import io | |
| import logging | |
| import os | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| from PIL import Image | |
| from app.services.vector_database_search import VectorDatabaseSearch | |
| from app.services.websearch import WebSearch | |
| from MagicConvert import MagicConvert | |
| from app.services.prompts import ( | |
| ADVICE_REPORT_SUGGESTION, | |
| URDU_ADVICE_REPORT_SUGGESTION, | |
| ) | |
| from app.services.image_classification_vit import SkinDiseaseClassifier | |
| try: | |
| from pillow_heif import register_heif_opener | |
| register_heif_opener() | |
| _HEIF_SUPPORTED = True | |
| except Exception: | |
| _HEIF_SUPPORTED = False | |
| logger = logging.getLogger(__name__) | |
def _clean_query(query: str) -> str:
    """Return ``query`` without surrounding whitespace; falsy input yields ''."""
    text = query if query else ''
    return text.strip()
# Directory that uploaded files must live in. Taken from the DERMAI_UPLOAD_DIR
# environment variable when set, otherwise "uploads" three directory levels
# above this file; resolved to an absolute path once at import time.
_UPLOADS_ROOT = Path(
    os.getenv(
        "DERMAI_UPLOAD_DIR",
        Path(__file__).resolve().parent.parent.parent / "uploads",
    )
).resolve()
# Shared document converter instance used by convert_document_to_markdown.
_magic_converter = MagicConvert()
# Lazily created classifier singleton, plus the message of the error that
# prevented its creation (both managed by _get_classifier).
_skin_classifier: Optional[SkinDiseaseClassifier] = None
_skin_classifier_error: Optional[str] = None
def get_web_search(query: str, num_results: int = 4) -> Dict[str, Any]:
    """Return up-to-date dermatology information from the public web.

    Args:
        query: Search text; surrounding whitespace is ignored.
        num_results: Requested number of results, clamped to 1..8
            (a falsy value means 4).

    Returns:
        ``{"status": "success", "results": [...], "references": [...]}`` or
        ``{"status": "error", "error_message": ...}``. Never raises for
        search failures.
    """
    query = _clean_query(query)
    if not query:
        return {"status": "error", "error_message": "Query is required."}

    try:
        limit = max(1, min(num_results or 4, 8))
        hits = WebSearch(num_results=limit).search(query) or []

        results: List[Dict[str, Any]] = [
            {
                "source_number": position,
                "title": hit.get('title') or '',
                "link": hit.get('link') or hit.get('url') or '',
                "snippet": hit.get('text') or hit.get('snippet') or '',
            }
            for position, hit in enumerate(hits, start=1)
        ]
        if not results:
            return {
                "status": "error",
                "error_message": f"No web results found for '{query}'.",
            }

        # Only results that actually carry a link become references.
        references: List[str] = [entry["link"] for entry in results if entry["link"]]
        return {
            "status": "success",
            "results": results,
            "references": references,
        }
    except Exception as exc:
        logger.exception("Web search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Web search failed: {exc}",
        }
def get_vector_search(query: str, top_k: int = 5) -> Dict[str, Any]:
    """Return dermatology knowledge from the curated vector database.

    Args:
        query: Search text; surrounding whitespace is ignored.
        top_k: Requested number of matches, clamped to 1..10
            (a falsy value means 5).

    Returns:
        ``{"status": "success", "results": [...], "references": [...]}`` or
        ``{"status": "error", "error_message": ...}``. Never raises for
        search failures.
    """
    query = _clean_query(query)
    if not query:
        return {"status": "error", "error_message": "Query is required."}

    try:
        store = VectorDatabaseSearch()
        if not store.is_available():
            return {
                "status": "error",
                "error_message": "Vector database is not available.",
            }

        limit = max(1, min(top_k or 5, 10))
        matches = store.search(query, top_k=limit)
        if not matches:
            return {
                "status": "error",
                "error_message": f"No vector results found for '{query}'.",
            }

        results: List[Dict[str, Any]] = []
        references: List[str] = []
        for position, match in enumerate(matches, start=1):
            origin = match.get('source') or 'Unknown'
            page_no = match.get('page') or 0
            results.append(
                {
                    "source_number": position,
                    "source": origin,
                    "page": page_no,
                    "content": match.get('content') or '',
                    "confidence": match.get('confidence'),
                }
            )
            # A falsy page number is left out of the human-readable label.
            references.append(f"{origin} (page {page_no})" if page_no else origin)

        return {
            "status": "success",
            "results": results,
            "references": references,
        }
    except Exception as exc:
        logger.exception("Vector search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Vector search failed: {exc}",
        }
def get_image_search(query: str, max_images: int = 3) -> Dict[str, Any]:
    """Return dermatology-relevant image URLs for the given query.

    Args:
        query: Search text; surrounding whitespace is ignored.
        max_images: Requested number of images, clamped to 1..8
            (a falsy value means 3). The same clamped limit is used both for
            the search and for capping the returned list.

    Returns:
        ``{"status": "success", "images": [...]}`` with de-duplicated URLs in
        search order, or ``{"status": "error", "error_message": ...}``.
        Never raises for search failures.
    """
    query = _clean_query(query)
    if not query:
        return {"status": "error", "error_message": "Query is required."}
    try:
        # Clamp once so that None/0/oversized values behave consistently for
        # both the searcher and the result cap below.
        limit = max(1, min(max_images or 3, 8))
        searcher = WebSearch(max_images=limit)
        images = searcher.search_images(query) or []
        unique_images = []
        seen = set()
        for url in images:
            if url and url not in seen:
                seen.add(url)
                unique_images.append(url)
            if len(unique_images) >= limit:
                break
        if not unique_images:
            return {
                "status": "error",
                "error_message": f"No images found for '{query}'.",
            }
        return {"status": "success", "images": unique_images}
    except Exception as exc:
        logger.exception("Image search failed: %s", exc)
        return {
            "status": "error",
            "error_message": f"Image search failed: {exc}",
        }
def _get_classifier() -> SkinDiseaseClassifier:
    """Return the shared classifier, creating it on first use.

    A construction failure is remembered: later calls raise ``RuntimeError``
    with the stored message instead of attempting construction again.
    """
    global _skin_classifier, _skin_classifier_error
    if _skin_classifier is None:
        if _skin_classifier_error:
            raise RuntimeError(_skin_classifier_error)
        try:
            _skin_classifier = SkinDiseaseClassifier()
        except Exception as exc:
            # Remember why construction failed, then let the caller see it.
            _skin_classifier_error = str(exc)
            raise
    return _skin_classifier
# ISO-BMFF major brands used by HEIF/HEIC and AVIF still images and sequences.
_HEIF_BRANDS = frozenset(
    (b"heic", b"heix", b"hevc", b"hevx", b"mif1", b"msf1", b"avif", b"avis")
)


def _looks_like_heif(header: bytes) -> bool:
    """Tell whether ``header`` (first bytes of a file) starts an HEIF/AVIF container.

    The leading four bytes hold the ``ftyp`` box size, which varies between
    encoders, so only the box type (bytes 4-8) and major brand (bytes 8-12)
    are inspected.
    """
    return len(header) >= 12 and header[4:8] == b"ftyp" and header[8:12] in _HEIF_BRANDS


def analyze_skin_image(
    file_path: str,
    language: Optional[str] = None,
) -> Dict[str, Any]:
    """Assess an uploaded image for dermatology analysis.

    The file must live inside the uploads directory. It is opened, converted
    to RGB and passed straight to the skin disease classifier (no separate
    skin/non-skin check is performed). When the classifier's confidence is
    below 50% the result reports the uncertainty instead of a diagnosis.

    Args:
        file_path: Absolute path, or path relative to the uploads directory.
        language: ``"urdu"`` selects Urdu messages; anything else (or
            ``None``) selects English.

    Returns:
        On success a dict with ``status``, ``is_skin``, ``diagnosis``
        (``None`` below the threshold), ``confidence``,
        ``confidence_below_threshold``, ``message``, ``advice`` and
        ``image_path`` (relative to the uploads directory). On failure a dict
        with ``status == "error"`` and ``error_message``. Never raises.
    """
    if not file_path or not str(file_path).strip():
        return {"status": "error", "error_message": "file_path is required."}
    try:
        candidate = Path(file_path)
        if not candidate.is_absolute():
            candidate = (_UPLOADS_ROOT / candidate).resolve()
        else:
            candidate = candidate.resolve()
        uploads_root = _UPLOADS_ROOT
        uploads_root.mkdir(parents=True, exist_ok=True)
        uploads_root = uploads_root.resolve()
        # Reject anything that resolves outside the uploads directory
        # (e.g. via "..", symlinks, or an unrelated absolute path).
        if uploads_root not in candidate.parents and candidate != uploads_root:
            return {
                "status": "error",
                "error_message": "Access to the requested file path is not permitted.",
            }
        if not candidate.exists() or not candidate.is_file():
            return {
                "status": "error",
                "error_message": f"Image not found at '{candidate}'.",
            }
        # Kept outside the try so the error handler can inspect whatever was read.
        image_bytes = b""
        try:
            with candidate.open("rb") as fh:
                image_bytes = fh.read()
            image_stream = io.BytesIO(image_bytes)
            pil_image = Image.open(image_stream)
            pil_image.load()
            pil_image = pil_image.convert("RGB")
        except Exception as exc:
            logger.exception("Unable to open image for analysis: %s", exc)
            if _looks_like_heif(image_bytes[:12]) and not _HEIF_SUPPORTED:
                return {
                    "status": "error",
                    "error_message": (
                        "The uploaded image appears to be in HEIC/AVIF format, which is not supported. "
                        "Please convert the photo to JPG or PNG and try again."
                    ),
                }
            return {
                "status": "error",
                "error_message": f"Unable to open the image: {exc}",
            }
        user_language = (language or "english").strip().lower()
        # Skip skin-vs-non-skin classification and directly proceed to disease classification
        try:
            classifier = _get_classifier()
        except Exception as exc:
            logger.error("Skin classifier unavailable: %s", exc)
            return {
                "status": "error",
                "error_message": (
                    "Skin analysis is temporarily unavailable. "
                    "Ensure the classifier weights are accessible (set SKIN_CLASSIFIER_WEIGHTS to a local file "
                    "or configure HUGGINGFACE_TOKEN with network access) and try again."
                ),
                "details": str(exc),
            }
        disease_name, confidence = classifier.predict(pil_image, 5)
        confidence_value = float(confidence)
        # The classifier reports confidence in percent; below 50 no diagnosis is given.
        below_threshold = confidence_value < 50.0
        if user_language == "urdu":
            unable_message = (
                "معذرت، میں اس وقت جلد کی بیماری کی درست شناخت نہیں کر پا رہا۔ "
                "براہ کرم بہتر روشنی میں ایک قریب کی تصویر اپ لوڈ کریں یا اپنی تشخیص کے لیے ڈاکٹر سے رجوع کریں۔"
            )
            diagnosis_message = (
                f"مجھے لگتا ہے کہ یہ {disease_name} ہے اور میری اعتماد کی سطح {confidence_value:.2f}% ہے۔ "
                "براہ کرم حتمی تشخیص کے لیے ماہر ڈرماٹولوجسٹ سے مشورہ کریں۔"
            )
        else:
            unable_message = (
                "I’m not confident enough to identify a condition from this photo. "
                "Please upload a clearer close-up image with good lighting, or consult a dermatologist for an in-person diagnosis."
            )
            diagnosis_message = (
                f"I suspect this may be {disease_name} with a confidence of {confidence_value:.2f}%. "
                "Please consult a dermatologist for a professional evaluation and treatment plan."
            )
        message = unable_message if below_threshold else diagnosis_message
        if user_language == "urdu":
            advice_lines = ["## تصویری تجزیہ کی بنیاد پر"]
            if not below_threshold:
                advice_lines.append(
                    f"- مشتبہ بیماری: {disease_name} (اعتماد {confidence_value:.2f}%)."
                )
                advice_lines.append(
                    "- یہ نتیجہ تخمینی ہے، حتمی تشخیص کے لئے ماہر امراض جلد سے رجوع کریں۔"
                )
            else:
                advice_lines.append(
                    "- ماڈل کا اعتماد 50٪ سے کم ہے، اس لئے قابل اعتماد تشخیص ممکن نہیں۔"
                )
                advice_lines.append(
                    "- متاثرہ جلد کی واضح اور روشنی میں تصویر لیں اور فلٹرز سے پرہیز کریں۔"
                )
            advice_lines.append(
                "- اگر علامات بگڑتی یا پھیلتی ہیں تو فوری طبی معائنہ کروائیں۔"
            )
        else:
            advice_lines = ["## Based on the Image Analysis"]
            if not below_threshold:
                advice_lines.append(
                    f"- Suspected condition: {disease_name} (confidence {confidence_value:.2f}%)."
                )
                advice_lines.append(
                    "- This prediction is probabilistic; please obtain confirmation from a dermatologist."
                )
            else:
                advice_lines.append(
                    "- The model's confidence is below 50%, so no reliable diagnosis is available."
                )
                advice_lines.append(
                    "- Capture well-lit close-up photos of the affected area and avoid heavy filters."
                )
            advice_lines.append(
                "- Seek urgent in-person care if symptoms worsen or spread rapidly."
            )
        advice = "\n".join(advice_lines)
        return {
            "status": "success",
            "is_skin": True,
            "diagnosis": None if below_threshold else disease_name,
            "confidence": confidence_value,
            "confidence_below_threshold": below_threshold,
            "message": message,
            "advice": advice,
            "image_path": str(candidate.relative_to(uploads_root)).replace("\\", "/"),
        }
    except Exception as exc:
        logger.exception("Unexpected error during image analysis: %s", exc)
        return {
            "status": "error",
            "error_message": f"Unexpected error: {exc}",
        }
def convert_document_to_markdown(
    file_path: str,
    file_extension: Optional[str] = None,
) -> Dict[str, Any]:
    """Convert an uploaded document into Markdown text for downstream analysis.

    Args:
        file_path: Path to the uploaded file, either absolute or relative to
            the backend uploads directory. The resolved path must lie inside
            that directory.
        file_extension: Optional hint such as ".pdf" or "docx" used when the
            extension cannot be taken from the filename; a missing leading
            dot is added.

    Returns:
        On success a dictionary with ``text_content`` (the Markdown),
        ``title``, ``character_count`` and ``source_path``. On failure a
        dictionary with ``status == "error"`` and a descriptive
        ``error_message``; the function does not raise.
    """
    path_missing = not file_path or not str(file_path).strip()
    if path_missing:
        return {"status": "error", "error_message": "file_path is required."}

    try:
        requested = Path(file_path)
        if requested.is_absolute():
            target = requested.resolve()
        else:
            target = (_UPLOADS_ROOT / requested).resolve()

        uploads_root = _UPLOADS_ROOT
        uploads_root.mkdir(parents=True, exist_ok=True)
        uploads_root = uploads_root.resolve()

        # Only paths at or below the uploads directory may be read.
        inside_uploads = target == uploads_root or uploads_root in target.parents
        if not inside_uploads:
            return {
                "status": "error",
                "error_message": "Access to the requested file path is not permitted.",
            }

        if not (target.exists() and target.is_file()):
            return {
                "status": "error",
                "error_message": f"File not found at '{target}'.",
            }

        # Prefer the caller's hint, fall back to the file's own suffix.
        suffix_hint = file_extension or target.suffix
        if suffix_hint and not suffix_hint.startswith('.'):
            suffix_hint = f".{suffix_hint}"

        converted = _magic_converter.magic(
            str(target),
            file_extension=suffix_hint,
        )
        markdown = converted.text_content if converted else ""
        return {
            "status": "success",
            "text_content": markdown,
            "title": getattr(converted, "title", None),
            "character_count": len(markdown),
            "source_path": str(target),
        }
    except Exception as exc:
        logger.exception("Unexpected error during document conversion: %s", exc)
        return {
            "status": "error",
            "error_message": f"Unexpected error: {exc}",
        }
| ``` | |
| ### app\services\vector_database_search.py | |
| ```python | |
| import os | |
| import uuid | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
| from langchain_qdrant import Qdrant | |
| from qdrant_client import QdrantClient, models | |
| from qdrant_client.http.exceptions import UnexpectedResponse | |
| from dotenv import load_dotenv | |
| import logging | |
| load_dotenv() | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Expose the Gemini key under GOOGLE_API_KEY (presumably the name the Google
# embeddings client looks up — confirm). os.environ only accepts strings, so
# an unset GEMINI_API_KEY must not be assigned or the import itself would fail.
_gemini_api_key = os.getenv("GEMINI_API_KEY")
if _gemini_api_key:
    os.environ["GOOGLE_API_KEY"] = _gemini_api_key
else:
    logger.warning("GEMINI_API_KEY is not set; GOOGLE_API_KEY was left unchanged.")
# Qdrant connection settings; the collection name defaults to "dermatology_docs".
QDRANT_URL = os.getenv("QDRANT_URL")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
QDRANT_COLLECTION_NAME = os.getenv("QDRANT_COLLECTION_NAME", "dermatology_docs")
| class VectorDatabaseSearch: | |
def __init__(self, collection_name=QDRANT_COLLECTION_NAME):
    """Create the embeddings client and try to connect to Qdrant.

    ``client`` and ``vectorstore`` start as ``None`` and ``is_initialized``
    as ``False``; ``_initialize_connection`` is then invoked to set them up.
    """
    self.collection_name = collection_name
    self.embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    # Connection state starts out "disconnected".
    self.client, self.vectorstore = None, None
    self.is_initialized = False
    # Initialize connection
    self._initialize_connection()
def _initialize_connection(self):
    """Initialize Qdrant connection with proper error handling.

    Without credentials the instance stays uninitialized. Otherwise a client
    is created, the connection is probed, the collection is prepared and the
    vector store is built. Any exception is logged and leaves
    ``is_initialized`` set to ``False``.
    """
    try:
        have_credentials = bool(QDRANT_URL) and bool(QDRANT_API_KEY)
        if not have_credentials:
            logger.warning("Qdrant credentials not found. Vector search will be disabled.")
            self.is_initialized = False
            return

        self.client = QdrantClient(
            url=QDRANT_URL,
            api_key=QDRANT_API_KEY,
            timeout=30,
        )
        # Listing collections doubles as a connectivity check.
        self.client.get_collections()
        self._initialize_collection()

        store_options = {
            "client": self.client,
            "collection_name": self.collection_name,
            "embeddings": self.embeddings,
        }
        self.vectorstore = Qdrant(**store_options)
        self.is_initialized = True
        logger.info(f"Successfully connected to Qdrant collection: {self.collection_name}")
    except Exception as e:
        # UnexpectedResponse gets its own message; everything else is generic.
        if isinstance(e, UnexpectedResponse):
            logger.error(f"Authentication error with Qdrant: {e}")
        else:
            logger.error(f"Error initializing Qdrant connection: {e}")
        self.is_initialized = False
def _initialize_collection(self):
    """Initialize Qdrant collection if it doesn't exist.

    Does nothing when no client is set. A missing collection is created with
    768-dimensional cosine vectors; an existing one only has its point count
    logged.

    Raises:
        Exception: any error from the Qdrant client is logged, the instance
            is marked uninitialized, and the error is re-raised so that a
            caller cannot go on to mark the connection as ready.
    """
    if not self.client:
        return
    try:
        collections = self.client.get_collections()
        collection_exists = any(c.name == self.collection_name for c in collections.collections)
        if not collection_exists:
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=models.VectorParams(
                    size=768,
                    distance=models.Distance.COSINE
                )
            )
            logger.info(f"Created new collection: {self.collection_name}")
        else:
            # Check if collection has data
            collection_info = self.client.get_collection(self.collection_name)
            logger.info(f"Collection {self.collection_name} exists with {collection_info.points_count} points")
    except Exception as e:
        logger.error(f"Error initializing collection: {e}")
        self.is_initialized = False
        # Swallowing the error here would let the caller overwrite the flag
        # with True right afterwards; propagate instead.
        raise
def add_pdf(self, pdf_path):
    """Add PDF to vector database.

    The PDF is loaded, split into 1000-character chunks with 200 characters
    of overlap, tagged with source/page/id metadata and stored.

    Returns:
        ``True`` on success; ``False`` when the store is not initialized or
        any step fails (the error is logged).
    """
    if not self.is_initialized:
        logger.error("Vector database not initialized. Cannot add PDF.")
        return False
    try:
        pages = PyPDFLoader(pdf_path).load()
        chunker = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        chunks = chunker.split_documents(pages)

        # The file name without its extension identifies the source document.
        file_name = os.path.basename(pdf_path)
        book_name, _extension = os.path.splitext(file_name)
        logger.info(f"Processing {book_name} with {len(chunks)} chunks")

        for chunk in chunks:
            page_number = chunk.metadata.get('page', 1)
            chunk.metadata = {
                "source": book_name,
                "page": page_number,
                "id": str(uuid.uuid4()),
            }

        self.vectorstore.add_documents(chunks)
        logger.info(f"Successfully added {len(chunks)} chunks from {book_name}")
        return True
    except Exception as e:
        logger.error(f"Error adding PDF: {e}")
        return False
def search(self, query, top_k=5):
    """Run a similarity search and return formatted hits.

    Args:
        query: Text to search for.
        top_k: Maximum number of hits to request from the vector store.

    Returns:
        A list of dicts with ``source``, ``page``, ``content`` (first 500
        characters of the chunk) and ``confidence`` (0-100, two decimals).
        An empty list is returned when the database is not initialized, the
        collection is empty, or the search raises.
    """
    if not self.is_initialized:
        logger.warning("Vector database not initialized. Returning empty results.")
        return []
    try:
        # Skip the embedding round-trip entirely when there is nothing to search.
        collection_info = self.client.get_collection(self.collection_name)
        if collection_info.points_count == 0:
            logger.warning(f"Collection {self.collection_name} is empty. No documents to search.")
            return []

        results = self.vectorstore.similarity_search_with_score(query, k=top_k)
        formatted = []
        for doc, score in results:
            # The collection is created with cosine distance, for which Qdrant
            # reports a similarity score (higher means closer). Use it directly
            # as the confidence and clamp to 0-100 so that negative
            # similarities do not produce negative percentages.
            confidence = max(0.0, min(1.0, score)) * 100
            formatted.append({
                "source": doc.metadata.get('source', 'Unknown'),
                "page": doc.metadata.get('page', 0),
                "content": doc.page_content[:500],
                "confidence": round(confidence, 2)
            })
        logger.info(f"Found {len(formatted)} results for query: {query[:50]}...")
        return formatted
    except Exception as e:
        logger.error(f"Search error: {e}")
        return []
def get_book_info(self):
    """List the distinct ``source`` values found in the collection.

    Only the first scroll page (at most 1000 points) is inspected. Returns an
    empty list when the database is not initialized, the collection is
    missing or empty, or any call raises.
    """
    if not self.is_initialized:
        logger.warning("Vector database not initialized.")
        return []

    try:
        name = self.collection_name

        # Bail out early when the collection has not been created yet.
        listing = self.client.get_collections()
        if not any(entry.name == name for entry in listing.collections):
            logger.info(f"Collection {name} does not exist yet")
            return []

        info = self.client.get_collection(name)
        if info.points_count == 0:
            logger.info("Collection is empty")
            return []

        # Payload only: vectors are not needed to read the source labels.
        page = self.client.scroll(
            collection_name=name,
            limit=min(1000, info.points_count),
            with_payload=True,
            with_vectors=False,
        )
        records = page[0]

        sources = set()
        for record in records:
            payload = getattr(record, 'payload', None)
            if not payload:
                continue
            # The source may be nested under "metadata" or stored at top level.
            if 'metadata' in payload and 'source' in payload['metadata']:
                sources.add(payload['metadata']['source'])
            elif 'source' in payload:
                sources.add(payload['source'])

        logger.info(f"Found {len(sources)} unique books in collection")
        return list(sources)
    except Exception as e:
        logger.error(f"Error retrieving book info: {e}")
        return []
def is_available(self):
    """Report whether the vector database is initialized and holds data.

    Returns:
        True when the instance is initialized and the collection reports a
        positive point count; False otherwise, including when the lookup
        raises an ordinary exception.
    """
    if not self.is_initialized:
        return False
    try:
        collection_info = self.client.get_collection(self.collection_name)
        return collection_info.points_count > 0
    except Exception:
        # Best-effort probe: any lookup failure means "not available".
        # KeyboardInterrupt / SystemExit are deliberately not swallowed.
        return False
| ``` | |
| ### app\services\websearch.py | |
| ```python | |
| import re | |
| import warnings | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import urllib.parse | |
| import time | |
| import random | |
| from urllib.parse import urlparse, parse_qs | |
| warnings.simplefilter('ignore', requests.packages.urllib3.exceptions.InsecureRequestWarning) | |
class WebSearch:
    """Scrape DuckDuckGo results, fetch page text and collect image URLs.

    Attributes:
        num_results: Maximum number of pages returned by ``search``.
        max_chars_per_page: Maximum characters of text kept per page.
        max_images: Maximum number of URLs returned by ``search_images``.
        headers: HTTP headers sent with every request.
        content_domains: Domains whose links are ranked first.
        blocked_domains: Substrings that mark a URL as an ad/tracking link.
    """

    # Fragments of query-parameter NAMES that mark ad or tracking links.
    _AD_PARAM_MARKERS = ("click", "track", "clkid")
    # Absolute image URLs embedded in inline scripts.
    _SCRIPT_IMAGE_RE = re.compile(r'https?://[^\s<>"\']+?(?:\.(?:jpg|jpeg|png|gif|bmp|webp))')

    def __init__(self, num_results=4, max_chars_per_page=6000, max_images=10):
        self.num_results = num_results
        self.max_chars_per_page = max_chars_per_page
        self.reference = []
        self.results = []
        self.max_images = max_images
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'DNT': '1',
            'Connection': 'keep-alive',
        }
        # Common domains for direct content
        self.content_domains = [
            "wikipedia.org", "webmd.com", "mayoclinic.org", "healthline.com", "nih.gov",
            "clevelandclinic.org", "nhs.uk", "cdc.gov", "medlineplus.gov", "hopkinsmedicine.org"
        ]
        # Ad and tracking domains to filter out
        self.blocked_domains = [
            "ad.doubleclick.net", "googleadservices.com", "bing.com/aclick", "duckduckgo.com/y.js",
            "amazon.com/s", "ads.google.com", "analytics", "tracker", "pixel", "adservice"
        ]

    def is_valid_url(self, url):
        """Check if URL is valid and not an ad/tracking URL.

        Returns False for empty/short URLs, URLs without scheme or host,
        URLs matching ``blocked_domains``, URLs carrying ad/tracking query
        parameters, redirect-style paths, and URLs longer than 500 characters.
        """
        if not url or len(url) < 10:
            return False
        try:
            parsed = urlparse(url)
            # Check if URL has a valid scheme and netloc
            if not all([parsed.scheme, parsed.netloc]):
                return False
            domain = parsed.netloc.lower()
            path = parsed.path.lower()
            # Some blocked entries include a path ("bing.com/aclick"), so they
            # have to be matched against host and path joined together.
            location = domain + path
            if any(blocked in location for blocked in self.blocked_domains):
                return False
            # Inspect parameter NAMES rather than the raw query string, so that
            # ordinary values such as "q=headache" are not mistaken for ads.
            for name in parse_qs(parsed.query, keep_blank_values=True):
                key = name.lower()
                if key == "ad" or key.startswith(("ad_", "ad-")):
                    return False
                if any(marker in key for marker in self._AD_PARAM_MARKERS):
                    return False
            # Extra check for redirect URLs
            if "redirect" in path or "goto" in path or "go.php" in path:
                return False
            # Reject extremely long URLs (often tracking)
            if len(url) > 500:
                return False
            return True
        except Exception:
            return False

    def clean_url(self, url):
        """Return ``url`` without known tracking parameters and without fragment.

        On any parsing error the original URL is returned unchanged.
        """
        try:
            parsed = urlparse(url)
            # List of known tracking parameters to remove
            tracking_params = [
                'utm_', 'ref_', 'ref=', 'refid', 'fbclid', 'gclid', 'msclkid', 'dclid',
                'zanpid', 'icid', 'igshid', 'mc_eid', '_hsenc', 'mkt_tok', 'yclid'
            ]
            query_params = parse_qs(parsed.query)
            filtered_params = {
                k: v for k, v in query_params.items()
                if not any(tracker in k.lower() for tracker in tracking_params)
            }
            clean_query = urllib.parse.urlencode(filtered_params, doseq=True) if filtered_params else ""
            return urllib.parse.urlunparse((
                parsed.scheme,
                parsed.netloc,
                parsed.path,
                parsed.params,
                clean_query,
                ""  # Remove fragment
            ))
        except Exception:
            # If any error occurs, return the original URL
            return url

    def extract_real_url_from_redirect(self, url):
        """Extract the actual target from a DuckDuckGo or Bing redirect URL.

        Handles DuckDuckGo links carrying the target in ``uddg`` or ``u3``
        (including a Bing click link nested inside), and plain Bing click
        links carrying it in ``u``. Any other URL is returned unchanged.
        """
        try:
            parsed = urlparse(url)
            params = parse_qs(parsed.query)
            if "duckduckgo.com" in parsed.netloc:
                target = next(
                    (params[key][0] for key in ("uddg", "u3") if params.get(key)),
                    None,
                )
                if target:
                    # Handle nested redirects (like Bing redirects inside DuckDuckGo)
                    if "bing.com/aclick" in target:
                        nested = parse_qs(urlparse(target).query).get("u")
                        if nested:
                            return self.clean_url(urllib.parse.unquote(nested[0]))
                    return self.clean_url(target)
            # Handle Bing redirects
            if "bing.com/aclick" in url:
                direct = params.get("u")
                if direct:
                    return self.clean_url(urllib.parse.unquote(direct[0]))
            return url
        except Exception:
            return url

    def extract_text_from_webpage(self, html_content):
        """Return the visible text of an HTML document with collapsed whitespace."""
        soup = BeautifulSoup(html_content, "html.parser")
        # Remove non-content elements
        for tag in soup(["script", "style", "header", "footer", "nav", "form", "svg",
                         "aside", "iframe", "noscript", "img", "figure", "button"]):
            tag.extract()
        text = ' '.join(soup.stripped_strings)
        return re.sub(r'\s+', ' ', text).strip()

    def _select_links(self, soup):
        """Pick up to ``num_results`` valid result links, one per host, content domains first."""
        # Getting more results than needed to account for filtering
        candidates = soup.find_all('div', class_='result')[:self.num_results * 2]
        links = []
        for result in candidates:
            link_tag = result.find('a', class_='result__a')
            if not link_tag or not link_tag.get('href'):
                continue
            clean_link = self.extract_real_url_from_redirect(link_tag['href'])
            if self.is_valid_url(clean_link):
                links.append(clean_link)

        def is_content_link(link):
            return any(domain in link for domain in self.content_domains)

        ordered = ([link for link in links if is_content_link(link)]
                   + [link for link in links if not is_content_link(link)])

        unique_links = []
        seen_domains = set()
        for link in ordered:
            if len(unique_links) >= self.num_results:
                break
            domain = urlparse(link).netloc
            if domain not in seen_domains:
                unique_links.append(link)
                seen_domains.add(domain)
        return unique_links

    def _fetch_page(self, session, link):
        """Download one page; return its link, title and trimmed text, or None on failure."""
        try:
            # Random delay to avoid being blocked
            time.sleep(random.uniform(0.5, 1.5))
            # NOTE(review): verify=False turns off TLS certificate validation for
            # third-party pages; confirm this is intended before production use.
            page_response = session.get(link, timeout=10, verify=False)
            # Only process HTML content
            if 'text/html' not in page_response.headers.get('Content-Type', ''):
                return None
            page_soup = BeautifulSoup(page_response.text, 'lxml')
            for tag in page_soup(['script', 'style', 'header', 'footer',
                                  'nav', 'form', 'iframe', 'noscript']):
                tag.decompose()
            text = ' '.join(page_soup.stripped_strings)
            text = re.sub(r'\s+', ' ', text).strip()
            title_tag = page_soup.title
            # An empty <title> has no string; fall back instead of returning None.
            title = title_tag.string if title_tag and title_tag.string else "Untitled Page"
            return {
                'link': link,
                'title': title,
                'text': text[:self.max_chars_per_page]
            }
        except Exception as e:
            print(f"Error fetching {link}: {str(e)}")
            return None

    def search(self, query):
        """Search DuckDuckGo and return scraped text for the top results.

        Returns:
            A list of ``{'link', 'title', 'text'}`` dicts in ranking order
            (content domains first). Pages that fail to download are omitted.
            An empty list is returned when nothing usable is found or the
            search request itself fails.
        """
        encoded_query = urllib.parse.quote(query)
        url = f'https://html.duckduckgo.com/html/?q={encoded_query}'
        try:
            with requests.Session() as session:
                session.headers.update(self.headers)
                response = session.get(url, timeout=10)
                soup = BeautifulSoup(response.text, 'html.parser')
                unique_links = self._select_links(soup)
                if not unique_links:
                    # ThreadPoolExecutor rejects max_workers=0; nothing to fetch anyway.
                    return []

                from concurrent.futures import ThreadPoolExecutor

                def fetch(link):
                    return self._fetch_page(session, link)

                with ThreadPoolExecutor(max_workers=min(len(unique_links), 4)) as executor:
                    # map() yields results in submission order, preserving the ranking.
                    return [page for page in executor.map(fetch, unique_links) if page]
        except Exception as e:
            print(f"Search error: {str(e)}")
            return []

    def search_images(self, query):
        """Collect up to ``max_images`` distinct image URLs for ``query``.

        Queries Google Images, Bing Images and DuckDuckGo in turn; a source
        that fails is reported on stdout and skipped.
        """
        images = []
        encoded_query = urllib.parse.quote(query)
        headers = {**self.headers, 'Upgrade-Insecure-Requests': '1'}
        # Try multiple sources for better results
        image_sources = [
            f"https://www.google.com/search?q={encoded_query}&tbm=isch&hl=en",
            f"https://www.bing.com/images/search?q={encoded_query}&form=HDRSC2&first=1",
            f"https://duckduckgo.com/?q={encoded_query}&iar=images&iax=images&ia=images"
        ]
        for source_url in image_sources:
            try:
                time.sleep(random.uniform(0.5, 1.0))  # Polite delay
                # NOTE(review): verify=False disables TLS certificate validation.
                response = requests.get(source_url, headers=headers, verify=False, timeout=10)
                soup = BeautifulSoup(response.text, 'html.parser')
                # Extract image URLs from img tags
                for img in soup.find_all('img'):
                    src = img.get('src', '')
                    if src and src.startswith('http') and self.is_image_url(src):
                        cleaned_url = self.clean_url(src)
                        if self.is_valid_image(cleaned_url):
                            images.append(cleaned_url)
                # Extract image URLs from scripts (useful for Google Images)
                for script in soup.find_all('script'):
                    if script.string:
                        for url in self._SCRIPT_IMAGE_RE.findall(script.string):
                            cleaned_url = self.clean_url(url)
                            if self.is_valid_image(cleaned_url):
                                images.append(cleaned_url)
            except Exception as e:
                print(f"Error searching images at {source_url}: {str(e)}")
                continue
        # Remove duplicates while preserving order; every entry has already
        # passed is_valid_image, so no second filtering pass is needed.
        unique_images = list(dict.fromkeys(images))
        return unique_images[:self.max_images]

    def is_image_url(self, url):
        """Check if URL points to an image file"""
        image_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp')
        return url.lower().endswith(image_extensions)

    def is_valid_image(self, url):
        """Reject icon/pixel-style file names, suspicious hosts and very short URLs."""
        try:
            # Reject tiny images (often icons) and tracking pixels
            if re.search(r'(?:icon|pixel|tracker|thumb|logo|button)\d*\.(?:jpg|png|gif)', url.lower()):
                return False
            # Avoid suspicious domains for images
            parsed = urlparse(url)
            if any(bad in parsed.netloc.lower() for bad in ["tracker", "pixel", "counter", "ad."]):
                return False
            # Avoid very short URLs (likely not valid images)
            if len(url) < 30:
                return False
            return True
        except Exception:
            return False
| ``` | |
| ### app\services\wheel.py | |
| ```python | |
| from app.services.chathistory import ChatSession | |
| from app.services.environmental_condition import EnvironmentalData | |
def map_air_quality_index(aqi):
    """Translate an AQI number into a display label, the raw value and a colour.

    Bands are inclusive upper bounds (50, 100, 150, 200); anything above the
    last bound falls into the final category.
    """
    bands = (
        (50, "Good", "#00C853"),
        (100, "Moderate", "#FFB74D"),
        (150, "Unhealthy Tolerate", "#FF7043"),
        (200, "Unhealthy", "#E53935"),
    )
    for upper_bound, label, colour in bands:
        if aqi <= upper_bound:
            return {"displayValue": label, "value": aqi, "color": colour}
    return {"displayValue": "Very Unhealthy", "value": aqi, "color": "#8E24AA"}
def map_pollution_level(aqi):
    """Convert an AQI number into a pollution level of 20, 40, 60, 80 or 100.

    Uses inclusive upper bounds of 50, 100, 150 and 200; values above 200
    yield 100.
    """
    for upper_bound, level in ((50, 20), (100, 40), (150, 60), (200, 80)):
        if aqi <= upper_bound:
            return level
    return 100
class CityNotProvidedError(Exception):
    """Raised when a session has no city to look up environmental data for."""
class EnvironmentalConditions:
    """Builds the list of environmental readings for a chat session's city."""

    def __init__(self, session_id):
        """Look up the session's city and prepare the environmental data source.

        Raises:
            CityNotProvidedError: when the chat session has no city.
        """
        self.session_id = session_id
        self.chat_session = ChatSession(session_id, "session_id")
        self.user_city = self.chat_session.get_city()
        if not self.user_city:
            raise CityNotProvidedError("City information is required but not provided")
        self.city = self.user_city
        self.environment_data = EnvironmentalData(self.city)

    def get_conditon(self):
        """Return six readings (humidity, UV, pollution, air quality, wind, temperature).

        Numeric readings carry ``label``, ``value``, ``color``, ``icon`` and
        ``type``; the air-quality entry is categorical and additionally
        carries ``displayValue``.
        """
        readings = self.environment_data.get_environmental_data()

        def numeric(label, value, color, icon):
            return {
                "label": label,
                "value": value,
                "color": color,
                "icon": icon,
                "type": "numeric",
            }

        # Readings arrive as strings with unit suffixes; go through float so
        # decimal values parse before truncating to int.
        humidity = numeric(
            "Humidity", int(float(readings['Humidity'].strip(' %'))), "#4FC3F7", "FaTint"
        )
        uv_rays = numeric("UV Rays", readings['UV_Index'] * 10, "#FFB74D", "FaSun")
        pollution = numeric(
            "Pollution", map_pollution_level(readings['Air Quality Index']), "#F06292", "FaLeaf"
        )
        air_quality = {
            "label": "Air Quality",
            **map_air_quality_index(readings['Air Quality Index']),
            "icon": "FaCloud",
            "type": "categorical",
        }
        wind = numeric(
            "Wind", float(readings['Wind Speed'].strip(' m/s')) * 10, "#9575CD", "FaWind"
        )
        temperature = numeric(
            "Temperature",
            int(float(readings['Temperature'].strip(' °C'))),
            "#FF7043",
            "FaThermometerHalf",
        )
        return [humidity, uv_rays, pollution, air_quality, wind, temperature]
| ``` | |
| ### app.py | |
| ```python | |
| import uvicorn | |
| from app.main import app | |
# Script entry point: start uvicorn for the "app.main:app" import string,
# bound to 0.0.0.0 on port 5000 with reload=True.
if __name__ == "__main__":
    uvicorn.run("app.main:app", host="0.0.0.0", port=5000, reload=True)
| ``` | |
| ### docker-compose.yml | |
| ```yaml | |
| version: '3' | |
| services: | |
| api: | |
| build: . | |
| ports: | |
| - "5000:5000" | |
| volumes: | |
| - ./temp:/app/temp | |
| - ./uploads:/app/uploads | |
| env_file: | |
| - .env | |
| restart: unless-stopped | |
| ``` | |
| ### document_code.py | |
| ```python | |
| import os | |
| from pathlib import Path | |
def generate_tree(path, prefix=""):
    """Build the lines of a directory tree rooted at ``path``.

    Args:
        path: Directory to list (a ``pathlib.Path``).
        prefix: Indentation carried down from parent levels; callers
            normally leave it at the default.

    Returns:
        A list of strings, one per entry, with directories listed before
        files and ignored entries omitted.
    """
    items = []
    try:
        # Directories first (is_file() is False for them), then by name.
        entries = sorted(path.iterdir(), key=lambda x: (x.is_file(), x.name))
        # Filter out ignored entries first so "is last" reflects what is shown.
        filtered_entries = [e for e in entries if not should_ignore(e)]
        last_index = len(filtered_entries) - 1
        for i, entry in enumerate(filtered_entries):
            is_last = i == last_index
            connector = "└── " if is_last else "├── "
            items.append(f"{prefix}{connector}{entry.name}")
            if entry.is_dir():
                # The continuation must be as wide as the 4-column connector,
                # otherwise nested levels do not line up under their parent.
                next_prefix = prefix + ("    " if is_last else "│   ")
                items.extend(generate_tree(entry, next_prefix))
    except PermissionError:
        # Best effort: an unreadable directory is listed without children.
        pass
    return items
def should_ignore(path):
    """Check if a file or folder should be left out of the documentation.

    A path is ignored when it is, or lives inside, a tooling directory
    (virtualenvs, ``__pycache__``, ``.git``, ``node_modules``, IDE folders),
    when its name is hidden (starts with a dot), or when it is a generated
    documentation file, lock file or compiled artifact.

    Args:
        path: A ``pathlib`` path.

    Returns:
        True when the path should be skipped, otherwise False.
    """
    ignored_dirs = {'.venv', 'venv', '__pycache__', '.git', 'node_modules', '.idea', '.vscode'}
    # Match any component, not just the final name: recursive listings hand in
    # files nested inside these directories as well.
    if not ignored_dirs.isdisjoint(path.parts):
        return True
    # Ignore all hidden files/folders (starting with .)
    if path.name.startswith('.'):
        return True
    # Ignore specific files
    ignore_files = {
        'CODE_DOCUMENTATION.md', 'CODE_DOCUMENTATION.ipynb',
        'CODE_DOCUMENTATION.html', 'CODE_DOCUMENTATION.pdf',
        'CODE_DOCUMENTATION.docx', 'CODE_DOCUMENTATION.txt',
        'CODE_DOCUMENTATION.csv', 'CODE_DOCUMENTATION.xlsx',
        'CODE_DOCUMENTATION.pptx', 'CODE_DOCUMENTATION.ods',
        'CODE_DOCUMENTATION.odp', 'CODE_DOCUMENTATION.odt',
        'uv.lock', 'poetry.lock', 'Pipfile.lock',
        '.DS_Store'
    }
    # Ignore by file extension
    ignore_extensions = {'.pyc', '.pyo', '.pyd', '.so', '.egg-info'}
    return (path.name in ignore_files or
            path.suffix in ignore_extensions or
            path.name.endswith('.egg-info'))
def get_code_files(directory):
    """Collect, in sorted order, every non-ignored source file under ``directory``.

    A file qualifies when it is not a directory, is not rejected by
    ``should_ignore`` and has one of the recognised code/config extensions
    (compared case-insensitively).
    """
    code_extensions = {'.py', '.js', '.ts', '.html', '.css', '.sql', '.yaml', '.yml', '.json', '.toml', '.cfg', '.ini'}
    selected = [
        candidate
        for candidate in directory.rglob("*")
        if not candidate.is_dir()
        and not should_ignore(candidate)
        and candidate.suffix.lower() in code_extensions
    ]
    selected.sort()
    return selected
def main():
    """Write CODE_DOCUMENTATION.md for the current working directory.

    The document contains the directory tree followed by the contents of
    every recognised code file, each in a fenced block. Files that cannot be
    read are listed with the error instead of their contents. Success or
    failure of the final write is reported on stdout.
    """
    current_dir = Path.cwd()
    output_file = current_dir / "CODE_DOCUMENTATION.md"

    # Fence language per extension; unlisted extensions are used as-is.
    fence_languages = {
        'py': 'python',
        'js': 'javascript',
        'ts': 'javascript',
        'html': 'html',
        'css': 'css',
        'sql': 'sql',
        'yaml': 'yaml',
        'yml': 'yaml',
        'json': 'json',
    }

    # Collect fragments and join once instead of repeated string concatenation.
    parts = [
        f"# {current_dir.name}\n\n",
        f"Generated on: {current_dir}\n\n",
        "## Project Structure\n\n```\n",
        f"{current_dir.name}/\n",
    ]
    parts.extend(f"{item}\n" for item in generate_tree(current_dir))
    parts.append("```\n\n")

    code_files = get_code_files(current_dir)
    if code_files:
        parts.append("## Source Code\n\n")
        for file_path in code_files:
            # Resolve the heading before reading so the error branch below
            # always refers to the file that actually failed.
            rel_path = file_path.relative_to(current_dir)
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            except Exception as e:
                parts.append(f"### {rel_path}\n\n")
                parts.append(f"*Could not read file: {str(e)}*\n\n")
                continue
            file_extension = file_path.suffix.lstrip('.')
            lang = fence_languages.get(file_extension, file_extension)
            parts.append(f"### {rel_path}\n\n")
            parts.append(f"```{lang}\n{content}\n```\n\n")
    else:
        parts.append("## Source Code\n\n*No code files found.*\n\n")

    markdown = "".join(parts)

    # Write output
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(markdown)
        print(f"✅ Documentation generated successfully: {output_file}")
        print(f"📁 Total files documented: {len(code_files)}")
    except Exception as e:
        print(f"❌ Error writing documentation: {str(e)}")
| if __name__ == "__main__": | |
| main() | |
| ``` | |
| ### pyproject.toml | |
| ```toml | |
| [project] | |
| name = "derm_ai" | |
| version = "0.1.0" | |
| description = "This is derm_ai backend" | |
| authors = [ | |
| { name = "Muhammad Noman", email = "muhammadnoman76@gmail.com" } | |
| ] | |
| dependencies = [ | |
| "absolufy-imports==0.3.1", | |
| "aiohappyeyeballs==2.6.1", | |
| "aiohttp==3.12.15", | |
| "aiosignal==1.4.0", | |
| "alembic==1.16.5", | |
| "annotated-types==0.7.0", | |
| "anyio==4.10.0", | |
| "attrs==25.3.0", | |
| "Authlib==1.6.3", | |
| "beautifulsoup4==4.13.4", | |
| "Brotli==1.1.0", | |
| "cachetools==5.5.2", | |
| "certifi==2025.8.3", | |
| "cffi==2.0.0", | |
| "charset-normalizer==3.4.1", | |
| "click==8.2.1", | |
| "cloudpickle==3.1.1", | |
| "cobble==0.1.4", | |
| "colorama==0.4.6", | |
| "cryptography==45.0.7", | |
| "dataclasses-json==0.6.7", | |
| "dnspython==2.8.0", | |
| "docstring_parser==0.17.0", | |
| "email-validator==2.3.0", | |
| "fastapi==0.115.12", | |
| "filelock==3.19.1", | |
| "filetype==1.2.0", | |
| "frozenlist==1.7.0", | |
| "fsspec==2025.9.0", | |
| "g4f==0.5.2.1", | |
| "google-adk==1.14.0", | |
| "google-ai-generativelanguage==0.6.18", | |
| "google-api-core==2.25.1", | |
| "google-api-python-client==2.181.0", | |
| "google-auth==2.40.3", | |
| "google-auth-httplib2==0.2.0", | |
| "google-cloud-aiplatform==1.113.0", | |
| "google-cloud-appengine-logging==1.6.2", | |
| "google-cloud-audit-log==0.3.2", | |
| "google-cloud-bigquery==3.37.0", | |
| "google-cloud-bigtable==2.32.0", | |
| "google-cloud-core==2.4.3", | |
| "google-cloud-logging==3.12.1", | |
| "google-cloud-resource-manager==1.14.2", | |
| "google-cloud-secret-manager==2.24.0", | |
| "google-cloud-spanner==3.57.0", | |
| "google-cloud-speech==2.33.0", | |
| "google-cloud-storage==2.19.0", | |
| "google-cloud-trace==1.16.2", | |
| "google-crc32c==1.7.1", | |
| "google-genai==1.36.0", | |
| "google-resumable-media==2.7.2", | |
| "googleapis-common-protos==1.70.0", | |
| "graphviz==0.21", | |
| "greenlet==3.2.4", | |
| "grpc-google-iam-v1==0.14.2", | |
| "grpc-interceptor==0.15.4", | |
| "grpcio==1.74.0", | |
| "grpcio-status==1.74.0", | |
| "h11==0.16.0", | |
| "h2==4.3.0", | |
| "hpack==4.1.0", | |
| "httpcore==1.0.9", | |
| "httplib2==0.31.0", | |
| "httpx==0.28.1", | |
| "httpx-sse==0.4.1", | |
| "huggingface-hub==0.30.2", | |
| "hyperframe==6.1.0", | |
| "idna==3.10", | |
| "importlib_metadata==8.7.0", | |
| "jellyfish==1.2.0", | |
| "Jinja2==3.1.6", | |
| "joblib==1.5.2", | |
| "jsonpatch==1.33", | |
| "jsonpointer==3.0.0", | |
| "jsonschema==4.25.1", | |
| "jsonschema-specifications==2025.9.1", | |
| "langchain==0.3.26", | |
| "langchain-community==0.3.23", | |
| "langchain-core==0.3.76", | |
| "langchain-google-genai==2.1.4", | |
| "langchain-qdrant==0.2.0", | |
| "langchain-text-splitters==0.3.8", | |
| "langsmith==0.3.45", | |
| "lxml==6.0.1", | |
| "Mako==1.3.10", | |
| "mammoth==1.9.0", | |
| "markdownify==1.1.0", | |
| "MarkupSafe==3.0.2", | |
| "marshmallow==3.26.1", | |
| "mcp==1.14.0", | |
| "mpmath==1.3.0", | |
| "multidict==6.6.4", | |
| "mypy_extensions==1.1.0", | |
| "nest-asyncio==1.6.0", | |
| "networkx==3.5", | |
| "nltk==3.9.1", | |
| "numpy==2.2.4", | |
| "opentelemetry-api==1.37.0", | |
| "opentelemetry-exporter-gcp-trace==1.9.0", | |
| "opentelemetry-resourcedetector-gcp==1.9.0a0", | |
| "opentelemetry-sdk==1.37.0", | |
| "opentelemetry-semantic-conventions==0.58b0", | |
| "orjson==3.11.3", | |
| "packaging==25.0", | |
| "pandas==2.2.3", | |
| "pdfminer.six==20250416", | |
| "pillow==11.2.1", | |
| "portalocker==2.10.1", | |
| "propcache==0.3.2", | |
| "proto-plus==1.26.1", | |
| "protobuf==6.32.1", | |
| "puremagic==1.28", | |
| "pyasn1==0.6.1", | |
| "pyasn1_modules==0.4.2", | |
| "pycparser==2.23", | |
| "pycryptodome==3.23.0", | |
| "pydantic==2.11.3", | |
| "pydantic_core==2.33.1", | |
| "pydantic-settings==2.10.1", | |
| "PyJWT==1.7.1", | |
| "pymongo==4.12.1", | |
| "pyparsing==3.2.4", | |
| "pypdf==5.4.0", | |
| "pytesseract==0.3.13", | |
| "python-dateutil==2.9.0.post0", | |
| "python-dotenv==1.1.0", | |
| "python-http-client==3.3.7", | |
| "python-multipart==0.0.20", | |
| "python-pptx==1.0.2", | |
| "pytz==2025.2", | |
| "PyYAML==6.0.2", | |
| "qdrant-client==1.14.2", | |
| "referencing==0.36.2", | |
| "regex==2025.9.1", | |
| "requests==2.32.5", | |
| "requests-toolbelt==1.0.0", | |
| "rpds-py==0.27.1", | |
| "rsa==4.9.1", | |
| "safetensors==0.6.2", | |
| "scikit-learn==1.6.1", | |
| "scipy==1.16.2", | |
| "segtok==1.5.11", | |
| "sendgrid==6.11.0", | |
| "shapely==2.1.1", | |
| "six==1.17.0", | |
| "sniffio==1.3.1", | |
| "soupsieve==2.8", | |
| "SQLAlchemy==2.0.43", | |
| "sqlalchemy-spanner==1.16.0", | |
| "sqlparse==0.5.3", | |
| "sse-starlette==3.0.2", | |
| "starkbank-ecdsa==2.2.0", | |
| "starlette==0.46.2", | |
| "sympy==1.13.1", | |
| "tabulate==0.9.0", | |
| "tenacity==8.5.0", | |
| "threadpoolctl==3.6.0", | |
| "tokenizers==0.21.4", | |
| "torch==2.5.1", | |
| "torchvision==0.20.1", | |
| "tqdm==4.67.1", | |
| "transformers==4.51.3", | |
| "typing_extensions==4.15.0", | |
| "typing-inspect==0.9.0", | |
| "typing-inspection==0.4.1", | |
| "tzdata==2025.2", | |
| "tzlocal==5.3.1", | |
| "uritemplate==4.2.0", | |
| "urllib3==2.5.0", | |
| "uvicorn==0.34.1", | |
| "watchdog==6.0.0", | |
| "websockets==15.0.1", | |
| "Werkzeug==3.1.3", | |
| "xlsxwriter==3.2.8", | |
| "yake==0.4.8", | |
| "yarl==1.20.1", | |
| "zipp==3.23.0", | |
| "zstandard==0.23.0", | |
| "MagicConvert==0.1.3", | |
| ] | |
| [build-system] | |
| requires = ["setuptools>=61.0"] | |
| build-backend = "setuptools.build_meta" | |
| ``` | |