Files
rikako-note/spring/redisson/redisson.md

44 KiB
Raw Blame History

Redisson

配置

编程式配置

可以通过创建Config对象来显式配置redisson配置示例如下

Config config = new Config();
config.setTransportMode(TransportMode.EPOLL);
config.useClusterServers()
       // use "rediss://" for SSL connection
      .addNodeAddress("perredis://127.0.0.1:7181");

RedissonClient redisson = Redisson.create(config);

yml Configuration

也可以通过yml文件格式来对redisson进行配置

Config config = Config.fromYAML(new File("config-file.yaml"));  
RedissonClient redisson = Redisson.create(config);

可以通过config.toYAML方法将config转化为yaml格式

Config config = new Config();
// ... many settings are set here
String yamlFormat = config.toYAML();

yml格式中可以引入环境变量

singleServerConfig:
  address: "redis://127.0.0.1:${REDIS_PORT}"

Common Settings

如下设置用于配置config对象并且适用于各种redis模式

codec

默认值org.redisson.codec.Kryo5Codec

connectionListener

默认值null
connection listener当redisson连接到redis-server或与redis-server断开连接时被触发

nettyThreads

默认值32
redisson所有redis client共享的线程总数。netty thread用于redis相应的解码和命令的发送。

transportMode

默认值TransportMode.NIO
可选的值如下:

  • TransportMode.NIO默认
  • TransportMode.EPOLL
  • TransportMode.KQUEUE

threads

由Rtopic object listener所共享的线程数量

lockWatchdogTimeout

默认值30000
RLock watchdog timeout单位为ms。该参数仅当RLock在获取时没有指定leaseTimeout时使用。当watchdog没有延长持有锁时间到下一个watchdogTimeout间隔时当前watchdogTimeout到期后锁会过期。watchdog避免了由于客户端崩溃或其他原因造成一直持有锁的情况。

Mode

single instance mode

可以通过如下方式来配置单实例模式:

// connects to 127.0.0.1:6379 by default
RedissonClient redisson = Redisson.create();

Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);

yml配置单实例模式如下

singleServerConfig:
  idleConnectionTimeout: 10000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  password: null
  subscriptionsPerConnection: 5
  clientName: null
  address: "redis://127.0.0.1:6379"
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 24
  connectionPoolSize: 64
  database: 0
  dnsMonitoringInterval: 5000
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.Kryo5Codec> {}
transportMode: "NIO"

Operation Execution

redisson支持自动重试策略并且在每次尝试时都会发送命令。重试策略通过retryAttempts默认情况下为3、retryInterval默认情况下为1000ms来设置。多次尝试之间间隔retryInterval。
redisson实例是线程安全的如下是RAtomicLong对象的使用示例

RedissonClient client = Redisson.create(config);
RAtomicLong longObject = client.getAtomicLong('myLong');
// sync way
longObject.compareAndSet(3, 401);
// async way
RFuture<Boolean> result = longObject.compareAndSetAsync(3, 401);

RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
// reactive way
Mono<Boolean> result = longObject.compareAndSet(3, 401);

RedissonRxClient client = Redisson.createRx(config);
RAtomicLongRx longObject= client.getAtomicLong("myLong");
// RxJava2 way
Flowable<Boolean result = longObject.compareAndSet(3, 401);

Async方式

大多数redisson object继承了异步接口可以调用异步方法实现异步操作如下

// RAtomicLong extends RAtomicLongAsync
RAtomicLongAsync longObject = client.getAtomicLong("myLong");
RFuture<Boolean> future = longObject.compareAndSetAsync(1, 401);

RFuture对象继承了Future接口和CompletionStage接口可以像CompleteableFuture一样使用

future.whenComplete((res, exception) -> {

    // handle both result and exception

});


// or
future.thenAccept(res -> {

    // handle result

}).exceptionally(exception -> {

    // handle exception

});

因该避免在future listener中使用同步方法这样可能会造成redis请求/相应处理时的错误,应使用如下方式执行:

future.whenCompleteAsync((res, exception) -> {

    // handle both result and exception

}, executor);


// or
future.thenAcceptAsync(res -> {

    // handle result

}, executor).exceptionallyAsync(exception -> {

    // handle exception

}, executor);

