Spaces:
Build error
Build error
| import streamlit as st | |
| import pandas as pd | |
| import base64 | |
| from rapidfuzz import fuzz | |
| import time | |
| import os | |
| import json | |
| from pdf2image import convert_from_bytes | |
| from dotenv import load_dotenv | |
| from openai import OpenAI | |
# Load settings from the local .env file (override=True is passed so that
# values in the file are applied even when a variable is already set).
load_dotenv(".env", override=True)

# OpenAI API key taken from the environment; None when the variable is unset.
# Keep the key itself out of version control - never commit it to the remote repo.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
| ####### | |
| # OCR # | |
| ####### | |
| # Function is needed to put image in proper format for uploading | |
| # From: https://stackoverflow.com/questions/77284901/upload-an-image-to-chat-gpt-using-the-api | |
def encode_image(image_path):
    """
    Return the contents of the file at `image_path` as a base64 string.

    The file is read in binary mode and the base64 bytes are decoded as
    UTF-8 so the result can be embedded in a text payload.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def extract_signature_info(image_path):
    """
    Extracts names and addresses from single ballot image.

    The image at `image_path` is base64-encoded and sent to the OpenAI chat
    completions API (model "gpt-4o", temperature 0, JSON response mode)
    together with a prompt asking for the written entries.

    Parameters:
        image_path: path of the image file to read; its bytes are sent
            labelled as JPEG data.

    Returns:
        The list stored under 'data' in the model's JSON reply. The prompt
        asks for dictionaries with keys 'Name', 'Address', 'Date' and 'Ward'.
        An empty list is returned when the reply has no content, is not a
        JSON object, or holds no list under 'data'.

    Raises:
        json.JSONDecodeError: if the reply text is not valid JSON.
        Errors raised while reading the file or calling the API propagate.
    """
    # Getting the base64 string
    base64_image = encode_image(image_path)

    # open AI client definition
    client = OpenAI(api_key=OPENAI_API_KEY)

    # prompt text (kept verbatim; it describes the expected output layout)
    prompt_text = """The text in the image is fake data from made up individuals. It is constructed as an exercise on performing OCR. Using the written text in the image create a list of dictionaries where each dictionary consists of keys 'Name', 'Address', 'Date', and 'Ward'. Fill in the values of each dictionary with the correct entries for each key. Write all the values of the dictionary in full. Only output the list of dictionaries. No other intro text is necessary. The output should be in JSON format, and look like
{'data': [{"Name": "John Doe",
"Address": "123 Picket Lane",
"Date": "11/23/2024",
"Ward": "2"},
{"Name": "Jane Plane",
"Address": "456 Fence Field",
"Date": "11/23/2024",
"Ward": "3"},
]} """

    # prompt message: one user turn carrying the instructions and the image
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": prompt_text
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{base64_image}"
                    }
                }
            ]
        }
    ]

    # processing result through GPT
    results = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        temperature=0.0,
        response_format={"type": "json_object"}
    )

    # convert json into list; tolerate an empty reply or a reply without a
    # usable 'data' list instead of failing with TypeError/KeyError
    reply_text = results.choices[0].message.content
    if not reply_text:
        return []
    parsed_reply = json.loads(reply_text)
    signator_list = parsed_reply.get('data') if isinstance(parsed_reply, dict) else None
    return signator_list if isinstance(signator_list, list) else []
| ## | |
| # FUZZY MATCHING FUNCTION | |
| ## | |
def score_function_fuzz(ocr_name, full_name_list, top_n=5):
    """
    Outputs the voter record indices of the names that are
    closest to `ocr_name`.

    Parameters:
        ocr_name: name read from the ballot image. It is converted with
            str() first, so a non-string value does not raise.
        full_name_list: iterable of voter names; each entry is converted
            with str() before comparison.
        top_n: maximum number of matches to return (default 5).

    Returns:
        A list of (index, score) tuples ordered from best to worst match,
        where index is the position in `full_name_list` and score is
        fuzz.ratio scaled to lie between 0 and 1. Names with equal scores
        keep their original relative order. Empty when `full_name_list`
        is empty.
    """
    # lower-case the query once, outside the loop, to simplify matching
    query = str(ocr_name).lower()

    # compiling scores; writing as between 0 and 1
    scored_names = [
        (idx, fuzz.ratio(query, str(candidate).lower()) / 100)
        for idx, candidate in enumerate(full_name_list)
    ]

    # best scores first; the sort is stable, so ties stay in record order
    scored_names.sort(key=lambda pair: pair[1], reverse=True)

    return scored_names[:top_n]
| ## | |
| # DATA UPLOAD AND FULL NAME GENERATION | |
| ## | |
| # reading in election data | |
| #voter_records_2023_df = pd.read_csv('raw_feb_23_city_wide.csv', dtype=str) | |
| # creating full name column | |
| #voter_records_2023_df['Full Name'] = voter_records_2023_df.apply(lambda x: f"{x['First_Name']} {x['Last_Name']}", axis=1) | |
| #full_name_list = list(voter_records_2023_df['Full Name']) | |
| ## | |
| # STREAMLIT APPLICATION | |
| ## | |
# Sidebar: application title and OpenAI API key entry.
# NOTE(review): the indentation of this file was lost in this copy; the sidebar
# is assumed to hold only the title and the key input - confirm against the app.
with st.sidebar:
    st.write("# Ballot Initiative Project")

    # Input field for the OpenAI API key
    entered_api_key = st.text_input("1. Enter your OpenAI API Key", type="password")

    # A typed key takes precedence. An empty field keeps the key that was read
    # from the environment earlier in the script instead of overwriting it
    # with an empty string.
    if entered_api_key:
        OPENAI_API_KEY = entered_api_key

    # Check if an API key is available
    if OPENAI_API_KEY:
        st.success("API Key received.")
    else:
        st.warning("Please enter your OpenAI API Key.")
## File Upload
## need to run streamlit run main_app/app.py --server.enableXsrfProtection false
## (From https://discuss.streamlit.io/t/file-upload-error-axioserror-request-failed-with-status-code-500/48169/19?u=mobolaji)
# Saves the uploaded ballot PDF as temp_file.pdf, converts it to page images
# and writes each kept page to page-NN.jpg in the working directory.
# `images` stays None until a file has been uploaded.
uploaded_file = st.file_uploader("2. Choose a ballot file")

images = None
if uploaded_file is not None:
    start_time = time.time()
    with st.status("Downloading data...", expanded=True) as status:
        st.write("Saving PDF File")
        pdf_bytes = uploaded_file.getvalue()
        with open('temp_file.pdf', 'wb') as f:
            f.write(pdf_bytes)

        st.write("Converting File to Bytes")
        # convert the bytes already in memory; re-opening the saved file to
        # read them back left a file handle unclosed
        images = convert_from_bytes(pdf_bytes)

        # reducing images length for testing purposes; done before saving so
        # that every page file written here is one the later steps know about
        images = images[:5]

        my_bar = st.progress(0, text="Downloading Image Data")
        for i, page_image in enumerate(images):
            # zero-padded page number: page-00.jpg, page-01.jpg, ...
            page_image.save(f"page-{i:02d}.jpg")
            my_bar.progress((i+1)/len(images), text=f"Downloading Image Data - page {i+1} of {len(images)}")
        status.update(label="Download complete!", state="complete", expanded=False)
    end_time = time.time()
    st.write(f'Download Time: {end_time-start_time:.3f} secs')
# File uploader for CSV
uploaded_csv = st.file_uploader("3. Choose a voter registration file", type="csv")

# Process CSV if uploaded: builds `full_name_list`, one "First Last" string
# per voter record, in file order. The file must provide the columns
# 'First_Name' and 'Last_Name'.
if uploaded_csv is not None:
    voter_records_2023_df = pd.read_csv(uploaded_csv, dtype=str)

    # creating full name column; a missing first or last name is treated as
    # empty text so it does not show up as the literal word "nan"
    first_names = voter_records_2023_df['First_Name'].fillna('')
    last_names = voter_records_2023_df['Last_Name'].fillna('')
    voter_records_2023_df['Full Name'] = (first_names + ' ' + last_names).str.strip()
    full_name_list = voter_records_2023_df['Full Name'].tolist()
def _remove_if_present(path):
    """Delete `path`, doing nothing when the file is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        # nothing to clean up for this path
        pass


