Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import logging | |
| import os | |
| from annotated_text import annotation | |
| from json import JSONDecodeError | |
| from markdown import markdown | |
| from utils.config import parser | |
| from utils.haystack import start_document_store, start_haystack_extractive, start_haystack_rag, query, start_preprocessor_node, start_retriever, start_reader | |
| from utils.ui import reset_results, set_initial_state | |
def env_flag(name, default=False):
    """Read the environment variable *name* as an on/off switch.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is not set at all.

    Returns:
        ``default`` when the variable is unset; ``False`` when it is empty or
        one of ``0``/``false``/``no``/``off`` (case-insensitive, surrounding
        whitespace ignored); ``True`` for any other value.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    # bool("false") and bool("0") are both True, so the text has to be
    # inspected instead of relying on string truthiness.
    return raw.strip().lower() not in ("", "0", "false", "no", "off")


# Sliders
DEFAULT_DOCS_FROM_RETRIEVER = int(os.getenv("DEFAULT_DOCS_FROM_RETRIEVER", "3"))
DEFAULT_NUMBER_OF_ANSWERS = int(os.getenv("DEFAULT_NUMBER_OF_ANSWERS", "3"))
# Labels for the evaluation
#EVAL_LABELS = os.getenv("EVAL_FILE", str(Path(__file__).parent / "eval_labels_volksbank_QA.csv"))
# Whether the file upload should be enabled or not
DISABLE_FILE_UPLOAD = env_flag("DISABLE_FILE_UPLOAD")
UPLOAD_DOCUMENTS = []
# Sidebar widget used to collect the files to index
def upload_files():
    """Render the sidebar file-upload widget and return whatever it yields.

    The widget accepts several files at once, is restricted to the ``pdf``,
    ``txt`` and ``docx`` extensions, and has its label hidden.
    """
    accepted_extensions = ["pdf", "txt", "docx"]
    return st.sidebar.file_uploader(
        "upload",
        type=accepted_extensions,
        accept_multiple_files=True,
        label_visibility="hidden",
    )
# Define a function to process a single file
def process_file(data_file, preprocesor, document_store):
    """Index one uploaded file into the document store, skipping known names.

    The file is looked up by name among the documents already stored. If it
    is new, its bytes are decoded as UTF-8, run through the preprocessor,
    written to the store, and the store's embeddings are refreshed.

    Args:
        data_file: Uploaded file object exposing ``name`` and ``read()``.
        preprocesor: Object whose ``process(docs)`` converts the raw document
            dict into the documents to store.
        document_store: Store exposing ``get_all_documents()``,
            ``write_documents()`` and ``update_embeddings()``.

    Raises:
        Exception: Any failure while checking, decoding, preprocessing or
            writing is logged and re-raised, so the caller can report the
            file as failed instead of silently showing it as indexed.

    Note:
        Embeddings are refreshed with the module-level ``retriever``, which
        therefore has to be defined before this function is called.
    """
    try:
        indexed_names = {
            item.meta.get('name') for item in document_store.get_all_documents()
        }
        #if args.store == 'inmemory':
        #    doc = converter.convert(file_path=files, meta=None)
        if data_file.name in indexed_names:
            print(f"{data_file.name} already processed")
            return

        # Only read and decode the upload once we know it has to be indexed.
        file_contents = data_file.read().decode("utf-8")
        docs = [{
            'content': str(file_contents),
            'meta': {'name': str(data_file.name)}
        }]
        print(f'preprocessing uploaded doc {data_file.name}.......')
        preprocessed_docs = preprocesor.process(docs)
        print('writing to document store.......')
        document_store.write_documents(preprocessed_docs)
        print('updating emebdding.......')
        document_store.update_embeddings(retriever)
    except Exception:
        # Keep a trace in the logs, then let the caller decide how to surface
        # the failure to the user.
        logging.exception("Failed to index uploaded file %s", data_file.name)
        raise
# Build the pipeline and render the page. Everything runs inside one ``try``
# so that the SystemExit raised by argparse can be intercepted at the bottom.
try:
    args = parser.parse_args()
    set_initial_state()
    st.write('# ' + args.name)
    session_state = st.session_state

    preprocesor = start_preprocessor_node()
    document_store = start_document_store(args.store)
    # Must stay a module-level name: process_file() reads it as a global.
    retriever = start_retriever(document_store)
    reader = start_reader()
    if args.task == 'extractive':
        pipeline = start_haystack_extractive(document_store, retriever, reader)
    else:
        pipeline = start_haystack_rag(document_store, retriever)

    # Sidebar
    #st.sidebar.header("Options")
    # File upload block
    if not DISABLE_FILE_UPLOAD:
        st.sidebar.write("## File Upload:")
        data_files = upload_files()
        for data_file in data_files or []:
            if not data_file:
                continue
            try:
                # Uploaded files are only indexed for the in-memory store.
                if args.store == 'inmemory':
                    process_file(data_file, preprocesor, document_store)
                st.sidebar.write(str(data_file.name) + " ✅ ")
            except Exception:
                # The message below points the user at the logs, so make sure
                # the failure is actually recorded there.
                logging.exception("Could not process uploaded file %s", data_file.name)
                st.sidebar.write(str(data_file.name) + " ❌ ")
                st.sidebar.write("_This file could not be parsed, see the logs for more information._")

    # Search bar
    question = st.text_input("Ask a question", value=st.session_state.question, max_chars=100, on_change=reset_results)
    run_pressed = st.button("Run")
    # Run when the button is pressed or when the text differs from the last
    # question that was answered.
    run_query = run_pressed or question != st.session_state.question

    # Get results for query
    if run_query and question:
        reset_results()
        st.session_state.question = question
        with st.spinner("🔎 Running your pipeline"):
            try:
                st.session_state.results = query(pipeline, question)
            except JSONDecodeError:
                st.error(
                    "👓 An error occurred reading the results. Is the document store working?"
                )
            except Exception as e:
                logging.exception(e)
                st.error("🐞 An error occurred during the request.")

    if st.session_state.results:
        results = st.session_state.results
        if args.task == 'extractive':
            for answer in results['answers']:
                if answer.answer:
                    text = answer.answer
                    context = answer.context or ""
                    highlighted = str(annotation(body=text, label='ANSWER', background='#964448', color='#ffffff'))
                    start_idx = context.find(text)
                    if start_idx >= 0:
                        end_idx = start_idx + len(text)
                        body = context[:start_idx] + highlighted + context[end_idx:]
                    else:
                        # The answer does not occur verbatim in the context
                        # (find() returned -1): slicing with -1 would drop and
                        # duplicate characters, so show the highlighted answer
                        # followed by the untouched context instead.
                        body = " ".join(part for part in (highlighted, context) if part)
                    # NOTE(review): the context comes from stored documents and
                    # is rendered with unsafe_allow_html=True; any HTML inside
                    # an uploaded document reaches the page unescaped.
                    st.write(
                        f" Answer: {markdown(body)}",
                        unsafe_allow_html=True,
                    )
                else:
                    st.info(
                        "🤔 Haystack is unsure whether any of the documents contain an answer to your question. Try to reformulate it!"
                    )
        elif args.task == 'rag':
            st.write(f" Answer: {results['results'][0]}")

        # Extract and display information from the 'documents' list
        retrieved_documents = results['documents']
        st.subheader("Retriever Results:")
        for document in retrieved_documents:
            st.write(f"Document Name: {document.meta['name']}")
            st.write(f"Score: {document.score}")
            st.write(f"Text: {document.content}")
except SystemExit as e:
    # This exception will be raised if --help or invalid command line arguments
    # are used. Currently streamlit prevents the program from exiting normally
    # so we have to do a hard exit.
    exit_code = e.code
    if exit_code is None:
        # SystemExit without a code means a successful exit.
        exit_code = 0
    elif not isinstance(exit_code, int):
        # A non-integer code (e.g. a message string) maps to exit status 1;
        # os._exit() itself only accepts integers.
        exit_code = 1
    os._exit(exit_code)