using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using LightRAG.Core;
using LightRAG.Core.Abstractions;
using LightRAG.Core.Extraction;
using LightRAG.Storage.FileBased;
using Newtonsoft.Json.Linq;
using Unity.InferenceEngine;
using UnityEditor;
using UnityEngine;
using OnDeviceAgent.AgentCore;
using OnDeviceAgent.RagLlm;

namespace OnDeviceAgent.AgentCore.Editor
{
    /// <summary>
    /// Editor tooling that ingests <c>StreamingAssets/Knowledge/*.txt</c> into a file-based LightRAG
    /// database under <c>StreamingAssets/DB</c>, either interactively (menu item) or headlessly (batchmode).
    /// </summary>
    public static class KnowledgeIngestMenu
    {
        const string KnowledgeSubdir = "Knowledge";
        const string DbSubdir = "DB";
        const string ManifestFile = "db_manifest.txt";

        // Written by the ingest task (which may continue off the main thread because of ConfigureAwait(false))
        // and polled by the main-thread loops in Ingest/IngestHeadless.
        static volatile string s_Status = "Starting…";
        static volatile float s_Progress = 0f;

        /// <summary>
        /// Reads the ingest LLM endpoint/model from a <c>KnowledgeRagComponent</c> in the loaded scene
        /// (inactive objects included); falls back to a local Ollama default when none is found.
        /// </summary>
        static (string endpoint, string model) ResolveIngestConfig()
        {
            var rag = UnityEngine.Object.FindAnyObjectByType<KnowledgeRagComponent>(FindObjectsInactive.Include);
            if (rag != null)
                return (rag.IngestEndpoint, rag.IngestModel);

            Debug.LogWarning("[RAG] No KnowledgeRagComponent found in the active scene, using defaults (http://localhost:11434 / gemma4:e2b). Add the component to choose a different ingest model.");
            return ("http://localhost:11434", "gemma4:e2b");
        }

        /// <summary>
        /// Deletes <paramref name="dbDir"/> recursively if it exists.
        /// Returns false (after logging the exception) when it could not be removed, so callers can abort
        /// instead of ingesting on top of a stale or partially deleted DB.
        /// </summary>
        static bool TryDeleteDb(string dbDir)
        {
            if (!Directory.Exists(dbDir))
                return true;
            try
            {
                Directory.Delete(dbDir, true);
                return true;
            }
            catch (Exception e)
            {
                Debug.LogException(e);
                return false;
            }
        }

