发布于 

java多线程(一)

更新时间:2023年1月29日,本系列预计3篇文章,本篇文章更新60%
技术类文章欢迎勘误,勘误邮箱:report#ripic.site(#换@)

启动一个新的java线程还是比较简单的,下面代码通过Lamda表达式描述了java线程的启动方法

1
2
3
4
5
new Thread(()->{
for (int i = 0; i < 6; i++) {
System.out.println(i);
}
}).start();

Java的线程创建和启动非常简单,但如果问一个线程是怎么启动起来的往往并不清楚,甚至不知道为什么启动时是调用start(),而不是调用run()方法呢?

下面引用一张很经典的图片,来说明一个新的java线程被创建的过程:

通过上面的图片,我们很清晰的知道:

  • 线程的启动会涉及到本地方法(JNI)的调用,也就是那部分 C++ 编写的代码。
  • JVM 的实现中会有不同操作系统对线程的统一处理,比如:Win、Linux、Unix。
  • 线程的启动会涉及到线程的生命周期状态(RUNNABLE),以及唤醒操作,所以最终会有回调操作(也就是调用我们的 run() 方法)

因此我们应该清楚,我们在程序运行的过程中只涉及到start()方法的调用,在Thread内部由程序调用run()方法启动线程。

下面,深入源码查看一下线程的启动过程

start方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// JDK 源码
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();

group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {}
}
}
private native void start0();
  • 首先,start()方法是一个synchronized方法,为了避免多次调用,会进行threadStatus!=0的判断;
  • start0()在后续的源码中能够发现,是一个本地方法,与JVM虚拟机息息相关;
  • group.add(this)是把当前方法加入线程组。
    从上面的代码可以看出start0()是新建县城的核心,下面我们继续探究

start0()本地方法

我们注意到,在public class Thread implements Runnable{}中,存在一个静态代码块

1
2
3
4
5
private static native void registerNatives();
static {
registerNatives();
EMPTY_STACK_TRACE = new StackTraceElement[0];
}

这个方法中调用了本地方法registerNatives(),我们先来探究这个方法的底层实现。

源码:https://github.com/openjdk/jdk/blob/master/src/java.base/share/native/libjava/Thread.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};

核心代码如上,主要是“注册”了一些核心的功能性方法,start0()调用的是JVM_StartThread,继续分析

JVM_StartThread

源码(2933行附近):https://github.com/openjdk/jdk/blob/master/src/hotspot/share/prims/jvm.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
// ...
JavaThread *native_thread = NULL;

bool throw_illegal_thread_state = false;
{
MutexLocker mu(Threads_lock);
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
NOT_LP64(if (size > SIZE_MAX) size = SIZE_MAX;)
size_t sz = size > 0 ? (size_t) size : 0;
native_thread = new JavaThread(&thread_entry, sz); //创建线程
if (native_thread->osthread() != NULL) {
native_thread->prepare(jthread);
}
}
}
// ...

Thread::start(native_thread); //启动线程

JVM_END

由于源码较多,仅保留了创建线程启动线程的源码,更多源码,请前往Github查看

注意到在创建线程的时候用到了thread_entry,下面贴一下他的源码

1
2
3
4
5
6
7
8
9
10
11
static void thread_entry(JavaThread* thread, TRAPS) {
HandleMark hm(THREAD);
Handle obj(THREAD, thread->threadObj());
JavaValue result(T_VOID);
JavaCalls::call_virtual(&result,
obj,
vmClasses::Thread_klass(),
vmSymbols::run_method_name(),
vmSymbols::void_method_signature(),
THREAD);
}

