File size: 6,416 Bytes
9d9d143
 
 
5ac897e
 
9de93b0
9d9d143
5ac897e
9d9d143
 
 
 
5ac897e
e34ebb0
2108ce2
e34ebb0
9d9d143
e69a3fb
5ac897e
9d9d143
5ac897e
14bd362
 
668f19f
 
 
 
9d9d143
 
 
 
 
 
 
5ac897e
9d9d143
 
 
 
 
 
 
 
38026de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9d9d143
5ac897e
 
 
9d9d143
5ac897e
 
 
 
9d9d143
5ac897e
 
 
 
 
 
 
 
 
 
 
 
 
94eceb2
 
 
 
 
 
 
 
 
5ac897e
9d9d143
94eceb2
 
 
 
 
 
 
 
5ac897e
9d9d143
 
 
 
 
5ac897e
 
9d9d143
5ac897e
 
 
 
9d9d143
5ac897e
 
 
 
 
 
 
 
 
 
 
 
 
 
9d9d143
 
5ac897e
9d9d143
 
e69a3fb
 
9d9d143
5ac897e
 
 
 
 
 
9d9d143
5ac897e
 
9d9d143
 
 
5ac897e
 
 
9d9d143
 
 
 
 
 
5ac897e
 
9d9d143
5ac897e
9d9d143
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from pathlib import Path

from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
from transformers import MarianMTModel, MarianTokenizer
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    AutoModelForSequenceClassification,
    pipeline,
)
import os
from huggingface_hub import snapshot_download

# ────────────────────── konfiguracja ──────────────────────
device = "cuda" if torch.cuda.is_available() else "cpu"

ROOT = Path(__file__).parent

aspect_dir = ROOT / "bert-aspect-ner"
sentiment_dir = ROOT / "absa-roberta"


device = "cuda" if torch.cuda.is_available() else "cpu"
hf_token = os.getenv("HF_TOKEN")
# ────────────────────── modele lokalne ─────────────────────
aspect_tokenizer = AutoTokenizer.from_pretrained(
    str(aspect_dir), local_files_only=True, use_fast=False        # ← jeΕ›li brak tokenizer.json
)
aspect_model = AutoModelForTokenClassification.from_pretrained(
    str(aspect_dir), local_files_only=True
).to(device)

sentiment_tokenizer = AutoTokenizer.from_pretrained(
    str(sentiment_dir), local_files_only=True
)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(
    str(sentiment_dir), local_files_only=True
).to(device)

# ────────────────────── modele tΕ‚umaczeΕ„ (on-line) ─────────
HF_CACHE_DIR = "/tmp/hf_cache"
os.makedirs(HF_CACHE_DIR, exist_ok=True)
os.environ["HF_HOME"] = HF_CACHE_DIR
os.environ["TRANSFORMERS_CACHE"] = HF_CACHE_DIR

#  Pobieramy modele
pl_to_en_dir = snapshot_download(
    "Helsinki-NLP/opus-mt-pl-en", token=hf_token, cache_dir=HF_CACHE_DIR
)
en_to_pl_dir = snapshot_download(
    "gsarti/opus-mt-tc-en-pl", token=hf_token, cache_dir=HF_CACHE_DIR
)

# Ładujemy
pl_to_en_tok = MarianTokenizer.from_pretrained(pl_to_en_dir)
pl_to_en_mod = MarianMTModel.from_pretrained(pl_to_en_dir).to(device)

en_to_pl_tok = MarianTokenizer.from_pretrained(en_to_pl_dir)
en_to_pl_mod = MarianMTModel.from_pretrained(en_to_pl_dir).to(device)
# ────────────────────── schemy Pydantic ────────────────────
class Comment(BaseModel):
    text: str


class AspectSentiment(BaseModel):
    aspect: str
    sentiment: str


class AnalysisResult(BaseModel):
    results: List[AspectSentiment]

# === Słownik aliasów aspektów EN→PL (taki sam jak wcześniej) ===
aspect_aliases = {
    "food": "jedzenie", "service": "obsΕ‚uga", "price": "cena",
    "taste": "smak", "waiter": "obsΕ‚uga", "dish": "danie",
    "portion": "porcja", "staff": "obsΕ‚uga", "decor": "wystrΓ³j",
    "menu": "menu", "drink": "napoje", "location": "lokalizacja",
    "time": "czas oczekiwania", "cleanliness": "czystoΕ›Δ‡", "smell": "zapach",
    "value": "cena", "experience": "doΕ›wiadczenie", "recommendation": "ogΓ³lna ocena",
    "children": "dzieci", "family": "rodzina", "pet": "zwierzΔ™ta"
}
# ───────────────────── tΕ‚umaczenia  ──────────────────────
def translate_pl_to_en(texts: list[str]) -> list[str]:
    inputs = pl_to_en_tok(texts,
                          return_tensors="pt",
                          padding=True,
                          truncation=True).to(device)
    with torch.no_grad():
        generated = pl_to_en_mod.generate(**inputs)
    return pl_to_en_tok.batch_decode(generated, skip_special_tokens=True)


def translate_en_to_pl(texts: list[str]) -> list[str]:
    inputs = en_to_pl_tok(texts,
                          return_tensors="pt",
                          padding=True,
                          truncation=True).to(device)
    with torch.no_grad():
        generated = en_to_pl_mod.generate(**inputs)
    return en_to_pl_tok.batch_decode(generated, skip_special_tokens=True)


def extract_aspects(text_en: str):
    inputs = aspect_tokenizer(
        text_en, return_tensors="pt", truncation=True, padding=True
    ).to(device)
    with torch.no_grad():
        outputs = aspect_model(**inputs)

    preds = torch.argmax(outputs.logits, dim=2)[0].cpu().numpy()
    tokens = aspect_tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
    labels = [aspect_model.config.id2label[p] for p in preds]

    aspects, current_tokens = [], []
    for token, label in zip(tokens, labels):
        if label == "B-ASP":
            if current_tokens:
                aspects.append(aspect_tokenizer.convert_tokens_to_string(current_tokens).strip())
            current_tokens = [token]
        elif label == "I-ASP" and current_tokens:
            current_tokens.append(token)
        else:
            if current_tokens:
                aspects.append(aspect_tokenizer.convert_tokens_to_string(current_tokens).strip())
                current_tokens = []
    if current_tokens:
        aspects.append(aspect_tokenizer.convert_tokens_to_string(current_tokens).strip())

    # ↓ usuΕ„ spacje z β€ž##” i zduplikowane wyniki
    return list({tok.replace(" ##", "") for tok in aspects})


# ────────────────────── FastAPI ────────────────────────────
app = FastAPI()


@app.post("/analyze", response_model=AnalysisResult)
def analyze_comment(comment: Comment):
    text_pl = comment.text
    text_en = translate_pl_to_en([text_pl])[0]
    aspects = extract_aspects(text_en)

    results: list[AspectSentiment] = []
    for asp in aspects:
        input_text = f"{text_en} [SEP] {asp}"
        inputs = sentiment_tokenizer(
            input_text, return_tensors="pt", truncation=True, padding=True
        ).to(device)
        with torch.no_grad():
            logits = sentiment_model(**inputs).logits
            predicted_class_id = int(logits.argmax().cpu())
            sentiment_label = {
                0: "negatywny",
                1: "neutralny",
                2: "pozytywny",
                3: "konfliktowy",
            }[predicted_class_id]

        asp_pl = aspect_aliases.get(asp, translate_en_to_pl([asp])[0].lower())
        results.append(AspectSentiment(aspect=asp_pl, sentiment=sentiment_label))

    return {"results": results}