|
|
import os |
|
|
import uuid |
|
|
import tempfile |
|
|
from typing import List, Optional, Dict, Any |
|
|
from pathlib import Path |
|
|
import PyPDF2 |
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
from langchain_openai import OpenAIEmbeddings, ChatOpenAI |
|
|
from langchain_community.vectorstores import Chroma |
|
|
from langchain.chains import RetrievalQA |
|
|
from langchain_community.document_loaders import PyPDFLoader |
|
|
from langchain.schema import Document |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
from datetime import datetime |
|
|
import json |
|
|
import base64 |
|
|
from openai import OpenAI |
|
|
import re |
|
|
from semantic_chunking import SemanticChunker |
|
|
|
|
|
|
|
|
# Load environment variables from a local .env file before any class below
# reads them through os.getenv (API keys, base URL, Chroma directory).
load_dotenv()
|
|
|
|
|
class AlternativeEmbeddings:
    """Local embedding backend built on Sentence Transformers.

    Used when OpenAI embeddings are not available. It exposes the two methods
    this file relies on from an embeddings object: ``embed_documents`` and
    ``embed_query``.
    """

    # Candidate models as (model name, embedding dimension), tried in order.
    MODEL_OPTIONS = (
        ("all-MiniLM-L6-v2", 384),
        ("paraphrase-MiniLM-L3-v2", 384),
        ("BAAI/bge-small-en-v1.5", 384),
    )

    def __init__(self):
        """Load the first candidate model that initialises successfully.

        Raises:
            ImportError: if ``sentence-transformers`` cannot be imported.
            RuntimeError: if every candidate model fails to load.
        """
        self.model = None
        self.embedding_size = 384

        # Imported lazily so the module can be loaded without the package.
        try:
            from sentence_transformers import SentenceTransformer
        except ImportError as err:
            print("❌ sentence-transformers not available. Please install it or provide OpenAI API key.")
            raise ImportError("sentence-transformers not available") from err

        for model_name, embed_size in self.MODEL_OPTIONS:
            try:
                print(f"🔄 Trying to load model: {model_name}")
                self.model = SentenceTransformer(model_name)
                self.embedding_size = embed_size
                print(f"✅ Successfully loaded: {model_name}")
                break
            except Exception as e:
                # Best effort: any load failure just moves on to the next candidate.
                print(f"⚠️ Failed to load {model_name}: {str(e)}")
                continue

        if self.model is None:
            raise RuntimeError("All embedding models failed to load")

    def embed_documents(self, texts):
        """Return one embedding (a list of floats) per entry of ``texts``.

        Raises:
            RuntimeError: if no model is loaded.
        """
        if self.model is None:
            raise RuntimeError("No embedding model available")
        try:
            return self.model.encode(texts, convert_to_numpy=True).tolist()
        except Exception as e:
            print(f"Error encoding documents: {e}")
            raise

    def embed_query(self, text):
        """Return the embedding of a single query string as a list of floats.

        Raises:
            RuntimeError: if no model is loaded.
        """
        if self.model is None:
            raise RuntimeError("No embedding model available")
        try:
            # encode() takes a batch; wrap the query and unwrap the single row.
            return self.model.encode([text], convert_to_numpy=True)[0].tolist()
        except Exception as e:
            print(f"Error encoding query: {e}")
            raise
