// com.sky.ondeviceagent/Runtime/AgentCore/Editor/KnowledgeIngestMenu.cs
// Author: Sky-Kim. Initial commit 2e7837a (16 kB).
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using LightRAG.Core;
using LightRAG.Core.Abstractions;
using LightRAG.Core.Extraction;
using LightRAG.Storage.FileBased;
using Newtonsoft.Json.Linq;
using Unity.InferenceEngine;
using UnityEditor;
using UnityEngine;
using OnDeviceAgent.AgentCore;
using OnDeviceAgent.RagLlm;
namespace OnDeviceAgent.AgentCore.Editor
{
    /// <summary>
    /// Editor tooling that ingests the <c>.txt</c> files in <c>StreamingAssets/Knowledge</c> into a file-based
    /// LightRAG database under <c>StreamingAssets/DB</c>. Entity extraction uses a chat LLM (Ollama by default,
    /// LiteLLM when the <c>LITELLM_BASE</c> environment variable is set); embeddings use the on-device
    /// multilingual-e5-small model.
    /// </summary>
    /// <remarks>
    /// The ingest runs as a task while the calling (main) thread blocks in a loop that drains an
    /// <see cref="EditorPumpDispatcher"/>; the embedding function is handed that dispatcher
    /// (presumably so embedder work is marshalled back to the main thread — see <c>AsEmbeddingFunc</c>).
    /// </remarks>
    public static class KnowledgeIngestMenu
    {
        const string KnowledgeSubdir = "Knowledge";
        const string DbSubdir = "DB";
        const string ManifestFile = "db_manifest.txt";
        const string DialogTitle = "LightRAG Ingest";
        const string DefaultIngestEndpoint = "http://localhost:11434";
        const string DefaultIngestModel = "gemma4:e2b";
        const int IngestNumCtx = 8192;

        // Written by the ingest task (which continues on thread-pool threads after ConfigureAwait(false))
        // and read by the main-thread pump loops, hence volatile.
        static volatile string s_Status = "Starting…";
        static volatile float s_Progress = 0f;

        /// <summary>
        /// Reads the ingest LLM endpoint and model from a <see cref="KnowledgeRagComponent"/> in the open scene
        /// (inactive objects included). Falls back to the defaults when no component exists or a field is blank.
        /// </summary>
        static (string endpoint, string model) ResolveIngestConfig()
        {
            var rag = UnityEngine.Object.FindAnyObjectByType<KnowledgeRagComponent>(FindObjectsInactive.Include);
            if (rag == null)
            {
                Debug.LogWarning($"[RAG] No KnowledgeRagComponent found in the active scene, using defaults ({DefaultIngestEndpoint} / {DefaultIngestModel}). Add the component to choose a different ingest model.");
                return (DefaultIngestEndpoint, DefaultIngestModel);
            }
            string endpoint = rag.IngestEndpoint;
            string model = rag.IngestModel;
            // A blank inspector field would otherwise produce an unusable endpoint or model name.
            if (string.IsNullOrWhiteSpace(endpoint) || string.IsNullOrWhiteSpace(model))
            {
                Debug.LogWarning("[RAG] KnowledgeRagComponent has a blank ingest endpoint or model; blank fields fall back to defaults.");
                if (string.IsNullOrWhiteSpace(endpoint)) endpoint = DefaultIngestEndpoint;
                if (string.IsNullOrWhiteSpace(model)) model = DefaultIngestModel;
            }
            return (endpoint, model);
        }

