1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.mybatis.caches.memcached;
17
18 import java.io.IOException;
19 import java.io.Serializable;
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24
25 import net.spy.memcached.CASResponse;
26 import net.spy.memcached.CASValue;
27 import net.spy.memcached.ConnectionFactoryBuilder;
28 import net.spy.memcached.MemcachedClient;
29 import net.spy.memcached.auth.AuthDescriptor;
30 import net.spy.memcached.auth.PlainCallbackHandler;
31 import net.spy.memcached.internal.OperationFuture;
32
33 import org.apache.ibatis.cache.CacheException;
34 import org.apache.ibatis.logging.Log;
35 import org.apache.ibatis.logging.LogFactory;
36
37
38
39
40 final class MemcachedClientWrapper {
41
42
43
44
45 private static final Log LOG = LogFactory.getLog(MemcachedCache.class);
46
47 private final MemcachedConfiguration configuration;
48
49 private final MemcachedClient client;
50
51
52
53
54
55
56 private class ObjectWithCas {
57
58 Object object;
59 long cas;
60
61 ObjectWithCas(Object object, long cas) {
62 this.setObject(object);
63 this.setCas(cas);
64 }
65
66 public Object getObject() {
67 return object;
68 }
69
70 public void setObject(Object object) {
71 this.object = object;
72 }
73
74 public long getCas() {
75 return cas;
76 }
77
78 public void setCas(long cas) {
79 this.cas = cas;
80 }
81
82 }
83
84 public MemcachedClientWrapper() {
85 configuration = MemcachedConfigurationBuilder.getInstance().parseConfiguration();
86 try {
87 if (configuration.isUsingSASL()) {
88 AuthDescriptor ad = new AuthDescriptor(new String[] { "PLAIN" },
89 new PlainCallbackHandler(configuration.getUsername(), configuration.getPassword()));
90 client = new MemcachedClient(new ConnectionFactoryBuilder()
91 .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY).setAuthDescriptor(ad).build(),
92 configuration.getAddresses());
93 } else {
94 client = new MemcachedClient(configuration.getConnectionFactory(), configuration.getAddresses());
95 }
96 } catch (IOException e) {
97 String message = "Impossible to instantiate a new memecached client instance, see nested exceptions";
98 LOG.error(message, e);
99 throw new RuntimeException(message, e);
100 }
101
102 if (LOG.isDebugEnabled()) {
103 LOG.debug("Running new Memcached client using " + configuration);
104 }
105 }
106
107
108
109
110
111
112
113
114
115 private String toKeyString(final Object key) {
116
117 String keyString = configuration.getKeyPrefix() + StringUtils.sha1Hex(key.toString());
118 if (LOG.isDebugEnabled()) {
119 LOG.debug("Object key '" + key + "' converted in '" + keyString + "'");
120 }
121 return keyString;
122 }
123
124
125
126
127
128
129 public Object getObject(Object key) {
130 String keyString = toKeyString(key);
131 Object ret = retrieve(keyString);
132
133 if (LOG.isDebugEnabled()) {
134 LOG.debug("Retrived object (" + keyString + ", " + ret + ")");
135 }
136
137 return ret;
138 }
139
140
141
142
143
144
145
146
147
148 private ObjectWithCas getGroup(String groupKey) {
149 if (LOG.isDebugEnabled()) {
150 LOG.debug("Retrieving group with id '" + groupKey + "'");
151 }
152
153 ObjectWithCas groups = null;
154 try {
155 groups = retrieveWithCas(groupKey);
156 } catch (Exception e) {
157 LOG.error("Impossible to retrieve group '" + groupKey + "' see nested exceptions", e);
158 }
159
160 if (groups == null) {
161 if (LOG.isDebugEnabled()) {
162 LOG.debug("Group '" + groupKey + "' not previously stored");
163 }
164 return null;
165 }
166
167 if (LOG.isDebugEnabled()) {
168 LOG.debug("retrieved group '" + groupKey + "' with values " + groups);
169 }
170
171 return groups;
172 }
173
174
175
176
177
178
179
180
181 private Object retrieve(final String keyString) {
182 Object retrieved = null;
183
184 if (configuration.isUsingAsyncGet()) {
185 Future<Object> future;
186 if (configuration.isCompressionEnabled()) {
187 future = client.asyncGet(keyString, new CompressorTranscoder());
188 } else {
189 future = client.asyncGet(keyString);
190 }
191
192 try {
193 retrieved = future.get(configuration.getTimeout(), configuration.getTimeUnit());
194 } catch (Exception e) {
195 future.cancel(false);
196 throw new CacheException(e);
197 }
198 } else {
199 if (configuration.isCompressionEnabled()) {
200 retrieved = client.get(keyString, new CompressorTranscoder());
201 } else {
202 retrieved = client.get(keyString);
203 }
204 }
205
206 return retrieved;
207 }
208
209
210
211
212
213
214
215
216
217
218 private ObjectWithCas retrieveWithCas(final String keyString) {
219 CASValue<Object> retrieved = null;
220
221 if (configuration.isUsingAsyncGet()) {
222 Future<CASValue<Object>> future;
223 if (configuration.isCompressionEnabled()) {
224 future = client.asyncGets(keyString, new CompressorTranscoder());
225 } else {
226 future = client.asyncGets(keyString);
227 }
228
229 try {
230 retrieved = future.get(configuration.getTimeout(), configuration.getTimeUnit());
231 } catch (Exception e) {
232 future.cancel(false);
233 throw new CacheException(e);
234 }
235 } else {
236 if (configuration.isCompressionEnabled()) {
237 retrieved = client.gets(keyString, new CompressorTranscoder());
238 } else {
239 retrieved = client.gets(keyString);
240 }
241 }
242
243 if (retrieved == null) {
244 return null;
245 }
246
247 return new ObjectWithCas(retrieved.getValue(), retrieved.getCas());
248 }
249
250 @SuppressWarnings("unchecked")
251 public void putObject(Object key, Object value, String id) {
252 String keyString = toKeyString(key);
253 String groupKey = toKeyString(id);
254
255 if (LOG.isDebugEnabled()) {
256 LOG.debug("Putting object (" + keyString + ", " + value + ")");
257 }
258
259 storeInMemcached(keyString, value);
260
261
262
263 boolean jobDone = false;
264
265 while (!jobDone) {
266 ObjectWithCas group = getGroup(groupKey);
267 Set<String> groupValues;
268
269 if (group == null || group.getObject() == null) {
270 groupValues = new HashSet<String>();
271 groupValues.add(keyString);
272
273 if (LOG.isDebugEnabled()) {
274 LOG.debug("Insert/Updating object (" + groupKey + ", " + groupValues + ")");
275 }
276
277 jobDone = tryToAdd(groupKey, groupValues);
278 } else {
279 groupValues = (Set<String>) group.getObject();
280 groupValues.add(keyString);
281
282 jobDone = storeInMemcached(groupKey, group);
283 }
284 }
285 }
286
287
288
289
290
291
292
293
294
295 private void storeInMemcached(String keyString, Object value) {
296 if (value != null && !Serializable.class.isAssignableFrom(value.getClass())) {
297 throw new CacheException(
298 "Object of type '" + value.getClass().getName() + "' that's non-serializable is not supported by Memcached");
299 }
300
301 if (configuration.isCompressionEnabled()) {
302 client.set(keyString, configuration.getExpiration(), value, new CompressorTranscoder());
303 } else {
304 client.set(keyString, configuration.getExpiration(), value);
305 }
306 }
307
308
309
310
311
312
313
314
315
316
317
318 private boolean storeInMemcached(String keyString, ObjectWithCas value) {
319 if (value != null && value.getObject() != null
320 && !Serializable.class.isAssignableFrom(value.getObject().getClass())) {
321 throw new CacheException("Object of type '" + value.getObject().getClass().getName()
322 + "' that's non-serializable is not supported by Memcached");
323 }
324
325 CASResponse response;
326
327 if (configuration.isCompressionEnabled()) {
328 response = client.cas(keyString, value.getCas(), value.getObject(), new CompressorTranscoder());
329 } else {
330 response = client.cas(keyString, value.getCas(), value.getObject());
331 }
332
333 return (response.equals(CASResponse.OBSERVE_MODIFIED) || response.equals(CASResponse.OK));
334 }
335
336
337
338
339
340
341
342
343
344
345
346 private boolean tryToAdd(String keyString, Object value) {
347 if (value != null && !Serializable.class.isAssignableFrom(value.getClass())) {
348 throw new CacheException(
349 "Object of type '" + value.getClass().getName() + "' that's non-serializable is not supported by Memcached");
350 }
351
352 boolean done;
353 OperationFuture<Boolean> result;
354
355 if (configuration.isCompressionEnabled()) {
356 result = client.add(keyString, configuration.getExpiration(), value, new CompressorTranscoder());
357 } else {
358 result = client.add(keyString, configuration.getExpiration(), value);
359 }
360
361 try {
362 done = result.get();
363 } catch (InterruptedException e) {
364 done = false;
365 } catch (ExecutionException e) {
366 done = false;
367 }
368
369 return done;
370 }
371
372 public Object removeObject(Object key) {
373 String keyString = toKeyString(key);
374
375 if (LOG.isDebugEnabled()) {
376 LOG.debug("Removing object '" + keyString + "'");
377 }
378
379 Object result = getObject(key);
380 if (result != null) {
381 client.delete(keyString);
382 }
383 return result;
384 }
385
386 @SuppressWarnings("unchecked")
387 public void removeGroup(String id) {
388 String groupKey = toKeyString(id);
389
390
391
392 boolean jobDone = false;
393
394 while (!jobDone) {
395 ObjectWithCas group = getGroup(groupKey);
396 Set<String> groupValues;
397
398 if (group == null || group.getObject() == null) {
399 if (LOG.isDebugEnabled()) {
400 LOG.debug("No need to flush cached entries for group '" + id + "' because is empty");
401 }
402 return;
403 }
404
405 if (LOG.isDebugEnabled()) {
406 LOG.debug("Flushing keys: " + group);
407 }
408
409 groupValues = (Set<String>) group.getObject();
410
411 for (String key : groupValues) {
412 client.delete(key);
413 }
414
415 if (LOG.isDebugEnabled()) {
416 LOG.debug("Flushing group: " + groupKey);
417 }
418
419 groupValues = (Set<String>) group.getObject();
420 groupValues.clear();
421
422 jobDone = storeInMemcached(groupKey, group);
423 }
424 }
425
426 @Override
427 protected void finalize() throws Throwable {
428 client.shutdown(configuration.getTimeout(), configuration.getTimeUnit());
429 super.finalize();
430 }
431
432 }