Files
rikako-note/spring/redisson/redisson.md

1590 lines
56 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

- [Redisson](#redisson)
- [配置](#配置)
- [编程式配置](#编程式配置)
- [yml Configuration](#yml-configuration)
- [Common Settings](#common-settings)
- [codec](#codec)
- [connectionListener](#connectionlistener)
- [nettyThreads](#nettythreads)
- [transportMode](#transportmode)
- [threads](#threads)
- [lockWatchdogTimeout](#lockwatchdogtimeout)
- [Mode](#mode)
- [single instance mode](#single-instance-mode)
- [Operation Execution](#operation-execution)
- [Async方式](#async方式)
- [Redisson Object的公共操作](#redisson-object的公共操作)
- [分布式对象](#分布式对象)
- [object holder](#object-holder)
- [RBucket](#rbucket)
- [RBuckets](#rbuckets)
- [Binary Stream Holder](#binary-stream-holder)
- [BitSet](#bitset)
- [AtomicLong](#atomiclong)
- [AtomicDouble](#atomicdouble)
- [Topic](#topic)
- [Reliable Topic](#reliable-topic)
- [Topic Pattern](#topic-pattern)
- [Bloom Filter](#bloom-filter)
- [HyperLogLog](#hyperloglog)
- [LongAdder](#longadder)
- [DoubleAdder](#doubleadder)
- [id generator](#id-generator)
- [Json Object Holder](#json-object-holder)
- [local cache](#local-cache)
- [分布式集合](#分布式集合)
- [Map](#map)
- [eviction](#eviction)
- [local cache](#local-cache-1)
- [Map Persistence](#map-persistence)
- [read-through策略](#read-through策略)
- [write-through(sync)策略](#write-throughsync策略)
- [write-behind策略(async)](#write-behind策略async)
- [Map Listener](#map-listener)
- [LRU/LFU bounded Map](#lrulfu-bounded-map)
- [MultiMap](#multimap)
- [Set based MultiMap](#set-based-multimap)
- [List based MultiMap](#list-based-multimap)
- [MultiMap eviction](#multimap-eviction)
- [Set](#set)
- [eviction](#eviction-1)
- [SortedSet](#sortedset)
- [ScoredSortedSet](#scoredsortedset)
- [LexSortedSet](#lexsortedset)
- [List](#list)
- [Queue](#queue)
- [Deque](#deque)
- [Blocking Queue](#blocking-queue)
- [Bounded Blocking Queue](#bounded-blocking-queue)
- [Blocking Deque](#blocking-deque)
- [Blocking Fair Queue](#blocking-fair-queue)
- [Blocking Fair Deque](#blocking-fair-deque)
- [DelayedQueue](#delayedqueue)
- [PriorityQueue](#priorityqueue)
- [PriorityDeque](#prioritydeque)
- [PriorityBlockingQueue](#priorityblockingqueue)
- [PriorityBlockingDeque](#priorityblockingdeque)
- [Stream](#stream)
- [RingBuffer](#ringbuffer)
- [Transfer Queue](#transfer-queue)
- [Time Series](#time-series)
- [分布式锁和synchronizer](#分布式锁和synchronizer)
- [Lock](#lock)
- [Fair Lock](#fair-lock)
- [MultiLock](#multilock)
- [ReadWriteLock](#readwritelock)
- [Semaphore](#semaphore)
- [PermitExpirableSemaphore](#permitexpirablesemaphore)
- [CountDownLatch](#countdownlatch)
- [SpinLock](#spinlock)
- [FencedLock](#fencedlock)
- [Redisson整合Spring Cache](#redisson整合spring-cache)
- [spring cache yaml config](#spring-cache-yaml-config)
# Redisson
## 配置
### 编程式配置
可以通过创建Config对象来显式配置redisson配置示例如下
```java
Config config = new Config();
config.setTransportMode(TransportMode.EPOLL);
config.useClusterServers()
// use "rediss://" for SSL connection
.addNodeAddress("perredis://127.0.0.1:7181");
RedissonClient redisson = Redisson.create(config);
```
### yml Configuration
也可以通过yml文件格式来对redisson进行配置
```java
Config config = Config.fromYAML(new File("config-file.yaml"));
RedissonClient redisson = Redisson.create(config);
```
可以通过config.toYAML方法将config转化为yaml格式
```java
Config config = new Config();
// ... many settings are set here
String yamlFormat = config.toYAML();
```
yml格式中可以引入环境变量
```yml
singleServerConfig:
address: "redis://127.0.0.1:${REDIS_PORT}"
```
### Common Settings
如下设置用于配置config对象并且适用于各种redis模式
#### codec
默认值org.redisson.codec.Kryo5Codec
#### connectionListener
默认值null
connection listener当redisson连接到redis-server或与redis-server断开连接时被触发
#### nettyThreads
默认值32
redisson所有redis client共享的线程总数。netty thread用于redis相应的解码和命令的发送。
#### transportMode
默认值TransportMode.NIO
可选的值如下:
- TransportMode.NIO默认
- TransportMode.EPOLL
- TransportMode.KQUEUE
#### threads
由Rtopic object listener所共享的线程数量
#### lockWatchdogTimeout
默认值30000
RLock watchdog timeout单位为ms。该参数仅当RLock在获取时没有指定leaseTimeout时使用。当watchdog没有延长持有锁时间到下一个watchdogTimeout间隔时当前watchdogTimeout到期后锁会过期。watchdog避免了由于客户端崩溃或其他原因造成一直持有锁的情况。
### Mode
#### single instance mode
可以通过如下方式来配置单实例模式:
```java
// connects to 127.0.0.1:6379 by default
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
```
yml配置单实例模式如下
```yaml
singleServerConfig:
idleConnectionTimeout: 10000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
password: null
subscriptionsPerConnection: 5
clientName: null
address: "redis://127.0.0.1:6379"
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
connectionMinimumIdleSize: 24
connectionPoolSize: 64
database: 0
dnsMonitoringInterval: 5000
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.Kryo5Codec> {}
transportMode: "NIO"
```
## Operation Execution
redisson支持自动重试策略并且在每次尝试时都会发送命令。重试策略通过retryAttempts默认情况下为3、retryInterval默认情况下为1000ms来设置。多次尝试之间间隔retryInterval。
redisson实例是线程安全的如下是RAtomicLong对象的使用示例
```java
RedissonClient client = Redisson.create(config);
RAtomicLong longObject = client.getAtomicLong('myLong');
// sync way
longObject.compareAndSet(3, 401);
// async way
RFuture<Boolean> result = longObject.compareAndSetAsync(3, 401);
RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
// reactive way
Mono<Boolean> result = longObject.compareAndSet(3, 401);
RedissonRxClient client = Redisson.createRx(config);
RAtomicLongRx longObject= client.getAtomicLong("myLong");
// RxJava2 way
Flowable<Boolean result = longObject.compareAndSet(3, 401);
```
### Async方式
大多数redisson object继承了异步接口可以调用异步方法实现异步操作如下
```java
// RAtomicLong extends RAtomicLongAsync
RAtomicLongAsync longObject = client.getAtomicLong("myLong");
RFuture<Boolean> future = longObject.compareAndSetAsync(1, 401);
```
RFuture对象继承了Future接口和CompletionStage接口可以像CompleteableFuture一样使用
```java
future.whenComplete((res, exception) -> {
// handle both result and exception
});
// or
future.thenAccept(res -> {
// handle result
}).exceptionally(exception -> {
// handle exception
});
```
因该避免在future listener中使用同步方法这样可能会造成redis请求/相应处理时的错误,应使用如下方式执行:
```java
future.whenCompleteAsync((res, exception) -> {
// handle both result and exception
}, executor);
// or
future.thenAcceptAsync(res -> {
// handle result
}, executor).exceptionallyAsync(exception -> {
// handle exception
}, executor);
```
## Redisson Object的公共操作
所有redisson object都实现了RObject和RExpiration接口使用示例如下
```java
RObject object = redisson.get...()
object.sizeInMemory();
object.delete();
object.rename("newname");
object.isExists();
// catch expired event
object.addListener(new ExpiredObjectListener() {
...
});
// catch delete event
object.addListener(new DeletedObjectListener() {
...
});
```
redisson object的name属性即是在redis中的key
```java
RMap map = redisson.getMap("mymap");
map.getName(); // = mymap
```
和redis key相关的所有操作都通过RKeys接口暴露使用示例如下
```java
RKeys keys = redisson.getKeys();
Iterable<String> allKeys = keys.getKeys();
Iterable<String> foundedKeys = keys.getKeysByPattern('key*');
long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");
long deletedKeysAmount = keys.deleteByPattern("test?");
String randomKey = keys.randomKey();
long keysAmount = keys.count();
keys.flushall();
keys.flushdb();
```
## 分布式对象
### object holder
#### RBucket
RBucket的java实现类是一个hodler可以持有任何类型的java对象。RBucket的大小限制为512MB.
RBucket的使用示例如下所示
```java
RBucket<AnyObject> bucket = redisson.getBucket("anyObject");
bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();
bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));
```
#### RBuckets
可以通过RBuckets对象来操作多个RBucket对象RBuckets使用示例如下
```java
RBuckets buckets = redisson.getBuckets();
// get all bucket values
Map<String, V> loadedBuckets = buckets.get("myBucket1", "myBucket2", "myBucket3");
Map<String, Object> map = new HashMap<>();
map.put("myBucket1", new MyObject());
map.put("myBucket2", new MyObject());
// sets all or nothing if some bucket is already exists
buckets.trySet(map);
// store all at once
buckets.set(map);
```
### Binary Stream Holder
RBinaryStream类型的对象用于存储字节序列。RBinaryStream实现了RBucket接口并且其大小限制也是512MB.
RBinaryStream的使用示例如下
```java
RBinaryStream stream = redisson.getBinaryStream("anyStream");
byte[] content = ...
stream.set(content);
stream.getAndSet(content);
stream.trySet(content);
stream.compareAndSet(oldContent, content);
```
RBinaryStream可以和InputStream与OutputStream混用使用如下
```java
RBinaryStream stream = redisson.getBinaryStream("anyStream");
InputStream is = stream.getInputStream();
byte[] readBuffer = ...
is.read(readBuffer);
OutputStream os = stream.getOuputStream();
byte[] contentToWrite = ...
os.write(contentToWrite);
```
### BitSet
RBitSet实现提供了和java中BitSet类似的api其大小限制是4 294 967 295 bits.
RBitSet使用如下所示
```java
RBitSet set = redisson.getBitSet("simpleBitset");
set.set(0, true);
set.set(1812, false);
set.clear(0);
set.and("anotherBitset");
set.xor("anotherBitset");
```
### AtomicLong
RAtomicLong实现提供了和java中AtomicLong类似的api其使用类似如下
```java
RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong");
atomicLong.set(3);
atomicLong.incrementAndGet();
atomicLong.get();
```
### AtomicDouble
RAtomicDouble实现提供了和java中AtomicDouble类似的api其使用示例如下
```java
RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");
atomicDouble.set(2.81);
atomicDouble.addAndGet(4.11);
atomicDouble.get();
```
### Topic
RTopic实现提供了发布/订阅机制其允许订阅由同名RTopic对象发布的事件。
当重新连接到redis或redis错误切换之后listener会被重新订阅在重新订阅之前所有被发布的消息都会丢失。
RTopic的使用如下所示
```java
RTopic topic = redisson.getTopic("myTopic");
int listenerId = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(String channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RTopic topic = redisson.getTopic("myTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());
```
### Reliable Topic
RRelieableTopic在实现发布/订阅模式的情况下还实现了消息的可靠传输。当redis连接断开的情况下所有消息都会被存储当重新连接到redis时消息会被重新传送。
每个RReliableTopic对象实例都由一个watchdog当第一个listener注册之后watchdog就会被启用。当订阅者超过reliableTopicWatchdogTimeout之后且watchdog没有为其延续过期时间那么订阅将会超时。该机制是为了防止client长时间崩溃之后存储的消息持续增长
**当重新连接到redis之后listener会被重新注册**
```java
RReliableTopic topic = redisson.getReliableTopic("anyTopic");
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RReliableTopic topic = redisson.getReliableTopic("anyTopic");
long subscribersReceivedMessage = topic.publish(new SomeObject());
```
### Topic Pattern
RPatternTopic允许订阅多个Rtopic在重新连接到redis或redis错误切换之后listener会被重新订阅。
Pattern使用如下
- topic? subscribes to topic1, topicA ...
- topic?_my subscribes to topic_my, topic123_my, topicTEST_my ...
- topic[ae] subscribes to topica and topice only
使用示例如下:
```java
// subscribe to all topics by `topic*` pattern
RPatternTopic patternTopic = redisson.getPatternTopic("topic*");
int listenerId = patternTopic.addListener(Message.class, new PatternMessageListener<Message>() {
@Override
public void onMessage(String pattern, String channel, Message msg) {
//...
}
});
```
### Bloom Filter
RBloomFilter中最多含有2^32个bit。
在使用之前必须通过tryInit(expectedInsertions, falseProbability)来初始化capacity。
RBloomFilter使用示例如下
```java
RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
// initialize bloom filter with
// expectedInsertions = 55000000
// falseProbability = 0.03
bloomFilter.tryInit(55000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));
bloomFilter.count();
```
### HyperLogLog
RHyperLogLog能以较低的空间维护大数量的项目计算其去重后的数量其使用如下所示
```java
RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
log.add(1);
log.add(2);
log.add(3);
log.count();
```
### LongAdder
RLongAdder提供了java中LongAdder的实现其在client端维护了LongAdder增加和减少的性能较AtomicLong来说都有极大的提升至多可提升12000倍。其使用如下所示
```java
RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();
```
当LongAdder不再使用之后应该调用destroy方法手动进行销毁
```java
RLongAdder atomicLong = ...
atomicLong.destroy();
```
### DoubleAdder
RDoubleAdder提供了java中DoubleAdder的分布式实现其性能相对于AtomicDouble也有很大提升。
其使用如下所示:
```java
RLongDouble atomicDouble = redisson.getLongDouble("myLongDouble");
atomicDouble.add(12);
atomicDouble.increment();
atomicDouble.decrement();
atomicDouble.sum();
```
### id generator
RIdGenerator实现允许产生唯一的id但其生成算法不是简单的递增而是在第一次请求时一系列id就已经被分配并且缓存到java端直到其用完。该方法能够减少和redis的通信次数产生id的速率比RAtomicLong快。
默认情况下allocate size是2000并且值从0开始。
RIdGenerator的使用如下所示
```java
RIdGenerator generator = redisson.getIdGenerator("generator");
// Initialize with start value = 12 and allocation size = 20000
generator.tryInit(12, 20000);
long id = generator.nextId();
```
### Json Object Holder
RJsonBucket类型用于存储json数据通过redis 的JSON.*命令来实现。Json数据通过JsonCodec来进行编码和解码。可用的实现是org.redisson.codec.JacksonCodec。
#### local cache
redisson提供了带有本地缓存的json object holder版本。
local cache用于加速读取操作从而较少网络请求次数其将整个json对象缓存在redisson侧执行读取操作比无缓存快45倍。
| redission client method name | local cache | Ultra-fast read/write |
| :-: | :-: | :-: |
| getJsonBucket() open-source version | false | false |
| getJsonBucket() Redisson PRO version | false | true |
| getLocalCachedJsonBucket() Redisson PRO version | true | true |
RJsonBucket使用如下所示
```java
RJsonBucket<AnyObject> bucket = redisson.getJsonBucket("anyObject", new JacksonCodec<>(AnyObject.class));
bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();
bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));
List<String> values = bucket.get(new JacksonCodec<>(new TypeReference<List<String>>() {}), "values");
long aa = bucket.arrayAppend("$.obj.values", "t3", "t4");
```
## 分布式集合
### Map
基于Redis的分布式Map对象实现了ConcurrentMap接口。
如果Map主要用于读取数据或者像尽量避免网络请求可以使用带有local cache的map使用示例如下
```java
RMap<String, SomeObject> map = redisson.getMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");
// use fast* methods when previous value is not required
map.fastPut("a", new SomeObject());
map.fastPutIfAbsent("d", new SomeObject());
map.fastRemove("b");
RFuture<SomeObject> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");
map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");
```
RMap支持为每个key绑定一个lock/readwritelock/semaphore/countdownlatch使用如下
```java
RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
MyKey k = new MyKey();
RLock keyLock = map.getLock(k);
keyLock.lock();
try {
MyValue v = map.get(k);
// process value ...
} finally {
keyLock.unlock();
}
RReadWriteLock rwLock = map.getReadWriteLock(k);
rwLock.readLock().lock();
try {
MyValue v = map.get(k);
// process value ...
} finally {
keyLock.readLock().unlock();
}
```
Redisson支持不同的Map结构实现不同的实现主要包含如下三个特性
- local cache: local cache通过将map entry缓存在redisson端来减少网络请求从而提升读操作的性能。拥有相同name的local cache实例会连接到相同的发布/订阅channel该channel用于在local cache实例之间交换更新/失效的事件。
- eviction支持对每个map entry指定ttl和max idle time
如下是所有的RMap可用实现
| RedissonClient method | local cache | eviction |
| :-: | :-: | :-: |
| getMap() | false | false |
| getMapCache() | false | true |
| getLocalCachedMap() | true | false |
#### eviction
带有eviction功能的map实现了RMapCache接口并且实现了ConcurrentMap接口。
当前redis并不支持淘汰map entry的功能故而超时的map entry是通过EvictionScheduler来时不时进行清理的。其会一次性移除100条超时的entry。任务启动的时间会自动调整根据上次任务删除过期entry的数量时间会在5s到半小时之间变化。
建议在每个redisson实例中都使用具有相同名称的MapCache对象
```java
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
// ttl = 10 minutes,
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// ttl = 10 minutes, maxIdleTime = 10 seconds
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
// ttl = 3 seconds
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// ttl = 40 seconds, maxIdleTime = 10 seconds
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
// if object is not used anymore
map.destroy();
```
#### local cache
带有local cache功能的map实现实现了RLocalCachedMap接口同时也实现了ConcurrentMap接口。
推荐在所有的redisson client中使用具有相同name的LocalCachedMap并且为所有具有相同名称的LocalCachedMap指定相同的LocalCachedMapOptions。
如下是在创建localcachedmap时可以指定的options
```java
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
// Defines whether to store a cache miss into the local cache.
// Default value is false.
.storeCacheMiss(false);
// Defines store mode of cache data.
// Follow options are available:
// LOCALCACHE - store data in local cache only and use Redis only for data update/invalidation.
// LOCALCACHE_REDIS - store data in both Redis and local cache.
.storeMode(StoreMode.LOCALCACHE_REDIS)
// Defines Cache provider used as local cache store.
// Follow options are available:
// REDISSON - uses Redisson own implementation
// CAFFEINE - uses Caffeine implementation
.cacheProvider(CacheProvider.REDISSON)
// Defines local cache eviction policy.
// Follow options are available:
// LFU - Counts how often an item was requested. Those that are used least often are discarded first.
// LRU - Discards the least recently used items first
// SOFT - Uses weak references, entries are removed by GC
// WEAK - Uses soft references, entries are removed by GC
// NONE - No eviction
.evictionPolicy(EvictionPolicy.NONE)
// If cache size is 0 then local cache is unbounded.
.cacheSize(1000)
// Defines strategy for load missed local cache updates after Redis connection failure.
//
// Follow reconnection strategies are available:
// CLEAR - Clear local cache if map instance has been disconnected for a while.
// LOAD - Store invalidated entry hash in invalidation log for 10 minutes
// Cache keys for stored invalidated entry hashes will be removed
// if LocalCachedMap instance has been disconnected less than 10 minutes
// or whole cache will be cleaned otherwise.
// NONE - Default. No reconnection handling
.reconnectionStrategy(ReconnectionStrategy.NONE)
// Defines local cache synchronization strategy.
//
// Follow sync strategies are available:
// INVALIDATE - Default. Invalidate cache entry across all LocalCachedMap instances on map entry change
// UPDATE - Insert/update cache entry across all LocalCachedMap instances on map entry change
// NONE - No synchronizations on map changes
.syncStrategy(SyncStrategy.INVALIDATE)
// time to live for each map entry in local cache
.timeToLive(10000)
// or
.timeToLive(10, TimeUnit.SECONDS)
// max idle time for each map entry in local cache
.maxIdle(10000)
// or
.maxIdle(10, TimeUnit.SECONDS);
```
localcachedmap对象在不再使用之后应该调用destroy方法进行销毁
```java
RLocalCachedMap<String, Integer> map = ...
map.destroy();
```
#### Map Persistence
redisson允许将map中的数据存储到除了redis以外的空间其通常用于和应用和数据库之间的缓存类似于spring cache。
##### read-through策略
当请求的条目在redisson map object中不存在时其会使用MapLoader对象来进行加载示例代码如下
```java
MapLoader<String, String> mapLoader = new MapLoader<String, String>() {
@Override
public Iterable<String> loadAllKeys() {
List<String> list = new ArrayList<String>();
Statement statement = conn.createStatement();
try {
ResultSet result = statement.executeQuery("SELECT id FROM student");
while (result.next()) {
list.add(result.getString(1));
}
} finally {
statement.close();
}
return list;
}
@Override
public String load(String key) {
PreparedStatement preparedStatement = conn.prepareStatement("SELECT name FROM student where id = ?");
try {
preparedStatement.setString(1, key);
ResultSet result = preparedStatement.executeQuery();
if (result.next()) {
return result.getString(1);
}
return null;
} finally {
preparedStatement.close();
}
}
};
```
配置map的示例如下
```java
MapOptions<K, V> options = MapOptions.<K, V>defaults()
.loader(mapLoader);
RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", options);
// or with boost up to 45x times
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with boost up to 45x times
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
```
##### write-through(sync)策略
如果map entry被更新那么更新方法不会返回直到通过MapWriter对象将entry更新写入到外部存储中
```java
MapWriter<String, String> mapWriter = new MapWriter<String, String>() {
@Override
public void write(Map<String, String> map) {
PreparedStatement preparedStatement = conn.prepareStatement("INSERT INTO student (id, name) values (?, ?)");
try {
for (Entry<String, String> entry : map.entrySet()) {
preparedStatement.setString(1, entry.getKey());
preparedStatement.setString(2, entry.getValue());
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
} finally {
preparedStatement.close();
}
}
@Override
public void delete(Collection<String> keys) {
PreparedStatement preparedStatement = conn.prepareStatement("DELETE FROM student where id = ?");
try {
for (String key : keys) {
preparedStatement.setString(1, key);
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
} finally {
preparedStatement.close();
}
}
};
```
write-through策略配置如下
```java
MapOptions<K, V> options = MapOptions.<K, V>defaults()
.writer(mapWriter)
.writeMode(WriteMode.WRITE_BEHIND)
.writeBehindDelay(5000)
.writeBehindBatchSize(100);
RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", options);
// or with boost up to 45x times
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with boost up to 45x times
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
```
##### write-behind策略(async)
在使用write-behind之后对于map object的修改是批量累计的并且按定义的延迟异步被MapWriter写入到外部存储中。
writeBehindDelay是批量写入或删除的延迟默认情况下值为1000ms。writeBehindBatchSize是batch的容量每个batch都包含写入和删除的命令集合默认情况下size是50 .
配置代码如下所示:
```java
MapOptions<K, V> options = MapOptions.<K, V>defaults()
.writer(mapWriter)
.writeMode(WriteMode.WRITE_BEHIND)
.writeBehindDelay(5000)
.writeBehindBatchSize(100);
RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", options);
// or with boost up to 45x times
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with boost up to 45x times
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
```
#### Map Listener
Redisson允许为实现RMapCache接口的Map对象绑定监听器监听如下的map事件
- entry创建org.redisson.api.map.event.EntryCreatedListener
- entry过期org.redisson.api.map.event.EntryExpiredListener
- entry移除org.redisson.api.map.event.EntryRemovedListener
- entry更新org.redisson.api.map.event.EntryUpdatedListener
使用实例如下所示:
```java
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() {
@Override
public void onUpdated(EntryEvent<Integer, Integer> event) {
event.getKey(); // key
event.getValue() // new value
event.getOldValue() // old value
// ...
}
});
int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() {
@Override
public void onCreated(EntryEvent<Integer, Integer> event) {
event.getKey(); // key
event.getValue() // value
// ...
}
});
int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() {
@Override
public void onExpired(EntryEvent<Integer, Integer> event) {
event.getKey(); // key
event.getValue() // value
// ...
}
});
int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() {
@Override
public void onRemoved(EntryEvent<Integer, Integer> event) {
event.getKey(); // key
event.getValue() // value
// ...
}
});
map.removeListener(updateListener);
map.removeListener(createListener);
map.removeListener(expireListener);
map.removeListener(removeListener);
```
#### LRU/LFU bounded Map
实现了RMapCache接口的Map对象支持按LRU或LFU顺序进行绑定。绑定了LRU或LFU的Map可以存储固定数量的entry并且按照LRU或LFU的顺序淘汰entry。
```java
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
// tries to set limit map to 10 entries using LRU eviction algorithm
map.trySetMaxSize(10);
// ... using LFU eviction algorithm
map.trySetMaxSize(10, EvictionMode.LFU);
// set or change limit map to 10 entries using LRU eviction algorithm
map.setMaxSize(10);
// ... using LFU eviction algorithm
map.setMaxSize(10, EvictionMode.LFU);
map.put("1", "2");
map.put("3", "3", 1, TimeUnit.SECONDS);
```
### MultiMap
基于Redis的MultiMap支持为一个key绑定多个value
#### Set based MultiMap
基于set的multimap实现对于相同的key不允许出现重复的value
```java
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
Set<SimpleValue> allValues = map.get(new SimpleKey("0"));
List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
```
#### List based MultiMap
基于List的MultiMap允许对相同的key存在多个相同value并且对同一个key其value按插入顺序存储
```java
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
List<SimpleValue> allValues = map.get(new SimpleKey("0"));
Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
```
#### MultiMap eviction
对于实现了MultimapCache接口的map对象分别支持基于list和基于set的evicition。
过期的entry清除由EvictionScheduler执行其一次性会移除100条过期的entry。Task启动时间会根据上次任务删除的条目数目自动进行调整时间间隔在1s和2h之间进行调整。
MultiMap eviction使用示例如下所示
```java
RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap");
multimap.put("1", "a");
multimap.put("1", "b");
multimap.put("1", "c");
multimap.put("2", "e");
multimap.put("2", "f");
multimap.expireKey("2", 10, TimeUnit.MINUTES);
```
### Set
基于redis的set object实现了java.util.set接口。
redis set的使用如下所示
```java
RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());
```
根据是否支持数据淘汰,可以分为如下方法:
| Redisson Client method | Eviction Support |
| :-: | :-: |
| getSet() | false |
|getSetCache() | true |
#### eviction
支持淘汰功能的Set对象实现了RSetCache接口当前redis尚不支持淘汰set中的值故而淘汰是由EvictionScheduler实现的其一次性移除300条entry根据上次task淘汰的key数量下次执行task的间隔在1s到1h之间调整。
```java
RSetCache<SomeObject> set = redisson.getSetCache("mySet");
// or
RMapCache<SomeObject> set = redisson.getClusteredSetCache("mySet");
// ttl = 10 minutes,
set.add(new SomeObject(), 10, TimeUnit.MINUTES);
// if object is not used anymore
set.destroy();
```
#### SortedSet
基于Redis的SortedSet实现了java.util.SortedSet接口其通过比较元素来确保元素的在set中的唯一性。对于String数据类型更推荐使用LexSortedSet其性能表现更好。
RSortedSet的使用示例如下所示
```java
RSortedSet<Integer> set = redisson.getSortedSet("anySet");
set.trySetComparator(new MyComparator()); // set object comparator
set.add(3);
set.add(1);
set.add(2);
set.removeAsync(0);
set.addAsync(5);
```
#### ScoredSortedSet
基于redis的分布式ScoredSortedSet根据插入元素时的score来对元素进行排序。使用示例如下所示默认情况下插入元素按照score从高到低的顺寻进行排序
```java
set.add(0.13, new SomeObject(a, b));
set.addAsync(0.251, new SomeObject(c, d));
set.add(0.302, new SomeObject(g, d));
set.pollFirst();
set.pollLast();
int index = set.rank(new SomeObject(g, d)); // get element index
Double score = set.getScore(new SomeObject(g, d)); // get element score
```
#### LexSortedSet
LexSortedSet只能用于存储String类型元素其实现了Set\<String\>接口其按字典顺序存储String元素。
LexSortedSet使用示例如下
```java
RLexSortedSet set = redisson.getLexSortedSet("simple");
set.add("d");
set.addAsync("e");
set.add("f");
set.lexRangeTail("d", false);
set.lexCountHead("e");
set.lexRange("d", true, "z", false);
```
### List
基于Redis的list实现了java.util.List接口其按照插入顺序来存储元素。
RList的使用如下
```java
RList<SomeObject> list = redisson.getList("anyList");
list.add(new SomeObject());
list.get(0);
list.remove(new SomeObject());
```
### Queue
基于Redis的Queue对象实现了java.util.Queue接口其使用示例如下所示
```java
RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
```
### Deque
基于Redis的Deque实现了java.util.Deque接口其使用示例如下所示
```java
RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();
```
### Blocking Queue
基于Redis的分布式**无界**BlockingQueue实现了java.concurrent.BlockingQueue接口。
RBlockingQueue的使用示例如下所示:
```java
RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject obj = queue.poll();
SomeObject obj = queue.poll(10, TimeUnit.MINUTES);
```
对于基于Redisson的BlockingQueue其poll/pollFromAny/pollLastAndOfferFirstTo/take方法其阻塞操作都是基于发布/订阅机制实现的当redisson重新连接到redisson-server之后会自动重新注册。
### Bounded Blocking Queue
基于Redis的分布式**有界**BoundedBlockingQueue实现了java.util.concurrent.Blocking接口队列容量限制可以通过trySetCapacity方法来进行定义
```java
RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
// returns `true` if capacity set successfully and `false` if it already set.
queue.trySetCapacity(2);
queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// will be blocked until free space available in queue
queue.put(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
```
### Blocking Deque
基于Redis的BlockingDeque实现了java.util.BlockingDeque接口其使用如下所示
```java
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = queue.takeFirst();
Integer lastValue = queue.takeLast();
Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES);
Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES);
```
### Blocking Fair Queue
基于Redis的BlockingFairQueue实现了java.util.BlockingQueue接口。
通常情况下,如果存在多个消费者,有的消费者网络状况较好,有的消费者网络状况较差,那么网络较好的消费者将会比网络较差的消费者消费更多的消息。
而BlockingFairQueue者通过轮询等手段保证不同消费者之间的访问顺序从而使得不同消费者消费的消息数量是均匀的。
BlockingFairQueue的使用示例如下所示
```java
RBlockingFairQueue queue = redisson.getBlockingFairQueue("myQueue");
queue.offer(new SomeObject());
SomeObject element = queue.peek();
SomeObject element = queue.poll();
SomeObject element = queue.poll(10, TimeUnit.MINUTES);
SomeObject element = queue.take();
```
> 该特性仅在redisson pro中可用。
### Blocking Fair Deque
类似于BlockingFairQueue该特性仅在redisson pro中可用。
```java
RBlockingFairDeque deque = redisson.getBlockingFairDeque("myDeque");
deque.offer(new SomeObject());
SomeObject firstElement = queue.peekFirst();
SomeObject firstElement = queue.pollFirst();
SomeObject firstElement = queue.pollFirst(10, TimeUnit.MINUTES);
SomeObject firstElement = queue.takeFirst();
SomeObject lastElement = queue.peekLast();
SomeObject lastElement = queue.pollLast();
SomeObject lastElement = queue.pollLast(10, TimeUnit.MINUTES);
SomeObject lastElement = queue.takeLast();
```
### DelayedQueue
基于Redis的Delayed Queue允许将队列中的元素以固定的延迟转移到目标队列中。目标队列可以是任何实现了RQueue的队列。
DelayedQueue的使用示例如下
```java
RBlockingQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// move object to distinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// move object to distinationQueue in 1 minutes
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
// msg1 will appear in 10 seconds
distinationQueue.poll(15, TimeUnit.SECONDS);
// msg2 will appear in 2 seconds
distinationQueue.poll(2, TimeUnit.SECONDS);
```
### PriorityQueue
基于Redis的PriorityQueue实现了java.util.Queue接口元素按照Comparable或Comparator定义的顺序进行排序。
> 默认情况下PriorityQueue实现的是小根对可以自定义比较器来实现大根堆
tryComparator方法可以用于自定义比较器
```java
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityQueue<Entry> queue = redisson.getPriorityQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry e = queue.poll();
// Entry [b:1]
Entry e = queue.poll();
// Entry [c:1]
Entry e = queue.poll();
```
### PriorityDeque
基于Redis的PriorityDeque实现了java.util.Deque接口其使用和PriorityQueue类似
```java
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityDeque<Entry> queue = redisson.getPriorityDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry e = queue.pollFirst();
// Entry [c:1]
Entry e = queue.pollLast();
```
### PriorityBlockingQueue
基于PriorityBlockingQueue类似于java.concurrent.PriorityBlockingQueue其使用示例如下
```java
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityBlockingQueue<Entry> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry e = queue.take();
```
### PriorityBlockingDeque
基于Redis的PriorityBlockingDeque实现了java.util.concurrent.BlockingDeque接口其使用示例如下
```java
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityBlockingDeque<Entry> queue = redisson.getPriorityBlockingDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry e = queue.takeFirst();
// Entry [c:1]
Entry e = queue.takeLast();
```
### Stream
基于Redis的Stream对象允许创建consumer groupconsumer group用于消费由生产者创建的消息。
```java
RStream<String, String> stream = redisson.getStream("test");
StreamMessageId sm = stream.add(StreamAddArgs.entry("0", "0"));
stream.createGroup("testGroup");
StreamId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamId, Map<String, String>> group = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
long amount = stream.ack("testGroup", id1, id2);
```
### RingBuffer
基于Redis的RingBuffer实现了java.util.Queue如果当前队列已经满了那么会淘汰位于队列最前端的元素。
队列容量大小通过trySetCapacity方法进行设置RingBuffer的使用示例如下所示
```java
RRingBuffer<Integer> buffer = redisson.getRingBuffer("test");
// buffer capacity is 4 elements
buffer.trySetCapacity(4);
buffer.add(1);
buffer.add(2);
buffer.add(3);
buffer.add(4);
// buffer state is 1, 2, 3, 4
buffer.add(5);
buffer.add(6);
// buffer state is 3, 4, 5, 6
```
### Transfer Queue
基于Reids的TransferQueue实现了java.util.concurrent.TransferQueue接口提供了一系列的transfer方法仅当值被consumer成功处理之后才会返回。
对于TransferQueue其提供了一系列的transfer方法有阻塞版本和非阻塞版本。
- tryTransfer会非阻塞的添加元素当存在消费者调用poll获取当前元素时该方法会将元素交给消费者否则返回false。
- transfer会阻塞等待直到有消费者线程来获取该元素
> 当调用阻塞版本trasfer之后直到该transfer返回其他线程无法向TransferQueue中添加元素。
```java
RTransferQueue<String> queue = redisson.getTransferQueue("myCountDownLatch");
queue.transfer("data");
// or try transfer immediately
queue.tryTransfer("data");
// or try transfer up to 10 seconds
queue.tryTransfer("data", 10, TimeUnit.SECONDS);
// in other thread or JVM
queue.take();
// or
queue.poll();
```
### Time Series
基于Redis的TimeSeries允许按照Timestamp来存储值并且支持为entry设置ttl。
TimeSeries的使用如下所示
```java
RTimeSeries<String> ts = redisson.getTimeSeries("myTimeSeries");
ts.add(201908110501, "10%");
ts.add(201908110502, "30%");
ts.add(201908110504, "10%");
ts.add(201908110508, "75%");
// entry time-to-live is 10 hours
ts.add(201908110510, "85%", 10, TimeUnit.HOURS);
ts.add(201908110510, "95%", 10, TimeUnit.HOURS);
String value = ts.get(201908110508);
ts.remove(201908110508);
Collection<String> values = ts.pollFirst(2);
Collection<String> range = ts.range(201908110501, 201908110508);
```
## 分布式锁和synchronizer
### Lock
基于Redis的分布式ReentrantLock对象实现了java中的Lock接口。
为了防止redisson再获取锁之后崩溃导致永远持有锁redisson维护了watchdog当持有锁的redisson实例处于活跃状态时其会延长锁的时间。默认情况下redisson的超时时间为30s。
在获取锁时可以指定leaseTime在经过指定时间后锁定的lock会自动被释放。
> RLock的行为符合java lock规范意味着只有锁的持有线程才能对锁进行解锁否则会抛出IllegalMonitorStateException异常。
> 如果想要多个线程对锁资源进行操作可以考虑使用RSemaphore
RLock的使用示例如下
```java
RLock lock = redisson.getLock("myLock");
// traditional lock method
lock.lock();
// or acquire lock and automatically unlock it after 10 seconds
lock.lock(10, TimeUnit.SECONDS);
// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
```
### Fair Lock
fair lock会保证所有线程按请求锁的顺序获取锁所有等待的线程会被排队如果有线程died那么redisson会等5s来等待其返回。
fair lock使用如下所示
```java
RLock lock = redisson.getFairLock("myLock");
// traditional lock method
lock.lock();
// or acquire lock and automatically unlock it after 10 seconds
lock.lock(10, TimeUnit.SECONDS);
// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
```
### MultiLock
MultiLock允许对多个锁资源同时执行操作将多个锁资源视作单个锁资源。
MultiLock使用示例如下所示
```java
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3);
// traditional lock method
multiLock.lock();
// or acquire lock and automatically unlock it after 10 seconds
multiLock.lock(10, TimeUnit.SECONDS);
// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res = multiLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
multiLock.unlock();
}
}
```
### ReadWriteLock
基于Redis的ReadWriteLock实现了java中的ReadWriteLock接口readlock允许有多个ownerwritelock只允许有一个owner。
ReadWriteLock的使用示例如下所示
```java
RReadWriteLock rwlock = redisson.getReadWriteLock("myLock");
RLock lock = rwlock.readLock();
// or
RLock lock = rwlock.writeLock();
// traditional lock method
lock.lock();
// or acquire lock and automatically unlock it after 10 seconds
lock.lock(10, TimeUnit.SECONDS);
// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
```
### Semaphore
基于Redis的Semaphore和java中的Semaphore类似可以在使用之前进行初始化但是初始化不是必须的可以通过trySetPermits方法初始化permit的数量。
semaphore使用示例如下所示
```java
RSemaphore semaphore = redisson.getSemaphore("mySemaphore");
// acquire single permit
semaphore.acquire();
// or acquire 10 permits
semaphore.acquire(10);
// or try to acquire permit
boolean res = semaphore.tryAcquire();
// or try to acquire permit or wait up to 15 seconds
boolean res = semaphore.tryAcquire(15, TimeUnit.SECONDS);
// or try to acquire 10 permit
boolean res = semaphore.tryAcquire(10);
// or try to acquire 10 permits or wait up to 15 seconds
boolean res = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
semaphore.release();
}
}
```
### PermitExpirableSemaphore
基于redis的分布式Semaphore对象其支持在通过acquire获取信号量时添加lease time参数。
对于带lease time的permit每个permit都通过id标识并且释放操作也是通过id释放。
允许通过addPermits方法增加或减少permits数量。
```java
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
semaphore.trySetPermits(23);
// acquire permit
String id = semaphore.acquire();
// or acquire permit with lease time in 10 seconds
String id = semaphore.acquire(10, TimeUnit.SECONDS);
// or try to acquire permit
String id = semaphore.tryAcquire();
// or try to acquire permit or wait up to 15 seconds
String id = semaphore.tryAcquire(15, TimeUnit.SECONDS);
// or try to acquire permit with least time 15 seconds or wait up to 10 seconds
String id = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS);
if (id != null) {
try {
...
} finally {
semaphore.release(id);
}
}
```
### CountDownLatch
基于Redis的分布式CountDownLatch对象和java中的CountDownLatch对象类似在使用之前应该通过trySetCount来进行初始化。
CountDownLatch的使用示例如下所示
```java
RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch");
latch.trySetCount(1);
// await for count down
latch.await();
// in other thread or JVM
RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch");
latch.countDown();
```
### SpinLock
由于分布式锁是采用的pub/sub机制故而当短时间内获取或释放大量的锁时消息会通过pub/sub channel发送到redis集群中所有的节点。
可以通过backoff策略来自旋尝试从而代替pub/sub channel发送时间这样可以降低redis集群的网络和cpu负载。
```java
RLock lock = redisson.getSpinLock("myLock");
// traditional lock method
lock.lock();
// or acquire lock and automatically unlock it after 10 seconds
lock.lock(10, TimeUnit.SECONDS);
// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
```
### FencedLock
FencedLock使用如下所示
```java
RFencedLock lock = redisson.getFencedLock("myLock");
// traditional lock method
Long token = lock.lockAndGetToken();
// or acquire lock and automatically unlock it after 10 seconds
token = lock.lockAndGetToken(10, TimeUnit.SECONDS);
// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
Long token = lock.tryLockAndGetToken(100, 10, TimeUnit.SECONDS);
if (token != null) {
try {
// check if token >= old token
...
} finally {
lock.unlock();
}
}
```
## Redisson整合Spring Cache
Redisson提供了基于redis的spring cache实现每个cache实例都有ttl和maxIdleTime两个参数配置如下所示
```java
@Configuration
@ComponentScan
@EnableCaching
public static class Application {
@Bean(destroyMethod="shutdown")
RedissonClient redisson() throws IOException {
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://127.0.0.1:7004", "redis://127.0.0.1:7001");
return Redisson.create(config);
}
@Bean
CacheManager cacheManager(RedissonClient redissonClient) {
Map<String, CacheConfig> config = new HashMap<String, CacheConfig>();
// create "testMap" cache with ttl = 24 minutes and maxIdleTime = 12 minutes
config.put("testMap", new CacheConfig(24*60*1000, 12*60*1000));
return new RedissonSpringCacheManager(redissonClient, config);
}
}
```
cache配置也能从yaml文件中进行读取示例如下
```java
@Configuration
@ComponentScan
@EnableCaching
public static class Application {
@Bean(destroyMethod="shutdown")
RedissonClient redisson(@Value("classpath:/redisson.yaml") Resource configFile) throws IOException {
Config config = Config.fromYAML(configFile.getInputStream());
return Redisson.create(config);
}
@Bean
CacheManager cacheManager(RedissonClient redissonClient) throws IOException {
return new RedissonSpringCacheManager(redissonClient, "classpath:/cache-config.yaml");
}
}
```
### spring cache yaml config
如下是通过yaml的格式来配置spring cache
```java
testMap:
ttl: 1440000
maxIdleTime: 720000
localCacheOptions:
invalidationPolicy: "ON_CHANGE"
evictionPolicy: "NONE"
cacheSize: 0
timeToLiveInMillis: 0
maxIdleInMillis: 0
```