Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| import random | |
| import time | |
| import requests | |
| import json | |
| import re | |
| import matplotlib.pyplot as plt | |
| import networkx as nx | |
| from datetime import datetime | |
| import gradio as gr | |
| import io | |
| from PIL import Image | |
| # ------------------------------------------------------------------- | |
| # TemporalPredictionSystem Class Definition | |
| # ------------------------------------------------------------------- | |
| class TemporalPredictionSystem: | |
| def __init__(self): | |
| self.nodes = [] | |
| self.edges = [] | |
| self.time_scales = [0.1, 0.5, 1, 2, 5] # Different time scales | |
| self.recursion_depth = 3 | |
| self.causal_strength = 5 | |
| self.time_scale = 3 | |
| # System metrics | |
| self.prediction_accuracy = 0 | |
| self.causal_consistency = 0 | |
| self.meta_rule_optimizations = 0 | |
| self.fixed_point_iterations = 0 | |
| # Simulation state | |
| self.is_running = False | |
| self.tick_count = 0 | |
| # Constants | |
| self.MAX_NODES = 50 | |
| self.NODE_DECAY_RATE = 0.02 | |
| self.EDGE_THRESHOLD = 0.3 | |
| # Markov Chain data for text predictions | |
| self.word_corpus = [ | |
| "future", "past", "present", "time", "space", "causality", "effect", "prediction", | |
| "reality", "quantum", "probability", "divergence", "convergence", "branch", "path", | |
| "timeline", "paradox", "stability", "chaos", "deterministic", "random", "uncertain", | |
| "fixed", "flexible", "loop", "recursive", "retrocausal", "forward", "backward", "cycle", | |
| "wave", "particle", "superposition", "entanglement", "observer", "collapse", "multiverse", | |
| "dimension", "parallel", "simulation", "emergence", "complexity", "singularity", "event", | |
| "horizon", "threshold", "attractor", "repeller", "equilibrium", "evolution", "revolution" | |
| ] | |
| self.quotes_data = [] | |
| self.tags_data = [] | |
| self.markov_chain = {} | |
| self.timelines = [] | |
| self.predicted_futures = [] | |
| # Chat model properties | |
| self.conversation_history = [] | |
| self.personality_traits = { | |
| "optimism": 0.7, | |
| "creativity": 0.8, | |
| "analytical": 0.65, | |
| "humor": 0.5, | |
| "empathy": 0.6 | |
| } | |
| self.learning_rate = 0.05 | |
| self.context_size = 5 | |
| self.knowledge_base = {} | |
| self.temporal_bias = 0.3 # How much to bias toward future perspective | |
| self.self_model_accuracy = 0.2 # Starts low, improves with conversation | |
| self.model_growth_rate = 0 | |
| self.total_interactions = 0 | |
| # Offset for continuous quote expansion | |
| self.quote_offset = 0 | |
| # Response generators | |
| self.response_templates = [ | |
| "Based on your current trajectory, I see {{future}}.", | |
| "From my perspective {{timeYears}} years ahead, I can tell you that {{insight}}.", | |
| "Your interest in {{topic}} eventually leads to {{outcome}}.", | |
| "I remember this conversation. It {{impact}} my thinking significantly.", | |
| "In the future, you'll realize that {{realization}}.", | |
| "What you're doing now with {{activity}} actually {{futureResult}}.", | |
| "The {{decision}} you're considering has interesting consequences: {{consequences}}.", | |
| "From my vantage point, I'd suggest focusing more on {{suggestion}}.", | |
| "Your intuition about {{subject}} is {{accuracy}} - I know because I've {{evidence}}." | |
| ] | |
| self.insight_patterns = [ | |
| "this challenge ultimately strengthens your {{attribute}}", | |
| "you develop a unique perspective on {{topic}} that others find valuable", | |
| "the uncertainty you feel now resolves into {{outcome}}", | |
| "what seems important now shifts dramatically after you {{experience}}", | |
| "this period of {{state}} leads to unexpected opportunities in {{domain}}", | |
| "your approach to {{activity}} evolves in ways that surprise even you", | |
| "the questions you're asking now form the foundation of your future {{role}}" | |
| ] | |
| # Topic and concept modeling | |
| self.topic_extractor = {} | |
| self.topic_relations = {} | |
| # Initialize with basic topics from word corpus | |
| for word in self.word_corpus: | |
| self.topic_extractor[word] = 1 | |
| self.topic_relations[word] = [] | |
| # Fetch quotes data and build Markov chain | |
| self.fetch_quotes_data() | |
def fetch_quotes_data(self, timeout=10):
    """Fetch the next page of quotes and fold it into the vocabulary.

    Requests up to 100 rows of the ``Abirate/english_quotes`` dataset from
    the HuggingFace datasets server, starting at ``self.quote_offset``.
    Quotes are appended to ``self.quotes_data``, tags to ``self.tags_data``,
    and every new word longer than three characters is added to
    ``self.word_corpus`` and registered as a topic. The Markov chain is then
    rebuilt and the offset advanced by the number of rows received.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.

    This is deliberately best-effort: any failure is reported on stdout and
    the Markov chain is still rebuilt from whatever data is already loaded,
    so the system stays usable offline.
    """
    url = (
        "https://datasets-server.huggingface.co/rows"
        "?dataset=Abirate%2Fenglish_quotes&config=default&split=train"
        f"&offset={self.quote_offset}&length=100"
    )
    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        rows = data.get("rows") if isinstance(data, dict) else None
        if not isinstance(rows, list):
            rows = []

        # Collect new words once, keeping first-seen order, instead of
        # rebuilding the corpus through an unordered set for every quote.
        corpus = list(dict.fromkeys(self.word_corpus))
        known_words = set(corpus)

        def remember(candidates):
            for candidate in candidates:
                if candidate not in known_words:
                    known_words.add(candidate)
                    corpus.append(candidate)

        for row in rows:
            record = row.get("row") if isinstance(row, dict) else None
            if not isinstance(record, dict):
                continue
            quote = record.get("quote")
            if not isinstance(quote, str):
                continue
            self.quotes_data.append(quote)
            # Process quote into words for corpus
            remember(
                w for w in re.sub(r'[^\w\s]', '', quote.lower()).split()
                if len(w) > 3
            )
            # Extract tags if available
            if "tags" in record:
                raw_tags = record["tags"]
                if not isinstance(raw_tags, list):
                    raw_tags = [raw_tags]
                # Non-string tags (e.g. null) would break ``.lower()`` here
                # and later in build_initial_markov_chain, so drop them.
                tags = [tag for tag in raw_tags if isinstance(tag, str)]
                self.tags_data.extend(tags)
                remember(tag.lower() for tag in tags if len(tag) > 3)

        self.word_corpus = corpus
        print(f"Loaded {len(self.quotes_data)} quotes and {len(self.tags_data)} tags")
        print(f"Word corpus expanded to {len(self.word_corpus)} words")

        # Update topic extractor with any new words
        for word in self.word_corpus:
            if word not in self.topic_extractor:
                self.topic_extractor[word] = 1
                self.topic_relations[word] = []

        # Rebuild Markov chain with new data
        self.build_initial_markov_chain()
        # Advance by what was actually received so a short page skips nothing.
        self.quote_offset += len(rows)
    except Exception as e:  # best-effort boundary: never let a fetch failure propagate
        print(f"Error fetching quotes data: {e}")
        self.build_initial_markov_chain()
