# services/data.py
# v1.95: applicant info columns, Google search links, job type filter, fix all filters
| from __future__ import annotations | |
| import time | |
| import concurrent.futures | |
| from datetime import datetime, timedelta | |
| from typing import Any, Dict, List, Tuple, Optional | |
| import requests | |
| import pandas as pd | |
| from config import ( | |
| SOCRATA_APP_TOKEN, | |
| ALLOWED_BOROUGHS, | |
| DEFAULT_DAYS_WINDOW, | |
| ) | |
# ---------- Socrata endpoints ----------
# NYC Open Data (Socrata) JSON endpoints, keyed by the short dataset names
# used by the fetchers below and by SocrataClient.fetch_dataset_last_n_days.
DATASET_URLS: Dict[str, str] = {
    # DOB NOW job application filings (queried by filing_date)
    "job_filings": "https://data.cityofnewyork.us/resource/w9ak-ipjd.json",
    # DOB NOW approved permits (queried by approved_date)
    "permit_issuance": "https://data.cityofnewyork.us/resource/rbx6-tga4.json",
    # DOB NOW electrical permits (queried by filing_date)
    "electrical_permits": "https://data.cityofnewyork.us/resource/dm9a-ab7w.json",
    # Stalled construction complaints (official DOB dataset)
    "stalled_official": "https://data.cityofnewyork.us/resource/i296-73x5.json",
    # BIS Job Application Filings (legacy system - for finding dormant jobs)
    "bis_job_filings": "https://data.cityofnewyork.us/resource/ic3t-wcy2.json",
    # Distressed properties sources
    "hpd_vacate_orders": "https://data.cityofnewyork.us/resource/tb8q-a3ar.json",
    "dob_ecb_violations": "https://data.cityofnewyork.us/resource/6bgk-3dad.json",
    "vacant_unsecured": "https://data.cityofnewyork.us/resource/br7h-6m8v.json",
    "dob_complaints": "https://data.cityofnewyork.us/resource/eabe-havv.json",
}
# Per dataset core field map
# Maps a canonical field name (left) to the dataset-specific Socrata column
# name (right) for the three DOB NOW datasets.
# NOTE(review): not referenced by the fetchers visible in this module —
# presumably consumed by external callers; confirm before removing.
DATASET_FIELD_MAP: Dict[str, Dict[str, str]] = {
    "job_filings": {
        "filing_date": "filing_date",
        "borough": "borough",
        "house_no": "house_no",
        "street_name": "street_name",
        "zip": "zip",
        "job_id": "job_filing_number",
        "job_status": "filing_status",
        "job_type": "job_type",
        "desc": "job_description",
    },
    "permit_issuance": {
        # Permits are keyed off their approval date, not a filing date.
        "filing_date": "approved_date",
        "borough": "borough",
        # Legacy BIS-style column names with trailing underscores.
        "house_no": "house__",
        "street_name": "street_name",
        "zip": "zip_code",
        "job_id": "job__",
        "permit_type": "permittee_s_license_type",
        "desc": "job_description",
    },
    "electrical_permits": {
        "filing_date": "filing_date",
        "borough": "borough",
        "house_no": "house_number",
        "street_name": "street_name",
        "zip": "zip_code",
        "job_id": "job_filing_number",
        "job_status": "filing_status",
    },
}
| # ---------- Simple in-memory cache ---------- | |
| _cache: Dict[str, Tuple[pd.DataFrame, datetime]] = {} | |
| CACHE_TTL_MINUTES = 10 | |
| def _get_cached(key: str) -> Optional[pd.DataFrame]: | |
| entry = _cache.get(key) | |
| if not entry: | |
| return None | |
| df, cached_at = entry | |
| if datetime.now() - cached_at < timedelta(minutes=CACHE_TTL_MINUTES): | |
| print(f"[cache] Using cached data for {key}") | |
| return df.copy() | |
| del _cache[key] | |
| return None | |
| def _set_cached(key: str, df: pd.DataFrame) -> None: | |
| _cache[key] = (df.copy(), datetime.now()) | |
# ---------- helpers ----------
def _headers() -> Dict[str, str]:
    """Assemble HTTP headers for Socrata requests.

    Always asks for JSON; attaches the app token when one is configured
    (SODA3 endpoints require it).
    """
    headers: Dict[str, str] = {"Accept": "application/json"}
    if SOCRATA_APP_TOKEN:
        headers["X-App-Token"] = SOCRATA_APP_TOKEN
    return headers
