Spaces:
Paused
Paused
| import fitz # PyMuPDF | |
| from io import BytesIO | |
| import re | |
| import requests | |
def split_links(links_string):
    """Split a comma-separated string of links into a list of trimmed links.

    Each element has surrounding whitespace removed; empty segments are
    kept (e.g. ``""`` yields ``[""]``), matching ``str.split`` semantics.
    """
    return list(map(str.strip, links_string.split(',')))
def texts_from_pdf(pdfshareablelinks, heading_to_search):
    """Download PDF(s) from comma-separated shareable link(s) and collect the
    text that appears under the SECOND occurrence of ``heading_to_search``.

    Parameters
    ----------
    pdfshareablelinks : str
        Comma-separated HTTP/Dropbox shareable links.
    heading_to_search : str
        Heading code (e.g. ``"F10"``) whose second occurrence starts
        collection; any other heading-shaped span ends it.

    Returns
    -------
    str
        The collected text, or ``"Heading not found"`` if the heading
        occurred fewer than two times in the document.

    Raises
    ------
    ValueError
        If a link is not a recognisable HTTP/Dropbox URL.

    NOTE(review): the function returns while processing the first link, so
    any additional links are never examined -- confirm this is intended.
    """
    print('intexts', pdfshareablelinks)
    pdfshareablelinks = split_links(pdfshareablelinks)
    for link in pdfshareablelinks:
        pdf_content = None
        if link and ('http' in link or 'dropbox' in link):
            # Dropbox share links need dl=1 to serve the raw file.
            if 'dl=0' in link:
                link = link.replace('dl=0', 'dl=1')
            # Timeout prevents hanging indefinitely on an unresponsive host.
            response = requests.get(link, timeout=60)
            pdf_content = BytesIO(response.content)  # keep the PDF in memory
            print('Downloaded from shareable link.')
        if pdf_content is None:
            raise ValueError("No valid PDF content found.")
        # Open the PDF with PyMuPDF directly from the in-memory buffer.
        pdf_document = fitz.open(stream=pdf_content, filetype="pdf")
        print('PDF opened in memory.')
        all_text = ""            # accumulated output
        current_line = ""        # line currently being assembled from spans
        collecting_text = False  # True once the 2nd heading occurrence is seen
        f10_count = 0            # occurrences of heading_to_search so far
        current_y = None         # y-coordinate of the line being assembled
        # Heading format: one letter followed by two digits (e.g. "F10").
        heading_pattern = re.compile(r"[A-Za-z]\d{2}")
        for page_num in range(pdf_document.page_count):
            page = pdf_document.load_page(page_num)
            text_dict = page.get_text("dict")
            # First pass: collect the top y-coordinate of every span so the
            # page-header region can be estimated.
            header_y_values = []
            for block in text_dict['blocks']:
                if 'lines' in block:  # image blocks have no 'lines' key
                    for line in block['lines']:
                        for span in line['spans']:
                            header_y_values.append(span['bbox'][1])
            # Guard: an image-only page has no text spans; min() of an empty
            # list would raise ValueError. With no spans the second pass
            # below iterates nothing, so 0 is a safe placeholder.
            if header_y_values:
                # Header area = topmost span + 10% of the page height.
                header_threshold = min(header_y_values) + (page.rect.height * 0.1)
            else:
                header_threshold = 0
            # Second pass: walk the spans, tracking headings and rebuilding
            # lines from spans that share (approximately) the same y.
            for block in text_dict['blocks']:
                if 'lines' in block:
                    for line in block['lines']:
                        for span in line['spans']:
                            span_text = span['text'].strip()
                            span_y = span['bbox'][1]  # top y of the span
                            if heading_pattern.match(span_text):
                                if heading_to_search in span_text:
                                    f10_count += 1
                                    # Collection starts at the SECOND occurrence.
                                    if f10_count == 2:
                                        collecting_text = True
                                        print(f"Starting collection under heading: {span_text}")
                            if collecting_text:
                                # A different heading ends the collection.
                                if heading_pattern.match(span_text) and span_text != heading_to_search:
                                    print(f"Ending collection at heading: {span_text}")
                                    collecting_text = False
                                    return all_text.strip()
                            if collecting_text:
                                # Skip anything inside the estimated header area.
                                if span_y < header_threshold:
                                    continue
                                if current_y is None:
                                    current_y = span_y
                                # Spans within 2pt vertically belong to the same line.
                                if abs(current_y - span_y) < 2:
                                    current_line += " " + span_text
                                else:
                                    # New line: flush the previous one.
                                    all_text += current_line.strip() + '\n'
                                    current_line = span_text
                                    current_y = span_y
            # Flush the partially-built line at the end of each page.
            if current_line:
                all_text += current_line.strip() + '\n'
                current_line = ""
        return all_text.strip() if f10_count > 1 else "Heading not found"
