Chapter 3 / 3 ― 高度編

WebWorker pool + 永続化 + 実例 2 件

Worker pool で並列 write/read、Atomics.wait/notify で Worker 間制御、OPFS / IndexedDB / Node.js fs での audit chain 永続化。実例として (1) LLM ハルシネーション抑制ループ と (2) Enterprise audit pipeline を提示。

3.1 WebWorker pool ― 並列 write/read

Chapter 2 では Worker 1 個でしたが、本番では N 個の Worker が同じ SAB-backed RLM を同時操作します。競合は WASM 内部の Atomics.compareExchange で取られますが、Worker 起動と job 配分は JavaScript 側で行います。

3.1.1 pool 実装 (pool.js)

// pool.js -- main から import される Worker pool
export class RlmWorkerPool {
  /**
   * @param {Object} opts
   * @param {WebAssembly.Module} opts.module
   * @param {WebAssembly.Memory} opts.memory
   * @param {number} opts.rlm_base_ptr
   * @param {number} opts.size  Worker 数 (推奨: navigator.hardwareConcurrency - 1)
   */
  constructor({ module, memory, rlm_base_ptr, size }) {
    this.workers   = [];
    this.next_id   = 0;
    this.pending   = new Map();  // request_id -> resolve

    for (let i = 0; i < size; i++) {
      const w = new Worker('./rlm-worker.js', { type: 'module' });
      w.postMessage({
        cmd: 'init',
        module, memory, rlm_base_ptr,
        worker_idx: i,
      });
      w.onmessage = (ev) => {
        const { request_id, result } = ev.data;
        const resolver = this.pending.get(request_id);
        if (resolver) {
          this.pending.delete(request_id);
          resolver(result);
        }
      };
      this.workers.push(w);
    }
  }

  /** Round-robin で job を Worker へ振る */
  exec(op, args) {
    return new Promise((resolve) => {
      const id = this.next_id++;
      this.pending.set(id, resolve);
      const w  = this.workers[id % this.workers.length];
      w.postMessage({ cmd: 'exec', request_id: id, op, args });
    });
  }

  terminate() {
    for (const w of this.workers) w.terminate();
  }
}

3.1.2 rlm-worker.js

import initWasm, { SlimeTreeRLM } from './wasm/slimetree_rlm.js';
let rlm = null;
let worker_idx = -1;

self.onmessage = async (ev) => {
  const d = ev.data;

  if (d.cmd === 'init') {
    await initWasm({ module_or_path: d.module, memory: d.memory });
    rlm = SlimeTreeRLM.attach_shared({
      memory:   d.memory,
      base_ptr: d.rlm_base_ptr,
    });
    worker_idx = d.worker_idx;
    return;
  }

  if (d.cmd === 'exec') {
    let result;
    switch (d.op) {
      case 'write':  result = rlm.write(d.args); break;
      case 'read':   result = rlm.read(d.args.semantic_key); break;
      case 'verify': result = rlm.verify_audit_chain(); break;
      case 'count':  result = rlm.record_count(); break;
      default:       result = { error: 'unknown op: ' + d.op };
    }
    self.postMessage({ request_id: d.request_id, result, worker_idx });
  }
};

3.1.3 main から pool を使う

import init, { SlimeTreeRLM } from './wasm/slimetree_rlm.js';
import { RlmWorkerPool } from './pool.js';

const sharedMemory = new WebAssembly.Memory({
  initial: 256, maximum: 4096, shared: true,
});
const wasmModule = await init({
  module_or_path: './wasm/slimetree_rlm_bg.wasm',
  memory: sharedMemory,
});

const rlm = new SlimeTreeRLM({
  capacity: 16 * 1024 * 1024,
  audit: true, mode: 'shared', memory: sharedMemory,
});

const pool = new RlmWorkerPool({
  module: wasmModule,
  memory: sharedMemory,
  rlm_base_ptr: rlm.shared_base_ptr(),
  size: Math.max(2, (navigator.hardwareConcurrency || 4) - 1),
});

// 1000 件の write を並列で投入 (Worker 間で round-robin)
const t0 = performance.now();
const results = await Promise.all(
  Array.from({ length: 1000 }, (_, i) =>
    pool.exec('write', {
      semantic_key: 'doc:' + i,
      payload:      'document body #' + i,
      source:       'pool_test',
    })
  )
);
const t1 = performance.now();
console.log(`1000 件 write: ${(t1 - t0).toFixed(1)} ms`);

