import torch
import numpy as np
import io
import matplotlib.pyplot as plt
import pandas as pd
from sentence_transformers import SentenceTransformer
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
from datetime import datetime
from PIL import Image
import os
# from openai import OpenAI
# from ai71 import AI71

# Device placement for the Falcon model is handled below via device_map when it is loaded.

# Precomputed dialogue embeddings for the CBT corpus, loaded directly from the Hub
dials_embeddings = pd.read_pickle('https://huggingface.co/datasets/vsrinivas/CBT_dialogue_embed_ds/resolve/main/kaggle_therapy_embeddings.pkl')

with open('emotion_group_labels.txt') as file:
    emotion_group_labels = file.read().splitlines()

embed_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
classifier = pipeline("zero-shot-classification", model='facebook/bart-large-mnli')

# AI71_API_KEY = os.getenv('AI71_API_KEY')
# print(AI71_API_KEY)
huggingface_token = os.getenv('hf_token')
print("hf-token:", huggingface_token)


# Detect emotions from patient dialogues
def detect_emotions(text):
    emotion = classifier(text, candidate_labels=emotion_group_labels, batch_size=16)
    # Keep the five highest-scoring labels and renormalise their scores to sum to 1
    top_5_scores = [s / sum(emotion['scores'][:5]) for s in emotion['scores'][:5]]
    top_5_emotions = emotion['labels'][:5]
    emotion_set = {l: "{:.2%}".format(s) for l, s in zip(top_5_emotions, top_5_scores)}
    return emotion_set


# Measure cosine similarity between a pair of vectors
def cosine_distance(vec1, vec2):
    cosine = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
    return cosine


# Generate an image of trigger emotions
def generate_triggers_img(items):
    labels = list(items.keys())
    values = [float(v.strip('%')) for v in items.values()]  # Convert percentage strings to floats for plotting
    new_items = dict(sorted(zip(labels, values), key=lambda item: item[1]))
    labels = list(new_items.keys())
    values = list(new_items.values())

    fig, ax = plt.subplots(figsize=(10, 6))
    colors = plt.cm.viridis(np.linspace(0, 1, len(labels)))
    bars = ax.barh(labels, values, color=colors)
    for spine in ax.spines.values():
        spine.set_visible(False)
    ax.tick_params(axis='y', labelsize=18)
    ax.xaxis.set_visible(False)
    ax.yaxis.set_ticks_position('none')
    for bar in bars:
        width = bar.get_width()
        ax.text(width, bar.get_y() + bar.get_height() / 2, f'{width:.2f}%',
                ha='left', va='center', fontweight='bold', fontsize=18)
    plt.tight_layout()
    plt.savefig('triggers.png')
    plt.close(fig)  # Free the figure so repeated calls do not accumulate open figures
    triggers_img = Image.open('triggers.png')
    return triggers_img


def get_doc_response_emotions(user_message, therapy_session_conversation):
    emotion_set = detect_emotions(user_message)
    print(emotion_set)
    emotions_msg = generate_triggers_img(emotion_set)

    # Retrieve the most similar stored patient dialogue and reuse its therapist response
    user_embedding = embed_model.encode(user_message, device='cuda' if torch.cuda.is_available() else 'cpu')
    similarities = []
    for v in dials_embeddings['embeddings']:
        similarities.append(cosine_distance(user_embedding, v))
    top_match_index = similarities.index(max(similarities))
    doc_response = dials_embeddings.iloc[top_match_index]['Doctor']

    therapy_session_conversation.append(["User: " + user_message, "Therapist: " + doc_response])
    print(f"User's message: {user_message}")
    print(f"RAG Matching message: {dials_embeddings.iloc[top_match_index]['Patient']}")
    print(f"Therapist's response: {dials_embeddings.iloc[top_match_index]['Doctor']}\n\n")
    print(f"therapy_session_conversation: {therapy_session_conversation}")
    return '', therapy_session_conversation, emotions_msg

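# Optional: the retrieval loop above computes cosine similarity row by row in Python.
# A minimal vectorised sketch of the same lookup (an illustration only, not used by the app),
# assuming dials_embeddings['embeddings'] holds equal-length 1-D numpy vectors:
def top_match_vectorised(user_embedding, embedding_series):
    """Return the index of the stored embedding most similar to user_embedding."""
    matrix = np.vstack(embedding_series.to_numpy())               # shape: (n_dialogues, dim)
    norms = np.linalg.norm(matrix, axis=1) * np.linalg.norm(user_embedding)
    similarities = matrix @ user_embedding / norms                # cosine similarity per row
    return int(np.argmax(similarities))

# Hypothetical usage: top_match_vectorised(user_embedding, dials_embeddings['embeddings'])
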
# Load the Falcon model and tokenizer once globally for reuse.
# The tokenizer is taken from the same checkpoint as the model so the two stay consistent
# (the original mixed Falcon-H1-3B-Instruct and Falcon-H1-1.5B-Deep-Instruct).
falcon_checkpoint = "tiiuae/Falcon-H1-1.5B-Deep-Instruct"
tokenizer = AutoTokenizer.from_pretrained(falcon_checkpoint)
use_cuda = torch.cuda.is_available()
model = AutoModelForCausalLM.from_pretrained(
    falcon_checkpoint,
    torch_dtype=torch.float16 if use_cuda else torch.float32,
    device_map="auto" if use_cuda else None
)


def generate_falcon_response(prompt, max_new_tokens=300):
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1800).to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        temperature=0.7,
        top_p=0.9
    )
    # Decode only the newly generated tokens; slicing the decoded string by len(prompt)
    # is unreliable because detokenisation does not reproduce the prompt character-for-character.
    prompt_length = inputs["input_ids"].shape[1]
    generated_tokens = outputs[0][prompt_length:]
    return tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()


def summarize_and_recommend(therapy_session_conversation):
    session_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Flatten the [user, therapist] pairs into a single list of non-empty turns
    session_conversation = [x for item in therapy_session_conversation for x in item if x is not None]
    session_conversation.insert(0, "Session_time: " + session_time)
    session_conversation_processed = '\n'.join(session_conversation)
    print("session_conversation_processed:", session_conversation_processed)

    # Summarization prompt
    summary_prompt = f"""You are an expert Cognitive Behavioural Therapist and precis writer.
Summarize STRICTLY the following session into concise, ethical, and clinically meaningful content.

Session:
{session_conversation_processed}

Format your response as:
Session Time:
Summary of the patient messages:
Summary of the therapist messages:
Summary of the whole session:

Ensure the entire summary is less than 300 tokens."""

    full_summary = generate_falcon_response(summary_prompt, max_new_tokens=300)
    print("\nFull summary:", full_summary)

    # Recommendation prompt
    recommendation_prompt = f"""You are an expert Cognitive Behavioural Therapist.
Based STRICTLY on the following summary, provide a clinically valid action plan for the patient.

Summary:
{full_summary}

Use this format:
- The patient is referred to...
- The patient is advised to...
- The patient is refrained from...
- It is suggested that the patient...
- Scheduled a follow-up session with the patient...

Ensure the list contains NOT MORE THAN 7 points and is in passive voice with proper tense."""

    full_recommendations = generate_falcon_response(recommendation_prompt, max_new_tokens=400)
    print("\nFull recommendations:", full_recommendations)

    chatbox = []
    return full_summary, full_recommendations, chatbox
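
# Minimal end-to-end smoke test — a sketch only; the sample message below is made up for
# illustration and assumes emotion_group_labels.txt and the Hub assets above are reachable.
if __name__ == "__main__":
    conversation = []
    _, conversation, triggers_img = get_doc_response_emotions(
        "I have been feeling anxious about work and not sleeping well.", conversation
    )
    summary, recommendations, _ = summarize_and_recommend(conversation)
    print(summary)
    print(recommendations)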