| | """ |
| | Blog Data Update Script |
| | |
| | This script updates the blog data vector store when new posts are added. |
| | It can be scheduled to run periodically or manually executed. |
| | |
| | Usage: |
| | python pipeline.py [--force-recreate] [--data-dir DATA_DIR] [--output-dir OUTPUT_DIR] [--ci] |
| | |
| | Options: |
| | --force-recreate Force recreation of the vector store even if it exists |
| | --data-dir DIR Directory containing the blog posts (default: data/) |
| | --output-dir DIR Directory to save stats and artifacts (default: ./stats) |
| | --ci Run in CI mode (no interactive prompts, exit codes for CI) |
| | """ |
| |
|
| | import os |
| | import sys |
| | import argparse |
| | from datetime import datetime |
| | import json |
| | import logging |
| | from pathlib import Path |
| | from lets_talk.config import ( |
| | CHUNK_OVERLAP, CHUNK_SIZE, VECTOR_STORAGE_PATH, DATA_DIR, |
| | FORCE_RECREATE, OUTPUT_DIR, USE_CHUNKING, SHOULD_SAVE_STATS |
| | ) |
| |
|
| | |
| | import lets_talk.utils.blog as blog |
| |
|
| | |
| | logging.basicConfig( |
| | level=logging.INFO, |
| | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
| | handlers=[logging.StreamHandler()] |
| | ) |
| | logger = logging.getLogger("blog-pipeline") |
| |
|
| | def parse_args(): |
| | """Parse command-line arguments""" |
| | parser = argparse.ArgumentParser(description="Update blog data vector store") |
| | parser.add_argument("--force-recreate", action="store_true", |
| | help="Force recreation of the vector store") |
| | parser.add_argument("--data-dir", default=DATA_DIR, |
| | help=f"Directory containing blog posts (default: {DATA_DIR})") |
| | parser.add_argument("--output-dir", default="./stats", |
| | help="Directory to save stats and artifacts (default: ./stats)") |
| | parser.add_argument("--ci", action="store_true", |
| | help="Run in CI mode (no interactive prompts, exit codes for CI)") |
| | parser.add_argument("--chunk-size", type=int, |
| | help=f"Size of each chunk in characters (default from config)") |
| | parser.add_argument("--chunk-overlap", type=int, |
| | help=f"Overlap between chunks in characters (default from config)") |
| | parser.add_argument("--no-chunking", action="store_true", |
| | help="Don't split documents into chunks (use whole documents)") |
| | return parser.parse_args() |
| |
|
| | def save_stats(stats, output_dir="./stats", ci_mode=False): |
| | """Save stats to a JSON file for tracking changes over time |
| | |
| | Args: |
| | stats: Dictionary containing statistics about the blog posts |
| | output_dir: Directory to save the stats file |
| | ci_mode: Whether to run in CI mode (use fixed filename) |
| | |
| | Returns: |
| | Tuple of (filename, stats_dict) |
| | """ |
| | |
| | Path(output_dir).mkdir(exist_ok=True, parents=True) |
| | |
| | |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | if ci_mode: |
| | filename = f"{output_dir}/blog_stats_latest.json" |
| | |
| | history_filename = f"{output_dir}/blog_stats_{timestamp}.json" |
| | else: |
| | filename = f"{output_dir}/blog_stats_{timestamp}.json" |
| | |
| | |
| | basic_stats = { |
| | "timestamp": timestamp, |
| | "total_documents": stats["total_documents"], |
| | "total_characters": stats["total_characters"], |
| | "min_length": stats["min_length"], |
| | "max_length": stats["max_length"], |
| | "avg_length": stats["avg_length"], |
| | } |
| | |
| | with open(filename, "w") as f: |
| | json.dump(basic_stats, f, indent=2) |
| | |
| | |
| | if ci_mode: |
| | with open(history_filename, "w") as f: |
| | json.dump(basic_stats, f, indent=2) |
| | logger.info(f"Saved stats to {filename} and {history_filename}") |
| | else: |
| | logger.info(f"Saved stats to {filename}") |
| | |
| | return filename, basic_stats |
| |
|
| | def create_vector_database(data_dir=DATA_DIR, storage_path=VECTOR_STORAGE_PATH, |
| | force_recreate=FORCE_RECREATE, output_dir=OUTPUT_DIR, ci_mode=False, |
| | use_chunking=USE_CHUNKING, should_save_stats=SHOULD_SAVE_STATS, |
| | chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP): |
| | """ |
| | Create or update the vector database with blog documents. |
| | |
| | Args: |
| | data_dir: Directory containing the blog posts (default from config) |
| | storage_path: Path where the vector database will be stored (default from config) |
| | force_recreate: Whether to force recreation of the vector store (default from config) |
| | output_dir: Directory to save stats and artifacts (default from config) |
| | ci_mode: Whether to run in CI mode |
| | use_chunking: Whether to split documents into chunks (default from config) |
| | should_save_stats: Whether to save statistics about the documents (default from config) |
| | chunk_size: Size of each chunk in characters (default from config) |
| | chunk_overlap: Overlap between chunks in characters (default from config) |
| | |
| | Returns: |
| | Tuple of (success status, message, stats, stats_file, stats_file_content) |
| | """ |
| | try: |
| | |
| | logger.info(f"Loading blog posts from {data_dir}") |
| | documents = blog.load_blog_posts(data_dir) |
| | documents = blog.update_document_metadata(documents) |
| | |
| | |
| | |
| | stats = blog.get_document_stats(documents) |
| | blog.display_document_stats(stats) |
| |
|
| | |
| | stats_file = None |
| | stats_content = None |
| | if should_save_stats: |
| | stats_file, stats_content = save_stats(stats, output_dir=output_dir, ci_mode=ci_mode) |
| | |
| | if use_chunking: |
| | logger.info("Chunking documents...") |
| | |
| | chunking_params = {} |
| | if chunk_size is not None: |
| | chunking_params['chunk_size'] = chunk_size |
| | if chunk_overlap is not None: |
| | chunking_params['chunk_overlap'] = chunk_overlap |
| | |
| | logger.info(f"Using chunk size: {chunking_params.get('chunk_size', 'default')} and overlap: {chunking_params.get('chunk_overlap', 'default')}") |
| | documents = blog.split_documents(documents, **chunking_params) |
| |
|
| | |
| |
|
| | create_vector_store = (not Path.exists(Path(storage_path))) or force_recreate |
| | |
| | if create_vector_store: |
| | logger.info("Creating vector store...") |
| | vector_store = blog.create_vector_store( |
| | documents, |
| | storage_path=storage_path, |
| | force_recreate=force_recreate |
| | ) |
| | vector_store.client.close() |
| | logger.info(f"Vector store successfully created at {storage_path}") |
| | |
| | |
| | if ci_mode: |
| | build_info = { |
| | "build_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), |
| | "document_count": stats["total_documents"], |
| | "storage_path": str(storage_path), |
| | "vector_store_size_bytes": get_directory_size(storage_path), |
| | } |
| | build_info_path = Path(output_dir) / "vector_store_build_info.json" |
| | with open(build_info_path, "w") as f: |
| | json.dump(build_info, f, indent=2) |
| | logger.info(f"Build info saved to {build_info_path}") |
| | |
| | return True, f"Vector store successfully created at {storage_path}", stats, stats_file, stats_content |
| | else: |
| | logger.info(f"Vector store already exists at {storage_path}") |
| | return True, f"Vector store already exists at {storage_path} (use --force-recreate to rebuild)", stats, stats_file, stats_content |
| | except Exception as e: |
| | logger.error(f"Error creating vector store: {str(e)}", exc_info=True) |
| | return False, f"Error creating vector store: {str(e)}", None, None, None |
| |
|
| | def get_directory_size(path): |
| | """Get the size of a directory in bytes""" |
| | total_size = 0 |
| | for dirpath, dirnames, filenames in os.walk(path): |
| | for filename in filenames: |
| | filepath = os.path.join(dirpath, filename) |
| | if not os.path.islink(filepath): |
| | total_size += os.path.getsize(filepath) |
| | return total_size |
| |
|
| | def main(): |
| | """Main function to update blog data""" |
| | args = parse_args() |
| | |
| | logger.info("=== Blog Data Update ===") |
| | logger.info(f"Data directory: {args.data_dir}") |
| | logger.info(f"Force recreate: {args.force_recreate}") |
| | logger.info(f"Output directory: {args.output_dir}") |
| | logger.info(f"CI mode: {args.ci}") |
| | logger.info(f"Chunking: {not args.no_chunking}") |
| | if not args.no_chunking: |
| | logger.info(f"Chunk size: {args.chunk_size if args.chunk_size else 'default from config'}") |
| | logger.info(f"Chunk overlap: {args.chunk_overlap if args.chunk_overlap else 'default from config'}") |
| | logger.info("========================") |
| | |
| | try: |
| | |
| | success, message, stats, stats_file, stats_content = create_vector_database( |
| | data_dir=args.data_dir, |
| | storage_path=VECTOR_STORAGE_PATH, |
| | force_recreate=args.force_recreate, |
| | output_dir=args.output_dir, |
| | ci_mode=args.ci, |
| | use_chunking=not args.no_chunking, |
| | chunk_size=args.chunk_size, |
| | chunk_overlap=args.chunk_overlap |
| | ) |
| | |
| | logger.info("\n=== Update Summary ===") |
| | if stats: |
| | logger.info(f"Processed {stats['total_documents']} documents") |
| | logger.info(f"Stats saved to: {stats_file}") |
| | logger.info(f"Vector DB status: {message}") |
| | logger.info("=====================") |
| | |
| | |
| | if args.ci and stats: |
| | ci_summary_path = Path(args.output_dir) / "ci_summary.json" |
| | ci_summary = { |
| | "status": "success" if success else "failure", |
| | "message": message, |
| | "stats_file": stats_file, |
| | "document_count": stats["total_documents"], |
| | "vector_store_path": str(VECTOR_STORAGE_PATH) |
| | } |
| | with open(ci_summary_path, "w") as f: |
| | json.dump(ci_summary, f, indent=2) |
| | logger.info(f"CI summary saved to {ci_summary_path}") |
| | |
| | if not success: |
| | return 1 |
| | return 0 |
| | except Exception as e: |
| | logger.error(f"Error: {e}", exc_info=True) |
| | return 1 |
| |
|
| | if __name__ == "__main__": |
| | sys.exit(main()) |
| |
|