| | import os |
| | import re |
| | import json |
| | import torch |
| | import numpy as np |
| | import logging |
| | from typing import Dict, List, Tuple, Optional |
| | from tqdm import tqdm |
| | from pydantic import BaseModel |
| | from transformers import ( |
| | AutoTokenizer, |
| | AutoModelForSeq2SeqLM, |
| | AutoModelForQuestionAnswering, |
| | pipeline, |
| | LogitsProcessor, |
| | LogitsProcessorList, |
| | PreTrainedModel, |
| | PreTrainedTokenizer |
| | ) |
| | from sentence_transformers import SentenceTransformer, CrossEncoder |
| | from sklearn.feature_extraction.text import TfidfVectorizer |
| | from rank_bm25 import BM25Okapi |
| | import PyPDF2 |
| | from sklearn.cluster import KMeans |
| | import spacy |
| | import subprocess |
| | import gradio as gr |
| |
|
# Root logger setup, executed once at import: INFO threshold, with each record
# rendered as "<timestamp> [<LEVEL>] <message>".
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
| |
|
class ConfidenceCalibrator(LogitsProcessor):
    """Rescale logits by a constant divisor (temperature-style scaling).

    Every score is divided by ``calibration_factor``. After a softmax, a
    factor below 1 makes the distribution sharper and a factor above 1 makes
    it flatter; the ranking of the scores is unchanged for any positive factor.
    """

    def __init__(self, calibration_factor: float = 0.9):
        """Store the divisor applied to every score.

        Args:
            calibration_factor: Strictly positive divisor.

        Raises:
            ValueError: If ``calibration_factor`` is zero, negative or NaN.
                Such values would produce inf/NaN scores or invert the
                ranking instead of calibrating it.
        """
        # ``not (x > 0)`` also rejects NaN, which ``x <= 0`` would let through.
        if not calibration_factor > 0:
            raise ValueError(
                f"calibration_factor must be > 0, got {calibration_factor!r}"
            )
        self.calibration_factor = calibration_factor

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
        """Return ``scores`` divided by the calibration factor.

        ``input_ids`` is accepted for interface compatibility and is not used.
        """
        return scores / self.calibration_factor
| |
|
class DocumentResult(BaseModel):
    """Pydantic schema for one extracted result; all four fields are required.

    NOTE(review): nothing in the visible part of this file constructs this
    model, so the field semantics below are read off the names only.
    """

    # Text of the result.
    content: str
    # Score attached to the result (range is not established here; confirm).
    confidence: float
    # Page the content was taken from (presumably 1-based; confirm with callers).
    source_page: int
    # Passages backing the result.
    supporting_evidence: List[str]
| |
|
class OptimalModelSelector:
    """Pick the highest-scoring checkpoint for a task and load it lazily.

    Each registry maps a short name to ``(checkpoint id, score)``. Loaded
    ``(model, tokenizer)`` pairs are cached in ``current_models`` under the
    short name, so each checkpoint is loaded at most once per selector.
    """

    def __init__(self):
        self.qa_models = {
            "deberta-v3": ("deepset/deberta-v3-large-squad2", 0.87),
        }
        self.summarization_models = {
            "bart": ("facebook/bart-large-cnn", 0.85),
        }
        # short name -> (model, tokenizer); filled on first request.
        self.current_models = {}

    def get_best_model(self, task_type: str) -> "Tuple[PreTrainedModel, PreTrainedTokenizer, float]":
        """Return ``(model, tokenizer, score)`` for the best registered model.

        Args:
            task_type: Any string containing ``"qa"`` selects the
                question-answering registry; every other value selects the
                summarization registry.

        Returns:
            The cached or freshly loaded model in eval mode, its tokenizer,
            and the registry score of the chosen checkpoint as a float.
        """
        is_qa = "qa" in task_type
        registry = self.qa_models if is_qa else self.summarization_models
        # items() yields (name, (checkpoint, score)); unpack the inner tuple so
        # the returned score is the float itself, not the whole registry entry.
        best_name, (checkpoint, best_score) = max(
            registry.items(), key=lambda item: item[1][1]
        )

        if best_name not in self.current_models:
            tokenizer = AutoTokenizer.from_pretrained(checkpoint)
            model_cls = AutoModelForQuestionAnswering if is_qa else AutoModelForSeq2SeqLM
            model = model_cls.from_pretrained(checkpoint).eval()
            if torch.cuda.is_available():
                # Half precision only on GPU: fp16 inference on CPU is poorly
                # supported and can be slow or unavailable, so CPU stays fp32.
                model = model.half().to('cuda')
            else:
                model = model.to('cpu')
            self.current_models[best_name] = (model, tokenizer)

        model, tokenizer = self.current_models[best_name]
        return model, tokenizer, best_score
