# Sahil
# Update app.py
# 5a591f6 verified
import os
import json
import time
import threading
import random
from datetime import datetime
from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS
from datasets import load_dataset, Dataset
from openai import OpenAI
# --- Flask application ------------------------------------------------------
# Static files are served from the application directory at the URL root.
# NOTE(review): with static_folder="." and an empty static_url_path, files that
# sit next to this script look reachable over HTTP as well - confirm intended.
app = Flask(__name__, static_url_path="", static_folder=".")
CORS(app, supports_credentials=True)

# --- OpenAI -----------------------------------------------------------------
# `client` stays None when no API key is provided.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if OPENAI_API_KEY:
    client = OpenAI(api_key=OPENAI_API_KEY)
else:
    client = None

# --- HuggingFace ------------------------------------------------------------
HF_TOKEN = os.environ.get("HF_TOKEN")
TRAINING_DATASET = "Sahil5112/ContinuumGPT"  # Hub dataset repo id

# --- Hierarchical memory ----------------------------------------------------
LEVEL_1_MAX = 20  # Max entries before archiving to Level 2
LEVEL_2_MAX = 50  # Max entries before archiving to Level 3
AUTO_ARCHIVE_ENABLED = True
AUTO_ARCHIVE_INTERVAL = 10 * 60  # seconds (10 minutes)
AUTO_TRAINING_INTERVAL = 5 * 60  # seconds (5 minutes) - auto-generate training examples
AUTO_SAVE_ENABLED = True  # Always save responses immediately

# Entries that were not written to the dataset are held here in memory.
MEMORY_BUFFER = []
# Prompt pool for the auto-training worker; one entry is drawn at random for
# each generated training example.
AUTO_TRAINING_PROMPTS = [
    "Explain quantum computing in simple terms",
    "How does machine learning work?",
    "What are the best practices for web development?",
    "Write a Python function to sort a list",
    "Explain the difference between AI and ML",
    "How do neural networks learn?",
    "What is the future of artificial intelligence?",
    "Create a creative story about space exploration",
    "Explain blockchain technology",
    "How does natural language processing work?",
    "What are design patterns in software engineering?",
    "Explain the concept of recursion with examples",
    "How do databases manage concurrent transactions?",
    "What is the difference between REST and GraphQL?",
    "Explain cloud computing and its benefits",
    "How does encryption work?",
    "What are the principles of good UI/UX design?",
    "Explain the concept of Big O notation",
    "How do search engines rank websites?",
    "What is containerization and why is it useful?",
]
def load_training_dataset():
    """Fetch every row of the training dataset's "train" split.

    Returns:
        A list with one plain ``dict`` per dataset row. An empty list is
        returned when ``HF_TOKEN`` is not set or when loading raises; in the
        latter case the error is printed.
    """
    if not HF_TOKEN:
        print("⚠️ No HF_TOKEN - using local storage only")
        return []
    try:
        rows = load_dataset(TRAINING_DATASET, split="train", token=HF_TOKEN)
        return list(map(dict, rows))
    except Exception as e:
        print(f"Could not load training dataset: {e}")
        return []
def save_to_training_dataset(training_examples):
    """Append training examples to the HuggingFace dataset and push it.

    The dataset is rewritten as a whole: the current rows are downloaded, the
    new examples are appended, and the combined list is pushed back.

    Args:
        training_examples: List of memory-entry dicts to append.

    Returns:
        True when the examples were pushed (or there was nothing to push);
        False when ``HF_TOKEN`` is missing or any step fails. On failure
        nothing is pushed.
    """
    if not HF_TOKEN:
        print("❌ No HF_TOKEN - cannot save to HuggingFace")
        return False
    if not training_examples:
        # Nothing to append; skip the download/upload round trip.
        return True
    try:
        try:
            existing_data = [
                dict(row)
                for row in load_dataset(TRAINING_DATASET, split="train", token=HF_TOKEN)
            ]
        except FileNotFoundError:
            # Only a missing/empty repository may be treated as "no rows yet".
            # NOTE(review): the `datasets` library is expected to signal that
            # case with FileNotFoundError or a subclass - confirm for the
            # installed version.
            existing_data = []
        # Any other load error propagates to the handler below, so a transient
        # failure can no longer cause the remote dataset to be overwritten
        # with only the new examples.
        existing_data.extend(training_examples)
        dataset = Dataset.from_list(existing_data)
        dataset.push_to_hub(TRAINING_DATASET, token=HF_TOKEN, private=False)
        print(f"βœ… Saved {len(training_examples)} entries to {TRAINING_DATASET}")
        print(f"πŸ“Š Total dataset size: {len(existing_data)} entries")
        return True
    except Exception as e:
        print(f"❌ Error saving to dataset: {e}")
        return False
