|
|
from dataclasses import asdict |
|
|
from datetime import datetime |
|
|
import io |
|
|
import json |
|
|
import os |
|
|
import time |
|
|
from typing import Any, Callable, Dict, Iterable, List, Optional, Union |
|
|
from pathlib import Path |
|
|
from click import Tuple |
|
|
from pymongo import MongoClient, UpdateOne, errors |
|
|
|
|
|
from .dto.stream_opts import StreamOptions |
|
|
|
|
|
from .dto.recipe_doc import RecipeDoc |
|
|
|
|
|
from .soup_client import SoupClient |
|
|
from backend.utils.sanitization import clean |
|
|
from bs4 import BeautifulSoup |
|
|
from backend.config.database import db_settings |
|
|
|
|
|
class JsonArraySink:
    """
    Append-safe JSON array writer.

    - Creates the file with `[` ... `]` if it does not exist.
    - If the file exists, removes the trailing `]`, appends items, and
      re-closes the array.
    - The file is valid JSON after every `write_many()` call, and the sink
      can be re-opened later to append more items.
    """

    def __init__(self, path: str):
        self.path = path          # target JSON file
        self._opened = False      # has _prepare() run for the current handle?
        self._first = True        # True until the first element is written (controls commas)
        self._tail_pos = None     # stream position of the trailing "\n]" we last wrote
        self.f = None             # lazily opened file handle

    def _prepare(self):
        """Open `self.path` and truncate it just before the closing `]`."""
        if self._opened:
            return

        # Seed a minimal empty array so the backwards scan always finds a `]`.
        if not os.path.exists(self.path):
            with open(self.path, "w", encoding="utf-8") as f:
                f.write("[\n]")

        self.f = open(self.path, "r+", encoding="utf-8")
        self._tail_pos = None  # any position from a previous handle is stale

        # Scan backwards in fixed-size chunks for the last `]`.
        # NOTE(review): offset arithmetic on a text-mode handle assumes the
        # scanned region decodes cleanly at chunk boundaries; safe for the
        # ASCII "]"/newline tail this class writes — confirm if documents
        # contain non-ASCII near the end of file.
        self.f.seek(0, io.SEEK_END)
        end = self.f.tell()
        step = min(4096, end)
        pos = end
        last_bracket = -1
        while pos > 0:
            pos = max(0, pos - step)
            self.f.seek(pos)
            chunk = self.f.read(step)
            j = chunk.rfind("]")
            if j != -1:
                last_bracket = pos + j
                break

        if last_bracket == -1:
            # No closing bracket anywhere: treat the file as corrupt and
            # reset it to an empty array.
            self.f.seek(0); self.f.truncate(0); self.f.write("[\n]"); self.f.flush()
            last_bracket = 2  # offset of `]` within "[\n]"

        # If only "[" precedes the bracket the array is empty, so the first
        # appended element must not be preceded by a comma.
        self.f.seek(0)
        prefix = self.f.read(last_bracket).strip()
        self._first = (prefix == "[")

        # Drop the closing bracket; items are appended from this position.
        self.f.seek(last_bracket)
        self.f.truncate()

        self._opened = True

    def write_many(self, docs: List[Dict[str, Any]]):
        """Append `docs` to the array and re-close it.

        The file is complete, valid JSON when this method returns.
        """
        if not docs:
            return
        self._prepare()

        # BUGFIX: a previous call in this session already wrote the closing
        # "\n]"; strip it again so new items land *inside* the array.
        # (Previously the second call appended after the bracket, producing
        # malformed JSON like "[...],\n{...}\n]".)
        if self._tail_pos is not None:
            self.f.seek(self._tail_pos)
            self.f.truncate()

        for d in docs:
            if not self._first:
                self.f.write(",\n")
            else:
                self._first = False
            # default=str keeps non-JSON types (e.g. datetime) serializable.
            self.f.write(json.dumps(d, ensure_ascii=False, indent=2, default=str))

        # Re-close the array and remember where the closer starts so the
        # next call can strip it.
        self._tail_pos = self.f.tell()
        self.f.write("\n]")
        self.f.flush()

    def close(self):
        """Close the handle; a later `write_many` will re-prepare the file."""
        if self.f:
            self.f.close()
        self._opened = False
        self._tail_pos = None
