1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.mybatis.spring.batch;
17
18 import static org.springframework.util.Assert.isTrue;
19 import static org.springframework.util.Assert.notNull;
20
21 import java.util.List;
22
23 import org.apache.ibatis.executor.BatchResult;
24 import org.apache.ibatis.session.ExecutorType;
25 import org.apache.ibatis.session.SqlSession;
26 import org.apache.ibatis.session.SqlSessionFactory;
27 import org.mybatis.logging.Logger;
28 import org.mybatis.logging.LoggerFactory;
29 import org.mybatis.spring.SqlSessionTemplate;
30 import org.springframework.batch.item.Chunk;
31 import org.springframework.batch.item.ItemWriter;
32 import org.springframework.beans.factory.InitializingBean;
33 import org.springframework.core.convert.converter.Converter;
34 import org.springframework.dao.EmptyResultDataAccessException;
35 import org.springframework.dao.InvalidDataAccessResourceUsageException;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 public class MyBatisBatchItemWriter<T> implements ItemWriter<T>, InitializingBean {
56
57 private static final Logger LOGGER = LoggerFactory.getLogger(MyBatisBatchItemWriter.class);
58
59 private SqlSessionTemplate sqlSessionTemplate;
60
61 private String statementId;
62
63 private boolean assertUpdates = true;
64
65 private Converter<T, ?> itemToParameterConverter = new PassThroughConverter<>();
66
67
68
69
70
71
72
73
74 public void setAssertUpdates(boolean assertUpdates) {
75 this.assertUpdates = assertUpdates;
76 }
77
78
79
80
81
82
83
84 public void setSqlSessionFactory(SqlSessionFactory sqlSessionFactory) {
85 if (sqlSessionTemplate == null) {
86 this.sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory, ExecutorType.BATCH);
87 }
88 }
89
90
91
92
93
94
95
96 public void setSqlSessionTemplate(SqlSessionTemplate sqlSessionTemplate) {
97 this.sqlSessionTemplate = sqlSessionTemplate;
98 }
99
100
101
102
103
104
105
106 public void setStatementId(String statementId) {
107 this.statementId = statementId;
108 }
109
110
111
112
113
114
115
116
117
118
119
120 public void setItemToParameterConverter(Converter<T, ?> itemToParameterConverter) {
121 this.itemToParameterConverter = itemToParameterConverter;
122 }
123
124
125
126
127 @Override
128 public void afterPropertiesSet() {
129 notNull(sqlSessionTemplate, "A SqlSessionFactory or a SqlSessionTemplate is required.");
130 isTrue(ExecutorType.BATCH == sqlSessionTemplate.getExecutorType(),
131 "SqlSessionTemplate's executor type must be BATCH");
132 notNull(statementId, "A statementId is required.");
133 notNull(itemToParameterConverter, "A itemToParameterConverter is required.");
134 }
135
136
137
138
139 @Override
140 public void write(final Chunk<? extends T> items) {
141
142 if (!items.isEmpty()) {
143 LOGGER.debug(() -> "Executing batch with " + items.size() + " items.");
144
145 for (T item : items) {
146 sqlSessionTemplate.update(statementId, itemToParameterConverter.convert(item));
147 }
148
149 List<BatchResult> results = sqlSessionTemplate.flushStatements();
150
151 if (assertUpdates) {
152 if (results.size() != 1) {
153 throw new InvalidDataAccessResourceUsageException("Batch execution returned invalid results. "
154 + "Expected 1 but number of BatchResult objects returned was " + results.size());
155 }
156
157 int[] updateCounts = results.get(0).getUpdateCounts();
158
159 for (int i = 0; i < updateCounts.length; i++) {
160 int value = updateCounts[i];
161 if (value == 0) {
162 throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length
163 + " did not update any rows: [" + items.getItems().get(i) + "]", 1);
164 }
165 }
166 }
167 }
168 }
169
170 private static class PassThroughConverter<T> implements Converter<T, T> {
171
172 @Override
173 public T convert(T source) {
174 return source;
175 }
176
177 }
178
179 }