Files
rikako-note/spring/redisson/redisson.md

14 KiB
Raw Blame History

Redisson

配置

编程式配置

可以通过创建Config对象来显式配置redisson配置示例如下

Config config = new Config();
config.setTransportMode(TransportMode.EPOLL);
config.useClusterServers()
       // use "rediss://" for SSL connection
      .addNodeAddress("perredis://127.0.0.1:7181");

RedissonClient redisson = Redisson.create(config);

yml Configuration

也可以通过yml文件格式来对redisson进行配置

Config config = Config.fromYAML(new File("config-file.yaml"));  
RedissonClient redisson = Redisson.create(config);

可以通过config.toYAML方法将config转化为yaml格式

Config config = new Config();
// ... many settings are set here
String yamlFormat = config.toYAML();

yml格式中可以引入环境变量

singleServerConfig:
  address: "redis://127.0.0.1:${REDIS_PORT}"

Common Settings

如下设置用于配置config对象并且适用于各种redis模式

codec

默认值org.redisson.codec.Kryo5Codec

connectionListener

默认值null
connection listener当redisson连接到redis-server或与redis-server断开连接时被触发

nettyThreads

默认值32
redisson所有redis client共享的线程总数。netty thread用于redis相应的解码和命令的发送。

transportMode

默认值TransportMode.NIO
可选的值如下:

  • TransportMode.NIO默认
  • TransportMode.EPOLL
  • TransportMode.KQUEUE

threads

由Rtopic object listener所共享的线程数量

lockWatchdogTimeout

默认值30000
RLock watchdog timeout单位为ms。该参数仅当RLock在获取时没有指定leaseTimeout时使用。当watchdog没有延长持有锁时间到下一个watchdogTimeout间隔时当前watchdogTimeout到期后锁会过期。watchdog避免了由于客户端崩溃或其他原因造成一直持有锁的情况。

Mode

single instance mode

可以通过如下方式来配置单实例模式:

// connects to 127.0.0.1:6379 by default
RedissonClient redisson = Redisson.create();

Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);

yml配置单实例模式如下

singleServerConfig:
  idleConnectionTimeout: 10000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  password: null
  subscriptionsPerConnection: 5
  clientName: null
  address: "redis://127.0.0.1:6379"
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 24
  connectionPoolSize: 64
  database: 0
  dnsMonitoringInterval: 5000
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.Kryo5Codec> {}
transportMode: "NIO"

Operation Execution

redisson支持自动重试策略并且在每次尝试时都会发送命令。重试策略通过retryAttempts默认情况下为3、retryInterval默认情况下为1000ms来设置。多次尝试之间间隔retryInterval。
redisson实例是线程安全的如下是RAtomicLong对象的使用示例

RedissonClient client = Redisson.create(config);
RAtomicLong longObject = client.getAtomicLong('myLong');
// sync way
longObject.compareAndSet(3, 401);
// async way
RFuture<Boolean> result = longObject.compareAndSetAsync(3, 401);

RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
// reactive way
Mono<Boolean> result = longObject.compareAndSet(3, 401);

RedissonRxClient client = Redisson.createRx(config);
RAtomicLongRx longObject= client.getAtomicLong("myLong");
// RxJava2 way
Flowable<Boolean result = longObject.compareAndSet(3, 401);

Async方式

大多数redisson object继承了异步接口可以调用异步方法实现异步操作如下

// RAtomicLong extends RAtomicLongAsync
RAtomicLongAsync longObject = client.getAtomicLong("myLong");
RFuture<Boolean> future = longObject.compareAndSetAsync(1, 401);

RFuture对象继承了Future接口和CompletionStage接口可以像CompleteableFuture一样使用

future.whenComplete((res, exception) -> {

    // handle both result and exception

});


// or
future.thenAccept(res -> {

    // handle result

}).exceptionally(exception -> {

    // handle exception

});

因该避免在future listener中使用同步方法这样可能会造成redis请求/相应处理时的错误,应使用如下方式执行:

future.whenCompleteAsync((res, exception) -> {

    // handle both result and exception

}, executor);


