# NOTE(review): scraped page chrome (Spaces status lines, file size, commit
# hashes, and a line-number gutter) preceded the code here. It is not Python
# and would break the module, so it is preserved only as this comment.
import fitz # PyMuPDF
from io import BytesIO
import re
import requests
def split_links(links_string):
    """Split a comma-separated string of links into a list of trimmed links."""
    return list(map(str.strip, links_string.split(',')))
def texts_from_pdf(pdfshareablelinks, heading_to_search):
    """Download PDF(s) from comma-separated shareable links and return the text
    found under the SECOND occurrence of ``heading_to_search``.

    :param pdfshareablelinks: comma-separated string of HTTP/Dropbox shareable
        links; split into individual links via ``split_links``.
    :param heading_to_search: heading token to look for (e.g. ``"F10"``);
        matched as a substring of spans shaped like letter + two digits.
    :returns: the collected text, or ``"Heading not found"`` when the heading
        was seen fewer than two times.
    :raises ValueError: when a link yields no downloadable PDF content.

    NOTE(review): both ``return`` statements sit inside the per-link loop, so
    only the FIRST link is ever processed — confirm this is intended.
    """
    print('intexts',pdfshareablelinks)
    pdfshareablelinks=split_links(pdfshareablelinks)
    # Case 1: If it's a shareable link
    for link in pdfshareablelinks:
        pdf_content = None
        if link and ('http' in link or 'dropbox' in link):
            # Modify Dropbox link for direct download (dl=1 serves the raw file)
            if 'dl=0' in link:
                link = link.replace('dl=0', 'dl=1')
            # Download the PDF content from the shareable link
            response = requests.get(link)
            pdf_content = BytesIO(response.content)  # Store the content in memory
            print('Downloaded from shareable link.')
        # elif dbpdfpath:
        #     dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user')
        #     print('Dropbox team access initialized.')
        #     md, res = dbxTeam.files_download(path=dbpdfpath)
        #     pdf_content = BytesIO(res.content)  # Store the content in memory
        #     print('Downloaded from Dropbox path.')
        # Check if the PDF content is available
        if pdf_content is None:
            raise ValueError("No valid PDF content found.")
        # Open the PDF using fitz (PyMuPDF) directly from memory
        pdf_document = fitz.open(stream=pdf_content, filetype="pdf")
        print('PDF opened in memory.')
        all_text = ""  # Accumulates every collected line
        current_line = ""  # The visual line currently being assembled from spans
        collecting_text = False  # True while we are under the target heading
        f10_count = 0  # How many times heading_to_search has been seen so far
        current_y = None  # y-coordinate of the line being assembled
        # Heading pattern: one letter followed by exactly two digits (e.g. "F10")
        heading_pattern = re.compile(r"[A-Za-z]\d{2}")
        # Loop through each page in the PDF
        for page_num in range(pdf_document.page_count):
            page = pdf_document.load_page(page_num)
            # Get text as a dict so spans come with bounding-box coordinates
            text_dict = page.get_text("dict")
            # First pass: collect span top-y values to locate the header area
            header_y_values = []
            for block in text_dict['blocks']:
                if 'lines' in block:  # image blocks have no 'lines' key
                    for line in block['lines']:
                        for span in line['spans']:
                            header_y_values.append(span['bbox'][1])  # top y of each span
            # Header area = topmost span + 10% of the page height as a buffer
            # NOTE(review): min() raises ValueError on a page with no text
            # spans — confirm the inputs always contain text on every page.
            header_threshold = min(header_y_values) + (page.rect.height * 0.1)
            # print(f"Header threshold for page {page_num + 1}: {header_threshold}")
            # Second pass: walk spans, toggling collection at headings
            for block in text_dict['blocks']:
                if 'lines' in block:
                    for line in block['lines']:
                        for span in line['spans']:
                            span_text = span['text'].strip()
                            span_y = span['bbox'][1]  # top y-coordinate of the span
                            # Heading detection based on the letter+digits format
                            if heading_pattern.match(span_text):
                                if heading_to_search in span_text:
                                    f10_count += 1  # count target-heading occurrences
                                    # Start collecting under the SECOND occurrence
                                    if f10_count == 2:
                                        collecting_text = True
                                        print(f"Starting collection under heading: {span_text}")
                            # Stop collecting once a different heading is reached
                            if collecting_text:
                                # NOTE(review): `span_text != heading_to_search`
                                # assumes headings are EXACTLY the token (e.g.
                                # "F10", not "F10 Title") — a heading span that
                                # merely contains the token would end collection
                                # immediately; confirm against real documents.
                                if heading_pattern.match(span_text) and span_text != heading_to_search:
                                    print(f"Ending collection at heading: {span_text}")
                                    collecting_text = False  # Stop collecting
                                    return all_text.strip()  # Return collected text
                            # If we're collecting text, add it to the output
                            if collecting_text:
                                # Exclude spans that fall within the header area
                                if span_y < header_threshold:
                                    continue
                                if current_y is None:
                                    current_y = span_y  # first collected span
                                # Spans within 2pt vertically belong to the same visual line
                                if abs(current_y - span_y) < 2:
                                    current_line += " " + span_text
                                else:
                                    # New visual line: flush the previous one
                                    all_text += current_line.strip() + '\n'
                                    current_line = span_text
                                    current_y = span_y
            # Flush the partially built line at the end of each page
            if current_line:
                all_text += current_line.strip() + '\n'
                current_line = ""  # Reset for the next page
        # print(f"\nCollected Text:\n{all_text.strip()}")
        return all_text.strip() if f10_count > 1 else "Heading not found"
