# buildscout/services/data.py
# v1.95: applicant info columns, Google search links, job type filter, fix all filters
# commit 92e9edc
# services/data.py
from __future__ import annotations
import time
import concurrent.futures
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple, Optional
import requests
import pandas as pd
from config import (
SOCRATA_APP_TOKEN,
ALLOWED_BOROUGHS,
DEFAULT_DAYS_WINDOW,
)
# ---------- Socrata endpoints ----------
# NYC Open Data (Socrata) JSON endpoints, keyed by the internal dataset name
# used throughout this module. The 4x4 code in each URL is the Socrata
# dataset identifier on data.cityofnewyork.us.
DATASET_URLS: Dict[str, str] = {
    # DOB NOW job application filings
    "job_filings": "https://data.cityofnewyork.us/resource/w9ak-ipjd.json",
    # DOB NOW approved permits
    "permit_issuance": "https://data.cityofnewyork.us/resource/rbx6-tga4.json",
    # DOB NOW electrical permit filings
    "electrical_permits": "https://data.cityofnewyork.us/resource/dm9a-ab7w.json",
    # Stalled construction complaints (official DOB dataset)
    "stalled_official": "https://data.cityofnewyork.us/resource/i296-73x5.json",
    # BIS Job Application Filings (legacy system - for finding dormant jobs)
    "bis_job_filings": "https://data.cityofnewyork.us/resource/ic3t-wcy2.json",
    # Distressed properties sources
    "hpd_vacate_orders": "https://data.cityofnewyork.us/resource/tb8q-a3ar.json",
    "dob_ecb_violations": "https://data.cityofnewyork.us/resource/6bgk-3dad.json",
    "vacant_unsecured": "https://data.cityofnewyork.us/resource/br7h-6m8v.json",
    "dob_complaints": "https://data.cityofnewyork.us/resource/eabe-havv.json",
}
# Per dataset core field map
# Maps this module's canonical field names (keys) to each dataset's raw
# Socrata column names (values). The feeds name the same concepts
# differently (e.g. "house_no" vs "house__" vs "house_number").
DATASET_FIELD_MAP: Dict[str, Dict[str, str]] = {
    "job_filings": {
        "filing_date": "filing_date",
        "borough": "borough",
        "house_no": "house_no",
        "street_name": "street_name",
        "zip": "zip",
        "job_id": "job_filing_number",
        "job_status": "filing_status",
        "job_type": "job_type",
        "desc": "job_description",
    },
    "permit_issuance": {
        # Permits carry an approval date rather than a filing date.
        "filing_date": "approved_date",
        "borough": "borough",
        "house_no": "house__",
        "street_name": "street_name",
        "zip": "zip_code",
        "job_id": "job__",
        "permit_type": "permittee_s_license_type",
        "desc": "job_description",
    },
    "electrical_permits": {
        "filing_date": "filing_date",
        "borough": "borough",
        "house_no": "house_number",
        "street_name": "street_name",
        "zip": "zip_code",
        "job_id": "job_filing_number",
        "job_status": "filing_status",
    },
}
# ---------- Simple in-memory cache ----------
# Maps cache key -> (DataFrame snapshot, time it was stored).
_cache: Dict[str, Tuple[pd.DataFrame, datetime]] = {}
# Entries older than this many minutes are evicted on read.
CACHE_TTL_MINUTES = 10
def _get_cached(key: str) -> Optional[pd.DataFrame]:
    """Return a copy of the cached frame for *key*, or None if absent or stale."""
    try:
        df, stored_at = _cache[key]
    except KeyError:
        return None
    if datetime.now() - stored_at < timedelta(minutes=CACHE_TTL_MINUTES):
        print(f"[cache] Using cached data for {key}")
        # Copy so callers cannot mutate the cached frame in place.
        return df.copy()
    # Entry is stale: evict it so the caller re-fetches.
    del _cache[key]
    return None
def _set_cached(key: str, df: pd.DataFrame) -> None:
    """Store a defensive copy of *df* under *key*, stamped with the current time."""
    _cache[key] = (df.copy(), datetime.now())
# ---------- helpers ----------
def _headers() -> Dict[str, str]:
    """
    Build headers for Socrata API requests.
    SODA3 requires authentication via app token for all requests.
    """
    headers: Dict[str, str] = {"Accept": "application/json"}
    if SOCRATA_APP_TOKEN:
        headers["X-App-Token"] = SOCRATA_APP_TOKEN
    return headers
