Spaces:
Sleeping
Sleeping
| import os | |
| import subprocess | |
| import pandas as pd | |
| import folium | |
| from wordcloud import WordCloud | |
| import matplotlib.pyplot as plt | |
| from textblob import TextBlob | |
| from gtts import gTTS | |
| import speech_recognition as sr | |
| from deep_translator import GoogleTranslator | |
| from collections import Counter | |
| import re | |
| import json | |
| # We will import _index from rag_pipeline to get documents | |
| import rag_pipeline | |
| # ── Map Generation ────────────────────────────────────────────── | |
# Keys carried by every location record, in the order the row tuples use.
_LOCATION_FIELDS = ("name", "lat", "lon", "query", "desc")

# Places shown as map markers. Each record holds the display name, the
# marker coordinates, the retrieval query used to look up document
# excerpts for the popup, and a short static description.
LOCATIONS = [
    dict(zip(_LOCATION_FIELDS, row))
    for row in (
        (
            "Jerusalem (Al-Quds)", 31.7683, 35.2137,
            "Jerusalem Al-Quds occupation history destruction",
            "The capital of Palestine, central to its history, culture, and religious identity.",
        ),
        (
            "Gaza", 31.5017, 34.4668,
            "Gaza destruction casualties humanitarian crisis displaced",
            "One of the oldest cities; subject of military operations and humanitarian siege.",
        ),
        (
            "Ramallah", 31.9038, 35.2034,
            "Ramallah West Bank Palestinian Authority",
            "A major Palestinian cultural and political center in the West Bank.",
        ),
        (
            "Hebron (Al-Khalil)", 31.5326, 35.0998,
            "Hebron Al-Khalil settlements occupation",
            "A historic city known for the Ibrahimi Mosque and traditional crafts.",
        ),
        (
            "Nablus", 32.2211, 35.2544,
            "Nablus West Bank raids settlements",
            "Famous for its traditional soap, knafeh, and historic old city.",
        ),
        (
            "Haifa", 32.7940, 34.9896,
            "Haifa Nakba 1948 Palestinian expelled",
            "A historic coastal city, largely depopulated during the 1948 Nakba.",
        ),
        (
            "Jaffa (Yafa)", 32.0504, 34.7522,
            "Jaffa Yafa Nakba 1948 destruction port expelled",
            "Historically one of Palestine's most important port cities, depopulated in 1948.",
        ),
        (
            "Rafah", 31.2956, 34.2527,
            "Rafah crossing humanitarian aid evacuation bombardment",
            "A border city in southern Gaza; key crossing for humanitarian aid.",
        ),
        (
            "Khan Yunis", 31.3436, 34.3061,
            "Khan Yunis destruction bombardment casualties",
            "One of Gaza's largest cities, heavily affected by military operations.",
        ),
        (
            "Jenin", 32.4641, 35.2961,
            "Jenin refugee camp military operation incursion",
            "Home to one of the West Bank's largest refugee camps.",
        ),
    )
]
def _get_location_facts(query: str) -> str:
    """Retrieve document excerpts relevant to a location. Returns formatted HTML.

    Runs ``query`` through ``rag_pipeline._retriever`` and renders up to three
    of the returned nodes as ``<blockquote>`` snippets (first 280 characters
    of the node text plus its source and page number). Nodes that share the
    same (source, page) pair are rendered only once.

    Args:
        query: Retrieval query describing the location.

    Returns:
        The concatenated snippet HTML, or an empty string when the retriever
        is not initialised, nothing is retrieved, or retrieval fails.
    """
    # Local import so this block does not rely on a file-level import of html.
    import html

    retriever = rag_pipeline._retriever
    if retriever is None:
        return ""
    try:
        nodes = retriever.retrieve(query)
        if not nodes:
            return ""
        snippets = []
        seen = set()
        for node in nodes[:3]:
            text = node.node.get_content()[:280].strip().replace("\n", " ")
            source = node.node.metadata.get("source", "")
            page = node.node.metadata.get("page_number", "?")
            key = (source, page)
            if key in seen:
                continue
            seen.add(key)
            src_label = (source[:45] + "...") if len(source) > 45 else source
            # Document text and metadata are arbitrary content: escape them so
            # stray "<", "&" or quotes cannot break (or inject into) the popup.
            safe_text = html.escape(text)
            safe_label = html.escape(str(src_label))
            safe_page = html.escape(str(page))
            snippets.append(
                f'<blockquote style="font-size:11px;margin:4px 0;border-left:3px solid #c00;'
                f'padding-left:6px;color:#222;">'
                f'"{safe_text}..."<br>'
                f'<i style="color:#666;">— {safe_label}, p.{safe_page}</i>'
                f'</blockquote>'
            )
        return "".join(snippets)
    except Exception:
        # Best effort: the popup is still useful without document excerpts.
        return ""