Redisson Object的公共操作

所有redisson object都实现了RObject和RExpiration接口使用示例如下

RObject object = redisson.get...()

object.sizeInMemory();

object.delete();

object.rename("newname");

object.isExists();

// catch expired event
object.addListener(new ExpiredObjectListener() {
   ...
});

// catch delete event
object.addListener(new DeletedObjectListener() {
   ...
});

redisson object的name属性即是在redis中的key

RMap map = redisson.getMap("mymap");
map.getName(); // = mymap

和redis key相关的所有操作都通过RKeys接口暴露使用示例如下

RKeys keys = redisson.getKeys();

Iterable<String> allKeys = keys.getKeys();

Iterable<String> foundedKeys = keys.getKeysByPattern('key*');

long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");

long deletedKeysAmount = keys.deleteByPattern("test?");

String randomKey = keys.randomKey();

long keysAmount = keys.count();

keys.flushall();

keys.flushdb();

分布式对象

object holder

RBucket

RBucket的java实现类是一个hodler可以持有任何类型的java对象。RBucket的大小限制为512MB.
RBucket的使用示例如下所示

RBucket<AnyObject> bucket = redisson.getBucket("anyObject");

bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();

bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));

RBuckets

可以通过RBuckets对象来操作多个RBucket对象RBuckets使用示例如下

RBuckets buckets = redisson.getBuckets();

// get all bucket values
Map<String, V> loadedBuckets = buckets.get("myBucket1", "myBucket2", "myBucket3");

Map<String, Object> map = new HashMap<>();
map.put("myBucket1", new MyObject());
map.put("myBucket2", new MyObject());

// sets all or nothing if some bucket is already exists
buckets.trySet(map);
// store all at once
buckets.set(map);

Binary Stream Holder

RBinaryStream类型的对象用于存储字节序列。RBinaryStream实现了RBucket接口并且其大小限制也是512MB.
RBinaryStream的使用示例如下

RBinaryStream stream = redisson.getBinaryStream("anyStream");

byte[] content = ...
stream.set(content);
stream.getAndSet(content);
stream.trySet(content);
stream.compareAndSet(oldContent, content);

RBinaryStream可以和InputStream与OutputStream混用使用如下

RBinaryStream stream = redisson.getBinaryStream("anyStream");

InputStream is = stream.getInputStream();
byte[] readBuffer = ...
is.read(readBuffer);

OutputStream os = stream.getOuputStream();
byte[] contentToWrite = ...
os.write(contentToWrite);

BitSet

RBitSet实现提供了和java中BitSet类似的api其大小限制是4 294 967 295 bits.
RBitSet使用如下所示

RBitSet set = redisson.getBitSet("simpleBitset");

set.set(0, true);
set.set(1812, false);

set.clear(0);

set.and("anotherBitset");
set.xor("anotherBitset");

AtomicLong

RAtomicLong实现提供了和java中AtomicLong类似的api其使用类似如下

RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong");
atomicLong.set(3);
atomicLong.incrementAndGet();
atomicLong.get();

AtomicDouble

RAtomicDouble实现提供了和java中AtomicDouble类似的api其使用示例如下

RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");
atomicDouble.set(2.81);
atomicDouble.addAndGet(4.11);
atomicDouble.get();

Topic

RTopic实现提供了发布/订阅机制其允许订阅由同名RTopic对象发布的事件。
当重新连接到redis或redis错误切换之后listener会被重新订阅在重新订阅之前所有被发布的消息都会丢失。
RTopic的使用如下所示

