April 20, 2026 • バージョン: latest

[メッセージツール呼び出しがターン終了までバッチ処理される] - Message Tool Calls Batched Until Turn End Instead of Delivered Immediately

単一のエージェントターン中のすべての `message` ツール呼び出しがキューに追加され、ターン完了時のみ送信されます。これにより、進捗更新のリアルタイム通信パターンが中断されます。

🔍 症状

観察可能な動作

エージェントが1つのターン内でmessageツールを複数回呼び出した場合、すべてのメッセージがそれぞれの呼び出しポイントではなく、ターン完了時に同時に到着します。

現在の動作のCLIデモ

# Scenario: Agent with 3 message tool calls during a long-running task
# User experiences: silence for the entire duration, then all messages arrive at once

# Timeline of events (as seen by the channel/API consumer):
[T+0s]   Turn started - no visible output
[T+30s]  Tool call 1-5 executed - no visible output  
[T+60s]  Tool call 6-10 executed - no visible output
[T+90s]  Turn completed - ALL THREE MESSAGES delivered simultaneously:

Channel Output (received at T+90s):
┌─────────────────────────────────────────────────────────────┐
│ [90s] 收到,开始分析...                                      │
│ [90s] 数据拉完,正在生成报告                                 │
│ [90s] 报告完成,核心结论...                                  │
└─────────────────────────────────────────────────────────────┘

Expected Output:
┌─────────────────────────────────────────────────────────────┐
│ [0s]   收到,开始分析...                                     │
│ [60s]  数据拉完,正在生成报告                                 │
│ [90s]  报告完成,核心结论...                                  │
└─────────────────────────────────────────────────────────────┘

チャネル別の症状

チャネル症状
TelegramBotが応答しない状態に見える;ユーザーがすべてのメッセージを短時間で連続して受信する
Slack一時的なメッセージがターン終了まで表示されない;最後に一括して配信される
WebhookAPIがターン完了時に15以上のイベントの配列を一度に受信する(ストリーミングではない)
WebSocket中間フレームが送信されない;すべての内容を含む単一の最終フレームのみ

デバッグ指標

トレースが有効な場合、メッセージツールの出力にはバッチ処理の動作が表示されます:

# With TRACE_LEVEL=debug, observe the turn lifecycle
[TRACE] Turn 42 started
[TRACE] Tool call: message (queued for turn-end delivery) - "收到,开始分析..."
[TRACE] Tool call: database.query (executing)
[TRACE] Tool call: message (queued for turn-end delivery) - "数据拉完,正在生成报告"
[TRACE] Tool call: file.write (executing)
[TRACE] Tool call: message (queued for turn-end delivery) - "报告完成,核心结论..."
[TRACE] Turn 42 completed - flushing 3 queued messages
[DEBUG] Delivering batch: [msg_1, msg_2, msg_3]

正常に動作するシナリオとの比較

メッセージが即座に到着するのは、次の場合です:

  • ターンにmessageツールの呼び出しが1回だけで、他のツールがない場合
  • エージェントが1つのターンを完了し(message以外のすべてのツール)、次にmessageを含む新しいターンを開始する場合
  • メッセージがmessageツールではなくsession.reply()経由で送信される場合

🧠 原因

アーキテクチャ分析

即時配信の失敗は、OpenClawのターン単位の結果集約モデルに起因します。このシステムは、ターン境界内のすべてのツール結果(包括的にmessageツールの出力)を収集し、単一バッチで配信するように設計されています。

コードフローの内訳


┌──────────────────────────────────────────────────────────────────────────────┐
│                         TURN PROCESSING PIPELINE                             │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  1. TURN_START                                                               │
│     └─> Initialize turn context                                              │
│     └─> Create empty result buffer                                           │
│                                                                              │
│  2. TOOL_EXECUTION_LOOP                                                      │
│     ├─> For each tool call:                                                  │
│     │   ├─> Execute tool                                                    │
│     │   ├─> If tool == "message":                                           │
│     │   │   └─> buffer.append(message_result)  ← QUEUED, NOT SENT          │
│     │   │                                                             ↑      │
│     │   └─> buffer.append(tool_result)                                     │      │
│     │                                                                     │      │
│     └─> Repeat until no more tool calls                                 ─┘      │
│                                                                              │
│  3. TURN_END                                                                 │
│     └─> flush_result_buffer()  ← ALL MESSAGES SENT HERE                     │
│     └─> deliver_to_channel(batch)                                            │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘

関連する主要ソースファイル

ファイル役割
packages/core/src/turn/turn-executor.tsツール実行ループをorchestrates;すべての結果をバッファリング
packages/tools/message/src/message-tool.tsメッセージツールの実装;結果バッファに出力
packages/channel-core/src/turn-context.tsターン単位の状態と結果コレクションを管理
packages/api/src/session.tssession.reply()パス(即時配信)とツールパスの比較

セマンティクスの不一致

messageツールはセマンティックにはファイア・アンド・フォーゲットのユーザー通知ですが、実装では構造化データを返す他のツールと同じ方法で扱われています:


// Current implementation (problematic)
class MessageTool {
  async execute(params: MessageParams, context: TurnContext): Promise {
    // Treats message like a data-returning tool
    // Result gets queued in context.results[] until turn end
    return {
      output: `Message queued: ${params.content}`,
      // No immediate channel delivery
    };
  }
}

// Semantic intent
// Message tool = "Send this to the user NOW"
// Other tools = "Return this result for agent consideration"

session.reply()との比較

session.reply()メソッドは結果バッファをバイパスするため即時配信されます:


// session.reply() - immediate delivery path
class Session {
  async reply(content: string): Promise {
    await this.channel.send(content);  // ← Direct channel send
  }
}

// message tool - deferred delivery path  
class MessageTool {
  async execute(params, context): Promise {
    context.results.push({ output: content });  // ← Buffered
    // Delivered only when turn completes
  }
}

この設計が存在する理由

バッチモデルは正当なユースケースに役立ちます:

  • チャネルへのAPI呼び出し回数を削減(個別送信ではなく1回のバッチ)
  • ツール結果に関するメッセージの順序を保証
  • チャネルの実装を簡素化(ターンごとに1回の応答)

しかし、「ユーザーにメッセージを送る」ツールのセマンティックな意図(「今送信する」を意味する)とはConflictしています。

🛠️ 解決手順

推奨ソリューション:オプションA+オプションCのハイブリッド

messageツールのデフォルトでの即時配信を実装し、バッチ配信が必要な場合の immediate: false フラグを提供します。

フェーズ1:メッセージツールスキーマの修正

ファイル: packages/tools/message/src/schema.ts

// BEFORE
export const messageToolSchema = {
  name: "message",
  description: "Send a message to the user",
  parameters: {
    type: "object",
    properties: {
      content: {
        type: "string",
        description: "The message content to send to the user"
      }
    },
    required: ["content"]
  }
};

// AFTER
export const messageToolSchema = {
  name: "message",
  description: "Send a message to the user. Messages are delivered immediately unless batch mode is requested.",
  parameters: {
    type: "object",
    properties: {
      content: {
        type: "string", 
        description: "The message content to send to the user"
      },
      immediate: {
        type: "boolean",
        description: "If true, deliver immediately. If false, queue until turn end. Defaults to true.",
        default: true
      }
    },
    required: ["content"]
  }
};

フェーズ2:メッセージツール実装の更新

ファイル: packages/tools/message/src/message-tool.ts

import { Tool, ToolResult, TurnContext } from "@openclaw/core";
import { channelRegistry } from "@openclaw/channel-core";

interface MessageParams {
  content: string;
  immediate?: boolean;
}

// Track messages that should be delivered immediately
const IMMEDIATE_DELIVERY_THRESHOLD_MS = 0; // 0 = always immediate when requested

export class MessageTool implements Tool {
  name = "message";
  description = messageToolSchema.description;
  parameters = messageToolSchema.parameters;

  async execute(
    params: MessageParams,
    context: TurnContext
  ): Promise {
    const content = params.content;
    const shouldDeliverImmediately = params.immediate !== false; // Default: true

    if (shouldDeliverImmediately) {
      // IMMEDIATE DELIVERY PATH
      return this.deliverImmediately(content, context);
    } else {
      // BATCHED DELIVERY PATH (original behavior)
      return this.queueForTurnEnd(content, context);
    }
  }

