
Custom Strategies

While Polly Dart provides six comprehensive built-in strategies, you might need to create custom strategies for specific scenarios. This guide shows you how to build your own resilience strategies that integrate seamlessly with the Polly Dart pipeline.

Understanding Strategy Architecture

Every strategy in Polly Dart implements the same core pattern:

Core Strategy Interface

abstract class ResilienceStrategy<T> {
  Future<T> execute(
    Future<T> Function(ResilienceContext context) operation,
    ResilienceContext context,
  );
}
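
As a quick sanity check, the simplest strategy that satisfies this contract is a pass-through that does nothing but invoke the operation. This is a minimal sketch, not a strategy shipped with Polly Dart:

// Minimal no-op strategy: delegates straight to the operation
class PassThroughStrategy<T> implements ResilienceStrategy<T> {
  @override
  Future<T> execute(
    Future<T> Function(ResilienceContext context) operation,
    ResilienceContext context,
  ) {
    return operation(context);
  }
}

Every strategy in this guide follows the same shape: intercept the call, add behavior around it, and delegate to the operation.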

Building Your First Custom Strategy

Let's create a Bulkhead Strategy that isolates resources by limiting concurrent executions:

Step 1: Define Strategy Options

class BulkheadStrategyOptions {
  final int maxConcurrency;
  final Duration? maxWaitTime;
  final OnBulkheadRejectedCallback? onRejected;

  const BulkheadStrategyOptions({
    required this.maxConcurrency,
    this.maxWaitTime,
    this.onRejected,
  });
}

typedef OnBulkheadRejectedCallback = Future<void> Function(
  OnBulkheadRejectedArguments args,
);

class OnBulkheadRejectedArguments {
  final ResilienceContext context;
  final int maxConcurrency;
  final int currentConcurrency;
  final Duration waitTime;

  OnBulkheadRejectedArguments({
    required this.context,
    required this.maxConcurrency,
    required this.currentConcurrency,
    required this.waitTime,
  });
}

class BulkheadRejectedException implements Exception {
  final String message;
  final int maxConcurrency;
  final int currentConcurrency;

  BulkheadRejectedException(
    this.message,
    this.maxConcurrency,
    this.currentConcurrency,
  );

  @override
  String toString() => 'BulkheadRejectedException: $message '
      '(max: $maxConcurrency, current: $currentConcurrency)';
}

Step 2: Implement the Strategy

import 'dart:async';
import 'dart:collection';

class BulkheadStrategy<T> implements ResilienceStrategy<T> {
  final BulkheadStrategyOptions _options;
  final Semaphore _semaphore;
  int _currentConcurrency = 0;

  BulkheadStrategy(this._options)
      : _semaphore = Semaphore(_options.maxConcurrency);

  @override
  Future<T> execute(
    Future<T> Function(ResilienceContext context) operation,
    ResilienceContext context,
  ) async {
    final stopwatch = Stopwatch()..start();

    // Try to acquire a permit
    final acquired = await _tryAcquirePermit();

    if (!acquired) {
      await _handleRejection(context, stopwatch.elapsed);
      throw BulkheadRejectedException(
        'Bulkhead capacity exceeded',
        _options.maxConcurrency,
        _currentConcurrency,
      );
    }

    try {
      _currentConcurrency++;
      return await operation(context);
    } finally {
      _currentConcurrency--;
      _semaphore.release();
    }
  }

  Future<bool> _tryAcquirePermit() async {
    if (_options.maxWaitTime == null) {
      // No wait time configured - reject immediately if no permit is free
      return _semaphore.tryAcquire();
    } else {
      // Wait up to maxWaitTime for a permit
      return await _semaphore.acquire(_options.maxWaitTime!);
    }
  }

  Future<void> _handleRejection(
    ResilienceContext context,
    Duration waitTime,
  ) async {
    final callback = _options.onRejected;
    if (callback != null) {
      await callback(OnBulkheadRejectedArguments(
        context: context,
        maxConcurrency: _options.maxConcurrency,
        currentConcurrency: _currentConcurrency,
        waitTime: waitTime,
      ));
    }
  }
}

// Helper class for concurrency control
class Semaphore {
  final int _maxCount;
  int _currentCount;
  final Queue<Completer<void>> _waitQueue = Queue<Completer<void>>();

  Semaphore(this._maxCount) : _currentCount = _maxCount;

  bool tryAcquire() {
    if (_currentCount > 0) {
      _currentCount--;
      return true;
    }
    return false;
  }

  Future<bool> acquire(Duration timeout) async {
    if (tryAcquire()) {
      return true;
    }

    final completer = Completer<void>();
    _waitQueue.add(completer);

    try {
      // release() completes the waiter directly, handing over the permit
      await completer.future.timeout(timeout);
      return true;
    } on TimeoutException {
      _waitQueue.remove(completer);
      return false;
    }
  }

  void release() {
    if (_waitQueue.isNotEmpty) {
      // Hand the permit to the oldest waiter instead of incrementing the count
      final completer = _waitQueue.removeFirst();
      completer.complete();
    } else {
      _currentCount++;
    }
  }
}

Step 3: Integrate with Pipeline Builder

extension BulkheadExtension on ResiliencePipelineBuilder {
  ResiliencePipelineBuilder addBulkhead(BulkheadStrategyOptions options) {
    return addStrategy(BulkheadStrategy(options));
  }
}

Step 4: Usage Example

final pipeline = ResiliencePipelineBuilder()
    .addBulkhead(BulkheadStrategyOptions(
      maxConcurrency: 5,
      maxWaitTime: Duration(seconds: 2),
      onRejected: (args) async {
        logger.warning(
            'Bulkhead rejected: ${args.currentConcurrency}/${args.maxConcurrency}');
      },
    ))
    .addRetry(RetryStrategyOptions(maxRetryAttempts: 2))
    .addTimeout(Duration(seconds: 10))
    .build();

// Use the pipeline
final result = await pipeline.execute((context) async {
  return await expensiveOperation();
});

Advanced Custom Strategy Examples

Cache-First Strategy

A strategy that tries cache first, then falls back to the operation:

class CacheFirstStrategyOptions<T> {
  final CacheProvider<T> cache;
  final String Function(ResilienceContext) keyGenerator;
  final Duration? cacheTtl;
  final bool cacheOnSuccess;

  const CacheFirstStrategyOptions({
    required this.cache,
    required this.keyGenerator,
    this.cacheTtl,
    this.cacheOnSuccess = true,
  });
}
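
These options reference a CacheProvider abstraction that this guide doesn't define. A minimal sketch of what such an interface might look like follows; the name and shape are assumptions for illustration, not part of Polly Dart, so substitute your own cache client:

// Hypothetical cache abstraction used by CacheFirstStrategy.
// Back it with an in-memory map, Redis client, or similar.
abstract class CacheProvider<T> {
  /// Returns the cached value for [key], or null on a miss.
  Future<T?> get(String key);

  /// Stores [value] under [key], optionally expiring after [ttl].
  Future<void> set(String key, T value, {Duration? ttl});
}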

class CacheFirstStrategy<T> implements ResilienceStrategy<T> {
  final CacheFirstStrategyOptions<T> _options;

  CacheFirstStrategy(this._options);

  @override
  Future<T> execute(
    Future<T> Function(ResilienceContext context) operation,
    ResilienceContext context,
  ) async {
    final cacheKey = _options.keyGenerator(context);

    // Try cache first
    try {
      final cached = await _options.cache.get(cacheKey);
      if (cached != null) {
        context.setProperty('cache_hit', true);
        return cached;
      }
    } catch (e) {
      // Cache error - continue to operation
      context.setProperty('cache_error', e);
    }

    // Execute operation
    final result = await operation(context);

    // Cache successful result
    if (_options.cacheOnSuccess) {
      try {
        await _options.cache.set(
          cacheKey,
          result,
          ttl: _options.cacheTtl,
        );
      } catch (e) {
        // Cache write error - don't fail the operation
        context.setProperty('cache_write_error', e);
      }
    }

    context.setProperty('cache_hit', false);
    return result;
  }
}

extension CacheFirstExtension on ResiliencePipelineBuilder {
  ResiliencePipelineBuilder addCacheFirst<T>(
      CacheFirstStrategyOptions<T> options) {
    return addStrategy(CacheFirstStrategy(options));
  }
}

// Usage
final pipeline = ResiliencePipelineBuilder()
    .addCacheFirst(CacheFirstStrategyOptions<UserProfile>(
      cache: redisCache,
      keyGenerator: (context) => 'user:${context.getProperty('userId')}',
      cacheTtl: Duration(minutes: 15),
    ))
    .addTimeout(Duration(seconds: 5))
    .build();

Batching Strategy

Accumulates requests and executes them in batches:

class BatchingStrategyOptions<TRequest, TResponse> {
  final int batchSize;
  final Duration maxWaitTime;
  final Future<List<TResponse>> Function(List<TRequest>) batchOperation;
  final TRequest Function(ResilienceContext) requestExtractor;

  const BatchingStrategyOptions({
    required this.batchSize,
    required this.maxWaitTime,
    required this.batchOperation,
    required this.requestExtractor,
  });
}

import 'dart:async';

class BatchingStrategy<TRequest, TResponse>
    implements ResilienceStrategy<TResponse> {
  final BatchingStrategyOptions<TRequest, TResponse> _options;
  final List<_BatchItem<TRequest, TResponse>> _pendingItems = [];
  Timer? _batchTimer;

  BatchingStrategy(this._options);

  @override
  Future<TResponse> execute(
    Future<TResponse> Function(ResilienceContext context) operation,
    ResilienceContext context,
  ) async {
    final request = _options.requestExtractor(context);
    final completer = Completer<TResponse>();

    final batchItem = _BatchItem<TRequest, TResponse>(
      request: request,
      completer: completer,
      context: context,
    );

    _pendingItems.add(batchItem);

    // Start timer if this is the first item
    if (_pendingItems.length == 1) {
      _batchTimer = Timer(_options.maxWaitTime, _processBatch);
    }

    // Process immediately if batch is full
    if (_pendingItems.length >= _options.batchSize) {
      _batchTimer?.cancel();
      await _processBatch();
    }

    return completer.future;
  }

  Future<void> _processBatch() async {
    if (_pendingItems.isEmpty) return;

    final currentBatch =
        List<_BatchItem<TRequest, TResponse>>.from(_pendingItems);
    _pendingItems.clear();
    _batchTimer?.cancel();

    try {
      final requests = currentBatch.map((item) => item.request).toList();
      final responses = await _options.batchOperation(requests);

      // Complete individual requests
      for (int i = 0; i < currentBatch.length && i < responses.length; i++) {
        currentBatch[i].completer.complete(responses[i]);
      }

      // Handle any unmatched requests
      for (int i = responses.length; i < currentBatch.length; i++) {
        currentBatch[i].completer.completeError(
              Exception('Batch response missing for request $i'),
            );
      }
    } catch (e) {
      // Complete all with error
      for (final item in currentBatch) {
        item.completer.completeError(e);
      }
    }
  }
}

class _BatchItem<TRequest, TResponse> {
  final TRequest request;
  final Completer<TResponse> completer;
  final ResilienceContext context;

  _BatchItem({
    required this.request,
    required this.completer,
    required this.context,
  });
}

extension BatchingExtension on ResiliencePipelineBuilder {
  ResiliencePipelineBuilder addBatching<TRequest, TResponse>(
    BatchingStrategyOptions<TRequest, TResponse> options,
  ) {
    return addStrategy(BatchingStrategy(options));
  }
}

// Usage
final pipeline = ResiliencePipelineBuilder()
    .addBatching(BatchingStrategyOptions<int, UserProfile>(
      batchSize: 10,
      maxWaitTime: Duration(milliseconds: 100),
      batchOperation: (userIds) => userService.getUsers(userIds),
      requestExtractor: (context) => context.getProperty<int>('userId')!,
    ))
    .build();

// Multiple calls get batched together
final futures = [1, 2, 3, 4, 5].map((userId) async {
  final context = ResilienceContext();
  context.setProperty('userId', userId);
  return await pipeline.execute((ctx) async {
    // This won't actually be called - the batching strategy handles it
    throw UnimplementedError('Handled by batching strategy');
  }, context: context);
});

final users = await Future.wait(futures);

Adaptive Load Shedding Strategy

Dynamically adjusts load based on system metrics:

class LoadSheddingStrategyOptions {
  final SystemMetricsProvider metricsProvider;
  final double cpuThreshold;
  final double memoryThreshold;
  final int maxQueueSize;
  final LoadSheddingPolicy policy;

  const LoadSheddingStrategyOptions({
    required this.metricsProvider,
    this.cpuThreshold = 0.8,
    this.memoryThreshold = 0.85,
    this.maxQueueSize = 100,
    this.policy = LoadSheddingPolicy.priority,
  });
}

enum LoadSheddingPolicy { fifo, priority, random }

import 'dart:async';
import 'dart:collection';
import 'dart:math';

class LoadSheddingStrategy<T> implements ResilienceStrategy<T> {
  final LoadSheddingStrategyOptions _options;
  final Queue<_QueuedRequest<T>> _requestQueue = Queue<_QueuedRequest<T>>();
  bool _processingQueue = false;

  LoadSheddingStrategy(this._options);

  @override
  Future<T> execute(
    Future<T> Function(ResilienceContext context) operation,
    ResilienceContext context,
  ) async {
    final metrics = await _options.metricsProvider.getCurrentMetrics();

    // Check if we should shed load
    if (_shouldShedLoad(metrics)) {
      throw LoadSheddingException('System overloaded', metrics);
    }

    // Check queue capacity
    if (_requestQueue.length >= _options.maxQueueSize) {
      _shedOldRequests();
    }

    // Queue or execute request
    if (_shouldQueue(metrics)) {
      return await _queueRequest(operation, context);
    } else {
      return await operation(context);
    }
  }

  bool _shouldShedLoad(SystemMetrics metrics) {
    return metrics.cpuUsage > _options.cpuThreshold ||
        metrics.memoryUsage > _options.memoryThreshold;
  }

  bool _shouldQueue(SystemMetrics metrics) {
    // Start queueing once usage passes 70% of the shedding thresholds
    return metrics.cpuUsage > _options.cpuThreshold * 0.7 ||
        metrics.memoryUsage > _options.memoryThreshold * 0.7;
  }

  Future<T> _queueRequest(
    Future<T> Function(ResilienceContext context) operation,
    ResilienceContext context,
  ) async {
    final completer = Completer<T>();
    final priority = context.getProperty<int>('priority') ?? 0;

    final queuedRequest = _QueuedRequest<T>(
      operation: operation,
      context: context,
      completer: completer,
      priority: priority,
      queuedAt: DateTime.now(),
    );

    _requestQueue.add(queuedRequest);
    _processQueueIfNeeded();

    return completer.future;
  }

  void _processQueueIfNeeded() {
    if (_processingQueue || _requestQueue.isEmpty) return;

    _processingQueue = true;
    // Fire and forget; _processingQueue guards against re-entry
    _processQueue();
  }

  Future<void> _processQueue() async {
    while (_requestQueue.isNotEmpty) {
      final metrics = await _options.metricsProvider.getCurrentMetrics();

      if (_shouldShedLoad(metrics)) {
        await Future.delayed(Duration(milliseconds: 100));
        continue;
      }

      final request = _getNextRequest();
      if (request == null) break;

      try {
        final result = await request.operation(request.context);
        request.completer.complete(result);
      } catch (e) {
        request.completer.completeError(e);
      }

      // Small delay to prevent CPU spinning
      await Future.delayed(Duration(milliseconds: 10));
    }

    _processingQueue = false;
  }

  _QueuedRequest<T>? _getNextRequest() {
    if (_requestQueue.isEmpty) return null;

    switch (_options.policy) {
      case LoadSheddingPolicy.fifo:
        return _requestQueue.removeFirst();

      case LoadSheddingPolicy.priority:
        return _removeHighestPriority();

      case LoadSheddingPolicy.random:
        final index = Random().nextInt(_requestQueue.length);
        return _removeAt(index);
    }
  }

  _QueuedRequest<T> _removeHighestPriority() {
    var highest = _requestQueue.first;
    var highestIndex = 0;

    for (int i = 1; i < _requestQueue.length; i++) {
      final current = _requestQueue.elementAt(i);
      if (current.priority > highest.priority) {
        highest = current;
        highestIndex = i;
      }
    }

    return _removeAt(highestIndex);
  }

  _QueuedRequest<T> _removeAt(int index) {
    // Queue has no removeAt, so rebuild it without the selected item
    final list = _requestQueue.toList();
    final item = list.removeAt(index);
    _requestQueue.clear();
    _requestQueue.addAll(list);
    return item;
  }

  void _shedOldRequests() {
    final now = DateTime.now();
    _requestQueue.removeWhere((request) {
      final age = now.difference(request.queuedAt);
      if (age > Duration(seconds: 30)) {
        request.completer.completeError(
          LoadSheddingException('Request timeout in queue', null),
        );
        return true;
      }
      return false;
    });
  }
}

class _QueuedRequest<T> {
  final Future<T> Function(ResilienceContext context) operation;
  final ResilienceContext context;
  final Completer<T> completer;
  final int priority;
  final DateTime queuedAt;

  _QueuedRequest({
    required this.operation,
    required this.context,
    required this.completer,
    required this.priority,
    required this.queuedAt,
  });
}

class SystemMetrics {
  final double cpuUsage;
  final double memoryUsage;
  final int activeConnections;

  SystemMetrics({
    required this.cpuUsage,
    required this.memoryUsage,
    required this.activeConnections,
  });
}

abstract class SystemMetricsProvider {
  Future<SystemMetrics> getCurrentMetrics();
}

class LoadSheddingException implements Exception {
  final String message;
  final SystemMetrics? metrics;

  LoadSheddingException(this.message, this.metrics);

  @override
  String toString() => 'LoadSheddingException: $message';
}
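
The other strategies in this guide end with a usage example, so here is one for load shedding as well. It is a sketch: FakeMetricsProvider, addLoadShedding, and fetchDashboardData are illustrative names you would replace with a real metrics source (for example, readings from your process monitor or APM agent) and your own operation:

// Illustrative metrics provider - replace with real system measurements
class FakeMetricsProvider implements SystemMetricsProvider {
  @override
  Future<SystemMetrics> getCurrentMetrics() async {
    return SystemMetrics(
      cpuUsage: 0.45, // 45% CPU
      memoryUsage: 0.60, // 60% memory
      activeConnections: 12,
    );
  }
}

extension LoadSheddingExtension on ResiliencePipelineBuilder {
  ResiliencePipelineBuilder addLoadShedding(
      LoadSheddingStrategyOptions options) {
    return addStrategy(LoadSheddingStrategy(options));
  }
}

// Usage
final pipeline = ResiliencePipelineBuilder()
    .addLoadShedding(LoadSheddingStrategyOptions(
      metricsProvider: FakeMetricsProvider(),
      cpuThreshold: 0.8,
      policy: LoadSheddingPolicy.priority,
    ))
    .build();

final context = ResilienceContext();
context.setProperty('priority', 10); // higher values run first under load
final result = await pipeline.execute(
  (ctx) async => await fetchDashboardData(),
  context: context,
);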

Testing Custom Strategies

Unit Testing Framework

import 'dart:math';

import 'package:test/test.dart';
import 'package:mockito/mockito.dart';

class MockCacheProvider<T> extends Mock implements CacheProvider<T> {}

void main() {
  group('Custom Strategy Tests', () {
    group('BulkheadStrategy', () {
      test('should limit concurrent executions', () async {
        final strategy = BulkheadStrategy<String>(BulkheadStrategyOptions(
          maxConcurrency: 2,
          // Let queued operations wait for a permit instead of
          // being rejected immediately
          maxWaitTime: Duration(seconds: 1),
        ));

        var concurrentCount = 0;
        var maxConcurrent = 0;

        Future<String> slowOperation(ResilienceContext context) async {
          concurrentCount++;
          maxConcurrent = max(maxConcurrent, concurrentCount);

          await Future.delayed(Duration(milliseconds: 100));

          concurrentCount--;
          return 'completed';
        }

        // Start 5 operations concurrently
        final futures = List.generate(
          5,
          (index) => strategy.execute(slowOperation, ResilienceContext()),
        );

        await Future.wait(futures);

        expect(maxConcurrent, equals(2));
      });

      test('should reject when maxWaitTime exceeded', () async {
        final strategy = BulkheadStrategy<String>(BulkheadStrategyOptions(
          maxConcurrency: 1,
          maxWaitTime: Duration(milliseconds: 50),
        ));

        // Start long-running operation
        final future1 = strategy.execute((context) async {
          await Future.delayed(Duration(milliseconds: 200));
          return 'first';
        }, ResilienceContext());

        // This should be rejected due to wait time
        await expectLater(
          strategy.execute((context) async => 'second', ResilienceContext()),
          throwsA(isA<BulkheadRejectedException>()),
        );

        await future1;
      });
    });

    group('CacheFirstStrategy', () {
      late MockCacheProvider<String> mockCache;
      late CacheFirstStrategy<String> strategy;

      setUp(() {
        mockCache = MockCacheProvider<String>();
        strategy = CacheFirstStrategy(CacheFirstStrategyOptions<String>(
          cache: mockCache,
          keyGenerator: (context) => 'test-key',
        ));
      });

      test('should return cached value when available', () async {
        when(mockCache.get('test-key'))
            .thenAnswer((_) async => 'cached-value');

        final result = await strategy.execute(
          (context) async => 'fresh-value',
          ResilienceContext(),
        );

        expect(result, equals('cached-value'));
        verifyNever(mockCache.set(any, any, ttl: anyNamed('ttl')));
      });

      test('should execute operation and cache result when cache miss',
          () async {
        when(mockCache.get('test-key')).thenAnswer((_) async => null);
        when(mockCache.set('test-key', 'fresh-value', ttl: anyNamed('ttl')))
            .thenAnswer((_) async {});

        final result = await strategy.execute(
          (context) async => 'fresh-value',
          ResilienceContext(),
        );

        expect(result, equals('fresh-value'));
        verify(mockCache.set('test-key', 'fresh-value', ttl: anyNamed('ttl')));
      });
    });
  });
}

Integration Testing

void main() {
  group('Custom Strategy Integration', () {
    test('should work with other strategies', () async {
      final pipeline = ResiliencePipelineBuilder()
          .addBulkhead(BulkheadStrategyOptions(maxConcurrency: 2))
          .addRetry(RetryStrategyOptions(maxRetryAttempts: 2))
          .addTimeout(Duration(seconds: 1))
          .build();

      var attemptCount = 0;
      final result = await pipeline.execute((context) async {
        attemptCount++;
        if (attemptCount == 1) {
          throw Exception('First attempt fails');
        }
        return 'success';
      });

      expect(result, equals('success'));
      expect(attemptCount, equals(2));
    });
  });
}

Strategy Composition Patterns

Decorator Pattern

Enhance existing strategies:

class LoggingStrategyDecorator<T> implements ResilienceStrategy<T> {
  final ResilienceStrategy<T> _innerStrategy;
  final Logger _logger;

  LoggingStrategyDecorator(this._innerStrategy, this._logger);

  @override
  Future<T> execute(
    Future<T> Function(ResilienceContext context) operation,
    ResilienceContext context,
  ) async {
    final stopwatch = Stopwatch()..start();
    _logger.info('Strategy execution started');

    try {
      final result = await _innerStrategy.execute(operation, context);
      _logger.info(
          'Strategy execution completed in ${stopwatch.elapsedMilliseconds}ms');
      return result;
    } catch (e) {
      _logger.error('Strategy execution failed: $e');
      rethrow;
    }
  }
}
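
To attach the decorator, wrap the strategy before registering it. A brief sketch, assuming the addStrategy method used in the extensions above and any logger with info and error methods:

// Wrap a bulkhead strategy with logging before adding it to the pipeline
final bulkhead = BulkheadStrategy<String>(
  BulkheadStrategyOptions(maxConcurrency: 5),
);
final logged = LoggingStrategyDecorator<String>(bulkhead, logger);

final pipeline = ResiliencePipelineBuilder()
    .addStrategy(logged)
    .build();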

Strategy Factory

Create strategies based on configuration:

class StrategyFactory {
  static ResilienceStrategy<T> create<T>(StrategyConfig config) {
    switch (config.type) {
      case StrategyType.bulkhead:
        return BulkheadStrategy(config.bulkheadOptions!);
      case StrategyType.cacheFirst:
        return CacheFirstStrategy(config.cacheFirstOptions!);
      case StrategyType.loadShedding:
        return LoadSheddingStrategy(config.loadSheddingOptions!);
      default:
        throw ArgumentError('Unknown strategy type: ${config.type}');
    }
  }
}
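
StrategyConfig and StrategyType are placeholders in the factory above. A hypothetical sketch of how they might be declared, and how a pipeline could be assembled from a config list (for example, one parsed from JSON or YAML):

// Hypothetical config types backing the factory above
enum StrategyType { bulkhead, cacheFirst, loadShedding }

class StrategyConfig {
  final StrategyType type;
  final BulkheadStrategyOptions? bulkheadOptions;
  final CacheFirstStrategyOptions? cacheFirstOptions;
  final LoadSheddingStrategyOptions? loadSheddingOptions;

  const StrategyConfig({
    required this.type,
    this.bulkheadOptions,
    this.cacheFirstOptions,
    this.loadSheddingOptions,
  });
}

// Build a pipeline from configuration instead of hard-coded strategies
ResiliencePipeline buildPipeline(List<StrategyConfig> configs) {
  final builder = ResiliencePipelineBuilder();
  for (final config in configs) {
    builder.addStrategy(StrategyFactory.create(config));
  }
  return builder.build();
}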

Best Practices for Custom Strategies

Do

Follow Naming Conventions

// ✅ Good: Clear, descriptive names
class BulkheadStrategy implements ResilienceStrategy {}
class BulkheadStrategyOptions {}
class BulkheadRejectedException implements Exception {}

Handle Errors Gracefully

// ✅ Good: Proper error handling
try {
  return await operation(context);
} catch (e) {
  // Log, transform, or handle error appropriately
  await _handleError(e, context);
  rethrow;
}

Use Context for State Sharing

// ✅ Good: Share information via context
context.setProperty('cache_hit', true);
context.setProperty('execution_time', stopwatch.elapsed);

Provide Comprehensive Configuration

// ✅ Good: Flexible configuration options
class MyStrategyOptions {
  final int maxItems;
  final Duration timeout;
  final bool enableMetrics;
  final OnEventCallback? onEvent;

  const MyStrategyOptions({
    required this.maxItems,
    required this.timeout,
    this.enableMetrics = true,
    this.onEvent,
  });
}

Don't

Block the Event Loop

// ❌ Bad: blocking the event loop with a synchronous wait
sleep(Duration(seconds: 1)); // from dart:io - freezes the whole isolate

// ✅ Good: yield to the event loop asynchronously
await Future.delayed(Duration(seconds: 1));

Ignore Resource Cleanup

// ❌ Bad: No cleanup
class MyStrategy {
  Timer? _timer;
  // Missing dispose method!
}

// ✅ Good: Proper cleanup
class MyStrategy {
  Timer? _timer;

  void dispose() {
    _timer?.cancel();
  }
}

Break Pipeline Composition

// ❌ Bad: Not calling inner strategies
return await operation(context); // Skips other strategies

// ✅ Good: Proper delegation
return await _innerStrategy.execute(operation, context);

Performance Considerations

  • Memory Management: Clean up resources properly (see the sketch after this list)
  • CPU Usage: Avoid intensive synchronous operations
  • Concurrency: Use appropriate async patterns
  • Metrics: Track strategy performance and behavior (also sketched below)
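
A compact sketch tying the first and last bullets together: a decorator that records simple timing metrics and cleans up after itself. MetricsRecorder and MeteredStrategy are illustrative names, not part of Polly Dart; wire the recorder to your real metrics backend:

import 'dart:async';

// Illustrative metrics sink - forward these values to your backend
class MetricsRecorder {
  final List<Duration> durations = [];
  int failures = 0;

  void record(Duration elapsed, {required bool success}) {
    durations.add(elapsed);
    if (!success) failures++;
  }
}

class MeteredStrategy<T> implements ResilienceStrategy<T> {
  final ResilienceStrategy<T> _inner;
  final MetricsRecorder _recorder;
  Timer? _flushTimer;

  MeteredStrategy(this._inner, this._recorder) {
    // Periodic work like flushing metrics must be cancelled in dispose()
    _flushTimer = Timer.periodic(Duration(minutes: 1), (_) {
      // flush _recorder to your metrics backend here
    });
  }

  @override
  Future<T> execute(
    Future<T> Function(ResilienceContext context) operation,
    ResilienceContext context,
  ) async {
    final stopwatch = Stopwatch()..start();
    try {
      final result = await _inner.execute(operation, context);
      _recorder.record(stopwatch.elapsed, success: true);
      return result;
    } catch (_) {
      _recorder.record(stopwatch.elapsed, success: false);
      rethrow;
    }
  }

  void dispose() {
    _flushTimer?.cancel();
  }
}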

Publishing Custom Strategies

Consider publishing useful custom strategies as separate packages:

name: polly_dart_bulkhead
description: Bulkhead isolation strategy for Polly Dart
dependencies:
  polly_dart: ^1.0.0
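
A minimal library entrypoint could then re-export the strategy's public pieces; the file layout here is a hypothetical convention, not a requirement:

// lib/polly_dart_bulkhead.dart
library polly_dart_bulkhead;

export 'src/bulkhead_strategy.dart';
export 'src/bulkhead_options.dart';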

Next Steps

With custom strategies, you can:

  1. Learn Monitoring - Track your custom strategy behavior
  2. Master Testing - Validate custom strategy reliability
  3. Explore Examples - See real-world implementations

Custom strategies let you extend Polly Dart's capabilities to meet your unique resilience requirements! 🔧