| import io |
| import random |
| import shutil |
| import string |
| from zipfile import ZipFile |
| import streamlit as st |
| from streamlit_extras.colored_header import colored_header |
| from streamlit_extras.add_vertical_space import add_vertical_space |
| from hugchat import hugchat |
| from hugchat.login import Login |
| import pandas as pd |
| import asyncio |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| import sketch |
| from langchain.text_splitter import CharacterTextSplitter |
| |
| from promptTemplate import prompt4conversation, prompt4Data, prompt4Code, prompt4Context, prompt4Audio, prompt4YT |
| from promptTemplate import prompt4conversationInternet |
| |
| |
| from exportchat import export_chat |
| from langchain.vectorstores import Chroma |
| from langchain.chains import RetrievalQA |
| from HuggingChatAPI import HuggingChat |
| from langchain.embeddings import HuggingFaceHubEmbeddings |
| from youtube_transcript_api import YouTubeTranscriptApi |
| import requests |
| from bs4 import BeautifulSoup |
| import speech_recognition as sr |
| import pdfplumber |
| import docx2txt |
| from duckduckgo_search import DDGS |
| from itertools import islice |
| from os import path |
| from pydub import AudioSegment |
| import os |
|
|
| hf = None |
| repo_id = "sentence-transformers/all-mpnet-base-v2" |
|
|
| |
| st.set_page_config( |
| page_title="گفتگو با همه چیز 💬", page_icon="🤗", layout="wide", initial_sidebar_state="expanded" |
| ) |
|
|
| |
| st.markdown(""" |
| <style> |
| /* تنظیمات کلی برای راستچین */ |
| html, body, [class*="st-"] { |
| direction: rtl !important; |
| text-align: right !important; |
| } |
| /* تنظیمات خاص برای برخی عناصر که ممکن است به چپ بچسبند */ |
| .stButton>button, .stDownloadButton>button, .stFileUploader label, .stTextInput label, .stTextArea label, .stSelectbox label, .stSlider label, .stCheckbox label { |
| text-align: right !important; |
| width: 100%; /* حفظ عرض کامل دکمهها و عناصر فرم */ |
| } |
| /* اطمینان از راستچین بودن محتوای اصلی و سایدبار */ |
| .main .block-container, .stSidebar .stBlock { |
| text-align: right !important; |
| } |
| /* راستچین کردن عناوین، پاراگرافها و دیگر عناصر متنی */ |
| h1, h2, h3, h4, h5, h6, p, li, div, span, label { |
| text-align: right !important; |
| font-family: 'Vazirmatn', sans-serif !important; /* فونت فارسی (اختیاری) */ |
| } |
| /* استایل پیامهای چت */ |
| .stChatMessage { |
| text-align: right !important; |
| direction: rtl !important; |
| } |
| /* استایل ورودی چت */ |
| .stChatInput { |
| direction: rtl !important; |
| } |
| .stChatInput textarea { |
| text-align: right !important; |
| direction: rtl !important; |
| font-family: 'Vazirmatn', sans-serif !important; /* فونت فارسی برای ورودی */ |
| } |
| /* چپچین کردن آواتارها در چت برای ظاهر بهتر */ |
| .stChatMessage [data-testid="chatAvatarIcon-user"], .stChatMessage [data-testid="chatAvatarIcon-assistant"] { |
| margin-right: 0; |
| margin-left: 0.5rem; /* فاصله آواتار از متن در حالت RTL */ |
| float: right; /* انتقال آواتار به سمت راست */ |
| } |
| .stChatMessage [data-testid="chatMessageContent"] { |
| width: calc(100% - 50px); /* تنظیم عرض متن پیام برای جا دادن آواتار */ |
| float: left; /* انتقال متن پیام به سمت چپ آواتار */ |
| } |
| /* رفع مشکل احتمالی عرض ۱۰۰٪ برای برخی عناصر داخلی استریملیت */ |
| .css-w770g5, .css-b3z5c9 { |
| width: 100%; |
| } |
| </style> |
| """, unsafe_allow_html=True) |
|
|
|
|
| |
| if 'hf_token' in st.session_state: |
| if 'hf' not in st.session_state: |
| try: |
| hf = HuggingFaceHubEmbeddings( |
| repo_id=repo_id, |
| task="feature-extraction", |
| huggingfacehub_api_token=st.session_state['hf_token'], |
| ) |
| st.session_state['hf'] = hf |
| except Exception as e: |
| st.sidebar.error(f"خطا در ایجاد Embedding: {e}") |
| st.sidebar.warning("توکن Hugging Face API خود را بررسی کنید.") |
|
|
|
|
| |
| with st.sidebar: |
| st.title('🤗💬 اپلیکیشن چت شخصی') |
| st.markdown("---") |
|
|
| if 'hf_email' not in st.session_state or 'hf_pass' not in st.session_state: |
| with st.expander("ℹ️ ورود به Hugging Face", expanded=True): |
| st.write("⚠️ برای استفاده از این برنامه باید وارد حساب Hugging Face خود شوید. میتوانید از [اینجا](https://huggingface.co/join) ثبتنام کنید.") |
| st.header('ورود به Hugging Face') |
| hf_email = st.text_input('ایمیل خود را وارد کنید:') |
| hf_pass = st.text_input('رمز عبور خود را وارد کنید:', type='password') |
| hf_token = st.text_input('توکن API خود را وارد کنید (ضروری برای Embedding):', type='password') |
| if st.button('ورود 🚀') and hf_email and hf_pass and hf_token: |
| with st.spinner('🚀 در حال ورود...'): |
| st.session_state['hf_email'] = hf_email |
| st.session_state['hf_pass'] = hf_pass |
| st.session_state['hf_token'] = hf_token |
|
|
| try: |
| |
| sign = Login(st.session_state['hf_email'], st.session_state['hf_pass']) |
| if sign: |
| st.write("yes") |
| cookies = sign.login() |
| |
| chatbot = hugchat.ChatBot(cookies=cookies.get_dict()) |
| st.session_state['chatbot'] = chatbot |
|
|
| |
| id = st.session_state['chatbot'].new_conversation() |
| st.session_state['chatbot'].change_conversation(id) |
| st.session_state['conversation'] = id |
|
|
| |
| if 'generated' not in st.session_state: |
| st.session_state['generated'] = ["من **چتبات هوشمند** هستم، چگونه میتوانم کمکتان کنم؟"] |
| if 'past' not in st.session_state: |
| st.session_state['past'] = ['سلام!'] |
|
|
| |
| if 'HuggingChat' in globals(): |
| st.session_state['LLM'] = HuggingChat(email=st.session_state['hf_email'], psw=st.session_state['hf_pass']) |
| else: |
| st.warning("کلاس HuggingChat برای Langchain یافت نشد.") |
|
|
| |
| if 'hf' not in st.session_state: |
| hf = HuggingFaceHubEmbeddings( |
| repo_id=repo_id, |
| task="feature-extraction", |
| huggingfacehub_api_token=st.session_state['hf_token'], |
| ) |
| st.session_state['hf'] = hf |
| else: |
| st.write("Error hf") |
|
|
| st.success("ورود با موفقیت انجام شد!") |
| st.experimental_rerun() |
|
|
| except Exception as e: |
| st.error(f"خطا در ورود: {e}") |
|
|
| from time import sleep |
| sleep(3) |
| |
| keys_to_delete = ['hf_email', 'hf_pass', 'hf_token', 'chatbot', 'conversation', 'generated', 'past', 'LLM', 'hf'] |
| for key in keys_to_delete: |
| if key in st.session_state: |
| del st.session_state[key] |
| st.experimental_rerun() |
|
|
| else: |
| with st.expander("ℹ️ تنظیمات پیشرفته مدل زبان"): |
| |
| temperature = st.slider('🌡 دما (Temperature)', min_value=0.1, max_value=1.0, value=0.5, step=0.01, help="مقادیر بالاتر خلاقیت بیشتر، مقادیر پایینتر تمرکز بیشتر.") |
| top_p = st.slider('💡 Top P', min_value=0.1, max_value=1.0, value=0.95, step=0.01, help="نمونهگیری هستهای؛ مجموع احتمالات کلمات بعدی.") |
| repetition_penalty = st.slider('🖌 جریمه تکرار (Repetition Penalty)', min_value=1.0, max_value=2.0, value=1.2, step=0.01, help="جلوگیری از تکرار کلمات.") |
| top_k = st.slider('❄️ Top K', min_value=1, max_value=100, value=50, step=1, help="تعداد محتملترین کلمات بعدی برای در نظر گرفتن.") |
| max_new_tokens = st.slider('📝 حداکثر توکنهای جدید (Max New Tokens)', min_value=1, max_value=1024, value=1024, step=1, help="حداکثر طول پاسخ تولید شده.") |
|
|
| st.markdown("---") |
|
|
| |
| |
| plugins = ["🛑 بدون افزونه", "🌐 جستجوی وب", "🔗 گفتگو با وبسایت", "📋 گفتگو با دادههای شما (CSV)", "📝 گفتگو با اسناد شما", "🎧 گفتگو با فایل صوتی شما", "🎥 گفتگو با ویدیوی یوتیوب", "🧠 حالت دانای کل (GOD MODE)", "💾 بارگذاری VectorStore ذخیره شده"] |
| |
| |
|
|
| if 'plugin' not in st.session_state: |
| st.session_state['plugin'] = st.selectbox('🔌 انتخاب افزونه', plugins, index=0) |
| else: |
| |
| current_plugin_index = 0 |
| try: |
| |
| |
| |
| current_plugin_index = plugins.index(st.session_state['plugin']) |
| except ValueError: |
| st.warning("افزونه قبلی یافت نشد، به حالت 'بدون افزونه' برمیگردیم.") |
| st.session_state['plugin'] = "🛑 بدون افزونه" |
|
|
| st.session_state['plugin'] = st.selectbox('🔌 انتخاب افزونه', plugins, index=current_plugin_index) |
|
|
| st.markdown("---") |
|
|
| |
|
|
| |
| if st.session_state['plugin'] == "🌐 جستجوی وب": |
| with st.expander("🌐 تنظیمات جستجوی وب", expanded=True): |
| if 'web_search' not in st.session_state or st.session_state.get('web_search') != 'True': |
| reg = ['us-en', 'uk-en', 'it-it', 'de-de', 'fr-fr', 'es-es', 'fa-ir'] |
| sf = ['on', 'moderate', 'off'] |
| tl = {'d': 'روز گذشته', 'w': 'هفته گذشته', 'm': 'ماه گذشته', 'y': 'سال گذشته'} |
|
|
| |
| st.session_state['timelimit_display'] = st.selectbox('📅 محدوده زمانی', list(tl.values()), index=1) |
| st.session_state['timelimit'] = list(tl.keys())[list(tl.values()).index(st.session_state['timelimit_display'])] |
|
|
| st.session_state['region'] = st.selectbox('🗺 منطقه', reg, index=reg.index('fa-ir') if 'fa-ir' in reg else 0) |
| st.session_state['safesearch'] = st.selectbox('🚨 جستجوی ایمن', sf, index=1) |
| st.session_state['max_results'] = st.slider('📊 حداکثر نتایج', min_value=1, max_value=5, value=st.session_state.get('max_results', 2), step=1) |
|
|
| if st.button('🌐 ذخیره و فعالسازی جستجوی وب'): |
| st.session_state['web_search'] = "True" |
| st.experimental_rerun() |
| else: |
| st.success('🚀 جستجوی وب فعال است.') |
| st.write(f"🗺 منطقه: {st.session_state.get('region', 'نامشخص')}") |
| st.write(f"🚨 جستجوی ایمن: {st.session_state.get('safesearch', 'نامشخص')}") |
| st.write(f"📅 محدوده زمانی: {st.session_state.get('timelimit_display', 'نامشخص')}") |
| if st.button('🌐🛑 غیرفعال کردن جستجوی وب'): |
| keys_to_delete = ['web_search', 'region', 'safesearch', 'timelimit', 'timelimit_display', 'max_results'] |
| for key in keys_to_delete: |
| if key in st.session_state: |
| del st.session_state[key] |
| st.session_state['plugin'] = "🛑 بدون افزونه" |
| st.experimental_rerun() |
|
|
| |
| if st.session_state['plugin'] == "🧠 حالت دانای کل (GOD MODE)": |
| if 'god_mode' not in st.session_state: |
| with st.expander("🧠 تنظیمات حالت دانای کل", expanded=True): |
| topic = st.text_input('🔎 موضوع تحقیق', "هوش مصنوعی در امور مالی") |
| web_result_god = st.checkbox('🌐 جستجوی وب', value=True, disabled=True) |
| yt_result_god = st.checkbox('🎥 جستجوی یوتیوب', value=True, disabled=True) |
| website_result_god = st.checkbox('🔗 بررسی وبسایتها', value=True, disabled=True) |
| deep_of_search_god = st.slider('📊 عمق جستجو (برای هر منبع)', min_value=1, max_value=5, value=2, step=1) |
|
|
| if st.button('🧠✅ جمعآوری دانش برای مدل'): |
| full_text_god = [] |
| links_god = [] |
| news_god = [] |
| yt_ids_god = [] |
| source_god = [] |
|
|
| with st.spinner('🌐 در حال جستجو در وب...'): |
| internet_result_god = "" |
| internet_answer_god = "" |
| try: |
| with DDGS() as ddgs: |
| |
| ddgs_gen = ddgs.text(topic, region="us-en", max_results=deep_of_search_god) |
| for r in ddgs_gen: |
| l = r['href'] |
| source_god.append(l) |
| links_god.append(l) |
| internet_result_god += str(r) + "\n\n" |
| full_text_god.append(r['body']) |
|
|
| |
| fast_answer = ddgs.news(topic, max_results=deep_of_search_god) |
| for r in fast_answer: |
| internet_answer_god += str(r) + "\n\n" |
| l = r['url'] |
| source_god.append(l) |
| news_god.append(r) |
| full_text_god.append(r['body']) |
|
|
| except Exception as e: |
| st.error(f"خطا در جستجوی وب (DDGS): {e}") |
|
|
|
|
| if yt_result_god: |
| with st.spinner('🎥 در حال جستجو در یوتیوب...'): |
| try: |
| from youtubesearchpython import VideosSearch |
| videosSearch = VideosSearch(topic, limit = deep_of_search_god) |
| yt_result_search = videosSearch.result() |
| for i in yt_result_search.get('result', []): |
| duration = i.get('duration', '0:0').split(':') |
| |
| try: |
| if len(duration) == 3 and int(duration[0]) > 0: |
| continue |
| if len(duration) == 2 and int(duration[0]) > 30: |
| continue |
| except ValueError: |
| pass |
| video_id = i.get('id') |
| if video_id: |
| yt_ids_god.append(video_id) |
| source_god.append(f"https://www.youtube.com/watch?v={video_id}") |
| full_text_god.append(i.get('title', '')) |
| except ImportError: |
| st.warning("کتابخانه youtubesearchpython نصب نیست. جستجوی یوتیوب انجام نشد.") |
| except Exception as e: |
| st.error(f"خطا در جستجوی یوتیوب: {e}") |
|
|
|
|
| if website_result_god and links_god: |
| with st.spinner(f'👨💻 در حال استخراج متن از {len(links_god)} وبسایت...'): |
| for l in links_god: |
| try: |
| r = requests.get(l, timeout=10) |
| r.raise_for_status() |
| soup = BeautifulSoup(r.content, 'html.parser') |
| |
| page_text = ' '.join(t.strip() for t in soup.find_all(text=True) if t.parent.name not in ['style', 'script', 'head', 'title', 'meta', '[document]'] and t.strip()) |
| if page_text: |
| full_text_god.append(page_text + "\n\n") |
| except requests.exceptions.RequestException as e: |
| st.warning(f"خطا در دسترسی به {l}: {e}") |
| except Exception as e: |
| st.warning(f"خطا در پردازش {l}: {e}") |
|
|
|
|
| if yt_ids_god: |
| with st.spinner(f'👨💻 در حال استخراج متن از {len(yt_ids_god)} ویدیوی یوتیوب...'): |
| for video_id in yt_ids_god: |
| try: |
| transcript_list = YouTubeTranscriptApi.list_transcripts(video_id) |
| |
| transcript = None |
| try: |
| transcript = transcript_list.find_generated_transcript(['fa', 'en']) |
| except: |
| try: |
| transcript = next(iter(transcript_list)) |
| except StopIteration: |
| st.warning(f"زیرنویسی برای ویدیوی {video_id} یافت نشد.") |
| continue |
|
|
| |
| if transcript.language_code != 'en': |
| try: |
| transcript = transcript.translate('en') |
| except Exception as e: |
| st.warning(f"خطا در ترجمه زیرنویس {video_id} به انگلیسی: {e}. از زبان اصلی ({transcript.language_code}) استفاده میشود.") |
|
|
|
|
| text_segments = transcript.fetch() |
| video_text = ' '.join([segment['text'] for segment in text_segments]) |
| if video_text: |
| full_text_god.append(video_text) |
|
|
| except Exception as e: |
| st.warning(f"خطای استخراج زیرنویس برای {video_id}: {e}") |
|
|
|
|
| if not full_text_god: |
| st.error("هیچ متنی برای ساخت پایگاه دانش جمعآوری نشد. لطفاً موضوع یا تنظیمات را بررسی کنید.") |
| else: |
| with st.spinner('🧠 در حال ساخت Vectorstore با دانش جمعآوری شده...'): |
| full_text_combined = "\n\n".join(full_text_god) |
| st.session_state['god_text'] = [full_text_combined] |
| text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100, separator="\n\n") |
| texts = text_splitter.create_documents([full_text_combined]) |
|
|
| if 'hf' not in st.session_state: |
| st.error("Embedding model مقداردهی اولیه نشده است. لطفاً از Hugging Face خارج و دوباره وارد شوید.") |
| else: |
| embeddings = st.session_state['hf'] |
| |
| random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) |
| persist_directory = f"./chroma_db_{random_str}" |
| try: |
| db_god = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory) |
| st.session_state['db_path'] = persist_directory |
| except Exception as e: |
| st.error(f"خطا در ساخت Chroma DB: {e}") |
| |
| keys_to_delete = ['god_mode', 'god_mode_source', 'god_mode_info', 'db_path', 'god_text'] |
| for key in keys_to_delete: |
| if key in st.session_state: |
| del st.session_state[key] |
| st.experimental_rerun() |
|
|
|
|
| with st.spinner('🔨 در حال ذخیره Vectorstore...'): |
| db_god.persist() |
| |
| try: |
| zip_filename = f"chroma_db_{random_str}.zip" |
| shutil.make_archive(f"chroma_db_{random_str}", 'zip', persist_directory) |
| st.session_state['db_zip_path'] = zip_filename |
| except Exception as e: |
| st.error(f"خطا در فشردهسازی Vectorstore: {e}") |
|
|
|
|
| with st.spinner('🔨 در حال ساخت زنجیره پرسش و پاسخ (QA)...'): |
| |
| retriever_god = db_god.as_retriever(search_kwargs={"k": 3}) |
| |
| if 'LLM' in st.session_state: |
| qa_god = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_god, return_source_documents=True) |
| st.session_state['god_mode'] = qa_god |
| st.session_state['god_mode_source'] = source_god |
| info_text = f"🧠 حالت دانای کل برای موضوع **'{topic}'** فعال شد.\nدانش بر اساس:\n" |
| if news_god: info_text += f"- {len(news_god)} خبر 🗞️\n" |
| if yt_ids_god: info_text += f"- {len(yt_ids_god)} ویدیوی یوتیوب 📺\n" |
| if links_god: info_text += f"- {len(links_god)} وبسایت 🌐\n" |
| st.session_state['god_mode_info'] = info_text |
| st.success("حالت دانای کل با موفقیت فعال شد!") |
| else: |
| st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.") |
|
|
| st.experimental_rerun() |
|
|
| elif 'god_mode' in st.session_state: |
| with st.expander("✅ **حالت دانای کل (GOD MODE) فعال است** 🚀", expanded=True): |
| st.markdown(st.session_state.get('god_mode_info', "اطلاعاتی موجود نیست.")) |
| if 'db_zip_path' in st.session_state: |
| try: |
| with open(st.session_state['db_zip_path'], 'rb') as fp: |
| st.download_button( |
| label="📩 دانلود Vectorstore (.zip)", |
| data=fp, |
| file_name=st.session_state['db_zip_path'], |
| mime='application/zip' |
| ) |
| except FileNotFoundError: |
| st.warning("فایل فشرده Vectorstore یافت نشد.") |
| except Exception as e: |
| st.error(f"خطا در خواندن فایل فشرده: {e}") |
|
|
| if st.button('🧠🛑 غیرفعال کردن حالت دانای کل'): |
| |
| keys_to_delete = ['god_mode', 'god_mode_source', 'god_mode_info', 'db_path', 'db_zip_path', 'god_text'] |
| for key in keys_to_delete: |
| if key in st.session_state: |
| del st.session_state[key] |
| |
| if 'db_path' in st.session_state and os.path.exists(st.session_state['db_path']): |
| try: |
| shutil.rmtree(st.session_state['db_path']) |
| except Exception as e: |
| st.warning(f"خطا در پاک کردن دایرکتوری Chroma DB: {e}") |
| |
| if 'db_zip_path' in st.session_state and os.path.exists(st.session_state['db_zip_path']): |
| try: |
| os.remove(st.session_state['db_zip_path']) |
| except Exception as e: |
| st.warning(f"خطا در پاک کردن فایل زیپ Vectorstore: {e}") |
|
|
| st.session_state['plugin'] = "🛑 بدون افزونه" |
| st.experimental_rerun() |
|
|
| |
| if st.session_state['plugin'] == "📋 گفتگو با دادههای شما (CSV)": |
| if 'df' not in st.session_state: |
| with st.expander("📋 گفتگو با دادههای شما (CSV)", expanded=True): |
| uploaded_csv = st.file_uploader("فایل CSV خود را بارگذاری کنید", type=['csv']) |
| if uploaded_csv is not None: |
| try: |
| df = pd.read_csv(uploaded_csv) |
| st.session_state['df'] = df |
| st.success("فایل CSV با موفقیت بارگذاری شد.") |
| st.experimental_rerun() |
| except Exception as e: |
| st.error(f"خطا در خواندن فایل CSV: {e}") |
| else: |
| with st.expander("📋 دادههای بارگذاری شده (CSV)", expanded=False): |
| st.dataframe(st.session_state['df'].head()) |
| if st.button('🛑📋 حذف دادههای CSV از حافظه'): |
| del st.session_state['df'] |
| st.session_state['plugin'] = "🛑 بدون افزونه" |
| st.experimental_rerun() |
|
|
| |
| if st.session_state['plugin'] == "📝 گفتگو با اسناد شما": |
| if 'documents_qa' not in st.session_state: |
| with st.expander("📝 گفتگو با اسناد شما (PDF, TXT, DOCX)", expanded=True): |
| uploaded_docs = st.file_uploader("اسناد خود را بارگذاری کنید", type=['txt', 'pdf', 'docx'], accept_multiple_files=True) |
| if uploaded_docs and st.button('📝✅ بارگذاری و پردازش اسناد'): |
| documents_text = [] |
| with st.spinner('🔨 در حال خواندن اسناد...'): |
| for doc_file in uploaded_docs: |
| try: |
| if doc_file.type == 'text/plain': |
| |
| try: |
| documents_text.append(doc_file.read().decode('utf-8')) |
| except UnicodeDecodeError: |
| st.warning(f"فایل {doc_file.name} با UTF-8 خوانده نشد، تلاش با انکودینگ دیگر...") |
| |
| documents_text.append(doc_file.read().decode('latin-1', errors='ignore')) |
| elif doc_file.type == 'application/pdf': |
| with pdfplumber.open(doc_file) as pdf: |
| doc_pages = [page.extract_text() for page in pdf.pages if page.extract_text()] |
| documents_text.append("\n".join(doc_pages)) |
| elif doc_file.type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": |
| text = docx2txt.process(doc_file) |
| documents_text.append(text) |
| except Exception as e: |
| st.error(f"خطا در خواندن فایل {doc_file.name}: {e}") |
|
|
| if not documents_text: |
| st.error("هیچ متنی از اسناد استخراج نشد.") |
| else: |
| st.session_state['documents_raw_text'] = documents_text |
|
|
| with st.spinner('🔨 در حال ساخت Vectorstore برای اسناد...'): |
| text_splitter_docs = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100, separator="\n") |
| texts_docs = text_splitter_docs.create_documents(documents_text) |
|
|
| if 'hf' not in st.session_state: |
| st.error("Embedding model مقداردهی اولیه نشده است.") |
| else: |
| embeddings_docs = st.session_state['hf'] |
| random_str_docs = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) |
| persist_directory_docs = f"./chroma_db_docs_{random_str_docs}" |
| try: |
| db_docs = Chroma.from_documents(texts_docs, embeddings_docs, persist_directory=persist_directory_docs) |
| st.session_state['db_docs_path'] = persist_directory_docs |
| except Exception as e: |
| st.error(f"خطا در ساخت Chroma DB برای اسناد: {e}") |
| |
| if 'documents_raw_text' in st.session_state: del st.session_state['documents_raw_text'] |
| st.experimental_rerun() |
|
|
|
|
| with st.spinner('🔨 در حال ذخیره Vectorstore اسناد...'): |
| db_docs.persist() |
| try: |
| zip_filename_docs = f"chroma_db_docs_{random_str_docs}.zip" |
| shutil.make_archive(f"chroma_db_docs_{random_str_docs}", 'zip', persist_directory_docs) |
| st.session_state['db_docs_zip_path'] = zip_filename_docs |
| except Exception as e: |
| st.error(f"خطا در فشردهسازی Vectorstore اسناد: {e}") |
|
|
| with st.spinner('🔨 در حال ساخت زنجیره QA برای اسناد...'): |
| retriever_docs = db_docs.as_retriever(search_kwargs={"k": 3}) |
| if 'LLM' in st.session_state: |
| qa_docs = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_docs, return_source_documents=True) |
| st.session_state['documents_qa'] = qa_docs |
| st.success("اسناد با موفقیت پردازش شدند.") |
| else: |
| st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.") |
|
|
| st.experimental_rerun() |
|
|
| else: |
| with st.expander("📝 اسناد بارگذاری شده", expanded=False): |
| |
| if 'documents_raw_text' in st.session_state: |
| st.write(f"تعداد {len(st.session_state['documents_raw_text'])} سند بارگذاری شده است.") |
| |
| if 'db_docs_zip_path' in st.session_state: |
| try: |
| with open(st.session_state['db_docs_zip_path'], 'rb') as fp: |
| st.download_button( |
| label="📩 دانلود Vectorstore اسناد (.zip)", |
| data=fp, |
| file_name=st.session_state['db_docs_zip_path'], |
| mime='application/zip' |
| ) |
| except FileNotFoundError: |
| st.warning("فایل فشرده Vectorstore اسناد یافت نشد.") |
| except Exception as e: |
| st.error(f"خطا در خواندن فایل فشرده اسناد: {e}") |
|
|
| if st.button('🛑📝 حذف اسناد از حافظه'): |
| keys_to_delete = ['documents_qa', 'documents_raw_text', 'db_docs_path', 'db_docs_zip_path'] |
| |
| if 'db_docs_path' in st.session_state and os.path.exists(st.session_state['db_docs_path']): |
| shutil.rmtree(st.session_state['db_docs_path'], ignore_errors=True) |
| if 'db_docs_zip_path' in st.session_state and os.path.exists(st.session_state['db_docs_zip_path']): |
| os.remove(st.session_state['db_docs_zip_path']) |
| |
| for key in keys_to_delete: |
| if key in st.session_state: |
| del st.session_state[key] |
| st.session_state['plugin'] = "🛑 بدون افزونه" |
| st.experimental_rerun() |
|
|
| |
| if st.session_state['plugin'] == "🎧 گفتگو با فایل صوتی شما": |
| if 'audio_qa' not in st.session_state: |
| with st.expander("🎙 گفتگو با فایل صوتی شما (WAV, MP3)", expanded=True): |
| uploaded_audio = st.file_uploader("فایل صوتی خود را بارگذاری کنید", type=['wav', 'mp3']) |
| if uploaded_audio is not None: |
| file_path_audio = f"./temp_audio_{uploaded_audio.name}" |
| wav_path_audio = file_path_audio.rsplit('.', 1)[0] + '.wav' |
|
|
| with open(file_path_audio, 'wb') as f: |
| f.write(uploaded_audio.getvalue()) |
|
|
| |
| if uploaded_audio.type == 'audio/mpeg': |
| with st.spinner('🔨 در حال تبدیل MP3 به WAV...'): |
| try: |
| sound = AudioSegment.from_mp3(file_path_audio) |
| sound.export(wav_path_audio, format="wav") |
| |
| os.remove(file_path_audio) |
| audio_to_process = wav_path_audio |
| except Exception as e: |
| st.error(f"خطا در تبدیل MP3 به WAV: {e}") |
| |
| if os.path.exists(file_path_audio): os.remove(file_path_audio) |
| if os.path.exists(wav_path_audio): os.remove(wav_path_audio) |
| st.stop() |
| elif uploaded_audio.type == 'audio/wav': |
| audio_to_process = file_path_audio |
| else: |
| st.error("فرمت فایل صوتی پشتیبانی نمیشود (فقط WAV و MP3).") |
| os.remove(file_path_audio) |
| st.stop() |
|
|
| |
| r = sr.Recognizer() |
| with st.spinner('🎤 در حال تشخیص گفتار از فایل صوتی...'): |
| try: |
| with sr.AudioFile(audio_to_process) as source: |
| audio_data = r.record(source) |
| |
| try: |
| text_audio = r.recognize_google(audio_data, language='fa-IR') |
| st.info("متن به زبان فارسی تشخیص داده شد.") |
| except sr.UnknownValueError: |
| st.warning("گفتار فارسی تشخیص داده نشد، تلاش برای انگلیسی...") |
| try: |
| text_audio = r.recognize_google(audio_data, language='en-US') |
| st.info("متن به زبان انگلیسی تشخیص داده شد.") |
| except sr.UnknownValueError: |
| st.error("گفتار در فایل صوتی تشخیص داده نشد.") |
| text_audio = None |
| except sr.RequestError as e: |
| st.error(f"خطا در اتصال به سرویس تشخیص گفتار گوگل: {e}") |
| text_audio = None |
| except sr.RequestError as e: |
| st.error(f"خطا در اتصال به سرویس تشخیص گفتار گوگل: {e}") |
| text_audio = None |
|
|
| except Exception as e: |
| st.error(f"خطا در پردازش فایل صوتی: {e}") |
| text_audio = None |
| finally: |
| |
| if os.path.exists(audio_to_process): |
| os.remove(audio_to_process) |
|
|
| if text_audio: |
| st.session_state['audio_text'] = text_audio |
|
|
| with st.spinner('🎙 در حال ساخت Vectorstore برای فایل صوتی...'): |
| text_splitter_audio = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100, separator="\n") |
| texts_audio = text_splitter_audio.create_documents([text_audio]) |
|
|
| if 'hf' not in st.session_state: |
| st.error("Embedding model مقداردهی اولیه نشده است.") |
| else: |
| embeddings_audio = st.session_state['hf'] |
| random_str_audio = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) |
| persist_directory_audio = f"./chroma_db_audio_{random_str_audio}" |
| try: |
| db_audio = Chroma.from_documents(texts_audio, embeddings_audio, persist_directory=persist_directory_audio) |
| st.session_state['db_audio_path'] = persist_directory_audio |
| except Exception as e: |
| st.error(f"خطا در ساخت Chroma DB برای صوت: {e}") |
| if 'audio_text' in st.session_state: del st.session_state['audio_text'] |
| st.experimental_rerun() |
|
|
| with st.spinner('🎙 در حال ذخیره Vectorstore فایل صوتی...'): |
| db_audio.persist() |
| try: |
| zip_filename_audio = f"chroma_db_audio_{random_str_audio}.zip" |
| shutil.make_archive(f"chroma_db_audio_{random_str_audio}", 'zip', persist_directory_audio) |
| st.session_state['db_audio_zip_path'] = zip_filename_audio |
| except Exception as e: |
| st.error(f"خطا در فشردهسازی Vectorstore صوت: {e}") |
|
|
|
|
| with st.spinner('🎙 در حال ساخت زنجیره QA برای فایل صوتی...'): |
| retriever_audio = db_audio.as_retriever(search_kwargs={"k": 3}) |
| if 'LLM' in st.session_state: |
| qa_audio = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_audio, return_source_documents=True) |
| st.session_state['audio_qa'] = qa_audio |
| st.success("فایل صوتی با موفقیت پردازش شد.") |
| else: |
| st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.") |
|
|
| st.experimental_rerun() |
| else: |
| st.error("محتوایی از فایل صوتی استخراج نشد.") |
|
|
|
|
| else: |
| with st.expander("🎧 فایل صوتی پردازش شده", expanded=False): |
| st.write("متن استخراج شده:") |
| st.write(st.session_state.get('audio_text', "متن موجود نیست.")) |
| if 'db_audio_zip_path' in st.session_state: |
| try: |
| with open(st.session_state['db_audio_zip_path'], 'rb') as fp: |
| st.download_button( |
| label="📩 دانلود Vectorstore فایل صوتی (.zip)", |
| data=fp, |
| file_name=st.session_state['db_audio_zip_path'], |
| mime='application/zip' |
| ) |
| except FileNotFoundError: |
| st.warning("فایل فشرده Vectorstore صوت یافت نشد.") |
| except Exception as e: |
| st.error(f"خطا در خواندن فایل فشرده صوت: {e}") |
|
|
| if st.button('🛑🎙 حذف فایل صوتی از حافظه'): |
| keys_to_delete = ['audio_qa', 'audio_text', 'db_audio_path', 'db_audio_zip_path'] |
| |
| if 'db_audio_path' in st.session_state and os.path.exists(st.session_state['db_audio_path']): |
| shutil.rmtree(st.session_state['db_audio_path'], ignore_errors=True) |
| if 'db_audio_zip_path' in st.session_state and os.path.exists(st.session_state['db_audio_zip_path']): |
| os.remove(st.session_state['db_audio_zip_path']) |
| |
| for key in keys_to_delete: |
| if key in st.session_state: |
| del st.session_state[key] |
| st.session_state['plugin'] = "🛑 بدون افزونه" |
| st.experimental_rerun() |
|
|
| |
| if st.session_state['plugin'] == "🎥 گفتگو با ویدیوی یوتیوب": |
| if 'yt_qa' not in st.session_state: |
| with st.expander("🎥 گفتگو با ویدیوی یوتیوب", expanded=True): |
| yt_urls = st.text_area("📺 آدرسهای ویدیوهای یوتیوب را وارد کنید (هر آدرس در یک خط، حداکثر ۳ آدرس)") |
| if yt_urls and st.button('🎥✅ افزودن ویدیو(های) یوتیوب به حافظه'): |
| video_ids = [] |
| urls = yt_urls.strip().split("\n") |
| for url in urls[:3]: |
| url = url.strip() |
| if not url: continue |
| try: |
| |
| if "youtube.com/watch?v=" in url: |
| video_id = url.split("watch?v=")[1].split("&")[0] |
| elif "youtu.be/" in url: |
| video_id = url.split("youtu.be/")[1].split("?")[0] |
| else: |
| st.warning(f"فرمت URL نامعتبر: {url}") |
| continue |
| video_ids.append(video_id) |
| except IndexError: |
| st.warning(f"فرمت URL نامعتبر: {url}") |
|
|
| if not video_ids: |
| st.error("هیچ آدرس معتبر یوتیوب وارد نشد.") |
| else: |
| text_list_yt = [] |
| with st.spinner(f'🎥 در حال استخراج متن از {len(video_ids)} ویدیوی یوتیوب...'): |
| for video_id in video_ids: |
| try: |
| transcript_list_yt = YouTubeTranscriptApi.list_transcripts(video_id) |
| |
| transcript_yt = None |
| try: |
| transcript_yt = transcript_list_yt.find_generated_transcript(['fa', 'en']) |
| except: |
| try: |
| transcript_yt = next(iter(transcript_list_yt)) |
| except StopIteration: |
| st.warning(f"زیرنویسی برای ویدیوی {video_id} یافت نشد.") |
| continue |
|
|
| if transcript_yt.language_code != 'en': |
| try: |
| transcript_yt = transcript_yt.translate('en') |
| except Exception as e: |
| st.warning(f"خطا در ترجمه زیرنویس {video_id} به انگلیسی: {e}. از زبان اصلی ({transcript_yt.language_code}) استفاده میشود.") |
|
|
| text_segments_yt = transcript_yt.fetch() |
| video_text_yt = ' '.join([segment['text'] for segment in text_segments_yt]) |
| if video_text_yt: |
| text_list_yt.append(video_text_yt) |
| except Exception as e: |
| st.warning(f"خطای استخراج زیرنویس برای {video_id}: {e}") |
|
|
| if not text_list_yt: |
| st.error("محتوایی از ویدیوهای یوتیوب استخراج نشد.") |
| else: |
| st.session_state['yt_text'] = text_list_yt |
|
|
| with st.spinner('🎥 در حال ساخت Vectorstore برای ویدیو(ها)...'): |
| text_splitter_yt = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100, separator=" ") |
| texts_yt = text_splitter_yt.create_documents(text_list_yt) |
|
|
| if 'hf' not in st.session_state: |
| st.error("Embedding model مقداردهی اولیه نشده است.") |
| else: |
| embeddings_yt = st.session_state['hf'] |
| random_str_yt = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) |
| persist_directory_yt = f"./chroma_db_yt_{random_str_yt}" |
| try: |
| db_yt = Chroma.from_documents(texts_yt, embeddings_yt, persist_directory=persist_directory_yt) |
| st.session_state['db_yt_path'] = persist_directory_yt |
| except Exception as e: |
| st.error(f"خطا در ساخت Chroma DB برای یوتیوب: {e}") |
| if 'yt_text' in st.session_state: del st.session_state['yt_text'] |
| st.experimental_rerun() |
|
|
|
|
| with st.spinner('🎥 در حال ذخیره Vectorstore ویدیو(ها)...'): |
| db_yt.persist() |
| try: |
| zip_filename_yt = f"chroma_db_yt_{random_str_yt}.zip" |
| shutil.make_archive(f"chroma_db_yt_{random_str_yt}", 'zip', persist_directory_yt) |
| st.session_state['db_yt_zip_path'] = zip_filename_yt |
| except Exception as e: |
| st.error(f"خطا در فشردهسازی Vectorstore یوتیوب: {e}") |
|
|
| with st.spinner('🎥 در حال ساخت زنجیره QA برای ویدیو(ها)...'): |
| retriever_yt = db_yt.as_retriever(search_kwargs={"k": 3}) |
| if 'LLM' in st.session_state: |
| qa_yt = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_yt, return_source_documents=True) |
| st.session_state['yt_qa'] = qa_yt |
| st.success("ویدیو(های) یوتیوب با موفقیت پردازش شدند.") |
| else: |
| st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.") |
|
|
| st.experimental_rerun() |
|
|
| else: |
| with st.expander("🎥 ویدیو(های) یوتیوب پردازش شده", expanded=False): |
| st.write("متن استخراج شده:") |
| st.write(st.session_state.get('yt_text', ["متن موجود نیست."])) |
| if 'db_yt_zip_path' in st.session_state: |
| try: |
| with open(st.session_state['db_yt_zip_path'], 'rb') as fp: |
| st.download_button( |
| label="📩 دانلود Vectorstore یوتیوب (.zip)", |
| data=fp, |
| file_name=st.session_state['db_yt_zip_path'], |
| mime='application/zip' |
| ) |
| except FileNotFoundError: |
| st.warning("فایل فشرده Vectorstore یوتیوب یافت نشد.") |
| except Exception as e: |
| st.error(f"خطا در خواندن فایل فشرده یوتیوب: {e}") |
|
|
| if st.button('🛑🎥 حذف ویدیو(های) یوتیوب از حافظه'): |
| keys_to_delete = ['yt_qa', 'yt_text', 'db_yt_path', 'db_yt_zip_path'] |
| |
| if 'db_yt_path' in st.session_state and os.path.exists(st.session_state['db_yt_path']): |
| shutil.rmtree(st.session_state['db_yt_path'], ignore_errors=True) |
| if 'db_yt_zip_path' in st.session_state and os.path.exists(st.session_state['db_yt_zip_path']): |
| os.remove(st.session_state['db_yt_zip_path']) |
| |
| for key in keys_to_delete: |
| if key in st.session_state: |
| del st.session_state[key] |
| st.session_state['plugin'] = "🛑 بدون افزونه" |
| st.experimental_rerun() |
|
|
| |
| if st.session_state['plugin'] == "🔗 گفتگو با وبسایت": |
| if 'website_qa' not in st.session_state: |
| with st.expander("🔗 گفتگو با وبسایت", expanded=True): |
| website_urls = st.text_area("🔗 آدرسهای وبسایتها را وارد کنید (هر آدرس در یک خط، حداکثر ۱۰ آدرس)") |
| if website_urls and st.button('🔗✅ افزودن وبسایت(ها) به حافظه'): |
| text_list_web = [] |
| urls_web = website_urls.strip().split("\n") |
| with st.spinner(f'🔗 در حال استخراج متن از {len(urls_web[:10])} وبسایت...'): |
| for url_web in urls_web[:10]: |
| url_web = url_web.strip() |
| if not url_web: continue |
| try: |
| |
| if not url_web.startswith(('http://', 'https://')): |
| url_web = 'https://' + url_web |
|
|
| r_web = requests.get(url_web, timeout=15, headers={'User-Agent': 'Mozilla/5.0'}) |
| r_web.raise_for_status() |
| soup_web = BeautifulSoup(r_web.content, 'html.parser') |
| |
| page_text_web = ' '.join(t.strip() for t in soup_web.find_all(text=True) if t.parent.name not in ['style', 'script', 'head', 'title', 'meta', '[document]', 'a', 'button'] and t.strip()) |
| if page_text_web: |
| text_list_web.append(page_text_web + "\n\n") |
| else: |
| st.warning(f"متن قابل استخراجی از {url_web} یافت نشد.") |
| except requests.exceptions.RequestException as e: |
| st.warning(f"خطا در دسترسی به {url_web}: {e}") |
| except Exception as e: |
| st.warning(f"خطا در پردازش {url_web}: {e}") |
|
|
| if not text_list_web: |
| st.error("محتوایی از وبسایتها استخراج نشد.") |
| else: |
| st.session_state['web_text'] = text_list_web |
|
|
| with st.spinner('🔗 در حال ساخت Vectorstore برای وبسایت(ها)...'): |
| text_splitter_web = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100, separator="\n\n") |
| texts_web = text_splitter_web.create_documents(text_list_web) |
|
|
| if 'hf' not in st.session_state: |
| st.error("Embedding model مقداردهی اولیه نشده است.") |
| else: |
| embeddings_web = st.session_state['hf'] |
| random_str_web = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) |
| persist_directory_web = f"./chroma_db_web_{random_str_web}" |
| try: |
| db_web = Chroma.from_documents(texts_web, embeddings_web, persist_directory=persist_directory_web) |
| st.session_state['db_web_path'] = persist_directory_web |
| except Exception as e: |
| st.error(f"خطا در ساخت Chroma DB برای وبسایت: {e}") |
| if 'web_text' in st.session_state: del st.session_state['web_text'] |
| st.experimental_rerun() |
|
|
| with st.spinner('🔗 در حال ذخیره Vectorstore وبسایت(ها)...'): |
| db_web.persist() |
| try: |
| zip_filename_web = f"chroma_db_web_{random_str_web}.zip" |
| shutil.make_archive(f"chroma_db_web_{random_str_web}", 'zip', persist_directory_web) |
| st.session_state['db_web_zip_path'] = zip_filename_web |
| except Exception as e: |
| st.error(f"خطا در فشردهسازی Vectorstore وبسایت: {e}") |
|
|
| with st.spinner('🔗 در حال ساخت زنجیره QA برای وبسایت(ها)...'): |
| retriever_web = db_web.as_retriever(search_kwargs={"k": 3}) |
| if 'LLM' in st.session_state: |
| qa_web = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_web, return_source_documents=True) |
| st.session_state['website_qa'] = qa_web |
| st.success("وبسایت(ها) با موفقیت پردازش شدند.") |
| else: |
| st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.") |
| st.experimental_rerun() |
|
|
| else: |
| with st.expander("🔗 وبسایت(های) پردازش شده", expanded=False): |
| st.write("متن استخراج شده:") |
| st.write(st.session_state.get('web_text', ["متن موجود نیست."])) |
| if 'db_web_zip_path' in st.session_state: |
| try: |
| with open(st.session_state['db_web_zip_path'], 'rb') as fp: |
| st.download_button( |
| label="📩 دانلود Vectorstore وبسایت (.zip)", |
| data=fp, |
| file_name=st.session_state['db_web_zip_path'], |
| mime='application/zip' |
| ) |
| except FileNotFoundError: |
| st.warning("فایل فشرده Vectorstore وبسایت یافت نشد.") |
| except Exception as e: |
| st.error(f"خطا در خواندن فایل فشرده وبسایت: {e}") |
|
|
| if st.button('🛑🔗 حذف وبسایت(ها) از حافظه'): |
| keys_to_delete = ['website_qa', 'web_text', 'db_web_path', 'db_web_zip_path'] |
| |
| if 'db_web_path' in st.session_state and os.path.exists(st.session_state['db_web_path']): |
| shutil.rmtree(st.session_state['db_web_path'], ignore_errors=True) |
| if 'db_web_zip_path' in st.session_state and os.path.exists(st.session_state['db_web_zip_path']): |
| os.remove(st.session_state['db_web_zip_path']) |
| |
| for key in keys_to_delete: |
| if key in st.session_state: |
| del st.session_state[key] |
| st.session_state['plugin'] = "🛑 بدون افزونه" |
| st.experimental_rerun() |
|
|
| |
| if st.session_state['plugin'] == "💾 بارگذاری VectorStore ذخیره شده": |
| if 'uploaded_db_qa' not in st.session_state: |
| with st.expander("💾 بارگذاری VectorStore ذخیره شده (.zip)", expanded=True): |
| uploaded_zip = st.file_uploader("فایل VectorStore فشرده (.zip) خود را بارگذاری کنید", type=["zip"]) |
| if uploaded_zip is not None and st.button('✅💾 بارگذاری و افزودن VectorStore به حافظه'): |
| with st.spinner('💾 در حال استخراج VectorStore...'): |
| try: |
| |
| random_str_up = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) |
| extract_path = f"./uploaded_chroma_{random_str_up}" |
| os.makedirs(extract_path, exist_ok=True) |
|
|
| |
| with ZipFile(uploaded_zip, 'r') as zipObj: |
| zipObj.extractall(extract_path) |
| st.session_state['uploaded_db_path'] = extract_path |
| except Exception as e: |
| st.error(f"خطا در استخراج فایل فشرده: {e}") |
| |
| if os.path.exists(extract_path): shutil.rmtree(extract_path, ignore_errors=True) |
| st.stop() |
|
|
|
|
| with st.spinner('💾 در حال بارگذاری VectorStore و ساخت زنجیره QA...'): |
| if 'hf' not in st.session_state: |
| st.error("Embedding model مقداردهی اولیه نشده است.") |
| else: |
| hf_up = st.session_state['hf'] |
| try: |
| |
| |
| |
| |
| db_up = Chroma(persist_directory=extract_path, embedding_function=hf_up) |
| retriever_up = db_up.as_retriever(search_kwargs={"k": 3}) |
|
|
| if 'LLM' in st.session_state: |
| qa_up = RetrievalQA.from_chain_type(llm=st.session_state['LLM'], chain_type='stuff', retriever=retriever_up, return_source_documents=True) |
| st.session_state['uploaded_db_qa'] = qa_up |
| st.success("VectorStore ذخیره شده با موفقیت بارگذاری شد.") |
| else: |
| st.error("مدل زبان (LLM) مقداردهی اولیه نشده است.") |
|
|
| except Exception as e: |
| st.error(f"خطا در بارگذاری Chroma DB یا ساخت QA: {e}") |
| |
| if 'uploaded_db_path' in st.session_state: |
| if os.path.exists(st.session_state['uploaded_db_path']): |
| shutil.rmtree(st.session_state['uploaded_db_path'], ignore_errors=True) |
| del st.session_state['uploaded_db_path'] |
| if 'uploaded_db_qa' in st.session_state: del st.session_state['uploaded_db_qa'] |
| st.experimental_rerun() |
|
|
|
|
| else: |
| with st.expander("💾 VectorStore ذخیره شده بارگذاری شد", expanded=False): |
| st.success("📚 VectorStore آماده استفاده است.") |
| if st.button('🛑💾 حذف VectorStore بارگذاری شده از حافظه'): |
| keys_to_delete = ['uploaded_db_qa', 'uploaded_db_path'] |
| |
| if 'uploaded_db_path' in st.session_state and os.path.exists(st.session_state['uploaded_db_path']): |
| shutil.rmtree(st.session_state['uploaded_db_path'], ignore_errors=True) |
| |
| for key in keys_to_delete: |
| if key in st.session_state: |
| del st.session_state[key] |
| st.session_state['plugin'] = "🛑 بدون افزونه" |
| st.experimental_rerun() |
|
|
| |
| add_vertical_space(2) |
| st.markdown("---") |
|
|
| if st.button('🗑 خروج از حساب کاربری'): |
| keys_to_delete = list(st.session_state.keys()) |
| |
| for key in keys_to_delete: |
| if key.endswith('_path') and isinstance(st.session_state[key], str) and os.path.exists(st.session_state[key]): |
| if os.path.isdir(st.session_state[key]): |
| shutil.rmtree(st.session_state[key], ignore_errors=True) |
| elif os.path.isfile(st.session_state[key]): |
| os.remove(st.session_state[key]) |
|
|
| |
| for key in keys_to_delete: |
| del st.session_state[key] |
| st.success("با موفقیت خارج شدید.") |
| st.experimental_rerun() |
|
|
| |
| if 'export_chat' in globals(): |
| export_chat() |
| else: |
| st.warning("تابع export_chat یافت نشد.") |
|
|
| add_vertical_space(5) |
| st.info("ساخته شده با ❤️ و Streamlit") |
|
|
|
|
| |
|
|
|
|
| |
|
|
| |
| input_container = st.container() |
| |
| response_container = st.container() |
| |
| data_view_container = st.container() |
| |
| loading_container = st.container() |
|
|
|
|
| |
| with input_container: |
| input_text = st.chat_input("🧑💻 پیام خود را اینجا بنویسید 👇", key="input", disabled=(not 'hf_email' in st.session_state)) |
|
|
| |
| with data_view_container: |
| active_plugin = st.session_state.get('plugin', '🛑 بدون افزونه') |
|
|
| if active_plugin == "📋 گفتگو با دادههای شما (CSV)" and 'df' in st.session_state: |
| with st.expander("🤖 مشاهده دادههای CSV شما"): |
| st.dataframe(st.session_state['df'], use_container_width=True) |
| elif active_plugin == "📝 گفتگو با اسناد شما" and 'documents_raw_text' in st.session_state: |
| with st.expander("🤖 مشاهده متن اسناد شما"): |
| |
| st.write(f"تعداد {len(st.session_state['documents_raw_text'])} سند بارگذاری شده است.") |
| |
| |
| elif active_plugin == "🎧 گفتگو با فایل صوتی شما" and 'audio_text' in st.session_state: |
| with st.expander("🤖 مشاهده متن فایل صوتی شما"): |
| st.write(st.session_state['audio_text']) |
| elif active_plugin == "🎥 گفتگو با ویدیوی یوتیوب" and 'yt_text' in st.session_state: |
| with st.expander("🤖 مشاهده متن ویدیوی(های) یوتیوب"): |
| |
| for i, vid_text in enumerate(st.session_state['yt_text']): |
| st.text_area(f"ویدیوی {i+1}", vid_text, height=150) |
| elif active_plugin == "🔗 گفتگو با وبسایت" and 'web_text' in st.session_state: |
| with st.expander("🤖 مشاهده محتوای وبسایت(ها)"): |
| for i, site_text in enumerate(st.session_state['web_text']): |
| st.text_area(f"وبسایت {i+1}", site_text[:1000] + "...", height=150) |
| elif active_plugin == "💾 بارگذاری VectorStore ذخیره شده" and 'uploaded_db_qa' in st.session_state: |
| with st.expander("🗂 VectorStore بارگذاری شده"): |
| st.success("📚 VectorStore آماده استفاده است.") |
| elif active_plugin == "🧠 حالت دانای کل (GOD MODE)" and 'god_mode_source' in st.session_state: |
| with st.expander("🌍 مشاهده منابع استفاده شده در حالت دانای کل"): |
| |
| for s in st.session_state.get('god_mode_source', []): |
| st.markdown(f"- [{s}]({s})") |
|
|
|
|
| |
| def generate_response(prompt, temperature, top_p, repetition_penalty, top_k, max_new_tokens): |
| """ |
| تولید پاسخ بر اساس پرامپت کاربر و پلاگین فعال. |
| """ |
| final_prompt_to_llm = "" |
| response_from_llm = "" |
| response_to_display = "" |
| source_info = "" |
| plugin_used = False |
| error_message = None |
|
|
| active_plugin = st.session_state.get('plugin', '🛑 بدون افزونه') |
| qa_chain = None |
|
|
| with loading_container: |
|
|
| try: |
| |
| if active_plugin == "📝 گفتگو با اسناد شما" and 'documents_qa' in st.session_state: |
| qa_chain = st.session_state['documents_qa'] |
| plugin_name_fa = "اسناد شما" |
| elif active_plugin == "🧠 حالت دانای کل (GOD MODE)" and 'god_mode' in st.session_state: |
| qa_chain = st.session_state['god_mode'] |
| plugin_name_fa = "حالت دانای کل" |
| elif active_plugin == "🔗 گفتگو با وبسایت" and 'website_qa' in st.session_state: |
| qa_chain = st.session_state['website_qa'] |
| plugin_name_fa = "وبسایتها" |
| elif active_plugin == "💾 بارگذاری VectorStore ذخیره شده" and 'uploaded_db_qa' in st.session_state: |
| qa_chain = st.session_state['uploaded_db_qa'] |
| plugin_name_fa = "VectorStore بارگذاری شده" |
| elif active_plugin == "🎧 گفتگو با فایل صوتی شما" and 'audio_qa' in st.session_state: |
| qa_chain = st.session_state['audio_qa'] |
| plugin_name_fa = "فایل صوتی" |
| elif active_plugin == "🎥 گفتگو با ویدیوی یوتیوب" and 'yt_qa' in st.session_state: |
| qa_chain = st.session_state['yt_qa'] |
| plugin_name_fa = "ویدیوی یوتیوب" |
|
|
| |
| if qa_chain: |
| plugin_used = True |
| with st.spinner(f'🚀 در حال جستجو در {plugin_name_fa}...'): |
| result = qa_chain({"query": prompt}) |
| context_for_prompt = result.get("result", "اطلاعاتی یافت نشد.") |
| |
| if result.get("source_documents"): |
| source_info += "\n\n✅ **منابع:**\n" |
| |
| seen_sources = set() |
| for i, doc in enumerate(result["source_documents"]): |
| doc_repr = f"منبع {i+1}: " + doc.page_content[:150].replace('\n', ' ') + "..." |
| |
| if hasattr(doc, 'metadata') and doc.metadata: |
| doc_repr += f" (متا: {doc.metadata})" |
|
|
| if doc_repr not in seen_sources: |
| source_info += f"- {doc_repr}\n" |
| seen_sources.add(doc_repr) |
|
|
| |
| |
| |
| |
| |
| |
| final_prompt_to_llm = f"""با توجه به اطلاعات زیر: |
| {context_for_prompt} |
| به سوال زیر پاسخ بده: |
| {prompt} |
| """ |
|
|
| |
| elif active_plugin == "📋 گفتگو با دادههای شما (CSV)" and 'df' in st.session_state: |
| plugin_used = True |
| context_history = f"User: {st.session_state['past'][-1]}\nBot: {st.session_state['generated'][-1]}\n" |
| |
| if any(kw in prompt.lower() for kw in ['python', 'کد', 'پایتون', 'code']): |
| with st.spinner('🚀 در حال تولید کد پایتون با Sketch...'): |
| try: |
| solution = "\n```python\n" |
| |
| solution += st.session_state['df'].sketch.howto(prompt, call_display=False) |
| solution += "\n```\n\n" |
| |
| final_prompt_to_llm = prompt4Code(prompt, context_history, solution) |
| source_info = "\n\n✅ **کد تولید شده توسط Sketch:**\n" + solution |
| except Exception as e: |
| error_message = f"خطا در اجرای Sketch (howto): {e}" |
| final_prompt_to_llm = prompt |
| else: |
| with st.spinner('🚀 در حال تحلیل داده با Sketch...'): |
| try: |
| solution = st.session_state['df'].sketch.ask(prompt, call_display=False) |
| |
| final_prompt_to_llm = prompt4Data(prompt, context_history, solution) |
| source_info = "\n\n✅ **تحلیل Sketch:**\n" + str(solution) |
| except Exception as e: |
| error_message = f"خطا در اجرای Sketch (ask): {e}" |
| final_prompt_to_llm = prompt |
|
|
| |
| elif active_plugin == "🌐 جستجوی وب" and st.session_state.get('web_search') == 'True': |
| plugin_used = True |
| with st.spinner('🚀 در حال جستجو در اینترنت...'): |
| internet_result = "" |
| internet_answer = "" |
| source_urls = set() |
| try: |
| with DDGS() as ddgs: |
| |
| text_results = ddgs.text(prompt, |
| region=st.session_state.get('region', 'wt-wt'), |
| safesearch=st.session_state.get('safesearch', 'moderate'), |
| timelimit=st.session_state.get('timelimit', 'w'), |
| max_results=st.session_state.get('max_results', 3)) |
| for r in text_results: |
| internet_result += f"**{r.get('title', '')}**: {r.get('body', '')}\n[Link]({r.get('href', '')})\n\n" |
| if r.get('href'): source_urls.add(r.get('href')) |
|
|
| |
| answer_results = ddgs.answers(prompt) |
| for r in answer_results: |
| internet_answer += f"**پاسخ سریع**: {r.get('text', '')}\n" |
| if r.get('url'): source_urls.add(r.get('url')) |
|
|
| context_history = f"User: {st.session_state['past'][-1]}\nBot: {st.session_state['generated'][-1]}\n" |
| |
| final_prompt_to_llm = prompt4conversationInternet(prompt, context_history, internet_result, internet_answer) |
| if source_urls: |
| source_info = "\n\n✅ **منابع وب:**\n" + "\n".join([f"- {url}" for url in source_urls]) |
|
|
| except Exception as e: |
| error_message = f"خطا در جستجوی وب (DDGS): {e}" |
| final_prompt_to_llm = prompt |
|
|
| |
| else: |
| |
| |
| history_context = "" |
| num_turns = 2 |
| if 'past' in st.session_state and 'generated' in st.session_state: |
| start_index = max(0, len(st.session_state['past']) - num_turns) |
| for i in range(start_index, len(st.session_state['past'])): |
| history_context += f"User: {st.session_state['past'][i]}\nBot: {st.session_state['generated'][i]}\n" |
|
|
| |
| final_prompt_to_llm = prompt4conversation(prompt, history_context) |
|
|
|
|
| |
| if not error_message: |
| with st.spinner('🚀 ربات در حال فکر کردن است...'): |
| if 'chatbot' in st.session_state: |
| |
| if not final_prompt_to_llm: |
| final_prompt_to_llm = prompt |
|
|
| print(f"--- پرامپت نهایی به LLM ---\n{final_prompt_to_llm}\n--------------------------") |
|
|
| response_from_llm = st.session_state['chatbot'].chat( |
| final_prompt_to_llm, |
| temperature=temperature, |
| top_p=top_p, |
| repetition_penalty=repetition_penalty, |
| top_k=top_k, |
| max_new_tokens=max_new_tokens |
| ) |
| response_to_display = response_from_llm + source_info |
| else: |
| error_message = "Chatbot مقداردهی اولیه نشده است." |
| else: |
| |
| response_to_display = f"⚠️ **خطا در پردازش پلاگین:** {error_message}" |
|
|
| except Exception as e: |
| st.error(f"خطای کلی در تابع generate_response: {e}") |
| response_to_display = f"😔 متاسفانه خطایی رخ داد: {e}" |
|
|
|
|
| return response_to_display |
|
|
|
|
| |
| with response_container: |
| |
| if 'hf_email' not in st.session_state: |
| st.info("👋 سلام، از دیدن شما خوشحالیم 🤗") |
| st.warning("👈 لطفاً برای ادامه وارد شوید (از منوی بالا سمت چپ اقدام کنید) 🚀") |
| st.error("👈 اگر در Hugging Face ثبتنام نکردهاید، لطفاً ابتدا ثبتنام کرده و سپس وارد شوید 🤗") |
|
|
| else: |
| |
| if input_text: |
| |
| temp = st.session_state.get('temperature', 0.5) |
| top_p_val = st.session_state.get('top_p', 0.95) |
| rep_penalty = st.session_state.get('repetition_penalty', 1.2) |
| top_k_val = st.session_state.get('top_k', 50) |
| max_tokens = st.session_state.get('max_new_tokens', 1024) |
|
|
| |
| response = generate_response(input_text, temp, top_p_val, rep_penalty, top_k_val, max_tokens) |
|
|
| |
| if 'past' not in st.session_state: st.session_state['past'] = [] |
| if 'generated' not in st.session_state: st.session_state['generated'] = [] |
| st.session_state.past.append(input_text) |
| st.session_state.generated.append(response) |
| st.experimental_rerun() |
|
|
| |
| if 'generated' in st.session_state and st.session_state['generated']: |
| for i in range(len(st.session_state['generated'])): |
| |
| with st.chat_message(name="user", avatar="🧑💻"): |
| st.markdown(st.session_state['past'][i]) |
|
|
| |
| with st.chat_message(name="assistant", avatar="🤖"): |
| full_response = st.session_state['generated'][i] |
| |
| parts = full_response.split("✅ **منابع:**", 1) |
| main_message = parts[0].strip() |
| source_part = "" |
| if len(parts) > 1: |
| source_part = "✅ **منابع:**" + parts[1] |
|
|
| |
| st.markdown(main_message) |
|
|
| |
| if source_part: |
| with st.expander(f"📚 مشاهده منابع پیام شماره {i+1}"): |
| st.markdown(source_part.replace("✅ **منابع:**", "").strip()) |
|
|
| |
| |