        /// <summary>
        /// Resolves the knowledge/DB directories under StreamingAssets and lists the <c>*.txt</c> files to ingest,
        /// sorted ordinally so the ingest order does not depend on the file system's enumeration order.
        /// </summary>
        /// <param name="dbDir">Receives the DB directory path (always set).</param>
        /// <param name="txts">Receives the files to ingest (empty on failure).</param>
        /// <param name="error">Receives a user-facing message on failure, otherwise <c>null</c>.</param>
        /// <returns><c>true</c> when at least one <c>.txt</c> file was found.</returns>
        static bool TryFindKnowledgeFiles(out string dbDir, out string[] txts, out string error)
        {
            var knowledgeDir = Path.Combine(Application.streamingAssetsPath, KnowledgeSubdir);
            dbDir = Path.Combine(Application.streamingAssetsPath, DbSubdir);
            txts = Array.Empty<string>();
            if (!Directory.Exists(knowledgeDir))
            {
                error = "Missing StreamingAssets/Knowledge.";
                return false;
            }
            txts = Directory.GetFiles(knowledgeDir, "*.txt");
            if (txts.Length == 0)
            {
                error = "No .txt files in StreamingAssets/Knowledge.";
                return false;
            }
            Array.Sort(txts, StringComparer.Ordinal);
            error = null;
            return true;
        }

        /// <summary>Recursively deletes <paramref name="dbDir"/> if it exists.</summary>
        /// <returns><c>false</c> (after logging the exception) when the directory could not be removed.</returns>
        static bool TryDeleteDb(string dbDir)
        {
            if (!Directory.Exists(dbDir)) return true;
            try
            {
                Directory.Delete(dbDir, true);
                return true;
            }
            catch (Exception e)
            {
                Debug.LogException(e);
                return false;
            }
        }

        /// <summary>
        /// Menu entry: asks for a clean rebuild or an incremental ingest, then runs the ingest while blocking the
        /// editor behind a cancelable progress bar.
        /// </summary>
        /// <remarks>
        /// The cancellation token is only checked before each file (see <see cref="RunIngestAsync"/>), so after the
        /// user cancels, the current step still runs to completion; a non-cancelable bar stays up until it does.
        /// </remarks>
        [MenuItem("OnDeviceAgent/RAG/Ingest Knowledge to DB")]
        static void Ingest()
        {
            if (!TryFindKnowledgeFiles(out var dbDir, out var txts, out var error))
            {
                EditorUtility.DisplayDialog(DialogTitle, error, "OK");
                return;
            }

            var (endpoint, model) = ResolveIngestConfig();
            // DisplayDialogComplex returns 0 for the ok button, 1 for cancel and 2 for the alt button.
            var choice = EditorUtility.DisplayDialogComplex(
                DialogTitle,
                $"Ingest {txts.Length} .txt file(s) from StreamingAssets/Knowledge into StreamingAssets/DB.\n" +
                $"LLM (entity extraction): {model} @ {endpoint}, a running Ollama server is required.\n" +
                "Embeddings: multilingual-e5-small (on-device). Progress shows live cache growth.",
                "Clean rebuild", "Cancel", "Incremental");
            if (choice == 1) return;
            // If the old DB cannot be removed, a "clean rebuild" would silently become incremental; abort instead.
            if (choice == 0 && !TryDeleteDb(dbDir))
            {
                EditorUtility.DisplayDialog(DialogTitle, "Failed to delete the existing DB for a clean rebuild, see console.", "OK");
                return;
            }
            Directory.CreateDirectory(dbDir);

            try { EnsureEmbedderLoaded(); }
            catch (Exception e)
            {
                Debug.LogException(e);
                EditorUtility.DisplayDialog(DialogTitle, "Failed to load the text embedder, see console.", "OK");
                return;
            }

            var pump = new EditorPumpDispatcher();
            var sw = System.Diagnostics.Stopwatch.StartNew();
            s_Status = "Initializing…";
            s_Progress = 0f;
            using var cts = new CancellationTokenSource();
            try
            {
                var ingestTask = RunIngestAsync(dbDir, txts, endpoint, model, pump, cts.Token);
                while (!ingestTask.IsCompleted)
                {
                    pump.Drain();
                    // Time-based asymptote toward 0.9: per-file granularity is useless for single-file ingest,
                    // so elapsed time is the liveness signal.
                    var elapsedSec = sw.Elapsed.TotalSeconds;
                    var timeRamp = (float)(elapsedSec / (elapsedSec + 30.0)) * 0.9f;
                    var progress = Mathf.Clamp01(Mathf.Max(s_Progress, timeRamp));
                    if (cts.IsCancellationRequested)
                    {
                        // Keep the bar alive while the in-flight step finishes so the editor does not look hung.
                        EditorUtility.DisplayProgressBar(DialogTitle, $"Canceling, waiting for the current step to finish… · elapsed {elapsedSec:0}s", progress);
                    }
                    else if (EditorUtility.DisplayCancelableProgressBar(DialogTitle, $"{s_Status} · elapsed {elapsedSec:0}s", progress))
                    {
                        cts.Cancel();
                    }
                    Thread.Sleep(1);
                }
                pump.Drain();

                if (ingestTask.IsFaulted)
                {
                    Debug.LogException(ingestTask.Exception);
                    EditorUtility.DisplayDialog(DialogTitle, "Ingest failed, see console.", "OK");
                }
                else if (ingestTask.IsCanceled && cts.IsCancellationRequested)
                {
                    Debug.LogWarning("[RAG] Ingest canceled.");
                }
                else if (ingestTask.IsCanceled)
                {
                    // Any OperationCanceledException escaping the async method cancels the task, not only our token.
                    Debug.LogError("[RAG] Ingest was canceled without a user request (possibly an LLM/HTTP timeout).");
                    EditorUtility.DisplayDialog(DialogTitle, "Ingest was aborted unexpectedly, see console.", "OK");
                }
                else
                {
                    // Also reached when cancel was requested too late to stop the run: the DB and manifest are complete.
                    Debug.Log($"[RAG] Ingest complete in {sw.Elapsed.TotalSeconds:0}s. DB at: {dbDir}");
                }
            }
            finally
            {
                EditorUtility.ClearProgressBar();
                AssetDatabase.Refresh();
            }
        }