def generate_map():
    """Render the location map and save it as an HTML file.

    Adds one red marker per entry in ``LOCATIONS``. Each popup shows the
    location name and description and, when ``_get_location_facts`` returns
    any excerpts, a "From the Documents" section containing them.

    Returns:
        The path of the saved map file (``"palestine_map.html"``).
    """
    fmap = folium.Map(location=[31.5, 34.8], zoom_start=8, tiles="CartoDB positron")

    for place in LOCATIONS:
        excerpts = _get_location_facts(place["query"])

        # Assemble the popup markup piece by piece, then join once.
        pieces = [
            f'<div style="font-family:Arial,sans-serif;max-width:340px;direction:auto;">',
            f'<h4 style="margin:0 0 6px;color:#1a1a1a;">{place["name"]}</h4>',
            f'<p style="font-size:12px;color:#333;margin:0 0 8px;">{place["desc"]}</p>',
        ]
        if excerpts:
            pieces.extend([
                f'<hr style="border:none;border-top:1px solid #ddd;margin:6px 0;">',
                f'<p style="font-size:11px;font-weight:bold;color:#c00;margin:0 0 4px;">',
                f'📄 From the Documents:</p>',
                f'{excerpts}',
            ])
        pieces.append("</div>")

        marker = folium.Marker(
            location=[place["lat"], place["lon"]],
            popup=folium.Popup("".join(pieces), max_width=360),
            tooltip=folium.Tooltip(place["name"], sticky=True),
            icon=folium.Icon(color="red", icon="info-sign"),
        )
        marker.add_to(fmap)

    out_path = "palestine_map.html"
    fmap.save(out_path)
    return out_path
| # ── Timeline Generation ────────────────────────────────────────────── | |
def generate_timeline():
    """Return the static historical timeline as an HTML fragment.

    The fragment is a styled ``<div>`` containing a heading and an unordered
    list with one ``<li>`` per milestone (1917 through "Present").
    """
    return """
<div style="font-family: Arial, sans-serif; padding: 20px;">
<h3>Historical Timeline of the Palestinian Cause</h3>
<ul style="border-left: 2px solid #333; padding-left: 20px;">
<li style="margin-bottom: 10px;"><b>1917:</b> Balfour Declaration issued by the British government.</li>
<li style="margin-bottom: 10px;"><b>1947:</b> UN General Assembly adopts Resolution 181 (Partition Plan).</li>
<li style="margin-bottom: 10px;"><b>1948:</b> The Nakba (Catastrophe); hundreds of thousands of Palestinians displaced.</li>
<li style="margin-bottom: 10px;"><b>1967:</b> The Naksa (Setback); occupation of the West Bank, Gaza, and East Jerusalem.</li>
<li style="margin-bottom: 10px;"><b>1987:</b> The First Intifada begins.</li>
<li style="margin-bottom: 10px;"><b>1993:</b> Oslo Accords signed.</li>
<li style="margin-bottom: 10px;"><b>2000:</b> The Second Intifada begins.</li>
<li style="margin-bottom: 10px;"><b>Present:</b> Ongoing struggle for self-determination and human rights.</li>
</ul>
</div>
"""
| # ── Word Cloud Generation ────────────────────────────────────────────── | |
def generate_wordcloud(doc_name="All"):
    """Render a word cloud image from the indexed document text.

    Args:
        doc_name: Value of the ``source`` metadata field to restrict the text
            to. ``None`` or ``"All"`` uses every node in the docstore.

    Returns:
        The path of the saved image (``"wordcloud.png"``), or ``None`` when
        ``rag_pipeline._index`` has not been built.
    """
    if rag_pipeline._index is None:
        return None

    include_all = doc_name is None or doc_name == "All"
    # Join once instead of growing a string inside the loop.
    text = " ".join(
        node.get_content()
        for node in rag_pipeline._index.docstore.docs.values()
        if include_all or node.metadata.get("source") == doc_name
    )
    if not text.strip():
        # Fallback if no text
        text = "Palestine History Culture Rights Peace Justice Freedom"

    wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text)

    img_path = "wordcloud.png"
    fig = plt.figure(figsize=(10, 5))
    try:
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.savefig(img_path, bbox_inches='tight')
    finally:
        # Always release the figure, even if rendering or saving fails, so
        # repeated calls do not accumulate open pyplot figures.
        plt.close(fig)
    return img_path
| # ── Statistics Generation ────────────────────────────────────────────── | |
def get_statistics():
    """Summarise the indexed chunks per source document.

    Returns:
        A ``(stats_df, df)`` pair. ``df`` has one row per docstore node with
        columns ``Source``, ``Page`` and ``Length`` (character count of the
        node content). ``stats_df`` groups ``df`` by ``Source`` and reports
        ``Chunks``, ``Total_Length`` and ``Avg_Length``. Two empty DataFrames
        are returned when the index is missing or holds no nodes.
    """
    index = rag_pipeline._index
    if index is None:
        return pd.DataFrame(), pd.DataFrame()

    rows = [
        {
            "Source": node.metadata.get("source", "Unknown"),
            "Page": node.metadata.get("page_number", 0),
            "Length": len(node.get_content()),
        }
        for node in index.docstore.docs.values()
    ]
    chunk_df = pd.DataFrame(rows)
    if chunk_df.empty:
        return pd.DataFrame(), pd.DataFrame()

    grouped = chunk_df.groupby('Source')
    per_source_df = grouped.agg(
        Chunks=('Source', 'count'),
        Total_Length=('Length', 'sum'),
        Avg_Length=('Length', 'mean')
    ).reset_index()
    return per_source_df, chunk_df
