"""Intelligent document analysis app (Gradio + IBM Watson NLU + Watsonx AI).

Three tools over an uploaded PDF/DOCX/TXT document:
1. Text extraction and NLU analysis (summary, keywords, categories).
2. Lexical snippet search with accent-insensitive matching.
3. RAG-style chat answered by a Llama-3 model hosted on Watsonx AI.
"""

import json
import os
import re
import unicodedata

import requests
import gradio as gr
from docx import Document
from dotenv import load_dotenv
from PyPDF2 import PdfReader
from ibm_watson import NaturalLanguageUnderstandingV1
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator


def normalize_text(text):
    """Removes accents, special characters and converts to lowercase.

    Used to make the snippet search accent- and case-insensitive.
    Returns "" for falsy input.
    """
    if not text:
        return ""
    # Convert to lowercase and remove extra spaces
    text = text.lower().strip()
    # Decompose accented characters (NFD) and drop the combining marks (Mn)
    text = "".join(
        c for c in unicodedata.normalize('NFD', text)
        if unicodedata.category(c) != 'Mn'
    )
    # Remove basic punctuation for search (keep letters, numbers and spaces)
    text = re.sub(r'[^a-z0-9\s]', '', text)
    return text


# Load environment variables from a local .env file, if present.
load_dotenv()

# Credentials and endpoints for Watson NLU / Watsonx AI.
API_KEY = os.getenv('IBM_WATSON_API_KEY', 'YOUR_API_KEY')
SERVICE_URL = os.getenv('IBM_WATSON_URL', 'YOUR_SERVICE_URL')
PROJECT_ID = os.getenv('IBM_WATSONX_PROJECT_ID', 'YOUR_PROJECT_ID')
# Use specific key or general as fallback
WATSONX_API_KEY = os.getenv('IBM_WATSONX_API_KEY', API_KEY)

authenticator = IAMAuthenticator(API_KEY)
nlu = NaturalLanguageUnderstandingV1(
    version='2024-05-10',
    authenticator=authenticator,
)
nlu.set_service_url(SERVICE_URL)


def extract_text(file):
    """Return the plain text of an uploaded PDF, DOCX or TXT file.

    ``file`` may be a Gradio file object (with a temporary path in ``.name``)
    or a plain path string. On any failure a human-readable error string is
    returned instead of raising, so the UI always has something to display.
    """
    if not file:
        return "No file uploaded."
    try:
        # If file is a gr.File object, it has the .name attribute (temporary path)
        file_name = file.name if hasattr(file, 'name') else file
        # Case-insensitive extension match so "DOC.PDF" is handled too.
        lowered = file_name.lower()
        if lowered.endswith('.pdf'):
            reader = PdfReader(file_name)
            # extract_text() may return None for image-only pages; skip those.
            return ''.join(page.extract_text() or '' for page in reader.pages)
        elif lowered.endswith('.docx'):
            doc = Document(file_name)
            return ''.join(para.text + '\n' for para in doc.paragraphs)
        elif lowered.endswith('.txt'):
            with open(file_name, 'r', encoding='utf-8') as f:
                return f.read()
        else:
            return "Unsupported file format. Use PDF, DOCX or TXT."
    except Exception as e:
        return f"Error extracting text: {str(e)}"


def process_text(text):
    """Analyze ``text`` with Watson NLU.

    Returns a ``(summary, topics, classification)`` tuple of strings.
    On failure the error message is returned in the first slot and the
    other two are empty.
    """
    if not text or len(text.strip()) < 10:
        return "Insufficient text for processing.", "", ""
    try:
        # Try automatic summarization (may not be available in all plans/regions)
        try:
            summary_res = nlu.analyze(
                text=text,
                features={'summarization': {'limit': 1}}
            ).get_result()
            summary = summary_res.get('summarization', {}).get(
                'text', 'Summary not available.')
        except Exception:
            summary = ("Automatic summarization not available in your Watson "
                       "NLU plan. Showing main concepts...")

        # Key topics extraction (keywords)
        topics_res = nlu.analyze(
            text=text,
            features={'keywords': {'limit': 10}}
        ).get_result()
        topics_list = [k['text'] for k in topics_res.get('keywords', [])]
        topics = ", ".join(topics_list[:5])

        # If summary failed, build a simple description from the top topics.
        if "not available" in summary:
            summary = (f"The document covers topics such as: "
                       f"{', '.join(topics_list[:3])}.")

        # Thematic classification (categories)
        classification_res = nlu.analyze(
            text=text,
            features={'categories': {'limit': 5}}
        ).get_result()
        classification = ", ".join(
            [c['label'] for c in classification_res.get('categories', [])])

        return summary, topics, classification
    except Exception as e:
        return f"Processing error: {str(e)}", "", ""


