[Nachrichten-Toolaufrufe werden erst am Turn-Ende gesendet] - Message Tool Calls Batched Until Turn End Instead of Delivered Immediately
Alle `message`-Toolaufrufe während eines einzigen Agenten-Turns werden in die Warteschlange eingereiht und erst bei Turn-Abschluss gesendet, was Echtzeit-Kommunikationsmuster für Fortschrittsaktualisierungen unterbricht.
🔍 Symptome
Beobachtbares Verhalten
Wenn ein Agent das message-Tool innerhalb eines einzelnen Turns mehrfach aufruft, treffen alle Nachrichten gleichzeitig bei Turn-Abschluss ein, anstatt zu ihren jeweiligen Aufrufzeitpunkten.
CLI-Demonstration des aktuellen Verhaltens
# Scenario: Agent with 3 message tool calls during a long-running task
# User experiences: silence for the entire duration, then all messages arrive at once
# Timeline of events (as seen by the channel/API consumer):
[T+0s] Turn started - no visible output
[T+30s] Tool call 1-5 executed - no visible output
[T+60s] Tool call 6-10 executed - no visible output
[T+90s] Turn completed - ALL THREE MESSAGES delivered simultaneously:
Channel Output (received at T+90s):
┌─────────────────────────────────────────────────────────────┐
│ [90s] 收到,开始分析... │
│ [90s] 数据拉完,正在生成报告 │
│ [90s] 报告完成,核心结论... │
└─────────────────────────────────────────────────────────────┘
Expected Output:
┌─────────────────────────────────────────────────────────────┐
│ [0s] 收到,开始分析... │
│ [60s] 数据拉完,正在生成报告 │
│ [90s] 报告完成,核心结论... │
└─────────────────────────────────────────────────────────────┘
Kanalspezifische Manifestationen
| Kanal | Symptom |
|---|---|
| Telegram | Bot erscheint nicht ansprechbar; Benutzer erhält alle Nachrichten in schneller Folge |
| Slack | Ephemere Nachrichten werden nicht angezeigt bis zum Turn-Ende; finale Batch-Lieferung |
| Webhook | API empfängt Array von 15+ Events bei Turn-Abschluss anstatt Streaming |
| WebSocket | Keine Zwischen-Frames gesendet; einzelnes finales Frame mit gesamtem Inhalt |
Debugging-Indikator
Wenn Tracing aktiviert ist, zeigt die message-Tool-Ausgabe Batch-Verhalten:
# With TRACE_LEVEL=debug, observe the turn lifecycle
[TRACE] Turn 42 started
[TRACE] Tool call: message (queued for turn-end delivery) - "收到,开始分析..."
[TRACE] Tool call: database.query (executing)
[TRACE] Tool call: message (queued for turn-end delivery) - "数据拉完,正在生成报告"
[TRACE] Tool call: file.write (executing)
[TRACE] Tool call: message (queued for turn-end delivery) - "报告完成,核心结论..."
[TRACE] Turn 42 completed - flushing 3 queued messages
[DEBUG] Delivering batch: [msg_1, msg_2, msg_3]
Kontrast zu funktionierenden Szenarien
Nachrichten werden sofort zugestellt, wenn:
- Ein Turn nur einen einzelnen
message-Tool-Aufruf ohne andere Tools enthält - Der Agent einen Turn abschließt (alle Nicht-Nachrichten-Tools), dann einen neuen Turn mit einer Nachricht startet
- Die Nachricht über
session.reply()anstatt über dasmessage-Tool gesendet wird
🧠 Ursache
Architekturanalyse
Das Scheitern der Sofortlieferung stammt aus OpenClaws turn-bezogenem Ergebnis-Aggregationsmodell. Das System ist so architektonisch konzipiert, dass alle Tool-Ergebnisse — einschließlich message-Tool-Ausgaben — innerhalb einer Turn-Grenze gesammelt werden, bevor sie in einer einzelnen Batch geliefert werden.
Code-Ablauf-Aufschlüsselung
┌──────────────────────────────────────────────────────────────────────────────┐
│ TURN PROCESSING PIPELINE │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. TURN_START │
│ └─> Initialize turn context │
│ └─> Create empty result buffer │
│ │
│ 2. TOOL_EXECUTION_LOOP │
│ ├─> For each tool call: │
│ │ ├─> Execute tool │
│ │ ├─> If tool == "message": │
│ │ │ └─> buffer.append(message_result) ← QUEUED, NOT SENT │
│ │ │ ↑ │
│ │ └─> buffer.append(tool_result) │ │
│ │ │ │
│ └─> Repeat until no more tool calls ─┘ │
│ │
│ 3. TURN_END │
│ └─> flush_result_buffer() ← ALL MESSAGES SENT HERE │
│ └─> deliver_to_channel(batch) │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
Betroffene Hauptquell-Dateien
| Datei | Rolle |
|---|---|
packages/core/src/turn/turn-executor.ts | Orchestriert Tool-Ausführungsschleife; puffert alle Ergebnisse |
packages/tools/message/src/message-tool.ts | Message-Tool-Implementierung; gibt an Ergebnis-Buffer aus |
packages/channel-core/src/turn-context.ts | Verwaltet turn-bezogenen Zustand und Ergebnissammlung |
packages/api/src/session.ts | session.reply() Pfad (Sofortlieferung) vs. Tool-Pfad |
Semantischer Konflikt
Das message-Tool ist semantisch eine Fire-and-Forget-Benachrichtigung, doch die Implementierung behandelt es identisch zu anderen Tools, die strukturierte Daten zurückgeben:
// Current implementation (problematic)
class MessageTool {
async execute(params: MessageParams, context: TurnContext): Promise {
// Treats message like a data-returning tool
// Result gets queued in context.results[] until turn end
return {
output: `Message queued: ${params.content}`,
// No immediate channel delivery
};
}
}
// Semantic intent
// Message tool = "Send this to the user NOW"
// Other tools = "Return this result for agent consideration"
Vergleich mit session.reply()
Die session.reply()-Methode liefert sofort, weil sie den Ergebnis-Buffer umgeht:
// session.reply() - immediate delivery path
class Session {
async reply(content: string): Promise {
await this.channel.send(content); // ← Direct channel send
}
}
// message tool - deferred delivery path
class MessageTool {
async execute(params, context): Promise {
context.results.push({ output: content }); // ← Buffered
// Delivered only when turn completes
}
}
Warum dieses Design existiert
Das Batch-Modell dient validen Anwendungsfällen:
- Reduziert API-Aufrufe an Kanäle (ein Batch vs. viele einzelne Sendungen)
- Stellt Nachrichtenreihenfolge relativ zu Tool-Ergebnissen sicher
- Vereinfacht Kanalimplementierungen (einzelne Antwort pro Turn)
Jedoch steht dies im Konflikt mit der semantischen Intention eines “Nachricht an Benutzer senden”-Tools, das Unmittelbarkeit impliziert.
🛠️ Schritt-für-Schritt-Lösung
Empfohlene Lösung: Option A + Option C Hybrid
Implementiere Sofortlieferung als Standard für das message-Tool und stelle ein immediate: false Flag für Fälle bereit, die Batch-Lieferung erfordern.
Phase 1: Message-Tool-Schema ändern
Datei: packages/tools/message/src/schema.ts
// BEFORE
export const messageToolSchema = {
name: "message",
description: "Send a message to the user",
parameters: {
type: "object",
properties: {
content: {
type: "string",
description: "The message content to send to the user"
}
},
required: ["content"]
}
};
// AFTER
export const messageToolSchema = {
name: "message",
description: "Send a message to the user. Messages are delivered immediately unless batch mode is requested.",
parameters: {
type: "object",
properties: {
content: {
type: "string",
description: "The message content to send to the user"
},
immediate: {
type: "boolean",
description: "If true, deliver immediately. If false, queue until turn end. Defaults to true.",
default: true
}
},
required: ["content"]
}
};
Phase 2: Message-Tool-Implementierung aktualisieren
Datei: packages/tools/message/src/message-tool.ts
import { Tool, ToolResult, TurnContext } from "@openclaw/core";
import { channelRegistry } from "@openclaw/channel-core";
interface MessageParams {
content: string;
immediate?: boolean;
}
// Track messages that should be delivered immediately
const IMMEDIATE_DELIVERY_THRESHOLD_MS = 0; // 0 = always immediate when requested
export class MessageTool implements Tool {
name = "message";
description = messageToolSchema.description;
parameters = messageToolSchema.parameters;
async execute(
params: MessageParams,
context: TurnContext
): Promise {
const content = params.content;
const shouldDeliverImmediately = params.immediate !== false; // Default: true
if (shouldDeliverImmediately) {
// IMMEDIATE DELIVERY PATH
return this.deliverImmediately(content, context);
} else {
// BATCHED DELIVERY PATH (original behavior)
return this.queueForTurnEnd(content, context);
}
}
private async deliverImmediately(
content: string,
context: TurnContext
): Promise {
try {
// Get the active channel for this session
const channel = channelRegistry.getChannel(context.session.channelType);
// Send directly to channel, outside turn buffer
await channel.send({
sessionId: context.session.id,
content: content,
metadata: {
toolName: "message",
deliveredAt: Date.now(),
deliveryMode: "immediate"
}
});
return {
success: true,
output: `Message delivered immediately: ${content.substring(0, 50)}...`,
metadata: {
deliveredAt: Date.now(),
deliveryMode: "immediate"
}
};
} catch (error) {
return {
success: false,
output: "",
error: `Failed to deliver message immediately: ${error.message}`,
metadata: {
deliveryMode: "immediate",
fellBackToBatch: true
}
};
}
}
private async queueForTurnEnd(
content: string,
context: TurnContext
): Promise {
// Original behavior: add to turn buffer
context.results.push({
type: "message",
content: content,
metadata: {
deliveryMode: "batched",
queuedAt: Date.now()
}
});
return {
success: true,
output: `Message queued for turn-end delivery: ${content.substring(0, 50)}...`,
metadata: {
deliveryMode: "batched"
}
};
}
}
Phase 3: Kanal-Send-Fähigkeit registrieren
Datei: packages/channel-core/src/channel-registry.ts
// Ensure channels implement immediate send capability
export interface ChannelAdapter {
// Existing methods...
sendBatch(results: TurnResult[]): Promise;
// NEW: Immediate single-message send
send(params: {
sessionId: string;
content: string;
metadata?: Record;
}): Promise;
}
Phase 4: Turn-Executor aktualisieren (Minimale Änderung)
Datei: packages/core/src/turn/turn-executor.ts
// Add filter to exclude already-delivered messages from batch
async function flushResults(context: TurnContext): Promise {
// Filter out messages that were delivered immediately
const batchableResults = context.results.filter(
result => result.metadata?.deliveryMode !== "immediate"
);
if (batchableResults.length > 0) {
await context.channel.sendBatch(batchableResults);
}
// Log summary
const immediateCount = context.results.filter(
r => r.metadata?.deliveryMode === "immediate"
).length;
if (immediateCount > 0) {
context.logger.debug(`Delivered ${immediateCount} messages immediately`);
}
}
Phase 5: Konfigurationsoption
Datei: packages/core/src/config/tool-config.ts
export interface ToolConfig {
message: {
// Default delivery mode for message tool
defaultDeliveryMode: "immediate" | "batched";
// Fallback if channel doesn't support immediate delivery
fallbackToBatchOnError: boolean;
};
}
export const defaultToolConfig: ToolConfig = {
message: {
defaultDeliveryMode: "immediate", // Changed from "batched"
fallbackToBatchOnError: true
}
};
Verifizierung der Änderungen
Nach der Implementierung wird der Ausführungsfluss zu:
┌──────────────────────────────────────────────────────────────────────────────┐
│ UPDATED PIPELINE (with fix) │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. TURN_START │
│ └─> Initialize turn context │
│ │
│ 2. TOOL_EXECUTION_LOOP │
│ ├─> Tool call: message ("收到,开始分析...") │
│ │ └─> channel.send() ← IMMEDIATE DELIVERY │
│ │ └─> return { deliveredAt, deliveryMode: "immediate" } │
│ │ │
│ ├─> Tool call: database.query │
│ │ └─> context.results.push(result) ← Normal buffering │
│ │ │
│ ├─> Tool call: message ("数据拉完...") │
│ │ └─> channel.send() ← IMMEDIATE DELIVERY │
│ │ │
│ └─> Tool call: file.write │
│ └─> context.results.push(result) │
│ │
│ 3. TURN_END │
│ └─> flushResults() - only non-immediate results │
│ └─> channel.sendBatch([query_result, write_result]) │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
🧪 Verifizierung
Testfall 1: Sofortlieferung-Verifizierung
Zweck: Bestätigen, dass Nachrichten zum Aufrufzeitpunkt ankommen, nicht bei Turn-Ende.
# Test script: verify message timing
#!/bin/bash
START_TIME=$(date +%s.%N)
# Invoke agent with timed message tool calls
curl -X POST http://localhost:3000/api/sessions/test-001/invoke \
-H "Content-Type: application/json" \
-d '{
"message": "Perform 3 searches and send progress after each"
}'
# Capture message delivery times from channel logs
# Expected: 3 separate delivery timestamps
# Actual (before fix): single timestamp at turn end
echo "Checking message delivery timestamps..."
grep "Message delivered" /var/log/openclaw/channel.log | \
awk '{print $1, $2, $8}' | \
sort -u
Erwartete Ausgabe (nach Fix):
2024-01-15 10:30:00.123 deliveredAt=1705315800123
2024-01-15 10:30:35.456 deliveredAt=1705315835456
2024-01-15 10:31:05.789 deliveredAt=1705315865789
2024-01-15 10:31:35.000 TURN_END
Fehlerindikator (vor Fix):
2024-01-15 10:31:35.000 deliveredAt=1705315895000 ← All three
2024-01-15 10:31:35.000 deliveredAt=1705315895000 ← Same timestamp
2024-01-15 10:31:35.000 deliveredAt=1705315895000 ← Turn end
Testfall 2: Gemischte Liefermodi
Zweck: Verifizieren, dass immediate: false Nachrichten korrekt weiterhin einreiht.
# Agent prompt demonstrating mixed modes:
# Use immediate delivery for progress: "Starting task..."
# Use batched for audit trail: "Query executed at X"
# Verify batched messages don't appear until turn end
# while immediate messages do
# Step 1: Start monitoring
tail -f /var/log/openclaw/channel.log | grep -E "(delivered|queued)" &
# Step 2: Invoke turn with both modes
curl -X POST http://localhost:3000/api/sessions/test-002/invoke \
-d '{"message": "process with both message modes"}'
# Step 3: Verify output
# Should see immediate messages logged during execution
# Should see batched messages only at TURN_END marker
Testfall 3: Kanal-Kompatibilitäts-Fallback
Zweck: Verifizieren eines sauberen Fallbacks, wenn Kanal keine Sofort-Sendung unterstützt.
# If channel.send() throws "Method not implemented",
# verify message falls back to batch queue
# Test with mock channel that doesn't implement send()
const mockChannel = {
sendBatch: async (results) => { /* existing */ },
// send() intentionally omitted
};
# Invoke message tool
# Expected: succeeds via fallback, logged as "deliveredAt: batched"
grep "fellBackToBatch" /var/log/openclaw/tools.log
# Should show: message tool fell back to batch mode
Integrationstest-Suite
# packages/tools/message/src/__tests__/message-delivery.test.ts
describe("Message Tool Delivery Modes", () => {
let mockContext: TurnContext;
let mockChannel: jest.Mocked;
beforeEach(() => {
mockChannel = {
send: jest.fn().mockResolvedValue(undefined),
sendBatch: jest.fn().mockResolvedValue(undefined),
// ... other methods
};
mockContext = createMockContext({
channel: mockChannel,
session: { id: "test-session", channelType: "telegram" }
});
});
test("delivers immediately by default", async () => {
const tool = new MessageTool();
await tool.execute({ content: "Immediate message" }, mockContext);
expect(mockChannel.send).toHaveBeenCalledTimes(1);
expect(mockChannel.send).toHaveBeenCalledWith(
expect.objectContaining({
content: "Immediate message",
metadata: expect.objectContaining({
deliveryMode: "immediate"
})
})
);
expect(mockChannel.sendBatch).not.toHaveBeenCalled();
});
test("queues when immediate: false", async () => {
const tool = new MessageTool();
await tool.execute(
{ content: "Batched message", immediate: false },
mockContext
);
expect(mockChannel.send).not.toHaveBeenCalled();
expect(mockContext.results).toContainEqual(
expect.objectContaining({
type: "message",
content: "Batched message",
metadata: { deliveryMode: "batched" }
})
);
});
test("falls back to batch when channel.send() unavailable", async () => {
mockChannel.send = undefined; // Simulate unsupported channel
const tool = new MessageTool();
const result = await tool.execute(
{ content: "Test" },
mockContext
);
expect(result.metadata.fellBackToBatch).toBe(true);
expect(mockContext.results).toContainEqual(
expect.objectContaining({
type: "message",
metadata: { deliveryMode: "batched" }
})
);
});
});
Manuelle Verifizierungs-Checkliste
- Trace-Logs zeigen Sofortlieferung:
grep "deliverImmediately\|Message delivered" logs/trace.log - Turn-Ende-Batch schließt Sofortnachrichten aus:
grep "sendBatch" logs/trace.log | jq '.messages | length'sollte gleich Gesamt-Tools minus Nachrichten-Tools sein - Zeitliche Trennung sichtbar: Nachrichten-Lieferungszeitstempel unterscheiden sich vom Turn-Ende-Zeitstempel
- Konfigurationsänderung respektiert: Setzen von
defaultDeliveryMode: "batched"kehrt zu altem Verhalten zurück
⚠️ Häufige Fehler
Fehler 1: Kanal-Rate-Limiting
Problem: Schnelle Sofort-Sendungen können Kanal-Rate-Limits auslösen (z.B. Telegram hat ~30 Nachrichten/Sekunde-Limit).
Behebung:
// Implement throttling for immediate delivery
class ThrottledChannelAdapter implements ChannelAdapter {
private sendQueue: Promise = Promise.resolve();
private minIntervalMs = 100; // Max 10 messages/second
async send(params: SendParams): Promise {
this.sendQueue = this.sendQueue.then(async () => {
await this.throttle();
return this.channel.send(params);
});
await this.sendQueue;
}
private async throttle(): Promise {
// Rate limit enforcement
}
}
Fehler 2: Nachrichtenreihenfolge-Verletzungen
Problem: Sofortnachrichten können vor früheren Batch-Nachrichten ankommen und die chronologische Reihenfolge brechen.
Szenario:
Tool sequence:
1. message "Step 1" (immediate) → arrives at T+5s
2. database.query (batched) → queued
3. message "Step 2" (immediate) → arrives at T+10s
4. Turn end → batched results arrive at T+15s
User sees:
[T+5s] Step 1
[T+10s] Step 2
[T+15s] Query result (should have been before Step 2?)
Behebung: Reihenfolge-Erwartungen dokumentieren; Agents sollten konsistente Liefermodi für zusammenhängende Nachrichten verwenden.
Fehler 3: Sitzungszustand-Synchronisation
Problem: Sofortnachrichten können auf Daten verweisen, die noch nicht im Sitzungszustand festgeschrieben wurden.
Beispiel:
// Agent flow that causes inconsistency
1. message "Starting query for user ${session.userId}" // immediate
2. session.set("userId", "123") // queued
3. Turn end → state committed
User sees message with undefined userId (race condition)
Behebung: Sicherstellen, dass Sitzungszustandsaktualisierungen synchron sind; Zustandsschreibvorgänge aufschieben, bis Sofortnachrichten sicher sind.
Fehler 4: Kanal-Adapter-Kompatibilitätsmatrix
Risiko: Nicht alle Kanäle unterstützen Sofort-Sendung; einige unterstützen nur Batch-Antworten.
| Kanal | Sofort-Sendung-Unterstützung | Anmerkungen |
|---|---|---|
| Telegram | ✅ Voll | Unterstützt schnelle Sendungen mit Throttling |
| Slack | ⚠️ Eingeschränkt | Webhooks sind Fire-and-Forget; RTM hat Rate-Limits |
| Discord | ✅ Voll | Bot-Nachrichten können sofort gesendet werden |
| WebSocket | ✅ Voll | Direkt an Client streamen |
| Webhook | ✅ Voll | POST an Callback-URL |
| Console | ✅ Voll | Direktes stdout |
| Teams | ⚠️ Eingeschränkt | Erfordert proaktiven Messaging-Modus |
Aktion: ChannelAdapterCapabilities prüfen vor Verwendung des Sofortmodus.
Fehler 5: Trace/Logging-Komplexität
Problem: Tracing wird komplexer mit verzahnten Sofort- und Batch-Lieferungen.
Behebung: deliveryMode und turnId in alle Log-Einträge für Filterung einschließen:
{
"timestamp": "...",
"level": "debug",
"message": "Message delivered",
"turnId": 42,
"deliveryMode": "immediate",
"sequenceInTurn": 1,
"content": "收到,开始分析..."
}
Fehler 6: Rückwärtskompatibilitäts-Regression
Risiko: Bestehende Agents, die sich auf Batch-Verhalten verlassen, könnten beeinträchtigt werden.
Szenarien:
- Agents, die Nachrichten erstellen und erwarten, dass sie mit Tool-Ergebnissen gruppiert werden
- UI, die genau N Nachrichten bei Turn-Ende erwartet
Behebung:
- Standard auf
immediate: truesetzen, aber die Änderung prominent dokumentieren - Konfigurationsflag
tool.message.defaultDeliveryMode: "batched"für Opt-out bereitstellen - Als Opt-in-Funktion zuerst veröffentlichen, dann Standard in nächster Hauptversion ändern
Fehler 7: Testen in CI/CD
Problem: Zeitbasierte Tests sind flacky in CI-Umgebungen mit variabler Ressourcenallokation.
Behebung:
// Use deterministic test with mocked time
test("delivers immediately based on flag, not timing", async () => {
const tool = new MessageTool();
await tool.execute({ content: "test" }, mockContext);
// Verify send() was called (immediate) or queued (batched)
// NOT: await waitFor(() => sendCalled())
// YES: expect(sendCalled).toBe(true)
});
🔗 Zugehörige Fehler
Zugehörige GitHub-Issues
| Issue | Beziehung | Wesentlicher Unterschied |
|---|---|---|
| #25463 | Tangential | Nachrichtenreihenfolge zwischen message-Tool und session.reply() innerhalb desselben Turns. Dieses Issue behandelt alle message-Tool-Aufrufe, die verzögert werden; #25463 behandelt Reihenfolge zwischen verschiedenen Nachrichtenquellen. |
| #18089 | Tangential | Voll-Duplex-Nachrichtenbehandlungsarchitektur. Bezogen auf Ermöglichtung bidirektionaler Kommunikation, aber auf einer anderen architektonischen Ebene. |
| #31234 | Informativ | “Benutzer sieht leeren Bildschirm während langer Turns” — Symptombeschreibung, die durch diesen Fix behoben würde. |
| #28901 | Kontrast | “Alle Kanal-Ausgaben für Effizienz batchen” — aktuelle Designphilosophie, die dieses Issue in Frage stellt. |
| #34567 | Blockiert | “Streaming Tool-Ergebnisse” — Streaming-Architektur, die einen weiteren Liefermechanismus bereitstellen würde, potenziell redundant mit Sofortlieferung. |
Zugehörige Konfigurationsoptionen
| Konfigurationsschlüssel | Aktuelles Verhalten | Dieser Fix ändert zu |
|---|---|---|
| `tool |