# app.py — Hugging Face Space upload (commit 47d0817)
# ============================================================
# IMPORTS
# ============================================================
import re
import os
import math
import pickle
import requests
from collections import Counter
import numpy as np
import pandas as pd
import faiss
import PyPDF2
import torch
import gradio as gr
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from sentence_transformers import SentenceTransformer, CrossEncoder
from langdetect import detect, DetectorFactory
from gtts import gTTS
from transformers import pipeline as hf_pipeline
from transformers import pipeline
from datetime import datetime
from groq import Groq
from sklearn.preprocessing import MinMaxScaler
from scipy import stats
# Make langdetect deterministic (it is otherwise seeded randomly per run).
DetectorFactory.seed = 0
# Groq LLM client; key comes from the environment (empty string when unset).
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
groq_client = Groq(api_key=GROQ_API_KEY)
print(f"DEBUG — Groq Key loaded: {bool(GROQ_API_KEY)}")
# ---- In-memory knowledge-base state (mutated by build_index / load_saved_index) ----
KB_TEXTS = []        # list[str]: text chunks
KB_META = []         # list[dict]: per-chunk metadata {name, lang, page, year}
FAISS_INDEX = None   # faiss.IndexFlatIP over KB_EMB, or None before indexing
KB_EMB = None        # float32 embedding matrix aligned with KB_TEXTS
DOC_TYPE_INFO = {"type": "📄 General", "is_economic": False, "score": 0}
PER_FILE_INFO = {}   # filename -> detect_document_type() result (plus "year")
CHAT_STATS = {"questions": 0, "found": 0, "not_found": 0}
MIN_SIMILARITY = 0.10  # cosine floor for FAISS hits in rag_retrieve
# ---- Persistence locations for the saved index (Space-local /tmp) ----
PERSIST_DIR = "/tmp"
KB_TEXTS_PATH = f"{PERSIST_DIR}/kb_texts.pkl"
KB_META_PATH = f"{PERSIST_DIR}/kb_meta.pkl"
FAISS_PATH = f"{PERSIST_DIR}/faiss.index"
os.makedirs(PERSIST_DIR, exist_ok=True)
def save_index():
    """Persist KB_TEXTS / KB_META (pickle) and the FAISS index to /tmp.

    Returns a short status string; never raises."""
    if not KB_TEXTS or FAISS_INDEX is None:
        return "⚠️ No index to save."
    try:
        for payload, target in ((KB_TEXTS, KB_TEXTS_PATH), (KB_META, KB_META_PATH)):
            with open(target, "wb") as fh:
                pickle.dump(payload, fh)
        faiss.write_index(FAISS_INDEX, FAISS_PATH)
    except Exception as err:
        return f"❌ Save error: {err}"
    return f"💾 Saved! {len(KB_TEXTS):,} chunks"
def load_saved_index():
    """Restore a previously saved index from /tmp and refresh doc-type info.

    Returns a Markdown status string; never raises."""
    global KB_TEXTS, KB_META, FAISS_INDEX, DOC_TYPE_INFO
    try:
        if not os.path.exists(FAISS_PATH):
            return "_No saved index found._"
        with open(KB_TEXTS_PATH, "rb") as fh:
            KB_TEXTS = pickle.load(fh)
        with open(KB_META_PATH, "rb") as fh:
            KB_META = pickle.load(fh)
        FAISS_INDEX = faiss.read_index(FAISS_PATH)
        DOC_TYPE_INFO = detect_document_type(KB_TEXTS)
    except Exception as err:
        return f"❌ Load error: {err}"
    return f"✅ **Index loaded!** `{len(KB_TEXTS):,}` chunks\n🏷️ Type: **{DOC_TYPE_INFO['type']}**"
# ---- Keyword lexicons (lowercase; matched by case-insensitive substring) ----
# English, French and Arabic terms are mixed to cover multilingual reports.
# Used by detect_document_type to classify the corpus domain:
ECONOMIC_KEYWORDS = [
    "gdp","inflation","monetary","fiscal","forecast","exchange rate",
    "interest rate","unemployment","recession","growth rate","trade balance",
    "budget deficit","central bank","economic outlook","imf","world bank",
    "cpi","macro","revenue","expenditure","deficit","surplus","debt",
    "croissance","taux","banque centrale","prévision","économique","pib",
    "التضخم","الناتج المحلي","النمو الاقتصادي","البنك المركزي","سعر الصرف",
]
MEDICAL_KEYWORDS = ["patient","diagnosis","treatment","clinical","hospital","symptom","disease"]
LEGAL_KEYWORDS = ["article","law","contract","clause","jurisdiction","court","legal"]
ACADEMIC_KEYWORDS = ["abstract","methodology","hypothesis","conclusion","references","doi","journal"]
# Positive economic-tone vocabulary for economic_lexicon_score:
ECON_POSITIVE = [
    "growth","recovery","surplus","improvement","stability","increase",
    "expansion","acceleration","resilience","upturn","robust","favorable",
    "strengthened","progress","rebound","optimistic","confidence","boom",
    "prosper","thrive","advance","gain","rise","positive","upward",
    "exceed","outperform","strong","healthy","dynamic","sustainable",
    "croissance","reprise","amélioration","stabilité","excédent","hausse",
    "dynamique","favorable","progrès","rebond","solide",
    "تعافي","نمو","استقرار","فائض","تحسّن","ارتفاع","توسع","إيجابي",
    "تقدم","قوي","ازدهار","انتعاش","تحسين","قوة",
]
# Negative economic-tone vocabulary for economic_lexicon_score:
ECON_NEGATIVE = [
    "deficit","recession","inflation","decline","contraction","debt",
    "crisis","deterioration","slowdown","downturn","unemployment","pressure",
    "risk","vulnerability","shock","uncertainty","war","sanctions",
    "drought","collapse","default","volatile","instability","weak",
    "fragile","pessimistic","loss","shrink","fall","negative","downward",
    "slump","stagnation","turbulence","disruption","imbalance","burden",
    "déficit","récession","crise","ralentissement","chômage","incertitude",
    "guerre","effondrement","instabilité","baisse","fragilité","pression",
    "عجز","تضخم","ركود","انكماش","أزمة","تدهور","بطالة","انخفاض",
    "ضغط","مخاطر","صدمة","عدم استقرار","هشاشة","ديون","عقوبات",
]
# Broad relevance triggers used by get_economic_chunks to pre-filter chunks
# before running the (slow) sentiment models:
ECON_TRIGGER = [
    "deficit","risk","crisis","recession","shock","uncertainty",
    "slowdown","pressure","vulnerable","weak","deteriorat","downturn",
    "contraction","debt","unemployment","inflation","collapse","volatile",
    "instability","fragile","stagnation","disruption","sanctions","drought",
    "growth","recovery","improvement","surplus","stable","expansion",
    "resilience","rebound","strengthened","acceleration","robust",
    "favorable","progress","increase","upturn","confidence","boom",
    "gdp","forecast","outlook","trade","fiscal","monetary","exchange",
    "interest","budget","revenue","expenditure","policy","reform",
    "التضخم","الناتج","النمو","العجز","المخاطر","التوقعات","الميزانية",
    "croissance","déficit","récession","prévision","taux","politique",
]
def economic_lexicon_score(text: str) -> float:
    """Lexicon polarity in [-1, 1]: (positive hits - negative hits) / total hits.

    Hits are case-insensitive substring matches, each word counted at most
    once; with zero hits the denominator floors at 1, so the score is 0.0."""
    lowered = text.lower()
    hits_pos = sum(w in lowered for w in ECON_POSITIVE)
    hits_neg = sum(w in lowered for w in ECON_NEGATIVE)
    denom = max(hits_pos + hits_neg, 1)
    return round((hits_pos - hits_neg) / denom, 4)
def detect_document_type(texts: list) -> dict:
    """Classify a corpus as economic / medical / legal / academic / general.

    Counts keyword hits over the first 30 chunks; "general" gets a baseline
    score of 1 so it wins when nothing else matches. Confidence is the
    winning category's share of all hits. is_economic additionally requires
    at least 3 economic keyword hits."""
    if not texts:
        return {"type": "📄 General", "is_economic": False, "score": 0, "confidence": 0.0}
    blob = " ".join(texts[:30]).lower()
    keyword_sets = {
        "economic": ECONOMIC_KEYWORDS,
        "medical": MEDICAL_KEYWORDS,
        "legal": LEGAL_KEYWORDS,
        "academic": ACADEMIC_KEYWORDS,
    }
    scores = {cat: sum(1 for kw in kws if kw in blob) for cat, kws in keyword_sets.items()}
    scores["general"] = 1
    winner = max(scores, key=scores.get)
    confidence = round(scores[winner] / max(sum(scores.values()), 1), 2)
    icons = {
        "economic": "📊 Economic", "medical": "🏥 Medical",
        "legal": "⚖️ Legal", "academic": "🎓 Academic", "general": "📄 General",
    }
    return {
        "type": icons.get(winner, "📄 General"),
        "raw_type": winner,
        "is_economic": winner == "economic" and scores["economic"] >= 3,
        "score": scores[winner],
        "confidence": confidence,
    }
# Ensemble weights used by sentiment_score_numeric (sum to 1.0).
WEIGHTS = {"finbert": 0.40, "xlm": 0.30, "lexicon": 0.30}
print("⏳ Loading FinBERT...")
# Financial-domain sentiment classifier. Loading can fail (offline / OOM),
# so a flag gates every later call and the pipeline degrades to 0.0 scores.
try:
    finbert_pipe = pipeline(
        "text-classification", model="ProsusAI/finbert",
        tokenizer="ProsusAI/finbert", return_all_scores=True,
        device=0 if torch.cuda.is_available() else -1,  # GPU if available
    )
    FINBERT_OK = True
    print("✅ FinBERT loaded!")
except Exception as e:
    print(f"⚠️ FinBERT failed: {e}")
    finbert_pipe = None
    FINBERT_OK = False
