erfan / main_app.py
digitalai's picture
Update main_app.py
fe45014 verified
import io
import random
import shutil
import string
from zipfile import ZipFile
import streamlit as st
from streamlit_extras.colored_header import colored_header
from streamlit_extras.add_vertical_space import add_vertical_space
from hugchat import hugchat
from hugchat.login import Login
import pandas as pd
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
import sketch
from langchain.text_splitter import CharacterTextSplitter
# فرض می‌کنیم فایل‌های promptTemplate و exportchat وجود دارند
from promptTemplate import prompt4conversation, prompt4Data, prompt4Code, prompt4Context, prompt4Audio, prompt4YT
from promptTemplate import prompt4conversationInternet
# FOR DEVELOPMENT NEW PLUGIN
# from promptTemplate import yourPLUGIN
from exportchat import export_chat
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from HuggingChatAPI import HuggingChat # فرض می‌کنیم این فایل وجود دارد
from langchain.embeddings import HuggingFaceHubEmbeddings
from youtube_transcript_api import YouTubeTranscriptApi
import requests
from bs4 import BeautifulSoup
import speech_recognition as sr
import pdfplumber
import docx2txt
from duckduckgo_search import DDGS
from itertools import islice
from os import path
from pydub import AudioSegment
import os
hf = None
repo_id = "sentence-transformers/all-mpnet-base-v2"
# --- تنظیمات اولیه صفحه ---
st.set_page_config(
page_title="گفتگو با همه چیز 💬", page_icon="🤗", layout="wide", initial_sidebar_state="expanded"
)
# --- استایل‌دهی (شامل RTL و استایل‌های سفارشی) ---
st.markdown("""
<style>
/* تنظیمات کلی برای راست‌چین */
html, body, [class*="st-"] {
direction: rtl !important;
text-align: right !important;
}
/* تنظیمات خاص برای برخی عناصر که ممکن است به چپ بچسبند */
.stButton>button, .stDownloadButton>button, .stFileUploader label, .stTextInput label, .stTextArea label, .stSelectbox label, .stSlider label, .stCheckbox label {
text-align: right !important;
width: 100%; /* حفظ عرض کامل دکمه‌ها و عناصر فرم */
}
/* اطمینان از راست‌چین بودن محتوای اصلی و سایدبار */
.main .block-container, .stSidebar .stBlock {
text-align: right !important;
}
/* راست‌چین کردن عناوین، پاراگراف‌ها و دیگر عناصر متنی */
h1, h2, h3, h4, h5, h6, p, li, div, span, label {
text-align: right !important;
font-family: 'Vazirmatn', sans-serif !important; /* فونت فارسی (اختیاری) */
}
/* استایل پیام‌های چت */
.stChatMessage {
text-align: right !important;
direction: rtl !important;
}
/* استایل ورودی چت */
.stChatInput {
direction: rtl !important;
}
.stChatInput textarea {
text-align: right !important;
direction: rtl !important;
font-family: 'Vazirmatn', sans-serif !important; /* فونت فارسی برای ورودی */
}
/* چپ‌چین کردن آواتارها در چت برای ظاهر بهتر */
.stChatMessage [data-testid="chatAvatarIcon-user"], .stChatMessage [data-testid="chatAvatarIcon-assistant"] {
margin-right: 0;
margin-left: 0.5rem; /* فاصله آواتار از متن در حالت RTL */
float: right; /* انتقال آواتار به سمت راست */
}
.stChatMessage [data-testid="chatMessageContent"] {
width: calc(100% - 50px); /* تنظیم عرض متن پیام برای جا دادن آواتار */
float: left; /* انتقال متن پیام به سمت چپ آواتار */
}
/* رفع مشکل احتمالی عرض ۱۰۰٪ برای برخی عناصر داخلی استریملیت */
.css-w770g5, .css-b3z5c9 {
width: 100%;
}
</style>
""", unsafe_allow_html=True)
# بارگذاری توکن Hugging Face در صورت وجود
if 'hf_token' in st.session_state:
if 'hf' not in st.session_state:
try:
hf = HuggingFaceHubEmbeddings(
repo_id=repo_id,
task="feature-extraction",
huggingfacehub_api_token=st.session_state['hf_token'],
) # type: ignore
st.session_state['hf'] = hf
except Exception as e:
st.sidebar.error(f"خطا در ایجاد Embedding: {e}")
st.sidebar.warning("توکن Hugging Face API خود را بررسی کنید.")
# --- نوار کناری ---
with st.sidebar:
st.title('🤗💬 اپلیکیشن چت شخصی')
st.markdown("---")
if 'hf_email' not in st.session_state or 'hf_pass' not in st.session_state:
with st.expander("ℹ️ ورود به Hugging Face", expanded=True):
st.write("⚠️ برای استفاده از این برنامه باید وارد حساب Hugging Face خود شوید. می‌توانید از [اینجا](https://huggingface.co/join) ثبت‌نام کنید.")
st.header('ورود به Hugging Face')
hf_email = st.text_input('ایمیل خود را وارد کنید:')
hf_pass = st.text_input('رمز عبور خود را وارد کنید:', type='password')
hf_token = st.text_input('توکن API خود را وارد کنید (ضروری برای Embedding):', type='password')
if st.button('ورود 🚀') and hf_email and hf_pass and hf_token:
with st.spinner('🚀 در حال ورود...'):
st.session_state['hf_email'] = hf_email
st.session_state['hf_pass'] = hf_pass
st.session_state['hf_token'] = hf_token
try:
# ورود به HuggingChat
sign = Login(st.session_state['hf_email'], st.session_state['hf_pass'])
if sign:
st.write("yes")
cookies = sign.login()
# مقداردهی اولیه ChatBot
chatbot = hugchat.ChatBot(cookies=cookies.get_dict())
st.session_state['chatbot'] = chatbot
# ایجاد مکالمه جدید
id = st.session_state['chatbot'].new_conversation()
st.session_state['chatbot'].change_conversation(id)
st.session_state['conversation'] = id
# مقداردهی اولیه تاریخچه چت
if 'generated' not in st.session_state:
st.session_state['generated'] = ["من **چت‌بات هوشمند** هستم، چگونه می‌توانم کمکتان کنم؟"]
if 'past' not in st.session_state:
st.session_state['past'] = ['سلام!']
# مقداردهی اولیه LLM برای Langchain (اگر HuggingChatAPI.py موجود باشد)
if 'HuggingChat' in globals():
st.session_state['LLM'] = HuggingChat(email=st.session_state['hf_email'], psw=st.session_state['hf_pass'])
else:
st.warning("کلاس HuggingChat برای Langchain یافت نشد.")
# مقداردهی اولیه Embedding
if 'hf' not in st.session_state:
hf = HuggingFaceHubEmbeddings(
repo_id=repo_id,
task="feature-extraction",
huggingfacehub_api_token=st.session_state['hf_token'],
) # type: ignore
st.session_state['hf'] = hf
else:
st.write("Error hf")
st.success("ورود با موفقیت انجام شد!")
st.experimental_rerun()
except Exception as e:
st.error(f"خطا در ورود: {e}")
from time import sleep
sleep(3)
# پاک کردن اطلاعات در صورت خطا
keys_to_delete = ['hf_email', 'hf_pass', 'hf_token', 'chatbot', 'conversation', 'generated', 'past', 'LLM', 'hf']
for key in keys_to_delete:
if key in st.session_state:
del st.session_state[key]
st.experimental_rerun()
else: # اگر کاربر وارد شده باشد
with st.expander("ℹ️ تنظیمات پیشرفته مدل زبان"):
# پارامترهای مدل زبان
temperature = st.slider('🌡 دما (Temperature)', min_value=0.1, max_value=1.0, value=0.5, step=0.01, help="مقادیر بالاتر خلاقیت بیشتر، مقادیر پایین‌تر تمرکز بیشتر.")
top_p = st.slider('💡 Top P', min_value=0.1, max_value=1.0, value=0.95, step=0.01, help="نمونه‌گیری هسته‌ای؛ مجموع احتمالات کلمات بعدی.")
repetition_penalty = st.slider('🖌 جریمه تکرار (Repetition Penalty)', min_value=1.0, max_value=2.0, value=1.2, step=0.01, help="جلوگیری از تکرار کلمات.")
top_k = st.slider('❄️ Top K', min_value=1, max_value=100, value=50, step=1, help="تعداد محتمل‌ترین کلمات بعدی برای در نظر گرفتن.")
max_new_tokens = st.slider('📝 حداکثر توکن‌های جدید (Max New Tokens)', min_value=1, max_value=1024, value=1024, step=1, help="حداکثر طول پاسخ تولید شده.")
st.markdown("---")
# انتخاب افزونه (پلاگین)
# نام پلاگین‌ها را فارسی می‌کنیم
plugins = ["🛑 بدون افزونه", "🌐 جستجوی وب", "🔗 گفتگو با وب‌سایت", "📋 گفتگو با داده‌های شما (CSV)", "📝 گفتگو با اسناد شما", "🎧 گفتگو با فایل صوتی شما", "🎥 گفتگو با ویدیوی یوتیوب", "🧠 حالت دانای کل (GOD MODE)", "💾 بارگذاری VectorStore ذخیره شده"]
# FOR DEVELOPMENT NEW PLUGIN YOU MUST ADD IT HERE INTO THE LIST
# YOU NEED ADD THE NAME AT 144 LINE (or adjusted line number)
if 'plugin' not in st.session_state:
st.session_state['plugin'] = st.selectbox('🔌 انتخاب افزونه', plugins, index=0)
else:
# پیدا کردن ایندکس پلاگین فعلی (اگر نامش تغییر کرده باشد باید مدیریت شود)
current_plugin_index = 0 # پیش‌فرض
try:
# سعی می‌کنیم ایندکس پلاگین فعلی را پیدا کنیم
# این بخش نیاز به نگاشت نام انگلیسی به فارسی دارد اگر session_state با نام انگلیسی ذخیره شده باشد
# برای سادگی فرض می‌کنیم session_state['plugin'] نام فارسی را نگه می‌دارد
current_plugin_index = plugins.index(st.session_state['plugin'])
except ValueError:
st.warning("افزونه قبلی یافت نشد، به حالت 'بدون افزونه' برمی‌گردیم.")
st.session_state['plugin'] = "🛑 بدون افزونه"
st.session_state['plugin'] = st.selectbox('🔌 انتخاب افزونه', plugins, index=current_plugin_index)
st.markdown("---")
# --- منطق نمایش تنظیمات هر پلاگین ---
# WEB SEARCH PLUGIN
if st.session_state['plugin'] == "🌐 جستجوی وب":
with st.expander("🌐 تنظیمات جستجوی وب", expanded=True):
if 'web_search' not in st.session_state or st.session_state.get('web_search') != 'True':
reg = ['us-en', 'uk-en', 'it-it', 'de-de', 'fr-fr', 'es-es', 'fa-ir'] # منطقه فارسی اضافه شد
sf = ['on', 'moderate', 'off']
tl = {'d': 'روز گذشته', 'w': 'هفته گذشته', 'm': 'ماه گذشته', 'y': 'سال گذشته'} # دیکشنری برای نمایش فارسی
# نمایش فارسی و ذخیره مقدار انگلیسی
st.session_state['timelimit_display'] = st.selectbox('📅 محدوده زمانی', list(tl.values()), index=1)
st.session_state['timelimit'] = list(tl.keys())[list(tl.values()).index(st.session_state['timelimit_display'])]
st.session_state['region'] = st.selectbox('🗺 منطقه', reg, index=reg.index('fa-ir') if 'fa-ir' in reg else 0) # پیش‌فرض فارسی
st.session_state['safesearch'] = st.selectbox('🚨 جستجوی ایمن', sf, index=1)
st.session_state['max_results'] = st.slider('📊 حداکثر نتایج', min_value=1, max_value=5, value=st.session_state.get('max_results', 2), step=1)
if st.button('🌐 ذخیره و فعال‌سازی جستجوی وب'):
st.session_state['web_search'] = "True"
st.experimental_rerun()
else: # اگر فعال باشد
st.success('🚀 جستجوی وب فعال است.')
st.write(f"🗺 منطقه: {st.session_state.get('region', 'نامشخص')}")
st.write(f"🚨 جستجوی ایمن: {st.session_state.get('safesearch', 'نامشخص')}")
st.write(f"📅 محدوده زمانی: {st.session_state.get('timelimit_display', 'نامشخص')}")
if st.button('🌐🛑 غیرفعال کردن جستجوی وب'):
keys_to_delete = ['web_search', 'region', 'safesearch', 'timelimit', 'timelimit_display', 'max_results']
for key in keys_to_delete:
if key in st.session_state:
del st.session_state[key]
st.session_state['plugin'] = "🛑 بدون افزونه" # بازگشت به حالت پیش‌فرض
st.experimental_rerun()
# GOD MODE PLUGIN
if st.session_state['plugin'] == "🧠 حالت دانای کل (GOD MODE)":
if 'god_mode' not in st.session_state:
with st.expander("🧠 تنظیمات حالت دانای کل", expanded=True):
topic = st.text_input('🔎 موضوع تحقیق', "هوش مصنوعی در امور مالی")
web_result_god = st.checkbox('🌐 جستجوی وب', value=True, disabled=True)
yt_result_god = st.checkbox('🎥 جستجوی یوتیوب', value=True, disabled=True)
website_result_god = st.checkbox('🔗 بررسی وب‌سایت‌ها', value=True, disabled=True)
deep_of_search_god = st.slider('📊 عمق جستجو (برای هر منبع)', min_value=1, max_value=5, value=2, step=1)
if st.button('🧠✅ جمع‌آوری دانش برای مدل'):
full_text_god = []
links_god = []
news_god = []
yt_ids_god = []
source_god = []
with st.spinner('🌐 در حال جستجو در وب...'):
internet_result_god = ""
internet_answer_god = ""
try:
with DDGS() as ddgs:
# جستجوی متنی
ddgs_gen = ddgs.text(topic, region="us-en", max_results=deep_of_search_god) # region را می‌توان پارامتریک کرد
for r in ddgs_gen:
l = r['href']
source_god.append(l)
links_god.append(l)
internet_result_god += str(r) + "\n\n"
full_text_god.append(r['body']) # اضافه کردن متن اصلی
# جستجوی اخبار
fast_answer = ddgs.news(topic, max_results=deep_of_search_god)
for r in fast_answer:
internet_answer_god += str(r) + "\n\n"
l = r['url']
source_god.append(l)
news_god.append(r)
full_text_god.append(r['body']) # اضافه کردن متن خبر
except Exception as e:
st.error(f"خطا در جستجوی وب (DDGS): {e}")
if yt_result_god:
with st.spinner('🎥 در حال جستجو در یوتیوب...'):
try:
from youtubesearchpython import VideosSearch
videosSearch = VideosSearch(topic, limit = deep_of_search_god)
yt_result_search = videosSearch.result()
for i in yt_result_search.get('result', []):
duration = i.get('duration', '0:0').split(':')
# فیلتر کردن ویدیوهای خیلی طولانی
try:
if len(duration) == 3 and int(duration[0]) > 0: # بیشتر از ۱ ساعت
continue
if len(duration) == 2 and int(duration[0]) > 30: # بیشتر از ۳۰ دقیقه
continue
except ValueError:
pass # رد شدن اگر فرمت زمان نامعتبر بود
video_id = i.get('id')
if video_id:
yt_ids_god.append(video_id)
source_god.append(f"https://www.youtube.com/watch?v={video_id}")
full_text_god.append(i.get('title', '')) # اضافه کردن عنوان ویدیو
except ImportError:
st.warning("کتابخانه youtubesearchpython نصب نیست. جستجوی یوتیوب انجام نشد.")
except Exception as e:
st.error(f"خطا در جستجوی یوتیوب: {e}")
if website_result_god and links_god:
with st.spinner(f'👨‍💻 در حال استخراج متن از {len(links_god)} وب‌سایت...'):
for l in links_god:
try:
r = requests.get(l, timeout=10) # اضافه کردن timeout
r.raise_for_status() # بررسی خطاهای HTTP
soup = BeautifulSoup(r.content, 'html.parser')
# استخراج متن تمیزتر
page_text = ' '.join(t.strip() for t in soup.find_all(text=True) if t.parent.name not in ['style', 'script', 'head', 'title', 'meta', '[document]'] and t.strip())
if page_text:
full_text_god.append(page_text + "\n\n")
except requests.exceptions.RequestException as e:
st.warning(f"خطا در دسترسی به {l}: {e}")
except Exception as e:
st.warning(f"خطا در پردازش {l}: {e}")
if yt_ids_god:
with st.spinner(f'👨‍💻 در حال استخراج متن از {len(yt_ids_god)} ویدیوی یوتیوب...'):
for video_id in yt_ids_god:
try:
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
# اولویت با زیرنویس فارسی یا انگلیسی
transcript = None
try:
transcript = transcript_list.find_generated_transcript(['fa', 'en'])
except: # اگر فارسی یا انگلیسی نبود، اولین مورد موجود
try:
transcript = next(iter(transcript_list))
except StopIteration:
st.warning(f"زیرنویسی برای ویدیوی {video_id} یافت نشد.")
continue # برو به ویدیوی بعدی
# اگر زبان اصلی انگلیسی نبود، ترجمه به انگلیسی (مدل‌ها معمولا انگلیسی بهتر کار می‌کنند)
if transcript.language_code != 'en':
try:
transcript = transcript.translate('en')
except Exception as e:
st.warning(f"خطا در ترجمه زیرنویس {video_id} به انگلیسی: {e}. از زبان اصلی ({transcript.language_code}) استفاده می‌شود.")
text_segments = transcript.fetch()
video_text = ' '.join([segment['text'] for segment in text_segments])
if video_text:
full_text_god.append(video_text)
except Exception as e:
st.warning(f"خطای استخراج زیرنویس برای {video_id}: {e}")
if not full_text_god:
st.error("هیچ متنی برای ساخت پایگاه دانش جمع‌آوری نشد. لطفاً موضوع یا تنظیمات را بررسی کنید.")
else:
with st.spinner('🧠 در حال ساخت Vectorstore با دانش جمع‌آوری شده...'):
full_text_combined = "\n\n".join(full_text_god)
st.session_state['god_text'] = [full_text_combined] # ذخیره متن کامل برای نمایش احتمالی
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100, separator="\n\n") # overlap و separator بهتر
texts = text_splitter.create_documents([full_text_combined])
if 'hf' not in st.session_state:
st.error("Embedding model مقداردهی اولیه نشده است. لطفاً از Hugging Face خارج و دوباره وارد شوید.")
else:
embeddings = st.session_state['hf']
# ایجاد Vectorstore
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) # حروف کوچک برای سازگاری بهتر
persist_directory = f"./chroma_db_{random_str}"
try:
db_god = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory)
st.session_state['db_path'] = persist_directory # ذخیره مسیر دایرکتوری
except Exception as e:
st.error(f"خطا در ساخت Chroma DB: {e}")
# پاک کردن session state های مرتبط با این تلاش ناموفق
keys_to_delete = ['god_mode', 'god_mode_source', 'god_mode_info', 'db_path', 'god_text']
for key in keys_to_delete:
if key in st.session_state:
del st.session_state[key]
st.experimental_rerun()
with st.spinner('🔨 در حال ذخیره Vectorstore...'):
db_god.persist()
# ساخت فایل فشرده برای دانلود
try:
zip_filename = f"chroma_db_{random_str}.zip"
shutil.make_archive(f"chroma_db_{random_str}", 'zip', persist_directory)
st.session_state['db_zip_path'] = zip_filename # مسیر فایل زیپ برای دانلود
except Exception as e:
st.error(f"خطا در فشرده‌سازی Vectorstore: {e}")
with st.spinner('🔨 در حال ساخت زنجیره پرسش و پاسخ (QA)...'):
# ساخت Retriever
retriever_god = db_god.as_retriever(search_kwargs={"k": 3}) # k=3 نتایج مرتبط‌تر
# ساخت زنجیره QA
if 'LLM' in st.session_state:
qa_god = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_god, return_source_documents=True)
st.session_state['god_mode'] = qa_god # ذخیره زنجیره QA
st.session_state['god_mode_source'] = source_god # ذخیره منابع
info_text = f"🧠 حالت دانای کل برای موضوع **'{topic}'** فعال شد.\nدانش بر اساس:\n"
if news_god: info_text += f"- {len(news_god)} خبر 🗞️\n"
if yt_ids_god: info_text += f"- {len(yt_ids_god)} ویدیوی یوتیوب 📺\n"
if links_god: info_text += f"- {len(links_god)} وب‌سایت 🌐\n"
st.session_state['god_mode_info'] = info_text
st.success("حالت دانای کل با موفقیت فعال شد!")
else:
st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.")
st.experimental_rerun()
elif 'god_mode' in st.session_state: # اگر GOD MODE فعال باشد
with st.expander("✅ **حالت دانای کل (GOD MODE) فعال است** 🚀", expanded=True):
st.markdown(st.session_state.get('god_mode_info', "اطلاعاتی موجود نیست."))
if 'db_zip_path' in st.session_state:
try:
with open(st.session_state['db_zip_path'], 'rb') as fp:
st.download_button(
label="📩 دانلود Vectorstore (.zip)",
data=fp,
file_name=st.session_state['db_zip_path'],
mime='application/zip'
)
except FileNotFoundError:
st.warning("فایل فشرده Vectorstore یافت نشد.")
except Exception as e:
st.error(f"خطا در خواندن فایل فشرده: {e}")
if st.button('🧠🛑 غیرفعال کردن حالت دانای کل'):
# پاک کردن session state های مرتبط
keys_to_delete = ['god_mode', 'god_mode_source', 'god_mode_info', 'db_path', 'db_zip_path', 'god_text']
for key in keys_to_delete:
if key in st.session_state:
del st.session_state[key]
# پاک کردن دایرکتوری Chroma DB اگر مسیرش ذخیره شده باشد
if 'db_path' in st.session_state and os.path.exists(st.session_state['db_path']):
try:
shutil.rmtree(st.session_state['db_path'])
except Exception as e:
st.warning(f"خطا در پاک کردن دایرکتوری Chroma DB: {e}")
# پاک کردن فایل زیپ اگر مسیرش ذخیره شده باشد
if 'db_zip_path' in st.session_state and os.path.exists(st.session_state['db_zip_path']):
try:
os.remove(st.session_state['db_zip_path'])
except Exception as e:
st.warning(f"خطا در پاک کردن فایل زیپ Vectorstore: {e}")
st.session_state['plugin'] = "🛑 بدون افزونه"
st.experimental_rerun()
# DATA PLUGIN (CSV)
if st.session_state['plugin'] == "📋 گفتگو با داده‌های شما (CSV)":
if 'df' not in st.session_state:
with st.expander("📋 گفتگو با داده‌های شما (CSV)", expanded=True):
uploaded_csv = st.file_uploader("فایل CSV خود را بارگذاری کنید", type=['csv'])
if uploaded_csv is not None:
try:
df = pd.read_csv(uploaded_csv)
st.session_state['df'] = df
st.success("فایل CSV با موفقیت بارگذاری شد.")
st.experimental_rerun()
except Exception as e:
st.error(f"خطا در خواندن فایل CSV: {e}")
else: # اگر فایل بارگذاری شده باشد
with st.expander("📋 داده‌های بارگذاری شده (CSV)", expanded=False): # به صورت بسته نشان داده شود
st.dataframe(st.session_state['df'].head()) # نمایش چند سطر اول
if st.button('🛑📋 حذف داده‌های CSV از حافظه'):
del st.session_state['df']
st.session_state['plugin'] = "🛑 بدون افزونه"
st.experimental_rerun()
# DOCUMENTS PLUGIN (PDF, TXT, DOCX)
if st.session_state['plugin'] == "📝 گفتگو با اسناد شما":
if 'documents_qa' not in st.session_state:
with st.expander("📝 گفتگو با اسناد شما (PDF, TXT, DOCX)", expanded=True):
uploaded_docs = st.file_uploader("اسناد خود را بارگذاری کنید", type=['txt', 'pdf', 'docx'], accept_multiple_files=True)
if uploaded_docs and st.button('📝✅ بارگذاری و پردازش اسناد'):
documents_text = []
with st.spinner('🔨 در حال خواندن اسناد...'):
for doc_file in uploaded_docs:
try:
if doc_file.type == 'text/plain':
# خواندن با انکودینگ مناسب
try:
documents_text.append(doc_file.read().decode('utf-8'))
except UnicodeDecodeError:
st.warning(f"فایل {doc_file.name} با UTF-8 خوانده نشد، تلاش با انکودینگ دیگر...")
# اینجا می‌توانید انکودینگ‌های دیگری را امتحان کنید یا خطا دهید
documents_text.append(doc_file.read().decode('latin-1', errors='ignore')) # مثال
elif doc_file.type == 'application/pdf':
with pdfplumber.open(doc_file) as pdf:
doc_pages = [page.extract_text() for page in pdf.pages if page.extract_text()] # فقط صفحاتی که متن دارند
documents_text.append("\n".join(doc_pages))
elif doc_file.type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
text = docx2txt.process(doc_file)
documents_text.append(text)
except Exception as e:
st.error(f"خطا در خواندن فایل {doc_file.name}: {e}")
if not documents_text:
st.error("هیچ متنی از اسناد استخراج نشد.")
else:
st.session_state['documents_raw_text'] = documents_text # ذخیره متن خام برای نمایش
with st.spinner('🔨 در حال ساخت Vectorstore برای اسناد...'):
text_splitter_docs = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100, separator="\n")
texts_docs = text_splitter_docs.create_documents(documents_text)
if 'hf' not in st.session_state:
st.error("Embedding model مقداردهی اولیه نشده است.")
else:
embeddings_docs = st.session_state['hf']
random_str_docs = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
persist_directory_docs = f"./chroma_db_docs_{random_str_docs}"
try:
db_docs = Chroma.from_documents(texts_docs, embeddings_docs, persist_directory=persist_directory_docs)
st.session_state['db_docs_path'] = persist_directory_docs
except Exception as e:
st.error(f"خطا در ساخت Chroma DB برای اسناد: {e}")
# پاک کردن session state
if 'documents_raw_text' in st.session_state: del st.session_state['documents_raw_text']
st.experimental_rerun()
with st.spinner('🔨 در حال ذخیره Vectorstore اسناد...'):
db_docs.persist()
try:
zip_filename_docs = f"chroma_db_docs_{random_str_docs}.zip"
shutil.make_archive(f"chroma_db_docs_{random_str_docs}", 'zip', persist_directory_docs)
st.session_state['db_docs_zip_path'] = zip_filename_docs
except Exception as e:
st.error(f"خطا در فشرده‌سازی Vectorstore اسناد: {e}")
with st.spinner('🔨 در حال ساخت زنجیره QA برای اسناد...'):
retriever_docs = db_docs.as_retriever(search_kwargs={"k": 3})
if 'LLM' in st.session_state:
qa_docs = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_docs, return_source_documents=True)
st.session_state['documents_qa'] = qa_docs # ذخیره زنجیره QA
st.success("اسناد با موفقیت پردازش شدند.")
else:
st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.")
st.experimental_rerun()
else: # اگر اسناد پردازش شده باشند
with st.expander("📝 اسناد بارگذاری شده", expanded=False):
# نمایش خلاصه یا نام فایل‌ها
if 'documents_raw_text' in st.session_state:
st.write(f"تعداد {len(st.session_state['documents_raw_text'])} سند بارگذاری شده است.")
# st.write(st.session_state['documents_raw_text']) # نمایش کامل متن ممکن است طولانی باشد
if 'db_docs_zip_path' in st.session_state:
try:
with open(st.session_state['db_docs_zip_path'], 'rb') as fp:
st.download_button(
label="📩 دانلود Vectorstore اسناد (.zip)",
data=fp,
file_name=st.session_state['db_docs_zip_path'],
mime='application/zip'
)
except FileNotFoundError:
st.warning("فایل فشرده Vectorstore اسناد یافت نشد.")
except Exception as e:
st.error(f"خطا در خواندن فایل فشرده اسناد: {e}")
if st.button('🛑📝 حذف اسناد از حافظه'):
keys_to_delete = ['documents_qa', 'documents_raw_text', 'db_docs_path', 'db_docs_zip_path']
# پاک کردن فایل‌ها و دایرکتوری
if 'db_docs_path' in st.session_state and os.path.exists(st.session_state['db_docs_path']):
shutil.rmtree(st.session_state['db_docs_path'], ignore_errors=True)
if 'db_docs_zip_path' in st.session_state and os.path.exists(st.session_state['db_docs_zip_path']):
os.remove(st.session_state['db_docs_zip_path'])
# پاک کردن session state
for key in keys_to_delete:
if key in st.session_state:
del st.session_state[key]
st.session_state['plugin'] = "🛑 بدون افزونه"
st.experimental_rerun()
# AUDIO PLUGIN
if st.session_state['plugin'] == "🎧 گفتگو با فایل صوتی شما":
if 'audio_qa' not in st.session_state:
with st.expander("🎙 گفتگو با فایل صوتی شما (WAV, MP3)", expanded=True):
uploaded_audio = st.file_uploader("فایل صوتی خود را بارگذاری کنید", type=['wav', 'mp3'])
if uploaded_audio is not None:
file_path_audio = f"./temp_audio_{uploaded_audio.name}"
wav_path_audio = file_path_audio.rsplit('.', 1)[0] + '.wav' # مسیر فایل wav نهایی
with open(file_path_audio, 'wb') as f:
f.write(uploaded_audio.getvalue())
# تبدیل به WAV در صورت نیاز
if uploaded_audio.type == 'audio/mpeg': # MP3
with st.spinner('🔨 در حال تبدیل MP3 به WAV...'):
try:
sound = AudioSegment.from_mp3(file_path_audio)
sound.export(wav_path_audio, format="wav")
# حذف فایل mp3 موقت
os.remove(file_path_audio)
audio_to_process = wav_path_audio
except Exception as e:
st.error(f"خطا در تبدیل MP3 به WAV: {e}")
# پاک کردن فایل اگر تبدیل ناموفق بود
if os.path.exists(file_path_audio): os.remove(file_path_audio)
if os.path.exists(wav_path_audio): os.remove(wav_path_audio)
st.stop() # توقف اجرا
elif uploaded_audio.type == 'audio/wav':
audio_to_process = file_path_audio # همان فایل wav است
else:
st.error("فرمت فایل صوتی پشتیبانی نمی‌شود (فقط WAV و MP3).")
os.remove(file_path_audio) # پاک کردن فایل آپلود شده
st.stop()
# تشخیص گفتار
r = sr.Recognizer()
with st.spinner('🎤 در حال تشخیص گفتار از فایل صوتی...'):
try:
with sr.AudioFile(audio_to_process) as source:
audio_data = r.record(source)
# تلاش برای تشخیص به فارسی و سپس انگلیسی
try:
text_audio = r.recognize_google(audio_data, language='fa-IR')
st.info("متن به زبان فارسی تشخیص داده شد.")
except sr.UnknownValueError:
st.warning("گفتار فارسی تشخیص داده نشد، تلاش برای انگلیسی...")
try:
text_audio = r.recognize_google(audio_data, language='en-US')
st.info("متن به زبان انگلیسی تشخیص داده شد.")
except sr.UnknownValueError:
st.error("گفتار در فایل صوتی تشخیص داده نشد.")
text_audio = None
except sr.RequestError as e:
st.error(f"خطا در اتصال به سرویس تشخیص گفتار گوگل: {e}")
text_audio = None
except sr.RequestError as e:
st.error(f"خطا در اتصال به سرویس تشخیص گفتار گوگل: {e}")
text_audio = None
except Exception as e:
st.error(f"خطا در پردازش فایل صوتی: {e}")
text_audio = None
finally:
# پاک کردن فایل صوتی پردازش شده (wav)
if os.path.exists(audio_to_process):
os.remove(audio_to_process)
if text_audio:
st.session_state['audio_text'] = text_audio # ذخیره متن برای نمایش
with st.spinner('🎙 در حال ساخت Vectorstore برای فایل صوتی...'):
text_splitter_audio = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100, separator="\n")
texts_audio = text_splitter_audio.create_documents([text_audio])
if 'hf' not in st.session_state:
st.error("Embedding model مقداردهی اولیه نشده است.")
else:
embeddings_audio = st.session_state['hf']
random_str_audio = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
persist_directory_audio = f"./chroma_db_audio_{random_str_audio}"
try:
db_audio = Chroma.from_documents(texts_audio, embeddings_audio, persist_directory=persist_directory_audio)
st.session_state['db_audio_path'] = persist_directory_audio
except Exception as e:
st.error(f"خطا در ساخت Chroma DB برای صوت: {e}")
if 'audio_text' in st.session_state: del st.session_state['audio_text']
st.experimental_rerun()
with st.spinner('🎙 در حال ذخیره Vectorstore فایل صوتی...'):
db_audio.persist()
try:
zip_filename_audio = f"chroma_db_audio_{random_str_audio}.zip"
shutil.make_archive(f"chroma_db_audio_{random_str_audio}", 'zip', persist_directory_audio)
st.session_state['db_audio_zip_path'] = zip_filename_audio
except Exception as e:
st.error(f"خطا در فشرده‌سازی Vectorstore صوت: {e}")
with st.spinner('🎙 در حال ساخت زنجیره QA برای فایل صوتی...'):
retriever_audio = db_audio.as_retriever(search_kwargs={"k": 3})
if 'LLM' in st.session_state:
qa_audio = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_audio, return_source_documents=True)
st.session_state['audio_qa'] = qa_audio
st.success("فایل صوتی با موفقیت پردازش شد.")
else:
st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.")
st.experimental_rerun()
else:
st.error("محتوایی از فایل صوتی استخراج نشد.")
else: # اگر فایل صوتی پردازش شده باشد
with st.expander("🎧 فایل صوتی پردازش شده", expanded=False):
st.write("متن استخراج شده:")
st.write(st.session_state.get('audio_text', "متن موجود نیست."))
if 'db_audio_zip_path' in st.session_state:
try:
with open(st.session_state['db_audio_zip_path'], 'rb') as fp:
st.download_button(
label="📩 دانلود Vectorstore فایل صوتی (.zip)",
data=fp,
file_name=st.session_state['db_audio_zip_path'],
mime='application/zip'
)
except FileNotFoundError:
st.warning("فایل فشرده Vectorstore صوت یافت نشد.")
except Exception as e:
st.error(f"خطا در خواندن فایل فشرده صوت: {e}")
if st.button('🛑🎙 حذف فایل صوتی از حافظه'):
keys_to_delete = ['audio_qa', 'audio_text', 'db_audio_path', 'db_audio_zip_path']
# پاک کردن فایل‌ها و دایرکتوری
if 'db_audio_path' in st.session_state and os.path.exists(st.session_state['db_audio_path']):
shutil.rmtree(st.session_state['db_audio_path'], ignore_errors=True)
if 'db_audio_zip_path' in st.session_state and os.path.exists(st.session_state['db_audio_zip_path']):
os.remove(st.session_state['db_audio_zip_path'])
# پاک کردن session state
for key in keys_to_delete:
if key in st.session_state:
del st.session_state[key]
st.session_state['plugin'] = "🛑 بدون افزونه"
st.experimental_rerun()
# YT PLUGIN
if st.session_state['plugin'] == "🎥 گفتگو با ویدیوی یوتیوب":
if 'yt_qa' not in st.session_state:
with st.expander("🎥 گفتگو با ویدیوی یوتیوب", expanded=True):
yt_urls = st.text_area("📺 آدرس‌های ویدیوهای یوتیوب را وارد کنید (هر آدرس در یک خط، حداکثر ۳ آدرس)")
if yt_urls and st.button('🎥✅ افزودن ویدیو(های) یوتیوب به حافظه'):
video_ids = []
urls = yt_urls.strip().split("\n")
for url in urls[:3]: # حداکثر ۳ ویدیو
url = url.strip()
if not url: continue
try:
# استخراج video ID از فرمت‌های مختلف URL
if "youtube.com/watch?v=" in url:
video_id = url.split("watch?v=")[1].split("&")[0]
elif "youtu.be/" in url:
video_id = url.split("youtu.be/")[1].split("?")[0]
else:
st.warning(f"فرمت URL نامعتبر: {url}")
continue
video_ids.append(video_id)
except IndexError:
st.warning(f"فرمت URL نامعتبر: {url}")
if not video_ids:
st.error("هیچ آدرس معتبر یوتیوب وارد نشد.")
else:
text_list_yt = []
with st.spinner(f'🎥 در حال استخراج متن از {len(video_ids)} ویدیوی یوتیوب...'):
for video_id in video_ids:
try:
transcript_list_yt = YouTubeTranscriptApi.list_transcripts(video_id)
# مشابه حالت GOD MODE، اولویت با فارسی/انگلیسی
transcript_yt = None
try:
transcript_yt = transcript_list_yt.find_generated_transcript(['fa', 'en'])
except:
try:
transcript_yt = next(iter(transcript_list_yt))
except StopIteration:
st.warning(f"زیرنویسی برای ویدیوی {video_id} یافت نشد.")
continue
if transcript_yt.language_code != 'en':
try:
transcript_yt = transcript_yt.translate('en')
except Exception as e:
st.warning(f"خطا در ترجمه زیرنویس {video_id} به انگلیسی: {e}. از زبان اصلی ({transcript_yt.language_code}) استفاده می‌شود.")
text_segments_yt = transcript_yt.fetch()
video_text_yt = ' '.join([segment['text'] for segment in text_segments_yt])
if video_text_yt:
text_list_yt.append(video_text_yt)
except Exception as e:
st.warning(f"خطای استخراج زیرنویس برای {video_id}: {e}")
if not text_list_yt:
st.error("محتوایی از ویدیوهای یوتیوب استخراج نشد.")
else:
st.session_state['yt_text'] = text_list_yt # ذخیره متن برای نمایش
with st.spinner('🎥 در حال ساخت Vectorstore برای ویدیو(ها)...'):
text_splitter_yt = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100, separator=" ") # جدا کردن با فاصله برای متن زیرنویس
texts_yt = text_splitter_yt.create_documents(text_list_yt)
if 'hf' not in st.session_state:
st.error("Embedding model مقداردهی اولیه نشده است.")
else:
embeddings_yt = st.session_state['hf']
random_str_yt = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
persist_directory_yt = f"./chroma_db_yt_{random_str_yt}"
try:
db_yt = Chroma.from_documents(texts_yt, embeddings_yt, persist_directory=persist_directory_yt)
st.session_state['db_yt_path'] = persist_directory_yt
except Exception as e:
st.error(f"خطا در ساخت Chroma DB برای یوتیوب: {e}")
if 'yt_text' in st.session_state: del st.session_state['yt_text']
st.experimental_rerun()
with st.spinner('🎥 در حال ذخیره Vectorstore ویدیو(ها)...'):
db_yt.persist()
try:
zip_filename_yt = f"chroma_db_yt_{random_str_yt}.zip"
shutil.make_archive(f"chroma_db_yt_{random_str_yt}", 'zip', persist_directory_yt)
st.session_state['db_yt_zip_path'] = zip_filename_yt
except Exception as e:
st.error(f"خطا در فشرده‌سازی Vectorstore یوتیوب: {e}")
with st.spinner('🎥 در حال ساخت زنجیره QA برای ویدیو(ها)...'):
retriever_yt = db_yt.as_retriever(search_kwargs={"k": 3})
if 'LLM' in st.session_state:
qa_yt = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_yt, return_source_documents=True)
st.session_state['yt_qa'] = qa_yt
st.success("ویدیو(های) یوتیوب با موفقیت پردازش شدند.")
else:
st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.")
st.experimental_rerun()
else: # اگر ویدیو(ها) پردازش شده باشند
with st.expander("🎥 ویدیو(های) یوتیوب پردازش شده", expanded=False):
st.write("متن استخراج شده:")
st.write(st.session_state.get('yt_text', ["متن موجود نیست."]))
if 'db_yt_zip_path' in st.session_state:
try:
with open(st.session_state['db_yt_zip_path'], 'rb') as fp:
st.download_button(
label="📩 دانلود Vectorstore یوتیوب (.zip)",
data=fp,
file_name=st.session_state['db_yt_zip_path'],
mime='application/zip'
)
except FileNotFoundError:
st.warning("فایل فشرده Vectorstore یوتیوب یافت نشد.")
except Exception as e:
st.error(f"خطا در خواندن فایل فشرده یوتیوب: {e}")
if st.button('🛑🎥 حذف ویدیو(های) یوتیوب از حافظه'):
keys_to_delete = ['yt_qa', 'yt_text', 'db_yt_path', 'db_yt_zip_path']
# پاک کردن فایل‌ها و دایرکتوری
if 'db_yt_path' in st.session_state and os.path.exists(st.session_state['db_yt_path']):
shutil.rmtree(st.session_state['db_yt_path'], ignore_errors=True)
if 'db_yt_zip_path' in st.session_state and os.path.exists(st.session_state['db_yt_zip_path']):
os.remove(st.session_state['db_yt_zip_path'])
# پاک کردن session state
for key in keys_to_delete:
if key in st.session_state:
del st.session_state[key]
st.session_state['plugin'] = "🛑 بدون افزونه"
st.experimental_rerun()
# WEBSITE PLUGIN
if st.session_state['plugin'] == "🔗 گفتگو با وب‌سایت":
if 'website_qa' not in st.session_state:
with st.expander("🔗 گفتگو با وب‌سایت", expanded=True):
website_urls = st.text_area("🔗 آدرس‌های وب‌سایت‌ها را وارد کنید (هر آدرس در یک خط، حداکثر ۱۰ آدرس)")
if website_urls and st.button('🔗✅ افزودن وب‌سایت(ها) به حافظه'):
text_list_web = []
urls_web = website_urls.strip().split("\n")
with st.spinner(f'🔗 در حال استخراج متن از {len(urls_web[:10])} وب‌سایت...'):
for url_web in urls_web[:10]: # حداکثر ۱۰ وب‌سایت
url_web = url_web.strip()
if not url_web: continue
try:
# افزودن http اگر وجود ندارد
if not url_web.startswith(('http://', 'https://')):
url_web = 'https://' + url_web
r_web = requests.get(url_web, timeout=15, headers={'User-Agent': 'Mozilla/5.0'}) # User-Agent
r_web.raise_for_status()
soup_web = BeautifulSoup(r_web.content, 'html.parser')
# استخراج متن تمیزتر (مشابه GOD MODE)
page_text_web = ' '.join(t.strip() for t in soup_web.find_all(text=True) if t.parent.name not in ['style', 'script', 'head', 'title', 'meta', '[document]', 'a', 'button'] and t.strip()) # حذف لینک‌ها و دکمه‌ها
if page_text_web:
text_list_web.append(page_text_web + "\n\n")
else:
st.warning(f"متن قابل استخراجی از {url_web} یافت نشد.")
except requests.exceptions.RequestException as e:
st.warning(f"خطا در دسترسی به {url_web}: {e}")
except Exception as e:
st.warning(f"خطا در پردازش {url_web}: {e}")
if not text_list_web:
st.error("محتوایی از وب‌سایت‌ها استخراج نشد.")
else:
st.session_state['web_text'] = text_list_web # ذخیره متن برای نمایش
with st.spinner('🔗 در حال ساخت Vectorstore برای وب‌سایت(ها)...'):
text_splitter_web = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100, separator="\n\n")
texts_web = text_splitter_web.create_documents(text_list_web)
if 'hf' not in st.session_state:
st.error("Embedding model مقداردهی اولیه نشده است.")
else:
embeddings_web = st.session_state['hf']
random_str_web = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
persist_directory_web = f"./chroma_db_web_{random_str_web}"
try:
db_web = Chroma.from_documents(texts_web, embeddings_web, persist_directory=persist_directory_web)
st.session_state['db_web_path'] = persist_directory_web
except Exception as e:
st.error(f"خطا در ساخت Chroma DB برای وب‌سایت: {e}")
if 'web_text' in st.session_state: del st.session_state['web_text']
st.experimental_rerun()
with st.spinner('🔗 در حال ذخیره Vectorstore وب‌سایت(ها)...'):
db_web.persist()
try:
zip_filename_web = f"chroma_db_web_{random_str_web}.zip"
shutil.make_archive(f"chroma_db_web_{random_str_web}", 'zip', persist_directory_web)
st.session_state['db_web_zip_path'] = zip_filename_web
except Exception as e:
st.error(f"خطا در فشرده‌سازی Vectorstore وب‌سایت: {e}")
with st.spinner('🔗 در حال ساخت زنجیره QA برای وب‌سایت(ها)...'):
retriever_web = db_web.as_retriever(search_kwargs={"k": 3})
if 'LLM' in st.session_state:
qa_web = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_web, return_source_documents=True)
st.session_state['website_qa'] = qa_web
st.success("وب‌سایت(ها) با موفقیت پردازش شدند.")
else:
st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.")
st.experimental_rerun()
else: # اگر وب‌سایت(ها) پردازش شده باشند
with st.expander("🔗 وب‌سایت(های) پردازش شده", expanded=False):
st.write("متن استخراج شده:")
st.write(st.session_state.get('web_text', ["متن موجود نیست."]))
if 'db_web_zip_path' in st.session_state:
try:
with open(st.session_state['db_web_zip_path'], 'rb') as fp:
st.download_button(
label="📩 دانلود Vectorstore وب‌سایت (.zip)",
data=fp,
file_name=st.session_state['db_web_zip_path'],
mime='application/zip'
)
except FileNotFoundError:
st.warning("فایل فشرده Vectorstore وب‌سایت یافت نشد.")
except Exception as e:
st.error(f"خطا در خواندن فایل فشرده وب‌سایت: {e}")
if st.button('🛑🔗 حذف وب‌سایت(ها) از حافظه'):
keys_to_delete = ['website_qa', 'web_text', 'db_web_path', 'db_web_zip_path']
# پاک کردن فایل‌ها و دایرکتوری
if 'db_web_path' in st.session_state and os.path.exists(st.session_state['db_web_path']):
shutil.rmtree(st.session_state['db_web_path'], ignore_errors=True)
if 'db_web_zip_path' in st.session_state and os.path.exists(st.session_state['db_web_zip_path']):
os.remove(st.session_state['db_web_zip_path'])
# پاک کردن session state
for key in keys_to_delete:
if key in st.session_state:
del st.session_state[key]
st.session_state['plugin'] = "🛑 بدون افزونه"
st.experimental_rerun()
# UPLOAD SAVED VECTORSTORE
if st.session_state['plugin'] == "💾 بارگذاری VectorStore ذخیره شده":
if 'uploaded_db_qa' not in st.session_state:
with st.expander("💾 بارگذاری VectorStore ذخیره شده (.zip)", expanded=True):
uploaded_zip = st.file_uploader("فایل VectorStore فشرده (.zip) خود را بارگذاری کنید", type=["zip"])
if uploaded_zip is not None and st.button('✅💾 بارگذاری و افزودن VectorStore به حافظه'):
with st.spinner('💾 در حال استخراج VectorStore...'):
try:
# ایجاد یک دایرکتوری موقت و یکتا
random_str_up = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
extract_path = f"./uploaded_chroma_{random_str_up}"
os.makedirs(extract_path, exist_ok=True)
# استخراج فایل زیپ
with ZipFile(uploaded_zip, 'r') as zipObj:
zipObj.extractall(extract_path)
st.session_state['uploaded_db_path'] = extract_path # ذخیره مسیر استخراج شده
except Exception as e:
st.error(f"خطا در استخراج فایل فشرده: {e}")
# پاک کردن دایرکتوری در صورت خطا
if os.path.exists(extract_path): shutil.rmtree(extract_path, ignore_errors=True)
st.stop()
with st.spinner('💾 در حال بارگذاری VectorStore و ساخت زنجیره QA...'):
if 'hf' not in st.session_state:
st.error("Embedding model مقداردهی اولیه نشده است.")
else:
hf_up = st.session_state['hf']
try:
# بارگذاری VectorStore از دایرکتوری استخراج شده
# نام دایرکتوری داخل زیپ ممکن است ثابت نباشد، فرض می‌کنیم یک دایرکتوری اصلی دارد
# یا اگر ساختار مشخصی دارد (مثلا همیشه chroma_db_xxx) باید آن را پیدا کرد
# ساده‌ترین حالت: فرض کنیم محتویات مستقیما در extract_path هستند
db_up = Chroma(persist_directory=extract_path, embedding_function=hf_up)
retriever_up = db_up.as_retriever(search_kwargs={"k": 3})
if 'LLM' in st.session_state:
qa_up = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_up, return_source_documents=True)
st.session_state['uploaded_db_qa'] = qa_up
st.success("VectorStore ذخیره شده با موفقیت بارگذاری شد.")
else:
st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.")
except Exception as e:
st.error(f"خطا در بارگذاری Chroma DB یا ساخت QA: {e}")
# پاک کردن session state و دایرکتوری
if 'uploaded_db_path' in st.session_state:
if os.path.exists(st.session_state['uploaded_db_path']):
shutil.rmtree(st.session_state['uploaded_db_path'], ignore_errors=True)
del st.session_state['uploaded_db_path']
if 'uploaded_db_qa' in st.session_state: del st.session_state['uploaded_db_qa']
st.experimental_rerun()
else: # اگر VectorStore بارگذاری شده باشد
with st.expander("💾 VectorStore ذخیره شده بارگذاری شد", expanded=False):
st.success("📚 VectorStore آماده استفاده است.")
if st.button('🛑💾 حذف VectorStore بارگذاری شده از حافظه'):
keys_to_delete = ['uploaded_db_qa', 'uploaded_db_path']
# پاک کردن دایرکتوری
if 'uploaded_db_path' in st.session_state and os.path.exists(st.session_state['uploaded_db_path']):
shutil.rmtree(st.session_state['uploaded_db_path'], ignore_errors=True)
# پاک کردن session state
for key in keys_to_delete:
if key in st.session_state:
del st.session_state[key]
st.session_state['plugin'] = "🛑 بدون افزونه"
st.experimental_rerun()
# --- پایان منطق پلاگین‌ها ---
add_vertical_space(2)
st.markdown("---")
if st.button('🗑 خروج از حساب کاربری'):
keys_to_delete = list(st.session_state.keys())
# پاک کردن فایل‌ها و دایرکتوری‌های موقت ایجاد شده
for key in keys_to_delete:
if key.endswith('_path') and isinstance(st.session_state[key], str) and os.path.exists(st.session_state[key]):
if os.path.isdir(st.session_state[key]):
shutil.rmtree(st.session_state[key], ignore_errors=True)
elif os.path.isfile(st.session_state[key]):
os.remove(st.session_state[key])
# پاک کردن session state
for key in keys_to_delete:
del st.session_state[key]
st.success("با موفقیت خارج شدید.")
st.experimental_rerun()
# دکمه خروجی گرفتن از چت (اگر exportchat.py موجود باشد)
if 'export_chat' in globals():
export_chat()
else:
st.warning("تابع export_chat یافت نشد.")
add_vertical_space(5)
st.info("ساخته شده با ❤️ و Streamlit")
##### پایان نوار کناری #####
# --- محتوای اصلی صفحه ---
# ورودی کاربر
input_container = st.container()
# نمایش پاسخ‌ها
response_container = st.container()
# نمایش داده‌ها (CSV, متن استخراج شده و ...)
data_view_container = st.container()
# نمایش وضعیت بارگذاری (Spinner)
loading_container = st.container()
# کادر ورودی کاربر
with input_container:
input_text = st.chat_input("🧑‍💻 پیام خود را اینجا بنویسید 👇", key="input", disabled=(not 'hf_email' in st.session_state)) # غیرفعال تا زمان لاگین
# نمایش داده‌های مرتبط با پلاگین فعال
with data_view_container:
active_plugin = st.session_state.get('plugin', '🛑 بدون افزونه')
if active_plugin == "📋 گفتگو با داده‌های شما (CSV)" and 'df' in st.session_state:
with st.expander("🤖 مشاهده داده‌های CSV شما"):
st.dataframe(st.session_state['df'], use_container_width=True)
elif active_plugin == "📝 گفتگو با اسناد شما" and 'documents_raw_text' in st.session_state:
with st.expander("🤖 مشاهده متن اسناد شما"):
# نمایش خلاصه یا نام فایل‌ها بهتر است تا متن کامل
st.write(f"تعداد {len(st.session_state['documents_raw_text'])} سند بارگذاری شده است.")
# for i, doc_text in enumerate(st.session_state['documents_raw_text']):
# st.text_area(f"سند {i+1}", doc_text[:500] + "...", height=100) # نمایش ۵۰۰ کاراکتر اول
elif active_plugin == "🎧 گفتگو با فایل صوتی شما" and 'audio_text' in st.session_state:
with st.expander("🤖 مشاهده متن فایل صوتی شما"):
st.write(st.session_state['audio_text'])
elif active_plugin == "🎥 گفتگو با ویدیوی یوتیوب" and 'yt_text' in st.session_state:
with st.expander("🤖 مشاهده متن ویدیوی(های) یوتیوب"):
# نمایش متن هر ویدیو به صورت جداگانه
for i, vid_text in enumerate(st.session_state['yt_text']):
st.text_area(f"ویدیوی {i+1}", vid_text, height=150)
elif active_plugin == "🔗 گفتگو با وب‌سایت" and 'web_text' in st.session_state:
with st.expander("🤖 مشاهده محتوای وب‌سایت(ها)"):
for i, site_text in enumerate(st.session_state['web_text']):
st.text_area(f"وب‌سایت {i+1}", site_text[:1000] + "...", height=150) # نمایش ۱۰۰۰ کاراکتر اول
elif active_plugin == "💾 بارگذاری VectorStore ذخیره شده" and 'uploaded_db_qa' in st.session_state:
with st.expander("🗂 VectorStore بارگذاری شده"):
st.success("📚 VectorStore آماده استفاده است.")
elif active_plugin == "🧠 حالت دانای کل (GOD MODE)" and 'god_mode_source' in st.session_state:
with st.expander("🌍 مشاهده منابع استفاده شده در حالت دانای کل"):
# نمایش لینک‌ها به صورت لیست
for s in st.session_state.get('god_mode_source', []):
st.markdown(f"- [{s}]({s})") # ساخت لینک قابل کلیک
# --- تولید پاسخ ---
def generate_response(prompt, temperature, top_p, repetition_penalty, top_k, max_new_tokens):
"""
تولید پاسخ بر اساس پرامپت کاربر و پلاگین فعال.
"""
final_prompt_to_llm = "" # پرامپتی که نهایتا به LLM ارسال می‌شود
response_from_llm = "" # پاسخ خام از LLM
response_to_display = "" # پاسخی که به کاربر نمایش داده می‌شود (ممکن است شامل منبع باشد)
source_info = "" # اطلاعات منبع (برای نمایش جداگانه)
plugin_used = False # آیا از یک پلاگین خاص استفاده شد؟
error_message = None # پیام خطا در صورت بروز مشکل
active_plugin = st.session_state.get('plugin', '🛑 بدون افزونه')
qa_chain = None # متغیر برای نگهداری زنجیره QA فعال
with loading_container: # Spinner در این context نمایش داده می‌شود
try:
# تعیین زنجیره QA بر اساس پلاگین فعال
if active_plugin == "📝 گفتگو با اسناد شما" and 'documents_qa' in st.session_state:
qa_chain = st.session_state['documents_qa']
plugin_name_fa = "اسناد شما"
elif active_plugin == "🧠 حالت دانای کل (GOD MODE)" and 'god_mode' in st.session_state:
qa_chain = st.session_state['god_mode']
plugin_name_fa = "حالت دانای کل"
elif active_plugin == "🔗 گفتگو با وب‌سایت" and 'website_qa' in st.session_state:
qa_chain = st.session_state['website_qa']
plugin_name_fa = "وب‌سایت‌ها"
elif active_plugin == "💾 بارگذاری VectorStore ذخیره شده" and 'uploaded_db_qa' in st.session_state:
qa_chain = st.session_state['uploaded_db_qa']
plugin_name_fa = "VectorStore بارگذاری شده"
elif active_plugin == "🎧 گفتگو با فایل صوتی شما" and 'audio_qa' in st.session_state:
qa_chain = st.session_state['audio_qa']
plugin_name_fa = "فایل صوتی"
elif active_plugin == "🎥 گفتگو با ویدیوی یوتیوب" and 'yt_qa' in st.session_state:
qa_chain = st.session_state['yt_qa']
plugin_name_fa = "ویدیوی یوتیوب"
# 1. استفاده از پلاگین‌های مبتنی بر RetrievalQA (اسناد، وب، صوت، یوتیوب، GodMode, DB آپلود شده)
if qa_chain:
plugin_used = True
with st.spinner(f'🚀 در حال جستجو در {plugin_name_fa}...'):
result = qa_chain({"query": prompt})
context_for_prompt = result.get("result", "اطلاعاتی یافت نشد.")
# ساخت متن منبع
if result.get("source_documents"):
source_info += "\n\n✅ **منابع:**\n"
# نمایش منبع به صورت خواناتر (مثلا بخشی از متن یا نام فایل)
seen_sources = set()
for i, doc in enumerate(result["source_documents"]):
doc_repr = f"منبع {i+1}: " + doc.page_content[:150].replace('\n', ' ') + "..." # خلاصه متن
# اگر متادیتا داشت (مثلا نام فایل)، آن را اضافه کن
if hasattr(doc, 'metadata') and doc.metadata:
doc_repr += f" (متا: {doc.metadata})"
if doc_repr not in seen_sources:
source_info += f"- {doc_repr}\n"
seen_sources.add(doc_repr)
# ساخت پرامپت نهایی برای LLM با استفاده از context بدست آمده
# از یک prompt template مناسب استفاده کنید
# مثال ساده:
# context_history = f"User: {st.session_state['past'][-1]}\nBot: {st.session_state['generated'][-1]}\n" # آخرین تبادل
# final_prompt_to_llm = prompt4Context(prompt, context_history, context_for_prompt) # استفاده از تابع prompt template
# یا یک پرامپت ساده‌تر:
final_prompt_to_llm = f"""با توجه به اطلاعات زیر:
{context_for_prompt}
به سوال زیر پاسخ بده:
{prompt}
"""
# 2. استفاده از پلاگین CSV (Pandas + Sketch)
elif active_plugin == "📋 گفتگو با داده‌های شما (CSV)" and 'df' in st.session_state:
plugin_used = True
context_history = f"User: {st.session_state['past'][-1]}\nBot: {st.session_state['generated'][-1]}\n"
# تشخیص اینکه آیا کاربر کد پایتون می‌خواهد یا تحلیل داده
if any(kw in prompt.lower() for kw in ['python', 'کد', 'پایتون', 'code']):
with st.spinner('🚀 در حال تولید کد پایتون با Sketch...'):
try:
solution = "\n```python\n"
# پارامتر call_display=False مهم است
solution += st.session_state['df'].sketch.howto(prompt, call_display=False)
solution += "\n```\n\n"
# از prompt template مخصوص کد استفاده کنید
final_prompt_to_llm = prompt4Code(prompt, context_history, solution)
source_info = "\n\n✅ **کد تولید شده توسط Sketch:**\n" + solution
except Exception as e:
error_message = f"خطا در اجرای Sketch (howto): {e}"
final_prompt_to_llm = prompt # بازگشت به پرامپت اصلی در صورت خطا
else:
with st.spinner('🚀 در حال تحلیل داده با Sketch...'):
try:
solution = st.session_state['df'].sketch.ask(prompt, call_display=False)
# از prompt template مخصوص داده استفاده کنید
final_prompt_to_llm = prompt4Data(prompt, context_history, solution)
source_info = "\n\n✅ **تحلیل Sketch:**\n" + str(solution)
except Exception as e:
error_message = f"خطا در اجرای Sketch (ask): {e}"
final_prompt_to_llm = prompt # بازگشت به پرامپت اصلی در صورت خطا
# 3. استفاده از پلاگین جستجوی وب (DDGS)
elif active_plugin == "🌐 جستجوی وب" and st.session_state.get('web_search') == 'True':
plugin_used = True
with st.spinner('🚀 در حال جستجو در اینترنت...'):
internet_result = ""
internet_answer = ""
source_urls = set()
try:
with DDGS() as ddgs:
# نتایج متنی
text_results = ddgs.text(prompt,
region=st.session_state.get('region', 'wt-wt'),
safesearch=st.session_state.get('safesearch', 'moderate'),
timelimit=st.session_state.get('timelimit', 'w'),
max_results=st.session_state.get('max_results', 3))
for r in text_results:
internet_result += f"**{r.get('title', '')}**: {r.get('body', '')}\n[Link]({r.get('href', '')})\n\n"
if r.get('href'): source_urls.add(r.get('href'))
# پاسخ‌های سریع (اگر موجود باشد)
answer_results = ddgs.answers(prompt)
for r in answer_results:
internet_answer += f"**پاسخ سریع**: {r.get('text', '')}\n"
if r.get('url'): source_urls.add(r.get('url'))
context_history = f"User: {st.session_state['past'][-1]}\nBot: {st.session_state['generated'][-1]}\n"
# استفاده از prompt template مخصوص اینترنت
final_prompt_to_llm = prompt4conversationInternet(prompt, context_history, internet_result, internet_answer)
if source_urls:
source_info = "\n\n✅ **منابع وب:**\n" + "\n".join([f"- {url}" for url in source_urls])
except Exception as e:
error_message = f"خطا در جستجوی وب (DDGS): {e}"
final_prompt_to_llm = prompt # بازگشت به پرامپت اصلی در صورت خطا
# 4. حالت بدون پلاگین (مکالمه عادی)
else:
# ساخت context از تاریخچه چت
# استفاده از چند تبادل آخر برای context بهتر
history_context = ""
num_turns = 2 # تعداد تبادلات قبلی برای context
if 'past' in st.session_state and 'generated' in st.session_state:
start_index = max(0, len(st.session_state['past']) - num_turns)
for i in range(start_index, len(st.session_state['past'])):
history_context += f"User: {st.session_state['past'][i]}\nBot: {st.session_state['generated'][i]}\n"
# استفاده از prompt template مکالمه عادی
final_prompt_to_llm = prompt4conversation(prompt, history_context)
# --- ارسال پرامپت نهایی به LLM ---
if not error_message: # فقط اگر خطایی در مراحل قبلی رخ نداده باشد
with st.spinner('🚀 ربات در حال فکر کردن است...'):
if 'chatbot' in st.session_state:
# اطمینان از اینکه final_prompt_to_llm خالی نیست
if not final_prompt_to_llm:
final_prompt_to_llm = prompt # حداقل خود پرامپت کاربر را بفرست
print(f"--- پرامپت نهایی به LLM ---\n{final_prompt_to_llm}\n--------------------------") # برای دیباگ
response_from_llm = st.session_state['chatbot'].chat(
final_prompt_to_llm,
temperature=temperature,
top_p=top_p,
repetition_penalty=repetition_penalty,
top_k=top_k,
max_new_tokens=max_new_tokens
)
response_to_display = response_from_llm + source_info # ترکیب پاسخ با منبع برای نمایش
else:
error_message = "Chatbot مقداردهی اولیه نشده است."
else:
# اگر خطایی در پلاگین رخ داده، فقط پیام خطا را نمایش بده
response_to_display = f"⚠️ **خطا در پردازش پلاگین:** {error_message}"
except Exception as e:
st.error(f"خطای کلی در تابع generate_response: {e}")
response_to_display = f"😔 متاسفانه خطایی رخ داد: {e}"
return response_to_display
# --- نمایش تاریخچه چت و دریافت ورودی جدید ---
with response_container:
# بررسی اینکه کاربر لاگین کرده باشد
if 'hf_email' not in st.session_state:
st.info("👋 سلام، از دیدن شما خوشحالیم 🤗")
st.warning("👈 لطفاً برای ادامه وارد شوید (از منوی بالا سمت چپ اقدام کنید) 🚀")
st.error("👈 اگر در Hugging Face ثبت‌نام نکرده‌اید، لطفاً ابتدا ثبت‌نام کرده و سپس وارد شوید 🤗")
else:
# اگر ورودی جدیدی از کاربر دریافت شد
if input_text:
# دریافت پارامترهای مدل از session_state یا مقدار پیش‌فرض
temp = st.session_state.get('temperature', 0.5)
top_p_val = st.session_state.get('top_p', 0.95)
rep_penalty = st.session_state.get('repetition_penalty', 1.2)
top_k_val = st.session_state.get('top_k', 50)
max_tokens = st.session_state.get('max_new_tokens', 1024)
# تولید پاسخ
response = generate_response(input_text, temp, top_p_val, rep_penalty, top_k_val, max_tokens)
# افزودن ورودی و پاسخ به تاریخچه
if 'past' not in st.session_state: st.session_state['past'] = []
if 'generated' not in st.session_state: st.session_state['generated'] = []
st.session_state.past.append(input_text)
st.session_state.generated.append(response)
st.experimental_rerun() # بارگذاری مجدد برای نمایش پیام جدید
# نمایش کل تاریخچه چت
if 'generated' in st.session_state and st.session_state['generated']:
for i in range(len(st.session_state['generated'])):
# نمایش پیام کاربر
with st.chat_message(name="user", avatar="🧑‍💻"):
st.markdown(st.session_state['past'][i])
# نمایش پاسخ ربات
with st.chat_message(name="assistant", avatar="🤖"):
full_response = st.session_state['generated'][i]
# جدا کردن بخش اصلی پاسخ از منابع (اگر با ✅ **منابع:** مشخص شده باشد)
parts = full_response.split("✅ **منابع:**", 1)
main_message = parts[0].strip()
source_part = ""
if len(parts) > 1:
source_part = "✅ **منابع:**" + parts[1] # نگه داشتن عنوان منابع
# نمایش بخش اصلی پیام
st.markdown(main_message)
# نمایش منابع در یک expander جداگانه اگر وجود داشته باشد
if source_part:
with st.expander(f"📚 مشاهده منابع پیام شماره {i+1}"):
st.markdown(source_part.replace("✅ **منابع:**", "").strip()) # حذف مجدد عنوان داخل expander
# افزودن یک فاصله در انتهای چت برای اسکرول بهتر
# st.markdown('<div style="height: 50px;"></div>', unsafe_allow_html=True)