import os

# =============================
# Fix Hugging Face cache permissions
# =============================
# Point every HF cache at a writable directory (needed on read-only hosts
# such as Hugging Face Spaces; TRANSFORMERS_CACHE is legacy but kept for
# older transformers versions).
os.environ["HF_HOME"] = "/tmp/hf"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/hf"
os.environ["HF_HUB_CACHE"] = "/tmp/hf"


from fastapi import FastAPI
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForSequenceClassification
import torch
import torch.nn.functional as F
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from mtranslate import translate
from langdetect import detect
from duckduckgo_search import DDGS
import re

# =============================
# Text cleanup
# =============================
def clean_response(text):
    """Strip markup, speaker prefixes, asides and emoticons; keep two sentences."""
    text = re.sub(r'<[^>]+>', '', text)          # drop any HTML/XML-style tags
    # Cut at the first stray closing tag (normally a no-op after the tag strip above).
    text = re.split(r'</[a-zA-Z]*>', text)[0]
    text = re.sub(r'^\s*[,.:;-]*', '', text)     # leading punctuation
    text = re.sub(r'^\s*(Psyche|Therapist|Bot|Assistant|AI):?\s*', '', text)  # speaker prefixes
    text = re.sub(r'\([^)]*\)', '', text)        # parenthetical asides
    text = re.sub(r'\[.*?\]', '', text)          # bracketed asides
    text = re.sub(r'[:;=8][-~]?[)D(\\/*|]', '', text)  # common emoticons
    text = re.sub(r'\s{2,}', ' ', text).strip()  # collapse repeated whitespace
    # Keep only the first two sentences to avoid rambling generations.
    sentences = re.split(r'(?<=[.!?])\s+', text)
    return " ".join(sentences[:2]).strip()
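
# Illustrative check (hypothetical input, not actual model output):
#   clean_response("Bot: I hear you. It sounds hard. :) Tell me more. And more.")
# strips the speaker prefix and the emoticon, then keeps the first two
# sentences -> "I hear you. It sounds hard."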

# =============================
# Load models
# =============================
# Dialogue model: a GPT-style causal LM fine-tuned for supportive chat.
MODEL_PATH = "fatmata/gpt-psybot"
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(MODEL_PATH)

# Emotion classifier: BERT fine-tuned on a GoEmotions-style label set.
BERT_MODEL_NAME = "fatmata/bert_model"
bert_tokenizer = AutoTokenizer.from_pretrained(BERT_MODEL_NAME)
bert_model = AutoModelForSequenceClassification.from_pretrained(BERT_MODEL_NAME)

# Router: decides whether a message needs a web search or a generated reply.
CLASSIFIER_PATH = "fatmata/mini_bert"
model_c = AutoModelForSequenceClassification.from_pretrained(CLASSIFIER_PATH)
tokenizer_c = AutoTokenizer.from_pretrained(CLASSIFIER_PATH)
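
# All three models stay on CPU here. If the host has a GPU, a minimal
# (untested-here) adaptation would be:
#   device = "cuda" if torch.cuda.is_available() else "cpu"
#   model.to(device)  # and move each tokenized batch to the same device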

# =============================
# Emotion analysis
# =============================
analyzer = SentimentIntensityAnalyzer()
# Label order must match the fine-tuned classifier's output head
# ("autre" is French for "other" and is kept exactly as trained).
GOEMOTIONS_LABELS = ["admiration","anger","approval","autre","curiosity",
                     "disapproval","gratitude","joy","love","neutral","sadness"]
UNACCEPTABLE_EMOTIONS = {"anger"}

def detect_language(text):
    """Detect the input language, restricted to fr/en/ar; default to English."""
    try:
        detected_lang = detect(text)
        return detected_lang if detected_lang in ["fr", "en", "ar"] else "en"
    except Exception:
        # langdetect raises on empty or ambiguous input
        return "en"
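
# e.g. detect_language("Bonjour, ça va ?") should return "fr"; any language
# outside fr/en/ar falls back to "en".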

def search_duckduckgo(query, max_results=3):
    """Return up to `max_results` DuckDuckGo snippet bodies for `query`."""
    try:
        search_results = list(DDGS().text(query, max_results=max_results))
        return [result["body"] for result in search_results if "body" in result] or ["Nothing found."]
    except Exception as e:
        return [f"Search error: {str(e)}"]
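
# Example (results depend on the live index, so treat this as a sketch):
#   search_duckduckgo("what is cognitive behavioral therapy")
#   -> up to 3 snippet strings, or ["Nothing found."] when the search is empty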

def generate_response(user_input):
    """Generate a chat reply with the fine-tuned GPT model and clean it up."""
    prompt = f"User: {user_input}\nBot:"
    inputs = tokenizer(prompt, return_tensors="pt")
    output = model.generate(
        input_ids=inputs["input_ids"],
        attention_mask=inputs["attention_mask"],  # avoids the missing-mask warning
        max_new_tokens=150,
        pad_token_id=tokenizer.eos_token_id,
        eos_token_id=tokenizer.eos_token_id,
        do_sample=True,
        temperature=0.7,
        top_k=50,
        top_p=0.9,
        repetition_penalty=1.2
    )
    generated_text = tokenizer.decode(output[0], skip_special_tokens=True)
    # Keep only the text after the last "Bot:" marker, then post-process it.
    return clean_response(generated_text.split("Bot:")[-1].strip())
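
# Sampling is stochastic, so replies vary between calls. An illustrative
# (hypothetical) run:
#   generate_response("I feel anxious about my exams")
#   -> "It's understandable to feel that way. What part worries you most?"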

def classify_emotion(text):
    """Combine a VADER intensity score with the BERT emotion label."""
    sentiment_scores = analyzer.polarity_scores(text)
    compound = sentiment_scores['compound'] * 100  # scale to [-100, 100]
    inputs = bert_tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=256)
    with torch.no_grad():
        logits = bert_model(**inputs).logits
    probs = F.softmax(logits, dim=-1).squeeze().cpu().numpy()
    top_emotion_index = probs.argmax()
    top_emotion = GOEMOTIONS_LABELS[top_emotion_index]
    return compound, top_emotion in UNACCEPTABLE_EMOTIONS, top_emotion
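
# Returns (scaled VADER compound, is_unacceptable, top label); for a hostile
# message the tuple might look like (-75.0, True, "anger") (illustrative values).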

def predict_category(text):
    """Route a message: label 1 means web search ("recherche"), else chat ("gpt")."""
    inputs = tokenizer_c(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
    with torch.no_grad():
        outputs = model_c(**inputs)
    logits = outputs.logits
    return "recherche" if torch.argmax(logits, dim=-1).item() == 1 else "gpt"
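
# e.g. a factual query like "what is the capital of France" should route to
# "recherche", while "I feel lonely tonight" should route to "gpt" (actual
# routing depends on the fatmata/mini_bert checkpoint).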

# =============================
# Main pipeline
# =============================
def classify_and_respond(text):
    steps = []
    original_lang = detect_language(text)
    # VADER and the classifiers expect English, so work in English internally
    # and translate the final reply back to the user's language.
    text_en = translate(text, "en")

    # Step 1: category prediction (search vs. chat)
    category = predict_category(text_en)
    steps.append("Detected category: " + category)

    if category == "recherche":
        response = search_duckduckgo(text_en)
        final_response = "\n".join([translate(r, original_lang) for r in response])
        steps.append("DuckDuckGo results retrieved")
        return {
            "response": final_response,
            "response_type": "recherche",
            "emotions": None,
            "steps": steps
        }

    # Step 2: emotion analysis
    compound, is_unacceptable, emotion = classify_emotion(text_en)
    steps.append(f"Detected emotion: {emotion} (score={compound:.2f})")

    if is_unacceptable and abs(compound) > 50:
        final_response = translate("I can feel a lot of tension in your message.", original_lang)
        steps.append("Sent canned reply for unacceptable emotion")
        return {
            "response": final_response,
            "response_type": "non acceptable",
            "emotions": emotion,
            "steps": steps
        }

    # Step 3: GPT generation
    gpt_response = generate_response(text_en)
    final_response = translate(gpt_response, original_lang)
    steps.append("GPT reply generated and translated")

    return {
        "response": final_response,
        "response_type": "gpt",
        "emotions": emotion,
        "steps": steps
    }
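
# Shape of the returned payload (illustrative values):
#   {"response": "...",
#    "response_type": "gpt" | "recherche" | "non acceptable",
#    "emotions": "sadness" or None,
#    "steps": ["Detected category: gpt", ...]}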

# =============================
# FastAPI application
# =============================
app = FastAPI()

class RequestBody(BaseModel):
    text: str

@app.post("/predict")
async def predict_api(body: RequestBody):
    return classify_and_respond(body.text)
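
# Example request once the server is running (assumes the default port 7860
# set in the __main__ block below; adjust host/port for your deployment):
#   curl -X POST http://localhost:7860/predict \
#        -H "Content-Type: application/json" \
#        -d '{"text": "I feel overwhelmed lately"}'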

# =============================
# Local launch (uvicorn)
# =============================
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)