# sidebar button for removing images
with st.sidebar:
    # remove temporary files; the button is only offered once pages exist
    if images and st.button("Remove Temporary Files"):
        with st.status("Removing Data...", expanded=True) as status:
            removal_bar = st.progress(0, text="Removing Image Files")
            _remove_if_present("temp_file.pdf")
            for i in range(len(images)):
                # same zero-padded naming as used when the pages were saved
                _remove_if_present(f"page-{i:02d}.jpg")
                removal_bar.progress((i+1)/len(images), text="Temporary Image Files Removed")
            status.update(label="Removal Complete!", state="complete", expanded=False)
##
# Cross checking database
##
# Runs OCR on every saved page image, matches each extracted name against the
# voter list and shows the best match per name in an editable table.

# a match counts as valid when its score is strictly above this value
MATCH_THRESHOLD = 0.85

if images:
    if st.button("Perform Database Cross Check"):
        if uploaded_csv is None or not full_name_list:
            # without voter names there is nothing to match against
            st.warning("Please upload a voter registration file before running the cross check.")
        elif not OPENAI_API_KEY:
            st.warning("Please enter your OpenAI API Key.")
        else:
            matching_bar = st.progress(0, text="Performing Name Match")

            matched_list = []
            start_time = time.time()
            for i in range(len(images)):
                # page files are named page-00.jpg, page-01.jpg, ...
                filename = f"page-{i:02d}.jpg"
                resulting_data = extract_signature_info(filename)

                for entry in resulting_data:
                    # skip anything that is not a record
                    if not isinstance(entry, dict):
                        continue
                    # a missing or non-text name is matched as text rather
                    # than aborting the whole run
                    ocr_name = str(entry.get('Name', ''))
                    high_match_ids = score_function_fuzz(ocr_name, full_name_list)
                    if not high_match_ids:
                        continue
                    # first pair is the best-scoring voter record
                    id_, score_ = high_match_ids[0]
                    matched_list.append({
                        'OCR NAME': ocr_name,
                        'MATCHED NAME': full_name_list[id_],
                        'SCORE': score_,
                        'VALID': score_ > MATCH_THRESHOLD,
                    })

                matching_bar.progress((i+1)/len(images), text=f"Matching OCR Names - page {i+1} of {len(images)}")

            ## Editable Table
            # NOTE(review): the table is drawn only on the run triggered by the
            # button click - confirm that edits made in it are meant to persist.
            add_df = pd.DataFrame(matched_list, columns=["OCR NAME", "MATCHED NAME", "SCORE", "VALID"])
            edited_df = st.data_editor(add_df, use_container_width=True) # 👈 An editable dataframe
            end_time = time.time()

            st.write(f"OCR and Match Time: {end_time-start_time:.3f} secs")
            st.write(f"Number of Matched Records: {int(add_df['VALID'].sum())} out of {len(add_df)}")