# insighthub-backend/app/datasets/base_loader.py
# Commit 72d2ab9: "initial deploy"
"""
Base interface for every dataset loader.

Each loader receives a DatasetMeta and returns an iterator that yields rows
normalized to the canonical shape:
{
    "text": str,            # original text (required)
    "created_at": str,      # ISO timestamp or None
    "user_id": str,         # user identifier or None
    "user_handle": str,     # @handle or None
    "user_followers": int,
    "likes": int,
    "retweets": int,
    "replies": int,
    "lang": str,
    "label": Any,           # label taken from the dataset, if present
    "source_dataset": str   # id of the dataset (added automatically)
}
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from itertools import islice
from typing import Any, Dict, Iterator, Optional

from app.datasets.registry import DatasetMeta, FieldMapping
# Keys of a row after normalization, in canonical order.
CANONICAL_KEYS = [
    # content and timestamp
    "text", "created_at",
    # author
    "user_id", "user_handle", "user_followers",
    # engagement counters
    "likes", "retweets", "replies",
    # language, label and provenance
    "lang", "label", "source_dataset",
]
def to_canonical(raw: Dict[str, Any], meta: DatasetMeta) -> Optional[Dict[str, Any]]:
    """
    Convert one row from the dataset's native shape into the canonical shape.

    Args:
        raw: Source row, keyed by the dataset's own column names.
        meta: Dataset descriptor. ``meta.fields`` names the source column for
            each canonical key, ``meta.language`` is the fallback for "lang",
            and ``meta.id`` is written to "source_dataset".

    Returns:
        The canonical row, or None when the text is missing, is not a ``str``,
        or is blank after stripping whitespace.
    """
    fm: FieldMapping = meta.fields

    text = raw.get(fm.text)
    # A non-string or whitespace-only text makes the whole row unusable.
    if not isinstance(text, str) or not text.strip():
        return None

    def _get(field: Optional[str], default: Any = None) -> Any:
        # An unmapped field (None or empty name) has no source column to read.
        if not field:
            return default
        # A column that is present but holds None counts as missing, so the
        # default (e.g. the dataset-level language) still applies.
        value = raw.get(field)
        return default if value is None else value

    return {
        "text": text.strip(),
        "created_at": _get(fm.created_at),
        "user_id": _get(fm.user_id),
        "user_handle": _get(fm.user_handle),
        "user_followers": _safe_int(_get(fm.user_followers)),
        "likes": _safe_int(_get(fm.likes)),
        "retweets": _safe_int(_get(fm.retweets)),
        "replies": _safe_int(_get(fm.replies)),
        "lang": _get(fm.lang, meta.language),
        "label": _get(fm.label),
        "source_dataset": meta.id,
    }
def _safe_int(value: Any) -> int:
    """
    Coerce ``value`` to an int, returning 0 whenever that is not possible.

    Handles None, non-numeric strings, unsupported types, NaN and infinities
    without raising. Strings that hold a float representation (e.g. "12.0")
    are truncated toward zero like a float would be.
    """
    if value is None:
        return 0
    try:
        return int(value)
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-integer string or NaN; TypeError: unsupported type;
        # OverflowError: float infinity.
        pass
    if isinstance(value, str):
        # Counts exported as floats ("12.0", "1e3") fail int() but are still
        # valid numbers; parse them through float instead of dropping to 0.
        try:
            return int(float(value))
        except (ValueError, OverflowError):
            pass
    return 0
class BaseLoader(ABC):
    """Base class for every loader; subclasses must implement stream()."""

    def __init__(self, meta: DatasetMeta):
        self.meta = meta

    @abstractmethod
    def stream(
        self,
        max_rows: Optional[int] = None,
    ) -> Iterator[Dict[str, Any]]:
        """Yield rows in the canonical shape, lazily (do not load everything into memory)."""

    def take(self, n: int) -> list[Dict[str, Any]]:
        """
        Return at most the first ``n`` rows of the stream as a list.

        ``n`` is passed to ``stream`` as ``max_rows`` and is also enforced
        here, so the cap holds even if an implementation ignores ``max_rows``.
        A non-positive ``n`` returns an empty list without starting the stream.
        """
        if n <= 0:
            return []
        # islice stops after n rows without pulling an extra one from the stream.
        return list(islice(self.stream(max_rows=n), n))