RTopic topic = redisson.getTopic("myTopic");
int listenerId = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(String channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RTopic topic = redisson.getTopic("myTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());

Reliable Topic

RRelieableTopic在实现发布/订阅模式的情况下还实现了消息的可靠传输。当redis连接断开的情况下所有消息都会被存储当重新连接到redis时消息会被重新传送。
每个RReliableTopic对象实例都由一个watchdog当第一个listener注册之后watchdog就会被启用。当订阅者超过reliableTopicWatchdogTimeout之后且watchdog没有为其延续过期时间那么订阅将会超时。该机制是为了防止client长时间崩溃之后存储的消息持续增长
当重新连接到redis之后listener会被重新注册

RReliableTopic topic = redisson.getReliableTopic("anyTopic");
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(CharSequence channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RReliableTopic topic = redisson.getReliableTopic("anyTopic");
long subscribersReceivedMessage = topic.publish(new SomeObject());

Topic Pattern

RPatternTopic允许订阅多个Rtopic在重新连接到redis或redis错误切换之后listener会被重新订阅。
Pattern使用如下

  • topic? subscribes to topic1, topicA ...
  • topic?_my subscribes to topic_my, topic123_my, topicTEST_my ...
  • topic[ae] subscribes to topica and topice only

使用示例如下:

// subscribe to all topics by `topic*` pattern
RPatternTopic patternTopic = redisson.getPatternTopic("topic*");
int listenerId = patternTopic.addListener(Message.class, new PatternMessageListener<Message>() {
    @Override
    public void onMessage(String pattern, String channel, Message msg) {
        //...
    }
});

Bloom Filter

RBloomFilter中最多含有2^32个bit。
在使用之前必须通过tryInit(expectedInsertions, falseProbability)来初始化capacity。
RBloomFilter使用示例如下

RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
// initialize bloom filter with 
// expectedInsertions = 55000000
// falseProbability = 0.03
bloomFilter.tryInit(55000000L, 0.03);

bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));

bloomFilter.contains(new SomeObject("field1Value", "field8Value"));
bloomFilter.count();

HyperLogLog

RHyperLogLog能以较低的空间维护大数量的项目计算其去重后的数量其使用如下所示

RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
log.add(1);
log.add(2);
log.add(3);

log.count();

LongAdder

RLongAdder提供了java中LongAdder的实现其在client端维护了LongAdder增加和减少的性能较AtomicLong来说都有极大的提升至多可提升12000倍。其使用如下所示

RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();

当LongAdder不再使用之后应该调用destroy方法手动进行销毁

RLongAdder atomicLong = ...
atomicLong.destroy();

DoubleAdder

RDoubleAdder提供了java中DoubleAdder的分布式实现其性能相对于AtomicDouble也有很大提升。
其使用如下所示:

RLongDouble atomicDouble = redisson.getLongDouble("myLongDouble");
atomicDouble.add(12);
atomicDouble.increment();
atomicDouble.decrement();
atomicDouble.sum();

id generator

RIdGenerator实现允许产生唯一的id但其生成算法不是简单的递增而是在第一次请求时一系列id就已经被分配并且缓存到java端直到其用完。该方法能够减少和redis的通信次数产生id的速率比RAtomicLong快。
默认情况下allocate size是2000并且值从0开始。
RIdGenerator的使用如下所示

RIdGenerator generator = redisson.getIdGenerator("generator");

// Initialize with start value = 12 and allocation size = 20000
generator.tryInit(12, 20000);

long id = generator.nextId();

Json Object Holder

RJsonBucket类型用于存储json数据通过redis 的JSON.*命令来实现。Json数据通过JsonCodec来进行编码和解码。可用的实现是org.redisson.codec.JacksonCodec。

local cache

redisson提供了带有本地缓存的json object holder版本。 local cache用于加速读取操作从而较少网络请求次数其将整个json对象缓存在redisson侧执行读取操作比无缓存快45倍。

redission client method name local cache Ultra-fast read/write
getJsonBucket() open-source version false false
getJsonBucket() Redisson PRO version false true
getLocalCachedJsonBucket() Redisson PRO version true true

RJsonBucket使用如下所示

RJsonBucket<AnyObject> bucket = redisson.getJsonBucket("anyObject", new JacksonCodec<>(AnyObject.class));

bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();

bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));

List<String> values = bucket.get(new JacksonCodec<>(new TypeReference<List<String>>() {}), "values");
long aa = bucket.arrayAppend("$.obj.values", "t3", "t4");

分布式集合

Map

基于Redis的分布式Map对象实现了ConcurrentMap接口。
如果Map主要用于读取数据或者像尽量避免网络请求可以使用带有local cache的map使用示例如下

RMap<String, SomeObject> map = redisson.getMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");

// use fast* methods when previous value is not required
map.fastPut("a", new SomeObject());
map.fastPutIfAbsent("d", new SomeObject());
map.fastRemove("b");

RFuture<SomeObject> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");

