import streamlit as st
from typing import List, Dict
import httpx
import os
from dotenv import load_dotenv
import json
import numpy as np
from pymongo import MongoClient
from openai import OpenAI
from datetime import datetime, timezone
import asyncio
import pandas as pd
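
# Streamlit app: extracts text from uploaded documents, embeds documents and
# user goals with OpenAI, stores the vectors in MongoDB, scores goal/document
# relevance via cosine similarity, and requests a structured theme analysis
# from the Perplexity chat API.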

load_dotenv()
PERPLEXITY_API_KEY = os.getenv("PERPLEXITY_KEY")
MONGODB_URI = os.getenv("MONGO_URI")
OPENAI_API_KEY = os.getenv("OPENAI_KEY")

client = MongoClient(MONGODB_URI)
db = client["document_analysis"]
vectors_collection = db["document_vectors"]

openai_client = OpenAI(api_key=OPENAI_API_KEY)


class GoalAnalyzer:
    def __init__(self):
        self.api_key = PERPLEXITY_API_KEY
        self.base_url = "https://api.perplexity.ai/chat/completions"

    def clean_json_string(self, content: str) -> str:
        """Clean and extract valid JSON from a model response string"""
        # Strip markdown code fences the model may wrap around the JSON.
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        elif "```" in content:
            content = content.split("```")[1]

        # Keep only the outermost {...} block.
        start_idx = content.find("{")
        end_idx = content.rfind("}") + 1
        if start_idx != -1 and end_idx > 0:
            content = content[start_idx:end_idx]

        content = content.strip()
        # Heuristic repairs for common model formatting slips; note the quote
        # swap is naive and will corrupt string values containing apostrophes.
        content = content.replace("\n", "")
        content = content.replace("'", '"')

        return content
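
    # Illustrative inputs/outputs for clean_json_string (not from a live run):
    #   '```json\n{"themes": ["a"]}\n```'  ->  '{"themes": ["a"]}'
    #   "Sure! {'themes': ['a']}"          ->  '{"themes": ["a"]}'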

    async def get_perplexity_analysis(self, text: str, goal: str) -> Dict:
        """Get analysis from Perplexity API"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        prompt = f"""
        Analyze the following text in context of the goal: {goal}

        Text: {text}

        Provide analysis in the following JSON format:
        {{
            "themes": ["theme1", "theme2"],
            "subthemes": {{"theme1": ["subtheme1", "subtheme2"], "theme2": ["subtheme3"]}},
            "keywords": ["keyword1", "keyword2"],
            "relevance_score": 0-100
        }}
        """

        try:
            async with httpx.AsyncClient() as http_client:
                payload = {
                    "model": "llama-3.1-sonar-small-128k-chat",
                    "messages": [
                        {
                            "role": "system",
                            "content": "You are an AI assistant that analyzes documents and provides structured analysis.",
                        },
                        {"role": "user", "content": prompt},
                    ],
                    "max_tokens": 1024,
                }

                with st.expander("Debug Info", expanded=False):
                    st.write("Request payload:", payload)

                response = await http_client.post(
                    self.base_url, headers=headers, json=payload, timeout=30.0
                )

                with st.expander("Response Info", expanded=False):
                    st.write("Response status:", response.status_code)
                    st.write("Response headers:", dict(response.headers))
                    st.write("Response content:", response.text)

                if response.status_code != 200:
                    error_detail = (
                        response.json() if response.content else "No error details"
                    )
                    raise Exception(
                        f"API returned status code {response.status_code}. Details: {error_detail}"
                    )

                result = response.json()
                content = (
                    result.get("choices", [{}])[0].get("message", {}).get("content", "")
                )

                cleaned_content = self.clean_json_string(content)

                try:
                    analysis = json.loads(cleaned_content)

                    # Backfill any missing fields with type-appropriate
                    # defaults (subthemes is a mapping, not a list).
                    defaults = {
                        "themes": [],
                        "subthemes": {},
                        "keywords": [],
                        "relevance_score": 0,
                    }
                    for field, default in defaults.items():
                        analysis.setdefault(field, default)

                    return analysis

                except json.JSONDecodeError as e:
                    st.error(f"JSON parsing error: {str(e)}")
                    st.error(f"Failed content: {cleaned_content}")
                    return {
                        "themes": ["Error parsing themes"],
                        "subthemes": {"Error": ["Failed to parse subthemes"]},
                        "keywords": ["parsing-error"],
                        "relevance_score": 0,
                    }

        except Exception as e:
            st.error(f"API Error: {str(e)}")
            return None
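
    # get_perplexity_analysis is a coroutine; the UI drives it with
    # asyncio.run(...) (see display_analyst_dashboard below).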

    def extract_text_from_file(self, file) -> str:
        """Extract text content from an uploaded file"""
        try:
            text = ""
            file_type = file.type

            if file_type == "text/plain":
                text = file.getvalue().decode("utf-8")
            elif file_type == "application/pdf":
                # Lazy import: only needed when a PDF is actually uploaded.
                import PyPDF2

                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    # extract_text() can return None for image-only pages.
                    text += page.extract_text() or ""
            elif (
                file_type
                == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
            ):
                # Lazy import for .docx support.
                import docx

                doc = docx.Document(file)
                text = " ".join([paragraph.text for paragraph in doc.paragraphs])

            return text
        except Exception as e:
            st.error(f"Error extracting text: {str(e)}")
            return ""
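

# Note: PyPDF2 is in maintenance mode; its successor "pypdf" exposes the same
# PdfReader interface, so the lazy import above could be swapped with minimal
# changes if desired.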


class DocumentVectorizer:
    def __init__(self):
        self.model = "text-embedding-ada-002"
        # Reuse the module-level connection rather than opening a new
        # MongoClient per instance.
        self.client = client
        self.db = db
        self.vectors_collection = vectors_collection

        # Similarity is computed in-process (see calculate_similarity), so no
        # server-side vector index is required; a plain index on "name" keeps
        # the vector_exists/store_vector lookups fast.
        try:
            self.vectors_collection.create_index("name")
        except Exception:
            st.warning("Index may already exist")

    def get_embedding(self, text: str) -> list:
        """Get embedding vector for text using OpenAI"""
        try:
            response = openai_client.embeddings.create(model=self.model, input=text)
            return response.data[0].embedding
        except Exception as e:
            st.error(f"Error getting embedding: {str(e)}")
            return None
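
    # text-embedding-ada-002 returns 1536-dimensional vectors; inputs beyond
    # the model's ~8K-token context must be truncated or chunked before
    # embedding.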

    def vector_exists(self, doc_name: str) -> bool:
        """Check if a vector is already stored for the given document name"""
        return self.vectors_collection.count_documents({"name": doc_name}) > 0

    def store_vector(self, doc_name: str, vector: list, text: str, goal: str = None):
        """Store document/goal vector in MongoDB using upsert"""
        try:
            vector_doc = {
                "name": doc_name,
                "vector": vector,
                "text": text,
                "type": "document" if goal is None else "goal",
                "goal": goal,
                "updated_at": datetime.now(timezone.utc),
            }

            # One record per name: $set refreshes the payload on every run,
            # while $setOnInsert stamps created_at only on first insert.
            self.vectors_collection.update_one(
                {"name": doc_name},
                {
                    "$set": vector_doc,
                    "$setOnInsert": {"created_at": datetime.now(timezone.utc)},
                },
                upsert=True,
            )

        except Exception as e:
            st.error(f"Error storing vector: {str(e)}")

    def vector_search(self, query_vector: List[float], limit: int = 5) -> List[Dict]:
        """Search for similar documents using vector similarity"""
        try:
            # Load every stored document vector and score it in-process.
            documents = list(self.vectors_collection.find({"type": "document"}))

            similarities = []
            for doc in documents:
                similarity = self.calculate_similarity(query_vector, doc["vector"])
                similarities.append(
                    {
                        "name": doc["name"],
                        "text": doc["text"],
                        "similarity": similarity,
                        "similarity_display": f"{similarity*100:.1f}%",
                    }
                )

            # Highest similarity first, truncated to the requested limit.
            sorted_docs = sorted(
                similarities,
                key=lambda x: x["similarity"],
                reverse=True,
            )[:limit]

            return sorted_docs

        except Exception as e:
            st.error(f"Vector search error: {str(e)}")
            return []
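
    # vector_search above scans the whole collection per query, which is fine
    # for small corpora but O(n). On MongoDB Atlas, the server-side
    # $vectorSearch aggregation stage can do the ranking instead. A minimal
    # sketch, assuming an Atlas Vector Search index named "vector_index"
    # (hypothetical; this app does not create one) on the "vector" field:
    def vector_search_atlas(self, query_vector: List[float], limit: int = 5) -> List[Dict]:
        pipeline = [
            {
                "$vectorSearch": {
                    "index": "vector_index",  # hypothetical index name
                    "path": "vector",
                    "queryVector": query_vector,
                    "numCandidates": max(limit * 20, 100),
                    "limit": limit,
                }
            },
            # Surface the match score alongside the stored fields; note the
            # results carry "score" rather than the "similarity" keys used by
            # vector_search, so callers would need to adapt.
            {"$project": {"name": 1, "text": 1, "score": {"$meta": "vectorSearchScore"}}},
        ]
        return list(self.vectors_collection.aggregate(pipeline))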

    def find_similar_documents(self, text: str, limit: int = 5) -> List[Dict]:
        """Find similar documents for given text"""
        vector = self.get_embedding(text)
        if vector:
            return self.vector_search(vector, limit)
        return []

    def calculate_similarity(self, vector1: list, vector2: list) -> float:
        """Calculate cosine similarity between two vectors"""
        return np.dot(vector1, vector2) / (
            np.linalg.norm(vector1) * np.linalg.norm(vector2)
        )
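
    # calculate_similarity implements sim(a, b) = (a · b) / (||a|| · ||b||),
    # which ranges over [-1, 1]; OpenAI embeddings are approximately
    # unit-norm, so in practice the scores here land in roughly [0, 1].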


def display_analysis_results(analysis: Dict):
    """Display analysis results in Streamlit UI"""
    if not analysis:
        return

    st.subheader("Themes")
    for theme in analysis.get("themes", []):
        with st.expander(f"🎯 {theme}"):
            subthemes = analysis.get("subthemes", {}).get(theme, [])
            if subthemes:
                st.write("**Subthemes:**")
                for subtheme in subthemes:
                    st.write(f"- {subtheme}")

    st.subheader("Keywords")
    keywords = analysis.get("keywords", [])
    st.write(" | ".join([f"🔑 {keyword}" for keyword in keywords]))

    score = analysis.get("relevance_score", 0)
    st.metric("Relevance Score", f"{score}%")


def display_analyst_dashboard():
    st.title("Multi-Goal Document Analysis")

    with st.sidebar:
        st.markdown("### Input Section")
        tab1, tab2 = st.tabs(["Document Analysis", "Similarity Search"])

        with tab1:
            num_goals = st.number_input("Number of goals:", min_value=1, value=1)
            goals = []
            for i in range(num_goals):
                goal = st.text_area(f"Goal {i+1}:", key=f"goal_{i}", height=100)
                if goal:
                    goals.append(goal)

            uploaded_files = st.file_uploader(
                "Upload documents",
                accept_multiple_files=True,
                type=["txt", "pdf", "docx"],
            )
            analyze_button = (
                st.button("Analyze Documents") if goals and uploaded_files else None
            )

        with tab2:
            search_text = st.text_area("Enter text to find similar documents:")
            search_limit = st.slider("Number of results", 1, 10, 5)
            search_button = st.button("Search Similar") if search_text else None

        if st.button("Logout", use_container_width=True):
            # Clear all session state; list() avoids mutating while iterating.
            for key in list(st.session_state.keys()):
                del st.session_state[key]
            st.rerun()

    if analyze_button:
        analyzer = GoalAnalyzer()
        vectorizer = DocumentVectorizer()

        doc_vectors = {}
        goal_vectors = {}

        # Embed and persist each goal up front.
        with st.spinner("Processing goals..."):
            for i, goal in enumerate(goals):
                vector = vectorizer.get_embedding(goal)
                if vector:
                    goal_vectors[f"Goal {i+1}"] = vector
                    vectorizer.store_vector(f"Goal {i+1}", vector, goal, goal)

        with st.spinner("Processing documents..."):
            for file in uploaded_files:
                st.markdown(f"### Analysis for {file.name}")

                if vectorizer.vector_exists(file.name):
                    st.info(f"Vector already exists for {file.name}")
                    existing_doc = vectorizer.vectors_collection.find_one(
                        {"name": file.name}
                    )
                    doc_vectors[file.name] = existing_doc["vector"]
                    # Reuse the stored text so the Perplexity analysis below
                    # still has content to work with.
                    text = existing_doc.get("text", "")
                else:
                    text = analyzer.extract_text_from_file(file)
                    if not text:
                        st.warning(f"Could not extract text from {file.name}")
                        continue

                    vector = vectorizer.get_embedding(text)
                    if vector:
                        doc_vectors[file.name] = vector
                        vectorizer.store_vector(file.name, vector, text)

                # Skip the scoring UI if embedding failed for this file.
                if file.name not in doc_vectors:
                    continue

                st.subheader("Goal Relevance Scores")
                col1, col2 = st.columns([1, 2])

                with col1:
                    for goal_name, goal_vector in goal_vectors.items():
                        similarity = (
                            vectorizer.calculate_similarity(
                                doc_vectors[file.name], goal_vector
                            )
                            * 100
                        )
                        st.metric(goal_name, f"{similarity:.1f}%")

                with col2:
                    analysis = asyncio.run(
                        analyzer.get_perplexity_analysis(text, " | ".join(goals))
                    )
                    display_analysis_results(analysis)

                st.divider()

        # Pairwise matrices are only meaningful with two or more documents.
        if len(doc_vectors) > 1:
            st.markdown("### Document Similarity Matrix")
            files = list(doc_vectors.keys())
            similarity_matrix = []

            for file1 in files:
                row = []
                for file2 in files:
                    similarity = vectorizer.calculate_similarity(
                        doc_vectors[file1], doc_vectors[file2]
                    )
                    row.append(similarity)
                similarity_matrix.append(row)

            df = pd.DataFrame(similarity_matrix, columns=files, index=files)
            st.dataframe(df.style.background_gradient(cmap="RdYlGn"))

            st.markdown("### Goal-Document Similarity Matrix")
            goal_doc_matrix = []
            goal_names = list(goal_vectors.keys())

            for file in files:
                row = []
                for goal in goal_names:
                    similarity = vectorizer.calculate_similarity(
                        doc_vectors[file], goal_vectors[goal]
                    )
                    row.append(similarity)
                goal_doc_matrix.append(row)

            df_goals = pd.DataFrame(
                goal_doc_matrix, columns=goal_names, index=files
            )
            st.dataframe(df_goals.style.background_gradient(cmap="RdYlGn"))

    elif search_button:
        vectorizer = DocumentVectorizer()
        with st.spinner("Searching similar documents..."):
            query_vector = vectorizer.get_embedding(search_text)
            if query_vector:
                similar_docs = vectorizer.vector_search(query_vector, search_limit)

                if similar_docs:
                    st.markdown("### Similar Documents Found")

                    df = pd.DataFrame(similar_docs)

                    styled_df = df[["name", "similarity"]].style.background_gradient(
                        cmap="RdYlGn", subset=["similarity"]
                    )
                    styled_df = styled_df.format({"similarity": "{:.1%}"})

                    st.dataframe(styled_df)

                    # Show a short text preview inside each expander.
                    for doc in similar_docs:
                        with st.expander(
                            f"📄 {doc['name']} (Similarity: {doc['similarity_display']})"
                        ):
                            st.text(
                                doc["text"][:200] + "..."
                                if len(doc["text"]) > 200
                                else doc["text"]
                            )
                else:
                    st.info("No similar documents found")
            else:
                st.error("Could not process search query")


def main():
    # All UI flows live in display_analyst_dashboard(); main() simply
    # delegates.
    display_analyst_dashboard()


if __name__ == "__main__":
    main()