diff --git a/spring/redisson/redisson.md b/spring/redisson/redisson.md new file mode 100644 index 0000000..88518ae --- /dev/null +++ b/spring/redisson/redisson.md @@ -0,0 +1,428 @@ +# Redisson +## 配置 +### 编程式配置 +可以通过创建Config对象来显式配置redisson,配置示例如下: +```java +Config config = new Config(); +config.setTransportMode(TransportMode.EPOLL); +config.useClusterServers() + // use "rediss://" for SSL connection + .addNodeAddress("perredis://127.0.0.1:7181"); + +RedissonClient redisson = Redisson.create(config); +``` +### yml Configuration +也可以通过yml文件格式来对redisson进行配置: +```java +Config config = Config.fromYAML(new File("config-file.yaml")); +RedissonClient redisson = Redisson.create(config); +``` +可以通过config.toYAML方法将config转化为yaml格式: +```java +Config config = new Config(); +// ... many settings are set here +String yamlFormat = config.toYAML(); +``` +yml格式中可以引入环境变量 +```yml +singleServerConfig: + address: "redis://127.0.0.1:${REDIS_PORT}" +``` +### Common Settings +如下设置用于配置config对象,并且适用于各种redis模式 +#### codec +默认值:org.redisson.codec.Kryo5Codec +#### connectionListener +默认值:null +connection listener,当redisson连接到redis-server或与redis-server断开连接时被触发 +#### nettyThreads +默认值:32 +redisson所有redis client共享的线程总数。netty thread用于redis相应的解码和命令的发送。 +#### transportMode +默认值:TransportMode.NIO +可选的值如下: +- TransportMode.NIO(默认) +- TransportMode.EPOLL +- TransportMode.KQUEUE +#### threads +由Rtopic object listener所共享的线程数量 +#### lockWatchdogTimeout +默认值:30000 +RLock watchdog timeout,单位为ms。该参数仅当RLock在获取时没有指定leaseTimeout时使用。当watchdog没有延长持有锁时间到下一个watchdogTimeout间隔时,当前watchdogTimeout到期后锁会过期。watchdog避免了由于客户端崩溃或其他原因造成一直持有锁的情况。 +### Mode +#### single instance mode +可以通过如下方式来配置单实例模式: +```java +// connects to 127.0.0.1:6379 by default +RedissonClient redisson = Redisson.create(); + +Config config = new Config(); +config.useSingleServer().setAddress("redis://myredisserver:6379"); +RedissonClient redisson = Redisson.create(config); +``` +yml配置单实例模式如下: +```yaml +singleServerConfig: + idleConnectionTimeout: 10000 + connectTimeout: 10000 + timeout: 3000 + retryAttempts: 3 + retryInterval: 1500 + password: null + subscriptionsPerConnection: 5 + clientName: null + address: "redis://127.0.0.1:6379" + subscriptionConnectionMinimumIdleSize: 1 + subscriptionConnectionPoolSize: 50 + connectionMinimumIdleSize: 24 + connectionPoolSize: 64 + database: 0 + dnsMonitoringInterval: 5000 +threads: 16 +nettyThreads: 32 +codec: ! {} +transportMode: "NIO" +``` +## Operation Execution +redisson支持自动重试策略,并且在每次尝试时都会发送命令。重试策略通过retryAttempts(默认情况下为3)、retryInterval(默认情况下为1000ms)来设置。多次尝试之间间隔retryInterval。 +redisson实例是线程安全的,如下是RAtomicLong对象的使用示例: +```java +RedissonClient client = Redisson.create(config); +RAtomicLong longObject = client.getAtomicLong('myLong'); +// sync way +longObject.compareAndSet(3, 401); +// async way +RFuture result = longObject.compareAndSetAsync(3, 401); + +RedissonReactiveClient client = Redisson.createReactive(config); +RAtomicLongReactive longObject = client.getAtomicLong('myLong'); +// reactive way +Mono result = longObject.compareAndSet(3, 401); + +RedissonRxClient client = Redisson.createRx(config); +RAtomicLongRx longObject= client.getAtomicLong("myLong"); +// RxJava2 way +Flowable future = longObject.compareAndSetAsync(1, 401); +``` +RFuture对象继承了Future接口和CompletionStage接口,可以像CompleteableFuture一样使用: +```java +future.whenComplete((res, exception) -> { + + // handle both result and exception + +}); + + +// or +future.thenAccept(res -> { + + // handle result + +}).exceptionally(exception -> { + + // handle exception + +}); +``` +因该避免在future listener中使用同步方法,这样可能会造成redis请求/相应处理时的错误,应使用如下方式执行: +```java +future.whenCompleteAsync((res, exception) -> { + + // handle both result and exception + +}, executor); + + +// or +future.thenAcceptAsync(res -> { + + // handle result + +}, executor).exceptionallyAsync(exception -> { + + // handle exception + +}, executor); +``` +## Redisson Object的公共操作 +所有redisson object都实现了RObject和RExpiration接口,使用示例如下: +```java +RObject object = redisson.get...() + +object.sizeInMemory(); + +object.delete(); + +object.rename("newname"); + +object.isExists(); + +// catch expired event +object.addListener(new ExpiredObjectListener() { + ... +}); + +// catch delete event +object.addListener(new DeletedObjectListener() { + ... +}); +``` +redisson object的name属性即是在redis中的key: +```java +RMap map = redisson.getMap("mymap"); +map.getName(); // = mymap +``` +和redis key相关的所有操作都通过RKeys接口暴露,使用示例如下: +```java +RKeys keys = redisson.getKeys(); + +Iterable allKeys = keys.getKeys(); + +Iterable foundedKeys = keys.getKeysByPattern('key*'); + +long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3"); + +long deletedKeysAmount = keys.deleteByPattern("test?"); + +String randomKey = keys.randomKey(); + +long keysAmount = keys.count(); + +keys.flushall(); + +keys.flushdb(); +``` +## 分布式对象 +### object holder +#### RBucket +RBucket的java实现类是一个hodler,可以持有任何类型的java对象。RBucket的大小限制为512MB. +RBucket的使用示例如下所示: +```java +RBucket bucket = redisson.getBucket("anyObject"); + +bucket.set(new AnyObject(1)); +AnyObject obj = bucket.get(); + +bucket.trySet(new AnyObject(3)); +bucket.compareAndSet(new AnyObject(4), new AnyObject(5)); +bucket.getAndSet(new AnyObject(6)); +``` +#### RBuckets +可以通过RBuckets对象来操作多个RBucket对象,RBuckets使用示例如下: +```java +RBuckets buckets = redisson.getBuckets(); + +// get all bucket values +Map loadedBuckets = buckets.get("myBucket1", "myBucket2", "myBucket3"); + +Map map = new HashMap<>(); +map.put("myBucket1", new MyObject()); +map.put("myBucket2", new MyObject()); + +// sets all or nothing if some bucket is already exists +buckets.trySet(map); +// store all at once +buckets.set(map); +``` +### Binary Stream Holder +RBinaryStream类型的对象用于存储字节序列。RBinaryStream实现了RBucket接口,并且其大小限制也是512MB. +RBinaryStream的使用示例如下: +```java +RBinaryStream stream = redisson.getBinaryStream("anyStream"); + +byte[] content = ... +stream.set(content); +stream.getAndSet(content); +stream.trySet(content); +stream.compareAndSet(oldContent, content); +``` +RBinaryStream可以和InputStream与OutputStream混用,使用如下: +```java +RBinaryStream stream = redisson.getBinaryStream("anyStream"); + +InputStream is = stream.getInputStream(); +byte[] readBuffer = ... +is.read(readBuffer); + +OutputStream os = stream.getOuputStream(); +byte[] contentToWrite = ... +os.write(contentToWrite); +``` +### BitSet +RBitSet实现提供了和java中BitSet类似的api,其大小限制是4 294 967 295 bits. +RBitSet使用如下所示: +```java +RBitSet set = redisson.getBitSet("simpleBitset"); + +set.set(0, true); +set.set(1812, false); + +set.clear(0); + +set.and("anotherBitset"); +set.xor("anotherBitset"); +``` +### AtomicLong +RAtomicLong实现提供了和java中AtomicLong类似的api,其使用类似如下: +```java +RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong"); +atomicLong.set(3); +atomicLong.incrementAndGet(); +atomicLong.get(); +``` +### AtomicDouble +RAtomicDouble实现提供了和java中AtomicDouble类似的api,其使用示例如下: +```java +RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble"); +atomicDouble.set(2.81); +atomicDouble.addAndGet(4.11); +atomicDouble.get(); +``` +### Topic +RTopic实现提供了发布/订阅机制,其允许订阅由同名RTopic对象发布的事件。 +当重新连接到redis或redis错误切换之后,listener会被重新订阅,在重新订阅之前,所有被发布的消息都会丢失。 +RTopic的使用如下所示: +```java +RTopic topic = redisson.getTopic("myTopic"); +int listenerId = topic.addListener(SomeObject.class, new MessageListener() { + @Override + public void onMessage(String channel, SomeObject message) { + //... + } +}); + +// in other thread or JVM +RTopic topic = redisson.getTopic("myTopic"); +long clientsReceivedMessage = topic.publish(new SomeObject()); +``` +### Reliable Topic +RRelieableTopic在实现发布/订阅模式的情况下,还实现了消息的可靠传输。当redis连接断开的情况下,所有消息都会被存储,当重新连接到redis时,消息会被重新传送。 +每个RReliableTopic对象实例都由一个watchdog,当第一个listener注册之后,watchdog就会被启用。当订阅者超过reliableTopicWatchdogTimeout之后,且watchdog没有为其延续过期时间,那么订阅将会超时。(该机制是为了防止client长时间崩溃之后存储的消息持续增长)。 +**当重新连接到redis之后,listener会被重新注册** +```java +RReliableTopic topic = redisson.getReliableTopic("anyTopic"); +topic.addListener(SomeObject.class, new MessageListener() { + @Override + public void onMessage(CharSequence channel, SomeObject message) { + //... + } +}); + +// in other thread or JVM +RReliableTopic topic = redisson.getReliableTopic("anyTopic"); +long subscribersReceivedMessage = topic.publish(new SomeObject()); +``` + +### Topic Pattern +RPatternTopic允许订阅多个Rtopic,在重新连接到redis或redis错误切换之后,listener会被重新订阅。 +Pattern使用如下: +- topic?: subscribes to topic1, topicA ... +- topic?_my: subscribes to topic_my, topic123_my, topicTEST_my ... +- topic[ae]: subscribes to topica and topice only + +使用示例如下: +```java +// subscribe to all topics by `topic*` pattern +RPatternTopic patternTopic = redisson.getPatternTopic("topic*"); +int listenerId = patternTopic.addListener(Message.class, new PatternMessageListener() { + @Override + public void onMessage(String pattern, String channel, Message msg) { + //... + } +}); +``` +### Bloom Filter +RBloomFilter中最多含有2^32个bit。 +在使用之前,必须通过tryInit(expectedInsertions, falseProbability)来初始化capacity。 +RBloomFilter使用示例如下: +```java +RBloomFilter bloomFilter = redisson.getBloomFilter("sample"); +// initialize bloom filter with +// expectedInsertions = 55000000 +// falseProbability = 0.03 +bloomFilter.tryInit(55000000L, 0.03); + +bloomFilter.add(new SomeObject("field1Value", "field2Value")); +bloomFilter.add(new SomeObject("field5Value", "field8Value")); + +bloomFilter.contains(new SomeObject("field1Value", "field8Value")); +bloomFilter.count(); +``` +### HyperLogLog +RHyperLogLog能以较低的空间维护大数量的项目,计算其去重后的数量,其使用如下所示: +```java +RHyperLogLog log = redisson.getHyperLogLog("log"); +log.add(1); +log.add(2); +log.add(3); + +log.count(); +``` +### LongAdder +RLongAdder提供了java中LongAdder的实现,其在client端维护了LongAdder,增加和减少的性能较AtomicLong来说都有极大的提升(至多可提升12000倍)。其使用如下所示: +```java +RLongAdder atomicLong = redisson.getLongAdder("myLongAdder"); +atomicLong.add(12); +atomicLong.increment(); +atomicLong.decrement(); +atomicLong.sum(); +``` +当LongAdder不再使用之后,应该调用destroy方法手动进行销毁: +```java +RLongAdder atomicLong = ... +atomicLong.destroy(); +``` +### DoubleAdder +RDoubleAdder提供了java中DoubleAdder的分布式实现,其性能相对于AtomicDouble也有很大提升。 +其使用如下所示: +```java +RLongDouble atomicDouble = redisson.getLongDouble("myLongDouble"); +atomicDouble.add(12); +atomicDouble.increment(); +atomicDouble.decrement(); +atomicDouble.sum(); +``` +### id generator +RIdGenerator实现允许产生唯一的id,但其生成算法不是简单的递增,而是在第一次请求时,一系列id就已经被分配并且缓存到java端,直到其用完。该方法能够减少和redis的通信次数,产生id的速率比RAtomicLong快。 +默认情况下allocate size是2000,并且值从0开始。 +RIdGenerator的使用如下所示: +```java +RIdGenerator generator = redisson.getIdGenerator("generator"); + +// Initialize with start value = 12 and allocation size = 20000 +generator.tryInit(12, 20000); + +long id = generator.nextId(); +``` +### Json Object Holder +RJsonBucket类型用于存储json数据,通过redis 的JSON.*命令来实现。Json数据通过JsonCodec来进行编码和解码。可用的实现是org.redisson.codec.JacksonCodec。 +#### local cache +redisson提供了带有本地缓存的json object holder版本。 +local cache用于加速读取操作从而较少网络请求次数,其将整个json对象缓存在redisson侧,执行读取操作比无缓存快45倍。 +| redission client method name | local cache | Ultra-fast read/write | +| :-: | :-: | :-: | +| getJsonBucket() open-source version | false | false | +| getJsonBucket() Redisson PRO version | false | true | +| getLocalCachedJsonBucket() Redisson PRO version | true | true | + +RJsonBucket使用如下所示: +```java +RJsonBucket bucket = redisson.getJsonBucket("anyObject", new JacksonCodec<>(AnyObject.class)); + +bucket.set(new AnyObject(1)); +AnyObject obj = bucket.get(); + +bucket.trySet(new AnyObject(3)); +bucket.compareAndSet(new AnyObject(4), new AnyObject(5)); +bucket.getAndSet(new AnyObject(6)); + +List values = bucket.get(new JacksonCodec<>(new TypeReference>() {}), "values"); +long aa = bucket.arrayAppend("$.obj.values", "t3", "t4"); +```