
1. Preface
The previous article briefly introduced Java's Future interface, but left a gap that I am here to fill today. Future has many implementations, and java.util.concurrent.FutureTask is the one most frequently mentioned. Today we will take a look at this implementation.
2. FutureTask
As its class hierarchy shows, FutureTask has the characteristics of both Runnable and Future. In other words, the designers run the asynchronous computation inside a thread's run cycle, and use the same object to control the state of that computation and to retrieve its result. The different states of an asynchronous computation that we discussed for Future in the last article are described in FutureTask by a state machine:
/** Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
Based on the state transitions noted in the FutureTask source comments, I drew a simple diagram to describe the relationships more clearly:
The state transitions above run through the entire life cycle of a FutureTask. Next, let's analyze how these states flow.
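Before going through the individual states, here is a minimal sketch of the two roles described in this section: the same object is handed to a thread as a Runnable and then queried as a Future. The class name DualRoleDemo and the trivial computation are made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class DualRoleDemo {
    // Runs the task on a separate thread and reads the result back through the Future side.
    static int computeOnAnotherThread() {
        FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
        Thread worker = new Thread(task, "worker"); // Runnable side: the thread calls task.run()
        worker.start();
        try {
            return task.get(); // Future side: blocks until the result has been published
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(computeOnAnotherThread());
    }
}
```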
3. States of FutureTask
3.1 NEW
This is the state a FutureTask is initialized to by its constructors. FutureTask has two constructors: one takes a Callable as its argument; the other takes a Runnable and a generic result holder, result, and the Runnable is eventually converted to a Callable. Three things happen in the NEW state:
- The FutureTask is initialized.
- The Callable that holds the task's logic is initialized.
- The current state is set to NEW.
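The two constructors described above can be tried out with a short sketch. The class ConstructorDemo and its helper getQuietly are hypothetical names; the tasks are run synchronously on the calling thread only to keep the example small.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class ConstructorDemo {
    // Constructor 1: the task logic is given directly as a Callable.
    static String viaCallable() {
        Callable<String> logic = () -> "from callable";
        FutureTask<String> task = new FutureTask<>(logic);
        task.run();
        return getQuietly(task);
    }

    // Constructor 2: a Runnable plus the value that get() should return afterwards.
    static String viaRunnable() {
        StringBuilder log = new StringBuilder();
        Runnable logic = () -> log.append("ran");
        FutureTask<String> task = new FutureTask<>(logic, "preset result");
        task.run();
        return log + ":" + getQuietly(task);
    }

    // Helper that wraps the checked exceptions of get() for brevity.
    static <T> T getQuietly(FutureTask<T> task) {
        try {
            return task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(viaCallable());
        System.out.println(viaRunnable());
    }
}
```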
3.2 CANCELLED
This was already covered when we discussed the Future interface: cancelling the computation ends the life cycle of the asynchronous computation. For details, please refer to the explanation in the previous article. Still, let's look at how cancellation is actually done:
public boolean cancel(boolean mayInterruptIfRunning) {
    // If the state is NEW: move to INTERRUPTING when the caller asks to interrupt
    // a running task, otherwise move straight to CANCELLED
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally {
                // Update to the final INTERRUPTED state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // Wake up the waiting threads so they can handle the finished state
        finishCompletion();
    }
    return true;
}
A CAS atomic operation is used to attempt the cancellation. If the current state is NEW, the policy parameter mayInterruptIfRunning decides whether the state moves to INTERRUPTING or directly to CANCELLED; if the state is not NEW, or the CAS fails, cancel returns false. When interrupting a running task is allowed, the state is first updated to INTERRUPTING, the running thread is then interrupted, and finally the state is updated to INTERRUPTED.
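Both cancellation paths can be observed from the outside with a small sketch. CancelDemo, its method names and the 60-second sleep are assumptions made for the illustration; only cancel, isCancelled and run are FutureTask API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelDemo {
    // Cancels a task that never started: NEW -> CANCELLED.
    static boolean[] cancelBeforeRun() {
        FutureTask<String> task = new FutureTask<>(() -> "never computed");
        boolean first = task.cancel(false);  // state is NEW, so the CAS succeeds
        boolean second = task.cancel(false); // state is no longer NEW, so this returns false
        task.run();                          // returns immediately because state != NEW
        return new boolean[] { first, second, task.isCancelled() };
    }

    // Cancels a running task with interruption: NEW -> INTERRUPTING -> INTERRUPTED.
    static boolean runnerWasInterrupted() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        FutureTask<String> task = new FutureTask<>(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);          // pretend to do long-running work
            } catch (InterruptedException e) {
                interrupted.set(true);         // delivered by cancel(true)
            } finally {
                finished.countDown();
            }
            return "unused";
        });
        new Thread(task).start();
        try {
            started.await();   // the task is running now, yet its state is still NEW
            task.cancel(true);
            finished.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return interrupted.get() && task.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(cancelBeforeRun()));
        System.out.println(runnerWasInterrupted());
    }
}
```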
3.3 COMPLETING
Completing. At first I thought this state meant the task was in progress, but I was wrong. It means the computation has finished but the result has not been published yet: it is being assigned to the outcome field. So where is the RUNNING state?
The run state transitions to a terminal state only in methods set, setException, and cancel.
That is the relevant note from the source. There is no separate RUNNING state: a running task stays in NEW, and the state only moves on inside set, setException and cancel. In fact, if you think about it, while the task is running and the state remains unchanged there is nothing we need to care about. The isDone() method explains everything: as long as the state is not NEW, the task is reported as done, even if the state machine has not yet reached a terminal state.
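The point that a running task is still "not done" can be checked with a sketch like the following. IsDoneDemo and the latches are illustrative scaffolding, not part of FutureTask.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;

class IsDoneDemo {
    // Returns isDone() before the task starts, while it is running, and after it finished.
    static boolean[] observe() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        FutureTask<Integer> task = new FutureTask<>(() -> {
            started.countDown();
            release.await(); // keep the task running until the caller lets it finish
            return 42;
        });
        boolean beforeRun = task.isDone();
        Thread worker = new Thread(task);
        worker.start();
        try {
            started.await();
            boolean whileRunning = task.isDone(); // no RUNNING state: still reported as not done
            release.countDown();
            worker.join();
            boolean afterRun = task.isDone();
            return new boolean[] { beforeRun, whileRunning, afterRun };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```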
3.4 NORMAL
After the state has moved from NEW to COMPLETING (another CAS operation), the result is published by assigning it to outcome, the state is set to NORMAL, and a loop then walks the wait queue to signal every waiting thread that the computation has completed. There is an interesting detail here: the author's finishing logic is very cleverly written:
private void finishCompletion() {
    // By this point the state is asserted to be past COMPLETING, so the task can no longer be NEW
    // assert state > COMPLETING;
    // Loop while there are threads parked waiting for the result
    for (WaitNode q; (q = waiters) != null;) {
        // Claim the whole wait queue with a CAS inside the for condition;
        // setting it to null directly is simple and practical
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // Walk the claimed list and unpark every waiting thread
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                // Help garbage collection
                q.next = null; // unlink to help gc
                // Move on to the next node so its thread is woken up quickly as well
                q = next;
            }
            break;
        }
    }
    // Empty hook method; subclasses can override it, for example to record completion
    done();
    callable = null;        // to reduce footprint
}
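The done() hook called at the end of finishCompletion can be overridden in a subclass. The following sketch counts how often it fires; CountingTask and DoneHookDemo are hypothetical names used only for this illustration.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class DoneHookDemo {
    // Hypothetical subclass: records every invocation of the done() hook.
    static class CountingTask extends FutureTask<String> {
        final AtomicInteger doneCalls = new AtomicInteger();

        CountingTask() {
            super(() -> "result");
        }

        @Override
        protected void done() {
            doneCalls.incrementAndGet();
        }
    }

    // Normal completion: the hook fires once, even if run() is called again.
    static int doneCallsAfterRun() {
        CountingTask task = new CountingTask();
        task.run();
        task.run(); // returns immediately because the state is no longer NEW
        return task.doneCalls.get();
    }

    // Cancellation also ends in finishCompletion, so the hook fires here too.
    static int doneCallsAfterCancel() {
        CountingTask task = new CountingTask();
        task.cancel(false);
        return task.doneCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(doneCallsAfterRun());
        System.out.println(doneCallsAfterCancel());
    }
}
```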
3.5 EXCEPTIONAL
When the computation throws an interrupt exception or any other exception, setException is called and the state is set to EXCEPTIONAL. There is not much more to say.
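From the caller's point of view, the exceptional path looks roughly like this. ExceptionalDemo and the "boom" message are made up for the example; get() rethrowing the failure wrapped in an ExecutionException is standard Future behaviour.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class ExceptionalDemo {
    // Runs a failing task and describes the exception that get() reports.
    static String causeOfFailure() {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run(); // the exception is caught inside run() and stored, not propagated
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {
            Throwable cause = e.getCause(); // the original exception thrown by the Callable
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(causeOfFailure());
    }
}
```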
3.6 INTERRUPTING
When another thread finds that the current state is INTERRUPTING, it calls Thread.yield to give up its current CPU time slice and then competes for CPU scheduling again. It's like discovering that something is still being handled: we step outside and wait, and come back to try again once the other party has finished.
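The yield-and-retry idea can be sketched outside FutureTask with a toy state variable. Everything here (YieldSpinDemo, the AtomicInteger standing in for the state field, the canceller thread) is an assumption for illustration; only the constant values are taken from the listing earlier in the article.

```java
import java.util.concurrent.atomic.AtomicInteger;

class YieldSpinDemo {
    // Same numeric values as in the FutureTask listing; this class is a toy, not FutureTask.
    static final int INTERRUPTING = 5;
    static final int INTERRUPTED = 6;

    // Spins with Thread.yield() until another thread leaves the transient state.
    static int waitOutInterrupting() {
        AtomicInteger state = new AtomicInteger(INTERRUPTING);
        Thread canceller = new Thread(() -> {
            // Stands in for the canceller finishing its interrupt and writing the final state.
            state.set(INTERRUPTED);
        });
        canceller.start();
        while (state.get() == INTERRUPTING) {
            Thread.yield(); // hint to the scheduler that other threads may run first
        }
        return state.get();
    }

    public static void main(String[] args) {
        System.out.println(waitOutInterrupting());
    }
}
```

Yielding fits here because the transient state is expected to last only for a very short time, so parking the thread would cost more than a brief spin.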
3.7 INTERRUPTED
See the analysis of CANCELLED above.
4. How FutureTask works
Apart from state control, everything else in FutureTask applies a specific strategy based on the current state. There are two methods that we actually use.
4.1 run
The asynchronous task is computed in this method. Keep in mind that the computation runs in another thread.
public void run() {
    // If the state is not NEW, or another thread is already executing this task, do not run it again
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        // Run the computation on this worker thread, record the result and handle exceptions
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        // (runner is the thread executing the callable, claimed by the CAS above;
        // it is reset to null only after the state has been set)
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
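The guard at the top of run() means a task computes at most once. A minimal sketch, with RunOnceDemo as a hypothetical name:

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class RunOnceDemo {
    // Calls run() twice and reports how often the Callable was actually invoked.
    static int callsAfterTwoRuns() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<>(() -> calls.incrementAndGet());
        task.run(); // state is NEW: the Callable is invoked and the result is set
        task.run(); // state is no longer NEW: returns without invoking the Callable
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsAfterTwoRuns());
    }
}
```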
4.2 get
The get method waits for the result of the task. Its core is awaitDone: while the asynchronous task is still executing, the calling thread is parked until the task completes or, for a timed wait, gives up once the timeout expires. See the following source code analysis:
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // If the current thread has been interrupted, remove its node from the wait queue and throw
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        // Completed, cancelled or failed with an exception: return immediately, no more waiting
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // Still completing: yield and keep waiting; yield is cheap here
        // and the thread will soon be scheduled again to retry
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // First pass through the waiting path: create a wait node
        else if (q == null)
            q = new WaitNode();
        // Try to add the not yet queued node to the wait queue with a CAS
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // Handle the timeout logic
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            // No timeout: park the waiting thread until it is woken up
            LockSupport.park(this);
    }
}
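The timed and untimed waiting paths can be exercised through the public get methods. In this sketch, TimedGetDemo, the latch and the 50 ms timeout are illustrative assumptions; the task cannot finish before the latch is released, so the timed get is expected to give up first.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {
    // First waits with a timeout on an unfinished task, then waits without a timeout.
    static String timedThenBlockingGet() {
        CountDownLatch release = new CountDownLatch(1);
        FutureTask<String> task = new FutureTask<>(() -> {
            release.await(); // the task cannot complete until the caller allows it
            return "done";
        });
        new Thread(task).start();
        try {
            String first;
            try {
                first = task.get(50, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                first = "timeout"; // the deadline passed while the task was still running
            }
            release.countDown();
            String second = task.get(); // waits until the result is published
            return first + "," + second;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(timedThenBlockingGet());
    }
}
```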
This article analyzed how FutureTask handles state control, execution and result retrieval in asynchronous computation. My abilities are limited, so if anything here is wrong, I would appreciate your corrections.