def apiFiltering(apitext):
    """Flatten a list of API response items into per-detail summary dicts.

    Parameters
    ----------
    apitext : list[dict]
        Items that may carry a ``'projecttemplatedetails'`` list.

    Returns
    -------
    list[dict]
        One dict per detail with keys ``"id"``, ``"projecttemplateid"`` and
        ``"bqcode"`` (taken from the nested ``'bqcodelibrary'`` mapping).
        Missing values come through as ``None``.
    """
    filtered_items = []
    for item in apitext:
        # `or []` / `or {}` guard against keys that are PRESENT but set to
        # None -- dict.get's default only applies when the key is absent,
        # so the original `.get(key, {})` form still crashed on explicit
        # nulls in the payload.
        for detail in item.get('projecttemplatedetails', []) or []:
            bq_library = detail.get('bqcodelibrary') or {}
            filtered_items.append({
                "id": detail.get('id'),
                "projecttemplateid": detail.get('projecttemplateid'),
                "bqcode": bq_library.get('bqcode'),
            })
    return filtered_items
def clean_text(text):
    """Return *text* with every run of newline/tab characters collapsed to
    a single space and leading/trailing whitespace removed.
    """
    flattened = re.sub(r'[\n\t]+', ' ', text)
    return flattened.strip()
def texts_from_pdfAllText(link):
    """Download a PDF from an HTTP/Dropbox shareable link and return its
    full plain text (all pages concatenated).

    Parameters
    ----------
    link : str
        HTTP or Dropbox shareable URL of the PDF.

    Returns
    -------
    str
        Concatenated text of every page, as produced by PyMuPDF's
        ``get_text()``.

    Raises
    ------
    ValueError
        If *link* is not a recognisable HTTP/Dropbox URL.
    """
    pdf_content = None
    all_text = ""
    if link and ('http' in link or 'dropbox' in link):
        # Dropbox share links need dl=1 to serve the raw file.
        if 'dl=0' in link:
            link = link.replace('dl=0', 'dl=1')
        # Timeout prevents hanging indefinitely on an unresponsive host.
        response = requests.get(link, timeout=60)
        pdf_content = BytesIO(response.content)  # keep the PDF in memory
        print('Downloaded from shareable link.')
    if pdf_content is None:
        raise ValueError("No valid PDF content found.")
    # Open the PDF with PyMuPDF directly from the in-memory buffer.
    pdf_document = fitz.open(stream=pdf_content, filetype="pdf")
    print('PDF opened in memory.')
    try:
        for page_num in range(pdf_document.page_count):
            all_text += pdf_document[page_num].get_text()
    finally:
        # Release the document explicitly -- the original leaked it.
        pdf_document.close()
    print(all_text)
    return all_text