  private async deliverImmediately(
    content: string,
    context: TurnContext
  ): Promise {
    try {
      // Get the active channel for this session
      const channel = channelRegistry.getChannel(context.session.channelType);
      
      // Send directly to channel, outside turn buffer
      await channel.send({
        sessionId: context.session.id,
        content: content,
        metadata: {
          toolName: "message",
          deliveredAt: Date.now(),
          deliveryMode: "immediate"
        }
      });

      return {
        success: true,
        output: `Message delivered immediately: ${content.substring(0, 50)}...`,
        metadata: {
          deliveredAt: Date.now(),
          deliveryMode: "immediate"
        }
      };
    } catch (error) {
      return {
        success: false,
        output: "",
        error: `Failed to deliver message immediately: ${error.message}`,
        metadata: {
          deliveryMode: "immediate",
          fellBackToBatch: true
        }
      };
    }
  }

  private async queueForTurnEnd(
    content: string,
    context: TurnContext
  ): Promise {
    // Original behavior: add to turn buffer
    context.results.push({
      type: "message",
      content: content,
      metadata: {
        deliveryMode: "batched",
        queuedAt: Date.now()
      }
    });

    return {
      success: true,
      output: `Message queued for turn-end delivery: ${content.substring(0, 50)}...`,
      metadata: {
        deliveryMode: "batched"
      }
    };
  }
}

フェーズ3:チャネル送信機能の登録

ファイル: packages/channel-core/src/channel-registry.ts

// Ensure channels implement immediate send capability
export interface ChannelAdapter {
  // Existing methods...
  sendBatch(results: TurnResult[]): Promise;
  
  // NEW: Immediate single-message send
  send(params: {
    sessionId: string;
    content: string;
    metadata?: Record;
  }): Promise;
}

フェーズ4:ターンExecutorの更新(最小限の変更)

ファイル: packages/core/src/turn/turn-executor.ts

// Add filter to exclude already-delivered messages from batch
async function flushResults(context: TurnContext): Promise {
  // Filter out messages that were delivered immediately
  const batchableResults = context.results.filter(
    result => result.metadata?.deliveryMode !== "immediate"
  );

  if (batchableResults.length > 0) {
    await context.channel.sendBatch(batchableResults);
  }
  
  // Log summary
  const immediateCount = context.results.filter(
    r => r.metadata?.deliveryMode === "immediate"
  ).length;
  
  if (immediateCount > 0) {
    context.logger.debug(`Delivered ${immediateCount} messages immediately`);
  }
}

フェーズ5:設定オプション

ファイル: packages/core/src/config/tool-config.ts

export interface ToolConfig {
  message: {
    // Default delivery mode for message tool
    defaultDeliveryMode: "immediate" | "batched";
    // Fallback if channel doesn't support immediate delivery
    fallbackToBatchOnError: boolean;
  };
}

export const defaultToolConfig: ToolConfig = {
  message: {
    defaultDeliveryMode: "immediate",  // Changed from "batched"
    fallbackToBatchOnError: true
  }
};

変更の検証

実装後、実行フローは次のようになります:


┌──────────────────────────────────────────────────────────────────────────────┐
│                         UPDATED PIPELINE (with fix)                          │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  1. TURN_START                                                               │
│     └─> Initialize turn context                                              │
│                                                                              │
│  2. TOOL_EXECUTION_LOOP                                                      │
│     ├─> Tool call: message ("收到,开始分析...")                             │
│     │   └─> channel.send() ← IMMEDIATE DELIVERY                             │
│     │   └─> return { deliveredAt, deliveryMode: "immediate" }              │
│     │                                                                     │
│     ├─> Tool call: database.query                                           │
│     │   └─> context.results.push(result)  ← Normal buffering               │
│     │                                                                     │
│     ├─> Tool call: message ("数据拉完...")                                  │
│     │   └─> channel.send() ← IMMEDIATE DELIVERY                             │
│     │                                                                     │
│     └─> Tool call: file.write                                               │
│         └─> context.results.push(result)                                     │
│                                                                              │
│  3. TURN_END                                                                 │
│     └─> flushResults() - only non-immediate results                          │
│     └─> channel.sendBatch([query_result, write_result])                     │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘

🧪 検証

テストケース1:即時配信の検証

目的: メッセージがターン終了時ではなく呼び出し時に到着することを確認します。

# Test script: verify message timing
#!/bin/bash

START_TIME=$(date +%s.%N)

# Invoke agent with timed message tool calls
curl -X POST http://localhost:3000/api/sessions/test-001/invoke \
  -H "Content-Type: application/json" \
  -d '{
    "message": "Perform 3 searches and send progress after each"
  }'

# Capture message delivery times from channel logs
# Expected: 3 separate delivery timestamps
# Actual (before fix): single timestamp at turn end

echo "Checking message delivery timestamps..."
grep "Message delivered" /var/log/openclaw/channel.log | \
  awk '{print $1, $2, $8}' | \
  sort -u

修正後の期待される出力:

2024-01-15 10:30:00.123 deliveredAt=1705315800123
2024-01-15 10:30:35.456 deliveredAt=1705315835456  
2024-01-15 10:31:05.789 deliveredAt=1705315865789
2024-01-15 10:31:35.000 TURN_END

失敗指標(修正前):

2024-01-15 10:31:35.000 deliveredAt=1705315895000  ← All three
2024-01-15 10:31:35.000 deliveredAt=1705315895000  ← Same timestamp
2024-01-15 10:31:35.000 deliveredAt=1705315895000  ← Turn end

テストケース2:混合配信モード

目的: immediate: falseでもメッセージが正しくキューに入ることを確認します。

# Agent prompt demonstrating mixed modes:
# Use immediate delivery for progress: "Starting task..."
# Use batched for audit trail: "Query executed at X"

# Verify batched messages don't appear until turn end
# while immediate messages do

# Step 1: Start monitoring
tail -f /var/log/openclaw/channel.log | grep -E "(delivered|queued)" &

# Step 2: Invoke turn with both modes
curl -X POST http://localhost:3000/api/sessions/test-002/invoke \
  -d '{"message": "process with both message modes"}'

# Step 3: Verify output
# Should see immediate messages logged during execution
# Should see batched messages only at TURN_END marker

テストケース3:チャネル互換性のフォールバック

目的: チャネルが即時送信機能を持たない場合のgracefulなフォールバックを確認します。

# If channel.send() throws "Method not implemented",
# verify message falls back to batch queue

# Test with mock channel that doesn't implement send()
const mockChannel = {
  sendBatch: async (results) => { /* existing */ },
  // send() intentionally omitted
};

# Invoke message tool
# Expected: succeeds via fallback, logged as "deliveredAt: batched"

grep "fellBackToBatch" /var/log/openclaw/tools.log
# Should show: message tool fell back to batch mode

統合テストスイート

# packages/tools/message/src/__tests__/message-delivery.test.ts

describe("Message Tool Delivery Modes", () => {
  let mockContext: TurnContext;
  let mockChannel: jest.Mocked;
  
  beforeEach(() => {
    mockChannel = {
      send: jest.fn().mockResolvedValue(undefined),
      sendBatch: jest.fn().mockResolvedValue(undefined),
      // ... other methods
    };
    
    mockContext = createMockContext({
      channel: mockChannel,
      session: { id: "test-session", channelType: "telegram" }
    });
  });

  test("delivers immediately by default", async () => {
    const tool = new MessageTool();
    await tool.execute({ content: "Immediate message" }, mockContext);
    
    expect(mockChannel.send).toHaveBeenCalledTimes(1);
    expect(mockChannel.send).toHaveBeenCalledWith(
      expect.objectContaining({
        content: "Immediate message",
        metadata: expect.objectContaining({
          deliveryMode: "immediate"
        })
      })
    );
    expect(mockChannel.sendBatch).not.toHaveBeenCalled();
  });

  test("queues when immediate: false", async () => {
    const tool = new MessageTool();
    await tool.execute(
      { content: "Batched message", immediate: false },
      mockContext
    );
    
    expect(mockChannel.send).not.toHaveBeenCalled();
    expect(mockContext.results).toContainEqual(
      expect.objectContaining({
        type: "message",
        content: "Batched message",
        metadata: { deliveryMode: "batched" }
      })
    );
  });

  test("falls back to batch when channel.send() unavailable", async () => {
    mockChannel.send = undefined; // Simulate unsupported channel
    
    const tool = new MessageTool();
    const result = await tool.execute(
      { content: "Test" },
      mockContext
    );
    
    expect(result.metadata.fellBackToBatch).toBe(true);
    expect(mockContext.results).toContainEqual(
      expect.objectContaining({
        type: "message",
        metadata: { deliveryMode: "batched" }
      })
    );
  });
});