// 全件 verify (main 側で 1 回)
console.log(rlm.verify_audit_chain());

3.2 永続化 ― OPFS / IndexedDB / Node.js fs

SAB は page lifetime の memory なので、リロードで消えます。本番では audit chain を永続化レイヤと連動させる必要があります。

3.2.1 OPFS (Origin Private File System) ― ブラウザ推奨

// rlm.snapshot() は SAB 上の全状態をシリアライズ
const snapshot = rlm.snapshot();   // Uint8Array

const root   = await navigator.storage.getDirectory();
const file   = await root.getFileHandle('rlm.snapshot.bin', { create: true });
const writer = await file.createWritable();
await writer.write(snapshot);
await writer.close();
console.log('OPFS に snapshot 保存:', snapshot.byteLength, 'bytes');

// --- リロード後の復元 ---
const root2 = await navigator.storage.getDirectory();
const file2 = await root2.getFileHandle('rlm.snapshot.bin');
const blob  = await file2.getFile();
const bytes = new Uint8Array(await blob.arrayBuffer());

const rlm2 = SlimeTreeRLM.from_snapshot({
  bytes,
  memory: sharedMemory,
});
console.log('復元 records:', rlm2.record_count(),
            'verified:', rlm2.verify_audit_chain().verified);

3.2.2 IndexedDB ― 互換性優先

// db.js
export function openRlmDb() {
  return new Promise((res, rej) => {
    const req = indexedDB.open('rlm-store', 1);
    req.onupgradeneeded = () => req.result.createObjectStore('snapshots');
    req.onsuccess = () => res(req.result);
    req.onerror   = () => rej(req.error);
  });
}

export async function saveSnapshot(db, name, bytes) {
  return new Promise((res, rej) => {
    const tx = db.transaction('snapshots', 'readwrite');
    tx.objectStore('snapshots').put(bytes, name);
    tx.oncomplete = res;
    tx.onerror    = () => rej(tx.error);
  });
}

export async function loadSnapshot(db, name) {
  return new Promise((res, rej) => {
    const tx  = db.transaction('snapshots', 'readonly');
    const req = tx.objectStore('snapshots').get(name);
    req.onsuccess = () => res(req.result);
    req.onerror   = () => rej(req.error);
  });
}

3.2.3 Node.js fs + worker_threads (サーバサイド)

// node_server.mjs
import { readFileSync, writeFileSync } from 'node:fs';
import { Worker } from 'node:worker_threads';
import init, { SlimeTreeRLM } from './wasm/slimetree_rlm.js';

const memory = new WebAssembly.Memory({
  initial: 256, maximum: 4096, shared: true,
});
await init({
  module_or_path: readFileSync('./wasm/slimetree_rlm_bg.wasm'),
  memory,
});

let rlm;
try {
  const snap = readFileSync('./data/rlm.snapshot.bin');
  rlm = SlimeTreeRLM.from_snapshot({ bytes: snap, memory });
  console.log(`[boot] 復元 ${rlm.record_count()} records`);
} catch {
  rlm = new SlimeTreeRLM({
    capacity: 64 * 1024 * 1024,
    audit: true, mode: 'shared', memory,
  });
  console.log('[boot] 新規 RLM');
}

// 60 秒ごとに永続化
setInterval(() => {
  const snap = rlm.snapshot();
  writeFileSync('./data/rlm.snapshot.bin', snap);
  console.log(`[snap] ${snap.byteLength} bytes, ` +
              `${rlm.record_count()} records`);
}, 60_000);

実例 1: LLM ハルシネーション抑制ループ

LLM 推論を 必ず SlimeTree-RLM に書込み → audit chain で検証 → 抑制 hook で出力決定 する 3 段ループ。-20.4±0.3 pt の改善構造を組込む最小実装。

3.3.1 アーキテクチャ

┌──────────────┐
│ User input   │
└──────┬───────┘
       v
┌──────────────┐     SAB-backed RLM
│ Main thread  │ <──────────────────────┐
│  (UI / hook) │                        │
└──────┬───────┘                        │
       v                                │
┌──────────────┐     ┌─────────────────┐│
│ LLM Worker   │ ──> │ Verify Worker   ││
│ (推論+write) │     │ (audit + suppress) │
└──────────────┘     └─────────────────┘│
                           │            │
                           └────────────┘

