RAG1 / extensions.py
wassim2433's picture
inital
fed9d9d
import html
import json
import os
import re
import subprocess
import tempfile
from collections import Counter

import folium
import matplotlib.pyplot as plt
import pandas as pd
import speech_recognition as sr
from deep_translator import GoogleTranslator
from gtts import gTTS
from textblob import TextBlob
from wordcloud import WordCloud

# We will import _index from rag_pipeline to get documents
import rag_pipeline
# ── Map Generation ──────────────────────────────────────────────
# Places shown as markers on the generated map. Each entry carries:
#   name  - marker title / tooltip text
#   lat   - marker latitude
#   lon   - marker longitude
#   query - text passed to _get_location_facts() to look up document excerpts
#   desc  - short description shown in the marker popup
LOCATIONS = [
    {
        "name": "Jerusalem (Al-Quds)",
        "lat": 31.7683,
        "lon": 35.2137,
        "query": "Jerusalem Al-Quds occupation history destruction",
        "desc": "The capital of Palestine, central to its history, culture, and religious identity.",
    },
    {
        "name": "Gaza",
        "lat": 31.5017,
        "lon": 34.4668,
        "query": "Gaza destruction casualties humanitarian crisis displaced",
        "desc": "One of the oldest cities; subject of military operations and humanitarian siege.",
    },
    {
        "name": "Ramallah",
        "lat": 31.9038,
        "lon": 35.2034,
        "query": "Ramallah West Bank Palestinian Authority",
        "desc": "A major Palestinian cultural and political center in the West Bank.",
    },
    {
        "name": "Hebron (Al-Khalil)",
        "lat": 31.5326,
        "lon": 35.0998,
        "query": "Hebron Al-Khalil settlements occupation",
        "desc": "A historic city known for the Ibrahimi Mosque and traditional crafts.",
    },
    {
        "name": "Nablus",
        "lat": 32.2211,
        "lon": 35.2544,
        "query": "Nablus West Bank raids settlements",
        "desc": "Famous for its traditional soap, knafeh, and historic old city.",
    },
    {
        "name": "Haifa",
        "lat": 32.7940,
        "lon": 34.9896,
        "query": "Haifa Nakba 1948 Palestinian expelled",
        "desc": "A historic coastal city, largely depopulated during the 1948 Nakba.",
    },
    {
        "name": "Jaffa (Yafa)",
        "lat": 32.0504,
        "lon": 34.7522,
        "query": "Jaffa Yafa Nakba 1948 destruction port expelled",
        "desc": "Historically one of Palestine's most important port cities, depopulated in 1948.",
    },
    {
        "name": "Rafah",
        "lat": 31.2956,
        "lon": 34.2527,
        "query": "Rafah crossing humanitarian aid evacuation bombardment",
        "desc": "A border city in southern Gaza; key crossing for humanitarian aid.",
    },
    {
        "name": "Khan Yunis",
        "lat": 31.3436,
        "lon": 34.3061,
        "query": "Khan Yunis destruction bombardment casualties",
        "desc": "One of Gaza's largest cities, heavily affected by military operations.",
    },
    {
        "name": "Jenin",
        "lat": 32.4641,
        "lon": 35.2961,
        "query": "Jenin refugee camp military operation incursion",
        "desc": "Home to one of the West Bank's largest refugee camps.",
    },
]
def _get_location_facts(query: str) -> str:
    """Retrieve document excerpts relevant to a location as popup-ready HTML.

    Runs ``query`` through ``rag_pipeline._retriever`` and renders at most the
    first three results as ``<blockquote>`` elements, skipping results whose
    ``(source, page_number)`` metadata pair was already rendered.

    Args:
        query: Free-text query describing the location.

    Returns:
        Concatenated ``<blockquote>`` markup, or ``""`` when no retriever is
        configured, nothing is retrieved, or retrieval fails.
    """
    retriever = rag_pipeline._retriever
    if retriever is None:
        return ""

    max_chars = 280
    try:
        hits = retriever.retrieve(query)
        if not hits:
            return ""

        snippets = []
        seen = set()
        for hit in hits[:3]:
            metadata = hit.node.metadata
            # Coerce to str: metadata values may be None or non-string
            # (e.g. an int page number) and html.escape only accepts str.
            source = str(metadata.get("source", "") or "")
            page = str(metadata.get("page_number", "?"))

            key = (source, page)
            if key in seen:
                continue
            seen.add(key)

            content = hit.node.get_content()
            excerpt = content[:max_chars].strip().replace("\n", " ")
            # Only signal truncation when the excerpt really was cut short.
            ellipsis = "..." if len(content) > max_chars else ""
            src_label = (source[:45] + "...") if len(source) > 45 else source

            # Document text and file names are not trusted markup: escape them
            # so stray '<', '&' or quotes cannot break or inject into the popup.
            snippets.append(
                '<blockquote style="font-size:11px;margin:4px 0;border-left:3px solid #c00;'
                'padding-left:6px;color:#222;">'
                f'"{html.escape(excerpt)}{ellipsis}"<br>'
                f'<i style="color:#666;">&#8212; {html.escape(src_label)}, p.{html.escape(page)}</i>'
                '</blockquote>'
            )
        return "".join(snippets)
    except Exception as e:
        # Best-effort enrichment: the map must still render without excerpts,
        # but report the failure instead of hiding it completely.
        print(f"Location facts error: {e}")
        return ""
