继续redisson分布式集合的阅读

This commit is contained in:
2023-02-07 19:58:33 +08:00
parent 3c2996feeb
commit 3a57ae584a

View File

@@ -905,3 +905,275 @@ list.add(new SomeObject());
list.get(0);
list.remove(new SomeObject());
```
### Queue
基于Redis的Queue对象实现了java.util.Queue接口其使用示例如下所示
```java
RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
```
### Deque
基于Redis的Deque实现了java.util.Deque接口其使用示例如下所示
```java
RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();
```
### Blocking Queue
基于Redis的分布式**无界**BlockingQueue实现了java.concurrent.BlockingQueue接口。
RBlockingQueue的使用示例如下所示:
```java
RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject obj = queue.poll();
SomeObject obj = queue.poll(10, TimeUnit.MINUTES);
```
对于基于Redisson的BlockingQueue其poll/pollFromAny/pollLastAndOfferFirstTo/take方法其阻塞操作都是基于发布/订阅机制实现的当redisson重新连接到redisson-server之后会自动重新注册。
### Bounded Blocking Queue
基于Redis的分布式**有界**BoundedBlockingQueue实现了java.util.concurrent.Blocking接口队列容量限制可以通过trySetCapacity方法来进行定义
```java
RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
// returns `true` if capacity set successfully and `false` if it already set.
queue.trySetCapacity(2);
queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// will be blocked until free space available in queue
queue.put(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
```
### Blocking Deque
基于Redis的BlockingDeque实现了java.util.BlockingDeque接口其使用如下所示
```java
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = queue.takeFirst();
Integer lastValue = queue.takeLast();
Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES);
Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES);
```
### Blocking Fair Queue
基于Redis的BlockingFairQueue实现了java.util.BlockingQueue接口。
通常情况下,如果存在多个消费者,有的消费者网络状况较好,有的消费者网络状况较差,那么网络较好的消费者将会比网络较差的消费者消费更多的消息。
而BlockingFairQueue者通过轮询等手段保证不同消费者之间的访问顺序从而使得不同消费者消费的消息数量是均匀的。
BlockingFairQueue的使用示例如下所示
```java
RBlockingFairQueue queue = redisson.getBlockingFairQueue("myQueue");
queue.offer(new SomeObject());
SomeObject element = queue.peek();
SomeObject element = queue.poll();
SomeObject element = queue.poll(10, TimeUnit.MINUTES);
SomeObject element = queue.take();
```
> 该特性仅在redisson pro中可用。
### Blocking Fair Deque
类似于BlockingFairQueue该特性仅在redisson pro中可用。
```java
RBlockingFairDeque deque = redisson.getBlockingFairDeque("myDeque");
deque.offer(new SomeObject());
SomeObject firstElement = queue.peekFirst();
SomeObject firstElement = queue.pollFirst();
SomeObject firstElement = queue.pollFirst(10, TimeUnit.MINUTES);
SomeObject firstElement = queue.takeFirst();
SomeObject lastElement = queue.peekLast();
SomeObject lastElement = queue.pollLast();
SomeObject lastElement = queue.pollLast(10, TimeUnit.MINUTES);
SomeObject lastElement = queue.takeLast();
```
### DelayedQueue
基于Redis的Delayed Queue允许将队列中的元素以固定的延迟转移到目标队列中。目标队列可以是任何实现了RQueue的队列。
DelayedQueue的使用示例如下
```java
RBlockingQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// move object to distinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// move object to distinationQueue in 1 minutes
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
// msg1 will appear in 10 seconds
distinationQueue.poll(15, TimeUnit.SECONDS);
// msg2 will appear in 2 seconds
distinationQueue.poll(2, TimeUnit.SECONDS);
```
### PriorityQueue
基于Redis的PriorityQueue实现了java.util.Queue接口元素按照Comparable或Comparator定义的顺序进行排序。
> 默认情况下PriorityQueue实现的是小根对可以自定义比较器来实现大根堆
tryComparator方法可以用于自定义比较器
```java
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityQueue<Entry> queue = redisson.getPriorityQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry e = queue.poll();
// Entry [b:1]
Entry e = queue.poll();
// Entry [c:1]
Entry e = queue.poll();
```
### PriorityDeque
基于Redis的PriorityDeque实现了java.util.Deque接口其使用和PriorityQueue类似
```java
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityDeque<Entry> queue = redisson.getPriorityDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry e = queue.pollFirst();
// Entry [c:1]
Entry e = queue.pollLast();
```
### PriorityBlockingQueue
基于PriorityBlockingQueue类似于java.concurrent.PriorityBlockingQueue其使用示例如下
```java
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityBlockingQueue<Entry> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry e = queue.take();
```
### PriorityBlockingDeque
基于Redis的PriorityBlockingDeque实现了java.util.concurrent.BlockingDeque接口其使用示例如下
```java
public class Entry implements Comparable<Entry>, Serializable {
private String key;
private Integer value;
public Entry(String key, Integer value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
RPriorityBlockingDeque<Entry> queue = redisson.getPriorityBlockingDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));
// Entry [a:1]
Entry e = queue.takeFirst();
// Entry [c:1]
Entry e = queue.takeLast();
```
### Stream
基于Redis的Stream对象允许创建consumer groupconsumer group用于消费由生产者创建的消息。
```java
RStream<String, String> stream = redisson.getStream("test");
StreamMessageId sm = stream.add(StreamAddArgs.entry("0", "0"));
stream.createGroup("testGroup");
StreamId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamId id2 = stream.add(StreamAddArgs.entry("2", "2"));
Map<StreamId, Map<String, String>> group = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());
long amount = stream.ack("testGroup", id1, id2);
```
### RingBuffer
基于Redis的RingBuffer实现了java.util.Queue如果当前队列已经满了那么会淘汰位于队列最前端的元素。
队列容量大小通过trySetCapacity方法进行设置RingBuffer的使用示例如下所示
```java
RRingBuffer<Integer> buffer = redisson.getRingBuffer("test");
// buffer capacity is 4 elements
buffer.trySetCapacity(4);
buffer.add(1);
buffer.add(2);
buffer.add(3);
buffer.add(4);
// buffer state is 1, 2, 3, 4
buffer.add(5);
buffer.add(6);
// buffer state is 3, 4, 5, 6
```
### Transfer Queue
基于Reids的TransferQueue实现了java.util.concurrent.TransferQueue接口提供了一系列的transfer方法仅当值被consumer成功处理之后才会返回。
对于TransferQueue其提供了一系列的transfer方法有阻塞版本和非阻塞版本。
- tryTransfer会非阻塞的添加元素当存在消费者调用poll获取当前元素时该方法会将元素交给消费者否则返回false。
- transfer会阻塞等待直到有消费者线程来获取该元素
> 当调用阻塞版本trasfer之后直到该transfer返回其他线程无法向TransferQueue中添加元素。