        /// <summary>
        /// Menu entry: asks for a clean or incremental rebuild, then runs the ingest while blocking the editor
        /// with a cancelable progress bar. Cancellation is only checked between files (see RunIngestAsync).
        /// </summary>
        [MenuItem("OnDeviceAgent/RAG/Ingest Knowledge to DB")]
        static void Ingest()
        {
            var knowledgeDir = Path.Combine(Application.streamingAssetsPath, KnowledgeSubdir);
            var dbDir = Path.Combine(Application.streamingAssetsPath, DbSubdir);

            if (!Directory.Exists(knowledgeDir))
            {
                EditorUtility.DisplayDialog("LightRAG Ingest", "Missing StreamingAssets/Knowledge.", "OK");
                return;
            }

            var txts = Directory.GetFiles(knowledgeDir, "*.txt");
            if (txts.Length == 0)
            {
                EditorUtility.DisplayDialog("LightRAG Ingest", "No .txt files in StreamingAssets/Knowledge.", "OK");
                return;
            }

            var (endpoint, model) = ResolveIngestConfig();
            var choice = EditorUtility.DisplayDialogComplex(
                "LightRAG Ingest",
                $"Ingest {txts.Length} .txt file(s) from StreamingAssets/Knowledge into StreamingAssets/DB.\n" +
                $"LLM (entity extraction): {model} @ {endpoint}, a running Ollama server is required.\n" +
                "Embeddings: multilingual-e5-small (on-device). Progress shows live cache growth.",
                "Clean rebuild", "Cancel", "Incremental");
            if (choice == 1)
                return;

            // A failed delete must abort: continuing would silently turn a "clean rebuild" into an ingest
            // on top of a stale (or half-deleted) DB.
            if (choice == 0 && !TryDeleteDb(dbDir))
            {
                EditorUtility.DisplayDialog("LightRAG Ingest", "Failed to delete the existing DB for a clean rebuild, see console.", "OK");
                return;
            }
            Directory.CreateDirectory(dbDir);

            try
            {
                EnsureEmbedderLoaded();
            }
            catch (Exception e)
            {
                Debug.LogException(e);
                EditorUtility.DisplayDialog("LightRAG Ingest", "Failed to load the text embedder, see console.", "OK");
                return;
            }

            var pump = new EditorPumpDispatcher();
            var cts = new CancellationTokenSource();
            var sw = System.Diagnostics.Stopwatch.StartNew();
            s_Status = "Initializing…";
            s_Progress = 0f;

            try
            {
                var ingestTask = RunIngestAsync(dbDir, txts, endpoint, model, pump, cts.Token);

                while (!ingestTask.IsCompleted)
                {
                    pump.Drain();

                    // Progress bar uses a time-based asymptote toward 0.9 (per-file granularity is useless for
                    // single-file ingest); elapsed time is the liveness signal.
                    var elapsedSec = sw.Elapsed.TotalSeconds;
                    var timeRamp = (float)(elapsedSec / (elapsedSec + 30.0)) * 0.9f;
                    var progress = Mathf.Clamp01(Mathf.Max(s_Progress, timeRamp));
                    var info = $"{s_Status} · elapsed {elapsedSec:0}s";
                    if (EditorUtility.DisplayCancelableProgressBar("LightRAG Ingest", info, progress))
                    {
                        cts.Cancel();
                        // The token is only checked between files, so the task can keep running for a while;
                        // say so instead of leaving a frozen, stale bar on screen.
                        EditorUtility.DisplayProgressBar("LightRAG Ingest", "Canceling, waiting for the current file to finish…", progress);
                        break;
                    }
                    Thread.Sleep(1);
                }

                // Keep servicing queued main-thread work until the task has actually stopped.
                while (!ingestTask.IsCompleted)
                {
                    pump.Drain();
                    Thread.Sleep(1);
                }
                pump.Drain();

                if (ingestTask.IsFaulted)
                {
                    Debug.LogException(ingestTask.Exception);
                    EditorUtility.DisplayDialog("LightRAG Ingest", "Ingest failed, see console.", "OK");
                }
                else if (ingestTask.IsCanceled && !cts.IsCancellationRequested)
                {
                    // The user did not cancel: an OperationCanceledException thrown inside the ingest
                    // (possibly an LLM/HTTP timeout) ends an async Task as Canceled, not Faulted.
                    Debug.LogError("[RAG] Ingest was canceled internally (possibly an LLM/HTTP timeout); the DB may be incomplete.");
                    EditorUtility.DisplayDialog("LightRAG Ingest", "Ingest failed, see console.", "OK");
                }
                else if (cts.IsCancellationRequested)
                {
                    Debug.LogWarning("[RAG] Ingest canceled.");
                }
                else
                {
                    Debug.Log($"[RAG] Ingest complete in {sw.Elapsed.TotalSeconds:0}s. DB at: {dbDir}");
                }
            }
            finally
            {
                cts.Dispose();
                EditorUtility.ClearProgressBar();
                AssetDatabase.Refresh();
            }
        }