        /// <summary>
        /// Batch-mode entry point (e.g. via <c>-executeMethod</c>). Always performs a clean rebuild and reports
        /// progress through periodic log heartbeats instead of UI.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Thrown (after logging) on any failure, so that a batch-mode run exits with a non-zero code instead of
        /// reporting success.
        /// </exception>
        public static void IngestHeadless()
        {
            // Strip stack traces from Log in batchmode - heartbeat fires every 10s and traces would bury progress. Warnings/errors keep theirs.
            Application.SetStackTraceLogType(LogType.Log, StackTraceLogType.None);
            // Batchmode has no scene loaded, so open the sample scene first so ResolveIngestConfig() can read the component's inspector values.
            const string RagScenePath = "Assets/Examples/VoiceAgent/VoiceAgentSample.unity";
            if (File.Exists(RagScenePath))
            {
                UnityEditor.SceneManagement.EditorSceneManager.OpenScene(
                    RagScenePath,
                    UnityEditor.SceneManagement.OpenSceneMode.Single);
                Debug.Log($"[RAG] Loaded scene for ingest config: {RagScenePath}");
            }
            else
            {
                Debug.LogWarning($"[RAG] Scene not found at {RagScenePath}, falling back to ResolveIngestConfig defaults.");
            }

            if (!TryFindKnowledgeFiles(out var dbDir, out var txts, out var error)) throw HeadlessFailure(error);
            var (endpoint, model) = ResolveIngestConfig();
            if (!TryDeleteDb(dbDir)) throw HeadlessFailure("Failed to delete the existing DB for a clean rebuild.");
            Directory.CreateDirectory(dbDir);
            try { EnsureEmbedderLoaded(); }
            catch (Exception e)
            {
                Debug.LogException(e);
                throw HeadlessFailure("Failed to load the text embedder.", e);
            }

            var pump = new EditorPumpDispatcher();
            var sw = System.Diagnostics.Stopwatch.StartNew();
            Debug.Log($"[RAG] Headless ingest start: {txts.Length} file(s), LLM {model} @ {endpoint}.");
            s_Status = "Initializing…";
            s_Progress = 0f;
            try
            {
                var task = RunIngestAsync(dbDir, txts, endpoint, model, pump, CancellationToken.None);
                PumpWithHeartbeat(task, pump, Path.Combine(dbDir, "kv_store_llm_response_cache.json"), sw);
                pump.Drain();
                if (task.IsFaulted)
                {
                    Debug.LogException(task.Exception);
                    throw HeadlessFailure("Headless ingest FAILED.", task.Exception);
                }
                // No token is passed here, so a Canceled task means an OperationCanceledException escaped the ingest.
                if (task.IsCanceled)
                {
                    throw HeadlessFailure("Headless ingest FAILED: the ingest task was canceled (possibly an LLM/HTTP timeout).");
                }
                Debug.Log($"[RAG] Headless ingest COMPLETE in {sw.Elapsed.TotalSeconds:0}s. DB: {dbDir}");
            }
            finally
            {
                AssetDatabase.Refresh();
            }
        }