map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");

RMap支持为每个key绑定一个lock/readwritelock/semaphore/countdownlatch使用如下

RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
MyKey k = new MyKey();
RLock keyLock = map.getLock(k);
keyLock.lock();
try {
   MyValue v = map.get(k);
   // process value ...
} finally {
   keyLock.unlock();
}

RReadWriteLock rwLock = map.getReadWriteLock(k);
rwLock.readLock().lock();
try {
   MyValue v = map.get(k);
   // process value ...
} finally {
   keyLock.readLock().unlock();
}

Redisson支持不同的Map结构实现不同的实现主要包含如下三个特性

  • local cache: local cache通过将map entry缓存在redisson端来减少网络请求从而提升读操作的性能。拥有相同name的local cache实例会连接到相同的发布/订阅channel该channel用于在local cache实例之间交换更新/失效的事件。
  • eviction支持对每个map entry指定ttl和max idle time

如下是所有的RMap可用实现

RedissonClient method local cache eviction
getMap() false false
getMapCache() false true
getLocalCachedMap() true false

eviction

带有eviction功能的map实现了RMapCache接口并且实现了ConcurrentMap接口。
当前redis并不支持淘汰map entry的功能故而超时的map entry是通过EvictionScheduler来时不时进行清理的。其会一次性移除100条超时的entry。任务启动的时间会自动调整根据上次任务删除过期entry的数量时间会在5s到半小时之间变化。
建议在每个redisson实例中都使用具有相同名称的MapCache对象

RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");


// ttl = 10 minutes, 
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// ttl = 10 minutes, maxIdleTime = 10 seconds
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);

// ttl = 3 seconds
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// ttl = 40 seconds, maxIdleTime = 10 seconds
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);

// if object is not used anymore
map.destroy();

local cache

带有local cache功能的map实现实现了RLocalCachedMap接口同时也实现了ConcurrentMap接口。
推荐在所有的redisson client中使用具有相同name的LocalCachedMap并且为所有具有相同名称的LocalCachedMap指定相同的LocalCachedMapOptions。
如下是在创建localcachedmap时可以指定的options

LocalCachedMapOptions options = LocalCachedMapOptions.defaults()

      // Defines whether to store a cache miss into the local cache.
      // Default value is false.
      .storeCacheMiss(false);

      // Defines store mode of cache data.
      // Follow options are available:
      // LOCALCACHE - store data in local cache only and use Redis only for data update/invalidation.
      // LOCALCACHE_REDIS - store data in both Redis and local cache.
      .storeMode(StoreMode.LOCALCACHE_REDIS)

      // Defines Cache provider used as local cache store.
      // Follow options are available:
      // REDISSON - uses Redisson own implementation
      // CAFFEINE - uses Caffeine implementation
      .cacheProvider(CacheProvider.REDISSON)

      // Defines local cache eviction policy.
      // Follow options are available:
      // LFU - Counts how often an item was requested. Those that are used least often are discarded first.
      // LRU - Discards the least recently used items first
      // SOFT - Uses weak references, entries are removed by GC
      // WEAK - Uses soft references, entries are removed by GC
      // NONE - No eviction
     .evictionPolicy(EvictionPolicy.NONE)

      // If cache size is 0 then local cache is unbounded.
     .cacheSize(1000)

      // Defines strategy for load missed local cache updates after Redis connection failure.
      //
      // Follow reconnection strategies are available:
      // CLEAR - Clear local cache if map instance has been disconnected for a while.
      // LOAD - Store invalidated entry hash in invalidation log for 10 minutes
      //        Cache keys for stored invalidated entry hashes will be removed 
      //        if LocalCachedMap instance has been disconnected less than 10 minutes
      //        or whole cache will be cleaned otherwise.
      // NONE - Default. No reconnection handling
     .reconnectionStrategy(ReconnectionStrategy.NONE)

      // Defines local cache synchronization strategy.
      //
      // Follow sync strategies are available:
      // INVALIDATE - Default. Invalidate cache entry across all LocalCachedMap instances on map entry change
      // UPDATE - Insert/update cache entry across all LocalCachedMap instances on map entry change
      // NONE - No synchronizations on map changes
     .syncStrategy(SyncStrategy.INVALIDATE)

      // time to live for each map entry in local cache
     .timeToLive(10000)
      // or
     .timeToLive(10, TimeUnit.SECONDS)

      // max idle time for each map entry in local cache
     .maxIdle(10000)
      // or
     .maxIdle(10, TimeUnit.SECONDS);

