| | import gradio as gr |
| | from langchain_community.document_loaders import UnstructuredMarkdownLoader |
| | from langchain.text_splitter import RecursiveCharacterTextSplitter |
| | from langchain_core.documents import Document |
| | from langchain_huggingface import HuggingFaceEmbeddings, HuggingFaceEndpoint |
| | from langchain_community.vectorstores import FAISS |
| | from langchain.prompts import ChatPromptTemplate |
| | from dotenv import load_dotenv |
| | import os |
| | from datetime import datetime |
| | from skyfield.api import load |
| | import matplotlib.pyplot as plt |
| | from io import BytesIO |
| | from PIL import Image |
| |
|
| |
|
| | |
# Load environment variables from a .env file before anything reads them.
load_dotenv()


# Directory that holds the source markdown document; the empty string makes
# the file name in load_documents() resolve against the working directory.
DATA_PATH = ""

# Prompt template for the LLM: {context} and {question} are filled in by
# process_query(). The text is in Russian and is sent to the model verbatim.
PROMPT_TEMPLATE = """

Ответь на вопрос, используя только следующий контекст:
{context}
---
Ответь на вопрос на основе приведенного контекста: {question}
"""


# Human-readable status text. It is overwritten by initialize_vectorstore(),
# appended to by process_query(), and echoed back to the user by
# chat_interface().
status_message = "Инициализация..."


# Russian labels for the zone classification keys produced by PLadder_ZSizes().
classification_ru = {
    'Swallowed': 'проглоченная',
    'Tiny': 'сверхмалая',
    'Small': 'малая',
    'Normal': 'нормальная',
    'Ideal': 'идеальная',
    'Big': 'большая'
}


# Russian names of the seven bodies used in the planetary ladder.
planet_ru = {
    'Sun': 'Солнце',
    'Moon': 'Луна',
    'Mercury': 'Меркурий',
    'Venus': 'Венера',
    'Mars': 'Марс',
    'Jupiter': 'Юпитер',
    'Saturn': 'Сатурн'
}


# Glyphs that plot_pladder() draws for each body.
planet_symbols = {
    'Sun': '☉',
    'Moon': '☾',
    'Mercury': '☿',
    'Venus': '♀',
    'Mars': '♂',
    'Jupiter': '♃',
    'Saturn': '♄'
}
| |
|
# Process-wide cache for the FAISS index built by initialize_vectorstore().
_vectorstore_cache = None


def initialize_vectorstore():
    """Return the FAISS vector store, building it on the first call only.

    Building the store loads the markdown file, splits it into chunks and
    embeds every chunk, which is far too expensive to repeat for each user
    query. The first successful build is therefore memoized in a module-level
    variable and reused by later calls. A failed build is not cached, so the
    next call retries from scratch.

    Side effects:
        Updates the global ``status_message`` with the current stage, the
        final "ready" text, or the initialization error.

    Returns:
        The FAISS vector store produced by ``save_to_faiss``.

    Raises:
        Exception: Whatever loading, splitting or embedding raised; the
            exception is re-raised unchanged after ``status_message`` is set.
    """
    global status_message, _vectorstore_cache

    if _vectorstore_cache is not None:
        # Reset the status exactly as a fresh build would, so callers that
        # append to it (process_query) start from the same text every request.
        status_message = "База данных готова к использованию."
        return _vectorstore_cache

    try:
        status_message = "Загрузка и обработка документов..."
        documents = load_documents()
        chunks = split_text(documents)

        status_message = "Создание векторной базы..."
        vectorstore = save_to_faiss(chunks)
    except Exception as e:
        status_message = f"Ошибка инициализации: {str(e)}"
        raise

    _vectorstore_cache = vectorstore
    status_message = "База данных готова к использованию."
    return vectorstore
| |
|
def load_documents():
    """Read the markdown knowledge base and return its loaded documents.

    The file ``pl250320252.md`` is looked up under ``DATA_PATH`` and parsed
    with ``UnstructuredMarkdownLoader``.

    Raises:
        FileNotFoundError: If the markdown file does not exist.
    """
    markdown_path = os.path.join(DATA_PATH, "pl250320252.md")
    if os.path.exists(markdown_path):
        return UnstructuredMarkdownLoader(markdown_path).load()
    raise FileNotFoundError(f"Файл {markdown_path} не найден")
