Spaces:
Sleeping
Sleeping
| import PyPDF2 | |
| import pandas as pd | |
| import os | |
| import ast | |
| import streamlit as st | |
| import pandas as pd | |
| import os | |
| from google.oauth2.credentials import Credentials | |
| from google.auth.transport.requests import Request | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaIoBaseDownload,MediaFileUpload | |
| from google.oauth2 import service_account | |
| import base64 | |
def get_image_as_base64(image_path):
    """Read the image at *image_path* and return its bytes base64-encoded as str."""
    with open(image_path, "rb") as img_file:
        raw = img_file.read()
    return base64.b64encode(raw).decode()
# Service-account credentials assembled from environment variables.
# SECURITY FIX: the private key used to be hard-coded in this file; it is now
# read from the environment like every other secret. The leaked key must be
# revoked in the Google Cloud console and a fresh one issued.
# NOTE(review): if the deployment stores the key with escaped newlines, it may
# need `.replace("\\n", "\n")` — confirm against the hosting platform.
SERVICE_ACCOUNT_INFO = {
    "type": "service_account",
    "project_id": os.environ.get("project_id"),
    "private_key_id": os.environ.get("private_key_id"),
    "private_key": os.environ.get("private_key"),
    "client_email": os.environ.get("client_email"),
    "client_id": os.environ.get("client_id"),
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": os.environ.get("token_uri"),
    "auth_provider_x509_cert_url": os.environ.get("auth_provider_x509_cert_url"),
    "client_x509_cert_url": os.environ.get("client_x509_cert_url"),
    "universe_domain": "googleapis.com",
}
# OAuth scopes requested for the Drive API.
SCOPES = ['https://www.googleapis.com/auth/drive']


def authenticate():
    """Build Google service-account credentials for the Drive API.

    Returns:
        google.oauth2.service_account.Credentials scoped to SCOPES.
    """
    # Use the module-level SCOPES constant instead of re-hardcoding the URL
    # (the original defined SCOPES and then ignored it here).
    creds = service_account.Credentials.from_service_account_info(
        SERVICE_ACCOUNT_INFO, scopes=SCOPES)
    return creds
# Name of the master mapping workbook kept in sync with Google Drive.
MAPPING_FILENAME = "Data Mapping with ItemCode.xlsx"
def convert_pdf_to_excel(pdf_file):
    """Extract 'Delivery Schedule Sheet' tables from *pdf_file* into two Excel files.

    Args:
        pdf_file: path or file-like object readable by PyPDF2.PdfReader.

    Returns:
        (extracted_file, data_for_mapping): paths of "Data Extracted.xlsx"
        (all parsed rows including the date/qty pairs) and "Data Mapping.xlsx"
        (unique part rows with an empty "Item Code" column for the user to fill).
    """
    inputpdf = PyPDF2.PdfReader(pdf_file)
    pages_no = len(inputpdf.pages)
    whole_data = []
    for i in range(pages_no):
        # FIX: the original re-created PdfReader(pdf_file) on every iteration;
        # the reader built above already exposes all pages.
        page_content = inputpdf.pages[i].extract_text()
        # A page may hold several tables separated by this header line.
        for each_table in [t for t in page_content.split('Delivery Schedule Sheet') if t]:
            data = each_table.split('\n')
            each_table_data = []
            date_qty = []
            row_start_index = 0
            row_stop_index = 0
            year = ""
            # Scan the table line by line, picking up labelled fields and the
            # boundaries ('ADJ' .. 'Total') of the date/quantity section.
            for index in range(len(data)):
                line = data[index].strip()
                if line == 'Part No.':
                    each_table_data.append(data[index + 1].replace('Part Color Code', ""))
                    if 'Part Name' not in data[index + 2]:
                        each_table_data.append(data[index + 2].replace('Part Color Code', ""))
                    else:
                        each_table_data.append("")
                if line == 'MORIROKU TECHNOLOGY':
                    # The line after the header carries a date like "dd/yyyy ...".
                    try:
                        year = data[index + 1].split(' ')[0].split('/')[1]
                    except Exception as e:
                        print(e)
                        year = ""
                if 'Part Name' in line:
                    each_table_data.append(data[index + 1].replace("Sched", ""))
                if 'Inventory Category' in line:
                    each_table_data.append(data[index + 1].replace('Receive Type', ""))
                if line == 'ADJ':
                    row_start_index = index + 1
                if line == 'Total':
                    row_stop_index = index
            # FIX: collect the (date, qty) pairs ONCE, after the scan finishes.
            # The original ran this inside the scan loop, so every line after
            # 'Total' re-appended the same pairs, producing duplicates.
            if row_start_index > 0 and row_stop_index > 0:
                for index in range(row_start_index, row_stop_index):
                    if '/' in data[index].strip():
                        date_qty.append([data[index].strip()[-5:].strip() + "/" + year,
                                         data[index + 1].strip()])
            if not date_qty:
                # Keep a placeholder pair so every table yields one row.
                date_qty = [["", ""]]
            each_table_data.append(date_qty)
            whole_data.append(each_table_data)
    whole_data = pd.DataFrame(whole_data)
    whole_data.columns = ["Part No.", "Part Color Code", "Part Name",
                          'Inventory Category', 'Date Qty']
    extracted_file = "Data Extracted.xlsx"
    data_for_mapping = "Data Mapping.xlsx"
    # The mapping sheet holds one row per unique part with a blank Item Code.
    extracted_data_for_mapping = whole_data.drop(['Inventory Category', 'Date Qty'], axis=1)
    extracted_data_for_mapping = extracted_data_for_mapping.drop_duplicates(
        subset=["Part No.", "Part Color Code", "Part Name"])
    extracted_data_for_mapping.columns = ['Customer Part no as per pdf',
                                          'Customer Part color as per pdf',
                                          'Customer Part name as per pdf']
    extracted_data_for_mapping['Item Code'] = ""
    whole_data.to_excel(extracted_file, index=False)
    extracted_data_for_mapping.to_excel(data_for_mapping, index=False)
    return extracted_file, data_for_mapping