def answer_question(question, text):
    """Locate the document paragraph most relevant to ``question``.

    Lexical search: extracts key terms from the question (via NLU, with a
    manual-split fallback), splits the document into paragraph-like blocks,
    scores each block by term hits, and returns the best-scoring snippet.
    """
    if not question or not text:
        return ("Please provide a question and ensure the document has been "
                "analyzed first.")
    try:
        # 1. Extraction of important terms from the question using NLU
        # (Keywords and Concepts)
        search_terms = []
        try:
            question_analysis = nlu.analyze(
                text=question,
                features={'keywords': {}, 'concepts': {}}
            ).get_result()
            for k in question_analysis.get('keywords', []):
                search_terms.append(normalize_text(k['text']))
            for c in question_analysis.get('concepts', []):
                search_terms.append(normalize_text(c['text']))
        except Exception:
            # Best-effort: NLU often fails on very short questions; fall
            # through to the manual extraction below.
            pass

        # If Watson doesn't return terms or fails, use manual split
        # with normalization
        if not search_terms:
            search_terms = normalize_text(question).split()
        if not search_terms:
            # Last attempt: use the entire normalized question as one term.
            search_terms = [normalize_text(question)]

        # 2. Document text processing
        # Split document into smaller blocks (paragraphs)
        raw_blocks = re.split(r'\n\s*\n', text)
        if len(raw_blocks) < 2:
            raw_blocks = text.split('\n')

        valid_paragraphs = []
        for block in raw_blocks:
            clean = block.strip()
            if len(clean) > 20:  # Keep blocks with minimum content
                valid_paragraphs.append({
                    'original': clean,
                    'normalized': normalize_text(clean)
                })

        # If still few blocks, try to split by sentences
        if len(valid_paragraphs) < 3:
            sentences = re.split(r'\.\s+', text)
            valid_paragraphs = []
            for s in sentences:
                clean = s.strip()
                if len(clean) > 20:
                    valid_paragraphs.append({
                        'original': clean,
                        'normalized': normalize_text(clean)
                    })

        # 3. Relevance calculation (Ranking)
        best_paragraph = ""
        highest_score = 0
        for item in valid_paragraphs:
            p_norm = item['normalized']
            score = 0
            for term in search_terms:
                if not term:
                    continue
                # If exact term (normalized) is in paragraph
                if term in p_norm:
                    score += 1
                    # Whole word bonus to avoid false-positives in substrings
                    if re.search(rf'\b{re.escape(term)}\b', p_norm):
                        score += 2
            if score > highest_score:
                highest_score = score
                best_paragraph = item['original']
            elif score == highest_score and score > 0:
                # On ties prefer the shorter (more specific) paragraph.
                if len(item['original']) < len(best_paragraph):
                    best_paragraph = item['original']

        # 4. Result return
        if best_paragraph and highest_score > 0:
            return (f"Based on the document, I found this relevant snippet:"
                    f"\n\n\"{best_paragraph}\"")
        else:
            return ("Unfortunately I didn't find a direct answer in the "
                    "document. Try rephrasing your question with other terms.")
    except Exception as e:
        return f"Error processing smart search: {str(e)}"


# --- Smart Chat Functions (RAG with Watsonx AI) ---

def get_iam_token():
    """Generates an IAM access token using the Watsonx API Key.

    Returns the token string on success. On failure returns a message that
    ALWAYS starts with "Error" so callers can detect it reliably (the
    previous messages "Authentication Error (400)..." and "Connection
    error..." slipped past smart_chat's ``startswith("Error")`` check and
    were sent as Bearer tokens).
    """
    url = "https://iam.cloud.ibm.com/identity/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = ("grant_type=urn:ibm:params:oauth:grant-type:apikey"
            f"&apikey={WATSONX_API_KEY}")
    try:
        # timeout avoids hanging the UI thread on a dead network.
        response = requests.post(url, headers=headers, data=data, timeout=30)
        if response.status_code == 200:
            token = response.json().get("access_token")
            # Guard against a 200 response with no token in the payload.
            return token or "Error: IAM response did not contain an access_token."
        elif response.status_code == 400:
            return ("Error (400): The provided API Key is invalid or not "
                    "found. Check your .env file.")
        else:
            return (f"Error generating token ({response.status_code}): "
                    f"{response.text}")
    except Exception as e:
        return f"Error: connection failure generating token: {str(e)}"


