中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

深入理解Java中的底層阻塞原理及實(shí)現(xiàn)

2018-10-11    來源:importnew

容器云強(qiáng)勢(shì)上線!快速搭建集群,上萬Linux鏡像隨意使用

談到阻塞,相信大家都不會(huì)陌生了。阻塞的應(yīng)用場(chǎng)景真的多得不要不要的,比如 生產(chǎn)-消費(fèi)模式,限流統(tǒng)計(jì)等等。什么 ArrayBlockingQueue、 LinkedBlockingQueue、DelayQueue 等等,都是阻塞隊(duì)列的實(shí)現(xiàn)啊,多簡(jiǎn)單!

阻塞,一般有兩個(gè)特性很亮眼:1. 不耗 CPU 等待;2. 線程安全;

額,要這么說也 OK 的。畢竟,我們遇到的問題,到這里就夠解決了。但是有沒有想過,這容器的阻塞又是如何實(shí)現(xiàn)的呢?

好吧,翻開源碼,也很簡(jiǎn)單了:(比如 ArrayBlockingQueue 的 take、put….)

// ArrayBlockingQueue

/**
 * Inserts the specified element at the tail of this queue, waiting
 * for space to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 阻塞的點(diǎn)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

/**
 * Inserts the specified element at the tail of this queue, waiting
 * up to the specified wait time for space to become available if
 * the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            // 阻塞的點(diǎn)
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 阻塞的點(diǎn)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

看來,最終都是依賴了 AbstractQueuedSynchronizer 類(著名的AQS)的 await 方法,看起來像那么回事。那么這個(gè)同步器的阻塞又是如何實(shí)現(xiàn)的呢?

Java的代碼總是好跟蹤的:

//?AbstractQueuedSynchronizer.await()

/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 此處進(jìn)行真正的阻塞
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

如上,可以看到,真正的阻塞工作又轉(zhuǎn)交給了另一個(gè)工具類: LockSupportpark 方法了,這回跟鎖扯上了關(guān)系,看起來已經(jīng)越來越接近事實(shí)了:

//?LockSupport.park()

/**
 * Disables the current thread for thread scheduling purposes unless the
 * permit is available.
 *
 * <p>If the permit is available then it is consumed and the call returns
 * immediately; otherwise
 * the current thread becomes disabled for thread scheduling
 * purposes and lies dormant until one of three things happens:
 *
 * <ul>
 * <li>Some other thread invokes {@link #unpark unpark} with the
 * current thread as the target; or
 *
 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 * the current thread; or
 *
 * <li>The call spuriously (that is, for no reason) returns.
 * </ul>
 *
 * <p>This method does <em>not</em> report which of these caused the
 * method to return. Callers should re-check the conditions which caused
 * the thread to park in the first place. Callers may also determine,
 * for example, the interrupt status of the thread upon return.
 *
 * @param blocker the synchronization object responsible for this
 *        thread parking
 * @since 1.6
 */
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

看得出來,這里的實(shí)現(xiàn)就比較簡(jiǎn)潔了,先獲取當(dāng)前線程,設(shè)置阻塞對(duì)象,阻塞,然后解除阻塞。

好吧,到底什么是真正的阻塞,我們還是不得而知!

UNSAFE.park(false, 0L); 是個(gè)什么東西? 看起來就是這一句起到了最關(guān)鍵的作用呢!但由于這里已經(jīng)是 native 代碼,我們已經(jīng)無法再簡(jiǎn)單的查看源碼了!那咋整呢?

那不行就看C/C++的源碼唄,看一下 parker 的定義(park.hpp):

class Parker : public os::PlatformParker {
private:
  volatile int _counter ;
  Parker * FreeNext ;
  JavaThread * AssociatedWith ; // Current association

public:
  Parker() : PlatformParker() {
    _counter       = 0 ;
    FreeNext       = NULL ;
    AssociatedWith = NULL ;
  }
protected:
  ~Parker() { ShouldNotReachHere(); }
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.  c中暴露出兩個(gè)方法給java調(diào)用
  void park(bool isAbsolute, jlong time);
  void unpark();

  // Lifecycle operators
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;

};

park() 方法到底是如何實(shí)現(xiàn)的呢? 其實(shí)是繼承的 os::PlatformParker 的功能,也就是平臺(tái)相關(guān)的私有實(shí)現(xiàn),以 Linux 平臺(tái)實(shí)現(xiàn)為例(os_linux.hpp):

// Linux中的parker定義
class PlatformParker : public CHeapObj<mtInternal> {
  protected:
    enum {
        REL_INDEX = 0,
        ABS_INDEX = 1
    };
    int _cur_index;  // which cond is in use: -1, 0, 1
    pthread_mutex_t _mutex [1] ;
    pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.

  public:       // TODO-FIXME: make dtor private
    ~PlatformParker() { guarantee (0, "invariant") ; }

  public:
    PlatformParker() {
      int status;
      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
      assert_status(status == 0, status, "cond_init rel");
      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
      assert_status(status == 0, status, "cond_init abs");
      status = pthread_mutex_init (_mutex, NULL);
      assert_status(status == 0, status, "mutex_init");
      _cur_index = -1; // mark as unused
    }
};

