Spaces:
Sleeping
Sleeping
| """ | |
| Process uploaded documents for knowledge base | |
| """ | |
| import os | |
| import PyPDF2 | |
| import docx | |
| from typing import Dict, List | |
| import hashlib | |
class DocumentProcessor:
    """Store uploaded documents on disk and extract their text.

    Uploaded files are written into ``upload_dir`` under a name prefixed with
    the MD5 hex digest of their content. Text is extracted from ``.pdf``,
    ``.docx`` and ``.txt`` files; any other extension yields empty text.
    """

    def __init__(self, upload_dir: str = "knowledge_base/internal_docs"):
        """Remember ``upload_dir`` and create it if it does not exist yet."""
        self.upload_dir = upload_dir
        os.makedirs(upload_dir, exist_ok=True)

    def process_uploaded_file(self, uploaded_file) -> Dict:
        """Save an uploaded document and extract its text.

        Args:
            uploaded_file: Object exposing a ``name`` attribute and a
                ``getvalue()`` method returning the file content as bytes
                (for example an ``io.BytesIO`` with a ``name`` set).
                NOTE(review): presumably a web-framework upload object;
                confirm against the caller.

        Returns:
            A dict with the keys ``filename`` (the name as uploaded),
            ``file_hash`` (MD5 hex digest of the content), ``file_path``
            (where the copy was written), ``text`` (at most the first 5000
            characters of the extracted text), ``word_count`` (whitespace
            separated words in the *full* extracted text) and ``status``
            (always ``'processed'``).
        """
        # Read the payload once and reuse it for hashing, saving and decoding.
        data = uploaded_file.getvalue()
        file_hash = hashlib.md5(data).hexdigest()

        # The name comes from the client: keep only its final component so a
        # name such as "../../x" cannot place the file outside upload_dir.
        safe_name = os.path.basename(uploaded_file.name.replace("\\", "/"))
        file_path = os.path.join(self.upload_dir, f"{file_hash}_{safe_name}")

        # Save the file
        with open(file_path, "wb") as f:
            f.write(data)

        # Extract text based on file type (extension match is case-insensitive)
        extension_source = uploaded_file.name.lower()
        text = ""
        if extension_source.endswith('.pdf'):
            text = self._extract_pdf_text(file_path)
        elif extension_source.endswith('.docx'):
            text = self._extract_docx_text(file_path)
        elif extension_source.endswith('.txt'):
            # Undecodable bytes become U+FFFD instead of aborting the upload
            # after the file has already been written to disk.
            text = data.decode('utf-8', errors='replace')

        return {
            'filename': uploaded_file.name,
            'file_hash': file_hash,
            'file_path': file_path,
            'text': text[:5000],  # Limit text for processing
            'word_count': len(text.split()),
            'status': 'processed'
        }

    def _extract_pdf_text(self, file_path: str) -> str:
        """Extract text from a PDF file, one newline-terminated chunk per page.

        Best effort: if reading fails, the error is printed and whatever text
        was collected before the failure is returned (possibly empty).
        """
        pages: List[str] = []
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    # Defensive: a page that yields None must not abort the
                    # extraction of the remaining pages.
                    pages.append((page.extract_text() or "") + "\n")
        except Exception as e:
            print(f"Error reading PDF: {e}")
        return "".join(pages)

    def _extract_docx_text(self, file_path: str) -> str:
        """Extract text from a DOCX file, one newline-terminated line per paragraph.

        Best effort: if reading fails, the error is printed and whatever text
        was collected before the failure is returned (possibly empty).
        """
        paragraphs: List[str] = []
        try:
            doc = docx.Document(file_path)
            for paragraph in doc.paragraphs:
                paragraphs.append(paragraph.text + "\n")
        except Exception as e:
            print(f"Error reading DOCX: {e}")
        return "".join(paragraphs)

    def search_in_documents(self, query: str, documents: List[Dict]) -> List[Dict]:
        """Search for ``query`` (case-insensitively) in processed documents.

        Args:
            query: Substring to look for. An empty query matches nothing.
            documents: Dicts carrying at least the ``'text'`` and
                ``'filename'`` keys, as returned by ``process_uploaded_file``.

        Returns:
            One result per document containing the query, in input order.
            Each result holds ``document`` (the filename), ``match`` (the
            query as given), ``context`` (up to 100 characters either side of
            the first occurrence, wrapped in ``...``) and ``relevance``
            (always ``1.0``).
        """
        if not query:
            # An empty needle is "found" at index 0 of every text; report no
            # matches rather than every document.
            return []

        query_lower = query.lower()
        results = []
        for doc in documents:
            text = doc['text']
            # Lowercase once; find() both detects and locates the first match.
            idx = text.lower().find(query_lower)
            if idx == -1:
                continue
            # Find context around the match
            start = max(0, idx - 100)
            end = min(len(text), idx + len(query) + 100)
            context = text[start:end]
            results.append({
                'document': doc['filename'],
                'match': query,
                'context': f"...{context}...",
                'relevance': 1.0
            })
        return results