// or
future.thenAcceptAsync(res -> {

    // handle result

}, executor).exceptionallyAsync(exception -> {

    // handle exception

}, executor);

Redisson Object的公共操作

所有redisson object都实现了RObject和RExpiration接口使用示例如下

RObject object = redisson.get...()

object.sizeInMemory();

object.delete();

object.rename("newname");

object.isExists();

// catch expired event
object.addListener(new ExpiredObjectListener() {
   ...
});

// catch delete event
object.addListener(new DeletedObjectListener() {
   ...
});

redisson object的name属性即是在redis中的key

RMap map = redisson.getMap("mymap");
map.getName(); // = mymap

和redis key相关的所有操作都通过RKeys接口暴露使用示例如下

RKeys keys = redisson.getKeys();

Iterable<String> allKeys = keys.getKeys();

Iterable<String> foundedKeys = keys.getKeysByPattern('key*');

long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");

long deletedKeysAmount = keys.deleteByPattern("test?");

String randomKey = keys.randomKey();

long keysAmount = keys.count();

keys.flushall();

keys.flushdb();

分布式对象

object holder

RBucket

RBucket的java实现类是一个hodler可以持有任何类型的java对象。RBucket的大小限制为512MB.
RBucket的使用示例如下所示

RBucket<AnyObject> bucket = redisson.getBucket("anyObject");

bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();

bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));

RBuckets

可以通过RBuckets对象来操作多个RBucket对象RBuckets使用示例如下

RBuckets buckets = redisson.getBuckets();

// get all bucket values
Map<String, V> loadedBuckets = buckets.get("myBucket1", "myBucket2", "myBucket3");

Map<String, Object> map = new HashMap<>();
map.put("myBucket1", new MyObject());
map.put("myBucket2", new MyObject());

// sets all or nothing if some bucket is already exists
buckets.trySet(map);
// store all at once
buckets.set(map);

Binary Stream Holder

RBinaryStream类型的对象用于存储字节序列。RBinaryStream实现了RBucket接口并且其大小限制也是512MB.
RBinaryStream的使用示例如下

RBinaryStream stream = redisson.getBinaryStream("anyStream");

byte[] content = ...
stream.set(content);
stream.getAndSet(content);
stream.trySet(content);
stream.compareAndSet(oldContent, content);

RBinaryStream可以和InputStream与OutputStream混用使用如下

RBinaryStream stream = redisson.getBinaryStream("anyStream");

InputStream is = stream.getInputStream();
byte[] readBuffer = ...
is.read(readBuffer);

OutputStream os = stream.getOuputStream();
byte[] contentToWrite = ...
os.write(contentToWrite);

BitSet

RBitSet实现提供了和java中BitSet类似的api其大小限制是4 294 967 295 bits.
RBitSet使用如下所示

RBitSet set = redisson.getBitSet("simpleBitset");

set.set(0, true);
set.set(1812, false);

set.clear(0);

set.and("anotherBitset");
set.xor("anotherBitset");

AtomicLong

RAtomicLong实现提供了和java中AtomicLong类似的api其使用类似如下

RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong");
atomicLong.set(3);
atomicLong.incrementAndGet();
atomicLong.get();

AtomicDouble

RAtomicDouble实现提供了和java中AtomicDouble类似的api其使用示例如下

RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");
atomicDouble.set(2.81);
atomicDouble.addAndGet(4.11);
atomicDouble.get();

Topic

RTopic实现提供了发布/订阅机制其允许订阅由同名RTopic对象发布的事件。
当重新连接到redis或redis错误切换之后listener会被重新订阅在重新订阅之前所有被发布的消息都会丢失。
RTopic的使用如下所示

RTopic topic = redisson.getTopic("myTopic");
int listenerId = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(String channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RTopic topic = redisson.getTopic("myTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());

Reliable Topic

RRelieableTopic在实现发布/订阅模式的情况下还实现了消息的可靠传输。当redis连接断开的情况下所有消息都会被存储当重新连接到redis时消息会被重新传送。
每个RReliableTopic对象实例都由一个watchdog当第一个listener注册之后watchdog就会被启用。当订阅者超过reliableTopicWatchdogTimeout之后且watchdog没有为其延续过期时间那么订阅将会超时。该机制是为了防止client长时间崩溃之后存储的消息持续增长
当重新连接到redis之后listener会被重新注册

RReliableTopic topic = redisson.getReliableTopic("anyTopic");
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(CharSequence channel, SomeObject message) {
        //...
    }
});