|
|
|
|
|
class SEALionLLM:
    """Wrapper around the SEA-LION chat models.

    Routes each query to either the instruct model or the reasoning model
    based on keyword heuristics, and offers LLM-based metadata extraction
    for uploaded university documents.
    """

    # Keywords that each count as one "criterion" towards a complex query.
    # dict.fromkeys removes duplicates ("program" is shared by English, Malay
    # and Indonesian) so a single hit is not counted several times.
    _COMPLEX_KEYWORDS = tuple(dict.fromkeys([
        # English
        "university", "admission", "requirement", "tuition", "fee", "program", "course",
        "degree", "master", "bachelor", "phd", "scholarship", "deadline", "application",
        "budget", "under", "less than", "below", "compare", "recommend", "suggest",
        "which", "what are the", "show me", "find me", "search for",
        # Chinese
        "大学", "学费", "专业", "硕士", "学士", "博士", "申请", "要求", "奖学金",
        # Malay
        "universiti", "yuran", "program", "ijazah", "syarat", "permohonan",
        # Thai
        "มหาวิทยาลัย", "ค่าเล่าเรียน", "หลักสูตร", "ปริญญา", "เงื่อนไข",
        # Indonesian
        "universitas", "biaya", "kuliah", "program", "sarjana", "persyaratan",
    ]))

    # Budget / comparison phrasings; each match counts as two criteria.
    _COMPARISON_PATTERNS = tuple(re.compile(pattern) for pattern in (
        r"under \$?\d+", r"less than \$?\d+", r"below \$?\d+", r"between \$?\d+ and \$?\d+",
        r"不超过.*元", r"低于.*元", r"少于.*元",
        r"kurang dari", r"di bawah",
        r"น้อยกว่า", r"ต่ำกว่า",
    ))

    _TRANSLATION_KEYWORDS = (
        "translate", "translation", "แปล", "翻译", "terjemah", "traduire",
    )

    _REQUIRED_METADATA_KEYS = ("university_name", "country", "document_type", "language")

    _METADATA_SYSTEM_PROMPT = (
        "You are an expert at extracting metadata from university documents.\n"
        "Analyze the provided document text and extract the following information:\n"
        "\n"
        "1. University name (full official name)\n"
        "2. Country (where the university is located)\n"
        "3. Document type (choose from: admission_requirements, tuition_fees, program_information, scholarship_info, application_deadlines, general_info)\n"
        "4. Language (choose from: English, Chinese, Malay, Thai, Indonesian, Vietnamese, Filipino)\n"
        "\n"
        "Return your response as a JSON object with these exact keys:\n"
        "{\n"
        '    "university_name": "extracted university name or \'Unknown\' if not found",\n'
        '    "country": "extracted country or \'Unknown\' if not found",\n'
        '    "document_type": "most appropriate document type from the list above",\n'
        '    "language": "detected language of the document"\n'
        "}\n"
        "\n"
        "Guidelines:\n"
        "- For university_name: Look for official university names, avoid abbreviations when possible\n"
        "- For country: Look for country names, city names that indicate country, or domain extensions\n"
        "- For document_type: Analyze the content to determine what type of information it contains\n"
        "- For language: Determine the primary language of the document.\n"
        '- If information is unclear, use "Unknown" for university_name and country\n'
        "- Always choose one of the specified document_type options and language options\n"
    )

    def __init__(self):
        """Create the OpenAI-compatible client from SEA_LION_* environment variables."""
        self.client = OpenAI(
            api_key=os.getenv("SEA_LION_API_KEY"),
            base_url=os.getenv("SEA_LION_BASE_URL", "https://api.sea-lion.ai/v1"),
        )

        self.instruct_model = "aisingapore/Gemma-SEA-LION-v3-9B-IT"
        self.reasoning_model = "aisingapore/Llama-SEA-LION-v3.5-8B-R"

    def _is_complex_query(self, query: str) -> bool:
        """Return True when the query scores at least two complexity criteria.

        Each distinct keyword contained in the lower-cased query scores 1 and
        each matching comparison pattern scores 2. Matching is by substring.
        """
        query_lower = query.lower()

        criteria_count = sum(1 for keyword in self._COMPLEX_KEYWORDS if keyword in query_lower)
        criteria_count += 2 * sum(
            1 for pattern in self._COMPARISON_PATTERNS if pattern.search(query_lower)
        )

        return criteria_count >= 2

    def _is_translation_query(self, query: str) -> bool:
        """Return True when the query contains a translation keyword."""
        query_lower = query.lower()
        return any(keyword in query_lower for keyword in self._TRANSLATION_KEYWORDS)

    def generate_response(self, query: str, context: str = "", language: str = "English") -> str:
        """Answer ``query`` with the appropriate SEA-LION model.

        The reasoning model is used only for complex, non-translation queries;
        everything else goes to the instruct model.

        Args:
            query: The user question.
            context: Retrieved document text embedded into the system prompt.
            language: Language the model is asked to respond in.

        Returns:
            The model's answer with any leading ``<think>...</think>`` section
            removed, or an apology string containing the error text if the
            request fails.
        """
        use_reasoning = self._is_complex_query(query) and not self._is_translation_query(query)

        system_prompt = (
            "You are a helpful assistant specializing in ASEAN university admissions.\n"
            f"Respond in {language} unless specifically asked otherwise.\n"
            "\n"
            "If provided with context from university documents, use that information to give accurate, specific answers.\n"
            "Always cite your sources when using provided context.\n"
            "\n"
            "For complex university search queries, provide:\n"
            "1. Direct answers to the question\n"
            "2. Relevant admission requirements\n"
            "3. Tuition fees (if available)\n"
            "4. Application deadlines (if available)\n"
            "5. Source citations from the documents\n"
            "\n"
            f"Context: {context}"
        )

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ]

        if use_reasoning:
            # NOTE(review): confirm the shape of the thinking-mode flag against
            # the SEA-LION API documentation.
            request_options = {
                "model": self.reasoning_model,
                "max_tokens": 2000,
                "temperature": 0.1,
                "extra_body": {"thinking_mode": True},
            }
        else:
            request_options = {
                "model": self.instruct_model,
                "max_tokens": 1500,
                "temperature": 0.3,
            }

        try:
            response = self.client.chat.completions.create(messages=messages, **request_options)

            # content can be None; treat that as an empty answer instead of
            # failing on the "</think>" membership test below.
            response_text = response.choices[0].message.content or ""
            if "</think>" in response_text:
                # Keep only the text after the reasoning trace.
                response_text = response_text.split("</think>")[-1].strip()

            return response_text

        except Exception as e:
            print(f"Error with SEA-LION model: {str(e)}")
            return f"I apologize, but I encountered an error processing your query. Please try rephrasing your question. Error: {str(e)}"

    def extract_metadata(self, document_text: str) -> Dict[str, str]:
        """Extract university, country, document type and language via the LLM.

        Args:
            document_text: Text of the document (the caller in this file
                passes the first two pages).

        Returns:
            A dict with the keys ``university_name``, ``country``,
            ``document_type`` and ``language``. Default values are returned
            if the request fails.
        """
        messages = [
            {"role": "system", "content": self._METADATA_SYSTEM_PROMPT},
            {"role": "user", "content": f"Extract metadata from this document text:\n\n{document_text}"},
        ]

        try:
            response = self.client.chat.completions.create(
                model=self.instruct_model,
                messages=messages,
                max_tokens=500,
                temperature=0.1,
            )

            response_text = (response.choices[0].message.content or "").strip()
            print("--- DEBUG: LLM Metadata Extraction Details ---")
            print(f"**Input Text for LLM (first 2 pages):**\n```\n{document_text[:1000]}...\n```")
            print(f"**Raw LLM Response:**\n```json\n{response_text}\n```")

            return self._parse_metadata_response(response_text)

        except Exception as e:
            print(f"DEBUG: Error during LLM Metadata Extraction: {str(e)}")
            return self._get_default_metadata()

    def _parse_metadata_response(self, response_text: str) -> Dict[str, str]:
        """Turn the raw LLM reply into a metadata dict.

        Tries the first ``{...}`` span as JSON; falls back to line-based text
        extraction when no JSON object is found or it does not parse.
        """
        # Non-greedy match: the expected object is flat, so the first closing
        # brace ends it.
        json_match = re.search(r"\{.*?\}", response_text, re.DOTALL)
        if json_match is None:
            print("DEBUG: No JSON object found in LLM response.")
            return self._extract_from_text_response(response_text)

        try:
            metadata = json.loads(json_match.group(0))
        except json.JSONDecodeError as e:
            print(f"DEBUG: JSON Parsing Failed: {e}")
            print("DEBUG: Attempting fallback text extraction from raw response.")
            return self._extract_from_text_response(response_text)

        print(f"**Parsed JSON Metadata:**\n```json\n{json.dumps(metadata, indent=2)}\n```")
        if all(key in metadata for key in self._REQUIRED_METADATA_KEYS):
            print("DEBUG: Successfully extracted and parsed metadata from LLM.")
            return metadata

        print("DEBUG: LLM response missing required keys, attempting fallback or using defaults.")
        # Keep whatever the model did provide and fill the gaps with defaults.
        merged = self._get_default_metadata()
        merged.update(
            {key: metadata[key] for key in self._REQUIRED_METADATA_KEYS if key in metadata}
        )
        return merged

    def _extract_from_text_response(self, response_text: str) -> Dict[str, str]:
        """Fallback: read ``key: value`` lines from a non-JSON LLM reply.

        Only the part before the first colon is inspected for the field name,
        so a value that happens to mention "university" or "country" cannot
        overwrite the wrong field.
        """
        metadata = self._get_default_metadata()

        for raw_line in response_text.split("\n"):
            label, separator, raw_value = raw_line.strip().partition(":")
            if not separator:
                continue

            label = label.lower()
            value = raw_value.strip().strip('",')

            if "university" in label:
                metadata["university_name"] = value
            elif "country" in label:
                metadata["country"] = value
            elif "document_type" in label or "document type" in label:
                metadata["document_type"] = value
            elif "language" in label:
                metadata["language"] = value

        print(f"DEBUG: Fallback text extraction result: {metadata}")
        return metadata

    def _get_default_metadata(self) -> Dict[str, str]:
        """Return a fresh dict of default metadata used when extraction fails."""
        return {
            "university_name": "Unknown",
            "country": "Unknown",
            "document_type": "general_info",
            "language": "Unknown",
        }
