| | import os |
| | from typing import ( |
| | Any, |
| | Union, |
| | ) |
| | import zipfile |
| | import streamlit as st |
| | from streamlit.runtime.uploaded_file_manager import ( |
| | UploadedFile, |
| | UploadedFileRec, |
| | UploadedFileManager, |
| | ) |
| | from streamlit.runtime.scriptrunner import get_script_run_ctx |
| | from supabase.client import Client |
| | from langchain.vectorstores.supabase import SupabaseVectorStore |
| | from components_keys import ComponentsKeys |
| | from loaders.audio import process_audio |
| | from loaders.txt import process_txt |
| | from loaders.csv import process_csv |
| | from loaders.markdown import process_markdown |
| | from loaders.pdf import process_pdf |
| | from loaders.html import ( |
| | create_html_file, |
| | delete_tempfile, |
| | get_html, |
| | process_html, |
| | ) |
| | from loaders.powerpoint import process_powerpoint |
| | from loaders.docx import process_docx |
| | from utils import compute_sha1_from_content |
| |
|
| |
|
# Script-run context and upload manager are created once, when this module is
# first imported; `file_to_uploaded_file` relies on both.
ctx = get_script_run_ctx()
manager = UploadedFileManager()

# Maps a file extension (with leading dot) to the loader that ingests it.
# Key order is significant: it is the order shown to `st.file_uploader`.
file_processors = {
    ".txt": process_txt,
    ".csv": process_csv,
    ".md": process_markdown,
    ".markdown": process_markdown,
    # All audio/video containers share one loader.
    **dict.fromkeys(
        (".m4a", ".mp3", ".webm", ".mp4", ".mpga", ".wav", ".mpeg"),
        process_audio,
    ),
    ".pdf": process_pdf,
    ".html": process_html,
    ".pptx": process_powerpoint,
    ".docx": process_docx,
}
| |
|
def file_uploader(supabase, vector_store):
    """Render the upload widget and ingest the selected files on request.

    Multiple files (and ``.zip`` archives) are only accepted when the
    ``self_hosted`` secret equals ``"true"``. When it equals ``"false"`` a
    demo-mode size notice is shown. Nothing is processed until the
    "Add to Database" button is pressed.

    Parameters
    ----------
    supabase
        Supabase client, forwarded unchanged to `filter_file`.
    vector_store
        Vector store, forwarded unchanged to `filter_file`.
    """
    self_hosted = st.secrets.self_hosted
    allow_many = self_hosted == "true"

    # Start from every extension that has a registered loader.
    extensions = [*file_processors]
    if allow_many:
        extensions.append(".zip")

    files = st.file_uploader(
        "**Upload a file**",
        accept_multiple_files=allow_many,
        type=extensions,
        key=ComponentsKeys.FILE_UPLOADER,
    )
    if self_hosted == "false":
        st.markdown("**In demo mode, the max file size is 1MB**")

    if not st.button("Add to Database"):
        return

    # The widget yields a single upload or a list of uploads; anything else
    # (e.g. nothing selected) results in no work.
    if isinstance(files, UploadedFile):
        pending = [files]
    elif isinstance(files, list):
        pending = files
    else:
        pending = []

    for upload in pending:
        filter_file(upload, supabase, vector_store)
| |
|
def file_already_exists(supabase, file):
    """Return True when a stored document has the same content SHA-1 as *file*.

    The hash of ``file.getvalue()`` is compared against the ``file_sha1``
    entry of the ``metadata`` column in the ``documents`` table.
    """
    digest = compute_sha1_from_content(file.getvalue())
    id_query = supabase.table("documents").select("id")
    matches = id_query.eq("metadata->>file_sha1", digest).execute()
    return len(matches.data) > 0
| |
|
def file_to_uploaded_file(file: Any) -> Union[None, UploadedFile]:
    """Wrap an arbitrary file object in a streamlit `UploadedFile`.

    Files extracted from an archive can then go through the same code path
    as files that came from the upload widget.

    Parameters
    ----------
    file : Any
        Object exposing a ``name`` attribute and a ``read()`` method.

    Returns
    -------
    Union[None, UploadedFile]
        The wrapped file, registered with the module-level upload manager
        under the file-uploader widget key. ``None`` when the module-level
        script context is missing.
    """
    if ctx is None:
        print("script context not found, skipping uploading file:", file.name)
        return None

    name = file.name
    _, extension = os.path.splitext(name)
    payload = file.read()

    # The record is created without an id; the one returned by the manager
    # is what gets wrapped.
    fresh_record = UploadedFileRec(None, name, extension, payload)
    stored_record = manager.add_file(
        ctx.session_id,
        ComponentsKeys.FILE_UPLOADER,
        fresh_record,
    )
    return UploadedFile(stored_record)
