#!/usr/bin/env python3
"""
Antigravity transcript sync — personal/business-specific sync logic
extracted from the generic MCP bridge (D1.3).

Reads Gemini Antigravity brain transcripts and posts them to agentmemory
as legacy session observations.

Usage (standalone):
    python examples/antigravity_sync.py [--mode current_session|current_folder|all]

Or via MCP if you re-add these functions to your own mcp_stdio.py.
"""

import os
import sys
import json
import glob
import re
import datetime

import requests

# Locate the base URL from the environment
BASE = os.getenv("AGENTMEMORY_URL", "http://127.0.0.1:3111").rstrip("/")
if not BASE.endswith("/agentmemory"):
    BASE = f"{BASE}/agentmemory"

_secret = os.getenv("AGENTMEMORY_SECRET")

# Markers that wrap the user's actual request inside a USER_INPUT step.
# NOTE(review): in the previous revision both literals were empty strings,
# which made ``str.split`` raise ``ValueError`` on every USER_INPUT step (the
# error was silently swallowed, so no turn was ever synced).  They are presumed
# to be the <USER_REQUEST> wrapper tags — confirm against a real transcript.
_USER_REQUEST_OPEN = "<USER_REQUEST>"
_USER_REQUEST_CLOSE = "</USER_REQUEST>"

# Matches "[something] -> [something]" in the first transcript step; the
# second bracketed value is used as the project path.
_WORKSPACE_RE = re.compile(r"\[([^\]]+)\]\s*->\s*\[([^\]]+)\]")


def _headers():
    """Return the HTTP headers for agentmemory requests.

    Always sets a JSON content type; adds a bearer ``Authorization`` header
    when the ``AGENTMEMORY_SECRET`` environment variable was set at import.
    """
    h = {"Content-Type": "application/json"}
    if _secret:
        h["Authorization"] = f"Bearer {_secret}"
    return h


def _result(payload, **dumps_kwargs):
    """Wrap *payload* (JSON-encoded) in the ``{"content": [...]}`` text envelope."""
    return {"content": [{"type": "text", "text": json.dumps(payload, **dumps_kwargs)}]}


def _empty_sync():
    """Return the payload reported when there was nothing to sync."""
    return {"success": True, "syncedSessions": [], "observationsAdded": 0}


def _session_id(convo_id):
    """Derive the agentmemory session id from an Antigravity conversation id."""
    return f"antigravity_{convo_id[:18].replace('-', '_')}"


def _utc_now_iso():
    """Return the current UTC time as a naive ISO-8601 string suffixed with ``Z``."""
    # datetime.utcnow() is deprecated since Python 3.12; build the same text
    # from an aware timestamp instead.
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.replace(tzinfo=None).isoformat() + "Z"


def _discover_conversations(brain_dir):
    """List transcripts under *brain_dir*, most recently modified first.

    Each entry is ``{"id", "transcriptPath", "mtime"}`` where ``id`` is the
    name of the directory three levels above ``transcript.jsonl``.
    """
    pattern = os.path.join(brain_dir, "*", ".system_generated", "logs", "transcript.jsonl")
    conversations = []
    for fpath in glob.glob(pattern):
        try:
            mtime = os.path.getmtime(fpath)
        except OSError:
            # The file vanished or is unreadable between glob and stat; skip it.
            continue
        convo_dir = os.path.dirname(os.path.dirname(os.path.dirname(fpath)))
        conversations.append(
            {"id": os.path.basename(convo_dir), "transcriptPath": fpath, "mtime": mtime}
        )
    conversations.sort(key=lambda c: c["mtime"], reverse=True)
    return conversations


def _mentions_folder(transcript_path, target_folder):
    """Return True when the transcript text contains *target_folder*."""
    try:
        with open(transcript_path, "r", encoding="utf-8") as tf:
            text = tf.read().lower().replace("\\/", "/").replace("\\\\", "/")
    except (OSError, UnicodeDecodeError):
        return False
    return target_folder in text