|
|
|
|
|
def classify_query_type(query: str) -> str:
    """Classify a query as ``"simple"`` or ``"complex"`` for UI display.

    Uses the same rule as ``SEALionLLM.generate_response``: translation
    queries and queries that are not complex are "simple"; everything else is
    "complex".
    """
    # The heuristics are pure keyword checks that read no instance state, so
    # skip __init__ — it builds an API client and needs SEA_LION_API_KEY.
    classifier = SEALionLLM.__new__(SEALionLLM)

    if classifier._is_translation_query(query):
        return "simple"
    return "complex" if classifier._is_complex_query(query) else "simple"
|
|
|
|
|
class DocumentIngestion:
    """Turn uploaded PDFs into documents and store them in a Chroma index."""

    def __init__(self):
        """Set up the LLM, the embedding backend, the chunker and the store directory.

        Local Sentence Transformers embeddings are preferred; OpenAI embeddings
        are the fallback when a real ``OPENAI_API_KEY`` is configured.

        Raises:
            Exception: if neither embedding backend can be initialised.
        """
        self.sea_lion_llm = SEALionLLM()

        try:
            self.embeddings = AlternativeEmbeddings()
            # NOTE(review): this label is fixed even though the loader may have
            # picked a different model from its candidate list — confirm intent.
            self.embedding_type = "BGE-small-en"
            if self.embeddings.model is None:
                raise Exception("BGE model not available")
        except Exception:
            openai_key = os.getenv("OPENAI_API_KEY")
            placeholder_keys = ("placeholder_for_embeddings", "your_openai_api_key_here")
            if openai_key and openai_key not in placeholder_keys:
                try:
                    self.embeddings = OpenAIEmbeddings()
                    self.embedding_type = "OpenAI"
                except Exception:
                    print("Both BGE and OpenAI embeddings failed. Please check your setup.")
                    raise
            else:
                print("No embedding model available. Please install sentence-transformers or provide OpenAI API key.")
                raise Exception("No embedding model available")

        self.text_splitter = SemanticChunker(
            embeddings_model=self.embeddings,
            chunk_size=4,
            overlap=1,
            similarity_threshold=0.75,
            min_chunk_size=150,
            max_chunk_size=1500,
            debug=True,
        )

        self.persist_directory = os.getenv("CHROMA_PERSIST_DIRECTORY", "./chroma_db")
        os.makedirs(self.persist_directory, exist_ok=True)

    def extract_text_from_pdf(self, pdf_file_path) -> List[str]:
        """Extract the text of each page of a PDF.

        Args:
            pdf_file_path: Path of the PDF file to read.

        Returns:
            One string per page (empty for pages that could not be read), or
            an empty list when the file cannot be opened, is password
            protected, or contains no extractable text.
        """
        try:
            with open(pdf_file_path, "rb") as pdf_file:
                pdf_reader = PyPDF2.PdfReader(pdf_file)

                if pdf_reader.is_encrypted:
                    # Many "encrypted" PDFs use an empty user password.
                    # decrypt() reports failure through a zero return value
                    # rather than an exception, so check both.
                    try:
                        decrypt_result = pdf_reader.decrypt("")
                    except Exception:
                        decrypt_result = 0
                    if decrypt_result == 0:
                        print(f"PDF {os.path.basename(pdf_file_path)} is password-protected. Please provide an unprotected version.")
                        return []

                text_per_page = []
                for page_num, page in enumerate(pdf_reader.pages):
                    try:
                        # extract_text() may yield None; normalise to "".
                        text_per_page.append(page.extract_text() or "")
                    except Exception as e:
                        print(f"Could not extract text from page {page_num + 1} of {os.path.basename(pdf_file_path)}: {str(e)}")
                        text_per_page.append("")

                if any(text.strip() for text in text_per_page):
                    return text_per_page

                print(f"No extractable text found in {os.path.basename(pdf_file_path)}. This might be a scanned PDF or image-based document.")
                return []

        except Exception as e:
            error_msg = str(e)
            if "PyCryptodome" in error_msg:
                print(f"Encryption error with {os.path.basename(pdf_file_path)}: {error_msg}")
                print("💡 The PDF uses encryption. PyCryptodome has been installed to handle this.")
            elif "password" in error_msg.lower():
                print(f"Password-protected PDF: {os.path.basename(pdf_file_path)}")
                print("💡 Please provide an unprotected version of this PDF.")
            else:
                print(f"Error extracting text from {os.path.basename(pdf_file_path)}: {error_msg}")
            return []

    def process_documents(self, pdf_file_paths) -> "List[Document]":
        """Convert PDF files into documents with LLM-detected metadata.

        Args:
            pdf_file_paths: Paths (``str`` or path-like) of the files to ingest.
                Files whose name does not end in ``.pdf`` (any case) are
                counted as failed.

        Returns:
            One document per successfully read PDF, holding the full text and
            the metadata keys ``source``, ``university``, ``country``,
            ``document_type``, ``language``, ``upload_timestamp`` and
            ``file_id``.
        """
        documents = []
        processed_count = 0
        failed_count = 0

        print(f"🔄 Processing {len(pdf_file_paths)} document(s) with automatic metadata detection...")

        for pdf_file_path in pdf_file_paths:
            path_text = os.fspath(pdf_file_path)
            filename = os.path.basename(path_text)

            if not path_text.lower().endswith(".pdf"):
                failed_count += 1
                print(f"❌ Unsupported file type for {filename} (expected .pdf)")
                continue

            print(f"📄 Extracting text from: **{filename}**")

            text_per_page = self.extract_text_from_pdf(path_text)
            print(f"DEBUG: Extracted {len(text_per_page)} pages from {filename}")

            if not text_per_page:
                failed_count += 1
                print(f"⚠️ Could not extract text from **{filename}**")
                continue

            # Only the first two pages are sent to the LLM for metadata detection.
            text_for_metadata = "\n".join(text_per_page[:2])
            print(f"DEBUG: Text for metadata extraction (first 500 chars): {text_for_metadata[:500]}")

            print(f"🤖 Detecting metadata for: **{filename}**")
            extracted_metadata = self.sea_lion_llm.extract_metadata(text_for_metadata)

            metadata = {
                "source": filename,
                "university": extracted_metadata.get("university_name", "Unknown"),
                "country": extracted_metadata.get("country", "Unknown"),
                "document_type": extracted_metadata.get("document_type", "general_info"),
                "language": extracted_metadata.get("language", "Unknown"),
                "upload_timestamp": datetime.now().isoformat(),
                "file_id": str(uuid.uuid4()),
            }

            doc = Document(
                page_content="\n".join(text_per_page),
                metadata=metadata,
            )
            documents.append(doc)
            processed_count += 1
            print(f"✅ Successfully processed: **{filename}** ({len(doc.page_content)} characters)")

        if processed_count > 0:
            print(f"🎉 Successfully processed **{processed_count}** document(s)")
        if failed_count > 0:
            print(f"⚠️ Failed to process **{failed_count}** document(s)")

        return documents

    def create_vector_store(self, documents: "List[Document]") -> "Optional[Chroma]":
        """Chunk ``documents`` and persist them in a Chroma vector store.

        Returns:
            The created vector store, or ``None`` when ``documents`` is empty.
        """
        if not documents:
            print("No documents to process")
            return None

        texts = self.text_splitter.split_documents(documents)

        vectorstore = Chroma.from_documents(
            documents=texts,
            embedding=self.embeddings,
            persist_directory=self.persist_directory,
        )

        return vectorstore

    def load_existing_vectorstore(self) -> "Optional[Chroma]":
        """Open the vector store in ``persist_directory``; ``None`` on failure."""
        try:
            vectorstore = Chroma(
                persist_directory=self.persist_directory,
                embedding_function=self.embeddings,
            )
            return vectorstore
        except Exception as e:
            print(f"Could not load existing vector store: {str(e)}")
            return None