        /// <summary>
        /// Batchmode entry point: opens the sample scene (to read ingest config), always performs a clean
        /// rebuild, and logs a periodic heartbeat instead of showing UI. Outcomes are reported via the log
        /// only; this method does not set a process exit code.
        /// </summary>
        public static void IngestHeadless()
        {
            // Strip stack traces from Log in batchmode - heartbeat fires every 10s and traces would bury progress.
            // Warnings/errors keep theirs.
            Application.SetStackTraceLogType(LogType.Log, StackTraceLogType.None);

            // Batchmode has no scene loaded, so open the sample scene first so ResolveIngestConfig() can read
            // the component's inspector values.
            const string RagScenePath = "Assets/Examples/VoiceAgent/VoiceAgentSample.unity";
            if (File.Exists(RagScenePath))
            {
                UnityEditor.SceneManagement.EditorSceneManager.OpenScene(
                    RagScenePath, UnityEditor.SceneManagement.OpenSceneMode.Single);
                Debug.Log($"[RAG] Loaded scene for ingest config: {RagScenePath}");
            }
            else
            {
                Debug.LogWarning($"[RAG] Scene not found at {RagScenePath}, falling back to ResolveIngestConfig defaults.");
            }

            var knowledgeDir = Path.Combine(Application.streamingAssetsPath, KnowledgeSubdir);
            var dbDir = Path.Combine(Application.streamingAssetsPath, DbSubdir);

            if (!Directory.Exists(knowledgeDir))
            {
                Debug.LogError("[RAG] Missing StreamingAssets/Knowledge.");
                return;
            }

            var txts = Directory.GetFiles(knowledgeDir, "*.txt");
            if (txts.Length == 0)
            {
                Debug.LogError("[RAG] No .txt files in StreamingAssets/Knowledge.");
                return;
            }

            var (endpoint, model) = ResolveIngestConfig();

            // Headless is always a clean rebuild; never ingest on top of a DB we failed to remove.
            if (!TryDeleteDb(dbDir))
            {
                Debug.LogError("[RAG] Failed to delete the existing DB for a clean rebuild, aborting headless ingest.");
                return;
            }
            Directory.CreateDirectory(dbDir);

            try
            {
                EnsureEmbedderLoaded();
            }
            catch (Exception e)
            {
                Debug.LogException(e);
                return;
            }

            var pump = new EditorPumpDispatcher();
            var sw = System.Diagnostics.Stopwatch.StartNew();
            Debug.Log($"[RAG] Headless ingest start: {txts.Length} file(s), LLM {model} @ {endpoint}.");
            s_Status = "Initializing…";
            s_Progress = 0f;

            try
            {
                var task = RunIngestAsync(dbDir, txts, endpoint, model, pump, CancellationToken.None);

                // Report LLM cache file size as forward-progress proxy (per-file granularity is useless for
                // single-file); +0 KB across heartbeats = stuck.
                var cachePath = Path.Combine(dbDir, "kv_store_llm_response_cache.json");
                string lastStatus = null;
                long lastCacheBytes = 0;
                var heartbeat = System.Diagnostics.Stopwatch.StartNew();
                const double heartbeatIntervalSec = 10.0;

                while (!task.IsCompleted)
                {
                    pump.Drain();
                    Thread.Sleep(2);

                    var status = s_Status;
                    if (status != lastStatus || heartbeat.Elapsed.TotalSeconds >= heartbeatIntervalSec)
                    {
                        long cacheBytes = 0;
                        try
                        {
                            if (File.Exists(cachePath))
                                cacheBytes = new FileInfo(cachePath).Length;
                        }
                        catch
                        {
                            /* file may be mid-write, best-effort sampling */
                        }

                        var delta = cacheBytes - lastCacheBytes;
                        var sinceLastSec = heartbeat.Elapsed.TotalSeconds;
                        Debug.Log(
                            $"[RAG] {status} (cache {cacheBytes / 1024} KB, +{delta / 1024} KB in {sinceLastSec:0}s, elapsed {sw.Elapsed.TotalSeconds:0}s)");
                        lastCacheBytes = cacheBytes;
                        lastStatus = status;
                        heartbeat.Restart();
                    }
                }
                pump.Drain();

                if (task.IsFaulted)
                {
                    Debug.LogException(task.Exception);
                    Debug.LogError("[RAG] Headless ingest FAILED.");
                }
                else if (task.IsCanceled)
                {
                    // No token is supplied here, so a Canceled task means an OperationCanceledException escaped
                    // from inside the ingest (possibly an LLM/HTTP timeout). That is not a successful run.
                    Debug.LogError("[RAG] Headless ingest FAILED: the ingest task was canceled (possibly an LLM/HTTP timeout).");
                }
                else
                {
                    Debug.Log($"[RAG] Headless ingest COMPLETE in {sw.Elapsed.TotalSeconds:0}s. DB: {dbDir}");
                }
            }
            finally
            {
                AssetDatabase.Refresh();
            }
        }

