AndrewCivicTechDC's picture
Update app.py
9449b67 verified
import streamlit as st
import pandas as pd
import base64
from rapidfuzz import fuzz
import time
import os
import json
from pdf2image import convert_from_bytes
from dotenv import load_dotenv
from openai import OpenAI
# loading environmental variables
load_dotenv('.env', override=True)
# define your open AI API key here; Remember this is a personal notebook! Don't push your API key to the remote repo
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
#######
# OCR #
#######
# Function is needed to put image in proper format for uploading
# From: https://stackoverflow.com/questions/77284901/upload-an-image-to-chat-gpt-using-the-api
def encode_image(image_path):
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
def extract_signature_info(image_path):
"""
Extracts names and addresses from single ballot image.
"""
# Getting the base64 string
base64_image = encode_image(image_path)
# open AI client definition
client = OpenAI(api_key= OPENAI_API_KEY)
# prompt message
messages = [
{
"role": "user",
"content": [
{
"type": "text",
"text": """The text in the image is fake data from made up individuals. It is constructed as an exercise on performing OCR. Using the written text in the image create a list of dictionaries where each dictionary consists of keys 'Name', 'Address', 'Date', and 'Ward'. Fill in the values of each dictionary with the correct entries for each key. Write all the values of the dictionary in full. Only output the list of dictionaries. No other intro text is necessary. The output should be in JSON format, and look like
{'data': [{"Name": "John Doe",
"Address": "123 Picket Lane",
"Date": "11/23/2024",
"Ward": "2"},
{"Name": "Jane Plane",
"Address": "456 Fence Field",
"Date": "11/23/2024",
"Ward": "3"},
]} """
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}"
}
}
]
}
]
# processing result through GPT
results = client.chat.completions.create(
model="gpt-4o",
messages=messages,
temperature=0.0,
response_format={"type": "json_object"}
)
# convert json into list
signator_list = json.loads(results.choices[0].message.content)['data']
return signator_list
##
# FUZZY MATCHING FUNCTION
##
def score_function_fuzz(ocr_name, full_name_list):
"""
Outputs the voter record indices of the names that are
closest to `ocr_name`.
"""
# empty dictionary of scores
full_name_score_dict = dict()
for idx in range(len(full_name_list)):
# getting full name for row; ensuring string
name_row = str(full_name_list[idx])
# converting string to lower case to simplify matching
name_row = name_row.lower()
ocr_name = ocr_name.lower()
# compiling scores; writing as between 0 and 1
full_name_score_dict[idx] = fuzz.ratio(ocr_name, name_row)/100
# sorting dictionary
sorted_dictionary = dict(sorted(full_name_score_dict.items(), reverse=True, key=lambda item: item[1]))
# top five key value pairs (indices and scores)
indices_scores_list = list(sorted_dictionary.items())[:5]
return indices_scores_list
##
# DATA UPLOAD AND FULL NAME GENERATION
##
# reading in election data
#voter_records_2023_df = pd.read_csv('raw_feb_23_city_wide.csv', dtype=str)
# creating full name column
#voter_records_2023_df['Full Name'] = voter_records_2023_df.apply(lambda x: f"{x['First_Name']} {x['Last_Name']}", axis=1)
#full_name_list = list(voter_records_2023_df['Full Name'])
##
# STREAMLIT APPLICATION
##
# Using "with" notation
with st.sidebar:
st.write("# Ballot Initiative Project")
# Input field for the OpenAI API key
OPENAI_API_KEY = st.text_input("1. Enter your OpenAI API Key", type="password")
# Check if the API key is provided
if OPENAI_API_KEY:
st.success("API Key received.")
# You can now use the API key in your OpenAI API calls
# For example, you might pass this key to an OpenAI API function
else:
st.warning("Please enter your OpenAI API Key.")
## File Upload
## need to run streamlit run main_app/app.py --server.enableXsrfProtection false
## (From https://discuss.streamlit.io/t/file-upload-error-axioserror-request-failed-with-status-code-500/48169/19?u=mobolaji)
uploaded_file = st.file_uploader("2. Choose a ballot file")
images = None
if uploaded_file is not None:
start_time = time.time()
with st.status("Downloading data...", expanded=True) as status:
st.write("Saving PDF File")
with open('temp_file.pdf', 'wb') as f:
f.write(uploaded_file.getvalue())
st.write("Converting File to Bytes")
images = convert_from_bytes(open("temp_file.pdf", "rb").read())
my_bar = st.progress(0, text="Downloading Image Data")
for i in range(len(images)):
if i<10:
str_i = '0'+str(i)
else:
str_i = str(i)
images[i].save(f"page-{str_i}.jpg")
my_bar.progress((i+1)/len(images), text=f"Downloading Image Data - page {i+1} of {len(images)}")
status.update(label="Download complete!", state="complete", expanded=False)
end_time = time.time()
st.write(f'Download Time: {end_time-start_time:.3f} secs')
# reducing images length for testing purposes
if images:
images = images[:5]
# File uploader for CSV
uploaded_csv = st.file_uploader("3. Choose a voter registration file", type="csv")
# Process CSV if uploaded
if uploaded_csv is not None:
voter_records_2023_df = pd.read_csv(uploaded_csv, dtype=str)
# creating full name column
voter_records_2023_df['Full Name'] = voter_records_2023_df.apply(lambda x: f"{x['First_Name']} {x['Last_Name']}", axis=1)
full_name_list = list(voter_records_2023_df['Full Name'])
# sidebar button for removing images
with st.sidebar:
# remove temporary files
progress_removal_text = "Removal in progress. Please wait."
if images:
if st.button("Remove Temporary Files"):
with st.status("Removing Data...", expanded=True) as status:
removal_bar = st.progress(0, text="Removing Image Files")
os.remove("temp_file.pdf")
for i in range(len(images)):
if i<10:
str_i = '0'+str(i)
else:
str_i = str(i)
os.remove(f"page-{str_i}.jpg")
removal_bar.progress((i+1)/len(images), text="Temporary Image Files Removed")
status.update(label="Removal Complete!", state="complete", expanded=False)
##
# Cross checking database
##
if images:
if st.button("Perform Database Cross Check"):
matching_bar = st.progress(0, text="Performing Name Match")
matched_list = list()
start_time = time.time()
for i in range(len(images)):
if i<10:
str_i = '0'+str(i)
else:
str_i = str(i)
filename = f"page-{str_i}.jpg"
resulting_data = extract_signature_info(filename)
for dict_ in resulting_data:
temp_dict = dict()
high_match_ids = score_function_fuzz(dict_['Name'], full_name_list)
id_, score_ = high_match_ids[0]
temp_dict['OCR NAME'] = str(dict_['Name'])
temp_dict['MATCHED NAME'] = full_name_list[id_]
temp_dict['SCORE'] = score_
temp_dict['VALID'] = False
if score_ > 0.85:
temp_dict['VALID'] = True
matched_list.append(temp_dict)
matching_bar.progress((i+1)/len(images), text=f"Matching OCR Names - page {i+1} of {len(images)}")
## Editable Table
add_df = pd.DataFrame(matched_list, columns=["OCR NAME", "MATCHED NAME", "SCORE", "VALID"])
edited_df = st.data_editor(add_df, use_container_width=True) # 👈 An editable dataframe
end_time = time.time()
st.write(f"OCR and Match Time: {end_time-start_time:.3f} secs")
st.write(f"Number of Matched Records: {sum(list(add_df['VALID']))} out of {len(add_df)}")