跳到主要内容

LockSupport源码分析

· 阅读需 12 分钟

LockSupport简介

LockSupport可以控制线程的状态,从而达到线程在等待唤醒之间切换的目的,并且不用担心阻塞和唤醒操作的顺序,但要注意连续多次唤醒的效果和一次唤醒是一样的。

注意:unpark 函数可以先于 park 调用。

【LockSupport与的区别】

  • LockSupport.park和unpark不需要在同步代码块中,wait和notify是需要的。
  • LockSupport的pork和unpark是针对线程的,而wait和notify是可以是任意对象。
  • LockSupport的unpark可以让指定线程被唤醒,但是notify是随机唤醒一个,notifyAll是全部唤醒,不够灵活。

park和unpark都是调用native方法,由JVM实现:

// java/util/concurrent/locks/LockSupport.java
public static void park() {
U.park(false, 0L);
}

// jdk/internal/misc/Unsafe.java
@HotSpotIntrinsicCandidate
public native void park(boolean isAbsolute, long time);

// java/util/concurrent/locks/LockSupport.java
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}

// jdk/internal/misc/Unsafe.java
@HotSpotIntrinsicCandidate
public native void unpark(Object thread);

Parker

// src/hotspot/share/runtime/thread.hpp
// JSR166 per-thread parker
private:
Parker* _parker;
public:
Parker* parker() { return _parker; }

线程内持有一个Parker类型的指针_parker,Parker类的定义里有个_counter变量:

public:
Parker() : PlatformParker() {
_counter = 0 ;
FreeNext = NULL ;
AssociatedWith = NULL ;
}

unpark