        /// <summary>
        /// Builds the extraction LLM. When the <c>LITELLM_BASE</c> environment variable is set, a LiteLLM client
        /// is used (model from <c>LITELLM_MODEL</c>, else <paramref name="ollamaModel"/>; key from
        /// <c>LITELLM_API_KEY</c>); otherwise an Ollama client with context size <paramref name="numCtx"/>.
        /// </summary>
        static IChatLlm BuildChatLlm(string ollamaEndpoint, string ollamaModel, int numCtx)
        {
            var liteBase = Environment.GetEnvironmentVariable("LITELLM_BASE");
            if (!string.IsNullOrEmpty(liteBase))
            {
                var liteModel = Environment.GetEnvironmentVariable("LITELLM_MODEL");
                if (string.IsNullOrEmpty(liteModel))
                    liteModel = ollamaModel;
                var key = Environment.GetEnvironmentVariable("LITELLM_API_KEY");
                Debug.Log($"[RAG] LLM provider: LiteLLM model={liteModel} @ {liteBase}");
                return new LiteLlmChatLlm(liteModel, liteBase, key);
            }

            Debug.Log($"[RAG] LLM provider: Ollama model={ollamaModel} @ {ollamaEndpoint}");
            return new OllamaChatLlm(ollamaModel, ollamaEndpoint, numCtx);
        }

        // Cached across ingests; loaded once by EnsureEmbedderLoaded.
        static E5TextEmbedder s_E5;
        static ITextEmbedder ActiveEmbedder => s_E5;

        // Must be called on the Unity main thread (AssetDatabase + Sentis worker creation). Idempotent.
        // Throws InvalidOperationException when the model/tokenizer cannot be loaded.
        static void EnsureEmbedderLoaded()
        {
            if (s_E5 != null)
                return;

            var e5 = new E5TextEmbedder(BackendType.CPU, Debug.Log);
            if (!e5.Load("Model/E5/e5_small_fp16.sentis", "Model/E5/tokenizer.json"))
                throw new InvalidOperationException("[RAG] e5 embedder load failed (run OnDeviceAgent ▸ RAG ▸ Convert e5 ONNX to fp16 .sentis first)");
            s_E5 = e5;
        }

        /// <summary>
        /// Wraps the pre-loaded embedder as an <c>EmbeddingFunc</c> bound to <paramref name="pump"/>.
        /// Throws <see cref="InvalidOperationException"/> if <see cref="EnsureEmbedderLoaded"/> has not run.
        /// </summary>
        static EmbeddingFunc BuildEmbedding(EditorPumpDispatcher pump)
        {
            var e = ActiveEmbedder;
            if (e == null)
                throw new InvalidOperationException("[RAG] embedder not pre-loaded (call EnsureEmbedderLoaded on the main thread).");
            Debug.Log("[RAG] embedder: multilingual-e5-small (384)");
            return e.AsEmbeddingFunc(pump);
        }

        /// <summary>
        /// Inserts each file of <paramref name="txts"/> into the LightRAG DB at <paramref name="dbDir"/>,
        /// finalizes it and writes the manifest. <paramref name="ct"/> is checked before each file only;
        /// it is not forwarded into initialization, insertion or finalization.
        /// </summary>
        static async Task RunIngestAsync(
            string dbDir, string[] txts, string endpoint, string model,
            EditorPumpDispatcher pump, CancellationToken ct)
        {
            IChatLlm llm = BuildChatLlm(endpoint, model, 8192);

            // Tuned for local 12B-class Ollama: MaxAsync=2 avoids tail-of-queue starvation, LlmTimeout=900s
            // outlasts slow decodes, MaxGleaning=0 halves LLM calls at small recall cost.
            var config = new LightRagConfig
            {
                Temperature = 0,
                MaxAsync = 2,
                LlmTimeout = TimeSpan.FromSeconds(900),
                Extraction = new ExtractionOptions { MaxGleaning = 0 },
            };

            var embedding = BuildEmbedding(pump);
            var rag = FileBasedLightRag.Create(dbDir, llm, embedding, config);
            await rag.InitializeAsync().ConfigureAwait(false);

            for (var i = 0; i < txts.Length; i++)
            {
                ct.ThrowIfCancellationRequested();
                var name = Path.GetFileName(txts[i]);
                s_Status = $"Ingesting {name} ({i + 1}/{txts.Length})…";
                // Inside the loop txts.Length >= 1, so the division is safe.
                s_Progress = (float)i / txts.Length;
                var text = File.ReadAllText(txts[i]);
                await rag.InsertAsync(text, name).ConfigureAwait(false);
            }

            s_Status = "Flushing DB…";
            s_Progress = 0.99f;
            await rag.FinalizeAsync().ConfigureAwait(false);
            WriteManifest(dbDir);
        }