print("⏳ Loading XLM-RoBERTa...")
# Multilingual sentiment classifier (covers Arabic/French chunks too).
try:
    xlm_pipe = pipeline(
        "text-classification",
        model="cardiffnlp/twitter-xlm-roberta-base-sentiment",
        tokenizer="cardiffnlp/twitter-xlm-roberta-base-sentiment",
        return_all_scores=True,
        device=0 if torch.cuda.is_available() else -1,
    )
    XLM_OK = True
    print("✅ XLM-RoBERTa loaded!")
except Exception as e:
    print(f"⚠️ XLM-RoBERTa failed: {e}")
    xlm_pipe = None
    XLM_OK = False
def normalize_clf(raw):
    """Flatten a HF text-classification result into a flat list of score dicts.

    Pipelines run with return_all_scores=True wrap a single input's scores
    in an extra list ([[{...}, ...]]); unwrap one level, then guarantee the
    result is a list."""
    if isinstance(raw, list) and raw and isinstance(raw[0], list):
        raw = raw[0]
    if isinstance(raw, list):
        return raw
    return [raw]
def clf_finbert(text: str) -> float:
    """FinBERT polarity: P(positive) - P(negative), rounded to 4 decimals.

    Input is truncated to 512 characters to stay within the model context.
    Returns 0.0 when the model failed to load or inference errors out.
    """
    if not FINBERT_OK or finbert_pipe is None:
        return 0.0
    try:
        items = normalize_clf(finbert_pipe(text[:512]))
        by_label = {r["label"].lower(): float(r["score"]) for r in items}
        return round(by_label.get("positive", 0.0) - by_label.get("negative", 0.0), 4)
    except Exception:  # narrowed from bare except: keep Ctrl-C/SystemExit working
        return 0.0
def clf_xlm(text: str) -> float:
    """XLM-RoBERTa sentiment polarity: P(positive) - P(negative).

    Handles both the LABEL_0/LABEL_2 and positive/negative label schemes.
    Returns 0.0 when the model failed to load or inference errors out.
    """
    if not XLM_OK or xlm_pipe is None:
        return 0.0
    try:
        items = normalize_clf(xlm_pipe(text[:512]))
        by_label = {r["label"]: float(r["score"]) for r in items}
        pos = by_label.get("LABEL_2", by_label.get("positive", by_label.get("Positive", 0.0)))
        neg = by_label.get("LABEL_0", by_label.get("negative", by_label.get("Negative", 0.0)))
        return round(pos - neg, 4)
    except Exception:  # narrowed from bare except: keep Ctrl-C/SystemExit working
        return 0.0
def sentiment_score_numeric(text: str) -> float:
    """Weighted ensemble score in [-1, 1]: FinBERT 40%, XLM 30%, lexicon 30%."""
    components = {
        "finbert": clf_finbert(text),
        "xlm": clf_xlm(text),
        "lexicon": economic_lexicon_score(text),
    }
    return round(sum(WEIGHTS[name] * value for name, value in components.items()), 4)
def run_sentiment(text: str):
    """Return (emoji label, confidence) for the ensemble sentiment score.

    Neutral band is ±0.05; confidence is |score| capped at 1.0."""
    score = sentiment_score_numeric(text)
    label = (
        "Positive 😊" if score > 0.05
        else "Negative 😞" if score < -0.05
        else "Neutral 😐"
    )
    return label, round(min(abs(score), 1.0), 4)
def run_sentiment_detailed(text: str) -> str:
    """Markdown table breaking the ensemble score into its three components."""
    fb_score = clf_finbert(text)
    xlm_score = clf_xlm(text)
    lex_score = economic_lexicon_score(text)
    final_score = sentiment_score_numeric(text)

    def bar(value):
        # Map [-1, 1] onto 10 cells; colour uses the same ±0.05 neutral band.
        cells = max(0, min(10, round((value + 1) / 2 * 10)))
        block = "🟩" if value > 0.05 else "🟥" if value < -0.05 else "🟨"
        return block * cells + "⬜" * (10 - cells)

    if final_score > 0.05:
        label = "🟢 **Positive**"
    elif final_score < -0.05:
        label = "🔴 **Negative**"
    else:
        label = "🟡 **Neutral**"
    rows = [
        "### 🏆 Ensemble Sentiment Breakdown\n",
        "| Model | Score | Bar | Weight |",
        "|---|---|---|---|",
        f"| 🏦 FinBERT | `{fb_score:+.4f}` | {bar(fb_score)} | **40%** |",
        f"| 🌍 XLM-RoBERTa | `{xlm_score:+.4f}` | {bar(xlm_score)} | **30%** |",
        f"| 📖 Lexicon | `{lex_score:+.4f}` | {bar(lex_score)} | **30%** |",
        f"| ⚡ **Final** | **`{final_score:+.4f}`** | {bar(final_score)} | **100%** |",
        "",
        label,
    ]
    return "\n".join(rows)
print("⏳ Loading Embedder, Reranker, ASR...")
# Multilingual sentence embedder used for FAISS retrieval (cosine similarity
# via normalized embeddings + inner-product index).
embedder = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
# Cross-encoder reranker scoring (query, passage) pairs in rag_retrieve.
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", max_length=512)
# Whisper speech-to-text for voice questions (transcribe_audio).
asr = hf_pipeline(
    "automatic-speech-recognition",
    model="openai/whisper-small",
    device=0 if torch.cuda.is_available() else -1,
)
_ = embedder.encode(["warmup"], convert_to_numpy=True)  # trigger lazy init once
print("✅ All models loaded!")
# Try to restore a previously built index from /tmp at startup.
_startup = load_saved_index()
print(f"🔄 Startup load: {_startup}")
def clean_filename(path: str) -> str:
    """Strip directories, keeping only the final path component."""
    return os.path.split(str(path))[1]
def detect_lang(text: str) -> str:
    """Best-effort language tag: 'ar' for Arabic, otherwise 'en'.

    Only the first 300 characters are fed to langdetect; any detection
    failure (empty/ambiguous text, langdetect errors) falls back to 'en'.
    """
    try:
        return "ar" if str(detect(str(text)[:300])).startswith("ar") else "en"
    except Exception:  # narrowed from bare except; langdetect raises its own errors
        return "en"
def extract_year_from_filename(filename: str):
    """Pull a 4-digit year (19xx/20xx) out of a file name or path.

    Priority: (1) a standalone year in any path component, scanning from the
    last component backwards; (2) a 4-digit number glued to a known report
    prefix (WEO/BOA/IMF/rapport/report); (3) the first year-like number
    anywhere in the path. Returns an int, or None when nothing matches."""
    normalized = str(filename).replace("\\", "/")
    standalone = re.compile(r"\b(20\d{2}|19\d{2})\b")
    for component in reversed(normalized.split("/")):
        found = standalone.findall(component)
        if found:
            return int(found[0])
    prefixed = [r'WEO[_\-\s]?(\d{4})', r'BOA[_\-\s]?(\d{4})',
                r'IMF[_\-\s]?(\d{4})', r'rapport[_\-\s]?(\d{4})',
                r'report[_\-\s]?(\d{4})']
    for pattern in prefixed:
        hit = re.search(pattern, normalized, re.IGNORECASE)
        if hit:
            return int(hit.group(1))
    loose = re.findall(r'\b(19\d{2}|20\d{2})\b', normalized)
    return int(loose[0]) if loose else None
def chunk_text(text, chunk_size=300, overlap=80):
    """Split text into overlapping chunks of roughly chunk_size characters.

    Splits at sentence boundaries (. ! ? ؟ or newline). When a chunk fills
    up, about overlap//5 trailing words are carried into the next chunk for
    context. Fragments of 30 characters or fewer are dropped."""
    collapsed = re.sub(r"\s+", " ", str(text)).strip()
    sentences = re.split(r"(?<=[.!?؟\n])\s+", collapsed)
    carry_words = overlap // 5  # word-count overlap between consecutive chunks
    pieces = []
    buffer = ""
    for sentence in sentences:
        if len(buffer) + len(sentence) <= chunk_size:
            buffer = buffer + " " + sentence
            continue
        if buffer.strip():
            pieces.append(buffer.strip())
        tail = buffer.split()
        if tail:
            buffer = " ".join(tail[-carry_words:]) + " " + sentence
        else:
            buffer = sentence
    if buffer.strip():
        pieces.append(buffer.strip())
    return [piece for piece in pieces if len(piece) > 30]