|
|
|
|
|
class RAGSystem:
    """Answer questions by retrieving document chunks and prompting SEA-LION."""

    def __init__(self):
        """Pick an embedding backend and create the LLM wrapper.

        Local Sentence Transformers embeddings are preferred; OpenAI
        embeddings are used when the local backend is unavailable.
        """
        embeddings = None
        try:
            candidate = AlternativeEmbeddings()
            if candidate.model is not None:
                embeddings = candidate
        except Exception as e:
            print(f"Local embeddings unavailable, falling back to OpenAI: {str(e)}")

        if embeddings is None:
            embeddings = OpenAIEmbeddings()
        self.embeddings = embeddings

        self.sea_lion_llm = SEALionLLM()
        self.persist_directory = os.getenv("CHROMA_PERSIST_DIRECTORY", "./chroma_db")

    def get_vectorstore(self) -> "Optional[Chroma]":
        """Open the persisted vector store; return ``None`` if that fails."""
        try:
            vectorstore = Chroma(
                persist_directory=self.persist_directory,
                embedding_function=self.embeddings,
            )
            return vectorstore
        except Exception as e:
            print(f"Error loading vector store: {str(e)}")
            return None

    def query(self, question: str, language: str = "English") -> Dict[str, Any]:
        """Retrieve relevant chunks for ``question`` and generate an answer.

        Args:
            question: The user question.
            language: Language the answer should be written in.

        Returns:
            A dict with ``answer``, ``source_documents``, ``query_id``,
            ``original_question`` and ``language``; successful results also
            carry ``model_used``. On failure ``query_id`` is ``None`` and
            ``source_documents`` is empty.
        """
        vectorstore = self.get_vectorstore()
        if vectorstore is None:
            # Report the real cause instead of an AttributeError further down.
            return {
                "answer": "Error processing your question: the vector store could not be loaded.",
                "source_documents": [],
                "query_id": None,
                "original_question": question,
                "language": language,
            }

        try:
            retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
            relevant_docs = retriever.get_relevant_documents(question)

            context_parts = []
            for i, doc in enumerate(relevant_docs, 1):
                source_info = doc.metadata.get("source", "Unknown")
                university = doc.metadata.get("university", "Unknown")
                country = doc.metadata.get("country", "Unknown")

                # Each chunk is capped at 500 characters to bound prompt size.
                context_parts.append(
                    f"\nDocument {i} (Source: {source_info}, University: {university}, Country: {country}):\n"
                    f"{doc.page_content[:500]}...\n"
                )

            context = "\n".join(context_parts)

            answer = self.sea_lion_llm.generate_response(
                query=question,
                context=context,
                language=language,
            )

            # Mirror the routing rule of generate_response: translation
            # queries always go to the instruct model.
            used_reasoning = (
                self.sea_lion_llm._is_complex_query(question)
                and not self.sea_lion_llm._is_translation_query(question)
            )

            return {
                "answer": answer,
                "source_documents": relevant_docs,
                "query_id": str(uuid.uuid4()),
                "original_question": question,
                "language": language,
                "model_used": "SEA-LION" + (" Reasoning" if used_reasoning else " Instruct"),
            }

        except Exception as e:
            print(f"Error querying system: {str(e)}")
            return {
                "answer": f"Error processing your question: {str(e)}",
                "source_documents": [],
                "query_id": None,
                "original_question": question,
                "language": language,
            }