def smart_chat(question, document_text):
    """Performs a smart chat (RAG) using the Llama-3 model on Watsonx AI."""
    if not question or not document_text:
        return "Please analyze a document first and type a question."

    token = get_iam_token()
    # get_iam_token prefixes every failure message with "Error".
    if not token or token.startswith("Error"):
        return token

    url = ("https://us-south.ml.cloud.ibm.com/ml/v1/text/chat"
           "?version=2023-05-29")

    # Limit document text to not exceed model token limit
    context = document_text[:10000]  # Approximately 2500 tokens

    body = {
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are a helpful and honest AI assistant. "
                    "Your task is to answer questions based EXCLUSIVELY on the content of the document provided below. "
                    "If the answer is not in the text, say you didn't find the information in the document. "
                    "Always answer in English and use Markdown formatting.\n\n"
                    f"DOCUMENT CONTENT:\n{context}"
                )
            },
            {
                "role": "user",
                "content": question
            }
        ],
        "project_id": PROJECT_ID,
        "model_id": "meta-llama/llama-3-3-70b-instruct",
        "frequency_penalty": 0,
        "max_tokens": 2000,
        "presence_penalty": 0,
        "temperature": 0,
        "top_p": 1
    }
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    try:
        response = requests.post(url, headers=headers, json=body, timeout=120)
        if response.status_code != 200:
            return f"Watsonx API Error: {response.text}"
        data = response.json()
        return data['choices'][0]['message']['content']
    except Exception as e:
        return f"Chat processing error: {str(e)}"


# --- Gradio Interface using Blocks ---

def create_interface():
    """Build and return the three-tab Gradio Blocks UI."""
    with gr.Blocks(title="Intelligent Document Analysis") as demo:
        gr.Markdown("# 📑 Watsonx AI - Intelligent Document Analysis")
        gr.Markdown("Extract information, summaries and ask questions about your PDF, DOCX or TXT documents.")

        with gr.Tab("1. Extraction and Analysis"):
            with gr.Row():
                with gr.Column():
                    file_input = gr.File(label="Document Upload")
                    analyze_button = gr.Button("Analyze Document", variant="primary")
                with gr.Column():
                    extracted_text = gr.Textbox(label="Extracted Text", lines=10, interactive=False)
            with gr.Row():
                summary_output = gr.Textbox(label="Automatic Summary")
                topics_output = gr.Textbox(label="Key Topics")
                classification_output = gr.Textbox(label="Thematic Classification")

        with gr.Tab("2. Snippet Locator (Semantic Search)"):
            gr.Markdown("### 🔍 Find specific snippets in the document")
            gr.Markdown("This tool locates the most relevant paragraphs containing your search terms.")
            with gr.Row():
                question_input = gr.Textbox(label="What are you looking for in the text?", placeholder="Ex: Revenue goals")
                question_button = gr.Button("Locate Snippet", variant="secondary")
            answer_output = gr.Textbox(label="Most relevant snippet found", lines=10)

        with gr.Tab("3. Smart Chat (RAG)"):
            gr.Markdown("### 🤖 Ask the Artificial Intelligence")
            gr.Markdown("The Llama-3 model will analyze the entire document to answer your questions with reasoning and synthesis.")
            with gr.Row():
                chat_input = gr.Textbox(label="Your Question for IA", placeholder="Ex: What is the main theme of the document?")
                chat_button = gr.Button("Generate IA Response", variant="primary")
            chat_output = gr.Markdown()

        # Event definitions
        def run_analysis_flow(file):
            # Extract the raw text, then run the full NLU analysis on it.
            text = extract_text(file)
            summary, topics, classification = process_text(text)
            return text, summary, topics, classification

        analyze_button.click(
            fn=run_analysis_flow,
            inputs=[file_input],
            outputs=[extracted_text, summary_output, topics_output, classification_output]
        )
        question_button.click(
            fn=answer_question,
            inputs=[question_input, extracted_text],
            outputs=[answer_output]
        )
        chat_button.click(
            fn=smart_chat,
            inputs=[chat_input, extracted_text],
            outputs=[chat_output]
        )

    return demo


if __name__ == "__main__":
    app = create_interface()
    app.launch()