import os
import json
import time
import threading
import random
from datetime import datetime

from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS
from datasets import load_dataset, Dataset
from openai import OpenAI

# NOTE(review): static_folder="." combined with an empty static_url_path
# appears to make every file in the working directory (including this source
# file) downloadable -- consider serving from a dedicated static directory.
app = Flask(__name__, static_folder=".", static_url_path="")
CORS(app, supports_credentials=True)

# OpenAI Configuration
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=OPENAI_API_KEY) if OPENAI_API_KEY else None

# HuggingFace Configuration
HF_TOKEN = os.getenv("HF_TOKEN")
TRAINING_DATASET = "Sahil5112/ContinuumGPT"

# Hierarchical Memory Configuration
LEVEL_1_MAX = 20  # Max entries before archiving to Level 2
LEVEL_2_MAX = 50  # Max entries before archiving to Level 3
AUTO_ARCHIVE_ENABLED = True
AUTO_ARCHIVE_INTERVAL = 600  # 10 minutes
AUTO_TRAINING_INTERVAL = 300  # 5 minutes - auto-generate training examples
AUTO_SAVE_ENABLED = True  # Always save responses immediately

# Memory buffer
MEMORY_BUFFER = []

# Serializes the load -> extend -> push sequence in save_to_training_dataset.
# The app runs threaded request handlers plus background workers, and two
# overlapping saves would otherwise each push a copy missing the other's rows.
_SAVE_LOCK = threading.Lock()

# Auto-training prompts for diverse learning
AUTO_TRAINING_PROMPTS = [
    "Explain quantum computing in simple terms",
    "How does machine learning work?",
    "What are the best practices for web development?",
    "Write a Python function to sort a list",
    "Explain the difference between AI and ML",
    "How do neural networks learn?",
    "What is the future of artificial intelligence?",
    "Create a creative story about space exploration",
    "Explain blockchain technology",
    "How does natural language processing work?",
    "What are design patterns in software engineering?",
    "Explain the concept of recursion with examples",
    "How do databases manage concurrent transactions?",
    "What is the difference between REST and GraphQL?",
    "Explain cloud computing and its benefits",
    "How does encryption work?",
    "What are the principles of good UI/UX design?",
    "Explain the concept of Big O notation",
    "How do search engines rank websites?",
    "What is containerization and why is it useful?",
]


def _fetch_training_rows():
    """Download the "train" split of TRAINING_DATASET as a list of dicts.

    Unlike load_training_dataset, this helper lets every error propagate so
    callers can tell "the dataset is empty" apart from "the download failed".
    """
    dataset = load_dataset(TRAINING_DATASET, split="train", token=HF_TOKEN)
    return [dict(row) for row in dataset]


def load_training_dataset():
    """Load existing training data from HuggingFace.

    Returns:
        A list of row dicts. An empty list is returned when HF_TOKEN is not
        set or when loading fails for any reason (the error is printed), so
        an empty result does NOT prove that the remote dataset is empty.
    """
    if not HF_TOKEN:
        print("āš ļø No HF_TOKEN - using local storage only")
        return []
    try:
        return _fetch_training_rows()
    except Exception as e:
        print(f"Could not load training dataset: {e}")
        return []


def save_to_training_dataset(training_examples):
    """Append training examples to the HuggingFace dataset.

    The whole dataset is downloaded, extended and pushed back, so the
    sequence is guarded by _SAVE_LOCK to keep concurrent saves from
    overwriting each other.

    Args:
        training_examples: list of entry dicts to append.

    Returns:
        True when the rows were pushed (or there was nothing to push),
        False when HF_TOKEN is missing or the load/push failed.
    """
    if not HF_TOKEN:
        print("āŒ No HF_TOKEN - cannot save to HuggingFace")
        return False
    if not training_examples:
        # Nothing to append; skip a pointless download and re-upload.
        return True

    try:
        with _SAVE_LOCK:
            try:
                existing_data = _fetch_training_rows()
            except FileNotFoundError:
                # The `datasets` library signals a missing or empty repo with
                # FileNotFoundError subclasses; only then is it safe to start
                # from scratch. Any other failure (network, auth, ...) aborts
                # the save below instead of overwriting the remote dataset
                # with just the new rows.
                # NOTE(review): confirm the exception hierarchy against the
                # installed `datasets` version.
                existing_data = []

            existing_data.extend(training_examples)
            dataset = Dataset.from_list(existing_data)
            dataset.push_to_hub(TRAINING_DATASET, token=HF_TOKEN, private=False)

        print(f"āœ… Saved {len(training_examples)} entries to {TRAINING_DATASET}")
        print(f"šŸ“Š Total dataset size: {len(existing_data)} entries")
        return True
    except Exception as e:
        print(f"āŒ Error saving to dataset: {e}")
        return False


