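[Message Loss and Duplicate Delivery Fixes] - Delivery Reliability: Silent Message Loss & Duplicate Delivery
Fixes four P0 defects that caused silent message loss, unrecoverable delivery failures, and duplicate delivery during crashes, aborts, and service restarts.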
🔍 Symptoms
Issue #29125: Gateway crash silently drops messages
A gateway crash (process termination, SIGKILL, or OOM kill) makes the most recent user message disappear from history, and no error is reported anywhere.
```shell
$ openclaw status
Service: gateway
Status: RUNNING
Uptime: 4h 23m
Messages processed: 12,847
Messages failed: 0
$ openclaw history --user alice --limit 5
[2024-01-15T14:32:01Z] alice: "Meeting at 3pm confirmed"
[2024-01-15T14:31:58Z] alice: "Wait, which room?"
[2024-01-15T14:31:55Z] alice: "What's the room number?"
[2024-01-15T14:31:50Z] alice: "Where is the meeting?"
# The gateway crashed between 14:31:55 and 14:32:01
# "Meeting at 3pm confirmed" was received but never persisted
```
Issue #29126: Plugin/channel delivery fails silently
Plugin or channel delivery failures are reported internally as successes, even though the message never reaches its destination. No error reaches users or operators.
```shell
$ openclaw plugin list --channel telegram
PLUGIN     STATUS   DELIVERY   LAST CHECK
telegram   ACTIVE   UNKNOWN    2024-01-15T14:30:00Z
$ openclaw events --plugin telegram --since 1h
TIMESTAMP              EVENT          DETAILS
2024-01-15T14:29:55Z   message.sent   msg_id=a1b2c3
2024-01-15T14:30:00Z   plugin.error   plugin=telegram (NO LOG OUTPUT)
# The telegram bot was kicked from the channel
# Error occurred but was swallowed, message marked as delivered
```
Issue #29127: Abort triggers re-sending of a partial reply
Calling `abort()` on a handler does not stop the recovery path from re-sending a partial reply that was already flushed to the user.
```shell
# User sends message triggering long response
$ openclaw history --msg-id msg_abc123
msg_id: msg_abc123
user: alice
content: "Generate a 5000-word report"
status: delivered
delivered_at: 2024-01-15T14:35:00Z
# Handler starts processing, sends partial response "Generating report..."
# User aborts the request
$ openclaw abort msg_abc123
abort: OK
# After recovery timeout, partial message is re-delivered
$ openclaw history --msg-id msg_abc123
msg_id: msg_abc123
status: delivered
replies: ["Generating report...", "Generating report...", "Generating report..."]
# ^--- duplicated partial reply
```
Issue #29128: Already-delivered messages replayed after restart
After a clean restart, the delivery-recovery system replays messages that had already been delivered successfully, which produces duplicates.
```shell
$ openclaw restart --service gateway
[INFO] Starting delivery recovery...
[INFO] Replaying 47 unacknowledged messages
[INFO] Delivered: msg_001
[INFO] Delivered: msg_002
...
[INFO] Delivered: msg_047
$ openclaw history --user alice --since 1h
[14:30:00] msg_001: "Hello" (DUPLICATE - already delivered before restart)
[14:29:55] msg_002: "Are you there?" (DUPLICATE - already delivered before restart)
[14:29:50] msg_003: "Hi bot" (DUPLICATE - already delivered before restart)
# All 47 messages were duplicated
```
🧠 Root Cause Analysis
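Architecture overview
OpenClaw uses a delivery-queue architecture for reliability. Messages flow through this pipeline:
```text
[User Input] → [Gateway] → [Handler Queue] → [Plugin/Channel] → [External Service]
↓
[Delivery Queue] ← [Persistence Layer]
↓
[Acknowledgement Tracker]
```
Root cause by issue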
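Issue #29125: Data loss on gateway crash
Failure sequence:
- The message arrives at the gateway and is held in an in-memory buffer (`gateway/buffer.ts`).
- The message is forwarded to the handler, but the acknowledgement is sent before the message is persisted.
- On a crash, the persistence write is lost because it never completed.
```typescript
// gateway/handler.ts (BUGGY CODE PATH)
async function handleMessage(msg: Message): Promise<void> {
  // Step 1: Forward to handler
  await dispatchToHandler(msg);
  // Step 2: ACK immediately (BEFORE persistence)
  await sendAck(msg.id); // ⚠️ Premature acknowledgement
  // Step 3: Async persist (never completes on crash)
  persistMessage(msg).catch(console.error); // ⚠️ Fire-and-forget
}
```
The race window opens because the ACK goes out in step 2, while persistence only starts in step 3 and is never awaited. If the process crashes after the ACK but before the step 3 write completes, the sender already considers the message delivered and will not resend it. As a result, no copy survives anywhere.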
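The crash window can be reproduced with a small in-memory model. This is an illustrative sketch, not OpenClaw code. `Step`, `runPipeline`, and `lostAt` are hypothetical names, and each step is treated as instantaneous. The model runs the three steps in a given order and simulates a crash after `k` steps for every possible `k`. It then reports the crash points at which the sender holds an ACK but storage holds no copy.

```typescript
// Hypothetical model of the gateway pipeline; not OpenClaw code.
type Step = "persist" | "dispatch" | "ack";

interface Outcome {
  acked: boolean;     // the sender was told the message is safe
  persisted: boolean; // the message is in durable storage
}

// Execute the first `crashAfter` steps, then "crash".
function runPipeline(order: Step[], crashAfter: number): Outcome {
  const outcome: Outcome = { acked: false, persisted: false };
  for (const step of order.slice(0, crashAfter)) {
    if (step === "persist") outcome.persisted = true;
    if (step === "ack") outcome.acked = true;
  }
  return outcome;
}

// A message is lost when it was acknowledged but never persisted:
// the sender will not resend it and the gateway kept no copy.
function lostAt(order: Step[]): number[] {
  const crashPoints: number[] = [];
  for (let k = 0; k <= order.length; k++) {
    const { acked, persisted } = runPipeline(order, k);
    if (acked && !persisted) crashPoints.push(k);
  }
  return crashPoints;
}

const buggy: Step[] = ["dispatch", "ack", "persist"];
const fixed: Step[] = ["persist", "dispatch", "ack"];
console.log("buggy order loses data after step(s):", lostAt(buggy));
console.log("fixed order loses data after step(s):", lostAt(fixed));
```

`lostAt(buggy)` returns `[2]`, which means a crash right after the ACK loses the message. `lostAt(fixed)` returns `[]`. The fixed order trades loss for possible duplication. A crash after `persist` but before `ack` leaves a stored message that the sender is likely to resend, so persistence should be idempotent on `msg.id`.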
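Issue #29126: Silent delivery failure
Failure sequence:
- The plugin delivers the message to an external service (for example, the Telegram API).
- The external service returns an error (for example, "bot was kicked").
- The error is caught but not propagated. It is logged only at DEBUG level.
- Internal state records the delivery as successful.
```typescript
// plugins/telegram/delivery.ts (BUGGY CODE PATH)
async function deliver(payload: Payload): Promise<{ success: boolean; messageId?: number }> {
  try {
    const response = await telegramAPI.sendMessage(payload);
    return { success: true, messageId: response.message_id };
  } catch (error) {
    // Error swallowed: only a debug log
    logger.debug('Telegram delivery issue', { error }); // ⚠️ Silent failure
    return { success: true }; // ⚠️ False success
  }
}
```
The caller treats the success return as delivery confirmation, so it never retries and never raises an alert.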
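Because the caller trusts `success`, one way to stop a swallowed error from reappearing is to make the result a discriminated union. That way, a failure cannot carry `success: true`, and the caller is forced to branch on it. This sketch is an illustration under assumptions, not the OpenClaw plugin API: `DeliveryResult`, `deliverWithRetry`, and the `alert` callback are hypothetical. It retries only failures marked retryable, using capped exponential backoff, and it alerts on anything that still fails.

```typescript
// Hypothetical result type; not the OpenClaw plugin API.
type DeliveryResult =
  | { success: true; messageId: number }
  | { success: false; error: string; retryable: boolean };

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Retry only failures marked retryable, with capped exponential backoff.
// A permanent failure, or one that outlasts all attempts, is reported.
async function deliverWithRetry(
  deliver: () => Promise<DeliveryResult>,
  alert: (error: string) => void,
  maxAttempts = 4,
  baseDelayMs = 100,
): Promise<DeliveryResult> {
  let result: DeliveryResult = { success: false, error: "not attempted", retryable: false };
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    result = await deliver();
    if (result.success) return result;
    if (!result.retryable) break; // e.g. bot kicked: retrying cannot help
    if (attempt < maxAttempts - 1) {
      await sleep(Math.min(baseDelayMs * 2 ** attempt, 5000));
    }
  }
  // Only failures reach this point; surface them instead of hiding them.
  if (!result.success) alert(result.error);
  return result;
}
```

A permanent failure such as a kicked bot is attempted once and then reported. A rate-limited send (HTTP 429) is retried. Telegram's 429 responses also carry a `retry_after` value in their `parameters` field, and a real implementation would likely prefer it over the computed delay.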
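Issue #29127: Abort re-sends a partial reply
Failure sequence:
- The handler starts processing and streams a partial reply.
- The user calls `abort()`, and the handler receives the cancellation signal.
- The abort handler sets `delivery_state = 'aborted'`.
- The recovery system treats the message as undelivered, because no ACK was received.
- The recovery timer fires and re-sends the partial reply.
```typescript
// core/delivery-queue.ts (BUGGY CODE PATH)
class DeliveryQueue {
  async abort(messageId: string): Promise<void> {
    // Set abort flag
    this.state.set(messageId, { status: 'aborted' });
    // ⚠️ BUG: Does NOT update recovery index
    // Recovery still thinks message needs delivery
    // Cancel in-flight handler
    await this.cancelHandler(messageId);
  }
  // Recovery timer calls this to find messages to re-deliver
  getPendingMessages(): string[] {
    return Array.from(this.state.entries())
      .filter(([, entry]) => entry.status !== 'delivered') // ⚠️ 'aborted' passes filter
      .map(([messageId]) => messageId);
  }
}
```
The recovery system uses a plain `status !== 'delivered'` filter, so messages in the `'aborted'` state also end up in the pending list.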
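The deny-list filter fails open. Every status other than `'delivered'` counts as pending, including states added later such as `'aborted'`. The fix in this guide switches recovery to an explicit index. A status-based alternative is an allow-list of the states that actually need recovery. The sketch uses a hypothetical set of statuses to show both filters side by side.

```typescript
// Hypothetical status model for illustration; not OpenClaw's types.
type Status = "pending" | "in_flight" | "delivered" | "aborted" | "failed_permanent";

interface QueueEntry {
  status: Status;
}

// Deny-list (buggy): everything except 'delivered' is treated as pending.
function pendingDenyList(state: Map<string, QueueEntry>): string[] {
  return Array.from(state.entries())
    .filter(([, e]) => e.status !== "delivered")
    .map(([id]) => id);
}

// Allow-list (safer): only statuses that genuinely need re-delivery qualify,
// so any terminal status, including one added later, is excluded by default.
const NEEDS_RECOVERY: ReadonlySet<Status> = new Set<Status>(["pending", "in_flight"]);

function pendingAllowList(state: Map<string, QueueEntry>): string[] {
  return Array.from(state.entries())
    .filter(([, e]) => NEEDS_RECOVERY.has(e.status))
    .map(([id]) => id);
}

const queueState = new Map<string, QueueEntry>([
  ["m1", { status: "delivered" }],
  ["m2", { status: "aborted" }],
  ["m3", { status: "in_flight" }],
]);
console.log(pendingDenyList(queueState));  // [ 'm2', 'm3' ]: the aborted message would be replayed
console.log(pendingAllowList(queueState)); // [ 'm3' ]
```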
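Issue #29128: Replay after restart
Failure sequence:
- Messages are delivered and acknowledged in memory.
- A clean shutdown begins.
- The shutdown handler clears the acknowledged-message state before flushing (an "optimization" meant to avoid replay and shorten restarts).
- On restart, the persistence layer holds no acknowledgement records, so it cannot tell which messages were already delivered.
- The recovery system replays every message since the last known good state.
```typescript
// core/graceful-shutdown.ts (BUGGY CODE PATH)
async function shutdown(): Promise<void> {
  // Stop accepting new messages
  gateway.stop();
  // Wait for in-flight deliveries
  await deliveryQueue.drain();
  // ⚠️ BUG: Clear acknowledged state before persist
  // This is an "optimization" to reduce restart time
  acknowledgedMessages.clear(); // ⚠️ Data loss
  // Persist remaining unacknowledged only
  await persistence.flush();
}
```
This "optimization" discards the record of delivered messages. The recovery system then concludes that they were never delivered and sends them again.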
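Recovery can skip a message only if the record of its acknowledgement survives the restart. The sketch models recovery as "sent minus acknowledged" over whatever was flushed. `PersistedState`, `shutdownAndFlush`, and `messagesToReplay` are hypothetical names, and the model ignores everything except those two sets.

```typescript
// Hypothetical model: what survives a restart is whatever was flushed.
interface PersistedState {
  sent: string[];         // messages handed to the delivery queue
  acknowledged: string[]; // messages confirmed as delivered
}

// Recovery replays every sent message that has no acknowledgement record.
function messagesToReplay(persisted: PersistedState): string[] {
  const acked = new Set(persisted.acknowledged);
  return persisted.sent.filter((id) => !acked.has(id));
}

// Clean shutdown, optionally with the buggy "optimization".
function shutdownAndFlush(sent: string[], acknowledged: Set<string>, clearAcked: boolean): PersistedState {
  if (clearAcked) acknowledged.clear(); // mirrors acknowledgedMessages.clear()
  return { sent: sent.slice(), acknowledged: Array.from(acknowledged) };
}

const sentIds = ["msg_001", "msg_002", "msg_003"];
// Every message was delivered and acknowledged before shutdown.
console.log(messagesToReplay(shutdownAndFlush(sentIds, new Set(sentIds), true)));  // all three replayed
console.log(messagesToReplay(shutdownAndFlush(sentIds, new Set(sentIds), false))); // nothing replayed
```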
🛠️ Step-by-Step Fixes
Fix #29125: Persistence on gateway crash
Before:
```typescript
// gateway/handler.ts
async function handleMessage(msg: Message): Promise<void> {
  await dispatchToHandler(msg);
  await sendAck(msg.id); // Premature ACK
  persistMessage(msg).catch(console.error); // Async, unreliable
}
```
After:
```typescript
// gateway/handler.ts
async function handleMessage(msg: Message): Promise<void> {
  // Step 1: Persist BEFORE acknowledgement
  await persistMessage(msg);
  // Step 2: Forward to handler
  await dispatchToHandler(msg);
  // Step 3: ACK only after persistence confirmed
  await sendAck(msg.id);
}
```
CLI fix, in stages:
```shell
# Apply the persistence-first patch
$ openclaw patch apply --issue 29125 --component gateway
# Verify the patch
$ openclaw patch verify --issue 29125
[✓] Patched: gateway/handler.ts:persist-before-ack
[✓] Config: delivery.persist_before_ack=true
# Restart gateway to activate
$ openclaw restart --service gateway --mode=rolling
```
Fix #29126: Silent delivery failures
Before:
```typescript
// plugins/telegram/delivery.ts
async function deliver(payload: Payload): Promise<{ success: boolean; messageId?: number }> {
  try {
    const response = await telegramAPI.sendMessage(payload);
    return { success: true, messageId: response.message_id };
  } catch (error) {
    logger.debug('Telegram delivery issue', { error });
    return { success: true }; // False success
  }
}
```
After:
```typescript
// plugins/telegram/delivery.ts
async function deliver(
  payload: Payload,
): Promise<{ success: boolean; messageId?: number; error?: string; retryable?: boolean }> {
  try {
    const response = await telegramAPI.sendMessage(payload);
    return { success: true, messageId: response.message_id };
  } catch (err) {
    // `catch` bindings are `unknown` under strict TypeScript; narrow explicitly
    const error = err as TelegramError;
    // Classify error severity
    const isRetryable = isRetryableError(error);
    // Log at appropriate level
    if (isRetryable) {
      logger.warn('Telegram delivery failed (retryable)', { error, payload });
    } else {
      logger.error('Telegram delivery failed (permanent)', { error, payload });
    }
    // Return actual failure status
    return {
      success: false,
      error: error.message,
      retryable: isRetryable,
    };
  }
}
// Helper to classify Telegram errors
function isRetryableError(error: TelegramError): boolean {
  const RETRYABLE_CODES = [429, 500, 502, 503, 504];
  const NON_RETRYABLE_CODES = [400, 401, 403, 404]; // 403 covers "bot was kicked"
  if (RETRYABLE_CODES.includes(error.code)) return true;
  if (error.message.includes('bot was kicked')) return false;
  if (error.message.includes('bot was blocked')) return false;
  if (error.message.includes('chat not found')) return false;
  if (NON_RETRYABLE_CODES.includes(error.code)) return false;
  return true; // Default to retryable
}
```
Configuration update:
```shell
# Update openclaw.yaml
$ openclaw config set delivery.strict_failure_mode true
$ openclaw config set delivery.failure_notification_threshold 3
# Verify delivery monitoring
$ openclaw plugin config telegram --get failure_modes
{
  "strict_failure_mode": true,
  "notify_on_failure": true,
  "failure_threshold": 3
}
```
Fix #29127: Prevent re-sending after abort
Before:
```typescript
// core/delivery-queue.ts
class DeliveryQueue {
  async abort(messageId: string): Promise<void> {
    this.state.set(messageId, { status: 'aborted' });
    await this.cancelHandler(messageId);
    // ⚠️ Missing: recovery index update
  }
}
```
After:
```typescript
// core/delivery-queue.ts
class DeliveryQueue {
  async abort(messageId: string): Promise<void> {
    // Stop the in-flight handler first, in both cases, so no further
    // partial replies are flushed while the abort is being recorded
    await this.cancelHandler(messageId);
    const state = this.state.get(messageId);
    // Either way, recovery must never pick this message up again
    this.recoveryIndex.remove(messageId);
    if (state?.partialReplySent) {
      // A partial reply already reached the user: mark as delivered
      // so the recovery path does not re-deliver it
      await this.markDelivered(messageId);
      // Emit abort event for handler cleanup
      await this.emitAbortEvent(messageId, {
        reason: 'user_abort',
        partialDelivered: true,
      });
    } else {
      // No partial reply: safe to mark as aborted
      this.state.set(messageId, {
        status: 'aborted',
        abortedAt: Date.now(),
      });
    }
  }
  // Recovery now reads the explicit recovery index, not the status field
  getPendingMessages(): string[] {
    return this.recoveryIndex.getAll();
  }
}
```
CLI fix:
```shell
# Apply abort handling patch
$ openclaw patch apply --issue 29127 --component delivery-queue
# Update recovery configuration
$ openclaw config set recovery.use_explicit_index true
$ openclaw config set recovery.abort_behavior preserve
# Clear existing corrupted state
$ openclaw recovery reset-state --force
# Verify fix
$ openclaw recovery status
Recovery Index: 247 messages tracked
Aborted Messages: 12 (properly excluded)
```
Fix #29128: Prevent replay after restart
Before:
```typescript
// core/graceful-shutdown.ts
async function shutdown(): Promise<void> {
  gateway.stop();
  await deliveryQueue.drain();
  // ⚠️ Clear acknowledged to speed up restart
  acknowledgedMessages.clear();
  await persistence.flush();
}
```
After:
```typescript
// core/graceful-shutdown.ts
async function shutdown(): Promise<void> {
  gateway.stop();
  // Wait for all deliveries to complete AND persist
  await deliveryQueue.drain({
    requirePersisted: true, // Ensure all ACKs are persisted
  });
  // ⚠️ DO NOT clear acknowledged messages
  // Preserve full delivery state for accurate recovery
  // acknowledgedMessages.clear(); // REMOVED
  // Ensure persistence includes all acknowledged messages
  await persistence.flush({
    includeAcknowledged: true, // New: persist full state
  });
}
```
Recovery index fix:
```typescript
// core/persistence.ts
async function persistFullState(): Promise<void> {
  const state = {
    version: 2,
    timestamp: Date.now(),
    acknowledged: Array.from(acknowledgedMessages.entries()),
    pending: Array.from(pendingMessages.entries()),
    aborted: Array.from(abortedMessages.entries()),
  };
  // Atomic write to prevent corruption
  await atomicWrite(STORAGE_PATH, JSON.stringify(state));
}
```
CLI fix:
```shell
# Apply shutdown persistence patch
$ openclaw patch apply --issue 29128 --component graceful-shutdown
# Migrate existing state to new format
$ openclaw maintenance migrate-state --format=v2
# Verify state integrity
$ openclaw state verify
State Version: 2
Acknowledged Messages: 1,247
Pending Messages: 0
Aborted Messages: 12
State Hash: a1b2c3d4e5f6...
$ openclaw restart --service gateway
[INFO] Starting delivery recovery...
[INFO] Restored state from disk (v2 format)
[INFO] Replaying 0 messages (all already delivered)
```
🧪 Verification
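The CLI tests in this section exercise the running system. The same properties can also be asserted in unit tests against an event log. The sketch assumes a hypothetical log of `persisted`, `acked`, and `delivered` events, which is not an OpenClaw format. It reports two kinds of violation: ACKs that precede persistence (#29125) and messages delivered more than once (#29127, #29128).

```typescript
// Hypothetical event-log shape for tests; not an OpenClaw format.
type DeliveryEvent = { kind: "persisted" | "acked" | "delivered"; id: string };

interface Violation {
  id: string;
  problem: "acked_before_persist" | "duplicate_delivery";
}

function checkInvariants(events: DeliveryEvent[]): Violation[] {
  const persisted = new Set<string>();
  const deliveredCount = new Map<string, number>();
  const violations: Violation[] = [];
  for (const ev of events) {
    if (ev.kind === "persisted") {
      persisted.add(ev.id);
    } else if (ev.kind === "acked") {
      // #29125: an ACK must never precede durable persistence
      if (!persisted.has(ev.id)) violations.push({ id: ev.id, problem: "acked_before_persist" });
    } else {
      // #29127 / #29128: each message may reach the user at most once
      const n = (deliveredCount.get(ev.id) ?? 0) + 1;
      deliveredCount.set(ev.id, n);
      if (n === 2) violations.push({ id: ev.id, problem: "duplicate_delivery" }); // report once
    }
  }
  return violations;
}

const sampleLog: DeliveryEvent[] = [
  { kind: "acked", id: "a" },     // ACK before persist: violation
  { kind: "persisted", id: "a" },
  { kind: "persisted", id: "b" },
  { kind: "acked", id: "b" },
  { kind: "delivered", id: "b" },
  { kind: "delivered", id: "b" }, // replayed: violation
];
console.log(checkInvariants(sampleLog));
```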
Test #29125: Crash persistence
```shell
# 1. Start a message-heavy session
$ openclaw load-test --users 10 --duration 30s --rate 5
# 2. Simulate crash during active delivery
$ openclaw inject-fault --type=crash --service=gateway --delay=5s
# 3. Verify no message loss after restart
$ openclaw verify --check=message-integrity
[✓] Message count: 150 sent, 150 persisted
[✓] Sequence integrity: No gaps detected
[✓] Last message verified: msg_150
```
Expected output:
```text
Test: Gateway Crash Persistence
Result: PASS
Messages Before Crash: 150
Messages After Recovery: 150
Lost Messages: 0
Persistence Rate: 100%
```
Test #29126: Delivery failure propagation
```shell
# 1. Trigger a permanent failure (bot kicked)
$ openclaw mock telegram --error="bot was kicked" --channel=test_channel
# 2. Send message that should fail
$ openclaw send --user alice --message "test" --channel telegram
# 3. Verify failure is reported
$ openclaw events --type=delivery_failure --since 1m
TIMESTAMP              LEVEL   EVENT             DETAILS
2024-01-15T14:30:00Z   ERROR   delivery.failed   plugin=telegram
                                                 error="bot was kicked"
                                                 retryable=false
                                                 message=msg_test_001
```
Expected output:
```text
Test: Delivery Failure Propagation
Result: PASS
Failure Detected: YES
Error Logged: YES (ERROR level)
User Notified: YES
Retryable: NO
Message Status: failed_permanent
```
Test #29127: Abort re-delivery prevention
```shell
# 1. Start long-running handler
$ openclaw send --user alice --message "Generate 10000 words"
# 2. Send abort while processing
$ sleep 2 && openclaw abort --msg-id= --reason=timeout
# 3. Wait for recovery timeout
$ sleep 60
# 4. Check for duplicate messages
$ openclaw history --user alice --limit 5
[✓] No duplicate messages detected
[✓] Aborted message not re-delivered
```
Expected output:
```text
Test: Abort Re-Delivery Prevention
Result: PASS
Partial Reply Sent: YES
Abort Processed: YES
Re-delivery Attempted: NO
Duplicate Messages: 0
Recovery Index: Correctly excludes aborted message
```
Test #29128: Replay prevention
```shell
# 1. Send and deliver several messages
$ for i in {1..50}; do openclaw send --user alice --message "Msg $i"; done
# 2. Verify all delivered
$ openclaw verify --delivered --user alice
Delivered Count: 50
# 3. Restart service
$ openclaw restart --service gateway
# 4. Check for duplicates
$ openclaw history --user alice --since 1m | grep -c "Msg"
50
```
Expected output:
```text
Test: Restart Replay Prevention
Result: PASS
Messages Before Restart: 50
Messages After Restart: 50
Duplicate Count: 0
State Restored: YES (v2 format)
Recovery Replay: 0 messages
```
Full integration test
```shell
# Run complete delivery reliability suite
$ openclaw test suite --name=delivery-reliability
Tests:
[✓] #29125 - Gateway crash persistence
[✓] #29126 - Delivery failure propagation
[✓] #29127 - Abort re-delivery prevention
[✓] #29128 - Restart replay prevention
[✓] Concurrent delivery stress test
[✓] Network partition recovery
[✓] Partial failure cascade
Result: 7/7 PASSED
Coverage: 100%
```
⚠️ Common Pitfalls
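Environment-specific pitfalls
Docker/Kubernetes deployments
- Signal handling: `docker stop` sends SIGTERM, then sends SIGKILL if the container is still running after the grace period (10 seconds by default). Make sure the grace period (`stop_grace_period` in Compose, `terminationGracePeriodSeconds` in Kubernetes) exceeds `drain_timeout`.
  ```yaml
  # docker-compose.yml
  services:
    gateway:
      stop_grace_period: 30s  # Must exceed drain_timeout
      command: openclaw gateway --drain-timeout=25s
  ```
- Volume permissions: if the volume is mounted with a different UID, the persisted state may be unreadable.
  ```shell
  # Verify permissions
  $ docker exec openclaw-gateway ls -la /data/state.json
  -rw-r--r-- 1 openclaw openclaw 4096 Jan 15 14:30 /data/state.json
  ```
- Memory pressure: the OOM killer can kill the gateway before persistence completes. Leave headroom when setting memory limits.
  ```yaml
  resources:
    limits:
      memory: 512Mi  # Must exceed expected peak + state size
    requests:
      memory: 256Mi
  ```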
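The grace-period rule comes down to this: on SIGTERM, drain with a deadline that is shorter than the orchestrator's grace period, then exit on your own terms. What follows is a minimal Node.js sketch. `drainWithDeadline`, `installSigtermHandler`, and the `drain` callback are hypothetical; the callback stands in for the gateway's real drain logic. The 25-second budget simply mirrors the `--drain-timeout=25s` example.

```typescript
type DrainOutcome = "drained" | "failed" | "timeout";

// Race the drain against a deadline; whichever settles first wins.
function drainWithDeadline(drain: () => Promise<void>, deadlineMs: number): Promise<DrainOutcome> {
  return new Promise<DrainOutcome>((resolve) => {
    const timer = setTimeout(() => resolve("timeout"), deadlineMs);
    drain().then(
      () => {
        clearTimeout(timer);
        resolve("drained");
      },
      () => {
        clearTimeout(timer);
        resolve("failed");
      },
    );
  });
}

// Hypothetical budget, kept below stop_grace_period (30s) so the process
// exits on its own before Docker escalates to SIGKILL.
const DRAIN_BUDGET_MS = 25000;

function installSigtermHandler(drain: () => Promise<void>): void {
  process.once("SIGTERM", () => {
    void drainWithDeadline(drain, DRAIN_BUDGET_MS).then((outcome) => {
      console.log(`gateway shutdown: ${outcome}`);
      // A non-zero exit tells the orchestrator the drain did not finish cleanly.
      process.exit(outcome === "drained" ? 0 : 1);
    });
  });
}
```

A late-settling drain is harmless. Once the deadline has resolved the promise, the later `resolve("drained")` call has no effect.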
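macOS development environments
- File durability: depending on the filesystem backing the state directory, a rename alone may not be atomic or durable, so use an explicit fsync. On macOS, `fsync()` by itself does not force data out of the drive's write cache; `fcntl(F_FULLFSYNC)` does.
  ```shell
  # Check if atomic writes work
  $ openclaw debug verify-atomic-write
  [✓] Atomic write verified on /tmp (APFS supports it)
  [✓] Atomic write verified on /var/tmp (APFS supports it)
  [!] Warning: /Users/... uses non-atomic filesystem
  ```
- Resource limits: the default `ulimit` values are low. Raise them for high-throughput tests.
  ```shell
  # Check current limits
  $ ulimit -n
  256  # Too low for production
  # Increase for the current session
  $ ulimit -n 10240
  ```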
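The explicit-fsync advice, and the `atomicWrite` helper used in the #29128 fix, usually mean the following sequence. First, write to a temporary file in the same directory and fsync it. Next, rename it over the target. Finally, fsync the directory so that the rename itself is durable. What follows is a synchronous Node.js sketch. The function name and the temp-file naming scheme are illustrative, and the naming scheme assumes one writer per process. The directory fsync is best-effort because opening a directory for fsync is not supported on every platform.

```typescript
import * as fs from "node:fs";
import * as path from "node:path";

// Readers see either the old or the new contents, never a torn file.
function atomicWriteFileSync(target: string, data: string): void {
  const dir = path.dirname(target);
  // Same directory as the target, so the rename never crosses filesystems.
  const tmp = path.join(dir, `.${path.basename(target)}.${process.pid}.tmp`);
  const fd = fs.openSync(tmp, "w");
  try {
    fs.writeFileSync(fd, data); // retries internally until all data is written
    fs.fsyncSync(fd);           // contents reach stable storage before the rename
  } catch (err) {
    fs.closeSync(fd);
    fs.rmSync(tmp, { force: true });
    throw err;
  }
  fs.closeSync(fd);
  fs.renameSync(tmp, target); // replaces the target in one step on POSIX filesystems
  // Best-effort: make the rename itself survive a crash.
  try {
    const dirFd = fs.openSync(dir, "r");
    try {
      fs.fsyncSync(dirFd);
    } finally {
      fs.closeSync(dirFd);
    }
  } catch {
    // Unsupported here; durability of the rename is left to the filesystem.
  }
}
```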
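Windows (WSL2)
- File watchers: WSL2 has known file-watcher performance problems. Raise the inotify limits:
  ```text
  # In /etc/sysctl.conf
  fs.inotify.max_user_watches=524288
  fs.inotify.max_user_instances=512
  ```
- Line endings: CRLF line endings in config files can corrupt the state JSON. Normalize them (for example with `dos2unix`) before mounting, because a bind mount does not convert line endings by itself.
  ```shell
  # Mount the normalized config read-only
  mount --bind -o ro /mnt/c/config/openclaw.yaml /data/config.yaml
  ```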
Configuration errors
Premature acknowledgement still enabled
```shell
# ⚠️ WRONG: Still using old behavior
$ openclaw config get delivery.persist_before_ack
false  # Bug not fixed
# ✓ CORRECT: Should be true
$ openclaw config set delivery.persist_before_ack true
$ openclaw restart
```
Recovery index not migrated
```shell
# ⚠️ WRONG: Old index format still in use
$ openclaw recovery status
Index Format: legacy  # Bug not fixed
# ✓ CORRECT: Migrate to new format
$ openclaw maintenance migrate-state --format=v2
$ openclaw restart
```
Inconsistent failure-mode configuration
```shell
# ⚠️ WRONG: Different failure modes across plugins
$ openclaw config get --plugin '*' delivery.strict_failure_mode
telegram: false
slack: true  # Inconsistent!
discord: false
# ✓ CORRECT: Uniform configuration
$ openclaw config set --plugin '*' delivery.strict_failure_mode true
```
Runtime edge cases
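- Network partition during persistence: if the network fails mid-write, the state file may be corrupted. Enable atomic writes with backups.
  ```shell
  # Enable backup on corruption
  $ openclaw config set persistence.backup_on_corruption true
  $ openclaw config set persistence.backup_count 3
  ```
- Clock skew: after an NTP correction, the timestamps used to order recovery may conflict. Use logical clocks for ordering.
  ```shell
  # Check for clock skew
  $ openclaw debug clock-skew
  Clock Offset: +0.003s (acceptable)
  Warning: 2 messages have timestamp conflicts
  ```
- Partial state migration: if a migration is interrupted, the state can be left in a mixed format. Verify it after migrating.
  ```shell
  # Force verification
  $ openclaw state verify --full
  [✓] Format: v2
  [✓] Integrity: VALID
  [✓] Entries: 1,247 acknowledged, 0 pending, 12 aborted
  ```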
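A Lamport clock is the classic way to make the logical-clock advice concrete. Each node keeps a counter and increments it on every local event. When it receives a message, it jumps past the sender's counter. Ordering by `(counter, nodeId)` then gives a total order that respects causality and is unaffected by NTP steps. This is a sketch: `LamportClock`, `recoveryOrder`, and the node-ID tie-break are illustrative choices, not existing OpenClaw behavior.

```typescript
// Minimal Lamport clock; wall-clock time is never consulted.
class LamportClock {
  private counter = 0;
  readonly nodeId: string;

  constructor(nodeId: string) {
    this.nodeId = nodeId;
  }

  // Local event, e.g. enqueueing a message for delivery.
  tick(): number {
    this.counter += 1;
    return this.counter;
  }

  // Incoming message stamped with the sender's counter.
  receive(remote: number): number {
    this.counter = Math.max(this.counter, remote) + 1;
    return this.counter;
  }
}

interface Stamped {
  id: string;
  lamport: number;
  nodeId: string;
}

// Total order for recovery: by counter, with ties broken by node ID.
function recoveryOrder(items: Stamped[]): Stamped[] {
  return items.slice().sort((a, b) => {
    if (a.lamport !== b.lamport) return a.lamport - b.lamport;
    return a.nodeId < b.nodeId ? -1 : a.nodeId > b.nodeId ? 1 : 0;
  });
}

const gw = new LamportClock("gateway-a");
console.log(gw.tick());      // 1
console.log(gw.receive(5));  // 6: jumps past the remote counter
console.log(gw.tick());      // 7
```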
🔗 Related Issues
Directly related issues
| Issue | Title | Severity | Relationship |
|---|---|---|---|
| #29125 | Gateway crash silently drops user message from history | P0 | Primary issue — addressed in this guide |
| #29126 | Plugin/channel delivery failures are silent and unrecoverable | P0 | Primary issue — addressed in this guide |
| #29127 | Abort does not prevent recovery-path re-delivery of partial reply | P0 | Primary issue — addressed in this guide |
| #29128 | Delivery-recovery replays already-delivered messages after restart | P0 | Primary issue — addressed in this guide |
| #29085 | fix(delivery-queue): Telegram 'bot was kicked' | P2 | Partial fix — precursor to #29126 |
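Historical context
- #28456: Network timeouts cause duplicate messages. Similar to #29127, but caused by the network rather than by an abort.
- #27901: Unclean shutdown corrupts the state file. Shares a root cause with #29128: a design flaw in the persistence layer.
- #27512: Handler ACK timeout too aggressive. Contributed to #29125, because the timeout configuration allowed premature ACKs.
- #27189: Plugin errors not propagated to the parent. Architectural precursor to #29126: error handling is isolated inside the plugin system.
Related error codes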
| Error Code | Description | Connected Issues |
|---|---|---|
| DLV_001 | Persistence write failed | #29125 |
| DLV_002 | Premature acknowledgement | #29125 |
| DLV_003 | Silent delivery failure | #29126 |
| DLV_004 | Recovery re-delivered | #29127, #29128 |
| DLV_005 | State mismatch on startup | #29128 |
| PLG_001 | Plugin error swallowed | #29126 |
| SHT_001 | Graceful shutdown data loss | #29128 |
Related configuration parameters
```text
# Parameters introduced/fixed by this guide
delivery.persist_before_ack               # Default: true (was: false)
delivery.strict_failure_mode              # Default: true (was: false)
delivery.failure_notification_threshold   # Default: 3 (new)
recovery.use_explicit_index               # Default: true (was: false)
recovery.abort_behavior                   # Default: preserve (was: re-deliver)
persistence.include_acknowledged          # Default: true (was: false)
persistence.backup_on_corruption          # Default: true (new)
```
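The configuration mistakes listed under Common Pitfalls can be caught before a restart with a small check over the effective settings. This is a sketch that makes two assumptions, and both data shapes are hypothetical. First, the settings have already been loaded into a flat map of dotted keys. Second, the per-plugin `strict_failure_mode` values sit in a second map. The expected values are the new defaults listed above.

```typescript
// Hypothetical shapes: flat dotted keys, plus per-plugin strict_failure_mode values.
type ConfigValue = string | number | boolean;
type FlatConfig = Record<string, ConfigValue | undefined>;

// Values expected after the fixes (the new defaults listed above).
const EXPECTED_SETTINGS: Record<string, ConfigValue> = {
  "delivery.persist_before_ack": true,
  "delivery.strict_failure_mode": true,
  "recovery.use_explicit_index": true,
  "recovery.abort_behavior": "preserve",
  "persistence.include_acknowledged": true,
};

function lintDeliveryConfig(config: FlatConfig, pluginStrictMode: Record<string, boolean>): string[] {
  const problems: string[] = [];
  for (const [key, expected] of Object.entries(EXPECTED_SETTINGS)) {
    const actual = config[key];
    // A missing key is reported too, since its effective value is unknown.
    if (actual !== expected) {
      problems.push(`${key} is ${String(actual)}, expected ${String(expected)}`);
    }
  }
  // Strict failure mode must be on for every plugin, not just some of them.
  for (const [plugin, strict] of Object.entries(pluginStrictMode)) {
    if (!strict) problems.push(`plugin ${plugin}: strict_failure_mode is false`);
  }
  return problems;
}
```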