import datetime
import json
import os
import xml.etree.ElementTree as ET
from typing import Dict, List, Optional, Union

import chromadb
import nltk
import PyPDF2
from groq import Groq
from nltk.tokenize import sent_tokenize
from sentence_transformers import SentenceTransformer
class CustomEmbeddingFunction:
    """Embedding function for Chroma: a callable mapping a list of input
    strings to a list of embedding vectors."""

    def __init__(self):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')

    def __call__(self, input: List[str]) -> List[List[float]]:
        embeddings = self.model.encode(input)
        return embeddings.tolist()
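
# Illustrative usage sketch: all-MiniLM-L6-v2 produces 384-dimensional vectors,
# so a quick sanity check looks like:
#
#     embed = CustomEmbeddingFunction()
#     vectors = embed(["hello world", "goodbye world"])
#     assert len(vectors) == 2 and len(vectors[0]) == 384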

class UnifiedDocumentProcessor:
    def __init__(self, groq_api_key, collection_name="unified_content"):
        """Initialize the processor with necessary clients"""
        self.groq_client = Groq(api_key=groq_api_key)

        # XML-specific settings
        self.max_elements_per_chunk = 50

        # PDF-specific settings
        self.pdf_chunk_size = 500  # target chunk size, in words
        self.pdf_overlap = 50      # words of overlap between consecutive chunks

        # Initialize NLTK tokenizer resources ('punkt' and 'punkt_tab')
        self._initialize_nltk()

        # Initialize ChromaDB with a single collection for all document types
        self.chroma_client = chromadb.Client()
        existing_collections = self.chroma_client.list_collections()
        collection_exists = any(col.name == collection_name for col in existing_collections)
        if collection_exists:
            print(f"Using existing collection: {collection_name}")
            self.collection = self.chroma_client.get_collection(
                name=collection_name,
                embedding_function=CustomEmbeddingFunction()
            )
        else:
            print(f"Creating new collection: {collection_name}")
            self.collection = self.chroma_client.create_collection(
                name=collection_name,
                embedding_function=CustomEmbeddingFunction()
            )
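
    # Note (illustrative): recent chromadb releases also expose
    # get_or_create_collection(), which could replace the exists-check in
    # __init__; the explicit branch above is kept to preserve the logging.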
    def _initialize_nltk(self):
        """Ensure the NLTK tokenizer resources ('punkt' and 'punkt_tab') are available."""
        try:
            for resource in ('punkt', 'punkt_tab'):
                try:
                    nltk.data.find(f'tokenizers/{resource}')
                except LookupError:
                    print(f"Downloading NLTK '{resource}' tokenizer...")
                    nltk.download(resource)
        except Exception as e:
            print(f"Warning: Error downloading NLTK resources: {str(e)}")
            print("Falling back to basic sentence splitting...")
    def _basic_sentence_split(self, text: str) -> List[str]:
        """Fallback sentence tokenizer: split on '.', '!', and '?'."""
        sentences = []
        current = ""
        for char in text:
            current += char
            if char in ('.', '!', '?') and current.strip():
                sentences.append(current.strip())
                current = ""
        if current.strip():
            sentences.append(current.strip())
        return sentences
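
    # Illustrative example of the fallback splitter:
    #   self._basic_sentence_split("Hi there. All good!") -> ["Hi there.", "All good!"]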
    def process_file(self, file_path: str) -> Dict:
        """Process any supported file type"""
        try:
            file_extension = os.path.splitext(file_path)[1].lower()
            if file_extension == '.xml':
                return self.process_xml_file(file_path)
            elif file_extension == '.pdf':
                return self.process_pdf_file(file_path)
            else:
                return {
                    'success': False,
                    'error': f'Unsupported file type: {file_extension}'
                }
        except Exception as e:
            return {
                'success': False,
                'error': f'Error processing file: {str(e)}'
            }
    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """Extract text from a PDF file"""
        try:
            text = ""
            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    # Guard against pages with no extractable text
                    text += (page.extract_text() or "") + " "
            return text.strip()
        except Exception as e:
            raise Exception(f"Error extracting text from PDF: {str(e)}") from e
    def chunk_text(self, text: str) -> List[str]:
        """Split text into word-bounded chunks while preserving sentence boundaries"""
        try:
            sentences = sent_tokenize(text)
        except Exception as e:
            print(f"Warning: Using fallback sentence splitting: {str(e)}")
            sentences = self._basic_sentence_split(text)

        chunks = []
        current_chunk = []
        current_size = 0
        for sentence in sentences:
            words = sentence.split()
            sentence_size = len(words)
            if current_size + sentence_size > self.pdf_chunk_size:
                if current_chunk:
                    chunks.append(' '.join(current_chunk))
                    # Carry the last pdf_overlap words into the next chunk
                    overlap_words = current_chunk[-self.pdf_overlap:] if self.pdf_overlap > 0 else []
                    current_chunk = overlap_words + words
                    current_size = len(current_chunk)
                else:
                    current_chunk = words
                    current_size = sentence_size
            else:
                current_chunk.extend(words)
                current_size += sentence_size
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        return chunks
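
    # Illustrative: with pdf_chunk_size=500 and pdf_overlap=50, the last 50 words
    # of each emitted chunk are repeated at the start of the next one, so content
    # near a boundary remains retrievable from either side of the split.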
    def flatten_xml_to_text(self, element, depth=0) -> str:
        """Convert an XML element and its children to a flat text representation"""
        text_parts = []
        element_info = f"Element: {element.tag}"
        if element.attrib:
            element_info += f", Attributes: {json.dumps(element.attrib)}"
        if element.text and element.text.strip():
            element_info += f", Text: {element.text.strip()}"
        text_parts.append(element_info)
        for child in element:
            # depth is threaded through for potential indentation but is currently unused
            child_text = self.flatten_xml_to_text(child, depth + 1)
            text_parts.append(child_text)
        return "\n".join(text_parts)
    def chunk_xml_text(self, text: str, max_chunk_size: int = 2000) -> List[str]:
        """Split flattened XML text into chunks of at most ~max_chunk_size characters"""
        lines = text.split('\n')
        chunks = []
        current_chunk = []
        current_size = 0
        for line in lines:
            line_size = len(line)
            if current_size + line_size > max_chunk_size and current_chunk:
                chunks.append('\n'.join(current_chunk))
                current_chunk = []
                current_size = 0
            current_chunk.append(line)
            current_size += line_size
        if current_chunk:
            chunks.append('\n'.join(current_chunk))
        return chunks
    def generate_natural_language(self, content: Union[List[Dict], str], content_type: str) -> Optional[str]:
        """Generate a natural language description; returns None if generation fails."""
        try:
            if content_type == "xml":
                prompt = f"Convert this XML structure description to a natural language summary: {content}"
            else:  # pdf
                prompt = f"Summarize this text while preserving key information: {content}"

            # Truncate oversized prompts rather than failing outright
            max_prompt_length = 4000
            if len(prompt) > max_prompt_length:
                prompt = prompt[:max_prompt_length] + "..."

            response = self.groq_client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model="llama3-8b-8192",
                max_tokens=1000
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error generating natural language: {str(e)}")
            # On failure, retry once with the first half of the content
            if len(content) > 2000:
                first_half = content[:len(content) // 2]
                try:
                    return self.generate_natural_language(first_half, content_type)
                except Exception:
                    return None
            return None
    def store_in_vector_db(self, natural_language: str, metadata: Dict) -> str:
        """Store content in the vector database and return the generated document ID"""
        doc_id = (
            f"{metadata['source_file']}_{metadata['content_type']}_"
            f"{metadata['chunk_id']}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
        )
        self.collection.add(
            documents=[natural_language],
            metadatas=[metadata],
            ids=[doc_id]
        )
        return doc_id
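
    # Illustrative: a stored ID looks like "report.pdf_pdf_3_20250101_120000".
    # The second-resolution timestamp distinguishes re-runs, though re-ingesting
    # the same chunk twice within one second would still produce a duplicate ID.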
    def process_xml_file(self, xml_file_path: str) -> Dict:
        """Process an XML file with improved chunking"""
        try:
            tree = ET.parse(xml_file_path)
            root = tree.getroot()
            flattened_text = self.flatten_xml_to_text(root)
            chunks = self.chunk_xml_text(flattened_text)
            print(f"Split XML into {len(chunks)} chunks")
            results = []
            for i, chunk in enumerate(chunks):
                print(f"Processing XML chunk {i+1}/{len(chunks)}")
                try:
                    natural_language = self.generate_natural_language(chunk, "xml")
                    if natural_language:
                        metadata = {
                            'source_file': os.path.basename(xml_file_path),
                            'content_type': 'xml',
                            'chunk_id': i,
                            'total_chunks': len(chunks),
                            'timestamp': str(datetime.datetime.now())
                        }
                        doc_id = self.store_in_vector_db(natural_language, metadata)
                        results.append({
                            'chunk': i,
                            'success': True,
                            'doc_id': doc_id,
                            'natural_language': natural_language
                        })
                    else:
                        results.append({
                            'chunk': i,
                            'success': False,
                            'error': 'Failed to generate natural language'
                        })
                except Exception as e:
                    print(f"Error processing chunk {i}: {str(e)}")
                    results.append({
                        'chunk': i,
                        'success': False,
                        'error': str(e)
                    })
            return {
                'success': True,
                'total_chunks': len(chunks),
                'results': results
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
    def process_pdf_file(self, pdf_file_path: str) -> Dict:
        """Process a PDF file"""
        try:
            full_text = self.extract_text_from_pdf(pdf_file_path)
            chunks = self.chunk_text(full_text)
            print(f"Split PDF into {len(chunks)} chunks")
            results = []
            for i, chunk in enumerate(chunks):
                print(f"Processing PDF chunk {i+1}/{len(chunks)}")
                natural_language = self.generate_natural_language(chunk, "pdf")
                if natural_language:
                    metadata = {
                        'source_file': os.path.basename(pdf_file_path),
                        'content_type': 'pdf',
                        'chunk_id': i,
                        'total_chunks': len(chunks),
                        'timestamp': str(datetime.datetime.now()),
                        'chunk_size': len(chunk.split())
                    }
                    doc_id = self.store_in_vector_db(natural_language, metadata)
                    results.append({
                        'chunk': i,
                        'success': True,
                        'doc_id': doc_id,
                        'natural_language': natural_language,
                        'original_text': chunk[:200] + "..."
                    })
                else:
                    results.append({
                        'chunk': i,
                        'success': False,
                        'error': 'Failed to generate natural language summary'
                    })
            return {
                'success': True,
                'total_chunks': len(chunks),
                'results': results
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
    def get_available_files(self) -> Dict[str, List[str]]:
        """Get a list of all files in the database, grouped by content type"""
        try:
            all_entries = self.collection.get(include=['metadatas'])
            files = {'pdf': set(), 'xml': set()}
            for metadata in all_entries['metadatas']:
                file_type = metadata['content_type']
                file_name = metadata['source_file']
                # setdefault guards against entries with an unexpected content_type
                files.setdefault(file_type, set()).add(file_name)
            return {
                'pdf': sorted(files['pdf']),
                'xml': sorted(files['xml'])
            }
        except Exception as e:
            print(f"Error getting available files: {str(e)}")
            return {'pdf': [], 'xml': []}
    def ask_question_selective(self, question: str, selected_files: List[str], n_results: int = 5) -> str:
        """Answer a question using only the selected files as context"""
        try:
            filter_dict = {'source_file': {'$in': selected_files}}
            results = self.collection.query(
                query_texts=[question],
                n_results=n_results,
                where=filter_dict,
                include=["documents", "metadatas"]
            )
            if not results['documents'][0]:
                return "No relevant content found in the selected files."
            context = "\n\n".join(results['documents'][0])
            prompt = f"""Based on the following content from the selected files, please answer this question: {question}

Content:
{context}

Please provide a direct answer based only on the information provided above."""
            response = self.groq_client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model="llama3-8b-8192",
                temperature=0.2
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error processing your question: {str(e)}"