localcachedmap对象在不再使用之后应该调用destroy方法进行销毁

RLocalCachedMap<String, Integer> map = ...
map.destroy();

Map Persistence

redisson允许将map中的数据存储到除了redis以外的空间其通常用于和应用和数据库之间的缓存类似于spring cache。

read-through策略

当请求的条目在redisson map object中不存在时其会使用MapLoader对象来进行加载示例代码如下

MapLoader<String, String> mapLoader = new MapLoader<String, String>() {
            
            @Override
            public Iterable<String> loadAllKeys() {
                List<String> list = new ArrayList<String>();
                Statement statement = conn.createStatement();
                try {
                    ResultSet result = statement.executeQuery("SELECT id FROM student");
                    while (result.next()) {
                        list.add(result.getString(1));
                    }
                } finally {
                    statement.close();
                }

                return list;
            }
            
            @Override
            public String load(String key) {
                PreparedStatement preparedStatement = conn.prepareStatement("SELECT name FROM student where id = ?");
                try {
                    preparedStatement.setString(1, key);
                    ResultSet result = preparedStatement.executeQuery();
                    if (result.next()) {
                        return result.getString(1);
                    }
                    return null;
                } finally {
                    preparedStatement.close();
                }
            }
};

配置map的示例如下

MapOptions<K, V> options = MapOptions.<K, V>defaults()
                              .loader(mapLoader);

RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", options);
// or with boost up to 45x times 
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with boost up to 45x times 
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
write-through(sync)策略

如果map entry被更新那么更新方法不会返回直到通过MapWriter对象将entry更新写入到外部存储中

MapWriter<String, String> mapWriter = new MapWriter<String, String>() {
            
            @Override
            public void write(Map<String, String> map) {
                PreparedStatement preparedStatement = conn.prepareStatement("INSERT INTO student (id, name) values (?, ?)");
                try {
                    for (Entry<String, String> entry : map.entrySet()) {
                        preparedStatement.setString(1, entry.getKey());
                        preparedStatement.setString(2, entry.getValue());
                        preparedStatement.addBatch();
                    }
                    preparedStatement.executeBatch();
                } finally {
                    preparedStatement.close();
                }
            }
            
            @Override
            public void delete(Collection<String> keys) {
                PreparedStatement preparedStatement = conn.prepareStatement("DELETE FROM student where id = ?");
                try {
                    for (String key : keys) {
                        preparedStatement.setString(1, key);
                        preparedStatement.addBatch();
                    }
                    preparedStatement.executeBatch();
                } finally {
                    preparedStatement.close();
                }
            }
};

write-through策略配置如下

MapOptions<K, V> options = MapOptions.<K, V>defaults()
                              .writer(mapWriter)
                              .writeMode(WriteMode.WRITE_BEHIND)
                              .writeBehindDelay(5000)
                              .writeBehindBatchSize(100);

RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", options);
// or with boost up to 45x times 
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with boost up to 45x times 
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
write-behind策略(async)

在使用write-behind之后对于map object的修改是批量累计的并且按定义的延迟异步被MapWriter写入到外部存储中。
writeBehindDelay是批量写入或删除的延迟默认情况下值为1000ms。writeBehindBatchSize是batch的容量每个batch都包含写入和删除的命令集合默认情况下size是50 .
配置代码如下所示:

MapOptions<K, V> options = MapOptions.<K, V>defaults()
                              .writer(mapWriter)
                              .writeMode(WriteMode.WRITE_BEHIND)
                              .writeBehindDelay(5000)
                              .writeBehindBatchSize(100);

RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", options);
// or with boost up to 45x times 
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with boost up to 45x times 
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);

Map Listener

Redisson允许为实现RMapCache接口的Map对象绑定监听器监听如下的map事件

  • entry创建org.redisson.api.map.event.EntryCreatedListener
  • entry过期org.redisson.api.map.event.EntryExpiredListener
  • entry移除org.redisson.api.map.event.EntryRemovedListener
  • entry更新org.redisson.api.map.event.EntryUpdatedListener