def build_initial_markov_chain(self):
    """Rebuild ``self.markov_chain`` from the quotes, corpus and tags.

    The chain maps ``word -> {next_word: count}``. It is assembled from:
    consecutive word pairs in each quote (words longer than 3 characters);
    a ring over ``self.word_corpus`` when fewer than 30 words have
    successors; up to five random corpus words per tag; and 50 random
    corpus-to-corpus links.
    """
    transitions = {}

    def link(head, tail):
        # Count one observed/synthetic transition head -> tail.
        followers = transitions.setdefault(head, {})
        followers[tail] = followers.get(tail, 0) + 1

    # Real transitions taken from the quotes.
    for quote in self.quotes_data:
        tokens = [t for t in re.sub(r'[^\w\s]', '', quote.lower()).split() if len(t) > 3]
        for head, tail in zip(tokens, tokens[1:]):
            link(head, tail)

    # Too little real data: chain every corpus word to its neighbour (wrapping).
    if len(transitions) < 30:
        corpus_size = len(self.word_corpus)
        for position, head in enumerate(self.word_corpus):
            link(head, self.word_corpus[(position + 1) % corpus_size])

    # Tags point at a handful of random corpus words.
    for tag in self.tags_data:
        # Sampled before the length check, so short tags still consume randomness.
        neighbours = random.sample(self.word_corpus, min(5, len(self.word_corpus)))
        key = tag.lower()
        if len(key) > 3:
            transitions.setdefault(key, {})
            for neighbour in neighbours:
                link(key, neighbour)

    # Extra random links.
    for _ in range(50):
        head = random.choice(self.word_corpus)
        tail = random.choice(self.word_corpus)
        link(head, tail)

    self.markov_chain = transitions
def initialize(self):
    """Bring the system to a fresh state.

    Runs, in order: node generation, edge generation, metric reset,
    timeline initialisation and prediction generation.
    """
    startup_sequence = (
        self.generate_initial_nodes,
        self.generate_initial_edges,
        self.reset_metrics,
        self.initialize_timelines,
        self.generate_predictions,
    )
    for step in startup_sequence:
        step()
def initialize_timelines(self):
    """Replace ``self.timelines`` with 3-5 freshly generated timelines.

    Each timeline starts from a random corpus word and is extended with the
    Markov chain to a total of 3-7 words; probability, time-scale index and
    stability are drawn at random.
    """
    self.timelines = []
    for index in range(random.randint(0, 2) + 3):
        sequence = [random.choice(self.word_corpus)]
        # One word is already present, so extend by (length - 1) steps.
        for _ in range(random.randint(0, 4) + 2):
            sequence.append(self.get_next_word_from_markov(sequence[-1]))
        entry = {"id": f"timeline-{index}", "words": sequence}
        entry["probability"] = random.random()
        entry["timeScale"] = random.randint(0, len(self.time_scales) - 1)
        entry["stability"] = random.random()
        self.timelines.append(entry)
def get_next_word_from_markov(self, current_word):
    """Pick a successor of ``current_word`` weighted by transition counts.

    Args:
        current_word: The word whose successor is wanted.

    Returns:
        A successor drawn from ``self.markov_chain[current_word]`` with
        probability proportional to its count. When the word is unknown or
        has no recorded successors, a uniformly random word from
        ``self.word_corpus`` is returned instead.
    """
    successors = self.markov_chain.get(current_word)
    if not successors:
        return random.choice(self.word_corpus)

    candidates = list(successors)
    weights = list(successors.values())
    if sum(weights) <= 0:
        # random.choices rejects an all-zero weighting; fall back to uniform.
        return random.choice(candidates)
    return random.choices(candidates, weights=weights, k=1)[0]
def generate_predictions(self):
    """Rebuild ``self.predicted_futures`` from the current timelines.

    Every timeline gets 2-3 futures. A future starts at the timeline's last
    word and is extended through the Markov chain to 2-4 words; its
    probability is a random fraction of the timeline's probability.
    """
    self.predicted_futures = []
    for timeline in self.timelines:
        for _ in range(random.randint(0, 1) + 2):
            extra_words = random.randint(0, 2) + 1  # the last timeline word is word #1
            branch = [timeline["words"][-1]]
            for _step in range(extra_words):
                branch.append(self.get_next_word_from_markov(branch[-1]))
            likelihood = random.random() * timeline["probability"]
            certainty = random.random()
            self.predicted_futures.append({
                "id": f"future-{len(self.predicted_futures)}",
                "timelineId": timeline["id"],
                "words": branch,
                "probability": likelihood,
                "confidence": certainty,
                "timeScale": timeline["timeScale"],
            })
def generate_initial_nodes(self):
    """Replace ``self.nodes`` with ``5 + 2 * recursion_depth`` random nodes.

    Each node gets a position (x in [50, 750), y in [50, 450)), a random
    time-scale index, random value and prediction, age 0, a radius in
    [10, 25), a corpus word and a three-word future phrase.
    """
    self.nodes = []
    for index in range(self.recursion_depth * 2 + 5):
        # Fields are filled in one at a time; the draw order is significant
        # for reproducibility under a seeded RNG.
        node = {"id": f"node-{index}"}
        node["x"] = random.random() * 700 + 50
        node["y"] = random.random() * 400 + 50
        node["timeScale"] = random.randint(0, len(self.time_scales) - 1)
        node["value"] = random.random()
        node["prediction"] = random.random()
        node["age"] = 0
        node["radius"] = 10 + random.random() * 15
        node["word"] = random.choice(self.word_corpus)
        node["future"] = self.generate_future_phrase(3)
        self.nodes.append(node)
def generate_future_phrase(self, length):
    """Return a space-joined phrase walked through the Markov chain.

    The phrase starts at a random corpus word and is extended until it has
    ``length`` words. The start word is always included, so a ``length`` of
    1 or less still yields one word.
    """
    phrase = [random.choice(self.word_corpus)]
    while len(phrase) < length:
        phrase.append(self.get_next_word_from_markov(phrase[-1]))
    return " ".join(phrase)
def generate_initial_edges(self):
    """Replace ``self.edges`` with 1-3 random outgoing edges per node.

    Targets are chosen uniformly among the *other* nodes, so no self-loops
    are created. Edge strength is a random fraction of
    ``self.causal_strength / 10`` and age starts at 0.

    With fewer than two nodes there is no valid target; the edge list is
    simply left empty (the previous retry loop never terminated in that
    case).
    """
    self.edges = []
    node_count = len(self.nodes)
    if node_count < 2:
        return

    for index, source_node in enumerate(self.nodes):
        for _ in range(random.randint(1, 3)):
            # Draw from the node_count - 1 other positions and step over
            # our own index: uniform, with no rejection loop needed.
            target_index = random.randrange(node_count - 1)
            if target_index >= index:
                target_index += 1
            self.edges.append({
                "id": f"edge-{len(self.edges)}",
                "source": source_node["id"],
                "target": self.nodes[target_index]["id"],
                "strength": random.random() * self.causal_strength / 10,
                "age": 0,
            })