| |
|
class PDFAugmentedRetriever:
    """Hybrid ranker over a fixed list of texts.

    Combines a lexical BM25 score (weight 0.4) with a cross-encoder relevance
    score (weight 0.6). Both components are mapped to [0, 1] before mixing,
    so the combined score is also in [0, 1].
    """

    def __init__(self, document_texts: List[str]):
        """Index ``document_texts``; list position becomes the document id.

        Raises:
            ValueError: If ``document_texts`` is empty (there is nothing to
                index, and BM25 cannot be built on an empty corpus).
        """
        if not document_texts:
            raise ValueError("PDFAugmentedRetriever requires at least one document")
        self.documents = list(enumerate(document_texts))
        # Lower-case the corpus tokens so BM25 matching is not defeated by
        # capitalisation; retrieve() lower-cases the query the same way.
        self.bm25 = BM25Okapi([text.lower().split() for _, text in self.documents])
        self.encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
        # Not consulted by retrieve(); kept so the attribute stays available.
        self.tfidf = TfidfVectorizer(stop_words='english').fit([text for _, text in self.documents])

    def retrieve(self, query: str, top_k: int = 8) -> List[Tuple[int, str, float]]:
        """Return up to ``top_k`` documents ranked by combined relevance.

        Args:
            query: Free-text query.
            top_k: Maximum number of results; values <= 0 yield an empty list.

        Returns:
            ``(document index, document text, score)`` tuples, best first,
            with ``score`` in [0, 1].
        """
        if top_k <= 0:
            # Without this guard the slice ``[-0:]`` would return everything.
            return []

        lexical = np.asarray(self.bm25.get_scores(query.lower().split()), dtype=float)
        # Scale BM25 by its maximum so it lives on the same 0..1 range as the
        # semantic score; negatives are clipped first so they cannot dominate.
        lexical = np.clip(lexical, 0.0, None)
        peak = lexical.max()
        if peak > 0:
            lexical = lexical / peak

        pairs = [(query, doc) for _, doc in self.documents]
        raw = np.asarray(self.encoder.predict(pairs), dtype=float)
        # Assumes the encoder returns unbounded relevance logits (true for the
        # ms-marco checkpoint above, TODO confirm if the model is swapped);
        # the logistic function squashes them into (0, 1). Clipping avoids
        # overflow warnings in exp() for extreme values.
        semantic = 1.0 / (1.0 + np.exp(-np.clip(raw, -50.0, 50.0)))

        combined = 0.4 * lexical + 0.6 * semantic
        order = np.argsort(combined)[::-1][:top_k]
        return [
            (self.documents[i][0], self.documents[i][1], float(combined[i]))
            for i in order
        ]