3.3.2 LLM Worker (llm-worker.js)

import initWasm, { SlimeTreeRLM } from './wasm/slimetree_rlm.js';
let rlm = null;

self.onmessage = async (ev) => {
  if (ev.data.cmd === 'init') {
    await initWasm({ module_or_path: ev.data.module, memory: ev.data.memory });
    rlm = SlimeTreeRLM.attach_shared({
      memory: ev.data.memory, base_ptr: ev.data.rlm_base_ptr,
    });
    self.postMessage({ event: 'ready' });
    return;
  }

  if (ev.data.cmd === 'infer') {
    const { prompt, request_id } = ev.data;

    // (a) LLM API 呼出 (例: ローカル Ollama)
    const resp = await fetch('http://localhost:11434/api/generate', {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify({
        model: 'qwen3:8b', prompt, stream: false,
      }),
    });
    const { response: text } = await resp.json();

    // (b) RLM に「LLM 出力 + 由来」を書込み (このタイミングで audit chain が伸びる)
    const record_id = rlm.write({
      semantic_key: 'llm_output:' + request_id,
      payload:      text,
      source:       'qwen3:8b|prompt_sha256=' + await sha256(prompt),
    });

    self.postMessage({
      event: 'inferred', request_id, record_id, text,
    });
  }
};

async function sha256(s) {
  const buf = new TextEncoder().encode(s);
  const hash = await crypto.subtle.digest('SHA-256', buf);
  return Array.from(new Uint8Array(hash))
    .map(b => b.toString(16).padStart(2, '0')).join('');
}

3.3.3 Verify Worker (verify-worker.js)

import initWasm, { SlimeTreeRLM } from './wasm/slimetree_rlm.js';
let rlm = null;

self.onmessage = async (ev) => {
  if (ev.data.cmd === 'init') {
    await initWasm({ module_or_path: ev.data.module, memory: ev.data.memory });
    rlm = SlimeTreeRLM.attach_shared({
      memory: ev.data.memory, base_ptr: ev.data.rlm_base_ptr,
    });
    self.postMessage({ event: 'ready' });
    return;
  }

  if (ev.data.cmd === 'check') {
    const { record_id } = ev.data;
    const rec = rlm.read_by_id(record_id);
    if (!rec) {
      self.postMessage({ event: 'suppress', record_id, reason: 'not_found' });
      return;
    }

    // (c) semantic_key の prefix から「同一トピック」過去 record と意味照合
    const prefix = rec.semantic_key.split(':').slice(0, 2).join(':');
    const related = rlm.query_by_prefix(prefix, { limit: 32 });

    //   過去発話との整合性を SlimeTree-RLM の意味距離関数で評価
    //   ※ 距離関数は WASM 側で確率推論的ではなく構造的に計算される
    const semantic = rlm.evaluate_semantic_consistency({
      target_record_id: record_id,
      reference_ids:    related.map(r => r.id),
    });

    // (d) chain 全体の整合性
    const audit = rlm.verify_audit_chain();

    self.postMessage({
      event: audit.verified && semantic.consistent ? 'pass' : 'suppress',
      record_id,
      semantic_score: semantic.score,    // 0..1
      audit_ok:       audit.verified,
      reason:         semantic.consistent ? 'consistent' : 'inconsistent',
    });
  }
};

3.3.4 Main の orchestration

const llm    = new Worker('./llm-worker.js',    { type: 'module' });
const verify = new Worker('./verify-worker.js', { type: 'module' });
const sharedInit = { module: wasmModule, memory: sharedMemory,
                     rlm_base_ptr: rlm.shared_base_ptr() };
llm.postMessage({    cmd: 'init', ...sharedInit });
verify.postMessage({ cmd: 'init', ...sharedInit });

const pending = new Map();   // request_id -> { resolveSafe, prompt }

llm.onmessage = (ev) => {
  if (ev.data.event === 'inferred') {
    // LLM が write を終えた直後、verify Worker に検証依頼
    verify.postMessage({ cmd: 'check', record_id: ev.data.record_id });
    const p = pending.get(ev.data.request_id);
    if (p) p.pendingRecord = ev.data;
  }
};