|
|
|
|
|
def save_query_result(query_result: Dict[str, Any]):
    """Write a query result to ``query_results/<query_id>.json`` for sharing.

    Args:
        query_result: Result dict as produced by ``RAGSystem.query``. Entries
            of ``source_documents`` must expose ``metadata`` (a dict) and
            ``page_content`` (a string).

    Returns:
        True when the file was written; False when the result has no
        ``query_id`` or the file could not be written.
    """
    query_id = query_result.get("query_id")
    if not query_id:
        return False

    results_dir = "query_results"
    result_file = os.path.join(results_dir, f"{query_id}.json")

    def _preview(text):
        # Previews are capped at 200 characters, with an ellipsis when cut.
        return text[:200] + "..." if len(text) > 200 else text

    save_data = {
        "query_id": query_id,
        "question": query_result.get("original_question", ""),
        "answer": query_result.get("answer", ""),
        "language": query_result.get("language", "English"),
        "timestamp": datetime.now().isoformat(),
        "sources": [
            {
                "source": doc.metadata.get("source", "Unknown"),
                "university": doc.metadata.get("university", "Unknown"),
                "country": doc.metadata.get("country", "Unknown"),
                "content_preview": _preview(doc.page_content),
            }
            for doc in query_result.get("source_documents", [])
        ],
    }

    try:
        # Directory creation sits inside the try so a failure is reported as
        # False like any other write error.
        os.makedirs(results_dir, exist_ok=True)
        with open(result_file, "w", encoding="utf-8") as f:
            json.dump(save_data, f, indent=2, ensure_ascii=False)
        return True
    except Exception as e:
        print(f"Error saving query result: {str(e)}")
        return False
|
|
|
|
|
def load_shared_query(query_id: str) -> Optional[Dict[str, Any]]:
    """Load a previously saved query result by its id.

    Args:
        query_id: Identifier of the shared query (the file stem under
            ``query_results/``).

    Returns:
        The stored result dict, or ``None`` when the id is malformed, no such
        file exists, or the file cannot be read or parsed.
    """
    # The id ends up in a file path and may come from outside the app, so
    # accept only plain identifier characters (UUIDs qualify); this rejects
    # path separators and ".." traversal.
    if not isinstance(query_id, str) or not re.fullmatch(r"[A-Za-z0-9_-]+", query_id):
        return None

    result_file = os.path.join("query_results", f"{query_id}.json")

    try:
        with open(result_file, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # An unknown id is an expected outcome, not an error.
        return None
    except Exception as e:
        print(f"Error loading shared query: {str(e)}")
        return None
|
|
|