| # import fitz | |
| # import tsadropboxretrieval | |
| # from io import BytesIO | |
| # import requests | |
| # def texts_from_pdf(pdfshareablelink): | |
| # print('intexts') | |
| # pdf_content = None | |
| # # Case 1: If it's a shareable link | |
| # if pdfshareablelink and ('http' in pdfshareablelink or 'dropbox' in pdfshareablelink): | |
| # # Modify Dropbox link for direct download | |
| # if 'dl=0' in pdfshareablelink: | |
| # pdfshareablelink = pdfshareablelink.replace('dl=0', 'dl=1') | |
| # # Download the PDF content from the shareable link | |
| # response = requests.get(pdfshareablelink) | |
| # pdf_content = BytesIO(response.content) # Store the content in memory | |
| # print('Downloaded from shareable link.') | |
| # # Case 2: If it's a Dropbox path, use the Dropbox API to download | |
| # elif dbpdfpath: | |
| # dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user') | |
| # print('Dropbox team access initialized.') | |
| # md, res = dbxTeam.files_download(path=dbpdfpath) | |
| # pdf_content = BytesIO(res.content) # Store the content in memory | |
| # print('Downloaded from Dropbox path.') | |
| # # Check if the PDF content is available | |
| # if pdf_content is None: | |
| # raise ValueError("No valid PDF content found.") | |
| # # Open the PDF using fitz (PyMuPDF) directly from memory | |
| # pdf_document = fitz.open(stream=pdf_content, filetype="pdf") | |
| # print('PDF opened in memory.') | |
| # all_text = "" # Initialize a string to store all text | |
| # current_line = "" # To build the current line | |
| # current_y = None # Track the y-coordinate of the current line | |
| # # Loop through each page in the PDF | |
| # for page_num in range(pdf_document.page_count): | |
| # page = pdf_document.load_page(page_num) | |
| # # Get text as dictionary to extract lines | |
| # text_dict = page.get_text("dict") | |
| # # Iterate over blocks, lines, and spans to extract lines of text | |
| # for block in text_dict['blocks']: | |
| # if 'lines' in block: # Check if 'lines' key exists | |
| # for line in block['lines']: | |
| # for span in line['spans']: | |
| # span_text = span['text'].strip() | |
| # span_y = span['bbox'][1] # Y-coordinate of the span (bbox[1] is the top y-coordinate) | |
| # # Check if the current span belongs to the same line (based on y-coordinate) | |
| # if current_y is None: | |
| # current_y = span_y # Initialize the first y-coordinate | |
| # if abs(current_y - span_y) < 2: # Threshold to determine if it's the same line | |
| # # If the y-coordinate is close enough, add to the current line | |
| # current_line += " " + span_text | |
| # else: | |
| # # If it's a new line, append the current line and reset | |
| # all_text += current_line.strip() + '\n' # Add line to all_text with a newline | |
| # current_line = span_text # Start the new line with the current span | |
| # current_y = span_y # Update the y-coordinate for the new line | |
| # # Append the last line of the page (if there's any) | |
| # if current_line: | |
| # all_text += current_line.strip() + '\n' | |
| # current_line = "" # Reset after each page | |
| # # all_text = all_text.replace('\n', ' ') | |
| # # return all_lines | |
| # print(all_text) | |
| # return all_text | |
| # # print('intexts') | |
| # # dbxTeam= tsadropboxretrieval.ADR_Access_DropboxTeam('user') | |
| # # print('dbdone') | |
| # # md, res =dbxTeam.files_download(path=dbpdfpath) | |
| # # print('downloaded') | |
| # # dataDoc = res.content | |
| # # print('l') | |
| # # pdf_document = fitz.open('pdf',dataDoc) | |
| # # print('k') | |
| # # alltexts='' | |
| # # for page_num in range(pdf_document.page_count): | |
| # # page = pdf_document[page_num] | |
| # # text_instances = page.get_text() | |
| # # alltexts+=text_instances | |
| # # # alltexts = alltexts.replace('\n', ' ') | |
| # # return alltexts | |