使用实例如下所示:

RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");


int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() {
     @Override
     public void onUpdated(EntryEvent<Integer, Integer> event) {
          event.getKey(); // key
          event.getValue() // new value
          event.getOldValue() // old value
          // ...
     }
});

int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() {
     @Override
     public void onCreated(EntryEvent<Integer, Integer> event) {
          event.getKey(); // key
          event.getValue() // value
          // ...
     }
});

int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() {
     @Override
     public void onExpired(EntryEvent<Integer, Integer> event) {
          event.getKey(); // key
          event.getValue() // value
          // ...
     }
});

int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() {
     @Override
     public void onRemoved(EntryEvent<Integer, Integer> event) {
          event.getKey(); // key
          event.getValue() // value
          // ...
     }
});

map.removeListener(updateListener);
map.removeListener(createListener);
map.removeListener(expireListener);
map.removeListener(removeListener);

LRU/LFU bounded Map

实现了RMapCache接口的Map对象支持按LRU或LFU顺序进行绑定。绑定了LRU或LFU的Map可以存储固定数量的entry并且按照LRU或LFU的顺序淘汰entry。

RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");


// tries to set limit map to 10 entries using LRU eviction algorithm
map.trySetMaxSize(10);
// ... using LFU eviction algorithm
map.trySetMaxSize(10, EvictionMode.LFU);

// set or change limit map to 10 entries using LRU eviction algorithm
map.setMaxSize(10);
// ... using LFU eviction algorithm
map.setMaxSize(10, EvictionMode.LFU);

map.put("1", "2");
map.put("3", "3", 1, TimeUnit.SECONDS);

MultiMap

基于Redis的MultiMap支持为一个key绑定多个value

Set based MultiMap

基于set的multimap实现对于相同的key不允许出现重复的value

RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));

Set<SimpleValue> allValues = map.get(new SimpleKey("0"));

List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);

Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));

List based MultiMap

基于List的MultiMap允许对相同的key存在多个相同value并且对同一个key其value按插入顺序存储

RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));

List<SimpleValue> allValues = map.get(new SimpleKey("0"));

Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);

List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));

MultiMap eviction

对于实现了MultimapCache接口的map对象分别支持基于list和基于set的evicition。
过期的entry清除由EvictionScheduler执行其一次性会移除100条过期的entry。Task启动时间会根据上次任务删除的条目数目自动进行调整时间间隔在1s和2h之间进行调整。
MultiMap eviction使用示例如下所示

RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap");
multimap.put("1", "a");
multimap.put("1", "b");
multimap.put("1", "c");

multimap.put("2", "e");
multimap.put("2", "f");

multimap.expireKey("2", 10, TimeUnit.MINUTES);

Set

基于redis的set object实现了java.util.set接口。
redis set的使用如下所示

RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

根据是否支持数据淘汰,可以分为如下方法:

Redisson Client method Eviction Support
getSet() false
getSetCache() true

eviction

支持淘汰功能的Set对象实现了RSetCache接口当前redis尚不支持淘汰set中的值故而淘汰是由EvictionScheduler实现的其一次性移除300条entry根据上次task淘汰的key数量下次执行task的间隔在1s到1h之间调整。

RSetCache<SomeObject> set = redisson.getSetCache("mySet");
// or
RMapCache<SomeObject> set = redisson.getClusteredSetCache("mySet");

// ttl = 10 minutes, 
set.add(new SomeObject(), 10, TimeUnit.MINUTES);

// if object is not used anymore
set.destroy();

SortedSet

基于Redis的SortedSet实现了java.util.SortedSet接口其通过比较元素来确保元素的在set中的唯一性。对于String数据类型更推荐使用LexSortedSet其性能表现更好。
RSortedSet的使用示例如下所示

RSortedSet<Integer> set = redisson.getSortedSet("anySet");
set.trySetComparator(new MyComparator()); // set object comparator
set.add(3);
set.add(1);
set.add(2);

set.removeAsync(0);
set.addAsync(5);

ScoredSortedSet

基于redis的分布式ScoredSortedSet根据插入元素时的score来对元素进行排序。使用示例如下所示默认情况下插入元素按照score从高到低的顺寻进行排序