|
|
|
|
|
class MongoSink:
    """MongoDB sink that upserts recipe documents keyed on their `url`."""

    def __init__(self):
        # Connection parameters come from the central app configuration.
        db_config = db_settings.get_vector_store_config()
        self.client = MongoClient(db_config["uri"], retryWrites=True, serverSelectionTimeoutMS=10000)
        self.col = self.client[db_config["database"]][db_config["collection_name"]]
        self._ensure_indexes()

    def _ensure_indexes(self):
        """Create lookup indexes; `url` is the unique upsert key."""
        self.col.create_index("url", unique=True)
        self.col.create_index("title")
        self.col.create_index("category")
        self.col.create_index("scraped_at")

    def write_many(self, docs: List[Dict[str, Any]]):
        """Upsert `docs` by `url`, stamping `scraped_at` and `created_at`.

        Best-effort: partial bulk-write failures are logged and swallowed,
        never raised, so a single bad document cannot abort a crawl.
        Each document must contain a "url" key.
        """
        if not docs:
            return
        # NOTE(review): naive UTC timestamp; pymongo stores datetimes as UTC.
        now = datetime.utcnow()
        ops = []
        for d in docs:
            d = d.copy()  # avoid mutating the caller's dict
            d.setdefault("scraped_at", now)
            ops.append(UpdateOne(
                {"url": d["url"]},
                {"$set": d, "$setOnInsert": {"created_at": now}},
                upsert=True,
            ))
        try:
            # ordered=False: keep applying remaining ops after a failure.
            self.col.bulk_write(ops, ordered=False)
        except errors.BulkWriteError as e:
            # BUGFIX: was a silent `pass`, hiding every failure. Keep the
            # best-effort contract but surface how many ops failed (e.g.
            # duplicate-key races on the unique `url` index).
            import logging
            n_errs = len((e.details or {}).get("writeErrors", []))
            logging.getLogger(__name__).warning(
                "MongoSink.write_many: %d bulk write error(s) ignored", n_errs
            )

    def close(self):
        """Release the MongoDB client's connection pool."""
        self.client.close()
|
|
|
|
|
|
|
|
|
|
|
class DualSink:
    """Fan-out sink: forwards writes/close to an optional JSON sink and/or Mongo sink.

    Either sink may be None, in which case it is skipped. Anything with
    `write_many(docs)` and `close()` methods works (duck-typed).
    """

    def __init__(self, json_sink: "Optional[JsonArraySink]", mongo_sink: "Optional[MongoSink]"):
        self.json = json_sink
        self.mongo = mongo_sink

    def write_many(self, docs: List[Dict[str, Any]]):
        """Forward `docs` to each configured sink."""
        if self.json:
            self.json.write_many(docs)
        if self.mongo:
            # BUGFIX: was `self.mongo.upsert_batch(docs)` — MongoSink defines
            # no `upsert_batch`; its writer method is `write_many`, so every
            # Mongo-enabled run raised AttributeError here.
            self.mongo.write_many(docs)

    def close(self):
        """Close whichever sinks are configured."""
        if self.json:
            self.json.close()
        if self.mongo:
            self.mongo.close()