def reset_metrics(self):
    """Zero the four system metrics (accuracy, consistency and both counters)."""
    self.prediction_accuracy = self.causal_consistency = 0
    self.meta_rule_optimizations = self.fixed_point_iterations = 0
def tick(self):
    """Advance the simulation by one step.

    Every tick propagates influences, applies retrocausality and the
    fixed-point step, enforces consistency, manages node/edge lifecycles
    and refreshes metrics. Meta-rules are optimised on every 10th tick;
    timelines and predictions are refreshed on every 5th.
    """
    self.tick_count += 1

    for phase in (
        self.propagate_influences,
        self.apply_retrocausality,
        self.apply_fixed_point_combination,
    ):
        phase()

    if not self.tick_count % 10:
        self.optimize_meta_rules()

    for phase in (
        self.enforce_causal_consistency,
        self.manage_node_lifecycle,
        self.manage_edge_lifecycle,
        self.update_metrics,
    ):
        phase()

    if not self.tick_count % 5:
        self.update_timelines()
        self.generate_predictions()
def propagate_influences(self):
    """Pull every node's value toward the weighted mean of its sources.

    For each node with incoming edges, the strength-weighted average of the
    source values is blended into the node's value. The blend factor is
    ``0.1 * time_scales[node["timeScale"]] * (time_scale / 3)``; a small
    random jitter in [-0.01, 0.01) is added and the result is clamped to
    [0, 1]. Source values are read from a snapshot taken before any update,
    so the outcome does not depend on node order.

    Edges whose source is not a known node are ignored, and a node whose
    incoming strengths sum to zero or less is left unchanged.
    """
    # Index once instead of scanning all edges and all nodes per node.
    # setdefault keeps the first occurrence should two nodes share an id.
    value_before = {}
    for node in self.nodes:
        value_before.setdefault(node["id"], node["value"])

    incoming_by_target = {}
    for edge in self.edges:
        incoming_by_target.setdefault(edge["target"], []).append(edge)

    for node in self.nodes:
        influence_sum = 0
        weight_sum = 0
        for edge in incoming_by_target.get(node["id"], ()):
            if edge["source"] not in value_before:
                continue
            influence_sum += value_before[edge["source"]] * edge["strength"]
            weight_sum += edge["strength"]

        if weight_sum > 0:
            scale_factor = self.time_scales[node["timeScale"]] * (self.time_scale / 3)
            influence_value = influence_sum / weight_sum
            node["value"] = (node["value"] * (1 - scale_factor * 0.1) +
                             influence_value * scale_factor * 0.1 +
                             (random.random() - 0.5) * 0.02)
            node["value"] = max(0, min(1, node["value"]))
def apply_retrocausality(self):
    """Nudge the prediction of every node that has outgoing edges.

    For such nodes the prediction moves toward the node's own value (plus a
    random jitter in [-0.05, 0.05)) by a factor of
    ``0.05 * recursion_depth / 5`` and is then clamped to [0, 1]. Nodes
    without outgoing edges are left untouched.
    """
    # NOTE(review): the previous version also summed a strength-weighted
    # prediction error over each node's targets but never used the result.
    # That dead computation (a full node scan per edge) has been removed;
    # confirm whether the error was meant to feed into the adjustment.
    nodes_with_outgoing = {edge["source"] for edge in self.edges}
    adjustment_factor = 0.05 * (self.recursion_depth / 5)

    for node in self.nodes:
        if node["id"] not in nodes_with_outgoing:
            continue
        noisy_value = node["value"] + (random.random() - 0.5) * 0.1
        node["prediction"] = (node["prediction"] * (1 - adjustment_factor)
                              + noisy_value * adjustment_factor)
        node["prediction"] = max(0, min(1, node["prediction"]))
