|
|
import gradio as gr
|
|
|
from ibm_watson import NaturalLanguageUnderstandingV1
|
|
|
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
|
|
|
from docx import Document
|
|
|
from PyPDF2 import PdfReader
|
|
|
import os
|
|
|
from dotenv import load_dotenv
|
|
|
import json
|
|
|
import re
|
|
|
import unicodedata
|
|
|
import requests
|
|
|
|
|
|
def normalize_text(text):
    """Lowercase *text*, strip accents, and drop non-alphanumeric characters.

    Whitespace is preserved so the result can still be split into words.
    Falsy input (None, "") yields "".
    """
    if not text:
        return ""

    lowered = text.lower().strip()
    # NFD decomposition separates base letters from combining accent marks
    # (category 'Mn'), which are then filtered out.
    decomposed = unicodedata.normalize('NFD', lowered)
    no_accents = "".join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    # Keep only ASCII letters, digits and whitespace.
    return re.sub(r'[^a-z0-9\s]', '', no_accents)
|
|
|
|
|
|
|
|
|
# Load credentials from a local .env file into the process environment.
load_dotenv()

# Watson NLU / watsonx credentials; the 'YOUR_*' placeholders are used
# when the corresponding environment variables are missing.
API_KEY = os.getenv('IBM_WATSON_API_KEY', 'YOUR_API_KEY')
SERVICE_URL = os.getenv('IBM_WATSON_URL', 'YOUR_SERVICE_URL')
PROJECT_ID = os.getenv('IBM_WATSONX_PROJECT_ID', 'YOUR_PROJECT_ID')
# Falls back to the NLU key when no dedicated watsonx key is configured.
WATSONX_API_KEY = os.getenv('IBM_WATSONX_API_KEY', API_KEY)

# Module-level Natural Language Understanding client shared by all handlers.
authenticator = IAMAuthenticator(API_KEY)
nlu = NaturalLanguageUnderstandingV1(
    version='2024-05-10',
    authenticator=authenticator
)
nlu.set_service_url(SERVICE_URL)
|
|
|
|
|
|
|
|
|
def extract_text(file):
    """Extract plain text from an uploaded PDF, DOCX or TXT file.

    Parameters
    ----------
    file : a Gradio upload object (exposes the temp path via ``.name``)
        or a plain path string.

    Returns
    -------
    str : the extracted text, or a human-readable error message
        (this function never raises).
    """
    if not file:
        return "No file uploaded."

    try:
        # Gradio upload objects carry the temp-file path in .name;
        # plain strings are accepted as-is.
        file_name = file.name if hasattr(file, 'name') else file
        # FIX: compare extensions case-insensitively so ".PDF" / ".Docx"
        # / ".TXT" uploads are no longer rejected as unsupported.
        lowered = file_name.lower()

        if lowered.endswith('.pdf'):
            reader = PdfReader(file_name)
            text = ''
            for page in reader.pages:
                page_text = page.extract_text()
                # extract_text() may return None for image-only pages.
                if page_text:
                    text += page_text
            return text

        elif lowered.endswith('.docx'):
            doc = Document(file_name)
            return ''.join(para.text + '\n' for para in doc.paragraphs)

        elif lowered.endswith('.txt'):
            with open(file_name, 'r', encoding='utf-8') as f:
                return f.read()

        else:
            return "Unsupported file format. Use PDF, DOCX or TXT."

    except Exception as e:
        return f"Error extracting text: {str(e)}"
|
|
|
|
|
|
|
|
|
def process_text(text):
    """Run Watson NLU on *text* and return (summary, topics, classification).

    All three elements are strings; on failure the first element carries
    the error message and the other two are empty.
    """
    if not text or len(text.strip()) < 10:
        return "Insufficient text for processing.", "", ""

    try:
        # Summarization is not included in every NLU plan -- degrade
        # gracefully instead of failing the whole analysis.
        try:
            summary_result = nlu.analyze(
                text=text,
                features={'summarization': {'limit': 1}}
            ).get_result()
            summary = summary_result.get('summarization', {}).get('text', 'Summary not available.')
        except Exception:
            summary = "Automatic summarization not available in your Watson NLU plan. Showing main concepts..."

        keyword_result = nlu.analyze(
            text=text,
            features={'keywords': {'limit': 10}}
        ).get_result()
        topics_list = [entry['text'] for entry in keyword_result.get('keywords', [])]
        topics = ", ".join(topics_list[:5])

        # No real summary came back -- synthesize one from the top keywords.
        if "not available" in summary:
            summary = f"The document covers topics such as: {', '.join(topics_list[:3])}."

        category_result = nlu.analyze(
            text=text,
            features={'categories': {'limit': 5}}
        ).get_result()
        classification = ", ".join(entry['label'] for entry in category_result.get('categories', []))

        return summary, topics, classification

    except Exception as e:
        return f"Processing error: {str(e)}", "", ""
