View Javadoc
1   /*
2    *    Copyright 2012-2022 the original author or authors.
3    *
4    *    Licensed under the Apache License, Version 2.0 (the "License");
5    *    you may not use this file except in compliance with the License.
6    *    You may obtain a copy of the License at
7    *
8    *       https://www.apache.org/licenses/LICENSE-2.0
9    *
10   *    Unless required by applicable law or agreed to in writing, software
11   *    distributed under the License is distributed on an "AS IS" BASIS,
12   *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *    See the License for the specific language governing permissions and
14   *    limitations under the License.
15   */
16  package org.mybatis.caches.memcached;
17  
18  import java.io.IOException;
19  import java.io.Serializable;
20  import java.util.HashSet;
21  import java.util.Set;
22  import java.util.concurrent.ExecutionException;
23  import java.util.concurrent.Future;
24  
25  import net.spy.memcached.CASResponse;
26  import net.spy.memcached.CASValue;
27  import net.spy.memcached.ConnectionFactoryBuilder;
28  import net.spy.memcached.MemcachedClient;
29  import net.spy.memcached.auth.AuthDescriptor;
30  import net.spy.memcached.auth.PlainCallbackHandler;
31  import net.spy.memcached.internal.OperationFuture;
32  
33  import org.apache.ibatis.cache.CacheException;
34  import org.apache.ibatis.logging.Log;
35  import org.apache.ibatis.logging.LogFactory;
36  
37  /**
38   * @author Simone Tripodi
39   */
40  final class MemcachedClientWrapper {
41  
42    /**
43     * This class log.
44     */
45    private static final Log LOG = LogFactory.getLog(MemcachedCache.class);
46  
47    private final MemcachedConfiguration configuration;
48  
49    private final MemcachedClient client;
50  
51    /**
52     * Used to represent an object retrieved from Memcached along with its CAS information
53     *
54     * @author Weisz, Gustavo E.
55     */
56    private class ObjectWithCas {
57  
58      Object object;
59      long cas;
60  
61      ObjectWithCas(Object object, long cas) {
62        this.setObject(object);
63        this.setCas(cas);
64      }
65  
66      public Object getObject() {
67        return object;
68      }
69  
70      public void setObject(Object object) {
71        this.object = object;
72      }
73  
74      public long getCas() {
75        return cas;
76      }
77  
78      public void setCas(long cas) {
79        this.cas = cas;
80      }
81  
82    }
83  
84    public MemcachedClientWrapper() {
85      configuration = MemcachedConfigurationBuilder.getInstance().parseConfiguration();
86      try {
87        if (configuration.isUsingSASL()) {
88          AuthDescriptor ad = new AuthDescriptor(new String[] { "PLAIN" },
89              new PlainCallbackHandler(configuration.getUsername(), configuration.getPassword()));
90          client = new MemcachedClient(new ConnectionFactoryBuilder()
91              .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY).setAuthDescriptor(ad).build(),
92              configuration.getAddresses());
93        } else {
94          client = new MemcachedClient(configuration.getConnectionFactory(), configuration.getAddresses());
95        }
96      } catch (IOException e) {
97        String message = "Impossible to instantiate a new memecached client instance, see nested exceptions";
98        LOG.error(message, e);
99        throw new RuntimeException(message, e);
100     }
101 
102     if (LOG.isDebugEnabled()) {
103       LOG.debug("Running new Memcached client using " + configuration);
104     }
105   }
106 
107   /**
108    * Converts the MyBatis object key in the proper string representation.
109    *
110    * @param key
111    *          the MyBatis object key.
112    *
113    * @return the proper string representation.
114    */
115   private String toKeyString(final Object key) {
116     // issue #1, key too long
117     String keyString = configuration.getKeyPrefix() + StringUtils.sha1Hex(key.toString());
118     if (LOG.isDebugEnabled()) {
119       LOG.debug("Object key '" + key + "' converted in '" + keyString + "'");
120     }
121     return keyString;
122   }
123 
124   /**
125    * @param key
126    *
127    * @return
128    */
129   public Object getObject(Object key) {
130     String keyString = toKeyString(key);
131     Object ret = retrieve(keyString);
132 
133     if (LOG.isDebugEnabled()) {
134       LOG.debug("Retrived object (" + keyString + ", " + ret + ")");
135     }
136 
137     return ret;
138   }
139 
140   /**
141    * Return the stored group in Memcached identified by the specified key.
142    *
143    * @param groupKey
144    *          the group key.
145    *
146    * @return the group if was previously stored, null otherwise.
147    */
148   private ObjectWithCas getGroup(String groupKey) {
149     if (LOG.isDebugEnabled()) {
150       LOG.debug("Retrieving group with id '" + groupKey + "'");
151     }
152 
153     ObjectWithCas groups = null;
154     try {
155       groups = retrieveWithCas(groupKey);
156     } catch (Exception e) {
157       LOG.error("Impossible to retrieve group '" + groupKey + "' see nested exceptions", e);
158     }
159 
160     if (groups == null) {
161       if (LOG.isDebugEnabled()) {
162         LOG.debug("Group '" + groupKey + "' not previously stored");
163       }
164       return null;
165     }
166 
167     if (LOG.isDebugEnabled()) {
168       LOG.debug("retrieved group '" + groupKey + "' with values " + groups);
169     }
170 
171     return groups;
172   }
173 
174   /**
175    * @param keyString
176    *
177    * @return
178    *
179    * @throws Exception
180    */
181   private Object retrieve(final String keyString) {
182     Object retrieved = null;
183 
184     if (configuration.isUsingAsyncGet()) {
185       Future<Object> future;
186       if (configuration.isCompressionEnabled()) {
187         future = client.asyncGet(keyString, new CompressorTranscoder());
188       } else {
189         future = client.asyncGet(keyString);
190       }
191 
192       try {
193         retrieved = future.get(configuration.getTimeout(), configuration.getTimeUnit());
194       } catch (Exception e) {
195         future.cancel(false);
196         throw new CacheException(e);
197       }
198     } else {
199       if (configuration.isCompressionEnabled()) {
200         retrieved = client.get(keyString, new CompressorTranscoder());
201       } else {
202         retrieved = client.get(keyString);
203       }
204     }
205 
206     return retrieved;
207   }
208 
209   /**
210    * Retrieves an object along with its cas using the given key
211    *
212    * @param keyString
213    *
214    * @return
215    *
216    * @throws Exception
217    */
218   private ObjectWithCas retrieveWithCas(final String keyString) {
219     CASValue<Object> retrieved = null;
220 
221     if (configuration.isUsingAsyncGet()) {
222       Future<CASValue<Object>> future;
223       if (configuration.isCompressionEnabled()) {
224         future = client.asyncGets(keyString, new CompressorTranscoder());
225       } else {
226         future = client.asyncGets(keyString);
227       }
228 
229       try {
230         retrieved = future.get(configuration.getTimeout(), configuration.getTimeUnit());
231       } catch (Exception e) {
232         future.cancel(false);
233         throw new CacheException(e);
234       }
235     } else {
236       if (configuration.isCompressionEnabled()) {
237         retrieved = client.gets(keyString, new CompressorTranscoder());
238       } else {
239         retrieved = client.gets(keyString);
240       }
241     }
242 
243     if (retrieved == null) {
244       return null;
245     }
246 
247     return new ObjectWithCas(retrieved.getValue(), retrieved.getCas());
248   }
249 
250   @SuppressWarnings("unchecked")
251   public void putObject(Object key, Object value, String id) {
252     String keyString = toKeyString(key);
253     String groupKey = toKeyString(id);
254 
255     if (LOG.isDebugEnabled()) {
256       LOG.debug("Putting object (" + keyString + ", " + value + ")");
257     }
258 
259     storeInMemcached(keyString, value);
260 
261     // add namespace key into memcached
262     // Optimistic lock approach...
263     boolean jobDone = false;
264 
265     while (!jobDone) {
266       ObjectWithCas group = getGroup(groupKey);
267       Set<String> groupValues;
268 
269       if (group == null || group.getObject() == null) {
270         groupValues = new HashSet<String>();
271         groupValues.add(keyString);
272 
273         if (LOG.isDebugEnabled()) {
274           LOG.debug("Insert/Updating object (" + groupKey + ", " + groupValues + ")");
275         }
276 
277         jobDone = tryToAdd(groupKey, groupValues);
278       } else {
279         groupValues = (Set<String>) group.getObject();
280         groupValues.add(keyString);
281 
282         jobDone = storeInMemcached(groupKey, group);
283       }
284     }
285   }
286 
287   /**
288    * Stores an object identified by a key in Memcached.
289    *
290    * @param keyString
291    *          the object key
292    * @param value
293    *          the object has to be stored.
294    */
295   private void storeInMemcached(String keyString, Object value) {
296     if (value != null && !Serializable.class.isAssignableFrom(value.getClass())) {
297       throw new CacheException(
298           "Object of type '" + value.getClass().getName() + "' that's non-serializable is not supported by Memcached");
299     }
300 
301     if (configuration.isCompressionEnabled()) {
302       client.set(keyString, configuration.getExpiration(), value, new CompressorTranscoder());
303     } else {
304       client.set(keyString, configuration.getExpiration(), value);
305     }
306   }
307 
308   /**
309    * Tries to update an object value in memcached considering the cas validation.
310    * <p>
311    * Returns true if the object passed the cas validation and was modified.
312    *
313    * @param keyString
314    * @param value
315    *
316    * @return
317    */
318   private boolean storeInMemcached(String keyString, ObjectWithCas value) {
319     if (value != null && value.getObject() != null
320         && !Serializable.class.isAssignableFrom(value.getObject().getClass())) {
321       throw new CacheException("Object of type '" + value.getObject().getClass().getName()
322           + "' that's non-serializable is not supported by Memcached");
323     }
324 
325     CASResponse response;
326 
327     if (configuration.isCompressionEnabled()) {
328       response = client.cas(keyString, value.getCas(), value.getObject(), new CompressorTranscoder());
329     } else {
330       response = client.cas(keyString, value.getCas(), value.getObject());
331     }
332 
333     return (response.equals(CASResponse.OBSERVE_MODIFIED) || response.equals(CASResponse.OK));
334   }
335 
336   /**
337    * Tries to store an object identified by a key in Memcached.
338    * <p>
339    * Will fail if the object already exists.
340    *
341    * @param keyString
342    * @param value
343    *
344    * @return
345    */
346   private boolean tryToAdd(String keyString, Object value) {
347     if (value != null && !Serializable.class.isAssignableFrom(value.getClass())) {
348       throw new CacheException(
349           "Object of type '" + value.getClass().getName() + "' that's non-serializable is not supported by Memcached");
350     }
351 
352     boolean done;
353     OperationFuture<Boolean> result;
354 
355     if (configuration.isCompressionEnabled()) {
356       result = client.add(keyString, configuration.getExpiration(), value, new CompressorTranscoder());
357     } else {
358       result = client.add(keyString, configuration.getExpiration(), value);
359     }
360 
361     try {
362       done = result.get();
363     } catch (InterruptedException e) {
364       done = false;
365     } catch (ExecutionException e) {
366       done = false;
367     }
368 
369     return done;
370   }
371 
372   public Object removeObject(Object key) {
373     String keyString = toKeyString(key);
374 
375     if (LOG.isDebugEnabled()) {
376       LOG.debug("Removing object '" + keyString + "'");
377     }
378 
379     Object result = getObject(key);
380     if (result != null) {
381       client.delete(keyString);
382     }
383     return result;
384   }
385 
386   @SuppressWarnings("unchecked")
387   public void removeGroup(String id) {
388     String groupKey = toKeyString(id);
389 
390     // remove namespace key into memcached
391     // Optimistic lock approach...
392     boolean jobDone = false;
393 
394     while (!jobDone) {
395       ObjectWithCas group = getGroup(groupKey);
396       Set<String> groupValues;
397 
398       if (group == null || group.getObject() == null) {
399         if (LOG.isDebugEnabled()) {
400           LOG.debug("No need to flush cached entries for group '" + id + "' because is empty");
401         }
402         return;
403       }
404 
405       if (LOG.isDebugEnabled()) {
406         LOG.debug("Flushing keys: " + group);
407       }
408 
409       groupValues = (Set<String>) group.getObject();
410 
411       for (String key : groupValues) {
412         client.delete(key);
413       }
414 
415       if (LOG.isDebugEnabled()) {
416         LOG.debug("Flushing group: " + groupKey);
417       }
418 
419       groupValues = (Set<String>) group.getObject();
420       groupValues.clear();
421 
422       jobDone = storeInMemcached(groupKey, group);
423     }
424   }
425 
426   @Override
427   protected void finalize() throws Throwable {
428     client.shutdown(configuration.getTimeout(), configuration.getTimeUnit());
429     super.finalize();
430   }
431 
432 }