import os
import gradio as gr
import tempfile
import warnings
from pathlib import Path
from typing import List, Dict, Any, Optional, Set, Union
from datetime import datetime

import pytesseract
from pdf2image import convert_from_path
import numpy as np

from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory

# Suppress warnings
warnings.filterwarnings("ignore", category=FutureWarning)
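# Note: OCR support assumes the Tesseract binary and Poppler are installed on the
# host (on Hugging Face Spaces, typically via packages.txt); pytesseract and
# pdf2image only wrap those system tools.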

class RiskLevel:
    """Severity labels for rating identified risks."""
    LOW = "Low"
    MEDIUM = "Medium"
    HIGH = "High"
    CRITICAL = "Critical"

class DocumentProcessor:
    """Enhanced document processing with OCR support."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )

    def process_document(self, content: bytes, doc_type: str) -> List[Document]:
        """Process document content based on type."""
        with tempfile.NamedTemporaryFile(delete=False, suffix=doc_type) as temp_file:
            temp_file.write(content)
            temp_file_path = temp_file.name
        try:
            documents = self.load_document(temp_file_path)
            return self.split_documents(documents)
        finally:
            os.unlink(temp_file_path)
    def load_document(self, file_path: Union[str, Path]) -> List[Document]:
        """Load document using appropriate loader with OCR support."""
        file_path = Path(file_path)
        suffix = file_path.suffix.lower()
        if suffix == '.pdf':
            # Try normal PDF text extraction first
            try:
                loader = PyPDFLoader(str(file_path))
                documents = loader.load()
                if not any(doc.page_content.strip() for doc in documents):
                    raise ValueError("No text content found")
                return documents
            except Exception:
                # Fall back to OCR for scanned or image-only PDFs
                return self._process_pdf_with_ocr(file_path)
        elif suffix == '.docx':
            loader = Docx2txtLoader(str(file_path))
            return loader.load()
        elif suffix == '.txt':
            loader = TextLoader(str(file_path))
            return loader.load()
        else:
            raise ValueError(f"Unsupported file type: {suffix}")
    def _process_pdf_with_ocr(self, file_path: Path) -> List[Document]:
        """Process PDF with OCR using Tesseract."""
        documents = []
        images = convert_from_path(str(file_path))
        for i, image in enumerate(images):
            text = pytesseract.image_to_string(image)
            if text.strip():
                documents.append(Document(
                    page_content=text,
                    metadata={"source": str(file_path), "page": i + 1}
                ))
        return documents

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """Split documents into chunks."""
        return self.text_splitter.split_documents(documents)

class AuditCopilot:
    """Integrated Audit Copilot with multi-functionality."""

    def __init__(self, openai_api_key: Optional[str] = None):
        self.openai_api_key = openai_api_key or os.getenv('OPENAI_API_KEY')
        if not self.openai_api_key:
            raise ValueError("OPENAI_API_KEY environment variable is not set")
        self.embeddings = OpenAIEmbeddings(openai_api_key=self.openai_api_key)
        self.vector_store = None
        self.chain = None
        self.chat_history = []
        self.doc_processor = DocumentProcessor()

        # Initialize LLM model - using GPT-3.5-turbo for all functionalities
        self.llm = ChatOpenAI(
            model_name="gpt-3.5-turbo",
            temperature=0,
            openai_api_key=self.openai_api_key
        )

        # Try to initialize with default document if available
        try:
            default_pdf = "IAASB-Drafting-Principles-Guidelines.pdf"
            if os.path.exists(default_pdf):
                self.process_documents([default_pdf])
                print(f"Successfully initialized with {default_pdf}")
        except Exception as e:
            # Continue initialization without failing
            print(f"Note: Could not initialize with default document: {str(e)}")
    def process_documents(self, file_paths: List[str]) -> Dict[str, str]:
        """Process documents and add to knowledge base."""
        results = {}
        for file_path in file_paths:
            try:
                with open(file_path, 'rb') as f:
                    content = f.read()
                doc_type = Path(file_path).suffix
                texts = self.doc_processor.process_document(content, doc_type)
                if self.vector_store is None:
                    self.vector_store = FAISS.from_documents(texts, self.embeddings)
                else:
                    self.vector_store.add_documents(texts)
                # Reinitialize conversation chain whenever the vector store is updated
                self._initialize_conversation_chain()
                results[file_path] = "Success"
            except Exception as e:
                results[file_path] = f"Error: {str(e)}"
        return results
    def _initialize_conversation_chain(self):
        """Initialize or reinitialize the conversation chain."""
        if self.vector_store is None:
            return
        memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        self.chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.vector_store.as_retriever(search_kwargs={"k": 4}),
            memory=memory,
            verbose=True
        )
    def get_response(self, question: str) -> str:
        """Get conversational response from the chain."""
        if not self.chain:
            return "I don't have any documents to work with yet. Please upload audit documents first."
        try:
            if not question or not isinstance(question, str):
                return "Please provide a valid question."
            response = self.chain.invoke({"question": question})
            if not response or 'answer' not in response:
                return "I'm unable to generate a response. Please try again."
            self.chat_history.append((question, response['answer']))
            return response['answer']
        except Exception as e:
            error_msg = f"Error generating response: {str(e)}"
            print(error_msg)  # For logging
            return error_msg
    def get_compliance_response(self, query: str) -> Dict[str, Any]:
        """Generate compliance-focused response to query."""
        if not query.strip():
            raise ValueError("Query cannot be empty")
        if self.vector_store is None:
            raise RuntimeError("No compliance documents have been processed yet")

        # Create the retriever
        retriever = self.vector_store.as_retriever(search_kwargs={"k": 4})

        # Compliance-focused prompt template
        template = """You are Amy, an audit copilot and compliance expert. Answer the following question based on the provided context:

Context: {context}

Question: {question}

Provide a detailed answer that:
1. Addresses compliance requirements and regulations
2. Identifies potential risks and their severity
3. Suggests mitigation strategies where applicable
4. Cites specific sources and regulations

Response:"""
        prompt = ChatPromptTemplate.from_template(template)

        # Build the retrieval chain
        chain = (
            {
                "context": retriever,
                "question": RunnablePassthrough()
            }
            | prompt
            | self.llm
            | StrOutputParser()
        )

        # Get response
        answer = chain.invoke(query)

        # Get source documents
        source_docs = retriever.invoke(query)

        return {
            "answer": answer,
            "sources": self._format_sources(source_docs)
        }
    def generate_risk_assessment(self, file_path: str) -> Dict[str, Any]:
        """Generate risk assessment for a specific document using GPT-3.5-turbo."""
        try:
            with open(file_path, 'rb') as f:
                content = f.read()
            texts = self.doc_processor.process_document(content, Path(file_path).suffix)

            # Risk assessment prompt tailored for GPT-3.5-turbo
            template = """You are Amy, an audit copilot specializing in risk assessment. Analyze the following audit document content and provide a comprehensive structured risk assessment:

Content: {content}

Provide a structured risk assessment with the following components:
1. Executive Summary: Brief overview of the document and key findings (2-3 sentences)
2. Key Risk Factors: Identify 3-5 specific risks with clear severity ratings (Low/Medium/High/Critical)
3. Compliance Issues: List any specific compliance concerns with relevant regulatory references
4. Recommended Actions: Provide actionable mitigation strategies with clear prioritization
5. Implementation Timeline: Suggest realistic timeframes for addressing each risk area

Format your assessment with clear headers and bullet points where appropriate. Be specific, concise, and actionable.

Assessment:"""
            prompt = ChatPromptTemplate.from_template(template)

            # Keep the prompt within the model's context window by using only the
            # first 15 chunks (roughly 15,000 characters with the default splitter)
            texts_content = [doc.page_content for doc in texts]
            full_content = "\n".join(texts_content[:15])

            # Generate assessment
            chain = prompt | self.llm | StrOutputParser()
            assessment = chain.invoke({"content": full_content})

            return {
                "assessment": assessment,
                "document": Path(file_path).name,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            raise RuntimeError(f"Risk assessment failed: {str(e)}") from e
    def _format_sources(self, source_documents: List[Document]) -> Set[str]:
        """Format source references."""
        return {Path(doc.metadata['source']).name for doc in source_documents}
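
# Example of using AuditCopilot outside the Gradio UI (a sketch; assumes
# OPENAI_API_KEY is set and "audit_report.pdf" is a hypothetical local file):
#   copilot = AuditCopilot()
#   copilot.process_documents(["audit_report.pdf"])
#   result = copilot.get_compliance_response("Which documentation requirements apply?")
#   print(result["answer"], result["sources"])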

def create_gradio_interface():
    """Create Gradio interface for the integrated audit copilot."""
    try:
        # Get OpenAI API key
        api_key = os.getenv("OPENAI_API_KEY")

        # Initialize copilot
        copilot = AuditCopilot(api_key)

        with gr.Blocks(title="Amy - Your Audit Copilot") as demo:
            gr.Markdown("# Amy - Your Audit Copilot")
            gr.Markdown("I can help you with audit document analysis, compliance questions, and risk assessment.")

            with gr.Tab("Conversation"):
                # Chat section
                chatbot = gr.Chatbot(label="Conversation with Amy")
                msg = gr.Textbox(label="Ask me anything about your IAASB documents", placeholder="Type your question here...")
                clear = gr.Button("Clear Chat")

            with gr.Tab("Document Processing"):
                with gr.Row():
                    file_input = gr.File(
                        file_count="multiple",
                        label="Upload Audit Documents (PDF, DOCX, TXT)"
                    )
                upload_button = gr.Button("Process Documents")
                upload_output = gr.Textbox(label="Processing Status")

            with gr.Tab("Compliance Query"):
                with gr.Row():
                    query_input = gr.Textbox(
                        lines=3,
                        label="Enter your compliance or regulatory query"
                    )
                query_button = gr.Button("Submit Query")
                query_output = gr.Textbox(
                    lines=10,
                    label="Amy's Response"
                )

            with gr.Tab("Risk Assessment"):
                with gr.Row():
                    assessment_file = gr.File(
                        label="Select Document for Risk Assessment"
                    )
                assess_button = gr.Button("Generate Risk Assessment")
                assessment_output = gr.Textbox(
                    lines=15,
                    label="Risk Assessment Report"
                )

            # Set up event handlers
            def handle_file_upload(files):
                try:
                    if not files:
                        return "No files uploaded."
                    results = copilot.process_documents([f.name for f in files])
                    output_lines = []
                    for file_path, status in results.items():
                        file_name = Path(file_path).name
                        if status == "Success":
                            output_lines.append(f"✓ Successfully processed {file_name}")
                        else:
                            output_lines.append(f"❌ {file_name}: {status}")
                    return "\n".join(output_lines)
                except Exception as e:
                    return f"Error: {str(e)}"

            def respond(message, chat_history):
                if not message.strip():
                    return "", chat_history
                bot_message = copilot.get_response(message)
                chat_history.append((message, bot_message))
                return "", chat_history

            def handle_compliance_query(query):
                try:
                    result = copilot.get_compliance_response(query)
                    response = result["answer"]
                    if result["sources"]:
                        response += f"\n\nSources: {', '.join(result['sources'])}"
                    return response
                except Exception as e:
                    return f"Error: {str(e)}"

            def handle_risk_assessment(file):
                try:
                    if not file:
                        return "No file selected for risk assessment."
                    result = copilot.generate_risk_assessment(file.name)
                    return f"Risk Assessment for {result['document']}\n\n{result['assessment']}"
                except Exception as e:
                    return f"Error: {str(e)}"

            # Connect event handlers
            upload_button.click(
                fn=handle_file_upload,
                inputs=[file_input],
                outputs=[upload_output]
            )
            msg.submit(respond, [msg, chatbot], [msg, chatbot])
            clear.click(lambda: None, None, chatbot, queue=False)
            query_button.click(
                fn=handle_compliance_query,
                inputs=[query_input],
                outputs=[query_output]
            )
            assess_button.click(
                fn=handle_risk_assessment,
                inputs=[assessment_file],
                outputs=[assessment_output]
            )

        return demo
    except Exception as e:
        print(f"Error creating interface: {str(e)}")
        raise


if __name__ == "__main__":
    try:
        demo = create_gradio_interface()
        demo.launch(debug=True, share=True)
    except Exception as e:
        print(f"Error launching application: {str(e)}")