"""
Text preprocessing for topic analysis:
  - Basic cleaning and normalisation
  - spaCy-based data cleansing (lemmatise, remove stops/punct/nums/custom words)
  - Gensim bigram creation + Dictionary/corpus
"""

import csv
import os
import re
from typing import Dict, List, Tuple

import nltk
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

NLTK_DIR = os.environ.get("NLTK_DATA", os.path.join(os.getcwd(), "nltk_data"))
os.makedirs(NLTK_DIR, exist_ok=True)
if NLTK_DIR not in nltk.data.path:
    nltk.data.path.insert(0, NLTK_DIR)

for _pkg in ["punkt", "punkt_tab", "wordnet", "stopwords", "averaged_perceptron_tagger"]:
    try:
        nltk.download(_pkg, download_dir=NLTK_DIR, quiet=True)
    except Exception:
        pass

_lemmatizer = WordNetLemmatizer()
_stop_words = set(stopwords.words("english"))

# ── Lazy spaCy singleton ──────────────────────────────────────────────────────
_nlp = None

def _get_nlp():
    global _nlp
    if _nlp is None:
        import spacy
        _nlp = spacy.load("en_core_web_md")
    return _nlp


# ── Basic cleaning ────────────────────────────────────────────────────────────

def clean_text(text: str) -> str:
    """Lowercase, remove URLs/emails/punctuation, normalise whitespace."""
    text = text.lower()
    text = re.sub(r"http\S+|www\S+", "", text)
    text = re.sub(r"\S+@\S+", "", text)
    text = re.sub(r"[^a-z0-9\s]", " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text


def tokenize_and_lemmatize(text: str) -> List[str]:
    tokens = word_tokenize(clean_text(text))
    return [
        _lemmatizer.lemmatize(t)
        for t in tokens
        if t not in _stop_words and len(t) > 2
    ]


def preprocess_batch(texts: List[str]) -> List[str]:
    """Return cleaned (not lemmatized) texts for transformer models."""
    return [clean_text(t) for t in texts]


def preprocess_batch_classical(texts: List[str]) -> List[str]:
    """Return lemmatized, stop-word-free strings for LDA/NMF."""
    return [" ".join(tokenize_and_lemmatize(t)) for t in texts]


# ── spaCy data cleansing ──────────────────────────────────────────────────────

def clean_texts_spacy(
    texts: List[str],
    extra_stopwords: List[str] | None = None,
) -> List[List[str]]:
    """
    Per-document spaCy cleansing: lemmatise each token, remove stop words,
    punctuation, numbers, and any caller-supplied extra stop words.

    Args:
        texts:            raw documents
        extra_stopwords:  additional words to exclude (e.g. ['I', 'said'])

    Returns:
        cleaned_texts: list of token lists, one per document
    """
    nlp = _get_nlp()
    extra = set(w.lower() for w in (extra_stopwords or []))

    cleaned_texts: List[List[str]] = []
    for text in texts:
        doc = nlp(text)
        tokens = [
            word.lemma_.lower()
            for word in doc
            if not word.is_stop
            and not word.is_punct
            and not word.like_num
            and word.text.lower() not in extra
            and len(word.lemma_.strip()) > 1
        ]
        cleaned_texts.append(tokens)
    return cleaned_texts
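
# Illustrative sketch (token output is approximate; it depends on the
# en_core_web_md model and its stop-word list):
#
#   >>> clean_texts_spacy(["The 3 cats were running!", "I said hello."],
#   ...                   extra_stopwords=["hello"])
#   [['cat', 'run'], ['say']]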


# ── POS tagging ───────────────────────────────────────────────────────────────

def get_pos_tags(texts: List[str]) -> List[List[Tuple[str, str, str]]]:
    """
    Run spaCy POS tagging on a list of documents.

    Returns:
        List of token lists per document. Each token is (text, pos_, tag_).
        Whitespace-only tokens are excluded.
    """
    nlp = _get_nlp()
    results: List[List[Tuple[str, str, str]]] = []
    for text in texts:
        doc = nlp(text)
        results.append([
            (token.text, token.pos_, token.tag_)
            for token in doc
            if not token.is_space
        ])
    return results


# ── NER tagging ───────────────────────────────────────────────────────────────

def get_ner_tags(texts: List[str]) -> List[List[Tuple[int, int, str, str]]]:
    """
    Run spaCy NER on a list of documents.

    Returns:
        List of entity lists per document.
        Each entity is (start_char, end_char, entity_text, label_).
    """
    nlp = _get_nlp()
    results: List[List[Tuple[int, int, str, str]]] = []
    for text in texts:
        doc = nlp(text)
        results.append([
            (ent.start_char, ent.end_char, ent.text, ent.label_)
            for ent in doc.ents
        ])
    return results
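
# Illustrative sketch of the POS/NER output shapes (labels and character
# offsets depend on the loaded spaCy model, so treat the values as indicative):
#
#   >>> get_pos_tags(["Apple hired Sam."])[0][:2]
#   [('Apple', 'PROPN', 'NNP'), ('hired', 'VERB', 'VBD')]
#   >>> get_ner_tags(["Apple hired Sam."])
#   [[(0, 5, 'Apple', 'ORG'), (12, 15, 'Sam', 'PERSON')]]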


# ── Dependency parsing ────────────────────────────────────────────────────────

def get_dep_parse(texts: List[str]) -> Dict:
    """
    Run spaCy dependency parsing on a list of documents.

    Returns dict with three parallel lists (one entry per document):
        noun_chunks  – list of (chunk_text, root_text, dep_, head_text)
        token_deps   – list of (token_text, dep_, head_text, head_pos_, [children])
        svgs         – displaCy arc-diagram SVG strings
    """
    import random
    from spacy import displacy

    nlp = _get_nlp()
    options = {
        "distance": 120,
        "compact":  True,
        "color":    "#000000",   # arc label colour
        "bg":       "#ffffff",   # SVG background
        "font":     "Source Sans Pro",
    }

    noun_chunks_all: List = []
    token_deps_all:  List = []
    svgs:            List[str] = []

    for text in texts:
        doc = nlp(text)

        # Noun chunks: (chunk, root, dependency relation, head)
        noun_chunks_all.append([
            (chunk.text, chunk.root.text, chunk.root.dep_, chunk.root.head.text)
            for chunk in doc.noun_chunks
        ])

        # Token-level deps: (token, dep, head_text, head_pos, children)
        token_deps_all.append([
            (token.text, token.dep_, token.head.text,
             token.head.pos_, [str(c) for c in token.children])
            for token in doc
            if token.dep_ and not token.is_space
        ])

        # displaCy SVG with a unique ID to avoid browser rendering conflicts
        svg = displacy.render(doc, style="dep", jupyter=False, options=options)
        uid = f"displacy-{random.randint(0, 999999)}"
        svg = svg.replace('id="displacy"', f'id="{uid}"')
        svg = svg.replace('id="displacy-svg"', f'id="{uid}"')

        # Step 1: rewrite every fill/stroke to black (arcs, arrows, boxes).
        # White (#ffffff) is preserved for the background rect.
        svg = re.sub(
            r'(fill|stroke)="(?!#ffffff|#FFFFFF|none|transparent)([^"]*)"',
            lambda m: f'{m.group(1)}="#000000"',
            svg,
        )
        svg = re.sub(
            r'(fill|stroke)\s*:\s*(?!#ffffff|#FFFFFF|none|transparent)([^;}"]*)',
            lambda m: f'{m.group(1)}: #000000',
            svg,
        )

        # Step 2: override text elements to navy blue + bold via <style>.
        # CSS rules override SVG presentation attributes, so this wins over Step 1.
        _text_style = (
            "<style>"
            "text, .displacy-word, .displacy-tag {"
            "  fill: #000080 !important;"
            "  font-weight: bold !important;"
            "}"
            "</style>"
        )
        svg = svg.replace("</svg>", _text_style + "</svg>")
        svgs.append(svg)

    return {"noun_chunks": noun_chunks_all, "token_deps": token_deps_all, "svgs": svgs}


# ── Gensim bigram creation ────────────────────────────────────────────────────

def create_bigrams(
    cleaned_texts: List[List[str]],
    min_count: int = 2,
    threshold: float = 10.0,
) -> Tuple[List[List[str]], object, List[List[Tuple[int, int]]]]:
    """
    Build gensim bigram model, apply it to cleaned_texts, then create
    a Dictionary and bag-of-words corpus.

    Args:
        cleaned_texts:  output of clean_texts_spacy()
        min_count:      minimum frequency for a bigram to be kept
        threshold:      higher → fewer bigrams

    Returns:
        (bc_texts, dictionary, corpus)
        - bc_texts:    token lists with bigrams inserted  (e.g. ['wait_time', ...])
        - dictionary:  gensim Dictionary
        - corpus:      list of bag-of-words [(word_id, count), ...]
    """
    import gensim
    from gensim.corpora import Dictionary

    bigram_model = gensim.models.phrases.Phrases(
        cleaned_texts,
        min_count=min_count,
        threshold=threshold,
    )
    bc_texts = [bigram_model[line] for line in cleaned_texts]

    dictionary = Dictionary(bc_texts)
    corpus = [dictionary.doc2bow(text) for text in bc_texts]

    return bc_texts, dictionary, corpus
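
# Illustrative sketch (hypothetical token lists): whether a pair such as
# ('wait', 'time') is merged into 'wait_time' depends on min_count/threshold
# and gensim's phrase scoring, so the merged output below is indicative only.
#
#   >>> docs = [['long', 'wait', 'time'], ['wait', 'time', 'horrible']]
#   >>> bc_texts, dictionary, corpus = create_bigrams(docs, min_count=1, threshold=1.0)
#   >>> bc_texts[0]     # e.g. ['long', 'wait_time'] if the pair scores above threshold
#   >>> corpus[0]       # bag-of-words pairs such as [(0, 1), (1, 1)]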


# ── Combined pipeline ─────────────────────────────────────────────────────────

def full_preprocess(
    texts: List[str],
    extra_stopwords: List[str] | None = None,
) -> Dict:
    """
    Run the full preprocessing pipeline and return all intermediate results.

    Returns dict with keys:
        cleaned_texts   - spaCy token lists (one per doc)
        bc_texts        - token lists with bigrams
        dictionary      - gensim Dictionary
        corpus          - bag-of-words corpus
        sample_tokens   - first doc tokens (for display)
        sample_bigrams  - first doc bigram tokens (for display)
        sample_bow      - first doc BOW (for display)
    """
    cleaned_texts = clean_texts_spacy(texts, extra_stopwords)
    bc_texts, dictionary, corpus = create_bigrams(cleaned_texts)

    return {
        "cleaned_texts": cleaned_texts,
        "bc_texts":      bc_texts,
        "dictionary":    dictionary,
        "corpus":        corpus,
        "sample_tokens": cleaned_texts[0] if cleaned_texts else [],
        "sample_bigrams": bc_texts[0] if bc_texts else [],
        "sample_bow":    corpus[0] if corpus else [],
    }
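
# Illustrative sketch of the combined pipeline (document strings are made up;
# token-level results depend on the loaded spaCy model):
#
#   >>> result = full_preprocess(
#   ...     ["The wait time was long.", "Long wait time, said everyone."],
#   ...     extra_stopwords=["said"],
#   ... )
#   >>> sorted(result.keys())
#   ['bc_texts', 'cleaned_texts', 'corpus', 'dictionary',
#    'sample_bigrams', 'sample_bow', 'sample_tokens']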


# ── File I/O ──────────────────────────────────────────────────────────────────

def parse_input(raw: str) -> List[str]:
    """Split raw textarea input into individual documents (one per line)."""
    return [line.strip() for line in raw.splitlines() if line.strip()]


def read_file_path(path: str) -> List[str]:
    """Read .txt or .csv file and return list of documents."""
    ext = os.path.splitext(path)[1].lower()
    if ext == ".txt":
        with open(path, "r", encoding="utf-8") as f:
            return [line.strip() for line in f if line.strip()]
    if ext == ".csv":
        docs = []
        with open(path, "r", encoding="utf-8") as f:
            reader = csv.reader(f)
            for row in reader:
                line = " ".join(row).strip()
                if line:
                    docs.append(line)
        return docs
    return []
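

if __name__ == "__main__":
    # Minimal smoke test (illustrative only, not part of the library API):
    # runs the combined pipeline on two toy documents and prints the
    # intermediate artefacts. Requires the en_core_web_md spaCy model.
    _demo_docs = [
        "The wait times at the clinic were far too long.",
        "Long wait time, but the staff were friendly and helpful.",
    ]
    _result = full_preprocess(_demo_docs, extra_stopwords=["clinic"])
    print("tokens:  ", _result["sample_tokens"])
    print("bigrams: ", _result["sample_bigrams"])
    print("bow:     ", _result["sample_bow"])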