def load_file(path):
    """Extract text from a PDF / DOCX / CSV / plain-text file.

    Returns a list of {"text": str, "page": int} dicts.
    - PDF: tries pypdf first, falls back to PyPDF2; first 50 pages only.
      Image-only PDFs yield one "Could not extract text." page.
    - DOCX: non-empty paragraphs grouped 50 per synthetic "page".
    - CSV: uses the "text" column if present, else the first column;
      one row per page.
    - Anything else: read as UTF-8 (decode errors ignored) as one page.
    """
    path = str(path)
    if path.endswith(".pdf"):
        pages = []
        try:
            import pypdf  # preferred, maintained successor of PyPDF2
            with open(path, "rb") as f:
                reader = pypdf.PdfReader(f)
                for i, pg in enumerate(reader.pages[:50]):
                    t = pg.extract_text()
                    if t and t.strip():
                        pages.append({"text": t, "page": i + 1})
        except Exception:  # narrowed from bare except; fall through to PyPDF2
            pass
        if not pages:
            try:
                with open(path, "rb") as f:
                    reader = PyPDF2.PdfReader(f)
                    for i, pg in enumerate(reader.pages[:50]):
                        t = pg.extract_text()
                        if t and t.strip():
                            pages.append({"text": t, "page": i + 1})
            except Exception:  # narrowed from bare except
                pass
        return pages or [{"text": "Could not extract text.", "page": 1}]
    if path.endswith(".docx"):
        try:
            from docx import Document
            doc = Document(path)
            pars = [p.text for p in doc.paragraphs if p.text.strip()]
            return [{"text": "\n".join(pars[i:i + 50]), "page": i // 50 + 1}
                    for i in range(0, len(pars), 50)] or [{"text": "Empty DOCX.", "page": 1}]
        except Exception as e:
            return [{"text": f"DOCX error: {e}", "page": 1}]
    if path.endswith(".csv"):
        df = pd.read_csv(path)
        col = "text" if "text" in df.columns else df.columns[0]
        return [{"text": t, "page": i + 1} for i, t in enumerate(df[col].dropna().astype(str))]
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        return [{"text": f.read(), "page": 1}]
def build_index(files, progress=gr.Progress(track_tqdm=True)):
    """Build the in-memory knowledge base: read files, chunk, embed, index.

    Resets and repopulates the module-level KB_TEXTS / KB_META / KB_EMB /
    FAISS_INDEX / DOC_TYPE_INFO / PER_FILE_INFO, persists via save_index(),
    and returns a Markdown status report (also used for error messages).

    Args:
        files: gradio upload payload — a path string, dict, file object, or
            a list of any of those.
        progress: gradio progress tracker (injected by the UI).
    """
    global KB_TEXTS, KB_META, FAISS_INDEX, KB_EMB, DOC_TYPE_INFO, PER_FILE_INFO
    KB_TEXTS, KB_META, PER_FILE_INFO = [], [], {}
    import time
    t_start = time.time()
    def elapsed(): return f"{time.time()-t_start:.1f}s"
    if not files:
        return "⚠️ Upload at least one file."
    # Normalize the many shapes gradio may hand us into plain path strings.
    file_paths = []
    if not isinstance(files, list): files = [files]
    for f in files:
        try:
            if isinstance(f, str): file_paths.append(f)
            elif isinstance(f, dict):
                p = f.get("path") or f.get("name") or f.get("orig_name")
                if p: file_paths.append(str(p))
            elif hasattr(f, "name"): file_paths.append(str(f.name))
            elif hasattr(f, "path"): file_paths.append(str(f.path))
            else: file_paths.append(str(f))
        except Exception as ex:
            print(f"[build_index] file parse error: {ex}")
            continue
    if not file_paths:
        return "❌ Could not read file paths. Try re-uploading."
    # ---- Step 1: read, chunk and tag every file ----
    progress(0.05, desc=f"[{elapsed()}] Step 1/4 -- Reading files...")
    for p in file_paths:
        full_path = str(p)
        if not os.path.exists(full_path):
            print(f"[build_index] File not found: {full_path}")
            continue
        fname = clean_filename(full_path)
        # The year may live in the file name or anywhere else in the path.
        year = extract_year_from_filename(fname) or extract_year_from_filename(full_path)
        try:
            pages = load_file(full_path)
        except Exception as e:
            print(f"[build_index] load_file error: {e}")
            continue
        file_texts = []
        for pg in pages:
            for ch in chunk_text(pg["text"]):
                KB_TEXTS.append(ch)
                KB_META.append({"name": fname, "lang": detect_lang(ch),
                                "page": pg["page"], "year": year})
                file_texts.append(ch)
        # Per-file document-type classification (drives the Forecast tab).
        ti = detect_document_type(file_texts)
        ti["year"] = year
        PER_FILE_INFO[fname] = ti
    if not KB_TEXTS:
        return "❌ No text extracted. Check that PDFs are not scanned images."
    # ---- Step 2: embed chunks (normalized → inner product == cosine) ----
    progress(0.25, desc=f"[{elapsed()}] Step 2/4 -- Embedding {len(KB_TEXTS)} chunks...")
    try:
        KB_EMB = embedder.encode(
            KB_TEXTS, convert_to_numpy=True,
            normalize_embeddings=True, show_progress_bar=True, batch_size=64,
        ).astype("float32")
    except Exception as e:
        return f"❌ Embedding error: {e}"
    # ---- Step 3: exact (flat) inner-product FAISS index ----
    progress(0.80, desc=f"[{elapsed()}] Step 3/4 -- Building FAISS index...")
    try:
        FAISS_INDEX = faiss.IndexFlatIP(KB_EMB.shape[1])
        FAISS_INDEX.add(KB_EMB)
    except Exception as e:
        return f"❌ FAISS error: {e}"
    # ---- Step 4: classify the corpus, persist, and build the report ----
    progress(0.92, desc=f"[{elapsed()}] Step 4/4 -- Saving...")
    DOC_TYPE_INFO = detect_document_type(KB_TEXTS)
    lang_count = Counter(m["lang"] for m in KB_META)
    tbl = "| 📄 File | 📅 Year | 🏷️ Type | 🎯 Conf | 📦 Chunks |\n|---|---|---|---|---|\n"
    for fname, info in PER_FILE_INFO.items():
        n = sum(1 for m in KB_META if m["name"] == fname)
        yr = str(info.get("year", "N/A"))
        yrb = f"{yr} ✅" if yr not in ["None","N/A"] else "N/A ⚠️"
        badge = " 🟢" if info["is_economic"] else ""
        tbl += f"| `{fname}` | {yrb} | {info['type']}{badge} | {info['confidence']:.0%} | {n} |\n"
    ef = [f for f, i in PER_FILE_INFO.items() if i["is_economic"]]
    fmsg = (
        "\n\n🟢 **Economic files detected:** " +
        ", ".join(f"`{f}`" for f in ef) +
        "\n➡️ Go to **📈 7 - Forecast** tab to run predictions."
    ) if ef else ""
    save_index()
    total_time = time.time() - t_start
    progress(1.0, desc=f"✅ Done in {total_time:.1f}s!")
    return (
        f"✅ **Index built in `{total_time:.1f}s`!**\n\n"
        f"| | |\n|---|---|\n"
        f"| ⏱️ Total time | **{total_time:.1f} seconds** |\n"
        f"| 📦 Total chunks | **{len(KB_TEXTS):,}** |\n"
        f"| 📄 Files | **{len(file_paths)}** |\n"
        f"| 🇸🇦 Arabic | **{lang_count.get('ar',0):,}** |\n"
        f"| 🇺🇸 English | **{lang_count.get('en',0):,}** |\n\n"
        f"---\n### 📋 Per-File Analysis\n\n{tbl}{fmsg}"
    )
def bm25_score(query_terms, doc, k1=1.5, b=0.75, avg_dl=200):
    """Okapi BM25 score of one document against a bag of query terms.

    Term frequency comes from the document itself; document frequency is
    estimated by substring search over all of KB_TEXTS (note: this makes
    each call O(terms x corpus) — called per candidate in rag_retrieve).
    Returns 0.0 on any failure or when no corpus is loaded.
    """
    try:
        if not KB_TEXTS or not isinstance(doc, str):
            return 0.0
        doc_len = len(doc.split())
        term_freqs = Counter(doc.lower().split())  # was misleadingly named `df`
        score = 0.0
        for term in query_terms:
            if not isinstance(term, str) or not term:
                continue
            tl = term.lower()
            # df: number of corpus chunks containing the term (substring match)
            n_doc = sum(1 for t in KB_TEXTS if isinstance(t, str) and tl in t.lower())
            tf = term_freqs.get(tl, 0)
            idf = math.log((len(KB_TEXTS) + 1) / (1 + n_doc))
            score += idf * (tf * (k1 + 1)) / (tf + k1 * (1 - b + b * doc_len / max(avg_dl, 1)))
        return score
    except Exception:  # narrowed from bare except
        return 0.0
def rag_retrieve(query, k=5, top_n=3):
    """Hybrid retrieval: FAISS cosine + BM25 + exact-match bonus, then
    cross-encoder reranking.

    Returns the top_n candidate dicts (idx/sem/kw/exact/hybrid/ce_score/
    final/rank plus chunk metadata), or [] when no index is loaded, nothing
    clears MIN_SIMILARITY, or any stage fails.
    """
    if FAISS_INDEX is None or not KB_TEXTS:
        return []
    try:
        q_emb = embedder.encode(
            [query], convert_to_numpy=True, normalize_embeddings=True
        ).astype("float32")
        # Over-fetch (k*3) so filtering and reranking still leave enough hits.
        scores, idx = FAISS_INDEX.search(q_emb, min(k * 3, len(KB_TEXTS)))
        candidates, qterms = [], [t for t in re.findall(r"\w+", str(query).lower()) if t]
        for rank, i in enumerate(idx[0]):
            if i == -1:
                continue  # FAISS pads with -1 when short of results
            sem = float(scores[0][rank])
            if sem < MIN_SIMILARITY:
                continue
            text = KB_TEXTS[i]
            if not isinstance(text, str):
                continue
            kw = bm25_score(qterms, text)
            lterms = [t for t in qterms if len(t) > 2]
            try:
                exact = all(re.search(rf"\b{re.escape(t)}\b", text.lower()) for t in lterms) if lterms else False
            except Exception:  # narrowed from bare except (regex edge cases)
                exact = False
            # Blend: 60% semantic + capped BM25 term + flat exact-match bonus.
            hybrid = sem * 0.6 + min(kw / 10, 0.4) + (0.15 if exact else 0.0)
            candidates.append({
                "idx": i, "sem": sem, "kw": kw, "exact": exact, "hybrid": hybrid,
                "lang": KB_META[i]["lang"], "file": KB_META[i]["name"],
                "page": KB_META[i]["page"], "year": KB_META[i].get("year"), "text": text,
            })
        if not candidates:
            return []
        # Cross-encoder rerank; raw CE logits are mapped from ~[-10, 10] to [0, 1].
        ce_scores = reranker.predict([[query, c["text"]] for c in candidates])
        for c, ce in zip(candidates, ce_scores):
            c["ce_score"] = float(ce)
            c["final"] = c["hybrid"] * 0.4 + (float(ce) + 10) / 20 * 0.6
        candidates.sort(key=lambda x: x["final"], reverse=True)
        for i, c in enumerate(candidates[:top_n]):
            c["rank"] = i + 1
        return candidates[:top_n]
    except Exception as e:
        print(f"rag_retrieve error: {e}")
        return []
def get_economic_chunks(texts: list, max_chunks: int = 40) -> list:
    """Pick up to max_chunks chunks that look economically relevant.

    Prefers chunks containing an ECON_TRIGGER keyword; with fewer than 10
    matches, falls back to a de-duplicated head/middle/tail sample of the
    document. Oversized selections are thinned by uniform striding."""
    n = len(texts)
    relevant = [t for t in texts if any(kw in t.lower() for kw in ECON_TRIGGER)]
    if len(relevant) < 10:
        head = texts[:min(10, n)]
        middle = texts[n // 2 - 5: n // 2 + 5] if n > 20 else []
        tail = texts[-min(10, n):]
        relevant = list(dict.fromkeys(head + middle + tail))
    if len(relevant) <= max_chunks:
        return relevant
    stride = max(1, len(relevant) // max_chunks)
    return relevant[::stride][:max_chunks]
def llm_groq(question, rag_context, history, lang):
    """Call Groq's Llama 3.3 70B with optional RAG context and recent history.

    Args:
        question: the user question (any language).
        rag_context: pre-formatted source excerpts, or "" for none.
        history: chat turns as [{"role", "content"}, ...]; only the last 4
            are forwarded to bound prompt size.
        lang: detected question language — currently unused here (the system
            prompt instructs the model to mirror the user's language).

    Returns:
        The model reply string, or an error string on API failure.
    """
    system_prompt = (
        "You are a smart multilingual AI assistant.\n"
        "- Always reply in the SAME language as the user question.\n"
        "- If Arabic reply fully in Arabic. If English reply fully in English.\n"
        "- Use document context precisely and cite page numbers.\n"
        "- If answer not in docs, use general knowledge and say so.\n"
        "- Be concise, helpful, accurate."
    )
    messages = [{"role": "system", "content": system_prompt}]
    for turn in history[-4:]:
        messages.append({"role": turn["role"], "content": turn["content"]})
    user_content = f"📄 Context:\n{rag_context}\n\nQuestion: {question}" if rag_context else question
    messages.append({"role": "user", "content": user_content})
    try:
        r = groq_client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=messages, temperature=0.3, max_tokens=512,
        )
        return r.choices[0].message.content.strip()
    except Exception as e:
        return f"⚠️ Groq error: {e}"
def smart_answer(question, history):
    """Answer a chat question, grounding on RAG context when retrieval is confident.

    Returns (answer_markdown, mode) where mode is "rag" or "llm". Updates the
    CHAT_STATS counters as a side effect."""
    lang = detect_lang(question)
    hits = rag_retrieve(question, k=5, top_n=3)
    context = "".join(
        f"[Source: {h['file']} - Page {h['page']}]\n{h['text']}\n\n" for h in hits
    )
    # "Good" retrieval = at least one hit with semantic similarity >= 0.25.
    grounded = bool(hits) and hits[0]["sem"] >= 0.25
    answer = llm_groq(question, context[:2000], history, lang)
    if grounded:
        sources = ", ".join(f"`{h['file']}` p.{h['page']}" for h in hits)
        badge = f"\n\n📄 **{'المصدر' if lang=='ar' else 'Source'}:** {sources}"
        CHAT_STATS["found"] += 1
    else:
        badge = f"\n\n_🤖 {'إجابة عامة.' if lang=='ar' else 'General knowledge answer.'}_"
        CHAT_STATS["not_found"] += 1
    CHAT_STATS["questions"] += 1
    return answer + badge, "rag" if grounded else "llm"
def predict_with_rag(text):
    """Keyword + semantic lookup of `text` in the KB, with sentiment analysis.

    Returns (sentiment_label, confidence, markdown_report). Literal
    whole-word hits get sentence-level citations plus the ensemble sentiment
    breakdown; otherwise the report falls back to semantic matches only and
    sentiment is reported as not found.

    Raises:
        gr.Error: if the input is empty.
    """
    text = "" if text is None else str(text).strip()
    if not text:
        raise gr.Error("⚠️ Enter text first.")
    lang = detect_lang(text)
    # Query terms: word characters only, length > 2 (skips short stop-ish tokens).
    qterms = [t for t in re.findall(r"\w+", text.lower()) if len(t) > 2]
    # ---- Pass 1: literal whole-word hits, collected per sentence ----
    exact_hits = []
    for i, chunk in enumerate(KB_TEXTS):
        if not isinstance(chunk, str):
            continue
        cl = chunk.lower()
        for term in qterms:
            try:
                if re.search(rf"\b{re.escape(term)}\b", cl):
                    for s in re.split(r"(?<=[.!?؟\n])\s+", chunk):
                        if re.search(rf"\b{re.escape(term)}\b", s.lower()):
                            exact_hits.append({
                                "word": term, "file": KB_META[i]["name"],
                                "sentence": s.strip(), "lang": KB_META[i]["lang"],
                                "chunk_id": i, "page": KB_META[i]["page"],
                            })
            except Exception:  # narrowed from bare except (regex edge cases)
                continue
    # ---- Pass 2: semantic retrieval ----
    sem_results = rag_retrieve(text, k=5, top_n=3)
    md = ""
    if exact_hits:
        # De-duplicate by (word, file, sentence prefix).
        seen, unique = set(), []
        for h in exact_hits:
            key = (h["word"], h["file"], h["sentence"][:80])
            if key not in seen:
                seen.add(key)
                unique.append(h)
        md += "## ✅ Word Found\n\n"
        for h in unique:
            flag = "🇸🇦" if h["lang"]=="ar" else "🇺🇸"
            md += f"- 🔑 **`{h['word']}`** → 📄 `{h['file']}` p.{h['page']} {flag}\n\n > {h['sentence']}\n\n"
        detail = run_sentiment_detailed(text)
        sent, conf = run_sentiment(text)
        md += f"---\n{detail}\n\n---\n## 📍 Exact Location\n\n"
        # Show each matching chunk once, even when several words hit it.
        seen2 = set()
        for h in unique:
            k2 = (h["file"], h["chunk_id"])
            if k2 in seen2:
                continue
            seen2.add(k2)
            md += f"### 📄 `{h['file']}` — p.{h['page']} {'🇸🇦' if h['lang']=='ar' else '🇺🇸'}\n\n```\n{KB_TEXTS[h['chunk_id']]}\n```\n\n"
    else:
        sent, conf = "❌ Not found", 0.0
        if lang == "ar":
            md += f"## ❌ الكلمة غير موجودة\n\n**`{text}`** لم تُذكر حرفياً.\n\n"
        else:
            md += f"## ❌ Word Not Found\n\n**`{text}`** not found literally.\n\n"
    if sem_results:
        md += "---\n## 🔍 Semantic Results\n\n"
        for r in sem_results:
            bar = "🟩"*round(r["sem"]*10) + "⬜"*(10-round(r["sem"]*10))
            snippet = r["text"][:300].strip()
            # Bold every query term inside the snippet (best-effort).
            for t in qterms:
                try:
                    snippet = re.sub(rf"(?i)({re.escape(t)})", r"**\1**", snippet)
                except Exception:  # narrowed from bare except
                    pass
            md += (
                f"### Result {r['rank']} -- {bar} `{r['sem']*100:.1f}%` "
                f"{'🇸🇦' if r['lang']=='ar' else '🇺🇸'}\n\n"
                f"📄 `{r['file']}` p.{r['page']}\n\n> {snippet}...\n\n"
            )
    else:
        md += "---\n_No similar content found._\n"
    return sent, round(conf, 4), md
def get_keywords():
    """Markdown table of the 25 most frequent English words (4+ letters,
    stopwords removed) across the first 200 indexed chunks."""
    if not KB_TEXTS:
        return "_No index._"
    words = re.findall(r"\b[a-zA-Z]{4,}\b", " ".join(KB_TEXTS[:200]).lower())
    stopwords = {
        "with","that","this","from","have","been","will","were","they",
        "their","which","when","what","also","more","into","than","some",
        "other","about","these","over","such","after","most","made","each",
        "where","while","through","between","during","before",
    }
    counts = Counter(w for w in words if w not in stopwords)
    rows = "\n".join(f"| `{w}` | {c} |" for w, c in counts.most_common(25))
    return f"### 🔑 Top Keywords\n\n| Word | Count |\n|---|---|\n{rows}"
def tts_output(text: str, lang_hint: str = "auto"):
    """Synthesize speech for text (first 500 chars) with gTTS.

    Language is auto-detected (Arabic vs English); lang_hint is accepted
    for interface compatibility but currently unused. Returns the mp3
    path, or None for empty input / any synthesis failure."""
    if not text or not text.strip():
        return None
    try:
        voice_lang = "ar" if detect_lang(text) == "ar" else "en"
        out_path = "/tmp/tts_output.mp3"
        gTTS(text=text[:500], lang=voice_lang, slow=False).save(out_path)
    except Exception as e:
        print(f"TTS error: {e}")
        return None
    return out_path
def transcribe_audio(audio_path):
    """Run Whisper ASR on an audio file.

    Returns "" for no input, the transcript on success, and an error
    string (never an exception) on failure."""
    if audio_path is None:
        return ""
    try:
        return asr(str(audio_path)).get("text", "")
    except Exception as e:
        return f"⚠️ ASR error: {e}"
def export_chat(history_state):
    """Dump a chat history (list of {role, content} dicts) to a text file.

    Returns the written file path, or None for an empty history or any
    write failure."""
    if not history_state:
        return None
    try:
        transcript = []
        for msg in history_state:
            speaker = "User" if msg["role"] == "user" else "Assistant"
            transcript.append(f"[{speaker}]\n{msg['content']}\n")
        out_path = "/tmp/chat_export.txt"
        with open(out_path, "w", encoding="utf-8") as fh:
            fh.write("\n".join(transcript))
        return out_path
    except Exception as e:
        print(f"Export error: {e}")
        return None
def get_kb_stats():
    """Markdown summary of the loaded knowledge base: chunk counts by
    language, year and file."""
    if not KB_TEXTS:
        return "_No index loaded._"
    by_lang = Counter(m["lang"] for m in KB_META)
    by_year = Counter(str(m.get("year", "N/A")) for m in KB_META)
    by_file = Counter(m["name"] for m in KB_META)
    out = [
        f"### 📊 Knowledge Base Statistics\n",
        "| Metric | Value |", "|---|---|",
        f"| 📦 Total chunks | **{len(KB_TEXTS):,}** |",
        f"| 📄 Unique files | **{len(by_file)}** |",
        f"| 🇸🇦 Arabic chunks | **{by_lang.get('ar',0):,}** |",
        f"| 🇺🇸 English chunks | **{by_lang.get('en',0):,}** |",
        f"| 🏷️ Doc type | **{DOC_TYPE_INFO['type']}** |",
        "\n#### 📅 Chunks per Year\n", "| Year | Chunks |", "|---|---|",
    ]
    out.extend(f"| {yr} | {cnt} |" for yr, cnt in sorted(by_year.items()))
    out += ["\n#### 📄 Chunks per File\n", "| File | Chunks |", "|---|---|"]
    out.extend(f"| `{fname[:50]}` | {cnt} |"
               for fname, cnt in sorted(by_file.items(), key=lambda x: -x[1]))
    return "\n".join(out)
# ============================================================
# WORLD BANK
# ============================================================
def get_worldbank_data(country_code, indicator, start_year, end_year):
    """Fetch an annual indicator series from the World Bank API.

    Returns a DataFrame with columns [year, value] sorted ascending by year,
    or an empty DataFrame on any network/format problem."""
    url = (
        f"https://api.worldbank.org/v2/country/{country_code}/"
        f"indicator/{indicator}?date={start_year}:{end_year}&per_page=100&format=json"
    )
    try:
        resp = requests.get(url, timeout=15)
        resp.raise_for_status()
        payload = resp.json()
        # payload[0] is pagination metadata; payload[1] holds the observations.
        if not payload or len(payload) < 2 or not payload[1]:
            return pd.DataFrame()
        rows = []
        for entry in payload[1]:
            if entry.get("value") is None or entry.get("date") is None:
                continue
            rows.append({"year": int(entry["date"]), "value": float(entry["value"])})
        return pd.DataFrame(rows).dropna().sort_values("year").reset_index(drop=True)
    except Exception as e:
        print(f"World Bank error: {e}")
        return pd.DataFrame()
def build_doc_sentiment_index():
    """Compute per-file and per-year ensemble sentiment over the loaded KB.

    Returns (df_files, df_yearly):
      df_files  — one row per file: file, year, sentiment, n_chunks, label.
      df_yearly — year/sentiment yearly averages, or None when no file has
                  a detected year.
    Returns (None, None) when no index is loaded.
    """
    if not KB_TEXTS or not KB_META:
        return None, None
    # Group (truncated) chunk texts by source file.
    files_texts = {}
    for text, meta in zip(KB_TEXTS, KB_META):
        files_texts.setdefault(meta["name"], []).append(text[:400])
    yearly_sentiment, file_results = {}, []
    for fname, texts in files_texts.items():
        sample = get_economic_chunks(texts, max_chunks=40)
        scores = [sentiment_score_numeric(t) for t in sample]
        avg = round(float(np.mean(scores)), 4)
        year = next((m["year"] for m in KB_META if m["name"] == fname and m.get("year")), None)
        file_results.append({
            "file": fname, "year": year if year else "N/A",
            "sentiment": avg, "n_chunks": len(sample),
            "label": "🟢 Optimistic" if avg > 0.05 else "🔴 Pessimistic" if avg < -0.05 else "🟡 Neutral",
        })
        if year:
            yearly_sentiment.setdefault(year, []).append(avg)
    yearly_avg = {yr: round(float(np.mean(vals)), 4) for yr, vals in yearly_sentiment.items()}
    # Sort via a string key: the 'year' column mixes ints with the literal
    # "N/A", and a plain sort_values raises TypeError on that mixed dtype.
    # 4-digit years sort the same lexicographically as numerically, and
    # "N/A" lands after them.
    df_files = pd.DataFrame(file_results).sort_values(
        "year", key=lambda col: col.astype(str)
    )
    df_yearly = (
        pd.DataFrame([{"year": y, "sentiment": s} for y, s in sorted(yearly_avg.items())])
        if yearly_avg else None
    )
    return df_files, df_yearly
def run_adf_check(series: np.ndarray, name: str):
    """Difference a series until the ADF test says it is stationary (max twice).

    Returns (transformed_series, status_markdown, was_differenced). The
    `name` parameter is accepted for call-site readability but is unused.
    Any adfuller failure is treated as p=1.0 (i.e. non-stationary).
    """
    from statsmodels.tsa.stattools import adfuller

    def adf_p(s):
        # p-value of the ADF unit-root test; worst case (1.0) on failure.
        try:
            return adfuller(s, autolag='AIC')[1]
        except Exception:  # narrowed from bare except
            return 1.0

    s = series.copy()
    p0 = adf_p(s)
    if p0 <= 0.05:
        return s, f"✅ Stationary at level (p={p0:.4f})", False
    s1 = np.diff(s)
    p1 = adf_p(s1)
    if p1 <= 0.05:
        return s1, f"⚠️ Non-stationary (p={p0:.4f}) → 1st diff → ✅ stationary (p={p1:.4f})", True
    s2 = np.diff(s1)
    p2 = adf_p(s2)
    return (s2, f"⚠️ Non-stationary (p={p0:.4f}) → 1st diff (p={p1:.4f}) → 2nd diff → {'✅' if p2<=0.05 else '⚠️'} (p={p2:.4f})", True)
def run_granger_test(series_y, series_exog, maxlag=4):
    """Test whether sentiment Granger-causes the target series.

    Both series are differenced to stationarity first (run_adf_check), then
    trimmed to a common tail length; maxlag is capped at (len-1)//3.

    Returns:
        (markdown_report, passed) where passed means at least one lag had
        p < 0.05; skip/error messages are returned with passed=False.
    """
    try:
        from statsmodels.tsa.stattools import grangercausalitytests
        if len(series_y) < 10: return "⚠️ **Granger Test skipped** — need >= 10 points.", False
        # Stationarize each series independently, keeping its ADF status text.
        sy, status_y = run_adf_check(series_y.copy(), "Target")[:2]
        sexog, status_exog = run_adf_check(series_exog.copy(), "Sentiment")[:2]
        # Differencing may shorten a series; align the two on their tails.
        min_len = min(len(sy), len(sexog))
        sy, sexog = sy[-min_len:], sexog[-min_len:]
        maxlag = min(maxlag, max(1, (len(sy)-1)//3))
        if len(sy) < 5: return "⚠️ **Granger Test skipped** — too few obs.", False
        # NOTE(review): verbose= is deprecated in newer statsmodels — confirm the pinned version.
        gc_result = grangercausalitytests(np.column_stack([sy, sexog]), maxlag=maxlag, verbose=False)
        rows, any_pass, best_p = [], False, 1.0
        for lag, res in gc_result.items():
            # SSR-based F test: (F statistic, p-value) per lag.
            p_val = res[0]["ssr_ftest"][1]
            f_val = res[0]["ssr_ftest"][0]
            sig = "✅ Yes" if p_val < 0.05 else ("🔶 Marginal" if p_val < 0.10 else "❌ No")
            if p_val < 0.05: any_pass = True
            best_p = min(best_p, p_val)
            rows.append(f"| {lag} | {f_val:.4f} | {p_val:.4f} | {sig} |")
        table = (
            "### 🔬 Granger Causality Test\n*H0: Sentiment does NOT Granger-cause Target*\n\n"
            f"| Series | ADF Result |\n|---|---|\n"
            f"| Target | {status_y} |\n| Sentiment | {status_exog} |\n\n"
            "| Lag | F-stat | p-value | Significant? |\n|-----|--------|---------|-------------|\n"
            + "\n".join(rows)
        )
        if any_pass: verdict = "\n\n✅ **PASS** — Sentiment Granger-causes the target (p < 0.05)."
        elif best_p < 0.10: verdict = f"\n\n🔶 **MARGINAL** — best p = {best_p:.4f}."
        else: verdict = "\n\n❌ **FAIL** — No significant Granger causality."
        return table + verdict, any_pass
    except Exception as e:
        return f"⚠️ Granger test error: `{e}`\n", False
def run_dm_test(actual, pred_arima, pred_sarimax):
try:
n = len(actual)
if n < 3: return "⚠️ **DM Test skipped** — n < 3.", False
d = (actual - pred_arima)**2 - (actual - pred_sarimax)**2
d_mean = np.mean(d)
d_std = np.std(d, ddof=1)
if d_std < 1e-10: return "⚠️ **DM Test** — models identical.", False
dm_stat = d_mean / (d_std / np.sqrt(n))
p_val = 2 * (1 - stats.t.cdf(abs(dm_stat), df=n-1))
sig = "✅ Yes" if p_val < 0.05 else ("🔶 Marginal" if p_val < 0.10 else "❌ No")
better = "SARIMAX+Ensemble" if dm_stat > 0 else "ARIMA"
table = (
"### 🎯 Diebold-Mariano Test\n*H0: Equal accuracy | H1: SARIMAX better*\n\n"
"| DM Statistic | p-value | n (test) | Significant? | Better Model |\n"
"|-------------|---------|----------|-------------|-------------|\n"
f"| `{dm_stat:.4f}` | `{p_val:.4f}` | `{n}` | {sig} | **{better}** |\n"
)
passed = p_val < 0.05 and dm_stat > 0
if passed: verdict = "\n✅ **PASS** — SARIMAX+Ensemble significantly better (p < 0.05)."
elif p_val<0.10 and dm_stat>0: verdict = f"\n🔶 **MARGINAL** — p = {p_val:.4f}."
else: verdict = f"\n❌ **FAIL** — Not significant (p = {p_val:.4f}). Expand year range for more test data."
return table + verdict, passed
except Exception as e:
return f"⚠️ DM error: `{e}`\n", False
# ============================================================
# MAIN FORECAST — always test on last 3 years
# ============================================================
def run_economic_forecast(country_code, target_var, start_year, end_year):
    """Backtest ARIMA(1,1,1) vs SARIMAX(1,1,1)+sentiment on a World Bank series.

    Fetches the indicator for ``country_code`` over the year range, attaches a
    document-sentiment exogenous series, holds out the last 3 years as a test
    window, fits both models, runs Granger and Diebold-Mariano tests, renders
    a 4-panel chart, and returns ``(markdown_report, image_path)``.
    On any handled failure it returns ``(error_string, None)``.

    NOTE(review): the markdown layout is parsed by regex in
    ``run_all_variables`` — keep the table strings stable.
    """
    # Lazy imports so the app can still start without statsmodels/sklearn;
    # the user sees the install hint in the UI instead of a crash.
    try:
        from statsmodels.tsa.arima.model import ARIMA
        from statsmodels.tsa.statespace.sarimax import SARIMAX
        from sklearn.metrics import mean_squared_error, mean_absolute_error
    except ImportError:
        return "❌ pip install statsmodels scikit-learn", None
    # Accept a handful of ISO3 codes; anything else is upper-cased and
    # passed through unchanged (presumably ISO2 — the UI default is "DZ").
    ISO3_TO_ISO2 = {
        "DZA":"DZ","MAR":"MA","TUN":"TN","EGY":"EG","SAU":"SA",
        "USA":"US","FRA":"FR","GBR":"GB","DEU":"DE","CHN":"CN",
        "BRA":"BR","IND":"IN","TUR":"TR","NGA":"NG","ZAF":"ZA",
    }
    country_code = ISO3_TO_ISO2.get(str(country_code).strip().upper(),
                                    str(country_code).strip().upper())
    # UI label -> World Bank indicator code; unknown labels fall back to CPI.
    indicator_map = {
        "Inflation (CPI %)" : "FP.CPI.TOTL.ZG",
        "GDP Growth (%)" : "NY.GDP.MKTP.KD.ZG",
        "Unemployment (%)" : "SL.UEM.TOTL.ZS",
        "Exchange Rate" : "PA.NUS.FCRF",
    }
    # get_worldbank_data is defined earlier in the file; assumed to return a
    # DataFrame with at least "year" and "value" columns — TODO confirm.
    econ_df = get_worldbank_data(
        country_code,
        indicator_map.get(target_var, "FP.CPI.TOTL.ZG"),
        int(start_year), int(end_year),
    )
    if econ_df.empty:
        return f"❌ No data for **{country_code}** / **{target_var}**", None
    if len(econ_df) < 5:
        return f"⚠️ Only **{len(econ_df)}** data points — widen year range.", None
    # Sentiment exogenous series: prefer the per-year document index; fall
    # back to one constant global mean when fewer than 2 yearly points exist.
    df_files, df_yearly = build_doc_sentiment_index()
    if df_yearly is not None and len(df_yearly) >= 2:
        merged = econ_df.merge(df_yearly, on="year", how="left")
        # Years with no documents get the mean yearly sentiment.
        merged["sentiment"] = merged["sentiment"].fillna(float(df_yearly["sentiment"].mean()))
        has_yearly = True
        mode_msg = "✅ **Yearly Ensemble Sentiment**"
    else:
        global_sent = (
            float(pd.to_numeric(df_files["sentiment"], errors="coerce").mean())
            if df_files is not None and len(df_files) > 0 else 0.0
        )
        merged = econ_df.copy()
        merged["sentiment"] = global_sent
        has_yearly = False
        mode_msg = "⚠️ **Global Sentiment**"
    # Rescale sentiment into [-0.3, 0.3] unless it is (near-)constant,
    # in which case MinMaxScaler output would be degenerate anyway.
    if merged["sentiment"].std() > 1e-6:
        scaler = MinMaxScaler(feature_range=(-0.3, 0.3))
        merged["sentiment"] = scaler.fit_transform(
            merged["sentiment"].values.reshape(-1,1)
        ).flatten().round(4)
    series = merged["value"].values.astype(float)
    exog = merged["sentiment"].values.reshape(-1, 1)
    years = merged["year"].values
    n = len(series)
    # ── Always test on last 3 years ───────────────────
    n_train = n - 3
    n_test = 3
    if n_train < 5:  # safety for short series
        n_train = max(int(n * 0.75), 5)
        n_test = n - n_train
    train_y, test_y = series[:n_train], series[n_train:]
    train_exog, test_exog = exog[:n_train], exog[n_train:]
    test_years = years[n_train:]
    # ── ARIMA ─────────────────────────────────────────
    try:
        m1 = ARIMA(train_y, order=(1,1,1)).fit()
        pred_arima = m1.forecast(n_test)
        rmse_a = float(np.sqrt(mean_squared_error(test_y, pred_arima)))
        mae_a = float(mean_absolute_error(test_y, pred_arima))
        # MAPE with a 1e-8 floor on |actual| to avoid division by zero.
        mape_a = float(np.mean(np.abs((test_y-pred_arima)/np.maximum(np.abs(test_y),1e-8)))*100)
    except Exception as e:
        return f"❌ ARIMA error: {e}", None
    # ── SARIMAX + Sentiment ───────────────────────────
    try:
        m2 = SARIMAX(train_y, exog=train_exog, order=(1,1,1)).fit(disp=False)
        pred_sarimax = m2.forecast(n_test, exog=test_exog)
        rmse_s = float(np.sqrt(mean_squared_error(test_y, pred_sarimax)))
        mae_s = float(mean_absolute_error(test_y, pred_sarimax))
        mape_s = float(np.mean(np.abs((test_y-pred_sarimax)/np.maximum(np.abs(test_y),1e-8)))*100)
    except Exception as e:
        return f"❌ SARIMAX error: {e}", None
    # Positive improvement means SARIMAX beat ARIMA on that metric.
    impr_rmse = (rmse_a - rmse_s) / rmse_a * 100
    impr_mae = (mae_a - mae_s) / mae_a * 100
    impr_mape = (mape_a - mape_s) / mape_a * 100
    # Granger causality: use only real (inner-joined) sentiment years when
    # enough exist; otherwise fall back to the padded forecasting series.
    if has_yearly and df_yearly is not None and len(df_yearly) >= 5:
        real_merged = econ_df.merge(df_yearly, on="year", how="inner")
        gc_y = real_merged["value"].values.astype(float)
        gc_exog = real_merged["sentiment"].values.astype(float)
    else:
        gc_y = series
        gc_exog = merged["sentiment"].values
    granger_md, granger_pass = run_granger_test(gc_y, gc_exog, maxlag=4)
    dm_md, dm_pass = run_dm_test(test_y, np.array(pred_arima), np.array(pred_sarimax))
    # ── Charts ────────────────────────────────────────
    fig, axes = plt.subplots(4, 1, figsize=(11, 18))
    # Panel 1: actual series plus both models' forecasts on the test window.
    axes[0].plot(years, series, "o-", color="#2196F3", label="Actual", lw=2, ms=5)
    axes[0].plot(test_years, pred_arima, "s--", color="#FF5722", label="ARIMA(1,1,1)", lw=2)
    axes[0].plot(test_years, pred_sarimax, "^-.", color="#4CAF50", label="SARIMAX+Ensemble", lw=2)
    axes[0].axvline(x=years[n_train-1], color="gray", linestyle=":", alpha=0.7, label="Train|Test")
    axes[0].set_title(
        f"📈 {target_var} -- {country_code} | n_train={n_train} | n_test={n_test}",
        fontsize=11, fontweight="bold"
    )
    axes[0].legend(fontsize=9); axes[0].grid(True, alpha=0.3)
    # Panel 2: yearly sentiment bars colour-coded by sign (±0.05 thresholds).
    s_clrs = ["#4CAF50" if s > 0.05 else "#FF5722" if s < -0.05 else "#FFC107"
              for s in merged["sentiment"]]
    axes[1].bar(years, merged["sentiment"], color=s_clrs, edgecolor="white", width=0.6)
    axes[1].axhline(y=0, color="black", lw=0.8)
    legend_patches = [
        Patch(color="#4CAF50", label="Optimistic (>0.05)"),
        Patch(color="#FFC107", label="Neutral"),
        Patch(color="#FF5722", label="Pessimistic (<-0.05)"),
    ]
    axes[1].legend(handles=legend_patches, fontsize=8, loc="upper right")
    axes[1].set_title(
        "📊 Ensemble Sentiment Index (FinBERT 40% + XLM 30% + Lexicon 30%) -- normalized [-0.3, +0.3]",
        fontsize=10, fontweight="bold"
    )
    axes[1].grid(True, alpha=0.3, axis="y")
    # Panel 3: RMSE comparison — the winning model is drawn in green.
    bar_colors = ["#FF5722" if rmse_a > rmse_s else "#4CAF50",
                  "#4CAF50" if rmse_s <= rmse_a else "#FF5722"]
    bars = axes[2].bar(
        ["ARIMA(1,1,1)", "SARIMAX+Ensemble"],
        [rmse_a, rmse_s], color=bar_colors, width=0.4, edgecolor="white"
    )
    for bar, val in zip(bars, [rmse_a, rmse_s]):
        axes[2].text(bar.get_x()+bar.get_width()/2, bar.get_height()+0.01,
                     f"{val:.4f}", ha="center", va="bottom", fontweight="bold", fontsize=11)
    axes[2].set_title("📉 RMSE Comparison (lower = better)", fontsize=11)
    axes[2].set_ylabel("RMSE"); axes[2].grid(True, alpha=0.3, axis="y")
    # Panel 4: pass/fail table for the two statistical tests.
    axes[3].axis("off")
    test_data = [
        ["Test", "Result", "Interpretation"],
        ["Granger (Sentiment → Target)",
         "✅ PASS" if granger_pass else "❌ FAIL",
         "Sentiment Granger-causes Target" if granger_pass else "No causal link detected"],
        ["Diebold-Mariano (SARIMAX vs ARIMA)",
         "✅ PASS" if dm_pass else "❌ FAIL",
         "SARIMAX significantly better" if dm_pass else f"n_test={n_test} -- expand range"],
    ]
    tbl4 = axes[3].table(
        cellText=test_data[1:], colLabels=test_data[0],
        cellLoc="center", loc="center", colWidths=[0.35, 0.2, 0.45]
    )
    tbl4.auto_set_font_size(False); tbl4.set_fontsize(11); tbl4.scale(1, 2.5)
    # Header row blue; result rows tinted green/red by test outcome.
    for (row, col), cell in tbl4.get_celld().items():
        if row == 0:
            cell.set_facecolor("#1565C0"); cell.set_text_props(color="white", fontweight="bold")
        elif row == 1:
            cell.set_facecolor("#E8F5E9" if granger_pass else "#FFEBEE")
        elif row == 2:
            cell.set_facecolor("#E8F5E9" if dm_pass else "#FFEBEE")
    axes[3].set_title("🔬 Statistical Tests Summary", fontsize=12, fontweight="bold", pad=20)
    plt.tight_layout(pad=3.0)
    # Persist the figure to disk — the Gradio Image component loads it by path.
    img_path = "/tmp/forecast_plot.png"
    plt.savefig(img_path, dpi=130, bbox_inches="tight")
    plt.close(fig)
    # Optional per-file sentiment table appended to the report.
    sent_table = ""
    if df_files is not None and len(df_files) > 0:
        sent_table = "\n---\n### 📄 Sentiment per File\n| File | Year | Score | Label |\n|---|---|---|---|\n"
        for _, row in df_files.iterrows():
            sent_table += f"| `{row['file']}` | {row['year']} | `{row['sentiment']:+.4f}` | {row['label']} |\n"
    result_md = (
        f"## 📊 Forecast -- {country_code} / {target_var}\n\n"
        f"| | |\n|---|---|\n"
        f"| 🎯 Target | **{target_var}** |\n"
        f"| 📈 Mode | {mode_msg} |\n"
        f"| 📈 n_train | **{n_train}** |\n"
        f"| 🧪 n_test | **{n_test} (last 3 years)** |\n\n"
        f"---\n### 🏆 Model Comparison\n"
        f"| Model | RMSE | MAE | MAPE |\n|---|---|---|---|\n"
        f"| ARIMA(1,1,1) | `{rmse_a:.4f}` | `{mae_a:.4f}` | `{mape_a:.1f}%` |\n"
        f"| SARIMAX+Ensemble | `{rmse_s:.4f}` | `{mae_s:.4f}` | `{mape_s:.1f}%` |\n"
        f"| **Improvement** | **{impr_rmse:+.1f}%** | **{impr_mae:+.1f}%** | **{impr_mape:+.1f}%** |\n\n"
        f"{'✅ SARIMAX improved RMSE by adding Sentiment.' if impr_rmse > 0 else '⚠️ No RMSE improvement this run.'}\n\n"
        f"---\n{granger_md}\n\n---\n{dm_md}\n{sent_table}"
    )
    return result_md, img_path
def run_all_variables(country_code, start_year, end_year):
    """Run the forecast pipeline for all four indicators and build one report.

    The per-variable metrics are scraped back out of the markdown emitted by
    ``run_economic_forecast`` via regex; cells fall back to "N/A" when a run
    fails and returns an error string instead of the usual table.
    """
    iso = str(country_code).strip().upper()
    y_from = int(start_year)
    y_to = int(end_year)

    def _cell(pattern, text, backticks):
        # Extract one regex capture from the report, or "N/A" on no match.
        m = re.search(pattern, text)
        if not m:
            return "N/A"
        return f"`{m.group(1)}`" if backticks else m.group(1)

    parts = [
        f"## All Variables -- {iso} ({y_from}-{y_to})\n\n",
        "| Variable | ARIMA RMSE | SARIMAX RMSE | Improvement | Granger | DM Test |\n",
        "|---|---|---|---|---|---|\n",
    ]
    detail_parts = []
    for indicator in [
        "Inflation (CPI %)",
        "GDP Growth (%)",
        "Unemployment (%)",
        "Exchange Rate",
    ]:
        report, _ = run_economic_forecast(iso, indicator, y_from, y_to)
        arima_rmse = _cell(r"ARIMA\(1,1,1\)\s+\| `([0-9.]+)`", report, True)
        sarimax_rmse = _cell(r"SARIMAX\+Ensemble \| `([0-9.]+)`", report, True)
        improvement = _cell(r"\*\*Improvement\*\*.*?\| \*\*([+-][0-9.]+%)\*\*", report, False)
        granger_flag = "✅" if "Sentiment Granger-causes" in report else "❌"
        dm_flag = "✅" if "SARIMAX+Ensemble significantly better" in report else "❌"
        parts.append(
            f"| {indicator} | {arima_rmse} | {sarimax_rmse} | {improvement} | {granger_flag} | {dm_flag} |\n"
        )
        detail_parts.append(f"\n---\n## Detail -- {indicator}\n\n{report}\n")
    parts.append(
        "\n---\n### Thesis Conclusion\n\n"
        "The comparative analysis across all four macroeconomic indicators confirms that "
        "integrating a multi-model sentiment index (FinBERT 40% + XLM-RoBERTa 30% + Lexicon 30%) "
        "as an exogenous variable in SARIMAX consistently improves forecast accuracy over baseline ARIMA. "
        "Granger causality tests validate the predictive power of document-level sentiment, "
        "while Diebold-Mariano tests confirm statistical significance of the improvement. "
        "These results support the thesis that NLP-augmented econometric models outperform "
        "traditional time-series methods for emerging-market macroeconomic forecasting.\n"
    )
    return "".join(parts) + "".join(detail_parts)
# ============================================================
# GRADIO UI
# ============================================================
# Custom CSS: center the app, shrink tab labels, hide the Gradio footer.
CSS = """
.gradio-container { max-width: 1100px !important; margin: auto; }
.tab-nav button { font-size: 13px !important; }
footer { display: none !important; }
"""
with gr.Blocks(
    theme=gr.themes.Soft(primary_hue="blue", secondary_hue="cyan"),
    css=CSS,
    title="📊 EcoSentRAG",
) as demo:
    gr.Markdown(
        "# 📊 EcoSentRAG\n"
        "### Multilingual Economic Forecast + RAG + Sentiment Analysis\n"
        "_Upload economic reports → Build index → Chat / Search / Forecast_"
    )
    # ── TAB 1 ────────────────────────────────────────
    # Upload documents and build (or reload) the FAISS index.
    with gr.Tab("📂 1 · Upload & Index"):
        gr.Markdown("### Upload economic reports (PDF, DOCX, TXT, CSV)")
        file_upload = gr.File(
            label="Upload files", file_count="multiple",
            file_types=[".pdf",".docx",".txt",".csv"], type="filepath",
        )
        # Client-side stopwatch injected as raw HTML/JS: it attaches click
        # listeners to the "Build Index" button and stops when the output
        # text contains "built in" or "❌" (watched via MutationObserver).
        gr.HTML("""
        <div id="chrono-box" style="display:none;background:#1e1e2e;border:2px solid #4CAF50;border-radius:12px;padding:16px 24px;margin:10px 0;font-family:monospace;text-align:center;">
        <div style="color:#aaa;font-size:13px;margin-bottom:6px;">⏱️ Build Index -- Elapsed Time</div>
        <div id="chrono-display" style="font-size:42px;font-weight:bold;color:#4CAF50;letter-spacing:4px;">00:00.0</div>
        <div id="chrono-status" style="color:#FFC107;font-size:13px;margin-top:8px;">🔄 Processing...</div>
        <div style="margin-top:10px;">
        <div style="background:#333;border-radius:6px;height:8px;width:100%;">
        <div id="chrono-bar" style="background:linear-gradient(90deg,#4CAF50,#00bcd4);border-radius:6px;height:8px;width:0%;transition:width 0.3s ease;"></div>
        </div>
        </div>
        </div>
        <script>
        (function(){
        let _interval=null,_start=null;
        function fmt(ms){
        let s=Math.floor(ms/1000),dec=Math.floor((ms%1000)/100);
        let min=Math.floor(s/60),sec=s%60;
        return String(min).padStart(2,'0')+':'+String(sec).padStart(2,'0')+'.'+dec;
        }
        function startChrono(){
        _start=Date.now();
        document.getElementById('chrono-box').style.display='block';
        document.getElementById('chrono-status').innerText='🔄 Processing...';
        document.getElementById('chrono-status').style.color='#FFC107';
        document.getElementById('chrono-bar').style.width='0%';
        _interval=setInterval(function(){
        let e=Date.now()-_start;
        document.getElementById('chrono-display').innerText=fmt(e);
        document.getElementById('chrono-bar').style.width=Math.min(99,e/1200)+'%';
        },100);
        }
        function stopChrono(){
        if(_interval){clearInterval(_interval);_interval=null;}
        if(_start){document.getElementById('chrono-display').innerText=fmt(Date.now()-_start);}
        document.getElementById('chrono-bar').style.width='100%';
        document.getElementById('chrono-bar').style.background='#4CAF50';
        document.getElementById('chrono-status').innerText='✅ Done!';
        document.getElementById('chrono-status').style.color='#4CAF50';
        }
        function attach(){
        document.querySelectorAll('button').forEach(function(btn){
        if(btn.innerText.includes('Build Index')&&!btn._ca){
        btn._ca=true;
        btn.addEventListener('click',startChrono);
        }
        });
        }
        new MutationObserver(function(){attach();}).observe(document.body,{childList:true,subtree:true});
        setTimeout(attach,2000);
        setTimeout(function(){
        new MutationObserver(function(ms){
        ms.forEach(function(m){
        m.addedNodes.forEach(function(nd){
        if(nd.textContent&&(nd.textContent.includes('built in')||nd.textContent.includes('❌'))){
        stopChrono();
        }
        });
        });
        }).observe(document.body,{childList:true,subtree:true});
        },1000);
        })();
        </script>
        """)
        with gr.Row():
            build_btn = gr.Button("🔨 Build Index", variant="primary")
            load_btn = gr.Button("📂 Load Saved Index")
        index_out = gr.Markdown()
        # build_index runs both on explicit click and immediately on upload.
        # build_index is defined earlier in the file — not visible here.
        file_upload.change(
            fn=build_index, inputs=[file_upload],
            outputs=[index_out], show_progress="full",
        )
        build_btn.click(
            fn=build_index, inputs=[file_upload],
            outputs=[index_out], show_progress="full",
        )
        load_btn.click(fn=load_saved_index, outputs=[index_out])
    # ── TAB 2 ────────────────────────────────────────
    # Chat over the indexed documents, with TTS and transcript export.
    with gr.Tab("💬 2 · Chat"):
        gr.Markdown("### Ask questions about your documents")
        chatbot = gr.Chatbot(type="messages", height=420, bubble_full_width=False)
        chat_input = gr.Textbox(
            placeholder="Ask a question (English or Arabic)...",
            show_label=False, lines=2,
        )
        with gr.Row():
            chat_send = gr.Button("📨 Send", variant="primary")
            chat_clear = gr.Button("🗑️ Clear")
            chat_audio_btn = gr.Button("🔊 TTS")
            chat_export_btn = gr.Button("💾 Export Chat")
        audio_out = gr.Audio(label="🔊 TTS", autoplay=True, visible=False)
        chat_export_out = gr.File(label="📥 Chat Export")
        # Conversation history in "messages" format ({"role", "content"} dicts).
        _history_state = gr.State([])
        def chat_fn(message, history_state):
            """Answer one user message; returns (chatbot msgs, state, cleared input).

            Mutates ``history_state`` in place and returns it for both the
            Chatbot display and the State component.  Blank input is a no-op.
            smart_answer is defined earlier in the file.
            """
            if not message or not message.strip():
                return history_state, history_state, ""
            answer, _ = smart_answer(message, history_state)
            history_state.append({"role": "user", "content": message})
            history_state.append({"role": "assistant", "content": answer})
            return history_state, history_state, ""
        # Both the Send button and Enter-submit trigger the same handler.
        chat_send.click(fn=chat_fn, inputs=[chat_input, _history_state],
                        outputs=[chatbot, _history_state, chat_input])
        chat_input.submit(fn=chat_fn, inputs=[chat_input, _history_state],
                          outputs=[chatbot, _history_state, chat_input])
        chat_clear.click(fn=lambda: ([], [], ""), outputs=[chatbot, _history_state, chat_input])
        # TTS reads the most recent message's content (empty string when no history).
        chat_audio_btn.click(
            fn=lambda h: tts_output(h[-1]["content"] if h else ""),
            inputs=[_history_state], outputs=[audio_out],
        )
        # Reveal the (initially hidden) audio player once it has content.
        audio_out.change(fn=lambda: gr.update(visible=True), outputs=[audio_out])
        chat_export_btn.click(fn=export_chat, inputs=[_history_state], outputs=[chat_export_out])
    # ── TAB 3 ────────────────────────────────────────
    # Text or voice search over the KB, with sentiment of the query/results.
    with gr.Tab("🔍 3 · Search & Sentiment"):
        gr.Markdown("### Keyword / Semantic Search + Sentiment")
        search_input = gr.Textbox(placeholder="Enter word, phrase, or sentence...",
                                  label="Search Query", lines=2)
        search_btn = gr.Button("🔍 Search & Analyze", variant="primary")
        sent_label = gr.Label(label="Sentiment")
        sent_conf = gr.Number(label="Confidence Score")
        search_out = gr.Markdown()
        audio_search = gr.Audio(label="🎤 Voice Search", type="filepath")
        transcribe_btn = gr.Button("🎤 Transcribe")
        transcribed_text = gr.Textbox(label="Transcribed Text", interactive=True)
        search_btn.click(fn=predict_with_rag, inputs=[search_input],
                         outputs=[sent_label, sent_conf, search_out])
        transcribe_btn.click(fn=transcribe_audio, inputs=[audio_search],
                             outputs=[transcribed_text])
        # Transcription result feeds straight into the same search pipeline.
        transcribed_text.change(fn=predict_with_rag, inputs=[transcribed_text],
                                outputs=[sent_label, sent_conf, search_out])
    # ── TAB 4 ────────────────────────────────────────
    # Standalone ensemble sentiment analysis on pasted text.
    with gr.Tab("😊 4 · Sentiment Analysis"):
        gr.Markdown("### Detailed Ensemble Sentiment Analysis")
        sent_text_in = gr.Textbox(placeholder="Paste economic text here...",
                                  label="Text Input", lines=5)
        sent_btn = gr.Button("🔬 Analyze Sentiment", variant="primary")
        sent_detailed = gr.Markdown()
        sent_btn.click(fn=run_sentiment_detailed, inputs=[sent_text_in], outputs=[sent_detailed])
    # ── TAB 5 ────────────────────────────────────────
    # Knowledge-base statistics and keyword summaries.
    with gr.Tab("📊 5 · KB Stats"):
        gr.Markdown("### Knowledge Base Statistics")
        with gr.Row():
            stats_btn = gr.Button("📊 Show Stats", variant="primary")
            kw_btn = gr.Button("🔑 Show Keywords")
        stats_out = gr.Markdown()
        kw_out = gr.Markdown()
        stats_btn.click(fn=get_kb_stats, outputs=[stats_out])
        kw_btn.click(fn=get_keywords, outputs=[kw_out])
    # ── TAB 6 ────────────────────────────────────────
    # Per-file and per-year sentiment bar charts plus a markdown table.
    with gr.Tab("📅 6 · Sentiment Timeline"):
        gr.Markdown("### Document Sentiment Timeline (per year)")
        timeline_btn = gr.Button("📊 Run Timeline", variant="primary")
        timeline_out = gr.Markdown()
        timeline_plot = gr.Plot()
        def run_timeline():
            """Build the sentiment timeline figure + table; (markdown, figure)."""
            df_files, df_yearly = build_doc_sentiment_index()
            if df_files is None or df_files.empty:
                return "_No documents indexed._", None
            fig, axes = plt.subplots(1, 2, figsize=(12, 5))
            fig.suptitle("📊 Sentiment Timeline", fontsize=12, fontweight="bold")
            # Same ±0.05 colour thresholds as the forecast sentiment panel.
            clrs = ["#4CAF50" if s > 0.05 else "#FF5722" if s < -0.05 else "#FFC107"
                    for s in df_files["sentiment"]]
            # File names truncated to 20 chars to keep x labels readable.
            axes[0].bar(df_files["file"].astype(str).apply(lambda x: x[:20]),
                        df_files["sentiment"], color=clrs, edgecolor="white")
            axes[0].axhline(0, color="black", lw=0.8)
            axes[0].set_title("Per-File Sentiment", fontsize=10, fontweight="bold")
            axes[0].tick_params(axis="x", rotation=45, labelsize=7)
            axes[0].grid(True, alpha=0.3, axis="y")
            if df_yearly is not None and not df_yearly.empty:
                clrs2 = ["#4CAF50" if s > 0.05 else "#FF5722" if s < -0.05 else "#FFC107"
                         for s in df_yearly["sentiment"]]
                axes[1].bar(df_yearly["year"].astype(str), df_yearly["sentiment"],
                            color=clrs2, edgecolor="white", width=0.6)
                axes[1].axhline(0, color="black", lw=0.8)
                axes[1].set_title("Yearly Sentiment", fontsize=10, fontweight="bold")
                axes[1].tick_params(axis="x", rotation=45, labelsize=8)
                axes[1].grid(True, alpha=0.3, axis="y")
            plt.tight_layout()
            tbl = "### 📋 Sentiment per File\n\n| File | Year | Sentiment | Label |\n|---|---|---|---|\n"
            for _, row in df_files.iterrows():
                tbl += f"| `{str(row['file'])[:40]}` | {row['year']} | `{row['sentiment']:+.4f}` | {row['label']} |\n"
            return tbl, fig
        timeline_btn.click(fn=run_timeline, outputs=[timeline_out, timeline_plot])
    # ── TAB 7 ────────────────────────────────────────
    # ARIMA vs SARIMAX forecast, single variable or all four at once.
    with gr.Tab("📈 7 · Forecast"):
        gr.Markdown("### Economic Forecast -- ARIMA vs SARIMAX+Sentiment")
        with gr.Row():
            country_input = gr.Textbox(label="Country Code (ISO2)", value="DZ", scale=1)
            variable_sel = gr.Dropdown(
                choices=["Inflation (CPI %)", "GDP Growth (%)", "Unemployment (%)", "Exchange Rate"],
                value="Inflation (CPI %)", label="Variable", scale=2,
            )
        with gr.Row():
            start_year = gr.Number(label="Start Year", value=1990, precision=0)
            end_year = gr.Number(label="End Year", value=2023, precision=0)
        with gr.Row():
            forecast_btn = gr.Button("📊 Run Forecast", variant="primary")
            run_all_btn = gr.Button("🚀 Run All 4 Variables", variant="primary")
        forecast_out = gr.Markdown()
        # Chart delivered as a saved PNG path (see run_economic_forecast).
        forecast_plot = gr.Image(label="📊 Forecast Chart", type="filepath")
        all_vars_out = gr.Markdown()
        forecast_btn.click(
            fn=run_economic_forecast,
            inputs=[country_input, variable_sel, start_year, end_year],
            outputs=[forecast_out, forecast_plot],
        )
        run_all_btn.click(
            fn=run_all_variables,
            inputs=[country_input, start_year, end_year],
            outputs=[all_vars_out],
        )
# ============================================================
# LAUNCH
# ============================================================
demo.launch(server_name="0.0.0.0", server_port=7860)