Spaces:
Build error
Build error
| import streamlit as st | |
| from io import StringIO | |
| import PyPDF4 | |
| import pdfplumber | |
| import docx2txt | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import difflib | |
| import os | |
| from huggingface_hub import InferenceClient # Import Hugging Face API | |
# ========== CONFIG ==========
# Page title and wide layout for the Streamlit app.
st.set_page_config(
    page_title="📑 Contract Analyzer",
    layout="wide",
)

# ========== FUNCTIONS ==========
# Hugging Face API token taken from the environment (None when HF_TOKEN is unset).
token = os.getenv("HF_TOKEN")
# Load the Hugging Face model client from the Hub
def load_inference_client():
    """Create the inference client used to query the Zephyr model.

    Returns:
        An ``InferenceClient`` configured for ``HuggingFaceH4/zephyr-7b-beta``
        with the module-level ``token``, or ``None`` if construction raises
        (the error is then shown in the Streamlit UI).
    """
    try:
        client = InferenceClient(
            model="HuggingFaceH4/zephyr-7b-beta",
            token=token,
        )
    except Exception as exc:
        st.error(f"Error loading InferenceClient: {exc}")
        return None
    return client


# Module-level client shared by the query helper below.
inference_client = load_inference_client()
def extract_text_from_pdf(uploaded_file):
    """Extract the text of every page of a PDF, joined by newlines.

    Args:
        uploaded_file: The object handed to ``pdfplumber.open``.

    Returns:
        The concatenated page text, or "" when the PDF cannot be read or
        contains no extractable text (an error is shown in the UI).
    """
    try:
        with pdfplumber.open(uploaded_file) as document:
            # extract_text() may yield nothing for a page; treat that as "".
            page_texts = [page.extract_text() or "" for page in document.pages]
        combined = "\n".join(page_texts)
        if combined.strip():
            return combined
        # Routed through the handler below so the user sees the message.
        raise ValueError("No extractable text found in the PDF")
    except Exception as exc:
        st.error(f"Error reading PDF: {exc}")
        return ""
def load_text(file):
    """Return the plain text of an uploaded .txt, .pdf or .docx file.

    Args:
        file: Uploaded file object exposing ``name``, ``size`` and
            ``getvalue()`` (the attributes used below), or a falsy value
            when nothing was uploaded.

    Returns:
        The extracted text, or "" when no file was given, the file exceeds
        the size limit, its extension is unsupported, or reading fails.
        Problems are reported via Streamlit warnings/errors, never raised.
    """
    if not file:
        return ""

    max_bytes = 10 * 1024 * 1024  # 10MB upload limit
    try:
        if file.size > max_bytes:
            st.warning("File is too large. Please upload a smaller file.")
            return ""

        ext = file.name.split('.')[-1].lower()
        if ext == 'txt':
            # "utf-8-sig" decodes ordinary UTF-8 and also drops a leading
            # BOM, which would otherwise cling to the first word and show
            # up as a spurious difference between the two documents.
            return file.getvalue().decode("utf-8-sig")
        if ext == 'pdf':
            return extract_text_from_pdf(file)
        if ext == 'docx':
            return docx2txt.process(file)

        st.warning(f"Unsupported file type: {ext}")
        return ""
    except Exception as e:
        st.error(f"Error loading file: {e}")
        return ""
def highlight_diff(text1, text2):
    """Render a word-level diff of two texts as an HTML fragment.

    Both texts are split on whitespace and compared with ``difflib.Differ``.
    Words only in ``text1`` get a red background, words only in ``text2`` a
    green one, and shared words are emitted unchanged. Every emitted word is
    followed by a single space.

    Args:
        text1: The original text.
        text2: The text to compare against ``text1``.

    Returns:
        An HTML string. Words are HTML-escaped because the result is rendered
        with ``unsafe_allow_html=True`` and the documents are user uploads.
    """
    from html import escape  # local import: only this function needs it

    removed = '<span style="background-color:#ffcccc">{}</span> '
    added = '<span style="background-color:#ccffcc">{}</span> '

    pieces = []
    for entry in difflib.Differ().compare(text1.split(), text2.split()):
        marker = entry[:2]
        if marker == "? ":
            # Differ's intraline hint lines ("?    ^^^") are not words from
            # either document; rendering them would inject junk text.
            continue
        word = escape(entry[2:])
        if marker == "- ":
            pieces.append(removed.format(word))
        elif marker == "+ ":
            pieces.append(added.format(word))
        else:
            pieces.append(word + " ")
    return "".join(pieces)