看到 park.cpp 中沒有重寫 park() 和 unpark() 方法,也就是說阻塞實(shí)現(xiàn)完全交由特定平臺(tái)代碼處理了(os_linux.cpp):

// park方法的實(shí)現(xiàn),依賴于 _counter, _mutex[1], _cond[2]
void Parker::park(bool isAbsolute, jlong time) {
  // Ideally we'd do something useful while spinning, such
  // as calling unpackTime().

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // Optional optimization -- avoid state transitions if there's an interrupt pending.
  // Check interrupt before trying to wait
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  // Next, demultiplex/decode time arguments
  timespec absTime;
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    unpackTime(&absTime, isAbsolute, time);
  }

  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex.  If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt);

  // Don't wait if cannot get lock since interference arises from
  // unblocking.  Also. check interrupt before trying wait
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status ;
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }
  _cur_index = -1;
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif

  _counter = 0 ;
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  OrderAccess::fence();

  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}

// unpark 實(shí)現(xiàn),相對(duì)簡(jiǎn)單些
void Parker::unpark() {
  int s, status ;
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ;
  s = _counter;
  _counter = 1;
  if (s < 1) {
    // thread might be parked
    if (_cur_index != -1) {
      // thread is definitely parked
      if (WorkAroundNPTLTimedWaitHang) {
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      } else {
        // must capture correct index before unlocking
        int index = _cur_index;
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
        status = pthread_cond_signal (&_cond[index]);
        assert (status == 0, "invariant");
      }
    } else {
      pthread_mutex_unlock(_mutex);
      assert (status == 0, "invariant") ;
    }
  } else {
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}

從上面代碼可以看出,阻塞主要借助于三個(gè)變量,_cond、_mutex、_counter, 調(diào)用 Linux 系統(tǒng)的 pthread_cond_wait、pthread_mutex_lock、pthread_mutex_unlock (一組 POSIX 標(biāo)準(zhǔn)的阻塞接口)等平臺(tái)相關(guān)的方法進(jìn)行阻塞了!

而 park.cpp 中,則只有??Allocate、Release 等的一些常規(guī)操作!

// 6399321 As a temporary measure we copied & modified the ParkEvent::
// allocate() and release() code for use by Parkers.  The Parker:: forms
// will eventually be removed as we consolide and shift over to ParkEvents
// for both builtin synchronization and JSR166 operations.

volatile int Parker::ListLock = 0 ;
Parker * volatile Parker::FreeList = NULL ;

Parker * Parker::Allocate (JavaThread * t) {
  guarantee (t != NULL, "invariant") ;
  Parker * p ;

  // Start by trying to recycle an existing but unassociated
  // Parker from the global free list.
  // 8028280: using concurrent free list without memory management can leak
  // pretty badly it turns out.
  Thread::SpinAcquire(&ListLock, "ParkerFreeListAllocate");
  {
    p = FreeList;
    if (p != NULL) {
      FreeList = p->FreeNext;
    }
  }
  Thread::SpinRelease(&ListLock);

  if (p != NULL) {
    guarantee (p->AssociatedWith == NULL, "invariant") ;
  } else {
    // Do this the hard way -- materialize a new Parker..
    p = new Parker() ;
  }
  p->AssociatedWith = t ;          // Associate p with t
  p->FreeNext       = NULL ;
  return p ;
}

void Parker::Release (Parker * p) {
  if (p == NULL) return ;
  guarantee (p->AssociatedWith != NULL, "invariant") ;
  guarantee (p->FreeNext == NULL      , "invariant") ;
  p->AssociatedWith = NULL ;

  Thread::SpinAcquire(&ListLock, "ParkerFreeListRelease");
  {
    p->FreeNext = FreeList;
    FreeList = p;
  }
  Thread::SpinRelease(&ListLock);
}

綜上源碼,在進(jìn)行阻塞的時(shí)候,底層并沒有(并不一定)要用 while 死循環(huán)來阻塞,更多的是借助于操作系統(tǒng)的實(shí)現(xiàn)來進(jìn)行阻塞的。當(dāng)然,這也更符合大家的猜想!

從上的代碼我們也發(fā)現(xiàn)一點(diǎn),底層在做許多事的時(shí)候,都不忘考慮線程中斷,也就是說,即使在阻塞狀態(tài)也是可以接收中斷信號(hào)的,這為上層語言打開了方便之門。

如果要細(xì)說阻塞,其實(shí)還遠(yuǎn)沒完,不過再往操作系統(tǒng)層面如何實(shí)現(xiàn),就得再下點(diǎn)功夫,去翻翻資料了,把底線壓在操作系統(tǒng)層面,大多數(shù)情況下也夠用了!

標(biāo)簽: linux 安全 代碼

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請(qǐng)聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請(qǐng)與原作者聯(lián)系。

上一篇:CAS實(shí)現(xiàn)SSO單點(diǎn)登錄

下一篇:搭建大眾點(diǎn)評(píng)CAT監(jiān)控平臺(tái)