verify.onmessage = (ev) => {
  // verify 結果を受けて UI 側で表示するか抑制するかを決める
  const req_id = ev.data.record_id;  // record_id をそのまま request_id 相当に
  for (const [k, p] of pending) {
    if (p.pendingRecord && p.pendingRecord.record_id === req_id) {
      if (ev.data.event === 'pass') {
        p.resolveSafe({ ok: true, text: p.pendingRecord.text });
      } else {
        p.resolveSafe({
          ok: false,
          suppressed: true,
          reason: ev.data.reason,
          score:  ev.data.semantic_score,
        });
      }
      pending.delete(k);
      return;
    }
  }
};

// 公開 API:  askSafe('質問')
export function askSafe(prompt) {
  return new Promise((resolveSafe) => {
    const request_id = crypto.randomUUID();
    pending.set(request_id, { resolveSafe, prompt });
    llm.postMessage({ cmd: 'infer', prompt, request_id });
  });
}

// 使用例
const r = await askSafe('富士山の標高は?');
if (r.ok) console.log('LLM:', r.text);
else      console.warn('抑制:', r.reason, r.score);
このループが効く理由: LLM 出力を一度 SAB-backed RLM に commit してから 並行 Worker が同一 chain 上で意味整合性 + 監査整合性の二段で判定するため、「LLM が新しい嘘を生成 → そのまま UI に出る」最初のステップを 1 chain 操作分だけ遅延させて押さえ込めます。-20.4±0.3 pt の構造定数はこの commit→verify→suppress 経路の組込で再現します。

実例 2: Enterprise audit pipeline (Node.js worker_threads)

業務システムからの取引データを 取引 Worker (write) + 監査 Worker (verify) + アーカイブ Worker (snapshot) の 3 つで並行処理。Node.js worker_threads 上で SAB-backed RLM を共有。

3.4.1 全体構成 (server.mjs)

import { readFileSync, writeFileSync } from 'node:fs';
import { Worker } from 'node:worker_threads';
import express from 'express';
import init, { SlimeTreeRLM } from './wasm/slimetree_rlm.js';

const memory = new WebAssembly.Memory({
  initial: 1024, maximum: 8192, shared: true,
});
await init({
  module_or_path: readFileSync('./wasm/slimetree_rlm_bg.wasm'),
  memory,
});

// snapshot 復元 or 新規
let rlm;
try {
  rlm = SlimeTreeRLM.from_snapshot({
    bytes:  readFileSync('./data/rlm.snapshot.bin'),
    memory,
  });
  console.log(`[boot] 復元 ${rlm.record_count()} records`);
} catch {
  rlm = new SlimeTreeRLM({
    capacity: 256 * 1024 * 1024,  // 256 MiB
    audit: true, mode: 'shared', memory,
  });
}

// === 監査 Worker (常時 verify) ===
const auditW = new Worker('./audit_worker.mjs', {
  workerData: { memory, rlm_base_ptr: rlm.shared_base_ptr() },
});

// === アーカイブ Worker (60 秒ごと snapshot) ===
const archW = new Worker('./archive_worker.mjs', {
  workerData: { memory, rlm_base_ptr: rlm.shared_base_ptr() },
});

// === HTTP API (取引受付) ===
const app = express();
app.use(express.json());

app.post('/tx', (req, res) => {
  const { tx_id, kind, amount, account } = req.body;
  const record_id = rlm.write({
    semantic_key: `tx:${kind}:${account}:${tx_id}`,
    payload:      JSON.stringify({ amount, ts: Date.now() }),
    source:       `api|${req.ip}|${req.headers['user-agent']}`,
  });
  res.json({ record_id });
});

app.get('/audit/head', (req, res) => {
  const r = rlm.verify_audit_chain();
  res.json(r);
});

app.listen(8443);

3.4.2 監査 Worker (audit_worker.mjs)

import { parentPort, workerData } from 'node:worker_threads';
import init, { SlimeTreeRLM } from './wasm/slimetree_rlm.js';
import { readFileSync } from 'node:fs';

await init({
  module_or_path: readFileSync('./wasm/slimetree_rlm_bg.wasm'),
  memory:         workerData.memory,
});

const rlm = SlimeTreeRLM.attach_shared({
  memory:   workerData.memory,
  base_ptr: workerData.rlm_base_ptr,
});