def _select_targets(conversations, mode, current_conversation_id, current_folder):
    """Pick the conversations to sync for *mode*; return ``None`` if mode is invalid.

    *conversations* must be non-empty and sorted newest first.
    """
    if mode == "current_session":
        if not current_conversation_id:
            return [conversations[0]]
        match = next((c for c in conversations if c["id"] == current_conversation_id), None)
        return [match] if match else []

    if mode == "current_folder":
        # A blank/whitespace-only folder would match every transcript
        # ("" is a substring of everything), so fall back to the cwd.
        raw_folder = (current_folder or "").strip() or os.getcwd()
        target_folder = raw_folder.replace("\\", "/").lower().strip()
        return [c for c in conversations if _mentions_folder(c["transcriptPath"], target_folder)]

    if mode == "all":
        return conversations

    return None


def _detect_project_path(transcript_path):
    """Return the project path named in the transcript's first step, else the cwd."""
    try:
        with open(transcript_path, "r", encoding="utf-8") as tf:
            first_line = tf.readline()
        step = json.loads(first_line) if first_line else None
    except (OSError, ValueError):
        # ValueError covers both malformed JSON and undecodable bytes.
        step = None

    if isinstance(step, dict):
        content = step.get("content", "")
        if isinstance(content, str):
            m = _WORKSPACE_RE.search(content)
            if m:
                return m.group(2)
    return os.getcwd()


def _extract_user_prompt(text):
    """Strip the request wrapper markers (when present) and surrounding whitespace."""
    # Guard against empty markers: "" is contained in every string and
    # str.split("") raises ValueError.
    if _USER_REQUEST_OPEN and _USER_REQUEST_OPEN in text:
        text = text.split(_USER_REQUEST_OPEN)[1]
    if _USER_REQUEST_CLOSE and _USER_REQUEST_CLOSE in text:
        text = text.split(_USER_REQUEST_CLOSE)[0]
    return text.strip()


def _read_turns(transcript_path):
    """Pair each USER_INPUT step with the next PLANNER_RESPONSE step.

    Returns a list of ``{"prompt", "response", "timestamp"}`` dicts.  Lines
    that are blank, not valid JSON, or not JSON objects are skipped.  If the
    file cannot be read, an empty list is returned so the caller skips the
    conversation entirely.
    """
    turns = []
    current_prompt = None
    current_timestamp = None
    try:
        with open(transcript_path, "r", encoding="utf-8") as tf:
            for line in tf:
                if not line.strip():
                    continue
                try:
                    step = json.loads(line)
                except ValueError:
                    continue
                if not isinstance(step, dict):
                    continue

                step_type = step.get("type")
                if step_type == "USER_INPUT":
                    content = step.get("content", "")
                    if not isinstance(content, str):
                        continue
                    current_prompt = _extract_user_prompt(content)
                    current_timestamp = step.get("created_at")
                elif step_type == "PLANNER_RESPONSE" and current_prompt:
                    # Prefer the time the user asked, then the response time,
                    # and only then "now".
                    ts = current_timestamp or step.get("created_at") or _utc_now_iso()
                    turns.append(
                        {"prompt": current_prompt, "response": step.get("content", ""), "timestamp": ts}
                    )
                    current_prompt = None
                    current_timestamp = None
    except (OSError, UnicodeDecodeError):
        return []
    return turns


def _post_observation(payload, headers):
    """POST one observation; return True only if the server accepted it."""
    try:
        response = requests.post(f"{BASE}/observe", headers=headers, json=payload, timeout=10)
    except requests.RequestException:
        return False
    # A 4xx/5xx reply means the observation was not stored, so do not count it.
    return response.ok


