Progressive Memory Flush Before Compaction
Implement multi-stage memory flush mechanism with progressive thresholds at 80%, 90%, and 95% capacity to prevent context loss during compaction.
π Symptoms
Current Behavior Manifestations
The existing compaction.memoryFlush configuration exhibits the following problematic patterns:
1. Single-Point-of-Failure Architecture
javascript // Current: Single threshold evaluation const isFull = contextTokens >= softThresholdTokens; if (isFull && !hasFlushed) { triggerMemoryFlush(); hasFlushed = true; }
When softThresholdTokens is set to 0.9 (90% of max context), the system only reacts at that single point. If the flush operation fails or the model continues generating tokens rapidly, no subsequent warnings occur.
2. Silent Context Truncation
Users observe unexpected context loss without intermediate notification:
Session started: 2,048 tokens loaded … conversation continues … [No warnings until 90% threshold] … compaction triggered unexpectedly … Context truncated: Last 340 tokens removed
3. Configuration Inflexibility
The current schema does not support granular threshold definition:
yaml
Current configuration (limited)
compaction: memoryFlush: true softThresholdTokens: 0.9 # Single float value only
Desired configuration (not yet supported)
compaction: memoryFlush: enabled: true stages: - threshold: 0.8 action: “reminder” - threshold: 0.9 action: “urgent_flush” - threshold: 0.95 action: “forced_summary”
4. Missed Recovery Opportunities
When flush operations fail due to I/O constraints or rate limiting, no retry mechanism or escalation path exists.
pre>
[ERROR] Memory flush failed: Disk I/O timeout
[ERROR] Compaction proceeding without context preservation
[CRITICAL] 340 tokens of session context lost
π§ Root Cause
Architectural Analysis
1. Binary Threshold Model
The current implementation employs a binary threshold model that treats memory capacity as an on/off state:
typescript // CompactionManager.ts - Current flawed logic export class CompactionManager { private hasTriggeredFlush = false;
evaluateMemoryPressure(currentTokens: number, maxTokens: number): void { const ratio = currentTokens / maxTokens;
// Binary decision: either below threshold OR flush already triggered
if (ratio >= this.config.softThresholdTokens && !this.hasTriggeredFlush) {
this.triggerFlush();
this.hasTriggeredFlush = true;
}
} }
This architecture violates the progressive resource management principle: systems should scale responses proportionally to resource depletion.
2. Missing Escalation Path
The absence of a stage-based escalation model means:
| Stage | Capacity | Current Behavior | Required Behavior |
|---|---|---|---|
| Stage 1 | 80% | No action | Gentle reminder, log suggestion |
| Stage 2 | 90% | Flush trigger (if not already) | Urgent flush request with retry |
| Stage 3 | 95% | Compaction proceeds | Forced summary, emergency persistence |
3. No State Machine for Flush States
Current implementation lacks a proper state machine to track flush attempts and outcomes:
typescript // Missing: FlushState enum enum FlushState { IDLE = ‘idle’, REMINDER_SENT = ‘reminder_sent’, URGENT_FLUSH_PENDING = ‘urgent_flush_pending’, FLUSH_IN_PROGRESS = ‘flush_in_progress’, FLUSH_FAILED = ‘flush_failed’, ESCALATING = ’escalating’, SUMMARY_FORCED = ‘summary_forced’ }
4. Configuration Schema Limitation
The current schema at config/compaction.ts does not accommodate multi-stage definitions:
typescript // Current: scalar value softThresholdTokens: z.number().min(0).max(1)
// Required: array of stage definitions stages: z.array(z.object({ threshold: z.number().min(0).max(1), action: z.enum([‘reminder’, ‘urgent_flush’, ‘forced_summary’]), retryAttempts: z.number().optional(), cooldownMs: z.number().optional() }))
5. Race Condition in Flush Tracking
The hasFlushed boolean flag is prone to race conditions in async contexts:
typescript // Problematic: Non-atomic read-modify-write if (ratio >= threshold && !hasFlushed) { hasFlushed = true; // Another thread may read hasFlushed as false here await triggerFlush(); // If this throws, hasFlushed is stuck as true }
π οΈ Step-by-Step Fix
Implementation Guide
Phase 1: Schema Update
File: packages/core/src/config/compaction.ts
typescript import { z } from ‘zod’;
// Define individual stage schema const FlushStageSchema = z.object({ threshold: z.number() .min(0) .max(1) .describe(‘Memory capacity ratio (0-1) triggering this stage’), action: z.enum([‘reminder’, ‘urgent_flush’, ‘forced_summary’]) .describe(‘Action to execute when threshold is crossed’), retryAttempts: z.number().min(0).max(5).default(3) .describe(‘Maximum retry attempts for flush operations’), cooldownMs: z.number().min(1000).max(60000).default(5000) .describe(‘Minimum interval between stage re-evaluations’), message: z.string().optional() .describe(‘Custom message for reminder/notification stages’) });
// Updated schema export const CompactionConfigSchema = z.object({ enabled: z.boolean().default(true), memoryFlush: z.union([ z.boolean(), z.object({ enabled: z.boolean().default(true), stages: z.array(FlushStageSchema).default([ { threshold: 0.8, action: ‘reminder’, cooldownMs: 10000 }, { threshold: 0.9, action: ‘urgent_flush’, retryAttempts: 3, cooldownMs: 5000 }, { threshold: 0.95, action: ‘forced_summary’, cooldownMs: 2000 } ]) }) ]).default(true), softThresholdTokens: z.number().min(0).max(1).optional() .describe(‘Deprecated: Use memoryFlush.stages instead’), // … existing fields });
export type FlushStage = z.infer
Phase 2: State Machine Implementation
File: packages/core/src/compaction/ProgressiveFlushManager.ts
typescript import { EventEmitter } from ’events’; import { FlushStage, FlushAction } from ‘../config/compaction’;
export enum FlushState { IDLE = ‘idle’, REMINDER_SENT = ‘reminder_sent’, URGENT_FLUSH_PENDING = ‘urgent_flush_pending’, URGENT_FLUSH_IN_PROGRESS = ‘urgent_flush_in_progress’, SUMMARY_FORCED = ‘summary_forced’, COOLDOWN = ‘cooldown’ }
interface StageTracking { stage: FlushStage; triggeredAt: number | null; attemptCount: number; lastAttemptAt: number | null; }
export class ProgressiveFlushManager extends EventEmitter { private currentState: FlushState = FlushState.IDLE; private stages: FlushStage[]; private stageTracking: Map<number, StageTracking> = new Map(); private cooldownUntil: number = 0;
constructor(stages: FlushStage[]) { super(); // Sort stages by threshold descending for efficient evaluation this.stages = […stages].sort((a, b) => b.threshold - a.threshold);
// Initialize tracking for each stage
this.stages.forEach((stage, index) => {
this.stageTracking.set(index, {
stage,
triggeredAt: null,
attemptCount: 0,
lastAttemptAt: null
});
});
}
public evaluate(currentRatio: number): FlushAction | null { // Check cooldown if (Date.now() < this.cooldownUntil) { return null; }
// Find the highest-priority triggered stage
for (const [index, tracking] of this.stageTracking.entries()) {
if (currentRatio >= tracking.stage.threshold) {
return this.handleStageTriggered(index, tracking);
}
}
// Reset if below all thresholds
this.resetToIdle();
return null;
}
private handleStageTriggered(index: number, tracking: StageTracking): FlushAction | null { const stage = tracking.stage; const now = Date.now();
// Skip if already recently triggered (cooldown)
if (tracking.lastAttemptAt &&
now - tracking.lastAttemptAt < stage.cooldownMs) {
return null;
}
// Update tracking
tracking.lastAttemptAt = now;
tracking.attemptCount++;
switch (stage.action) {
case 'reminder':
this.currentState = FlushState.REMINDER_SENT;
tracking.triggeredAt = now;
this.emit('reminder', {
threshold: stage.threshold,
message: stage.message
});
return 'reminder';
case 'urgent_flush':
if (this.currentState === FlushState.URGENT_FLUSH_PENDING ||
tracking.attemptCount >= stage.retryAttempts) {
this.currentState = FlushState.URGENT_FLUSH_IN_PROGRESS;
this.emit('urgentFlushRequired', {
threshold: stage.threshold,
attempt: tracking.attemptCount,
maxAttempts: stage.retryAttempts
});
return 'urgent_flush';
}
this.currentState = FlushState.URGENT_FLUSH_PENDING;
return null;
case 'forced_summary':
this.currentState = FlushState.SUMMARY_FORCED;
tracking.triggeredAt = now;
this.emit('forcedSummaryRequired', {
threshold: stage.threshold,
context: this.getContextForSummary()
});
return 'forced_summary';
default:
return null;
}
}
public markFlushSuccess(): void { this.emit(‘flushCompleted’, { state: this.currentState, timestamp: Date.now() }); this.resetToIdle(); }
public markFlushFailure(error: Error): void { this.emit(‘flushFailed’, { state: this.currentState, error: error.message, timestamp: Date.now() }); // Set cooldown before retry this.cooldownUntil = Date.now() + 2000; }
private resetToIdle(): void { this.currentState = FlushState.IDLE; this.stageTracking.forEach(tracking => { tracking.triggeredAt = null; tracking.attemptCount = 0; }); }
private getContextForSummary(): object { return { activeStages: Array.from(this.stageTracking.entries()) .filter(([_, t]) => t.triggeredAt !== null) .map(([idx, t]) => ({ index: idx, action: t.stage.action, triggeredAt: t.triggeredAt })) }; }
public getState(): { state: FlushState; stages: StageTracking[] } { return { state: this.currentState, stages: Array.from(this.stageTracking.values()) }; } }
Phase 3: Integration with CompactionManager
File: packages/core/src/compaction/CompactionManager.ts
typescript import { ProgressiveFlushManager } from ‘./ProgressiveFlushManager’; import { CompactionConfig, FlushStage } from ‘../config/compaction’;
export class CompactionManager { private flushManager: ProgressiveFlushManager; private legacyMode: boolean;
constructor(config: CompactionConfig) { // Determine if using legacy single-threshold or new multi-stage config if (typeof config.memoryFlush === ‘object’ && config.memoryFlush.stages) { this.flushManager = new ProgressiveFlushManager(config.memoryFlush.stages); this.legacyMode = false; this.setupEventHandlers(); } else { // Legacy fallback this.legacyMode = true; } }
private setupEventHandlers(): void { this.flushManager.on(‘reminder’, (data) => { this.log.info(‘Memory reminder’, data); this.notifyUser(data.message || ‘Consider saving important context’); });
this.flushManager.on('urgentFlushRequired', async (data) => {
this.log.warn('Urgent memory flush required', data);
await this.executeFlushWithRetry(data);
});
this.flushManager.on('forcedSummaryRequired', async (data) => {
this.log.error('Forcing session summary generation', data);
await this.initiateForcedSummary();
});
this.flushManager.on('flushCompleted', (data) => {
this.log.info('Memory flush completed successfully', data);
});
this.flushManager.on('flushFailed', (data) => {
this.log.error('Memory flush failed', data);
});
}
public evaluateMemoryPressure(currentTokens: number, maxTokens: number): void { const ratio = currentTokens / maxTokens;
if (this.legacyMode) {
// Original single-threshold logic
this.evaluateLegacy(ratio);
} else {
// New progressive flush evaluation
const action = this.flushManager.evaluate(ratio);
if (action === 'forced_summary') {
// Compaction should proceed after forced summary
this.initiateCompaction();
}
}
}
private async executeFlushWithRetry(data: {
attempt: number;
maxAttempts: number
}): PromiseFlush attempt ${attempt} failed, { error });
if (attempt === data.maxAttempts) {
this.flushManager.markFlushFailure(error as Error);
return false;
}
// Exponential backoff
await this.delay(Math.pow(2, attempt) * 1000);
}
}
return false;
}
private async performMemoryFlush(): Promise
private async initiateForcedSummary(): Promise
private delay(ms: number): Promise
Phase 4: Configuration Migration
File: packages/core/src/config/migration.ts
typescript import { CompactionConfig } from ‘./compaction’;
export function migrateCompactionConfig( legacy: { memoryFlush: boolean; softThresholdTokens?: number }, target: CompactionConfig ): CompactionConfig { // If legacy single threshold is present, map to new structure if (legacy.softThresholdTokens !== undefined && !target.memoryFlush) { const threshold = legacy.softThresholdTokens;
// Map legacy behavior to Stage 2 (urgent_flush) of new system
return {
...target,
memoryFlush: {
enabled: legacy.memoryFlush,
stages: [
{
threshold: 0.8,
action: 'reminder',
cooldownMs: 30000
},
{
threshold: threshold,
action: 'urgent_flush',
retryAttempts: 1,
cooldownMs: 5000
},
{
threshold: 0.98,
action: 'forced_summary',
cooldownMs: 2000
}
]
}
};
}
return target; }
π§ͺ Verification
Unit Test Suite
File: packages/core/src/compaction/__tests__/ProgressiveFlushManager.test.ts
typescript import { ProgressiveFlushManager } from ‘../ProgressiveFlushManager’; import { FlushStage } from ‘../../config/compaction’;
describe(‘ProgressiveFlushManager’, () => { const createTestStages = (): FlushStage[] => [ { threshold: 0.8, action: ‘reminder’, cooldownMs: 100 }, { threshold: 0.9, action: ‘urgent_flush’, retryAttempts: 2, cooldownMs: 50 }, { threshold: 0.95, action: ‘forced_summary’, cooldownMs: 100 } ];
let manager: ProgressiveFlushManager;
beforeEach(() => { manager = new ProgressiveFlushManager(createTestStages()); });
describe(‘Stage Progression’, () => { it(‘should trigger reminder at 80% threshold’, () => { const action = manager.evaluate(0.82);
expect(action).toBe('reminder');
});
it('should trigger urgent flush at 90% threshold', (done) => {
manager.on('urgentFlushRequired', (data) => {
expect(data.threshold).toBe(0.9);
done();
});
manager.evaluate(0.92);
});
it('should trigger forced summary at 95% threshold', (done) => {
manager.on('forcedSummaryRequired', (data) => {
expect(data.threshold).toBe(0.95);
done();
});
manager.evaluate(0.96);
});
it('should prioritize highest threshold when multiple are crossed', (done) => {
manager.on('forcedSummaryRequired', () => {
done(); // Should hit 95% first, not 80% or 90%
});
manager.evaluate(0.98);
});
});
describe(‘Cooldown Behavior’, () => { it(‘should not re-trigger same stage within cooldown period’, () => { manager.evaluate(0.85); const secondResult = manager.evaluate(0.85);
expect(secondResult).toBeNull();
});
it('should allow re-trigger after cooldown expires', async () => {
const action1 = manager.evaluate(0.82);
expect(action1).toBe('reminder');
// Wait for cooldown (100ms)
await new Promise(resolve => setTimeout(resolve, 150));
const action2 = manager.evaluate(0.82);
expect(action2).toBe('reminder');
});
});
describe(‘Retry Logic’, () => { it(‘should retry urgent flush up to configured attempts’, (done) => { let callCount = 0;
manager.on('urgentFlushRequired', () => {
callCount++;
if (callCount === 1) {
// Simulate failure by not calling markFlushSuccess
} else if (callCount >= 2) {
done();
}
});
manager.evaluate(0.92);
// After cooldown (50ms), should retry
setTimeout(() => manager.evaluate(0.92), 60);
});
});
describe(‘Reset Behavior’, () => { it(‘should reset all stages when dropping below threshold’, () => { manager.evaluate(0.82); // Trigger reminder const result = manager.evaluate(0.5); // Drop below 80%
expect(result).toBeNull();
// Should trigger again from scratch
expect(manager.evaluate(0.82)).toBe('reminder');
});
}); });
Integration Test
File: packages/core/src/compaction/__tests__/CompactionManager.integration.test.ts
typescript describe(‘CompactionManager Progressive Flush Integration’, () => { it(‘should emit events in correct sequence during memory pressure’, async () => { const events: string[] = []; const config: CompactionConfig = { enabled: true, memoryFlush: { enabled: true, stages: [ { threshold: 0.8, action: ‘reminder’, cooldownMs: 50 }, { threshold: 0.9, action: ‘urgent_flush’, retryAttempts: 1, cooldownMs: 50 } ] } };
const manager = new CompactionManager(config);
manager.on('reminder', () => events.push('reminder'));
manager.on('urgentFlushRequired', () => events.push('urgent'));
// Simulate progressive memory increase
manager.evaluateMemoryPressure(1840, 2000); // 92%
await new Promise(resolve => setTimeout(resolve, 60));
manager.evaluateMemoryPressure(1950, 2000); // 97.5%
expect(events).toEqual(['reminder', 'urgent']);
}); });
Manual Verification Commands
bash
Test the configuration schema validation
cd packages/core npm run test – –testPathPattern=“ProgressiveFlushManager”
Verify configuration migration
npm run config:migrate – –input=legacy-config.yaml –output=verified-config.yaml
Run integration tests
npm run test – –testPathPattern=“CompactionManager.*integration”
Verify event emission sequence
DEBUG=compaction:* npm run start:session
Expected Output Verification
pre>
When running unit tests, expect:
ProgressiveFlushManager Stage Progression β should trigger reminder at 80% threshold β should trigger urgent flush at 90% threshold β should trigger forced summary at 95% threshold Cooldown Behavior β should not re-trigger same stage within cooldown period Reset Behavior β should reset all stages when dropping below threshold
5 passing, 0 failing
When running integration tests:
CompactionManager Progressive Flush Integration β should emit events in correct sequence during memory pressure
β οΈ Common Pitfalls
Implementation Hazards
1. Stage Threshold Ordering
- Hazard: Stages must be sorted by threshold descending. Out-of-order stages cause premature escalation.
- Symptom: Stage 1 (80%) triggers at 95% capacity instead of Stage 3 (95%).
- Mitigation: Always sort stages:
stages.sort((a, b) => b.threshold - a.threshold)
2. Cooldown Timing on Rapid Token Generation
- Hazard: If token generation outpaces cooldown timers, stages may not escalate properly.
- Example: 200 tokens/second with 5-second cooldown creates 1000-token blind spot.
- Mitigation: Calculate minimum cooldown based on expected token rate:
cooldownMs >= (thresholdDelta * maxTokens) / tokensPerSecond
3. Backward Compatibility with Legacy Config
- Hazard: Existing configurations using
softThresholdTokens: 0.9may not migrate correctly. - Symptom: Users see double flush triggers or missing Stage 1 reminders.
- Mitigation: Implement explicit migration path documented in
docs/migration/v2.md
4. Async/Await Race Conditions in Event Handlers
- Hazard: Event handlers that perform async operations can trigger duplicate state transitions.
- Example:
urgentFlushRequiredhandler awaitsperformMemoryFlush(), but evaluation loop continues before completion. - Mitigation: Use mutex/lock pattern for flush operations:
typescript private flushLock = false;
private async executeFlushWithRetry(data: FlushData): Promise
5. Memory Leak in StageTracking Map
- Hazard:
attemptCountincrements indefinitely if not reset properly. - Symptom: After extended sessions, retry logic becomes non-functional.
- Mitigation: Reset tracking state in
resetToIdle()and ensure it's called on state transitions.
6. Platform-Specific Timing Granularity
- Hazard:
setTimeoutminimums vary: Node.js (~1ms), browsers (~4ms), Docker containers (~10ms). - Symptom: Cooldown tests pass locally but fail in CI containers.
- Mitigation: Use platform-aware minimum:
Math.max(cooldownMs, platformMinDelay)
7. Configuration Schema Validation Edge Cases
- Hazard: Overlapping thresholds or thresholds > 1.0 pass Zod validation but cause undefined behavior.
- Example:
[{threshold: 0.9}, {threshold: 0.9, action: 'forced_summary'}] - Mitigation: Add custom refinement:
typescript const FlushStageSchema = z.object({…}).refine( (stages) => { const thresholds = stages.map(s => s.threshold); return new Set(thresholds).size === thresholds.length; // Unique }, { message: ‘Stage thresholds must be unique’ } );
Environment-Specific Considerations
| Environment | Issue | Workaround |
|---|---|---|
| Docker | Container clock drift | Use relative timing, not absolute |
| Serverless (Lambda) | Execution time limits | Reduce retry attempts, add timeout guards |
| Mobile (React Native) | Background state suspension | Persist flush state to AsyncStorage |
| CI/CD | Variable timer resolution | Mock timers in tests, use fake timers |
π Related Errors
Related Configuration Options
compaction.memoryFlushβ Current single-threshold flush togglecompaction.softThresholdTokensβ Deprecated scalar threshold (superseded by stages)compaction.maxContextTokensβ Maximum context window sizecompaction.compressionRatioβ Target ratio for summary compression
Related Documentation
docs/reference/session-management-compaction.mdβ Primary compaction referencedocs/guides/memory-management.mdβ Memory lifecycle guidedocs/api/CompactionManager.mdβ API documentation
Related Historical Issues
| Issue | Description | Resolution |
|---|---|---|
| #423 | Single flush at 90% loses context when flush fails | This feature request |
| #389 | No user notification before compaction | Addressed by Stage 1 reminder |
| #412 | Retry mechanism for failed flushes missing | Addressed by Stage 2 retry logic |
| #456 | Memory pressure calculation inaccurate under high load | Refactored in PR #489 |
Associated Error Codes
E_FLUSH_FAILEDβ Memory flush operation failedE_FLUSH_TIMEOUTβ Flush exceeded configured timeoutE_COMPACTION_LOST_CONTEXTβ Context truncated during compactionE_SUMMARY_GENERATION_FAILEDβ Forced summary could not be created
Event Taxonomy
typescript // New events emitted by ProgressiveFlushManager type ProgressiveFlushEvent = | { type: ‘reminder’; threshold: number; message?: string } | { type: ‘urgentFlushRequired’; threshold: number; attempt: number; maxAttempts: number } | { type: ‘forcedSummaryRequired’; threshold: number; context: object } | { type: ‘flushCompleted’; state: FlushState; timestamp: number } | { type: ‘flushFailed’; state: FlushState; error: string; timestamp: number };
Migration Checklist
When upgrading to the progressive flush feature:
yaml
Pre-migration checklist
- Document current softThresholdTokens value
- Identify custom flush handlers
- Review retry requirements for urgent flush
- Update user documentation
- Test with production-like token generation rates
- Verify backward compatibility with legacy configs