|
|
|
|
|
|
|
|
|
def answer_question(question, text):
    """Locate the document snippet that best matches *question*.

    Keyword-based search: terms are extracted from the question (via
    Watson NLU when available, otherwise plain word splitting), the
    document is split into paragraphs (or sentences, if too few), and
    the highest-scoring snippet is returned.

    Returns a human-readable string; never raises.
    """
    if not question or not text:
        return "Please provide a question and ensure the document has been analyzed first."

    try:
        search_terms = _extract_search_terms(question)
        paragraphs = _split_document(text)
        best_paragraph, highest_score = _best_match(paragraphs, search_terms)

        if best_paragraph and highest_score > 0:
            return f"Based on the document, I found this relevant snippet:\n\n\"{best_paragraph}\""
        return "Unfortunately I didn't find a direct answer in the document. Try rephrasing your question with other terms."

    except Exception as e:
        return f"Error processing smart search: {str(e)}"


def _extract_search_terms(question):
    """Build normalized search terms from the question (NLU, then fallback)."""
    search_terms = []
    try:
        # Optional enrichment: ask Watson NLU for keywords and concepts.
        analysis = nlu.analyze(
            text=question,
            features={'keywords': {}, 'concepts': {}}
        ).get_result()
        for k in analysis.get('keywords', []):
            search_terms.append(normalize_text(k['text']))
        for c in analysis.get('concepts', []):
            search_terms.append(normalize_text(c['text']))
    except Exception:
        # FIX: was a bare `except:` which also swallowed KeyboardInterrupt
        # and SystemExit. NLU being unavailable is fine -- fall through.
        pass

    if not search_terms:
        search_terms = normalize_text(question).split()
    if not search_terms:
        search_terms = [normalize_text(question)]
    return search_terms


def _split_document(text):
    """Split *text* into candidate snippets with their normalized forms."""
    # Prefer blank-line paragraphs; fall back to single newlines.
    raw_blocks = re.split(r'\n\s*\n', text)
    if len(raw_blocks) < 2:
        raw_blocks = text.split('\n')

    paragraphs = _keep_substantial(raw_blocks)
    if len(paragraphs) < 3:
        # Too few paragraphs -- switch to sentence-level granularity.
        paragraphs = _keep_substantial(re.split(r'\.\s+', text))
    return paragraphs


def _keep_substantial(blocks):
    """Keep blocks longer than 20 chars, paired with normalized text."""
    result = []
    for block in blocks:
        clean = block.strip()
        if len(clean) > 20:
            result.append({
                'original': clean,
                'normalized': normalize_text(clean)
            })
    return result


def _best_match(paragraphs, search_terms):
    """Return (best_paragraph, score); substring hits +1, whole words +2 more."""
    best_paragraph = ""
    highest_score = 0

    for item in paragraphs:
        p_norm = item['normalized']
        score = 0
        for term in search_terms:
            if not term:
                continue
            if term in p_norm:
                score += 1
            if re.search(rf'\b{re.escape(term)}\b', p_norm):
                score += 2

        if score > highest_score:
            highest_score = score
            best_paragraph = item['original']
        elif score == highest_score and score > 0 and len(item['original']) < len(best_paragraph):
            # Tie-break: prefer the shorter (more focused) snippet.
            best_paragraph = item['original']

    return best_paragraph, highest_score
|
|
|
|
|
|
|
|
|
|
|
|
def get_iam_token(api_key=None):
    """Exchange an IBM Cloud API key for an IAM access token.

    Parameters
    ----------
    api_key : str | None
        Optional override; defaults to the module-level WATSONX_API_KEY
        loaded from the environment.

    Returns
    -------
    str : the access token on success, or a failure message that always
        starts with "Error" -- callers (smart_chat) rely on that prefix.
    """
    if api_key is None:
        api_key = WATSONX_API_KEY

    url = "https://iam.cloud.ibm.com/identity/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = f"grant_type=urn:ibm:params:oauth:grant-type:apikey&apikey={api_key}"

    try:
        # FIX: timeout so the UI can never hang on an unreachable endpoint.
        response = requests.post(url, headers=headers, data=data, timeout=30)
        if response.status_code == 200:
            return response.json().get("access_token")
        elif response.status_code == 400:
            # FIX: previously returned "Authentication Error (400): ...",
            # which smart_chat's startswith("Error") check let through
            # and then used as a bearer token.
            return "Error (400): The provided API Key is invalid or not found. Check your .env file."
        else:
            return f"Error generating token ({response.status_code}): {response.text}"
    except Exception as e:
        # FIX: same prefix issue -- was "Connection error ...".
        return f"Error connecting while generating token: {str(e)}"