def map_data_to_template(excel_file, mapping_file):
    """Join extracted schedule rows with the Item Code mapping into the final layout.

    Args:
        excel_file: path of the "Data Extracted.xlsx" workbook.
        mapping_file: path or uploaded file of the Item Code mapping workbook.

    Returns:
        DataFrame with columns SchDate, SOType, Item Code, Qty, Inventory Category.

    Side effects:
        Rewrites MAPPING_FILENAME locally and uploads it to Google Drive.
    """
    extracted_data = pd.read_excel(excel_file)
    mapping_data = pd.read_excel(mapping_file)
    mapping_data = mapping_data.drop_duplicates(
        subset=['Customer Part no as per pdf', 'Customer Part name as per pdf',
                'Customer Part color as per pdf', 'Item Code'])
    # Persist the deduplicated mapping as the new master and sync it to Drive.
    mapping_data.to_excel(MAPPING_FILENAME, index=False)
    save_mapping_file_to_drive()
    mapping_data = mapping_data.rename(
        columns={'Customer Part no as per pdf': 'Part No.',
                 'Customer Part name as per pdf': 'Part Name',
                 'Customer Part color as per pdf': 'Part Color Code'})
    # 'Date Qty' was serialized to Excel as the string repr of a list of
    # [date, qty] pairs; literal_eval restores the list (safe: literals only),
    # then explode yields one row per pair.
    extracted_data['Date Qty'] = extracted_data['Date Qty'].apply(ast.literal_eval)
    extracted_data = extracted_data.explode('Date Qty')
    extracted_data[['SchDate', 'Qty']] = pd.DataFrame(
        extracted_data['Date Qty'].to_list(), index=extracted_data.index)
    extracted_data = extracted_data.drop('Date Qty', axis=1)
    extracted_data = extracted_data[~extracted_data['SchDate'].isna()]
    mapped_data = extracted_data.merge(
        mapping_data, on=["Part No.", "Part Name", "Part Color Code"],
        how='outer')[['Item Code', 'SchDate', 'Qty', 'Inventory Category']]
    # Drop rows with missing or blank schedule dates (outer-merge leftovers).
    mapped_data = mapped_data[~mapped_data["SchDate"].isna()]
    mapped_data = mapped_data[~mapped_data["SchDate"].str.strip().isin(["", None])]
    mapped_data['SOType'] = "R"
    mapped_data['SchDate'] = mapped_data['SchDate'].astype("str")
    return mapped_data[["SchDate", "SOType", "Item Code", "Qty", "Inventory Category"]]
