import streamlit as st
import pandas as pd
import numpy as np
import os
import tempfile
import traceback
import json
import re
import unicodedata
import requests
from pathlib import Path

import fitz  # PyMuPDF for PDF processing
from sentence_transformers import SentenceTransformer  # Hugging Face sentence transformer models
from sklearn.metrics.pairwise import cosine_similarity
from llama_index.core import Settings, VectorStoreIndex
from llama_index.core.vector_stores import SimpleVectorStore
from llama_index.core.schema import TextNode
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from mistralai import Mistral

# Optional dependency for DOI extraction
try:
    from pdf2doi import pdf2doi
    PDF2DOI_AVAILABLE = True
except ImportError:
    PDF2DOI_AVAILABLE = False

MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")
class SentenceEmbeddingApp:
    """
    Main application class for matching sentences using embeddings.
    This class handles:
    1. Loading corpus and theory data
    2. Transforming sentences into embeddings using Hugging Face models
    3. Storing embeddings in LlamaIndex and persistent storage
    4. Finding matches based on cosine similarity
    """
    def __init__(self):
        """
        Initialize the application with default settings and create storage directories.
        """
        # Model configuration
        self.model_name = "sentence-transformers/all-MiniLM-L6-v2"  # Hugging Face model ID
        self.embedding_dim = 384  # Dimension of the embedding vectors
        self.similarity_threshold = 0.75  # Default similarity threshold (0.0-1.0)
        # Initialize as None; populated later
        self.model = None  # Sentence transformer model
        self.corpus_df = None  # Corpus sentences DataFrame
        self.theory_df = None  # Theory sentences DataFrame
        self.vector_store = None  # LlamaIndex vector store
        self.index = None  # LlamaIndex index
        # Create persistent storage directories for Hugging Face Spaces
        self.data_dir = Path("data")  # Directory for raw data
        self.data_dir.mkdir(exist_ok=True)  # Create if it doesn't exist
        self.embeddings_dir = Path("embeddings")  # Directory for embeddings
        self.embeddings_dir.mkdir(exist_ok=True)  # Create if it doesn't exist
        # Check whether pdf2doi is available
        if not PDF2DOI_AVAILABLE:
            print("Note: pdf2doi library not installed. Using built-in DOI extraction.")
    def load_model(self):
        """
        Load the sentence transformer model from the Hugging Face Hub.
        The model converts sentences into fixed-dimensional vectors.
        """
        try:
            st.info(f"Loading sentence transformer model: {self.model_name}")
            # Downloads the model automatically if it is not already cached
            self.model = SentenceTransformer(self.model_name)
            st.success(f"Model {self.model_name} loaded successfully")
        except Exception as e:
            st.error(f"Error loading model: {e}")
    def process_paragraph(self, paragraph, headers, sentences_data, pdf_title):
        """
        Process a paragraph and extract sentences with their section headers.
        Args:
            paragraph: The paragraph text to process
            headers: List of current headers
            sentences_data: List to append sentence data to
            pdf_title: Title of the PDF for source reference
        """
        # Clean the paragraph
        clean_paragraph = sanitize_text(paragraph.strip())
        # Split into sentences on terminal punctuation followed by whitespace
        sentences = re.split(r'(?<=[.!?])\s+', clean_paragraph)
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
            # Basic filtering: sentences must have at least 2 words
            if len(sentence.split()) >= 2:
                # Create the header hierarchy string
                section_hierarchy = " > ".join([h for h in headers if h])
                # Add the sentence data
                sentences_data.append({
                    'sentence': sentence,
                    'section_hierarchy': section_hierarchy if section_hierarchy else "No Section",
                    'source': pdf_title
                })
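    # Illustrative example of the sentence split above (not part of the app flow):
    #     re.split(r'(?<=[.!?])\s+', "First point. Second point! A 3.5% rate is kept.")
    #     # -> ['First point.', 'Second point!', 'A 3.5% rate is kept.']
    # The lookbehind keeps the punctuation with each sentence, and decimals such
    # as "3.5" survive because the split also requires trailing whitespace.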
    def extract_sections_and_sentences(self, markdown_text, pdf_title):
        """
        Extract sections and sentences from markdown text.
        Args:
            markdown_text: The markdown text converted from PDF
            pdf_title: Title of the PDF for source reference
        Returns:
            List of dictionaries containing sentences and their section hierarchies
        """
        lines = markdown_text.split('\n')
        current_headers = []
        sentences_data = []
        current_paragraph = ""
        header_pattern = re.compile(r'^(#{1,6})\s+(.+)$')
        for line in lines:
            header_match = header_pattern.match(line)
            if header_match:
                # Process any accumulated paragraph
                if current_paragraph:
                    self.process_paragraph(current_paragraph, current_headers, sentences_data, pdf_title)
                    current_paragraph = ""
                # Update the current headers: truncate to the parent level, then
                # append the new header (after truncation the list is always
                # shorter than `level`, so a plain append is sufficient)
                level = len(header_match.group(1))
                header_text = header_match.group(2).strip()
                current_headers = current_headers[:level - 1]
                current_headers.append(header_text)
            # Skip empty lines and horizontal rules
            elif line.strip() and not line.strip().startswith('---') and not line.strip().startswith('***'):
                current_paragraph += " " + line.strip()
        # Process any remaining paragraph
        if current_paragraph:
            self.process_paragraph(current_paragraph, current_headers, sentences_data, pdf_title)
        return sentences_data
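    # Illustrative example (hypothetical input): for the markdown
    #     # Methods
    #     ## Sampling
    #     We sampled 30 sites.
    # the sentence "We sampled 30 sites." is recorded with the
    # section_hierarchy "Methods > Sampling".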
    def extract_sentences_from_pdf(self, pdf_file):
        """
        Extract sentences from a PDF file, with metadata enriched from its DOI.
        Uses Mistral OCR to convert the PDF to markdown and extract the section
        hierarchy, falling back to PyMuPDF text extraction when OCR fails.
        Args:
            pdf_file: Uploaded PDF file from Streamlit
        Returns:
            Tuple containing:
            - List of extracted sentences
            - List of metadata dictionaries for each sentence
        """
        sentences = []
        metadata_list = []
        doc = None
        tmp_path = None
        try:
            # Create a temporary file to process the PDF
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
                pdf_bytes = pdf_file.getvalue()
                tmp.write(pdf_bytes)
                tmp_path = tmp.name
            # First, try to extract a DOI from the PDF
            st.info("Extracting DOI from PDF...")
            doi = extract_doi_from_pdf(pdf_bytes)
            # Initialize the paper metadata
            paper_metadata = {
                "source": sanitize_text(pdf_file.name),  # Original filename, sanitized
                "doi": doi
            }
            # If a DOI was found, fetch additional metadata
            if doi:
                st.success(f"DOI found: {doi}")
                st.info("Retrieving metadata from Crossref API...")
                # Get metadata from the Crossref API
                raw_metadata = get_metadata(doi)
                if raw_metadata:
                    structured_metadata = extract_structured_metadata(raw_metadata)
                    # Update the paper metadata with Crossref data, sanitized for Excel
                    paper_metadata.update({
                        "title": sanitize_for_excel(structured_metadata.get("title", "")),
                        "authors": sanitize_for_excel(structured_metadata.get("author_names", "")),
                        "published_date": sanitize_for_excel(structured_metadata.get("published_date", "")),
                        "publisher": sanitize_for_excel(structured_metadata.get("publisher", "")),
                        "container_title": sanitize_for_excel(structured_metadata.get("container_title", "")),
                        "type": sanitize_for_excel(structured_metadata.get("type", "")),
                        "citation_count": structured_metadata.get("citation_count", 0)
                    })
                    # Display a success message with the paper title
                    st.success(f"Metadata retrieved: {paper_metadata['title']}")
                else:
                    st.warning("Could not retrieve metadata from Crossref.")
            else:
                st.warning("No DOI found in the PDF. Using basic metadata only.")
            # Convert the PDF to markdown using Mistral OCR
            st.info("Converting PDF to markdown using Mistral OCR...")
            markdown_text = ""
            try:
                mistral_api_key = MISTRAL_API_KEY
                if not mistral_api_key:
                    st.error("MISTRAL_API_KEY environment variable not set.")
                    raise ValueError("MISTRAL_API_KEY environment variable not set.")
                client = Mistral(api_key=mistral_api_key)
                # Upload the PDF to Mistral (the handle is closed when the block exits)
                with open(tmp_path, "rb") as pdf_handle:
                    uploaded_pdf = client.files.upload(
                        file={
                            "file_name": os.path.basename(tmp_path),
                            "content": pdf_handle,
                        },
                        purpose="ocr"
                    )
                # Get a signed URL for OCR processing
                signed_url = client.files.get_signed_url(file_id=uploaded_pdf.id)
                # Process the PDF with OCR
                ocr_response = client.ocr.process(
                    model="mistral-ocr-latest",
                    document={
                        "type": "document_url",
                        "document_url": signed_url.url,
                    }
                )
                # Extract markdown from all pages of the OCR response
                if hasattr(ocr_response, 'pages') and ocr_response.pages:
                    for page in ocr_response.pages:
                        if hasattr(page, 'markdown') and page.markdown:
                            markdown_text += page.markdown + "\n\n"  # Add spacing between pages
                    markdown_text = markdown_text.strip()  # Remove trailing whitespace
                if not markdown_text:
                    st.warning("No content extracted from PDF using Mistral OCR. Falling back to PyMuPDF.")
                else:
                    st.success(f"PDF successfully converted to markdown using Mistral OCR ({len(ocr_response.pages)} pages).")
            except Exception as e:
                st.warning(f"Error using Mistral OCR: {str(e)}. Falling back to PyMuPDF.")
                markdown_text = ""
            # If Mistral OCR succeeded, extract sections and sentences
            if markdown_text:
                # Fall back to the filename when no Crossref title is available
                doc_title = paper_metadata.get("title", paper_metadata["source"])
                sections_and_sentences = self.extract_sections_and_sentences(markdown_text, doc_title)
                # Process each sentence with its section hierarchy
                for sentence_data in sections_and_sentences:
                    sentences.append(sentence_data["sentence"])
                    # Create metadata for this sentence
                    sentence_metadata = paper_metadata.copy()  # Start with the paper metadata
                    sentence_metadata["sentence_source"] = sentence_data["section_hierarchy"]  # Section hierarchy as source
                    metadata_list.append(sentence_metadata)
            # If Mistral OCR failed or returned no content, fall back to PyMuPDF
            else:
                # Open the PDF with PyMuPDF
                doc = fitz.open(tmp_path)
                # Process each page in the PDF
                for page_num, page in enumerate(doc):
                    # Extract and clean the page text before splitting into sentences
                    text = sanitize_text(page.get_text())
                    # Simple sentence splitting by period
                    raw_sentences = [s.strip() for s in text.replace('\n', ' ').split('.') if s.strip()]
                    for sentence in raw_sentences:
                        # Basic filtering: sentences must have at least 2 words
                        if len(sentence.split()) >= 2:
                            # Final clean to ensure no problematic characters
                            clean_sentence = sanitize_text(sentence)
                            sentences.append(clean_sentence)
                            # Create metadata for this sentence
                            sentence_metadata = paper_metadata.copy()
                            sentence_metadata["page"] = page_num + 1  # Page number (1-indexed)
                            sentence_metadata["sentence_source"] = f"Page {page_num + 1}"  # Page number as fallback
                            metadata_list.append(sentence_metadata)
            # Show a summary
            st.success(f"Extracted {len(sentences)} sentences from PDF.")
        except Exception as e:
            st.error(f"Error processing PDF: {e}")
            st.error(traceback.format_exc())
        finally:
            # Ensure the document is closed before deleting the file
            if doc:
                try:
                    doc.close()
                except Exception as e:
                    st.error(f"Error closing document: {e}")
            # Clean up the temporary file
            if tmp_path and os.path.exists(tmp_path):
                try:
                    os.unlink(tmp_path)
                except Exception as e:
                    st.warning(f"Failed to remove temporary file: {e}")
        return sentences, metadata_list
    def _extract_rows_from_table(self, raw_df, file_name):
        """
        Extract sentences and per-sentence metadata from a tabular export
        (Scopus-style CSV/Excel with Title, Abstract, Author Keywords,
        Index Keywords, DOI, Authors, Year, and Source title columns).
        Args:
            raw_df: DataFrame read from the uploaded file
            file_name: Name of the uploaded file, used as the file source
        Returns:
            Tuple of parallel lists: (sentences, file_sources, dois,
            paper_titles, source_titles, authors_list, years, sentence_sources)
        """
        sentences = []
        file_sources = []
        dois = []
        paper_titles = []
        source_titles = []  # Journal/publication names
        authors_list = []
        years = []
        sentence_sources = []

        def add_entry(text, source_type, row_meta):
            """Append one sentence plus its aligned metadata columns."""
            sentences.append(text)
            file_sources.append(file_name)
            dois.append(row_meta['doi'])
            paper_titles.append(row_meta['title'])
            source_titles.append(row_meta['source_title'])
            authors_list.append(row_meta['authors'])
            years.append(row_meta['year'])
            sentence_sources.append(source_type)

        # Process each row
        for _, row in raw_df.iterrows():
            # Paper details shared by every sentence from this row
            row_meta = {
                'title': row.get('Title', ''),
                'doi': row.get('DOI', ''),
                'authors': row.get('Authors', ''),
                'year': row.get('Year', ''),
                'source_title': row.get('Source title', ''),  # Journal name
            }
            # Extract the title if available
            if 'Title' in raw_df.columns and pd.notna(row['Title']):
                add_entry(row['Title'], 'Title', row_meta)
            # Extract the abstract if available (simple split by periods)
            if 'Abstract' in raw_df.columns and pd.notna(row['Abstract']):
                for abs_sent in (s.strip() for s in str(row['Abstract']).split('.') if s.strip()):
                    add_entry(abs_sent, 'Abstract', row_meta)
            # Extract author keywords if available (semicolon-separated)
            if 'Author Keywords' in raw_df.columns and pd.notna(row['Author Keywords']):
                for keyword in str(row['Author Keywords']).split(';'):
                    if keyword.strip():
                        add_entry(keyword.strip(), 'Author Keywords', row_meta)
            # Extract index keywords if available (semicolon-separated)
            if 'Index Keywords' in raw_df.columns and pd.notna(row['Index Keywords']):
                for keyword in str(row['Index Keywords']).split(';'):
                    if keyword.strip():
                        add_entry(keyword.strip(), 'Index Keywords', row_meta)

        return (sentences, file_sources, dois, paper_titles,
                source_titles, authors_list, years, sentence_sources)

    def load_corpus(self, uploaded_files):
        """
        Load corpus data from multiple uploaded files.
        Each file's sentences are extracted and combined into one corpus.
        Args:
            uploaded_files: List of files uploaded through the Streamlit interface
        """
        try:
            # Combined lists across all uploaded files
            all_sentences = []
            all_metadata = []
            # Process each uploaded file
            for uploaded_file in uploaded_files:
                # Determine the file type from its extension
                file_extension = Path(uploaded_file.name).suffix.lower()
                # Process PDF files
                if file_extension == '.pdf':
                    # Extract sentences and DOI-enriched metadata
                    sentences, metadata_list = self.extract_sentences_from_pdf(uploaded_file)
                    all_sentences.extend(sentences)
                    all_metadata.extend(metadata_list)
                    st.success(f"Extracted {len(sentences)} sentences from PDF: {uploaded_file.name}")
                # Process CSV and Excel files (both share the same tabular structure)
                elif file_extension in ['.csv', '.xlsx', '.xls']:
                    try:
                        if file_extension == '.csv':
                            raw_df = pd.read_csv(uploaded_file)
                        else:
                            raw_df = pd.read_excel(uploaded_file)
                        (sentences, file_sources, dois, paper_titles, source_titles,
                         authors_list, years, sentence_sources) = self._extract_rows_from_table(raw_df, uploaded_file.name)
                        # Add to the combined lists
                        all_sentences.extend(sentences)
                        all_metadata.extend([{
                            'doi': doi,
                            'title': title,
                            'container_title': source,
                            'authors': authors,
                            'published_date': str(year) if year else '',
                            'source': uploaded_file.name,
                            'sentence_source': source_type
                        } for doi, title, source, authors, year, source_type in zip(
                            dois, paper_titles, source_titles, authors_list, years, sentence_sources)])
                        # Build the per-file corpus DataFrame with separate columns
                        self.corpus_df = pd.DataFrame({
                            'sentence': sentences,
                            'doi': dois,
                            'paper_title': paper_titles,
                            'source_title': source_titles,  # Journal/publication name
                            'authors': authors_list,
                            'year': years,
                            'file_source': file_sources,
                            'sentence_source': sentence_sources
                        })
                        # Save a copy to persistent storage
                        corpus_path = (self.data_dir / f"processed_corpus_{uploaded_file.name}").with_suffix('.xlsx')
                        self.corpus_df.to_excel(corpus_path, index=False)
                        st.success(f"Extracted {len(sentences)} sentences from {uploaded_file.name} and saved to {corpus_path}")
                    except Exception as table_e:
                        st.error(f"Error processing {uploaded_file.name}: {table_e}")
                else:
                    st.warning(f"Skipping unsupported file format: {file_extension}. Please upload PDF, Excel, or CSV files.")
                    continue
            # Create a combined DataFrame from all extracted sentences
            if all_sentences:
                # Process the data to ensure it is safe for Excel
                data = {
                    'sentence': [sanitize_text(s) for s in all_sentences],
                    'doi': [sanitize_text(m.get('doi', '')) for m in all_metadata],
                    'paper_title': [sanitize_text(m.get('title', '')) for m in all_metadata],
                    'source_title': [sanitize_text(m.get('container_title', '')) for m in all_metadata],
                    'authors': [sanitize_text(m.get('authors', '')) for m in all_metadata],
                    'year': [sanitize_text(m.get('published_date', '')[:4]) if m.get('published_date') else '' for m in all_metadata],
                    'file_source': [sanitize_text(m.get('source', '')) for m in all_metadata],
                    'sentence_source': [sanitize_text(m.get('sentence_source', 'PDF Text')) for m in all_metadata],
                    'page': [m.get('page', '') for m in all_metadata]
                }
                self.corpus_df = pd.DataFrame(data)
                try:
                    # Save a copy to persistent storage
                    corpus_path = self.data_dir / "combined_corpus.xlsx"
                    self.corpus_df.to_excel(corpus_path, index=False)
                    st.success(f"Combined {len(all_sentences)} sentences from {len(uploaded_files)} files and saved to {corpus_path}")
                    # Display a preview of the loaded data
                    st.write("First few rows of the combined corpus:")
                    st.write(self.corpus_df.head())
                except Exception as save_e:
                    st.error(f"Error saving to Excel: {save_e}")
                    # Fall back to CSV, which is more robust
                    csv_path = self.data_dir / "combined_corpus.csv"
                    self.corpus_df.to_csv(csv_path, index=False)
                    st.warning(f"Saved as CSV instead at {csv_path} due to Excel compatibility issues")
            else:
                st.error("No valid sentences extracted from the uploaded files.")
        except Exception as e:
            st.error(f"Error loading corpus: {e}")
            st.error(traceback.format_exc())
    def load_theory(self, uploaded_file):
        """
        Load theory sentences from Excel or CSV.
        These are the sentences we want to match against the corpus.
        Args:
            uploaded_file: File uploaded through the Streamlit interface
        """
        try:
            # Determine the file type from its extension
            file_extension = Path(uploaded_file.name).suffix.lower()
            # Process Excel files
            if file_extension in ['.xlsx', '.xls']:
                self.theory_df = pd.read_excel(uploaded_file)
                # Save a copy to persistent storage
                theory_path = self.data_dir / f"theory_{uploaded_file.name}"
                self.theory_df.to_excel(theory_path, index=False)
                st.success(f"Loaded {len(self.theory_df)} rows from theory Excel file and saved to {theory_path}")
            # Process CSV files
            elif file_extension == '.csv':
                self.theory_df = pd.read_csv(uploaded_file)
                # Save a copy to persistent storage
                theory_path = self.data_dir / f"theory_{uploaded_file.name}"
                self.theory_df.to_csv(theory_path, index=False)
                st.success(f"Loaded {len(self.theory_df)} rows from theory CSV file and saved to {theory_path}")
            # Handle unsupported file types
            else:
                st.error("Unsupported file format for theory data. Please upload an Excel or CSV file.")
                return
            # Display a preview of the loaded data
            st.write("First few rows of the theory data:")
            st.write(self.theory_df.head())
        except Exception as e:
            st.error(f"Error loading theory data: {e}")
    def save_embeddings(self, corpus_nodes, theory_nodes):
        """
        Save embeddings to persistent storage on the Hugging Face Space.
        This allows reusing embeddings without recomputing them.
        Args:
            corpus_nodes: List of TextNode objects for corpus sentences
            theory_nodes: List of TextNode objects for theory sentences
        Returns:
            bool: True if saving was successful, False otherwise
        """
        try:
            # Prepare corpus embeddings for serialization
            # (embeddings are stored as plain lists, so they are JSON-serializable)
            corpus_data = []
            for node in corpus_nodes:
                corpus_data.append({
                    "text": node.text,
                    "metadata": node.metadata,
                    "embedding": node.embedding
                })
            # Prepare theory embeddings for serialization
            theory_data = []
            for node in theory_nodes:
                theory_data.append({
                    "text": node.text,
                    "metadata": node.metadata,
                    "embedding": node.embedding
                })
            # Define paths for saving the embeddings
            corpus_emb_path = self.embeddings_dir / "corpus_embeddings.json"
            theory_emb_path = self.embeddings_dir / "theory_embeddings.json"
            # Save corpus embeddings to JSON
            with open(corpus_emb_path, 'w') as f:
                json.dump(corpus_data, f)
            # Save theory embeddings to JSON
            with open(theory_emb_path, 'w') as f:
                json.dump(theory_data, f)
            st.success(f"Saved embeddings to {self.embeddings_dir}")
            return True
        except Exception as e:
            st.error(f"Error saving embeddings: {e}")
            return False
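    # Each serialized record has this shape (illustrative values):
    #     {
    #         "text": "Some corpus sentence.",
    #         "metadata": {"doi": "10.1000/example.123", "sentence_source": "Abstract"},
    #         "embedding": [0.012, -0.034, ...]  # 384 floats for all-MiniLM-L6-v2
    #     }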
    def load_embeddings(self):
        """
        Load previously saved embeddings from persistent storage.
        This avoids recomputing embeddings for the same data.
        Returns:
            Tuple containing:
            - List of corpus TextNode objects (or None if loading fails)
            - List of theory TextNode objects (or None if loading fails)
        """
        try:
            # Define paths for loading the embeddings
            corpus_emb_path = self.embeddings_dir / "corpus_embeddings.json"
            theory_emb_path = self.embeddings_dir / "theory_embeddings.json"
            # Check that the embedding files exist
            if not corpus_emb_path.exists() or not theory_emb_path.exists():
                st.warning("No saved embeddings found.")
                return None, None
            # Load corpus embeddings from JSON
            with open(corpus_emb_path, 'r') as f:
                corpus_data = json.load(f)
            # Load theory embeddings from JSON
            with open(theory_emb_path, 'r') as f:
                theory_data = json.load(f)
            # Recreate the corpus TextNode objects (embeddings stay plain lists
            # of floats, which is the form TextNode expects)
            corpus_nodes = []
            for item in corpus_data:
                corpus_nodes.append(TextNode(
                    text=item["text"],
                    metadata=item["metadata"],
                    embedding=item["embedding"]
                ))
            # Recreate the theory TextNode objects
            theory_nodes = []
            for item in theory_data:
                theory_nodes.append(TextNode(
                    text=item["text"],
                    metadata=item["metadata"],
                    embedding=item["embedding"]
                ))
            st.success(f"Loaded {len(corpus_nodes)} corpus embeddings and {len(theory_nodes)} theory embeddings from {self.embeddings_dir}")
            return corpus_nodes, theory_nodes
        except Exception as e:
            st.error(f"Error loading embeddings: {e}")
            return None, None
    def create_embeddings(self, use_cached=False):
        """
        Create embeddings for corpus and theory sentences and store them in LlamaIndex.
        Uses the Hugging Face sentence transformer model to generate embeddings.
        Args:
            use_cached: Whether to try loading cached embeddings first
        """
        try:
            # Try to load cached embeddings if requested
            if use_cached:
                corpus_nodes, theory_nodes = self.load_embeddings()
                if corpus_nodes and theory_nodes:
                    # Cached embeddings were successfully loaded
                    self.corpus_nodes = corpus_nodes
                    self.theory_nodes = theory_nodes
                    # Initialize the vector store with the loaded embeddings
                    all_nodes = corpus_nodes + theory_nodes
                    self.vector_store = SimpleVectorStore()
                    # Set the embedding model in the global settings and create the index
                    Settings.embed_model = HuggingFaceEmbedding(model_name=self.model_name, trust_remote_code=True)
                    self.index = VectorStoreIndex(all_nodes)
                    return
            # Check that data is loaded
            if self.corpus_df is None or self.theory_df is None:
                st.error("Please load both corpus and theory data first.")
                return
            # Load the model if it is not already loaded
            if self.model is None:
                self.load_model()
            st.info("Creating embeddings for corpus and theory sentences...")
            # Initialize the vector store for storing embeddings
            self.vector_store = SimpleVectorStore()
            # Create status containers for progress reporting
            corpus_status = st.empty()
            theory_status = st.empty()
            corpus_progress = st.progress(0)
            theory_progress = st.progress(0)
            # Process corpus sentences
            corpus_nodes = []
            total_corpus = len(self.corpus_df)
            corpus_status.text(f"Processing corpus sentences: 0/{total_corpus}")
            # enumerate() gives a positional counter even if the DataFrame
            # index is not a plain RangeIndex
            for i, (_, row) in enumerate(self.corpus_df.iterrows()):
                # Update the progress roughly every 10 sentences
                if i % 10 == 0 or i == total_corpus - 1:
                    progress_pct = min(i / total_corpus, 1.0)
                    corpus_progress.progress(progress_pct)
                    corpus_status.text(f"Processing corpus sentences: {i+1}/{total_corpus} ({int(progress_pct*100)}%)")
                # Get the sentence text
                sentence = row['sentence']
                # Create a metadata dictionary from all columns except 'sentence'
                metadata = {col: row[col] for col in self.corpus_df.columns if col != 'sentence'}
                # Create a text node with an embedding from the Hugging Face model
                node = TextNode(
                    text=sentence,
                    metadata=metadata,
                    # Generate the embedding for the sentence
                    embedding=self.model.encode(sentence).tolist()
                )
                corpus_nodes.append(node)
            corpus_status.text(f"Completed processing all {total_corpus} corpus sentences")
            corpus_progress.progress(1.0)
            # Process theory sentences
            theory_nodes = []
            total_theory = len(self.theory_df)
            theory_status.text(f"Processing theory sentences: 0/{total_theory}")
            for i, (_, row) in enumerate(self.theory_df.iterrows()):
                # Update the progress roughly every 10 sentences
                if i % 10 == 0 or i == total_theory - 1:
                    progress_pct = min(i / total_theory, 1.0)
                    theory_progress.progress(progress_pct)
                    theory_status.text(f"Processing theory sentences: {i+1}/{total_theory} ({int(progress_pct*100)}%)")
                # Get the sentence text
                sentence = row['sentence']
                # Create metadata from all other columns
                metadata = {col: row[col] for col in self.theory_df.columns if col != 'sentence'}
                # Add a theory indicator
                metadata["type"] = "theory"
                # Create a text node with an embedding from the Hugging Face model
                node = TextNode(
                    text=sentence,
                    metadata=metadata,
                    embedding=self.model.encode(sentence).tolist()
                )
                theory_nodes.append(node)
            theory_status.text(f"Completed processing all {total_theory} theory sentences")
            theory_progress.progress(1.0)
            # Combine all nodes for the vector index
            all_nodes = corpus_nodes + theory_nodes
            # Create the index for vector search, using the Hugging Face embedding model explicitly
            st.info("Creating vector index for similarity search...")
            Settings.embed_model = HuggingFaceEmbedding(model_name=self.model_name, trust_remote_code=True)
            self.index = VectorStoreIndex(all_nodes)
            # Save the embeddings to persistent storage
            st.info("Saving embeddings to persistent storage...")
            if self.save_embeddings(corpus_nodes, theory_nodes):
                st.success(f"Successfully created and saved embeddings for {len(all_nodes)} sentences")
            # Keep the nodes in memory for direct similarity calculations
            self.corpus_nodes = corpus_nodes
            self.theory_nodes = theory_nodes
        except Exception as e:
            st.error(f"Error creating embeddings: {e}")
            st.error(traceback.format_exc())  # Full stack trace for debugging
    def find_matches(self):
        """
        Find matches between corpus and theory sentences whose
        cosine similarity is above the threshold.
        """
        try:
            # Check whether embeddings have been created in memory
            if not getattr(self, 'corpus_nodes', None) or not getattr(self, 'theory_nodes', None):
                # Try to load embeddings from disk
                st.info("Loading embeddings from disk...")
                corpus_nodes, theory_nodes = self.load_embeddings()
                # If loading failed, show an error
                if not corpus_nodes or not theory_nodes:
                    st.error("Please create embeddings first.")
                    return
                # Store the loaded embeddings in memory
                self.corpus_nodes = corpus_nodes
                self.theory_nodes = theory_nodes
            st.info(f"Finding matches with similarity threshold > {self.similarity_threshold}...")
            # Extract the embeddings from the nodes into numpy arrays for efficient computation
            corpus_embeddings = np.array([node.embedding for node in self.corpus_nodes])
            theory_embeddings = np.array([node.embedding for node in self.theory_nodes])
            # Compute the cosine similarity matrix between all corpus and theory
            # sentences at once (shape: n_corpus x n_theory)
            similarity_matrix = cosine_similarity(corpus_embeddings, theory_embeddings)
            # Collect all (corpus, theory) index pairs above the threshold
            matches = []
            for i, j in np.argwhere(similarity_matrix > self.similarity_threshold):
                matches.append({
                    'corpus_sentence': self.corpus_nodes[i].text,
                    'theory_sentence': self.theory_nodes[j].text,
                    'similarity_score': similarity_matrix[i, j],
                    'corpus_metadata': self.corpus_nodes[i].metadata,
                    'theory_metadata': self.theory_nodes[j].metadata
                })
            # Create a DataFrame sorted by similarity score (highest first)
            matches_df = pd.DataFrame(matches)
            if not matches_df.empty:
                matches_df = matches_df.sort_values(by='similarity_score', ascending=False)
                # Display the matches
                st.success(f"Found {len(matches_df)} matches above similarity threshold {self.similarity_threshold}")
                st.write(matches_df)
                # Save the matches to persistent storage
                matches_path = self.data_dir / f"matches_threshold_{self.similarity_threshold}.csv"
                matches_df.to_csv(matches_path, index=False)
                # Offer a downloadable CSV
                csv = matches_df.to_csv(index=False)
                st.download_button(
                    label="Download matches as CSV",
                    data=csv,
                    file_name="similarity_matches.csv",
                    mime="text/csv"
                )
                st.info(f"Matches saved to {matches_path}")
            else:
                st.warning(f"No matches found above similarity threshold {self.similarity_threshold}")
        except Exception as e:
            st.error(f"Error finding matches: {e}")
            st.error(traceback.format_exc())
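    # Cosine similarity refresher (illustrative, not part of the app flow):
    # for unit-length vectors it reduces to the dot product, e.g.
    #     cosine_similarity(np.array([[1.0, 0.0]]), np.array([[0.6, 0.8]]))
    #     # -> array([[0.6]])
    # Scores near 1.0 indicate near-identical meaning; unrelated sentences
    # typically score far below the 0.75 default threshold.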
    def adjust_threshold(self, new_threshold):
        """
        Adjust the similarity threshold for matching.
        Args:
            new_threshold: New threshold value (0.0-1.0)
        """
        self.similarity_threshold = new_threshold
        st.success(f"Similarity threshold updated to {self.similarity_threshold}")
    def run_app(self):
        """
        Main application function that creates the Streamlit interface
        and handles the application flow.
        """
        # Set up the main page
        st.title("Document Similarity Matcher")
        st.write("Match sentences from your corpus with theory sentences using sentence embeddings")
        # Sidebar with model information
        st.sidebar.header("Model Information")
        st.sidebar.info(f"Using model: {self.model_name}\nEmbedding dimensions: {self.embedding_dim}")
        # Load model button
        if st.sidebar.button("Load Model"):
            self.load_model()
        # Step 1: Corpus data upload
        st.header("1. Upload Corpus Data")
        st.write("Upload a file containing sentences to analyze (PDF, Excel, or CSV)")
        corpus_files = st.file_uploader("Upload corpus file (PDF, Excel, CSV)", type=["pdf", "xlsx", "xls", "csv"], accept_multiple_files=True)
        if corpus_files:
            st.info(f"Loaded {len(corpus_files)} files.")
            # Only process once the user explicitly clicks the button
            if st.button("Process Corpus Files"):
                self.load_corpus(corpus_files)
        # Step 2: Theory data upload
        st.header("2. Upload Theory Data")
        st.write("Upload a file containing theory sentences to match against (Excel or CSV)")
        theory_file = st.file_uploader("Upload theory file (Excel, CSV)", type=["xlsx", "xls", "csv"])
        if theory_file is not None:
            self.load_theory(theory_file)
        # Step 3: Create embeddings
        st.header("3. Create Embeddings")
        st.write("Transform sentences into vector embeddings using the sentence transformer model")
        use_cached = st.checkbox("Use cached embeddings if available", value=True)
        if st.button("Create Embeddings"):
            self.create_embeddings(use_cached=use_cached)
        # Step 4: Set similarity threshold
        st.header("4. Set Similarity Threshold")
        st.write("Adjust the minimum similarity score required for a match (0.0-1.0)")
        new_threshold = st.slider("Similarity threshold", min_value=0.0, max_value=1.0, value=0.75, step=0.01)
        if new_threshold != self.similarity_threshold:
            self.adjust_threshold(new_threshold)
        # Step 5: Find matches
        st.header("5. Find Matches")
        st.write("Find corpus sentences that match theory sentences above the similarity threshold")
        if st.button("Find Matches"):
            self.find_matches()
def extract_doi_from_text(text):
    """
    Extract a DOI from text using regex pattern matching.
    This serves as a fallback when pdf2doi is not available.
    Args:
        text (str): Text to search for a DOI
    Returns:
        str: The DOI if found, empty string if not found
    """
    # Common DOI patterns, from most to least specific
    doi_patterns = [
        r'(?:doi|DOI):\s*(10\.\d{4,}(?:\.\d+)*\/\S+)',
        r'(?:https?://)?(?:dx\.)?doi\.org/(10\.\d{4,}(?:\.\d+)*\/\S+)',
        r'(10\.\d{4,}(?:\.\d+)*\/\S+)'
    ]
    # Search for a DOI using each pattern
    for pattern in doi_patterns:
        match = re.search(pattern, text)
        if match:
            # Clean up the DOI by removing trailing punctuation
            return match.group(1).rstrip('.,;:')
    return ""
def extract_doi_from_pdf(pdf_bytes):
    """
    Extract a DOI from a PDF file.
    Args:
        pdf_bytes: PDF file as bytes
    Returns:
        str: The DOI if found, empty string if not found or an error occurred
    """
    tmp_path = None
    try:
        # Write the PDF to a temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
            tmp.write(pdf_bytes)
            tmp_path = tmp.name
        # Try using pdf2doi if available
        if PDF2DOI_AVAILABLE:
            try:
                result = pdf2doi(tmp_path)
                if result and result.get('identifier'):
                    return result['identifier']
            except Exception as e:
                st.warning(f"pdf2doi error: {e}. Falling back to text-based extraction.")
        # Fallback: extract the DOI from the text of the first few pages
        doc = fitz.open(tmp_path)
        try:
            text = ""
            for page_num in range(min(5, len(doc))):
                text += doc[page_num].get_text()
        finally:
            doc.close()
        # Try to find a DOI in the extracted text
        return extract_doi_from_text(text)
    except Exception as e:
        st.error(f"Error extracting DOI: {str(e)}")
        return ""
    finally:
        # Clean up the temporary file on every path
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
def get_metadata(doi):
    """
    Retrieve metadata for a DOI from the Crossref API.
    Args:
        doi (str): DOI string to look up
    Returns:
        dict: Raw metadata response, or None if the request failed
    """
    url = f"https://api.crossref.org/works/{doi}"
    headers = {
        "Accept": "application/json",
        "User-Agent": "DataNova/1.0 (mailto:your.email@example.com)"
    }
    try:
        # A timeout keeps a slow Crossref response from hanging the app
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code == 200:
            return response.json()
        st.warning(f"Failed to retrieve metadata: HTTP {response.status_code}")
        return None
    except Exception as e:
        st.warning(f"Error retrieving metadata: {str(e)}")
        return None
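# Shape of a successful Crossref response (heavily abridged, illustrative values):
#     {
#         "status": "ok",
#         "message": {
#             "DOI": "10.1000/example.123",
#             "title": ["An Example Article"],
#             "container-title": ["Journal of Examples"],
#             "author": [{"given": "Ada", "family": "Lovelace"}],
#             "published": {"date-parts": [[2021, 5, 3]]},
#             "is-referenced-by-count": 42
#         }
#     }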
def extract_structured_metadata(metadata):
    """
    Extract relevant fields from the Crossref metadata response.
    Args:
        metadata (dict): Raw metadata from the Crossref API
    Returns:
        dict: Structured metadata with key fields
    """
    if not metadata or 'message' not in metadata:
        return {}
    work = metadata.get('message', {})
    # Extract basic information
    structured_data = {
        'doi': work.get('DOI', ''),
        'title': work.get('title', [''])[0] if work.get('title') else '',
        'container_title': work.get('container-title', [''])[0] if work.get('container-title') else '',
        'publisher': work.get('publisher', ''),
        'type': work.get('type', ''),
        'volume': work.get('volume', ''),
        'issue': work.get('issue', ''),
        'page': work.get('page', ''),
        'article_number': work.get('article-number', ''),
        'published_date': '',
        'reference_count': work.get('reference-count', 0),
        'citation_count': work.get('is-referenced-by-count', 0),
        'abstract': work.get('abstract', '')
    }
    # Extract the publication date
    if 'published' in work and 'date-parts' in work['published'] and work['published']['date-parts']:
        date_parts = work['published']['date-parts'][0]
        if len(date_parts) >= 3:
            structured_data['published_date'] = f"{date_parts[0]}-{date_parts[1]:02d}-{date_parts[2]:02d}"
        elif len(date_parts) == 2:
            structured_data['published_date'] = f"{date_parts[0]}-{date_parts[1]:02d}"
        elif len(date_parts) == 1:
            structured_data['published_date'] = f"{date_parts[0]}"
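    # Example: date-parts [[2021, 5, 3]] -> "2021-05-03",
    # [[2021, 5]] -> "2021-05", [[2021]] -> "2021".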
    # Extract authors
    authors = work.get('author', [])
    structured_data['authors'] = []
    structured_data['author_names'] = []
    for author in authors:
        author_data = {
            'given': author.get('given', ''),
            'family': author.get('family', ''),
            'orcid': author.get('ORCID', ''),
            'sequence': author.get('sequence', '')
        }
        structured_data['authors'].append(author_data)
        structured_data['author_names'].append(f"{author.get('given', '')} {author.get('family', '')}")
    # Convert author_names to a string for DataFrame compatibility
    structured_data['author_names'] = '; '.join(structured_data['author_names'])
    # Extract funding information
    funders = work.get('funder', [])
    structured_data['funders'] = []
    structured_data['funder_names'] = []
    for funder in funders:
        funder_data = {
            'name': funder.get('name', ''),
            'doi': funder.get('DOI', ''),
            'award': funder.get('award', [])
        }
        structured_data['funders'].append(funder_data)
        structured_data['funder_names'].append(funder.get('name', ''))
    # Convert funder_names to a string for DataFrame compatibility
    structured_data['funder_names'] = '; '.join(structured_data['funder_names'])
    return structured_data
def sanitize_for_excel(text):
    """
    Sanitize text to be Excel-friendly, keeping only alphanumeric and basic characters.
    Args:
        text (str): The input text to sanitize
    Returns:
        str: Sanitized text safe for Excel
    """
    # First sanitize the text to remove problematic characters
    text = sanitize_text(text)
    if not text:
        return "Unnamed"
    # Replace common problematic characters with simple alternatives
    result = text.replace(':', '').replace('/', ' ').replace('\\', ' ')
    result = result.replace('[', '').replace(']', '')
    result = result.replace('*', '').replace('?', '')
    result = result.replace(',', ' ').replace(';', ' ')
    # Keep only alphanumeric characters, spaces, periods, parentheses, hyphens, and underscores
    allowed_chars = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 .-()_')
    result = ''.join(c for c in result if c in allowed_chars)
    # Trim excess whitespace
    result = ' '.join(result.split())
    # Note: cell values are not truncated; Excel's 31-character limit only
    # applies to worksheet names, which this function does not produce
    # Ensure the text is not empty after sanitizing
    if not result.strip():
        result = "Unnamed"
    return result
def sanitize_text(text):
    """
    Clean text by removing problematic characters and normalizing Unicode.
    Args:
        text: The text to clean
    Returns:
        str: Cleaned text without non-printable or problematic characters
    """
    if not isinstance(text, str):
        return str(text)
    # Normalize to NFKD form to decompose diacritical marks and special characters
    text = unicodedata.normalize('NFKD', text)
    # Remove common problematic non-printable characters
    text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]', '', text)
    # Replace other characters that are problematic for Excel
    text = text.replace('\u2013', '-')    # en dash
    text = text.replace('\u2014', '-')    # em dash
    text = text.replace('\u2018', "'")    # left single quote
    text = text.replace('\u2019', "'")    # right single quote
    text = text.replace('\u201C', '"')    # left double quote
    text = text.replace('\u201D', '"')    # right double quote
    text = text.replace('\u2026', '...')  # ellipsis
    # Remove Unicode private use area characters
    text = re.sub(r'[\uE000-\uF8FF]', '', text)
    text = re.sub(r'[\U000F0000-\U000FFFFD]', '', text)
    text = re.sub(r'[\U00100000-\U0010FFFD]', '', text)
    # Convert to ASCII, dropping non-convertible characters (aggressive cleaning)
    text = text.encode('ascii', errors='ignore').decode('ascii')
    # Remove any remaining control characters
    text = ''.join(ch for ch in text if unicodedata.category(ch)[0] != 'C')
    return text
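# Illustrative before/after (hypothetical input):
#     sanitize_text('Smart \u201cquotes\u201d \u2013 caf\u00e9')  # -> 'Smart "quotes" - cafe'
# NFKD decomposes 'é' into 'e' plus a combining accent, and the ASCII pass
# drops the accent, so accented letters survive as their base characters.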
def main():
    """
    Entry point for the application.
    Creates an instance of SentenceEmbeddingApp and runs it.
    """
    app = SentenceEmbeddingApp()
    app.run_app()

# Standard Python idiom: run main() only when this file is executed directly
if __name__ == "__main__":
    main()