| |
|
def split_text(documents: list[Document]):
    """Cut the documents into overlapping chunks ready for embedding.

    Chunks are at most 900 characters long (measured with ``len``), overlap
    by 300 characters, and carry their start index in the metadata.
    """
    splitter_options = {
        "chunk_size": 900,
        "chunk_overlap": 300,
        "length_function": len,
        "add_start_index": True,
    }
    chunker = RecursiveCharacterTextSplitter(**splitter_options)
    return chunker.split_documents(documents)
| |
|
def save_to_faiss(chunks: list[Document]):
    """Embed the chunks and return a FAISS index built from them.

    Embeddings come from a multilingual MiniLM sentence-transformer running
    on the CPU with normalized output vectors. Despite the name, this
    function does not write the index to disk; it only returns it.
    """
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        model_kwargs=dict(device="cpu"),
        encode_kwargs=dict(normalize_embeddings=True),
    )
    index = FAISS.from_documents(chunks, embedding_model)
    return index
| |
|
def process_query(query_text: str, vectorstore):
    """Answer a question with retrieval-augmented generation.

    The three most relevant chunks are fetched from the vector store, placed
    into ``PROMPT_TEMPLATE`` together with the question, and sent to the
    hosted Hugging Face endpoint.

    Args:
        query_text: The user's question.
        vectorstore: Vector store exposing
            ``similarity_search_with_relevance_scores``; ``None`` means the
            store was never initialized.

    Returns:
        tuple: ``(answer_text, sources)``. ``sources`` is the list of distinct,
        non-empty ``source`` metadata values in retrieval order. On any
        failure the first element is a user-facing error message and the
        second is an empty list; this function never raises.

    Side effects:
        Appends the number of retrieved chunks to the global
        ``status_message``.
    """
    global status_message

    if vectorstore is None:
        return "База данных не инициализирована", []

    try:
        results = vectorstore.similarity_search_with_relevance_scores(query_text, k=3)
        status_message += f"\nНайдено {len(results)} результатов"

        if not results:
            return "Не найдено результатов.", []

        context_text = "\n\n---\n\n".join(
            f"Релевантность: {score:.2f}\n{doc.page_content}"
            for doc, score in results
        )

        prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
        prompt = prompt_template.format(context=context_text, question=query_text)

        model = HuggingFaceEndpoint(
            endpoint_url="https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud/",
            task="text2text-generation",
            # `temperature` is a declared HuggingFaceEndpoint field, so it is
            # passed explicitly instead of being hidden inside model_kwargs.
            temperature=0.5,
            model_kwargs={"max_length": 512},
        )
        response_text = model.invoke(prompt)

        # dict.fromkeys de-duplicates while keeping retrieval order (a set
        # would shuffle the list between runs); empty sources are dropped.
        sources = list(dict.fromkeys(
            source
            for source in (doc.metadata.get("source", "") for doc, _ in results)
            if source
        ))
        return response_text, sources
    except Exception as e:
        # Deliberate best-effort: the UI shows the message instead of crashing.
        return f"Ошибка обработки запроса: {str(e)}", []
| |
|
def _classify_zone(size, bordering_planets):
    """Return the classification key for a zone that is `size` degrees wide."""
    # The upper bound of the 'Tiny' class depends on which bodies border the zone.
    if any(p in ('Sun', 'Moon') for p in bordering_planets):
        tiny_limit = 7
    elif any(p in ('Mercury', 'Venus', 'Mars') for p in bordering_planets):
        tiny_limit = 6
    else:
        tiny_limit = 5

    if size <= 1:
        return 'Swallowed'
    if size <= tiny_limit:
        return 'Tiny'
    if size <= 40:
        return 'Small'
    if size < 60:
        return 'Ideal' if 50 <= size <= 52 else 'Normal'
    return 'Big'


def _format_degrees_minutes(size):
    """Format an angle in degrees as a ``D°M'`` string (minutes truncated)."""
    degrees = int(size)
    minutes = int((size - degrees) * 60)
    return f"{degrees}°{minutes}'"


