В этой статье мы напишем свою реализацию интерфейсов ItemReader, ItemPocessor и ItemWriter, а так же сервис для управления всеми задачами в приложении. Написание собственных реализаций рассматриваемых интерфейсов может быть полезно для оптимизации и ускорения конкретной пакетной обработки.
Скачайте репозиторий и переключитесь на ветку step1:
1 2 |
git clone https://github.com/JavaGrinko/batch-example.git git checkout -f step1 |
Интерфейс ItemReader<T> работает как итератор и используется как источник данных, которые необходимо обработать:
1 2 3 |
public interface ItemReader<T> { T read() throws Exception, ...; } |
Источник данных ItemReader<T> за одну итерацию должен возвращать один объект T, в нашем примере это один товар Product. Процесс итерирования продолжается до тех пор, пока метод read() не вернет null, тогда выполняется последняя обработка, последнее сохранение, и шаг заканчивается.
В последней версии Spring Batch 3.0.7 поставляются стандартные реализации ItemReader:
AggregateItemReader, AmqpItemReader, FlatFileItemReader, HibernateCursorItemReader, HibernatePagingItemReader, IbatisPagingItemReader, ItemReaderAdapter, JdbcCursorItemReader, JdbcPagingItemReader, JmsItemReader, JpaPagingItemReader, ListItemReader, MongoItemReader, Neo4jItemReader, RepositoryItemReader, StoredProcedureItemReader, StaxEventItemReader.
Для нашей задачи больше всего подходит FlatFileItemReader, но нам не нужна вся универсальность этой реализации и для увеличения производительности мы напишем свой узкопрофильный CsvItemReader:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
package javagrinko.batch.example.batch; import lombok.Data; import lombok.extern.log4j.Log4j; import org.springframework.batch.core.JobParameter; import org.springframework.batch.core.annotation.AfterStep; import org.springframework.batch.core.annotation.BeforeStep; import org.springframework.batch.item.ItemReader; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.convert.ConversionService; import org.springframework.core.io.ClassPathResource; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Scanner; @Data public class CsvItemReader<T> implements ItemReader { @Autowired private ConversionService conversionService; private File source; private Scanner scanner; private Class<T> typeParameterClass; public CsvItemReader(Class<T> typeParameterClass, String fileName) { try { source = new ClassPathResource(fileName).getFile(); } catch (IOException e) { e.printStackTrace(); } this.typeParameterClass = typeParameterClass; } @BeforeStep public void open() throws FileNotFoundException { scanner = new Scanner(source); } @Override public Object read() throws Exception { if (scanner.hasNextLine()) { String s = scanner.nextLine(); return conversionService.convert(s, typeParameterClass); } else { return null; } } @AfterStep public void destroy() { scanner.close(); } } |
Методы, аннотированные @BeforeStep, будут выполнены 1 раз перед запуском всего шага. Аналогично, метод destroy с аннотацией @AfterStep будет вызван после завершения шага вне зависимости от результата.
Для преобразования строки в объект мы используем не рефлексивный маппинг, а conversionService, который работает быстрее рефлексии. Для работы conversionService нужно заранее зарегистрировать в него конвертер строки в продукт StringProductConverter:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
@Service public class StringProductConverter implements Converter<String, Product> { @Override public Product convert(String source) { String[] split = source.split(","); Product product = new Product(); product.setId(Long.parseLong(split[0])); product.setName(split[1]); product.setDescription(split[2]); product.setPrice(Double.parseDouble(split[3])); return product; } } |
Теперь напишем реализацию интерфейса ItemProcessor<I, O>:
1 2 3 |
public interface ItemProcessor<I, O> { O process(I item) throws Exception; } |
В нашем примере название товара мы переводим в ВЕРХНИЙ РЕГИСТР:
1 2 3 4 5 6 7 8 9 10 11 12 |
package javagrinko.batch.example.batch; import javagrinko.batch.example.model.Product; import org.springframework.batch.item.ItemProcessor; public class SimpleProcessor implements ItemProcessor<Product, Product> { @Override public Product process(Product item) throws Exception { item.setName(item.getName().toUpperCase()); return item; } } |
И последнее, что нужно сделать – это реализовать интерфейс ItemWriter<T>:
1 2 3 |
public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; } |
В последней версии Spring Batch 3.0.7 поставляются стандартные реализации ItemWriter:
AmqpItemWriter, CompositeItemWriter, FlatFileItemWriter, GemfireItemWriter, HibernateItemWriter, IbatisBatchItemWriter, ItemWriterAdapter, JdbcBatchItemWriter, JmsItemWriter, JpaItemWriter, MimeMessageItemWriter, MongoItemWriter, Neo4jItemWriter, PropertyExtractingDelegatingItemWriter, RepositoryItemWriter, StaxEventItemWriter.
Для записи в базу данных мы будем использовать JdbcTemplate:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package javagrinko.batch.example.batch; import javagrinko.batch.example.model.Product; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; public class JdbcItemWriter implements ItemWriter<Product> { @Autowired public JdbcTemplate jdbcTemplate; @Override public void write(List<? extends Product> items) throws Exception { StringBuilder values = new StringBuilder(" VALUES "); for (int i = 0; i < items.size(); i++) { values.append(" (?, ?, ?, ?)"); if (i < items.size() - 1) values.append(", "); } List<Object> arguments = items.stream() .map(p -> new Object[]{p.getId(), p.getName(), p.getDescription(), p.getPrice()}) .flatMap(objects -> Arrays.asList(objects).stream()) .collect(Collectors.toList()); String sql = "INSERT INTO PRODUCTS (PRODUCT_ID, NAME, DESCRIPTION, PRICE)" + values.toString(); jdbcTemplate.update(sql, arguments.toArray()); } } |
Здесь формируется один запрос к базе данных на вставку всех элементов списка items. Items – это пачка (chunk) записей, полученная в результате многократного вызова ItemReader.read(). Размер items задается параметром chunk в StepBuilder (см. предыдущую часть). Особенностью данной реализации является отсутствие транзакции, что экономит немного ресурсов.
Теперь, модифицируем код из первой части руководства и заменим стандартные реализации ItemReader и ItemWriter на наши собственные, а так же заменим лямбда-выражение на SimpleProcessor. В результате получим код, который можно посмотреть в репозитории на шаге step1:
1 2 |
git clone https://github.com/JavaGrinko/batch-example.git git checkout -f step1 |
Теперь, когда всё готово, напишем небольшой сервис, который будет искать нашу задачу в реестре задач приложения и запускать ее на выполнение. Для начала определим интерфейс:
1 2 3 |
public interface JobService { void start(String jobName); } |
Реализация выглядит следующим образом:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package javagrinko.batch.example.service; import lombok.extern.log4j.Log4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.configuration.JobRegistry; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @Log4j public class JobServiceImpl implements JobService { @Autowired private JobLauncher jobLauncher; @Autowired private JobRegistry jobRegistry; @Override public void start(String jobName) { try { Job job = jobRegistry.getJob(jobName); jobLauncher.run(job, new JobParameters()); } catch (Exception e) { e.printStackTrace(); } } } |
Здесь используется стандартный бин JobRegistry, который знает обо всех задачах, определенных в проекте, и умеет возвращать экземпляр Job по названию задачи. Для использования бина JobRegistry необходимо в конфигурационный файл добавить следующие bean definitions:
1 2 3 4 5 6 7 8 9 10 11 |
@Bean public JobRegistry jobRegistry() { return new MapJobRegistry(); } @Bean public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() { JobRegistryBeanPostProcessor bpp = new JobRegistryBeanPostProcessor(); bpp.setJobRegistry(jobRegistry()); return bpp; } |
В следующей части мы поговорим о передаче данных между шагами и о перезапуске пакетной задачи.