DelayQueue
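To determine whether the playback record submitted by a user has changed, we save the playback record as a delay task and check the playback progress once more than one submission period (20s) has passed.
So how can a delay task be implemented?
1.1. Delay task options
There are many ways to implement delay tasks. Four common ones are: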
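| | DelayQueue | Redisson | MQ | Time wheel |
|---|---|---|---|---|
| Principle | The delay queue built into the JDK, implemented as a blocking queue. | Emulates the JDK DelayQueue on top of Redis data structures. | Uses MQ features, for example RabbitMQ dead-letter queues. | Time wheel algorithm. |
| Advantages | | | | |
| Disadvantages | | | | |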
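All four options can solve the problem. Here I will mainly cover DelayQueue,
because it has the lowest cost of use and does not depend on any third-party service, which saves network round trips.
The drawback is just as clear: it occupies JVM memory, which may become a problem when the data volume is very large. Since each task is only stored briefly (20 seconds), this is acceptable. If your data volume is very large and DelayQueue cannot meet your business needs, you can switch to another delay queue approach, such as Redisson or MQ.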
1.2. DelayQueue source code
Below is the JDK source code of DelayQueue:
/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired. The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past. If no delay has expired there is no head and {@code poll}
 * will return {@code null}. Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero. Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements. For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the <em>optional</em>
 * methods of the {@link Collection} and {@link Iterator} interfaces.
 * The Iterator provided in method {@link #iterator()} is <em>not</em>
 * guaranteed to traverse the elements of the DelayQueue in any
 * particular order.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue. This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting. When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely. The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim. Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled. So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            return (first == null || first.getDelay(NANOSECONDS) > 0)
                ? null
                : q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0L)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    if (nanos <= 0L)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty. Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E first;
                 n < maxElements
                     && (first = q.peek()) != null
                     && first.getDelay(NANOSECONDS) <= 0;) {
                c.add(first); // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue. (In other words, this method must allocate
     * a new array). The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs. Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove.
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            return (E)array[lastRet = cursor++];
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }
}
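The class-level Javadoc mentions a few behaviours that are easy to overlook: poll() returns null while the head is still unexpired, whereas peek() and size() also see unexpired elements, and drainTo() only moves expired ones. The following is a minimal sketch of these rules. FixedDelay and DelayQueueSemanticsDemo are hypothetical names made up for this illustration and are not part of the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type, for illustration only (not part of the JDK).
class FixedDelay implements Delayed {
    private final String name;
    private final long deadlineNanos; // absolute deadline based on System.nanoTime()

    FixedDelay(String name, long delayMillis) {
        this.name = name;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    String name() {
        return name;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Zero or negative means "expired"
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayQueueSemanticsDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<FixedDelay> queue = new DelayQueue<>();
        queue.add(new FixedDelay("expires-in-one-hour", 3_600_000));

        System.out.println(queue.poll());        // null: nothing has expired yet
        System.out.println(queue.peek().name()); // expires-in-one-hour
        System.out.println(queue.size());        // 1

        // The timed poll waits at most 50 ms, then gives up and returns null.
        System.out.println(queue.poll(50, TimeUnit.MILLISECONDS));

        // An element with zero delay is expired immediately and moves to the head.
        queue.add(new FixedDelay("already-expired", 0));
        List<FixedDelay> expired = new ArrayList<>();
        int drained = queue.drainTo(expired);    // only expired elements are moved
        System.out.println(drained + " " + expired.get(0).name()); // 1 already-expired
        System.out.println(queue.size());        // 1: the one-hour task is still queued
    }
}
```

Using a one-hour delay keeps the demo deterministic, since the first element cannot expire while the program runs.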
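As the declaration DelayQueue<E extends Delayed> shows, every element of the queue must be a Delayed task, and a Delayed task provides two capabilities: reporting its remaining delay (getDelay) and comparing execution order (compareTo). We can of course implement and extend Delayed, for example by adding the data that the task carries.
From now on, each time a playback record is submitted, it can be stored in a Delayed task with a 20-second delay and handed to a DelayQueue. The DelayQueue calls compareTo to order the tasks by remaining delay. The shorter the remaining delay, the closer a task sits to the head of the queue, so it is executed first.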
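To make the idea concrete, here is a rough sketch of how a playback record could be wrapped in a Delayed task with a 20-second delay and checked once the delay has elapsed. The names PlaybackCheckTask, lessonId, moment and progressChanged are assumptions made for this illustration and do not come from the project. The only hard requirements are getDelay(TimeUnit), declared by Delayed, and compareTo, inherited from Comparable<Delayed>.

```java
import java.time.Duration;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical task: remembers the progress submitted at one point in time so
// it can be compared with the latest progress after the delay has elapsed.
class PlaybackCheckTask implements Delayed {
    // One submission period, as described in the text
    static final Duration SUBMIT_PERIOD = Duration.ofSeconds(20);

    private final long lessonId;      // assumed identifier of the video being played
    private final int moment;         // playback position in seconds at submit time
    private final long deadlineNanos; // absolute deadline based on System.nanoTime()

    PlaybackCheckTask(long lessonId, int moment) {
        this.lessonId = lessonId;
        this.moment = moment;
        this.deadlineNanos = System.nanoTime() + SUBMIT_PERIOD.toNanos();
    }

    long lessonId() {
        return lessonId;
    }

    // True if a newer submission moved the progress, i.e. the user kept watching.
    boolean progressChanged(int latestMoment) {
        return latestMoment != moment;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class PlaybackCheckDemo {
    public static void main(String[] args) {
        PlaybackCheckTask task = new PlaybackCheckTask(1L, 95);
        System.out.println(task.getDelay(TimeUnit.SECONDS) <= 20); // true
        System.out.println(task.progressChanged(95));  // false: no newer submission arrived
        System.out.println(task.progressChanged(115)); // true: the user kept watching
    }
}
```

In this sketch the task only holds the data needed for the later comparison. Where the latest progress comes from (a cache or a database, for example) is left open.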
1.3. Using DelayQueue
First, define a delay task class of type Delayed that can also hold the task data.
package com.tianji.learning.utils;

import lombok.Data;

import java.time.Duration;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

@Data
public class DelayTask<D> implements Delayed {
    // Data carried by the task
    private D data;
    // Absolute deadline, measured with System.nanoTime()
    private long deadlineNanos;

    public DelayTask(D data, Duration delayTime) {
        this.data = data;
        this.deadlineNanos = System.nanoTime() + delayTime.toNanos();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay, clamped to zero once the deadline has passed
        return unit.convert(Math.max(0, deadlineNanos - System.nanoTime()), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        // A shorter remaining delay sorts closer to the head of the queue
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }
}
Next, create some delay tasks and hand them to the delay queue:
package com.tianji.learning.utils;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.DelayQueue;

@Slf4j
class DelayTaskTest {
    @Test
    void testDelayQueue() throws InterruptedException {
        // 1. Initialize the delay queue
        DelayQueue<DelayTask<String>> queue = new DelayQueue<>();
        // 2. Add delayed tasks to the queue
        log.info("Initializing delay tasks...");
        queue.add(new DelayTask<>("Delay task 3", Duration.ofSeconds(3)));
        queue.add(new DelayTask<>("Delay task 1", Duration.ofSeconds(1)));
        queue.add(new DelayTask<>("Delay task 2", Duration.ofSeconds(2)));
        // TODO 3. Try to execute the tasks
    }
}
Finally, add the code that executes the tasks:
package com.tianji.learning.utils;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.DelayQueue;

@Slf4j
class DelayTaskTest {
    @Test
    void testDelayQueue() throws InterruptedException {
        // 1. Initialize the delay queue
        DelayQueue<DelayTask<String>> queue = new DelayQueue<>();
        // 2. Add delayed tasks to the queue
        log.info("Initializing delay tasks...");
        queue.add(new DelayTask<>("Delay task 3", Duration.ofSeconds(3)));
        queue.add(new DelayTask<>("Delay task 1", Duration.ofSeconds(1)));
        queue.add(new DelayTask<>("Delay task 2", Duration.ofSeconds(2)));
        // 3. Try to execute the tasks. take() blocks until the head task expires.
        //    Looping until the queue is empty lets the test finish; with
        //    while (true) the last take() would block forever.
        while (!queue.isEmpty()) {
            DelayTask<String> task = queue.take();
            log.info("Executing delay task: {}", task.getData());
        }
    }
}
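A simple loop is enough for a test, but in an application the queue could instead be drained by a dedicated thread. The following is one possible sketch under that assumption, not the project's actual code: SimpleTask, DelayTaskConsumer, the handler callback and runDemo are all hypothetical names. It relies only on DelayQueue.take() and thread interruption.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical minimal task type for this sketch.
class SimpleTask implements Delayed {
    final String name;
    private final long deadlineNanos;

    SimpleTask(String name, long delayMillis) {
        this.name = name;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

// Hypothetical consumer: one daemon thread blocks on take() and passes each
// expired task to the handler until stop() interrupts it.
class DelayTaskConsumer {
    private final DelayQueue<SimpleTask> queue = new DelayQueue<>();
    private final Thread worker;

    DelayTaskConsumer(Consumer<SimpleTask> handler) {
        worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    handler.accept(queue.take()); // blocks until the head task expires
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop() was called: leave the loop
            }
        }, "delay-task-consumer");
        worker.setDaemon(true);
        worker.start();
    }

    void submit(SimpleTask task) {
        queue.add(task);
    }

    void stop() {
        worker.interrupt();
    }

    // Submits three tasks out of order and returns the names in handling order.
    static List<String> runDemo() {
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        DelayTaskConsumer consumer = new DelayTaskConsumer(task -> {
            handled.add(task.name);
            done.countDown();
        });
        consumer.submit(new SimpleTask("task-3", 300));
        consumer.submit(new SimpleTask("task-1", 100));
        consumer.submit(new SimpleTask("task-2", 200));
        try {
            done.await(5, TimeUnit.SECONDS); // wait until all three were handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumer.stop();
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [task-1, task-2, task-3]
    }
}
```

Because take() responds to interruption, stop() ends the worker without polling a flag. Tasks that are still queued at that point are simply left unprocessed in this sketch.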
From: https://blog.csdn.net/kwb18293575696/article/details/141527206