def apply_fixed_point_combination(self):
    """On every 5th tick, pull sampled node values toward their predictions.

    Runs ``1 + recursion_depth // 2`` rounds. Each round samples up to
    ``3 + recursion_depth // 2`` nodes and moves each value 20% of the way
    toward the midpoint of its value and prediction, then counts one
    fixed-point iteration.
    """
    if self.tick_count % 5 != 0:
        return

    for _ in range(1 + self.recursion_depth // 2):
        batch_size = min(3 + self.recursion_depth // 2, len(self.nodes))
        for node in random.sample(self.nodes, batch_size):
            midpoint = (node["value"] + node["prediction"]) / 2
            node["value"] = node["value"] * 0.8 + midpoint * 0.2
        self.fixed_point_iterations += 1
def optimize_meta_rules(self):
    """Retune edge strengths and occasionally grow the graph.

    When prediction accuracy is below 0.6, every edge whose source
    prediction is within 0.3 of its target value is strengthened by 5%
    (capped at 1); other edges are weakened by 5% (floored at 0.1). There is
    then a 20% chance of adding an edge (while the graph has fewer than
    three edges per node) and the optimisation counter is incremented.
    Independently of accuracy, there is a 10% chance of adding a node while
    below ``MAX_NODES``.
    """
    def lookup(node_id):
        # First node carrying this id, or None.
        return next((n for n in self.nodes if n["id"] == node_id), None)

    if self.calculate_prediction_accuracy() < 0.6:
        for edge in self.edges:
            origin = lookup(edge["source"])
            destination = lookup(edge["target"])
            if not (origin and destination):
                continue
            gap = abs(origin["prediction"] - destination["value"])
            if gap < 0.3:
                edge["strength"] = min(1, edge["strength"] * 1.05)
            else:
                edge["strength"] = max(0.1, edge["strength"] * 0.95)

        if random.random() < 0.2 and len(self.edges) < len(self.nodes) * 3:
            self.create_new_edge()
        self.meta_rule_optimizations += 1

    if random.random() < 0.1 and len(self.nodes) < self.MAX_NODES:
        self.create_new_node()
def enforce_causal_consistency(self):
    """Dampen causal loops and contradictory inputs.

    For each loop reported by ``detect_causal_loops()`` the weakest edge
    along the loop is weakened by 10%. Then, for each node with more than
    one incoming edge, every pair of source nodes whose values differ by
    more than 0.5 counts as a contradiction: both edges are weakened by 2%
    and ``causal_consistency`` drops by 0.01 per contradiction (never below
    zero).
    """
    def lookup(node_id):
        # First node carrying this id, or None.
        return next((n for n in self.nodes if n["id"] == node_id), None)

    # --- Loops: weaken the weakest link of each ------------------------
    for cycle in self.detect_causal_loops() or []:
        weakest, lowest = None, float('inf')
        for start, end in zip(cycle, cycle[1:]):
            link = next(
                (e for e in self.edges if e["source"] == start and e["target"] == end),
                None,
            )
            if link and link["strength"] < lowest:
                weakest, lowest = link, link["strength"]
        if weakest:
            weakest["strength"] *= 0.9

    # --- Contradictions: sources that disagree strongly ---------------
    for node in self.nodes:
        inbound = [e for e in self.edges if e["target"] == node["id"]]
        if len(inbound) <= 1:
            continue
        clashes = 0
        for position, first in enumerate(inbound):
            for second in inbound[position + 1:]:
                first_source = lookup(first["source"])
                second_source = lookup(second["source"])
                if not (first_source and second_source):
                    continue
                if abs(first_source["value"] - second_source["value"]) > 0.5:
                    clashes += 1
                    first["strength"] *= 0.98
                    second["strength"] *= 0.98
        self.causal_consistency = max(0, self.causal_consistency - clashes * 0.01)
def manage_node_lifecycle(self):
    """Age every node and evict one node when over capacity.

    When there are more than ``MAX_NODES`` nodes, the node with the fewest
    attached edges is removed (the oldest wins a tie) together with every
    edge that touches it.
    """
    for node in self.nodes:
        node["age"] += 1

    if len(self.nodes) <= self.MAX_NODES:
        return

    def eviction_rank(node):
        degree = sum(
            1 for edge in self.edges
            if edge["source"] == node["id"] or edge["target"] == node["id"]
        )
        return (degree, -node["age"])

    # min() yields the first lowest-ranked node, like a stable sort's head.
    evicted_id = min(self.nodes, key=eviction_rank)["id"]
    self.nodes = [n for n in self.nodes if n["id"] != evicted_id]
    self.edges = [
        e for e in self.edges
        if e["source"] != evicted_id and e["target"] != evicted_id
    ]
def manage_edge_lifecycle(self):
    """Age every edge, then keep only edges stronger than ``EDGE_THRESHOLD``."""
    survivors = []
    for edge in self.edges:
        edge["age"] += 1
        if edge["strength"] > self.EDGE_THRESHOLD:
            survivors.append(edge)
    self.edges = survivors
def update_metrics(self):
    """Refresh prediction accuracy; every 20th tick raise consistency by 0.01 (max 1)."""
    self.prediction_accuracy = self.calculate_prediction_accuracy()
    if self.tick_count % 20:
        return
    self.causal_consistency = min(1, self.causal_consistency + 0.01)
def calculate_prediction_accuracy(self):
    """Return ``1 - mean(|value - prediction|)`` over all nodes, floored at 0.

    Returns 0 when there are no nodes.
    """
    if not self.nodes:
        return 0
    errors = [abs(node["value"] - node["prediction"]) for node in self.nodes]
    return max(0, 1 - sum(errors) / len(errors))
def detect_causal_loops(self):
    """Find causal loops with a depth-first walk over the edges.

    Returns:
        A list of loops, each a list of node ids that starts and ends with
        the same id. Every node is expanded at most once, so loops that are
        only reachable through an already-expanded node are not reported.
    """
    found = []
    expanded = set()

    def walk(current, trail):
        # ``trail`` is the path from the walk's root down to, but not
        # including, ``current``.
        if current in trail:
            found.append(list(trail[trail.index(current):]) + [current])
            return
        if current in expanded:
            return
        expanded.add(current)
        extended = trail + (current,)
        for edge in self.edges:
            if edge["source"] == current:
                walk(edge["target"], extended)

    for node in self.nodes:
        if node["id"] not in expanded:
            walk(node["id"], ())
    return found
def create_new_node(self):
    """Append one new node and link it to 1-3 existing nodes.

    The node is placed within 50 units of a random existing node (clamped
    to x in [50, 750], y in [50, 450]) or anywhere on the canvas when the
    graph is empty. Its word comes from a random timeline when timelines
    exist, otherwise from the corpus. Outgoing edges get a strength in
    [0.3, 0.8); none are created when the new node is the only node.
    """
    fresh_id = f"node-{len(self.nodes)}"

    if not self.nodes:
        x = random.random() * 700 + 50
        y = random.random() * 400 + 50
    else:
        anchor = random.choice(self.nodes)
        x = anchor["x"] + (random.random() - 0.5) * 100
        y = anchor["y"] + (random.random() - 0.5) * 100
        x = max(50, min(750, x))
        y = max(50, min(450, y))

    # A corpus word is always drawn first, then overridden by a timeline word.
    label = random.choice(self.word_corpus)
    if self.timelines:
        label = random.choice(random.choice(self.timelines)["words"])

    fresh = {
        "id": fresh_id,
        "x": x,
        "y": y,
        "timeScale": random.randint(0, len(self.time_scales) - 1),
        "value": random.random(),
        "prediction": random.random(),
        "age": 0,
        "radius": 10 + random.random() * 15,
        "word": label,
        "future": self.generate_future_phrase(3),
    }
    self.nodes.append(fresh)

    for _ in range(random.randint(1, 3)):
        if len(self.nodes) <= 1:
            continue
        # Redraw until the partner is not the new node itself.
        partner = random.randint(0, len(self.nodes) - 1)
        while self.nodes[partner]["id"] == fresh["id"]:
            partner = random.randint(0, len(self.nodes) - 1)
        self.edges.append({
            "id": f"edge-{len(self.edges)}",
            "source": fresh["id"],
            "target": self.nodes[partner]["id"],
            "strength": random.random() * 0.5 + 0.3,
            "age": 0,
        })
def create_new_edge(self):
    """Add one edge between two distinct random nodes, unless it already exists.

    Does nothing when there are fewer than two nodes or when the randomly
    chosen (source, target) pair is already connected in that direction.
    New edges get a strength in [0.3, 0.8) and age 0.
    """
    total = len(self.nodes)
    if total < 2:
        return

    origin = random.randint(0, total - 1)
    destination = origin
    while destination == origin:
        destination = random.randint(0, len(self.nodes) - 1)

    origin_id = self.nodes[origin]["id"]
    destination_id = self.nodes[destination]["id"]

    for edge in self.edges:
        if edge["source"] == origin_id and edge["target"] == destination_id:
            return  # already connected in this direction

    self.edges.append({
        "id": f"edge-{len(self.edges)}",
        "source": origin_id,
        "target": destination_id,
        "strength": random.random() * 0.5 + 0.3,
        "age": 0,
    })
def update_timelines(self):
    """Evolve the timelines, occasionally branch one, and prune unstable ones.

    Per timeline: with 30% chance a Markov-chosen word is appended (keeping
    at most 10 words). The probability is recomputed from nodes related to
    the timeline (a node whose word appears in the timeline, or whose
    future phrase contains the timeline's last word) as one minus their
    mean prediction error, floored at 0.1. Stability is blended 95/5 with
    ``causal_consistency``, floored at 0.1.

    Then, with 20% chance and fewer than 8 timelines, a random timeline is
    branched: a prefix of its words is extended by 2-4 new words, with
    probability and stability scaled by 0.8. Finally, timelines with
    stability of 0.2 or less are dropped with 30% chance each.
    """
    for timeline in self.timelines:
        if random.random() < 0.3:
            words = timeline["words"]
            words.append(self.get_next_word_from_markov(words[-1]))
            if len(words) > 10:
                words.pop(0)

        related_nodes = [
            node for node in self.nodes
            if node["word"] in timeline["words"]
            or timeline["words"][-1] in node["future"]
        ]
        if related_nodes:
            # Mean absolute error of the related nodes (lower is better).
            avg_prediction_error = sum(
                abs(node["value"] - node["prediction"]) for node in related_nodes
            ) / len(related_nodes)
            timeline["probability"] = max(0.1, 1 - avg_prediction_error)
        timeline["stability"] = max(
            0.1, timeline["stability"] * 0.95 + self.causal_consistency * 0.05
        )

    if random.random() < 0.2 and len(self.timelines) < 8:
        if self.timelines:
            source_timeline = random.choice(self.timelines)
            branch_point = random.randint(0, len(source_timeline["words"]) - 1)
            new_words = source_timeline["words"][:branch_point + 1]
            current_word = new_words[-1]
            for _ in range(2 + random.randint(0, 2)):
                current_word = self.get_next_word_from_markov(current_word)
                new_words.append(current_word)

            # Pruning leaves gaps in the numbering, so "timeline-<count>"
            # can already be taken. Predictions are matched to timelines by
            # id, so a duplicate would cross-wire them: pick the next free
            # number instead.
            taken_ids = {timeline["id"] for timeline in self.timelines}
            suffix = len(self.timelines)
            while f"timeline-{suffix}" in taken_ids:
                suffix += 1

            self.timelines.append({
                "id": f"timeline-{suffix}",
                "words": new_words,
                "probability": source_timeline["probability"] * 0.8,
                "timeScale": source_timeline["timeScale"],
                "stability": source_timeline["stability"] * 0.8,
            })

    self.timelines = [
        timeline for timeline in self.timelines
        if timeline["stability"] > 0.2 or random.random() < 0.7
    ]
def process_user_message(self, message):
    """Record a user message, advance the simulation and produce a reply.

    The message is logged, its topics are extracted and stored in the
    knowledge base, the simulation runs for five ticks, and the generated
    reply is logged with role ``"future"`` before model growth is updated.

    Returns:
        The response dict produced by ``generate_future_response`` (keys
        ``text``, ``timeOffset`` and ``confidence`` are read here).
    """
    user_entry = {
        "role": "user",
        "content": message,
        "timestamp": datetime.now().timestamp(),
    }
    self.conversation_history.append(user_entry)
    self.total_interactions += 1

    topics = self.extract_topics(message)
    self.update_knowledge_base(topics, message)

    remaining_ticks = 5
    while remaining_ticks:
        self.tick()
        remaining_ticks -= 1

    reply = self.generate_future_response(message, topics)
    reply_entry = {
        "role": "future",
        "content": reply["text"],
        "timestamp": datetime.now().timestamp(),
        "timeOffset": reply["timeOffset"],
        "confidence": reply["confidence"],
    }
    self.conversation_history.append(reply_entry)

    self.update_model_growth()
    return reply
def extract_topics(self, message):
    """Extract topics from a message and learn any new vocabulary.

    The message is lower-cased, stripped of punctuation and split into
    words longer than two characters. Three passes follow:

    1. Known topics are collected and their counters incremented.
    2. Unknown words longer than three characters become topics and are
       related (both ways) to every topic collected so far.
    3. Words longer than three characters that are missing from the corpus
       are appended to it and linked, in both directions, with up to three
       random corpus words in the Markov chain.

    Returns:
        The collected topics in discovery order (repeats are possible when
        a known word occurs more than once).
    """
    tokens = [t for t in re.sub(r'[^\w\s]', '', message.lower()).split() if len(t) > 2]
    found = []

    # Pass 1: topics we already know.
    for token in tokens:
        if token in self.topic_extractor:
            found.append(token)
            self.topic_extractor[token] += 1

    # Pass 2: promote new words to topics and relate them to the others.
    for token in tokens:
        if token in self.topic_extractor or len(token) <= 3:
            continue
        self.topic_extractor[token] = 1
        found.append(token)
        for other in found:
            if other == token:
                continue
            self.topic_relations.setdefault(token, []).append(other)
            self.topic_relations.setdefault(other, []).append(token)

    # Pass 3: grow the corpus and wire new words into the Markov chain.
    for token in tokens:
        if len(token) <= 3 or token in self.word_corpus:
            continue
        self.word_corpus.append(token)
        # The sample may include the word just appended.
        for neighbour in random.sample(self.word_corpus, min(3, len(self.word_corpus))):
            outgoing = self.markov_chain.setdefault(neighbour, {})
            outgoing[token] = outgoing.get(token, 0) + 1
            incoming = self.markov_chain.setdefault(token, {})
            incoming[neighbour] = incoming.get(neighbour, 0) + 1

    return found
def update_knowledge_base(self, topics, message):
    """File ``message`` under each topic, dropping the oldest entry past ten."""
    for topic in topics:
        history = self.knowledge_base.setdefault(topic, [])
        history.append(message)
        if len(history) > 10:
            del history[0]
def generate_future_response(self, message, topics):
    """Build the reply dict for a user message.

    Returns:
        A dict with ``text`` (the constructed reply), ``timeOffset`` (years
        into the future) and ``confidence``, which is
        ``0.3 + 0.5 * self_model_accuracy + 0.2 * model_growth_rate``
        capped at 0.95.
    """
    years_ahead = self.determine_time_offset()
    raw_confidence = 0.3 + (self.self_model_accuracy * 0.5) + (self.model_growth_rate * 0.2)
    chosen_timeline = self.select_relevant_timeline(topics)
    text = self.construct_response(message, topics, years_ahead, chosen_timeline)

    reply = {"text": text, "timeOffset": years_ahead}
    reply["confidence"] = min(0.95, raw_confidence)
    return reply
def determine_time_offset(self):
    """Return how many years ahead the reply speaks from (always at least 1).

    The baseline is ``1 + int(self_model_accuracy * 10)`` years, shifted by
    a random amount in [-1, 1) and rounded.
    """
    jitter = random.random() * 2 - 1
    baseline = int(self.self_model_accuracy * 10) + 1
    return max(1, round(baseline + jitter))
def select_relevant_timeline(self, topics):
    """Return the timeline that best matches ``topics``, or None if there are none.

    A timeline scores 2 for every word that is itself a topic and 1 for
    every (word, topic) pair where the word is listed as related to the
    topic; the sum is multiplied by the timeline's probability. The first
    highest-scoring timeline wins. When the best score is exactly 0, one of
    the (up to) three most probable timelines is picked at random instead.
    """
    if not self.timelines:
        return None

    def relevance(timeline):
        points = 0
        for word in timeline["words"]:
            if word in topics:
                points += 2
            points += sum(
                1 for topic in topics
                if topic in self.topic_relations and word in self.topic_relations[topic]
            )
        return points * timeline["probability"]

    winner, top_score = None, -1
    for timeline in self.timelines:
        current = relevance(timeline)
        if current > top_score:
            winner, top_score = timeline, current

    if top_score == 0:
        ranked = sorted(self.timelines, key=lambda t: t["probability"], reverse=True)
        winner = ranked[random.randint(0, min(2, len(ranked) - 1))]
    return winner
def construct_response(self, message, topics, time_offset, timeline):
    """Fill a randomly chosen response template and return the text.

    Placeholders (``{{name}}``) are resolved in a fixed order. Generated
    ones (future, insight, outcome, realization, consequences, suggestion)
    come from the corresponding ``generate_*`` methods; ``{{timeYears}}``
    is ``time_offset``; ``{{topic}}`` and ``{{subject}}`` are random
    entries of ``topics``; ``{{activity}}`` is one or two consecutive words
    of ``message``; the rest are drawn from small fixed phrase lists. Any
    placeholder that cannot be resolved (no topics, message too short) is
    replaced by the word ``interesting``.

    Args:
        message: The user's message.
        topics: Topics extracted from the message (may be empty).
        time_offset: Years into the future; rendered with ``str()``.
        timeline: The timeline chosen for this reply, or None.
    """
    template = random.choice(self.response_templates)

    def fill(placeholder, make_value):
        # Compute the replacement lazily, only when the placeholder is present.
        nonlocal template
        if placeholder in template:
            template = template.replace(placeholder, make_value())

    fill("{{future}}", lambda: self.generate_future_scenario(topics, timeline))
    fill("{{timeYears}}", lambda: str(time_offset))
    fill("{{insight}}", lambda: self.generate_insight(topics))
    if topics:
        fill("{{topic}}", lambda: random.choice(topics))
    fill("{{outcome}}", lambda: self.generate_outcome(topics, timeline))
    fill("{{impact}}", lambda: random.choice(
        ["changed", "transformed", "refined", "confirmed", "challenged"]))
    fill("{{realization}}", lambda: self.generate_realization(topics))

    # A long but whitespace-only message splits into no words; picking an
    # index from an empty list used to raise ValueError. Leave the
    # placeholder for the generic fallback in that case.
    message_words = message.split()
    if len(message) > 10 and message_words:
        def pick_activity():
            start = random.randint(0, len(message_words) - 1)
            return " ".join(message_words[start:start + 2])
        fill("{{activity}}", pick_activity)

    fill("{{futureResult}}", lambda: random.choice([
        "becomes a core part of your expertise",
        "evolves into something quite different",
        "connects to unexpected opportunities",
        "proves more valuable than you currently realize",
        "becomes a foundation for your future work",
    ]))
    fill("{{decision}}", lambda: random.choice(
        ["path", "choice", "approach", "perspective", "commitment", "idea"]))
    fill("{{consequences}}", lambda: self.generate_consequences(topics, timeline))
    fill("{{suggestion}}", lambda: self.generate_suggestion(topics))
    if topics:
        fill("{{subject}}", lambda: random.choice(topics))
    fill("{{accuracy}}", lambda: random.choice([
        "surprisingly accurate", "partially correct",
        "insightful but incomplete",
        "headed in the right direction", "more significant than you realize",
    ]))
    fill("{{evidence}}", lambda: random.choice([
        "explored it further", "seen the results", "lived through it",
        "connected those dots", "studied this extensively", "experienced it firsthand",
    ]))

    # Anything still unresolved gets a neutral filler word.
    return re.sub(r'\{\{[^}]+\}\}', "interesting", template)
def generate_future_scenario(self, topics, timeline):
    """Describe a future scenario as text.

    With a non-empty timeline, its words are joined by ``" leads to "`` and,
    if predictions exist for that timeline, one is appended at random.
    Without a timeline, a five-word Markov walk from a random topic is
    used, or a five-word generated phrase when there are no topics.
    """
    if timeline and timeline["words"]:
        story = " leads to ".join(timeline["words"])
        matching = [p for p in self.predicted_futures if p["timelineId"] == timeline["id"]]
        if matching:
            story += ", ultimately resulting in " + " ".join(random.choice(matching)["words"])
        return story

    if not topics:
        return self.generate_future_phrase(5)

    walk = [random.choice(topics)]
    for _ in range(4):
        walk.append(self.get_next_word_from_markov(walk[-1]))
    return " ".join(walk)
def generate_insight(self, topics):
    """Fill a randomly chosen insight pattern and return the finished text.

    A pattern is drawn from ``self.insight_patterns``. Each known
    ``{{placeholder}}`` present in it is replaced with a random pick from a
    fixed word list, except ``{{topic}}`` and ``{{activity}}``, which draw
    from ``topics`` and are left alone when ``topics`` is empty. Any
    placeholder still present afterwards becomes the word "development".

    Args:
        topics: Sequence of topic strings; may be empty.

    Returns:
        The pattern with every ``{{...}}`` placeholder substituted.
    """
    template = self.insight_patterns[
        random.randint(0, len(self.insight_patterns) - 1)
    ]

    def one_of(*choices):
        # Supplier that picks from a fixed word list.
        return lambda: random.choice(choices)

    def from_topics(prefix=""):
        # Supplier that picks from ``topics``; None means "leave placeholder".
        return lambda: (prefix + random.choice(topics)) if topics else None

    # Placeholders are handled in this fixed order, one at a time.
    suppliers = (
        ("{{attribute}}", one_of(
            "resilience", "creativity", "focus", "adaptability",
            "intuition", "strategic thinking", "empathy",
        )),
        ("{{topic}}", from_topics()),
        ("{{outcome}}", one_of(
            "a clear sense of purpose", "unexpected confidence",
            "a unique perspective", "valuable expertise",
            "a new direction",
        )),
        ("{{experience}}", one_of(
            "encounter a key mentor", "discover a new passion",
            "solve a challenging problem", "make a brave decision",
            "connect seemingly unrelated ideas",
        )),
        ("{{state}}", one_of(
            "exploration", "uncertainty", "learning", "questioning",
            "experimenting", "reflection",
        )),
        ("{{domain}}", one_of(
            "technology", "creativity", "relationships", "career",
            "personal growth", "community", "leadership",
        )),
        ("{{activity}}", from_topics("understanding ")),
        ("{{role}}", one_of(
            "expertise", "perspective", "approach", "contribution",
            "methodology", "philosophy", "framework",
        )),
    )

    for placeholder, supply in suppliers:
        if placeholder not in template:
            continue
        replacement = supply()
        if replacement is not None:
            template = template.replace(placeholder, replacement)

    # Catch-all for unknown placeholders and for topic slots left unfilled.
    return re.sub(r'\{\{[^}]+\}\}', "development", template)
def generate_outcome(self, topics, timeline):
    """Describe an outcome drawn from the timeline, the topics, or a stock list.

    * A truthy ``timeline`` with non-empty ``"words"``: its last two words
      followed by " becoming significant to you".
    * Otherwise, non-empty ``topics``: a random topic plus three follow-up
      words from ``self.get_next_word_from_markov``.
    * Otherwise: one of five generic outcome sentences, chosen at random.

    Returns:
        The outcome text.
    """
    if timeline and timeline["words"]:
        tail = timeline["words"][-2:]
        return " ".join(tail) + " becoming significant to you"

    if not topics:
        fallback_outcomes = [
            "a deeper understanding of what truly matters to you",
            "an unexpected opportunity that aligns with your values",
            "a more nuanced perspective that serves you well",
            "connecting seemingly unrelated areas of interest",
            "developing a unique approach that becomes your signature",
        ]
        return random.choice(fallback_outcomes)

    word = random.choice(topics)
    chain = [word]
    for _ in range(3):
        word = self.get_next_word_from_markov(word)
        chain.append(word)
    return " ".join(chain)
def generate_realization(self, topics):
    """Return a random realization sentence, sometimes tied to a topic.

    One of seven stock sentences is chosen. When ``topics`` is non-empty, a
    second random draw decides (when it exceeds 0.5) whether
    " particularly regarding <topic>" is appended, using a random topic.

    Args:
        topics: Sequence of topic strings; may be empty.

    Returns:
        The realization text.
    """
    stock_sentences = (
        "the obstacles you're facing now are developing crucial skills",
        "your unique combination of interests creates your edge",
        "what seems like a detour is actually a direct path",
        "the questions you're asking matter more than immediate answers",
        "your intuitive approaches often outperform conventional wisdom",
        "consistency in small actions leads to your biggest breakthroughs",
        "your current doubts become the foundation of future confidence",
    )
    chosen = random.choice(stock_sentences)
    # The coin flip is only drawn when there is a topic to attach.
    if not topics or random.random() <= 0.5:
        return chosen
    return chosen + " particularly regarding " + random.choice(topics)
def generate_consequences(self, topics, timeline):
    """Compose a consequence phrase followed by a random timing phrase.

    The leading part is, in priority order: "leading to" plus the last two
    timeline words (truthy ``timeline`` with non-empty ``"words"``);
    "connecting to <topic> in unexpected ways" for a random topic; or one of
    five generic consequences. A random temporal phrase is then appended
    after a single space.

    Returns:
        The combined consequence text.
    """
    if timeline and timeline["words"]:
        lead = "leading to " + " ".join(timeline["words"][-2:])
    elif topics:
        lead = "connecting to " + random.choice(topics) + " in unexpected ways"
    else:
        lead = random.choice([
            "opening new perspectives",
            "creating valuable connections",
            "developing specialized knowledge",
            "revealing hidden opportunities",
            "challenging key assumptions",
        ])

    timing = random.choice([
        "sooner than you expect",
        "with effects that cascade over time",
        "creating a foundation for future decisions",
        "with initial challenges but long-term benefits",
        "in ways that aren't immediately obvious",
    ])
    return f"{lead} {timing}"
def generate_suggestion(self, topics):
    """Return a random suggestion, sometimes prefixed with a topic.

    One of seven stock suggestions is chosen. When ``topics`` is non-empty, a
    second random draw decides (when it exceeds 0.5) whether the result
    becomes "<topic> and <suggestion>" for a random topic.

    Args:
        topics: Sequence of topic strings; may be empty.

    Returns:
        The suggestion text.
    """
    stock_suggestions = (
        "connecting seemingly unrelated ideas and experiences",
        "documenting your thought process, not just outcomes",
        "pursuing quality of understanding over quantity of information",
        "allowing more time for reflection between actions",
        "articulating your unique perspective more boldly",
        "recognizing patterns across different domains of interest",
        "exploring the boundaries where different fields intersect",
    )
    chosen = random.choice(stock_suggestions)
    # The coin flip is only drawn when there is a topic to prepend.
    if not topics or random.random() <= 0.5:
        return chosen
    return random.choice(topics) + " and " + chosen
def update_model_growth(self):
    """Blend a new growth sample into the growth rate and raise the accuracy.

    The sample is 0.005 scaled by ``1 + diversity``, where diversity is
    ``len(self.topic_extractor) / 100`` capped at 1. The stored
    ``self.model_growth_rate`` becomes 95% of its old value plus 5% of the
    sample, and ``self.self_model_accuracy`` is increased by the new rate,
    capped at 0.9.
    """
    diversity = min(1, len(self.topic_extractor) / 100)
    sample = 0.005 * (1 + diversity)
    blended_rate = self.model_growth_rate * 0.95 + sample * 0.05
    self.model_growth_rate = blended_rate
    self.self_model_accuracy = min(0.9, self.self_model_accuracy + blended_rate)
def get_model_stats(self):
    """Summarise the model's current state as a dict of display numbers.

    Returns:
        A dict with the keys ``knowledgeSize``, ``vocabularySize``,
        ``selfModelAccuracy``, ``growthRate``, ``totalInteractions``,
        ``contextualAwareness`` and ``learningProgress``. Accuracy values are
        whole percentages; the two progress-style values are capped at 100.
    """
    accuracy_pct = round(self.self_model_accuracy * 100)
    # Growth rate shown as a percentage with two decimal places.
    growth_pct = round(self.model_growth_rate * 10000) / 100
    awareness = min(100, round(len(self.topic_relations) / 10))

    stats = {}
    stats["knowledgeSize"] = len(self.knowledge_base)
    stats["vocabularySize"] = len(self.word_corpus)
    stats["selfModelAccuracy"] = accuracy_pct
    stats["growthRate"] = growth_pct
    stats["totalInteractions"] = self.total_interactions
    stats["contextualAwareness"] = awareness
    stats["learningProgress"] = min(100, accuracy_pct)
    return stats
def set_recursion_depth(self, depth):
    """Store ``depth`` as ``self.recursion_depth`` (no validation is done)."""
    self.recursion_depth = depth
def set_time_scale(self, scale):
    """Store ``scale`` as ``self.time_scale`` (no validation is done)."""
    self.time_scale = scale
def set_causal_strength(self, strength):
    """Store ``strength`` as ``self.causal_strength`` (no validation is done)."""
    self.causal_strength = strength
def run_simulation_steps(self, steps=10):
    """Advance the simulation by calling ``self.tick()`` ``steps`` times.

    Args:
        steps: Number of ticks to run; zero or a negative value runs none.
    """
    for _tick_number in range(steps):
        self.tick()
def visualize_network(self, figsize=(12, 8)):
    """Draw ``self.nodes`` and ``self.edges`` as a directed graph.

    A new matplotlib figure is created on every call and left open; the
    caller is responsible for saving and closing it.

    Args:
        figsize: Figure size passed to ``plt.figure``.

    Returns:
        The ``matplotlib.pyplot`` module, with the new figure as the current
        figure.
    """
    graph = nx.DiGraph()

    for entry in self.nodes:
        attrs = {
            key: entry[key]
            for key in ("word", "timeScale", "value", "prediction")
        }
        graph.add_node(entry["id"], **attrs)

    for link in self.edges:
        graph.add_edge(
            link["source"],
            link["target"],
            weight=link["strength"],
            age=link["age"],
        )

    plt.figure(figsize=figsize)
    # Fixed seed keeps the layout reproducible for a given graph.
    layout = nx.spring_layout(graph, seed=42)

    # One colour per time-scale index; node size grows with its prediction.
    palette = ['#42c5f4', '#4286f4', '#4242f5', '#8442f5', '#f542f5']
    fill_colors = []
    for node_id in graph.nodes:
        fill_colors.append(palette[graph.nodes[node_id]['timeScale']])
    marker_sizes = []
    for node_id in graph.nodes:
        marker_sizes.append(100 + 500 * graph.nodes[node_id]['prediction'])
    nx.draw_networkx_nodes(
        graph, layout, node_color=fill_colors, node_size=marker_sizes, alpha=0.8
    )

    # Line width is twice the edge strength.
    edge_view = graph.edges()
    line_widths = [graph[src][dst]['weight'] * 2 for src, dst in edge_view]
    nx.draw_networkx_edges(
        graph, layout, edgelist=edge_view, width=line_widths,
        edge_color='#8888ff', alpha=0.6, arrows=True, arrowsize=10,
    )

    word_labels = {node_id: graph.nodes[node_id]['word'] for node_id in graph.nodes}
    nx.draw_networkx_labels(
        graph, layout, labels=word_labels, font_size=10, font_color='white'
    )

    plt.title('Temporal Prediction Network')
    plt.axis('off')
    plt.tight_layout()
    return plt
| # ------------------------------------------------------------------- | |
| # Global System Initialization | |
| # ------------------------------------------------------------------- | |
# Single shared instance used by every Gradio callback below.
system = TemporalPredictionSystem()
# NOTE(review): presumably this pause lets work started by the constructor
# finish before the first ticks run — confirm against __init__.
time.sleep(2)
# Run 20 ticks at import time so the system has state before the UI loads.
system.run_simulation_steps(20)
print("Future Self Conversation System initialized and pre-trained.")
| # ------------------------------------------------------------------- | |
| # Gradio Interface Functions | |
| # ------------------------------------------------------------------- | |
def chat_fn(user_message, history):
    """Handle one chat turn and return the updated message history.

    The message is passed to ``system.process_user_message``; the reply is
    formatted with its time offset and confidence, both turns are appended to
    ``history`` as ``{"role", "content"}`` dicts, and the simulation is
    advanced by five ticks.

    Args:
        user_message: Text typed by the user. Empty or whitespace-only input
            is ignored so no blank turn is recorded.
        history: Existing list of message dicts, or ``None`` for a new chat.
            The list is extended in place.

    Returns:
        The history list, including the new user and assistant messages.
    """
    if history is None:
        history = []
    # Nothing to answer: leave the conversation and the simulation untouched.
    if not user_message or not user_message.strip():
        return history

    response = system.process_user_message(user_message)
    # round(), not int(): 0.29 * 100 is 28.999999999999996 in floating point,
    # which truncation would display as 28%.
    confidence_pct = round(response['confidence'] * 100)
    bot_message = (
        f"{response['text']} "
        f"({response['timeOffset']} years in future | Confidence: {confidence_pct}%)"
    )
    history.append({"role": "user", "content": user_message})
    history.append({"role": "assistant", "content": bot_message})
    system.run_simulation_steps(5)
    return history
def update_network_visualization():
    """Render the current causal network and return it as a PIL image.

    ``system.visualize_network()`` draws onto a new current figure; that
    figure is saved to an in-memory PNG and reopened with PIL.

    Returns:
        A ``PIL.Image`` backed by the in-memory PNG buffer.
    """
    system.visualize_network()
    fig = plt.gcf()
    try:
        buf = io.BytesIO()
        fig.savefig(buf, format="png")
        buf.seek(0)
        return Image.open(buf)
    finally:
        # Always release the figure: this runs repeatedly, and a failed save
        # would otherwise leave one more open figure behind each time.
        plt.close(fig)
def pre_train_fn(progress=gr.Progress()):
    """Run 20 simulation ticks, yielding UI updates after each one.

    Each yield is a 3-tuple: status HTML, progress percentage, and a
    visibility update (hidden while training, visible on the final yield).

    Args:
        progress: Gradio progress tracker, called with the completed fraction
            after every tick.

    Yields:
        ``(status_html, percent, visibility_update)`` tuples.
    """
    total_steps = 20
    for completed in range(1, total_steps + 1):
        system.run_simulation_steps(1)
        fraction = completed / total_steps
        progress(fraction)
        percent = fraction * 100
        status_html = f"<h3>Pre-training in progress: {percent:.0f}% complete</h3>"
        yield status_html, percent, gr.update(visible=False)
        # Brief pause between ticks so the progress display is visible.
        time.sleep(0.2)
    yield "<h3>Pre-training complete!</h3>", 100, gr.update(visible=True)
| # ------------------------------------------------------------------- | |
| # Gradio App Layout (with custom CSS for mobile responsiveness) | |
| # ------------------------------------------------------------------- | |
with gr.Blocks(title="Future Self Conversation System") as demo:
    # Custom CSS for chat height and mobile responsiveness
    gr.HTML("""
    <style>
    /* Lower chat area height */
    #chatbot { height: 300px !important; }
    /* Mobile responsive adjustments */
    @media (max-width: 600px) {
    #chatbot { height: 200px !important; }
    }
    /* Ensure split screen columns stack on mobile */
    .gradio-container .row { flex-wrap: wrap; }
    .gradio-container .column { flex: 1 1 300px; }
    </style>
    """)

    # Pre-training progress area (shown on page load)
    pretrain_status = gr.HTML("<h3>Initializing pre-training...</h3>")
    progress_bar = gr.Slider(minimum=0, maximum=100, value=0, interactive=False, label="Pre-training Progress")

    # Main interface (hidden until pre-training completes)
    main_interface = gr.Column(visible=False)
    with main_interface:
        with gr.Row():
            with gr.Column():
                # Use type="messages" to get openai-style dictionaries.
                chatbot = gr.Chatbot(label="Conversation", elem_id="chatbot", type="messages")
                txt = gr.Textbox(placeholder="Type your message here...", label="Your Message")
                send_btn = gr.Button("Send")
            with gr.Column():
                network_image = gr.Image(label="Neural Network Visualization")
        # Hidden manual refresh hook, kept so existing references still resolve.
        refresh_btn = gr.Button("Refresh Network", visible=False, elem_id="refresh_network")
        refresh_btn.click(fn=update_network_visualization, inputs=[], outputs=network_image)

    # Redraw the network every 5 seconds. A <script> tag placed in gr.HTML is
    # not executed by the browser, and a visible=False button is not in the
    # DOM to be clicked, so the refresh is driven by a Timer component instead.
    refresh_timer = gr.Timer(5)
    refresh_timer.tick(fn=update_network_visualization, outputs=network_image)

    # Send the message, then clear the textbox.
    send_btn.click(fn=chat_fn, inputs=[txt, chatbot], outputs=chatbot).then(lambda: "", None, txt)

    # On page load, run pre-training and then reveal the main interface.
    demo.load(fn=pre_train_fn, outputs=[pretrain_status, progress_bar, main_interface])
| # ------------------------------------------------------------------- | |
| # Launch the App | |
| # ------------------------------------------------------------------- | |
if __name__ == "__main__":
    # Start the Gradio server only when this file is run as a script.
    demo.launch()