PDL_QUERY / app.py
Aditya Patkar
Added readme and comments
4fad4c6
"""
This is the main app file. It contains the main function of the app.
"""
# imports
import datetime as dt
import pandas as pd
import streamlit as st
from peopledatalabs import PDLPY as pdl
def setup():
    """
    Apply the Streamlit tweaks shared by the app. Call it once per page.

    Injects a small CSS snippet that hides the element with id ``MainMenu``
    (the hamburger menu) and the page footer.
    """
    # The CSS is passed as raw HTML, hence unsafe_allow_html=True.
    st.markdown(
        """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
""",
        unsafe_allow_html=True,
    )
def main():
    """
    Main function of the app.

    Renders the page header and the search form. Once the form is submitted
    with both an API key and a SQL query, the query is sent to the People
    Data Labs search endpoint selected by "Type" and either the returned
    records (with a CSV download) or the reported error are displayed.
    """
    setup()
    _render_intro()

    # search form, this is a streamlit form
    st.subheader("Search")
    form = st.form(key="my_form")
    # The key is a secret: mask it on screen and drop stray whitespace that
    # tends to come along when it is pasted.
    api_key = form.text_input(label="API Key", type="password").strip()
    type_search = form.selectbox(label="Type", options=["Person", "Company"])
    # Strip whitespace first so that a pasted, quoted query followed by a
    # newline still loses both of its surrounding double quotes.
    sql_query = form.text_area(label="SQL Query").strip().strip('"').strip()
    # if dataset is empty, set to all
    dataset = form.text_input(label="Dataset").strip() or "all"
    size = form.number_input(label="Size", min_value=1, max_value=1000, value=10)
    pretty = form.checkbox(label="Pretty Print")
    submit_button = form.form_submit_button(label="Submit")

    # nothing to do until the submit button is clicked
    if not submit_button:
        return
    # api key and sql query are both mandatory
    if not api_key:
        st.error("API Key is required!")
        return
    if not sql_query:
        st.error("SQL Query is required!")
        return

    params = {
        "sql": sql_query,
        "dataset": dataset,
        "size": size,
        "pretty": pretty,
    }
    try:
        response = _run_search(api_key, type_search, params)
    except Exception as exc:  # pylint: disable=broad-except
        # UI boundary: report the failure on the page instead of letting the
        # app crash with a traceback.
        _show_error(str(exc))
        return

    # if status is 200, show the data; anything else is reported as an error
    if isinstance(response, dict) and response.get("status") == 200:
        _show_results(response, type_search)
    elif isinstance(response, dict):
        # Fall back to the whole payload when there is no "error" entry.
        _show_error(response.get("error", response))
    else:
        _show_error(response)


def _render_intro():
    """Render the title, subheader, usage guidelines and a separator."""
    # title, subheader, and description
    st.title("People Data Labs Query Tool")
    st.subheader("Search for a Person or Company")
    st.write(
        """
This tool allows you to search for a person or company using the People Data Labs API. \n
*Guidelines*: \n
- API Key: Your API key from People Data Labs \n
- Type: Person or Company \n
- SQL Query: The SQL query you want to run. Find it on the dashboard \n
- Dataset: The dataset you want to search in. Leave blank for all \n
- Size: The number of results you want to return \n
- Pretty Print: Whether you want the results to be pretty printed.
Default is False \n \n\n
"""
    )
    # horizontal line and line break
    st.markdown("<hr>", unsafe_allow_html=True)
    st.markdown("<br>", unsafe_allow_html=True)


def _run_search(api_key, type_search, params):
    """Run the search on the endpoint matching ``type_search``; return its JSON."""
    client = pdl(api_key=api_key)
    # "Person" and "Company" are the only options the form offers, so any
    # value other than "Person" is routed to the company endpoint. This also
    # guarantees that a response is always produced.
    endpoint = client.person if type_search == "Person" else client.company
    return endpoint.search(**params).json()


def _show_results(response, type_search):
    """Show the record count, a CSV download button and the records table."""
    records = response.get("data") or []
    # Fall back to the number of returned rows if "total" is missing.
    total = response.get("total", len(records))
    st.success(f"Found {total} records for this search.")

    # convert json to csv. Json is list of dicts
    data_frame = pd.DataFrame(records)
    csv = data_frame.to_csv(index=False)

    # show data
    st.markdown("<hr>", unsafe_allow_html=True)
    st.markdown("<br>", unsafe_allow_html=True)
    st.subheader("Data")

    # No spaces or colons in the timestamp: ":" is not a legal character in
    # Windows file names.
    now = dt.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    # download button
    st.download_button(
        label="Download CSV",
        data=csv,
        file_name=f"{type_search}-data-{now}.csv",
        mime="text/csv",
    )
    st.dataframe(data_frame)


def _show_error(details):
    """Show the generic failure banner followed by the error details."""
    st.error("Error retrieving data!")
    st.error("Error:")
    st.write(details)
# Script entry point: start the app only when this file is run directly,
# not when it is imported.
if __name__ == "__main__":
    main()