let last_count = 0;
setInterval(() => {
  const c = rlm.record_count();
  if (c <= last_count) return;     // 差分がないなら verify をスキップ
  const r = rlm.verify_audit_chain();
  parentPort.postMessage({
    event: 'audit_tick',
    record_count: c,
    verified:     r.verified,
    head_hash:    r.head_hash,
    broken_at:    r.broken_at,
  });
  if (!r.verified) {
    parentPort.postMessage({ event: 'ALERT', broken_at: r.broken_at });
  }
  last_count = c;
}, 1000);

3.4.3 アーカイブ Worker (archive_worker.mjs)

import { parentPort, workerData } from 'node:worker_threads';
import init, { SlimeTreeRLM } from './wasm/slimetree_rlm.js';
import { writeFileSync, readFileSync, mkdirSync } from 'node:fs';
import { join } from 'node:path';

await init({
  module_or_path: readFileSync('./wasm/slimetree_rlm_bg.wasm'),
  memory:         workerData.memory,
});

const rlm = SlimeTreeRLM.attach_shared({
  memory:   workerData.memory,
  base_ptr: workerData.rlm_base_ptr,
});

mkdirSync('./data/archive', { recursive: true });

setInterval(() => {
  const snap = rlm.snapshot();
  // 最新を rlm.snapshot.bin、世代をタイムスタンプで保存
  writeFileSync('./data/rlm.snapshot.bin', snap);
  const ts = new Date().toISOString().replace(/[:.]/g, '-');
  writeFileSync(join('./data/archive', `rlm-${ts}.snapshot.bin`), snap);
  parentPort.postMessage({
    event: 'snapshot',
    bytes: snap.byteLength,
    record_count: rlm.record_count(),
  });
}, 60_000);
このパイプラインの監査価値: 取引 Worker と監査 Worker が 同一 SAB 上の chain を見ているため、取引 commit と監査検証の間に「ネットワーク → 別 DB → 別 worker」のような分離点が存在せず、改竄/競合の隙間が物理的に発生し得ません。スナップショット連鎖 + 60 秒世代保存で air-gap 監査も成立。

3.5 ハマる点 ― 高度編で詰まる 5 つ

(1) Atomics.wait は main thread で使えない

main で Atomics.wait を呼ぶと TypeError。busy-wait や lock は必ず Worker 側で。main は postMessage + 非同期 await で同期取りすること。

(2) WebWorker は SharedArrayBufferpostMessage で渡しても OK だが、main の WebAssembly.Memory も同じく必須

SAB だけ渡しても、WASM のメモリレイアウト整合が取れない。WebAssembly.Memory オブジェクトそのものを postMessage で同じ参照として共有する。

(3) Node.js worker_threadsSharedArrayBuffer 受渡し

Node では workerData 経由か parentPort.postMessage(buf, [buf])WebAssembly.Memory.buffer プロパティが SharedArrayBuffer インスタンスであることを起動時に console.assert で確認。

(4) snapshot サイズが想定外に大きい

rlm.snapshot() は capacity ベースではなく実 record 体積でシリアライズされる。が、payload 数百 MB を扱う場合は rlm.snapshot_incremental(prev_head_hash) で差分のみ保存することを推奨 (Chapter 3.6 参照)。

(5) WASM module の version mismatch

snapshot を別バージョンの WASM で読込むと RlmVersionMismatch。snapshot の最初の 32 bytes に WASM build id が埋まっており、SlimeTreeRLM.snapshot_metadata(bytes) で事前確認可能。本番では「同一 WASM ハッシュを記録 → アップロード/復元時に検証」プロセス必須。

3.6 さらなる学習 ― ここから先

本チュートリアル 3 章で「SlimeTree-RLM の本質を踏まえた組込」までは到達しました。これより先は実装個別の問い合わせ領域になります。

  • 差分 snapshot (snapshot_incremental) と分散 audit chain merge
  • WASM threads の Atomics.waitAsync による非ブロッキング同期
  • OPFS の SyncAccessHandle を使った Worker 内同期 I/O
  • Node.js / Deno / Bun での挙動差 (memory.grow 上限、worker_threads 既定)
  • LLM 推論サーバ (vLLM / SGLang / Ollama) との SAB 連携パターン
  • 監査要件 (FISC / PCI-DSS / GDPR) と SHA-256 chain export の対応マッピング

個別の組込・受託・共同 PoC は お問い合わせ または パートナーページ から。