        /// <summary>
        /// Writes <c>db_manifest.txt</c> into <paramref name="dbDir"/>: a timestamp header followed by every
        /// file's '/'-separated path relative to <paramref name="dbDir"/> (excluding the manifest itself and
        /// <c>.meta</c> files). Errors are logged, not thrown.
        /// </summary>
        static void WriteManifest(string dbDir)
        {
            try
            {
                var files = Directory.GetFiles(dbDir, "*", SearchOption.AllDirectories);
                using var writer = new StreamWriter(Path.Combine(dbDir, ManifestFile), false);

                // Timestamp in header ensures each ingest yields a different manifest - StreamingDbInstaller
                // byte-compares to detect stale installs.
                writer.WriteLine("# ingested-at: " + DateTime.UtcNow.ToString("o"));
                foreach (var f in files)
                {
                    var rel = f.Substring(dbDir.Length).TrimStart('/', '\\').Replace('\\', '/');
                    if (rel == ManifestFile || rel.EndsWith(".meta", StringComparison.OrdinalIgnoreCase))
                        continue;
                    writer.WriteLine(rel);
                }
            }
            catch (Exception e)
            {
                Debug.LogException(e);
            }
        }

        /// <summary>
        /// Dispatcher for editor code that blocks the main thread while an ingest runs: work is queued from
        /// any thread and executed only when the constructing thread calls <see cref="Drain"/>.
        /// </summary>
        sealed class EditorPumpDispatcher : IUnityMainThreadDispatcher
        {
            readonly ConcurrentQueue<Action> m_Queue = new ConcurrentQueue<Action>();

            // Captured at construction; both entry points construct the dispatcher on the main thread.
            readonly Thread m_MainThread = Thread.CurrentThread;

            public bool IsOnMainThread => Thread.CurrentThread == m_MainThread;

            public void Post(Action action)
            {
                if (action != null)
                    m_Queue.Enqueue(action);
            }

            public Task RunOnMainAsync(Func<Task> work)
            {
                var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                m_Queue.Enqueue(() =>
                {
                    try
                    {
                        var t = work();
                        Bridge(t, tcs, _ => true);
                    }
                    catch (Exception e)
                    {
                        tcs.TrySetException(e);
                    }
                });
                return tcs.Task;
            }

            public Task<T> RunOnMainAsync<T>(Func<Task<T>> work)
            {
                var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
                m_Queue.Enqueue(() =>
                {
                    try
                    {
                        var t = work();
                        Bridge(t, tcs, x => x.Result);
                    }
                    catch (Exception e)
                    {
                        tcs.TrySetException(e);
                    }
                });
                return tcs.Task;
            }

            // Runs every queued action on the calling thread; exceptions are logged so one failing action
            // cannot stop the pump.
            public void Drain()
            {
                while (m_Queue.TryDequeue(out var action))
                {
                    try
                    {
                        action();
                    }
                    catch (Exception e)
                    {
                        Debug.LogException(e);
                    }
                }
            }

            // Mirrors t's outcome (fault / cancel / selected result) into tcs without blocking; if t is still
            // running, the completion runs as a continuation on the default (thread-pool) scheduler.
            static void Bridge<TIn, TOut>(Task<TIn> t, TaskCompletionSource<TOut> tcs, Func<Task<TIn>, TOut> select)
            {
                void Complete(Task<TIn> x)
                {
                    if (x.IsFaulted)
                        tcs.TrySetException(x.Exception!.InnerExceptions);
                    else if (x.IsCanceled)
                        tcs.TrySetCanceled();
                    else
                        tcs.TrySetResult(select(x));
                }

                if (t.IsCompleted)
                    Complete(t);
                else
                    t.ContinueWith(Complete, TaskScheduler.Default);
            }

            // Non-generic variant: success completes tcs with true; the selector parameter is unused.
            static void Bridge(Task t, TaskCompletionSource<bool> tcs, Func<Task, bool> _)
            {
                void Complete(Task x)
                {
                    if (x.IsFaulted)
                        tcs.TrySetException(x.Exception!.InnerExceptions);
                    else if (x.IsCanceled)
                        tcs.TrySetCanceled();
                    else
                        tcs.TrySetResult(true);
                }

                if (t.IsCompleted)
                    Complete(t);
                else
                    t.ContinueWith(Complete, TaskScheduler.Default);
            }
        }
    }
}