| |
|
def filter_zip_file(
    file: UploadedFile,
    supabase: Client,
    vector_store: SupabaseVectorStore,
) -> None:
    """Unzip the zip file then filter each unzipped file.

    Directory entries in the archive are skipped: they carry no content and
    would otherwise be reported to the user as empty files.

    Parameters
    ----------
    file : UploadedFile
        The uploaded zip archive from the file uploader.
    supabase : Client
        The supabase client.
    vector_store : SupabaseVectorStore
        The vector store in the database.
    """
    with zipfile.ZipFile(file, "r") as archive:
        for member in archive.infolist():
            # Archives list folders as their own entries; only real files
            # are worth handing to `filter_file`.
            if member.is_dir():
                continue
            with archive.open(member, "r") as extracted:
                filter_file(extracted, supabase, vector_store)
| |
|
def filter_file(file, supabase, vector_store):
    """Validate one file and pass it to the loader registered for its extension.

    Parameters
    ----------
    file : UploadedFile or file-like
        The file to ingest. Anything that is not already an `UploadedFile`
        is converted with `file_to_uploaded_file` first.
    supabase : Client
        The supabase client; used for the duplicate check and, in demo mode
        (``self_hosted == "false"``), passed to the loader as ``stats_db``.
    vector_store : SupabaseVectorStore
        The vector store handed to the loader.

    Returns
    -------
    bool
        ``True`` when the file was handed to a loader or was a zip archive
        that got unpacked; ``False`` when it was skipped (conversion failed,
        duplicate, empty, or unsupported extension).
    """
    # Loaders expect streamlit's `UploadedFile`; wrap everything else.
    if not isinstance(file, UploadedFile):
        file = file_to_uploaded_file(file)
        if file is None:
            # Conversion is impossible without a script context; the helper
            # has already logged the reason, so just skip this file.
            return False

    file_extension = os.path.splitext(file.name)[-1]
    if file_extension == ".zip":
        filter_zip_file(file, supabase, vector_store)
        return True

    if file_already_exists(supabase, file):
        st.write(f"😎 {file.name} is already in the database.")
        return False

    if file.size < 1:
        st.write(f"💨 {file.name} is empty.")
        return False

    processor = file_processors.get(file_extension)
    if processor is None:
        st.write(f"❌ {file.name} is not a valid file type.")
        return False

    # The loader only receives the supabase client as `stats_db` in demo mode.
    stats_db = supabase if st.secrets.self_hosted == "false" else None
    processor(vector_store, file, stats_db=stats_db)
    st.write(f"✅ {file.name} ")
    return True
| |
|
def url_uploader(supabase, vector_store):
    """Render the URL form and ingest the page behind the submitted URL.

    The page is fetched with `get_html`, written to a temporary HTML file
    with `create_html_file`, and then ingested through `filter_file`.
    Nothing happens until the button is pressed, and the request is refused
    when ``st.session_state["overused"]`` is truthy.

    Parameters
    ----------
    supabase
        Supabase client, forwarded to `filter_file`.
    vector_store
        Vector store, forwarded to `filter_file`.
    """
    url = st.text_area("**Add an url**", placeholder="")
    button = st.button("Add the URL to the database")

    if not button:
        return

    if st.session_state["overused"]:
        st.write(
            "You have reached your daily limit. Please come back later or self host the solution."
        )
        return

    html = get_html(url)
    if not html:
        st.write(f"❌ Failed to access to {url} .")
        return

    st.write(f"Getting content ... {url} ")
    try:
        file, temp_file_path = create_html_file(url, html)
    except UnicodeEncodeError as e:
        # Retrying the same call with the same input would raise the same
        # error again, so report it and stop here.
        st.write(f"❌ Error encoding character: {e}")
        return

    # Always clean up the temporary file, even if ingestion raises; in that
    # case the cleanup helper is told the ingestion did not succeed.
    ret = False
    try:
        ret = filter_file(file, supabase, vector_store)
    finally:
        delete_tempfile(temp_file_path, url, ret)