| import argparse |
| import csv |
| import html |
| import json |
| import logging |
| import re |
| import sqlite3 |
| from pathlib import Path |
| from typing import Dict, Iterable, Iterator, List, Tuple |
|
|
| from tqdm import tqdm |
|
|
|
|
# Matches HTML tags such as <p> or </div> left over from scraped statements.
HTML_TAG_RE = re.compile(r"<[^>]+>")
# Collapses any run of whitespace (including newlines) to a single space.
WS_RE = re.compile(r"\s+")
# Heuristic filter: a "solution" is kept only if it contains at least one
# code-like keyword from a handful of common languages.
CODE_HINT_RE = re.compile(
    r"(\bdef\b|\bclass\b|#include|public\s+class|function\s+\w+|\breturn\b|\bimport\b)",
    re.IGNORECASE,
)
|
|
|
|
def setup_logger(log_path: Path) -> logging.Logger:
    """Return the shared ingest logger, writing to *log_path* and the console.

    Idempotent: repeated calls reuse the already-configured logger instead of
    attaching duplicate handlers.
    """
    log_path.parent.mkdir(parents=True, exist_ok=True)
    logger = logging.getLogger("codeforces_ingest")
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        fmt = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
        # File handler first, then console — same order as the original setup.
        for handler in (
            logging.FileHandler(log_path, encoding="utf-8"),
            logging.StreamHandler(),
        ):
            handler.setFormatter(fmt)
            logger.addHandler(handler)
    return logger
|
|
|
|
def clean_text(text: str) -> str:
    """Normalize scraped text: unescape HTML entities, strip tags, and
    collapse all whitespace (including newlines) into single spaces."""
    if not text:
        return ""
    unescaped = html.unescape(str(text))
    untagged = re.sub(r"<[^>]+>", " ", unescaped)
    unified = untagged.replace("\r\n", "\n").replace("\r", "\n")
    trimmed = "\n".join(part.strip() for part in unified.split("\n"))
    return re.sub(r"\s+", " ", trimmed).strip()
|
|
|
|
| def _safe_get(record: Dict[str, object], keys: Iterable[str]) -> str: |
| for key in keys: |
| val = record.get(key) |
| if val: |
| return str(val) |
| return "" |
|
|
|
|
def _extract_pair(record: Dict[str, object]) -> Tuple[str, str]:
    """Pull a cleaned (problem, solution) text pair out of a raw record.

    Tries a fixed priority list of well-known field names for each side;
    either string may come back empty when no field matches.
    """
    problem_keys = (
        "problem_statement",
        "statement",
        "problem",
        "question",
        "content",
        "description",
        "prompt",
        "instruction",
    )
    solution_keys = (
        "solution",
        "solution_code",
        "answer",
        "code",
        "response",
        "python",
        "cpp",
        "java",
        "javascript",
    )
    raw_problem = _safe_get(record, problem_keys)
    raw_solution = _safe_get(record, solution_keys)
    return clean_text(raw_problem), clean_text(raw_solution)
|
|
|
|
| def _iter_jsonl(path: Path) -> Iterator[Dict[str, object]]: |
| with path.open("r", encoding="utf-8", errors="ignore") as f: |
| for line in f: |
| line = line.strip() |
| if not line: |
| continue |
| try: |
| obj = json.loads(line) |
| if isinstance(obj, dict): |
| yield obj |
| except Exception: |
| continue |
|
|
|
|
def _iter_json_stream(path: Path, logger: logging.Logger) -> Iterator[Dict[str, object]]:
    """Lazily yield dict records from a (possibly very large) ``.json`` file.

    Uses the third-party ``ijson`` streaming parser so the document is never
    fully loaded into memory.  If ``ijson`` is not installed the file is
    skipped with a warning rather than falling back to an eager ``json.load``.
    """
    try:
        import ijson
    except Exception:
        logger.warning("Skipping JSON file without ijson installed (to preserve streaming): %s", path)
        return iter(())

    def gen():
        with path.open("rb") as f:
            try:
                # Primary layout: the document root is a JSON array of records.
                for obj in ijson.items(f, "item"):
                    if isinstance(obj, dict):
                        yield obj
            except Exception:
                # Fallback layout: root is an object whose values contain
                # lists of records.  Rewind and re-parse from the top.
                # NOTE(review): if some array items were already yielded
                # before the parse error, this rewind may re-emit them;
                # downstream dedupe presumably absorbs that — confirm.
                f.seek(0)
                try:
                    root = next(ijson.items(f, ""))
                    if isinstance(root, dict):
                        for v in root.values():
                            if isinstance(v, list):
                                for obj in v:
                                    if isinstance(obj, dict):
                                        yield obj
                except Exception:
                    # Neither layout parsed; give up on this file quietly.
                    return

    return gen()
