Spaces:
Runtime error
Runtime error
File size: 6,494 Bytes
e5b9101 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
import os
import time
from selenium import webdriver
import base64
import requests
import json
import csv
import gradio as gr
from openai import OpenAI
import uuid
def capture_full_page_screenshots(url, output_folder, scroll_size=400):
    """Scroll through a page in Chrome and save a screenshot at every step.

    The page is opened in a new Chrome WebDriver session, the window is
    maximized, and the viewport is captured repeatedly while scrolling down
    ``scroll_size`` pixels at a time until the scroll offset reaches the
    document height that was measured right after loading.

    Args:
        url: Address of the page to open.
        output_folder: Directory that receives the PNG files; created if it
            does not exist yet.
        scroll_size: Number of pixels to scroll between screenshots. Must be
            positive, otherwise the scroll loop could never finish.

    Returns:
        A status message naming ``output_folder``.

    Raises:
        ValueError: If ``scroll_size`` is zero or negative.
    """
    # Validate before starting a browser: a non-positive step would never
    # advance past the end of the page.
    if scroll_size <= 0:
        raise ValueError(f"scroll_size must be positive, got {scroll_size!r}")

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_folder, exist_ok=True)

    driver = webdriver.Chrome()
    try:
        driver.get(url)
        driver.maximize_window()

        # Height is read once; content that grows while scrolling is not
        # followed beyond this initial measurement.
        total_height = driver.execute_script("return document.body.scrollHeight")
        scroll_position = 0

        while scroll_position < total_height:
            # Generate a random UUID string for each screenshot
            random_string = str(uuid.uuid4())
            screenshot_path = os.path.join(output_folder, f"screenshot_{random_string}.png")
            driver.save_screenshot(screenshot_path)
            print(f"Saved {screenshot_path}")

            scroll_position += scroll_size
            driver.execute_script(f"window.scrollTo(0, {scroll_position});")
            # Give the page a moment to render after scrolling.
            time.sleep(1)
    finally:
        # Always shut the browser down, even when navigation or a screenshot
        # fails, so no Chrome/chromedriver process is left behind.
        driver.quit()

    return f"Screenshots saved to {output_folder}"
