| import os | |
| import argparse | |
| from structlog import get_logger | |
| import pandas as pd | |
| from src.config import pyro_source, CHANNEL_ID | |
# Number of posts requested from the source in a single call.
BATCH_SIZE: int = 256

# Module-wide structlog logger.
logger = get_logger()
def save_batch(df: pd.DataFrame, out_path: str, is_first_batch: bool):
    """Write one batch of rows to ``out_path`` as CSV (no index column).

    The first batch truncates the file and writes the column header; every
    later batch is appended underneath without repeating the header.
    """
    write_header = bool(is_first_batch)
    df.to_csv(
        out_path,
        index=False,
        mode="w" if write_header else "a",
        header=write_header,
    )
def main():
    """Download posts from a Telegram channel and store them in a CSV file.

    Command-line options:
        --channel_id  Channel to read (defaults to ``CHANNEL_ID`` from config).
        --limit       Total number of posts to request (required).
        --offset      Starting offset passed through to
                      ``pyro_source.load_messages`` (default 0).

    Posts are requested in chunks of at most ``BATCH_SIZE`` and written to
    ``./channel_<channel_id>_posts.csv``. If that file already exists and is
    non-empty, new rows are appended without a header so an earlier run can be
    continued. Loading stops early as soon as a batch comes back empty.
    """
    parser = argparse.ArgumentParser(description="Telegram posts loader")
    parser.add_argument("--channel_id", type=str, default=CHANNEL_ID)
    parser.add_argument("--limit", type=int, required=True)
    parser.add_argument("--offset", type=int, default=0)
    args = parser.parse_args()

    total_limit = args.limit
    channel_id = args.channel_id
    base_offset = args.offset
    out_path = f"./channel_{channel_id}_posts.csv"

    # A zero-byte file has no header yet, so treat it like a missing file;
    # otherwise the first append would leave the CSV without a header row.
    has_content = os.path.exists(out_path) and os.path.getsize(out_path) > 0
    is_first_batch = not has_content

    saved_rows = 0
    # Ceiling division: number of BATCH_SIZE chunks needed to cover the limit.
    total_batches = (total_limit + BATCH_SIZE - 1) // BATCH_SIZE
    for batch_num in range(total_batches):
        already_requested = batch_num * BATCH_SIZE
        # Cap the final request at what is left of --limit; always asking for
        # a full BATCH_SIZE overshot the requested total.
        batch_limit = min(BATCH_SIZE, total_limit - already_requested)
        logger.info(f"Batch #{batch_num} loading")
        posts = pyro_source.load_messages(
            channel_id=channel_id,
            limit=batch_limit,
            offset=base_offset + already_requested,
        )
        df = pd.DataFrame(posts)
        if df.empty:
            # Nothing came back for this offset; presumably the channel has no
            # more posts, so further requests would be wasted, and writing an
            # empty frame would add no rows (or a bogus header on a new file).
            logger.info(f"Batch #{batch_num} is empty, stopping early")
            break
        save_batch(df, out_path, is_first_batch)
        is_first_batch = False
        saved_rows += len(df)
    logger.info("Finished loading", saved_rows=saved_rows)
if __name__ == "__main__":
    # Run the loader only when executed as a script, not when imported.
    main()