|
|
|
|
| def _iter_csv_like(path: Path, delimiter: str) -> Iterator[Dict[str, object]]: |
| with path.open("r", encoding="utf-8", errors="ignore", newline="") as f: |
| reader = csv.DictReader(f, delimiter=delimiter) |
| for row in reader: |
| if isinstance(row, dict): |
| yield row |
|
|
|
|
| def _iter_txt_records(path: Path) -> Iterator[Dict[str, object]]: |
| |
| |
| with path.open("r", encoding="utf-8", errors="ignore") as f: |
| blob = f.read() |
| chunks = re.split(r"\n\s*[-=]{3,}\s*\n|\n\s*Problem\s+\d+\s*\n", blob, flags=re.IGNORECASE) |
| for chunk in chunks: |
| chunk = chunk.strip() |
| if len(chunk) < 120: |
| continue |
| parts = re.split(r"\n\s*(Solution|Answer)\s*:\s*\n", chunk, flags=re.IGNORECASE) |
| if len(parts) < 3: |
| continue |
| yield {"problem_statement": parts[0], "solution": parts[2]} |
|
|
|
|
| def _iter_candidate_files(input_dir: Path) -> Iterator[Path]: |
| patterns = [ |
| "**/*.jsonl", |
| "**/*.json", |
| "**/*.csv", |
| "**/*.tsv", |
| "**/*.txt", |
| ] |
| seen = set() |
| for pat in patterns: |
| for path in input_dir.glob(pat): |
| if ".git" in path.parts: |
| continue |
| lower = str(path).lower() |
| if "codeforces" not in lower: |
| continue |
| if path.name.lower() == "codeforces.jsonl": |
| continue |
| if path.is_file() and path not in seen: |
| seen.add(path) |
| yield path |
|
|
|
|
def _records_for_file(file_path: Path, logger: logging.Logger) -> Iterator[Dict[str, object]]:
    """Dispatch to the record iterator matching *file_path*'s extension."""
    suffix = file_path.suffix.lower()
    if suffix == ".jsonl":
        return _iter_jsonl(file_path)
    if suffix == ".json":
        return _iter_json_stream(file_path, logger)
    if suffix == ".csv":
        return _iter_csv_like(file_path, ",")
    if suffix == ".tsv":
        return _iter_csv_like(file_path, "\t")
    return _iter_txt_records(file_path)


