# IPGET / app.py
# Uploaded by Rfym21 in commit ce1a6aa ("Create app.py"), verified.
from flask import Flask, render_template
import requests
import time
from apscheduler.schedulers.background import BackgroundScheduler
from concurrent.futures import ThreadPoolExecutor, as_completed
# Flask application object; the route defined below is registered on it.
app = Flask(__name__)
# Global list to store working proxies with geolocation data.
# Each entry is the (proxy, elapsed_time, country) tuple returned by
# check_proxy; fetch_and_check_proxies rebinds this name to a new list
# sorted by elapsed time.
working_proxies = []
# Function to get the country for a given IP
def get_country(ip):
    """Look up the country for an IP address via the ip-api.com JSON API.

    Args:
        ip: Address string interpolated into the lookup URL.

    Returns:
        The value of the ``country`` field of the JSON response, or
        ``"Unknown"`` when the field is absent or the lookup fails for
        any reason (network error, timeout, non-JSON body, ...).
    """
    try:
        # Always bound the wait: without a timeout a stalled connection
        # would block the calling worker thread indefinitely.
        response = requests.get(f"http://ip-api.com/json/{ip}", timeout=5)
        data = response.json()
        return data.get("country", "Unknown")
    except Exception:
        # Geolocation is best-effort enrichment; a failed lookup must not
        # cause an otherwise working proxy to be discarded.
        return "Unknown"
# Function to check if a proxy is working and fetch its country
def check_proxy(proxy):
    """Probe a proxy with a HEAD request to Google and time the response.

    Args:
        proxy: Proxy address string; the part before the first ``:`` is
            used as the IP for the country lookup.

    Returns:
        A ``(proxy, elapsed_seconds, country)`` tuple when the request
        through the proxy answers with HTTP 200, otherwise ``None``
        (non-200 status or any exception during the probe).
    """
    began = time.perf_counter()
    # The same proxy is used for both plain and TLS traffic.
    routing = {'http': proxy, 'https': proxy}
    try:
        with requests.Session() as http:
            # 1-second timeout keeps slow proxies from being reported.
            reply = http.head("https://www.google.com/", proxies=routing, timeout=1)
            if reply.status_code != 200:
                return None
            latency = time.perf_counter() - began
            host = proxy.split(':')[0]
            return proxy, latency, get_country(host)
    except Exception:
        return None
# Function to fetch and validate proxies
def fetch_and_check_proxies():
    """Download the public proxy list, probe every entry, and publish the results.

    Fetches the HTTP proxy list from api.proxyscrape.com, runs
    ``check_proxy`` on each entry in a thread pool, sorts the successful
    results by response time, and rebinds the global ``working_proxies``
    to the new list.

    Any exception (download failure, HTTP error status, ...) is printed
    and swallowed; in that case ``working_proxies`` keeps its previous
    value. Returns ``None``.
    """
    global working_proxies
    try:
        print("Fetching proxies...")
        # Bound the download so a stalled connection cannot block the
        # scheduler's job thread forever.
        resp = requests.get(
            "https://api.proxyscrape.com/v2/?request=displayproxies&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all",
            timeout=30,
        )
        # Fail fast on 4xx/5xx: otherwise the lines of an error page would
        # be probed as "proxies" and the last good list replaced.
        resp.raise_for_status()
        proxies = [line.strip() for line in resp.text.splitlines() if line.strip()]
        print(f"Found {len(proxies)} proxies. Checking their validity...")

        temp_working_proxies = []
        # The probes are network-bound, so a large thread pool overlaps
        # the waits.
        with ThreadPoolExecutor(max_workers=200) as executor:
            futures = [executor.submit(check_proxy, proxy) for proxy in proxies]
            for future in as_completed(futures):
                result = future.result()
                if result:
                    temp_working_proxies.append(result)

        # Sort proxies by response time and update the global list in a
        # single rebind so readers never observe a half-built list.
        temp_working_proxies.sort(key=lambda entry: entry[1])
        working_proxies = temp_working_proxies
        print(f"Found {len(working_proxies)} working proxies.")
    except Exception as e:
        print(f"Error occurred: {e}")
# Schedule the proxy fetching every 10 minutes (can be adjusted as needed).
# The scheduler is created and started at module level, so this happens
# whenever the module is loaded, not only when it is run as a script.
# NOTE(review): no immediate first run is requested here, so working_proxies
# presumably stays empty until the first interval elapses; confirm against
# APScheduler's interval-trigger behavior.
scheduler = BackgroundScheduler()
scheduler.add_job(func=fetch_and_check_proxies, trigger="interval", minutes=10)
scheduler.start()
# Route to display the working proxies
@app.route('/')
def home():
    """Render ``index.html`` with the current list of working proxies."""
    # Read the global once; the refresh job rebinds it to a new list.
    current = working_proxies
    return render_template('index.html', proxies=current)
# Start Flask's built-in server only when this file is executed directly;
# host '0.0.0.0' listens on all network interfaces.
if __name__ == "__main__":
    app.run(port=5000, host='0.0.0.0')