|
|
|
|
|
|
|
|
class BaseRecipeScraper(SoupClient):
    """Template-method base class for site-specific recipe scrapers.

    Subclasses implement `discover_urls` and `extract_recipe`; `stream`
    drives the crawl: it fetches each page, extracts a RecipeDoc, batches
    the resulting dicts, optionally attaches embeddings, and writes each
    batch to a sink.
    """

    # Heading tag names, presumably for page-structure walking in subclasses
    # (not referenced anywhere in this class) — TODO confirm usage.
    HEADING_TAGS = ("h1","h2","h3","h4","h5","h6")

    def __init__(
        self,
        *args,
        embedder= None,
        embedding_fields= None,
        **kwargs
    ):
        """
        Args:
            embedder: optional encoder exposing `encode(texts) -> vectors`
                (e.g. an HFEmbedder instance).
            embedding_fields: list of (source_field, target_field) pairs like
                [("title", "title_emb"), ("instructions_text", "instr_emb")].
                A source field may also be a tuple of field names, which are
                combined into one labeled text block (see `_gather_text`).
        """
        super().__init__(*args, **kwargs)
        self.embedder = embedder
        self.embedding_fields = embedding_fields or []
        # Alias: some methods use `self.logger`, others `self.log`
        # (inherited from SoupClient — TODO confirm).
        self.logger = self.log

    def extract_jsonld(self, soup: BeautifulSoup) -> Optional[Dict[str, Any]]:
        """Extract the first schema.org Recipe node found in JSON-LD script tags.

        Returns the recipe as a plain dict (via `asdict(RecipeDoc)`), or
        None when no parsable Recipe node is present.
        """
        def to_list(x): return x if isinstance(x, list) else [x]
        for tag in soup.find_all("script", type="application/ld+json"):
            try:
                data = json.loads(tag.string or "{}")
            except Exception:
                # Malformed JSON-LD is common in the wild; skip this tag.
                continue
            # A tag may hold a single node, a list of nodes, or an @graph wrapper.
            nodes = (data.get("@graph", [data]) if isinstance(data, dict)
                else (data if isinstance(data, list) else []))
            for n in nodes:
                if not isinstance(n, dict): continue
                t = n.get("@type")
                # @type may be a plain string or a list of type names.
                if t == "Recipe" or (isinstance(t, list) and "Recipe" in t):
                    doc = RecipeDoc()
                    doc.title = clean(n.get("name"))

                    # recipeIngredient entries may be strings or objects.
                    ings = []
                    for ing in to_list(n.get("recipeIngredient") or []):
                        if isinstance(ing, dict):
                            ings.append(clean(ing.get("name") or ing.get("text")))
                        else:
                            ings.append(clean(str(ing)))
                    doc.ingredients = [x for x in ings if x]

                    # recipeInstructions entries may be HowToStep objects or strings.
                    steps = []
                    for st in to_list(n.get("recipeInstructions") or []):
                        if isinstance(st, dict):
                            steps.append(clean(st.get("text") or st.get("name")))
                        else:
                            steps.append(clean(str(st)))
                    doc.instructions = [x for x in steps if x]

                    doc.servings = n.get("recipeYield")
                    # `image` may be an ImageObject dict, a list of URLs, or a URL string.
                    doc.image_url = clean((n.get("image") or {}).get("url") if isinstance(n.get("image"), dict) else (n.get("image")[0] if isinstance(n.get("image"), list) else n.get("image")))
                    # Only keep string-valued category/cuisine; list values are dropped.
                    doc.course = clean(n.get("recipeCategory")) if isinstance(n.get("recipeCategory"), str) else None
                    doc.cuisine = clean(n.get("recipeCuisine")) if isinstance(n.get("recipeCuisine"), str) else None
                    return asdict(doc)
        return None

    @staticmethod
    def _dedupe_preserve_order(items: List[str]) -> List[str]:
        """Clean each item, dropping empties and duplicates, keeping first-seen order."""
        seen = set()
        out = []
        for x in items:
            x = clean(x)
            if not x or x in seen:
                continue
            seen.add(x); out.append(x)
        return out

    @staticmethod
    def _to_ingredients_text(items: List[str]) -> str:
        """
        Turn ingredient bullets into a single text block.

        One "- item" per line is good for embeddings and human readability.
        """
        items = [clean(x) for x in items if x]
        items = BaseRecipeScraper._dedupe_preserve_order(items)
        return "\n".join(f"- {x}" for x in items)

    @staticmethod
    def _to_instructions_text(steps: List[str]) -> str:
        """
        Turn ordered steps into a single text block.

        Numbered paragraphs help embeddings keep sequence context.
        """
        steps = [clean(x) for x in steps if x]
        steps = BaseRecipeScraper._dedupe_preserve_order(steps)
        return "\n\n".join(f"{i}. {s}" for i, s in enumerate(steps, 1))

    def discover_urls(self) -> Iterable[str]:
        """Yield candidate recipe URLs; implemented by site-specific subclasses."""
        raise NotImplementedError

    def extract_recipe(self, soup: BeautifulSoup, url: str, category: Optional[str] = None) -> RecipeDoc:
        """Parse one fetched page into a RecipeDoc; implemented by subclasses."""
        raise NotImplementedError

    def stream(self, sink: DualSink, options: Optional[StreamOptions] = None) -> int:
        """Crawl discovered URLs and write extracted recipes to `sink` in batches.

        Honors `options.limit`, `options.batch_size`, `options.delay`, and an
        optional newline-delimited resume file under ./data recording URLs
        already attempted. The sink is always closed, even on error.

        Returns:
            Number of documents written to the sink.
        """
        opts = options or StreamOptions()
        self.log.info(
            f"Starting stream: limit={opts.limit} batch_size={opts.batch_size} "
            f"resume_file={opts.resume_file} sink={type(sink).__name__}"
        )

        # Load previously-attempted URLs so a restarted crawl skips them.
        processed = set()
        if opts.resume_file:
            resume_path = Path("data") / opts.resume_file
            # NOTE(review): leftover debug print — consider removing.
            print(resume_path, 'resume_path')
            if resume_path.exists():
                with resume_path.open("r", encoding="utf-8") as f:
                    processed = {line.strip() for line in f if line.strip()}
            else:
                processed = set()
            self.log.info(f"[resume] {len(processed)} URLs already done")

        batch, saved = [], 0
        try:
            for i, url in enumerate(self.discover_urls(), 1):
                # `i` counts discovered URLs, so `limit` caps attempts, not saves.
                if opts.limit and i > opts.limit: break
                if not self.same_domain(url): continue
                if url in processed: continue

                try:
                    soup = self.fetch_soup(url)
                    doc = self.extract_recipe(soup, url)
                    doc.finalize()
                    batch.append(asdict(doc))
                except Exception as e:
                    # Any per-page failure only skips that URL.
                    self.log.warning(f"[skip] {url} -> {e}")

                # NOTE(review): the URL is recorded even when extraction
                # failed above, so failing URLs are never retried on resume
                # — confirm this is intended.
                if opts.resume_file:
                    resume_path = Path("data") / opts.resume_file
                    with open(resume_path, "a", encoding="utf-8") as rf:
                        rf.write(url + "\n")

                # Flush a full batch: embed (optional), write, reset.
                if len(batch) >= opts.batch_size:
                    self._apply_embeddings(batch)
                    sink.write_many(batch); saved += len(batch); batch = []
                    if opts.progress_callback: opts.progress_callback(saved)
                    # NOTE(review): message text looks copy-pasted from the
                    # resume log; it actually reports total docs saved so far.
                    self.log.info(f"[resume] {saved} URLs already done 1")

                if i % 25 == 0:
                    self.log.info(f"…processed {i}, saved {saved}")

                # Politeness delay between requests.
                time.sleep(opts.delay)

            # Flush whatever remains after the loop ends.
            if batch:
                self._apply_embeddings(batch)
                sink.write_many(batch); saved += len(batch)
                if opts.progress_callback: opts.progress_callback(saved)
                # NOTE(review): same copy-pasted message text as above.
                self.log.info(f"[resume] {saved} URLs already done2 ")
        finally:
            sink.close()

        self.log.info(f"[done] saved {saved}")
        return saved

    @staticmethod
    def _field_to_text(val: Any) -> str:
        """Coerce a document field to text: lists join on newlines, None -> ""."""
        if isinstance(val, list):
            return "\n".join(str(x) for x in val)
        if val is None:
            return ""
        return str(val)

    def _gather_text(self, doc: Dict[str, Any], src: Any) -> str:
        """Build the embedding input text for source spec `src`.

        `src` is either one field name or a tuple of field names; tuple
        sources are concatenated as labeled sections separated by blank lines.
        """
        if isinstance(src, tuple):
            parts: List[str] = []
            for f in src:
                t = self._field_to_text(doc.get(f))
                if t:
                    # Human-readable section labels for the two known fields;
                    # any other field name labels itself.
                    label = "Ingredients" if f == "ingredients" else ("Instructions" if f == "instructions" else f)
                    parts.append(f"{label}:\n{t}")
            return "\n\n".join(parts)
        else:
            return self._field_to_text(doc.get(src))

    def _apply_embeddings(self, batch: List[Dict[str, Any]]) -> None:
        """
        Attach embeddings to `batch` in place.

        For each (source_spec, destination_field) pair in
        `self.embedding_fields`: gather text from every document via
        `_gather_text` (lists newline-joined, None -> ""), encode the whole
        batch with `self.embedder.encode`, and store each resulting vector
        under the destination field of its document.

        No-op when the embedder, the field list, or the batch is empty.

        Args:
            batch: documents to annotate in place.

        Returns:
            None
        """
        if not self.embedder or not self.embedding_fields or not batch:
            return
        try:
            for src_spec, dst_field in self.embedding_fields:
                texts = [ self._gather_text(doc, src_spec) for doc in batch ]
                embs = self.embedder.encode(texts)
                for document, vec in zip(batch, embs):
                    document[dst_field] = vec
        except Exception as e:
            # NOTE(review): broad catch keeps the crawl alive if encoding
            # fails, but the batch is then written without embeddings.
            self.logger.warning(f"[stream error]: {e}")
|
|
|
|
|
|