| """
|
| Auto Discovery Service
|
| ----------------------
|
| جستجوی خودکار منابع API رایگان با استفاده از موتور جستجوی DuckDuckGo و
|
| تحلیل خروجی توسط مدلهای Hugging Face.
|
| """
|
|
|
from __future__ import annotations

import asyncio
import inspect
import json
import logging
import os
import re
from contextlib import AsyncExitStack
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
|
|
|
| try:
|
| from duckduckgo_search import AsyncDDGS
|
| except ImportError:
|
| AsyncDDGS = None
|
|
|
| try:
|
| from huggingface_hub import InferenceClient
|
| except ImportError:
|
| InferenceClient = None
|
|
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
@dataclass
class DiscoveryResult:
    """Final outcome of one search-and-analysis pass for a single provider."""

    # Normalized snake_case identifier (see AutoDiscoveryService._normalize_id).
    provider_id: str
    # Human-readable provider name.
    name: str
    # Category label, e.g. "market_data", "defi", or "custom".
    category: str
    # Root URL of the API service.
    base_url: str
    # True when the API requires a token/key.
    requires_auth: bool
    # Short free-text description of the provider.
    description: str
    # URL of the page the candidate was discovered from.
    source_url: str
|
|
|
|
|
class AutoDiscoveryService:
    """
    Automatic resource discovery service.

    The service:
    1. Collects search results about free APIs using DuckDuckGo.
    2. Sends the result text to a Hugging Face model to obtain structured
       provider suggestions (falls back to rule-based filtering).
    3. Adds validated suggestions to the ResourceManager and, when anything
       was persisted, refreshes the ProviderManager.
    """

    # Queries used when AUTO_DISCOVERY_QUERIES is not set in the environment.
    DEFAULT_QUERIES: List[str] = [
        "free cryptocurrency market data api",
        "open blockchain explorer api free tier",
        "free defi protocol api documentation",
        "open source sentiment analysis crypto api",
        "public nft market data api no api key",
    ]

    def __init__(
        self,
        resource_manager,
        provider_manager,
        enabled: bool = True,
    ):
        """Configure the service from env vars and eagerly build the HF client.

        Args:
            resource_manager: Store used to validate/persist discovered providers.
            provider_manager: Manager whose config is reloaded after persisting.
            enabled: Constructor-level switch; also gated by ENABLE_AUTO_DISCOVERY.
        """
        self.resource_manager = resource_manager
        self.provider_manager = provider_manager
        # Both the constructor flag and the env toggle must allow the service.
        self.enabled = enabled and os.getenv("ENABLE_AUTO_DISCOVERY", "true").lower() == "true"
        self.interval_seconds = int(os.getenv("AUTO_DISCOVERY_INTERVAL_SECONDS", "43200"))
        self.hf_model = os.getenv("AUTO_DISCOVERY_HF_MODEL", "HuggingFaceH4/zephyr-7b-beta")
        self.max_candidates_per_query = int(os.getenv("AUTO_DISCOVERY_MAX_RESULTS", "8"))
        self._hf_client: Optional[InferenceClient] = None
        self._running_task: Optional[asyncio.Task] = None
        self._last_run_summary: Optional[Dict[str, Any]] = None

        if not self.enabled:
            logger.info("Auto discovery service disabled via configuration.")
            return

        if AsyncDDGS is None:
            logger.warning("duckduckgo-search package not available. Disabling auto discovery.")
            self.enabled = False
            return

        if InferenceClient is None:
            logger.warning("huggingface-hub package not available. Auto discovery will use fallback heuristics.")
        else:
            # Resolve the token from env first, then project settings.
            # NOTE: never fall back to a placeholder token — sending a fake
            # credential makes every request fail auth; token=None is valid
            # for public models.
            hf_token = os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN") or self._settings_hf_token()
            try:
                self._hf_client = InferenceClient(model=self.hf_model, token=hf_token)
                logger.info("Auto discovery Hugging Face client initialized with model %s", self.hf_model)
            except Exception as exc:
                logger.error("Failed to initialize Hugging Face client: %s", exc)
                self._hf_client = None

    @staticmethod
    def _settings_hf_token() -> Optional[str]:
        """Best-effort read of the HF token from project settings.

        Returns None (rather than raising) when the config module or the
        attribute is unavailable, so a missing config cannot break __init__.
        """
        try:
            from config import get_settings
            return get_settings().hf_token or None
        except Exception:
            return None

    async def start(self):
        """Start the service and spawn the periodic discovery loop."""
        if not self.enabled:
            return
        # Idempotent: do not spawn a second loop while one is alive.
        if self._running_task and not self._running_task.done():
            return
        self._running_task = asyncio.create_task(self._run_periodic_loop())
        logger.info("Auto discovery service started with interval %s seconds", self.interval_seconds)

    async def stop(self):
        """Cancel the periodic loop and wait for it to finish."""
        if self._running_task:
            self._running_task.cancel()
            try:
                await self._running_task
            except asyncio.CancelledError:
                pass
            self._running_task = None
        logger.info("Auto discovery service stopped.")

    async def trigger_manual_discovery(self) -> Dict[str, Any]:
        """Run a single discovery cycle on demand.

        Returns:
            {"status": "disabled"} when the service is off, otherwise
            {"status": "completed", "summary": <cycle summary dict>}.
        """
        if not self.enabled:
            return {"status": "disabled"}
        summary = await self._run_discovery_cycle()
        return {"status": "completed", "summary": summary}

    def get_status(self) -> Dict[str, Any]:
        """Report current configuration and the summary of the last run."""
        return {
            "enabled": self.enabled,
            "model": self.hf_model if self._hf_client else None,
            "interval_seconds": self.interval_seconds,
            "last_run": self._last_run_summary,
        }

    async def _run_periodic_loop(self):
        """Run discovery cycles forever, sleeping `interval_seconds` between them."""
        while self.enabled:
            try:
                await self._run_discovery_cycle()
            except Exception as exc:
                # Keep the loop alive across transient failures.
                logger.exception("Auto discovery cycle failed: %s", exc)
            await asyncio.sleep(self.interval_seconds)

    async def _run_discovery_cycle(self) -> Dict[str, Any]:
        """Execute one full cycle: search, analyze, persist; return a summary."""
        # Timezone-aware UTC; datetime.utcnow() is deprecated.
        started_at = datetime.now(timezone.utc).isoformat()
        candidates = await self._gather_candidates()
        structured = await self._infer_candidates(candidates)
        persisted = await self._persist_candidates(structured)

        summary = {
            "started_at": started_at,
            "finished_at": datetime.now(timezone.utc).isoformat(),
            "candidates_seen": len(candidates),
            "suggested": len(structured),
            "persisted": len(persisted),
            "persisted_ids": [item.provider_id for item in persisted],
        }
        self._last_run_summary = summary

        logger.info(
            "Auto discovery cycle completed. candidates=%s suggested=%s persisted=%s",
            summary["candidates_seen"],
            summary["suggested"],
            summary["persisted"],
        )
        return summary

    async def _gather_candidates(self) -> List[Dict[str, Any]]:
        """Collect raw search-engine hits for every configured query.

        Supports multiple duckduckgo-search API generations: async generator
        'atext', or 'text' returning a coroutine, async iterator, or iterable.
        """
        if not self.enabled or AsyncDDGS is None:
            return []

        results: List[Dict[str, Any]] = []
        queries_env = os.getenv("AUTO_DISCOVERY_QUERIES")
        if queries_env:
            query_list = [q.strip() for q in queries_env.split(";") if q.strip()]
        else:
            query_list = self.DEFAULT_QUERIES

        try:
            # AsyncExitStack owns the client: its __aexit__ closes it on every
            # path, so no extra finally/close bookkeeping is needed here.
            async with AsyncExitStack() as stack:
                ddgs = await stack.enter_async_context(AsyncDDGS())

                for query in query_list:
                    try:
                        # Newer releases expose an async generator 'atext'.
                        text_method = getattr(ddgs, "atext", None)
                        if callable(text_method):
                            async for entry in text_method(
                                query,
                                max_results=self.max_candidates_per_query,
                            ):
                                results.append(self._format_search_entry(query, entry))
                            continue

                        # Older releases expose 'text' with varying semantics.
                        text_method = getattr(ddgs, "text", None)
                        if not callable(text_method):
                            raise AttributeError("AsyncDDGS has no 'atext' or 'text' method")

                        search_result = text_method(
                            query,
                            max_results=self.max_candidates_per_query,
                        )
                        if inspect.isawaitable(search_result):
                            search_result = await search_result

                        if hasattr(search_result, "__aiter__"):
                            async for entry in search_result:
                                results.append(self._format_search_entry(query, entry))
                        else:
                            iterable = (
                                search_result
                                if isinstance(search_result, list)
                                else list(search_result or [])
                            )
                            for entry in iterable:
                                results.append(self._format_search_entry(query, entry))
                    except Exception as exc:
                        # A per-query failure is usually rate limiting; abort
                        # the remaining queries instead of hammering the engine.
                        logger.warning(
                            "Failed to fetch results for query '%s': %s. Skipping remaining queries this cycle.",
                            query,
                            exc,
                        )
                        break
        except Exception as exc:
            logger.warning(
                "DuckDuckGo auto discovery unavailable (%s). Skipping discovery cycle.",
                exc,
            )

        return results

    @staticmethod
    def _format_search_entry(query: str, entry: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize one raw DuckDuckGo hit into the internal candidate shape."""
        return {
            "query": query,
            "title": entry.get("title", ""),
            "url": entry.get("href") or entry.get("url") or "",
            "snippet": entry.get("body", ""),
        }

    async def _infer_candidates(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Analyze raw hits with the Hugging Face model, or simple heuristics.

        Falls back to `_rule_based_filter` when no client is configured or
        inference fails.
        """
        if not candidates:
            return []

        if self._hf_client:
            prompt = self._build_prompt(candidates)
            try:
                # text_generation is blocking; run it off the event loop.
                response = await asyncio.to_thread(
                    self._hf_client.text_generation,
                    prompt,
                    max_new_tokens=512,
                    temperature=0.1,
                    top_p=0.9,
                    repetition_penalty=1.1,
                )
                return self._parse_model_response(response)
            except Exception as exc:
                logger.warning("Hugging Face inference failed: %s", exc)

        return self._rule_based_filter(candidates)

    def _build_prompt(self, candidates: List[Dict[str, Any]]) -> str:
        """Build the extraction prompt for the LLM from the search hits."""
        context_lines = []
        for idx, item in enumerate(candidates, start=1):
            context_lines.append(
                f"{idx}. Title: {item.get('title')}\n"
                f"   URL: {item.get('url')}\n"
                f"   Snippet: {item.get('snippet')}"
            )

        return (
            "You are an expert agent that extracts publicly accessible API providers for cryptocurrency, "
            "blockchain, DeFi, sentiment, NFT or analytics data. From the context entries, select candidates "
            "that represent real API services which are freely accessible (free tier or free plan). "
            "Return ONLY a JSON array. Each entry MUST include keys: "
            "id (lowercase snake_case), name, base_url, category (one of: market_data, blockchain_explorers, "
            "defi, sentiment, nft, analytics, news, rpc, huggingface, whale_tracking, onchain_analytics, custom), "
            "requires_auth (boolean), description (short string), source_url (string). "
            "Do not invent APIs. Ignore SDKs, articles, or paid-only services. "
            "If no valid candidate exists, return an empty JSON array.\n\n"
            "Context:\n"
            + "\n".join(context_lines)
        )

    def _parse_model_response(self, response: str) -> List[Dict[str, Any]]:
        """Extract the first JSON array from the model response; [] on failure."""
        try:
            # Greedy DOTALL match grabs the outermost [...] span.
            match = re.search(r"\[.*\]", response, re.DOTALL)
            if not match:
                logger.debug("Model response did not contain JSON array.")
                return []
            data = json.loads(match.group(0))
            if isinstance(data, list):
                # Keep only dict entries; drop scalars the model may emit.
                return [item for item in data if isinstance(item, dict)]
            return []
        except json.JSONDecodeError:
            logger.debug("Failed to decode model JSON response.")
            return []

    def _rule_based_filter(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Heuristic fallback filter used when no model is available."""
        structured: List[Dict[str, Any]] = []
        for item in candidates:
            url = item.get("url", "")
            snippet = (item.get("snippet") or "").lower()
            title = (item.get("title") or "").lower()
            # Skip repo pages and entries that never mention an API.
            if not url or "github" in url:
                continue
            if "api" not in title and "api" not in snippet:
                continue
            # Skip obviously paid-only offerings.
            if any(keyword in snippet for keyword in ["pricing", "paid plan", "enterprise only"]):
                continue
            provider_id = self._normalize_id(item.get("title") or url)
            structured.append(
                {
                    "id": provider_id,
                    "name": item.get("title") or provider_id,
                    "base_url": url,
                    "category": "custom",
                    "requires_auth": "token" in snippet or "apikey" in snippet,
                    "description": item.get("snippet", ""),
                    "source_url": url,
                }
            )
        return structured

    async def _persist_candidates(self, structured: List[Dict[str, Any]]) -> List[DiscoveryResult]:
        """Validate and store new provider suggestions.

        Skips entries with non-HTTP(S) URLs, duplicates, and entries the
        ResourceManager rejects. Reloads the ProviderManager config when at
        least one provider was persisted.
        """
        persisted: List[DiscoveryResult] = []
        if not structured:
            return persisted

        for entry in structured:
            provider_id = self._normalize_id(entry.get("id") or entry.get("name"))
            base_url = entry.get("base_url", "")

            if not base_url.startswith(("http://", "https://")):
                continue

            # Already known -> nothing to do.
            if self.resource_manager.get_provider(provider_id):
                continue

            provider_data = {
                "id": provider_id,
                "name": entry.get("name", provider_id),
                "category": entry.get("category", "custom"),
                "base_url": base_url,
                "requires_auth": bool(entry.get("requires_auth")),
                # Discovered providers start with low priority/weight until vetted.
                "priority": 4,
                "weight": 40,
                "notes": entry.get("description", ""),
                "docs_url": entry.get("source_url", base_url),
                "free": True,
                "endpoints": {},
            }

            is_valid, message = self.resource_manager.validate_provider(provider_data)
            if not is_valid:
                logger.debug("Skipping provider %s: %s", provider_id, message)
                continue

            # ResourceManager calls are synchronous; keep them off the loop.
            await asyncio.to_thread(self.resource_manager.add_provider, provider_data)
            persisted.append(
                DiscoveryResult(
                    provider_id=provider_id,
                    name=provider_data["name"],
                    category=provider_data["category"],
                    base_url=provider_data["base_url"],
                    requires_auth=provider_data["requires_auth"],
                    description=provider_data["notes"],
                    source_url=provider_data["docs_url"],
                )
            )

        if persisted:
            await asyncio.to_thread(self.resource_manager.save_resources)
            await asyncio.to_thread(self.provider_manager.load_config)
            logger.info("Persisted %s new providers.", len(persisted))

        return persisted

    @staticmethod
    def _normalize_id(raw_value: Optional[str]) -> str:
        """Convert an arbitrary name/URL into a snake_case provider id."""
        if not raw_value:
            return "unknown_provider"
        cleaned = re.sub(r"[^a-zA-Z0-9]+", "_", raw_value).strip("_").lower()
        return cleaned or "unknown_provider"
|
|
|
|
|