def call_openai_gpt4o_mini(prompt):
    """Send one user prompt to ``gpt-4o-mini`` and return the reply text.

    Args:
        prompt: Text sent as the user message.

    Returns:
        The content of the first completion choice, or ``None`` when no
        OpenAI client is configured or the request raises (the error is
        printed).
    """
    if not client:
        return None

    chat_messages = [
        {"role": "system", "content": "You are ContinuumGPT, a helpful AI assistant."},
        {"role": "user", "content": prompt},
    ]
    try:
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=chat_messages,
            temperature=0.7,
            max_tokens=1000,
        )
        first_choice = completion.choices[0]
        return first_choice.message.content
    except Exception as e:
        print(f"OpenAI API Error: {e}")
        return None
def summarize_conversation(query, response):
    """Produce a one-sentence summary of a single query/response exchange.

    Args:
        query: The user's message.
        response: The assistant's reply.

    Returns:
        The model-written summary when a client is configured and the call
        yields text. Without a client, a truncated "query -> response" string.
        If the call yields nothing or raises, a short "Q&A about ..." string.
    """
    if not client:
        return f"{query[:50]}... -> {response[:50]}..."
    try:
        summary_prompt = f"Summarize this conversation in one sentence:\nUser: {query}\nAI: {response}"
        summary = call_openai_gpt4o_mini(summary_prompt)
        return summary if summary else f"Q&A about {query[:30]}..."
    except Exception:
        # Best-effort fallback; deliberately not a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate.
        return f"Q&A about {query[:30]}..."
def create_super_summary(summaries, max_summaries=10):
    """Condense several conversation summaries into one brief overview.

    Args:
        summaries: List of summary strings.
        max_summaries: How many leading summaries are included in the prompt
            (defaults to 10, the previously hard-coded value).

    Returns:
        The model-written overview, or the placeholder
        ``"General knowledge compilation"`` when no client is configured,
        ``summaries`` is empty, the call yields nothing, or it raises.
    """
    if not client or not summaries:
        return "General knowledge compilation"
    try:
        combined = "\n".join(summaries[:max_summaries])
        prompt = f"Create a brief summary of these conversation topics:\n{combined}"
        super_summary = call_openai_gpt4o_mini(prompt)
        return super_summary if super_summary else "General knowledge compilation"
    except Exception:
        # Best-effort fallback; deliberately not a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate.
        return "General knowledge compilation"
def archive_level_1_to_level_2():
    """Move all fresh Level 1 rows to Level 2 once they exceed LEVEL_1_MAX.

    A row counts as fresh when its ``level`` is 1 and ``archived`` is falsy.
    Nothing happens while the number of fresh rows is at most
    ``LEVEL_1_MAX``. Otherwise each fresh row is replaced by a Level 2 copy
    holding its query, response and summary, an incremented counter, a new
    timestamp and the original timestamp; other fields of the row are not
    copied. The rewritten dataset is pushed when ``HF_TOKEN`` is set.
    """
    rows = load_training_dataset()

    def is_fresh(row):
        return row.get("level") == 1 and not row.get("archived")

    level_1_entries = [row for row in rows if is_fresh(row)]
    if len(level_1_entries) <= LEVEL_1_MAX:
        return

    print(f"πŸ“¦ Archiving {len(level_1_entries)} Level 1 entries to Level 2...")
    archived_entries = [
        {
            "query": entry.get("query"),
            "response": entry.get("response"),
            "summary": entry.get("summary"),
            "archived": True,
            "level": 2,
            "counter": entry.get("counter", 1) + 1,
            "timestamp": datetime.now().isoformat(),
            "original_timestamp": entry.get("timestamp"),
        }
        for entry in level_1_entries
    ]

    # Keep everything that was not just archived, then add the Level 2 copies.
    updated_dataset = [row for row in rows if not is_fresh(row)]
    updated_dataset += archived_entries

    if HF_TOKEN:
        Dataset.from_list(updated_dataset).push_to_hub(
            TRAINING_DATASET, token=HF_TOKEN, private=False
        )
        print(f"βœ… Archived to Level 2: {len(archived_entries)} entries")
