907 lines
34 KiB
Markdown
907 lines
34 KiB
Markdown
# Redisson
|
||
## 配置
|
||
### 编程式配置
|
||
可以通过创建Config对象来显式配置redisson,配置示例如下:
|
||
```java
|
||
Config config = new Config();
|
||
config.setTransportMode(TransportMode.EPOLL);
|
||
config.useClusterServers()
|
||
// use "rediss://" for SSL connection
|
||
.addNodeAddress("perredis://127.0.0.1:7181");
|
||
|
||
RedissonClient redisson = Redisson.create(config);
|
||
```
|
||
### yml Configuration
|
||
也可以通过yml文件格式来对redisson进行配置:
|
||
```java
|
||
Config config = Config.fromYAML(new File("config-file.yaml"));
|
||
RedissonClient redisson = Redisson.create(config);
|
||
```
|
||
可以通过config.toYAML方法将config转化为yaml格式:
|
||
```java
|
||
Config config = new Config();
|
||
// ... many settings are set here
|
||
String yamlFormat = config.toYAML();
|
||
```
|
||
yml格式中可以引入环境变量
|
||
```yml
|
||
singleServerConfig:
|
||
address: "redis://127.0.0.1:${REDIS_PORT}"
|
||
```
|
||
### Common Settings
|
||
如下设置用于配置config对象,并且适用于各种redis模式
|
||
#### codec
|
||
默认值:org.redisson.codec.Kryo5Codec
|
||
#### connectionListener
|
||
默认值:null
|
||
connection listener,当redisson连接到redis-server或与redis-server断开连接时被触发
|
||
#### nettyThreads
|
||
默认值:32
|
||
redisson所有redis client共享的线程总数。netty thread用于redis相应的解码和命令的发送。
|
||
#### transportMode
|
||
默认值:TransportMode.NIO
|
||
可选的值如下:
|
||
- TransportMode.NIO(默认)
|
||
- TransportMode.EPOLL
|
||
- TransportMode.KQUEUE
|
||
#### threads
|
||
由Rtopic object listener所共享的线程数量
|
||
#### lockWatchdogTimeout
|
||
默认值:30000
|
||
RLock watchdog timeout,单位为ms。该参数仅当RLock在获取时没有指定leaseTimeout时使用。当watchdog没有延长持有锁时间到下一个watchdogTimeout间隔时,当前watchdogTimeout到期后锁会过期。watchdog避免了由于客户端崩溃或其他原因造成一直持有锁的情况。
|
||
### Mode
|
||
#### single instance mode
|
||
可以通过如下方式来配置单实例模式:
|
||
```java
|
||
// connects to 127.0.0.1:6379 by default
|
||
RedissonClient redisson = Redisson.create();
|
||
|
||
Config config = new Config();
|
||
config.useSingleServer().setAddress("redis://myredisserver:6379");
|
||
RedissonClient redisson = Redisson.create(config);
|
||
```
|
||
yml配置单实例模式如下:
|
||
```yaml
|
||
singleServerConfig:
|
||
idleConnectionTimeout: 10000
|
||
connectTimeout: 10000
|
||
timeout: 3000
|
||
retryAttempts: 3
|
||
retryInterval: 1500
|
||
password: null
|
||
subscriptionsPerConnection: 5
|
||
clientName: null
|
||
address: "redis://127.0.0.1:6379"
|
||
subscriptionConnectionMinimumIdleSize: 1
|
||
subscriptionConnectionPoolSize: 50
|
||
connectionMinimumIdleSize: 24
|
||
connectionPoolSize: 64
|
||
database: 0
|
||
dnsMonitoringInterval: 5000
|
||
threads: 16
|
||
nettyThreads: 32
|
||
codec: !<org.redisson.codec.Kryo5Codec> {}
|
||
transportMode: "NIO"
|
||
```
|
||
## Operation Execution
|
||
redisson支持自动重试策略,并且在每次尝试时都会发送命令。重试策略通过retryAttempts(默认情况下为3)、retryInterval(默认情况下为1000ms)来设置。多次尝试之间间隔retryInterval。
|
||
redisson实例是线程安全的,如下是RAtomicLong对象的使用示例:
|
||
```java
|
||
RedissonClient client = Redisson.create(config);
|
||
RAtomicLong longObject = client.getAtomicLong('myLong');
|
||
// sync way
|
||
longObject.compareAndSet(3, 401);
|
||
// async way
|
||
RFuture<Boolean> result = longObject.compareAndSetAsync(3, 401);
|
||
|
||
RedissonReactiveClient client = Redisson.createReactive(config);
|
||
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
|
||
// reactive way
|
||
Mono<Boolean> result = longObject.compareAndSet(3, 401);
|
||
|
||
RedissonRxClient client = Redisson.createRx(config);
|
||
RAtomicLongRx longObject= client.getAtomicLong("myLong");
|
||
// RxJava2 way
|
||
Flowable<Boolean result = longObject.compareAndSet(3, 401);
|
||
```
|
||
### Async方式
|
||
大多数redisson object继承了异步接口,可以调用异步方法实现异步操作,如下:
|
||
```java
|
||
// RAtomicLong extends RAtomicLongAsync
|
||
RAtomicLongAsync longObject = client.getAtomicLong("myLong");
|
||
RFuture<Boolean> future = longObject.compareAndSetAsync(1, 401);
|
||
```
|
||
RFuture对象继承了Future接口和CompletionStage接口,可以像CompleteableFuture一样使用:
|
||
```java
|
||
future.whenComplete((res, exception) -> {
|
||
|
||
// handle both result and exception
|
||
|
||
});
|
||
|
||
|
||
// or
|
||
future.thenAccept(res -> {
|
||
|
||
// handle result
|
||
|
||
}).exceptionally(exception -> {
|
||
|
||
// handle exception
|
||
|
||
});
|
||
```
|
||
因该避免在future listener中使用同步方法,这样可能会造成redis请求/相应处理时的错误,应使用如下方式执行:
|
||
```java
|
||
future.whenCompleteAsync((res, exception) -> {
|
||
|
||
// handle both result and exception
|
||
|
||
}, executor);
|
||
|
||
|
||
// or
|
||
future.thenAcceptAsync(res -> {
|
||
|
||
// handle result
|
||
|
||
}, executor).exceptionallyAsync(exception -> {
|
||
|
||
// handle exception
|
||
|
||
}, executor);
|
||
```
|
||
## Redisson Object的公共操作
|
||
所有redisson object都实现了RObject和RExpiration接口,使用示例如下:
|
||
```java
|
||
RObject object = redisson.get...()
|
||
|
||
object.sizeInMemory();
|
||
|
||
object.delete();
|
||
|
||
object.rename("newname");
|
||
|
||
object.isExists();
|
||
|
||
// catch expired event
|
||
object.addListener(new ExpiredObjectListener() {
|
||
...
|
||
});
|
||
|
||
// catch delete event
|
||
object.addListener(new DeletedObjectListener() {
|
||
...
|
||
});
|
||
```
|
||
redisson object的name属性即是在redis中的key:
|
||
```java
|
||
RMap map = redisson.getMap("mymap");
|
||
map.getName(); // = mymap
|
||
```
|
||
和redis key相关的所有操作都通过RKeys接口暴露,使用示例如下:
|
||
```java
|
||
RKeys keys = redisson.getKeys();
|
||
|
||
Iterable<String> allKeys = keys.getKeys();
|
||
|
||
Iterable<String> foundedKeys = keys.getKeysByPattern('key*');
|
||
|
||
long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");
|
||
|
||
long deletedKeysAmount = keys.deleteByPattern("test?");
|
||
|
||
String randomKey = keys.randomKey();
|
||
|
||
long keysAmount = keys.count();
|
||
|
||
keys.flushall();
|
||
|
||
keys.flushdb();
|
||
```
|
||
## 分布式对象
|
||
### object holder
|
||
#### RBucket
|
||
RBucket的java实现类是一个hodler,可以持有任何类型的java对象。RBucket的大小限制为512MB.
|
||
RBucket的使用示例如下所示:
|
||
```java
|
||
RBucket<AnyObject> bucket = redisson.getBucket("anyObject");
|
||
|
||
bucket.set(new AnyObject(1));
|
||
AnyObject obj = bucket.get();
|
||
|
||
bucket.trySet(new AnyObject(3));
|
||
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
|
||
bucket.getAndSet(new AnyObject(6));
|
||
```
|
||
#### RBuckets
|
||
可以通过RBuckets对象来操作多个RBucket对象,RBuckets使用示例如下:
|
||
```java
|
||
RBuckets buckets = redisson.getBuckets();
|
||
|
||
// get all bucket values
|
||
Map<String, V> loadedBuckets = buckets.get("myBucket1", "myBucket2", "myBucket3");
|
||
|
||
Map<String, Object> map = new HashMap<>();
|
||
map.put("myBucket1", new MyObject());
|
||
map.put("myBucket2", new MyObject());
|
||
|
||
// sets all or nothing if some bucket is already exists
|
||
buckets.trySet(map);
|
||
// store all at once
|
||
buckets.set(map);
|
||
```
|
||
### Binary Stream Holder
|
||
RBinaryStream类型的对象用于存储字节序列。RBinaryStream实现了RBucket接口,并且其大小限制也是512MB.
|
||
RBinaryStream的使用示例如下:
|
||
```java
|
||
RBinaryStream stream = redisson.getBinaryStream("anyStream");
|
||
|
||
byte[] content = ...
|
||
stream.set(content);
|
||
stream.getAndSet(content);
|
||
stream.trySet(content);
|
||
stream.compareAndSet(oldContent, content);
|
||
```
|
||
RBinaryStream可以和InputStream与OutputStream混用,使用如下:
|
||
```java
|
||
RBinaryStream stream = redisson.getBinaryStream("anyStream");
|
||
|
||
InputStream is = stream.getInputStream();
|
||
byte[] readBuffer = ...
|
||
is.read(readBuffer);
|
||
|
||
OutputStream os = stream.getOuputStream();
|
||
byte[] contentToWrite = ...
|
||
os.write(contentToWrite);
|
||
```
|
||
### BitSet
|
||
RBitSet实现提供了和java中BitSet类似的api,其大小限制是4 294 967 295 bits.
|
||
RBitSet使用如下所示:
|
||
```java
|
||
RBitSet set = redisson.getBitSet("simpleBitset");
|
||
|
||
set.set(0, true);
|
||
set.set(1812, false);
|
||
|
||
set.clear(0);
|
||
|
||
set.and("anotherBitset");
|
||
set.xor("anotherBitset");
|
||
```
|
||
### AtomicLong
|
||
RAtomicLong实现提供了和java中AtomicLong类似的api,其使用类似如下:
|
||
```java
|
||
RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong");
|
||
atomicLong.set(3);
|
||
atomicLong.incrementAndGet();
|
||
atomicLong.get();
|
||
```
|
||
### AtomicDouble
|
||
RAtomicDouble实现提供了和java中AtomicDouble类似的api,其使用示例如下:
|
||
```java
|
||
RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");
|
||
atomicDouble.set(2.81);
|
||
atomicDouble.addAndGet(4.11);
|
||
atomicDouble.get();
|
||
```
|
||
### Topic
|
||
RTopic实现提供了发布/订阅机制,其允许订阅由同名RTopic对象发布的事件。
|
||
当重新连接到redis或redis错误切换之后,listener会被重新订阅,在重新订阅之前,所有被发布的消息都会丢失。
|
||
RTopic的使用如下所示:
|
||
```java
|
||
RTopic topic = redisson.getTopic("myTopic");
|
||
int listenerId = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
|
||
@Override
|
||
public void onMessage(String channel, SomeObject message) {
|
||
//...
|
||
}
|
||
});
|
||
|
||
// in other thread or JVM
|
||
RTopic topic = redisson.getTopic("myTopic");
|
||
long clientsReceivedMessage = topic.publish(new SomeObject());
|
||
```
|
||
### Reliable Topic
|
||
RRelieableTopic在实现发布/订阅模式的情况下,还实现了消息的可靠传输。当redis连接断开的情况下,所有消息都会被存储,当重新连接到redis时,消息会被重新传送。
|
||
每个RReliableTopic对象实例都由一个watchdog,当第一个listener注册之后,watchdog就会被启用。当订阅者超过reliableTopicWatchdogTimeout之后,且watchdog没有为其延续过期时间,那么订阅将会超时。(该机制是为了防止client长时间崩溃之后存储的消息持续增长)。
|
||
**当重新连接到redis之后,listener会被重新注册**
|
||
```java
|
||
RReliableTopic topic = redisson.getReliableTopic("anyTopic");
|
||
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
|
||
@Override
|
||
public void onMessage(CharSequence channel, SomeObject message) {
|
||
//...
|
||
}
|
||
});
|
||
|
||
// in other thread or JVM
|
||
RReliableTopic topic = redisson.getReliableTopic("anyTopic");
|
||
long subscribersReceivedMessage = topic.publish(new SomeObject());
|
||
```
|
||
|
||
### Topic Pattern
|
||
RPatternTopic允许订阅多个Rtopic,在重新连接到redis或redis错误切换之后,listener会被重新订阅。
|
||
Pattern使用如下:
|
||
- topic?: subscribes to topic1, topicA ...
|
||
- topic?_my: subscribes to topic_my, topic123_my, topicTEST_my ...
|
||
- topic[ae]: subscribes to topica and topice only
|
||
|
||
使用示例如下:
|
||
```java
|
||
// subscribe to all topics by `topic*` pattern
|
||
RPatternTopic patternTopic = redisson.getPatternTopic("topic*");
|
||
int listenerId = patternTopic.addListener(Message.class, new PatternMessageListener<Message>() {
|
||
@Override
|
||
public void onMessage(String pattern, String channel, Message msg) {
|
||
//...
|
||
}
|
||
});
|
||
```
|
||
### Bloom Filter
|
||
RBloomFilter中最多含有2^32个bit。
|
||
在使用之前,必须通过tryInit(expectedInsertions, falseProbability)来初始化capacity。
|
||
RBloomFilter使用示例如下:
|
||
```java
|
||
RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
|
||
// initialize bloom filter with
|
||
// expectedInsertions = 55000000
|
||
// falseProbability = 0.03
|
||
bloomFilter.tryInit(55000000L, 0.03);
|
||
|
||
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
|
||
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
|
||
|
||
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));
|
||
bloomFilter.count();
|
||
```
|
||
### HyperLogLog
|
||
RHyperLogLog能以较低的空间维护大数量的项目,计算其去重后的数量,其使用如下所示:
|
||
```java
|
||
RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
|
||
log.add(1);
|
||
log.add(2);
|
||
log.add(3);
|
||
|
||
log.count();
|
||
```
|
||
### LongAdder
|
||
RLongAdder提供了java中LongAdder的实现,其在client端维护了LongAdder,增加和减少的性能较AtomicLong来说都有极大的提升(至多可提升12000倍)。其使用如下所示:
|
||
```java
|
||
RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
|
||
atomicLong.add(12);
|
||
atomicLong.increment();
|
||
atomicLong.decrement();
|
||
atomicLong.sum();
|
||
```
|
||
当LongAdder不再使用之后,应该调用destroy方法手动进行销毁:
|
||
```java
|
||
RLongAdder atomicLong = ...
|
||
atomicLong.destroy();
|
||
```
|
||
### DoubleAdder
|
||
RDoubleAdder提供了java中DoubleAdder的分布式实现,其性能相对于AtomicDouble也有很大提升。
|
||
其使用如下所示:
|
||
```java
|
||
RLongDouble atomicDouble = redisson.getLongDouble("myLongDouble");
|
||
atomicDouble.add(12);
|
||
atomicDouble.increment();
|
||
atomicDouble.decrement();
|
||
atomicDouble.sum();
|
||
```
|
||
### id generator
|
||
RIdGenerator实现允许产生唯一的id,但其生成算法不是简单的递增,而是在第一次请求时,一系列id就已经被分配并且缓存到java端,直到其用完。该方法能够减少和redis的通信次数,产生id的速率比RAtomicLong快。
|
||
默认情况下allocate size是2000,并且值从0开始。
|
||
RIdGenerator的使用如下所示:
|
||
```java
|
||
RIdGenerator generator = redisson.getIdGenerator("generator");
|
||
|
||
// Initialize with start value = 12 and allocation size = 20000
|
||
generator.tryInit(12, 20000);
|
||
|
||
long id = generator.nextId();
|
||
```
|
||
### Json Object Holder
|
||
RJsonBucket类型用于存储json数据,通过redis 的JSON.*命令来实现。Json数据通过JsonCodec来进行编码和解码。可用的实现是org.redisson.codec.JacksonCodec。
|
||
#### local cache
|
||
redisson提供了带有本地缓存的json object holder版本。
|
||
local cache用于加速读取操作从而较少网络请求次数,其将整个json对象缓存在redisson侧,执行读取操作比无缓存快45倍。
|
||
| redission client method name | local cache | Ultra-fast read/write |
|
||
| :-: | :-: | :-: |
|
||
| getJsonBucket() open-source version | false | false |
|
||
| getJsonBucket() Redisson PRO version | false | true |
|
||
| getLocalCachedJsonBucket() Redisson PRO version | true | true |
|
||
|
||
RJsonBucket使用如下所示:
|
||
```java
|
||
RJsonBucket<AnyObject> bucket = redisson.getJsonBucket("anyObject", new JacksonCodec<>(AnyObject.class));
|
||
|
||
bucket.set(new AnyObject(1));
|
||
AnyObject obj = bucket.get();
|
||
|
||
bucket.trySet(new AnyObject(3));
|
||
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
|
||
bucket.getAndSet(new AnyObject(6));
|
||
|
||
List<String> values = bucket.get(new JacksonCodec<>(new TypeReference<List<String>>() {}), "values");
|
||
long aa = bucket.arrayAppend("$.obj.values", "t3", "t4");
|
||
```
|
||
## 分布式集合
|
||
### Map
|
||
基于Redis的分布式Map对象实现了ConcurrentMap接口。
|
||
如果Map主要用于读取数据,或者像尽量避免网络请求,可以使用带有local cache的map,使用示例如下:
|
||
```java
|
||
RMap<String, SomeObject> map = redisson.getMap("anyMap");
|
||
SomeObject prevObject = map.put("123", new SomeObject());
|
||
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
|
||
SomeObject obj = map.remove("123");
|
||
|
||
// use fast* methods when previous value is not required
|
||
map.fastPut("a", new SomeObject());
|
||
map.fastPutIfAbsent("d", new SomeObject());
|
||
map.fastRemove("b");
|
||
|
||
RFuture<SomeObject> putAsyncFuture = map.putAsync("321");
|
||
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");
|
||
|
||
map.fastPutAsync("321", new SomeObject());
|
||
map.fastRemoveAsync("321");
|
||
```
|
||
RMap支持为每个key绑定一个lock/readwritelock/semaphore/countdownlatch,使用如下:
|
||
```java
|
||
RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
|
||
MyKey k = new MyKey();
|
||
RLock keyLock = map.getLock(k);
|
||
keyLock.lock();
|
||
try {
|
||
MyValue v = map.get(k);
|
||
// process value ...
|
||
} finally {
|
||
keyLock.unlock();
|
||
}
|
||
|
||
RReadWriteLock rwLock = map.getReadWriteLock(k);
|
||
rwLock.readLock().lock();
|
||
try {
|
||
MyValue v = map.get(k);
|
||
// process value ...
|
||
} finally {
|
||
keyLock.readLock().unlock();
|
||
}
|
||
```
|
||
Redisson支持不同的Map结构实现,不同的实现主要包含如下三个特性:
|
||
- local cache: local cache通过将map entry缓存在redisson端来减少网络请求,从而提升读操作的性能。拥有相同name的local cache实例会连接到相同的发布/订阅channel,该channel用于在local cache实例之间交换更新/失效的事件。
|
||
- eviction:支持对每个map entry指定ttl和max idle time
|
||
|
||
如下是所有的RMap可用实现:
|
||
| RedissonClient method | local cache | eviction |
|
||
| :-: | :-: | :-: |
|
||
| getMap() | false | false |
|
||
| getMapCache() | false | true |
|
||
| getLocalCachedMap() | true | false |
|
||
|
||
#### eviction
|
||
带有eviction功能的map实现了RMapCache接口,并且实现了ConcurrentMap接口。
|
||
当前redis并不支持淘汰map entry的功能,故而超时的map entry是通过EvictionScheduler来时不时进行清理的。其会一次性移除100条超时的entry。任务启动的时间会自动调整,根据上次任务删除过期entry的数量,时间会在5s到半小时之间变化。
|
||
建议在每个redisson实例中都使用具有相同名称的MapCache对象:
|
||
```java
|
||
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
|
||
// or
|
||
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
|
||
// or
|
||
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
|
||
// or
|
||
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
|
||
|
||
|
||
// ttl = 10 minutes,
|
||
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
|
||
// ttl = 10 minutes, maxIdleTime = 10 seconds
|
||
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
|
||
|
||
// ttl = 3 seconds
|
||
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
|
||
// ttl = 40 seconds, maxIdleTime = 10 seconds
|
||
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
|
||
|
||
// if object is not used anymore
|
||
map.destroy();
|
||
```
|
||
#### local cache
|
||
带有local cache功能的map实现实现了RLocalCachedMap接口,同时也实现了ConcurrentMap接口。
|
||
推荐在所有的redisson client中使用具有相同name的LocalCachedMap,并且为所有具有相同名称的LocalCachedMap指定相同的LocalCachedMapOptions。
|
||
如下是在创建localcachedmap时可以指定的options:
|
||
```java
|
||
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
|
||
|
||
// Defines whether to store a cache miss into the local cache.
|
||
// Default value is false.
|
||
.storeCacheMiss(false);
|
||
|
||
// Defines store mode of cache data.
|
||
// Follow options are available:
|
||
// LOCALCACHE - store data in local cache only and use Redis only for data update/invalidation.
|
||
// LOCALCACHE_REDIS - store data in both Redis and local cache.
|
||
.storeMode(StoreMode.LOCALCACHE_REDIS)
|
||
|
||
// Defines Cache provider used as local cache store.
|
||
// Follow options are available:
|
||
// REDISSON - uses Redisson own implementation
|
||
// CAFFEINE - uses Caffeine implementation
|
||
.cacheProvider(CacheProvider.REDISSON)
|
||
|
||
// Defines local cache eviction policy.
|
||
// Follow options are available:
|
||
// LFU - Counts how often an item was requested. Those that are used least often are discarded first.
|
||
// LRU - Discards the least recently used items first
|
||
// SOFT - Uses weak references, entries are removed by GC
|
||
// WEAK - Uses soft references, entries are removed by GC
|
||
// NONE - No eviction
|
||
.evictionPolicy(EvictionPolicy.NONE)
|
||
|
||
// If cache size is 0 then local cache is unbounded.
|
||
.cacheSize(1000)
|
||
|
||
// Defines strategy for load missed local cache updates after Redis connection failure.
|
||
//
|
||
// Follow reconnection strategies are available:
|
||
// CLEAR - Clear local cache if map instance has been disconnected for a while.
|
||
// LOAD - Store invalidated entry hash in invalidation log for 10 minutes
|
||
// Cache keys for stored invalidated entry hashes will be removed
|
||
// if LocalCachedMap instance has been disconnected less than 10 minutes
|
||
// or whole cache will be cleaned otherwise.
|
||
// NONE - Default. No reconnection handling
|
||
.reconnectionStrategy(ReconnectionStrategy.NONE)
|
||
|
||
// Defines local cache synchronization strategy.
|
||
//
|
||
// Follow sync strategies are available:
|
||
// INVALIDATE - Default. Invalidate cache entry across all LocalCachedMap instances on map entry change
|
||
// UPDATE - Insert/update cache entry across all LocalCachedMap instances on map entry change
|
||
// NONE - No synchronizations on map changes
|
||
.syncStrategy(SyncStrategy.INVALIDATE)
|
||
|
||
// time to live for each map entry in local cache
|
||
.timeToLive(10000)
|
||
// or
|
||
.timeToLive(10, TimeUnit.SECONDS)
|
||
|
||
// max idle time for each map entry in local cache
|
||
.maxIdle(10000)
|
||
// or
|
||
.maxIdle(10, TimeUnit.SECONDS);
|
||
```
|
||
localcachedmap对象在不再使用之后应该调用destroy方法进行销毁:
|
||
```java
|
||
RLocalCachedMap<String, Integer> map = ...
|
||
map.destroy();
|
||
```
|
||
#### Map Persistence
|
||
redisson允许将map中的数据存储到除了redis以外的空间,其通常用于和应用和数据库之间的缓存,类似于spring cache。
|
||
##### read-through策略
|
||
当请求的条目在redisson map object中不存在时,其会使用MapLoader对象来进行加载,示例代码如下:
|
||
```java
|
||
MapLoader<String, String> mapLoader = new MapLoader<String, String>() {
|
||
|
||
@Override
|
||
public Iterable<String> loadAllKeys() {
|
||
List<String> list = new ArrayList<String>();
|
||
Statement statement = conn.createStatement();
|
||
try {
|
||
ResultSet result = statement.executeQuery("SELECT id FROM student");
|
||
while (result.next()) {
|
||
list.add(result.getString(1));
|
||
}
|
||
} finally {
|
||
statement.close();
|
||
}
|
||
|
||
return list;
|
||
}
|
||
|
||
@Override
|
||
public String load(String key) {
|
||
PreparedStatement preparedStatement = conn.prepareStatement("SELECT name FROM student where id = ?");
|
||
try {
|
||
preparedStatement.setString(1, key);
|
||
ResultSet result = preparedStatement.executeQuery();
|
||
if (result.next()) {
|
||
return result.getString(1);
|
||
}
|
||
return null;
|
||
} finally {
|
||
preparedStatement.close();
|
||
}
|
||
}
|
||
};
|
||
```
|
||
配置map的示例如下:
|
||
```java
|
||
MapOptions<K, V> options = MapOptions.<K, V>defaults()
|
||
.loader(mapLoader);
|
||
|
||
RMap<K, V> map = redisson.getMap("test", options);
|
||
// or
|
||
RMapCache<K, V> map = redisson.getMapCache("test", options);
|
||
// or with boost up to 45x times
|
||
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
|
||
// or with boost up to 45x times
|
||
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
|
||
```
|
||
##### write-through(sync)策略
|
||
如果map entry被更新,那么更新方法不会返回,直到通过MapWriter对象将entry更新写入到外部存储中:
|
||
```java
|
||
MapWriter<String, String> mapWriter = new MapWriter<String, String>() {
|
||
|
||
@Override
|
||
public void write(Map<String, String> map) {
|
||
PreparedStatement preparedStatement = conn.prepareStatement("INSERT INTO student (id, name) values (?, ?)");
|
||
try {
|
||
for (Entry<String, String> entry : map.entrySet()) {
|
||
preparedStatement.setString(1, entry.getKey());
|
||
preparedStatement.setString(2, entry.getValue());
|
||
preparedStatement.addBatch();
|
||
}
|
||
preparedStatement.executeBatch();
|
||
} finally {
|
||
preparedStatement.close();
|
||
}
|
||
}
|
||
|
||
@Override
|
||
public void delete(Collection<String> keys) {
|
||
PreparedStatement preparedStatement = conn.prepareStatement("DELETE FROM student where id = ?");
|
||
try {
|
||
for (String key : keys) {
|
||
preparedStatement.setString(1, key);
|
||
preparedStatement.addBatch();
|
||
}
|
||
preparedStatement.executeBatch();
|
||
} finally {
|
||
preparedStatement.close();
|
||
}
|
||
}
|
||
};
|
||
```
|
||
write-through策略配置如下:
|
||
```java
|
||
MapOptions<K, V> options = MapOptions.<K, V>defaults()
|
||
.writer(mapWriter)
|
||
.writeMode(WriteMode.WRITE_BEHIND)
|
||
.writeBehindDelay(5000)
|
||
.writeBehindBatchSize(100);
|
||
|
||
RMap<K, V> map = redisson.getMap("test", options);
|
||
// or
|
||
RMapCache<K, V> map = redisson.getMapCache("test", options);
|
||
// or with boost up to 45x times
|
||
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
|
||
// or with boost up to 45x times
|
||
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
|
||
```
|
||
##### write-behind策略(async)
|
||
在使用write-behind之后,对于map object的修改是批量累计的,并且按定义的延迟异步被MapWriter写入到外部存储中。
|
||
writeBehindDelay是批量写入或删除的延迟,默认情况下值为1000ms。writeBehindBatchSize是batch的容量,每个batch都包含写入和删除的命令集合,默认情况下size是50 .
|
||
配置代码如下所示:
|
||
```java
|
||
MapOptions<K, V> options = MapOptions.<K, V>defaults()
|
||
.writer(mapWriter)
|
||
.writeMode(WriteMode.WRITE_BEHIND)
|
||
.writeBehindDelay(5000)
|
||
.writeBehindBatchSize(100);
|
||
|
||
RMap<K, V> map = redisson.getMap("test", options);
|
||
// or
|
||
RMapCache<K, V> map = redisson.getMapCache("test", options);
|
||
// or with boost up to 45x times
|
||
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
|
||
// or with boost up to 45x times
|
||
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
|
||
```
|
||
#### Map Listener
|
||
Redisson允许为实现RMapCache接口的Map对象绑定监听器,监听如下的map事件:
|
||
- entry创建:org.redisson.api.map.event.EntryCreatedListener
|
||
- entry过期:org.redisson.api.map.event.EntryExpiredListener
|
||
- entry移除:org.redisson.api.map.event.EntryRemovedListener
|
||
- entry更新:org.redisson.api.map.event.EntryUpdatedListener
|
||
|
||
使用实例如下所示:
|
||
```java
|
||
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
|
||
// or
|
||
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
|
||
// or
|
||
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
|
||
// or
|
||
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
|
||
|
||
|
||
int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() {
|
||
@Override
|
||
public void onUpdated(EntryEvent<Integer, Integer> event) {
|
||
event.getKey(); // key
|
||
event.getValue() // new value
|
||
event.getOldValue() // old value
|
||
// ...
|
||
}
|
||
});
|
||
|
||
int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() {
|
||
@Override
|
||
public void onCreated(EntryEvent<Integer, Integer> event) {
|
||
event.getKey(); // key
|
||
event.getValue() // value
|
||
// ...
|
||
}
|
||
});
|
||
|
||
int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() {
|
||
@Override
|
||
public void onExpired(EntryEvent<Integer, Integer> event) {
|
||
event.getKey(); // key
|
||
event.getValue() // value
|
||
// ...
|
||
}
|
||
});
|
||
|
||
int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() {
|
||
@Override
|
||
public void onRemoved(EntryEvent<Integer, Integer> event) {
|
||
event.getKey(); // key
|
||
event.getValue() // value
|
||
// ...
|
||
}
|
||
});
|
||
|
||
map.removeListener(updateListener);
|
||
map.removeListener(createListener);
|
||
map.removeListener(expireListener);
|
||
map.removeListener(removeListener);
|
||
```
|
||
#### LRU/LFU bounded Map
|
||
实现了RMapCache接口的Map对象支持按LRU或LFU顺序进行绑定。绑定了LRU或LFU的Map可以存储固定数量的entry,并且按照LRU或LFU的顺序淘汰entry。
|
||
```java
|
||
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
|
||
// or
|
||
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
|
||
// or
|
||
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
|
||
// or
|
||
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
|
||
|
||
|
||
// tries to set limit map to 10 entries using LRU eviction algorithm
|
||
map.trySetMaxSize(10);
|
||
// ... using LFU eviction algorithm
|
||
map.trySetMaxSize(10, EvictionMode.LFU);
|
||
|
||
// set or change limit map to 10 entries using LRU eviction algorithm
|
||
map.setMaxSize(10);
|
||
// ... using LFU eviction algorithm
|
||
map.setMaxSize(10, EvictionMode.LFU);
|
||
|
||
map.put("1", "2");
|
||
map.put("3", "3", 1, TimeUnit.SECONDS);
|
||
```
|
||
### MultiMap
|
||
基于Redis的MultiMap支持为一个key绑定多个value
|
||
#### Set based MultiMap
|
||
基于set的multimap实现对于相同的key不允许出现重复的value:
|
||
```java
|
||
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
|
||
map.put(new SimpleKey("0"), new SimpleValue("1"));
|
||
map.put(new SimpleKey("0"), new SimpleValue("2"));
|
||
map.put(new SimpleKey("3"), new SimpleValue("4"));
|
||
|
||
Set<SimpleValue> allValues = map.get(new SimpleKey("0"));
|
||
|
||
List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
|
||
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
|
||
|
||
Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
|
||
```
|
||
#### List based MultiMap
|
||
基于List的MultiMap允许对相同的key存在多个相同value,并且对同一个key其value按插入顺序存储:
|
||
```java
|
||
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
|
||
map.put(new SimpleKey("0"), new SimpleValue("1"));
|
||
map.put(new SimpleKey("0"), new SimpleValue("2"));
|
||
map.put(new SimpleKey("0"), new SimpleValue("1"));
|
||
map.put(new SimpleKey("3"), new SimpleValue("4"));
|
||
|
||
List<SimpleValue> allValues = map.get(new SimpleKey("0"));
|
||
|
||
Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
|
||
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
|
||
|
||
List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
|
||
```
|
||
#### MultiMap eviction
|
||
对于实现了MultimapCache接口的map对象,分别支持基于list和基于set的evicition。
|
||
过期的entry清除由EvictionScheduler执行,其一次性会移除100条过期的entry。Task启动时间会根据上次任务删除的条目数目自动进行调整,时间间隔在1s和2h之间进行调整。
|
||
MultiMap eviction使用示例如下所示:
|
||
```java
|
||
RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap");
|
||
multimap.put("1", "a");
|
||
multimap.put("1", "b");
|
||
multimap.put("1", "c");
|
||
|
||
multimap.put("2", "e");
|
||
multimap.put("2", "f");
|
||
|
||
multimap.expireKey("2", 10, TimeUnit.MINUTES);
|
||
```
|
||
### Set
|
||
基于redis的set object实现了java.util.set接口。
|
||
redis set的使用如下所示:
|
||
```java
|
||
RSet<SomeObject> set = redisson.getSet("anySet");
|
||
set.add(new SomeObject());
|
||
set.remove(new SomeObject());
|
||
```
|
||
根据是否支持数据淘汰,可以分为如下方法:
|
||
| Redisson Client method | Eviction Support |
|
||
| :-: | :-: |
|
||
| getSet() | false |
|
||
|getSetCache() | true |
|
||
#### eviction
|
||
支持淘汰功能的Set对象实现了RSetCache接口,当前redis尚不支持淘汰set中的值,故而淘汰是由EvictionScheduler实现的,其一次性移除300条entry,根据上次task淘汰的key数量,下次执行task的间隔在1s到1h之间调整。
|
||
```java
|
||
RSetCache<SomeObject> set = redisson.getSetCache("mySet");
|
||
// or
|
||
RMapCache<SomeObject> set = redisson.getClusteredSetCache("mySet");
|
||
|
||
// ttl = 10 minutes,
|
||
set.add(new SomeObject(), 10, TimeUnit.MINUTES);
|
||
|
||
// if object is not used anymore
|
||
set.destroy();
|
||
```
|
||
#### SortedSet
|
||
基于Redis的SortedSet实现了java.util.SortedSet接口,其通过比较元素来确保元素的在set中的唯一性。对于String数据类型,更推荐使用LexSortedSet,其性能表现更好。
|
||
RSortedSet的使用示例如下所示:
|
||
```java
|
||
RSortedSet<Integer> set = redisson.getSortedSet("anySet");
|
||
set.trySetComparator(new MyComparator()); // set object comparator
|
||
set.add(3);
|
||
set.add(1);
|
||
set.add(2);
|
||
|
||
set.removeAsync(0);
|
||
set.addAsync(5);
|
||
```
|
||
#### ScoredSortedSet
|
||
基于redis的分布式ScoredSortedSet根据插入元素时的score来对元素进行排序。使用示例如下所示:(默认情况下,插入元素按照score从高到低的顺寻进行排序)
|
||
```java
|
||
set.add(0.13, new SomeObject(a, b));
|
||
set.addAsync(0.251, new SomeObject(c, d));
|
||
set.add(0.302, new SomeObject(g, d));
|
||
|
||
set.pollFirst();
|
||
set.pollLast();
|
||
|
||
int index = set.rank(new SomeObject(g, d)); // get element index
|
||
Double score = set.getScore(new SomeObject(g, d)); // get element score
|
||
```
|
||
#### LexSortedSet
|
||
LexSortedSet只能用于存储String类型元素,其实现了Set\<String\>接口,其按字典顺序存储String元素。
|
||
LexSortedSet使用示例如下:
|
||
```java
|
||
RLexSortedSet set = redisson.getLexSortedSet("simple");
|
||
set.add("d");
|
||
set.addAsync("e");
|
||
set.add("f");
|
||
|
||
set.lexRangeTail("d", false);
|
||
set.lexCountHead("e");
|
||
set.lexRange("d", true, "z", false);
|
||
```
|
||
### List
|
||
基于Redis的list实现了java.util.List接口,其按照插入顺序来存储元素。
|
||
RList的使用如下:
|
||
```java
|
||
RList<SomeObject> list = redisson.getList("anyList");
|
||
list.add(new SomeObject());
|
||
list.get(0);
|
||
list.remove(new SomeObject());
|
||
``` |