def _request(url: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    """GET a Socrata endpoint and return the decoded JSON rows.

    Works against both SODA2 and SODA3 endpoints. Transport failures and
    non-200 statuses are surfaced as RuntimeError with a diagnostic message;
    403 and 429 get dedicated explanations.
    """
    headers = _headers()
    # Warn up front: SODA3 endpoints tend to reject token-less requests.
    if "X-App-Token" not in headers:
        print("⚠️ No SOCRATA_APP_TOKEN - request may be throttled or rejected")
    try:
        resp = requests.get(url, headers=headers, params=params, timeout=60)
    except requests.exceptions.Timeout:
        raise RuntimeError(f"API request timed out for {url}")
    except requests.exceptions.RequestException as exc:
        raise RuntimeError(f"API request failed: {exc}")

    status = resp.status_code
    if status == 200:
        return resp.json()
    if status == 403:
        raise RuntimeError(
            f"API returned 403 Forbidden. This likely means:\n"
            f" 1. SOCRATA_APP_TOKEN is missing or invalid\n"
            f" 2. The dataset requires authentication\n"
            f" URL: {url}\n"
            f" Response: {resp.text[:200]}"
        )
    if status == 429:
        raise RuntimeError(
            f"API rate limit exceeded (429). Set SOCRATA_APP_TOKEN for higher limits.\n"
            f" URL: {url}"
        )
    raise RuntimeError(f"API request failed: {status} {resp.text[:500]}")
| def _to_dt_naive(series: pd.Series) -> pd.Series: | |
| s = pd.to_datetime(series, errors="coerce", utc=True) | |
| return s.dt.tz_localize(None) | |
| def _norm_borough(series: pd.Series) -> pd.Series: | |
| m = { | |
| "MN": "MANHATTAN", | |
| "BX": "BRONX", | |
| "BK": "BROOKLYN", | |
| "QN": "QUEENS", | |
| "SI": "STATEN ISLAND", | |
| "1": "MANHATTAN", | |
| "2": "BRONX", | |
| "3": "BROOKLYN", | |
| "4": "QUEENS", | |
| "5": "STATEN ISLAND", | |
| } | |
| return series.astype(str).str.strip().str.upper().map(lambda x: m.get(x, x)) | |
| def _full_address( | |
| df: pd.DataFrame, | |
| house_col: str, | |
| street_col: str, | |
| borough_col: str, | |
| zip_col: str | None, | |
| ) -> pd.Series: | |
| def join(row): | |
| parts = [] | |
| h = str(row.get(house_col, "") or "").strip() | |
| s = str(row.get(street_col, "") or "").strip() | |
| b = str(row.get(borough_col, "") or "").strip() | |
| z = str(row.get(zip_col, "") or "").strip() if zip_col else "" | |
| if h: | |
| parts.append(h) | |
| if s: | |
| parts.append(s) | |
| if b: | |
| parts.append(b) | |
| if z: | |
| parts.append(z) | |
| return ", ".join(p for p in parts if p) | |
| return df.apply(join, axis=1) | |
| def _days_ago_cutoff(days: int) -> Tuple[pd.Timestamp, str]: | |
| now = pd.Timestamp.utcnow().tz_localize(None) | |
| cutoff = now - pd.Timedelta(days=days) | |
| cutoff_iso = (cutoff.tz_localize("UTC").isoformat()).replace("+00:00", "Z") | |
| return cutoff, cutoff_iso | |
| def _job_base(job_filing_number: str) -> str: | |
| if not isinstance(job_filing_number, str): | |
| return "" | |
| return job_filing_number.split("-", 1)[0].strip() | |
def _fetch_page_parallel(
    url: str,
    params: Dict[str, Any],
    page: int,
    offset: int,
) -> Tuple[List[Dict[str, Any]], int, float]:
    """Fetch one result page at *offset* (for use from a thread pool).

    Returns (rows, page, elapsed_seconds) so callers can reassemble pages in
    order and track per-page latency. *params* is not mutated.
    """
    paged = dict(params)
    paged["$offset"] = offset
    started = time.time()
    rows = _request(url, paged)
    return rows, page, time.time() - started
| def _applicant_search_url(df: pd.DataFrame) -> pd.Series: | |
| """Build a Google search URL for each applicant to help find contact info.""" | |
| import urllib.parse | |
| def make_url(row): | |
| first = str(row.get("applicant_first_name") or "").strip() | |
| last = str(row.get("applicant_last_name") or "").strip() | |
| firm = str(row.get("filing_representative_business_name") or "").strip() | |
| title = str(row.get("applicant_professional_title") or "").strip() | |
| # Build the most useful query we can from available fields | |
| if first and last: | |
| query = f'"{first} {last}" architect contact email' | |
| elif firm and firm.upper() not in ("", "PREPARER", "N/A"): | |
| query = f'"{firm}" architect engineer contact email' | |
| else: | |
| return "" | |
| return "https://www.google.com/search?q=" + urllib.parse.quote(query) | |
| return df.apply(make_url, axis=1) | |
# ---------- CORE FETCHERS ----------
def _fetch_filings_last_days(days: int) -> pd.DataFrame:
    """Fetch DOB NOW job filings from last N days."""
    cache_key = f"job_filings_{days}"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached

    url = DATASET_URLS["job_filings"]
    cutoff, _ = _days_ago_cutoff(days)
    # Dates are stored as ISO text, so a plain YYYY-MM-DD string comparison
    # works in the SoQL $where clause.
    since = cutoff.strftime("%Y-%m-%d")
    try:
        data = _request(url, {
            "$where": f"filing_date > '{since}'",
            "$limit": 50000,
            "$order": "filing_date DESC",
        })
    except Exception as e:
        print(f"[job_filings] Query failed: {e}")
        # Fallback: drop the date predicate and just take the newest rows.
        print("[job_filings] Retrying without date filter...")
        try:
            data = _request(url, {"$limit": 10000, "$order": "filing_date DESC"})
        except Exception as e2:
            print(f"[job_filings] Retry also failed: {e2}")
            return pd.DataFrame()

    df = pd.DataFrame(data)
    if df.empty:
        return df
    print(f"[job_filings] Got {len(df)} rows")

    # Canonical borough names, then restrict to the configured boroughs.
    if "borough" in df.columns:
        df["borough"] = _norm_borough(df["borough"])
        df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()

    df["full_address"] = _full_address(df, "house_no", "street_name", "borough", "zip")
    # Google search link per applicant for contact-info lookup.
    df["applicant_search"] = _applicant_search_url(df)

    if "filing_date" in df.columns:
        df["filing_date"] = _to_dt_naive(df["filing_date"])
        # Re-apply the window client-side in case the fallback query ran.
        df = df[df["filing_date"] >= cutoff].copy()

    _set_cached(cache_key, df)
    return df
def _fetch_permits_last_days(days: int) -> pd.DataFrame:
    """Fetch DOB NOW approved permits from last N days."""
    cache_key = f"permit_issuance_{days}"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached

    url = DATASET_URLS["permit_issuance"]
    cutoff, _ = _days_ago_cutoff(days)
    since = cutoff.strftime("%Y-%m-%d")
    try:
        data = _request(url, {
            "$where": f"approved_date > '{since}'",
            "$limit": 50000,
            "$order": "approved_date DESC",
        })
    except Exception as e:
        print(f"[permit_issuance] Query failed: {e}")
        print("[permit_issuance] Retrying without date filter...")
        try:
            data = _request(url, {"$limit": 10000, "$order": "approved_date DESC"})
        except Exception as e2:
            print(f"[permit_issuance] Retry also failed: {e2}")
            return pd.DataFrame()

    df = pd.DataFrame(data)
    if df.empty:
        return df
    print(f"[permit_issuance] Got {len(df)} rows")

    # Canonical borough names, then restrict to the configured boroughs.
    if "borough" in df.columns:
        df["borough"] = _norm_borough(df["borough"])
        df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()

    if "approved_date" in df.columns:
        # Expose the approval date under the shared "filing_date" column name.
        df["filing_date"] = _to_dt_naive(df["approved_date"])
        # Re-apply the window client-side in case the fallback query ran.
        df = df[df["filing_date"] >= cutoff].copy()

    # This dataset uses legacy BIS-style column names ("house__", "zip_code").
    df["full_address"] = _full_address(df, "house__", "street_name", "borough", "zip_code")
    _set_cached(cache_key, df)
    return df
def _fetch_electrical_last_days(days: int) -> pd.DataFrame:
    """Fetch DOB NOW electrical permits from last N days."""
    cache_key = f"electrical_permits_{days}"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached

    url = DATASET_URLS["electrical_permits"]
    cutoff, _ = _days_ago_cutoff(days)
    since = cutoff.strftime("%Y-%m-%d")
    try:
        data = _request(url, {
            "$where": f"filing_date > '{since}'",
            "$limit": 50000,
            "$order": "filing_date DESC",
        })
    except Exception as e:
        print(f"[electrical_permits] Query failed: {e}")
        print("[electrical_permits] Retrying without date filter...")
        try:
            data = _request(url, {"$limit": 10000, "$order": "filing_date DESC"})
        except Exception as e2:
            print(f"[electrical_permits] Retry also failed: {e2}")
            return pd.DataFrame()

    df = pd.DataFrame(data)
    if df.empty:
        return df
    print(f"[electrical_permits] Got {len(df)} rows")

    # Canonical borough names, then restrict to the configured boroughs.
    if "borough" in df.columns:
        df["borough"] = _norm_borough(df["borough"])
        df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()

    df["full_address"] = _full_address(df, "house_number", "street_name", "borough", "zip_code")

    if "filing_date" in df.columns:
        df["filing_date"] = _to_dt_naive(df["filing_date"])
        # Re-apply the window client-side in case the fallback query ran.
        df = df[df["filing_date"] >= cutoff].copy()

    _set_cached(cache_key, df)
    return df
# ---------- STALLED CONSTRUCTION FETCHERS ----------
def _fetch_stalled_official() -> pd.DataFrame:
    """
    Fetch official DOB Stalled Construction Sites complaints (i296-73x5).
    Strategy:
    1. Paginate through ALL rows (1.4M+)
    2. Deduplicate by complaint_number (keep most recent)
    3. Filter to complaints received in the last 18 months
    4. Filter to allowed boroughs

    Returns an empty DataFrame if the first page fetch fails; partial data
    is kept if a later page fails mid-pagination.
    """
    cache_key = "stalled_official"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    # NOTE: endpoint is hard-coded here rather than read from DATASET_URLS
    # (same URL as DATASET_URLS["stalled_official"]).
    url = "https://data.cityofnewyork.us/resource/i296-73x5.json"
    # Paginate through all data
    all_data = []
    offset = 0
    page_size = 50000
    print(f"[stalled_official] Fetching all rows (paginating by {page_size})...")
    while True:
        params = {
            "$limit": page_size,
            "$offset": offset,
        }
        try:
            data = _request(url, params)
        except Exception as e:
            # Keep whatever pages were already fetched.
            print(f"[stalled_official] API request failed at offset {offset}: {e}")
            break
        if not data:
            break
        all_data.extend(data)
        print(f"[stalled_official] Fetched {len(all_data)} rows so far...")
        # A short page means we reached the end of the dataset.
        if len(data) < page_size:
            break
        offset += page_size
    if not all_data:
        print("[stalled_official] No data returned from API")
        return pd.DataFrame()
    df = pd.DataFrame(all_data)
    print(f"[stalled_official] Total rows fetched: {len(df)}")
    # Deduplicate by complaint_number - keep first occurrence (arbitrary, they should be same complaint)
    if "complaint_number" in df.columns:
        before = len(df)
        df = df.drop_duplicates(subset=["complaint_number"], keep="first")
        print(f"[stalled_official] Deduplicated: {before} -> {len(df)} unique complaints")
    # Parse complaint date; the date/days columns only exist when the source
    # column is present (the later sort is guarded accordingly).
    if "date_complaint_received" in df.columns:
        df["complaint_date"] = pd.to_datetime(df["date_complaint_received"], errors="coerce")
        # Filter to only complaints from the last 18 months
        cutoff_date = pd.Timestamp.today() - pd.DateOffset(months=18)
        before_filter = len(df)
        df = df[df["complaint_date"] >= cutoff_date].copy()
        print(f"[stalled_official] 18-month filter (>= {cutoff_date.date()}): {before_filter} -> {len(df)} rows")
        # Calculate days stalled
        df["days_stalled"] = (pd.Timestamp.today() - df["complaint_date"]).dt.days
    # Map borough to standard names (dataset mixes digit codes and mixed-case names)
    boro_map = {
        "1": "MANHATTAN", "MANHATTAN": "MANHATTAN", "Manhattan": "MANHATTAN",
        "2": "BRONX", "BRONX": "BRONX", "Bronx": "BRONX",
        "3": "BROOKLYN", "BROOKLYN": "BROOKLYN", "Brooklyn": "BROOKLYN",
        "4": "QUEENS", "QUEENS": "QUEENS", "Queens": "QUEENS",
        "5": "STATEN ISLAND", "STATEN ISLAND": "STATEN ISLAND", "Staten Island": "STATEN ISLAND",
    }
    if "borough_name" in df.columns:
        df["borough"] = df["borough_name"].astype(str).str.strip().map(lambda x: boro_map.get(x, x.upper() if isinstance(x, str) else x))
    # Filter to allowed boroughs
    if "borough" in df.columns:
        before_boro = len(df)
        df = df[df["borough"].isin(ALLOWED_BOROUGHS)].copy()
        print(f"[stalled_official] Borough filter: {before_boro} -> {len(df)} rows")
    # Build full address
    # NOTE(review): df.get("borough", "") returns the scalar "" when the
    # borough column is missing, and "".fillna would raise — confirm borough
    # is always present when house_number/street_name are.
    if "house_number" in df.columns and "street_name" in df.columns:
        df["full_address"] = (
            df["house_number"].fillna("").astype(str).str.strip() + " " +
            df["street_name"].fillna("").astype(str).str.title().str.strip() + ", " +
            df.get("borough", "").fillna("").astype(str)
        )
    # Sort by most stalled first (oldest complaint = most days)
    if "days_stalled" in df.columns:
        df = df.sort_values("days_stalled", ascending=False).reset_index(drop=True)
    print(f"[stalled_official] Final: {len(df)} active stalled sites")
    _set_cached(cache_key, df)
    return df
def _fetch_likely_stalled() -> pd.DataFrame:
    """Alias for the official DOB stalled-construction feed.

    Retained as its own entry point (the "likely_stalled" dataset key) so
    existing callers keep working; it simply delegates to
    _fetch_stalled_official().
    """
    return _fetch_stalled_official()
# ---------- DISTRESSED PROPERTIES ----------
def _fetch_hpd_vacate_orders() -> pd.DataFrame:
    """Fetch HPD Vacate Orders - buildings ordered vacated."""
    cache_key = "hpd_vacate_orders"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached

    # Active (un-rescinded) orders issued in the last 18 months (~548 days).
    cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00")
    query = {
        "$where": f"rescinddate IS NULL AND orderdate >= '{cutoff}'",
        "$limit": 50000,
        "$order": "orderdate DESC",
    }
    print(f"[hpd_vacate_orders] Fetching...")
    try:
        rows = _request(DATASET_URLS["hpd_vacate_orders"], query)
    except Exception as e:
        print(f"[hpd_vacate_orders] API error: {e}")
        return pd.DataFrame()

    df = pd.DataFrame(rows)
    if df.empty:
        print("[hpd_vacate_orders] No data returned")
        return df
    print(f"[hpd_vacate_orders] Got {len(df)} rows")

    # Tag rows for the combined "distressed" feed.
    df["distress_type"] = "HPD_VACATE"
    df["distress_date"] = pd.to_datetime(df.get("orderdate"), errors="coerce")

    if "housenumber" in df.columns and "streetname" in df.columns:
        house = df["housenumber"].fillna("").astype(str).str.strip()
        street = df["streetname"].fillna("").astype(str).str.title().str.strip()
        df["full_address"] = house + " " + street

    # This dataset encodes boroughs as digits 1-5.
    if "boro" in df.columns:
        digits = {"1": "MANHATTAN", "2": "BRONX", "3": "BROOKLYN", "4": "QUEENS", "5": "STATEN ISLAND"}
        df["borough"] = df["boro"].astype(str).map(lambda x: digits.get(x, x.upper()))

    _set_cached(cache_key, df)
    return df
def _fetch_dob_ecb_violations() -> pd.DataFrame:
    """Fetch DOB ECB Violations - filter for SWO (Stop Work Order) and WWP (Work Without Permit).

    Two-stage filtering: a server-side $where narrows to recent SWO-ish /
    open violations, then a client-side keyword scan keeps only clearly
    construction-related rows. Returns an empty DataFrame on API failure.
    """
    cache_key = "dob_ecb_violations"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    url = DATASET_URLS["dob_ecb_violations"]
    # Get SWO and WWP violations from last 18 months (~548 days)
    cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00")
    params = {
        "$where": f"issue_date >= '{cutoff}' AND (violation_type LIKE '%SWO%' OR violation_type LIKE '%STOP%' OR infraction_code1 LIKE '%SWO%' OR ecb_violation_status = 'OPEN')",
        "$limit": 50000,
        "$order": "issue_date DESC",
    }
    print(f"[dob_ecb_violations] Fetching SWO/WWP violations...")
    try:
        data = _request(url, params)
    except Exception as e:
        print(f"[dob_ecb_violations] API error: {e}")
        # Try simpler query — fall back to a date-only filter; the keyword
        # pass below still narrows the result.
        try:
            params = {
                "$where": f"issue_date >= '{cutoff}'",
                "$limit": 20000,
                "$order": "issue_date DESC",
            }
            data = _request(url, params)
        except Exception as e2:
            print(f"[dob_ecb_violations] Retry failed: {e2}")
            return pd.DataFrame()
    df = pd.DataFrame(data)
    if df.empty:
        print("[dob_ecb_violations] No data returned")
        return df
    print(f"[dob_ecb_violations] Got {len(df)} rows")
    # Filter for construction-related violations (SWO, WWP, etc.)
    # Row-wise apply is O(rows × keywords) but bounded by the $limit above.
    violation_keywords = ["STOP WORK", "SWO", "WITHOUT PERMIT", "WWP", "ILLEGAL", "UNSAFE"]
    mask = df.apply(lambda row: any(
        kw in str(row.get("violation_type", "")).upper() or
        kw in str(row.get("violation_description", "")).upper() or
        kw in str(row.get("infraction_code1", "")).upper()
        for kw in violation_keywords
    ), axis=1)
    df = df[mask].copy()
    print(f"[dob_ecb_violations] After keyword filter: {len(df)} rows")
    # Normalize into the shared "distressed" schema
    df["distress_type"] = "ECB_VIOLATION"
    df["distress_date"] = pd.to_datetime(df.get("issue_date"), errors="coerce")
    # Build address
    if "house_number" in df.columns and "street_name" in df.columns:
        df["full_address"] = (
            df["house_number"].fillna("").astype(str).str.strip() + " " +
            df["street_name"].fillna("").astype(str).str.title().str.strip()
        )
    # Normalize borough (this dataset encodes boroughs as digits 1-5)
    if "boro" in df.columns:
        boro_map = {"1": "MANHATTAN", "2": "BRONX", "3": "BROOKLYN", "4": "QUEENS", "5": "STATEN ISLAND"}
        df["borough"] = df["boro"].astype(str).map(lambda x: boro_map.get(x, x.upper()))
    _set_cached(cache_key, df)
    return df
def _fetch_vacant_unsecured() -> pd.DataFrame:
    """Fetch 311 complaints about vacant/unsecured buildings - often stalled construction."""
    cache_key = "vacant_unsecured"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached

    # Complaints created in the last 18 months (~548 days).
    cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00")
    query = {
        "$where": f"created_date >= '{cutoff}'",
        "$limit": 50000,
        "$order": "created_date DESC",
    }
    print(f"[vacant_unsecured] Fetching 311 complaints...")
    try:
        rows = _request(DATASET_URLS["vacant_unsecured"], query)
    except Exception as e:
        print(f"[vacant_unsecured] API error: {e}")
        return pd.DataFrame()

    df = pd.DataFrame(rows)
    if df.empty:
        print("[vacant_unsecured] No data returned")
        return df
    print(f"[vacant_unsecured] Got {len(df)} rows")

    # Tag rows for the combined "distressed" feed.
    df["distress_type"] = "VACANT_UNSECURED"
    df["distress_date"] = pd.to_datetime(df.get("created_date"), errors="coerce")

    if "incident_address" in df.columns:
        df["full_address"] = df["incident_address"].fillna("").astype(str).str.title()
    if "borough" in df.columns:
        df["borough"] = df["borough"].astype(str).str.upper()

    _set_cached(cache_key, df)
    return df
def _fetch_dob_complaints() -> pd.DataFrame:
    """Fetch DOB complaints - filter for construction-related (illegal work, unsafe conditions).

    Returns an empty DataFrame on API failure (after one simplified retry).
    """
    cache_key = "dob_complaints_distressed"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    url = DATASET_URLS["dob_complaints"]
    # Get complaints from last 18 months (~548 days)
    cutoff = (datetime.now() - timedelta(days=548)).strftime("%Y-%m-%dT00:00:00")
    # Complaint categories for distressed/stalled:
    # 05 = Illegal Conversion, 45 = Construction, 71 = SRO Work W/O Permit, 83 = Debris/Unsafe
    # NOTE(review): the OR status='OPEN' clause pulls in every open complaint
    # regardless of category — confirm that widening is intended.
    params = {
        "$where": f"date_entered >= '{cutoff}' AND (complaint_category IN ('05', '45', '71', '83') OR status = 'OPEN')",
        "$limit": 30000,
        "$order": "date_entered DESC",
    }
    print(f"[dob_complaints] Fetching construction-related complaints...")
    try:
        data = _request(url, params)
    except Exception as e:
        print(f"[dob_complaints] API error: {e}")
        # Try simpler query — date-only filter as a fallback
        try:
            params = {
                "$where": f"date_entered >= '{cutoff}'",
                "$limit": 20000,
                "$order": "date_entered DESC",
            }
            data = _request(url, params)
        except Exception as e2:
            print(f"[dob_complaints] Retry failed: {e2}")
            return pd.DataFrame()
    df = pd.DataFrame(data)
    if df.empty:
        print("[dob_complaints] No data returned")
        return df
    print(f"[dob_complaints] Got {len(df)} rows")
    # Normalize into the shared "distressed" schema
    df["distress_type"] = "DOB_COMPLAINT"
    df["distress_date"] = pd.to_datetime(df.get("date_entered"), errors="coerce")
    # Build address
    if "house_number" in df.columns and "street_name" in df.columns:
        df["full_address"] = (
            df["house_number"].fillna("").astype(str).str.strip() + " " +
            df["street_name"].fillna("").astype(str).str.title().str.strip()
        )
    # Normalize borough (this dataset mixes names and digit codes)
    boro_map = {
        "MANHATTAN": "MANHATTAN", "1": "MANHATTAN",
        "BRONX": "BRONX", "2": "BRONX",
        "BROOKLYN": "BROOKLYN", "3": "BROOKLYN",
        "QUEENS": "QUEENS", "4": "QUEENS",
        "STATEN ISLAND": "STATEN ISLAND", "5": "STATEN ISLAND",
    }
    if "borough" in df.columns:
        df["borough"] = df["borough"].astype(str).str.upper().map(lambda x: boro_map.get(x, x))
    _set_cached(cache_key, df)
    return df
def _fetch_distressed_properties() -> pd.DataFrame:
    """
    Aggregate distressed properties from multiple sources:
    - HPD Vacate Orders
    - DOB ECB Violations (SWO/WWP)
    - 311 Vacant/Unsecured complaints
    - DOB Complaints (construction-related)
    Cross-reference and score by distress level.

    Returns one row per property (keyed by BIN, falling back to address),
    sorted by distress_score desc, then most recent distress_date.
    """
    cache_key = "distressed_combined"
    cached = _get_cached(cache_key)
    if cached is not None:
        return cached
    print("[distressed] Fetching from all sources...")
    # Fetch all sources in parallel (each fetcher handles its own errors and
    # returns an empty frame on failure).
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        future_vacate = executor.submit(_fetch_hpd_vacate_orders)
        future_ecb = executor.submit(_fetch_dob_ecb_violations)
        future_vacant = executor.submit(_fetch_vacant_unsecured)
        future_complaints = executor.submit(_fetch_dob_complaints)
        df_vacate = future_vacate.result()
        df_ecb = future_ecb.result()
        df_vacant = future_vacant.result()
        df_complaints = future_complaints.result()
    # Standardize columns for each source
    all_dfs = []
    # Common columns we want to keep
    common_cols = ["bin", "bbl", "full_address", "borough", "distress_type", "distress_date"]
    for df, source_name in [
        (df_vacate, "HPD_VACATE"),
        (df_ecb, "ECB_VIOLATION"),
        (df_vacant, "VACANT_UNSECURED"),
        (df_complaints, "DOB_COMPLAINT"),
    ]:
        if df.empty:
            continue
        # Ensure we have common columns so pd.concat aligns cleanly
        for col in common_cols:
            if col not in df.columns:
                df[col] = None
        df["source"] = source_name
        all_dfs.append(df)
    if not all_dfs:
        print("[distressed] No data from any source")
        return pd.DataFrame()
    # Combine all sources
    combined = pd.concat(all_dfs, ignore_index=True)
    print(f"[distressed] Combined total: {len(combined)} rows")
    # Normalize BIN for grouping
    if "bin" in combined.columns:
        combined["bin"] = combined["bin"].fillna("").astype(str).str.strip()
    # Group by property (using BIN or address) and count distress signals
    # Properties with multiple signals are higher priority
    # Create a property key (prefer BIN, fallback to address; BIN "0" is
    # treated as missing)
    combined["property_key"] = combined.apply(
        lambda r: str(r.get("bin", "")).strip() if str(r.get("bin", "")).strip() and str(r.get("bin", "")).strip() != "0"
        else str(r.get("full_address", "")).strip().upper(),
        axis=1
    )
    # Count distress signals per property (distinct types joined for display,
    # raw row count as the score input)
    distress_counts = combined.groupby("property_key").agg({
        "distress_type": lambda x: ", ".join(sorted(set(x))),
        "source": "count",
    }).rename(columns={"source": "distress_count", "distress_type": "distress_types"})
    # Merge counts back
    combined = combined.merge(distress_counts, on="property_key", how="left")
    # Dedupe - keep one row per property with most recent distress date
    combined = combined.sort_values("distress_date", ascending=False)
    combined = combined.drop_duplicates(subset=["property_key"], keep="first")
    # Calculate days since distress
    combined["days_since_distress"] = (pd.Timestamp.today() - combined["distress_date"]).dt.days
    # Score: more distress signals = higher score
    combined["distress_score"] = combined["distress_count"].fillna(1).astype(int)
    # Sort by score (desc) then by date (most recent first within same score)
    combined = combined.sort_values(
        ["distress_score", "distress_date"],
        ascending=[False, False]
    ).reset_index(drop=True)
    # Filter to allowed boroughs
    if "borough" in combined.columns:
        combined = combined[combined["borough"].isin(ALLOWED_BOROUGHS)].copy()
    print(f"[distressed] Final: {len(combined)} unique distressed properties")
    _set_cached(cache_key, combined)
    return combined
# ---------- LEADS UNPERMITTED ----------
def _fetch_leads_unpermitted(days: int = DEFAULT_DAYS_WINDOW) -> Tuple[pd.DataFrame, float]:
    """
    Find filings that don't have corresponding permits yet.
    Cross-reference job_filings with permit_issuance.

    Returns (unpermitted_filings, elapsed_seconds).
    """
    started = time.time()

    filings = _fetch_filings_last_days(days)
    if filings.empty:
        return pd.DataFrame(), time.time() - started

    permits = _fetch_permits_last_days(days)

    # Base job number (text before the first '-') is the join key.
    if "job_filing_number" in filings.columns:
        filings["_job_base"] = filings["job_filing_number"].apply(_job_base)
    else:
        filings["_job_base"] = ""

    permitted: set = set()
    if not permits.empty and "job__" in permits.columns:
        permitted = set(permits["job__"].dropna().astype(str).str.strip())

    # Keep filings whose base job number never appears among issued permits.
    leads = filings.loc[~filings["_job_base"].isin(permitted)].copy()
    leads.drop(columns=["_job_base"], inplace=True, errors="ignore")
    return leads, time.time() - started
# ---------- public API ----------
class SocrataClient:
    """Thin facade over the module-level Socrata fetchers."""

    def __init__(self) -> None:
        # Purely a warning — requests still go out without a token.
        if not SOCRATA_APP_TOKEN:
            print("⚠️ SOCRATA_APP_TOKEN not set – API may cap at 1,000 rows.")

    def fetch_dataset_last_n_days(
        self,
        dataset_key: str,
        days: int,
    ) -> Tuple[pd.DataFrame, float]:
        """Fetch *dataset_key* and return (DataFrame, elapsed_seconds).

        *days* applies only to the windowed datasets; the stalled/distressed
        feeds use their own internal time windows.

        Raises:
            ValueError: if *dataset_key* is not a known dataset.
        """
        started = time.time()
        windowed = {
            "job_filings": _fetch_filings_last_days,
            "permit_issuance": _fetch_permits_last_days,
            "electrical_permits": _fetch_electrical_last_days,
        }
        unwindowed = {
            "stalled_official": _fetch_stalled_official,
            "likely_stalled": _fetch_likely_stalled,
            "distressed_properties": _fetch_distressed_properties,
        }
        if dataset_key in windowed:
            df = windowed[dataset_key](days)
        elif dataset_key in unwindowed:
            df = unwindowed[dataset_key]()
        else:
            raise ValueError(f"Unknown dataset: {dataset_key}")
        return df, time.time() - started

    def fetch_leads_unpermitted(
        self,
        days: int = DEFAULT_DAYS_WINDOW,
    ) -> Tuple[pd.DataFrame, float]:
        """Delegate to the module-level unpermitted-leads cross-reference."""
        return _fetch_leads_unpermitted(days)