手動検証チェックリスト

  • トレースログが即時配信を表示: grep "deliverImmediately\|Message delivered" logs/trace.log
  • ターンエンドバッチが即時メッセージを除外: grep "sendBatch" logs/trace.log | jq '.messages | length'メッセージツール以外のツール総数と等しくなるはずです
  • タイミングの分離が見える: メッセージ配信タイムスタンプがターンエンドタイムスタンプと異なる
  • 設定変更が尊重される: defaultDeliveryMode: "batched"を設定すると古い動作に戻ります

⚠️ よくある落とし穴

落とし穴1:チャネルレート制限

問題: 素早い即時送信はチャネルのレート制限(Telegramは秒間約30件のメッセージ制限など)をトリガーする可能性があります。

対策:

// Implement throttling for immediate delivery
class ThrottledChannelAdapter implements ChannelAdapter {
  private sendQueue: Promise = Promise.resolve();
  private minIntervalMs = 100; // Max 10 messages/second

  async send(params: SendParams): Promise {
    this.sendQueue = this.sendQueue.then(async () => {
      await this.throttle();
      return this.channel.send(params);
    });
    await this.sendQueue;
  }

  private async throttle(): Promise {
    // Rate limit enforcement
  }
}

落とし穴2:メッセージ順序の違反

問題: 即時メッセージがEarlierのバッチメッセージより先に到着し、年代順が崩れる可能性があります。

シナリオ:

Tool sequence:
1. message "Step 1" (immediate)     → arrives at T+5s
2. database.query (batched)         → queued
3. message "Step 2" (immediate)     → arrives at T+10s  
4. Turn end                          → batched results arrive at T+15s

User sees:
[T+5s]   Step 1
[T+10s]  Step 2
[T+15s]  Query result (should have been before Step 2?)

対策: 順序の期待値を文書化する;関連するメッセージには一貫した配信モードを使用する必要があります。

落とし穴3:セッション状態の同期

問題: 即時メッセージがまだコミットされていないデータを参照する可能性があります。

例:

// Agent flow that causes inconsistency
1. message "Starting query for user ${session.userId}"  // immediate
2. session.set("userId", "123")                          // queued
3. Turn end → state committed

User sees message with undefined userId (race condition)

対策: セッション状態の更新が同期していることを確認する;即時メッセージが安全になるまで状態書き込みを延期する。

落とし穴4:チャネルアダプターの互換性マトリックス

リスク: すべてのチャネルが即時送信 지원하는わけではない;一部はバッチ応答のみサポートします。

チャネル即時送信サポートメモ
Telegram✅ 完全スロットリング付きで素早い送信をサポート
Slack⚠️ 制限ありWebhookはfire-and-forget;RTMにはレート制限あり
Discord✅ 完全Botメッセージは即座に送信可能
WebSocket✅ 完全クライアントに直接ストリーム
Webhook✅ 完全コールバックURLにPOST
Console✅ 完全直接stdout
Teams⚠️ 制限ありプactivemessagingモードが必要

アクション: 即時モードを使用する前にChannelAdapterCapabilitiesを確認してください。

落とし穴5:トレース/ログ記録の複雑さ

問題: 交错された即時とバッチ配信により、トレースが複雑になります。