def ingest_codeforces(input_dir: Path, output_file: Path, logger: logging.Logger) -> Dict[str, int]:
    """Ingest raw Codeforces problem/solution files into one JSONL dataset.

    Scans *input_dir* for candidate files, extracts (problem, solution)
    pairs, filters out short or non-code solutions, deduplicates via an
    on-disk SQLite hash index, and writes {"instruction", "response"} rows
    to *output_file*.

    Returns a stats dict with keys: total_input, valid_output, skipped,
    filtered, deduped, bad.
    """
    import hashlib  # hoisted: previously re-imported on every record

    output_file.parent.mkdir(parents=True, exist_ok=True)
    extracted = 0
    filtered = 0
    bad = 0
    usable = 0
    deduped = 0

    files = list(_iter_candidate_files(input_dir))
    if not files:
        logger.warning("No Codeforces raw files found under %s", input_dir.resolve())
        output_file.write_text("", encoding="utf-8")
        # BUG FIX: this early return previously used keys
        # {"extracted", "filtered", "bad", "usable"}, which mismatched the
        # normal return below and made the __main__ consumer raise KeyError
        # on stats["total_input"] when no raw files were found.
        return {
            "total_input": 0,
            "valid_output": 0,
            "skipped": 0,
            "filtered": 0,
            "deduped": 0,
            "bad": 0,
        }

    # Start each run with a fresh dedupe index (including WAL side files).
    dedupe_db = output_file.parent / "codeforces_ingest_dedupe.sqlite"
    if dedupe_db.exists():
        dedupe_db.unlink()
    for suffix in ("-wal", "-shm"):
        side_file = dedupe_db.with_name(dedupe_db.name + suffix)
        if side_file.exists():
            side_file.unlink()

    conn = sqlite3.connect(str(dedupe_db))
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("CREATE TABLE IF NOT EXISTS seen_hashes (h TEXT PRIMARY KEY)")

        def is_dup(instruction: str, response: str) -> bool:
            # A PRIMARY KEY violation on the content hash signals a duplicate.
            h = hashlib.sha256(f"{instruction}||{response}".encode("utf-8")).hexdigest()
            try:
                conn.execute("INSERT INTO seen_hashes(h) VALUES (?)", (h,))
                return False
            except sqlite3.IntegrityError:
                return True

        with output_file.open("w", encoding="utf-8") as out_f:
            for file_path in tqdm(files, desc="codeforces_files", unit="file"):
                rec_iter = _records_for_file(file_path, logger)
                for rec in tqdm(rec_iter, desc=f"ingest:{file_path.name}", unit="rows", leave=False):
                    try:
                        extracted += 1
                        problem, solution = _extract_pair(rec)
                        # Drop records with trivially short statement/solution.
                        if len(problem) <= 50 or len(solution) <= 20:
                            filtered += 1
                            continue
                        # Keep only solutions that look like actual code.
                        if not CODE_HINT_RE.search(solution):
                            filtered += 1
                            continue
                        instruction = f"Solve the following problem:\n{problem}"
                        if is_dup(instruction, solution):
                            deduped += 1
                            continue
                        row = {"instruction": instruction, "response": solution}
                        out_f.write(json.dumps(row, ensure_ascii=False) + "\n")
                        usable += 1
                    except Exception:
                        # Best-effort ingestion: count and skip malformed rows.
                        bad += 1
                        continue
        conn.commit()
    finally:
        # Always release the SQLite handle, even if ingestion blows up.
        conn.close()

    skipped = filtered + bad + deduped
    logger.info("Codeforces ingest total_input=%d", extracted)
    logger.info("Codeforces ingest valid_output=%d", usable)
    logger.info("Codeforces ingest skipped=%d", skipped)
    logger.info("Codeforces ingest filtered=%d", filtered)
    logger.info("Codeforces ingest deduped=%d", deduped)
    logger.info("Codeforces ingest bad=%d", bad)
    return {
        "total_input": extracted,
        "valid_output": usable,
        "skipped": skipped,
        "filtered": filtered,
        "deduped": deduped,
        "bad": bad,
    }
|
|
|
|
| def _build_parser() -> argparse.ArgumentParser: |
| parser = argparse.ArgumentParser( |
| description="Ingest Codeforces problem-solution data into JSONL for MINDI pipeline." |
| ) |
| parser.add_argument("--input-dir", type=Path, default=Path("./data/raw")) |
| parser.add_argument("--output", type=Path, default=Path("./data/raw/codeforces.jsonl")) |
| parser.add_argument("--log-file", type=Path, default=Path("./logs/codeforces_ingest.log")) |
| return parser |
|
|
|
|
if __name__ == "__main__":
    # CLI entry point: parse args, run the ingest, print a short summary.
    cli = _build_parser().parse_args()
    ingest_logger = setup_logger(cli.log_file)
    summary = ingest_codeforces(cli.input_dir, cli.output, ingest_logger)
    print(f"Output: {cli.output.resolve()}")
    print(f"Total input: {summary['total_input']}")
    print(f"Valid output: {summary['valid_output']}")
    print(f"Skipped: {summary['skipped']}")
|
|