def generate_map():
    """Build a folium map with one marker per ``LOCATIONS`` entry and save it.

    Each marker popup shows the entry's name and description. When
    ``_get_location_facts`` returns markup for the entry's query, it is
    appended under a "From the Documents" heading.

    Returns:
        The path the map was saved to, ``"palestine_map.html"`` (relative to
        the current working directory).
    """
    palestine_map = folium.Map(location=[31.5, 34.8], zoom_start=8, tiles="CartoDB positron")

    for place in LOCATIONS:
        excerpts = _get_location_facts(place["query"])

        parts = [
            '<div style="font-family:Arial,sans-serif;max-width:340px;direction:auto;">',
            f'<h4 style="margin:0 0 6px;color:#1a1a1a;">{place["name"]}</h4>',
            f'<p style="font-size:12px;color:#333;margin:0 0 8px;">{place["desc"]}</p>',
        ]
        if excerpts:
            # Separator + heading, then the pre-rendered excerpt markup.
            parts.extend([
                '<hr style="border:none;border-top:1px solid #ddd;margin:6px 0;">',
                '<p style="font-size:11px;font-weight:bold;color:#c00;margin:0 0 4px;">',
                '&#128196; From the Documents:</p>',
                excerpts,
            ])
        parts.append("</div>")

        marker = folium.Marker(
            location=[place["lat"], place["lon"]],
            popup=folium.Popup("".join(parts), max_width=360),
            tooltip=folium.Tooltip(place["name"], sticky=True),
            icon=folium.Icon(color="red", icon="info-sign"),
        )
        marker.add_to(palestine_map)

    output_file = "palestine_map.html"
    palestine_map.save(output_file)
    return output_file