対策: フィルタリングのためにすべてのログエントリにdeliveryModeturnIdを含める:

{
  "timestamp": "...",
  "level": "debug",
  "message": "Message delivered",
  "turnId": 42,
  "deliveryMode": "immediate",
  "sequenceInTurn": 1,
  "content": "收到,开始分析..."
}

落とし穴6:後方互換性の回帰

リスク: バッチ動作に依存する既存の車が壊れる可能性があります。

シナリオ:

  • メッセージをツール結果と一緒にグループ化することを期待する車
  • ターンエンドで正確にN件のメッセージを期待するUI

対策:

  • デフォルトをimmediate: trueに変更し、変更を prominently documentします
  • opt-out用の設定フラグtool.message.defaultDeliveryMode: "batched"を提供します
  • まずはopt-in機能としてリリースし、次のメジャーバージョンでデフォルトを変更します

落とし穴7:CI/CDでのテスト

問題: 可変リソース割り当てを持つCI環境では、タイミングベースのテストが不安定です。

対策:

// Use deterministic test with mocked time
test("delivers immediately based on flag, not timing", async () => {
  const tool = new MessageTool();
  
  await tool.execute({ content: "test" }, mockContext);
  
  // Verify send() was called (immediate) or queued (batched)
  // NOT: await waitFor(() => sendCalled())
  // YES: expect(sendCalled).toBe(true)
});

🔗 関連するエラー

関連するGitHubイシュー

イシュー関係性主な区別
#25463付随的messageツールとsession.reply()間での1つのターン内のメッセージ順序。このイシューはすべてのmessageツール呼び出しが遅延することについて;#25463は異なるメッセージソース間の順序について。
#18089付随的双方向メッセージ処理アーキテクチャ。双方向通信の有効化に関連するが、異なるアーキテクチャ層。
#31234参考情報「長いターン中にユーザーが空の画面を見る」— この修正で解決される症状の記述。
#28901対照「効率性のためにすべてのチャネル出力をバッチ処理」— このイシューがchallengesする現在の設計哲学。
#34567ブロック中「ツール結果のストリーミング」— 別の配信メカニズムを提供するストリーミングアーキテクチャで、即時配信と潜在的にならせる可能性がある。

関連する設定オプション

設定キー現在の動作この修正での変更後
tool.message.deliveryModeハードコードされた"batched"設定可能:“immediate” | “batched”
turn.maxDurationターンタイムアウト長いターンが漸増的にメッセージを配信するようになった場合、調整が必要かもしれない
channel.batchSizeバッチあたりの最大アイテム数セマンティックな意味が変わる;即時送信はバッチ処理をバイパス

関連するエラーコード

エラーコード説明関連性
TOOL_TIMEOUT_01ツール実行がタイムアウトを超過メッセージ送信が遅い場合、即時配信でより表面化する可能性がある
CHANNEL_RATE_LIMITチャネルがレート制限によりメッセージを拒否素早い即時送信により直接トリガー
CHANNEL_NOT_SUPPORTEDチャネルが必要な機能を持っていない即時配信をサポートできないチャネル用
SESSION_STATE_CONFLICT即時メッセージ送信中に状態が変更されたセッション状態が適切に同期されていない場合の競合状態

歴史的背景

元のバッチ処理の根拠(#18901より):

“バッチ処理はAPI呼び出し回数を減らし、メッセージ順序を保証する。バッチ処理がなければ、10回のツール呼び出しと5件のメッセージを含むターンは15回の別々のAPI呼び出しになる。”

反論(イシュー#25463の議論から):

messageツールはセマンティックには「今すぐユーザーに配信する」を意味する。バッチ処理は意図に反し、プログレス更新のようなリアルタイムユースケースを壊す。”

解決パス: この修正はオプションA(即時をデフォルト)+オプションC(明示的なフラグ)を実装し、即時配信をデフォルトにしながら特定ユースケース向けのバッチ処理をopt-inとして維持することで、両方の立場を和解させます。

ドキュメント相互参照

エビデンスとソース

このトラブルシューティングガイドは、FixClaw Intelligence パイプラインによってコミュニティの議論から自動的に合成されました。