def call_openai_gpt4o_mini(prompt):
    """Call OpenAI GPT-4o-mini with a single user prompt.

    Args:
        prompt: the user message text.

    Returns:
        The content of the first completion choice, or None when no OpenAI
        client is configured or the API call raises (the error is printed).
    """
    if not client:
        return None

    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are ContinuumGPT, a helpful AI assistant."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.7,
            max_tokens=1000,
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"OpenAI API Error: {e}")
        return None


def summarize_conversation(query, response):
    """Create a one-sentence summary of a query/response pair using AI.

    Args:
        query: the user's question.
        response: the assistant's answer.

    Returns:
        The model-generated summary, or a truncated fallback string when no
        client is configured or summarization fails.
    """
    if not client:
        return f"{query[:50]}... -> {response[:50]}..."

    fallback = f"Q&A about {query[:30]}..."
    try:
        summary_prompt = f"Summarize this conversation in one sentence:\nUser: {query}\nAI: {response}"
        summary = call_openai_gpt4o_mini(summary_prompt)
        return summary if summary else fallback
    except Exception:
        # Best-effort: summarization must never break the caller, but a bare
        # `except:` would also swallow KeyboardInterrupt/SystemExit.
        return fallback
def create_super_summary(summaries):
    """Create a global super-summary from multiple summaries.

    Args:
        summaries: list of summary strings; only the first 10 are sent to
            the model.

    Returns:
        The model-generated summary, or "General knowledge compilation" when
        no client is configured, `summaries` is empty, or generation fails.
    """
    default_summary = "General knowledge compilation"
    if not client or not summaries:
        return default_summary

    try:
        combined = "\n".join(summaries[:10])
        prompt = f"Create a brief summary of these conversation topics:\n{combined}"
        super_summary = call_openai_gpt4o_mini(prompt)
        return super_summary if super_summary else default_summary
    except Exception:
        # Best-effort fallback; avoid a bare `except:` so KeyboardInterrupt
        # and SystemExit still propagate.
        return default_summary


def archive_level_1_to_level_2():
    """Archive Level 1 entries to Level 2 with compression.

    Does nothing unless more than LEVEL_1_MAX un-archived Level 1 entries
    exist. Otherwise every un-archived Level 1 entry is rewritten as an
    archived Level 2 entry and, when HF_TOKEN is set, the updated dataset is
    pushed to the hub.
    """
    dataset = load_training_dataset()
    level_1_entries = [d for d in dataset if d.get("level") == 1 and not d.get("archived")]

    if len(level_1_entries) <= LEVEL_1_MAX:
        return

    print(f"šŸ“¦ Archiving {len(level_1_entries)} Level 1 entries to Level 2...")

    now = datetime.now().isoformat()
    archived_entries = []
    for entry in level_1_entries:
        # A row can carry an explicit None counter (e.g. a column that was
        # absent when the row was written); treat it like a missing value.
        counter = entry.get("counter")
        if counter is None:
            counter = 1
        archived_entries.append({
            "query": entry.get("query"),
            "response": entry.get("response"),
            "summary": entry.get("summary"),
            "archived": True,
            "level": 2,
            "counter": counter + 1,
            "timestamp": now,
            "original_timestamp": entry.get("timestamp"),
        })

    # Keep everything that was not just archived, then add the Level 2 copies.
    updated_dataset = [d for d in dataset if d.get("level") != 1 or d.get("archived")]
    updated_dataset.extend(archived_entries)

    if HF_TOKEN:
        dataset_obj = Dataset.from_list(updated_dataset)
        dataset_obj.push_to_hub(TRAINING_DATASET, token=HF_TOKEN, private=False)

    print(f"āœ… Archived to Level 2: {len(archived_entries)} entries")


def archive_level_2_to_level_3():
    """Archive Level 2 entries to Level 3 with super-summarization.

    Does nothing unless more than LEVEL_2_MAX Level 2 entries exist.
    Otherwise all Level 2 entries are replaced by a single Level 3 entry
    whose text is produced by create_super_summary, and, when HF_TOKEN is
    set, the updated dataset is pushed to the hub.
    """
    dataset = load_training_dataset()
    level_2_entries = [d for d in dataset if d.get("level") == 2]

    if len(level_2_entries) <= LEVEL_2_MAX:
        return

    print(f"šŸ—œļø Creating Level 3 super-summary from {len(level_2_entries)} Level 2 entries...")

    summaries = [d.get("summary", "") for d in level_2_entries if d.get("summary")]
    super_summary = create_super_summary(summaries)

    level_3_entry = {
        "query": "Global Knowledge Archive",
        "response": super_summary,
        "summary": super_summary,
        "archived": True,
        "level": 3,
        "counter": len(level_2_entries),
        "timestamp": datetime.now().isoformat(),
        "entries_compressed": len(level_2_entries),
    }

    updated_dataset = [d for d in dataset if d.get("level") != 2]
    updated_dataset.append(level_3_entry)

    if HF_TOKEN:
        dataset_obj = Dataset.from_list(updated_dataset)
        dataset_obj.push_to_hub(TRAINING_DATASET, token=HF_TOKEN, private=False)

    print("āœ… Created Level 3 super-summary")