def compute_similarity(text1, text2):
    """Return a similarity percentage (0-100) for two texts.

    The score is the cosine similarity of the TF-IDF vectors of the two
    texts. If vectorisation fails, it falls back to the character-level
    ``difflib.SequenceMatcher`` ratio.

    Args:
        text1: First text.
        text2: Second text.

    Returns:
        A builtin ``float`` percentage; 0.0 when either text is empty or
        whitespace-only.
    """
    if not text1.strip() or not text2.strip():
        return 0.0
    try:
        # The pattern accepts word tokens of one or more characters.
        tfidf = TfidfVectorizer(token_pattern=r'(?u)\b\w+\b')
        tfidf_matrix = tfidf.fit_transform([text1, text2])
        sim = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])
        # Convert the numpy scalar so callers always get a builtin float.
        return float(sim[0][0]) * 100
    except Exception:
        # Best-effort fallback. ``Exception`` (not a bare ``except``) lets
        # KeyboardInterrupt and SystemExit propagate.
        return difflib.SequenceMatcher(None, text1, text2).ratio() * 100
# Query the Zephyr model hosted on Hugging Face
def query_zephyr_model(text1, text2, question):
    """Ask the Zephyr model a question about the two contracts.

    Args:
        text1: Text of the first contract.
        text2: Text of the second contract.
        question: The user's question.

    Returns:
        The generated answer as a string, or ``None`` when the client is
        unavailable or the request fails (an error is shown in the UI).
    """
    if inference_client is None:
        # load_inference_client() returns None when construction failed.
        st.error("Error querying the model: inference client is not available")
        return None

    prompt = f"Compare the following two contracts and answer the question:\nText 1: {text1}\nText 2: {text2}\nQuestion: {question}"
    try:
        result = inference_client.text_generation(prompt)
    except Exception as e:
        st.error(f"Error querying the model: {e}")
        return None

    # text_generation() returns a plain str unless details=True is passed,
    # so reading ``.generated_text`` unconditionally raised AttributeError.
    if isinstance(result, str):
        return result
    return getattr(result, "generated_text", None)
# ========== MAIN ==========
def main():
    """Render the page: upload two documents, preview, compare, and ask."""
    st.title("📑 Contract Analyzer")
    st.markdown("Upload two contracts, compare them, and ask any question!")

    # --- Step 1: upload both documents side by side ---
    st.header("1. Upload Documents")
    left, right = st.columns(2)
    with left:
        file1 = st.file_uploader(
            "Upload First Document",
            type=["txt", "pdf", "docx"],
            key="file1",
        )
    with right:
        file2 = st.file_uploader(
            "Upload Second Document",
            type=["txt", "pdf", "docx"],
            key="file2",
        )

    text1 = load_text(file1) if file1 else ""
    text2 = load_text(file2) if file2 else ""

    # Nothing below makes sense until both documents yielded text.
    if not text1 or not text2:
        st.warning("Please upload both documents to continue.")
        return

    # --- Step 2: show the extracted text ---
    st.header("2. Documents Content")
    left, right = st.columns(2)
    with left:
        st.subheader("First Document")
        st.text_area("Content of first document:", text1, height=300)
    with right:
        st.subheader("Second Document")
        st.text_area("Content of second document:", text2, height=300)

    # --- Step 3: similarity score and highlighted diff ---
    st.header("3. Compare Documents")
    compare_clicked = st.button("Compare Documents")
    if compare_clicked:
        score = compute_similarity(text1, text2)
        st.metric("Similarity Score", f"{score:.2f}%")
        diff_markup = highlight_diff(text1, text2)
        st.markdown("**Differences Highlighted:**", unsafe_allow_html=True)
        st.markdown(
            f"<div style='border:1px solid #ccc; padding:10px; max-height:400px; overflow:auto'>{diff_markup}</div>",
            unsafe_allow_html=True,
        )

    # --- Step 4: free-form question answered by the model ---
    st.header("4. Ask a Question")
    user_question = st.text_input("Enter your question about the contracts:")
    if user_question:
        # The button is only rendered once a question has been typed.
        if st.button("Analyze Question"):
            with st.spinner("Analyzing..."):
                try:
                    answer = query_zephyr_model(text1, text2, user_question)
                    if not answer:
                        st.error("Failed to get a valid answer from the model.")
                    else:
                        st.success(answer)
                except Exception as exc:
                    st.error(f"Failed on Document: {exc}")


if __name__ == "__main__":
    main()