| |
|
class DetailedExplainer:
    """Extract key concepts from text with spaCy and explain each one with a
    text2text-generation pipeline."""

    # Named-entity labels that count as concepts.
    CONCEPT_ENTITY_LABELS = ("PERSON", "ORG", "GPE", "NORP", "EVENT", "WORK_OF_ART")

    def __init__(self,
                 explanation_model: str = "google/flan-t5-large",
                 device: int = 0):
        """Load the spaCy model (downloading it if missing) and the generator.

        Args:
            explanation_model: Checkpoint used for both model and tokenizer.
            device: Device index handed to the pipeline unchanged.

        Raises:
            subprocess.CalledProcessError: If the spaCy model download fails.
        """
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except OSError:
            # Install with the interpreter running this process; a bare
            # "python" may be missing from PATH or belong to another
            # environment, in which case the reload below would still fail.
            import sys
            subprocess.run(
                [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
                check=True,
            )
            self.nlp = spacy.load("en_core_web_sm")
        # Only max_new_tokens is set: passing max_length as well is
        # contradictory (max_new_tokens takes precedence) and only triggers a
        # warning on every call.
        self.explainer = pipeline(
            "text2text-generation",
            model=explanation_model,
            tokenizer=explanation_model,
            device=device,
            max_new_tokens=2000,
        )

    def extract_concepts(self, text: str) -> list:
        """Return the distinct concepts found in ``text``, sorted alphabetically.

        A concept is a noun chunk of more than one token whose root is not a
        stop word, or a named entity with a label in
        ``CONCEPT_ENTITY_LABELS``. Sorting makes the order, and therefore the
        order of generated explanations, reproducible between runs.
        """
        if not text or not text.strip():
            return []
        doc = self.nlp(text)
        concepts = set()
        for chunk in doc.noun_chunks:
            if len(chunk) > 1 and not chunk.root.is_stop:
                concepts.add(chunk.text.strip())
        for ent in doc.ents:
            if ent.label_ in self.CONCEPT_ENTITY_LABELS:
                concepts.add(ent.text.strip())
        concepts.discard("")
        return sorted(concepts)

    def explain_concept(self, concept: str, context: str, min_accuracy: float = 0.50) -> str:
        """Generate an explanation of ``concept`` as it appears in ``context``.

        Args:
            concept: Concept to explain.
            context: Source passage placed at the top of the prompt.
            min_accuracy: Only interpolated into the prompt wording as a
                percentage; nothing is measured against it.

        Returns:
            The generated text, stripped of surrounding whitespace.
        """
        prompt = (
            f"The following sentence from a PDF is given \n{context}\n\n\n"
            f"Now provide a detailed explanation of the concept '{concept}' mentioned above. "
            f"Include background information, context, examples, and significance. "
            f"Write a comprehensive explanation with at least {int(min_accuracy * 100)}% accuracy. "
            f"Make the explanation thorough and informative, up to 500 words if needed."
        )
        # Greedy decoding, capped below the pipeline-wide default.
        result = self.explainer(prompt, do_sample=False, max_new_tokens=600)
        return result[0]["generated_text"].strip()

    def explain_text(self, text: str, context: str) -> dict:
        """Explain every concept found in ``text``.

        Returns:
            ``{"concepts": [...], "explanations": {concept: explanation}}``.
            One generation call is made per concept.
        """
        concepts = self.extract_concepts(text)
        explanations = {
            concept: self.explain_concept(concept, context)
            for concept in concepts
        }
        return {"concepts": concepts, "explanations": explanations}
| |
|
class AdvancedPDFAnalyzer:
    """Extract text from a PDF and answer questions about it.

    Answering combines a retriever (to pick candidate pages), an extractive
    QA model (to pick a span in each page) and a concept explainer (to
    annotate the selected answers).
    """

    def __init__(self):
        """Load every model used by the analyzer."""
        self.logger = logging.getLogger("PDFAnalyzer")
        self.model_selector = OptimalModelSelector()
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.qa_model, self.qa_tokenizer, _ = self.model_selector.get_best_model("qa")
        self.qa_model = self.qa_model.to(self.device)
        # Not used by the methods of this class; kept available for callers.
        self.summarizer = pipeline(
            "summarization",
            model="facebook/bart-large-cnn",
            device=0 if torch.cuda.is_available() else -1,
            framework="pt",
            max_length=2048,
            min_length=100
        )
        self.logits_processor = LogitsProcessorList([
            ConfidenceCalibrator(calibration_factor=0.85)
        ])
        self.detailed_explainer = DetailedExplainer(device=0 if torch.cuda.is_available() else -1)
        # Retriever for the most recently queried document set; see
        # _get_retriever().
        self._retriever = None
        self._retriever_texts = None

    def extract_text_with_metadata(self, file_path: str) -> List[Dict]:
        """Read a PDF and return one entry per page that has extractable text.

        Args:
            file_path: Path of the PDF file.

        Returns:
            A list of ``{"content": cleaned text, "metadata": {...}}`` dicts.
            ``metadata`` holds ``source`` (file base name), ``page`` (1-based
            page number), and ``char_count`` / ``word_count`` computed on the
            text before cleaning.

        Raises:
            ValueError: If no page yields any text.
        """
        documents = []
        with open(file_path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            for i, page in enumerate(reader.pages):
                text = page.extract_text()
                if not text or not text.strip():
                    # Pages without a text layer (e.g. scans) are skipped.
                    continue
                metadata = {
                    'source': os.path.basename(file_path),
                    'page': i + 1,
                    'char_count': len(text),
                    'word_count': len(text.split()),
                }
                documents.append({
                    'content': self._clean_text(text),
                    'metadata': metadata
                })
        if not documents:
            raise ValueError("No extractable content found in PDF")
        return documents

    def _clean_text(self, text: str) -> str:
        """Normalise extracted text to a single line of plain words."""
        # Control characters (including newlines and tabs) become spaces.
        text = re.sub(r'[\x00-\x1F\x7F-\x9F]', ' ', text)
        # Collapse whitespace runs.
        text = re.sub(r'\s+', ' ', text)
        # Rejoin words split by a hyphen followed by whitespace (line-break
        # hyphenation): "exam- ple" -> "example".
        text = re.sub(r'(\w)-\s+(\w)', r'\1\2', text)
        return text.strip()

    def _get_retriever(self, documents: List[Dict]):
        """Return a retriever for ``documents``, reusing the last one if the
        page texts are unchanged (building one loads a cross-encoder)."""
        texts = [doc['content'] for doc in documents]
        cached = getattr(self, "_retriever", None)
        if cached is None or getattr(self, "_retriever_texts", None) != texts:
            cached = PDFAugmentedRetriever(texts)
            self._retriever = cached
            self._retriever_texts = texts
        return cached

    @staticmethod
    def _context_bounds(encoding, seq_len: int) -> Tuple[int, int]:
        """Return the ``[start, end)`` token range occupied by the context.

        Falls back to the whole sequence when the tokenizer cannot report
        which tokens belong to which input segment.
        """
        try:
            segment_ids = encoding.sequence_ids(0)
        except (AttributeError, TypeError, ValueError):
            return 0, seq_len
        context_positions = [i for i, seg in enumerate(segment_ids) if seg == 1]
        if not context_positions:
            return 0, seq_len
        return context_positions[0], context_positions[-1] + 1

    def _extract_span(self, question: str, context: str) -> Tuple[str, float]:
        """Run the QA model on one context.

        Returns:
            ``(answer text, start probability * end probability)``.
        """
        encoding = self.qa_tokenizer(
            question,
            context,
            add_special_tokens=True,
            return_tensors="pt",
            max_length=1024,
            truncation=True,
            padding=True
        )
        inputs = {k: v.to(self.device) for k, v in encoding.items()}

        with torch.no_grad():
            outputs = self.qa_model(**inputs)

        token_ids = inputs["input_ids"][0]
        # Use the processor configured in __init__ instead of rebuilding a
        # default one per context; float() keeps the softmax in fp32 when the
        # model itself runs in half precision.
        start_logits = self.logits_processor(inputs["input_ids"], outputs.start_logits.float())
        end_logits = self.logits_processor(inputs["input_ids"], outputs.end_logits.float())
        start_prob = torch.nn.functional.softmax(start_logits, dim=-1)[0]
        end_prob = torch.nn.functional.softmax(end_logits, dim=-1)[0]

        start_score, start_idx = torch.max(start_prob, dim=-1)
        start = int(start_idx)
        # The end is searched only at or after the start, so the span is
        # never reversed.
        end_score, end_offset = torch.max(end_prob[start:], dim=-1)
        end = start + int(end_offset)

        answer = self.qa_tokenizer.decode(token_ids[start:end + 1], skip_special_tokens=True)

        if len(answer.strip()) < 20:
            # Very short spans are widened with surrounding text. The window
            # is clamped to the context tokens so it cannot pull the question
            # itself into the answer.
            ctx_start, ctx_end = self._context_bounds(encoding, len(token_ids))
            window_start = max(ctx_start, start - 50)
            window_end = min(ctx_end, end + 150)
            widened = self.qa_tokenizer.decode(
                token_ids[window_start:window_end], skip_special_tokens=True
            )
            if len(widened.strip()) > len(answer.strip()):
                answer = widened

        return answer, float(start_score * end_score)

    def _explain_candidate(self, candidate: Dict) -> Dict:
        """Best-effort explanations for one candidate; empty result on failure."""
        try:
            return self.detailed_explainer.explain_text(candidate["answer"], candidate["context"])
        except Exception as e:
            # Explanations are an optional extra: a failure here must not
            # discard an otherwise valid answer.
            self.logger.warning("Failed to generate explanations: %s", e)
            return {"concepts": [], "explanations": {}}

    def answer_question(self, question: str, documents: List[Dict]) -> Dict:
        """Answer ``question`` from ``documents``.

        Args:
            question: Question text.
            documents: Page entries as produced by
                ``extract_text_with_metadata``.

        Returns:
            A dict with keys ``answer``, ``confidence`` (clamped to [0, 1]),
            ``context``, ``page_number`` and ``explanations``
            (``{"concepts": [...], "explanations": {...}}``). Answers with a
            confidence below 0.3 are prefixed with ``[Low Confidence]``. If no
            context yields a non-empty answer, a placeholder result with
            confidence 0.0 is returned.
        """
        retriever = self._get_retriever(documents)
        candidates = []

        for page_idx, context, similarity_score in retriever.retrieve(question, top_k=5):
            answer, span_score = self._extract_span(question, context)
            if not answer.strip():
                # The model selected nothing usable in this context.
                continue
            # Retrieval scores are not guaranteed to lie in [0, 1]; clamp so
            # the thresholds below keep their meaning.
            confidence = min(1.0, max(0.0, span_score * 0.9 * float(similarity_score)))
            candidates.append({
                "answer": answer,
                "confidence": confidence,
                "context": context,
                "page_number": documents[page_idx]['metadata']['page'],
                "explanations": None,  # generated lazily, only if needed
            })

        if not candidates:
            return {
                "answer": "No confident answer found",
                "confidence": 0.0,
                "explanations": {"concepts": [], "explanations": {}},
                "page_number": 0,
                "context": ""
            }

        # Stable sort: among equal confidences the earlier retrieval hit wins.
        candidates.sort(key=lambda c: c['confidence'], reverse=True)
        best_answer = candidates[0]
        best_answer["explanations"] = self._explain_candidate(best_answer)

        # Each explanation costs one generation call per concept, so they are
        # produced only for the answers that contribute to the result.
        high_confidence_answers = [c for c in candidates if c['confidence'] > 0.2]
        if len(high_confidence_answers) > 1:
            combined_explanations = {}
            all_concepts = {}  # dict used as an insertion-ordered set
            for cand in high_confidence_answers[:3]:
                if cand["explanations"] is None:
                    cand["explanations"] = self._explain_candidate(cand)
                for concept in cand["explanations"].get("concepts", []):
                    all_concepts.setdefault(concept, None)
                # setdefault: on a shared concept, the higher-confidence
                # answer's explanation is kept.
                for concept, text in cand["explanations"].get("explanations", {}).items():
                    combined_explanations.setdefault(concept, text)

            best_answer["explanations"]["explanations"] = combined_explanations
            best_answer["explanations"]["concepts"] = list(all_concepts)

        if best_answer['confidence'] < 0.3:
            best_answer['answer'] = f"[Low Confidence] {best_answer['answer']}"

        return best_answer
| |
|
| | |
# Shared analyzer instance used by the helper functions below; created once
# at import time.
analyzer = AdvancedPDFAnalyzer()

# Page entries of the currently loaded PDF. Starts empty and is rebound as a
# whole by load_pdf() on success.
documents = []
| |
|
def load_pdf(file_path: str):
    """Extract the PDF at ``file_path`` into the module-level ``documents``.

    Never raises: the outcome is reported as a human-readable status string,
    either a success line with the number of extracted pages or an
    ``Error loading PDF: ...`` line carrying the exception message.
    """
    global documents
    try:
        documents = analyzer.extract_text_with_metadata(file_path)
        page_total = len(documents)
        status = f"Successfully loaded PDF with {page_total} pages."
    except Exception as exc:
        status = f"Error loading PDF: {exc!s}"
    return status
| |
|
def ask_question_gradio(question: str):
    """Gradio callback: answer ``question`` against the loaded PDF.

    Args:
        question: Text typed by the user; ``None`` or blank input is rejected.

    Returns:
        A Markdown string with the answer, confidence, page number and, when
        available, per-concept explanations. Validation problems and runtime
        failures are returned as messages rather than raised, so the UI
        always has something to display.
    """
    # The UI may hand over None as well as an empty string.
    if not question or not question.strip():
        return "Please enter a valid question."

    if not documents:
        return "❌ No PDF loaded. Please load a PDF first."

    try:
        result = analyzer.answer_question(question, documents)

        answer = result.get('answer', 'No answer found')
        confidence = result.get('confidence', 0.0)
        page_number = result.get('page_number', 0)
        explanations = result.get("explanations", {}).get("explanations", {})

        # One bullet per concept; blank explanations are left out.
        # NOTE(review): the emoji in this function replace mis-encoded
        # characters in the original strings; confirm the intended symbols.
        explanation_text = ""
        if explanations:
            explanation_text = "\n\n".join(
                f"🔹 **{concept}**: {desc}"
                for concept, desc in explanations.items()
                if desc and desc.strip()
            )

        response_parts = [
            f"📌 **Answer**: {answer}",
            f"🔍 **Confidence**: {confidence:.2f}",
            f"📄 **Page**: {page_number}"
        ]

        if explanation_text:
            response_parts.append(f"📘 **Explanations**:\n{explanation_text}")

        return "\n\n".join(response_parts)

    except Exception as e:
        # UI boundary: show a short message to the user, but keep the full
        # traceback in the log so the failure is not lost.
        logging.getLogger(__name__).exception("Failed to answer question")
        return f"❌ Error: {str(e)}"
| |
|
| | |
# Startup preload: if the default PDF sits next to the app, load it and print
# the status line; otherwise tell the operator the path needs changing.
pdf_path = "example.pdf"
if not os.path.exists(pdf_path):
    print(f"PDF file '{pdf_path}' not found. Please update the path.")
else:
    load_result = load_pdf(pdf_path)
    print(load_result)
| |
|
# --- Gradio UI -------------------------------------------------------------
# Single-function interface: one multi-line question box in, one Markdown
# panel out. Components are built first, then wired together.
_question_box = gr.Textbox(
    label="Ask a question about the PDF",
    placeholder="Type your question here...",
    lines=3,
    max_lines=5
)

_answer_panel = gr.Markdown(
    label="Answer",
    value="",
    show_copy_button=True
)

# Sample questions offered to the user.
_example_questions = [
    "What is the main topic of this document?",
    "Provide a detailed summary of the key points from page 1",
    "What are the conclusions mentioned and explain them in detail?",
    "Give me a comprehensive overview of all the important concepts discussed"
]

demo = gr.Interface(
    fn=ask_question_gradio,
    inputs=_question_box,
    outputs=_answer_panel,
    title="Quandans AI - Ask Questions (Up to 2000 words)",
    description="Ask a question based on the document loaded in this system. The system can now provide comprehensive answers up to 2000 words with detailed explanations.",
    examples=_example_questions,
    theme=gr.themes.Soft(),
    allow_flagging="never"
)

# Start the web server as soon as the module is executed.
demo.launch()