"""Load posts from a Telegram channel in batches and store them in a CSV file."""

import argparse
import os

import pandas as pd
from structlog import get_logger

from src.config import pyro_source, CHANNEL_ID

# Maximum number of posts requested from the source in a single call.
BATCH_SIZE = 256

logger = get_logger()


def save_batch(df: pd.DataFrame, out_path: str, is_first_batch: bool) -> None:
    """Write one batch of posts to the CSV file at ``out_path``.

    Args:
        df: Batch of posts to persist.
        out_path: Destination CSV path.
        is_first_batch: When true, the file is (re)created and the header row
            is written; otherwise the rows are appended without a header.
    """
    df.to_csv(
        out_path,
        index=False,
        mode="w" if is_first_batch else "a",
        header=is_first_batch,
    )


def main() -> None:
    """Parse CLI arguments and dump up to ``--limit`` posts to a CSV file.

    Posts are requested in chunks of at most ``BATCH_SIZE`` starting at
    ``--offset``. The output file is ``./channel_<channel_id>_posts.csv``;
    if it already exists, new rows are appended to it without a header.
    """
    parser = argparse.ArgumentParser(description="Telegram posts loader")
    parser.add_argument("--channel_id", type=str, default=CHANNEL_ID)
    parser.add_argument("--limit", type=int, required=True)
    parser.add_argument("--offset", type=int, default=0)
    args = parser.parse_args()

    total_limit = args.limit
    channel_id = args.channel_id
    base_offset = args.offset

    out_path = f"./channel_{channel_id}_posts.csv"
    # The header is only written when the file is created by this run.
    is_first_batch = not os.path.exists(out_path)

    # Ceiling division: number of requests needed to cover total_limit posts.
    total_batches = (total_limit + BATCH_SIZE - 1) // BATCH_SIZE

    for batch_num in range(total_batches):
        logger.info(f"Batch #{batch_num} loading")

        already_requested = batch_num * BATCH_SIZE
        current_offset = base_offset + already_requested
        # Cap the last batch so that no more than total_limit posts are
        # requested overall (previously the last batch always asked for a
        # full BATCH_SIZE). Assumes load_messages treats `limit` as a
        # message count -- confirm against pyro_source.
        batch_limit = min(BATCH_SIZE, total_limit - already_requested)

        posts = pyro_source.load_messages(
            channel_id=channel_id,
            limit=batch_limit,
            offset=current_offset,
        )

        df = pd.DataFrame(posts)
        if df.empty:
            # Writing an empty frame as the first batch would create the file
            # without a header row, and every later batch would then be
            # appended header-less. Skip it and keep is_first_batch untouched.
            logger.info(f"Batch #{batch_num} is empty, skipping")
            continue

        save_batch(df, out_path, is_first_batch)
        is_first_batch = False

    logger.info("Finished loading")


if __name__ == "__main__":
    main()