# Streamlit app: download a PDF (from a URL or from S3) and verify that
# LlamaParse can extract markdown from it.
# Standard library
import os
import tempfile

# Third-party
import boto3
import requests
import streamlit as st
from botocore.exceptions import ClientError, NoCredentialsError
from dotenv import load_dotenv
from llama_index.core import SimpleDirectoryReader
from llama_parse import LlamaParse

# Populate os.environ from a local .env file (e.g. LLAMA_CLOUD_API_KEY,
# AWS credentials) before any of those variables are read.
load_dotenv()
def check_pdf(read_file_path):
    """Parse a PDF with LlamaParse and report the outcome in the Streamlit UI.

    Args:
        read_file_path: Path to the local PDF file to parse.

    Side effects:
        Renders a success or error message via Streamlit; never raises —
        any failure (missing API key, parse error, unreadable file) is
        shown to the user instead.
    """
    try:
        parser = LlamaParse(
            result_type="markdown",
            api_key=os.environ['LLAMA_CLOUD_API_KEY'],
            ignore_errors=False,
        )
        file_extractor = {".pdf": parser}
        markdown_data = SimpleDirectoryReader(
            input_files=[read_file_path],
            file_extractor=file_extractor,
        ).load_data()
        # An empty result means the parser produced no documents for this file.
        if not markdown_data:
            st.error('No markdown data found')
        else:
            st.success('File Parsed successfully')
    except Exception as e:
        # UI boundary: surface the failure rather than crashing the app.
        st.error(f"An error occurred: {e}")
def download_file_from_url(url, filename):
    """Download *url* to the local path *filename*, streaming in 1 KiB chunks.

    Creates parent directories as needed and reports progress or failure
    through the Streamlit UI. Non-200 responses are reported, not raised.
    """
    st.markdown(f"Downloading file from {url} to {filename}")
    # A bare filename has no directory component; os.makedirs('') would raise.
    parent = os.path.dirname(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(filename, 'wb') as file:
            for chunk in response.iter_content(chunk_size=1024):
                file.write(chunk)
        st.markdown(f"File downloaded and saved as {filename}")
    else:
        st.error(f"Failed to download file. Status code: {response.status_code}")
# Flow 1: fetch a PDF from a user-supplied URL and validate it with LlamaParse.
url = st.text_input("Enter URL", key="url")
if url:
    with tempfile.TemporaryDirectory() as temp_dir:
        pdf_path = os.path.join(temp_dir, "task_for_you.pdf")
        download_file_from_url(url, pdf_path)
        check_pdf(pdf_path)
def download_files_from_s3(bucket_name, local_folder, file_path_list):
    """Download the S3 objects named in *file_path_list* into *local_folder*.

    Lists the whole bucket (paginated), downloads only the keys present in
    *file_path_list* — mirroring each key's path layout under *local_folder*
    — and reports per-file progress and failures in the Streamlit UI.

    Args:
        bucket_name: Name of the S3 bucket to read from.
        local_folder: Local directory to download into.
        file_path_list: S3 object keys to download.
    """
    s3 = boto3.client('s3')
    folder_prefix = ''  # empty prefix: scan the entire bucket
    # Set for O(1) membership tests instead of an O(n) list scan per object.
    wanted = set(file_path_list)
    try:
        # List objects in the S3 bucket (paginator handles >1000 keys).
        paginator = s3.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=folder_prefix)
        # Download filtered files
        for page in page_iterator:
            for obj in page.get('Contents', []):
                key = obj['Key']
                # Skip anything the caller did not ask for.
                if key not in wanted:
                    continue
                # Mirror the key's path structure under local_folder.
                local_path = os.path.join(local_folder, key)
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                try:
                    st.markdown(f"Downloading: {key} -> {local_path}")
                    s3.download_file(bucket_name, key, local_path)
                    st.markdown(f"Downloaded: {local_path}")
                except Exception as e:
                    # Keep going: one failed object should not abort the rest.
                    st.error(f"Error downloading {key}: {e}")
        # Report every requested file that never made it to disk
        # (key absent from the bucket, or a download error above).
        for path in file_path_list:
            if not os.path.isfile(os.path.join(local_folder, path)):
                st.error(f"Failed to download file {path}")
    except NoCredentialsError:
        st.error("No AWS credentials found.")
    except Exception as e:
        st.error(f"An error occurred: {e}")
# Flow 2: fetch a PDF by key from a user-supplied S3 bucket and validate it.
bucket_name = st.text_input("Enter bucket name", key="bucket_name")
key = st.text_input("Enter key", key="key")
if st.button("Submit"):
    with tempfile.TemporaryDirectory() as temp_dir:
        download_files_from_s3(bucket_name, temp_dir, [key])
        # file_name is already the full local path. The original code joined
        # temp_dir onto it a second time, which only worked by accident
        # because os.path.join discards the first part when the second is
        # an absolute path.
        file_name = os.path.join(temp_dir, key)
        check_pdf(file_name)