set.add(0.13, new SomeObject(a, b));
set.addAsync(0.251, new SomeObject(c, d));
set.add(0.302, new SomeObject(g, d));

set.pollFirst();
set.pollLast();

int index = set.rank(new SomeObject(g, d)); // get element index
Double score = set.getScore(new SomeObject(g, d)); // get element score

LexSortedSet

LexSortedSet只能用于存储String类型元素其实现了Set<String>接口其按字典顺序存储String元素。
LexSortedSet使用示例如下

RLexSortedSet set = redisson.getLexSortedSet("simple");
set.add("d");
set.addAsync("e");
set.add("f");

set.lexRangeTail("d", false);
set.lexCountHead("e");
set.lexRange("d", true, "z", false);

List

基于Redis的list实现了java.util.List接口其按照插入顺序来存储元素。
RList的使用如下

RList<SomeObject> list = redisson.getList("anyList");
list.add(new SomeObject());
list.get(0);
list.remove(new SomeObject());

Queue

基于Redis的Queue对象实现了java.util.Queue接口其使用示例如下所示

RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();

Deque

基于Redis的Deque实现了java.util.Deque接口其使用示例如下所示

RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();

Blocking Queue

基于Redis的分布式无界BlockingQueue实现了java.concurrent.BlockingQueue接口。
RBlockingQueue的使用示例如下所示:

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");

queue.offer(new SomeObject());

SomeObject obj = queue.peek();
SomeObject obj = queue.poll();
SomeObject obj = queue.poll(10, TimeUnit.MINUTES);

对于基于Redisson的BlockingQueue其poll/pollFromAny/pollLastAndOfferFirstTo/take方法其阻塞操作都是基于发布/订阅机制实现的当redisson重新连接到redisson-server之后会自动重新注册。

Bounded Blocking Queue

基于Redis的分布式有界BoundedBlockingQueue实现了java.util.concurrent.Blocking接口队列容量限制可以通过trySetCapacity方法来进行定义

RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
// returns `true` if capacity set successfully and `false` if it already set.
queue.trySetCapacity(2);

queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// will be blocked until free space available in queue
queue.put(new SomeObject());

SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

Blocking Deque

基于Redis的BlockingDeque实现了java.util.BlockingDeque接口其使用如下所示

RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = queue.takeFirst();
Integer lastValue = queue.takeLast();
Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES);
Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES);

Blocking Fair Queue

基于Redis的BlockingFairQueue实现了java.util.BlockingQueue接口。
通常情况下,如果存在多个消费者,有的消费者网络状况较好,有的消费者网络状况较差,那么网络较好的消费者将会比网络较差的消费者消费更多的消息。
而BlockingFairQueue者通过轮询等手段保证不同消费者之间的访问顺序从而使得不同消费者消费的消息数量是均匀的。
BlockingFairQueue的使用示例如下所示

RBlockingFairQueue queue = redisson.getBlockingFairQueue("myQueue");
queue.offer(new SomeObject());

SomeObject element = queue.peek();
SomeObject element = queue.poll();
SomeObject element = queue.poll(10, TimeUnit.MINUTES);
SomeObject element = queue.take();

该特性仅在redisson pro中可用。

Blocking Fair Deque

类似于BlockingFairQueue该特性仅在redisson pro中可用。

RBlockingFairDeque deque = redisson.getBlockingFairDeque("myDeque");
deque.offer(new SomeObject());

SomeObject firstElement = queue.peekFirst();
SomeObject firstElement = queue.pollFirst();
SomeObject firstElement = queue.pollFirst(10, TimeUnit.MINUTES);
SomeObject firstElement = queue.takeFirst();

SomeObject lastElement = queue.peekLast();
SomeObject lastElement = queue.pollLast();
SomeObject lastElement = queue.pollLast(10, TimeUnit.MINUTES);
SomeObject lastElement = queue.takeLast();

DelayedQueue

基于Redis的Delayed Queue允许将队列中的元素以固定的延迟转移到目标队列中。目标队列可以是任何实现了RQueue的队列。
DelayedQueue的使用示例如下

RBlockingQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// move object to distinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// move object to distinationQueue in 1 minutes
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);