def archive_level_2_to_level_3():
    """Collapse Level 2 rows into one Level 3 super-summary row.

    Nothing happens while the number of Level 2 rows is at most
    ``LEVEL_2_MAX``. Otherwise the rows' summaries are handed to
    ``create_super_summary`` and, if a real summary comes back, every Level 2
    row is removed and a single Level 3 row is appended. The rewritten
    dataset is pushed when ``HF_TOKEN`` is set.

    The pass is skipped when ``create_super_summary`` returns its placeholder
    text (no client, no summaries, or a failed call). Otherwise all Level 2
    rows would be replaced by a row carrying no information.
    """
    # Must match the fallback string returned by create_super_summary.
    placeholder_summary = "General knowledge compilation"

    dataset = load_training_dataset()
    level_2_entries = [d for d in dataset if d.get("level") == 2]
    if len(level_2_entries) <= LEVEL_2_MAX:
        return

    print(f"πŸ—œοΈ Creating Level 3 super-summary from {len(level_2_entries)} Level 2 entries...")
    summaries = [d["summary"] for d in level_2_entries if d.get("summary")]
    super_summary = create_super_summary(summaries)
    if super_summary == placeholder_summary:
        # Deleting the Level 2 rows is irreversible; keep them for a later pass.
        print("⚠️ Level 3 archiving skipped - no super-summary could be generated")
        return

    level_3_entry = {
        "query": "Global Knowledge Archive",
        "response": super_summary,
        "summary": super_summary,
        "archived": True,
        "level": 3,
        "counter": len(level_2_entries),
        "timestamp": datetime.now().isoformat(),
        "entries_compressed": len(level_2_entries),
    }
    updated_dataset = [d for d in dataset if d.get("level") != 2]
    updated_dataset.append(level_3_entry)

    if HF_TOKEN:
        dataset_obj = Dataset.from_list(updated_dataset)
        dataset_obj.push_to_hub(TRAINING_DATASET, token=HF_TOKEN, private=False)
        print("βœ… Created Level 3 super-summary")
def auto_training_worker():
    """Loop forever, generating and saving one training example per cycle.

    Each cycle picks a random prompt from ``AUTO_TRAINING_PROMPTS``, asks the
    model for a response and a summary, and saves the result as a Level 1
    entry flagged ``auto_generated``. A cycle is skipped (with a message) when
    ``HF_TOKEN`` or the OpenAI client is missing, or when no response is
    produced. Errors are printed and the loop continues. Every cycle ends
    with a sleep of ``AUTO_TRAINING_INTERVAL`` seconds.
    """
    print("πŸ€– Auto-training worker started")
    training_count = 0  # number of examples saved so far

    while True:
        try:
            if not (HF_TOKEN and client):
                print("⚠️ Auto-training paused - need HF_TOKEN and OPENAI_API_KEY")
            else:
                prompt = random.choice(AUTO_TRAINING_PROMPTS)
                print(f"\nπŸŽ“ Auto-training #{training_count + 1}: '{prompt[:50]}...'")

                response = call_openai_gpt4o_mini(prompt)
                if not response:
                    print("❌ Failed to generate response")
                else:
                    memory_entry = {
                        "query": prompt,
                        "response": response,
                        "summary": summarize_conversation(prompt, response),
                        "archived": False,
                        "level": 1,
                        "counter": 1,
                        "timestamp": datetime.now().isoformat(),
                        "auto_generated": True,
                    }
                    # Persist right away; only successful saves are counted.
                    if save_to_training_dataset([memory_entry]):
                        training_count += 1
                        print(f"βœ… Auto-trained and saved! Total auto-training: {training_count}")
        except Exception as e:
            print(f"❌ Auto-training error: {e}")

        # One pause per cycle, whatever the outcome above.
        time.sleep(AUTO_TRAINING_INTERVAL)