def PLadder_ZSizes(date_time_iso: str):
    """
    Calculate the planetary ladder and zone sizes for a given date and time.

    The seven bodies (Sun, Moon, Mercury, Venus, Mars, Jupiter, Saturn) are
    ordered by their apparent geocentric ecliptic longitude. The ladder splits
    the 0°-360° circle into eight zones: from 0° to the first body, between
    each pair of neighbours, and from the last body to 360°.

    Args:
        date_time_iso (str): Date and time in ISO format
            (e.g., '2023-10-10T12:00:00'). A value without a UTC offset is
            interpreted as UTC; a value with an offset is converted to UTC.

    Returns:
        dict: ``{'PLadder': [...], 'ZSizes': [(size_str, classification), ...]}``
        on success, or ``{'error': message}`` when the input cannot be parsed,
        the year is outside 1900-2050, or the computation fails. The function
        never raises.
    """
    # Local import: the file only imports `datetime` from the datetime module.
    from datetime import timezone

    # Only the parse step may report a format error; a ValueError raised later
    # by the ephemeris code must not be mislabelled as a bad date.
    try:
        dt = datetime.fromisoformat(date_time_iso)
    except (TypeError, ValueError):
        return {"error": "Неверный формат даты и времени. Используйте ISO формат, например, '2023-10-10T12:00:00'"}

    if dt.year < 1900 or dt.year > 2050:
        return {"error": "Дата вне диапазона. Должна быть между 1900 и 2050 годами."}

    # ts.utc() takes UTC calendar fields, so an explicit offset in the input
    # has to be applied first; otherwise it would be silently ignored.
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc)

    try:
        planets = load('de421.bsp')
        earth = planets['earth']

        planet_objects = {
            'Sun': planets['sun'],
            'Moon': planets['moon'],
            'Mercury': planets['mercury'],
            'Venus': planets['venus'],
            'Mars': planets['mars'],
            'Jupiter': planets['jupiter barycenter'],
            'Saturn': planets['saturn barycenter']
        }

        ts = load.timescale()
        seconds = dt.second + dt.microsecond / 1_000_000
        t = ts.utc(dt.year, dt.month, dt.day, dt.hour, dt.minute, seconds)

        # The observer position is the same for every body: compute it once.
        observer = earth.at(t)
        longitudes = {}
        for name, body in planet_objects.items():
            _, lon, _ = observer.observe(body).apparent().ecliptic_latlon()
            longitudes[name] = lon.degrees

        sorted_planets = sorted(longitudes.items(), key=lambda item: item[1])
        PLadder = [name for name, _ in sorted_planets]
        sorted_lons = [lon for _, lon in sorted_planets]

        # Eight zones: 0° -> first body, six gaps between neighbours,
        # last body -> 360°.
        zone_sizes = (
            [sorted_lons[0]]
            + [nxt - cur for cur, nxt in zip(sorted_lons, sorted_lons[1:])]
            + [360 - sorted_lons[-1]]
        )

        # Bodies on the edge(s) of each zone; the outer zones have only one.
        bordering = (
            [[PLadder[0]]]
            + [[lower, upper] for lower, upper in zip(PLadder, PLadder[1:])]
            + [[PLadder[-1]]]
        )

        ZSizes = [
            (_format_degrees_minutes(size), _classify_zone(size, bord))
            for size, bord in zip(zone_sizes, bordering)
        ]

        return {'PLadder': PLadder, 'ZSizes': ZSizes}
    except Exception as e:
        return {"error": f"Ошибка при вычислении: {str(e)}"}
| |
|
def plot_pladder(PLadder):
    """
    Draw the planetary ladder as a right triangle labelled with planet glyphs.

    The first seven entries of the ladder are placed on fixed points: four up
    the vertical leg and three down the hypotenuse.

    Args:
        PLadder (list): Planet names in ladder order (keys of planet_symbols)

    Returns:
        matplotlib.figure.Figure: The generated plot; the caller closes it
    """
    figure, axes = plt.subplots()

    # Closed outline of the triangle with vertices (0, 0), (0, 3) and (3, 0).
    outline_x = [0, 0, 3, 0]
    outline_y = [0, 3, 0, 0]
    axes.plot(outline_x, outline_y, 'k-')

    # Dashed horizontal guide lines at heights 1 and 2.
    for level in (1, 2):
        axes.plot([0, 3], [level, level], 'k--')

    slots = [(0, 0), (0, 1), (0, 2), (0, 3), (1, 2), (2, 1), (3, 0)]
    for index, (x, y) in enumerate(slots):
        glyph = planet_symbols[PLadder[index]]
        axes.text(x, y, glyph, ha='center', va='center', fontsize=12)

    # Leave a margin around the triangle and hide the axes decoration.
    axes.set_xlim(-0.5, 3.5)
    axes.set_ylim(-0.5, 3.5)
    axes.set_aspect('equal')
    axes.axis('off')
    return figure
