[メッセージツール呼び出しがターン終了までバッチ処理される] - Message Tool Calls Batched Until Turn End Instead of Delivered Immediately
単一のエージェントターン中のすべての `message` ツール呼び出しがキューに追加され、ターン完了時のみ送信されます。これにより、進捗更新のリアルタイム通信パターンが中断されます。
🔍 症状
観察可能な動作
エージェントが1つのターン内でmessageツールを複数回呼び出した場合、すべてのメッセージがそれぞれの呼び出しポイントではなく、ターン完了時に同時に到着します。
現在の動作のCLIデモ
# Scenario: Agent with 3 message tool calls during a long-running task
# User experiences: silence for the entire duration, then all messages arrive at once
# Timeline of events (as seen by the channel/API consumer):
[T+0s] Turn started - no visible output
[T+30s] Tool call 1-5 executed - no visible output
[T+60s] Tool call 6-10 executed - no visible output
[T+90s] Turn completed - ALL THREE MESSAGES delivered simultaneously:
Channel Output (received at T+90s):
┌─────────────────────────────────────────────────────────────┐
│ [90s] 收到,开始分析... │
│ [90s] 数据拉完,正在生成报告 │
│ [90s] 报告完成,核心结论... │
└─────────────────────────────────────────────────────────────┘
Expected Output:
┌─────────────────────────────────────────────────────────────┐
│ [0s] 收到,开始分析... │
│ [60s] 数据拉完,正在生成报告 │
│ [90s] 报告完成,核心结论... │
└─────────────────────────────────────────────────────────────┘
チャネル別の症状
| チャネル | 症状 |
|---|---|
| Telegram | Botが応答しない状態に見える;ユーザーがすべてのメッセージを短時間で連続して受信する |
| Slack | 一時的なメッセージがターン終了まで表示されない;最後に一括して配信される |
| Webhook | APIがターン完了時に15以上のイベントの配列を一度に受信する(ストリーミングではない) |
| WebSocket | 中間フレームが送信されない;すべての内容を含む単一の最終フレームのみ |
デバッグ指標
トレースが有効な場合、メッセージツールの出力にはバッチ処理の動作が表示されます:
# With TRACE_LEVEL=debug, observe the turn lifecycle
[TRACE] Turn 42 started
[TRACE] Tool call: message (queued for turn-end delivery) - "收到,开始分析..."
[TRACE] Tool call: database.query (executing)
[TRACE] Tool call: message (queued for turn-end delivery) - "数据拉完,正在生成报告"
[TRACE] Tool call: file.write (executing)
[TRACE] Tool call: message (queued for turn-end delivery) - "报告完成,核心结论..."
[TRACE] Turn 42 completed - flushing 3 queued messages
[DEBUG] Delivering batch: [msg_1, msg_2, msg_3]
正常に動作するシナリオとの比較
メッセージが即座に到着するのは、次の場合です:
- ターンに
messageツールの呼び出しが1回だけで、他のツールがない場合 - エージェントが1つのターンを完了し(message以外のすべてのツール)、次にmessageを含む新しいターンを開始する場合
- メッセージが
messageツールではなくsession.reply()経由で送信される場合
🧠 原因
アーキテクチャ分析
即時配信の失敗は、OpenClawのターン単位の結果集約モデルに起因します。このシステムは、ターン境界内のすべてのツール結果(包括的にmessageツールの出力)を収集し、単一バッチで配信するように設計されています。
コードフローの内訳
┌──────────────────────────────────────────────────────────────────────────────┐
│ TURN PROCESSING PIPELINE │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. TURN_START │
│ └─> Initialize turn context │
│ └─> Create empty result buffer │
│ │
│ 2. TOOL_EXECUTION_LOOP │
│ ├─> For each tool call: │
│ │ ├─> Execute tool │
│ │ ├─> If tool == "message": │
│ │ │ └─> buffer.append(message_result) ← QUEUED, NOT SENT │
│ │ │ ↑ │
│ │ └─> buffer.append(tool_result) │ │
│ │ │ │
│ └─> Repeat until no more tool calls ─┘ │
│ │
│ 3. TURN_END │
│ └─> flush_result_buffer() ← ALL MESSAGES SENT HERE │
│ └─> deliver_to_channel(batch) │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
関連する主要ソースファイル
| ファイル | 役割 |
|---|---|
packages/core/src/turn/turn-executor.ts | ツール実行ループをorchestrates;すべての結果をバッファリング |
packages/tools/message/src/message-tool.ts | メッセージツールの実装;結果バッファに出力 |
packages/channel-core/src/turn-context.ts | ターン単位の状態と結果コレクションを管理 |
packages/api/src/session.ts | session.reply()パス(即時配信)とツールパスの比較 |
セマンティクスの不一致
messageツールはセマンティックにはファイア・アンド・フォーゲットのユーザー通知ですが、実装では構造化データを返す他のツールと同じ方法で扱われています:
// Current implementation (problematic)
class MessageTool {
async execute(params: MessageParams, context: TurnContext): Promise {
// Treats message like a data-returning tool
// Result gets queued in context.results[] until turn end
return {
output: `Message queued: ${params.content}`,
// No immediate channel delivery
};
}
}
// Semantic intent
// Message tool = "Send this to the user NOW"
// Other tools = "Return this result for agent consideration"
session.reply()との比較
session.reply()メソッドは結果バッファをバイパスするため即時配信されます:
// session.reply() - immediate delivery path
class Session {
async reply(content: string): Promise {
await this.channel.send(content); // ← Direct channel send
}
}
// message tool - deferred delivery path
class MessageTool {
async execute(params, context): Promise {
context.results.push({ output: content }); // ← Buffered
// Delivered only when turn completes
}
}
この設計が存在する理由
バッチモデルは正当なユースケースに役立ちます:
- チャネルへのAPI呼び出し回数を削減(個別送信ではなく1回のバッチ)
- ツール結果に関するメッセージの順序を保証
- チャネルの実装を簡素化(ターンごとに1回の応答)
しかし、「ユーザーにメッセージを送る」ツールのセマンティックな意図(「今送信する」を意味する)とはConflictしています。
🛠️ 解決手順
推奨ソリューション:オプションA+オプションCのハイブリッド
messageツールのデフォルトでの即時配信を実装し、バッチ配信が必要な場合の immediate: false フラグを提供します。
フェーズ1:メッセージツールスキーマの修正
ファイル: packages/tools/message/src/schema.ts
// BEFORE
export const messageToolSchema = {
name: "message",
description: "Send a message to the user",
parameters: {
type: "object",
properties: {
content: {
type: "string",
description: "The message content to send to the user"
}
},
required: ["content"]
}
};
// AFTER
export const messageToolSchema = {
name: "message",
description: "Send a message to the user. Messages are delivered immediately unless batch mode is requested.",
parameters: {
type: "object",
properties: {
content: {
type: "string",
description: "The message content to send to the user"
},
immediate: {
type: "boolean",
description: "If true, deliver immediately. If false, queue until turn end. Defaults to true.",
default: true
}
},
required: ["content"]
}
};
フェーズ2:メッセージツール実装の更新
ファイル: packages/tools/message/src/message-tool.ts
import { Tool, ToolResult, TurnContext } from "@openclaw/core";
import { channelRegistry } from "@openclaw/channel-core";
interface MessageParams {
content: string;
immediate?: boolean;
}
// Track messages that should be delivered immediately
const IMMEDIATE_DELIVERY_THRESHOLD_MS = 0; // 0 = always immediate when requested
export class MessageTool implements Tool {
name = "message";
description = messageToolSchema.description;
parameters = messageToolSchema.parameters;
async execute(
params: MessageParams,
context: TurnContext
): Promise {
const content = params.content;
const shouldDeliverImmediately = params.immediate !== false; // Default: true
if (shouldDeliverImmediately) {
// IMMEDIATE DELIVERY PATH
return this.deliverImmediately(content, context);
} else {
// BATCHED DELIVERY PATH (original behavior)
return this.queueForTurnEnd(content, context);
}
}
private async deliverImmediately(
content: string,
context: TurnContext
): Promise {
try {
// Get the active channel for this session
const channel = channelRegistry.getChannel(context.session.channelType);
// Send directly to channel, outside turn buffer
await channel.send({
sessionId: context.session.id,
content: content,
metadata: {
toolName: "message",
deliveredAt: Date.now(),
deliveryMode: "immediate"
}
});
return {
success: true,
output: `Message delivered immediately: ${content.substring(0, 50)}...`,
metadata: {
deliveredAt: Date.now(),
deliveryMode: "immediate"
}
};
} catch (error) {
return {
success: false,
output: "",
error: `Failed to deliver message immediately: ${error.message}`,
metadata: {
deliveryMode: "immediate",
fellBackToBatch: true
}
};
}
}
private async queueForTurnEnd(
content: string,
context: TurnContext
): Promise {
// Original behavior: add to turn buffer
context.results.push({
type: "message",
content: content,
metadata: {
deliveryMode: "batched",
queuedAt: Date.now()
}
});
return {
success: true,
output: `Message queued for turn-end delivery: ${content.substring(0, 50)}...`,
metadata: {
deliveryMode: "batched"
}
};
}
}
フェーズ3:チャネル送信機能の登録
ファイル: packages/channel-core/src/channel-registry.ts
// Ensure channels implement immediate send capability
export interface ChannelAdapter {
// Existing methods...
sendBatch(results: TurnResult[]): Promise;
// NEW: Immediate single-message send
send(params: {
sessionId: string;
content: string;
metadata?: Record;
}): Promise;
}
フェーズ4:ターンExecutorの更新(最小限の変更)
ファイル: packages/core/src/turn/turn-executor.ts
// Add filter to exclude already-delivered messages from batch
async function flushResults(context: TurnContext): Promise {
// Filter out messages that were delivered immediately
const batchableResults = context.results.filter(
result => result.metadata?.deliveryMode !== "immediate"
);
if (batchableResults.length > 0) {
await context.channel.sendBatch(batchableResults);
}
// Log summary
const immediateCount = context.results.filter(
r => r.metadata?.deliveryMode === "immediate"
).length;
if (immediateCount > 0) {
context.logger.debug(`Delivered ${immediateCount} messages immediately`);
}
}
フェーズ5:設定オプション
ファイル: packages/core/src/config/tool-config.ts
export interface ToolConfig {
message: {
// Default delivery mode for message tool
defaultDeliveryMode: "immediate" | "batched";
// Fallback if channel doesn't support immediate delivery
fallbackToBatchOnError: boolean;
};
}
export const defaultToolConfig: ToolConfig = {
message: {
defaultDeliveryMode: "immediate", // Changed from "batched"
fallbackToBatchOnError: true
}
};
変更の検証
実装後、実行フローは次のようになります:
┌──────────────────────────────────────────────────────────────────────────────┐
│ UPDATED PIPELINE (with fix) │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. TURN_START │
│ └─> Initialize turn context │
│ │
│ 2. TOOL_EXECUTION_LOOP │
│ ├─> Tool call: message ("收到,开始分析...") │
│ │ └─> channel.send() ← IMMEDIATE DELIVERY │
│ │ └─> return { deliveredAt, deliveryMode: "immediate" } │
│ │ │
│ ├─> Tool call: database.query │
│ │ └─> context.results.push(result) ← Normal buffering │
│ │ │
│ ├─> Tool call: message ("数据拉完...") │
│ │ └─> channel.send() ← IMMEDIATE DELIVERY │
│ │ │
│ └─> Tool call: file.write │
│ └─> context.results.push(result) │
│ │
│ 3. TURN_END │
│ └─> flushResults() - only non-immediate results │
│ └─> channel.sendBatch([query_result, write_result]) │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
🧪 検証
テストケース1:即時配信の検証
目的: メッセージがターン終了時ではなく呼び出し時に到着することを確認します。
# Test script: verify message timing
#!/bin/bash
START_TIME=$(date +%s.%N)
# Invoke agent with timed message tool calls
curl -X POST http://localhost:3000/api/sessions/test-001/invoke \
-H "Content-Type: application/json" \
-d '{
"message": "Perform 3 searches and send progress after each"
}'
# Capture message delivery times from channel logs
# Expected: 3 separate delivery timestamps
# Actual (before fix): single timestamp at turn end
echo "Checking message delivery timestamps..."
grep "Message delivered" /var/log/openclaw/channel.log | \
awk '{print $1, $2, $8}' | \
sort -u
修正後の期待される出力:
2024-01-15 10:30:00.123 deliveredAt=1705315800123
2024-01-15 10:30:35.456 deliveredAt=1705315835456
2024-01-15 10:31:05.789 deliveredAt=1705315865789
2024-01-15 10:31:35.000 TURN_END
失敗指標(修正前):
2024-01-15 10:31:35.000 deliveredAt=1705315895000 ← All three
2024-01-15 10:31:35.000 deliveredAt=1705315895000 ← Same timestamp
2024-01-15 10:31:35.000 deliveredAt=1705315895000 ← Turn end
テストケース2:混合配信モード
目的: immediate: falseでもメッセージが正しくキューに入ることを確認します。
# Agent prompt demonstrating mixed modes:
# Use immediate delivery for progress: "Starting task..."
# Use batched for audit trail: "Query executed at X"
# Verify batched messages don't appear until turn end
# while immediate messages do
# Step 1: Start monitoring
tail -f /var/log/openclaw/channel.log | grep -E "(delivered|queued)" &
# Step 2: Invoke turn with both modes
curl -X POST http://localhost:3000/api/sessions/test-002/invoke \
-d '{"message": "process with both message modes"}'
# Step 3: Verify output
# Should see immediate messages logged during execution
# Should see batched messages only at TURN_END marker
テストケース3:チャネル互換性のフォールバック
目的: チャネルが即時送信機能を持たない場合のgracefulなフォールバックを確認します。
# If channel.send() throws "Method not implemented",
# verify message falls back to batch queue
# Test with mock channel that doesn't implement send()
const mockChannel = {
sendBatch: async (results) => { /* existing */ },
// send() intentionally omitted
};
# Invoke message tool
# Expected: succeeds via fallback, logged as "deliveredAt: batched"
grep "fellBackToBatch" /var/log/openclaw/tools.log
# Should show: message tool fell back to batch mode
統合テストスイート
# packages/tools/message/src/__tests__/message-delivery.test.ts
describe("Message Tool Delivery Modes", () => {
let mockContext: TurnContext;
let mockChannel: jest.Mocked;
beforeEach(() => {
mockChannel = {
send: jest.fn().mockResolvedValue(undefined),
sendBatch: jest.fn().mockResolvedValue(undefined),
// ... other methods
};
mockContext = createMockContext({
channel: mockChannel,
session: { id: "test-session", channelType: "telegram" }
});
});
test("delivers immediately by default", async () => {
const tool = new MessageTool();
await tool.execute({ content: "Immediate message" }, mockContext);
expect(mockChannel.send).toHaveBeenCalledTimes(1);
expect(mockChannel.send).toHaveBeenCalledWith(
expect.objectContaining({
content: "Immediate message",
metadata: expect.objectContaining({
deliveryMode: "immediate"
})
})
);
expect(mockChannel.sendBatch).not.toHaveBeenCalled();
});
test("queues when immediate: false", async () => {
const tool = new MessageTool();
await tool.execute(
{ content: "Batched message", immediate: false },
mockContext
);
expect(mockChannel.send).not.toHaveBeenCalled();
expect(mockContext.results).toContainEqual(
expect.objectContaining({
type: "message",
content: "Batched message",
metadata: { deliveryMode: "batched" }
})
);
});
test("falls back to batch when channel.send() unavailable", async () => {
mockChannel.send = undefined; // Simulate unsupported channel
const tool = new MessageTool();
const result = await tool.execute(
{ content: "Test" },
mockContext
);
expect(result.metadata.fellBackToBatch).toBe(true);
expect(mockContext.results).toContainEqual(
expect.objectContaining({
type: "message",
metadata: { deliveryMode: "batched" }
})
);
});
});
手動検証チェックリスト
- トレースログが即時配信を表示:
grep "deliverImmediately\|Message delivered" logs/trace.log - ターンエンドバッチが即時メッセージを除外:
grep "sendBatch" logs/trace.log | jq '.messages | length'メッセージツール以外のツール総数と等しくなるはずです - タイミングの分離が見える: メッセージ配信タイムスタンプがターンエンドタイムスタンプと異なる
- 設定変更が尊重される:
defaultDeliveryMode: "batched"を設定すると古い動作に戻ります
⚠️ よくある落とし穴
落とし穴1:チャネルレート制限
問題: 素早い即時送信はチャネルのレート制限(Telegramは秒間約30件のメッセージ制限など)をトリガーする可能性があります。
対策:
// Implement throttling for immediate delivery
class ThrottledChannelAdapter implements ChannelAdapter {
private sendQueue: Promise = Promise.resolve();
private minIntervalMs = 100; // Max 10 messages/second
async send(params: SendParams): Promise {
this.sendQueue = this.sendQueue.then(async () => {
await this.throttle();
return this.channel.send(params);
});
await this.sendQueue;
}
private async throttle(): Promise {
// Rate limit enforcement
}
}
落とし穴2:メッセージ順序の違反
問題: 即時メッセージがEarlierのバッチメッセージより先に到着し、年代順が崩れる可能性があります。
シナリオ:
Tool sequence:
1. message "Step 1" (immediate) → arrives at T+5s
2. database.query (batched) → queued
3. message "Step 2" (immediate) → arrives at T+10s
4. Turn end → batched results arrive at T+15s
User sees:
[T+5s] Step 1
[T+10s] Step 2
[T+15s] Query result (should have been before Step 2?)
対策: 順序の期待値を文書化する;関連するメッセージには一貫した配信モードを使用する必要があります。
落とし穴3:セッション状態の同期
問題: 即時メッセージがまだコミットされていないデータを参照する可能性があります。
例:
// Agent flow that causes inconsistency
1. message "Starting query for user ${session.userId}" // immediate
2. session.set("userId", "123") // queued
3. Turn end → state committed
User sees message with undefined userId (race condition)
対策: セッション状態の更新が同期していることを確認する;即時メッセージが安全になるまで状態書き込みを延期する。
落とし穴4:チャネルアダプターの互換性マトリックス
リスク: すべてのチャネルが即時送信 지원하는わけではない;一部はバッチ応答のみサポートします。
| チャネル | 即時送信サポート | メモ |
|---|---|---|
| Telegram | ✅ 完全 | スロットリング付きで素早い送信をサポート |
| Slack | ⚠️ 制限あり | Webhookはfire-and-forget;RTMにはレート制限あり |
| Discord | ✅ 完全 | Botメッセージは即座に送信可能 |
| WebSocket | ✅ 完全 | クライアントに直接ストリーム |
| Webhook | ✅ 完全 | コールバックURLにPOST |
| Console | ✅ 完全 | 直接stdout |
| Teams | ⚠️ 制限あり | プactivemessagingモードが必要 |
アクション: 即時モードを使用する前にChannelAdapterCapabilitiesを確認してください。
落とし穴5:トレース/ログ記録の複雑さ
問題: 交错された即時とバッチ配信により、トレースが複雑になります。
対策: フィルタリングのためにすべてのログエントリにdeliveryModeとturnIdを含める:
{
"timestamp": "...",
"level": "debug",
"message": "Message delivered",
"turnId": 42,
"deliveryMode": "immediate",
"sequenceInTurn": 1,
"content": "收到,开始分析..."
}
落とし穴6:後方互換性の回帰
リスク: バッチ動作に依存する既存の車が壊れる可能性があります。
シナリオ:
- メッセージをツール結果と一緒にグループ化することを期待する車
- ターンエンドで正確にN件のメッセージを期待するUI
対策:
- デフォルトを
immediate: trueに変更し、変更を prominently documentします - opt-out用の設定フラグ
tool.message.defaultDeliveryMode: "batched"を提供します - まずはopt-in機能としてリリースし、次のメジャーバージョンでデフォルトを変更します
落とし穴7:CI/CDでのテスト
問題: 可変リソース割り当てを持つCI環境では、タイミングベースのテストが不安定です。
対策:
// Use deterministic test with mocked time
test("delivers immediately based on flag, not timing", async () => {
const tool = new MessageTool();
await tool.execute({ content: "test" }, mockContext);
// Verify send() was called (immediate) or queued (batched)
// NOT: await waitFor(() => sendCalled())
// YES: expect(sendCalled).toBe(true)
});
🔗 関連するエラー
関連するGitHubイシュー
| イシュー | 関係性 | 主な区別 |
|---|---|---|
| #25463 | 付随的 | messageツールとsession.reply()間での1つのターン内のメッセージ順序。このイシューはすべてのmessageツール呼び出しが遅延することについて;#25463は異なるメッセージソース間の順序について。 |
| #18089 | 付随的 | 双方向メッセージ処理アーキテクチャ。双方向通信の有効化に関連するが、異なるアーキテクチャ層。 |
| #31234 | 参考情報 | 「長いターン中にユーザーが空の画面を見る」— この修正で解決される症状の記述。 |
| #28901 | 対照 | 「効率性のためにすべてのチャネル出力をバッチ処理」— このイシューがchallengesする現在の設計哲学。 |
| #34567 | ブロック中 | 「ツール結果のストリーミング」— 別の配信メカニズムを提供するストリーミングアーキテクチャで、即時配信と潜在的にならせる可能性がある。 |
関連する設定オプション
| 設定キー | 現在の動作 | この修正での変更後 |
|---|---|---|
tool.message.deliveryMode | ハードコードされた"batched" | 設定可能:“immediate” | “batched” |
turn.maxDuration | ターンタイムアウト | 長いターンが漸増的にメッセージを配信するようになった場合、調整が必要かもしれない |
channel.batchSize | バッチあたりの最大アイテム数 | セマンティックな意味が変わる;即時送信はバッチ処理をバイパス |
関連するエラーコード
| エラーコード | 説明 | 関連性 |
|---|---|---|
TOOL_TIMEOUT_01 | ツール実行がタイムアウトを超過 | メッセージ送信が遅い場合、即時配信でより表面化する可能性がある |
CHANNEL_RATE_LIMIT | チャネルがレート制限によりメッセージを拒否 | 素早い即時送信により直接トリガー |
CHANNEL_NOT_SUPPORTED | チャネルが必要な機能を持っていない | 即時配信をサポートできないチャネル用 |
SESSION_STATE_CONFLICT | 即時メッセージ送信中に状態が変更された | セッション状態が適切に同期されていない場合の競合状態 |
歴史的背景
元のバッチ処理の根拠(#18901より):
“バッチ処理はAPI呼び出し回数を減らし、メッセージ順序を保証する。バッチ処理がなければ、10回のツール呼び出しと5件のメッセージを含むターンは15回の別々のAPI呼び出しになる。”
反論(イシュー#25463の議論から):
“
messageツールはセマンティックには「今すぐユーザーに配信する」を意味する。バッチ処理は意図に反し、プログレス更新のようなリアルタイムユースケースを壊す。”
解決パス: この修正はオプションA(即時をデフォルト)+オプションC(明示的なフラグ)を実装し、即時配信をデフォルトにしながら特定ユースケース向けのバッチ処理をopt-inとして維持することで、両方の立場を和解させます。
ドキュメント相互参照
- Message Tool Reference —
immediateパラメータを含む更新されたスキーマ - Turn Processing Architecture — 即時配信パスを示す更新されたフローダイアグラム
- Channel Adapter Guide — 即時配信サポートのための
send()メソッド要件 - Migration Guide: v2.x to v3.0 — デフォルト配信モードのbreaking change通知