// msg1 will appear in 10 seconds
distinationQueue.poll(15, TimeUnit.SECONDS);

// msg2 will appear in 2 seconds
distinationQueue.poll(2, TimeUnit.SECONDS);

PriorityQueue

基于Redis的PriorityQueue实现了java.util.Queue接口元素按照Comparable或Comparator定义的顺序进行排序。

默认情况下PriorityQueue实现的是小根对可以自定义比较器来实现大根堆 tryComparator方法可以用于自定义比较器

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityQueue<Entry> queue = redisson.getPriorityQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.poll();
// Entry [b:1]
Entry e = queue.poll();
// Entry [c:1]
Entry e = queue.poll();

PriorityDeque

基于Redis的PriorityDeque实现了java.util.Deque接口其使用和PriorityQueue类似

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityDeque<Entry> queue = redisson.getPriorityDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.pollFirst();
// Entry [c:1]
Entry e = queue.pollLast();

PriorityBlockingQueue

基于PriorityBlockingQueue类似于java.concurrent.PriorityBlockingQueue其使用示例如下

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityBlockingQueue<Entry> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.take();

PriorityBlockingDeque

基于Redis的PriorityBlockingDeque实现了java.util.concurrent.BlockingDeque接口其使用示例如下

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityBlockingDeque<Entry> queue = redisson.getPriorityBlockingDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.takeFirst();
// Entry [c:1]
Entry e = queue.takeLast();

Stream

基于Redis的Stream对象允许创建consumer groupconsumer group用于消费由生产者创建的消息。

RStream<String, String> stream = redisson.getStream("test");

StreamMessageId sm = stream.add(StreamAddArgs.entry("0", "0"));

stream.createGroup("testGroup");
        
StreamId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamId id2 = stream.add(StreamAddArgs.entry("2", "2"));
        
Map<StreamId, Map<String, String>> group = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
long amount = stream.ack("testGroup", id1, id2);

RingBuffer

基于Redis的RingBuffer实现了java.util.Queue如果当前队列已经满了那么会淘汰位于队列最前端的元素。
队列容量大小通过trySetCapacity方法进行设置RingBuffer的使用示例如下所示

RRingBuffer<Integer> buffer = redisson.getRingBuffer("test");

// buffer capacity is 4 elements
buffer.trySetCapacity(4);

buffer.add(1);
buffer.add(2);
buffer.add(3);
buffer.add(4);

// buffer state is 1, 2, 3, 4

buffer.add(5);
buffer.add(6);

// buffer state is 3, 4, 5, 6

Transfer Queue

基于Reids的TransferQueue实现了java.util.concurrent.TransferQueue接口提供了一系列的transfer方法仅当值被consumer成功处理之后才会返回。
对于TransferQueue其提供了一系列的transfer方法有阻塞版本和非阻塞版本。

  • tryTransfer会非阻塞的添加元素当存在消费者调用poll获取当前元素时该方法会将元素交给消费者否则返回false。
  • transfer会阻塞等待直到有消费者线程来获取该元素

当调用阻塞版本trasfer之后直到该transfer返回其他线程无法向TransferQueue中添加元素。

RTransferQueue<String> queue = redisson.getTransferQueue("myCountDownLatch");

queue.transfer("data");
// or try transfer immediately
queue.tryTransfer("data");
// or try transfer up to 10 seconds
queue.tryTransfer("data", 10, TimeUnit.SECONDS);

// in other thread or JVM

queue.take();
// or
queue.poll();

Time Series

基于Redis的TimeSeries允许按照Timestamp来存储值并且支持为entry设置ttl。
TimeSeries的使用如下所示

RTimeSeries<String> ts = redisson.getTimeSeries("myTimeSeries");

ts.add(201908110501, "10%");
ts.add(201908110502, "30%");
ts.add(201908110504, "10%");
ts.add(201908110508, "75%");

// entry time-to-live is 10 hours
ts.add(201908110510, "85%", 10, TimeUnit.HOURS);
ts.add(201908110510, "95%", 10, TimeUnit.HOURS);

String value = ts.get(201908110508);
ts.remove(201908110508);

Collection<String> values = ts.pollFirst(2);
Collection<String> range = ts.range(201908110501, 201908110508);