# scraper/app.py — Gradio dashboard that scrapes Instagram and LinkedIn
# profile data via RapidAPI endpoints.
import math
import os
import time

import gradio as gr
import pandas as pd
import requests

# RapidAPI key must be provided via the environment; fails fast at import if missing.
RAPIDAPI_API_KEY = os.environ['RAPIDAPI_API_KEY']
# Function to scrape Instagram profile
def scrape_instagram(user_name):
    """Fetch public profile info for one Instagram account via RapidAPI.

    Args:
        user_name: Instagram username, numeric id, or profile URL
            (the API accepts any of the three).

    Returns:
        dict: Normalized profile fields (bio, follower_count, bio_links, ...),
        or an empty dict when the request fails or the response lacks 'data'.
    """
    url = "https://instagram-scraper-api2.p.rapidapi.com/v1/info"
    print(user_name)
    querystring = {"username_or_id_or_url": f"{user_name}"}
    headers = {
        "x-rapidapi-key": f"{RAPIDAPI_API_KEY}",
        "x-rapidapi-host": "instagram-scraper-api2.p.rapidapi.com"
    }
    try:
        # Timeout so one hung request cannot stall the whole batch scrape.
        response = requests.get(url, headers=headers, params=querystring, timeout=30)
        response.raise_for_status()  # Raise HTTPError for bad responses
        response_json = response.json()
        if 'data' not in response_json:
            print("No data found in response")
            return {}  # Nothing usable in the response
        response_data = response_json['data']
        print(response_data)
        profile_info = {
            'bio': response_data.get('biography', ''),
            'follower_count': response_data.get('follower_count', 0),
            'following_count': response_data.get('following_count', 0),
            # .get('url', '') guards against malformed bio_link entries.
            'bio_links': [item.get('url', '') for item in response_data.get('bio_links', [])],
            'full_name': response_data.get('full_name', ''),
            'username': response_data.get('username', ''),
            'num_posts': response_data.get('media_count', 0),
            'profile_id': response_data.get('profile_pic_id', ''),
            'email': response_data.get('biography_email', ''),
            'badge': response_data.get('account_badges', []),
            'category': response_data.get('category', ''),
            'phone_number': response_data.get('contact_phone_number', ''),
            'city_name': response_data.get('location_data', {}).get('city_name', ''),
            'country': '',      # not provided by this endpoint
            'date_joined': ''   # not provided by this endpoint
        }
        return profile_info
    # NOTE: HTTPError, ConnectionError and Timeout are all subclasses of
    # RequestException, so one handler covers them; the original per-subclass
    # handlers listed *after* RequestException were unreachable dead code.
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
    except ValueError as e:
        print(f"JSON decode error: {e}")
    except KeyError as e:
        print(f"Key error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return {}  # Return an empty dictionary if an error occurs
# Function to populate DataFrame with Instagram information
def get_insta_info(df, progress=gr.Progress()):
    """Populate *df* in place with Instagram data for each row's 'Links' value.

    Args:
        df: DataFrame with a 'Links' column of usernames/ids/profile URLs;
            assumes a default RangeIndex (rows are addressed by position).
        progress: Gradio progress tracker (supplied by the UI at call time).

    Returns:
        The same DataFrame with the scraped columns filled in; rows whose
        scrape failed keep empty strings.
    """
    # Map DataFrame column -> key in the dict returned by scrape_instagram.
    column_to_key = {
        'Bio': 'bio',
        'Follower Count': 'follower_count',
        'Following Count': 'following_count',
        'Bio Links': 'bio_links',
        'Full Name': 'full_name',
        'Username': 'username',
        'Num Posts': 'num_posts',
        'Profile ID': 'profile_id',
        'Email': 'email',
        'Badge': 'badge',
        'Category': 'category',
        'Phone Number': 'phone_number',
        'City Name': 'city_name',
        'Country': 'country',
        'Date Joined': 'date_joined',
    }
    for column in column_to_key:
        if column not in df.columns:
            df[column] = ''
    links = df['Links'].values
    print(links)
    for i in progress.tqdm(range(len(links)), desc='Scraping...'):
        link = links[i]
        try:
            time.sleep(1)  # throttle so we do not hammer the API
            profile_info = scrape_instagram(link)
            if not profile_info:
                continue  # scrape failed; leave this row's columns empty
            for column, key in column_to_key.items():
                value = profile_info.get(key, '')
                # List-valued fields are flattened to comma-separated strings.
                if column == 'Bio Links':
                    value = ', '.join(value)
                elif column == 'Badge':
                    value = ', '.join(str(badge) for badge in value)
                df.at[i, column] = value
        except Exception as e:
            # scrape_instagram already swallows request/JSON errors, so the
            # original RequestException/ValueError/KeyError handlers here
            # were dead code; one catch-all keeps the loop alive per row.
            print(f"An unexpected error occurred for link {link}: {e}")
    return df
# Function to scrape LinkedIn profiles
def scrape_linkedins(links):
    """Scrape LinkedIn profiles in bulk via the RapidAPI bulk-data endpoint.

    Args:
        links: List of LinkedIn profile URLs; sent to the API in chunks.

    Returns:
        list[dict]: One normalized info dict per profile the API returned.
        On error, whatever was collected before the failure is returned.
    """
    url = "https://linkedin-bulk-data-scraper.p.rapidapi.com/profiles"
    headers = {
        "x-rapidapi-key": f"{RAPIDAPI_API_KEY}",
        "x-rapidapi-host": "linkedin-bulk-data-scraper.p.rapidapi.com",
        "Content-Type": "application/json",
        "x-rapidapi-user": "usama"
    }
    profile_info_list = []
    chunk_size = 100  # profiles per request
    num_chunks = math.ceil(len(links) / chunk_size)

    def _first(items):
        # First element of a list, or {} when the list is empty.  The original
        # `.get('experiences', [{}])[0]` raised IndexError whenever the key
        # existed but held an empty list, aborting all remaining chunks.
        return items[0] if items else {}

    try:
        for i in range(num_chunks):
            chunk = links[i * chunk_size:(i + 1) * chunk_size]
            payload = {"links": chunk}
            # Generous timeout: bulk endpoint scrapes up to 100 profiles per call.
            response = requests.post(url, json=payload, headers=headers, timeout=300)
            response.raise_for_status()  # Raise HTTPError for bad responses
            data = response.json()
            if 'data' not in data:
                raise ValueError("Missing 'data' in response")
            for response_item in data['data']:
                response_data = response_item.get('data', {})
                experiences = response_data.get('experiences', [])
                educations = response_data.get('educations', [])
                current = _first(experiences)
                latest_edu = _first(educations)
                profile_info = {
                    'link': response_item.get('entry', ''),
                    'full_name': response_data.get('fullName', ''),
                    'headline': response_data.get('headline', ''),
                    'connections': response_data.get('followers', ''),  # or 'connections' based on availability
                    'country': response_data.get('addressCountryOnly', ''),
                    'address': response_data.get('addressWithoutCountry', ''),
                    'about': response_data.get('about', ''),
                    'current_role': f"{current.get('title', '')} at {current.get('subtitle', '')}",
                    # Empty lists now yield [] instead of the spurious " at "
                    # entry the old [{}] default produced.
                    'all_roles': [f"{item.get('title', '')} at {item.get('subtitle', '')}" for item in experiences],
                    'education': f"{latest_edu.get('subtitle', '')} at {latest_edu.get('title', '')}",
                    'all_education': [f"{item.get('subtitle', '')} at {item.get('title', '')}" for item in educations],
                }
                profile_info_list.append(profile_info)
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
    except ValueError as e:
        print(f"Value error: {e}")
    except KeyError as e:
        print(f"Key error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return profile_info_list
# Function to populate DataFrame with LinkedIn information
def get_LI_info(df, progress=gr.Progress()):
    """Populate *df* with LinkedIn data matched by each row's 'Links' value.

    Args:
        df: DataFrame with a 'Links' column of LinkedIn profile URLs.
        progress: Gradio progress tracker (supplied by the UI at call time).

    Returns:
        The same DataFrame with LinkedIn columns filled for rows whose link
        was found in the scrape results; unscraped rows keep empty strings.
    """
    try:
        links = df['Links'].tolist()
        profile_info_list = scrape_linkedins(links)
    except Exception as e:
        print(f"Error scraping LinkedIn profiles: {e}")
        return df
    # Quick lookup: profile URL -> scraped info dict.
    profile_info_dict = {info['link']: info for info in profile_info_list if info}
    # Map DataFrame column -> key in the dicts built by scrape_linkedins.
    column_to_key = {
        'Full Name': 'full_name',
        'Headline': 'headline',
        'Connections': 'connections',
        'Country': 'country',
        'Address': 'address',
        'About': 'about',
        'Current Role': 'current_role',
        'All Roles': 'all_roles',
        'Most Recent Education': 'education',
        'All Education': 'all_education',
    }
    for column in column_to_key:
        if column not in df.columns:
            df[column] = ''
    for index, row in progress.tqdm(df.iterrows(), desc='Scraping...'):
        link = None  # defined before the try so the except handler can safely print it
        try:
            link = row['Links']
            profile_info = profile_info_dict.get(link)
            if profile_info is None:
                print(f"Profile information for link {link} not found.")
                continue
            for column, key in column_to_key.items():
                df.at[index, column] = profile_info.get(key, '')
        except Exception as e:
            print(f"Error processing row {index} with link {link}: {e}")
    return df
def get_scrape_data(csv_files, social_media, password):
    """Gradio click handler: combine uploaded CSVs and scrape the chosen platform.

    Args:
        csv_files: List of uploaded file objects; each `.name` is a CSV path.
        social_media: 'LinkedIn' or 'Instagram' (from the UI radio button).
        password: Dashboard password, checked against DASHBOARD_PASSWORD.

    Returns:
        (status string, visible DownloadButton pointing at the output CSV,
        the scraped DataFrame).

    Raises:
        gr.Error: On a wrong password or an unrecognized platform choice.
    """
    if password != os.environ['DASHBOARD_PASSWORD']:
        raise gr.Error('Incorrect Password')
    # Read every uploaded CSV and concatenate into one frame with a fresh
    # RangeIndex (the scrapers address rows positionally via df.at).
    dataframes = [pd.read_csv(csv_file.name) for csv_file in csv_files]
    combined_df = pd.concat(dataframes, ignore_index=True)
    if social_media == 'LinkedIn':
        output_df = get_LI_info(combined_df)
    elif social_media == 'Instagram':
        output_df = get_insta_info(combined_df)
    else:
        # Guard: without this, output_df would be undefined below (NameError).
        raise gr.Error(f'Unsupported platform: {social_media}')
    print(output_df.head(2))
    file_name = f'./{social_media}_output.csv'
    output_df.to_csv(file_name)
    completion_status = "Done"
    return completion_status, gr.DownloadButton(label='Download Scraped Data', value=file_name, visible=True), output_df
# ---------------------------------------------------------------------------
# Gradio UI: password box, multi-CSV upload, platform picker, scrape button,
# status text, results table, and a download button revealed after scraping.
# ---------------------------------------------------------------------------
with gr.Blocks() as block:
    gr.Markdown("""
    # Social Media Scraper Dashboard
    This dashboard scrapes data from LinkedIn and Instagram \n
    Link to Data Analysis Platform: https://ai-data-analyst.streamlit.app/
    """)
    with gr.Column(visible=True):
        password = gr.Textbox(label='Enter Password')
        csv_file = gr.File(label='Input CSV File (must be CSV File)', file_count='multiple')
        social_media = gr.Radio(choices=['LinkedIn', 'Instagram'], label='Which Social Media?', info = 'Which Social Media do you want to scrape from?')
        con_gen_btn = gr.Button('Scrape')
        status = gr.Textbox(label='Completion Status')
        data = gr.DataFrame(label='Scraped Data')
        # Hidden until get_scrape_data returns a visible DownloadButton.
        download_btn = gr.DownloadButton(label='Download Content', visible=False)
    con_gen_btn.click(get_scrape_data, inputs=[csv_file, social_media, password], outputs=[status, download_btn, data])

# Queue limits concurrent scrape jobs; launch starts the web server.
block.queue(default_concurrency_limit=5)
block.launch()