import streamlit as st
import pandas as pd
import numpy as np
import os
import re
import json
import tempfile
import traceback
import requests
from pathlib import Path
from typing import List, Dict, Tuple, Any, Optional
import fitz # PyMuPDF for PDF processing
from sentence_transformers import SentenceTransformer # Hugging Face sentence transformer models
from sklearn.metrics.pairwise import cosine_similarity
from huggingface_hub import HfApi, Repository
from huggingface_hub.utils import HfHubHTTPError
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.vector_stores import SimpleVectorStore
from llama_index.core.schema import TextNode
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from mistralai import Mistral
# Add imports for DOI extraction
try:
from pdf2doi import pdf2doi
PDF2DOI_AVAILABLE = True
except ImportError:
PDF2DOI_AVAILABLE = False
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")
class SentenceEmbeddingApp:
"""
Main application class for matching sentences using embeddings.
This class handles:
1. Loading corpus and theory data
2. Transforming sentences into embeddings using Hugging Face models
3. Storing embeddings in LlamaIndex and persistent storage
4. Finding matches based on cosine similarity
"""
def __init__(self):
"""
Initialize application with default settings and create storage directories.
"""
# Model configuration
self.model_name = "sentence-transformers/all-MiniLM-L6-v2" # HuggingFace model ID
self.embedding_dim = 384 # Dimension size for the embeddings
self.similarity_threshold = 0.75 # Default similarity threshold (0.0-1.0)
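# Cosine similarity between two embeddings a and b is dot(a, b) / (||a|| * ||b||), ranging
# from -1 to 1 (1 = same direction). Rough illustration of how the threshold is used
# (scores below are made up for illustration only):
#   sim("firms adopt green practices", "companies implement sustainability measures") ~ 0.8 -> match
#   sim("firms adopt green practices", "the dataset covers 2010-2020")               ~ 0.1 -> no match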
# Initialize as None, will be populated later
self.model = None # Will hold the sentence transformer model
self.corpus_df = None # Will hold corpus sentences dataframe
self.theory_df = None # Will hold theory sentences dataframe
self.vector_store = None # Will hold LlamaIndex vector store
self.index = None # Will hold LlamaIndex index
# Create persistent storage directories for Hugging Face Spaces
self.data_dir = Path("data") # Directory for raw data
self.data_dir.mkdir(exist_ok=True) # Create if it doesn't exist
self.embeddings_dir = Path("embeddings") # Directory for embeddings
self.embeddings_dir.mkdir(exist_ok=True) # Create if it doesn't exist
# Check if pdf2doi is available
if not PDF2DOI_AVAILABLE:
print("Note: pdf2doi library not installed. Using built-in DOI extraction.")
def load_model(self):
"""
Load the sentence transformer model from Hugging Face Hub.
The model is used to convert sentences into fixed-dimensional vectors.
"""
try:
st.info(f"Loading sentence transformer model: {self.model_name}")
# Load model directly from Hugging Face Hub
# This automatically downloads the model if not cached
self.model = SentenceTransformer(self.model_name)
st.success(f"Model {self.model_name} loaded successfully")
except Exception as e:
st.error(f"Error loading model: {e}")
def process_paragraph(self, paragraph, headers, sentences_data, pdf_title):
"""
Process a paragraph and extract sentences with their section headers
Args:
paragraph: The paragraph text to process
headers: List of current headers
sentences_data: List to append sentence data to
pdf_title: Title of the PDF for source reference
"""
# Clean paragraph
clean_paragraph = sanitize_text(paragraph.strip())
# Split into sentences
sentences = re.split(r'(?<=[.!?])\s+', clean_paragraph)
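# Illustration of the split pattern: it breaks on whitespace that follows ., ! or ?, e.g.
#   "Results were strong. However, limits remain! Why?" -> ["Results were strong.", "However, limits remain!", "Why?"]
# Abbreviations such as "e.g." or "Fig. 2" are not handled specially and may cause extra splits.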
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
# Basic filtering - sentences must have at least 2 words
if len(sentence.split()) >= 2:
# Create header hierarchy string
section_hierarchy = " > ".join([h for h in headers if h])
# Add sentence data
sentences_data.append({
'sentence': sentence,
'section_hierarchy': section_hierarchy if section_hierarchy else "No Section",
'source': pdf_title
})
def extract_sections_and_sentences(self, markdown_text, pdf_title):
"""
Extract sections and sentences from markdown text
Args:
markdown_text: The markdown text converted from PDF
pdf_title: Title of the PDF for source reference
Returns:
List of dictionaries containing sentences and their section hierarchies
"""
lines = markdown_text.split('\n')
current_headers = []
sentences_data = []
current_paragraph = ""
header_pattern = re.compile(r'^(#{1,6})\s+(.+)$')
for line in lines:
header_match = header_pattern.match(line)
if header_match:
# Process any accumulated paragraph
if current_paragraph:
self.process_paragraph(current_paragraph, current_headers, sentences_data, pdf_title)
current_paragraph = ""
# Update current headers
level = len(header_match.group(1))
header_text = header_match.group(2).strip()
# Keep only headers above the current level, then record the new heading
current_headers = current_headers[:level-1]
current_headers.append(header_text)
# Skip empty lines or formatting
elif line.strip() and not line.strip().startswith('---') and not line.strip().startswith('***'):
current_paragraph += " " + line.strip()
# Process any remaining paragraph
if current_paragraph:
self.process_paragraph(current_paragraph, current_headers, sentences_data, pdf_title)
return sentences_data
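# Illustrative walk-through of the header stack above (not executed): for the markdown
#   # Introduction
#   ## Background
#   Some sentence.
#   # Methods
# the hierarchy recorded for "Some sentence." is "Introduction > Background"; when "# Methods"
# is reached, current_headers is truncated back to level 1 and becomes ["Methods"].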
def extract_sentences_from_pdf(self, pdf_file):
"""
Extract sentences from PDF file with metadata enriched from DOI.
Uses Mistral OCR to convert PDF to markdown and extract section hierarchy.
Args:
pdf_file: Uploaded PDF file from Streamlit
Returns:
Tuple containing:
- List of extracted sentences
- List of metadata dictionaries for each sentence
"""
sentences = []
metadata_list = []
doc = None
tmp_path = None
try:
# Create temporary file to process the PDF
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
pdf_bytes = pdf_file.getvalue()
tmp.write(pdf_bytes)
tmp_path = tmp.name
# First, try to extract DOI from the PDF
st.info("Extracting DOI from PDF...")
doi = extract_doi_from_pdf(pdf_bytes)
# Initialize paper metadata
paper_metadata = {
"source": sanitize_text(pdf_file.name), # Original filename, sanitized
"doi": doi
}
# If DOI was found, fetch additional metadata
if doi:
st.success(f"DOI found: {doi}")
st.info("Retrieving metadata from Crossref API...")
# Get metadata from Crossref API
raw_metadata = get_metadata(doi)
if raw_metadata:
structured_metadata = extract_structured_metadata(raw_metadata)
# Update paper metadata with Crossref data - sanitize values for Excel
paper_metadata.update({
"title": sanitize_for_excel(structured_metadata.get("title", "")),
"authors": sanitize_for_excel(structured_metadata.get("author_names", "")),
"published_date": sanitize_for_excel(structured_metadata.get("published_date", "")),
"publisher": sanitize_for_excel(structured_metadata.get("publisher", "")),
"container_title": sanitize_for_excel(structured_metadata.get("container_title", "")),
"type": sanitize_for_excel(structured_metadata.get("type", "")),
"citation_count": structured_metadata.get("citation_count", 0)
})
# Display success message with paper title
st.success(f"Metadata retrieved: {paper_metadata['title']}")
else:
st.warning("Could not retrieve metadata from Crossref.")
else:
st.warning("No DOI found in the PDF. Using basic metadata only.")
# Convert PDF to markdown using Mistral OCR
st.info("Converting PDF to markdown using Mistral OCR...")
# Initialize Mistral OCR
try:
mistral_api_key = MISTRAL_API_KEY
if not mistral_api_key:
st.error("MISTRAL_API_KEY environment variable not set.")
raise ValueError("MISTRAL_API_KEY environment variable not set.")
client = Mistral(api_key=mistral_api_key)
# Upload PDF to Mistral
uploaded_pdf = client.files.upload(
file={
"file_name": os.path.basename(tmp_path),
"content": open(tmp_path, "rb"),
},
purpose="ocr"
)
# Get signed URL for OCR processing
signed_url = client.files.get_signed_url(file_id=uploaded_pdf.id)
# Process PDF with OCR
ocr_response = client.ocr.process(
model="mistral-ocr-latest",
document={
"type": "document_url",
"document_url": signed_url.url,
}
)
# Extract markdown from all pages of OCR response
markdown_text = ""
if hasattr(ocr_response, 'pages') and ocr_response.pages:
for page in ocr_response.pages:
if hasattr(page, 'markdown') and page.markdown:
markdown_text += page.markdown + "\n\n" # Add spacing between pages
markdown_text = markdown_text.strip() # Remove trailing whitespace
if not markdown_text:
st.warning("No content extracted from PDF using Mistral OCR. Falling back to PyMuPDF.")
else:
st.success(f"PDF successfully converted to markdown using Mistral OCR ({len(ocr_response.pages)} pages).")
except Exception as e:
st.warning(f"Error using Mistral OCR: {str(e)}. Falling back to PyMuPDF.")
markdown_text = ""
# If Mistral OCR succeeded, extract sections and sentences
if markdown_text:
# Extract sections and sentences from markdown
# Fall back to the sanitized filename when no Crossref title is available (avoids a KeyError when no DOI was found)
sections_and_sentences = self.extract_sections_and_sentences(markdown_text, paper_metadata.get("title", paper_metadata["source"]))
# Process each sentence with its section hierarchy
for sentence_data in sections_and_sentences:
# Add sentence to list
sentences.append(sentence_data["sentence"])
# Create metadata for this sentence
sentence_metadata = paper_metadata.copy() # Start with paper metadata
sentence_metadata["sentence_source"] = sentence_data["section_hierarchy"] # Use section hierarchy as source
metadata_list.append(sentence_metadata)
# If Mistral OCR failed or returned no content, fall back to PyMuPDF
else:
# Open PDF with PyMuPDF
doc = fitz.open(tmp_path)
# Process each page in the PDF
for page_num, page in enumerate(doc):
# Extract text from page
text = page.get_text()
# Clean text before splitting into sentences
text = sanitize_text(text)
# Simple sentence splitting by period
raw_sentences = [s.strip() for s in text.replace('\n', ' ').split('.') if s.strip()]
# Process each sentence
for sentence in raw_sentences:
# Basic filtering - sentences must have at least 2 words
if len(sentence.split()) >= 2:
# Final clean to ensure no problematic characters
clean_sentence = sanitize_text(sentence)
sentences.append(clean_sentence)
# Create metadata for this sentence
sentence_metadata = paper_metadata.copy() # Start with paper metadata
sentence_metadata["page"] = page_num + 1 # Add page number (1-indexed)
sentence_metadata["sentence_source"] = f"Page {page_num + 1}" # Use page number as fallback
metadata_list.append(sentence_metadata)
# Show summary
st.success(f"Extracted {len(sentences)} sentences from PDF.")
except Exception as e:
st.error(f"Error processing PDF: {e}")
st.error(traceback.format_exc())
finally:
# Ensure document is closed before deleting file
if doc:
try:
doc.close()
except Exception as e:
st.error(f"Error closing document: {e}")
# Clean up temporary file
if tmp_path and os.path.exists(tmp_path):
try:
os.unlink(tmp_path)
except Exception as e:
st.warning(f"Failed to remove temporary file: {e}")
return sentences, metadata_list
def load_corpus(self, uploaded_files):
"""
Load corpus data from multiple uploaded files.
Each file's sentences are extracted and combined into one corpus.
Args:
uploaded_files: List of files uploaded through Streamlit interface
"""
try:
# Initialize lists for DataFrame columns
all_sentences = []
all_metadata = []
# Process each uploaded file
for uploaded_file in uploaded_files:
# Determine file type from extension
file_extension = Path(uploaded_file.name).suffix.lower()
# Process PDF files
if file_extension == '.pdf':
# Extract sentences and metadata using DOI
sentences, metadata_list = self.extract_sentences_from_pdf(uploaded_file)
# Add to combined lists
all_sentences.extend(sentences)
all_metadata.extend(metadata_list)
st.success(f"Extracted {len(sentences)} sentences from PDF: {uploaded_file.name}")
# Process CSV files
elif file_extension == '.csv':
# Read CSV file
raw_df = pd.read_csv(uploaded_file)
# Create lists for each column
sentences = []
file_sources = []
dois = []
paper_titles = []
source_titles = [] # For journal/publication names
authors_list = []
years = []
sentence_sources = []
# Process each row
for i, row in raw_df.iterrows():
# Get paper details for metadata
paper_title = row.get('Title', '')
paper_doi = row.get('DOI', '')
paper_authors = row.get('Authors', '')
paper_year = row.get('Year', '')
source_title = row.get('Source title', '') # Get source title (journal name)
# Extract title if available
if 'Title' in raw_df.columns and pd.notna(row['Title']):
sentences.append(row['Title'])
file_sources.append(uploaded_file.name)
dois.append(paper_doi)
paper_titles.append(paper_title)
source_titles.append(source_title)
authors_list.append(paper_authors)
years.append(paper_year)
sentence_sources.append('Title')
# Extract abstract if available
if 'Abstract' in raw_df.columns and pd.notna(row['Abstract']):
# Split abstract into sentences (simple split by periods)
abstract_sentences = [s.strip() for s in str(row['Abstract']).split('.') if s.strip()]
for abs_sent in abstract_sentences:
sentences.append(abs_sent)
file_sources.append(uploaded_file.name)
dois.append(paper_doi)
paper_titles.append(paper_title)
source_titles.append(source_title)
authors_list.append(paper_authors)
years.append(paper_year)
sentence_sources.append('Abstract')
# Extract author keywords if available
if 'Author Keywords' in raw_df.columns and pd.notna(row['Author Keywords']):
keywords = str(row['Author Keywords']).split(';')
for keyword in keywords:
if keyword.strip():
sentences.append(keyword.strip())
file_sources.append(uploaded_file.name)
dois.append(paper_doi)
paper_titles.append(paper_title)
source_titles.append(source_title)
authors_list.append(paper_authors)
years.append(paper_year)
sentence_sources.append('Author Keywords')
# Extract index keywords if available
if 'Index Keywords' in raw_df.columns and pd.notna(row['Index Keywords']):
keywords = str(row['Index Keywords']).split(';')
for keyword in keywords:
if keyword.strip():
sentences.append(keyword.strip())
file_sources.append(uploaded_file.name)
dois.append(paper_doi)
paper_titles.append(paper_title)
source_titles.append(source_title)
authors_list.append(paper_authors)
years.append(paper_year)
sentence_sources.append('Index Keywords')
# Add to combined lists
all_sentences.extend(sentences)
all_metadata.extend([{
'doi': doi,
'title': title,
'container_title': source,
'authors': authors,
'published_date': str(year) if year else '',
'source': uploaded_file.name,
'sentence_source': source_type
} for doi, title, source, authors, year, source_type in zip(
dois, paper_titles, source_titles, authors_list, years, sentence_sources)])
# Create the corpus DataFrame with separate columns
self.corpus_df = pd.DataFrame({
'sentence': sentences,
'doi': dois,
'paper_title': paper_titles,
'source_title': source_titles, # Journal/publication name
'authors': authors_list,
'year': years,
'file_source': file_sources,
'sentence_source': sentence_sources
})
# Save a copy to persistent storage
corpus_path = self.data_dir / f"processed_corpus_{uploaded_file.name}"
self.corpus_df.to_excel(corpus_path.with_suffix('.xlsx'), index=False)
st.success(f"Extracted {len(sentences)} sentences from CSV and saved to {corpus_path}")
# Process Excel files
elif file_extension in ['.xlsx', '.xls']:
# Read Excel file
try:
raw_df = pd.read_excel(uploaded_file)
sentences = []
file_sources = []
dois = []
paper_titles = []
source_titles = [] # For journal/publication names
authors_list = []
years = []
sentence_sources = []
# Process each row
for i, row in raw_df.iterrows():
# Get paper details for metadata
paper_title = row.get('Title', '')
paper_doi = row.get('DOI', '')
paper_authors = row.get('Authors', '')
paper_year = row.get('Year', '')
source_title = row.get('Source title', '') # Get source title (journal name)
# Extract title if available
if 'Title' in raw_df.columns and pd.notna(row['Title']):
sentences.append(row['Title'])
file_sources.append(uploaded_file.name)
dois.append(paper_doi)
paper_titles.append(paper_title)
source_titles.append(source_title)
authors_list.append(paper_authors)
years.append(paper_year)
sentence_sources.append('Title')
# Extract abstract if available
if 'Abstract' in raw_df.columns and pd.notna(row['Abstract']):
# Split abstract into sentences (simple split by periods)
abstract_sentences = [s.strip() for s in str(row['Abstract']).split('.') if s.strip()]
for abs_sent in abstract_sentences:
sentences.append(abs_sent)
file_sources.append(uploaded_file.name)
dois.append(paper_doi)
paper_titles.append(paper_title)
source_titles.append(source_title)
authors_list.append(paper_authors)
years.append(paper_year)
sentence_sources.append('Abstract')
# Extract author keywords if available
if 'Author Keywords' in raw_df.columns and pd.notna(row['Author Keywords']):
keywords = str(row['Author Keywords']).split(';')
for keyword in keywords:
if keyword.strip():
sentences.append(keyword.strip())
file_sources.append(uploaded_file.name)
dois.append(paper_doi)
paper_titles.append(paper_title)
source_titles.append(source_title)
authors_list.append(paper_authors)
years.append(paper_year)
sentence_sources.append('Author Keywords')
# Extract index keywords if available
if 'Index Keywords' in raw_df.columns and pd.notna(row['Index Keywords']):
keywords = str(row['Index Keywords']).split(';')
for keyword in keywords:
if keyword.strip():
sentences.append(keyword.strip())
file_sources.append(uploaded_file.name)
dois.append(paper_doi)
paper_titles.append(paper_title)
source_titles.append(source_title)
authors_list.append(paper_authors)
years.append(paper_year)
sentence_sources.append('Index Keywords')
# Add to combined lists
all_sentences.extend(sentences)
all_metadata.extend([{
'doi': doi,
'title': title,
'container_title': source,
'authors': authors,
'published_date': str(year) if year else '',
'source': uploaded_file.name,
'sentence_source': source_type
} for doi, title, source, authors, year, source_type in zip(
dois, paper_titles, source_titles, authors_list, years, sentence_sources)])
# Create the corpus DataFrame with separate columns
self.corpus_df = pd.DataFrame({
'sentence': sentences,
'doi': dois,
'paper_title': paper_titles,
'source_title': source_titles, # Journal/publication name
'authors': authors_list,
'year': years,
'file_source': file_sources,
'sentence_source': sentence_sources
})
# Save a copy to persistent storage
corpus_path = self.data_dir / f"processed_corpus_{uploaded_file.name}"
self.corpus_df.to_excel(corpus_path.with_suffix('.xlsx'), index=False)
st.success(f"Extracted {len(sentences)} sentences from Excel and saved to {corpus_path}")
except Exception as excel_e:
st.error(f"Error processing Excel: {excel_e}")
else:
st.warning(f"Skipping unsupported file format: {file_extension}. Please upload PDF, Excel, or CSV files.")
continue
# Create combined DataFrame from all extracted sentences
if all_sentences:
# Process data to ensure it's safe for Excel
data = {
'sentence': [sanitize_text(s) for s in all_sentences],
'doi': [sanitize_text(m.get('doi', '')) for m in all_metadata],
'paper_title': [sanitize_text(m.get('title', '')) for m in all_metadata],
'source_title': [sanitize_text(m.get('container_title', '')) for m in all_metadata],
'authors': [sanitize_text(m.get('authors', '')) for m in all_metadata],
'year': [sanitize_text(m.get('published_date', '')[:4]) if m.get('published_date') else '' for m in all_metadata],
'file_source': [sanitize_text(m.get('source', '')) for m in all_metadata],
'sentence_source': [sanitize_text(m.get('sentence_source', 'PDF Text')) for m in all_metadata],
'page': [m.get('page', '') for m in all_metadata]
}
# Convert to DataFrame
self.corpus_df = pd.DataFrame(data)
try:
# Save a copy to persistent storage
corpus_path = self.data_dir / "combined_corpus.xlsx"
self.corpus_df.to_excel(corpus_path, index=False)
st.success(f"Combined {len(all_sentences)} sentences from {len(uploaded_files)} files and saved to {corpus_path}")
# Display preview of the loaded data
st.write("First few rows of the combined corpus:")
st.write(self.corpus_df.head())
except Exception as save_e:
st.error(f"Error saving to Excel: {save_e}")
# Fallback to CSV which is more robust
csv_path = self.data_dir / "combined_corpus.csv"
self.corpus_df.to_csv(csv_path, index=False)
st.warning(f"Saved as CSV instead at {csv_path} due to Excel compatibility issues")
else:
st.error("No valid sentences extracted from the uploaded files.")
except Exception as e:
st.error(f"Error loading corpus: {e}")
import traceback
st.error(traceback.format_exc())
def load_theory(self, uploaded_file):
"""
Load theory sentences from Excel or CSV.
These are the sentences we want to match with the corpus.
Args:
uploaded_file: File uploaded through Streamlit interface
"""
try:
# Determine file type from extension
file_extension = Path(uploaded_file.name).suffix.lower()
# Process Excel files
if file_extension in ['.xlsx', '.xls']:
# Read Excel file
self.theory_df = pd.read_excel(uploaded_file)
# Save a copy to persistent storage
theory_path = self.data_dir / f"theory_{uploaded_file.name}"
self.theory_df.to_excel(theory_path, index=False)
st.success(f"Loaded {len(self.theory_df)} rows from theory Excel file and saved to {theory_path}")
# Process CSV files
elif file_extension == '.csv':
# Read CSV file
self.theory_df = pd.read_csv(uploaded_file)
# Save a copy to persistent storage
theory_path = self.data_dir / f"theory_{uploaded_file.name}"
self.theory_df.to_csv(theory_path, index=False)
st.success(f"Loaded {len(self.theory_df)} rows from theory CSV file and saved to {theory_path}")
# Handle unsupported file types
else:
st.error("Unsupported file format for theory data. Please upload an Excel or CSV file.")
return
# Display preview of the loaded data
st.write("First few rows of the theory data:")
st.write(self.theory_df.head())
except Exception as e:
st.error(f"Error loading theory data: {e}")
def save_embeddings(self, corpus_nodes, theory_nodes):
"""
Save embeddings to persistent storage on Hugging Face Space.
This allows reusing embeddings without recomputing them.
Args:
corpus_nodes: List of TextNode objects for corpus sentences
theory_nodes: List of TextNode objects for theory sentences
Returns:
bool: True if saving was successful, False otherwise
"""
try:
# Prepare corpus embeddings for serialization
corpus_data = []
for node in corpus_nodes:
corpus_data.append({
"text": node.text,
"metadata": node.metadata,
# Embeddings are already plain Python lists here (model.encode(...).tolist()), so they serialize to JSON as-is
"embedding": node.embedding
})
# Prepare theory embeddings for serialization
theory_data = []
for node in theory_nodes:
theory_data.append({
"text": node.text,
"metadata": node.metadata,
# Embeddings are already plain Python lists here (model.encode(...).tolist()), so they serialize to JSON as-is
"embedding": node.embedding
})
# Define paths for saving embeddings
corpus_emb_path = self.embeddings_dir / "corpus_embeddings.json"
theory_emb_path = self.embeddings_dir / "theory_embeddings.json"
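# Each saved record has roughly this shape (illustrative):
#   {"text": "...", "metadata": {"doi": "...", "paper_title": "..."}, "embedding": [0.012, -0.034, ...]}
# where the embedding list holds 384 floats for all-MiniLM-L6-v2.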
# Save corpus embeddings to JSON
with open(corpus_emb_path, 'w') as f:
json.dump(corpus_data, f)
# Save theory embeddings to JSON
with open(theory_emb_path, 'w') as f:
json.dump(theory_data, f)
st.success(f"Saved embeddings to {self.embeddings_dir}")
return True
except Exception as e:
st.error(f"Error saving embeddings: {e}")
return False
def load_embeddings(self):
"""
Load previously saved embeddings from persistent storage.
This avoids recomputing embeddings for the same data.
Returns:
Tuple containing:
- List of corpus TextNode objects (or None if loading fails)
- List of theory TextNode objects (or None if loading fails)
"""
try:
# Define paths for loading embeddings
corpus_emb_path = self.embeddings_dir / "corpus_embeddings.json"
theory_emb_path = self.embeddings_dir / "theory_embeddings.json"
# Check if embedding files exist
if not corpus_emb_path.exists() or not theory_emb_path.exists():
st.warning("No saved embeddings found.")
return None, None
# Load corpus embeddings from JSON
with open(corpus_emb_path, 'r') as f:
corpus_data = json.load(f)
# Load theory embeddings from JSON
with open(theory_emb_path, 'r') as f:
theory_data = json.load(f)
# Recreate corpus TextNode objects
corpus_nodes = []
for item in corpus_data:
node = TextNode(
text=item["text"],
metadata=item["metadata"],
# Convert list back to numpy array
embedding=np.array(item["embedding"])
)
corpus_nodes.append(node)
# Recreate theory TextNode objects
theory_nodes = []
for item in theory_data:
node = TextNode(
text=item["text"],
metadata=item["metadata"],
# Convert list back to numpy array
embedding=np.array(item["embedding"])
)
theory_nodes.append(node)
st.success(f"Loaded {len(corpus_nodes)} corpus embeddings and {len(theory_nodes)} theory embeddings from {self.embeddings_dir}")
return corpus_nodes, theory_nodes
except Exception as e:
st.error(f"Error loading embeddings: {e}")
return None, None
def create_embeddings(self, use_cached=False):
"""
Create embeddings for corpus and theory sentences and store in LlamaIndex.
Uses Hugging Face sentence transformer model to generate embeddings.
Args:
use_cached: Whether to try loading cached embeddings first
"""
try:
# Try to load cached embeddings if requested
if use_cached:
corpus_nodes, theory_nodes = self.load_embeddings()
if corpus_nodes and theory_nodes:
# If cached embeddings were successfully loaded
self.corpus_nodes = corpus_nodes
self.theory_nodes = theory_nodes
# Initialize vector store with loaded embeddings
all_nodes = corpus_nodes + theory_nodes
self.vector_store = SimpleVectorStore()
# Create a custom settings object with our embedding model
from llama_index.core import Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
# Initialize embedding model for LlamaIndex
hf_embed_model = HuggingFaceEmbedding(model_name=self.model_name, trust_remote_code=True)
# Set the embedding model in settings and create index
Settings.embed_model = hf_embed_model
self.index = VectorStoreIndex(all_nodes)
return
# Check if data is loaded
if self.corpus_df is None or self.theory_df is None:
st.error("Please load both corpus and theory data first.")
return
# Load model if not already loaded
if self.model is None:
self.load_model()
st.info("Creating embeddings for corpus and theory sentences...")
# Initialize vector store for storing embeddings
self.vector_store = SimpleVectorStore()
# Create status containers for progress reporting
corpus_status = st.empty()
theory_status = st.empty()
corpus_progress = st.progress(0)
theory_progress = st.progress(0)
# Process corpus sentences
corpus_nodes = []
total_corpus = len(self.corpus_df)
corpus_status.text(f"Processing corpus sentences: 0/{total_corpus}")
for i, row in self.corpus_df.iterrows():
# Update progress every 10 sentences or so
if i % 10 == 0 or i == total_corpus - 1:
progress_pct = min(i / total_corpus, 1.0)
corpus_progress.progress(progress_pct)
corpus_status.text(f"Processing corpus sentences: {i+1}/{total_corpus} ({int(progress_pct*100)}%)")
# Get sentence text
sentence = row['sentence']
# Create metadata dictionary from all columns except 'sentence'
metadata = {col: row[col] for col in self.corpus_df.columns if col != 'sentence'}
# Create text node with embeddings from the Hugging Face model
node = TextNode(
text=sentence,
metadata=metadata,
# Generate embedding for the sentence
embedding=self.model.encode(sentence).tolist()
)
corpus_nodes.append(node)
corpus_status.text(f"Completed processing all {total_corpus} corpus sentences")
corpus_progress.progress(1.0)
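# Note: SentenceTransformer.encode also accepts a list of sentences and batches them internally,
# which is usually much faster on large corpora. A possible alternative (sketch, not used here):
#   embeddings = self.model.encode(self.corpus_df['sentence'].tolist(), batch_size=64, show_progress_bar=False)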
# Process theory sentences
theory_nodes = []
total_theory = len(self.theory_df)
theory_status.text(f"Processing theory sentences: 0/{total_theory}")
for i, row in self.theory_df.iterrows():
# Update progress every 10 sentences or so
if i % 10 == 0 or i == total_theory - 1:
progress_pct = min(i / total_theory, 1.0)
theory_progress.progress(progress_pct)
theory_status.text(f"Processing theory sentences: {i+1}/{total_theory} ({int(progress_pct*100)}%)")
# Get sentence text
sentence = row['sentence']
# Create metadata from all other columns
metadata = {col: row[col] for col in self.theory_df.columns if col != 'sentence'}
# Add a theory indicator
metadata["type"] = "theory"
# Create text node with embeddings from the Hugging Face model
node = TextNode(
text=sentence,
metadata=metadata,
# Generate embedding for the sentence
embedding=self.model.encode(sentence).tolist()
)
theory_nodes.append(node)
theory_status.text(f"Completed processing all {total_theory} theory sentences")
theory_progress.progress(1.0)
# Add all nodes to the vector store
all_nodes = corpus_nodes + theory_nodes
# Create index for vector search - use HuggingFace embedding model explicitly
st.info("Creating vector index for similarity search...")
# Initialize embedding model for LlamaIndex
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import Settings
# Set the embedding model in settings
hf_embed_model = HuggingFaceEmbedding(model_name=self.model_name, trust_remote_code=True)
Settings.embed_model = hf_embed_model
# Now create the index with the correct embedding model
self.index = VectorStoreIndex(all_nodes)
# Save embeddings to persistent storage
st.info("Saving embeddings to persistent storage...")
if self.save_embeddings(corpus_nodes, theory_nodes):
st.success(f"Successfully created and saved embeddings for {len(all_nodes)} sentences")
# Store the nodes separately for direct similarity calculations
self.corpus_nodes = corpus_nodes
self.theory_nodes = theory_nodes
except Exception as e:
st.error(f"Error creating embeddings: {e}")
import traceback
st.error(traceback.format_exc()) # Show the full stack trace for debugging
def find_matches(self):
"""
Find matches between corpus and theory sentences with
cosine similarity above the threshold.
"""
try:
# Check if embeddings have been created in memory
if not hasattr(self, 'corpus_nodes') or not hasattr(self, 'theory_nodes') or not self.corpus_nodes or not self.theory_nodes:
# Try to load embeddings from disk
st.info("Loading embeddings from disk...")
corpus_nodes, theory_nodes = self.load_embeddings()
# If loading failed, show error
if not corpus_nodes or not theory_nodes:
st.error("Please create embeddings first.")
return
# Store loaded embeddings in memory
self.corpus_nodes = corpus_nodes
self.theory_nodes = theory_nodes
st.info(f"Finding matches with similarity threshold > {self.similarity_threshold}...")
# Create list to store matches
matches = []
# Extract embeddings from nodes into numpy arrays for efficient computation
corpus_embeddings = np.array([node.embedding for node in self.corpus_nodes])
theory_embeddings = np.array([node.embedding for node in self.theory_nodes])
# Calculate cosine similarity matrix between all corpus and theory sentences
# This is efficient as it computes all pairwise similarities at once
similarity_matrix = cosine_similarity(corpus_embeddings, theory_embeddings)
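# similarity_matrix has shape (n_corpus, n_theory); entry [i, j] is the cosine similarity between
# corpus sentence i and theory sentence j. With made-up numbers, a matrix like
#   [[0.82, 0.10],
#    [0.05, 0.77]]
# would pair corpus sentence 0 with theory sentence 0 and corpus sentence 1 with theory sentence 1
# at the default 0.75 threshold.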
# Find matches above threshold
for i in range(similarity_matrix.shape[0]):
for j in range(similarity_matrix.shape[1]):
sim_score = similarity_matrix[i, j]
# Only include matches above the threshold
if sim_score > self.similarity_threshold:
matches.append({
'corpus_sentence': self.corpus_nodes[i].text,
'theory_sentence': self.theory_nodes[j].text,
'similarity_score': sim_score,
'corpus_metadata': self.corpus_nodes[i].metadata,
'theory_metadata': self.theory_nodes[j].metadata
})
# Create DataFrame and sort by similarity score (highest first)
matches_df = pd.DataFrame(matches)
if not matches_df.empty:
matches_df = matches_df.sort_values(by='similarity_score', ascending=False)
# Display matches
st.success(f"Found {len(matches_df)} matches above similarity threshold {self.similarity_threshold}")
st.write(matches_df)
# Save matches to persistent storage
matches_path = self.data_dir / f"matches_threshold_{self.similarity_threshold}.csv"
matches_df.to_csv(matches_path, index=False)
# Create downloadable CSV
csv = matches_df.to_csv(index=False)
st.download_button(
label="Download matches as CSV",
data=csv,
file_name="similarity_matches.csv",
mime="text/csv"
)
st.info(f"Matches saved to {matches_path}")
else:
st.warning(f"No matches found above similarity threshold {self.similarity_threshold}")
except Exception as e:
st.error(f"Error finding matches: {e}")
import traceback
st.error(traceback.format_exc())
def adjust_threshold(self, new_threshold):
"""
Adjust similarity threshold for matching.
Args:
new_threshold: New threshold value (0.0-1.0)
"""
self.similarity_threshold = new_threshold
st.success(f"Similarity threshold updated to {self.similarity_threshold}")
def run_app(self):
"""
Main application function that creates the Streamlit interface
and handles the application flow.
"""
# Set up the main page
st.title("Document Similarity Matcher")
st.write("Match sentences from your corpus with theory sentences using sentence embeddings")
# Sidebar with model information
st.sidebar.header("Model Information")
st.sidebar.info(f"Using model: {self.model_name}\nEmbedding dimensions: {self.embedding_dim}")
# Load model button
if st.sidebar.button("Load Model"):
self.load_model()
# Step 1: Corpus data upload
st.header("1. Upload Corpus Data")
st.write("Upload a file containing sentences to analyze (PDF, Excel, or CSV)")
corpus_files = st.file_uploader("Upload corpus file (PDF, Excel, CSV)", type=["pdf", "xlsx", "xls", "csv"], accept_multiple_files=True)
if corpus_files:
st.info(f"Loaded {len(corpus_files)} files. Processing...")
corpus_process = st.button("Process Corpus Files")
if True:
self.load_corpus(corpus_files)
# Step 2: Theory data upload
st.header("2. Upload Theory Data")
st.write("Upload a file containing theory sentences to match against (Excel or CSV)")
theory_file = st.file_uploader("Upload theory file (Excel, CSV)", type=["xlsx", "xls", "csv"])
if theory_file is not None:
self.load_theory(theory_file)
# Step 3: Create embeddings
st.header("3. Create Embeddings")
st.write("Transform sentences into vector embeddings using the sentence transformer model")
use_cached = st.checkbox("Use cached embeddings if available", value=True)
if st.button("Create Embeddings"):
self.create_embeddings(use_cached=use_cached)
# Step 4: Set similarity threshold
st.header("4. Set Similarity Threshold")
st.write("Adjust the minimum similarity score required for a match (0.0-1.0)")
new_threshold = st.slider("Similarity threshold", min_value=0.0, max_value=1.0, value=0.75, step=0.01)
if new_threshold != self.similarity_threshold:
self.adjust_threshold(new_threshold)
# Step 5: Find matches
st.header("5. Find Matches")
st.write("Find corpus sentences that match theory sentences above the similarity threshold")
if st.button("Find Matches"):
self.find_matches()
def extract_doi_from_text(text):
"""
Extract DOI from text using regex pattern matching.
This serves as a fallback when pdf2doi is not available.
Args:
text (str): Text to search for DOI
Returns:
str: The DOI if found, empty string if not found
"""
# Common DOI patterns
doi_patterns = [
r'(?:doi|DOI):\s*(10\.\d{4,}(?:\.\d+)*\/\S+)',
r'(?:https?://)?(?:dx\.)?doi\.org/(10\.\d{4,}(?:\.\d+)*\/\S+)',
r'(10\.\d{4,}(?:\.\d+)*\/\S+)'
]
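# Examples of strings these patterns catch (illustrative DOIs, not real references):
#   "doi: 10.1234/abcd.5678"                    -> "10.1234/abcd.5678"  (pattern 1)
#   "https://doi.org/10.1234/abcd.5678"         -> "10.1234/abcd.5678"  (pattern 2)
#   "... available at 10.1234/abcd.5678."       -> "10.1234/abcd.5678"  (pattern 3; trailing '.' stripped below)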
# Search for DOI using each pattern
for pattern in doi_patterns:
match = re.search(pattern, text)
if match:
# Clean up the DOI by removing trailing punctuation
doi = match.group(1).rstrip('.,;:')
return doi
return ""
def extract_doi_from_pdf(pdf_bytes):
"""
Extract DOI from a PDF file.
Args:
pdf_bytes: PDF file as bytes
Returns:
str: The DOI if found, empty string if not found or error occurred
"""
try:
# Create a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
tmp.write(pdf_bytes)
tmp_path = tmp.name
# Try using pdf2doi if available
if PDF2DOI_AVAILABLE:
try:
result = pdf2doi(tmp_path)
if result and 'identifier' in result and result['identifier']:
os.unlink(tmp_path) # Clean up
return result['identifier']
except Exception as e:
st.warning(f"pdf2doi error: {e}. Falling back to text-based extraction.")
# Fallback: Extract DOI from text
doc = fitz.open(tmp_path)
text = ""
# Check first few pages for DOI
for page_num in range(min(5, len(doc))):
text += doc[page_num].get_text()
os.unlink(tmp_path) # Clean up
# Try to find DOI in the extracted text
doi = extract_doi_from_text(text)
return doi
except Exception as e:
st.error(f"Error extracting DOI: {str(e)}")
return ""
def get_metadata(doi):
"""
Retrieve metadata for a DOI from the Crossref API
Args:
doi (str): DOI string to lookup
Returns:
dict: Raw metadata response or None if request failed
"""
url = f"https://api.crossref.org/works/{doi}"
headers = {
"Accept": "application/json",
"User-Agent": "DataNova/1.0 (mailto:your.email@example.com)"
}
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json()
else:
st.warning(f"Failed to retrieve metadata: HTTP {response.status_code}")
return None
except Exception as e:
st.warning(f"Error retrieving metadata: {str(e)}")
return None
def extract_structured_metadata(metadata):
"""
Extract relevant fields from the metadata response
Args:
metadata (dict): Raw metadata from Crossref API
Returns:
dict: Structured metadata with key fields
"""
if not metadata or 'message' not in metadata:
return {}
work = metadata.get('message', {})
# Extract basic information
structured_data = {
'doi': work.get('DOI', ''),
'title': work.get('title', [''])[0] if work.get('title') else '',
'container_title': work.get('container-title', [''])[0] if work.get('container-title') else '',
'publisher': work.get('publisher', ''),
'type': work.get('type', ''),
'volume': work.get('volume', ''),
'issue': work.get('issue', ''),
'page': work.get('page', ''),
'article_number': work.get('article-number', ''),
'published_date': '',
'reference_count': work.get('reference-count', 0),
'citation_count': work.get('is-referenced-by-count', 0),
'abstract': work.get('abstract', '')
}
# Extract publication date
if 'published' in work and 'date-parts' in work['published'] and work['published']['date-parts']:
date_parts = work['published']['date-parts'][0]
if len(date_parts) >= 3:
structured_data['published_date'] = f"{date_parts[0]}-{date_parts[1]:02d}-{date_parts[2]:02d}"
elif len(date_parts) == 2:
structured_data['published_date'] = f"{date_parts[0]}-{date_parts[1]:02d}"
elif len(date_parts) == 1:
structured_data['published_date'] = f"{date_parts[0]}"
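# Example (illustrative): Crossref returns dates as nested lists of date parts, e.g.
#   {"published": {"date-parts": [[2021, 6, 15]]}} -> "2021-06-15"
#   {"published": {"date-parts": [[2021]]}}        -> "2021"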
# Extract authors
authors = work.get('author', [])
structured_data['authors'] = []
structured_data['author_names'] = []
for author in authors:
author_data = {
'given': author.get('given', ''),
'family': author.get('family', ''),
'orcid': author.get('ORCID', ''),
'sequence': author.get('sequence', '')
}
structured_data['authors'].append(author_data)
structured_data['author_names'].append(f"{author.get('given', '')} {author.get('family', '')}")
# Convert author_names to string for DataFrame compatibility
structured_data['author_names'] = '; '.join(structured_data['author_names'])
# Extract funding information
funders = work.get('funder', [])
structured_data['funders'] = []
structured_data['funder_names'] = []
for funder in funders:
funder_data = {
'name': funder.get('name', ''),
'doi': funder.get('DOI', ''),
'award': funder.get('award', [])
}
structured_data['funders'].append(funder_data)
structured_data['funder_names'].append(funder.get('name', ''))
# Convert funder_names to string for DataFrame compatibility
structured_data['funder_names'] = '; '.join(structured_data['funder_names'])
return structured_data
def sanitize_for_excel(text):
"""
Sanitize text to be Excel-friendly, keeping only alphanumeric and basic characters.
Args:
text (str): The input text to sanitize
Returns:
str: Sanitized text safe for Excel
"""
# First sanitize all text to remove problematic characters
text = sanitize_text(text)
if not text:
return "Unnamed"
# First, replace common problematic characters with simple alternatives
result = text.replace(':', '').replace('/', ' ').replace('\\', ' ')
result = result.replace('[', '').replace(']', '')
result = result.replace('*', '').replace('?', '')
result = result.replace(',', ' ').replace(';', ' ')
# Keep only alphanumeric characters, spaces, periods, parentheses, and hyphens
allowed_chars = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 .-()_')
result = ''.join(c for c in result if c in allowed_chars)
# Trim excess whitespace
result = ' '.join(result.split())
# Truncate to 31 characters (the Excel worksheet-name limit); note this also shortens long titles and author strings
if len(result) > 31:
result = result[:31]
# Ensure the text isn't empty after sanitizing
if not result.strip():
result = "Unnamed"
return result
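# Illustrative behaviour of sanitize_for_excel (not executed):
#   "COVID-19: A Review [Preprint]" -> "COVID-19 A Review Preprint"
#   ""                              -> "Unnamed"
# Strings longer than 31 characters are cut off by the worksheet-name limit enforced above.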
def sanitize_text(text):
"""
Clean text by removing problematic characters and normalizing Unicode characters.
Args:
text: The text to clean
Returns:
str: Cleaned text without non-printable or problematic characters
"""
if not isinstance(text, str):
return str(text)
# First normalize Unicode to NFKD form to handle diacritical marks and special characters
import unicodedata
text = unicodedata.normalize('NFKD', text)
# Replace common problematic non-printable characters
text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]', '', text)
# Replace other common problematic characters for Excel
text = text.replace('\u2013', '-') # en dash
text = text.replace('\u2014', '-') # em dash
text = text.replace('\u2018', "'") # left single quote
text = text.replace('\u2019', "'") # right single quote
text = text.replace('\u201C', '"') # left double quote
text = text.replace('\u201D', '"') # right double quote
text = text.replace('\u2026', '...') # ellipsis
# Remove Unicode private use area characters
text = re.sub(r'[\uE000-\uF8FF]', '', text)
text = re.sub(r'[\U000F0000-\U000FFFFD]', '', text)
text = re.sub(r'[\U00100000-\U0010FFFD]', '', text)
# Convert to ASCII and ignore non-convertible characters (aggressive cleaning)
text = text.encode('ascii', errors='ignore').decode('ascii')
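# Example of the combined NFKD + ASCII step (illustrative): "Müller's naïve café" -> "Muller's naive cafe"
# (accented characters are decomposed and the combining marks dropped, so the base letters survive).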
# Remove control characters
text = ''.join(ch for ch in text if unicodedata.category(ch)[0] != 'C')
return text
def main():
"""
Entry point for the application.
Creates an instance of the SentenceEmbeddingApp and runs it.
"""
app = SentenceEmbeddingApp()
app.run_app()
# Standard Python idiom to check if this file is being run directly
if __name__ == "__main__":
main()