根据方法名可以猜测,这个方法起到了线程入口的作用,这里面包含了线程会掉函数call_virtual,这个方法会回调JVM(通过这个方法调用JVM中的函数)。
回调中涉及到vmSymbols方法(源码:https://github.com/openjdk/jdk/blob/master/src/hotspot/share/classfile/vmSymbols.hpp#L400)

1
2
#define VM_SYMBOLS_DO(template, do_alias)
template(run_method_name, "run")

这里就是调用java中的run()方法,现在整个调用过程逐渐清晰起来了。下面我们继续循着这个调用过程剖析调用过程。

创建线程

JavaThread

1
native_thread = new JavaThread(&thread_entry, sz);

源码:https://github.com/openjdk/jdk/blob/master/src/hotspot/share/runtime/javaThread.cpp#L592

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) : JavaThread() {
_jni_attach_state = _not_attaching_via_jni;
set_entry_point(entry_point);
// Create the native thread itself.
// %note runtime_23
os::ThreadType thr_type = os::java_thread;
thr_type = entry_point == &CompilerThread::thread_entry ? os::compiler_thread :
os::java_thread;
os::create_thread(this, thr_type, stack_sz);
// The _osthread may be null here because we ran out of memory (too many threads active).
// We need to throw and OutOfMemoryError - however we cannot do this here because the caller
// may hold a lock and all locks must be unlocked before throwing the exception (throwing
// the exception consists of creating the exception object & initializing it, initialization
// will leave the VM via a JavaCall and then all locks must be unlocked).
//
// The thread is still suspended when we reach here. Thread must be explicit started
// by creator! Furthermore, the thread must also explicitly be added to the Threads list
// by calling Threads:add. The reason why this is not done here, is because the thread
// object must be fully initialized (take a look at JVM_Start)
}
  • ThreadFunction entry_point,就是我们上面的 thread_entry 方法。
  • size_t stack_sz,表示进程中已有的线程个数。这两个参数(thr_type,stack_sz),都会传递给 os::create_thread 方法,用于创建线程使用。

os::create_thread

这里以linux为例
源码:https://github.com/openjdk/jdk/blob/master/src/hotspot/os/linux/os_linux.cpp#L816

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
bool os::create_thread(Thread* thread, ThreadType thr_type,
size_t req_stack_size) {
assert(thread->osthread() == NULL, "caller responsible");
// ...
// set the correct thread state
osthread->set_thread_type(thr_type);

// Initial state is ALLOCATED but not INITIALIZED
osthread->set_state(ALLOCATED);

thread->set_osthread(osthread);

// init thread attributes
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

// Calculate stack size if it's not specified by caller.
size_t stack_size = os::Posix::get_initial_stack_size(thr_type, req_stack_size);
// In glibc versions prior to 2.27 the guard size mechanism
// is not implemented properly. The posix standard requires adding
// the size of the guard pages to the stack size, instead Linux
// takes the space out of 'stacksize'. Thus we adapt the requested
// stack_size by the size of the guard pages to mimic proper
// behaviour. However, be careful not to end up with a size
// of zero due to overflow. Don't add the guard page in that case.
size_t guard_size = os::Linux::default_guard_size(thr_type);
// Configure glibc guard page. Must happen before calling
// get_static_tls_area_size(), which uses the guard_size.
pthread_attr_setguardsize(&attr, guard_size);

size_t stack_adjust_size = 0;
if (AdjustStackSizeForTLS) {
// Adjust the stack_size for on-stack TLS - see get_static_tls_area_size().
stack_adjust_size += get_static_tls_area_size(&attr);
} else {
stack_adjust_size += guard_size;
}

stack_adjust_size = align_up(stack_adjust_size, os::vm_page_size());
if (stack_size <= SIZE_MAX - stack_adjust_size) {
stack_size += stack_adjust_size;
}
assert(is_aligned(stack_size, os::vm_page_size()), "stack_size not aligned");

int status = pthread_attr_setstacksize(&attr, stack_size);
if (status != 0) {
// pthread_attr_setstacksize() function can fail
// if the stack size exceeds a system-imposed limit.
assert_status(status == EINVAL, status, "pthread_attr_setstacksize");
log_warning(os, thread)("The %sthread stack size specified is invalid: " SIZE_FORMAT "k",
(thr_type == compiler_thread) ? "compiler " : ((thr_type == java_thread) ? "" : "VM "),
stack_size / K);
thread->set_osthread(NULL);
delete osthread;
return false;
}

ThreadState state;

{
ResourceMark rm;
pthread_t tid;
int ret = 0;
int limit = 3;
do {
ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);
} while (ret == EAGAIN && limit-- > 0);

char buf[64];
if (ret == 0) {
log_info(os, thread)("Thread \"%s\" started (pthread id: " UINTX_FORMAT ", attributes: %s). ",
thread->name(), (uintx) tid, os::Posix::describe_pthread_attr(buf, sizeof(buf), &attr));
} else {
log_warning(os, thread)("Failed to start thread \"%s\" - pthread_create failed (%s) for attributes: %s.",
thread->name(), os::errno_name(ret), os::Posix::describe_pthread_attr(buf, sizeof(buf), &attr));
// Log some OS information which might explain why creating the thread failed.
log_info(os, thread)("Number of threads approx. running in the VM: %d", Threads::number_of_threads());
LogStream st(Log(os, thread)::info());
os::Posix::print_rlimit_info(&st);
os::print_memory_info(&st);
os::Linux::print_proc_sys_info(&st);
os::Linux::print_container_info(&st);
}

pthread_attr_destroy(&attr);

if (ret != 0) {
// Need to clean up stuff we've allocated so far
thread->set_osthread(NULL);
delete osthread;
return false;
}

// Store pthread info into the OSThread
osthread->set_pthread_id(tid);

// Wait until child thread is either initialized or aborted
{
Monitor* sync_with_child = osthread->startThread_lock();
MutexLocker ml(sync_with_child, Mutex::_no_safepoint_check_flag);
while ((state = osthread->get_state()) == ALLOCATED) {
sync_with_child->wait_without_safepoint_check();
}
}
}

