Project: A Concurrent Task Processor
Throughout this tutorial you have built up a toolkit: thread pools, Callable and Future, CompletableFuture, concurrent collections, locks, and synchronizers. In this capstone project you will combine those pieces into a small but realistic application — a Concurrent Task Processor — that accepts a batch of work items, processes them in parallel using a bounded thread pool, aggregates the results, and reports progress as work completes.
The project has three layers:
- Task definition: a record that describes one unit of work.
- Processor: submits tasks to an ExecutorService, gathers CompletableFuture handles, and streams the finished results.
- Driver: wires everything together, feeds tasks in, and prints a final summary.
Step 1: Model the Work
A Java record makes the task description concise and immutable. Each WorkItem has an id and a simulated payload:
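A minimal sketch of such a record follows. The exact components are assumptions for illustration: a String payload plus a simulated processing time in milliseconds stand in for "a simulated payload". The compact constructor validates once, at creation, so every thread later sees an already-checked, immutable value.

```java
import java.util.Objects;

// Hypothetical shape: id, a text payload, and how long the simulated work should take.
record WorkItem(int id, String payload, long simulatedMillis) {
    // Compact canonical constructor: runs before the (final) fields are assigned.
    WorkItem {
        Objects.requireNonNull(payload, "payload");
        if (simulatedMillis < 0) {
            throw new IllegalArgumentException("simulatedMillis must be >= 0");
        }
    }
}
```

Two WorkItem instances with the same components are equal and have the same hash code, so items can safely be used as map keys or set members across threads.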
A record gives you equals, hashCode, toString, and final fields for free. Immutable data flowing through concurrent code is safer because there is no shared mutable state to protect.
Step 2: Build the Processor
The processor class owns a fixed-size thread pool and exposes a single method that accepts a list of work items and returns a list of CompletableFuture<String>. Using CompletableFuture instead of raw Future lets us chain callbacks without blocking the calling thread:
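A sketch of the processor under stated assumptions: the class name TaskProcessor, the submitAll method, the blank-payload failure rule, and the "FAILED item ..." sentinel format are all hypothetical. It implements AutoCloseable so it can live in a try-with-resources block, and it uses exceptionally so one failing item completes with a sentinel string instead of an exception.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Same hypothetical shape as the Step 1 sketch (validation omitted for brevity).
record WorkItem(int id, String payload, long simulatedMillis) {}

final class TaskProcessor implements AutoCloseable {
    private final ExecutorService pool;

    TaskProcessor(int poolSize) {
        // At most poolSize tasks run at once; the worker threads are virtual (Java 21+).
        this.pool = Executors.newFixedThreadPool(poolSize, Thread.ofVirtual().factory());
    }

    // Returns immediately with one future per item, in submission order.
    List<CompletableFuture<String>> submitAll(List<WorkItem> items) {
        return items.stream()
                .map(item -> CompletableFuture
                        .supplyAsync(() -> process(item), pool)
                        // Convert a failure into a sentinel so one bad item cannot sink the batch.
                        .exceptionally(ex -> "FAILED item " + item.id() + ": " + unwrap(ex).getMessage()))
                .toList();
    }

    // Simulated work: sleep for the requested time, reject blank payloads (assumed rule).
    private static String process(WorkItem item) {
        if (item.payload().isBlank()) {
            throw new IllegalArgumentException("empty payload");
        }
        try {
            Thread.sleep(item.simulatedMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return "item " + item.id() + " -> " + item.payload().toUpperCase();
    }

    // supplyAsync wraps a thrown exception in CompletionException; report the real cause.
    private static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    @Override
    public void close() {
        pool.shutdown(); // stop accepting work; already-submitted tasks still finish
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```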
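Thread.ofVirtual().factory() (Java 21+) creates a factory for virtual threads. Passing it to newFixedThreadPool makes the pool's worker threads virtual, but the pool still runs at most its fixed number of tasks at once: the virtual threads are reused workers, not one per task. For IO-heavy workloads, Executors.newVirtualThreadPerTaskExecutor() gives every task its own virtual thread, which scales far better than platform threads because the JVM unmounts a virtual thread from its carrier OS thread during most blocking calls. For pure CPU work, virtual threads bring no benefit; use a pool of platform threads sized to Runtime.getRuntime().availableProcessors().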
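The choice of executor can be made explicit in code. This is a minimal sketch assuming Java 21; WorkloadKind and PoolChoice are hypothetical names, not part of any library.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical classification of the work being submitted.
enum WorkloadKind { IO_BOUND, CPU_BOUND }

final class PoolChoice {
    static ExecutorService forWorkload(WorkloadKind kind) {
        return switch (kind) {
            // One new virtual thread per task; blocking calls usually unmount it from its carrier.
            case IO_BOUND -> Executors.newVirtualThreadPerTaskExecutor();
            // Platform threads, one per available core, so CPU work is not over-subscribed.
            case CPU_BOUND -> Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        };
    }
}
```

Both executors implement AutoCloseable on Java 19+, so either can be opened in a try-with-resources block.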
Step 3: Aggregate Results with a Concurrent Counter
As futures complete we want a live success/failure count. LongAdder is designed exactly for high-contention increments — faster than AtomicLong under heavy writes because it stripes the counter across cells:
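A small aggregator along these lines might look as follows; ResultAggregator and its method names are hypothetical. One caveat worth knowing: LongAdder.sum() is not an atomic snapshot while other threads are still incrementing, so read the totals for the final summary only after every future has completed.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical thread-safe tally of outcomes.
final class ResultAggregator {
    private final LongAdder succeeded = new LongAdder();
    private final LongAdder failed = new LongAdder();

    // Safe to call from many threads at once; contention is spread across internal cells.
    void recordSuccess() { succeeded.increment(); }
    void recordFailure() { failed.increment(); }

    // Exact once all writers have finished; approximate while they are still running.
    long succeeded() { return succeeded.sum(); }
    long failed() { return failed.sum(); }
}
```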
Step 4: Drive the Pipeline
The driver creates the work items, submits them, attaches a per-item callback to print progress immediately as each future finishes, then waits for the whole batch before printing the summary:
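Here is a compact, self-contained sketch of the driver, assuming Java 21. To run on its own it inlines a minimal processor and two LongAdder counters. The WorkItem shape, the blank-payload failure rule, and the sleep durations are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical payload shape, matching the earlier sketches.
record WorkItem(int id, String payload, long simulatedMillis) {}

final class Driver {
    static final String FAILED = "FAILED";

    // Simulated work: sleep, then fail on a blank payload (an assumed failure rule).
    static String process(WorkItem item) {
        try {
            Thread.sleep(item.simulatedMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        if (item.payload().isBlank()) {
            throw new IllegalArgumentException("empty payload");
        }
        return "item " + item.id() + " -> " + item.payload().toUpperCase();
    }

    // Runs one batch and returns {succeeded, failed}.
    static long[] run(List<WorkItem> items, int poolSize) {
        LongAdder succeeded = new LongAdder();
        LongAdder failed = new LongAdder();
        try (ExecutorService pool = Executors.newFixedThreadPool(poolSize, Thread.ofVirtual().factory())) {
            CompletableFuture<?>[] stages = items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> process(item), pool)
                            .exceptionally(ex -> FAILED + " item " + item.id())
                            // Fires as soon as this item finishes: completion order, not submission order.
                            .thenAccept(result -> {
                                if (result.startsWith(FAILED)) failed.increment(); else succeeded.increment();
                                System.out.println(result);
                            }))
                    .toArray(CompletableFuture<?>[]::new);
            // Barrier: returns once the slowest item has been reported.
            CompletableFuture.allOf(stages).join();
        } // ExecutorService.close() (Java 19+) waits for the pool to terminate
        return new long[] { succeeded.sum(), failed.sum() };
    }

    public static void main(String[] args) {
        List<WorkItem> items = List.of(
                new WorkItem(1, "alpha", 300),
                new WorkItem(2, "beta", 50),
                new WorkItem(3, " ", 10),      // blank payload: this one fails
                new WorkItem(4, "delta", 120));
        long[] totals = run(items, 4);
        System.out.printf("Summary: %d succeeded, %d failed%n", totals[0], totals[1]);
    }
}
```

Because the counters are read only after allOf(...).join() returns, the summary totals are exact.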
Why use thenAccept instead of looping over future.get()? Calling get() in a loop processes results in submission order: if item 1 takes 300 ms but item 2 finishes in 50 ms, you still wait the full 300 ms before you see item 2. thenAccept registers a callback that runs as soon as that particular future completes, so output arrives in completion order, not submission order. allOf(...).join() then blocks only until the slowest item has finished.
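The ordering claim can be checked deterministically, without timing. This sketch completes two futures by hand in reverse "submission" order. CompletionOrderDemo is a hypothetical name. Because complete() runs non-async dependents on the completing thread, the callbacks record completion order.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class CompletionOrderDemo {
    static List<String> observedOrder() {
        List<String> seen = new CopyOnWriteArrayList<>();
        CompletableFuture<String> first = new CompletableFuture<>();  // "submitted" first, slow
        CompletableFuture<String> second = new CompletableFuture<>(); // "submitted" second, fast

        // Callbacks are attached in submission order...
        CompletableFuture<Void> a = first.thenAccept(seen::add);
        CompletableFuture<Void> b = second.thenAccept(seen::add);

        // ...but they fire in completion order.
        second.complete("item 2");
        first.complete("item 1");

        CompletableFuture.allOf(a, b).join();
        return List.copyOf(seen);
    }

    public static void main(String[] args) {
        System.out.println(observedOrder()); // prints [item 2, item 1]
    }
}
```

A get() loop over the same two futures would instead block on first before it ever looked at second.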
Key Trade-offs in the Design
- Pool size vs. workload shape: For IO-heavy tasks (network, disk), virtual threads or a large pool beats a small fixed pool. For CPU-bound tasks, a pool of size availableProcessors() maximises throughput without over-subscribing the CPU.
- Error isolation: exceptionally converts an exception into a sentinel string so one bad item never cancels the batch. An alternative is handle(), which receives both the result and the exception and lets you return a richer error object.
- Back-pressure: If tasks are produced faster than they are consumed, the pool's queue grows without bound. For high-throughput systems, consider a ThreadPoolExecutor with a bounded ArrayBlockingQueue and a CallerRunsPolicy rejection handler. When the queue is full, the submitting thread runs the overflow task itself, which slows the producer naturally.
- Observability: In production you would replace System.out.println with structured logging and expose the aggregator counters via a metrics endpoint (Micrometer, Prometheus).
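The back-pressure option can be sketched as a small factory method. BoundedPools is a hypothetical name; the ThreadPoolExecutor constructor, ArrayBlockingQueue, and CallerRunsPolicy are standard java.util.concurrent APIs. Setting core size equal to max size keeps the worker count fixed, so the bounded queue is the only buffer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPools {
    // workers: fixed thread count (core == max, so the pool never grows).
    // queueCapacity: how many submitted-but-not-started tasks may wait.
    static ThreadPoolExecutor bounded(int workers, int queueCapacity) {
        return new ThreadPoolExecutor(
                workers, workers,
                0L, TimeUnit.MILLISECONDS,                // keep-alive is irrelevant when core == max
                new ArrayBlockingQueue<>(queueCapacity),  // bounded: offer() fails when full
                new ThreadPoolExecutor.CallerRunsPolicy() // on rejection, run the task on the submitter
        );
    }
}
```

Pass the pool as the executor argument to supplyAsync. Under overload, the rejected task then runs synchronously inside the supplyAsync call, and the producer cannot submit more until it returns. Note that CallerRunsPolicy silently discards the task if the executor has already been shut down.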
Use the processor only inside a try-with-resources block. Its AutoCloseable implementation shuts the pool down when the block exits, so if a reference escapes the block, later callers submit tasks to a shut-down executor and get RejectedExecutionException at runtime.
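The failure mode is easy to reproduce. In this sketch, MiniProcessor and LifecycleDemo are hypothetical, stripped-down stand-ins for the project's processor. The reference is declared outside the try-with-resources block (allowed since Java 9 for effectively final variables), so it outlives the pool behind it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical minimal processor: just enough to show the lifecycle.
final class MiniProcessor implements AutoCloseable {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    CompletableFuture<String> submit(String input) {
        return CompletableFuture.supplyAsync(input::toUpperCase, pool);
    }

    @Override
    public void close() {
        pool.shutdown(); // no new tasks accepted after this point
    }
}

final class LifecycleDemo {
    static String submitAfterClose() {
        MiniProcessor processor = new MiniProcessor();
        try (processor) {
            System.out.println(processor.submit("inside").join()); // prints INSIDE
        }
        // The reference escaped the block, but its pool is shut down.
        try {
            processor.submit("outside");
            return "accepted";
        } catch (RejectedExecutionException e) {
            return "rejected"; // thrown synchronously by supplyAsync
        }
    }
}
```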
Extending the Project
Once the baseline works, consider these exercises to deepen your understanding:
- Timeout per task: wrap each supplyAsync with .orTimeout(2, TimeUnit.SECONDS). Tasks that run too long complete exceptionally with TimeoutException.
- Priority queue: replace the list with a PriorityBlockingQueue<WorkItem> sorted by a priority field. Feed items from the queue into submitAll in batches.
- Phased processing: chain a second CompletableFuture stage after the first that persists the result to a database. Use thenApplyAsync with a second, smaller executor to avoid blocking the compute pool with IO.
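The timeout and phased-processing exercises fit together in one pipeline. This is a sketch under stated assumptions: PhasedPipeline, compute, and persist are hypothetical names, and persist only stands in for a database write. Note that orTimeout (Java 9+) only completes the future exceptionally. It does not interrupt or cancel the compute task, which keeps running on its pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PhasedPipeline {
    // Hypothetical stage 1: simulated work that takes `millis` milliseconds.
    static String compute(int id, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return "result-" + id;
    }

    // Hypothetical stage 2: stands in for a database write.
    static String persist(String result) {
        return "saved " + result;
    }

    static CompletableFuture<String> pipeline(int id, long millis, long timeoutMillis,
                                              ExecutorService computePool, ExecutorService ioPool) {
        return CompletableFuture.supplyAsync(() -> compute(id, millis), computePool)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)  // fail items that run too long
                .thenApplyAsync(PhasedPipeline::persist, ioPool)  // IO runs on its own, smaller pool
                .exceptionally(ex -> {
                    // Downstream stages see the failure wrapped in CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    return (cause instanceof TimeoutException ? "TIMEOUT item " : "FAILED item ") + id;
                });
    }
}
```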
Summary
This project brought together the core tools from the tutorial: an ExecutorService for pooled execution, CompletableFuture for non-blocking composition, exceptionally for fault isolation, LongAdder for high-performance aggregation, and allOf for barrier synchronisation. The key insight is that each tool has a job. Combine them by role, keep shared state minimal and well-typed, and let the functional pipeline carry data through the stages cleanly.