InitialMarkups / tsadropboxretrieval.py
Marthee's picture
Update tsadropboxretrieval.py
52bfe43 verified
# -*- coding: utf-8 -*-
"""TSADropboxRetrieval.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1d-UI3Y-z7Dj-vqu69CxluOUnN4rvsUuE
"""
# !pip install dropbox -q
# pip install pymupdf #==1.22.5
import os
import base64
import requests
import json
import pathlib
import pandas as pd
import dropbox
from dropbox.exceptions import AuthError
import fitz
import io
import re
import pyarrow
from io import BytesIO
from functools import lru_cache
from io import BytesIO
import os
@lru_cache(maxsize=1)
def load_parquet_df():
return GetParquetDF()
files_list=[]
app_key='9bljerefjumct38'
app_secret='nl6k66clw1j1k12'
access_code='sl.Bou05Rb15xPy851-I1UV8oOabHPY21AEPl5nrYl-Q0ninFSy0kTuRWPSve_JPbd3Z03E7eBY4r9R454rdzDM0AxLkyqrQEDzyAGUwP7kZ7s2CR6EwvdLD2a7Xh8nFEs38voLTH2IHzrQ2QEx7rji4OJ8aSQStKtJkI7_dh8tYHj5'
refresh_token='qK2VqvbxWMMAAAAAAAAAAXFQvrHM4xUwWUcZ6l5vGOygn1iAA6zlDjmAQNBbZprL'
basic_auth=base64.b64encode(f'{app_key}:{app_secret}'.encode())
def ADR_Access_DropboxTeam(flag):
if flag=='user':
dbxTeam = dropbox.DropboxTeam(app_key=app_key,
app_secret=app_secret,
oauth2_refresh_token = refresh_token).as_user('dbmid:AACjJg2GKc3tI42iOnD01dd6s0XDyyx6Thw')
elif flag=='admin':
dbxTeam = dropbox.DropboxTeam(app_key=app_key,
app_secret=app_secret,
oauth2_refresh_token = refresh_token).as_admin('dbmid:AACjJg2GKc3tI42iOnD01dd6s0XDyyx6Thw')
root_namespace_id = dbxTeam.users_get_current_account().root_info.root_namespace_id
dbxTeam = dbxTeam.with_path_root(dropbox.common.PathRoot.root(root_namespace_id))
return dbxTeam
def getSharedLink(path):
dbxTeam=ADR_Access_DropboxTeam('user')
try:
shared_link_metadata = dbxTeam.sharing_create_shared_link_with_settings(path)
except:
shared_link_metadata=dbxTeam.sharing_create_shared_link(path)
return shared_link_metadata.url
def handle_entries(entries , files_list):
for file in entries:
if isinstance(file, dropbox.files.FileMetadata):
# if str(file.name).endswith(".pdf"):
metadata = {
'name': file.name,
'path_display': file.path_display,
'client_modified': file.client_modified,
'server_modified': file.server_modified
}
files_list.append(metadata)
df = pd.DataFrame.from_records(files_list)
return df
def dropbox_connect():
"""Create a connection to myyyyyyy Dropbox."""
print('connecy')
try:
# print('ayhaga')
dbxMe = dropbox.Dropbox(
app_key='67w6ibpa9d2b60x',
app_secret='d3ecz8g1604fu04',
oauth2_refresh_token = 'R_LACBBNhysAAAAAAAAAAXt9mMy9OYIV_v4pF45lG6Z8DHNV66rq1q7acWjj_H5g',
)
# dbx=dropbox.Dropbox(access_token)
except AuthError as e:
print('Error connecting to Dropbox with access token: ' + str(e))
return dbxMe
def dropbox_upload_file(df, flag=0):
try:
dbxTeam= ADR_Access_DropboxTeam('admin')
path='/TSA JOBS/ADR Test/DropboxDirectory/df.parquet.gzip'
doc=df.to_parquet()
dbxTeam.files_delete(path)
meta=dbxTeam.files_upload(doc,path)
except Exception as e:
print('Error uploading file to Dropbox: ' + str(e))
return dbxTeam
def check_if_file_exists(dbxTeam,path):
try:
md = dbxTeam.files_get_metadata(path)
exists_bool = True
return exists_bool
except Exception as error_response:
exists_bool = False
return exists_bool
def uploadmarkupPDFTable(doc,pdfname,path):
dbxTeam= ADR_Access_DropboxTeam('admin')
try:
path=path+pdfname
exists_bool=check_if_file_exists(dbxTeam,path)
if exists_bool:
print("if gowa el else <3")
dbxTeam.files_delete(path)
print("abl el meta <3")
meta=dbxTeam.files_upload(doc.read() ,path)
try:
shared_link_metadata = dbxTeam.sharing_create_shared_link_with_settings(path)
except:
shared_link_metadata=dbxTeam.sharing_create_shared_link(path)
# print(shared_link_metadata.url)
return shared_link_metadata.url
except Exception as e:
print('Error uploading file to Dropbox: ' + str(e))
def upload_string_file(content_str, filename, path):
try:
dbxTeam = ADR_Access_DropboxTeam('admin')
full_path = path + filename
# convert string to bytes
f = BytesIO(content_str.encode("utf-8"))
# delete if file already exists
if check_if_file_exists(dbxTeam, full_path):
dbxTeam.files_delete(full_path)
# upload
meta = dbxTeam.files_upload(f.getvalue(), full_path)
# create shared link
try:
shared_link_metadata = dbxTeam.sharing_create_shared_link_with_settings(full_path)
except:
shared_link_metadata = dbxTeam.sharing_create_shared_link(full_path)
return shared_link_metadata.url
except Exception as e:
print("Error uploading file to Dropbox:", str(e))
return "Error uploading file to Dropbox."
def uploadanyFile(doc, pdfname, path, flag=0):
try:
dbxTeam = ADR_Access_DropboxTeam('admin')
# Determine initial file path
if flag: # XML upload
pdfname = str(pdfname).split('.pdf')[0] + '.xml'
file_path = path + pdfname
f = BytesIO()
doc.write(f, encoding='utf-8', xml_declaration=True)
data_to_upload = f.getvalue()
else:
file_path = path + pdfname
data_to_upload = doc.write()
# Extract base name and extension for duplicate handling
base_name, ext = os.path.splitext(pdfname)
counter = 1
# If file exists, increment suffix until unique
while check_if_file_exists(dbxTeam, file_path):
new_name = f"{base_name}({counter}){ext}"
file_path = path + new_name
counter += 1
# Upload file
meta = dbxTeam.files_upload(data_to_upload, file_path)
# Try to create or retrieve shared link
try:
shared_link_metadata = dbxTeam.sharing_create_shared_link_with_settings(file_path)
except:
shared_link_metadata = dbxTeam.sharing_create_shared_link(file_path)
return shared_link_metadata.url
except Exception as e:
print('Error uploading file to Dropbox: ' + str(e))
return 'Error uploading file to Dropbox.'
# Call when the dropbox is updated with new items - if not , call parquet saved version of the df of saved items
# Call when the dropbox is updated with new items - if not , call parquet saved version of the df of saved items
def DropboxItemstoDF(folder_path):
files_list=[]
dbxTeam=ADR_Access_DropboxTeam('user') # or pass dbx in parameters
# folder_path = "/TSA JOBS"
res = dbxTeam.files_list_folder(path=folder_path, recursive=True )
# df1=handle_entries(res.entries , files_list)
if res.has_more:
while res.has_more:
res = dbxTeam.files_list_folder_continue(cursor=res.cursor)
df2=handle_entries(res.entries , files_list)
# dbxTeam=dropbox_upload_file(df2)
# print(df2)
return df2 , files_list
def GetParquetDF():
# Initialize Dropbox client
dbxTeam = ADR_Access_DropboxTeam('user') # or pass dbx in parameters
# Define the path to the Parquet file on Dropbox
path = '/TSA JOBS/ADR Test/DropboxDirectory/df.parquet.gzip'
try:
# Try to create a shared link with settings
shared_link_metadata = dbxTeam.sharing_create_shared_link_with_settings(path=path)
except dropbox.exceptions.ApiError:
# If settings are not supported, create a shared link without settings
shared_link_metadata = dbxTeam.sharing_create_shared_link(path=path)
# Get the file content from the shared link
_, res = dbxTeam.sharing_get_shared_link_file(url=shared_link_metadata.url)
data = res.content
# Read the Parquet file content into a pandas DataFrame
with io.BytesIO(data) as pq_file:
df = pd.read_parquet(pq_file)
return df
def getPathtoPDF_File(nameofPDF,progress_callback=None):
parquetDf = load_parquet_df()
nameofPDF=nameofPDF.replace('"', '')
try:
# path=parquetDf.loc[parquetDf['name'] == nameofPDF, 'path_display'].iloc[0]
path = parquetDf.at[parquetDf.index[parquetDf['name'] == nameofPDF][0], 'path_display']
if progress_callback:
progress_callback(60)
link=getSharedLink(path)
print(path,link)
except:
return 'Project does not exist'
return path,link
# parquetDf
# getPathtoPDF_File('A5157-EBLA-V5-XX-SH-L-0004-D2-01.pdf')
def getPDFData(path):
dbxTeam= ADR_Access_DropboxTeam('admin')
md, res =dbxTeam.files_download(path)
data = res.content
return data
def retrieveProjects(projname, progress_callback=None):
# if progress_callback:
progress_callback(20)
projnameNospaces = projname.strip().replace('"', '').replace("'", '').replace(" ", "")
print(projname,projnameNospaces)
projname = '/' + projnameNospaces[:4] # Extract main project name
projname = projname.replace('/"', '') # Remove unwanted characters
print('projname', projname)
parquetDf = load_parquet_df()
documentsToMeasure = []
RelevantDocuments = []
# Send progress update (20%)
if progress_callback:
progress_callback(40)
# Store the original path before converting it to lowercase
parquetDf['original_path_display'] = parquetDf['path_display']
# Create a lowercase column for case-insensitive matching
parquetDf['path_display_lower'] = parquetDf['path_display'].str.lower()
if progress_callback:
progress_callback(50)
##### Updated code #######
# Filter using the lowercase column but retrieve the original paths
def path_matches(x):
#remove the file name from the path
folder_path = os.path.dirname(x)
#Check if project number (like /2564) appears in folder path
has_projnum = projname in folder_path
#Check if "01 project details" appears in folder path
has_details = '01 project details' in folder_path
return has_projnum and has_details
#Apply the mask
mask = parquetDf['path_display_lower'].apply(path_matches)
#### End of updated code #####
if progress_callback:
progress_callback(60)
# Retrieve the original (case-sensitive) paths before lowering them
RelevantDocuments = parquetDf[mask][['name', 'original_path_display']].values.tolist()
documentsToMeasure = [doc for doc in RelevantDocuments if doc[0].endswith('.pdf')] # Keep only PDFs
print('documentsToMeasure',documentsToMeasure)
# Send progress update (80%)
if progress_callback:
progress_callback(70)
# Extract path from the original (case-sensitive) column
if RelevantDocuments:
extracted_path = RelevantDocuments[0][1].split("01 Project Details")[0] + "01 Project Details"
else:
extracted_path = None # Handle case when no match is found
# Remove temporary columns
parquetDf.drop(columns=['original_path_display', 'path_display_lower'], inplace=True)
if progress_callback:
progress_callback(80)
return documentsToMeasure, RelevantDocuments, extracted_path