AQS源码解读----AbstractQueuedSynchronizer
36 package cn.com.pep; 37 import java.util.concurrent.TimeUnit; 38 import java.util.concurrent.locks.AbstractOwnableSynchronizer; 39 import java.util.concurrent.locks.Condition; 40 import java.util.concurrent.locks.LockSupport; 41 import java.util.ArrayList; 42 import java.util.Collection; 43 import java.util.Date; 44 45 import sun.misc.Unsafe; 46 47 /** 48 * Provides a framework for implementing blocking locks and related 49 * synchronizers (semaphores, events, etc) that rely on 50 * first-in-first-out (FIFO) wait queues. This class is designed to 51 * be a useful basis for most kinds of synchronizers that rely on a 52 * single atomic {@code int} value to represent state. Subclasses 53 * must define the protected methods that change this state, and which 54 * define what that state means in terms of this object being acquired 55 * or released. Given these, the other methods in this class carry 56 * out all queuing and blocking mechanics. Subclasses can maintain 57 * other state fields, but only the atomically updated {@code int} 58 * value manipulated using methods {@link #getState}, {@link 59 * #setState} and {@link #compareAndSetState} is tracked with respect 60 * to synchronization. 61 * 62 * <p>Subclasses should be defined as non-public internal helper 63 * classes that are used to implement the synchronization properties 64 * of their enclosing class. Class 65 * {@code AbstractQueuedSynchronizer} does not implement any 66 * synchronization interface. Instead it defines methods such as 67 * {@link #acquireInterruptibly} that can be invoked as 68 * appropriate by concrete locks and related synchronizers to 69 * implement their public methods. 70 * 71 * <p>This class supports either or both a default <em>exclusive</em> 72 * mode and a <em>shared</em> mode. When acquired in exclusive mode, 73 * attempted acquires by other threads cannot succeed. Shared mode 74 * acquires by multiple threads may (but need not) succeed. This class 75 * does not "understand" these differences except in the 76 * mechanical sense that when a shared mode acquire succeeds, the next 77 * waiting thread (if one exists) must also determine whether it can 78 * acquire as well. Threads waiting in the different modes share the 79 * same FIFO queue. Usually, implementation subclasses support only 80 * one of these modes, but both can come into play for example in a 81 * {@link ReadWriteLock}. Subclasses that support only exclusive or 82 * only shared modes need not define the methods supporting the unused mode. 83 * 84 * <p>This class defines a nested {@link ConditionObject} class that 85 * can be used as a {@link Condition} implementation by subclasses 86 * supporting exclusive mode for which method {@link 87 * #isHeldExclusively} reports whether synchronization is exclusively 88 * held with respect to the current thread, method {@link #release} 89 * invoked with the current {@link #getState} value fully releases 90 * this object, and {@link #acquire}, given this saved state value, 91 * eventually restores this object to its previous acquired state. No 92 * {@code AbstractQueuedSynchronizer} method otherwise creates such a 93 * condition, so if this constraint cannot be met, do not use it. The 94 * behavior of {@link ConditionObject} depends of course on the 95 * semantics of its synchronizer implementation. 96 * 97 * <p>This class provides inspection, instrumentation, and monitoring 98 * methods for the internal queue, as well as similar methods for 99 * condition objects. These can be exported as desired into classes 100 * using an {@code AbstractQueuedSynchronizer} for their 101 * synchronization mechanics. 102 * 103 * <p>Serialization of this class stores only the underlying atomic 104 * integer maintaining state, so deserialized objects have empty 105 * thread queues. Typical subclasses requiring serializability will 106 * define a {@code readObject} method that restores this to a known 107 * initial state upon deserialization. 108 * 109 * <h3>Usage</h3> 110 * 111 * <p>To use this class as the basis of a synchronizer, redefine the 112 * following methods, as applicable, by inspecting and/or modifying 113 * the synchronization state using {@link #getState}, {@link 114 * #setState} and/or {@link #compareAndSetState}: 115 * 116 * <ul> 117 * <li> {@link #tryAcquire} 118 * <li> {@link #tryRelease} 119 * <li> {@link #tryAcquireShared} 120 * <li> {@link #tryReleaseShared} 121 * <li> {@link #isHeldExclusively} 122 * </ul> 123 * 124 * Each of these methods by default throws {@link 125 * UnsupportedOperationException}. Implementations of these methods 126 * must be internally thread-safe, and should in general be short and 127 * not block. Defining these methods is the <em>only</em> supported 128 * means of using this class. All other methods are declared 129 * {@code final} because they cannot be independently varied. 130 * 131 * <p>You may also find the inherited methods from {@link 132 * AbstractOwnableSynchronizer} useful to keep track of the thread 133 * owning an exclusive synchronizer. You are encouraged to use them 134 * -- this enables monitoring and diagnostic tools to assist users in 135 * determining which threads hold locks. 136 * 137 * <p>Even though this class is based on an internal FIFO queue, it 138 * does not automatically enforce FIFO acquisition policies. The core 139 * of exclusive synchronization takes the form: 140 * 141 * <pre> 142 * Acquire: 143 * while (!tryAcquire(arg)) { 144 * <em>enqueue thread if it is not already queued</em>; 145 * <em>possibly block current thread</em>; 146 * } 147 * 148 * Release: 149 * if (tryRelease(arg)) 150 * <em>unblock the first queued thread</em>; 151 * </pre> 152 * 153 * (Shared mode is similar but may involve cascading signals.) 154 * 155 * <p id="barging">Because checks in acquire are invoked before 156 * enqueuing, a newly acquiring thread may <em>barge</em> ahead of 157 * others that are blocked and queued. However, you can, if desired, 158 * define {@code tryAcquire} and/or {@code tryAcquireShared} to 159 * disable barging by internally invoking one or more of the inspection 160 * methods, thereby providing a <em>fair</em> FIFO acquisition order. 161 * In particular, most fair synchronizers can define {@code tryAcquire} 162 * to return {@code false} if {@link #hasQueuedPredecessors} (a method 163 * specifically designed to be used by fair synchronizers) returns 164 * {@code true}. Other variations are possible. 165 * 166 * <p>Throughput and scalability are generally highest for the 167 * default barging (also known as <em>greedy</em>, 168 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy. 169 * While this is not guaranteed to be fair or starvation-free, earlier 170 * queued threads are allowed to recontend before later queued 171 * threads, and each recontention has an unbiased chance to succeed 172 * against incoming threads. Also, while acquires do not 173 * "spin" in the usual sense, they may perform multiple 174 * invocations of {@code tryAcquire} interspersed with other 175 * computations before blocking. This gives most of the benefits of 176 * spins when exclusive synchronization is only briefly held, without 177 * most of the liabilities when it isn't. If so desired, you can 178 * augment this by preceding calls to acquire methods with 179 * "fast-path" checks, possibly prechecking {@link #hasContended} 180 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer 181 * is likely not to be contended. 182 * 183 * <p>This class provides an efficient and scalable basis for 184 * synchronization in part by specializing its range of use to 185 * synchronizers that can rely on {@code int} state, acquire, and 186 * release parameters, and an internal FIFO wait queue. When this does 187 * not suffice, you can build synchronizers from a lower level using 188 * {@link java.util.concurrent.atomic atomic} classes, your own custom 189 * {@link java.util.Queue} classes, and {@link LockSupport} blocking 190 * support. 191 * 192 * <h3>Usage Examples</h3> 193 * 194 * <p>Here is a non-reentrant mutual exclusion lock class that uses 195 * the value zero to represent the unlocked state, and one to 196 * represent the locked state. While a non-reentrant lock 197 * does not strictly require recording of the current owner 198 * thread, this class does so anyway to make usage easier to monitor. 199 * It also supports conditions and exposes 200 * one of the instrumentation methods: 201 * 202 * <pre> {@code 203 * class Mutex implements Lock, java.io.Serializable { 204 * 205 * // Our internal helper class 206 * private static class Sync extends AbstractQueuedSynchronizer { 207 * // Reports whether in locked state 208 * protected boolean isHeldExclusively() { 209 * return getState() == 1; 210 * } 211 * 212 * // Acquires the lock if state is zero 213 * public boolean tryAcquire(int acquires) { 214 * assert acquires == 1; // Otherwise unused 215 * if (compareAndSetState(0, 1)) { 216 * setExclusiveOwnerThread(Thread.currentThread()); 217 * return true; 218 * } 219 * return false; 220 * } 221 * 222 * // Releases the lock by setting state to zero 223 * protected boolean tryRelease(int releases) { 224 * assert releases == 1; // Otherwise unused 225 * if (getState() == 0) throw new IllegalMonitorStateException(); 226 * setExclusiveOwnerThread(null); 227 * setState(0); 228 * return true; 229 * } 230 * 231 * // Provides a Condition 232 * Condition newCondition() { return new ConditionObject(); } 233 * 234 * // Deserializes properly 235 * private void readObject(ObjectInputStream s) 236 * throws IOException, ClassNotFoundException { 237 * s.defaultReadObject(); 238 * setState(0); // reset to unlocked state 239 * } 240 * } 241 * 242 * // The sync object does all the hard work. We just forward to it. 243 * private final Sync sync = new Sync(); 244 * 245 * public void lock() { sync.acquire(1); } 246 * public boolean tryLock() { return sync.tryAcquire(1); } 247 * public void unlock() { sync.release(1); } 248 * public Condition newCondition() { return sync.newCondition(); } 249 * public boolean isLocked() { return sync.isHeldExclusively(); } 250 * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } 251 * public void lockInterruptibly() throws InterruptedException { 252 * sync.acquireInterruptibly(1); 253 * } 254 * public boolean tryLock(long timeout, TimeUnit unit) 255 * throws InterruptedException { 256 * return sync.tryAcquireNanos(1, unit.toNanos(timeout)); 257 * } 258 * }}</pre> 259 * 260 * <p>Here is a latch class that is like a 261 * {@link java.util.concurrent.CountDownLatch CountDownLatch} 262 * except that it only requires a single {@code signal} to 263 * fire. Because a latch is non-exclusive, it uses the {@code shared} 264 * acquire and release methods. 265 * 266 * <pre> {@code 267 * class BooleanLatch { 268 * 269 * private static class Sync extends AbstractQueuedSynchronizer { 270 * boolean isSignalled() { return getState() != 0; } 271 * 272 * protected int tryAcquireShared(int ignore) { 273 * return isSignalled() ? 1 : -1; 274 * } 275 * 276 * protected boolean tryReleaseShared(int ignore) { 277 * setState(1); 278 * return true; 279 * } 280 * } 281 * 282 * private final Sync sync = new Sync(); 283 * public boolean isSignalled() { return sync.isSignalled(); } 284 * public void signal() { sync.releaseShared(1); } 285 * public void await() throws InterruptedException { 286 * sync.acquireSharedInterruptibly(1); 287 * } 288 * }}</pre> 289 * 290 * @since 1.5 291 * @author Doug Lea 292 */ 293 public abstract class AbstractQueuedSynchronizer 294 extends AbstractOwnableSynchronizer 295 implements java.io.Serializable { 296 297 private static final long serialVersionUID = 7373984972572414691L; 298 299 /** 300 * Creates a new {@code AbstractQueuedSynchronizer} instance 301 * with initial synchronization state of zero. 302 */ 303 protected AbstractQueuedSynchronizer() { } 304 305 /** 306 * Wait queue node class. 307 * 308 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and 309 * Hagersten) lock queue. CLH locks are normally used for 310 * spinlocks. We instead use them for blocking synchronizers, but 311 * use the same basic tactic of holding some of the control 312 * information about a thread in the predecessor of its node. A 313 * "status" field in each node keeps track of whether a thread 314 * should block. A node is signalled when its predecessor 315 * releases. Each node of the queue otherwise serves as a 316 * specific-notification-style monitor holding a single waiting 317 * thread. The status field does NOT control whether threads are 318 * granted locks etc though. A thread may try to acquire if it is 319 * first in the queue. But being first does not guarantee success; 320 * it only gives the right to contend. So the currently released 321 * contender thread may need to rewait. 322 * 323 * <p>To enqueue into a CLH lock, you atomically splice it in as new 324 * tail. To dequeue, you just set the head field. 325 * <pre> 326 * +------+ prev +-----+ +-----+ 327 * head | | <---- | | <---- | | tail 328 * +------+ +-----+ +-----+ 329 * </pre> 330 * 331 * <p>Insertion into a CLH queue requires only a single atomic 332 * operation on "tail", so there is a simple atomic point of 333 * demarcation from unqueued to queued. Similarly, dequeuing 334 * involves only updating the "head". However, it takes a bit 335 * more work for nodes to determine who their successors are, 336 * in part to deal with possible cancellation due to timeouts 337 * and interrupts. 338 * 339 * <p>The "prev" links (not used in original CLH locks), are mainly 340 * needed to handle cancellation. If a node is cancelled, its 341 * successor is (normally) relinked to a non-cancelled 342 * predecessor. For explanation of similar mechanics in the case 343 * of spin locks, see the papers by Scott and Scherer at 344 * http://www.cs.rochester.edu/u/scott/synchronization/ 345 * 346 * <p>We also use "next" links to implement blocking mechanics. 347 * The thread id for each node is kept in its own node, so a 348 * predecessor signals the next node to wake up by traversing 349 * next link to determine which thread it is. Determination of 350 * successor must avoid races with newly queued nodes to set 351 * the "next" fields of their predecessors. This is solved 352 * when necessary by checking backwards from the atomically 353 * updated "tail" when a node's successor appears to be null. 354 * (Or, said differently, the next-links are an optimization 355 * so that we don't usually need a backward scan.) 356 * 357 * <p>Cancellation introduces some conservatism to the basic 358 * algorithms. Since we must poll for cancellation of other 359 * nodes, we can miss noticing whether a cancelled node is 360 * ahead or behind us. This is dealt with by always unparking 361 * successors upon cancellation, allowing them to stabilize on 362 * a new predecessor, unless we can identify an uncancelled 363 * predecessor who will carry this responsibility. 364 * 365 * <p>CLH queues need a dummy header node to get started. But 366 * we don't create them on construction, because it would be wasted 367 * effort if there is never contention. Instead, the node 368 * is constructed and head and tail pointers are set upon first 369 * contention. 370 * 371 * <p>Threads waiting on Conditions use the same nodes, but 372 * use an additional link. Conditions only need to link nodes 373 * in simple (non-concurrent) linked queues because they are 374 * only accessed when exclusively held. Upon await, a node is 375 * inserted into a condition queue. Upon signal, the node is 376 * transferred to the main queue. A special value of status 377 * field is used to mark which queue a node is on. 378 * 379 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill 380 * Scherer and Michael Scott, along with members of JSR-166 381 * expert group, for helpful ideas, discussions, and critiques 382 * on the design of this class. 383 */ 384 static final class Node { 385 /** Marker to indicate a node is waiting in shared mode */ 386 static final Node SHARED = new Node(); 387 /** Marker to indicate a node is waiting in exclusive mode */ 388 static final Node EXCLUSIVE = null; 389 390 /** waitStatus value to indicate thread has cancelled */ 391 static final int CANCELLED = 1; 392 /** waitStatus value to indicate successor's thread needs unparking */ 393 static final int SIGNAL = -1; 394 /** waitStatus value to indicate thread is waiting on condition */ 395 static final int CONDITION = -2; 396 /** 397 * waitStatus value to indicate the next acquireShared should 398 * unconditionally propagate 399 */ 400 static final int PROPAGATE = -3; 401 402 /** 403 * Status field, taking on only the values: 404 * SIGNAL: The successor of this node is (or will soon be) 405 * blocked (via park), so the current node must 406 * unpark its successor when it releases or 407 * cancels. To avoid races, acquire methods must 408 * first indicate they need a signal, 409 * then retry the atomic acquire, and then, 410 * on failure, block. 411 * CANCELLED: This node is cancelled due to timeout or interrupt. 412 * Nodes never leave this state. In particular, 413 * a thread with cancelled node never again blocks. 414 * CONDITION: This node is currently on a condition queue. 415 * It will not be used as a sync queue node 416 * until transferred, at which time the status 417 * will be set to 0. (Use of this value here has 418 * nothing to do with the other uses of the 419 * field, but simplifies mechanics.) 420 * PROPAGATE: A releaseShared should be propagated to other 421 * nodes. This is set (for head node only) in 422 * doReleaseShared to ensure propagation 423 * continues, even if other operations have 424 * since intervened. 425 * 0: None of the above 426 * 427 * The values are arranged numerically to simplify use. 428 * Non-negative values mean that a node doesn't need to 429 * signal. So, most code doesn't need to check for particular 430 * values, just for sign. 431 * 432 * The field is initialized to 0 for normal sync nodes, and 433 * CONDITION for condition nodes. It is modified using CAS 434 * (or when possible, unconditional volatile writes). 435 */ 436 volatile int waitStatus; 437 438 /** 439 * Link to predecessor node that current node/thread relies on 440 * for checking waitStatus. Assigned during enqueuing, and nulled 441 * out (for sake of GC) only upon dequeuing. Also, upon 442 * cancellation of a predecessor, we short-circuit while 443 * finding a non-cancelled one, which will always exist 444 * because the head node is never cancelled: A node becomes 445 * head only as a result of successful acquire. A 446 * cancelled thread never succeeds in acquiring, and a thread only 447 * cancels itself, not any other node. 448 */ 449 volatile Node prev; 450 451 /** 452 * Link to the successor node that the current node/thread 453 * unparks upon release. Assigned during enqueuing, adjusted 454 * when bypassing cancelled predecessors, and nulled out (for 455 * sake of GC) when dequeued. The enq operation does not 456 * assign next field of a predecessor until after attachment, 457 * so seeing a null next field does not necessarily mean that 458 * node is at end of queue. However, if a next field appears 459 * to be null, we can scan prev's from the tail to 460 * double-check. The next field of cancelled nodes is set to 461 * point to the node itself instead of null, to make life 462 * easier for isOnSyncQueue. 463 */ 464 volatile Node next; 465 466 /** 467 * The thread that enqueued this node. Initialized on 468 * construction and nulled out after use. 469 */ 470 volatile Thread thread; 471 472 /** 473 * Link to next node waiting on condition, or the special 474 * value SHARED. Because condition queues are accessed only 475 * when holding in exclusive mode, we just need a simple 476 * linked queue to hold nodes while they are waiting on 477 * conditions. They are then transferred to the queue to 478 * re-acquire. And because conditions can only be exclusive, 479 * we save a field by using special value to indicate shared 480 * mode. 481 */ 482 Node nextWaiter; 483 484 /** 485 * Returns true if node is waiting in shared mode. 486 */ 487 final boolean isShared() { 488 return nextWaiter == SHARED; 489 } 490 491 /** 492 * Returns previous node, or throws NullPointerException if null. 493 * Use when predecessor cannot be null. The null check could 494 * be elided, but is present to help the VM. 495 * 496 * @return the predecessor of this node 497 */ 498 final Node predecessor() throws NullPointerException { 499 Node p = prev; 500 if (p == null) 501 throw new NullPointerException(); 502 else 503 return p; 504 } 505 506 Node() { // Used to establish initial head or SHARED marker 507 } 508 509 Node(Thread thread, Node mode) { // Used by addWaiter 510 this.nextWaiter = mode; 511 this.thread = thread; 512 } 513 514 Node(Thread thread, int waitStatus) { // Used by Condition 515 this.waitStatus = waitStatus; 516 this.thread = thread; 517 } 518 } 519 520 /** 521 * Head of the wait queue, lazily initialized. Except for 522 * initialization, it is modified only via method setHead. Note: 523 * If head exists, its waitStatus is guaranteed not to be 524 * CANCELLED. 525 */ 526 private transient volatile Node head; 527 528 /** 529 * Tail of the wait queue, lazily initialized. Modified only via 530 * method enq to add new wait node. 531 */ 532 private transient volatile Node tail; 533 534 /** 535 * The synchronization state. 536 */ 537 private volatile int state; 538 539 /** 540 * Returns the current value of synchronization state. 541 * This operation has memory semantics of a {@code volatile} read. 542 * @return current state value 543 */ 544 protected final int getState() { 545 return state; 546 } 547 548 /** 549 * Sets the value of synchronization state. 550 * This operation has memory semantics of a {@code volatile} write. 551 * @param newState the new state value 552 */ 553 protected final void setState(int newState) { 554 state = newState; 555 } 556 557 /** 558 * Atomically sets synchronization state to the given updated 559 * value if the current state value equals the expected value. 560 * This operation has memory semantics of a {@code volatile} read 561 * and write. 562 * 563 * @param expect the expected value 564 * @param update the new value 565 * @return {@code true} if successful. False return indicates that the actual 566 * value was not equal to the expected value. 567 */ 568 protected final boolean compareAndSetState(int expect, int update) { 569 // See below for intrinsics setup to support this 570 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 571 } 572 573 // Queuing utilities 574 575 /** 576 * The number of nanoseconds for which it is faster to spin 577 * rather than to use timed park. A rough estimate suffices 578 * to improve responsiveness with very short timeouts. 579 */ 580 static final long spinForTimeoutThreshold = 1000L; 581 582 /** 583 * Inserts node into queue, initializing if necessary. See picture above. 584 * @param node the node to insert 585 * @return node's predecessor 586 */ 587 private Node enq(final Node node) { 588 /*"自旋",将给定的节点插入到同步队列的尾部*/ 589 for (;;) { 590 Node t = tail; 591 /*同步队列为空,则dummy哑节点作为同步队列的头结点head,并且将尾节点tail也指向头结点head*/ 592 if (t == null) { 593 /*CAS操作,设置同步队列的头结点*/ 594 if (compareAndSetHead(new Node())) { 595 /*将尾节点设置为头结点,进入下次"自旋"*/ 596 tail = head; 597 } 598 }else { 599 /*尾部节点不为空,则进行正常添加动作*/ 600 node.prev = t; 601 /*CAS操作,设置同步队列的头结点*/ 602 if (compareAndSetTail(t, node)) { 603 t.next = node; 604 return t; 605 } 606 } 607 } 608 } 609 610 /** 611 * Creates and enqueues node for current thread and given mode. 612 * 以给定的模式包装当前线程节点,将当前节点加入到阻塞队列的队尾 613 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 614 * @return the new node 615 */ 616 private Node addWaiter(Node mode) { 617 /*以给定的模式将当前线程包装成node节点*/ 618 Node node = new Node(Thread.currentThread(), mode); 619 620 /*快速采用尾插法,将当前节点插入到同步队列的队尾*/ 621 Node predNode = tail; 622 if (predNode != null) { 623 /*preNode <-- node*/ 624 node.prev = predNode; 625 /*采用CAS将node设置阻塞队列的尾节点,设置成功,说明没有并发*/ 626 if (compareAndSetTail(tail, node)) { 627 predNode.next = node; 628 /*尾插法插入成功,则直接返回当前节点*/ 629 return node; 630 } 631 } 632 /*"自旋"将节点加入到队列的尾部,直到成功为止*/ 633 enq(node); 634 return node; 635 } 636 637 /** 638 * Sets head of queue to be node, thus dequeuing. Called only by 639 * acquire methods. Also nulls out unused fields for sake of GC 640 * and to suppress unnecessary signals and traversals. 641 * 642 * @param node the node 643 */ 644 private void setHead(Node node) { 645 head = node; 646 node.thread = null; 647 node.prev = null; 648 } 649 650 /** 651 * Wakes up node's successor, if one exists. 652 * 653 * @param node the node 654 */ 655 private void unparkSuccessor(Node node) { 656 /* 657 * If status is negative (i.e., possibly needing signal) try 658 * to clear in anticipation of signalling. It is OK if this 659 * fails or if status is changed by waiting thread. 660 */ 661 //在这里,这个节点其实是同步队列的头结点,头结点唤醒后继节点之后,使命就完成了,所以应该将其状态置为0 662 int ws = node.waitStatus; 663 if (ws < 0) 664 compareAndSetWaitStatus(node, ws, 0); 665 666 /* 667 * Thread to unpark is held in successor, which is normally 668 * just the next node. But if cancelled or apparently null, 669 * traverse backwards from tail to find the actual 670 * non-cancelled successor. 671 */ 672 Node s = node.next; 673 //因为s.next相当于从同步队列的头部遍历所以可能会出现s == null的情况,上面分析过原因,不再赘述了。 674 if (s == null || s.waitStatus > 0) { 675 s = null; 676 //从同步队列的尾部向前遍历,找到当前node节点(头结点)的最近的有效后继节点 677 for (Node t = tail; t != null && t != node; t = t.prev) 678 if (t.waitStatus <= 0) 679 s = t; 680 } 681 682 /** 683 * 找到最近的有效后继节点,则唤醒后继节点中的线程在parkAndCheckInterrupt()方法上的阻塞,去尝试竞争共享资源, 684 * 这就体现了线程之间的协作,而在这个竞争的过程中也会忽略这个Node.CANCELLED状态的节点,这当前node节点也就放弃了竞争共享资源的机会,相当于出队了。 685 */ 686 if (s != null) 687 LockSupport.unpark(s.thread); 688 } 689 690 /** 691 * Release action for shared mode -- signals successor and ensures 692 * propagation. (Note: For exclusive mode, release just amounts 693 * to calling unparkSuccessor of head if it needs signal.) 694 */ 695 private void doReleaseShared() { 696 /* 697 * Ensure that a release propagates, even if there are other 698 * in-progress acquires/releases. This proceeds in the usual 699 * way of trying to unparkSuccessor of head if it needs 700 * signal. But if it does not, status is set to PROPAGATE to 701 * ensure that upon release, propagation continues. 702 * Additionally, we must loop in case a new node is added 703 * while we are doing this. Also, unlike other uses of 704 * unparkSuccessor, we need to know if CAS to reset status 705 * fails, if so rechecking. 706 */ 707 for (;;) { 708 Node h = head; 709 if (h != null && h != tail) { 710 int ws = h.waitStatus; 711 if (ws == Node.SIGNAL) { 712 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 713 continue; // loop to recheck cases 714 unparkSuccessor(h); 715 } 716 else if (ws == 0 && 717 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 718 continue; // loop on failed CAS 719 } 720 if (h == head) // loop if head changed 721 break; 722 } 723 } 724 725 /** 726 * Sets head of queue, and checks if successor may be waiting 727 * in shared mode, if so propagating if either propagate > 0 or 728 * PROPAGATE status was set. 729 * 730 * @param node the node 731 * @param propagate the return value from a tryAcquireShared 732 */ 733 private void setHeadAndPropagate(Node node, int propagate) { 734 //后继节点成功获取了共享锁,队列的"旧head"还没有改变,将其保存下来,锁定到方法的局部变量做后序的判断使用; 735 Node h = head; // Record old head for check below 736 /** 737 * 将这个获取共享锁成功的后继节点设置为同步队列的“新head”,此时同步队列的head发生变化, 此线程还未唤起任何线程。 738 */ 739 setHead(node); 740 /** 741 * 1、h == null这个条件什么时候成立呢?仔细翻了下AQS中的源码发现: 742 * 这个setHeadAndPropagate()方法只在共享锁模式下,同步队列head的后继节点成功获取了共享锁才会调用。 743 * 获取到共享锁的当前线程是同步队列的头结点的后继节点,"旧head"有后继节点,说明同步队列不为空,那么"旧head"也必定不为空, 744 * 此方法中第一行通过h == head,在执行setHead(node)方法之前将"旧head"保存了下来,所以h == null必定不会成立, 745 * 至于为什么这么写呢? 查阅了下资料网上说"发现这个是防止空指针异常发生的标准写法(既如果要取一个对象的某个属性进行判断的时候,首先对这个对象进行null判断)。" 746 * 这说的过去吧? 747 * 748 * 2、(h = head) == null这个条件什么时候成立呢? 749 * 这个条件也是不可能成立的,下面这种情况应该是最常见的: 750 * (1)、例如有个Semaphore实例s初始化了2个许可,线程A首先调用s.acquire(2)申请了两个许可,成功申请到了许可; 751 * (2)、线程B调用了s.acquire()方法申请一个许可,申请失败,加入到同步队列; 752 * (3)、线程C调用了s.acquire()方法申请一个许可,申请失败,加入到同步队列; 753 * (4)、线程A调用了s.releaseShared(2)方法释放了两个许可,再调用doReleaseShared()方法,进行同步队列唤醒; 754 * (5)、首先唤醒了同步队列中的线程B,B线程获取到共享锁: 755 * a)、如果此时线程B还未setHead(Node)方法,还未改变同步队列的head头结点,那么线程A的唤醒工作就结束,也仅仅只是唤醒了同步队列中的线程B, 756 * 则必定有(h = head) == Node(C) != null成立,线程C的唤醒工作仍然需要线程B去执行; 757 * b)、如果此时线程B执行了SetHead(Node)方法,改变了同步队列的head头结点,那么线程A同时也会唤醒线程C,相当于线程A同时唤醒了线程B和线程C: 758 * 1)、如果线程C中的setHeadAndPropagate()在线程B前调用完毕(即线程C执行了setHead()方法改变了同步队列的head),那么 (h = head) == Node(C); 759 * 2)、如果线程C中的setHeadAndPropagate()在线程B之后才调用(即线程C此时还未执行setHead()方法,未改变同步队列的head),那么 (h = head) == Node(B) 760 * 所以综上所述,只要执行过addWaiter()方法,向同步队列中添加过线程,那么(h = head)== null必定不成立。只能理解为“防止空指针的标准写法”。 761 */ 762 if (propagate > 0 || h == null || h.waitStatus < 0 || 763 (h = head) == null || h.waitStatus < 0) { 764 Node s = node.next; 765 /** 766 * s == null这种情况是可能存在的,如果当前唤醒的这个node节点是同步队列的尾节点就可能出现node.next == null; 767 * s.isShared()指定是共享锁模式,当前线程获取共享锁之后,是需要尝试唤醒同步队列中的其它线程的。 768 */ 769 if (s == null || s.isShared()) 770 doReleaseShared(); 771 } 772 } 773 774 // Utilities for various versions of acquire 775 776 /** 777 * Cancels an ongoing attempt to acquire. 778 * @param node the node 779 */ 780 private void cancelAcquire(Node node) { 781 //当前节点为空,则说明当前线程永远不会被调度到了,所以直接返回 782 if (node == null) { 783 return; 784 } 785 786 /** 787 * 接下来将点前Node节点从同步队列出队,主要做以下几件事: 788 * 1、将当前节点不与任何线程绑定,设置当前节点为Node.CANCELLED状态; 789 * 2、将当前取消节点的前置非取消节点和后置非取消节点"链接"起来; 790 * 3、如果前置节点释放了锁,那么当前取消节点承担起后续节点的唤醒职责。 791 */ 792 793 //1、取消当前节点与线程的绑定 794 node.thread = null; 795 796 //2、找到当前节点的有效前继节点pred 797 Node pred = node.prev; 798 while (pred.waitStatus > 0) { 799 //为什么双向链表从后往前遍历呢?而不是从前往后遍历呢? 800 node.prev = pred = pred.prev; 801 } 802 803 //用作CAS操作时候的条件判断需要使用的值 804 Node predNext = pred.next; 805 806 //3、将当前节点设置为取消状态 807 node.waitStatus = Node.CANCELLED; 808 809 /** 810 * 接下来就需要将当前取消节点的前后两个有效节点"链接"起来了,"达成让当前node节点出队的目的"。 811 * 这里按照node节点在同步队列中的不同位置分了三种情况: 812 * 1、node节点是同步队列的尾节点tail; 813 * 2、node节点既不是同步队列头结点head的后继节点,也不是尾节点tail; 814 * 3、node节点是同步队列头结点head的后继节点; 815 */ 816 817 //1、node是尾节点,并且执行过程中没有并发,直接将pred设置为同步队列的tail 818 if (node == tail && compareAndSetTail(node, pred)) { 819 /* 820 * 此时pred已经设置为同步队列的tail,需要通过CAS操作,将pred的next指向null,没有节点再引用node,就完成了node节点的出队 821 * 可以看出出队操作会破坏这个同步队列的next指针,这应该“向链表从后往前遍历呢?而不是从前往后遍历呢”的原因吧? 822 */ 823 compareAndSetNext(pred, predNext, null); 824 }else { 825 /* 826 * 2、node不是尾节点,也不是头结点head的后继节点,那么当前节点node出队以后,node的有效前继结点pred, 827 * 就有义务在它自身释放资源的时候,唤醒node的有效后继节点successor,即将pred的状态设置为Node.SIGNAL; 828 */ 829 int ws; 830 //能执行到这里,说明当前node节点不是head的后继节点,也不是同步队列tail节点 831 if (pred != head && 832 ((ws = pred.waitStatus) == Node.SIGNAL || 833 //前继节点状态虽然有效但不是SIGNAL,采用CAS操作设置为SIGNAL确保后继有效节点可以被唤醒 834 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 835 pred.thread != null) { 836 Node next = node.next; 837 //只负责唤醒有效后继节点 838 if (next != null && next.waitStatus <= 0) { 839 /** 840 * 下面这段代码相当于将pred-->next,我们提到这个同步队列是个双向队列,那么pred<--next这是谁执行的呢? 841 * 答案是其他线程:其它线程在获取共享资源在同步队列中阻塞的时候,调用shouldParkAfterFailedAcquire()方法, 842 * 从后向前遍历队列,寻找能唤醒它的有效前继节点,当找到node的时候,因为它的状态已经是Node.CANCELLED,所以会忽略node节点, 843 * 直到遍历到有效前继节点pred,将next.prev执行pred,即next--->pred,没有节点再引用node节点,所以node节点至此才完成出队。 844 */ 845 compareAndSetNext(pred, predNext, next); 846 } 847 }else { 848 //3、说明node节点是同步队列head的后继节点,调用unparkSuccessor(Node)"出队"。 849 unparkSuccessor(node); 850 } 851 852 node.next = node;//help GC 853 } 854 } 855 856 /** 857 * Checks and updates status for a node that failed to acquire. 858 * Returns true if thread should block. This is the main signal 859 * control in all acquire loops. Requires that pred == node.prev. 860 * 861 * @param pred node's predecessor holding status 862 * @param node the node 863 * @return {@code true} if thread should block 864 */ 865 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 866 int ws = pred.waitStatus; 867 /*判断前驱结点的状态,只有前驱结点的状态为SIGNAL,后继节点才能被唤醒,所以其可以安心地挂起来了*/ 868 if (ws == node.waitStatus) { 869 return true; 870 } 871 872 /*ws>0表示前驱结点中的线程已经被取消调度了,则认为其是无效节点,继续向前查找,直至找到有效状态的节点*/ 873 if (ws > 0) { 874 do { 875 //前驱结点已经被取消,则将前驱结点设置为pred = pred.prev 876 node.prev = pred = pred.prev; 877 //不断遍历,直到找到第一个不是取消状态的节点 878 } while (pred.waitStatus > 0); 879 // 880 pred.next = node; 881 }else { 882 /*前驱结点状态正常,将前驱结点状态设置为SIGNAL,则前驱结点释放资源的时候,就可以尝试唤醒它的后继节点了*/ 883 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 884 } 885 return false; 886 } 887 888 /** 889 * Convenience method to interrupt current thread. 890 */ 891 static void selfInterrupt() { 892 Thread.currentThread().interrupt(); 893 } 894 895 /** 896 * Convenience method to park and then check if interrupted 897 * 898 * @return {@code true} if interrupted 899 */ 900 private final boolean parkAndCheckInterrupt() { 901 //将当前线程挂起 902 LockSupport.park(this); 903 //检测当前线程的中断状态,并且会清除线程的中断标记 904 return Thread.interrupted(); 905 } 906 907 /* 908 * Various flavors of acquire, varying in exclusive/shared and 909 * control modes. Each is mostly the same, but annoyingly 910 * different. Only a little bit of factoring is possible due to 911 * interactions of exception mechanics (including ensuring that we 912 * cancel if tryAcquire throws exception) and other control, at 913 * least not without hurting performance too much. 914 */ 915 916 /** 917 * Acquires in exclusive uninterruptible mode for thread already in 918 * queue. Used by condition wait methods as well as acquire. 919 * 使线程阻塞在同步队列中等待获取资源,直到获取资源成功才返回,此过程中线程发生了中断就返回true,否则就返回false 920 * @param node the node 921 * @param arg the acquire argument 922 * @return {@code true} if interrupted while waiting 923 */ 924 final boolean acquireQueued(final Node node, int arg) { 925 /*标记等待过程中是否发生了异常*/ 926 boolean failed = true; 927 928 try { 929 /*标记线程阻塞的过程中是否发生了中断*/ 930 boolean interrupted = false; 931 /*线程自旋阻塞*/ 932 for(;;){ 933 /*获取当前节点的前驱结点*/ 934 final Node p = node.predecessor(); 935 /*前驱结点是头结点,则说明当前线程有资格获取共享资源,尝试获取,获取成功,将当前节点设置为头结点*/ 936 if (p == head && tryAcquire(arg)) { 937 /*将当前节点设置为头结点*/ 938 setHead(node); 939 p.next = null; //help GC 940 failed = false; 941 return interrupted; 942 } 943 944 /*判断当前线程是否可以挂起*/ 945 if (shouldParkAfterFailedAcquire(p, node) 946 /*当前线程可以挂起,则挂起线程,并且线程被unpark()唤醒,检查线程的状态*/ 947 && parkAndCheckInterrupt()) { 948 interrupted = true; 949 } 950 } 951 }finally{ 952 if (failed) { 953 /*阻塞获取同步资源的时候,发生了异常,取消当前线程的在同步队列中的排队*/ 954 cancelAcquire(node); 955 } 956 } 957 } 958 959 /** 960 * Acquires in exclusive interruptible mode. 961 * @param arg the acquire argument 962 */ 963 //尝试获取锁,并响应中断 964 private void doAcquireInterruptibly(int arg) 965 throws InterruptedException { 966 //将当前线程包装成Node节点,加入到同步队列sync queue中 967 final Node node = addWaiter(Node.EXCLUSIVE); 968 //标记是否获取锁的过程中是否发生了异常 969 boolean failed = true; 970 try { 971 //自旋 972 for (;;) { 973 //获取当前节点的前驱结点 974 final Node p = node.predecessor(); 975 //如果当前节点的前驱结点是头结点head,尝试获取锁,当前线程才有资格获取排他锁(头结点head是个哑结点,不代表任何线程) 976 if (p == head && tryAcquire(arg)) { 977 //当前节点获取锁成功,将当前节点设置为头结点head 978 setHead(node); 979 //将原来的头结点从同步队列sync queue中移除 980 p.next = null; // help GC 981 failed = false; 982 return; 983 } 984 //尝试获取锁失败,判断是否可以将当前线程安全地挂起,只有当前线程有效的前驱结点状态为Node.SIGNAL,当前线程才可以安全地被挂起,后续也会被及时地唤醒 985 if (shouldParkAfterFailedAcquire(p, node) && 986 //将当前线程挂起,并等他唤醒的时候判断是否发生了线程中断,发生了线程中断,则进入finally块中,取消当前线程对锁的尝试获取 987 parkAndCheckInterrupt()) 988 throw new InterruptedException(); 989 } 990 } finally { 991 if (failed) 992 cancelAcquire(node); 993 } 994 } 995 996 /** 997 * Acquires in exclusive timed mode. 998 * 999 * @param arg the acquire argument 1000 * @param nanosTimeout max wait time 1001 * @return {@code true} if acquired 1002 */ 1003 private boolean doAcquireNanos(int arg, long nanosTimeout) 1004 throws InterruptedException { 1005 //指定的时间<=0则直接返回false 1006 if (nanosTimeout <= 0L) 1007 return false; 1008 //计算当前线程阻塞的截至时间 1009 final long deadline = System.nanoTime() + nanosTimeout; 1010 //将当前线程包装成Node节点,加入到同步队列sync queue中 1011 final Node node = addWaiter(Node.EXCLUSIVE); 1012 boolean failed = true; 1013 try { 1014 //自旋尝试获取锁 1015 for (;;) { 1016 //获取当前节点的前驱结点 1017 final Node p = node.predecessor(); 1018 //前驱结点是头结点,则当前节点有资格尝试获取锁,则尝试获取锁 1019 if (p == head && tryAcquire(arg)) { 1020 //获取锁成功,则将当前节点设置为头结点head 1021 setHead(node); 1022 //将同步队列sync queue中原来的头结点移除队列 1023 p.next = null; // help GC 1024 //表示获取锁的过程没有发生异常 1025 failed = false; 1026 return true; 1027 } 1028 //计算剩余的等待时间 1029 nanosTimeout = deadline - System.nanoTime(); 1030 //剩余的等待时间<=0,直接返回false 1031 if (nanosTimeout <= 0L) 1032 return false; 1033 //当前线程获取锁失败,判断是否可以将当前线程安全地挂起 1034 if (shouldParkAfterFailedAcquire(p, node) && 1035 //如果剩余的等待时间<= spinForTimeoutThreshold,则不用将当前线程挂起,进行自旋即可 1036 nanosTimeout > spinForTimeoutThreshold) 1037 LockSupport.parkNanos(this, nanosTimeout); 1038 //获取锁的过程中发生了中断,则直接抛出InterruptedException异常 1039 if (Thread.interrupted()) 1040 throw new InterruptedException(); 1041 } 1042 } finally { 1043 //获取锁的过程中发生了异常,则将当前线程取消 1044 if (failed) 1045 cancelAcquire(node); 1046 } 1047 } 1048 1049 /** 1050 * Acquires in shared uninterruptible mode. 1051 * @param arg the acquire argument 1052 */ 1053 private void doAcquireShared(int arg) { 1054 //将竞争共享资源失败的线程加入到同步队列中,并标记为共享模式 1055 final Node node = addWaiter(Node.SHARED); 1056 //标记阻塞获取资源的过程中是否发生了异常 1057 boolean failed = true; 1058 1059 try { 1060 //标记阻塞获取资源的过程中,是否发生了线程中断请求 1061 boolean interrupted = false; 1062 //线程阻塞等待获取资源,被有效前继节点唤醒后,尝试竞争共享资源 1063 for(;;){ 1064 /** 1065 * 当前线程被唤醒之后,什么时候有资格竞争共享资源呢? 1066 * 之后当它的前继节点是头结点(头结点是当前持有共享资源的线程),在唤醒后继节点的过程中,可能释放了资源,所以后继节点尝试获取一次共享资源。 1067 */ 1068 final Node p = node.predecessor(); 1069 if (p == head) { 1070 int r = tryAcquireShared(arg); 1071 if (r >= 0) { 1072 setHeadAndPropagate(node, r); 1073 p.next = null;//help GC 1074 if (interrupted) { 1075 selfInterrupt(); 1076 } 1077 failed = false; 1078 return; 1079 } 1080 } 1081 1082 if (shouldParkAfterFailedAcquire(p, node) && 1083 parkAndCheckInterrupt()) { 1084 interrupted = true; 1085 } 1086 } 1087 } finally{ 1088 if (failed) { 1089 cancelAcquire(node); 1090 } 1091 } 1092 } 1093 1094 /** 1095 * Acquires in shared interruptible mode. 1096 * @param arg the acquire argument 1097 */ 1098 private void doAcquireSharedInterruptibly(int arg) 1099 throws InterruptedException { 1100 final Node node = addWaiter(Node.SHARED); 1101 boolean failed = true; 1102 try { 1103 for (;;) { 1104 final Node p = node.predecessor(); 1105 if (p == head) { 1106 int r = tryAcquireShared(arg); 1107 if (r >= 0) { 1108 setHeadAndPropagate(node, r); 1109 p.next = null; // help GC 1110 failed = false; 1111 return; 1112 } 1113 } 1114 if (shouldParkAfterFailedAcquire(p, node) && 1115 parkAndCheckInterrupt()) 1116 throw new InterruptedException(); 1117 } 1118 } finally { 1119 if (failed) 1120 cancelAcquire(node); 1121 } 1122 } 1123 1124 /** 1125 * Acquires in shared timed mode. 1126 * 1127 * @param arg the acquire argument 1128 * @param nanosTimeout max wait time 1129 * @return {@code true} if acquired 1130 */ 1131 private boolean doAcquireSharedNanos(int arg, long nanosTimeout) 1132 throws InterruptedException { 1133 if (nanosTimeout <= 0L) 1134 return false; 1135 final long deadline = System.nanoTime() + nanosTimeout; 1136 final Node node = addWaiter(Node.SHARED); 1137 boolean failed = true; 1138 try { 1139 for (;;) { 1140 final Node p = node.predecessor(); 1141 if (p == head) { 1142 int r = tryAcquireShared(arg); 1143 if (r >= 0) { 1144 setHeadAndPropagate(node, r); 1145 p.next = null; // help GC 1146 failed = false; 1147 return true; 1148 } 1149 } 1150 nanosTimeout = deadline - System.nanoTime(); 1151 if (nanosTimeout <= 0L) 1152 return false; 1153 if (shouldParkAfterFailedAcquire(p, node) && 1154 nanosTimeout > spinForTimeoutThreshold) 1155 LockSupport.parkNanos(this, nanosTimeout); 1156 if (Thread.interrupted()) 1157 throw new InterruptedException(); 1158 } 1159 } finally { 1160 if (failed) 1161 cancelAcquire(node); 1162 } 1163 } 1164 1165 // Main exported methods 1166 1167 /** 1168 * Attempts to acquire in exclusive mode. This method should query 1169 * if the state of the object permits it to be acquired in the 1170 * exclusive mode, and if so to acquire it. 1171 * 1172 * <p>This method is always invoked by the thread performing 1173 * acquire. If this method reports failure, the acquire method 1174 * may queue the thread, if it is not already queued, until it is 1175 * signalled by a release from some other thread. This can be used 1176 * to implement method {@link Lock#tryLock()}. 1177 * 1178 * <p>The default 1179 * implementation throws {@link UnsupportedOperationException}. 1180 * 1181 * @param arg the acquire argument. This value is always the one 1182 * passed to an acquire method, or is the value saved on entry 1183 * to a condition wait. The value is otherwise uninterpreted 1184 * and can represent anything you like. 1185 * @return {@code true} if successful. Upon success, this object has 1186 * been acquired. 1187 * @throws IllegalMonitorStateException if acquiring would place this 1188 * synchronizer in an illegal state. This exception must be 1189 * thrown in a consistent fashion for synchronization to work 1190 * correctly. 1191 * @throws UnsupportedOperationException if exclusive mode is not supported 1192 */ 1193 protected boolean tryAcquire(int arg) { 1194 throw new UnsupportedOperationException(); 1195 } 1196 1197 /** 1198 * Attempts to set the state to reflect a release in exclusive 1199 * mode. 1200 * 1201 * <p>This method is always invoked by the thread performing release. 1202 * 1203 * <p>The default implementation throws 1204 * {@link UnsupportedOperationException}. 1205 * 1206 * @param arg the release argument. This value is always the one 1207 * passed to a release method, or the current state value upon 1208 * entry to a condition wait. The value is otherwise 1209 * uninterpreted and can represent anything you like. 1210 * @return {@code true} if this object is now in a fully released 1211 * state, so that any waiting threads may attempt to acquire; 1212 * and {@code false} otherwise. 1213 * @throws IllegalMonitorStateException if releasing would place this 1214 * synchronizer in an illegal state. This exception must be 1215 * thrown in a consistent fashion for synchronization to work 1216 * correctly. 1217 * @throws UnsupportedOperationException if exclusive mode is not supported 1218 */ 1219 protected boolean tryRelease(int arg) { 1220 throw new UnsupportedOperationException(); 1221 } 1222 1223 /** 1224 * Attempts to acquire in shared mode. This method should query if 1225 * the state of the object permits it to be acquired in the shared 1226 * mode, and if so to acquire it. 1227 * 1228 * <p>This method is always invoked by the thread performing 1229 * acquire. If this method reports failure, the acquire method 1230 * may queue the thread, if it is not already queued, until it is 1231 * signalled by a release from some other thread. 1232 * 1233 * <p>The default implementation throws {@link 1234 * UnsupportedOperationException}. 1235 * 1236 * @param arg the acquire argument. This value is always the one 1237 * passed to an acquire method, or is the value saved on entry 1238 * to a condition wait. The value is otherwise uninterpreted 1239 * and can represent anything you like. 1240 * @return a negative value on failure; zero if acquisition in shared 1241 * mode succeeded but no subsequent shared-mode acquire can 1242 * succeed; and a positive value if acquisition in shared 1243 * mode succeeded and subsequent shared-mode acquires might 1244 * also succeed, in which case a subsequent waiting thread 1245 * must check availability. (Support for three different 1246 * return values enables this method to be used in contexts 1247 * where acquires only sometimes act exclusively.) Upon 1248 * success, this object has been acquired. 1249 * @throws IllegalMonitorStateException if acquiring would place this 1250 * synchronizer in an illegal state. This exception must be 1251 * thrown in a consistent fashion for synchronization to work 1252 * correctly. 1253 * @throws UnsupportedOperationException if shared mode is not supported 1254 */ 1255 protected int tryAcquireShared(int arg) { 1256 for(;;){ 1257 //计算可用资源量 1258 int available = getState(); 1259 //计算剩余资源量 1260 int remaining = available - arg; 1261 //计算资源数量,不够直接放回,加入CAS是为了保证state状态一定能够更新成功 1262 if (remaining < 0 || 1263 compareAndSetState(available, remaining)) { 1264 return remaining; 1265 } 1266 } 1267 1268 //throw new UnsupportedOperationException(); 1269 } 1270 1271 /** 1272 * Attempts to set the state to reflect a release in shared mode. 1273 * 1274 * <p>This method is always invoked by the thread performing release. 1275 * 1276 * <p>The default implementation throws 1277 * {@link UnsupportedOperationException}. 1278 * 1279 * @param arg the release argument. This value is always the one 1280 * passed to a release method, or the current state value upon 1281 * entry to a condition wait. The value is otherwise 1282 * uninterpreted and can represent anything you like. 1283 * @return {@code true} if this release of shared mode may permit a 1284 * waiting acquire (shared or exclusive) to succeed; and 1285 * {@code false} otherwise 1286 * @throws IllegalMonitorStateException if releasing would place this 1287 * synchronizer in an illegal state. This exception must be 1288 * thrown in a consistent fashion for synchronization to work 1289 * correctly. 1290 * @throws UnsupportedOperationException if shared mode is not supported 1291 */ 1292 protected boolean tryReleaseShared(int arg) { 1293 throw new UnsupportedOperationException(); 1294 } 1295 1296 /** 1297 * Returns {@code true} if synchronization is held exclusively with 1298 * respect to the current (calling) thread. This method is invoked 1299 * upon each call to a non-waiting {@link ConditionObject} method. 1300 * (Waiting methods instead invoke {@link #release}.) 1301 * 1302 * <p>The default implementation throws {@link 1303 * UnsupportedOperationException}. This method is invoked 1304 * internally only within {@link ConditionObject} methods, so need 1305 * not be defined if conditions are not used. 1306 * 1307 * @return {@code true} if synchronization is held exclusively; 1308 * {@code false} otherwise 1309 * @throws UnsupportedOperationException if conditions are not supported 1310 */ 1311 protected boolean isHeldExclusively() { 1312 throw new UnsupportedOperationException(); 1313 } 1314 1315 /** 1316 * Acquires in exclusive mode, ignoring interrupts. Implemented 1317 * by invoking at least once {@link #tryAcquire}, 1318 * returning on success. Otherwise the thread is queued, possibly 1319 * repeatedly blocking and unblocking, invoking {@link 1320 * #tryAcquire} until success. This method can be used 1321 * to implement method {@link Lock#lock}. 1322 * 1323 * @param arg the acquire argument. This value is conveyed to 1324 * {@link #tryAcquire} but is otherwise uninterpreted and 1325 * can represent anything you like. 1326 */ 1327 public final void acquire(int arg) { 1328 /*加塞抢占共享资源(因为同步队列中可能还有其它节点等待),获取成功则直接返回*/ 1329 if (!tryAcquire(arg) 1330 /* 1、抢占共享资源失败,则将当前节点放入到同步队列的尾部,并标记为独占模式; 1331 * 2、使线程阻塞在同步队列中获取资源,直到获取成功才返回;如果整个过程中被中断过就返回true,否则就返回false;*/ 1332 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { 1333 /*阻塞获取资源的过程中是不响应线程中断的,内部进行了中断检测,所以这块进行线程中断*/ 1334 selfInterrupt(); 1335 } 1336 } 1337 1338 /** 1339 * Acquires in exclusive mode, aborting if interrupted. 1340 * Implemented by first checking interrupt status, then invoking 1341 * at least once {@link #tryAcquire}, returning on 1342 * success. Otherwise the thread is queued, possibly repeatedly 1343 * blocking and unblocking, invoking {@link #tryAcquire} 1344 * until success or the thread is interrupted. This method can be 1345 * used to implement method {@link Lock#lockInterruptibly}. 1346 * 1347 * @param arg the acquire argument. This value is conveyed to 1348 * {@link #tryAcquire} but is otherwise uninterpreted and 1349 * can represent anything you like. 1350 * @throws InterruptedException if the current thread is interrupted 1351 */ 1352 public final void acquireInterruptibly(int arg) 1353 throws InterruptedException { 1354 if (Thread.interrupted()) 1355 throw new InterruptedException(); 1356 if (!tryAcquire(arg)) 1357 doAcquireInterruptibly(arg); 1358 } 1359 1360 /** 1361 * Attempts to acquire in exclusive mode, aborting if interrupted, 1362 * and failing if the given timeout elapses. Implemented by first 1363 * checking interrupt status, then invoking at least once {@link 1364 * #tryAcquire}, returning on success. Otherwise, the thread is 1365 * queued, possibly repeatedly blocking and unblocking, invoking 1366 * {@link #tryAcquire} until success or the thread is interrupted 1367 * or the timeout elapses. This method can be used to implement 1368 * method {@link Lock#tryLock(long, TimeUnit)}. 1369 * 1370 * @param arg the acquire argument. This value is conveyed to 1371 * {@link #tryAcquire} but is otherwise uninterpreted and 1372 * can represent anything you like. 1373 * @param nanosTimeout the maximum number of nanoseconds to wait 1374 * @return {@code true} if acquired; {@code false} if timed out 1375 * @throws InterruptedException if the current thread is interrupted 1376 */ 1377 public final boolean tryAcquireNanos(int arg, long nanosTimeout) 1378 throws InterruptedException { 1379 //当前线程被中断,直接抛出异常 1380 if (Thread.interrupted()) 1381 throw new InterruptedException(); 1382 //尝试直接获取锁,获取成功就直接返回 1383 return tryAcquire(arg) || 1384 //否则就“阻塞”指定时间,再尝试获取锁 1385 doAcquireNanos(arg, nanosTimeout); 1386 } 1387 1388 /** 1389 * Releases in exclusive mode. Implemented by unblocking one or 1390 * more threads if {@link #tryRelease} returns true. 1391 * This method can be used to implement method {@link Lock#unlock}. 1392 * 1393 * @param arg the release argument. This value is conveyed to 1394 * {@link #tryRelease} but is otherwise uninterpreted and 1395 * can represent anything you like. 1396 * @return the value returned from {@link #tryRelease} 1397 */ 1398 public final boolean release(int arg) { 1399 if (tryRelease(arg)) { 1400 Node h = head; 1401 //当前线程已经释放锁,但是sync queue同步队列不为空,并且头结点head未被取消 1402 if (h != null && h.waitStatus != 0) 1403 //说明还有其它线程等待竞争锁,则尝试唤醒sync queue队列中等待锁的线程 1404 unparkSuccessor(h); 1405 return true; 1406 } 1407 return false; 1408 } 1409 1410 /** 1411 * Acquires in shared mode, ignoring interrupts. Implemented by 1412 * first invoking at least once {@link #tryAcquireShared}, 1413 * returning on success. Otherwise the thread is queued, possibly 1414 * repeatedly blocking and unblocking, invoking {@link 1415 * #tryAcquireShared} until success. 1416 * 1417 * @param arg the acquire argument. This value is conveyed to 1418 * {@link #tryAcquireShared} but is otherwise uninterpreted 1419 * and can represent anything you like. 1420 */ 1421 public final void acquireShared(int arg) { 1422 if (tryAcquireShared(arg) < 0) 1423 doAcquireShared(arg); 1424 } 1425 1426 /** 1427 * Acquires in shared mode, aborting if interrupted. Implemented 1428 * by first checking interrupt status, then invoking at least once 1429 * {@link #tryAcquireShared}, returning on success. Otherwise the 1430 * thread is queued, possibly repeatedly blocking and unblocking, 1431 * invoking {@link #tryAcquireShared} until success or the thread 1432 * is interrupted. 1433 * @param arg the acquire argument. 1434 * This value is conveyed to {@link #tryAcquireShared} but is 1435 * otherwise uninterpreted and can represent anything 1436 * you like. 1437 * @throws InterruptedException if the current thread is interrupted 1438 */ 1439 public final void acquireSharedInterruptibly(int arg) 1440 throws InterruptedException { 1441 //当前线程已经被中断,则直接抛出InterruptedException异常 1442 if (Thread.interrupted()) 1443 throw new InterruptedException(); 1444 if (tryAcquireShared(arg) < 0) 1445 doAcquireSharedInterruptibly(arg); 1446 } 1447 1448 /** 1449 * Attempts to acquire in shared mode, aborting if interrupted, and 1450 * failing if the given timeout elapses. Implemented by first 1451 * checking interrupt status, then invoking at least once {@link 1452 * #tryAcquireShared}, returning on success. Otherwise, the 1453 * thread is queued, possibly repeatedly blocking and unblocking, 1454 * invoking {@link #tryAcquireShared} until success or the thread 1455 * is interrupted or the timeout elapses. 1456 * 1457 * @param arg the acquire argument. This value is conveyed to 1458 * {@link #tryAcquireShared} but is otherwise uninterpreted 1459 * and can represent anything you like. 1460 * @param nanosTimeout the maximum number of nanoseconds to wait 1461 * @return {@code true} if acquired; {@code false} if timed out 1462 * @throws InterruptedException if the current thread is interrupted 1463 */ 1464 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 1465 throws InterruptedException { 1466 if (Thread.interrupted()) 1467 throw new InterruptedException(); 1468 return tryAcquireShared(arg) >= 0 || 1469 doAcquireSharedNanos(arg, nanosTimeout); 1470 } 1471 1472 /** 1473 * Releases in shared mode. Implemented by unblocking one or more 1474 * threads if {@link #tryReleaseShared} returns true. 1475 * 1476 * @param arg the release argument. This value is conveyed to 1477 * {@link #tryReleaseShared} but is otherwise uninterpreted 1478 * and can represent anything you like. 1479 * @return the value returned from {@link #tryReleaseShared} 1480 */ 1481 public final boolean releaseShared(int arg) { 1482 //尝试释放锁,释放成功,则返回true 1483 if (tryReleaseShared(arg)) { 1484 //唤醒同步队列中,等待获取锁的线程 1485 doReleaseShared(); 1486 return true; 1487 } 1488 return false; 1489 } 1490 1491 // Queue inspection methods 1492 1493 /** 1494 * Queries whether any threads are waiting to acquire. Note that 1495 * because cancellations due to interrupts and timeouts may occur 1496 * at any time, a {@code true} return does not guarantee that any 1497 * other thread will ever acquire. 1498 * 1499 * <p>In this implementation, this operation returns in 1500 * constant time. 1501 * 1502 * @return {@code true} if there may be other threads waiting to acquire 1503 */ 1504 public final boolean hasQueuedThreads() { 1505 return head != tail; 1506 } 1507 1508 /** 1509 * Queries whether any threads have ever contended to acquire this 1510 * synchronizer; that is if an acquire method has ever blocked. 1511 * 1512 * <p>In this implementation, this operation returns in 1513 * constant time. 1514 * 1515 * @return {@code true} if there has ever been contention 1516 */ 1517 public final boolean hasContended() { 1518 return head != null; 1519 } 1520 1521 /** 1522 * Returns the first (longest-waiting) thread in the queue, or 1523 * {@code null} if no threads are currently queued. 1524 * 1525 * <p>In this implementation, this operation normally returns in 1526 * constant time, but may iterate upon contention if other threads are 1527 * concurrently modifying the queue. 1528 * 1529 * @return the first (longest-waiting) thread in the queue, or 1530 * {@code null} if no threads are currently queued 1531 */ 1532 public final Thread getFirstQueuedThread() { 1533 // handle only fast path, else relay 1534 return (head == tail) ? null : fullGetFirstQueuedThread(); 1535 } 1536 1537 /** 1538 * Version of getFirstQueuedThread called when fastpath fails 1539 */ 1540 private Thread fullGetFirstQueuedThread() { 1541 /* 1542 * The first node is normally head.next. Try to get its 1543 * thread field, ensuring consistent reads: If thread 1544 * field is nulled out or s.prev is no longer head, then 1545 * some other thread(s) concurrently performed setHead in 1546 * between some of our reads. We try this twice before 1547 * resorting to traversal. 1548 */ 1549 Node h, s; 1550 Thread st; 1551 if (((h = head) != null && (s = h.next) != null && 1552 s.prev == head && (st = s.thread) != null) || 1553 ((h = head) != null && (s = h.next) != null && 1554 s.prev == head && (st = s.thread) != null)) 1555 return st; 1556 1557 /* 1558 * Head's next field might not have been set yet, or may have 1559 * been unset after setHead. So we must check to see if tail 1560 * is actually first node. If not, we continue on, safely 1561 * traversing from tail back to head to find first, 1562 * guaranteeing termination. 1563 */ 1564 1565 Node t = tail; 1566 Thread firstThread = null; 1567 while (t != null && t != head) { 1568 Thread tt = t.thread; 1569 if (tt != null) 1570 firstThread = tt; 1571 t = t.prev; 1572 } 1573 return firstThread; 1574 } 1575 1576 /** 1577 * Returns true if the given thread is currently queued. 1578 * 1579 * <p>This implementation traverses the queue to determine 1580 * presence of the given thread. 1581 * 1582 * @param thread the thread 1583 * @return {@code true} if the given thread is on the queue 1584 * @throws NullPointerException if the thread is null 1585 */ 1586 public final boolean isQueued(Thread thread) { 1587 if (thread == null) 1588 throw new NullPointerException(); 1589 for (Node p = tail; p != null; p = p.prev) 1590 if (p.thread == thread) 1591 return true; 1592 return false; 1593 } 1594 1595 /** 1596 * Returns {@code true} if the apparent first queued thread, if one 1597 * exists, is waiting in exclusive mode. If this method returns 1598 * {@code true}, and the current thread is attempting to acquire in 1599 * shared mode (that is, this method is invoked from {@link 1600 * #tryAcquireShared}) then it is guaranteed that the current thread 1601 * is not the first queued thread. Used only as a heuristic in 1602 * ReentrantReadWriteLock. 1603 */ 1604 final boolean apparentlyFirstQueuedIsExclusive() { 1605 Node h, s; 1606 return (h = head) != null && 1607 (s = h.next) != null && 1608 !s.isShared() && 1609 s.thread != null; 1610 } 1611 1612 /** 1613 * Queries whether any threads have been waiting to acquire longer 1614 * than the current thread. 1615 * 1616 * <p>An invocation of this method is equivalent to (but may be 1617 * more efficient than): 1618 * <pre> {@code 1619 * getFirstQueuedThread() != Thread.currentThread() && 1620 * hasQueuedThreads()}</pre> 1621 * 1622 * <p>Note that because cancellations due to interrupts and 1623 * timeouts may occur at any time, a {@code true} return does not 1624 * guarantee that some other thread will acquire before the current 1625 * thread. Likewise, it is possible for another thread to win a 1626 * race to enqueue after this method has returned {@code false}, 1627 * due to the queue being empty. 1628 * 1629 * <p>This method is designed to be used by a fair synchronizer to 1630 * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>. 1631 * Such a synchronizer's {@link #tryAcquire} method should return 1632 * {@code false}, and its {@link #tryAcquireShared} method should 1633 * return a negative value, if this method returns {@code true} 1634 * (unless this is a reentrant acquire). For example, the {@code 1635 * tryAcquire} method for a fair, reentrant, exclusive mode 1636 * synchronizer might look like this: 1637 * 1638 * <pre> {@code 1639 * protected boolean tryAcquire(int arg) { 1640 * if (isHeldExclusively()) { 1641 * // A reentrant acquire; increment hold count 1642 * return true; 1643 * } else if (hasQueuedPredecessors()) { 1644 * return false; 1645 * } else { 1646 * // try to acquire normally 1647 * } 1648 * }}</pre> 1649 * 1650 * @return {@code true} if there is a queued thread preceding the 1651 * current thread, and {@code false} if the current thread 1652 * is at the head of the queue or the queue is empty 1653 * @since 1.7 1654 */ 1655 public final boolean hasQueuedPredecessors() { 1656 // The correctness of this depends on head being initialized 1657 // before tail and on head.next being accurate if the current 1658 // thread is first in queue. 1659 Node t = tail; // Read fields in reverse initialization order 1660 Node h = head; 1661 Node s; 1662 return h != t && 1663 ((s = h.next) == null || s.thread != Thread.currentThread()); 1664 } 1665 1666 1667 // Instrumentation and monitoring methods 1668 1669 /** 1670 * Returns an estimate of the number of threads waiting to 1671 * acquire. The value is only an estimate because the number of 1672 * threads may change dynamically while this method traverses 1673 * internal data structures. This method is designed for use in 1674 * monitoring system state, not for synchronization 1675 * control. 1676 * 1677 * @return the estimated number of threads waiting to acquire 1678 */ 1679 public final int getQueueLength() { 1680 int n = 0; 1681 for (Node p = tail; p != null; p = p.prev) { 1682 if (p.thread != null) 1683 ++n; 1684 } 1685 return n; 1686 } 1687 1688 /** 1689 * Returns a collection containing threads that may be waiting to 1690 * acquire. Because the actual set of threads may change 1691 * dynamically while constructing this result, the returned 1692 * collection is only a best-effort estimate. The elements of the 1693 * returned collection are in no particular order. This method is 1694 * designed to facilitate construction of subclasses that provide 1695 * more extensive monitoring facilities. 1696 * 1697 * @return the collection of threads 1698 */ 1699 public final Collection<Thread> getQueuedThreads() { 1700 ArrayList<Thread> list = new ArrayList<Thread>(); 1701 for (Node p = tail; p != null; p = p.prev) { 1702 Thread t = p.thread; 1703 if (t != null) 1704 list.add(t); 1705 } 1706 return list; 1707 } 1708 1709 /** 1710 * Returns a collection containing threads that may be waiting to 1711 * acquire in exclusive mode. This has the same properties 1712 * as {@link #getQueuedThreads} except that it only returns 1713 * those threads waiting due to an exclusive acquire. 1714 * 1715 * @return the collection of threads 1716 */ 1717 public final Collection<Thread> getExclusiveQueuedThreads() { 1718 ArrayList<Thread> list = new ArrayList<Thread>(); 1719 for (Node p = tail; p != null; p = p.prev) { 1720 if (!p.isShared()) { 1721 Thread t = p.thread; 1722 if (t != null) 1723 list.add(t); 1724 } 1725 } 1726 return list; 1727 } 1728 1729 /** 1730 * Returns a collection containing threads that may be waiting to 1731 * acquire in shared mode. This has the same properties 1732 * as {@link #getQueuedThreads} except that it only returns 1733 * those threads waiting due to a shared acquire. 1734 * 1735 * @return the collection of threads 1736 */ 1737 public final Collection<Thread> getSharedQueuedThreads() { 1738 ArrayList<Thread> list = new ArrayList<Thread>(); 1739 for (Node p = tail; p != null; p = p.prev) { 1740 if (p.isShared()) { 1741 Thread t = p.thread; 1742 if (t != null) 1743 list.add(t); 1744 } 1745 } 1746 return list; 1747 } 1748 1749 /** 1750 * Returns a string identifying this synchronizer, as well as its state. 1751 * The state, in brackets, includes the String {@code "State ="} 1752 * followed by the current value of {@link #getState}, and either 1753 * {@code "nonempty"} or {@code "empty"} depending on whether the 1754 * queue is empty. 1755 * 1756 * @return a string identifying this synchronizer, as well as its state 1757 */ 1758 public String toString() { 1759 int s = getState(); 1760 String q = hasQueuedThreads() ? "non" : ""; 1761 return super.toString() + 1762 "[State = " + s + ", " + q + "empty queue]"; 1763 } 1764 1765 1766 // Internal support methods for Conditions 1767 1768 /** 1769 * Returns true if a node, always one that was initially placed on 1770 * a condition queue, is now waiting to reacquire on sync queue. 1771 * @param node the node 1772 * @return true if is reacquiring 1773 */ 1774 //判断当前线程是否在sync queue同步队列中。 1775 final boolean isOnSyncQueue(Node node) { 1776 /** 1777 * 1、当前节点node的状态waitStatus为Node.CONDITION,则其必定还在条件队列condition中; 1778 * 2、如果当前节点node的node.prev == null,说明当前节点还未被加入到sync queue队列中,因为node节点要加入到sync queue中(无论本次加入是否成功),则本次尝试必有node.prev != null; 1779 */ 1780 if (node.waitStatus == Node.CONDITION || node.prev == null) 1781 return false; 1782 //如果当前节点node.next != null,说明其已经有后继节点了,则其必定在sync queue同步队列中 1783 if (node.next != null) // If has successor, it must be on queue 1784 return true; 1785 /* 1786 * node.prev can be non-null, but not yet on queue because 1787 * the CAS to place it on queue can fail. So we have to 1788 * traverse from tail to make sure it actually made it. It 1789 * will always be near the tail in calls to this method, and 1790 * unless the CAS failed (which is unlikely), it will be 1791 * there, so we hardly ever traverse much. 1792 */ 1793 /** 1794 * 能走到这里来,说明当前节点node.waitStatus != Node.CONDITION && node.prev != null && node.next == null 1795 * ,说明已经开始尝试将当前节点node加入到sync queue同步队列中,已经完成了node.prev = tail,但是进行接下来的CAS操作设置sync queue的尾节点,有可能失败 1796 * 导致当前节点入队失败,所以从sync queue的尾节点开始遍历,进一步确认当前节点是否成功加入到了sync queue中。 1797 */ 1798 return findNodeFromTail(node); 1799 } 1800 1801 /** 1802 * Returns true if node is on sync queue by searching backwards from tail. 1803 * Called only when needed by isOnSyncQueue. 1804 * @return true if present 1805 */ 1806 //判断指定的node节点是否在sync queue同步队列中 1807 private boolean findNodeFromTail(Node node) { 1808 Node t = tail; 1809 //从链表的尾部开始遍历 1810 for (;;) { 1811 //遍历到的当前节点等于指定的节点,则返回true 1812 if (t == node) 1813 return true; 1814 //遍历到的当前节点已经为空,说明链表已经遍历结束,未找到指定的节点,则返回false 1815 if (t == null) 1816 return false; 1817 //移动指针,进入下一次自旋遍历 1818 t = t.prev; 1819 } 1820 } 1821 1822 /** 1823 * Transfers a node from a condition queue onto sync queue. 1824 * Returns true if successful. 1825 * @param node the node 1826 * @return true if successfully transferred (else the node was 1827 * cancelled before signal) 1828 */ 1829 final boolean transferForSignal(Node node) { 1830 /* 1831 * If cannot change waitStatus, the node has been cancelled. 1832 */ 1833 //调用signal方法的时候,当前node节点已经取消了等待,则忽略这个节点 1834 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 1835 return false; 1836 1837 /* 1838 * Splice onto queue and try to set waitStatus of predecessor to 1839 * indicate that thread is (probably) waiting. If cancelled or 1840 * attempt to set waitStatus fails, wake up to resync (in which 1841 * case the waitStatus can be transiently and harmlessly wrong). 1842 */ 1843 //如果这个节点node在条件队列condition中正常等待,则将node加入到sync queue同步队列中,并返回这个新结点的前驱结点 1844 Node p = enq(node); 1845 int ws = p.waitStatus; 1846 /** 1847 * 同步队列sync queue中的节点是通过前驱结点来唤醒的,前驱结点已经被取消,或者前驱结点的waitStatus设置Node.SIGNAL失败, 1848 * 后继节点不能通过前驱结点正常唤醒,则直接唤醒这个刚刚加入到同步队列sync queue中的节点 1849 */ 1850 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) 1851 LockSupport.unpark(node.thread); 1852 return true; 1853 } 1854 1855 /** 1856 * Transfers node, if necessary, to sync queue after a cancelled wait. 1857 * Returns true if thread was cancelled before being signalled. 1858 * 1859 * @param node the node 1860 * @return true if cancelled before the node was signalled 1861 */ 1862 final boolean transferAfterCancelledWait(Node node) { 1863 /** 1864 * 判断一个node是否被signal()过,最简单有效的方式就是是否离开了condition条件队列,进入到了sync queue同步队列中 1865 */ 1866 //如果一个节点的状态还是Node.CONDITION,说明它还未被signal过,是因为中断导致了它被唤醒(从condition条件队列转移到sync queue同步队列中) 1867 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 1868 //将当前被唤醒的节点加入到同步队列sync queue中,准备竞争锁,但是还没有将这个节点从条件队列中移除,所以后面的处理一下 1869 enq(node); 1870 return true; 1871 } 1872 /* 1873 * If we lost out to a signal(), then we can't proceed 1874 * until it finishes its enq(). Cancelling during an 1875 * incomplete transfer is both rare and transient, so just 1876 * spin. 1877 */ 1878 while (!isOnSyncQueue(node)) 1879 Thread.yield(); 1880 return false; 1881 } 1882 1883 /** 1884 * Invokes release with current state value; returns saved state. 1885 * Cancels node and throws exception on failure. 1886 * @param node the condition node for this wait 1887 * @return previous sync state 1888 */ 1889 //完全释放线程所占用的锁 1890 final int fullyRelease(Node node) { 1891 boolean failed = true; 1892 try { 1893 //获取当前锁状态 1894 int savedState = getState(); 1895 //完全释放锁,对于重入锁而言,无论重入了几次都一次释放;同时持有锁的线程不是当前线程时候,也会直接抛出IllegalMonitorStateException异常,直接进入finally块 1896 if (release(savedState)) { 1897 failed = false; 1898 return savedState; 1899 } else { 1900 throw new IllegalMonitorStateException(); 1901 } 1902 } finally { 1903 /** 1904 * 如果释放锁失败,则将刚才新加入到条件队列尾节点的node节点的waitStatus设置为Node.CANCELLED, 1905 * 后序其它线程被包装成Node节点加入到条件队列中时候,会将这些已经被取消的节点移除 1906 */ 1907 if (failed) 1908 node.waitStatus = Node.CANCELLED; 1909 } 1910 } 1911 1912 // Instrumentation methods for conditions 1913 1914 /** 1915 * Queries whether the given ConditionObject 1916 * uses this synchronizer as its lock. 1917 * 1918 * @param condition the condition 1919 * @return {@code true} if owned 1920 * @throws NullPointerException if the condition is null 1921 */ 1922 public final boolean owns(ConditionObject condition) { 1923 return condition.isOwnedBy(this); 1924 } 1925 1926 /** 1927 * Queries whether any threads are waiting on the given condition 1928 * associated with this synchronizer. Note that because timeouts 1929 * and interrupts may occur at any time, a {@code true} return 1930 * does not guarantee that a future {@code signal} will awaken 1931 * any threads. This method is designed primarily for use in 1932 * monitoring of the system state. 1933 * 1934 * @param condition the condition 1935 * @return {@code true} if there are any waiting threads 1936 * @throws IllegalMonitorStateException if exclusive synchronization 1937 * is not held 1938 * @throws IllegalArgumentException if the given condition is 1939 * not associated with this synchronizer 1940 * @throws NullPointerException if the condition is null 1941 */ 1942 public final boolean hasWaiters(ConditionObject condition) { 1943 if (!owns(condition)) 1944 throw new IllegalArgumentException("Not owner"); 1945 return condition.hasWaiters(); 1946 } 1947 1948 /** 1949 * Returns an estimate of the number of threads waiting on the 1950 * given condition associated with this synchronizer. Note that 1951 * because timeouts and interrupts may occur at any time, the 1952 * estimate serves only as an upper bound on the actual number of 1953 * waiters. This method is designed for use in monitoring of the 1954 * system state, not for synchronization control. 1955 * 1956 * @param condition the condition 1957 * @return the estimated number of waiting threads 1958 * @throws IllegalMonitorStateException if exclusive synchronization 1959 * is not held 1960 * @throws IllegalArgumentException if the given condition is 1961 * not associated with this synchronizer 1962 * @throws NullPointerException if the condition is null 1963 */ 1964 public final int getWaitQueueLength(ConditionObject condition) { 1965 if (!owns(condition)) 1966 throw new IllegalArgumentException("Not owner"); 1967 return condition.getWaitQueueLength(); 1968 } 1969 1970 /** 1971 * Returns a collection containing those threads that may be 1972 * waiting on the given condition associated with this 1973 * synchronizer. Because the actual set of threads may change 1974 * dynamically while constructing this result, the returned 1975 * collection is only a best-effort estimate. The elements of the 1976 * returned collection are in no particular order. 1977 * 1978 * @param condition the condition 1979 * @return the collection of threads 1980 * @throws IllegalMonitorStateException if exclusive synchronization 1981 * is not held 1982 * @throws IllegalArgumentException if the given condition is 1983 * not associated with this synchronizer 1984 * @throws NullPointerException if the condition is null 1985 */ 1986 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1987 if (!owns(condition)) 1988 throw new IllegalArgumentException("Not owner"); 1989 return condition.getWaitingThreads(); 1990 } 1991 1992 /** 1993 * Condition implementation for a {@link 1994 * AbstractQueuedSynchronizer} serving as the basis of a {@link 1995 * Lock} implementation. 1996 * 1997 * <p>Method documentation for this class describes mechanics, 1998 * not behavioral specifications from the point of view of Lock 1999 * and Condition users. Exported versions of this class will in 2000 * general need to be accompanied by documentation describing 2001 * condition semantics that rely on those of the associated 2002 * {@code AbstractQueuedSynchronizer}. 2003 * 2004 * <p>This class is Serializable, but all fields are transient, 2005 * so deserialized conditions have no waiters. 2006 */ 2007 public class ConditionObject implements Condition, java.io.Serializable { 2008 private static final long serialVersionUID = 1173984872572414699L; 2009 /** First node of condition queue. */ 2010 private transient Node firstWaiter; 2011 /** Last node of condition queue. */ 2012 private transient Node lastWaiter; 2013 2014 /** 2015 * Creates a new {@code ConditionObject} instance. 2016 */ 2017 public ConditionObject() { } 2018 2019 // Internal methods 2020 2021 /** 2022 * Adds a new waiter to wait queue. 2023 * @return its new wait node 2024 */ 2025 //将当前线程包装成Node节点,加入到条件队列中 2026 private Node addConditionWaiter() { 2027 //条件队列的尾节点 2028 Node t = lastWaiter; 2029 // If lastWaiter is cancelled, clean out. 2030 //条件尾节点不为空,说明条件队列不为空,但是它的尾节点的waitStatus已经不是Node.CONDITION了,说明其已经取消了,则清除掉条件队列中已经取消的节点 2031 if (t != null && t.waitStatus != Node.CONDITION) { 2032 //清除掉条件队列中已经取消的节点 2033 unlinkCancelledWaiters(); 2034 t = lastWaiter; 2035 } 2036 //将当前线程包装成Node节点,并将新的node节点的状态设置为Node.CONDITION 2037 Node node = new Node(Thread.currentThread(), Node.CONDITION); 2038 //如果条件队列为空,则将条件队列的头结点firstWaiter指向刚创建出来的node节点 2039 if (t == null) 2040 firstWaiter = node; 2041 else 2042 //否则, 将条件队列旧的尾节点的nextWaiter指向新创建的node节点 2043 t.nextWaiter = node; 2044 //将新创建的node节点设置为条件队列的尾节点 2045 lastWaiter = node; 2046 return node; 2047 } 2048 2049 /** 2050 * Removes and transfers nodes until hit non-cancelled one or 2051 * null. Split out from signal in part to encourage compilers 2052 * to inline the case of no waiters. 2053 * @param first (non-null) the first node on condition queue 2054 */ 2055 //唤醒条件队列中第一个未被取消的节点 2056 private void doSignal(Node first) { 2057 do { 2058 //如果条件队列中只有一个节点,则将条件队列清空 2059 if ( (firstWaiter = first.nextWaiter) == null) 2060 //则将条件队列的尾节点lastWaiter也置为null 2061 lastWaiter = null; 2062 //将当前要被唤醒的节点从条件队列中移除 2063 first.nextWaiter = null; 2064 //当前节点已经被取消了,并且条件队列中还有节点,则进行下一次循环 2065 } while (!transferForSignal(first) && 2066 (first = firstWaiter) != null); 2067 } 2068 2069 /** 2070 * Removes and transfers all nodes. 2071 * @param first (non-null) the first node on condition queue 2072 */ 2073 private void doSignalAll(Node first) { 2074 //清空条件队列 2075 lastWaiter = firstWaiter = null; 2076 do { 2077 //获取条件队列头结点的下一个结点 2078 Node next = first.nextWaiter; 2079 //将条件队列的头结点从链表上移除 2080 first.nextWaiter = null; 2081 //将这个刚从条件队列condition中移除的节点转移到sync queue同步队列中 2082 transferForSignal(first); 2083 //修改指针,准备继续下一次的遍历 2084 first = next; 2085 //first != null表示链表还未遍历完成,继续下次遍历 2086 } while (first != null); 2087 } 2088 2089 /** 2090 * Unlinks cancelled waiter nodes from condition queue. 2091 * Called only while holding lock. This is called when 2092 * cancellation occurred during condition wait, and upon 2093 * insertion of a new waiter when lastWaiter is seen to have 2094 * been cancelled. This method is needed to avoid garbage 2095 * retention in the absence of signals. So even though it may 2096 * require a full traversal, it comes into play only when 2097 * timeouts or cancellations occur in the absence of 2098 * signals. It traverses all nodes rather than stopping at a 2099 * particular target to unlink all pointers to garbage nodes 2100 * without requiring many re-traversals during cancellation 2101 * storms. 2102 */ 2103 private void unlinkCancelledWaiters() { 2104 //当前节点,从表示条件队列链表的头结点firstWaiter开始遍历 2105 Node t = firstWaiter; 2106 //表示条件队列中最后一个waitStatus不为Node.CANCELLED的节点 2107 Node trail = null; 2108 //条件队列不为空 2109 while (t != null) { 2110 //当前节点的后继节点 2111 Node next = t.nextWaiter; 2112 //如果当前节点t已经取消了正常等待,则将节点t从条件队列中移除 2113 if (t.waitStatus != Node.CONDITION) { 2114 //将当前节点从条件队列移除 2115 t.nextWaiter = null; 2116 //trail == null表示条件队列当前节点之前的都被取消了,所以将条件队列的头结点firstWaiter重置指向next 2117 if (trail == null) 2118 firstWaiter = next; 2119 else 2120 //当前节点已经被取消,trail != null,则需要将trail与当前节点断开,指向next; 2121 trail.nextWaiter = next; 2122 //如果next == null说明队列已经完成遍历,将原来的条件队列中最后一个正常等待的节点赋值给lastWaiter; 2123 if (next == null) 2124 lastWaiter = trail; 2125 } 2126 else{ 2127 //如果当前节点未被取消,就将当前节点记为队列中最后一个未被取消的节点,令trail = t; 2128 trail = t; 2129 } 2130 //修改当前节点的指针,指向下一个节点next 2131 t = next; 2132 } 2133 } 2134 2135 // public methods 2136 2137 /** 2138 * Moves the longest-waiting thread, if one exists, from the 2139 * wait queue for this condition to the wait queue for the 2140 * owning lock. 2141 * 2142 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 2143 * returns {@code false} 2144 */ 2145 public final void signal() { 2146 //判断当前线程是否是持有锁的线程,如果不是则直接抛出IllegalMonitorStateException异常 2147 if (!isHeldExclusively()) 2148 throw new IllegalMonitorStateException(); 2149 Node first = firstWaiter; 2150 //条件队列condition不为空 2151 if (first != null) 2152 //唤醒条件队列condition中的头结点 2153 doSignal(first); 2154 } 2155 2156 /** 2157 * Moves all threads from the wait queue for this condition to 2158 * the wait queue for the owning lock. 2159 * 2160 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 2161 * returns {@code false} 2162 */ 2163 public final void signalAll() { 2164 //判断当前线程是否是持有锁的线程,如果不是,则直接抛出IllegalMonitorStateException异常 2165 if (!isHeldExclusively()) 2166 throw new IllegalMonitorStateException(); 2167 Node first = firstWaiter; 2168 //判断条件队列是否为空 2169 if (first != null) 2170 //唤醒条件队列中进行等待的节点 2171 doSignalAll(first); 2172 } 2173 2174 /** 2175 * Implements uninterruptible condition wait. 2176 * <ol> 2177 * <li> Save lock state returned by {@link #getState}. 2178 * <li> Invoke {@link #release} with saved state as argument, 2179 * throwing IllegalMonitorStateException if it fails. 2180 * <li> Block until signalled. 2181 * <li> Reacquire by invoking specialized version of 2182 * {@link #acquire} with saved state as argument. 2183 * </ol> 2184 */ 2185 /** 2186 * await()方法中,中断和signal()起到了相同的效果,将当前线程唤醒,但是中断一个正常等待的线程,线程被唤醒后, 2187 * 抢到了锁,但是发现等待的条件还未满足,线程会重新被挂起,所以我们希望有一个方法,在等待的时候,不要响应线程中断, 2188 * 所以下面awaitUnitertuptibly()方法就实现了这个功能。 2189 */ 2190 public final void awaitUninterruptibly() { 2191 //将当前线程包装成Node节点,加入到条件队列condition中 2192 Node node = addConditionWaiter(); 2193 //释放当前线程所持有的锁 2194 int savedState = fullyRelease(node); 2195 boolean interrupted = false; 2196 //判断当前线程是否已经在sync queue同步队列 2197 while (!isOnSyncQueue(node)) { 2198 //当前线程未在sync queue同步队列中,说明其还没有资格竞争独占锁,将当前线程挂起 2199 LockSupport.park(this); 2200 /** 2201 * 判断当前线程是否是由于被其他线程中断唤醒的,如果是则只是简单记录下其中断标记即可, 2202 * 当前线程要跳出此while循环则满足的唯一条件是,被其他线程signal(),进入到同步队列 2203 */ 2204 if (Thread.interrupted()) 2205 interrupted = true; 2206 } 2207 2208 /** 2209 * 这也是一个阻塞操作,当前线程已经加入到同步队列中,尝试竞争锁,竞争失败,找一个合适的地方, 2210 * 将当前线程挂起,等待其他线程signal(),如果在竞争锁的过程中,发生了异常,则需要记录一下 2211 */ 2212 if (acquireQueued(node, savedState) || interrupted) 2213 selfInterrupt(); 2214 } 2215 2216 /* 2217 * For interruptible waits, we need to track whether to throw 2218 * InterruptedException, if interrupted while blocked on 2219 * condition, versus reinterrupt current thread, if 2220 * interrupted while blocked waiting to re-acquire. 2221 */ 2222 2223 /** Mode meaning to reinterrupt on exit from wait */ 2224 private static final int REINTERRUPT = 1; 2225 /** Mode meaning to throw InterruptedException on exit from wait */ 2226 private static final int THROW_IE = -1; 2227 2228 /** 2229 * Checks for interrupt, returning THROW_IE if interrupted 2230 * before signalled, REINTERRUPT if after signalled, or 2231 * 0 if not interrupted. 2232 */ 2233 private int checkInterruptWhileWaiting(Node node) { 2234 //判断在等待的过程中,线程是否发生了中断 2235 return Thread.interrupted() ? 2236 //判断线程中断时发生在signal()之前,还是signal()之后,THROW_IE表示中断发生在signal()之前,REINTERRUPT表示中断发生在signal()之后,中断来得太晚了 2237 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 2238 0; 2239 } 2240 2241 /** 2242 * Throws InterruptedException, reinterrupts current thread, or 2243 * does nothing, depending on mode. 2244 */ 2245 private void reportInterruptAfterWait(int interruptMode) 2246 throws InterruptedException { 2247 //表示还未被其他线程signal()就发生了中断,当前线程是由于中断被唤醒的,所以抛出InterruptedException异常 2248 if (interruptMode == THROW_IE) 2249 throw new InterruptedException(); 2250 //表示被其它线程signal()过了之后才发生了线程中断,中断来得太迟了,所以补一下中断标记就行了 2251 else if (interruptMode == REINTERRUPT) 2252 selfInterrupt(); 2253 } 2254 2255 /** 2256 * Implements interruptible condition wait. 2257 * <ol> 2258 * <li> If current thread is interrupted, throw InterruptedException. 2259 * <li> Save lock state returned by {@link #getState}. 2260 * <li> Invoke {@link #release} with saved state as argument, 2261 * throwing IllegalMonitorStateException if it fails. 2262 * <li> Block until signalled or interrupted. 2263 * <li> Reacquire by invoking specialized version of 2264 * {@link #acquire} with saved state as argument. 2265 * <li> If interrupted while blocked in step 4, throw InterruptedException. 2266 * </ol> 2267 */ 2268 public final void await() throws InterruptedException { 2269 //调用await()方法时,当前线程已经被中断,则直接抛出中断异常 2270 if (Thread.interrupted()) 2271 throw new InterruptedException(); 2272 //将当前线程包装成Node节点,加入到条件队列中 2273 Node node = addConditionWaiter(); 2274 //释放当前线程所占用的锁 2275 int savedState = fullyRelease(node); 2276 int interruptMode = 0; 2277 //判断代表当前线程节点是否在同步队列中,如果不在,说明还没有其它线程调用signal()方法,将当前线程从条件队列Conditon种转移到同步队列中 2278 while (!isOnSyncQueue(node)) { 2279 //当前线程还在条件队列中,没有资格竞争锁,则直接将当前线程挂起 2280 LockSupport.park(this); 2281 //能走到这里,说明当前线程被其他线程signal()或者被其它线程中断了 2282 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 2283 break; 2284 } 2285 //interruptMode != THROW_IE表示当前线程不是由于线程中断而被唤醒的 2286 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 2287 interruptMode = REINTERRUPT; 2288 //上面将从条件队列condition中唤醒的节点加入到sync queue条件队列中时候,并没有从条件队列中移除,这里需要清理一下 2289 if (node.nextWaiter != null) // clean up if cancelled 2290 unlinkCancelledWaiters(); 2291 //当前节点在等待的过程中发生了中断 2292 if (interruptMode != 0) 2293 reportInterruptAfterWait(interruptMode); 2294 } 2295 2296 /** 2297 * Implements timed condition wait. 2298 * <ol> 2299 * <li> If current thread is interrupted, throw InterruptedException. 2300 * <li> Save lock state returned by {@link #getState}. 2301 * <li> Invoke {@link #release} with saved state as argument, 2302 * throwing IllegalMonitorStateException if it fails. 2303 * <li> Block until signalled, interrupted, or timed out. 2304 * <li> Reacquire by invoking specialized version of 2305 * {@link #acquire} with saved state as argument. 2306 * <li> If interrupted while blocked in step 4, throw InterruptedException. 2307 * </ol> 2308 */ 2309 public final long awaitNanos(long nanosTimeout) 2310 throws InterruptedException { 2311 if (Thread.interrupted()) 2312 throw new InterruptedException(); 2313 Node node = addConditionWaiter(); 2314 int savedState = fullyRelease(node); 2315 final long deadline = System.nanoTime() + nanosTimeout; 2316 int interruptMode = 0; 2317 while (!isOnSyncQueue(node)) { 2318 if (nanosTimeout <= 0L) { 2319 transferAfterCancelledWait(node); 2320 break; 2321 } 2322 if (nanosTimeout >= spinForTimeoutThreshold) 2323 LockSupport.parkNanos(this, nanosTimeout); 2324 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 2325 break; 2326 nanosTimeout = deadline - System.nanoTime(); 2327 } 2328 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 2329 interruptMode = REINTERRUPT; 2330 if (node.nextWaiter != null) 2331 unlinkCancelledWaiters(); 2332 if (interruptMode != 0) 2333 reportInterruptAfterWait(interruptMode); 2334 return deadline - System.nanoTime(); 2335 } 2336 2337 /** 2338 * Implements absolute timed condition wait. 2339 * <ol> 2340 * <li> If current thread is interrupted, throw InterruptedException. 2341 * <li> Save lock state returned by {@link #getState}. 2342 * <li> Invoke {@link #release} with saved state as argument, 2343 * throwing IllegalMonitorStateException if it fails. 2344 * <li> Block until signalled, interrupted, or timed out. 2345 * <li> Reacquire by invoking specialized version of 2346 * {@link #acquire} with saved state as argument. 2347 * <li> If interrupted while blocked in step 4, throw InterruptedException. 2348 * <li> If timed out while blocked in step 4, return false, else true. 2349 * </ol> 2350 */ 2351 public final boolean awaitUntil(Date deadline) 2352 throws InterruptedException { 2353 long abstime = deadline.getTime(); 2354 if (Thread.interrupted()) 2355 throw new InterruptedException(); 2356 Node node = addConditionWaiter(); 2357 int savedState = fullyRelease(node); 2358 boolean timedout = false; 2359 int interruptMode = 0; 2360 while (!isOnSyncQueue(node)) { 2361 if (System.currentTimeMillis() > abstime) { 2362 timedout = transferAfterCancelledWait(node); 2363 break; 2364 } 2365 LockSupport.parkUntil(this, abstime); 2366 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 2367 break; 2368 } 2369 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 2370 interruptMode = REINTERRUPT; 2371 if (node.nextWaiter != null) 2372 unlinkCancelledWaiters(); 2373 if (interruptMode != 0) 2374 reportInterruptAfterWait(interruptMode); 2375 return !timedout; 2376 } 2377 2378 /** 2379 * Implements timed condition wait. 2380 * <ol> 2381 * <li> If current thread is interrupted, throw InterruptedException. 2382 * <li> Save lock state returned by {@link #getState}. 2383 * <li> Invoke {@link #release} with saved state as argument, 2384 * throwing IllegalMonitorStateException if it fails. 2385 * <li> Block until signalled, interrupted, or timed out. 2386 * <li> Reacquire by invoking specialized version of 2387 * {@link #acquire} with saved state as argument. 2388 * <li> If interrupted while blocked in step 4, throw InterruptedException. 2389 * <li> If timed out while blocked in step 4, return false, else true. 2390 * </ol> 2391 */ 2392 public final boolean await(long time, TimeUnit unit) 2393 throws InterruptedException { 2394 long nanosTimeout = unit.toNanos(time); 2395 if (Thread.interrupted()) 2396 throw new InterruptedException(); 2397 Node node = addConditionWaiter(); 2398 int savedState = fullyRelease(node); 2399 final long deadline = System.nanoTime() + nanosTimeout; 2400 boolean timedout = false; 2401 int interruptMode = 0; 2402 while (!isOnSyncQueue(node)) { 2403 if (nanosTimeout <= 0L) { 2404 timedout = transferAfterCancelledWait(node); 2405 break; 2406 } 2407 if (nanosTimeout >= spinForTimeoutThreshold) 2408 LockSupport.parkNanos(this, nanosTimeout); 2409 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 2410 break; 2411 nanosTimeout = deadline - System.nanoTime(); 2412 } 2413 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 2414 interruptMode = REINTERRUPT; 2415 if (node.nextWaiter != null) 2416 unlinkCancelledWaiters(); 2417 if (interruptMode != 0) 2418 reportInterruptAfterWait(interruptMode); 2419 return !timedout; 2420 } 2421 2422 // support for instrumentation 2423 2424 /** 2425 * Returns true if this condition was created by the given 2426 * synchronization object. 2427 * 2428 * @return {@code true} if owned 2429 */ 2430 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { 2431 return sync == AbstractQueuedSynchronizer.this; 2432 } 2433 2434 /** 2435 * Queries whether any threads are waiting on this condition. 2436 * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. 2437 * 2438 * @return {@code true} if there are any waiting threads 2439 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 2440 * returns {@code false} 2441 */ 2442 protected final boolean hasWaiters() { 2443 if (!isHeldExclusively()) 2444 throw new IllegalMonitorStateException(); 2445 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 2446 if (w.waitStatus == Node.CONDITION) 2447 return true; 2448 } 2449 return false; 2450 } 2451 2452 /** 2453 * Returns an estimate of the number of threads waiting on 2454 * this condition. 2455 * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. 2456 * 2457 * @return the estimated number of waiting threads 2458 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 2459 * returns {@code false} 2460 */ 2461 protected final int getWaitQueueLength() { 2462 if (!isHeldExclusively()) 2463 throw new IllegalMonitorStateException(); 2464 int n = 0; 2465 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 2466 if (w.waitStatus == Node.CONDITION) 2467 ++n; 2468 } 2469 return n; 2470 } 2471 2472 /** 2473 * Returns a collection containing those threads that may be 2474 * waiting on this Condition. 2475 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. 2476 * 2477 * @return the collection of threads 2478 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 2479 * returns {@code false} 2480 */ 2481 protected final Collection<Thread> getWaitingThreads() { 2482 if (!isHeldExclusively()) 2483 throw new IllegalMonitorStateException(); 2484 ArrayList<Thread> list = new ArrayList<Thread>(); 2485 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 2486 if (w.waitStatus == Node.CONDITION) { 2487 Thread t = w.thread; 2488 if (t != null) 2489 list.add(t); 2490 } 2491 } 2492 return list; 2493 } 2494 } 2495 2496 /** 2497 * Setup to support compareAndSet. We need to natively implement 2498 * this here: For the sake of permitting future enhancements, we 2499 * cannot explicitly subclass AtomicInteger, which would be 2500 * efficient and useful otherwise. So, as the lesser of evils, we 2501 * natively implement using hotspot intrinsics API. And while we 2502 * are at it, we do the same for other CASable fields (which could 2503 * otherwise be done with atomic field updaters). 2504 */ 2505 private static final Unsafe unsafe = Unsafe.getUnsafe(); 2506 private static final long stateOffset; 2507 private static final long headOffset; 2508 private static final long tailOffset; 2509 private static final long waitStatusOffset; 2510 private static final long nextOffset; 2511 2512 static { 2513 try { 2514 stateOffset = unsafe.objectFieldOffset 2515 (AbstractQueuedSynchronizer.class.getDeclaredField("state")); 2516 headOffset = unsafe.objectFieldOffset 2517 (AbstractQueuedSynchronizer.class.getDeclaredField("head")); 2518 tailOffset = unsafe.objectFieldOffset 2519 (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); 2520 waitStatusOffset = unsafe.objectFieldOffset 2521 (Node.class.getDeclaredField("waitStatus")); 2522 nextOffset = unsafe.objectFieldOffset 2523 (Node.class.getDeclaredField("next")); 2524 2525 } catch (Exception ex) { throw new Error(ex); } 2526 } 2527 2528 /** 2529 * CAS head field. Used only by enq. 2530 */ 2531 private final boolean compareAndSetHead(Node update) { 2532 return unsafe.compareAndSwapObject(this, headOffset, null, update); 2533 } 2534 2535 /** 2536 * CAS tail field. Used only by enq. 2537 */ 2538 private final boolean compareAndSetTail(Node expect, Node update) { 2539 return unsafe.compareAndSwapObject(this, tailOffset, expect, update); 2540 } 2541 2542 /** 2543 * CAS waitStatus field of a node. 2544 */ 2545 private static final boolean compareAndSetWaitStatus(Node node, 2546 int expect, 2547 int update) { 2548 return unsafe.compareAndSwapInt(node, waitStatusOffset, 2549 expect, update); 2550 } 2551 2552 /** 2553 * CAS next field of a node. 2554 */ 2555 private static final boolean compareAndSetNext(Node node, 2556 Node expect, 2557 Node update) { 2558 return unsafe.compareAndSwapObject(node, nextOffset, expect, update); 2559 } 2560 }
本文来自博客园,作者:一只烤鸭朝北走,仅用于技术学习,所有资源都来源于网络,部分是转发,部分是个人总结。欢迎共同学习和转载,转载请在醒目位置标明原文。如有侵权,请留言告知,及时撤除。转载请注明原文链接:https://www.cnblogs.com/wha6239/p/17348446.html