def encode_image(image_path):
    """Return the contents of the file at ``image_path`` as base64 text.

    The file is read in binary mode and the base64 output is decoded to a
    ``str`` using UTF-8.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded_bytes = base64.b64encode(raw_bytes)
    return encoded_bytes.decode('utf-8')
# Instruction sent alongside every screenshot.
_VISION_PROMPT = (
    "The image might contain name of some products and their respective pricing.\n"
    "Identify them. Ignore the partially visible names. Return me the details in json format.\n"
    "The json output should have two variables: 1. Product Name 2. Product Price\n"
    "You should only pass the json output and say nothing else. Just the json output in needed\n"
)


def _extract_products(content):
    """Turn a model reply into a list of ``(name, price)`` pairs.

    Markdown code fences and newlines are stripped before parsing. A reply
    that is a single JSON object, or an object wrapping one or more lists,
    is normalised to a flat list of product objects. Anything that cannot be
    parsed or does not carry both expected keys yields no rows.
    """
    if not isinstance(content, str):
        return []

    clean_content = content.strip().replace('```json', '').replace('```', '').replace('\n', '')
    try:
        parsed = json.loads(clean_content)
    except json.JSONDecodeError as e:
        print("Failed to parse JSON:", e)
        print("Cleaned JSON content that failed:", repr(clean_content))
        return []

    if isinstance(parsed, dict):
        if 'Product Name' in parsed:
            # A lone product object: iterating it directly would walk its
            # keys instead of treating it as one product.
            parsed = [parsed]
        else:
            # A wrapper object: collect the entries of any list values.
            wrapped = []
            for value in parsed.values():
                if isinstance(value, list):
                    wrapped.extend(value)
            parsed = wrapped

    if not isinstance(parsed, list):
        return []

    return [
        (product['Product Name'], product['Product Price'])
        for product in parsed
        if isinstance(product, dict)
        and 'Product Name' in product
        and 'Product Price' in product
    ]


def vision(api_key, folder_path):
    """Extract product names and prices from screenshots into a CSV file.

    Every ``.png`` file in ``folder_path`` is base64-encoded and posted to
    the OpenAI chat completions endpoint together with a prompt asking for
    product names and prices as JSON. Rows found in the replies are written
    to ``product_details.csv`` in the current working directory, which is
    overwritten on each call and always starts with a header row.

    Images whose request fails, returns a non-200 status, or yields an
    unparsable reply are reported on stdout and skipped.

    Args:
        api_key: OpenAI API key used as the bearer token.
        folder_path: Directory containing the screenshots to analyse.

    Returns:
        The string ``"Successfully Updated the File"``.
    """
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }
    csv_file_path = 'product_details.csv'

    with open(csv_file_path, mode='w', newline='', encoding='utf-8') as file:
        csv_writer = csv.writer(file)
        csv_writer.writerow(["Product Name", "Product Price"])

        for filename in os.listdir(folder_path):
            if not filename.endswith(".png"):
                continue

            image_path = os.path.join(folder_path, filename)
            base64_image = encode_image(image_path)

            payload = {
                "model": "gpt-4-turbo",
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": _VISION_PROMPT,
                            },
                            {
                                "type": "image_url",
                                "image_url": {
                                    # Only .png files reach this point, so
                                    # label the data URL as PNG.
                                    "url": f"data:image/png;base64,{base64_image}"
                                }
                            }
                        ]
                    }
                ],
                "max_tokens": 300
            }

            try:
                # A timeout keeps one stalled request from hanging the run.
                response = requests.post(
                    "https://api.openai.com/v1/chat/completions",
                    headers=headers,
                    json=payload,
                    timeout=120,
                )
            except requests.RequestException as e:
                print(f"Request failed for {filename}: {e}")
                continue

            if response.status_code != 200:
                print(f"Skipping {filename}: API returned HTTP {response.status_code}")
                continue

            data = response.json()
            choices = data.get('choices') or []
            if not choices or 'message' not in choices[0]:
                continue

            content = choices[0]['message'].get('content')
            csv_writer.writerows(_extract_products(content))

    return "Successfully Updated the File"
def update_url(url_input, output_folder, api_key_input, max_pages=100):
    """Follow pagination by asking a chat model to rewrite the URL.

    Starting from ``url_input``, the model is asked for the URL of the
    following page. Each proposed URL is fetched once to check that it
    answers with HTTP 200 and, if so, is passed to
    ``capture_full_page_screenshots``. The loop stops when a URL cannot be
    fetched, when the model returns nothing or a URL that was already
    handled, when any error occurs, or after ``max_pages`` pages.

    Args:
        url_input: URL of the first page (assumed to be captured already by
            the caller; it is not captured here).
        output_folder: Directory handed to the screenshot routine.
        api_key_input: OpenAI API key.
        max_pages: Upper bound on the number of follow-up pages to request,
            so a model that keeps producing reachable URLs cannot loop
            forever.

    Returns:
        A status message naming ``output_folder``.
    """
    done_message = f"Processing Completed. Screenshots saved in {output_folder}"

    # Nothing to paginate: skip creating an API client altogether.
    if max_pages <= 0:
        return done_message

    client = OpenAI(api_key=api_key_input)
    current_url = url_input
    # URLs already handled; a repeat means the model is going in circles.
    visited = {url_input}

    for _ in range(max_pages):
        try:
            completion = client.chat.completions.create(
                # NOTE(review): this is a dated model snapshot; confirm it is
                # still served, since a rejected model ends the loop at once.
                model="gpt-3.5-turbo-0301",
                messages=[
                    {"role": "system", "content": "You are a URL modifier. Given an url, you will modify it accordingly. You will not access the website"},
                    {"role": "user", "content": f'You need to modify the given url {current_url} in a way where I can access the following page. Try to identify at which part of the url, the pagination is defined and modify that part. Just provide the modified URL. You are not supposed to talk anything else with the user.'}
                ]
            )

            # Replies can carry surrounding whitespace or be empty.
            updated_url = (completion.choices[0].message.content or "").strip()
            if not updated_url:
                print("Model returned no URL. Stopping the loop.")
                break
            if updated_url in visited:
                print(f"Model returned an already processed URL {updated_url}. Stopping the loop.")
                break

            # Check if the URL is valid
            response = requests.get(updated_url, timeout=30)
            if response.status_code != 200:
                print(f"Failed to access {updated_url}. Stopping the loop.")
                break

            capture_full_page_screenshots(updated_url, output_folder)
            visited.add(updated_url)
            current_url = updated_url

        except Exception as e:
            # Best-effort pagination: any failure ends the crawl instead of
            # propagating to the caller.
            print(f"An error occurred: {e}. Stopping the loop.")
            break

    return done_message
def process(url, output_folder, api_key, web_type_input):
    """Run the screenshot and extraction pipeline for one URL.

    The starting page is always captured first. For any mode other than
    ``"Dynamic"`` the pagination step runs next. Finally the screenshots in
    ``output_folder`` are sent through ``vision``.

    Returns:
        The string ``"Processing Completed"``.
    """
    capture_full_page_screenshots(url, output_folder)

    # Only the non-"Dynamic" path walks through further pages.
    if web_type_input != "Dynamic":
        update_url(url, output_folder, api_key)

    vision(api_key, output_folder)
    return "Processing Completed"
# Gradio UI: four inputs feeding `process`, with a plain-text result.
url_input = gr.Textbox(label="URL")
output_folder_input = gr.Textbox(label="Output Folder Path")
api_key_input = gr.Textbox(label="API Key", type="password")
web_type_input = gr.Dropdown(label="Mode", choices=["Dynamic", "Paginated"])
# Second name bound to the same dropdown component.
mode_input = web_type_input

demo = gr.Interface(
    fn=process,
    inputs=[
        url_input,
        output_folder_input,
        api_key_input,
        web_type_input,
    ],
    outputs="text",
    title="Full Page Screenshot and OCR",
)
demo.launch()
|