Flow DSL: Declarative Business Logic

Focus on What Your Code Does, Not How It Does It

The "What" Over "How" Concept

In traditional imperative programming, we often focus on how to perform tasks: the specific steps, the order of operations, and the implementation details. As a result, the business intent can end up buried under retry loops, thread management, and error handling.

Declarative programming, on the other hand, emphasizes what we want to achieve rather than how to achieve it. The code then reads closer to the domain requirement, while the mechanics of executing it are handled in one place.

Why Flow DSL?

Flow DSL is a practical implementation of the "What" over "How" principle. It provides a declarative way to express business workflows while handling the complex "how" details under the hood.

Traditional Approach


// Declared outside the try block so the catch block can compensate
PaymentResult payment = null;
List<InventoryResult> inventory = new ArrayList<>();
try {
    // Validate order
    if (!validateOrder(order)) {
        throw new ValidationException();
    }
    
    // Process payment, retrying up to 3 times with a growing delay
    for (int i = 0; i < 3; i++) {
        try {
            payment = processPayment(order);
            break;
        } catch (Exception e) {
            if (i == 2) throw e;
            Thread.sleep(100 * (i + 1));
        }
    }
    
    // Check inventory for all items in parallel
    ExecutorService executor = Executors.newFixedThreadPool(order.items.size());
    try {
        List<Future<InventoryResult>> futures = new ArrayList<>();
        for (OrderItem item : order.items) {
            futures.add(executor.submit(() -> checkInventory(item)));
        }
        for (Future<InventoryResult> future : futures) {
            inventory.add(future.get(5, TimeUnit.SECONDS));
        }
    } finally {
        executor.shutdown();
    }
} catch (Exception e) {
    // Compensation logic
    if (payment != null) {
        rollbackPayment(payment);
    }
    for (InventoryResult result : inventory) {
        rollbackInventory(result);
    }
    throw e;
}
                    

Flow DSL Approach


Flow.of(() -> order)
    .map(OrderService::validateOrder)
    .flatMap(validOrder -> 
        Flow.of(() -> processPayment(validOrder))
            .withRetry(3)
            .withBackoff(Duration.ofMillis(100))
            .withTimeout(Duration.ofSeconds(5))
            .withCompensation(PaymentService::rollback))
    .flatMap(payment -> 
        Flow.of(() -> order.items)
            .parallelMap(InventoryService::checkInventory)
            .withParallelism(order.items.size())
            .withCompensation(InventoryService::rollback))
    .execute();
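
To make the retry part of the comparison concrete, here is a minimal sketch of the kind of loop that a `withRetry(3)` plus `withBackoff(...)` pair could hide. It is an illustration under assumptions, not Flow DSL's implementation: `RetrySketch`, `callWithRetry`, and the linear backoff rule are hypothetical names and choices made for this example, modeled on the hand-written loop in the traditional approach.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical helper: illustrates a retry loop, not the library's own code.
final class RetrySketch {
    private RetrySketch() {}

    // Calls the task up to maxAttempts times. After a failed attempt it sleeps
    // for baseDelay * attemptNumber (linear backoff) before trying again.
    static <T> T callWithRetry(Callable<T> task, int maxAttempts, Duration baseDelay)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (InterruptedException e) {
                // Do not retry when the thread was asked to stop.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(baseDelay.toMillis() * attempt);
                }
            }
        }
        throw last; // every attempt failed; surface the last error
    }
}

class RetryDemo {
    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        String result = RetrySketch.callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "paid";
        }, 3, Duration.ofMillis(10));
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Keeping the loop in one helper means the business code only states the policy (attempt count and delay), which is the separation the declarative version aims for.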
                    

Key Features

Declarative Syntax

Express business logic in a clear, readable manner that closely matches your domain requirements.

Built-in Resilience

Automatic retry, circuit breaking, timeout handling, and compensation actions.
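
Compensation is the least familiar of these features, so a small sketch may help. The idea shown here is that each completed step registers an undo action, and a later failure runs those actions in reverse order. `CompensationSketch` and its `step` method are hypothetical names for this illustration; how Flow DSL tracks compensations internally is not described in this document.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of compensation handling, not the library's implementation.
final class CompensationSketch {
    private final Deque<Runnable> undoStack = new ArrayDeque<>();

    // Runs a step and, only if it succeeds, remembers how to undo it.
    // If the step fails, all previously completed steps are undone first.
    void step(Runnable action, Runnable compensation) {
        try {
            action.run();
        } catch (RuntimeException e) {
            compensate();
            throw e;
        }
        undoStack.push(compensation);
    }

    // Undoes completed steps, most recent first.
    private void compensate() {
        while (!undoStack.isEmpty()) {
            undoStack.pop().run();
        }
    }
}

class CompensationDemo {
    // Runs payment and inventory steps, then a failing notification step,
    // and returns the sequence of events that took place.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompensationSketch saga = new CompensationSketch();
        try {
            saga.step(() -> log.add("charge payment"), () -> log.add("refund payment"));
            saga.step(() -> log.add("reserve stock"), () -> log.add("release stock"));
            saga.step(() -> { throw new IllegalStateException("notification unavailable"); },
                      () -> log.add("retract notification"));
        } catch (IllegalStateException e) {
            log.add("failed: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the demo, the stock reservation is released before the payment is refunded, and the failed step's own compensation never runs because that step did not complete.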

Parallel Processing

Effortless parallel execution with controlled concurrency and proper resource management.
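
As a rough picture of what "controlled concurrency and proper resource management" involves, the sketch below maps a function over a list with a bounded thread pool and always shuts the pool down. `ParallelMapSketch.parallelMap` is a hypothetical helper written for this example with the standard `java.util.concurrent` API; it is not the library's `parallelMap`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper, shown only to illustrate bounded parallel execution.
final class ParallelMapSketch {
    private ParallelMapSketch() {}

    // Applies fn to every item using at most `parallelism` threads (must be > 0)
    // and returns the results in input order. The pool is always shut down.
    static <T, R> List<R> parallelMap(List<T> items, Function<T, R> fn, int parallelism)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(parallelism);
        try {
            List<Callable<R>> tasks = new ArrayList<>();
            for (T item : items) {
                tasks.add(() -> fn.apply(item));
            }
            List<R> results = new ArrayList<>();
            // invokeAll waits for every task and returns futures in task order.
            for (Future<R> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}

class ParallelMapDemo {
    public static void main(String[] args) throws Exception {
        List<Integer> lengths = ParallelMapSketch.parallelMap(
            Arrays.asList("widget", "gadget", "bolt"), String::length, 2);
        System.out.println(lengths); // prints [6, 6, 4]
    }
}
```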

Examples

Basic Flow


Flow.of(() -> "Hello")
    .map(str -> str + " ")
    .map(str -> str + "World")
    .map(String::toUpperCase)
    .execute();  // Returns "HELLO WORLD"
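
For readers curious how such a chain can stay lazy until `execute()` is called, here is a minimal sketch of a Flow-like type. `MiniFlow` is a hypothetical stand-in invented for this example; it only mirrors the `of`, `map`, `flatMap`, and `execute` calls used in this document and says nothing about how the real `Flow` class is built.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for a Flow-like type; not the library's actual class.
final class MiniFlow<T> {
    private final Supplier<T> source;

    private MiniFlow(Supplier<T> source) {
        this.source = source;
    }

    // Wraps a supplier without calling it; nothing runs until execute().
    static <T> MiniFlow<T> of(Supplier<T> source) {
        return new MiniFlow<>(source);
    }

    // Describes a transformation of the eventual value.
    <R> MiniFlow<R> map(Function<? super T, ? extends R> fn) {
        return new MiniFlow<>(() -> fn.apply(source.get()));
    }

    // Chains a step that itself produces a flow.
    <R> MiniFlow<R> flatMap(Function<? super T, MiniFlow<R>> fn) {
        return new MiniFlow<>(() -> fn.apply(source.get()).execute());
    }

    // Runs the whole pipeline once and returns its result.
    T execute() {
        return source.get();
    }
}

class MiniFlowDemo {
    public static void main(String[] args) {
        String result = MiniFlow.of(() -> "Hello")
            .map(str -> str + " ")
            .map(str -> str + "World")
            .map(String::toUpperCase)
            .execute();
        System.out.println(result); // prints HELLO WORLD
    }
}
```

Because each call only wraps the previous supplier, building the chain has no side effects; the steps run in order when `execute()` is invoked.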
            

Error Handling


Flow.of(() -> "data")
    .onError(error -> log.error("Flow failed", error))
    .onComplete(result -> log.info("Flow completed with: {}", result))
    .withRetry(3)
    .withBackoff(Duration.ofSeconds(1))
    .execute();
            

Parallel Processing


Flow.parallel(
    () -> "Task 1",
    () -> "Task 2",
    () -> "Task 3"
)
.withParallelism(3)
.execute();
            

Spring Boot Integration

Flow DSL can be used inside Spring Boot applications: flows are built in ordinary service methods from injected beans. Here's how to use it in your services:

Service Layer Integration


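@Service
@Slf4j
@RequiredArgsConstructor // Lombok: generates the constructor Spring uses to inject the final fields
public class OrderService {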
    private final PaymentService paymentService;
    private final InventoryService inventoryService;
    private final NotificationService notificationService;

    public OrderResult processOrder(Order order) {
        return Flow.of(() -> order)
            .map(this::validateOrder)
            .flatMap(validOrder -> 
                Flow.of(() -> paymentService.processPayment(validOrder))
                    .withRetry(3)
                    .withTimeout(Duration.ofSeconds(5))
                    .withCompensation(paymentService::rollbackPayment)
                    .onError(e -> log.error("Payment failed", e))
                    .onComplete(payment -> log.info("Payment processed: {}", payment)))
            .flatMap(payment -> 
                Flow.of(() -> inventoryService.reserveItems(order.getItems()))
                    .withRetry(2)
                    .withCompensation(inventoryService::releaseItems))
            .thenCompose(inventory -> 
                Flow.of(() -> notificationService.notifyCustomer(order))
                    .withRetry(3)
                    .async())
            .execute();
    }

    @Transactional
    public OrderResult processOrderWithTransaction(Order order) {
        return Flow.of(() -> order)
            .withContextData("transactionId", UUID.randomUUID().toString())
            .onEvent(event -> {
                if (event.getType() == FlowEvent.EventType.FLOW_ERROR) {
                    TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                }
            })
            .map(this::validateOrder)
            .flatMap(this::processPayment)
            .flatMap(this::updateInventory)
            .execute();
    }

    private Flow<PaymentResult> processPayment(Order order) {
        return Flow.of(() -> paymentService.processPayment(order))
            .withRetry(3)
            .withTimeout(Duration.ofSeconds(5))
            .withCompensation(paymentService::rollbackPayment);
    }

    private Flow<OrderResult> updateInventory(PaymentResult payment) {
        return Flow.of(() -> inventoryService.updateInventory(payment.getOrder()))
            .withRetry(2)
            .withCompensation(inventoryService::rollback);
    }
}
            

Advanced Examples

Context Sharing


Flow.of(() -> new FlowContext())
    .map(ctx -> {
        ctx.put("userId", "12345");
        return ctx;
    })
    .flatMap(ctx ->
        Flow.of(() -> userService.getUser(ctx.get("userId")))
            .withContext(ctx)
    )
    .execute();
            

Event Monitoring


Flow.of(() -> "process")
    .withEventListener(event -> {
        if (event.getType() == FlowEventType.STEP_START) {
            log.info("Starting step: {}", event.getStepName());
        }
    })
    .map(str -> str + " step 1")
    .map(str -> str + " step 2")
    .execute();
            

Conditional Flows


Flow.of(() -> order)
    .filter(Order::isValid)
    .branch(
        o -> o.getTotal() > 1000,  // parameter renamed so it does not shadow the outer `order`
        premium -> premium
            .map(OrderService::applyPremiumDiscount)
            .map(NotificationService::sendPremiumNotification),
        regular -> regular
            .map(OrderService::applyRegularDiscount)
    )
    .execute();
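
The branching idea can be reduced to a small combinator: a predicate chooses which of two functions receives the value. The sketch below shows that reduction with plain `java.util.function` types. `BranchSketch.branch` is a hypothetical helper for this example and operates on plain values, whereas the `branch` call above receives sub-flows.

```java
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical helper, shown only to illustrate predicate-based routing.
final class BranchSketch {
    private BranchSketch() {}

    // Builds one function that routes its input to exactly one of two branches.
    static <T, R> Function<T, R> branch(Predicate<? super T> condition,
                                        Function<? super T, ? extends R> whenTrue,
                                        Function<? super T, ? extends R> whenFalse) {
        return value -> condition.test(value)
            ? whenTrue.apply(value)
            : whenFalse.apply(value);
    }
}

class BranchDemo {
    public static void main(String[] args) {
        // Totals above 1000 take the premium branch, mirroring the threshold above.
        Function<Integer, String> tier = BranchSketch.branch(
            total -> total > 1000,
            total -> "premium",
            total -> "regular");
        System.out.println(tier.apply(1500)); // prints premium
        System.out.println(tier.apply(1000)); // prints regular
    }
}
```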
            

Circuit Breaker Pattern


Flow.of(() -> externalService.call())
    .withCircuitBreaker(CircuitBreakerConfig.builder()
        .failureThreshold(5)
        .resetTimeout(Duration.ofMinutes(1))
        .build())
    .withFallback(() -> "fallback-response")
    .execute();
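
To show what a failure threshold and a reset timeout mean in practice, here is a minimal circuit breaker sketch. It is an assumption-laden illustration: `CircuitBreakerSketch`, its constructor, and the injected clock are hypothetical, the class is not thread-safe, and it does not claim to match the behavior behind `withCircuitBreaker`.

```java
import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical minimal breaker; not thread-safe and not the library's implementation.
final class CircuitBreakerSketch {
    private final int failureThreshold;
    private final long resetTimeoutMillis;
    private final LongSupplier clock; // injected so callers and tests control time
    private int consecutiveFailures = 0;
    private long openedAt = -1; // -1 means the circuit is closed

    CircuitBreakerSketch(int failureThreshold, Duration resetTimeout, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.resetTimeoutMillis = resetTimeout.toMillis();
        this.clock = clock;
    }

    // Returns the action's result, or the fallback when the action fails
    // or the circuit is currently open.
    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (isOpen()) {
            return fallback.get(); // fail fast without calling the service
        }
        try {
            T result = action.get();
            consecutiveFailures = 0; // a success closes the circuit again
            openedAt = -1;
            return result;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            if (consecutiveFailures >= failureThreshold) {
                openedAt = clock.getAsLong(); // open (or re-open) the circuit
            }
            return fallback.get();
        }
    }

    // Open until the reset timeout has elapsed; after that one trial call goes through.
    private boolean isOpen() {
        return openedAt >= 0 && clock.getAsLong() - openedAt < resetTimeoutMillis;
    }
}

class CircuitBreakerDemo {
    public static void main(String[] args) {
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(
            2, Duration.ofMinutes(1), System::currentTimeMillis);
        Supplier<String> failingCall = () -> {
            throw new IllegalStateException("service down");
        };
        for (int i = 0; i < 3; i++) {
            // The third call is short-circuited: the breaker opened after two failures.
            System.out.println(breaker.call(failingCall, () -> "fallback-response"));
        }
    }
}
```

Injecting the clock as a `LongSupplier` is a design choice for testability; production code would more likely pass `System::currentTimeMillis` or a monotonic source.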
            

Async Composition


CompletableFuture<OrderResult> result = Flow.of(() -> order)
    .thenCompose(o ->  // parameter renamed so it does not shadow the outer `order`
        Flow.parallel(
            () -> paymentService.processAsync(order),
            () -> inventoryService.checkAsync(order),
            () -> fraudService.validateAsync(order)
        )
        .withParallelism(3)
        .async()
    )
    .thenApply(results -> OrderResult.combine(results))
    .executeAsync();
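
For comparison, the same fan-out and combine shape can be written directly with the JDK's `CompletableFuture`. The sketch below runs several tasks on an executor and completes with all results in task order. `AsyncFanOutSketch.runAll` is a hypothetical helper for this example; it uses only standard `java.util.concurrent` calls and is not how Flow DSL necessarily implements `Flow.parallel(...).async()`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical helper, shown only to illustrate async fan-out and combination.
final class AsyncFanOutSketch {
    private AsyncFanOutSketch() {}

    // Starts every task on the executor and completes with all results in task order.
    static <T> CompletableFuture<List<T>> runAll(List<Supplier<T>> tasks,
                                                 ExecutorService executor) {
        List<CompletableFuture<T>> futures = tasks.stream()
            .map(task -> CompletableFuture.supplyAsync(task, executor))
            .collect(Collectors.toList());
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            // allOf has completed at this point, so join() returns immediately.
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
}

class AsyncFanOutDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Supplier<String>> tasks = Arrays.asList(
                () -> "payment ok",
                () -> "inventory ok",
                () -> "fraud check ok");
            String summary = AsyncFanOutSketch.runAll(tasks, executor)
                .thenApply(results -> String.join(", ", results))
                .join();
            System.out.println(summary); // prints payment ok, inventory ok, fraud check ok
        } finally {
            executor.shutdown();
        }
    }
}
```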