# services/data.py from __future__ import annotations import time import concurrent.futures from datetime import datetime, timedelta from typing import Any, Dict, List, Tuple, Optional import requests import pandas as pd from config import ( SOCRATA_APP_TOKEN, ALLOWED_BOROUGHS, DEFAULT_DAYS_WINDOW, ) # ---------- Socrata endpoints ---------- DATASET_URLS: Dict[str, str] = { "job_filings": "https://data.cityofnewyork.us/resource/w9ak-ipjd.json", "permit_issuance": "https://data.cityofnewyork.us/resource/rbx6-tga4.json", "electrical_permits": "https://data.cityofnewyork.us/resource/dm9a-ab7w.json", # Stalled construction complaints (official DOB dataset) "stalled_official": "https://data.cityofnewyork.us/resource/i296-73x5.json", # BIS Job Application Filings (legacy system - for finding dormant jobs) "bis_job_filings": "https://data.cityofnewyork.us/resource/ic3t-wcy2.json", # Distressed properties sources "hpd_vacate_orders": "https://data.cityofnewyork.us/resource/tb8q-a3ar.json", "dob_ecb_violations": "https://data.cityofnewyork.us/resource/6bgk-3dad.json", "vacant_unsecured": "https://data.cityofnewyork.us/resource/br7h-6m8v.json", "dob_complaints": "https://data.cityofnewyork.us/resource/eabe-havv.json", } # Per dataset core field map DATASET_FIELD_MAP: Dict[str, Dict[str, str]] = { "job_filings": { "filing_date": "filing_date", "borough": "borough", "house_no": "house_no", "street_name": "street_name", "zip": "zip", "job_id": "job_filing_number", "job_status": "filing_status", "job_type": "job_type", "desc": "job_description", }, "permit_issuance": { "filing_date": "approved_date", "borough": "borough", "house_no": "house__", "street_name": "street_name", "zip": "zip_code", "job_id": "job__", "permit_type": "permittee_s_license_type", "desc": "job_description", }, "electrical_permits": { "filing_date": "filing_date", "borough": "borough", "house_no": "house_number", "street_name": "street_name", "zip": "zip_code", "job_id": "job_filing_number", "job_status": "filing_status", }, } # ---------- Simple in-memory cache ---------- _cache: Dict[str, Tuple[pd.DataFrame, datetime]] = {} CACHE_TTL_MINUTES = 10 def _get_cached(key: str) -> Optional[pd.DataFrame]: entry = _cache.get(key) if not entry: return None df, cached_at = entry if datetime.now() - cached_at < timedelta(minutes=CACHE_TTL_MINUTES): print(f"[cache] Using cached data for {key}") return df.copy() del _cache[key] return None def _set_cached(key: str, df: pd.DataFrame) -> None: _cache[key] = (df.copy(), datetime.now()) # ---------- helpers ---------- def _headers() -> Dict[str, str]: """ Build headers for Socrata API requests. SODA3 requires authentication via app token for all requests. """ h: Dict[str, str] = { "Accept": "application/json", } if SOCRATA_APP_TOKEN: h["X-App-Token"] = SOCRATA_APP_TOKEN return h def _request(url: str, params: Dict[str, Any]) -> List[Dict[str, Any]]: """ Make a request to the Socrata API. Handles both SODA2 and SODA3 endpoints. """ headers = _headers() # Log if no token (will likely fail on SODA3) if "X-App-Token" not in headers: print("⚠️ No SOCRATA_APP_TOKEN - request may be throttled or rejected") try: r = requests.get(url, headers=headers, params=params, timeout=60) except requests.exceptions.Timeout: raise RuntimeError(f"API request timed out for {url}") except requests.exceptions.RequestException as e: raise RuntimeError(f"API request failed: {e}") if r.status_code == 403: raise RuntimeError( f"API returned 403 Forbidden. This likely means:\n" f" 1. SOCRATA_APP_TOKEN is missing or invalid\n" f" 2. The dataset requires authentication\n" f" URL: {url}\n" f" Response: {r.text[:200]}" ) elif r.status_code == 429: raise RuntimeError( f"API rate limit exceeded (429). Set SOCRATA_APP_TOKEN for higher limits.\n" f" URL: {url}" ) elif r.status_code != 200: raise RuntimeError(f"API request failed: {r.status_code} {r.text[:500]}") return r.json() def _to_dt_naive(series: pd.Series) -> pd.Series: s = pd.to_datetime(series, errors="coerce", utc=True) return s.dt.tz_localize(None) def _norm_borough(series: pd.Series) -> pd.Series: m = { "MN": "MANHATTAN", "BX": "BRONX", "BK": "BROOKLYN", "QN": "QUEENS", "SI": "STATEN ISLAND", "1": "MANHATTAN", "2": "BRONX", "3": "BROOKLYN", "4": "QUEENS", "5": "STATEN ISLAND", } return series.astype(str).str.strip().str.upper().map(lambda x: m.get(x, x)) def _full_address( df: pd.DataFrame, house_col: str, street_col: str, borough_col: str, zip_col: str | None, ) -> pd.Series: def join(row): parts = [] h = str(row.get(house_col, "") or "").strip() s = str(row.get(street_col, "") or "").strip() b = str(row.get(borough_col, "") or "").strip() z = str(row.get(zip_col, "") or "").strip() if zip_col else "" if h: parts.append(h) if s: parts.append(s) if b: parts.append(b) if z: parts.append(z) return ", ".join(p for p in parts if p) return df.apply(join, axis=1) def _days_ago_cutoff(days: int) -> Tuple[pd.Timestamp, str]: now = pd.Timestamp.utcnow().tz_localize(None) cutoff = now - pd.Timedelta(days=days) cutoff_iso = (cutoff.tz_localize("UTC").isoformat()).replace("+00:00", "Z") return cutoff, cutoff_iso def _job_base(job_filing_number: str) -> str: if not isinstance(job_filing_number, str): return "" return job_filing_number.split("-", 1)[0].strip() def _fetch_page_parallel( url: str, params: Dict[str, Any], page: int, offset: int, ) -> Tuple[List[Dict[str, Any]], int, float]: params_copy = params.copy() params_copy["$offset"] = offset t0 = time.time() rows = _request(url, params_copy) return rows, page, time.time() - t0 def _applicant_search_url(df: pd.DataFrame) -> pd.Series: """Build a Google search URL for each applicant to help find contact info.""" import urllib.parse def make_url(row): first = str(row.get("applicant_first_name") or "").strip() last = str(row.get("applicant_last_name") or "").strip() firm = str(row.get("filing_representative_business_name") or "").strip() title = str(row.get("applicant_professional_title") or "").strip() # Build the most useful query we can from available fields if first and last: query = f'"{first} {last}" architect contact email' elif firm and firm.upper() not in ("", "PREPARER", "N/A"): query = f'"{firm}" architect engineer contact email' else: return "" return "https://www.google.com/search?q=" + urllib.parse.quote(query) return df.apply(make_url, axis=1) # ---------- CORE FETCHERS ---------- def _fetch_filings_last_days(days: int) -> pd.DataFrame: """Fetch DOB NOW job filings from last N days.""" cache_key = f"job_filings_{days}" cached = _get_cached(cache_key) if cached is not None: return cached url = DATASET_URLS["job_filings"] cutoff, cutoff_iso = _days_ago_cutoff(days) # Use string comparison for date (works for ISO format text dates) # Format: YYYY-MM-DD for string comparison cutoff_str = cutoff.strftime("%Y-%m-%d") params = { "$where": f"filing_date > '{cutoff_str}'", "$limit": 50000, "$order": "filing_date DESC", } try: data = _request(url, params) except Exception as e: print(f"[job_filings] Query failed: {e}") # Fallback: no date filter, just get recent by order print("[job_filings] Retrying without date filter...") try: params = {"$limit": 10000, "$order": "filing_date DESC"} data = _request(url, params) except Exception as e2: print(f"[job_filings] Retry also failed: {e2}") return pd.DataFrame() df = pd.DataFrame(data) if df.empty: return df print(f"[job_filings] Got {len(df)} rows") # Normalize borough if "borough" in df.columns: df["borough"] = _norm_borough(df["borough"]) # Filter to allowed boroughs if "borough" in df.columns: df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy() # Build full address df["full_address"] = _full_address(df, "house_no", "street_name", "borough", "zip") # Build Google search link for applicant contact info df["applicant_search"] = _applicant_search_url(df) # Convert filing_date to datetime if "filing_date" in df.columns: df["filing_date"] = _to_dt_naive(df["filing_date"]) # Filter by date in pandas as backup df = df[df["filing_date"] >= cutoff].copy() _set_cached(cache_key, df) return df def _fetch_permits_last_days(days: int) -> pd.DataFrame: """Fetch DOB NOW approved permits from last N days.""" cache_key = f"permit_issuance_{days}" cached = _get_cached(cache_key) if cached is not None: return cached url = DATASET_URLS["permit_issuance"] cutoff, cutoff_iso = _days_ago_cutoff(days) cutoff_str = cutoff.strftime("%Y-%m-%d") params = { "$where": f"approved_date > '{cutoff_str}'", "$limit": 50000, "$order": "approved_date DESC", } try: data = _request(url, params) except Exception as e: print(f"[permit_issuance] Query failed: {e}") print("[permit_issuance] Retrying without date filter...") try: params = {"$limit": 10000, "$order": "approved_date DESC"} data = _request(url, params) except Exception as e2: print(f"[permit_issuance] Retry also failed: {e2}") return pd.DataFrame() df = pd.DataFrame(data) if df.empty: return df print(f"[permit_issuance] Got {len(df)} rows") # Normalize borough if "borough" in df.columns: df["borough"] = _norm_borough(df["borough"]) # Filter to allowed boroughs if "borough" in df.columns: df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy() # Rename to common field name if "approved_date" in df.columns: df["filing_date"] = _to_dt_naive(df["approved_date"]) # Filter by date in pandas as backup df = df[df["filing_date"] >= cutoff].copy() # Build full address df["full_address"] = _full_address(df, "house__", "street_name", "borough", "zip_code") _set_cached(cache_key, df) return df def _fetch_electrical_last_days(days: int) -> pd.DataFrame: """Fetch DOB NOW electrical permits from last N days.""" cache_key = f"electrical_permits_{days}" cached = _get_cached(cache_key) if cached is not None: return cached url = DATASET_URLS["electrical_permits"] cutoff, cutoff_iso = _days_ago_cutoff(days) cutoff_str = cutoff.strftime("%Y-%m-%d") params = { "$where": f"filing_date > '{cutoff_str}'", "$limit": 50000, "$order": "filing_date DESC", } try: data = _request(url, params) except Exception as e: print(f"[electrical_permits] Query failed: {e}") print("[electrical_permits] Retrying without date filter...") try: params = {"$limit": 10000, "$order": "filing_date DESC"} data = _request(url, params) except Exception as e2: print(f"[electrical_permits] Retry also failed: {e2}") return pd.DataFrame() df = pd.DataFrame(data) if df.empty: return df print(f"[electrical_permits] Got {len(df)} rows") # Normalize borough if "borough" in df.columns: df["borough"] = _norm_borough(df["borough"]) # Filter to allowed boroughs if "borough" in df.columns: df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy() # Build full address df["full_address"] = _full_address(df, "house_number", "street_name", "borough", "zip_code") # Convert filing_date to datetime if "filing_date" in df.columns: df["filing_date"] = _to_dt_naive(df["filing_date"]) # Filter by date in pandas as backup df = df[df["filing_date"] >= cutoff].copy() _set_cached(cache_key, df) return df # ---------- STALLED CONSTRUCTION FETCHERS ---------- def _fetch_stalled_official() -> pd.DataFrame: """ Fetch official DOB Stalled Construction Sites complaints (i296-73x5). Strategy: 1. Paginate through ALL rows (1.4M+) 2. Deduplicate by complaint_number (keep most recent) 3. Filter to complaints received in the last 18 months 4. Filter to allowed boroughs """ cache_key = "stalled_official" cached = _get_cached(cache_key) if cached is not None: return cached url = "https://data.cityofnewyork.us/resource/i296-73x5.json" # Paginate through all data all_data = [] offset = 0 page_size = 50000 print(f"[stalled_official] Fetching all rows (paginating by {page_size})...") while True: params = { "$limit": page_size, "$offset": offset, } try: data = _request(url, params) except Exception as e: print(f"[stalled_official] API request failed at offset {offset}: {e}") break if not data: break all_data.extend(data) print(f"[stalled_official] Fetched {len(all_data)} rows so far...") if len(data) < page_size: break offset += page_size if not all_data: print("[stalled_official] No data returned from API") return pd.DataFrame() df = pd.DataFrame(all_data) print(f"[stalled_official] Total rows fetched: {len(df)}") # Deduplicate by complaint_number - keep first occurrence (arbitrary, they should be same complaint) if "complaint_number" in df.columns: before = len(df) df = df.drop_duplicates(subset=["complaint_number"], keep="first") print(f"[stalled_official] Deduplicated: {before} -> {len(df)} unique complaints") # Parse complaint date if "date_complaint_received" in df.columns: df["complaint_date"] = pd.to_datetime(df["date_complaint_received"], errors="coerce") # Filter to only complaints from the last 18 months cutoff_date = pd.Timestamp.today() - pd.DateOffset(months=18) before_filter = len(df) df = df[df["complaint_date"] >= cutoff_date].copy() print(f"[stalled_official] 18-month filter (>= {cutoff_date.date()}): {before_filter} -> {len(df)} rows") # Calculate days stalled df["days_stalled"] = (pd.Timestamp.today() - df["complaint_date"]).dt.days # Map borough to standard names boro_map = { "1": "MANHATTAN", "MANHATTAN": "MANHATTAN", "Manhattan": "MANHATTAN", "2": "BRONX", "BRONX": "BRONX", "Bronx": "BRONX", "3": "BROOKLYN", "BROOKLYN": "BROOKLYN", "Brooklyn": "BROOKLYN", "4": "QUEENS", "QUEENS": "QUEENS", "Queens": "QUEENS", "5": "STATEN ISLAND", "STATEN ISLAND": "STATEN ISLAND", "Staten Island": "STATEN ISLAND", } if "borough_name" in df.columns: df["borough"] = df["borough_name"].astype(str).str.strip().map(lambda x: boro_map.get(x, x.upper() if isinstance(x, str) else x)) # Filter to allowed boroughs if "borough" in df.columns: before_boro = len(df) df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy() print(f"[stalled_official] Borough filter: {before_boro} -> {len(df)} rows") # Build full address if "house_number" in df.columns and "street_name" in df.columns: df["full_address"] = ( df["house_number"].fillna("").astype(str).str.strip() + " " + df["street_name"].fillna("").astype(str).str.title().str.strip() + ", " + df.get("borough", "").fillna("").astype(str) ) # Sort by most stalled first (oldest complaint = most days) if "days_stalled" in df.columns: df = df.sort_values("days_stalled", ascending=False).reset_index(drop=True) print(f"[stalled_official] Final: {len(df)} active stalled sites") _set_cached(cache_key, df) return df def _fetch_likely_stalled() -> pd.DataFrame: """ Fetch stalled construction projects from DOB Stalled Construction Sites API. This is now just an alias for the official stalled feed. """ return _fetch_stalled_official() # ---------- DISTRESSED PROPERTIES ---------- def _fetch_hpd_vacate_orders() -> pd.DataFrame: """Fetch HPD Vacate Orders - buildings ordered vacated.""" cache_key = "hpd_vacate_orders" cached = _get_cached(cache_key) if cached is not None: return cached url = DATASET_URLS["hpd_vacate_orders"] # Get orders from last 18 months cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00") params = { "$where": f"rescinddate IS NULL AND orderdate >= '{cutoff}'", "$limit": 50000, "$order": "orderdate DESC", } print(f"[hpd_vacate_orders] Fetching...") try: data = _request(url, params) except Exception as e: print(f"[hpd_vacate_orders] API error: {e}") return pd.DataFrame() df = pd.DataFrame(data) if df.empty: print("[hpd_vacate_orders] No data returned") return df print(f"[hpd_vacate_orders] Got {len(df)} rows") # Normalize df["distress_type"] = "HPD_VACATE" df["distress_date"] = pd.to_datetime(df.get("orderdate"), errors="coerce") # Build address if "housenumber" in df.columns and "streetname" in df.columns: df["full_address"] = ( df["housenumber"].fillna("").astype(str).str.strip() + " " + df["streetname"].fillna("").astype(str).str.title().str.strip() ) # Normalize borough if "boro" in df.columns: boro_map = {"1": "MANHATTAN", "2": "BRONX", "3": "BROOKLYN", "4": "QUEENS", "5": "STATEN ISLAND"} df["borough"] = df["boro"].astype(str).map(lambda x: boro_map.get(x, x.upper())) _set_cached(cache_key, df) return df def _fetch_dob_ecb_violations() -> pd.DataFrame: """Fetch DOB ECB Violations - filter for SWO (Stop Work Order) and WWP (Work Without Permit).""" cache_key = "dob_ecb_violations" cached = _get_cached(cache_key) if cached is not None: return cached url = DATASET_URLS["dob_ecb_violations"] # Get SWO and WWP violations from last 18 months cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00") params = { "$where": f"issue_date >= '{cutoff}' AND (violation_type LIKE '%SWO%' OR violation_type LIKE '%STOP%' OR infraction_code1 LIKE '%SWO%' OR ecb_violation_status = 'OPEN')", "$limit": 50000, "$order": "issue_date DESC", } print(f"[dob_ecb_violations] Fetching SWO/WWP violations...") try: data = _request(url, params) except Exception as e: print(f"[dob_ecb_violations] API error: {e}") # Try simpler query try: params = { "$where": f"issue_date >= '{cutoff}'", "$limit": 20000, "$order": "issue_date DESC", } data = _request(url, params) except Exception as e2: print(f"[dob_ecb_violations] Retry failed: {e2}") return pd.DataFrame() df = pd.DataFrame(data) if df.empty: print("[dob_ecb_violations] No data returned") return df print(f"[dob_ecb_violations] Got {len(df)} rows") # Filter for construction-related violations (SWO, WWP, etc.) violation_keywords = ["STOP WORK", "SWO", "WITHOUT PERMIT", "WWP", "ILLEGAL", "UNSAFE"] mask = df.apply(lambda row: any( kw in str(row.get("violation_type", "")).upper() or kw in str(row.get("violation_description", "")).upper() or kw in str(row.get("infraction_code1", "")).upper() for kw in violation_keywords ), axis=1) df = df[mask].copy() print(f"[dob_ecb_violations] After keyword filter: {len(df)} rows") # Normalize df["distress_type"] = "ECB_VIOLATION" df["distress_date"] = pd.to_datetime(df.get("issue_date"), errors="coerce") # Build address if "house_number" in df.columns and "street_name" in df.columns: df["full_address"] = ( df["house_number"].fillna("").astype(str).str.strip() + " " + df["street_name"].fillna("").astype(str).str.title().str.strip() ) # Normalize borough if "boro" in df.columns: boro_map = {"1": "MANHATTAN", "2": "BRONX", "3": "BROOKLYN", "4": "QUEENS", "5": "STATEN ISLAND"} df["borough"] = df["boro"].astype(str).map(lambda x: boro_map.get(x, x.upper())) _set_cached(cache_key, df) return df def _fetch_vacant_unsecured() -> pd.DataFrame: """Fetch 311 complaints about vacant/unsecured buildings - often stalled construction.""" cache_key = "vacant_unsecured" cached = _get_cached(cache_key) if cached is not None: return cached url = DATASET_URLS["vacant_unsecured"] # Get complaints from last 18 months that are still open cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00") params = { "$where": f"created_date >= '{cutoff}'", "$limit": 50000, "$order": "created_date DESC", } print(f"[vacant_unsecured] Fetching 311 complaints...") try: data = _request(url, params) except Exception as e: print(f"[vacant_unsecured] API error: {e}") return pd.DataFrame() df = pd.DataFrame(data) if df.empty: print("[vacant_unsecured] No data returned") return df print(f"[vacant_unsecured] Got {len(df)} rows") # Normalize df["distress_type"] = "VACANT_UNSECURED" df["distress_date"] = pd.to_datetime(df.get("created_date"), errors="coerce") # Build address from incident_address or components if "incident_address" in df.columns: df["full_address"] = df["incident_address"].fillna("").astype(str).str.title() # Normalize borough if "borough" in df.columns: df["borough"] = df["borough"].astype(str).str.upper() _set_cached(cache_key, df) return df def _fetch_dob_complaints() -> pd.DataFrame: """Fetch DOB complaints - filter for construction-related (illegal work, unsafe conditions).""" cache_key = "dob_complaints_distressed" cached = _get_cached(cache_key) if cached is not None: return cached url = DATASET_URLS["dob_complaints"] # Get complaints from last 18 months cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00") # Complaint categories for distressed/stalled: # 05 = Illegal Conversion, 45 = Construction, 71 = SRO Work W/O Permit, 83 = Debris/Unsafe params = { "$where": f"date_entered >= '{cutoff}' AND (complaint_category IN ('05', '45', '71', '83') OR status = 'OPEN')", "$limit": 30000, "$order": "date_entered DESC", } print(f"[dob_complaints] Fetching construction-related complaints...") try: data = _request(url, params) except Exception as e: print(f"[dob_complaints] API error: {e}") # Try simpler query try: params = { "$where": f"date_entered >= '{cutoff}'", "$limit": 20000, "$order": "date_entered DESC", } data = _request(url, params) except Exception as e2: print(f"[dob_complaints] Retry failed: {e2}") return pd.DataFrame() df = pd.DataFrame(data) if df.empty: print("[dob_complaints] No data returned") return df print(f"[dob_complaints] Got {len(df)} rows") # Normalize df["distress_type"] = "DOB_COMPLAINT" df["distress_date"] = pd.to_datetime(df.get("date_entered"), errors="coerce") # Build address if "house_number" in df.columns and "street_name" in df.columns: df["full_address"] = ( df["house_number"].fillna("").astype(str).str.strip() + " " + df["street_name"].fillna("").astype(str).str.title().str.strip() ) # Normalize borough boro_map = { "MANHATTAN": "MANHATTAN", "1": "MANHATTAN", "BRONX": "BRONX", "2": "BRONX", "BROOKLYN": "BROOKLYN", "3": "BROOKLYN", "QUEENS": "QUEENS", "4": "QUEENS", "STATEN ISLAND": "STATEN ISLAND", "5": "STATEN ISLAND", } if "borough" in df.columns: df["borough"] = df["borough"].astype(str).str.upper().map(lambda x: boro_map.get(x, x)) _set_cached(cache_key, df) return df def _fetch_distressed_properties() -> pd.DataFrame: """ Aggregate distressed properties from multiple sources: - HPD Vacate Orders - DOB ECB Violations (SWO/WWP) - 311 Vacant/Unsecured complaints - DOB Complaints (construction-related) Cross-reference and score by distress level. """ cache_key = "distressed_combined" cached = _get_cached(cache_key) if cached is not None: return cached print("[distressed] Fetching from all sources...") # Fetch all sources in parallel with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: future_vacate = executor.submit(_fetch_hpd_vacate_orders) future_ecb = executor.submit(_fetch_dob_ecb_violations) future_vacant = executor.submit(_fetch_vacant_unsecured) future_complaints = executor.submit(_fetch_dob_complaints) df_vacate = future_vacate.result() df_ecb = future_ecb.result() df_vacant = future_vacant.result() df_complaints = future_complaints.result() # Standardize columns for each source all_dfs = [] # Common columns we want to keep common_cols = ["bin", "bbl", "full_address", "borough", "distress_type", "distress_date"] for df, source_name in [ (df_vacate, "HPD_VACATE"), (df_ecb, "ECB_VIOLATION"), (df_vacant, "VACANT_UNSECURED"), (df_complaints, "DOB_COMPLAINT"), ]: if df.empty: continue # Ensure we have common columns for col in common_cols: if col not in df.columns: df[col] = None df["source"] = source_name all_dfs.append(df) if not all_dfs: print("[distressed] No data from any source") return pd.DataFrame() # Combine all sources combined = pd.concat(all_dfs, ignore_index=True) print(f"[distressed] Combined total: {len(combined)} rows") # Normalize BIN for grouping if "bin" in combined.columns: combined["bin"] = combined["bin"].fillna("").astype(str).str.strip() # Group by property (using BIN or address) and count distress signals # Properties with multiple signals are higher priority # Create a property key (prefer BIN, fallback to address) combined["property_key"] = combined.apply( lambda r: str(r.get("bin", "")).strip() if str(r.get("bin", "")).strip() and str(r.get("bin", "")).strip() != "0" else str(r.get("full_address", "")).strip().upper(), axis=1 ) # Count distress signals per property distress_counts = combined.groupby("property_key").agg({ "distress_type": lambda x: ", ".join(sorted(set(x))), "source": "count", }).rename(columns={"source": "distress_count", "distress_type": "distress_types"}) # Merge counts back combined = combined.merge(distress_counts, on="property_key", how="left") # Dedupe - keep one row per property with most recent distress date combined = combined.sort_values("distress_date", ascending=False) combined = combined.drop_duplicates(subset=["property_key"], keep="first") # Calculate days since distress combined["days_since_distress"] = (pd.Timestamp.today() - combined["distress_date"]).dt.days # Score: more distress signals = higher score combined["distress_score"] = combined["distress_count"].fillna(1).astype(int) # Sort by score (desc) then by date (most recent first within same score) combined = combined.sort_values( ["distress_score", "distress_date"], ascending=[False, False] ).reset_index(drop=True) # Filter to allowed boroughs if "borough" in combined.columns: combined = combined[combined["borough"].isin(ALLOWED_BOROUGHS)].copy() print(f"[distressed] Final: {len(combined)} unique distressed properties") _set_cached(cache_key, combined) return combined # ---------- LEADS UNPERMITTED ---------- def _fetch_leads_unpermitted(days: int = DEFAULT_DAYS_WINDOW) -> Tuple[pd.DataFrame, float]: """ Find filings that don't have corresponding permits yet. Cross-reference job_filings with permit_issuance. """ t0 = time.time() # Get filings filings_df = _fetch_filings_last_days(days) if filings_df.empty: return pd.DataFrame(), time.time() - t0 # Get permits permits_df = _fetch_permits_last_days(days) # Extract base job numbers for matching if "job_filing_number" in filings_df.columns: filings_df["_job_base"] = filings_df["job_filing_number"].apply(_job_base) else: filings_df["_job_base"] = "" # Get permitted job bases permitted_jobs = set() if not permits_df.empty and "job__" in permits_df.columns: permitted_jobs = set(permits_df["job__"].dropna().astype(str).str.strip()) # Filter to unpermitted filings mask = ~filings_df["_job_base"].isin(permitted_jobs) unpermitted = filings_df[mask].copy() # Drop helper column unpermitted.drop(columns=["_job_base"], inplace=True, errors="ignore") return unpermitted, time.time() - t0 # ---------- public API ---------- class SocrataClient: def __init__(self) -> None: if not SOCRATA_APP_TOKEN: print("⚠️ SOCRATA_APP_TOKEN not set – API may cap at 1,000 rows.") def fetch_dataset_last_n_days( self, dataset_key: str, days: int, ) -> Tuple[pd.DataFrame, float]: t0 = time.time() if dataset_key == "job_filings": df = _fetch_filings_last_days(days) elif dataset_key == "permit_issuance": df = _fetch_permits_last_days(days) elif dataset_key == "electrical_permits": df = _fetch_electrical_last_days(days) elif dataset_key == "stalled_official": df = _fetch_stalled_official() elif dataset_key == "likely_stalled": df = _fetch_likely_stalled() elif dataset_key == "distressed_properties": df = _fetch_distressed_properties() else: raise ValueError(f"Unknown dataset: {dataset_key}") return df, time.time() - t0 def fetch_leads_unpermitted( self, days: int = DEFAULT_DAYS_WINDOW, ) -> Tuple[pd.DataFrame, float]: return _fetch_leads_unpermitted(days)