// in other thread or JVM
RReliableTopic topic = redisson.getReliableTopic("anyTopic");
long subscribersReceivedMessage = topic.publish(new SomeObject());

Topic Pattern

RPatternTopic允许订阅多个Rtopic在重新连接到redis或redis错误切换之后listener会被重新订阅。
Pattern使用如下

  • topic? subscribes to topic1, topicA ...
  • topic?_my subscribes to topic_my, topic123_my, topicTEST_my ...
  • topic[ae] subscribes to topica and topice only

使用示例如下:

// subscribe to all topics by `topic*` pattern
RPatternTopic patternTopic = redisson.getPatternTopic("topic*");
int listenerId = patternTopic.addListener(Message.class, new PatternMessageListener<Message>() {
    @Override
    public void onMessage(String pattern, String channel, Message msg) {
        //...
    }
});

Bloom Filter

RBloomFilter中最多含有2^32个bit。
在使用之前必须通过tryInit(expectedInsertions, falseProbability)来初始化capacity。
RBloomFilter使用示例如下

RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
// initialize bloom filter with 
// expectedInsertions = 55000000
// falseProbability = 0.03
bloomFilter.tryInit(55000000L, 0.03);

bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));

bloomFilter.contains(new SomeObject("field1Value", "field8Value"));
bloomFilter.count();

HyperLogLog

RHyperLogLog能以较低的空间维护大数量的项目计算其去重后的数量其使用如下所示

RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
log.add(1);
log.add(2);
log.add(3);

log.count();

LongAdder

RLongAdder提供了java中LongAdder的实现其在client端维护了LongAdder增加和减少的性能较AtomicLong来说都有极大的提升至多可提升12000倍。其使用如下所示

RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();

当LongAdder不再使用之后应该调用destroy方法手动进行销毁

RLongAdder atomicLong = ...
atomicLong.destroy();

DoubleAdder

RDoubleAdder提供了java中DoubleAdder的分布式实现其性能相对于AtomicDouble也有很大提升。
其使用如下所示:

RLongDouble atomicDouble = redisson.getLongDouble("myLongDouble");
atomicDouble.add(12);
atomicDouble.increment();
atomicDouble.decrement();
atomicDouble.sum();

id generator

RIdGenerator实现允许产生唯一的id但其生成算法不是简单的递增而是在第一次请求时一系列id就已经被分配并且缓存到java端直到其用完。该方法能够减少和redis的通信次数产生id的速率比RAtomicLong快。
默认情况下allocate size是2000并且值从0开始。
RIdGenerator的使用如下所示

RIdGenerator generator = redisson.getIdGenerator("generator");

// Initialize with start value = 12 and allocation size = 20000
generator.tryInit(12, 20000);

long id = generator.nextId();

Json Object Holder

RJsonBucket类型用于存储json数据通过redis 的JSON.*命令来实现。Json数据通过JsonCodec来进行编码和解码。可用的实现是org.redisson.codec.JacksonCodec。

local cache

redisson提供了带有本地缓存的json object holder版本。 local cache用于加速读取操作从而较少网络请求次数其将整个json对象缓存在redisson侧执行读取操作比无缓存快45倍。

redission client method name local cache Ultra-fast read/write
getJsonBucket() open-source version false false
getJsonBucket() Redisson PRO version false true
getLocalCachedJsonBucket() Redisson PRO version true true

RJsonBucket使用如下所示

RJsonBucket<AnyObject> bucket = redisson.getJsonBucket("anyObject", new JacksonCodec<>(AnyObject.class));

bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();

bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));

List<String> values = bucket.get(new JacksonCodec<>(new TypeReference<List<String>>() {}), "values");
long aa = bucket.arrayAppend("$.obj.values", "t3", "t4");