| import os |
| import json |
| import math |
| import re |
| import statistics |
| import urllib.request |
| from collections import Counter |
| from pathlib import Path |
|
|
| import joblib |
| import numpy as np |
| from fastapi import FastAPI, HTTPException |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.middleware.cors import CORSMiddleware |
| from sklearn.feature_extraction import DictVectorizer |
| from sklearn.linear_model import Ridge |
| from sklearn.metrics import mean_absolute_error, mean_squared_error |
| from sklearn.multioutput import MultiOutputRegressor |
| from sklearn.pipeline import Pipeline |
| import uvicorn |
|
|
app = FastAPI(title="Human Essence Inference")


# CORS: the API is open to every origin, method and header.
# Credentials are deliberately NOT allowed: a wildcard origin cannot be
# combined with credentialed requests (browsers refuse
# `Access-Control-Allow-Origin: *` on them, and the FastAPI docs require
# explicit origins whenever allow_credentials=True).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
# Remote location of the raw training dataset (a JSON document).
DATA_URL = "https://huggingface.co/datasets/wop/Human-Essence-Dataset/raw/main/data.json"
# Local copy of the dataset.
DATA_PATH = Path("/home/user/app/data.json")
# Serialized training payload written with joblib.
MODEL_PATH = Path("/home/user/app/human_essence_flow.joblib")
# JSON export of the fitted model's vocabulary and weights.
MODEL_DATA_PATH = Path("/home/user/app/static/model_data.json")


# Metrics of the most recent training run in this process; empty until then.
training_info: dict = {}
|
|
| |
|
|
def download_data():
    """Download the dataset from DATA_URL and store it at DATA_PATH.

    The payload is written to a sibling ``<name>.part`` file first and only
    moved onto DATA_PATH once the transfer has finished. An interrupted or
    failed download therefore never leaves a truncated file at DATA_PATH that
    a later existence check would mistake for a complete dataset.

    Raises:
        OSError: If the download or the final move fails (this includes
            ``urllib.error.URLError``, which is an ``OSError`` subclass).
    """
    print(f"Downloading dataset to {DATA_PATH} ...")
    # The target directory may not exist yet on a fresh deployment.
    DATA_PATH.parent.mkdir(parents=True, exist_ok=True)
    partial_path = DATA_PATH.with_name(DATA_PATH.name + ".part")
    try:
        urllib.request.urlretrieve(DATA_URL, partial_path)
        partial_path.replace(DATA_PATH)
    finally:
        # After a successful move this is a no-op; after a failure it removes
        # the incomplete download.
        partial_path.unlink(missing_ok=True)
|
|
def load_rows():
    """Return the dataset rows, downloading the dataset first if needed.

    Returns:
        The top-level JSON list stored in DATA_PATH.

    Raises:
        ValueError: If the JSON document is not a list.
    """
    if not DATA_PATH.exists():
        download_data()
    rows = json.loads(DATA_PATH.read_text(encoding="utf-8"))
    if isinstance(rows, list):
        return rows
    raise ValueError("Expected data.json to contain a JSON list")
|
|
def normalize_label(label):
    """Return *label* as a trimmed, lower-case string with spaces as underscores."""
    trimmed = str(label).strip().lower()
    # Splitting on a literal space keeps empty pieces, so consecutive spaces
    # each become their own underscore.
    return "_".join(trimmed.split(" "))
|
|
def tokenize(text):
    """Split *text* into whitespace-delimited tokens (punctuation stays attached)."""
    stripped = text.strip()
    return [match.group(0) for match in re.finditer(r"\S+", stripped)]
|
|
def token_features(tokens, i):
    """Build the feature dictionary for the token at index *i*.

    Args:
        tokens: Sequence of token strings for one text.
        i: Index of the token to describe.

    Returns:
        A dict mixing string features (the word, its neighbours, prefixes and
        suffixes) with numeric/boolean features (length, relative position,
        casing and punctuation flags).
    """
    word = tokens[i]
    lowered = word.lower()
    # Lower-cased token reduced to letters, digits and apostrophes.
    alnum = re.sub(r"[^a-z0-9']+", "", lowered)

    if i > 0:
        before = tokens[i - 1].lower()
    else:
        before = "<START>"
    if i + 1 < len(tokens):
        after = tokens[i + 1].lower()
    else:
        after = "<END>"

    final_index = len(tokens) - 1
    # Guard against division by zero for single-token texts.
    denom = max(final_index, 1)

    features = {
        "word": lowered,
        "clean_word": alnum if alnum else "<PUNCT>",
        "prev_word": before,
        "next_word": after,
        "prefix2": alnum[:2],
        "prefix3": alnum[:3],
        "suffix2": alnum[-2:],
        "suffix3": alnum[-3:],
        "word_len": len(word),
        "position_norm": i / denom,
        "position_sin": math.sin((i / denom) * math.pi),
        "is_first": i == 0,
        "is_last": i == final_index,
        "is_title": word.istitle(),
        "is_upper": word.isupper(),
        "has_digit": any(map(str.isdigit, word)),
        "has_punct": not all(map(str.isalnum, word)),
        "ends_period": word.endswith("."),
        "ends_question": word.endswith("?"),
        "ends_exclaim": word.endswith("!"),
        "contains_ai": "ai" in alnum,
    }
    return features