| |
|
def chat_interface(query_text):
    """
    Handle user queries, either for planetary ladder or general RAG questions.

    A query that starts with ``"PLadder "`` is treated as a date/time request:
    the ladder and zone sizes are computed, each step/zone is interpreted
    through the RAG pipeline, and the ladder is rendered as an image. Any
    other query is answered directly by the RAG pipeline.

    Args:
        query_text (str): User's input query

    Returns:
        tuple: ``(text response, PIL image or None)``. The image is only
        present for a successful PLadder request. Errors are reported in the
        text; this function never raises.
    """
    try:
        vectorstore = initialize_vectorstore()

        if not query_text.startswith("PLadder "):
            response, sources = process_query(query_text, vectorstore)
            full_response = f"{status_message}\n\nОтвет: {response}\n\nИсточники: {', '.join(sources) if sources else 'Нет источников'}"
            return full_response, None

        # Strip surrounding whitespace: the multi-line textbox may append a
        # newline, which datetime.fromisoformat would reject.
        date_time_iso = query_text.split(" ", 1)[1].strip()
        result = PLadder_ZSizes(date_time_iso)
        if "error" in result:
            return result["error"], None

        PLadder = result["PLadder"]
        ZSizes = result["ZSizes"]

        PLadder_ru = [planet_ru[p] for p in PLadder]
        ZSizes_ru = [(size_str, classification_ru[classification])
                     for size_str, classification in ZSizes]

        # One interpretation per ladder step together with the zone of the
        # same number; zip stops after the seven planets.
        responses = []
        for number, (planet, (size_str, class_ru)) in enumerate(zip(PLadder_ru, ZSizes_ru), start=1):
            query = f"Что значит {planet} на {number}-й ступени и {size_str} {class_ru} {number}-я зона?"
            response, _ = process_query(query, vectorstore)
            responses.append(f"Интерпретация для {number}-й ступени и {number}-й зоны: {response}")

        # The eighth (last) zone has no ladder step of its own.
        size_str, class_ru = ZSizes_ru[-1]
        query = f"Что значит {size_str} {class_ru} восьмая зона?"
        response, _ = process_query(query, vectorstore)
        responses.append(f"Интерпретация для 8-й зоны: {response}")

        # Build the text BEFORE returning it (the original returned `text`
        # ahead of its assignment, so every PLadder request failed).
        text = "Планетарная лестница: " + ", ".join(PLadder_ru) + "\n"
        text += "Размеры зон:\n" + "\n".join(
            f"Зона {i + 1}: {size_str} {class_ru}"
            for i, (size_str, class_ru) in enumerate(ZSizes_ru)
        ) + "\n\n"
        text += "\n".join(responses)

        # Render the figure to a PNG and always release it, even when saving
        # fails, so figures do not accumulate in pyplot's registry.
        fig = plot_pladder(PLadder)
        buf = BytesIO()
        try:
            fig.savefig(buf, format='png')
        finally:
            plt.close(fig)
        buf.seek(0)
        img = Image.open(buf)
        img.load()  # decode now; Image.open alone reads lazily from `buf`
        return text, img

    except Exception as e:
        return f"Критическая ошибка: {str(e)}", None
| |
|
| | |
# Gradio UI: a two-line text box feeds chat_interface(), whose two return
# values are shown in a text box and an image component respectively.
interface = gr.Interface(
    fn=chat_interface,
    inputs=gr.Textbox(lines=2, placeholder="Введите ваш вопрос здесь..."),
    outputs=[gr.Textbox(), gr.Image()],
    title="Чат с документами",
    description="Задайте вопрос, и я отвечу на основе загруженных документов. "
                "Для запроса планетарной лестницы используйте формат: PLadder YYYY-MM-DDTHH:MM:SS"
)


# Launch the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    interface.launch()