def apiFiltering(apitext):
    """Flatten every project-template detail across all API items into dicts
    carrying only id, projecttemplateid and bqcode (missing keys become None)."""
    return [
        {
            "id": detail.get('id'),
            "projecttemplateid": detail.get('projecttemplateid'),
            "bqcode": detail.get('bqcodelibrary', {}).get('bqcode'),
        }
        for item in apitext
        for detail in item.get('projecttemplatedetails', [])
    ]
def clean_text(text):
    """Collapse every run of newlines/tabs into a single space and trim the result."""
    return re.sub(r'[\n\t]+', ' ', text).strip()
def texts_from_pdfAllText(link):
    """Download a PDF from an HTTP/Dropbox shareable link and return the plain
    text of every page concatenated in order.

    :param link: shareable link to the PDF.
    :returns: the concatenated page text (also printed for debugging).
    :raises ValueError: when the link is empty or not an HTTP/Dropbox link.
    """
    # Guard clause: anything that is not a shareable link yields no content.
    if not (link and ('http' in link or 'dropbox' in link)):
        raise ValueError("No valid PDF content found.")
    # Dropbox share links need dl=1 to serve the raw file directly.
    if 'dl=0' in link:
        link = link.replace('dl=0', 'dl=1')
    response = requests.get(link)
    in_memory_pdf = BytesIO(response.content)
    print('Downloaded from shareable link.')
    document = fitz.open(stream=in_memory_pdf, filetype="pdf")
    print('PDF opened in memory.')
    # Concatenate the plain text of every page, in page order.
    combined = "".join(
        document[index].get_text() for index in range(document.page_count)
    )
    print(combined)
    return combined
# import fitz
# import tsadropboxretrieval
# from io import BytesIO
# import requests
# def texts_from_pdf(pdfshareablelink):
# print('intexts')
# pdf_content = None
# # Case 1: If it's a shareable link
# if pdfshareablelink and ('http' in pdfshareablelink or 'dropbox' in pdfshareablelink):
# # Modify Dropbox link for direct download
# if 'dl=0' in pdfshareablelink:
# pdfshareablelink = pdfshareablelink.replace('dl=0', 'dl=1')
# # Download the PDF content from the shareable link
# response = requests.get(pdfshareablelink)
# pdf_content = BytesIO(response.content) # Store the content in memory
# print('Downloaded from shareable link.')
# # Case 2: If it's a Dropbox path, use the Dropbox API to download
# elif dbpdfpath:
# dbxTeam = tsadropboxretrieval.ADR_Access_DropboxTeam('user')
# print('Dropbox team access initialized.')
# md, res = dbxTeam.files_download(path=dbpdfpath)
# pdf_content = BytesIO(res.content) # Store the content in memory
# print('Downloaded from Dropbox path.')
# # Check if the PDF content is available
# if pdf_content is None:
# raise ValueError("No valid PDF content found.")
# # Open the PDF using fitz (PyMuPDF) directly from memory
# pdf_document = fitz.open(stream=pdf_content, filetype="pdf")
# print('PDF opened in memory.')
# all_text = "" # Initialize a string to store all text
# current_line = "" # To build the current line
# current_y = None # Track the y-coordinate of the current line
# # Loop through each page in the PDF
# for page_num in range(pdf_document.page_count):
# page = pdf_document.load_page(page_num)
# # Get text as dictionary to extract lines
# text_dict = page.get_text("dict")
# # Iterate over blocks, lines, and spans to extract lines of text
# for block in text_dict['blocks']:
# if 'lines' in block: # Check if 'lines' key exists
# for line in block['lines']:
# for span in line['spans']:
# span_text = span['text'].strip()
# span_y = span['bbox'][1] # Y-coordinate of the span (bbox[1] is the top y-coordinate)
# # Check if the current span belongs to the same line (based on y-coordinate)
# if current_y is None:
# current_y = span_y # Initialize the first y-coordinate
# if abs(current_y - span_y) < 2: # Threshold to determine if it's the same line
# # If the y-coordinate is close enough, add to the current line
# current_line += " " + span_text
# else:
# # If it's a new line, append the current line and reset
# all_text += current_line.strip() + '\n' # Add line to all_text with a newline
# current_line = span_text # Start the new line with the current span
# current_y = span_y # Update the y-coordinate for the new line
# # Append the last line of the page (if there's any)
# if current_line:
# all_text += current_line.strip() + '\n'
# current_line = "" # Reset after each page
# # all_text = all_text.replace('\n', ' ')
# # return all_lines
# print(all_text)
# return all_text
# # print('intexts')
# # dbxTeam= tsadropboxretrieval.ADR_Access_DropboxTeam('user')
# # print('dbdone')
# # md, res =dbxTeam.files_download(path=dbpdfpath)
# # print('downloaded')
# # dataDoc = res.content
# # print('l')
# # pdf_document = fitz.open('pdf',dataDoc)
# # print('k')
# # alltexts=''
# # for page_num in range(pdf_document.page_count):
# # page = pdf_document[page_num]
# # text_instances = page.get_text()
# # alltexts+=text_instances
# # # alltexts = alltexts.replace('\n', ' ')
# # return alltexts
# NOTE(review): a stray "|" scrape artifact ended the file here; commented out
# so the module parses.