|
|
def discover_labels(rows):
    """Return the sorted, de-duplicated normalized emotion labels found in *rows*.

    Rows without an ``emotional_flow`` and emotions with a missing or empty
    ``label`` are ignored.
    """
    raw_labels = (
        emotion.get("label")
        for row in rows
        for emotion in row.get("emotional_flow") or []
    )
    return sorted({normalize_label(raw) for raw in raw_labels if raw})
|
|
def row_to_targets(row, labels):
    """Convert one dataset row into its tokens and a per-token target matrix.

    Args:
        row: Dataset record; reads ``text`` and ``emotional_flow``. Each
            emotion may carry a ``label`` and a list of ``curves`` with
            ``start_word``, ``end_word`` and ``start``/``peak``/``end``
            ``_intensity`` fields.
        labels: Ordered label names; the column index of a label in the
            result is its position in this sequence.

    Returns:
        ``(tokens, y)`` where ``y`` has shape ``(len(tokens), len(labels))``.
        ``y[i, j]`` is the largest curve intensity of label ``j`` covering
        token ``i`` (``end_word`` is inclusive), or 0.0 if no curve covers it.

    Curve fields that are missing OR explicitly ``null`` fall back to their
    defaults (0 for ``start_word``, ``start_word`` for ``end_word``, 0.0 for
    intensities) instead of raising ``TypeError``.
    """

    def _value_or(value, default):
        # JSON null arrives as None; treat it the same as an absent key.
        return default if value is None else value

    tokens_list = tokenize(row.get("text") or "")
    label_to_idx = {label: idx for idx, label in enumerate(labels)}
    y = np.zeros((len(tokens_list), len(labels)), dtype=float)
    last_index = len(tokens_list) - 1

    for emotion in row.get("emotional_flow") or []:
        raw_label = emotion.get("label")
        if not raw_label:
            continue
        j = label_to_idx.get(normalize_label(raw_label))
        if j is None:
            # Label is not part of the requested label set.
            continue
        for curve in emotion.get("curves") or []:
            start = int(_value_or(curve.get("start_word"), 0))
            end = int(_value_or(curve.get("end_word"), start))
            intensity = float(
                max(
                    _value_or(curve.get("start_intensity"), 0.0),
                    _value_or(curve.get("peak_intensity"), 0.0),
                    _value_or(curve.get("end_intensity"), 0.0),
                )
            )
            # Clamp the span to the tokens that actually exist.
            for i in range(max(start, 0), min(end, last_index) + 1):
                y[i, j] = max(y[i, j], intensity)
    return tokens_list, y
|
|
def build_training_set(rows):
    """Assemble per-token features, targets and summary statistics from *rows*.

    Only rows with non-blank ``text`` and a non-empty ``emotional_flow`` are
    used.

    Args:
        rows: List of dataset records.

    Returns:
        ``(X, Y, labels, info)`` where ``X`` is a list of feature dicts (one
        per token), ``Y`` is the stacked target matrix with one row per token
        and one column per label, ``labels`` is the sorted label list and
        ``info`` is a dict of dataset statistics.

    Raises:
        ValueError: If no usable rows or no emotion labels are found.
    """
    usable_rows = [
        r for r in rows
        if (r.get("text") or "").strip() and r.get("emotional_flow")
    ]
    # Check for usable rows before doing any label discovery work.
    if not usable_rows:
        raise ValueError("No non-empty labelled rows found")
    labels = discover_labels(usable_rows)
    if not labels:
        raise ValueError("No emotion labels found")

    X = []
    y_parts = []
    word_counts = []
    curve_points = 0
    label_counts = Counter()

    for row in usable_rows:
        tokens_list, y = row_to_targets(row, labels)
        if not tokens_list:
            continue
        X.extend(token_features(tokens_list, i) for i in range(len(tokens_list)))
        y_parts.append(y)
        word_counts.append(len(tokens_list))
        for emotion in row.get("emotional_flow") or []:
            raw_label = emotion.get("label")
            # Skip missing/null labels the same way label discovery does;
            # normalizing None would otherwise count a bogus "none" label.
            label = normalize_label(raw_label) if raw_label else ""
            if label:
                label_counts[label] += 1
            curve_points += len(emotion.get("curves") or [])

    Y = np.vstack(y_parts)
    info = {
        "total_rows": len(rows),
        "usable_rows": len(usable_rows),
        "word_examples": int(Y.shape[0]),
        "curve_points": int(curve_points),
        "labels": labels,
        "label_counts": dict(sorted(label_counts.items())),
        "avg_words_per_usable_row": statistics.mean(word_counts) if word_counts else 0,
    }
    return X, Y, labels, info
