33 KiB
Redisson
配置
编程式配置
可以通过创建Config对象来显式配置redisson,配置示例如下:
Config config = new Config();
config.setTransportMode(TransportMode.EPOLL);
config.useClusterServers()
// use "rediss://" for SSL connection
.addNodeAddress("perredis://127.0.0.1:7181");
RedissonClient redisson = Redisson.create(config);
yml Configuration
也可以通过yml文件格式来对redisson进行配置:
Config config = Config.fromYAML(new File("config-file.yaml"));
RedissonClient redisson = Redisson.create(config);
可以通过config.toYAML方法将config转化为yaml格式:
Config config = new Config();
// ... many settings are set here
String yamlFormat = config.toYAML();
yml格式中可以引入环境变量
singleServerConfig:
address: "redis://127.0.0.1:${REDIS_PORT}"
Common Settings
如下设置用于配置config对象,并且适用于各种redis模式
codec
默认值:org.redisson.codec.Kryo5Codec
connectionListener
默认值:null
connection listener,当redisson连接到redis-server或与redis-server断开连接时被触发
nettyThreads
默认值:32
redisson所有redis client共享的线程总数。netty thread用于redis相应的解码和命令的发送。
transportMode
默认值:TransportMode.NIO
可选的值如下:
- TransportMode.NIO(默认)
- TransportMode.EPOLL
- TransportMode.KQUEUE
threads
由Rtopic object listener所共享的线程数量
lockWatchdogTimeout
默认值:30000
RLock watchdog timeout,单位为ms。该参数仅当RLock在获取时没有指定leaseTimeout时使用。当watchdog没有延长持有锁时间到下一个watchdogTimeout间隔时,当前watchdogTimeout到期后锁会过期。watchdog避免了由于客户端崩溃或其他原因造成一直持有锁的情况。
Mode
single instance mode
可以通过如下方式来配置单实例模式:
// connects to 127.0.0.1:6379 by default
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
yml配置单实例模式如下:
singleServerConfig:
idleConnectionTimeout: 10000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
password: null
subscriptionsPerConnection: 5
clientName: null
address: "redis://127.0.0.1:6379"
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
connectionMinimumIdleSize: 24
connectionPoolSize: 64
database: 0
dnsMonitoringInterval: 5000
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.Kryo5Codec> {}
transportMode: "NIO"
Operation Execution
redisson支持自动重试策略,并且在每次尝试时都会发送命令。重试策略通过retryAttempts(默认情况下为3)、retryInterval(默认情况下为1000ms)来设置。多次尝试之间间隔retryInterval。
redisson实例是线程安全的,如下是RAtomicLong对象的使用示例:
RedissonClient client = Redisson.create(config);
RAtomicLong longObject = client.getAtomicLong('myLong');
// sync way
longObject.compareAndSet(3, 401);
// async way
RFuture<Boolean> result = longObject.compareAndSetAsync(3, 401);
RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
// reactive way
Mono<Boolean> result = longObject.compareAndSet(3, 401);
RedissonRxClient client = Redisson.createRx(config);
RAtomicLongRx longObject= client.getAtomicLong("myLong");
// RxJava2 way
Flowable<Boolean result = longObject.compareAndSet(3, 401);
Async方式
大多数redisson object继承了异步接口,可以调用异步方法实现异步操作,如下:
// RAtomicLong extends RAtomicLongAsync
RAtomicLongAsync longObject = client.getAtomicLong("myLong");
RFuture<Boolean> future = longObject.compareAndSetAsync(1, 401);
RFuture对象继承了Future接口和CompletionStage接口,可以像CompleteableFuture一样使用:
future.whenComplete((res, exception) -> {
// handle both result and exception
});
// or
future.thenAccept(res -> {
// handle result
}).exceptionally(exception -> {
// handle exception
});
因该避免在future listener中使用同步方法,这样可能会造成redis请求/相应处理时的错误,应使用如下方式执行:
future.whenCompleteAsync((res, exception) -> {
// handle both result and exception
}, executor);
// or
future.thenAcceptAsync(res -> {
// handle result
}, executor).exceptionallyAsync(exception -> {
// handle exception
}, executor);
Redisson Object的公共操作
所有redisson object都实现了RObject和RExpiration接口,使用示例如下:
RObject object = redisson.get...()
object.sizeInMemory();
object.delete();
object.rename("newname");
object.isExists();
// catch expired event
object.addListener(new ExpiredObjectListener() {
...
});
// catch delete event
object.addListener(new DeletedObjectListener() {
...
});
redisson object的name属性即是在redis中的key:
RMap map = redisson.getMap("mymap");
map.getName(); // = mymap
和redis key相关的所有操作都通过RKeys接口暴露,使用示例如下:
RKeys keys = redisson.getKeys();
Iterable<String> allKeys = keys.getKeys();
Iterable<String> foundedKeys = keys.getKeysByPattern('key*');
long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");
long deletedKeysAmount = keys.deleteByPattern("test?");
String randomKey = keys.randomKey();
long keysAmount = keys.count();
keys.flushall();
keys.flushdb();
分布式对象
object holder
RBucket
RBucket的java实现类是一个hodler,可以持有任何类型的java对象。RBucket的大小限制为512MB.
RBucket的使用示例如下所示:
RBucket<AnyObject> bucket = redisson.getBucket("anyObject");
bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();
bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));
RBuckets
可以通过RBuckets对象来操作多个RBucket对象,RBuckets使用示例如下:
RBuckets buckets = redisson.getBuckets();
// get all bucket values
Map<String, V> loadedBuckets = buckets.get("myBucket1", "myBucket2", "myBucket3");
Map<String, Object> map = new HashMap<>();
map.put("myBucket1", new MyObject());
map.put("myBucket2", new MyObject());
// sets all or nothing if some bucket is already exists
buckets.trySet(map);
// store all at once
buckets.set(map);
Binary Stream Holder
RBinaryStream类型的对象用于存储字节序列。RBinaryStream实现了RBucket接口,并且其大小限制也是512MB.
RBinaryStream的使用示例如下:
RBinaryStream stream = redisson.getBinaryStream("anyStream");
byte[] content = ...
stream.set(content);
stream.getAndSet(content);
stream.trySet(content);
stream.compareAndSet(oldContent, content);
RBinaryStream可以和InputStream与OutputStream混用,使用如下:
RBinaryStream stream = redisson.getBinaryStream("anyStream");
InputStream is = stream.getInputStream();
byte[] readBuffer = ...
is.read(readBuffer);
OutputStream os = stream.getOuputStream();
byte[] contentToWrite = ...
os.write(contentToWrite);
BitSet
RBitSet实现提供了和java中BitSet类似的api,其大小限制是4 294 967 295 bits.
RBitSet使用如下所示:
RBitSet set = redisson.getBitSet("simpleBitset");
set.set(0, true);
set.set(1812, false);
set.clear(0);
set.and("anotherBitset");
set.xor("anotherBitset");
AtomicLong
RAtomicLong实现提供了和java中AtomicLong类似的api,其使用类似如下:
RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong");
atomicLong.set(3);
atomicLong.incrementAndGet();
atomicLong.get();
AtomicDouble
RAtomicDouble实现提供了和java中AtomicDouble类似的api,其使用示例如下:
RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");
atomicDouble.set(2.81);
atomicDouble.addAndGet(4.11);
atomicDouble.get();
Topic
RTopic实现提供了发布/订阅机制,其允许订阅由同名RTopic对象发布的事件。
当重新连接到redis或redis错误切换之后,listener会被重新订阅,在重新订阅之前,所有被发布的消息都会丢失。
RTopic的使用如下所示:
RTopic topic = redisson.getTopic("myTopic");
int listenerId = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(String channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RTopic topic = redisson.getTopic("myTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());
Reliable Topic
RRelieableTopic在实现发布/订阅模式的情况下,还实现了消息的可靠传输。当redis连接断开的情况下,所有消息都会被存储,当重新连接到redis时,消息会被重新传送。
每个RReliableTopic对象实例都由一个watchdog,当第一个listener注册之后,watchdog就会被启用。当订阅者超过reliableTopicWatchdogTimeout之后,且watchdog没有为其延续过期时间,那么订阅将会超时。(该机制是为了防止client长时间崩溃之后存储的消息持续增长)。
当重新连接到redis之后,listener会被重新注册
RReliableTopic topic = redisson.getReliableTopic("anyTopic");
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RReliableTopic topic = redisson.getReliableTopic("anyTopic");
long subscribersReceivedMessage = topic.publish(new SomeObject());
Topic Pattern
RPatternTopic允许订阅多个Rtopic,在重新连接到redis或redis错误切换之后,listener会被重新订阅。
Pattern使用如下:
- topic?: subscribes to topic1, topicA ...
- topic?_my: subscribes to topic_my, topic123_my, topicTEST_my ...
- topic[ae]: subscribes to topica and topice only
使用示例如下:
// subscribe to all topics by `topic*` pattern
RPatternTopic patternTopic = redisson.getPatternTopic("topic*");
int listenerId = patternTopic.addListener(Message.class, new PatternMessageListener<Message>() {
@Override
public void onMessage(String pattern, String channel, Message msg) {
//...
}
});
Bloom Filter
RBloomFilter中最多含有2^32个bit。
在使用之前,必须通过tryInit(expectedInsertions, falseProbability)来初始化capacity。
RBloomFilter使用示例如下:
RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
// initialize bloom filter with
// expectedInsertions = 55000000
// falseProbability = 0.03
bloomFilter.tryInit(55000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));
bloomFilter.count();
HyperLogLog
RHyperLogLog能以较低的空间维护大数量的项目,计算其去重后的数量,其使用如下所示:
RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
log.add(1);
log.add(2);
log.add(3);
log.count();
LongAdder
RLongAdder提供了java中LongAdder的实现,其在client端维护了LongAdder,增加和减少的性能较AtomicLong来说都有极大的提升(至多可提升12000倍)。其使用如下所示:
RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();
当LongAdder不再使用之后,应该调用destroy方法手动进行销毁:
RLongAdder atomicLong = ...
atomicLong.destroy();
DoubleAdder
RDoubleAdder提供了java中DoubleAdder的分布式实现,其性能相对于AtomicDouble也有很大提升。
其使用如下所示:
RLongDouble atomicDouble = redisson.getLongDouble("myLongDouble");
atomicDouble.add(12);
atomicDouble.increment();
atomicDouble.decrement();
atomicDouble.sum();
id generator
RIdGenerator实现允许产生唯一的id,但其生成算法不是简单的递增,而是在第一次请求时,一系列id就已经被分配并且缓存到java端,直到其用完。该方法能够减少和redis的通信次数,产生id的速率比RAtomicLong快。
默认情况下allocate size是2000,并且值从0开始。
RIdGenerator的使用如下所示:
RIdGenerator generator = redisson.getIdGenerator("generator");
// Initialize with start value = 12 and allocation size = 20000
generator.tryInit(12, 20000);
long id = generator.nextId();
Json Object Holder
RJsonBucket类型用于存储json数据,通过redis 的JSON.*命令来实现。Json数据通过JsonCodec来进行编码和解码。可用的实现是org.redisson.codec.JacksonCodec。
local cache
redisson提供了带有本地缓存的json object holder版本。 local cache用于加速读取操作从而较少网络请求次数,其将整个json对象缓存在redisson侧,执行读取操作比无缓存快45倍。
| redission client method name | local cache | Ultra-fast read/write |
|---|---|---|
| getJsonBucket() open-source version | false | false |
| getJsonBucket() Redisson PRO version | false | true |
| getLocalCachedJsonBucket() Redisson PRO version | true | true |
RJsonBucket使用如下所示:
RJsonBucket<AnyObject> bucket = redisson.getJsonBucket("anyObject", new JacksonCodec<>(AnyObject.class));
bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();
bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));
List<String> values = bucket.get(new JacksonCodec<>(new TypeReference<List<String>>() {}), "values");
long aa = bucket.arrayAppend("$.obj.values", "t3", "t4");
分布式集合
Map
基于Redis的分布式Map对象实现了ConcurrentMap接口。
如果Map主要用于读取数据,或者像尽量避免网络请求,可以使用带有local cache的map,使用示例如下:
RMap<String, SomeObject> map = redisson.getMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");
// use fast* methods when previous value is not required
map.fastPut("a", new SomeObject());
map.fastPutIfAbsent("d", new SomeObject());
map.fastRemove("b");
RFuture<SomeObject> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");
map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");
RMap支持为每个key绑定一个lock/readwritelock/semaphore/countdownlatch,使用如下:
RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
MyKey k = new MyKey();
RLock keyLock = map.getLock(k);
keyLock.lock();
try {
MyValue v = map.get(k);
// process value ...
} finally {
keyLock.unlock();
}
RReadWriteLock rwLock = map.getReadWriteLock(k);
rwLock.readLock().lock();
try {
MyValue v = map.get(k);
// process value ...
} finally {
keyLock.readLock().unlock();
}
Redisson支持不同的Map结构实现,不同的实现主要包含如下三个特性:
- local cache: local cache通过将map entry缓存在redisson端来减少网络请求,从而提升读操作的性能。拥有相同name的local cache实例会连接到相同的发布/订阅channel,该channel用于在local cache实例之间交换更新/失效的事件。
- eviction:支持对每个map entry指定ttl和max idle time
如下是所有的RMap可用实现:
| RedissonClient method | local cache | eviction |
|---|---|---|
| getMap() | false | false |
| getMapCache() | false | true |
| getLocalCachedMap() | true | false |
eviction
带有eviction功能的map实现了RMapCache接口,并且实现了ConcurrentMap接口。
当前redis并不支持淘汰map entry的功能,故而超时的map entry是通过EvictionScheduler来时不时进行清理的。其会一次性移除100条超时的entry。任务启动的时间会自动调整,根据上次任务删除过期entry的数量,时间会在5s到半小时之间变化。
建议在每个redisson实例中都使用具有相同名称的MapCache对象:
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
// ttl = 10 minutes,
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// ttl = 10 minutes, maxIdleTime = 10 seconds
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
// ttl = 3 seconds
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// ttl = 40 seconds, maxIdleTime = 10 seconds
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
// if object is not used anymore
map.destroy();
local cache
带有local cache功能的map实现实现了RLocalCachedMap接口,同时也实现了ConcurrentMap接口。
推荐在所有的redisson client中使用具有相同name的LocalCachedMap,并且为所有具有相同名称的LocalCachedMap指定相同的LocalCachedMapOptions。
如下是在创建localcachedmap时可以指定的options:
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
// Defines whether to store a cache miss into the local cache.
// Default value is false.
.storeCacheMiss(false);
// Defines store mode of cache data.
// Follow options are available:
// LOCALCACHE - store data in local cache only and use Redis only for data update/invalidation.
// LOCALCACHE_REDIS - store data in both Redis and local cache.
.storeMode(StoreMode.LOCALCACHE_REDIS)
// Defines Cache provider used as local cache store.
// Follow options are available:
// REDISSON - uses Redisson own implementation
// CAFFEINE - uses Caffeine implementation
.cacheProvider(CacheProvider.REDISSON)
// Defines local cache eviction policy.
// Follow options are available:
// LFU - Counts how often an item was requested. Those that are used least often are discarded first.
// LRU - Discards the least recently used items first
// SOFT - Uses weak references, entries are removed by GC
// WEAK - Uses soft references, entries are removed by GC
// NONE - No eviction
.evictionPolicy(EvictionPolicy.NONE)
// If cache size is 0 then local cache is unbounded.
.cacheSize(1000)
// Defines strategy for load missed local cache updates after Redis connection failure.
//
// Follow reconnection strategies are available:
// CLEAR - Clear local cache if map instance has been disconnected for a while.
// LOAD - Store invalidated entry hash in invalidation log for 10 minutes
// Cache keys for stored invalidated entry hashes will be removed
// if LocalCachedMap instance has been disconnected less than 10 minutes
// or whole cache will be cleaned otherwise.
// NONE - Default. No reconnection handling
.reconnectionStrategy(ReconnectionStrategy.NONE)
// Defines local cache synchronization strategy.
//
// Follow sync strategies are available:
// INVALIDATE - Default. Invalidate cache entry across all LocalCachedMap instances on map entry change
// UPDATE - Insert/update cache entry across all LocalCachedMap instances on map entry change
// NONE - No synchronizations on map changes
.syncStrategy(SyncStrategy.INVALIDATE)
// time to live for each map entry in local cache
.timeToLive(10000)
// or
.timeToLive(10, TimeUnit.SECONDS)
// max idle time for each map entry in local cache
.maxIdle(10000)
// or
.maxIdle(10, TimeUnit.SECONDS);
localcachedmap对象在不再使用之后应该调用destroy方法进行销毁:
RLocalCachedMap<String, Integer> map = ...
map.destroy();
Map Persistence
redisson允许将map中的数据存储到除了redis以外的空间,其通常用于和应用和数据库之间的缓存,类似于spring cache。
read-through策略
当请求的条目在redisson map object中不存在时,其会使用MapLoader对象来进行加载,示例代码如下:
MapLoader<String, String> mapLoader = new MapLoader<String, String>() {
@Override
public Iterable<String> loadAllKeys() {
List<String> list = new ArrayList<String>();
Statement statement = conn.createStatement();
try {
ResultSet result = statement.executeQuery("SELECT id FROM student");
while (result.next()) {
list.add(result.getString(1));
}
} finally {
statement.close();
}
return list;
}
@Override
public String load(String key) {
PreparedStatement preparedStatement = conn.prepareStatement("SELECT name FROM student where id = ?");
try {
preparedStatement.setString(1, key);
ResultSet result = preparedStatement.executeQuery();
if (result.next()) {
return result.getString(1);
}
return null;
} finally {
preparedStatement.close();
}
}
};
配置map的示例如下:
MapOptions<K, V> options = MapOptions.<K, V>defaults()
.loader(mapLoader);
RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", options);
// or with boost up to 45x times
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with boost up to 45x times
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
write-through(sync)策略
如果map entry被更新,那么更新方法不会返回,直到通过MapWriter对象将entry更新写入到外部存储中:
MapWriter<String, String> mapWriter = new MapWriter<String, String>() {
@Override
public void write(Map<String, String> map) {
PreparedStatement preparedStatement = conn.prepareStatement("INSERT INTO student (id, name) values (?, ?)");
try {
for (Entry<String, String> entry : map.entrySet()) {
preparedStatement.setString(1, entry.getKey());
preparedStatement.setString(2, entry.getValue());
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
} finally {
preparedStatement.close();
}
}
@Override
public void delete(Collection<String> keys) {
PreparedStatement preparedStatement = conn.prepareStatement("DELETE FROM student where id = ?");
try {
for (String key : keys) {
preparedStatement.setString(1, key);
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
} finally {
preparedStatement.close();
}
}
};
write-through策略配置如下:
MapOptions<K, V> options = MapOptions.<K, V>defaults()
.writer(mapWriter)
.writeMode(WriteMode.WRITE_BEHIND)
.writeBehindDelay(5000)
.writeBehindBatchSize(100);
RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", options);
// or with boost up to 45x times
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with boost up to 45x times
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
write-behind策略(async)
在使用write-behind之后,对于map object的修改是批量累计的,并且按定义的延迟异步被MapWriter写入到外部存储中。
writeBehindDelay是批量写入或删除的延迟,默认情况下值为1000ms。writeBehindBatchSize是batch的容量,每个batch都包含写入和删除的命令集合,默认情况下size是50 .
配置代码如下所示:
MapOptions<K, V> options = MapOptions.<K, V>defaults()
.writer(mapWriter)
.writeMode(WriteMode.WRITE_BEHIND)
.writeBehindDelay(5000)
.writeBehindBatchSize(100);
RMap<K, V> map = redisson.getMap("test", options);
// or
RMapCache<K, V> map = redisson.getMapCache("test", options);
// or with boost up to 45x times
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// or with boost up to 45x times
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
Map Listener
Redisson允许为实现RMapCache接口的Map对象绑定监听器,监听如下的map事件:
- entry创建:org.redisson.api.map.event.EntryCreatedListener
- entry过期:org.redisson.api.map.event.EntryExpiredListener
- entry移除:org.redisson.api.map.event.EntryRemovedListener
- entry更新:org.redisson.api.map.event.EntryUpdatedListener
使用实例如下所示:
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() {
@Override
public void onUpdated(EntryEvent<Integer, Integer> event) {
event.getKey(); // key
event.getValue() // new value
event.getOldValue() // old value
// ...
}
});
int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() {
@Override
public void onCreated(EntryEvent<Integer, Integer> event) {
event.getKey(); // key
event.getValue() // value
// ...
}
});
int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() {
@Override
public void onExpired(EntryEvent<Integer, Integer> event) {
event.getKey(); // key
event.getValue() // value
// ...
}
});
int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() {
@Override
public void onRemoved(EntryEvent<Integer, Integer> event) {
event.getKey(); // key
event.getValue() // value
// ...
}
});
map.removeListener(updateListener);
map.removeListener(createListener);
map.removeListener(expireListener);
map.removeListener(removeListener);
LRU/LFU bounded Map
实现了RMapCache接口的Map对象支持按LRU或LFU顺序进行绑定。绑定了LRU或LFU的Map可以存储固定数量的entry,并且按照LRU或LFU的顺序淘汰entry。
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// or
RMapCache<String, SomeObject> map = redisson.getLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredLocalCachedMapCache("anyMap", LocalCachedMapOptions.defaults());
// or
RMapCache<String, SomeObject> map = redisson.getClusteredMapCache("anyMap");
// tries to set limit map to 10 entries using LRU eviction algorithm
map.trySetMaxSize(10);
// ... using LFU eviction algorithm
map.trySetMaxSize(10, EvictionMode.LFU);
// set or change limit map to 10 entries using LRU eviction algorithm
map.setMaxSize(10);
// ... using LFU eviction algorithm
map.setMaxSize(10, EvictionMode.LFU);
map.put("1", "2");
map.put("3", "3", 1, TimeUnit.SECONDS);
MultiMap
基于Redis的MultiMap支持为一个key绑定多个value
Set based MultiMap
基于set的multimap实现对于相同的key不允许出现重复的value:
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
Set<SimpleValue> allValues = map.get(new SimpleKey("0"));
List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
List based MultiMap
基于List的MultiMap允许对相同的key存在多个相同value,并且对同一个key其value按插入顺序存储:
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
List<SimpleValue> allValues = map.get(new SimpleKey("0"));
Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
MultiMap eviction
对于实现了MultimapCache接口的map对象,分别支持基于list和基于set的evicition。
过期的entry清除由EvictionScheduler执行,其一次性会移除100条过期的entry。Task启动时间会根据上次任务删除的条目数目自动进行调整,时间间隔在1s和2h之间进行调整。
MultiMap eviction使用示例如下所示:
RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap");
multimap.put("1", "a");
multimap.put("1", "b");
multimap.put("1", "c");
multimap.put("2", "e");
multimap.put("2", "f");
multimap.expireKey("2", 10, TimeUnit.MINUTES);
Set
基于redis的set object实现了java.util.set接口。
redis set的使用如下所示:
RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());
根据是否支持数据淘汰,可以分为如下方法:
| Redisson Client method | Eviction Support |
|---|---|
| getSet() | false |
| getSetCache() | true |
eviction
支持淘汰功能的Set对象实现了RSetCache接口,当前redis尚不支持淘汰set中的值,故而淘汰是由EvictionScheduler实现的,其一次性移除300条entry,根据上次task淘汰的key数量,下次执行task的间隔在1s到1h之间调整。
RSetCache<SomeObject> set = redisson.getSetCache("mySet");
// or
RMapCache<SomeObject> set = redisson.getClusteredSetCache("mySet");
// ttl = 10 minutes,
set.add(new SomeObject(), 10, TimeUnit.MINUTES);
// if object is not used anymore
set.destroy();
SortedSet
基于Redis的SortedSet实现了java.util.SortedSet接口,其通过比较元素来确保元素的在set中的唯一性。对于String数据类型,更推荐使用LexSortedSet,其性能表现更好。
RSortedSet的使用示例如下所示:
RSortedSet<Integer> set = redisson.getSortedSet("anySet");
set.trySetComparator(new MyComparator()); // set object comparator
set.add(3);
set.add(1);
set.add(2);
set.removeAsync(0);
set.addAsync(5);
ScoredSortedSet
基于redis的分布式ScoredSortedSet根据插入元素时的score来对元素进行排序。使用示例如下所示:
set.add(0.13, new SomeObject(a, b));
set.addAsync(0.251, new SomeObject(c, d));
set.add(0.302, new SomeObject(g, d));
set.pollFirst();
set.pollLast();
int index = set.rank(new SomeObject(g, d)); // get element index
Double score = set.getScore(new SomeObject(g, d)); // get element score