def auto_archive_worker():
    """Run both archiving passes periodically while auto-archiving is enabled.

    Each cycle runs the Level 1 -> 2 and Level 2 -> 3 passes when both
    ``HF_TOKEN`` and the OpenAI client are available; otherwise it prints a
    "paused" message. Errors are printed and the loop continues. Every cycle
    ends with a sleep of ``AUTO_ARCHIVE_INTERVAL`` seconds.
    """
    print("πŸ—„οΈ Auto-archive worker started")

    while AUTO_ARCHIVE_ENABLED:
        try:
            if not (HF_TOKEN and client):
                print("⚠️ Auto-archive paused - need HF_TOKEN and OPENAI_API_KEY")
            else:
                print("\nπŸ”„ Running auto-archive check...")
                for archive_pass in (archive_level_1_to_level_2, archive_level_2_to_level_3):
                    archive_pass()
                print(f"⏳ Next archive check in {AUTO_ARCHIVE_INTERVAL}s...\n")
        except Exception as e:
            print(f"❌ Auto-archive error: {e}")

        time.sleep(AUTO_ARCHIVE_INTERVAL)
@app.route("/")
def index():
    """Serve ``index.html`` from the application directory."""
    landing_page = "index.html"
    return send_from_directory(".", landing_page)
@app.route("/api/chat", methods=["POST"])
def chat():
    """Answer a chat query and record the exchange as a Level 1 memory entry.

    Expects a JSON object with a non-empty string field ``query``.

    Returns:
        * 400 with ``{"success": False, "error": "Missing query"}`` when the
          body is not a JSON object or ``query`` is missing, blank, or not a
          string.
        * 200 with ``success: False`` and an ``error`` message when no
          response could be generated.
        * 200 with the response, its summary, and ``saved`` set to
          ``"saved"`` (written to the dataset) or ``"buffered"`` (kept in
          ``MEMORY_BUFFER``).
    """
    # silent=True yields None for a missing/invalid JSON body instead of
    # raising, so all malformed input ends in the same JSON 400 below.
    payload = request.get_json(silent=True)
    raw_query = payload.get("query") if isinstance(payload, dict) else None
    query = raw_query.strip() if isinstance(raw_query, str) else ""
    if not query:
        return jsonify({"success": False, "error": "Missing query"}), 400

    # Generate response
    response = call_openai_gpt4o_mini(query)
    if not response:
        # Only blame the missing key when the client really is unconfigured;
        # otherwise the API call itself failed or returned nothing.
        if not client:
            error_message = "OPENAI_API_KEY not set. Please add your OpenAI API key to enable AI."
        else:
            error_message = "Failed to generate a response from OpenAI. Please try again."
        # The HTTP status stays at the default (200), as before.
        return jsonify({
            "success": False,
            "error": error_message,
            "response": None
        })

    # Create summary
    summary = summarize_conversation(query, response)

    # Create Level 1 memory entry
    memory_entry = {
        "query": query,
        "response": response,
        "summary": summary,
        "archived": False,
        "level": 1,
        "counter": 1,
        "timestamp": datetime.now().isoformat(),
        "auto_generated": False
    }

    # AUTO-SAVE: try to write to HuggingFace immediately. If saving is
    # disabled, unavailable, or fails, keep the entry in the buffer so it is
    # not lost and report it as buffered.
    saved = False
    if AUTO_SAVE_ENABLED and HF_TOKEN:
        saved = bool(save_to_training_dataset([memory_entry]))
    if saved:
        saved_status = "saved"
    else:
        MEMORY_BUFFER.append(memory_entry)
        saved_status = "buffered"

    return jsonify({
        "success": True,
        "response": response,
        "summary": summary,
        "level": 1,
        "buffered": len(MEMORY_BUFFER),
        "saved": saved_status
    })
@app.route("/api/dataset-stats", methods=["GET"])
def dataset_stats():
    """Report row counts for each memory level of the training dataset.

    Returns:
        200 with the total row count, the number of unarchived Level 1 rows,
        the Level 2 and Level 3 row counts, the current buffer length, the
        number of rows flagged ``auto_generated`` and the dataset URL; or 500
        with ``{"error": ...}`` if anything raises.
    """
    try:
        rows = load_training_dataset()

        def count_where(predicate):
            # Number of rows for which predicate(row) is truthy.
            return sum(1 for row in rows if predicate(row))

        stats = {
            "success": True,
            "total_entries": len(rows),
            "level_1_fresh": count_where(
                lambda r: r.get("level") == 1 and not r.get("archived")
            ),
            "level_2_archived": count_where(lambda r: r.get("level") == 2),
            "level_3_super": count_where(lambda r: r.get("level") == 3),
            "buffered": len(MEMORY_BUFFER),
            "auto_generated_count": count_where(lambda r: r.get("auto_generated")),
            "dataset_url": f"https://huggingface.co/datasets/{TRAINING_DATASET}",
        }
        return jsonify(stats)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/flush-buffer", methods=["POST"])