# ── Timeline Generation ──────────────────────────────────────────────
def generate_timeline():
    """Return a static HTML fragment listing key dates as a bulleted timeline.

    The fragment is a single ``<div>`` holding a heading and an unordered
    list with one ``<li>`` per milestone; it takes no input and has no side
    effects.
    """
    return """
<div style="font-family: Arial, sans-serif; padding: 20px;">
<h3>Historical Timeline of the Palestinian Cause</h3>
<ul style="border-left: 2px solid #333; padding-left: 20px;">
<li style="margin-bottom: 10px;"><b>1917:</b> Balfour Declaration issued by the British government.</li>
<li style="margin-bottom: 10px;"><b>1947:</b> UN General Assembly adopts Resolution 181 (Partition Plan).</li>
<li style="margin-bottom: 10px;"><b>1948:</b> The Nakba (Catastrophe); hundreds of thousands of Palestinians displaced.</li>
<li style="margin-bottom: 10px;"><b>1967:</b> The Naksa (Setback); occupation of the West Bank, Gaza, and East Jerusalem.</li>
<li style="margin-bottom: 10px;"><b>1987:</b> The First Intifada begins.</li>
<li style="margin-bottom: 10px;"><b>1993:</b> Oslo Accords signed.</li>
<li style="margin-bottom: 10px;"><b>2000:</b> The Second Intifada begins.</li>
<li style="margin-bottom: 10px;"><b>Present:</b> Ongoing struggle for self-determination and human rights.</li>
</ul>
</div>
"""
# ── Word Cloud Generation ──────────────────────────────────────────────
def generate_wordcloud(doc_name="All"):
    """Render a word cloud image from the indexed document text.

    Args:
        doc_name: Value of the ``source`` metadata field to restrict the text
            to. ``None`` or ``"All"`` uses every node in the docstore.

    Returns:
        The path of the saved image, ``"wordcloud.png"`` (relative to the
        current working directory), or ``None`` when ``rag_pipeline._index``
        is not set.
    """
    if rag_pipeline._index is None:
        return None

    include_all = doc_name is None or doc_name == "All"
    # Join once instead of growing a string with += for every node.
    text = " ".join(
        node.get_content()
        for node in rag_pipeline._index.docstore.docs.values()
        if include_all or node.metadata.get("source") == doc_name
    )
    if not text.strip():
        # Fallback if no text
        text = "Palestine History Culture Rights Peace Justice Freedom"

    wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text)

    img_path = "wordcloud.png"
    fig = plt.figure(figsize=(10, 5))
    try:
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.savefig(img_path, bbox_inches='tight')
    finally:
        # Always release the figure, even if drawing or saving fails, so
        # repeated calls do not accumulate open matplotlib figures.
        plt.close(fig)
    return img_path
# ── Statistics Generation ──────────────────────────────────────────────
def get_statistics():
    """Summarise the indexed chunks per source document.

    Returns:
        A ``(stats_df, df)`` tuple. ``df`` has one row per docstore node with
        columns ``Source``, ``Page`` and ``Length`` (character count of the
        node content). ``stats_df`` groups ``df`` by ``Source`` with columns
        ``Chunks``, ``Total_Length`` and ``Avg_Length``. Two empty DataFrames
        are returned when ``rag_pipeline._index`` is not set or the docstore
        holds no nodes.
    """
    if rag_pipeline._index is None:
        return pd.DataFrame(), pd.DataFrame()

    rows = [
        {
            "Source": chunk.metadata.get("source", "Unknown"),
            "Page": chunk.metadata.get("page_number", 0),
            "Length": len(chunk.get_content()),
        }
        for chunk in rag_pipeline._index.docstore.docs.values()
    ]
    df = pd.DataFrame(rows)
    if df.empty:
        return pd.DataFrame(), pd.DataFrame()

    per_source = df.groupby('Source').agg(
        Chunks=('Source', 'count'),
        Total_Length=('Length', 'sum'),
        Avg_Length=('Length', 'mean'),
    )
    return per_source.reset_index(), df
# ── Advanced Analytics ──────────────────────────────────────────────
def advanced_analytics(text):
    """Produce a small Markdown report with sentiment and frequent proper-looking words.

    Args:
        text: The text to analyse.

    Returns:
        A Markdown string with a "Sentiment Analysis" section (TextBlob
        polarity and subjectivity) and a list of the ten most frequent words
        matching ``[A-Z][a-z]+``. Returns a fixed notice when ``text`` is
        empty, ``None`` or whitespace-only.
    """
    if not (text and text.strip()):
        return "No text provided for analysis."

    scores = TextBlob(text).sentiment
    sentiment_str = f"Polarity: {scores.polarity:.2f} (Negative < 0 < Positive), Subjectivity: {scores.subjectivity:.2f} (Objective < 0.5 < Subjective)"

    # Heuristic: a capitalised ASCII word is treated as a candidate entity.
    top_entities = Counter(re.findall(r'\b[A-Z][a-z]+\b', text)).most_common(10)

    report_parts = [
        f"### Sentiment Analysis\n{sentiment_str}\n\n",
        "### Frequent Capitalized Entities (Heuristic)\n",
    ]
    report_parts.extend(f"- **{ent}**: {count}\n" for ent, count in top_entities)
    return "".join(report_parts)