def perform_antigravity_sync_local(args):
    """Sync Antigravity transcripts → agentmemory legacy session observations.

    Args:
        args: mapping with optional keys ``mode`` (``"current_session"``
            (default), ``"current_folder"`` or ``"all"``),
            ``currentConversationId`` and ``currentFolder``.

    Returns:
        ``{"content": [{"type": "text", "text": <json>}]}`` where the JSON
        text carries ``success``, ``syncedSessions``, ``observationsAdded``
        and, on failure, ``error``.
    """
    args = args or {}
    mode = args.get("mode") or "current_session"
    current_conversation_id = args.get("currentConversationId")
    current_folder = args.get("currentFolder")

    brain_dir = os.path.join(os.path.expanduser("~"), ".gemini", "antigravity", "brain")
    if not os.path.exists(brain_dir):
        return _result({
            "success": False,
            "syncedSessions": [],
            "observationsAdded": 0,
            "error": f"Brain directory not found at {brain_dir}",
        })

    conversations = _discover_conversations(brain_dir)
    if not conversations:
        return _result(_empty_sync())

    targets = _select_targets(conversations, mode, current_conversation_id, current_folder)
    if targets is None:
        return _result({"success": False, "error": f"Invalid mode: {mode}"})
    if not targets:
        return _result(_empty_sync())

    synced_sessions = []
    observations_added = 0
    headers_dict = _headers()

    for convo in targets:
        convo_id = convo["id"]
        tpath = convo["transcriptPath"]

        turns = _read_turns(tpath)
        if not turns:
            continue

        session_id = _session_id(convo_id)
        project_path = _detect_project_path(tpath)

        convo_synced = False
        for turn in turns:
            payload = {
                "sessionId": session_id,
                "project": project_path,
                "cwd": project_path,
                "hookType": "post_tool_use",
                "timestamp": turn["timestamp"],
                "agentId": "antigravity",
                "data": {
                    "tool_name": "conversation",
                    "tool_input": turn["prompt"],
                    "tool_output": turn["response"],
                },
            }
            if _post_observation(payload, headers_dict):
                observations_added += 1
                convo_synced = True

        if convo_synced:
            synced_sessions.append(convo_id)

    return _result({
        "success": True,
        "syncedSessions": synced_sessions,
        "observationsAdded": observations_added,
    })


def _call_session_endpoint(path, body, headers):
    """POST *body* to ``BASE/<path>`` and return the decoded reply or an error dict."""
    try:
        response = requests.post(f"{BASE}/{path}", headers=headers, json=body, timeout=30)
        if response.status_code == 200:
            return response.json()
        return {"success": False, "error": response.text}
    except (requests.RequestException, ValueError) as e:
        # ValueError: the 200 reply body was not valid JSON.
        return {"success": False, "error": str(e)}


def perform_antigravity_sync_all_local(args):
    """Master sync: transcript + crystallize + reflect.

    Runs :func:`perform_antigravity_sync_local`, then for every synced session
    calls the ``summarize`` and ``slot/reflect`` endpoints.  If the transcript
    sync failed (or its result cannot be decoded) that result is returned
    unchanged.

    Returns:
        The same envelope as :func:`perform_antigravity_sync_local`, with
        additional ``crystallizations`` and ``reflections`` maps keyed by
        session id.
    """
    sync_res_outer = perform_antigravity_sync_local(args)
    try:
        sync_res = json.loads(sync_res_outer["content"][0]["text"])
    except (KeyError, IndexError, TypeError, ValueError):
        return sync_res_outer

    if not sync_res.get("success"):
        return sync_res_outer

    processed_sessions = sync_res.get("syncedSessions", [])
    crystallizations = {}
    reflections = {}
    headers_dict = _headers()

    for cid in processed_sessions:
        session_id = _session_id(cid)
        crystallizations[session_id] = _call_session_endpoint(
            "summarize", {"sessionId": session_id}, headers_dict
        )
        reflections[session_id] = _call_session_endpoint(
            "slot/reflect", {"sessionId": session_id, "maxObservations": 50}, headers_dict
        )

    return _result({
        "success": True,
        "syncedSessions": processed_sessions,
        "observationsAdded": sync_res.get("observationsAdded", 0),
        "crystallizations": crystallizations,
        "reflections": reflections,
    }, indent=2)


def main():
    """Command-line entry point: parse options, run the sync, print the JSON result."""
    import argparse

    parser = argparse.ArgumentParser(description="Sync Antigravity transcripts to agentmemory")
    parser.add_argument("--mode", default="current_session",
                        choices=["current_session", "current_folder", "all"])
    parser.add_argument("--sync-all", action="store_true", help="Also crystallize and reflect")
    a = parser.parse_args()

    if a.sync_all:
        result = perform_antigravity_sync_all_local({"mode": a.mode})
    else:
        result = perform_antigravity_sync_local({"mode": a.mode})
    print(result["content"][0]["text"])


if __name__ == "__main__":
    main()