ExecutorService появился в Java 1.5 и предназначен для управления пулом потоков.
Есть несколько стандартных реализаций, которые можно получить из простой фабрики Executors:
- newFixedThreadPool
- newWorkStealingPool
- newSingleThreadExecutor
- newCachedThreadPool
- newSingleThreadScheduledExecutor
- newScheduledThreadPool
Догадаться об особенностях реализаций сервиса можно исходя из названий методов. В статье рассмотрен пример использования FixedThreadPool.
Для запуска приложения будем использовать Spring Boot:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
group 'javagrinko' version '1.0-SNAPSHOT' apply plugin: 'java' sourceCompatibility = 1.8 repositories { mavenCentral() } dependencies { compile "org.springframework.boot:spring-boot-starter:1.4.0.RELEASE" compile group: 'com.google.guava', name: 'guava', version: '19.0' testCompile group: 'junit', name: 'junit', version: '4.11' } |
Main.java
1 2 3 4 5 6 |
@SpringBootApplication public class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); } } |
Предположим, что клиентскому классу необходимо выполнить много работы, которую можно разделить на несколько независимых подзадач. Например, необходимо получить html-страницы по списку URL-адресов или обработать большой список строк:
1 2 3 |
ArrayList<string> works = Lists.newArrayList( "work1", "work2", "work3", "work4", "work5", "work6", "work7", "work8", "work9", "work10"); |
Работа будет заключаться в приведении строк к верхнему регистру
1 2 3 |
String doWork(String work){ work.toUpperCase() } |
Сохраним список работ, который необходимо провести, в виде List
1 2 3 |
List<callable<string>> taskList = works.stream() .map((work) -> (Callable<string>) () -> workService.doWork(work)) .collect(Collectors.toList()); |
Каждый элемент списка – это элементарная работа, реализация функционального интерфейса, которую необходимо выполнить запустив метод call. Но вместо ручного вызова мы создадим сервис, который будет управлять пулом потоков:
1 |
ExecutorService executorService = Executors.newFixedThreadPool(4); |
В параметре метода newFixedThreadPool мы указали значение 4, что задает размер будущему пулу потоков.
workService – это сервис Spring, который делает работу и выводит некоторую информацию для отладки:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public interface WorkService { String doWork(String work); } @Service public class WorkServiceImpl implements WorkService { @Override public String doWork(String work) { try { Thread.sleep(1000); String threadName = Thread.currentThread().getName(); System.out.println(work + " " + threadName); } catch (Exception e) { e.printStackTrace(); return null; } return work.toUpperCase(); } } |
Для наглядности добавим паузу при выполнении задачи в 1 секунду. Если задача будет выполняться слишком быстро, то executorService может не выделять дополнительные потоки для обработки и выполнит все задачи в одном потоке.
Запускаем выполнение списка задач методом executorService.invokeAll(), который возвращает список Future объектов. Сразу смапим все Future в интересующие нас String и выведем на экран:
1 2 3 4 5 6 7 8 9 10 |
executorService.invokeAll(taskList).stream() .map(f -> { try { return f.get(); } catch (Exception e) { e.printStackTrace(); } return null; }) .forEach(r -> System.out.println("Work result = " + r)); |
Получившийся код класса клиента:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
package javagrinko.client; import com.google.common.collect.Lists; import javagrinko.thread.WorkService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.StopWatch; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; @Component public class ClientImpl implements Client { public static final int N_THREADS = 4; @Autowired private WorkService workService; @PostConstruct private void setUp() { doManyTasks(); } @Override public void doManyTasks() { StopWatch stopWatch = new StopWatch(); stopWatch.start(); ArrayList<string> works = Lists.newArrayList( "work1", "work2", "work3", "work4", "work5", "work6", "work7", "work8", "work9", "work10");</string> List<callable<string>> taskList = works.stream() .map((work) -> (Callable<string>) () -> workService.doWork(work)) .collect(Collectors.toList());</string></callable<string> ExecutorService executorService = Executors.newFixedThreadPool(N_THREADS); try { executorService.invokeAll(taskList).stream() .map(f -> { try { return f.get(); } catch (Exception e) { e.printStackTrace(); } return null; }) .forEach(r -> System.out.println("Work result = " + r)); } catch (InterruptedException e) { e.printStackTrace(); } stopWatch.stop(); long lastTaskTimeMillis = stopWatch.getLastTaskTimeMillis(); System.out.println("All work takes " + lastTaskTimeMillis / 1000. + " sec."); executorService.shutdown(); } } |
Запустим пример и посмотрим на вывод:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
18:02:59.472 : work4 pool-1-thread-4 18:02:59.472 : work1 pool-1-thread-1 18:02:59.472 : work3 pool-1-thread-3 18:02:59.472 : work2 pool-1-thread-2 18:03:00.472 : work5 pool-1-thread-4 18:03:00.472 : work6 pool-1-thread-1 18:03:00.472 : work8 pool-1-thread-2 18:03:00.472 : work7 pool-1-thread-3 18:03:01.472 : work10 pool-1-thread-1 18:03:01.472 : work9 pool-1-thread-4 Work result = WORK1 Work result = WORK2 Work result = WORK3 Work result = WORK4 Work result = WORK5 Work result = WORK6 Work result = WORK7 Work result = WORK8 Work result = WORK9 Work result = WORK10 All work takes 3.016 sec. |
Из логов можно сделать следующие заключения:
- Сначала одновременно запустилось 4 потока на выполнение работ work1, work2, work3, work4
- Через секунду все 4 потока доделали работу и принялись выполнять work5, work6, work7, work8
- Еще через секунду потоки закончили выполнение второй четверки работ и два потока принялись выполнять последние две работы work9 и work10
- Получившийся список строк, как и список Future, является отсортированным, хотя это нигде не регламентировалось. Полагаю, не стоит надеяться, что список всегда будет отсортирован.
- На выполнение всех задач в 4 потока потрачено 3 секунды. 1 секунда – на первую четверку задач, 2 секунда – на вторую четверку задач, 3 секунда – на оставшиеся две задачи.
Архив с исходниками можно посмотреть на GitHub.