// src/hotspot/share/prims/unsafe.cpp
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) {
Parker* p = NULL;

if (jthread != NULL) {
ThreadsListHandle tlh;
JavaThread* thr = NULL;
// oop是oopDesc*的别名,oopDesc是object类的基类,是对Java对象的描述,便于在C++中访问对象中的域。
oop java_thread = NULL;
// 将线程引用转为Java线程对象
(void) tlh.cv_internal_thread_to_JavaThread(jthread, &thr, &java_thread);
if (java_thread != NULL) {
// 尝试通过缓存的_parker的偏移量(_park_event_offset)来获取_parker的地址
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// 将地址转为Parker指针
p = (Parker*)addr_from_java(lp);
} else {
// 如果没有缓存_parker的偏移量(偏移量默认为0)
if (thr != NULL) {
p = thr->parker();
if (p != NULL) {
// 缓存_parker的偏移量
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
}

if (p != NULL) {
HOTSPOT_THREAD_UNPARK((uintptr_t) p);
// 调用Parker的unpark方法
p->unpark();
}
} UNSAFE_END
// src/hotspot/share/classfile/javaClasses.cpp
jlong java_lang_Thread::park_event(oop java_thread) {
if (_park_event_offset > 0) {
return java_thread->long_field(_park_event_offset);
}
return 0;
}

oopDesc是object类的基类,是对Java对象的描述,便于在C++中访问对象中的域。上面的代码主要是利用_parker在线程对象内的偏移量来获取Parker对象_parker,并调用其unpark方法。

// src/hotspot/os/posix/os_posix.cpp
void Parker::unpark() {
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "invariant");
const int s = _counter;
_counter = 1;
// must capture correct index before unlocking
int index = _cur_index;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");

// s记录的是unpark之前的_counter数,如果s < 1,说明有可能该线程在等待状态,需要唤醒。
if (s < 1 && index != -1) {
// 发信号唤醒线程
status = pthread_cond_signal(&_cond[index]);
assert_status(status == 0, status, "invariant");
}
}

_cur_index 有3个值:-1,0,1,默认是-1。_cur_index代表被使用cond的index。

【pthread_mutex_unlock、pthread_cond_signal先后顺序】

解锁互斥量mutex和发出唤醒信号condition_signal是两个单独的操作,那么就存在一个顺序的问题。谁先随后可能会产生不同的结果。如下:

  1. 按照 condition_signal(); unlock(mutext)顺序,当等待线程被唤醒时,它试图锁住mutex,但是如果此时mutex还未解锁,则线程又进入睡眠,mutex成功解锁后,此线程在再次被唤醒并锁住mutex,从而从condition_wait()中返回。
  2. 按照 unlock(mutex); condition_signal()顺序, 当等待的线程被唤醒时,因为mutex已经解锁,因此被唤醒的线程很容易就锁住了mutex然后从conditon_wait()中返回了。

对于1,等待线程可能会发生2次的上下文切换,严重影响性能。可以使用wait morphing优化:如果线程被唤醒但是不能锁住mutex,则线程被转移(morphing)到互斥量mutex的等待队列中,避免了上下文的切换造成的开销。

对于2,是一种优化,不过可能引起的问题是线程的优先级倒置,实时系统对可预测性要求高,而普通的应用程序不是问题。

pthread简介

POSIX线程(POSIX threads),简称Pthreads,是线程的POSIX标准。该标准定义了创建和操纵线程的一整套API。在类Unix操作系统(Unix、Linux、Mac OS X等)中,都使用Pthreads作为操作系统的线程。

Pthreads API中的函数可以非正式的划分为三大类:

  • 线程管理(Thread management): 第一类函数直接用于线程:创建(creating),分离(detaching),连接(joining)等等。包含了用于设置和查询线程属性(可连接,调度属性等)的函数。
  • 互斥量(Mutexes): 第二类函数是用于线程同步的,称为互斥量(mutexes),是"mutual exclusion"的缩写。 Mutex函数提供了创建,销毁,锁定和解锁互斥量的功能。同时还包括了一些用于设定或修改互斥量属性的函数。
  • 条件变量(Condition variables):第三类函数处理共享一个互斥量的线程间的通信,基于程序员指定的条件。这类函数包括指定的条件变量的创建,销毁,等待和受信(signal)。设置查询条件变量属性的函数也包含其中。

条件变量

条件变量对于 wait 端:

  1. 必须与 mutex 一起使用,该布尔表达式的读写需受此 mutex 保护。
  2. 在 mutex 已上锁的时候才能调用 wait()。
  3. 把判断布尔条件和 wait() 放到 while 循环中。

对于 signal/broadcast 端:

  1. 不一定要在 mutex 已上锁的情况下调用 signal (理论上)。
  2. 在 signal 之前一般要修改布尔表达式。
  3. 修改布尔表达式通常要用 mutex 保护(至少用作 full memory barrier)。
  4. 注意区分 signal 与 broadcast:“broadcast 通常用于表明状态变化,signal 通常用于表示资源可用。(broadcast should generally be used to indicate state change rather than resource availability。)

总之,使用条件变量,调用 signal() 的时候无法知道是否已经有线程等待在 wait() 上。因此一般总是 要先修改“条件”,使其为 true,再调用 signal();这样 wait 线程先检查“条件”,只有当条件不成立时才去 wait(),避免了丢事件的可能 。换言之,通过使用“条件”,将边沿触发(edge trigger)改为电平触发(level trigger)。这里 “修改条件”和“检查条件”都必须在 mutex 保护下进行,而且这个 mutex 必须用于配合 wait()

park

// src/hotspot/os/posix/os_posix.cpp
void Parker::park(bool isAbsolute, jlong time) {
// 原子性地将_counter置为0,如果_counter原值大于0,说明已获得许可,直接返回
if (Atomic::xchg(0, &_counter) > 0) return;

Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;

// 如果线程被中断,直接返回
if (Thread::is_interrupted(thread, false)) {
return;
}

struct timespec absTime;
// 如果isAbsolute为true,time表示等到某个时刻,否则time表示等待多久
// 如果time不合法,或是要等到纪元时间时间(显然已过),则直接返回
if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
return;
}
// 将time统一转为绝对时刻
if (time > 0) {
to_abstime(&absTime, time, isAbsolute);
}

// 进入安全区
ThreadBlockInVM tbivm(jt);

// 再次判断线程是否被中断,如果没有被中断,尝试获得互斥锁,如果获取失败,直接返回
if (Thread::is_interrupted(thread, false) ||
pthread_mutex_trylock(_mutex) != 0) {
return;
}

int status;
// 再次检查_counter,如果>0,不需要等待
if (_counter > 0) { // no wait needed
// 将_counter置为0,并释放锁
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// 插入内存屏障,确保_counter的可见性
OrderAccess::fence();
return;
}

// 设置线程状态为CONDVAR_WAIT
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();

assert(_cur_index == -1, "invariant");
if (time == 0) {
// 如果time为0,表示不设等待超时
// 让线程等待_cond[REL_INDEX]信号,线程进入等待状态
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait(&_cond[_cur_index], _mutex);
assert_status(status == 0, status, "cond_timedwait");
}
else {
// 线程进入有超时的等待
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
assert_status(status == 0 || status == ETIMEDOUT,
status, "cond_timedwait");
}
// 等待结束后,恢复变量,释放锁
_cur_index = -1;

_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// 插入内存屏障
OrderAccess::fence();

if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}

xchg

xchg 在x86平台是直接调用xchg指令:

// src/hotspot/os_cpu/linux_x86/atomic_linux_x86.hpp
template<>
template<typename T>
inline T Atomic::PlatformXchg<4>::operator()(T exchange_value,
T volatile* dest,
atomic_memory_order order) const {
STATIC_ASSERT(4 == sizeof(T));
__asm__ volatile ( "xchgl (%2),%0"
: "=r" (exchange_value)
: "0" (exchange_value), "r" (dest)
: "memory");
return exchange_value;
}

内存屏障

// src/hotspot/os_cpu/linux_x86/orderAccess_linux_x86.hpp
inline void OrderAccess::fence() {
// always use locked addl since mfence is sometimes expensive
#ifdef AMD64
__asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
#else
__asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
#endif
compiler_barrier();
}

参考资料