Java Executor Framework Tutorial

Introduction

The Java Executor Framework, introduced in Java 5 as part of the java.util.concurrent package, provides a powerful and flexible thread pool implementation for running asynchronous tasks. It separates task submission from task execution mechanics, making concurrent programming more manageable and efficient.

Core Components

The Executor Framework is built around several key interfaces and classes:

Main Interfaces

  • Executor: The base interface with a single execute(Runnable) method
  • ExecutorService: Extends Executor with more methods for managing tasks and lifecycle
  • ScheduledExecutorService: Adds scheduling capabilities for delayed and periodic tasks
  • Future: Represents the result of an asynchronous computation
  • Callable: Similar to Runnable, but can return a result and throw exceptions

Important Implementation Classes

  • Executors: Factory class with static methods to create different executor types
  • ThreadPoolExecutor: The primary implementation of ExecutorService
  • ScheduledThreadPoolExecutor: Implementation of ScheduledExecutorService
  • FutureTask: Implementation of Future that also implements Runnable
  • CompletableFuture: Enhanced Future with support for composition, chaining, and exception handling
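
To make the split between task submission and task execution concrete, here is a minimal sketch. The class name CoreInterfacesSketch and the helper computeVia are hypothetical names chosen for illustration; only Executor and FutureTask come from java.util.concurrent. The same task is handed to two trivial Executor implementations, and FutureTask serves as both the Runnable that is executed and the Future that is read.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

class CoreInterfacesSketch {
    // Hypothetical helper: runs one computation through any Executor and waits for its result
    static int computeVia(Executor executor) {
        // FutureTask is a Runnable (so execute() accepts it) and a Future (so the result can be read)
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        executor.execute(task);
        try {
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Runs every task synchronously on the calling thread
        Executor direct = Runnable::run;
        // Starts a new thread for every task
        Executor threadPerTask = r -> new Thread(r).start();

        System.out.println("Direct: " + computeVia(direct));
        System.out.println("Thread per task: " + computeVia(threadPerTask));
    }
}
```

The calling code is identical in both cases; only the Executor decides where and when the task runs.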

Basic Usage

Let’s start with a simple example:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BasicExecutorExample {
    public static void main(String[] args) {
        // Create a fixed thread pool with 3 threads
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // Submit tasks to the executor
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.execute(() -> {
                System.out.println("Task " + taskId + " is running on " + 
                                  Thread.currentThread().getName());
                try {
                    // Simulate some work
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Task " + taskId + " completed");
            });
        }
        
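        // Stop accepting new tasks; tasks already submitted still run to completion
        executor.shutdown();
        
        // shutdown() does not block: main returns here while the pool's
        // non-daemon threads finish the remaining tasks
        System.out.println("All tasks submitted. Waiting for completion...");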
    }
}

This example creates a fixed thread pool with three threads and submits ten tasks to it. The thread pool will reuse the three threads to execute all tasks.

Thread Pool Types

The Executors factory class provides several static methods to create different types of thread pools:

1. Fixed Thread Pool

ExecutorService executor = Executors.newFixedThreadPool(nThreads);
  • Creates a thread pool with a fixed number of threads
  • If all threads are active when a new task is submitted, it will wait in a queue
  • Good for limiting resource usage and when you have a known number of tasks

2. Cached Thread Pool

ExecutorService executor = Executors.newCachedThreadPool();
  • Creates an expandable thread pool
  • Creates new threads as needed and reuses idle threads
  • Threads that remain idle for 60 seconds are terminated
  • Good for many short-lived tasks
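
The following sketch illustrates how a cached pool grows on demand. The class and method names are made up for this example. Every task waits until all tasks have started, so no thread can be reused and the pool has to create one thread per task. The same code on a fixed pool smaller than the task count would stall until the timeout.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CachedPoolGrowthSketch {
    // Returns how many distinct worker threads were needed to run taskCount overlapping tasks
    static int distinctThreadsFor(int taskCount) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch allStarted = new CountDownLatch(taskCount);

        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                allStarted.countDown();
                try {
                    // Hold this thread until every task has started, so none can be reused
                    allStarted.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("Threads used: " + distinctThreadsFor(5));
    }
}
```

Because the pool has no upper bound, a burst of long-running tasks can create a very large number of threads, which is why this pool type suits short-lived tasks.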

3. Single Thread Executor

ExecutorService executor = Executors.newSingleThreadExecutor();
  • Creates an executor with a single worker thread
  • Tasks are guaranteed to execute sequentially
  • Good for tasks that must be executed in a specific order
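
As a small illustration of the ordering guarantee, the sketch below (class and method names are hypothetical) records the order in which tasks actually run on a single-thread executor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderSketch {
    // Submits taskCount tasks and returns the order in which they ran
    static List<Integer> runInOrder(int taskCount) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> executionOrder = new CopyOnWriteArrayList<>();

        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            executor.execute(() -> executionOrder.add(taskId));
        }

        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executionOrder;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```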

4. Scheduled Thread Pool

ScheduledExecutorService executor = Executors.newScheduledThreadPool(corePoolSize);
  • Creates a thread pool that can schedule tasks to run after a delay or periodically
  • Good for tasks that need to run on a schedule

5. Work Stealing Pool (Java 8+)

ExecutorService executor = Executors.newWorkStealingPool(parallelism);
  • Creates a thread pool using the work-stealing algorithm
  • Automatically adjusts the number of threads to maintain parallelism level
  • Good for tasks that split into smaller subtasks
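
A work-stealing pool is often fed a batch of small, independent tasks. The sketch below is one way to do that with invokeAll(); the class name and the sum-of-squares workload are illustrative assumptions, not part of the original tutorial. Note that a work-stealing pool makes no guarantee about the order in which tasks execute, although invokeAll() returns the futures in submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WorkStealingSketch {
    // Computes 1^2 + 2^2 + ... + n^2 with one small task per term
    static int sumOfSquares(int n) {
        // Parallelism defaults to the number of available processors
        ExecutorService pool = Executors.newWorkStealingPool();
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            final int value = i;
            tasks.add(() -> value * value);
        }
        try {
            int sum = 0;
            // invokeAll() blocks until every task has finished
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                sum += result.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```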

Working with Callable and Future

While Runnable is useful for tasks that don’t return results, Callable allows you to execute tasks that return values and throw checked exceptions.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CallableFutureExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        List<Future<Integer>> futures = new ArrayList<>();
        
        // Submit Callable tasks
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            Future<Integer> future = executor.submit(() -> {
                System.out.println("Computing task " + taskId);
                // Simulate work
                Thread.sleep(1000);
                return taskId * 100;
            });
            futures.add(future);
        }
        
        // Retrieve results from futures
        for (Future<Integer> future : futures) {
            try {
                // get() blocks until the result is available
                Integer result = future.get();
                System.out.println("Result: " + result);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        
        executor.shutdown();
    }
}

Key Future Methods

  • get(): Blocks until the task completes
  • get(long timeout, TimeUnit unit): Blocks with a timeout
  • isDone(): Checks if the task is complete
  • cancel(boolean mayInterruptIfRunning): Attempts to cancel execution
  • isCancelled(): Checks if the task was cancelled
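
The timeout and cancellation methods are typically used together. The following sketch (hypothetical class and method names, arbitrary durations) waits briefly for a slow task, then cancels it and inspects the resulting state.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {
    static String describeSlowTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            Thread.sleep(10_000); // far longer than the caller is willing to wait
            return "done";
        });
        try {
            // Wait at most 200 ms for the result
            return future.get(200, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up and interrupt the worker thread
            future.cancel(true);
            return "timed out, cancelled=" + future.isCancelled() + ", done=" + future.isDone();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeSlowTask());
    }
}
```

A cancelled future also reports isDone() as true, since "done" covers normal completion, failure, and cancellation alike.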

CompletableFuture API

Java 8 introduced CompletableFuture, which enhances the Future interface with support for composing asynchronous operations:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // Create a CompletableFuture
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Hello";
        }, executor);
        
        // Chain transformations
        CompletableFuture<String> transformedFuture = future
            .thenApply(s -> s + " World")
            .thenApply(String::toUpperCase);
        
        // Add a callback, then block until the whole chain (including the callback) has finished
        transformedFuture
            .thenAccept(result -> System.out.println("Result: " + result))
            .join();
        
        executor.shutdown();
    }
}

Key CompletableFuture Features

  • Chaining operations: thenApply(), thenCompose(), thenCombine()
  • Error handling: exceptionally(), handle()
  • Callbacks: thenAccept(), thenRun()
  • Combining futures: allOf(), anyOf()
  • Timeouts: completeOnTimeout(), orTimeout() (both added in Java 9)
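
A few of these features are sketched below with toy values. The class name, method names, and numbers are assumptions made for illustration. The futures run on the default common pool because no executor is passed.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureFeaturesSketch {
    // thenCombine: merge the results of two independent futures
    static String combined() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> tax = CompletableFuture.supplyAsync(() -> 2);
        return price.thenCombine(tax, Integer::sum)
                    .thenApply(total -> "Total: " + total)
                    .join();
    }

    // thenCompose: chain a step that itself returns a CompletableFuture
    static int composed() {
        return CompletableFuture.supplyAsync(() -> 20)
                .thenCompose(n -> CompletableFuture.supplyAsync(() -> n + 1))
                .join();
    }

    // exceptionally: replace a failure with a fallback value
    static String recovered() {
        return CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .exceptionally(ex -> "fallback")
                .join();
    }

    // handle: see either the value or the exception in one callback
    static String handled() {
        return CompletableFuture.supplyAsync(() -> "ok")
                .handle((value, ex) -> ex == null ? value.toUpperCase() : "failed")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(combined());
        System.out.println(composed());
        System.out.println(recovered());
        System.out.println(handled());
    }
}
```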

Scheduled Tasks

The ScheduledExecutorService interface allows you to schedule tasks to run after a delay or periodically:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledTasksExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        
        // Schedule a task to run after a delay
        scheduler.schedule(() -> 
            System.out.println("Delayed task executed"), 
            2, TimeUnit.SECONDS);
        
        // Schedule a task to run periodically
        scheduler.scheduleAtFixedRate(() -> 
            System.out.println("Fixed rate task - " + System.currentTimeMillis()/1000), 
            1, 2, TimeUnit.SECONDS);
        
        // Schedule with fixed delay between tasks
        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("Fixed delay task - " + System.currentTimeMillis()/1000);
            try {
                Thread.sleep(1000); // Simulate work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0, 2, TimeUnit.SECONDS);
        
        // Let the tasks run for a while
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        scheduler.shutdown();
    }
}

Key Scheduling Methods

  • schedule(Runnable, delay, unit): Executes once after the delay
  • schedule(Callable, delay, unit): Executes and returns a result after the delay
  • scheduleAtFixedRate(Runnable, initialDelay, period, unit): Executes periodically at a fixed rate
  • scheduleWithFixedDelay(Runnable, initialDelay, delay, unit): Executes with a fixed delay between tasks

The key difference between scheduleAtFixedRate and scheduleWithFixedDelay:

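  • scheduleAtFixedRate: Schedules start times at initialDelay, initialDelay + period, initialDelay + 2 * period, and so on. If a run takes longer than the period, later runs start late but never overlap
  • scheduleWithFixedDelay: Maintains a delay period between the end of one execution and the start of the next, so the interval between starts is the task's run time plus the delay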
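
The difference can be measured. The sketch below is a rough experiment with hypothetical names and arbitrary timings (a task of about 100 ms, with a period or delay of 150 ms). It records when each run starts and reports the average gap between starts. Under these assumptions the fixed-rate gap should be close to 150 ms and the fixed-delay gap close to 250 ms, though exact numbers vary with the machine and load.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class RateVsDelaySketch {
    // Average gap in milliseconds between consecutive start times over the first `runs` executions
    static long averageStartGapMillis(boolean fixedRate, int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(runs);

        Runnable task = () -> {
            starts.add(System.nanoTime());
            try {
                Thread.sleep(100); // each run takes about 100 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finished.countDown();
        };

        if (fixedRate) {
            scheduler.scheduleAtFixedRate(task, 0, 150, TimeUnit.MILLISECONDS);
        } else {
            scheduler.scheduleWithFixedDelay(task, 0, 150, TimeUnit.MILLISECONDS);
        }

        try {
            finished.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();

        // Only the first `runs` start times are used; a later run may have begun before shutdown
        long elapsedNanos = starts.get(runs - 1) - starts.get(0);
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos / (runs - 1));
    }

    public static void main(String[] args) {
        System.out.println("Fixed rate gap (ms):  " + averageStartGapMillis(true, 4));
        System.out.println("Fixed delay gap (ms): " + averageStartGapMillis(false, 4));
    }
}
```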

Thread Pool Configuration

When using ThreadPoolExecutor directly, you have fine-grained control over thread pool behavior:

import java.util.concurrent.*;

public class ThreadPoolConfigExample {
    public static void main(String[] args) {
        // Create a custom thread pool
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,                      // corePoolSize
            5,                      // maximumPoolSize
            60, TimeUnit.SECONDS,   // keepAliveTime
            new LinkedBlockingQueue<>(10),  // workQueue
            new CustomThreadFactory(),      // threadFactory
            new CustomRejectionHandler()    // rejectionHandler
        );
        
        // Set additional properties
        executor.allowCoreThreadTimeOut(true);
        
        // Submit tasks
        for (int i = 0; i < 15; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " running on " + 
                                  Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return taskId;
            });
        }
        
        // Shutdown after tasks complete
        executor.shutdown();
    }
    
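    // Custom thread factory
    static class CustomThreadFactory implements ThreadFactory {
        // newThread() can be called concurrently when several threads submit tasks,
        // so the counter is atomic
        private final java.util.concurrent.atomic.AtomicInteger counter =
            new java.util.concurrent.atomic.AtomicInteger();
        
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "CustomWorker-" + counter.getAndIncrement());
            thread.setPriority(Thread.NORM_PRIORITY);
            thread.setDaemon(false);
            return thread;
        }
    }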
    
    // Custom rejection handler
    static class CustomRejectionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("Task rejected: " + r.toString());
            // Implement custom logic for rejected tasks
            // e.g., save to disk, retry later, etc.
        }
    }
}

Key Configuration Parameters

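  • corePoolSize: The number of threads kept in the pool even when idle (unless allowCoreThreadTimeOut(true) is set)
  • maximumPoolSize: The maximum number of threads allowed; threads beyond corePoolSize are created only when the work queue is full
  • keepAliveTime: How long idle threads beyond corePoolSize wait for new tasks before terminating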
  • workQueue: The queue used for holding tasks before execution
  • threadFactory: Factory for creating new threads
  • rejectionHandler: Handler for tasks that cannot be executed
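
How these parameters interact is easiest to see with a deliberately tiny pool. In the sketch below (hypothetical names; sizes chosen only for the demonstration) the pool has one core thread, a maximum of two threads, and a queue of capacity one. Four tasks that block on a latch are submitted: the first starts the core thread, the second is queued, the third forces a second thread because the queue is full, and the fourth is rejected by the default AbortPolicy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthSketch {
    // Returns {poolSize, queueSize, rejectedCount} observed after submitting four blocking tasks
    static int[] observeGrowth() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1,                            // corePoolSize
            2,                            // maximumPoolSize
            30, TimeUnit.SECONDS,         // keepAliveTime
            new ArrayBlockingQueue<>(1)); // workQueue with room for one task

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the snapshot is taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }

        int[] snapshot = { pool.getPoolSize(), pool.getQueue().size(), rejected };
        release.countDown();
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = observeGrowth();
        System.out.println("poolSize=" + s[0] + " queued=" + s[1] + " rejected=" + s[2]);
        // prints poolSize=2 queued=1 rejected=1
    }
}
```

The practical consequence is that with an unbounded queue the pool never grows past corePoolSize, because the queue never fills.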

Queue Types

  • LinkedBlockingQueue: Optionally bounded queue; unbounded when created without a capacity (as newFixedThreadPool does)
  • ArrayBlockingQueue: Bounded queue with fixed capacity
  • SynchronousQueue: No capacity, hands off each task directly to a waiting thread (used by newCachedThreadPool)
  • PriorityBlockingQueue: Priority-based queue

Rejection Policies

  • AbortPolicy: Throws RejectedExecutionException (default)
  • CallerRunsPolicy: Runs the task in the caller’s thread
  • DiscardPolicy: Silently discards the task
  • DiscardOldestPolicy: Discards the oldest unexecuted task and tries again
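
As an illustration of one of these policies, the sketch below (hypothetical names) saturates a one-thread pool that has no queue capacity and shows that CallerRunsPolicy executes the overflow task on the submitting thread. This slows the submitter down, which acts as a simple form of back-pressure.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsSketch {
    // Returns the name of the thread that ran the task the pool could not accept
    static String whoRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        // Occupy the only worker thread
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        AtomicReference<String> runner = new AtomicReference<>();
        // The worker is busy and the queue cannot hold tasks, so the pool rejects this task
        // and CallerRunsPolicy runs it synchronously on the submitting thread
        pool.execute(() -> runner.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        return runner.get();
    }

    public static void main(String[] args) {
        System.out.println("Overflow task ran on: " + whoRanOverflowTask());
    }
}
```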

Exception Handling

Exception handling in thread pools requires special attention:

import java.util.concurrent.*;

public class ExceptionHandlingExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        // Handling exceptions with Future
        Future<Integer> future = executor.submit(() -> {
            throw new RuntimeException("Deliberate exception");
        });
        
        try {
            Integer result = future.get(); // This will throw ExecutionException
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("Task interrupted: " + e.getMessage());
        } catch (ExecutionException e) {
            System.out.println("Task threw an exception: " + e.getCause().getMessage());
        }
        
        // Using try-catch within the task
        executor.execute(() -> {
            try {
                // Task code that might throw an exception
                int x = 1 / 0; // Will throw ArithmeticException
            } catch (Exception e) {
                System.out.println("Caught exception in task: " + e.getMessage());
            }
        });
        
        // Using a custom UncaughtExceptionHandler
        ThreadFactory threadFactory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, throwable) -> 
                System.out.println("Uncaught exception in thread " + 
                                   thread.getName() + ": " + throwable.getMessage()));
            return t;
        };
        
        ExecutorService customExecutor = Executors.newFixedThreadPool(2, threadFactory);
        
        // With execute(), this exception escapes the task, terminates the worker thread, and IS
        // passed to the UncaughtExceptionHandler. With submit(), it would be stored in the Future instead.
        customExecutor.execute(() -> {
            throw new RuntimeException("Exception in task");
        });
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        executor.shutdown();
        customExecutor.shutdown();
    }
}

Best Practices for Exception Handling

  1. Always catch and handle exceptions within tasks when possible
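  2. Use Future.get() to observe exceptions from tasks submitted with submit(); they arrive wrapped in ExecutionException
  3. Consider using CompletableFuture’s exception handling methods like exceptionally() and handle()
  4. Set an UncaughtExceptionHandler for exceptions that escape tasks started with execute(); it never sees exceptions from submit(), which are stored in the Future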
  5. Log exceptions properly to aid debugging
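
The difference between execute() and submit() is easy to get wrong, so here is a small experiment. The class and method names are hypothetical. The same failing task is sent through both methods of a pool whose threads carry an UncaughtExceptionHandler, and the sketch counts how often the handler fires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecuteVsSubmitSketch {
    static String compare() {
        AtomicInteger handlerCalls = new AtomicInteger();
        CountDownLatch handlerRan = new CountDownLatch(1);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> {
                handlerCalls.incrementAndGet();
                handlerRan.countDown();
            });
            return t;
        };
        ExecutorService pool = Executors.newFixedThreadPool(1, factory);
        Runnable failing = () -> { throw new IllegalStateException("boom"); };

        try {
            // submit(): the exception is stored in the Future and rethrown by get()
            Future<?> future = pool.submit(failing);
            boolean capturedInFuture = false;
            try {
                future.get();
            } catch (ExecutionException e) {
                capturedInFuture = true;
            }
            int callsAfterSubmit = handlerCalls.get();

            // execute(): the exception escapes the worker thread and reaches the handler
            pool.execute(failing);
            handlerRan.await(5, TimeUnit.SECONDS);

            return "submit: inFuture=" + capturedInFuture + " handlerCalls=" + callsAfterSubmit
                + "; execute: handlerCalls=" + handlerCalls.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(compare());
    }
}
```

If the Future returned by submit() is discarded, the exception is lost silently, which is a common source of tasks that appear to do nothing.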

Graceful Shutdown

Proper shutdown of thread pools is essential to avoid resource leaks:

import java.util.concurrent.*;

public class GracefulShutdownExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        
        // Submit some tasks
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    System.out.println("Starting task " + taskId);
                    Thread.sleep(taskId * 500); // Variable execution time
                    System.out.println("Completed task " + taskId);
                    return taskId;
                } catch (InterruptedException e) {
                    System.out.println("Task " + taskId + " interrupted");
                    Thread.currentThread().interrupt();
                    return -1;
                }
            });
        }
        
        // Initiate graceful shutdown
        executor.shutdown();
        
        try {
            // Wait for tasks to complete, with a timeout
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                System.out.println("Some tasks didn't finish, forcing shutdown...");
                
                // Cancel currently executing tasks
                executor.shutdownNow();
                
                // Wait again for tasks to respond to interruption
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    System.out.println("Thread pool did not terminate");
                }
            } else {
                System.out.println("All tasks completed successfully");
            }
        } catch (InterruptedException e) {
            // (Re-)Cancel if current thread also interrupted
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

Shutdown Process

  1. shutdown(): Stops accepting new tasks, but allows existing tasks to complete
  2. awaitTermination(): Blocks until all tasks complete or the timeout occurs
  3. shutdownNow(): Attempts to stop all actively executing tasks and returns pending tasks
  4. Use isTerminated() to check if all tasks have completed following shutdown
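
To show what shutdownNow() hands back, the sketch below (hypothetical names) keeps a single worker busy, queues two more tasks behind it, and then shuts down abruptly. The running task is interrupted, and the two queued tasks are returned without ever starting.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownNowSketch {
    // Returns the number of queued tasks that shutdownNow() hands back
    static int pendingAfterShutdownNow() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);

        // Long-running task that occupies the only worker thread
        executor.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the sleep; restore the flag and exit
                Thread.currentThread().interrupt();
            }
        });
        // These two wait in the queue and never get to run
        executor.execute(() -> System.out.println("queued task 1"));
        executor.execute(() -> System.out.println("queued task 2"));

        try {
            started.await();
            List<Runnable> pending = executor.shutdownNow();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return pending.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Pending tasks returned: " + pendingAfterShutdownNow()); // prints 2
    }
}
```

Tasks that ignore interruption keep running after shutdownNow(), which is why the running task above reacts to InterruptedException.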

Best Practices

  1. Choose the right pool type based on your workload characteristics
  2. Size your thread pools appropriately
    • generally near the number of available processors for CPU-bound tasks, higher for I/O-bound tasks
  3. Avoid blocking operations in tasks when possible
  4. Use time limits with Future.get() to avoid indefinite blocking
  5. Always shut down your executors when they’re no longer needed
  6. Monitor thread pool metrics in production environments
  7. Avoid blocking inside a task on the result of another task submitted to the same bounded pool (can lead to thread starvation deadlock)
  8. Use meaningful thread names for easier debugging
  9. Consider task dependencies and use CompletableFuture for complex flows
  10. Handle exceptions properly in all tasks
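
Two of these practices, meaningful thread names and pool monitoring, can be sketched with ThreadPoolExecutor's built-in getters. The class name, the "report-worker-" prefix, and the pool sizes are assumptions for illustration. A production setup would more likely export these values to a metrics system than print them.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolMetricsSketch {
    // Runs taskCount trivial tasks and returns the pool's completed-task counter
    static long completedAfter(int taskCount) {
        AtomicInteger sequence = new AtomicInteger();
        // Named threads make thread dumps and log lines easier to read
        ThreadFactory named = r -> new Thread(r, "report-worker-" + sequence.incrementAndGet());

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), named);

        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> { /* real work would go here */ });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // A few of the metrics ThreadPoolExecutor exposes
        System.out.println("largestPoolSize=" + pool.getLargestPoolSize()
            + " activeCount=" + pool.getActiveCount()
            + " queued=" + pool.getQueue().size()
            + " completed=" + pool.getCompletedTaskCount());
        return pool.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        completedAfter(5);
    }
}
```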

Real-World Examples

Web Crawler

import java.util.*;
import java.util.concurrent.*;
import java.net.URL;

public class WebCrawlerExample {
    private final ConcurrentHashMap<String, Boolean> visitedUrls = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    private final int maxDepth = 3;
    
    public void crawl(String startUrl, int depth) {
        // Only the calling thread blocks; pool threads never wait on other pool tasks
        crawlAsync(startUrl, depth).join();
    }
    
    private CompletableFuture<Void> crawlAsync(String url, int depth) {
        if (depth > maxDepth || visitedUrls.putIfAbsent(url, true) != null) {
            return CompletableFuture.completedFuture(null);
        }
        
        return CompletableFuture
            .supplyAsync(() -> {
                // Download and parse the page (simplified)
                System.out.println("Crawling: " + url + " at depth " + depth);
                
                // Simulate getting links from the page
                return getLinksFromPage(url);
            }, executor)
            // Compose the child crawls instead of calling join() inside a pool thread:
            // with a fixed pool, workers blocked on their children can use up every
            // thread and deadlock the crawl
            .thenCompose(links -> CompletableFuture.allOf(
                links.stream()
                     .map(link -> crawlAsync(link, depth + 1))
                     .toArray(CompletableFuture[]::new)))
            .exceptionally(e -> {
                System.out.println("Error crawling " + url + ": " + e.getMessage());
                return null;
            });
    }
    
    private List<String> getLinksFromPage(String url) {
        // This would actually parse HTML and extract links
        // Simplified for the example
        try {
            Thread.sleep(100); // Simulate network delay
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        // Return fake links
        return Arrays.asList(
            url + "/page1",
            url + "/page2",
            url + "/page3"
        );
    }
    
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    public static void main(String[] args) {
        WebCrawlerExample crawler = new WebCrawlerExample();
        crawler.crawl("https://example.com", 0);
        crawler.shutdown();
    }
}

Image Processing Pipeline

import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
import java.io.File;

public class ImageProcessingExample {
    private final ExecutorService downloadExecutor = Executors.newFixedThreadPool(5);
    private final ExecutorService processingExecutor = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors());
    private final ExecutorService savingExecutor = Executors.newFixedThreadPool(3);
    
    public void processImages(List<String> imageUrls) {
        List<CompletableFuture<Void>> pipelines = new ArrayList<>();
        
        for (String url : imageUrls) {
            // Create a processing pipeline for each image
            CompletableFuture<byte[]> downloadFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("Downloading image: " + url);
                return downloadImage(url);
            }, downloadExecutor);
            
            CompletableFuture<byte[]> processingFuture = downloadFuture.thenApplyAsync(imageData -> {
                System.out.println("Processing image: " + url);
                return applyFilters(imageData);
            }, processingExecutor);
            
            CompletableFuture<Void> savingFuture = processingFuture.thenAcceptAsync(processedData -> {
                System.out.println("Saving image: " + url);
                saveImage(url, processedData);
            }, savingExecutor);
            
            // Add error handling
            CompletableFuture<Void> pipeline = savingFuture.exceptionally(ex -> {
                System.err.println("Error processing image " + url + ": " + ex.getMessage());
                return null;
            });
            
            pipelines.add(pipeline);
        }
        
        // Wait for all pipelines to complete
        CompletableFuture.allOf(pipelines.toArray(new CompletableFuture[0])).join();
    }
    
    private byte[] downloadImage(String url) {
        // Simulate downloading an image
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new byte[1024]; // Dummy data
    }
    
    private byte[] applyFilters(byte[] imageData) {
        // Simulate image processing
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return imageData; // Return processed data
    }
    
    private void saveImage(String url, byte[] imageData) {
        // Simulate saving the image
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        String filename = url.substring(url.lastIndexOf('/') + 1);
        System.out.println("Saved image to: " + filename);
    }
    
    public void shutdown() {
        downloadExecutor.shutdown();
        processingExecutor.shutdown();
        savingExecutor.shutdown();
    }
    
    public static void main(String[] args) {
        List<String> imageUrls = List.of(
            "https://example.com/image1.jpg",
            "https://example.com/image2.jpg",
            "https://example.com/image3.jpg",
            "https://example.com/image4.jpg",
            "https://example.com/image5.jpg"
        );
        
        ImageProcessingExample processor = new ImageProcessingExample();
        processor.processImages(imageUrls);
        processor.shutdown();
    }
}

Task Prioritization Example

import java.util.concurrent.*;

public class PriorityTaskExample {
    // Define task priority levels
    public enum TaskPriority {
        HIGH(1),
        MEDIUM(2),
        LOW(3);
        
        private final int value;
        
        TaskPriority(int value) {
            this.value = value;
        }
        
        public int getValue() {
            return value;
        }
    }
    
    // Create a task class that implements comparable
    static class PriorityTask implements Runnable, Comparable<PriorityTask> {
        private final String name;
        private final TaskPriority priority;
        
        public PriorityTask(String name, TaskPriority priority) {
            this.name = name;
            this.priority = priority;
        }
        
        @Override
        public void run() {
            System.out.println("Executing " + priority + " priority task: " + name + 
                              " on thread " + Thread.currentThread().getName());
            try {
                // Simulate work
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        @Override
        public int compareTo(PriorityTask other) {
            // Higher priority (lower value) comes first
            return Integer.compare(this.priority.getValue(), other.priority.getValue());
        }
    }
    
    public static void main(String[] args) {
        // Create thread pool with priority queue
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 2, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<>());
        
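        // Submit tasks with different priorities. The first two start immediately on the
        // two idle threads; only the tasks that wait in the queue are ordered by priority.
        // Use execute() here: submit() would wrap each task in a FutureTask, which is not
        // Comparable, so queuing it would fail with ClassCastException.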
        executor.execute(new PriorityTask("Task 1", TaskPriority.LOW));
        executor.execute(new PriorityTask("Task 2", TaskPriority.HIGH));
        executor.execute(new PriorityTask("Task 3", TaskPriority.MEDIUM));
        executor.execute(new PriorityTask("Task 4", TaskPriority.HIGH));
        executor.execute(new PriorityTask("Task 5", TaskPriority.LOW));
        executor.execute(new PriorityTask("Task 6", TaskPriority.MEDIUM));
        
        // Wait for all tasks to complete
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Conclusion

The Java Executor Framework provides a robust and flexible solution for managing concurrent tasks and thread pools. By separating task submission from execution mechanics, it enables cleaner, more maintainable code while offering powerful features like task scheduling, result handling, and composition of asynchronous operations.

When designing concurrent applications, carefully consider your workload characteristics to choose the appropriate thread pool type and configuration. Remember to follow best practices for exception handling and proper shutdown to ensure your applications run reliably.

With the introduction of CompletableFuture in Java 8 and subsequent enhancements, the Executor Framework has become even more powerful for designing complex asynchronous workflows with dependencies, transformations, and robust error handling.