|
|
|
|
|
|
def smart_chat(question, document_text):
    """Answer *question* grounded on *document_text* via Llama-3 on watsonx.ai (RAG).

    Returns the model's Markdown answer, or a human-readable error string.
    """
    if not question or not document_text:
        return "Please analyze a document first and type a question."

    token = get_iam_token()
    # FIX: the old check `token.startswith("Error")` missed the
    # "Authentication Error (400)" and "Connection error" messages that
    # get_iam_token could return, and crashed on None. IAM access tokens
    # never contain spaces, while every failure message does.
    if not token or " " in token:
        return token or "Error: IAM did not return an access token."

    url = "https://us-south.ml.cloud.ibm.com/ml/v1/text/chat?version=2023-05-29"

    # The model's context window is finite; cap the grounding text.
    context = document_text[:10000]

    body = {
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are a helpful and honest AI assistant. "
                    "Your task is to answer questions based EXCLUSIVELY on the content of the document provided below. "
                    "If the answer is not in the text, say you didn't find the information in the document. "
                    "Always answer in English and use Markdown formatting.\n\n"
                    f"DOCUMENT CONTENT:\n{context}"
                )
            },
            {
                "role": "user",
                "content": question
            }
        ],
        "project_id": PROJECT_ID,
        "model_id": "meta-llama/llama-3-3-70b-instruct",
        "frequency_penalty": 0,
        "max_tokens": 2000,
        "presence_penalty": 0,
        # temperature 0 / top_p 1: deterministic, extractive answers.
        "temperature": 0,
        "top_p": 1
    }

    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }

    try:
        # FIX: timeout so a stalled inference call cannot hang the UI.
        response = requests.post(url, headers=headers, json=body, timeout=120)
        if response.status_code != 200:
            return f"Watsonx API Error: {response.text}"

        data = response.json()
        return data['choices'][0]['message']['content']
    except Exception as e:
        return f"Chat processing error: {str(e)}"
|
|
|
|
|
|
|
|
|
def create_interface():
    """Build the three-tab Gradio UI and wire components to handlers.

    Tabs: (1) upload + NLU analysis, (2) keyword snippet locator,
    (3) RAG chat against watsonx.ai. Returns the gr.Blocks app.
    """
    with gr.Blocks(title="Intelligent Document Analysis") as demo:
        gr.Markdown("# 📑 Watsonx AI - Intelligent Document Analysis")
        gr.Markdown("Extract information, summaries and ask questions about your PDF, DOCX or TXT documents.")

        # --- Tab 1: file upload, text extraction and NLU analysis ---
        with gr.Tab("1. Extraction and Analysis"):
            with gr.Row():
                with gr.Column():
                    file_input = gr.File(label="Document Upload")
                    analyze_button = gr.Button("Analyze Document", variant="primary")

                with gr.Column():
                    # Also acts as shared state: tabs 2 and 3 read this box.
                    extracted_text = gr.Textbox(label="Extracted Text", lines=10, interactive=False)

            with gr.Row():
                summary_output = gr.Textbox(label="Automatic Summary")
                topics_output = gr.Textbox(label="Key Topics")
                classification_output = gr.Textbox(label="Thematic Classification")

        # --- Tab 2: keyword-based paragraph locator (no LLM) ---
        with gr.Tab("2. Snippet Locator (Semantic Search)"):
            gr.Markdown("### 🔍 Find specific snippets in the document")
            gr.Markdown("This tool locates the most relevant paragraphs containing your search terms.")
            with gr.Row():
                question_input = gr.Textbox(label="What are you looking for in the text?", placeholder="Ex: Revenue goals")
                question_button = gr.Button("Locate Snippet", variant="secondary")

            answer_output = gr.Textbox(label="Most relevant snippet found", lines=10)

        # --- Tab 3: LLM chat grounded on the document (RAG) ---
        with gr.Tab("3. Smart Chat (RAG)"):
            gr.Markdown("### 🤖 Ask the Artificial Intelligence")
            gr.Markdown("The Llama-3 model will analyze the entire document to answer your questions with reasoning and synthesis.")
            with gr.Row():
                chat_input = gr.Textbox(label="Your Question for IA", placeholder="Ex: What is the main theme of the document?")
                chat_button = gr.Button("Generate IA Response", variant="primary")

            chat_output = gr.Markdown()

        def run_analysis_flow(file):
            # Extraction feeds straight into NLU processing; the raw text
            # is returned too so the other tabs can reuse it.
            text = extract_text(file)
            summary, topics, classification = process_text(text)
            return text, summary, topics, classification

        analyze_button.click(
            fn=run_analysis_flow,
            inputs=[file_input],
            outputs=[extracted_text, summary_output, topics_output, classification_output]
        )

        question_button.click(
            fn=answer_question,
            inputs=[question_input, extracted_text],
            outputs=[answer_output]
        )

        chat_button.click(
            fn=smart_chat,
            inputs=[chat_input, extracted_text],
            outputs=[chat_output]
        )

    return demo
|
|
|
|
|
|
if __name__ == "__main__":
    # Entry point: build the Gradio app and start the local web server.
    demo_app = create_interface()
    demo_app.launch()
|
|
|
|