def _build_level_1_entry(query, response, summary, auto_generated):
    """Build a fresh (un-archived) Level 1 memory entry dict."""
    return {
        "query": query,
        "response": response,
        "summary": summary,
        "archived": False,
        "level": 1,
        "counter": 1,
        "timestamp": datetime.now().isoformat(),
        "auto_generated": auto_generated,
    }


def _run_auto_training_step(sequence_number):
    """Generate and save one auto-training example; return True if saved."""
    if not HF_TOKEN or not client:
        print("āš ļø Auto-training paused - need HF_TOKEN and OPENAI_API_KEY")
        return False

    # Select random training prompt
    prompt = random.choice(AUTO_TRAINING_PROMPTS)
    print(f"\nšŸŽ“ Auto-training #{sequence_number}: '{prompt[:50]}...'")

    # Generate response
    response = call_openai_gpt4o_mini(prompt)
    if not response:
        print("āŒ Failed to generate response")
        return False

    summary = summarize_conversation(prompt, response)
    memory_entry = _build_level_1_entry(prompt, response, summary, True)

    # Save immediately to dataset
    return bool(save_to_training_dataset([memory_entry]))


def auto_training_worker():
    """Background worker that automatically generates training examples.

    Loops forever: runs one training step, then sleeps for
    AUTO_TRAINING_INTERVAL seconds. Errors inside a step are printed and do
    not stop the loop.
    """
    print("šŸ¤– Auto-training worker started")

    training_count = 0

    while True:
        try:
            if _run_auto_training_step(training_count + 1):
                training_count += 1
                print(f"āœ… Auto-trained and saved! Total auto-training: {training_count}")
        except Exception as e:
            print(f"āŒ Auto-training error: {e}")

        time.sleep(AUTO_TRAINING_INTERVAL)


def auto_archive_worker():
    """Background worker that automatically archives memory levels.

    While AUTO_ARCHIVE_ENABLED is true, runs both archive passes every
    AUTO_ARCHIVE_INTERVAL seconds; the passes are skipped when HF_TOKEN or
    the OpenAI client is missing. Errors are printed and the loop continues.
    """
    print("šŸ—„ļø Auto-archive worker started")

    while AUTO_ARCHIVE_ENABLED:
        try:
            if HF_TOKEN and client:
                print("\nšŸ”„ Running auto-archive check...")
                archive_level_1_to_level_2()
                archive_level_2_to_level_3()
                print(f"ā³ Next archive check in {AUTO_ARCHIVE_INTERVAL}s...\n")
            else:
                print("āš ļø Auto-archive paused - need HF_TOKEN and OPENAI_API_KEY")
        except Exception as e:
            print(f"āŒ Auto-archive error: {e}")

        time.sleep(AUTO_ARCHIVE_INTERVAL)


@app.route("/")
def index():
    """Serve index.html from the current directory."""
    return send_from_directory(".", "index.html")