        // Logs message as a [RAG] error and returns an exception for the caller to throw. An exception escaping an
        // -executeMethod method makes a batch-mode run exit with code 1; returning normally would report success.
        static Exception HeadlessFailure(string message, Exception inner = null)
        {
            Debug.LogError("[RAG] " + message);
            return new InvalidOperationException("[RAG] " + message, inner);
        }

        /// <summary>
        /// Drains <paramref name="pump"/> until <paramref name="task"/> completes, logging a line whenever the status
        /// changes and otherwise every 10 seconds. The LLM cache file size is reported as a forward-progress proxy
        /// (per-file granularity is useless for single-file ingest); +0 KB across heartbeats suggests a stall.
        /// </summary>
        static void PumpWithHeartbeat(Task task, EditorPumpDispatcher pump, string cachePath, System.Diagnostics.Stopwatch sw)
        {
            const double HeartbeatIntervalSec = 10.0;
            string lastStatus = null;
            long lastCacheBytes = 0;
            var heartbeat = System.Diagnostics.Stopwatch.StartNew();
            while (!task.IsCompleted)
            {
                pump.Drain();
                Thread.Sleep(2);
                var status = s_Status;
                if (status == lastStatus && heartbeat.Elapsed.TotalSeconds < HeartbeatIntervalSec) continue;

                var cacheBytes = TryGetFileSize(cachePath);
                var deltaKb = (cacheBytes - lastCacheBytes) / 1024;
                // Explicit sign: the cache file can shrink when rewritten, which would otherwise render as "+-N".
                var deltaText = deltaKb < 0 ? $"{deltaKb}" : $"+{deltaKb}";
                Debug.Log(
                    $"[RAG] {status} (cache {cacheBytes / 1024} KB, {deltaText} KB in {heartbeat.Elapsed.TotalSeconds:0}s, elapsed {sw.Elapsed.TotalSeconds:0}s)");
                lastCacheBytes = cacheBytes;
                lastStatus = status;
                heartbeat.Restart();
            }
        }

        // Best-effort size sample: the file may be missing or mid-write, in which case 0 is reported.
        static long TryGetFileSize(string path)
        {
            try { return File.Exists(path) ? new FileInfo(path).Length : 0; }
            catch (Exception) { return 0; }
        }

        /// <summary>
        /// Creates the chat LLM used for entity extraction. When <c>LITELLM_BASE</c> is set, a LiteLLM client is used
        /// (model from <c>LITELLM_MODEL</c>, defaulting to <paramref name="ollamaModel"/>; key from
        /// <c>LITELLM_API_KEY</c>); otherwise an Ollama client with context size <paramref name="numCtx"/>.
        /// </summary>
        static IChatLlm BuildChatLlm(string ollamaEndpoint, string ollamaModel, int numCtx)
        {
            var liteBase = Environment.GetEnvironmentVariable("LITELLM_BASE");
            if (!string.IsNullOrWhiteSpace(liteBase))
            {
                var liteModel = Environment.GetEnvironmentVariable("LITELLM_MODEL");
                if (string.IsNullOrWhiteSpace(liteModel)) liteModel = ollamaModel;
                var key = Environment.GetEnvironmentVariable("LITELLM_API_KEY");
                Debug.Log($"[RAG] LLM provider: LiteLLM model={liteModel} @ {liteBase}");
                return new LiteLlmChatLlm(liteModel, liteBase, key);
            }
            Debug.Log($"[RAG] LLM provider: Ollama model={ollamaModel} @ {ollamaEndpoint}");
            return new OllamaChatLlm(ollamaModel, ollamaEndpoint, numCtx);
        }

