|
|
import json
import os
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
|
|
|
|
|
try: |
|
|
import gradio as gr |
|
|
GRADIO_AVAILABLE = True |
|
|
except Exception: |
|
|
GRADIO_AVAILABLE = False |
|
|
|
|
|
try: |
|
|
from huggingface_hub import hf_hub_upload |
|
|
HF_AVAILABLE = True |
|
|
except Exception: |
|
|
HF_AVAILABLE = False |
|
|
|
|
|
DB_PATH = Path("synchronicities.json") |
|
|
|
|
|
|
|
|
class SynchronicityDB:
    """Tiny JSON-file-backed store for synchronicity entries.

    The file holds a single object of the form {"entries": [...]}. Every
    mutation is written through to disk immediately and mirrored in the
    in-memory cache ``self._data``.
    """

    def __init__(self, path: Optional[Path] = None):
        # Resolve the default at call time (not at def time) so the class can
        # also be used standalone with an explicit path.
        self.path = path if path is not None else DB_PATH
        if not self.path.exists():
            # First run: seed an empty ledger so _read() always succeeds.
            self._write({"entries": []})
        self._data = self._read()

    def _read(self) -> Dict[str, Any]:
        """Load and return the full DB object from disk."""
        with open(self.path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _write(self, data: Dict[str, Any]) -> None:
        """Serialize *data* to disk as pretty-printed UTF-8 JSON."""
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    def add_entry(self, text: str, tags: List[str], outcome: str = "", witness: str = "Asset 448804922") -> Dict[str, Any]:
        """Append a new entry, persist it, and return the stored record.

        IDs are 1-based positions in the entries list; note they can repeat
        if entries are ever removed externally.
        """
        entry = {
            "id": len(self._data["entries"]) + 1,
            # Timezone-aware UTC timestamp (utcnow() is deprecated and naive);
            # normalize the "+00:00" suffix to keep the original "...Z" format.
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "text": text,
            "tags": tags,
            "outcome": outcome,
            "witness": witness,
        }
        self._data["entries"].append(entry)
        self._write(self._data)
        return entry

    def all_texts(self) -> List[str]:
        """Return the entry texts in insertion order."""
        return [e["text"] for e in self._data["entries"]]

    def all_entries(self) -> List[Dict[str, Any]]:
        """Return the raw entry records (the live list, not a copy)."""
        return self._data["entries"]

    def export_json(self) -> str:
        """Return the whole DB as pretty-printed JSON text."""
        return json.dumps(self._data, indent=2, ensure_ascii=False)

    def reset(self) -> bool:
        """Clear the DB on disk AND in memory; always returns True.

        Bug fix: previously only the file was rewritten, leaving the stale
        in-memory cache visible to all_entries()/export_json() and causing
        the next add_entry() to resurrect the cleared entries.
        """
        self._data = {"entries": []}
        self._write(self._data)
        return True
|
|
|
|
|
|
|
|
def extract_tfidf_matrix(texts: List[str]):
    """Fit a TF-IDF vectorizer over *texts*.

    Returns a (matrix, vectorizer) pair, or (None, None) when *texts* is
    empty OR when vectorization yields an empty vocabulary (e.g. every token
    is an English stop word), which TfidfVectorizer reports as a ValueError.
    Callers already treat a None matrix as "no data", so this keeps short or
    stop-word-only entries from crashing the pipeline.
    """
    if not texts:
        return None, None
    vect = TfidfVectorizer(max_features=2000, stop_words="english")
    try:
        mat = vect.fit_transform(texts)
    except ValueError:
        # "empty vocabulary" — all documents reduced to nothing after
        # tokenization / stop-word removal.
        return None, None
    return mat, vect
|
|
|
|
|
|
|
|
def find_similar(new_text: str, db_texts: List[str], top_k: int = 5):
    """Rank *db_texts* by TF-IDF cosine similarity to *new_text*.

    Returns up to *top_k* dicts of the form {"index": i, "score": s}, where
    the index refers to a position in *db_texts*. An empty history or an
    unusable TF-IDF matrix yields an empty list.
    """
    if not db_texts:
        return []
    corpus = db_texts + [new_text]
    matrix, _ = extract_tfidf_matrix(corpus)
    if matrix is None:
        return []
    # Last row is the new text; compare it against every historical row.
    scores = cosine_similarity(matrix[-1], matrix[:-1]).flatten()
    ranked = np.argsort(-scores)[:top_k]
    return [{"index": int(pos), "score": float(scores[pos])} for pos in ranked]
|
|
|
|
|
|
|
|
def coherence_score(matches: List[Dict[str, float]]):
    """Return the mean similarity score across *matches*, or 0.0 when empty."""
    if not matches:
        return 0.0
    scores = [match["score"] for match in matches]
    return float(np.mean(scores))
|
|
|
|
|
|
|
|
def predict_outcomes(matches: List[Dict[str, Any]], db_entries: List[Dict[str, Any]]):
    """Summarize outcome and tag patterns among the entries cited by *matches*.

    Each match's "index" is expected to point into *db_entries*; missing or
    out-of-range indices are skipped. Returns a human-readable suggestion
    string (never raises).
    """
    if not matches:
        return "No prediction — not enough history."

    outcomes: List[str] = []
    # Counter replaces the hand-rolled dict-based tally; most_common() keeps
    # first-seen order on ties because sorted() is stable.
    tag_counts: Counter = Counter()
    for m in matches:
        idx = m.get("index")
        # Guard against stale or malformed match records.
        if idx is None or idx < 0 or idx >= len(db_entries):
            continue
        e = db_entries[idx]
        if e.get("outcome"):
            outcomes.append(e["outcome"])
        tag_counts.update(e.get("tags", []))

    suggestion_parts: List[str] = []
    if outcomes:
        top_outcome, cnt = Counter(outcomes).most_common(1)[0]
        suggestion_parts.append(f"Observed outcome pattern: '{top_outcome}' (seen {cnt} times among similar entries)")

    if tag_counts:
        top_tags = [t for t, _ in tag_counts.most_common(3)]
        suggestion_parts.append(f"Recurring tags among similar events: {', '.join(top_tags)}")

    if not suggestion_parts:
        return "No clear prediction from similar entries. Consider recording outcomes for better forecasts."

    return " | ".join(suggestion_parts)
|
|
|
|
|
|
|
|
def upload_db_to_hf(file_path: Path, repo_id: str, token: str, commit_message: str = "Update synchronicities.json"):
    """Best-effort upload of *file_path* to a Hugging Face Space repository.

    Returns a (success, message) pair and never raises: a missing dependency,
    missing token, or any upload/API error is reported in the message.
    """
    if not HF_AVAILABLE:
        return False, "huggingface_hub not installed"
    if not token:
        return False, "No HF token provided"
    try:
        with open(file_path, "rb") as f:
            # Fix: forward commit_message, which was previously accepted by
            # this function but silently ignored.
            hf_hub_upload(
                repo_id=repo_id,
                path_or_fileobj=f,
                path_in_repo="synchronicities.json",
                token=token,
                repo_type="space",
                commit_message=commit_message,
            )
        return True, "Uploaded to Hugging Face Hub"
    except Exception as e:
        return False, str(e)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
db = SynchronicityDB() |
|
|
|
|
|
|
|
|
def _parse_message(user_message: str):
    """Split a raw chat message into (event_text, tags, outcome).

    A line starting with 'TAGS:' (case-insensitive) carries comma-separated
    tags, a line starting with 'OUTCOME:' carries the outcome, and every
    other non-empty line is joined into the event text.
    """
    tags: List[str] = []
    outcome = ""
    text_lines: List[str] = []
    for ln in (raw.strip() for raw in user_message.splitlines()):
        if not ln:
            continue
        if ln.upper().startswith("TAGS:"):
            tags = [t.strip() for t in ln.split(":", 1)[1].split(",") if t.strip()]
        elif ln.upper().startswith("OUTCOME:"):
            outcome = ln.split(":", 1)[1].strip()
        else:
            text_lines.append(ln)
    return " ".join(text_lines).strip(), tags, outcome


def _echo_lines(matches: List[Dict[str, Any]], entries: List[Dict[str, Any]]) -> List[str]:
    """Render one summary line per valid match, skipping bad indices."""
    lines: List[str] = []
    for m in matches:
        idx = m.get("index")
        if idx is None or idx < 0 or idx >= len(entries):
            continue
        e = entries[idx]
        snippet = e["text"][:180] + ("..." if len(e["text"]) > 180 else "")
        lines.append(f"— {snippet} (score {m['score']:.3f}) — tags: {', '.join(e.get('tags', []))}")
    return lines


def _sync_note() -> str:
    """Best-effort push of the DB to a Hugging Face Space.

    Returns an empty string when HF_TOKEN/HF_REPO are not configured,
    otherwise a status suffix to append to the assistant reply.
    """
    hf_token = os.environ.get("HF_TOKEN")
    hf_repo = os.environ.get("HF_REPO")
    if not (hf_token and hf_repo):
        return ""
    ok, msg = upload_db_to_hf(DB_PATH, hf_repo, hf_token)
    if ok:
        return "\n\n📡 The ledger was synchronized with your Hugging Face Space."
    return f"\n\n⚠️ Sync to Hugging Face failed: {msg}"


def bot_response(user_message: str) -> str:
    """Record a synchronicity from *user_message* and return the Oracle's reply.

    Side effects: appends an entry to the module-level `db` and, when
    HF_TOKEN/HF_REPO are set in the environment, attempts a Hub upload.
    """
    text, tags, outcome = _parse_message(user_message)
    if not text:
        return "I didn't catch the event text. Please describe the synchronicity."

    db.add_entry(text=text, tags=tags, outcome=outcome)

    # Compare the new entry against history only — exclude the entry itself,
    # which add_entry just appended at the end.
    db_texts = db.all_texts()[:-1]
    matches = find_similar(new_text=text, db_texts=db_texts, top_k=5)
    score = coherence_score(matches)

    # Hoisted: the original re-fetched db.all_entries() on every loop pass.
    entries = db.all_entries()

    assistant_parts: List[str] = []
    assistant_parts.append("🌙 — The Oracle records your entry into the ledger of coincidence.")
    assistant_parts.append(f"A coherence whisper: {score:.3f} (0–1, higher means more resonance with past entries)")

    if matches:
        assistant_parts.append("I perceive echoes from the archive:")
        assistant_parts.extend(_echo_lines(matches, entries))

    prediction = predict_outcomes(matches, entries)
    assistant_parts.append("Possible suggestion & pattern note:")
    assistant_parts.append(prediction)

    assistant_parts.append("If you wish to tag this as an observation only, add 'OUTCOME: none'. To attach tags, write 'TAGS: tag1, tag2' on a new line.")

    return "\n\n".join(assistant_parts) + _sync_note()
|
|
|
|
|
|
|
|
def reset_db_action():
    """Wipe the shared database and return a confirmation message for the UI."""
    db.reset()
    return "Database cleared."
|
|
|
|
|
|
|
|
def export_db_action():
    """Return the shared database serialized as pretty-printed JSON."""
    return db.export_json()
|
|
|
|
|
|
|
|
if GRADIO_AVAILABLE:
    # Web UI: a Gradio Blocks app wiring the chat Oracle to the shared `db`.
    with gr.Blocks(title="Quantum Synchronicity Chatbot") as demo:
        gr.Markdown("# Quantum Synchronicity Chatbot — Oracle Interface")
        gr.Markdown("A mystical-toned chat interface. To add an entry, simply paste the description. Optional lines:\nTAGS: mirror, 11:11\nOUTCOME: travel_home\n\nIf HF_TOKEN and HF_REPO are set as environment variables, the database will try to sync after each entry.")

        chatbot = gr.Chatbot(label="Oracle")
        msg = gr.Textbox(placeholder="Type your synchronicity or question here...\n(You can add TAGS: and OUTCOME: on separate lines)")
        clear = gr.Button("Clear chat")

        with gr.Row():
            add_btn = gr.Button("Add entry & analyze")
            export_btn = gr.Button("Export DB JSON")
            reset_btn = gr.Button("Reset DB")

        # Shared output box for both the export and reset actions.
        db_output = gr.Textbox(label="Database (JSON export)", lines=8)

        def user_submit(user_input, history):
            # Append the (user, assistant) exchange to the chat history.
            # NOTE(review): assumes the tuple-based Chatbot history format of
            # older Gradio releases — confirm against the installed version.
            history = history or []
            assistant_text = bot_response(user_input)
            history.append((user_input, assistant_text))
            return history

        add_btn.click(fn=user_submit, inputs=[msg, chatbot], outputs=[chatbot])
        export_btn.click(fn=export_db_action, inputs=None, outputs=[db_output])
        reset_btn.click(fn=reset_db_action, inputs=None, outputs=[db_output])

        # Clears only the visible chat history, not the database itself.
        clear.click(lambda: [], None, chatbot)

    if __name__ == "__main__":
        demo.launch()
|
|
else:
    # CLI fallback when gradio is unavailable: a tiny REPL over the same DB.
    def cli_help():
        """Print the banner and the list of supported CLI commands."""
        print("Gradio is not installed in this environment. Running in CLI fallback mode.")
        print("Commands:\n add - Add a new synchronicity\n export - Print DB JSON\n reset - Clear the DB\n tests - Run basic tests\n exit - Quit")
|
|
|
|
|
    def cli_loop():
        """Interactive REPL: read commands from stdin until 'exit'."""
        cli_help()
        while True:
            cmd = input("> ").strip()
            if not cmd:
                continue
            if cmd == "exit":
                break
            if cmd == "help":
                cli_help()
                continue
            if cmd == "add":
                # Multi-line event text, terminated by the first blank line.
                print("Enter your synchronicity text (end with a blank line):")
                lines = []
                while True:
                    try:
                        ln = input()
                    except EOFError:
                        # Treat EOF like a blank line so piped input ends cleanly.
                        ln = ""
                    if ln.strip() == "":
                        break
                    lines.append(ln)
                text = " ".join(lines).strip()
                print("Optional: enter TAGS: comma,separated or leave blank:")
                tags_line = input().strip()
                tags = [t.strip() for t in tags_line.split(",") if t.strip()] if tags_line else []
                print("Optional: enter OUTCOME: or leave blank:")
                outcome = input().strip()
                # Re-use the chat pipeline by reconstructing a chat-formatted message.
                assistant = bot_response(f"{text}\nTAGS: {', '.join(tags)}\nOUTCOME: {outcome}")
                print("\n---\n")
                print(assistant)
                print("\n---\n")
                continue
            if cmd == "export":
                print(export_db_action())
                continue
            if cmd == "reset":
                print(reset_db_action())
                continue
            if cmd == "tests":
                run_tests()
                continue
            print("Unknown command. Type 'help' for options.")
|
|
|
|
|
def run_tests(): |
|
|
import tempfile |
|
|
print("Running basic tests...") |
|
|
with tempfile.TemporaryDirectory() as td: |
|
|
test_path = Path(td) / "test_db.json" |
|
|
test_db = SynchronicityDB(path=test_path) |
|
|
assert test_db.all_entries() == [] |
|
|
e1 = test_db.add_entry("Saw mirror, 11:11 on the train", ["mirror", "11:11"], outcome="trip") |
|
|
assert e1["id"] == 1 |
|
|
e2 = test_db.add_entry("Heard same song twice", ["song"], outcome="meeting") |
|
|
assert e2["id"] == 2 |
|
|
texts = test_db.all_texts() |
|
|
assert len(texts) == 2 |
|
|
sims = find_similar("Saw mirror again", texts, top_k=2) |
|
|
assert isinstance(sims, list) |
|
|
print("All tests passed.") |
|
|
|
|
|
    if __name__ == "__main__":
        # Entry point for the no-gradio environment: advertise the web UI,
        # then drop into the interactive CLI loop.
        print("Gradio not available. To use the web UI, install gradio (`pip install gradio`).")
        print("If you'd like me to change expected behavior for any command, tell me in chat.")
        cli_loop()
|
|
|