# Spaces: Running
# File size: 8,289 Bytes
# Revision: 745ead6
import os
import json
import logging
import hashlib
import time
import random
from datetime import datetime, timedelta, timezone
from typing import Optional
from pathlib import Path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The cache directory sits next to the directory that contains this file;
# it is created at import time if it does not exist yet.
CACHE_DIR = Path(__file__).parent.parent.joinpath("cache")
CACHE_DIR.mkdir(exist_ok=True)

# JSON file holding the last fetched ZUS calls, and how long it stays valid.
ZUS_CACHE_FILE = CACHE_DIR.joinpath("zus_nabory.json")
ZUS_CACHE_TTL_HOURS = 24

# ZUS usually runs the competition for co-financing occupational health and
# safety (BHP) improvements; this is the page that is scraped.
ZUS_BHP_URL = "https://bip.zus.pl/konkurs-bhp"
class ZUSClient:
    """Client that fetches the current ZUS support programmes.

    The main target is the ZUS competition for co-financing occupational
    health and safety (BHP) improvements. Results are scraped through the
    Firecrawl API, parsed with an LLM, and cached on disk as JSON for
    ``ZUS_CACHE_TTL_HOURS`` hours. When nothing usable can be fetched, a
    single hard-coded fallback entry is returned instead.
    """

    # NOTE(review): the Polish runtime strings in this class look mis-encoded
    # (UTF-8 bytes decoded with a different code page). They are kept exactly
    # as found because other code may compare against them; confirm against
    # the original file before normalising.

    # Total number of Firecrawl requests made before giving up.
    _MAX_FETCH_ATTEMPTS = 3

    def _load_cache(self) -> Optional[dict]:
        """Return the cached payload if it is present, well-formed and fresh.

        Returns:
            The decoded cache dict (``fetched_at`` plus a list under
            ``nabory``), or ``None`` when the file is missing, unreadable,
            malformed, or older than ``ZUS_CACHE_TTL_HOURS``.
        """
        if not ZUS_CACHE_FILE.exists():
            return None
        try:
            with open(ZUS_CACHE_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            # Reject payloads the caller cannot use; otherwise a truncated or
            # hand-edited cache would surface later as a KeyError.
            if not isinstance(data, dict) or not isinstance(data.get("nabory"), list):
                raise ValueError("cache payload has no 'nabory' list")
            fetched_at = datetime.fromisoformat(data.get("fetched_at", "2000-01-01"))
            # Naive timestamps are interpreted as UTC so the subtraction below
            # never mixes naive and aware datetimes.
            if fetched_at.tzinfo is None:
                fetched_at = fetched_at.replace(tzinfo=timezone.utc)
            age = datetime.now(timezone.utc) - fetched_at
            if age < timedelta(hours=ZUS_CACHE_TTL_HOURS):
                return data
        except Exception as e:
            # Best effort: any cache problem is logged and treated as a miss.
            logger.warning(f"B艂膮d odczytu ZUS cache: {e}")
        return None

    def _save_cache(self, nabory: list) -> None:
        """Persist ``nabory`` together with the current UTC timestamp.

        The payload is written to a temporary sibling file and then moved
        over the cache file, so a crash mid-write cannot leave truncated
        JSON behind. Failures are logged and otherwise ignored.

        Args:
            nabory: JSON-serialisable list of call dicts to store.
        """
        tmp_path = ZUS_CACHE_FILE.with_name(ZUS_CACHE_FILE.name + ".tmp")
        try:
            payload = {
                "fetched_at": datetime.now(timezone.utc).isoformat(),
                "nabory": nabory,
            }
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, ZUS_CACHE_FILE)
        except Exception as e:
            # Caching is optional; never let it break the caller.
            logger.warning(f"B艂膮d zapisu ZUS cache: {e}")

    async def _fetch_live(self) -> list:
        """Fetch the ZUS calls live, falling back to a static entry.

        Uses Firecrawl when ``FIRECRAWL_API_KEY`` is set, passes the result
        through ``filter_outdated_grants``, and returns the hard-coded
        fallback when no active call remains.

        Returns:
            A non-empty list of call dicts.
        """
        from core.date_utils import filter_outdated_grants

        logger.info("Rozpoczynam pobieranie na 偶ywo nabor贸w ZUS...")
        api_key = os.getenv("FIRECRAWL_API_KEY")
        all_grants = []
        if api_key:
            logger.info("U偶ywam Firecrawl do omini臋cia zabezpiecze艅 ZUS (BIP)...")
            all_grants = await self._scrape_with_retries(api_key)
        else:
            logger.warning("Brak klucza FIRECRAWL_API_KEY. Brak nabor贸w z ZUS (u偶yto fallbacku).")

        # Drop calls whose dates are already in the past.
        active_grants = filter_outdated_grants(all_grants) if all_grants else []
        if not active_grants:
            active_grants = self._get_verified_zus_bip_fallback()
        return active_grants

    async def _scrape_with_retries(self, api_key: str) -> list:
        """Scrape the ZUS page through Firecrawl with retry and backoff.

        The blocking ``requests`` call runs in the default executor and all
        waits use ``asyncio.sleep``, so the event loop is never blocked.
        Any 200 response ends the loop, even when it carries no markdown.
        No backoff sleep is performed after the final attempt.

        Args:
            api_key: Firecrawl API key sent as a bearer token.

        Returns:
            The parsed list of calls, or ``[]`` when every attempt failed or
            the page produced no markdown.
        """
        import asyncio
        import functools

        import requests

        loop = asyncio.get_running_loop()
        send_request = functools.partial(
            requests.post,
            "https://api.firecrawl.dev/v1/scrape",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"url": ZUS_BHP_URL, "formats": ["markdown"]},
            timeout=35.0,
        )

        for attempt in range(self._MAX_FETCH_ATTEMPTS):
            is_last_attempt = attempt == self._MAX_FETCH_ATTEMPTS - 1
            try:
                if attempt > 0:
                    # Jittered delay before every retry.
                    await asyncio.sleep(2.0 + random.uniform(0.7, 2.2) * attempt)
                resp = await loop.run_in_executor(None, send_request)
                if resp.status_code == 200:
                    data = resp.json()
                    # Tolerate '"data": null' as well as a missing key.
                    md = (data.get("data") or {}).get("markdown") or ""
                    if not md:
                        return []
                    grants = await self._parse_firecrawl_markdown(md)
                    logger.info(f"Firecrawl zwr贸ci艂 {len(grants)} nabor贸w z ZUS.")
                    return grants
                if resp.status_code in (429, 403):
                    logger.warning(f"Rate limit (ZUS BIP) status {resp.status_code}. Backoff applied.")
                    if not is_last_attempt:
                        await asyncio.sleep(5.0 + attempt * 2)
                    continue
                logger.warning(f"B艂膮d Firecrawl API (ZUS): {resp.status_code} - {resp.text}")
            except Exception as e:
                # Network boundary: log and retry rather than propagate.
                logger.error(f"Wyj膮tek podczas wywo艂ania Firecrawl API (ZUS) attempt {attempt+1}: {e}")
                if not is_last_attempt:
                    await asyncio.sleep(1.8 + attempt)
        return []

    def _get_verified_zus_bip_fallback(self) -> list:
        """Production fallback for ZUS BIP client.

        Returns:
            A one-element list with a static description of the ZUS BHP
            competition, stamped with the current UTC time.
        """
        now = datetime.now(timezone.utc).isoformat()
        return [
            {
                "id": "zusc_fallback_1",
                "name": "Konkurs na dofinansowanie poprawy bezpiecze艅stwa i higieny pracy (BHP)",
                "program": "ZUS - Konkurs BHP",
                "type": "Dotacja BHP",
                "status": "active",
                "url": ZUS_BHP_URL,
                "deadline": "zale偶ny od edycji (sprawd藕 BIP ZUS)",
                "max_dofinansowanie_pln": 300000,
                "min_dofinansowanie_pln": 10000,
                "dofinansowanie_pct_max": 80,
                "eligible_regions": ["Ca艂a Polska"],
                "eligible_company_sizes": ["mikro", "ma艂e", "艣rednie", "du偶e"],
                "description": "Dofinansowanie dzia艂a艅 BHP. Og艂aszane okresowo poprzez BIP ZUS.",
                "legal_source": "Regulamin Konkursu ZUS na dofinansowanie BHP",
                "source": "zus_client_verified_fallback",
                "fetched_at": now,
                "instrument_type": "grant",
            },
        ]

    async def _parse_firecrawl_markdown(self, md: str) -> list:
        """Extract the list of ZUS calls from scraped markdown using an LLM.

        Only the first 10000 characters of ``md`` are sent to the model.
        Entries with a blank name and repeated names (which would share an
        id) are skipped.

        Args:
            md: Markdown text of the scraped page.

        Returns:
            A list of call dicts, or ``[]`` if anything fails.
        """
        try:
            from core.llm_router import get_llm
            from pydantic import BaseModel, Field
            from typing import List

            class Grant(BaseModel):
                name: str = Field(description="Tytu艂 konkursu/naboru ZUS")
                deadline: str = Field(default="", description="Termin sk艂adania wniosk贸w (deadline) w formacie YYYY-MM-DD. Je艣li brak, zostaw puste.")

            class GrantsList(BaseModel):
                grants: List[Grant]

            llm = get_llm("fast").with_structured_output(GrantsList)
            md_subset = md[:10000]
            prompt = f"Wydob膮d藕 list臋 aktualnych konkurs贸w lub dofinansowa艅 ZUS z poni偶szego tekstu Markdown:\n\n{md_subset}"
            result = await llm.ainvoke(prompt)

            fetched_at = datetime.now(timezone.utc).isoformat()
            seen_ids = set()
            nabory = []
            for g in result.grants:
                if not g.name.strip():
                    continue
                # The id is derived from the name only, so equal names collide.
                uid = hashlib.md5(g.name.encode()).hexdigest()[:12]
                if uid in seen_ids:
                    continue
                seen_ids.add(uid)
                nabory.append({
                    "id": uid,
                    "name": g.name,
                    "program": "ZUS",
                    "type": "Bezpiecze艅stwo pracy",
                    "status": "active",
                    "url": ZUS_BHP_URL,
                    "deadline": g.deadline,
                    "max_dofinansowanie_pln": 300000,
                    "min_dofinansowanie_pln": 10000,
                    "dofinansowanie_pct_max": 80,
                    "eligible_regions": ["Ca艂a Polska"],
                    "eligible_company_sizes": ["mikro", "ma艂e", "艣rednie", "du偶e"],
                    "description": "Program wsparcia ZUS dla p艂atnik贸w sk艂adek na inwestycje zmniejszaj膮ce ryzyko wypadk贸w przy pracy (BHP).",
                    "legal_source": "Regulamin Konkursu na dofinansowanie przez ZUS",
                    "source": "zus_scrape",
                    "fetched_at": fetched_at,
                })
            return nabory
        except Exception as e:
            # LLM / schema failures degrade to "nothing parsed".
            logger.warning(f"B艂膮d parsowania markdowna z LLM (ZUS): {e}")
            return []

    def _enrich_urls(self, nabory: list) -> None:
        """Add ``official_doc_url`` and ``eurlex_url`` to entries lacking them.

        Mutates the dicts in ``nabory`` in place. Existing values are never
        overwritten. A missing or ``None`` name yields an empty search query
        instead of raising.

        Args:
            nabory: List of call dicts to enrich.
        """
        import urllib.parse

        for n in nabory:
            name = n.get("name") or ""
            if "official_doc_url" not in n:
                query = urllib.parse.quote(str(name))
                n["official_doc_url"] = f"https://bip.zus.pl/wyszukiwarka?query={query}"
            if "eurlex_url" not in n:
                n["eurlex_url"] = ""  # ZUS has no link to EU law

    async def get_active_nabory(self, force_refresh: bool = False) -> list:
        """Return the active ZUS calls, from cache when possible.

        Args:
            force_refresh: When true, skip the cache and fetch live.

        Returns:
            List of call dicts, each carrying ``official_doc_url`` and
            ``eurlex_url``.
        """
        if not force_refresh:
            cached = self._load_cache()
            if cached is not None:
                nabory = cached["nabory"]
                self._enrich_urls(nabory)
                return nabory
        nabory = await self._fetch_live()
        self._enrich_urls(nabory)
        self._save_cache(nabory)
        return nabory
# Module-level ZUSClient instance, created when this module is imported.
zus_client = ZUSClient()
|