def flush_buffer():
    """Write the buffered memory entries to HuggingFace.

    Returns:
        * 200 with a message when the buffer is empty.
        * 200 with ``success: True`` and the number of flushed entries when
          the save succeeds; those entries are then removed from the buffer.
        * 500 with ``{"error": ...}`` when the save fails; the buffer is left
          untouched so the entries can be flushed again later.
    """
    if not MEMORY_BUFFER:
        return jsonify({"message": "Buffer is empty, nothing to flush"})

    # Snapshot what is being saved so that only those entries are dropped.
    pending = list(MEMORY_BUFFER)
    if not save_to_training_dataset(pending):
        return jsonify({"error": "Failed to flush buffer"}), 500

    # Remove exactly the saved entries; anything appended to the buffer while
    # the save was running stays queued.
    count = len(pending)
    del MEMORY_BUFFER[:count]
    return jsonify({
        "success": True,
        "message": f"Flushed {count} entries to HuggingFace"
    })
@app.route("/api/archive-now", methods=["POST"])
def archive_now():
    """Run both archiving passes immediately.

    Returns:
        200 with a success message once both passes have run, or 500 with
        ``{"error": ...}`` if either of them raises.
    """
    try:
        for archive_pass in (archive_level_1_to_level_2, archive_level_2_to_level_3):
            archive_pass()
        result = {"success": True, "message": "Archiving completed"}
        return jsonify(result)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # Script entry point: print the startup banner, start the background
    # workers when an OpenAI key is present, report the dataset state when a
    # HuggingFace token is present, then run the Flask server.
    port = int(os.getenv("PORT", 7860))

    startup_banner = (
        "πŸš€ Starting ContinuumGPT Hierarchical Memory System...",
        f"πŸ“Š Training Dataset: {TRAINING_DATASET}",
        f"πŸŽ“ Dataset URL: https://huggingface.co/datasets/{TRAINING_DATASET}",
        "",
        "🧠 Hierarchical Memory Architecture:",
        f" Level 1 (Fresh): Detailed Q&A (max {LEVEL_1_MAX} before archiving)",
        f" Level 2 (Archived): Compressed summaries (max {LEVEL_2_MAX} before archiving)",
        " Level 3 (Super): Global knowledge compilation",
        f" Auto-archiving: Every {AUTO_ARCHIVE_INTERVAL} seconds",
        f" Auto-training: Every {AUTO_TRAINING_INTERVAL} seconds",
        f" Auto-save: {'ENABLED' if AUTO_SAVE_ENABLED else 'DISABLED'}",
        "",
    )
    for banner_line in startup_banner:
        print(banner_line)

    if not OPENAI_API_KEY:
        print("⚠️ OpenAI API Key Missing - Add OPENAI_API_KEY to enable")
    else:
        print("βœ… OpenAI API Key Configured")

        # Daemon threads, so they do not keep the process alive on exit.
        archive_thread = threading.Thread(target=auto_archive_worker, daemon=True)
        archive_thread.start()
        print("βœ… Auto-archive worker started")

        training_thread = threading.Thread(target=auto_training_worker, daemon=True)
        training_thread.start()
        print("βœ… Auto-training worker started - will generate examples every 5 minutes")

    if not HF_TOKEN:
        print("⚠️ HuggingFace Integration Disabled - Add HF_TOKEN to enable")
    else:
        print("βœ… HuggingFace Integration Active")
        training_data = load_training_dataset()

        # Rows without a "level" field are counted as Level 1.
        level_counts = dict.fromkeys((1, 2, 3), 0)
        for d in training_data:
            level = d.get("level", 1)
            level_counts[level] = level_counts.get(level, 0) + 1
        print(f"πŸ“š Current dataset: L1={level_counts[1]}, L2={level_counts[2]}, L3={level_counts[3]}")

        if AUTO_SAVE_ENABLED:
            print("πŸ’Ύ Auto-save ENABLED - all responses saved immediately to dataset")

    print("")
    app.run(host="0.0.0.0", port=port, debug=False, threaded=True)