diff --git a/spring/redisson/redisson.md b/spring/redisson/redisson.md index cd8bc8b..09ee0b6 100644 --- a/spring/redisson/redisson.md +++ b/spring/redisson/redisson.md @@ -904,4 +904,276 @@ RList list = redisson.getList("anyList"); list.add(new SomeObject()); list.get(0); list.remove(new SomeObject()); -``` \ No newline at end of file +``` +### Queue +基于Redis的Queue对象实现了java.util.Queue接口,其使用示例如下所示: +```java +RQueue queue = redisson.getQueue("anyQueue"); +queue.add(new SomeObject()); +SomeObject obj = queue.peek(); +SomeObject someObj = queue.poll(); +``` +### Deque +基于Redis的Deque实现了java.util.Deque接口,其使用示例如下所示: +```java +RDeque queue = redisson.getDeque("anyDeque"); +queue.addFirst(new SomeObject()); +queue.addLast(new SomeObject()); +SomeObject obj = queue.removeFirst(); +SomeObject someObj = queue.removeLast(); +``` +### Blocking Queue +基于Redis的分布式**无界**BlockingQueue实现了java.concurrent.BlockingQueue接口。 +RBlockingQueue的使用示例如下所示: +```java +RBlockingQueue queue = redisson.getBlockingQueue("anyQueue"); + +queue.offer(new SomeObject()); + +SomeObject obj = queue.peek(); +SomeObject obj = queue.poll(); +SomeObject obj = queue.poll(10, TimeUnit.MINUTES); +``` +对于基于Redisson的BlockingQueue,其poll/pollFromAny/pollLastAndOfferFirstTo/take方法,其阻塞操作都是基于发布/订阅机制实现的,当redisson重新连接到redisson-server之后,会自动重新注册。 +### Bounded Blocking Queue +基于Redis的分布式**有界**BoundedBlockingQueue实现了java.util.concurrent.Blocking接口,队列容量限制可以通过trySetCapacity方法来进行定义: +```java +RBoundedBlockingQueue queue = redisson.getBoundedBlockingQueue("anyQueue"); +// returns `true` if capacity set successfully and `false` if it already set. +queue.trySetCapacity(2); + +queue.offer(new SomeObject(1)); +queue.offer(new SomeObject(2)); +// will be blocked until free space available in queue +queue.put(new SomeObject()); + +SomeObject obj = queue.peek(); +SomeObject someObj = queue.poll(); +SomeObject ob = queue.poll(10, TimeUnit.MINUTES); +``` +### Blocking Deque +基于Redis的BlockingDeque,实现了java.util.BlockingDeque接口,其使用如下所示: +```java +RBlockingDeque deque = redisson.getBlockingDeque("anyDeque"); +deque.putFirst(1); +deque.putLast(2); +Integer firstValue = queue.takeFirst(); +Integer lastValue = queue.takeLast(); +Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES); +Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES); +``` +### Blocking Fair Queue +基于Redis的BlockingFairQueue,实现了java.util.BlockingQueue接口。 +通常情况下,如果存在多个消费者,有的消费者网络状况较好,有的消费者网络状况较差,那么网络较好的消费者将会比网络较差的消费者消费更多的消息。 +而BlockingFairQueue者通过轮询等手段保证不同消费者之间的访问顺序,从而使得不同消费者消费的消息数量是均匀的。 +BlockingFairQueue的使用示例如下所示: +```java +RBlockingFairQueue queue = redisson.getBlockingFairQueue("myQueue"); +queue.offer(new SomeObject()); + +SomeObject element = queue.peek(); +SomeObject element = queue.poll(); +SomeObject element = queue.poll(10, TimeUnit.MINUTES); +SomeObject element = queue.take(); +``` +> 该特性仅在redisson pro中可用。 + +### Blocking Fair Deque +类似于BlockingFairQueue,该特性仅在redisson pro中可用。 +```java +RBlockingFairDeque deque = redisson.getBlockingFairDeque("myDeque"); +deque.offer(new SomeObject()); + +SomeObject firstElement = queue.peekFirst(); +SomeObject firstElement = queue.pollFirst(); +SomeObject firstElement = queue.pollFirst(10, TimeUnit.MINUTES); +SomeObject firstElement = queue.takeFirst(); + +SomeObject lastElement = queue.peekLast(); +SomeObject lastElement = queue.pollLast(); +SomeObject lastElement = queue.pollLast(10, TimeUnit.MINUTES); +SomeObject lastElement = queue.takeLast(); +``` +### DelayedQueue +基于Redis的Delayed Queue允许将队列中的元素以固定的延迟转移到目标队列中。目标队列可以是任何实现了RQueue的队列。 +DelayedQueue的使用示例如下: +```java +RBlockingQueue distinationQueue = ... +RDelayedQueue delayedQueue = getDelayedQueue(distinationQueue); +// move object to distinationQueue in 10 seconds +delayedQueue.offer("msg1", 10, TimeUnit.SECONDS); +// move object to distinationQueue in 1 minutes +delayedQueue.offer("msg2", 1, TimeUnit.MINUTES); + + +// msg1 will appear in 10 seconds +distinationQueue.poll(15, TimeUnit.SECONDS); + +// msg2 will appear in 2 seconds +distinationQueue.poll(2, TimeUnit.SECONDS); +``` +### PriorityQueue +基于Redis的PriorityQueue实现了java.util.Queue接口,元素按照Comparable或Comparator定义的顺序进行排序。 +> 默认情况下,PriorityQueue实现的是小根对,可以自定义比较器来实现大根堆 +tryComparator方法可以用于自定义比较器: +```java +public class Entry implements Comparable, Serializable { + + private String key; + private Integer value; + + public Entry(String key, Integer value) { + this.key = key; + this.value = value; + } + + @Override + public int compareTo(Entry o) { + return key.compareTo(o.key); + } + +} + +RPriorityQueue queue = redisson.getPriorityQueue("anyQueue"); +queue.add(new Entry("b", 1)); +queue.add(new Entry("c", 1)); +queue.add(new Entry("a", 1)); + +// Entry [a:1] +Entry e = queue.poll(); +// Entry [b:1] +Entry e = queue.poll(); +// Entry [c:1] +Entry e = queue.poll(); +``` +### PriorityDeque +基于Redis的PriorityDeque实现了java.util.Deque接口,其使用和PriorityQueue类似: +```java +public class Entry implements Comparable, Serializable { + + private String key; + private Integer value; + + public Entry(String key, Integer value) { + this.key = key; + this.value = value; + } + + @Override + public int compareTo(Entry o) { + return key.compareTo(o.key); + } + +} + +RPriorityDeque queue = redisson.getPriorityDeque("anyQueue"); +queue.add(new Entry("b", 1)); +queue.add(new Entry("c", 1)); +queue.add(new Entry("a", 1)); + +// Entry [a:1] +Entry e = queue.pollFirst(); +// Entry [c:1] +Entry e = queue.pollLast(); +``` +### PriorityBlockingQueue +基于PriorityBlockingQueue类似于java.concurrent.PriorityBlockingQueue,其使用示例如下: +```java +public class Entry implements Comparable, Serializable { + + private String key; + private Integer value; + + public Entry(String key, Integer value) { + this.key = key; + this.value = value; + } + + @Override + public int compareTo(Entry o) { + return key.compareTo(o.key); + } + +} + +RPriorityBlockingQueue queue = redisson.getPriorityBlockingQueue("anyQueue"); +queue.add(new Entry("b", 1)); +queue.add(new Entry("c", 1)); +queue.add(new Entry("a", 1)); + +// Entry [a:1] +Entry e = queue.take(); +``` +### PriorityBlockingDeque +基于Redis的PriorityBlockingDeque实现了java.util.concurrent.BlockingDeque接口,其使用示例如下: +```java +public class Entry implements Comparable, Serializable { + + private String key; + private Integer value; + + public Entry(String key, Integer value) { + this.key = key; + this.value = value; + } + + @Override + public int compareTo(Entry o) { + return key.compareTo(o.key); + } + +} + +RPriorityBlockingDeque queue = redisson.getPriorityBlockingDeque("anyQueue"); +queue.add(new Entry("b", 1)); +queue.add(new Entry("c", 1)); +queue.add(new Entry("a", 1)); + +// Entry [a:1] +Entry e = queue.takeFirst(); +// Entry [c:1] +Entry e = queue.takeLast(); +``` +### Stream +基于Redis的Stream对象允许创建consumer group,consumer group用于消费由生产者创建的消息。 +```java +RStream stream = redisson.getStream("test"); + +StreamMessageId sm = stream.add(StreamAddArgs.entry("0", "0")); + +stream.createGroup("testGroup"); + +StreamId id1 = stream.add(StreamAddArgs.entry("1", "1")); +StreamId id2 = stream.add(StreamAddArgs.entry("2", "2")); + +Map> group = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered()); +long amount = stream.ack("testGroup", id1, id2); +``` +### RingBuffer +基于Redis的RingBuffer实现了java.util.Queue,如果当前队列已经满了,那么会淘汰位于队列最前端的元素。 +队列容量大小通过trySetCapacity方法进行设置,RingBuffer的使用示例如下所示: +```java +RRingBuffer buffer = redisson.getRingBuffer("test"); + +// buffer capacity is 4 elements +buffer.trySetCapacity(4); + +buffer.add(1); +buffer.add(2); +buffer.add(3); +buffer.add(4); + +// buffer state is 1, 2, 3, 4 + +buffer.add(5); +buffer.add(6); + +// buffer state is 3, 4, 5, 6 +``` +### Transfer Queue +基于Reids的TransferQueue实现了java.util.concurrent.TransferQueue接口,提供了一系列的transfer方法,仅当值被consumer成功处理之后才会返回。 +对于TransferQueue,其提供了一系列的transfer方法,有阻塞版本和非阻塞版本。 +- tryTransfer会非阻塞的添加元素,当存在消费者调用poll获取当前元素时,该方法会将元素交给消费者,否则返回false。 +- transfer会阻塞等待,直到有消费者线程来获取该元素 +> 当调用阻塞版本trasfer之后,直到该transfer返回,其他线程无法向TransferQueue中添加元素。 + \ No newline at end of file