| # ── Advanced Analytics ────────────────────────────────────────────── | |
def advanced_analytics(text):
    """Build a Markdown report with sentiment and frequent capitalized words.

    Args:
        text: The text to analyse.

    Returns:
        A Markdown string with a "Sentiment Analysis" section (TextBlob
        polarity and subjectivity) followed by the ten most common words
        matching ``[A-Z][a-z]+``. A fixed message is returned instead when
        ``text`` is empty or whitespace only.
    """
    if not (text and text.strip()):
        return "No text provided for analysis."

    sentiment = TextBlob(text).sentiment
    sentiment_str = f"Polarity: {sentiment.polarity:.2f} (Negative < 0 < Positive), Subjectivity: {sentiment.subjectivity:.2f} (Objective < 0.5 < Subjective)"

    # Capitalized-word counting is a rough stand-in for entity extraction.
    top_entities = Counter(re.findall(r'\b[A-Z][a-z]+\b', text)).most_common(10)

    sections = [
        f"### Sentiment Analysis\n{sentiment_str}\n\n",
        "### Frequent Capitalized Entities (Heuristic)\n",
    ]
    sections.extend(f"- **{ent}**: {count}\n" for ent, count in top_entities)
    return "".join(sections)
| # ── Audio Features ────────────────────────────────────────────── | |
def text_to_speech(text, lang='en'):
    """Synthesize ``text`` with gTTS and save it as an MP3 file.

    Args:
        text: The text to speak.
        lang: Language code passed to gTTS.

    Returns:
        The path of the saved audio (``"output_audio.mp3"``), or ``None`` if
        synthesis or saving raised; the error is printed in that case.
    """
    output_path = "output_audio.mp3"
    try:
        gTTS(text=text, lang=lang).save(output_path)
    except Exception as e:
        print(f"TTS Error: {e}")
        return None
    return output_path
def speech_to_text(audio_path):
    """Transcribe an audio file with the Google speech recognizer.

    Files whose name does not end in ``.wav`` are first converted to WAV with
    ``ffmpeg``. The conversion writes to a uniquely named temporary file that
    is deleted before returning, so calls do not leave files behind or
    overwrite each other's intermediate audio.

    Args:
        audio_path: Path of the recorded audio; a falsy value yields ``""``.

    Returns:
        The recognized text, or a bracketed ``[Voice Input Error: ...]``
        message when conversion or recognition fails. Errors are also printed.
    """
    # Local import so this block does not rely on a file-level import.
    import tempfile

    if not audio_path:
        return ""

    wav_path = audio_path
    temp_wav = None
    try:
        if not audio_path.lower().endswith(".wav"):
            try:
                fd, temp_wav = tempfile.mkstemp(prefix="stt_", suffix=".wav")
                os.close(fd)  # ffmpeg reopens the path itself (-y overwrites)
                wav_path = temp_wav
                subprocess.run(
                    ["ffmpeg", "-y", "-i", audio_path, wav_path],
                    check=True,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except Exception as e:
                error_msg = f"[Voice Input Error: Audio conversion failed: {str(e)}]"
                print(error_msg)
                return error_msg

        recognizer = sr.Recognizer()
        try:
            with sr.AudioFile(wav_path) as source:
                audio_data = recognizer.record(source)
            return recognizer.recognize_google(audio_data)
        except Exception as e:
            error_msg = f"[Voice Input Error: {str(e)}]"
            print(error_msg)
            return error_msg
    finally:
        # Remove the intermediate WAV on every exit path.
        if temp_wav is not None:
            try:
                os.remove(temp_wav)
            except OSError:
                pass
| # ── Translation ────────────────────────────────────────────── | |
def translate_text(text, target_lang='en'):
    """Translate ``text`` into ``target_lang`` with GoogleTranslator.

    The source language is auto-detected. If translation raises, the error is
    printed and the original ``text`` is returned unchanged.
    """
    try:
        return GoogleTranslator(source='auto', target=target_lang).translate(text)
    except Exception as e:
        print(f"Translation Error: {e}")
    return text
| # ── Export Chat ────────────────────────────────────────────── | |
def export_chat_history(history):
    """Write the chat history to ``chat_history.json`` and return its path.

    Dict messages are written as-is; any other message object is reduced to
    its ``role`` and ``content`` attributes (defaulting to ``"unknown"`` and
    ``""``). Returns ``None`` without writing anything when ``history`` is
    empty or ``None``.
    """
    if not history:
        return None

    # Convert objects to dicts if necessary for JSON serialization
    serializable = [
        entry
        if isinstance(entry, dict)
        else {"role": getattr(entry, "role", "unknown"), "content": getattr(entry, "content", "")}
        for entry in history
    ]

    file_path = "chat_history.json"
    with open(file_path, "w", encoding="utf-8") as out:
        json.dump(serializable, out, ensure_ascii=False, indent=4)
    return file_path