# ── Audio Features ──────────────────────────────────────────────
def text_to_speech(text, lang='en'):
    """Synthesise ``text`` to an MP3 file with gTTS.

    Args:
        text: The text to speak.
        lang: Language code passed to gTTS.

    Returns:
        The path of the written file, ``"output_audio.mp3"`` (relative to the
        current working directory), or ``None`` if synthesis or saving fails;
        the error is printed in that case.
    """
    output_path = "output_audio.mp3"
    try:
        gTTS(text=text, lang=lang).save(output_path)
    except Exception as err:
        print(f"TTS Error: {err}")
        return None
    return output_path
def speech_to_text(audio_path):
    """Transcribe an audio file to text.

    Inputs whose name does not end in ``.wav`` are first converted to WAV
    with ``ffmpeg``; the WAV is then read with ``speech_recognition`` and
    passed to ``recognize_google``.

    Args:
        audio_path: Path of the recorded audio file. Falsy values yield ``""``.

    Returns:
        The recognised text, ``""`` when no path is given, or a bracketed
        ``"[Voice Input Error: ...]"`` message when conversion or recognition
        fails (the message is also printed).
    """
    if not audio_path:
        return ""

    wav_path = audio_path
    temp_wav = None
    try:
        if not audio_path.lower().endswith(".wav"):
            # Convert into a unique temporary file rather than a fixed name in
            # the working directory, so concurrent requests cannot overwrite
            # each other's audio and nothing is left behind afterwards.
            fd, temp_wav = tempfile.mkstemp(suffix=".wav")
            os.close(fd)
            wav_path = temp_wav
            try:
                subprocess.run(
                    ["ffmpeg", "-y", "-i", audio_path, wav_path],
                    check=True,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except Exception as e:
                error_msg = f"[Voice Input Error: Audio conversion failed: {str(e)}]"
                print(error_msg)
                return error_msg

        r = sr.Recognizer()
        try:
            with sr.AudioFile(wav_path) as source:
                audio_data = r.record(source)
                text = r.recognize_google(audio_data)
            return text
        except Exception as e:
            error_msg = f"[Voice Input Error: {str(e)}]"
            print(error_msg)
            return error_msg
    finally:
        # Remove only the file this call created; never the caller's input.
        if temp_wav is not None:
            try:
                os.remove(temp_wav)
            except OSError:
                pass
# ── Translation ──────────────────────────────────────────────
def translate_text(text, target_lang='en'):
    """Translate ``text`` into ``target_lang`` with auto-detected source language.

    Args:
        text: The text to translate.
        target_lang: Target language code passed to ``GoogleTranslator``.

    Returns:
        The translated text, or the original ``text`` unchanged if the
        translation fails; the error is printed in that case.
    """
    try:
        return GoogleTranslator(source='auto', target=target_lang).translate(text)
    except Exception as err:
        print(f"Translation Error: {err}")
    return text
# ── Export Chat ──────────────────────────────────────────────
def export_chat_history(history):
    """Write the chat history to ``chat_history.json`` as pretty-printed JSON.

    Args:
        history: Iterable of messages. Dict messages are written as-is; any
            other object is reduced to its ``role`` and ``content`` attributes
            (defaulting to ``"unknown"`` and ``""`` when absent).

    Returns:
        The path of the written file, ``"chat_history.json"`` (relative to
        the current working directory), or ``None`` when ``history`` is empty
        or ``None``.
    """
    if not history:
        return None

    # Non-dict message objects are flattened so json can serialise them.
    serializable = [
        entry
        if isinstance(entry, dict)
        else {"role": getattr(entry, "role", "unknown"), "content": getattr(entry, "content", "")}
        for entry in history
    ]

    file_path = "chat_history.json"
    with open(file_path, "w", encoding="utf-8") as out:
        json.dump(serializable, out, ensure_ascii=False, indent=4)
    return file_path