The "What" Over "How" Concept
In traditional imperative programming, we often focus on how to perform a task: the specific steps, the order of operations, and the implementation details. This approach can lead to:
- Complex, hard-to-maintain code
- Tight coupling between components
- Difficulty in understanding business logic
- Challenges in testing and modification
Declarative programming, on the other hand, emphasizes what we want to achieve rather than how to achieve it. This paradigm shift brings several advantages:
- Clearer expression of business intent
- Better separation of concerns
- Improved maintainability
- Enhanced testability
- Easier parallel processing
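The contrast can be seen even without Flow DSL. The sketch below uses only the JDK and a hypothetical task (select order totals above 1000 and sort them). The first method spells out the "how" with an index loop and a mutable list; the second states the "what" as a stream pipeline. The class and method names are illustrative and are not part of Flow DSL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class WhatOverHowSketch {

    // Imperative: iteration, filtering, and accumulation are written out by hand.
    static List<Integer> largeTotalsImperative(List<Integer> totals) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < totals.size(); i++) {
            int total = totals.get(i);
            if (total > 1000) {
                result.add(total);
            }
        }
        Collections.sort(result);
        return result;
    }

    // Declarative: the pipeline names the desired result; the library decides how to iterate.
    static List<Integer> largeTotalsDeclarative(List<Integer> totals) {
        return totals.stream()
                .filter(total -> total > 1000)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> totals = Arrays.asList(5000, 300, 1200);
        System.out.println(largeTotalsImperative(totals));
        System.out.println(largeTotalsDeclarative(totals));
    }
}
```

Both methods return the same list; only the second one reads like the requirement itself.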
Why Flow DSL?
Flow DSL is a practical implementation of the "What" over "How" principle. It provides a declarative way to express business workflows while handling the complex "how" details under the hood.
Traditional Approach
// Declared outside the try block so the catch block can run compensation
PaymentResult payment = null;
List<InventoryResult> inventory = new ArrayList<>();
try {
    // Validate order
    if (!validateOrder(order)) {
        throw new ValidationException();
    }
    // Process payment: up to 3 attempts with a growing delay between them
    for (int i = 0; i < 3; i++) {
        try {
            payment = processPayment(order);
            break;
        } catch (Exception e) {
            if (i == 2) throw e;
            Thread.sleep(100 * (i + 1));
        }
    }
    // Check inventory in parallel, one task per item
    ExecutorService executor = Executors.newFixedThreadPool(order.items.size());
    try {
        List<Future<InventoryResult>> futures = new ArrayList<>();
        for (OrderItem item : order.items) {
            futures.add(executor.submit(() -> checkInventory(item)));
        }
        for (Future<InventoryResult> future : futures) {
            inventory.add(future.get(5, TimeUnit.SECONDS));
        }
    } finally {
        executor.shutdown();
    }
} catch (Exception e) {
    // Compensation logic
    if (payment != null) {
        rollbackPayment(payment);
    }
    for (InventoryResult result : inventory) {
        rollbackInventory(result);
    }
    throw e;
}
Flow DSL Approach
Flow.of(() -> order)
    .map(OrderService::validateOrder)
    .flatMap(validOrder ->
        Flow.of(() -> processPayment(validOrder))
            .withRetry(3)
            .withBackoff(Duration.ofMillis(100))
            .withTimeout(Duration.ofSeconds(5))
            .withCompensation(PaymentService::rollback))
    .flatMap(payment ->
        Flow.of(() -> order.items)
            .parallelMap(InventoryService::checkInventory)
            .withParallelism(order.items.size())
            .withCompensation(InventoryService::rollback))
    .execute();
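To make the compensation idea concrete, here is a minimal JDK-only sketch of what a compensation mechanism might do: remember each completed step and, when a later step fails, undo the completed ones in reverse order. This is an illustration under that assumption, not Flow DSL's actual implementation; CompensationSketch, Step, and runAll are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class CompensationSketch {

    /** One step: an action plus the undo to run if a later step fails. */
    static final class Step {
        final Runnable action;
        final Runnable compensation;

        Step(Runnable action, Runnable compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }

    /** Runs steps in order; on failure, undoes completed steps in reverse order and rethrows. */
    static void runAll(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        try {
            for (Step step : steps) {
                step.action.run();
                completed.push(step); // most recently completed step ends up on top
            }
        } catch (RuntimeException e) {
            while (!completed.isEmpty()) {
                completed.pop().compensation.run();
            }
            throw e;
        }
    }

    /** Hypothetical three-step order where the last step fails. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<Step> steps = new ArrayList<>();
        steps.add(new Step(() -> log.add("pay"), () -> log.add("refund")));
        steps.add(new Step(() -> log.add("reserve"), () -> log.add("release")));
        steps.add(new Step(
                () -> { throw new IllegalStateException("shipping unavailable"); },
                () -> log.add("unship")));
        try {
            runAll(steps);
        } catch (IllegalStateException e) {
            log.add("failed: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [pay, reserve, release, refund, failed: shipping unavailable]
        System.out.println(demo());
    }
}
```

The failing step's own compensation never runs because that step never completed, which mirrors the null check on payment in the traditional version.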
Key Features
Declarative Syntax
Express business logic in a clear, readable manner that closely matches your domain requirements.
Built-in Resilience
Automatic retry, circuit breaking, timeout handling, and compensation actions.
Parallel Processing
Effortless parallel execution with controlled concurrency and proper resource management.
Examples
Basic Flow
Flow.of(() -> "Hello")
    .map(str -> str + " ")
    .map(str -> str + "World")
    .map(String::toUpperCase)
    .execute(); // Returns "HELLO WORLD"
Error Handling
Flow.of(() -> "data")
    .onError(error -> log.error("Flow failed", error))
    .onComplete(result -> log.info("Flow completed with: {}", result))
    .withRetry(3)
    .withBackoff(Duration.ofSeconds(1))
    .execute();
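For readers wondering what retry with backoff involves under the hood, the following JDK-only sketch shows one plausible shape. It assumes the retry count means total attempts and that the delay grows linearly, as in the hand-written loop of the traditional approach; whether Flow DSL's withRetry and withBackoff behave exactly this way is not stated here. RetrySketch and retry are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

final class RetrySketch {

    /**
     * Calls the task up to maxAttempts times. After a failed attempt n (1-based)
     * it sleeps baseDelay * n before trying again; the last failure is rethrown.
     */
    static <T> T retry(Callable<T> task, int maxAttempts, Duration baseDelay) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(baseDelay.toMillis() * attempt); // linear backoff
                }
            }
        }
        throw last;
    }

    /** Hypothetical task that fails twice with a transient error, then succeeds. */
    static String demo() {
        int[] calls = {0};
        try {
            String result = retry(() -> {
                if (++calls[0] < 3) {
                    throw new IllegalStateException("transient failure");
                }
                return "ok";
            }, 3, Duration.ofMillis(1));
            return result + " after " + calls[0] + " attempts";
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "ok after 3 attempts"
    }
}
```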
Parallel Processing
Flow.parallel(
        () -> "Task 1",
        () -> "Task 2",
        () -> "Task 3"
    )
    .withParallelism(3)
    .execute();
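As a rough picture of the plumbing that a parallelism setting hides, the sketch below runs a list of tasks on a fixed-size thread pool using only the JDK and collects the results in submission order. It assumes a parallel flow yields its results as a list; the return type of Flow.parallel is not shown in this document. ParallelSketch and runAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelSketch {

    /** Runs all tasks on at most `parallelism` threads and returns results in task order. */
    static <T> List<T> runAll(List<Callable<T>> tasks, int parallelism) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<T> results = new ArrayList<>();
            // invokeAll blocks until every task is done and keeps the input order
            for (Future<T> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown(); // always release the threads
        }
    }

    static List<String> demo() {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "Task 1");
        tasks.add(() -> "Task 2");
        tasks.add(() -> "Task 3");
        try {
            return runAll(tasks, 3);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [Task 1, Task 2, Task 3]
    }
}
```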
Spring Boot Integration
Flow DSL can be used inside Spring Boot services. The example below composes payment, inventory, and notification steps in a service class and shows a flow running inside a @Transactional method:
Service Layer Integration
@Service
@Slf4j
@RequiredArgsConstructor // Lombok: generates the constructor that injects the final fields
public class OrderService {
    private final PaymentService paymentService;
    private final InventoryService inventoryService;
    private final NotificationService notificationService;

    public OrderResult processOrder(Order order) {
        return Flow.of(() -> order)
            .map(this::validateOrder)
            .flatMap(validOrder ->
                Flow.of(() -> paymentService.processPayment(validOrder))
                    .withRetry(3)
                    .withTimeout(Duration.ofSeconds(5))
                    .withCompensation(paymentService::rollbackPayment)
                    .onError(e -> log.error("Payment failed", e))
                    .onComplete(payment -> log.info("Payment processed: {}", payment)))
            .flatMap(payment ->
                Flow.of(() -> inventoryService.reserveItems(order.getItems()))
                    .withRetry(2)
                    .withCompensation(inventoryService::releaseItems))
            .thenCompose(inventory ->
                Flow.of(() -> notificationService.notifyCustomer(order))
                    .withRetry(3)
                    .async())
            .execute();
    }

    @Transactional
    public OrderResult processOrderWithTransaction(Order order) {
        return Flow.of(() -> order)
            .withContextData("transactionId", UUID.randomUUID().toString())
            .onEvent(event -> {
                // Mark the surrounding Spring transaction for rollback when the flow fails
                if (event.getType() == FlowEvent.EventType.FLOW_ERROR) {
                    TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                }
            })
            .map(this::validateOrder)
            .flatMap(this::processPayment)
            .flatMap(this::updateInventory)
            .execute();
    }

    private Flow processPayment(Order order) {
        return Flow.of(() -> paymentService.processPayment(order))
            .withRetry(3)
            .withTimeout(Duration.ofSeconds(5))
            .withCompensation(paymentService::rollbackPayment);
    }

    private Flow updateInventory(PaymentResult payment) {
        return Flow.of(() -> inventoryService.updateInventory(payment.getOrder()))
            .withRetry(2)
            .withCompensation(inventoryService::rollback);
    }
}
Advanced Examples
Context Sharing
Flow.of(() -> new FlowContext())
    .map(ctx -> {
        ctx.put("userId", "12345");
        return ctx;
    })
    .flatMap(ctx ->
        Flow.of(() -> userService.getUser(ctx.get("userId")))
            .withContext(ctx)
    )
    .execute();
Event Monitoring
Flow.of(() -> "process")
    .withEventListener(event -> {
        if (event.getType() == FlowEventType.STEP_START) {
            log.info("Starting step: {}", event.getStepName());
        }
    })
    .map(str -> str + " step 1")
    .map(str -> str + " step 2")
    .execute();
Conditional Flows
Flow.of(() -> order)
    .filter(Order::isValid)
    .branch(
        // Lambda parameter is named o so it does not shadow the captured order variable
        o -> o.getTotal() > 1000,
        premium -> premium
            .map(OrderService::applyPremiumDiscount)
            .map(NotificationService::sendPremiumNotification),
        regular -> regular
            .map(OrderService::applyRegularDiscount)
    )
    .execute();
Circuit Breaker Pattern
Flow.of(() -> externalService.call())
    .withCircuitBreaker(CircuitBreakerConfig.builder()
        .failureThreshold(5)
        .resetTimeout(Duration.ofMinutes(1))
        .build())
    .withFallback(() -> "fallback-response")
    .execute();
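For readers unfamiliar with the pattern, the sketch below shows a minimal circuit breaker built only on the JDK. It assumes that the failure threshold counts consecutive failures and that, after the reset timeout, a single trial call is allowed through; Flow DSL's own semantics may differ. CircuitBreakerSketch and its call method are hypothetical names, and the method is synchronized for simplicity rather than throughput.

```java
import java.time.Duration;
import java.util.function.Supplier;

final class CircuitBreakerSketch {
    private final int failureThreshold;
    private final long resetTimeoutNanos;
    private int consecutiveFailures = 0;
    private boolean open = false;
    private long openedAtNanos = 0L;

    CircuitBreakerSketch(int failureThreshold, Duration resetTimeout) {
        this.failureThreshold = failureThreshold;
        this.resetTimeoutNanos = resetTimeout.toNanos();
    }

    /** Runs the action unless the breaker is open; returns the fallback on failure or when open. */
    synchronized <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (open) {
            if (System.nanoTime() - openedAtNanos < resetTimeoutNanos) {
                return fallback.get(); // open: do not touch the failing service
            }
            // Reset timeout elapsed: allow one trial call; one more failure reopens the breaker
            open = false;
            consecutiveFailures = failureThreshold - 1;
        }
        try {
            T result = action.get();
            consecutiveFailures = 0; // success closes the breaker fully
            return result;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            if (consecutiveFailures >= failureThreshold) {
                open = true;
                openedAtNanos = System.nanoTime();
            }
            return fallback.get();
        }
    }

    /** Hypothetical service that always fails: only the first two calls reach it. */
    static String demo() {
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(2, Duration.ofMinutes(1));
        int[] calls = {0};
        String last = null;
        for (int i = 0; i < 4; i++) {
            last = breaker.<String>call(
                    () -> { calls[0]++; throw new IllegalStateException("service down"); },
                    () -> "fallback-response");
        }
        return calls[0] + ":" + last;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "2:fallback-response"
    }
}
```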
Async Composition
CompletableFuture<OrderResult> result = Flow.of(() -> order)
    // Lambda parameter is named o so it does not shadow the captured order variable
    .thenCompose(o ->
        Flow.parallel(
                () -> paymentService.processAsync(o),
                () -> inventoryService.checkAsync(o),
                () -> fraudService.validateAsync(o)
            )
            .withParallelism(3)
            .async()
    )
    .thenApply(results -> OrderResult.combine(results))
    .executeAsync();
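The same fan-out and combine shape can be expressed with the JDK's CompletableFuture alone, which may help when reasoning about what the flow above is expected to do. The sketch is an analogy rather than Flow DSL's implementation: it uses plain strings in place of the payment, inventory, and fraud results, and AsyncCompositionSketch and processOrder are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

final class AsyncCompositionSketch {

    /** Starts three independent checks and combines their results once all have finished. */
    static CompletableFuture<String> processOrder(String orderId) {
        CompletableFuture<String> payment =
                CompletableFuture.supplyAsync(() -> "paid:" + orderId);
        CompletableFuture<String> inventory =
                CompletableFuture.supplyAsync(() -> "reserved:" + orderId);
        CompletableFuture<String> fraud =
                CompletableFuture.supplyAsync(() -> "cleared:" + orderId);

        // allOf completes when every future is done, so join() below does not block
        return CompletableFuture.allOf(payment, inventory, fraud)
                .thenApply(ignored ->
                        payment.join() + "," + inventory.join() + "," + fraud.join());
    }

    static String demo() {
        return processOrder("A-1").join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "paid:A-1,reserved:A-1,cleared:A-1"
    }
}
```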