# stocks/core/openinsider_scraper.py
# Initial Commit (54cf8fd) by Arrechenash
"""OpenInsider scraper - Real-time insider trading data."""
import logging
import re
import ssl
from datetime import datetime
from urllib.request import Request, urlopen
logger = logging.getLogger(__name__)
class OpenInsiderScraper:
    """Scrape insider trading data from OpenInsider.com.

    All public methods are best-effort: network or parse failures are
    logged and an empty/zeroed result is returned rather than raised.
    Parsing is regex-based (no HTML parser dependency), so it is tied to
    OpenInsider's current table layout.
    """

    def __init__(self):
        # NOTE(review): base_url is plain HTTP, so the relaxed SSL context
        # built in _fetch_html only matters if this is ever switched to HTTPS.
        self.base_url = "http://openinsider.com"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        }

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #

    def _fetch_html(self, path: str) -> str:
        """Fetch *path* from OpenInsider and return the decoded HTML.

        SECURITY: certificate verification is deliberately disabled
        (check_hostname=False, verify_mode=CERT_NONE), which leaves HTTPS
        requests open to man-in-the-middle. Tolerable only because the
        data is public and read-only; do not reuse for sensitive traffic.
        """
        req = Request(f"{self.base_url}{path}", headers=self.headers)
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        with urlopen(req, timeout=10, context=ssl_context) as resp:
            # Replace undecodable bytes rather than crashing on odd encodings.
            return resp.read().decode("utf-8", errors="replace")

    @staticmethod
    def _table_rows(html: str) -> list[str]:
        """Return raw ``<tr>...</tr>`` fragments from an HTML page."""
        return re.findall(r"<tr[^>]*>.*?</tr>", html, re.DOTALL)

    @staticmethod
    def _row_cells(row: str) -> list[str]:
        """Return raw ``<td>`` contents for a single table row."""
        return re.findall(r"<td[^>]*>(.*?)</td>", row, re.DOTALL)

    @staticmethod
    def _clean_cells(cells: list[str]) -> list[str]:
        """Strip HTML tags and surrounding whitespace from each cell."""
        return [re.sub(r"<[^>]+>", "", cell).strip() for cell in cells]

    @staticmethod
    def _parse_int(text: str) -> int:
        """Parse '1,234' / '$1,234' style numbers; empty string -> 0."""
        return int(re.sub(r"[,$]", "", text) or 0)

    @staticmethod
    def _parse_float(text: str) -> float:
        """Parse '$12.50' / '1,234.5' style numbers; empty string -> 0.0."""
        return float(re.sub(r"[,$]", "", text) or 0)

    @staticmethod
    def _classify_trade(trade_type: str) -> tuple[bool, bool]:
        """Return ``(is_buy, is_sale)`` for an OpenInsider trade-type string.

        BUGFIX: the previous substring checks (``"P" in`` / ``"S" in`` the
        upper-cased type) flagged every purchase as a sale too, because
        'P - PURCHASE' contains the letter 'S'. We now key off the leading
        code letter / full word and make the flags mutually exclusive.
        """
        kind = trade_type.upper().strip()
        is_buy = kind.startswith("P") or "PURCHASE" in kind
        is_sale = not is_buy and (kind.startswith("S") or "SALE" in kind)
        return is_buy, is_sale

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #

    def fetch_insider_trades(self, symbol: str, days: int = 90) -> dict:
        """
        Fetch insider trading data from OpenInsider.

        Args:
            symbol: Stock ticker symbol.
            days: Number of days to look back. Informational only — the
                screener page is not date-filtered here, so this is echoed
                back as ``period_days`` without restricting results.

        Returns:
            Dict with insider trading summary and up to 20 transactions;
            on failure a zeroed dict with ``"source": "error"``.
        """
        try:
            # OpenInsider has no symbol-specific page; fetch the general
            # screener and filter rows by ticker.
            html = self._fetch_html("/screener")
            transactions: list[dict] = []
            buys = sells = 0
            buy_shares = sell_shares = 0
            buy_value = sell_value = 0.0
            target = symbol.upper()
            for row in self._table_rows(html):
                # Cheap substring pre-filter before per-cell regex work.
                if target not in row.upper():
                    continue
                cells = self._row_cells(row)
                if len(cells) < 10:
                    continue  # header / layout row, not a trade row
                try:
                    clean = self._clean_cells(cells)
                    # Assumed column order from the screener layout — TODO
                    # confirm against live page: date, insider, title,
                    # company, ticker, trade type, shares, price, value.
                    ticker = clean[4] if len(clean) > 4 else ""
                    if ticker.upper() != target:
                        continue
                    trade_type = clean[5] if len(clean) > 5 else ""
                    shares = self._parse_int(clean[6] if len(clean) > 6 else "0")
                    price = self._parse_float(clean[7] if len(clean) > 7 else "0")
                    value = self._parse_float(clean[8] if len(clean) > 8 else "0")
                    is_buy, is_sale = self._classify_trade(trade_type)
                    transactions.append(
                        {
                            "date": clean[0] if len(clean) > 0 else "",
                            "insider": clean[1] if len(clean) > 1 else "",
                            "title": clean[2] if len(clean) > 2 else "",
                            "company": clean[3] if len(clean) > 3 else "",
                            "ticker": ticker,
                            "trade_type": trade_type,
                            "shares": shares,
                            "price": price,
                            "value": value,
                            "is_buy": is_buy,
                            "is_sale": is_sale,
                            "source": "openinsider",
                        }
                    )
                    if is_buy:
                        buys += 1
                        buy_shares += shares
                        buy_value += value
                    elif is_sale:
                        sells += 1
                        sell_shares += shares
                        sell_value += value
                except Exception as e:
                    # Skip malformed rows; best-effort scraping.
                    logger.debug(f"Error parsing OpenInsider row: {e}")
                    continue
            # Lexicographic sort is correct for ISO-style YYYY-MM-DD dates,
            # which is presumably what OpenInsider emits — verify if the
            # site's date format ever changes.
            transactions.sort(key=lambda t: t["date"], reverse=True)
            return {
                "symbol": symbol,
                "period_days": days,
                "buys": buys,
                "sells": sells,
                "net": buys - sells,
                "buy_shares": buy_shares,
                "sell_shares": sell_shares,
                "buy_value": buy_value,
                "sell_value": sell_value,
                "net_value": buy_value - sell_value,
                "transactions": transactions[:20],  # Limit to 20 most recent
                "source": "openinsider",
                "last_updated": datetime.now().isoformat(),
            }
        except Exception as e:
            logger.error(f"Error fetching OpenInsider trades for {symbol}: {e}")
            return {
                "symbol": symbol,
                "period_days": days,
                "buys": 0,
                "sells": 0,
                "net": 0,
                "buy_shares": 0,
                "sell_shares": 0,
                "buy_value": 0.0,
                "sell_value": 0.0,
                "net_value": 0.0,
                "transactions": [],
                "source": "error",
                "last_updated": datetime.now().isoformat(),
            }

    def fetch_latest_cluster_buys(self, limit: int = 20) -> list[dict]:
        """
        Fetch latest cluster buys (multiple insiders buying) from OpenInsider.

        Args:
            limit: Maximum number of cluster buys to return.

        Returns:
            List of symbols with cluster buying activity; empty on failure.
        """
        try:
            html = self._fetch_html("/cluster-buy")
            cluster_buys: list[dict] = []
            # NOTE(review): slicing rows[:limit] before filtering means
            # header/layout rows count against the limit, so fewer than
            # `limit` real entries may come back.
            for row in self._table_rows(html)[:limit]:
                clean = self._clean_cells(self._row_cells(row))
                if len(clean) < 5:
                    continue  # not a data row
                cluster_buys.append(
                    {
                        "symbol": clean[0],
                        "company": clean[1],
                        "insiders": int(clean[2]) if clean[2].isdigit() else 0,
                        "total_shares": self._parse_int(clean[3]),
                        "total_value": self._parse_float(clean[4]),
                        "source": "openinsider",
                    }
                )
            return cluster_buys
        except Exception as e:
            logger.error(f"Error fetching OpenInsider cluster buys: {e}")
            return []

    def fetch_latest_ceo_trades(self, limit: int = 20) -> list[dict]:
        """
        Fetch latest CEO trades from OpenInsider.

        Args:
            limit: Maximum number of CEO trades to return.

        Returns:
            List of CEO insider trades; empty on failure.
        """
        try:
            html = self._fetch_html("/ceo-trades")
            ceo_trades: list[dict] = []
            for row in self._table_rows(html)[:limit]:
                clean = self._clean_cells(self._row_cells(row))
                if len(clean) < 8:
                    continue  # not a data row
                trade_type = clean[5]
                shares = self._parse_int(clean[6])
                price = self._parse_float(clean[7])
                is_buy, _ = self._classify_trade(trade_type)
                ceo_trades.append(
                    {
                        "symbol": clean[0],
                        "company": clean[1],
                        "ceo_name": clean[2],
                        "title": clean[3],
                        "trade_type": trade_type,
                        "shares": shares,
                        "price": price,
                        # Parse once and reuse (original re-parsed both
                        # strings a second time just to compute value).
                        "value": shares * price,
                        "is_buy": is_buy,
                        "source": "openinsider",
                    }
                )
            return ceo_trades
        except Exception as e:
            logger.error(f"Error fetching OpenInsider CEO trades: {e}")
            return []