@app.route("/api/chat", methods=["POST"])
def chat():
    """Handle chat requests with automatic dataset saving.

    Expects a JSON object body with a non-empty string "query". Responds
    with 400 when the query is missing or malformed, and with a
    `success: False` payload when no AI response could be generated.
    On success the entry is saved to HuggingFace, or buffered in
    MEMORY_BUFFER when auto-save is unavailable or the save fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of
    # raising, so all malformed input goes through the same 400 response.
    data = request.get_json(silent=True)
    raw_query = data.get("query") if isinstance(data, dict) else None
    query = raw_query.strip() if isinstance(raw_query, str) else ""

    if not query:
        return jsonify({"success": False, "error": "Missing query"}), 400

    # Generate response
    response = call_openai_gpt4o_mini(query)

    if not response:
        if client is None:
            error = "OPENAI_API_KEY not set. Please add your OpenAI API key to enable AI."
        else:
            # The key is configured, so the failure came from the API call.
            error = "Failed to generate a response from OpenAI. Please try again."
        return jsonify({
            "success": False,
            "error": error,
            "response": None
        })

    summary = summarize_conversation(query, response)
    memory_entry = _build_level_1_entry(query, response, summary, False)

    # AUTO-SAVE: Save immediately to HuggingFace; fall back to the buffer if
    # the save fails so the entry can still be flushed later.
    if AUTO_SAVE_ENABLED and HF_TOKEN and save_to_training_dataset([memory_entry]):
        saved_status = "saved"
    else:
        MEMORY_BUFFER.append(memory_entry)
        saved_status = "buffered"

    return jsonify({
        "success": True,
        "response": response,
        "summary": summary,
        "level": 1,
        "buffered": len(MEMORY_BUFFER),
        "saved": saved_status
    })


@app.route("/api/dataset-stats", methods=["GET"])
def dataset_stats():
    """Get statistics about the hierarchical memory dataset.

    Returns per-level entry counts, the buffer size, the number of
    auto-generated entries and the dataset URL; responds with 500 and the
    error text when anything raises.
    """
    try:
        training_data = load_training_dataset()

        level_1 = [d for d in training_data if d.get("level") == 1]
        level_2 = [d for d in training_data if d.get("level") == 2]
        level_3 = [d for d in training_data if d.get("level") == 3]
        auto_generated = [d for d in training_data if d.get("auto_generated")]

        return jsonify({
            "success": True,
            "total_entries": len(training_data),
            "level_1_fresh": len([d for d in level_1 if not d.get("archived")]),
            "level_2_archived": len(level_2),
            "level_3_super": len(level_3),
            "buffered": len(MEMORY_BUFFER),
            "auto_generated_count": len(auto_generated),
            "dataset_url": f"https://huggingface.co/datasets/{TRAINING_DATASET}"
        })
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500


@app.route("/api/flush-buffer", methods=["POST"])
def flush_buffer():
    """Manually flush the memory buffer to HuggingFace.

    Buffered entries are removed only after a successful save, so a failed
    flush can be retried without losing data.
    """
    if not MEMORY_BUFFER:
        return jsonify({"success": True, "message": "Buffer is empty, nothing to flush"})

    pending = list(MEMORY_BUFFER)
    count = len(pending)

    if not save_to_training_dataset(pending):
        return jsonify({"success": False, "error": "Failed to flush buffer"}), 500

    # Drop only the entries that were saved; anything appended to the buffer
    # while the upload was in flight sits after them and is kept.
    del MEMORY_BUFFER[:count]

    return jsonify({
        "success": True,
        "message": f"Flushed {count} entries to HuggingFace"
    })


@app.route("/api/archive-now", methods=["POST"])
def archive_now():
    """Manually trigger both archiving passes; 500 with the error on failure."""
    try:
        archive_level_1_to_level_2()
        archive_level_2_to_level_3()
        return jsonify({"success": True, "message": "Archiving completed"})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500


def main():
    """Print the startup banner, start background workers and run the app."""
    port = int(os.getenv("PORT", 7860))

    print("šŸš€ Starting ContinuumGPT Hierarchical Memory System...")
    print(f"šŸ“Š Training Dataset: {TRAINING_DATASET}")
    print(f"šŸŽ“ Dataset URL: https://huggingface.co/datasets/{TRAINING_DATASET}")
    print("")
    print("🧠 Hierarchical Memory Architecture:")
    print(f" Level 1 (Fresh): Detailed Q&A (max {LEVEL_1_MAX} before archiving)")
    print(f" Level 2 (Archived): Compressed summaries (max {LEVEL_2_MAX} before archiving)")
    print(" Level 3 (Super): Global knowledge compilation")
    print(f" Auto-archiving: Every {AUTO_ARCHIVE_INTERVAL} seconds")
    print(f" Auto-training: Every {AUTO_TRAINING_INTERVAL} seconds")
    print(f" Auto-save: {'ENABLED' if AUTO_SAVE_ENABLED else 'DISABLED'}")
    print("")

    if OPENAI_API_KEY:
        print("āœ… OpenAI API Key Configured")

        # Start auto-archive worker (daemon: must not keep the process alive)
        archive_thread = threading.Thread(target=auto_archive_worker, daemon=True)
        archive_thread.start()
        print("āœ… Auto-archive worker started")

        # Start auto-training worker
        training_thread = threading.Thread(target=auto_training_worker, daemon=True)
        training_thread.start()
        print("āœ… Auto-training worker started - will generate examples every 5 minutes")
    else:
        print("āš ļø OpenAI API Key Missing - Add OPENAI_API_KEY to enable")

    if HF_TOKEN:
        print("āœ… HuggingFace Integration Active")
        training_data = load_training_dataset()
        level_counts = {1: 0, 2: 0, 3: 0}
        for d in training_data:
            level = d.get("level", 1)
            level_counts[level] = level_counts.get(level, 0) + 1
        print(f"šŸ“š Current dataset: L1={level_counts[1]}, L2={level_counts[2]}, L3={level_counts[3]}")
        if AUTO_SAVE_ENABLED:
            print("šŸ’¾ Auto-save ENABLED - all responses saved immediately to dataset")
    else:
        print("āš ļø HuggingFace Integration Disabled - Add HF_TOKEN to enable")

    print("")
    app.run(host="0.0.0.0", port=port, debug=False, threaded=True)


if __name__ == "__main__":
    main()