|
|
def train():
    """Fit the per-token emotion regressor and persist it to MODEL_PATH.

    Loads the dataset, fits a DictVectorizer + multi-output Ridge pipeline,
    records in-sample error metrics and dumps the model, labels and info with
    joblib.

    Returns:
        The info dict from the training set, extended with ``train_mae``,
        ``train_rmse`` and ``note``.
    """
    X, Y, labels, info = build_training_set(load_rows())

    vectorizer = DictVectorizer(sparse=True)
    regressor = MultiOutputRegressor(Ridge(alpha=1.0, random_state=42))
    model = Pipeline(steps=[("features", vectorizer), ("regressor", regressor)])
    model.fit(X, Y)

    # Metrics are computed on clipped predictions for the training data itself.
    pred = np.clip(model.predict(X), 0.0, 1.0)
    info.update(
        train_mae=float(mean_absolute_error(Y, pred)),
        train_rmse=float(mean_squared_error(Y, pred) ** 0.5),
        note=(
            "Training metrics are on the same tiny data used for fitting. "
            "They measure fit/memorization, not real-world accuracy."
        ),
    )

    MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(
        {"model": model, "labels": labels, "info": info, "version": 1},
        MODEL_PATH,
    )
    return info
|
|
def export_model_data():
    """Export the saved model's vocabulary and linear weights as JSON.

    Reads the joblib payload at MODEL_PATH and writes labels, feature
    vocabulary, per-label coefficient vectors and intercepts to
    MODEL_DATA_PATH.

    Returns:
        The ``info`` dict stored in the payload.
    """
    bundle = joblib.load(MODEL_PATH)
    pipeline, labels, info = (bundle[key] for key in ("model", "labels", "info"))

    vectorizer = pipeline.named_steps["features"]
    estimators = pipeline.named_steps["regressor"].estimators_  # one per label

    # Cast to plain Python types so the result is JSON-serializable.
    vocab = {name: int(column) for name, column in vectorizer.vocabulary_.items()}
    exported = {
        "labels": labels,
        "vocab": vocab,
        "coefs": [est.coef_.tolist() for est in estimators],
        "intercepts": [float(est.intercept_) for est in estimators],
        "n_features": len(vocab),
    }

    MODEL_DATA_PATH.parent.mkdir(parents=True, exist_ok=True)
    with MODEL_DATA_PATH.open("w", encoding="utf-8") as f:
        json.dump(exported, f)
    return info
|
|
| |
|
|
@app.get("/api/stats")
def get_stats():
    """Report whether a model has been trained in this process, with its stats."""
    if training_info:
        stats = {"trained": True}
        stats.update(training_info)
        return stats
    return {"trained": False, "message": "Model not yet trained"}
|
|
@app.post("/api/train")
def train_endpoint():
    """Train the model, export its weights and publish the training stats.

    Returns:
        ``{"ok": True, "training_info": info}`` on success.

    Raises:
        HTTPException: 500 with the error message if training or export fails.
    """
    global training_info
    import logging  # function-scope import keeps this block self-contained

    try:
        info = train()
        export_model_data()
    except Exception as e:
        # Request boundary: keep the full traceback in the server log (the
        # client only receives the message) and chain the original cause.
        logging.getLogger(__name__).exception("Training failed")
        raise HTTPException(status_code=500, detail=str(e)) from e
    training_info = info
    return {"ok": True, "training_info": info}
|
|
@app.get("/api/model_data")
def get_model_data():
    """Return the exported model data (labels, vocabulary, weights) as JSON.

    Raises:
        HTTPException: 404 if the export file does not exist yet.
    """
    # Open directly instead of checking exists() first: the file can vanish
    # between a check and the open, which would surface as a 500 instead of
    # the intended 404.
    try:
        f = MODEL_DATA_PATH.open("r", encoding="utf-8")
    except FileNotFoundError:
        raise HTTPException(
            status_code=404,
            detail="Model not trained yet. POST /api/train first.",
        ) from None
    with f:
        return json.load(f)
|
|
| |
class NoCacheStaticFiles(StaticFiles):
    """StaticFiles variant that sets a revalidating Cache-Control header on every response."""

    async def get_response(self, path, scope):
        """Delegate to StaticFiles and overwrite the response's Cache-Control header."""
        served = await super().get_response(path, scope)
        served.headers["Cache-Control"] = "no-cache, must-revalidate"
        return served
|
|
# Serve the front-end from ./static at the site root; html=True makes the
# mount serve index.html for directory requests.
_static_files = NoCacheStaticFiles(directory="static", html=True)
app.mount("/", _static_files, name="static")


if __name__ == "__main__":
    # Development entry point: auto-reloading server on all interfaces.
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)
|
|