def _request(url: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    GET *url* with *params* from the Socrata API and return the decoded JSON rows.

    Raises RuntimeError on timeout, transport failure, or any non-200 response,
    with targeted hints for the common 403 (missing/invalid token) and 429
    (rate limit) cases. The underlying exception is chained so the root cause
    is preserved in tracebacks.
    """
    headers = _headers()
    # Log if no token (will likely fail on SODA3)
    if "X-App-Token" not in headers:
        print("⚠️ No SOCRATA_APP_TOKEN - request may be throttled or rejected")
    try:
        r = requests.get(url, headers=headers, params=params, timeout=60)
    except requests.exceptions.Timeout as e:
        # Chain the original exception so the root cause isn't lost.
        raise RuntimeError(f"API request timed out for {url}") from e
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"API request failed: {e}") from e
    if r.status_code == 403:
        raise RuntimeError(
            f"API returned 403 Forbidden. This likely means:\n"
            f" 1. SOCRATA_APP_TOKEN is missing or invalid\n"
            f" 2. The dataset requires authentication\n"
            f" URL: {url}\n"
            f" Response: {r.text[:200]}"
        )
    elif r.status_code == 429:
        raise RuntimeError(
            f"API rate limit exceeded (429). Set SOCRATA_APP_TOKEN for higher limits.\n"
            f" URL: {url}"
        )
    elif r.status_code != 200:
        raise RuntimeError(f"API request failed: {r.status_code} {r.text[:500]}")
    return r.json()
def _to_dt_naive(series: pd.Series) -> pd.Series:
s = pd.to_datetime(series, errors="coerce", utc=True)
return s.dt.tz_localize(None)
def _norm_borough(series: pd.Series) -> pd.Series:
m = {
"MN": "MANHATTAN",
"BX": "BRONX",
"BK": "BROOKLYN",
"QN": "QUEENS",
"SI": "STATEN ISLAND",
"1": "MANHATTAN",
"2": "BRONX",
"3": "BROOKLYN",
"4": "QUEENS",
"5": "STATEN ISLAND",
}
return series.astype(str).str.strip().str.upper().map(lambda x: m.get(x, x))
def _full_address(
df: pd.DataFrame,
house_col: str,
street_col: str,
borough_col: str,
zip_col: str | None,
) -> pd.Series:
def join(row):
parts = []
h = str(row.get(house_col, "") or "").strip()
s = str(row.get(street_col, "") or "").strip()
b = str(row.get(borough_col, "") or "").strip()
z = str(row.get(zip_col, "") or "").strip() if zip_col else ""
if h:
parts.append(h)
if s:
parts.append(s)
if b:
parts.append(b)
if z:
parts.append(z)
return ", ".join(p for p in parts if p)
return df.apply(join, axis=1)
def _days_ago_cutoff(days: int) -> Tuple[pd.Timestamp, str]:
now = pd.Timestamp.utcnow().tz_localize(None)
cutoff = now - pd.Timedelta(days=days)
cutoff_iso = (cutoff.tz_localize("UTC").isoformat()).replace("+00:00", "Z")
return cutoff, cutoff_iso
def _job_base(job_filing_number: str) -> str:
if not isinstance(job_filing_number, str):
return ""
return job_filing_number.split("-", 1)[0].strip()
def _fetch_page_parallel(
    url: str,
    params: Dict[str, Any],
    page: int,
    offset: int,
) -> Tuple[List[Dict[str, Any]], int, float]:
    """Fetch one page of results at *offset*; return (rows, page index, elapsed seconds)."""
    # Copy so the caller's params dict is never mutated.
    page_params = dict(params)
    page_params["$offset"] = offset
    started = time.time()
    rows = _request(url, page_params)
    return rows, page, time.time() - started
def _applicant_search_url(df: pd.DataFrame) -> pd.Series:
"""Build a Google search URL for each applicant to help find contact info."""
import urllib.parse
def make_url(row):
first = str(row.get("applicant_first_name") or "").strip()
last = str(row.get("applicant_last_name") or "").strip()
firm = str(row.get("filing_representative_business_name") or "").strip()
title = str(row.get("applicant_professional_title") or "").strip()
# Build the most useful query we can from available fields
if first and last:
query = f'"{first} {last}" architect contact email'
elif firm and firm.upper() not in ("", "PREPARER", "N/A"):
query = f'"{firm}" architect engineer contact email'
else:
return ""
return "https://www.google.com/search?q=" + urllib.parse.quote(query)
return df.apply(make_url, axis=1)
# ---------- CORE FETCHERS ----------
def _fetch_filings_last_days(days: int) -> pd.DataFrame:
    """
    Fetch DOB NOW job filings from the last *days* days.

    Rows are borough- and date-filtered, augmented with ``full_address`` and
    ``applicant_search`` columns, and cached. Returns an empty DataFrame when
    the API cannot be reached.
    """
    cache_key = f"job_filings_{days}"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    url = DATASET_URLS["job_filings"]
    cutoff, _ = _days_ago_cutoff(days)  # ISO form unused: SoQL uses the date string
    # Use string comparison for date (works for ISO format text dates)
    # Format: YYYY-MM-DD for string comparison
    cutoff_str = cutoff.strftime("%Y-%m-%d")
    params = {
        "$where": f"filing_date > '{cutoff_str}'",
        "$limit": 50000,
        "$order": "filing_date DESC",
    }
    try:
        data = _request(url, params)
    except Exception as e:
        print(f"[job_filings] Query failed: {e}")
        # Fallback: no date filter, just get recent by order
        print("[job_filings] Retrying without date filter...")
        try:
            params = {"$limit": 10000, "$order": "filing_date DESC"}
            data = _request(url, params)
        except Exception as e2:
            print(f"[job_filings] Retry also failed: {e2}")
            return pd.DataFrame()
    df = pd.DataFrame(data)
    if df.empty:
        return df
    print(f"[job_filings] Got {len(df)} rows")
    # Normalize borough, then keep only the boroughs we serve
    if "borough" in df.columns:
        df["borough"] = _norm_borough(df["borough"])
        df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()
    # Build full address
    df["full_address"] = _full_address(df, "house_no", "street_name", "borough", "zip")
    # Build Google search link for applicant contact info
    df["applicant_search"] = _applicant_search_url(df)
    # Convert filing_date to datetime and re-apply the cutoff client-side
    # (the fallback query above has no server-side date filter).
    if "filing_date" in df.columns:
        df["filing_date"] = _to_dt_naive(df["filing_date"])
        df = df[df["filing_date"] >= cutoff].copy()
    _set_cached(cache_key, df)
    return df
def _fetch_permits_last_days(days: int) -> pd.DataFrame:
    """
    Fetch DOB NOW approved permits from the last *days* days.

    Rows are borough- and date-filtered, a canonical ``filing_date`` column
    is derived from ``approved_date``, a ``full_address`` column is added,
    and the result is cached. Returns an empty DataFrame on API failure.
    """
    cache_key = f"permit_issuance_{days}"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    url = DATASET_URLS["permit_issuance"]
    cutoff, _ = _days_ago_cutoff(days)  # ISO form unused: SoQL uses the date string
    cutoff_str = cutoff.strftime("%Y-%m-%d")
    params = {
        "$where": f"approved_date > '{cutoff_str}'",
        "$limit": 50000,
        "$order": "approved_date DESC",
    }
    try:
        data = _request(url, params)
    except Exception as e:
        print(f"[permit_issuance] Query failed: {e}")
        print("[permit_issuance] Retrying without date filter...")
        try:
            params = {"$limit": 10000, "$order": "approved_date DESC"}
            data = _request(url, params)
        except Exception as e2:
            print(f"[permit_issuance] Retry also failed: {e2}")
            return pd.DataFrame()
    df = pd.DataFrame(data)
    if df.empty:
        return df
    print(f"[permit_issuance] Got {len(df)} rows")
    # Normalize borough, then keep only the boroughs we serve
    if "borough" in df.columns:
        df["borough"] = _norm_borough(df["borough"])
        df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()
    # Derive the canonical filing_date and re-apply the cutoff client-side
    # (the fallback query above has no server-side date filter).
    if "approved_date" in df.columns:
        df["filing_date"] = _to_dt_naive(df["approved_date"])
        df = df[df["filing_date"] >= cutoff].copy()
    # Build full address (this dataset uses "house__" and "zip_code" columns)
    df["full_address"] = _full_address(df, "house__", "street_name", "borough", "zip_code")
    _set_cached(cache_key, df)
    return df
def _fetch_electrical_last_days(days: int) -> pd.DataFrame:
    """
    Fetch DOB NOW electrical permits from the last *days* days.

    Rows are borough- and date-filtered, augmented with ``full_address``,
    and cached. Returns an empty DataFrame on API failure.
    """
    cache_key = f"electrical_permits_{days}"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    url = DATASET_URLS["electrical_permits"]
    cutoff, _ = _days_ago_cutoff(days)  # ISO form unused: SoQL uses the date string
    cutoff_str = cutoff.strftime("%Y-%m-%d")
    params = {
        "$where": f"filing_date > '{cutoff_str}'",
        "$limit": 50000,
        "$order": "filing_date DESC",
    }
    try:
        data = _request(url, params)
    except Exception as e:
        print(f"[electrical_permits] Query failed: {e}")
        print("[electrical_permits] Retrying without date filter...")
        try:
            params = {"$limit": 10000, "$order": "filing_date DESC"}
            data = _request(url, params)
        except Exception as e2:
            print(f"[electrical_permits] Retry also failed: {e2}")
            return pd.DataFrame()
    df = pd.DataFrame(data)
    if df.empty:
        return df
    print(f"[electrical_permits] Got {len(df)} rows")
    # Normalize borough, then keep only the boroughs we serve
    if "borough" in df.columns:
        df["borough"] = _norm_borough(df["borough"])
        df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()
    # Build full address
    df["full_address"] = _full_address(df, "house_number", "street_name", "borough", "zip_code")
    # Convert filing_date to datetime and re-apply the cutoff client-side
    # (the fallback query above has no server-side date filter).
    if "filing_date" in df.columns:
        df["filing_date"] = _to_dt_naive(df["filing_date"])
        df = df[df["filing_date"] >= cutoff].copy()
    _set_cached(cache_key, df)
    return df
# ---------- STALLED CONSTRUCTION FETCHERS ----------
def _fetch_stalled_official() -> pd.DataFrame:
    """
    Fetch official DOB Stalled Construction Sites complaints (i296-73x5).

    Strategy:
      1. Paginate through ALL rows (1.4M+)
      2. Deduplicate by complaint_number (keep most recent)
      3. Filter to complaints received in the last 18 months
      4. Filter to allowed boroughs
    """
    cache_key = "stalled_official"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    # Use the shared endpoint table rather than a duplicated literal URL.
    url = DATASET_URLS["stalled_official"]
    # Paginate through all data
    all_data: List[Dict[str, Any]] = []
    offset = 0
    page_size = 50000
    print(f"[stalled_official] Fetching all rows (paginating by {page_size})...")
    while True:
        params = {
            "$limit": page_size,
            "$offset": offset,
        }
        try:
            data = _request(url, params)
        except Exception as e:
            # Keep whatever pages were fetched so far rather than failing hard.
            print(f"[stalled_official] API request failed at offset {offset}: {e}")
            break
        if not data:
            break
        all_data.extend(data)
        print(f"[stalled_official] Fetched {len(all_data)} rows so far...")
        if len(data) < page_size:
            # Short page means we've reached the end of the dataset.
            break
        offset += page_size
    if not all_data:
        print("[stalled_official] No data returned from API")
        return pd.DataFrame()
    df = pd.DataFrame(all_data)
    print(f"[stalled_official] Total rows fetched: {len(df)}")
    # Deduplicate by complaint_number - keep first occurrence (arbitrary, they should be same complaint)
    if "complaint_number" in df.columns:
        before = len(df)
        df = df.drop_duplicates(subset=["complaint_number"], keep="first")
        print(f"[stalled_official] Deduplicated: {before} -> {len(df)} unique complaints")
    # Parse complaint date
    if "date_complaint_received" in df.columns:
        df["complaint_date"] = pd.to_datetime(df["date_complaint_received"], errors="coerce")
        # Filter to only complaints from the last 18 months
        cutoff_date = pd.Timestamp.today() - pd.DateOffset(months=18)
        before_filter = len(df)
        df = df[df["complaint_date"] >= cutoff_date].copy()
        print(f"[stalled_official] 18-month filter (>= {cutoff_date.date()}): {before_filter} -> {len(df)} rows")
        # Calculate days stalled
        df["days_stalled"] = (pd.Timestamp.today() - df["complaint_date"]).dt.days
    # Map borough to standard names
    boro_map = {
        "1": "MANHATTAN", "MANHATTAN": "MANHATTAN", "Manhattan": "MANHATTAN",
        "2": "BRONX", "BRONX": "BRONX", "Bronx": "BRONX",
        "3": "BROOKLYN", "BROOKLYN": "BROOKLYN", "Brooklyn": "BROOKLYN",
        "4": "QUEENS", "QUEENS": "QUEENS", "Queens": "QUEENS",
        "5": "STATEN ISLAND", "STATEN ISLAND": "STATEN ISLAND", "Staten Island": "STATEN ISLAND",
    }
    if "borough_name" in df.columns:
        df["borough"] = df["borough_name"].astype(str).str.strip().map(lambda x: boro_map.get(x, x.upper() if isinstance(x, str) else x))
    # Filter to allowed boroughs
    if "borough" in df.columns:
        before_boro = len(df)
        df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()
        print(f"[stalled_official] Borough filter: {before_boro} -> {len(df)} rows")
    # Build full address
    if "house_number" in df.columns and "street_name" in df.columns:
        # df.get("borough", "") would return a plain str when the column is
        # missing; substitute an empty per-row Series so fillna/concat work.
        boro_col = df["borough"] if "borough" in df.columns else pd.Series("", index=df.index)
        df["full_address"] = (
            df["house_number"].fillna("").astype(str).str.strip() + " " +
            df["street_name"].fillna("").astype(str).str.title().str.strip() + ", " +
            boro_col.fillna("").astype(str)
        )
    # Sort by most stalled first (oldest complaint = most days)
    if "days_stalled" in df.columns:
        df = df.sort_values("days_stalled", ascending=False).reset_index(drop=True)
    print(f"[stalled_official] Final: {len(df)} active stalled sites")
    _set_cached(cache_key, df)
    return df
def _fetch_likely_stalled() -> pd.DataFrame:
    """Alias for the official DOB stalled-construction feed (kept for callers)."""
    return _fetch_stalled_official()
# ---------- DISTRESSED PROPERTIES ----------
def _fetch_hpd_vacate_orders() -> pd.DataFrame:
    """Fetch HPD Vacate Orders - buildings ordered vacated."""
    cache_key = "hpd_vacate_orders"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    url = DATASET_URLS["hpd_vacate_orders"]
    # Only active (un-rescinded) orders from the last 18 months.
    cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00")
    params = {
        "$where": f"rescinddate IS NULL AND orderdate >= '{cutoff}'",
        "$limit": 50000,
        "$order": "orderdate DESC",
    }
    print(f"[hpd_vacate_orders] Fetching...")
    try:
        rows = _request(url, params)
    except Exception as e:
        print(f"[hpd_vacate_orders] API error: {e}")
        return pd.DataFrame()
    df = pd.DataFrame(rows)
    if df.empty:
        print("[hpd_vacate_orders] No data returned")
        return df
    print(f"[hpd_vacate_orders] Got {len(df)} rows")
    # Tag rows with the common distress type/date used by the aggregator.
    df["distress_type"] = "HPD_VACATE"
    df["distress_date"] = pd.to_datetime(df.get("orderdate"), errors="coerce")
    # Build a display address from the HPD column names.
    if "housenumber" in df.columns and "streetname" in df.columns:
        house = df["housenumber"].fillna("").astype(str).str.strip()
        street = df["streetname"].fillna("").astype(str).str.title().str.strip()
        df["full_address"] = house + " " + street
    # Normalize the numeric borough code to a name.
    if "boro" in df.columns:
        boro_map = {"1": "MANHATTAN", "2": "BRONX", "3": "BROOKLYN", "4": "QUEENS", "5": "STATEN ISLAND"}
        df["borough"] = df["boro"].astype(str).map(lambda x: boro_map.get(x, x.upper()))
    _set_cached(cache_key, df)
    return df
def _fetch_dob_ecb_violations() -> pd.DataFrame:
    """Fetch DOB ECB Violations - filter for SWO (Stop Work Order) and WWP (Work Without Permit)."""
    cache_key = "dob_ecb_violations"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    url = DATASET_URLS["dob_ecb_violations"]
    # SWO/WWP-flavored violations from the last 18 months.
    cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00")
    params = {
        "$where": f"issue_date >= '{cutoff}' AND (violation_type LIKE '%SWO%' OR violation_type LIKE '%STOP%' OR infraction_code1 LIKE '%SWO%' OR ecb_violation_status = 'OPEN')",
        "$limit": 50000,
        "$order": "issue_date DESC",
    }
    print(f"[dob_ecb_violations] Fetching SWO/WWP violations...")
    try:
        rows = _request(url, params)
    except Exception as e:
        print(f"[dob_ecb_violations] API error: {e}")
        # The compound $where may be rejected; retry with the date filter only.
        try:
            params = {
                "$where": f"issue_date >= '{cutoff}'",
                "$limit": 20000,
                "$order": "issue_date DESC",
            }
            rows = _request(url, params)
        except Exception as e2:
            print(f"[dob_ecb_violations] Retry failed: {e2}")
            return pd.DataFrame()
    df = pd.DataFrame(rows)
    if df.empty:
        print("[dob_ecb_violations] No data returned")
        return df
    print(f"[dob_ecb_violations] Got {len(df)} rows")
    # Keep only construction-related rows (SWO, WWP, etc.)
    keywords = ["STOP WORK", "SWO", "WITHOUT PERMIT", "WWP", "ILLEGAL", "UNSAFE"]

    def _is_construction_related(row) -> bool:
        # The " | " separator prevents spurious matches across field edges.
        blob = " | ".join(
            str(row.get(field, "")).upper()
            for field in ("violation_type", "violation_description", "infraction_code1")
        )
        return any(kw in blob for kw in keywords)

    df = df[df.apply(_is_construction_related, axis=1)].copy()
    print(f"[dob_ecb_violations] After keyword filter: {len(df)} rows")
    # Tag rows with the common distress type/date used by the aggregator.
    df["distress_type"] = "ECB_VIOLATION"
    df["distress_date"] = pd.to_datetime(df.get("issue_date"), errors="coerce")
    # Build a display address.
    if "house_number" in df.columns and "street_name" in df.columns:
        df["full_address"] = (
            df["house_number"].fillna("").astype(str).str.strip() + " " +
            df["street_name"].fillna("").astype(str).str.title().str.strip()
        )
    # Normalize the numeric borough code to a name.
    if "boro" in df.columns:
        boro_map = {"1": "MANHATTAN", "2": "BRONX", "3": "BROOKLYN", "4": "QUEENS", "5": "STATEN ISLAND"}
        df["borough"] = df["boro"].astype(str).map(lambda x: boro_map.get(x, x.upper()))
    _set_cached(cache_key, df)
    return df
def _fetch_vacant_unsecured() -> pd.DataFrame:
    """Fetch 311 complaints about vacant/unsecured buildings - often stalled construction."""
    cache_key = "vacant_unsecured"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    url = DATASET_URLS["vacant_unsecured"]
    # Complaints created in the last 18 months.
    cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00")
    params = {
        "$where": f"created_date >= '{cutoff}'",
        "$limit": 50000,
        "$order": "created_date DESC",
    }
    print(f"[vacant_unsecured] Fetching 311 complaints...")
    try:
        rows = _request(url, params)
    except Exception as e:
        print(f"[vacant_unsecured] API error: {e}")
        return pd.DataFrame()
    df = pd.DataFrame(rows)
    if df.empty:
        print("[vacant_unsecured] No data returned")
        return df
    print(f"[vacant_unsecured] Got {len(df)} rows")
    # Tag rows with the common distress type/date used by the aggregator.
    df["distress_type"] = "VACANT_UNSECURED"
    df["distress_date"] = pd.to_datetime(df.get("created_date"), errors="coerce")
    # 311 provides a pre-joined incident address; just normalize the casing.
    if "incident_address" in df.columns:
        df["full_address"] = df["incident_address"].fillna("").astype(str).str.title()
    # Borough arrives as a name here; upper-case it for consistency.
    if "borough" in df.columns:
        df["borough"] = df["borough"].astype(str).str.upper()
    _set_cached(cache_key, df)
    return df
def _fetch_dob_complaints() -> pd.DataFrame:
    """Fetch DOB complaints - filter for construction-related (illegal work, unsafe conditions)."""
    cache_key = "dob_complaints_distressed"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    url = DATASET_URLS["dob_complaints"]
    # Complaints entered in the last 18 months.
    cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00")
    # Complaint categories for distressed/stalled:
    # 05 = Illegal Conversion, 45 = Construction, 71 = SRO Work W/O Permit, 83 = Debris/Unsafe
    params = {
        "$where": f"date_entered >= '{cutoff}' AND (complaint_category IN ('05', '45', '71', '83') OR status = 'OPEN')",
        "$limit": 30000,
        "$order": "date_entered DESC",
    }
    print(f"[dob_complaints] Fetching construction-related complaints...")
    try:
        rows = _request(url, params)
    except Exception as e:
        print(f"[dob_complaints] API error: {e}")
        # The compound $where may be rejected; retry with the date filter only.
        try:
            params = {
                "$where": f"date_entered >= '{cutoff}'",
                "$limit": 20000,
                "$order": "date_entered DESC",
            }
            rows = _request(url, params)
        except Exception as e2:
            print(f"[dob_complaints] Retry failed: {e2}")
            return pd.DataFrame()
    df = pd.DataFrame(rows)
    if df.empty:
        print("[dob_complaints] No data returned")
        return df
    print(f"[dob_complaints] Got {len(df)} rows")
    # Tag rows with the common distress type/date used by the aggregator.
    df["distress_type"] = "DOB_COMPLAINT"
    df["distress_date"] = pd.to_datetime(df.get("date_entered"), errors="coerce")
    # Build a display address.
    if "house_number" in df.columns and "street_name" in df.columns:
        df["full_address"] = (
            df["house_number"].fillna("").astype(str).str.strip() + " " +
            df["street_name"].fillna("").astype(str).str.title().str.strip()
        )
    # Normalize borough names/codes; unknown values pass through.
    name_map = {
        "MANHATTAN": "MANHATTAN", "1": "MANHATTAN",
        "BRONX": "BRONX", "2": "BRONX",
        "BROOKLYN": "BROOKLYN", "3": "BROOKLYN",
        "QUEENS": "QUEENS", "4": "QUEENS",
        "STATEN ISLAND": "STATEN ISLAND", "5": "STATEN ISLAND",
    }
    if "borough" in df.columns:
        df["borough"] = df["borough"].astype(str).str.upper().map(lambda x: name_map.get(x, x))
    _set_cached(cache_key, df)
    return df
def _fetch_distressed_properties() -> pd.DataFrame:
    """
    Aggregate distressed properties from multiple sources:
    - HPD Vacate Orders
    - DOB ECB Violations (SWO/WWP)
    - 311 Vacant/Unsecured complaints
    - DOB Complaints (construction-related)
    Cross-reference and score by distress level.
    """
    cache_key = "distressed_combined"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    print("[distressed] Fetching from all sources...")
    # Fetch all sources in parallel
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        future_vacate = executor.submit(_fetch_hpd_vacate_orders)
        future_ecb = executor.submit(_fetch_dob_ecb_violations)
        future_vacant = executor.submit(_fetch_vacant_unsecured)
        future_complaints = executor.submit(_fetch_dob_complaints)
        # .result() blocks until each fetch completes (each returns empty on error).
        df_vacate = future_vacate.result()
        df_ecb = future_ecb.result()
        df_vacant = future_vacant.result()
        df_complaints = future_complaints.result()
    # Standardize columns for each source
    all_dfs = []
    # Common columns we want to keep
    common_cols = ["bin", "bbl", "full_address", "borough", "distress_type", "distress_date"]
    for df, source_name in [
        (df_vacate, "HPD_VACATE"),
        (df_ecb, "ECB_VIOLATION"),
        (df_vacant, "VACANT_UNSECURED"),
        (df_complaints, "DOB_COMPLAINT"),
    ]:
        if df.empty:
            continue
        # Ensure we have common columns
        for col in common_cols:
            if col not in df.columns:
                df[col] = None
        df["source"] = source_name
        all_dfs.append(df)
    if not all_dfs:
        print("[distressed] No data from any source")
        return pd.DataFrame()
    # Combine all sources
    combined = pd.concat(all_dfs, ignore_index=True)
    print(f"[distressed] Combined total: {len(combined)} rows")
    # Normalize BIN for grouping
    if "bin" in combined.columns:
        combined["bin"] = combined["bin"].fillna("").astype(str).str.strip()
    # Group by property (using BIN or address) and count distress signals
    # Properties with multiple signals are higher priority
    # Create a property key (prefer BIN, fallback to address; "0" and empty
    # BINs are treated as missing and fall back to the upper-cased address)
    combined["property_key"] = combined.apply(
        lambda r: str(r.get("bin", "")).strip() if str(r.get("bin", "")).strip() and str(r.get("bin", "")).strip() != "0"
        else str(r.get("full_address", "")).strip().upper(),
        axis=1
    )
    # Count distress signals per property
    distress_counts = combined.groupby("property_key").agg({
        "distress_type": lambda x: ", ".join(sorted(set(x))),
        "source": "count",
    }).rename(columns={"source": "distress_count", "distress_type": "distress_types"})
    # Merge counts back
    # (pandas matches `on="property_key"` against the grouped frame's named index)
    combined = combined.merge(distress_counts, on="property_key", how="left")
    # Dedupe - keep one row per property with most recent distress date
    combined = combined.sort_values("distress_date", ascending=False)
    combined = combined.drop_duplicates(subset=["property_key"], keep="first")
    # Calculate days since distress
    combined["days_since_distress"] = (pd.Timestamp.today() - combined["distress_date"]).dt.days
    # Score: more distress signals = higher score
    combined["distress_score"] = combined["distress_count"].fillna(1).astype(int)
    # Sort by score (desc) then by date (most recent first within same score)
    combined = combined.sort_values(
        ["distress_score", "distress_date"],
        ascending=[False, False]
    ).reset_index(drop=True)
    # Filter to allowed boroughs
    if "borough" in combined.columns:
        combined = combined[combined["borough"].isin(ALLOWED_BOROUGHS)].copy()
    print(f"[distressed] Final: {len(combined)} unique distressed properties")
    _set_cached(cache_key, combined)
    return combined
# ---------- LEADS UNPERMITTED ----------
def _fetch_leads_unpermitted(days: int = DEFAULT_DAYS_WINDOW) -> Tuple[pd.DataFrame, float]:
    """
    Find filings that don't have corresponding permits yet.
    Cross-reference job_filings with permit_issuance.
    """
    started = time.time()
    filings = _fetch_filings_last_days(days)
    if filings.empty:
        return pd.DataFrame(), time.time() - started
    permits = _fetch_permits_last_days(days)
    # Base job number for each filing (portion before the first dash).
    if "job_filing_number" in filings.columns:
        filings["_job_base"] = filings["job_filing_number"].apply(_job_base)
    else:
        filings["_job_base"] = ""
    # Job numbers that already have an issued permit.
    permitted_jobs = set()
    if not permits.empty and "job__" in permits.columns:
        permitted_jobs = set(permits["job__"].dropna().astype(str).str.strip())
    # Keep only filings whose job has no permit yet, then drop the helper.
    unpermitted = filings[~filings["_job_base"].isin(permitted_jobs)].copy()
    unpermitted.drop(columns=["_job_base"], inplace=True, errors="ignore")
    return unpermitted, time.time() - started
# ---------- public API ----------
class SocrataClient:
    """Thin facade over the module-level fetchers, adding per-call timing."""

    def __init__(self) -> None:
        if not SOCRATA_APP_TOKEN:
            print("⚠️ SOCRATA_APP_TOKEN not set – API may cap at 1,000 rows.")

    def fetch_dataset_last_n_days(
        self,
        dataset_key: str,
        days: int,
    ) -> Tuple[pd.DataFrame, float]:
        """Fetch *dataset_key* and return (DataFrame, elapsed seconds). Raises ValueError for unknown keys."""
        started = time.time()
        # Lambdas keep the fetcher lookups lazy; *days* only applies to the
        # windowed feeds, the rest ignore it.
        fetchers = {
            "job_filings": lambda: _fetch_filings_last_days(days),
            "permit_issuance": lambda: _fetch_permits_last_days(days),
            "electrical_permits": lambda: _fetch_electrical_last_days(days),
            "stalled_official": lambda: _fetch_stalled_official(),
            "likely_stalled": lambda: _fetch_likely_stalled(),
            "distressed_properties": lambda: _fetch_distressed_properties(),
        }
        try:
            fetch = fetchers[dataset_key]
        except KeyError:
            raise ValueError(f"Unknown dataset: {dataset_key}") from None
        df = fetch()
        return df, time.time() - started

    def fetch_leads_unpermitted(
        self,
        days: int = DEFAULT_DAYS_WINDOW,
    ) -> Tuple[pd.DataFrame, float]:
        """Delegate to the module-level unpermitted-leads cross-reference."""
        return _fetch_leads_unpermitted(days)