        // Loaded once by EnsureEmbedderLoaded and reused by later ingests through this static field.
        static E5TextEmbedder s_E5;
        static ITextEmbedder ActiveEmbedder => s_E5;

        /// <summary>
        /// Loads the e5 embedder from the fp16 .sentis model and tokenizer if it is not already loaded.
        /// Must be called on the Unity main thread (AssetDatabase + Sentis worker creation). Idempotent.
        /// </summary>
        /// <exception cref="InvalidOperationException">The model or tokenizer failed to load.</exception>
        static void EnsureEmbedderLoaded()
        {
            if (s_E5 != null) return;
            var e5 = new E5TextEmbedder(BackendType.CPU, Debug.Log);
            if (!e5.Load("Model/E5/e5_small_fp16.sentis", "Model/E5/tokenizer.json"))
                throw new InvalidOperationException("[RAG] e5 embedder load failed (run OnDeviceAgent ▸ RAG ▸ Convert e5 ONNX to fp16 .sentis first)");
            s_E5 = e5;
        }

        /// <summary>Wraps the pre-loaded embedder as an <see cref="EmbeddingFunc"/> bound to <paramref name="pump"/>.</summary>
        /// <exception cref="InvalidOperationException"><see cref="EnsureEmbedderLoaded"/> has not succeeded yet.</exception>
        static EmbeddingFunc BuildEmbedding(EditorPumpDispatcher pump)
        {
            var e = ActiveEmbedder;
            if (e == null) throw new InvalidOperationException("[RAG] embedder not pre-loaded (call EnsureEmbedderLoaded on the main thread).");
            Debug.Log("[RAG] embedder: multilingual-e5-small (384)");
            return e.AsEmbeddingFunc(pump);
        }

        /// <summary>
        /// Builds the LightRAG instance over <paramref name="dbDir"/>, inserts every file (document id = file name),
        /// finalizes the DB and writes the manifest. Updates <c>s_Status</c>/<c>s_Progress</c> as it goes.
        /// </summary>
        /// <remarks>
        /// <paramref name="ct"/> is checked before each file only; <c>InsertAsync</c>/<c>FinalizeAsync</c> are not given
        /// the token. The manifest is written only when every step succeeds.
        /// </remarks>
        static async Task RunIngestAsync(
            string dbDir, string[] txts, string endpoint, string model,
            EditorPumpDispatcher pump, CancellationToken ct)
        {
            IChatLlm llm = BuildChatLlm(endpoint, model, IngestNumCtx);
            // Tuned for local 12B-class Ollama: MaxAsync=2 avoids tail-of-queue starvation, LlmTimeout=900s outlasts slow decodes, MaxGleaning=0 halves LLM calls at small recall cost.
            var config = new LightRagConfig
            {
                Temperature = 0,
                MaxAsync = 2,
                LlmTimeout = TimeSpan.FromSeconds(900),
                Extraction = new ExtractionOptions { MaxGleaning = 0 },
            };
            var embedding = BuildEmbedding(pump);
            var rag = FileBasedLightRag.Create(dbDir, llm, embedding, config);
            await rag.InitializeAsync().ConfigureAwait(false);
            for (var i = 0; i < txts.Length; i++)
            {
                ct.ThrowIfCancellationRequested();
                var name = Path.GetFileName(txts[i]);
                s_Status = $"Ingesting {name} ({i + 1}/{txts.Length})…";
                s_Progress = (float)i / txts.Length;
                var text = File.ReadAllText(txts[i]);
                await rag.InsertAsync(text, name).ConfigureAwait(false);
            }
            s_Status = "Flushing DB…";
            s_Progress = 0.99f;
            await rag.FinalizeAsync().ConfigureAwait(false);
            WriteManifest(dbDir);
        }