def save_mapping_file_to_drive():
    """Overwrite the master mapping workbook on Google Drive with the local copy.

    Searches the xlsx files visible to the service account for files named
    MAPPING_FILENAME and replaces their content in place (keeping file ids);
    prints a notice when none exist.
    """
    creds = authenticate()
    service = build('drive', 'v3', credentials=creds)
    # List every xlsx file visible to the service account, then keep the master.
    results = service.files().list(
        q="mimeType='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'",
        fields="files(id, name)").execute()
    files = [f for f in results.get('files', []) if f.get('name') == MAPPING_FILENAME]
    if not files:
        print('No Excel Mapping files found in the folder.')
    else:
        for file in files:
            # Update (not create) so the Drive file id stays stable for callers.
            media = MediaFileUpload(
                MAPPING_FILENAME,
                mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
            service.files().update(fileId=file['id'], media_body=media).execute()
def pull_mapping_file_from_drive():
    """Download the master mapping workbook from Google Drive to the working dir.

    Returns:
        1 if a file named MAPPING_FILENAME was found and downloaded, 0 otherwise
        (including on any error — auth, network, quota — which is printed).
    """
    try:
        creds = authenticate()
        service = build('drive', 'v3', credentials=creds)
        results = service.files().list(
            q="mimeType='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'",
            fields="files(id, name)").execute()
        files = [f for f in results.get('files', []) if f.get('name') == MAPPING_FILENAME]
        if files:
            file_id = files[0]['id']
            file_name = files[0]['name']
            request = service.files().get_media(fileId=file_id)
            # FIX: context manager so the handle is closed even if a chunk
            # download raises (the original leaked the handle on error).
            with open(file_name, 'wb') as fh:
                downloader = MediaIoBaseDownload(fh, request)
                done = False
                while not done:
                    status, done = downloader.next_chunk()
            return 1
        return 0
    except Exception as e:
        # Best-effort: any failure is reported as "file absent".
        print(e)
        return 0
def delete_master_file():
    """Delete every Google Drive copy of the master mapping workbook.

    Prints a notice when no file named MAPPING_FILENAME is visible.
    """
    creds = authenticate()
    service = build('drive', 'v3', credentials=creds)
    results = service.files().list(
        q="mimeType='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'",
        fields="files(id, name)").execute()
    files = [f for f in results.get('files', []) if f.get('name') == MAPPING_FILENAME]
    if not files:
        print('No Excel Mapping files found in the folder.')
    else:
        for file in files:
            service.files().delete(fileId=file['id']).execute()
            print("Deleted master file")
def main():
    """Streamlit app: convert a delivery-schedule PDF to a mapped Excel file.

    Flow: upload PDF -> extract tables -> merge with the Drive master mapping
    (if present) -> let the user download/complete the mapping -> produce the
    final mapped workbook for download.
    """
    # Load your logo image
    logo_path = "logo.jpeg"
    logo_base64 = get_image_as_base64(logo_path)
    logo_html = f"""
    <div style="display: flex; justify-content: center; align-items: center; height: 100px;">
    <img src="data:image/jpeg;base64,{logo_base64}" style="width: 100px; height: 100px;">
    </div>
    """
    # Display the logo HTML
    st.markdown(logo_html, unsafe_allow_html=True)
    st.markdown("<h1 style='text-align: center;'>PDF to Excel Converter</h1>", unsafe_allow_html=True)
    # File uploader
    st.markdown("### STEP 1")
    st.markdown("#### Upload a PDF File")
    uploaded_file = st.file_uploader("### Upload a PDF file", type=["pdf"])
    if uploaded_file is not None:
        st.write("Uploaded PDF file:", uploaded_file.name)
        # Convert PDF to Excel
        extracted_file,data_for_mapping = convert_pdf_to_excel(uploaded_file)
        # Non-zero when the master mapping workbook was found on Drive.
        file_present = pull_mapping_file_from_drive()
        if file_present:
            try:
                # Merge the Drive master mapping into the freshly extracted part
                # list so previously assigned Item Codes are carried over.
                mapping_data_from_drive = pd.read_excel(MAPPING_FILENAME)
                extracted_data_for_mapping = pd.read_excel(data_for_mapping)
                extracted_data_for_mapping.columns = [i.strip() for i in extracted_data_for_mapping.columns]
                mapping_data_from_drive.columns = [i.strip() for i in mapping_data_from_drive.columns if "inventory category" not in i.lower()]
                mapping_data_from_drive = mapping_data_from_drive.drop_duplicates(subset=['Customer Part no as per pdf','Customer Part name as per pdf','Customer Part color as per pdf','Item Code'])
                extracted_data_for_mapping = extracted_data_for_mapping[['Customer Part no as per pdf','Customer Part name as per pdf','Customer Part color as per pdf']].merge(mapping_data_from_drive, on = ['Customer Part no as per pdf','Customer Part name as per pdf','Customer Part color as per pdf'], how='outer')
                extracted_data_for_mapping.to_excel(data_for_mapping,index=False)
            except Exception as e:
                # A malformed master file is deleted from Drive so the user can
                # re-upload a clean one; fall through as if no master existed.
                st.error("Error in the Mapping Master file on Cloud. " + str(e))
                st.error("Please reupload the Data Master file with Item Code mapping")
                delete_master_file()
                file_present = None
        # Download link for the Excel file
        # st.markdown(f"Download the extracted data in Excel file [here](/{excel_file})")
        if os.path.exists(data_for_mapping):
            with open(data_for_mapping, "rb") as f:
                excel_bytes = f.read()
            st.download_button(
                label="Download Excel file",
                data=excel_bytes,
                file_name=data_for_mapping,
                mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
            )
        else:
            st.error("Error: Converted Excel file not found")
        # st.markdown("##### Click the button below if you want to upload a new mapping file")
        # if st.button("Delete Mapping file in Cloud", key="delete"):
        #     delete_master_file()
        #     file_present = pull_mapping_file_from_drive()
        st.markdown("### STEP 2")
        mapping_uploaded_file = None
        if not file_present:
            # No usable master on Drive: the user must supply the mapping file.
            st.markdown("#### Upload the Data Master file with Item Code mapping")
            mapping_uploaded_file = st.file_uploader("Upload the Data Master file with Item Code mapping", type=["xlsx","ods"])
        else:
            mapping_data = pd.read_excel(MAPPING_FILENAME)
            # mapping_data = mapping_data.rename(columns = {'Customer Part no as per pdf':'Part No.'})
            data_for_mapping = "Data Mapping.xlsx"
            extracted_data_for_mapping = pd.read_excel(data_for_mapping)
            if 'Item Code' not in extracted_data_for_mapping.columns:
                extracted_data_for_mapping['Item Code'] = ""
            # Keep only parts whose Item Code is still missing after the merge.
            extracted_data_for_mapping = extracted_data_for_mapping[extracted_data_for_mapping['Item Code'].isna()]
            unmapped_part_no = extracted_data_for_mapping['Customer Part no as per pdf'].nunique()
            if unmapped_part_no>0:
                st.markdown("There are {} Part No. with No ItemCode present. Upload a new file after mapping them".format(unmapped_part_no))
                st.markdown("Do you want to skip this or Upload a new Mapping File")
                # Session state remembers the user's Skip/Upload choice across
                # Streamlit reruns triggered by widget interaction.
                if 'button_pressed' not in st.session_state:
                    st.session_state.button_pressed = None
                # placeholder = st.empty()  # Create a placeholder
                if st.session_state.button_pressed is None:
                    if st.button("Skip"):
                        st.session_state.button_pressed = "Skip"
                        # placeholder.empty()  # Clear the placeholder content
                    if st.button("Upload a new Master Mapping"):
                        st.session_state.button_pressed = "Upload a new Master Mapping"
                        # placeholder.empty()  # Clear the placeholder content
                if st.session_state.button_pressed is not None:
                    # Common block of code that uses the variable
                    if st.session_state.button_pressed == "Skip":
                        # Proceed with the (partially mapped) Drive master file.
                        mapping_uploaded_file = MAPPING_FILENAME
                    elif st.session_state.button_pressed == "Upload a new Master Mapping":
                        mapping_uploaded_file = st.file_uploader("Upload the Data Master file with Item Code mapping", type=["xlsx","ods"])
            else:
                st.markdown("All Part No. are mapped with ItemCode so using the Mapping file available in Google Drive")
                mapping_uploaded_file = MAPPING_FILENAME
        if mapping_uploaded_file is not None:
            # st.write("Uploaded Mapping Excel file:", mapping_uploaded_file.name)
            # Perform data mapping
            mapped_data = map_data_to_template(extracted_file, mapping_uploaded_file)
            # Provide a link to download the final Excel file after mapping
            st.markdown("### FINAL DOWNLOAD")
            st.markdown("Final Excel File After Mapping")
            final_excel_file = 'Final Data.xlsx'
            mapped_data.to_excel(final_excel_file, index=False,engine='openpyxl')
            if os.path.exists(final_excel_file):
                with open(final_excel_file, "rb") as f:
                    excel_bytes = f.read()
                st.download_button(
                    label="Download Excel file",
                    data=excel_bytes,
                    file_name=final_excel_file,
                    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                )
            else:
                st.error("Error: Converted Excel file not found")
            # Reset the Skip/Upload choice for the next run.
            st.session_state.button_pressed = None
# Script entry point (Streamlit executes the module top to bottom).
if __name__ == "__main__":
    main()