// The thread is returned suspended (in state INITIALIZED),
// and is started higher up in the call chain
assert(state == INITIALIZED, "race condition");
return true;
}
  • osthread->set_state(ALLOCATED),初始化已分配的状态,但此时并没有初始化。
  • pthread_create unix系统线程创建函数
  • thread_native_entry(旧:java_start)实际线程创建方法(Thread start routine for all newly created threads)

thread_native_entry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static void *thread_native_entry(Thread *thread) {
// ...
int pid = os::current_process_id();
int random = ((pid ^ counter++) & 7) * 128;
void *stackmem = alloca(random != 0 ? random : 1); // ensure we allocate > 0
// Ensure the alloca result is used in a way that prevents the compiler from eliding it.
*(char *)stackmem = 1;
#endif

thread->initialize_thread_current();

OSThread* osthread = thread->osthread();
Monitor* sync = osthread->startThread_lock();

osthread->set_thread_id(os::current_thread_id());

if (UseNUMA) {
int lgrp_id = os::numa_get_group_id();
if (lgrp_id != -1) {
thread->set_lgrp_id(lgrp_id);
}
}
// initialize signal mask for this thread
PosixSignals::hotspot_sigmask(thread);

// initialize floating point control register
os::Linux::init_thread_fpu_state();

// handshaking with parent thread
{
MutexLocker ml(sync, Mutex::_no_safepoint_check_flag);

// notify parent thread
osthread->set_state(INITIALIZED);
sync->notify_all();

// wait until os::start_thread()
while (osthread->get_state() == INITIALIZED) {
sync->wait_without_safepoint_check();
}
}

log_info(os, thread)("Thread is alive (tid: " UINTX_FORMAT ", pthread id: " UINTX_FORMAT ").",
os::current_thread_id(), (uintx) pthread_self());

assert(osthread->pthread_id() != 0, "pthread_id was not set as expected");

// call one more level start routine
thread->call_run();

// Note: at this point the thread object may already have deleted itself.
// Prevent dereferencing it from here on out.
thread = NULL;

log_info(os, thread)("Thread finished (tid: " UINTX_FORMAT ", pthread id: " UINTX_FORMAT ").",
os::current_thread_id(), (uintx) pthread_self());

return 0;
}
  • osthread->set_state(INITIALIZED);:JVM 设置线程状态,INITIALIZED 初始化完成。
  • sync->notify_all():唤醒线程
  • osthread->get_state() == INITIALIZED:循环等待条件
  • thread->call_run();:是等待线程唤醒后,也就是状态变更后,才能执行到。这在我们的线程执行UML图中,也有所体现

启动线程

waiting

等地更新