        /// <summary>
        /// Writes <c>db_manifest.txt</c> listing every file under <paramref name="dbDir"/> (relative, '/'-separated,
        /// ordinal-sorted), excluding the manifest itself and <c>.meta</c> files. Errors are logged, not thrown.
        /// </summary>
        static void WriteManifest(string dbDir)
        {
            try
            {
                // Snapshot and sort entries before the writer creates the manifest; sorting keeps the listing deterministic.
                var entries = Directory.GetFiles(dbDir, "*", SearchOption.AllDirectories)
                    .Select(f => f.Substring(dbDir.Length).TrimStart('/', '\\').Replace('\\', '/'))
                    .Where(rel => rel != ManifestFile && !rel.EndsWith(".meta", StringComparison.OrdinalIgnoreCase))
                    .OrderBy(rel => rel, StringComparer.Ordinal)
                    .ToList();
                using var writer = new StreamWriter(Path.Combine(dbDir, ManifestFile), false);
                // Timestamp in header ensures each ingest yields a different manifest - StreamingDbInstaller byte-compares to detect stale installs.
                writer.WriteLine("# ingested-at: " + DateTime.UtcNow.ToString("o"));
                foreach (var rel in entries)
                    writer.WriteLine(rel);
            }
            catch (Exception e) { Debug.LogException(e); }
        }

        /// <summary>
        /// Main-thread dispatcher for editor code that blocks the main thread: work can be queued from any thread and
        /// runs only when the owning thread calls <see cref="Drain"/>.
        /// </summary>
        /// <remarks>
        /// The thread that constructs the instance is treated as the main thread, so construct it on the thread that
        /// will call <see cref="Drain"/>. Completion of queued async work is forwarded via continuations on
        /// <see cref="TaskScheduler.Default"/>, so it never depends on the blocked main thread.
        /// </remarks>
        sealed class EditorPumpDispatcher : IUnityMainThreadDispatcher
        {
            readonly ConcurrentQueue<Action> m_Queue = new ConcurrentQueue<Action>();
            readonly Thread m_MainThread = Thread.CurrentThread;

            public bool IsOnMainThread => Thread.CurrentThread == m_MainThread;

            public void Post(Action action)
            {
                if (action != null) m_Queue.Enqueue(action);
            }

            public Task RunOnMainAsync(Func<Task> work)
            {
                var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                m_Queue.Enqueue(() =>
                {
                    try { ForwardCompletion(work(), tcs); }
                    catch (Exception e) { tcs.TrySetException(e); }
                });
                return tcs.Task;
            }

            public Task<T> RunOnMainAsync<T>(Func<Task<T>> work)
            {
                var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
                m_Queue.Enqueue(() =>
                {
                    try { ForwardResult(work(), tcs); }
                    catch (Exception e) { tcs.TrySetException(e); }
                });
                return tcs.Task;
            }

            /// <summary>Runs every queued action on the calling thread; an action's exception is logged, not rethrown.</summary>
            public void Drain()
            {
                while (m_Queue.TryDequeue(out var action))
                {
                    try { action(); }
                    catch (Exception e) { Debug.LogException(e); }
                }
            }

            // Completes tcs with the outcome (fault, cancellation or success) of source.
            static void ForwardCompletion(Task source, TaskCompletionSource<bool> tcs)
            {
                void Complete(Task done)
                {
                    if (done.IsFaulted) tcs.TrySetException(done.Exception!.InnerExceptions);
                    else if (done.IsCanceled) tcs.TrySetCanceled();
                    else tcs.TrySetResult(true);
                }
                if (source.IsCompleted) Complete(source);
                else source.ContinueWith(Complete, TaskScheduler.Default);
            }

            // Completes tcs with the outcome of source, forwarding its result on success.
            static void ForwardResult<T>(Task<T> source, TaskCompletionSource<T> tcs)
            {
                void Complete(Task<T> done)
                {
                    if (done.IsFaulted) tcs.TrySetException(done.Exception!.InnerExceptions);
                    else if (done.IsCanceled) tcs.TrySetCanceled();
                    else tcs.TrySetResult(done.Result);
                }
                if (source.IsCompleted) Complete(source);
                else source.ContinueWith(Complete, TaskScheduler.Default);
            }
        }
    }
}