import math
import os
import time

import gradio as gr
import pandas as pd
import requests

# RapidAPI key must be present in the environment; fail fast at import time.
RAPIDAPI_API_KEY = os.environ['RAPIDAPI_API_KEY']


# Function to scrape Instagram profile
def scrape_instagram(user_name):
    """Fetch public profile information for a single Instagram account.

    Args:
        user_name: Instagram username, numeric id, or profile URL
            (the API accepts any of the three).

    Returns:
        dict: Selected profile fields, or an empty dict when the request
        fails or the API response carries no 'data' payload.
    """
    url = "https://instagram-scraper-api2.p.rapidapi.com/v1/info"
    print(user_name)
    querystring = {"username_or_id_or_url": f"{user_name}"}
    headers = {
        "x-rapidapi-key": f"{RAPIDAPI_API_KEY}",
        "x-rapidapi-host": "instagram-scraper-api2.p.rapidapi.com"
    }
    try:
        # timeout prevents one hung connection from stalling the whole scrape
        response = requests.get(url, headers=headers, params=querystring,
                                timeout=30)
        response.raise_for_status()  # Raise HTTPError for bad responses
        response_json = response.json()
        if 'data' not in response_json:
            print("No data found in response")
            return {}  # Return an empty dictionary if there is no data in the response
        response_data = response_json['data']
        print(response_data)
        profile_info = {
            'bio': response_data.get('biography', ''),
            'follower_count': response_data.get('follower_count', 0),
            'following_count': response_data.get('following_count', 0),
            'bio_links': [item['url'] for item in response_data.get('bio_links', [])],
            'full_name': response_data.get('full_name', ''),
            'username': response_data.get('username', ''),
            'num_posts': response_data.get('media_count', 0),
            'profile_id': response_data.get('profile_pic_id', ''),
            'email': response_data.get('biography_email', ''),
            'badge': response_data.get('account_badges', []),
            'category': response_data.get('category', ''),
            'phone_number': response_data.get('contact_phone_number', ''),
            'city_name': response_data.get('location_data', {}).get('city_name', ''),
            'country': '',       # not provided by this endpoint
            'date_joined': ''    # not provided by this endpoint
        }
        return profile_info
    # NOTE: the specific requests exceptions must come BEFORE their base
    # class RequestException, otherwise these handlers are unreachable.
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error: {e}")
    except requests.exceptions.ConnectionError as e:
        print(f"Connection error: {e}")
    except requests.exceptions.Timeout as e:
        print(f"Timeout error: {e}")
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
    except ValueError as e:
        print(f"JSON decode error: {e}")
    except KeyError as e:
        print(f"Key error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return {}  # Return an empty dictionary if an error occurs


# Function to populate DataFrame with Instagram information
def get_insta_info(df, progress=gr.Progress()):
    """Fill *df* with one scraped Instagram profile per row of its 'Links' column.

    Args:
        df: DataFrame with a 'Links' column of usernames/ids/URLs; the
            profile columns are added in place if missing.
        progress: Gradio progress tracker used to display scraping progress.

    Returns:
        The same DataFrame with the profile columns populated.
    """
    # DataFrame column -> key in the dict returned by scrape_instagram().
    # List-valued fields are joined into a comma-separated string.
    column_map = {
        'Bio': 'bio',
        'Follower Count': 'follower_count',
        'Following Count': 'following_count',
        'Bio Links': 'bio_links',
        'Full Name': 'full_name',
        'Username': 'username',
        'Num Posts': 'num_posts',
        'Profile ID': 'profile_id',
        'Email': 'email',
        'Badge': 'badge',
        'Category': 'category',
        'Phone Number': 'phone_number',
        'City Name': 'city_name',
        'Country': 'country',
        'Date Joined': 'date_joined',
    }
    # Add new columns to the DataFrame
    for column in column_map:
        if column not in df.columns:
            df[column] = ''
    links = df['Links'].values
    print(links)
    for i in progress.tqdm(range(len(links)), desc='Scraping...'):
        try:
            time.sleep(1)  # crude rate limiting between API calls
            profile_info = scrape_instagram(links[i])
            if profile_info:  # Only populate if profile_info is not empty
                for column, key in column_map.items():
                    value = profile_info[key]
                    if isinstance(value, list):
                        # bio_links / badge come back as lists
                        value = ', '.join(str(item) for item in value)
                    df.at[i, column] = value
        # scrape_instagram() already swallows request/JSON errors, so this
        # only guards against unexpected per-row failures.
        except Exception as e:
            print(f"An unexpected error occurred for link {links[i]}: {e}")
    return df
# Function to scrape LinkedIn profiles
def scrape_linkedins(links):
    """Scrape LinkedIn profiles in bulk via the RapidAPI endpoint.

    Args:
        links: List of LinkedIn profile URLs; sent to the API in chunks.

    Returns:
        list[dict]: One dict of selected profile fields per successfully
        scraped profile. Failed chunks are logged and skipped, so the
        result may be shorter than *links*.
    """
    url = "https://linkedin-bulk-data-scraper.p.rapidapi.com/profiles"
    headers = {
        "x-rapidapi-key": f"{RAPIDAPI_API_KEY}",
        "x-rapidapi-host": "linkedin-bulk-data-scraper.p.rapidapi.com",
        "Content-Type": "application/json",
        "x-rapidapi-user": "usama"
    }
    profile_info_list = []
    chunk_size = 100  # request the API in batches of at most 100 links
    num_chunks = math.ceil(len(links) / chunk_size)
    for i in range(num_chunks):
        chunk = links[i * chunk_size:(i + 1) * chunk_size]
        payload = {"links": chunk}
        # try/except per chunk: a failed chunk no longer aborts the
        # remaining chunks (the original single try discarded everything
        # after the first error).
        try:
            response = requests.post(url, json=payload, headers=headers,
                                     timeout=120)
            response.raise_for_status()  # Raise HTTPError for bad responses
            data = response.json()
            if 'data' not in data:
                raise ValueError("Missing 'data' in response")
            for response_item in data['data']:
                response_data = response_item.get('data', {})
                # `or [{}]` guards both a missing key AND an empty list;
                # the original indexed [0] directly and crashed on profiles
                # with no experience/education entries.
                experiences = response_data.get('experiences') or [{}]
                educations = response_data.get('educations') or [{}]
                profile_info = {
                    'link': response_item.get('entry', ''),
                    'full_name': response_data.get('fullName', ''),
                    'headline': response_data.get('headline', ''),
                    'connections': response_data.get('followers', ''),  # or 'connections' based on availability
                    'country': response_data.get('addressCountryOnly', ''),
                    'address': response_data.get('addressWithoutCountry', ''),
                    'about': response_data.get('about', ''),
                    'current_role': (f"{experiences[0].get('title', '')} at "
                                     f"{experiences[0].get('subtitle', '')}"),
                    'all_roles': [f"{item.get('title', '')} at {item.get('subtitle', '')}"
                                  for item in experiences],
                    'education': (f"{educations[0].get('subtitle', '')} at "
                                  f"{educations[0].get('title', '')}"),
                    'all_education': [f"{item.get('subtitle', '')} at {item.get('title', '')}"
                                      for item in educations]
                }
                profile_info_list.append(profile_info)
        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}")
        except ValueError as e:
            print(f"Value error: {e}")
        except KeyError as e:
            print(f"Key error: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
    return profile_info_list


# Function to populate DataFrame with LinkedIn information
def get_LI_info(df, progress=gr.Progress()):
    """Fill *df* with scraped LinkedIn data, matched per row on the 'Links' column.

    Args:
        df: DataFrame with a 'Links' column of LinkedIn profile URLs.
        progress: Gradio progress tracker used to display progress.

    Returns:
        The same DataFrame with the LinkedIn columns populated.
    """
    try:
        links = df['Links'].tolist()
        profile_info_list = scrape_linkedins(links)
    except Exception as e:
        print(f"Error scraping LinkedIn profiles: {e}")
        return df
    # Quick lookup: submitted link -> scraped profile dict.
    profile_info_dict = {info['link']: info for info in profile_info_list if info}
    # (DataFrame column, key in the scrape_linkedins() result dict)
    column_map = [
        ('Full Name', 'full_name'),
        ('Headline', 'headline'),
        ('Connections', 'connections'),
        ('Country', 'country'),
        ('Address', 'address'),
        ('About', 'about'),
        ('Current Role', 'current_role'),
        ('All Roles', 'all_roles'),
        ('Most Recent Education', 'education'),
        ('All Education', 'all_education'),
    ]
    # Add new columns to the DataFrame
    for column, _ in column_map:
        if column not in df.columns:
            df[column] = ''
    # Populate the DataFrame by matching the Link values
    for index, row in progress.tqdm(df.iterrows(), desc='Scraping...'):
        link = row['Links']
        profile_info = profile_info_dict.get(link)
        if profile_info is None:
            print(f"Profile information for link {link} not found.")
            continue
        try:
            for column, key in column_map:
                df.at[index, column] = profile_info.get(key, '')
        except Exception as e:
            print(f"Error processing row {index} with link {link}: {e}")
    return df


def get_scrape_data(csv_files, social_media, password):
    """Gradio handler: combine uploaded CSVs, scrape, and write the result CSV.

    Args:
        csv_files: Uploaded file objects, each with a 'Links' column.
        social_media: 'LinkedIn' or 'Instagram'.
        password: Must match the DASHBOARD_PASSWORD environment variable.

    Returns:
        Tuple of (status string, visible DownloadButton for the output
        CSV, the scraped DataFrame).

    Raises:
        gr.Error: On a wrong password or an unknown platform choice.
    """
    if password != os.environ['DASHBOARD_PASSWORD']:
        raise gr.Error('Incorrect Password')
    # Read every uploaded CSV and concatenate into a single DataFrame
    dataframes = [pd.read_csv(csv_file.name) for csv_file in csv_files]
    combined_df = pd.concat(dataframes, ignore_index=True)
    # Process the combined DataFrame based on the social media platform
    if social_media == 'LinkedIn':
        output_df = get_LI_info(combined_df)
    elif social_media == 'Instagram':
        output_df = get_insta_info(combined_df)
    else:
        # The original fell through here with output_df undefined (NameError);
        # surface a clear error in the UI instead.
        raise gr.Error('Please select a social media platform')
    print(output_df.head(2))
    file_name = f'./{social_media}_output.csv'
    output_df.to_csv(file_name)
    completion_status = "Done"
    return (completion_status,
            gr.DownloadButton(label='Download Scraped Data', value=file_name,
                              visible=True),
            output_df)


with gr.Blocks() as block:
    gr.Markdown("""
    # Social Media Scraper Dashboard
    This dashboard scrapes data from Linkedin and Instagram \n
    Link to Data Analysis Platform: https://ai-data-analyst.streamlit.app/
    """)
    with gr.Column(visible=True):
        password = gr.Textbox(label='Enter Password')
        csv_file = gr.File(label='Input CSV File (must be CSV File)',
                           file_count='multiple')
        social_media = gr.Radio(choices=['LinkedIn', 'Instagram'],
                                label='Which Social Media?',
                                info='Which Social Media do you want to scrape from?')
        con_gen_btn = gr.Button('Scrape')
        status = gr.Textbox(label='Completion Status')
        data = gr.DataFrame(label='Scraped Data')
        download_btn = gr.DownloadButton(label='Download Content', visible=False)
        con_gen_btn.click(get_scrape_data,
                          inputs=[csv_file, social_media, password],
                          outputs=[status, download_btn, data])

block.queue(default_concurrency_limit=5)
block.launch()