如何監(jiān)控 Node.js 線程的 CPU 負(fù)載?
雖然 Node.js 本身是單線程應(yīng)用,但是也支持創(chuàng)建額外的線程。在一個(gè)單進(jìn)程多線程的應(yīng)用中,觀測線程的 CPU 負(fù)載是非常有意義且必要的,因?yàn)橥ㄟ^進(jìn)程 CPU 負(fù)載我們看到的只是進(jìn)程內(nèi)所有線程的 CPU 負(fù)載之和,但是無法知道每個(gè)線程的負(fù)載,這樣在 CPU 負(fù)載高時(shí),我們就無法知道是哪個(gè)線程導(dǎo)致的。為了更好地了解各個(gè)線程的 CPU 負(fù)載,需要提供線程級別的 CPU 負(fù)載數(shù)據(jù)。目前,Libuv 已經(jīng)支持該能力,在比較新的 Node.js 版本中也引入了該能力,本文介紹線程 CPU 負(fù)載獲取的相關(guān)內(nèi)容。
在做 Node.js APM 時(shí),我們已經(jīng)通過 Addon + getrusage 獲取了線程 CPU 負(fù)載,其原理很簡單,getrusage 本身是支持獲取調(diào)用線程的 CPU 負(fù)載的,只不過之前因?yàn)槠脚_兼容性問題,Libuv 沒有支持該能力,現(xiàn)在 Libuv 兼容了更多平臺后,也是使用了類似的方式實(shí)現(xiàn)的。但是 Addon 一來比較麻煩,二來需要把代碼注入到目的線程,因?yàn)樵谀康木€程調(diào)用上面的函數(shù)才能獲取該線程的 CPU 負(fù)載,相對來說有一定的成本。
現(xiàn)在 Node.js 原生支持該能力后,首先解決了 Addon 的問題,我們只需要在目的線程調(diào)用 process.threadCpuUsage() 就能獲得當(dāng)前線程的 CPU 負(fù)載,但是問題二還是沒解決,還是需要進(jìn)行代碼注入,為了解決這個(gè)問題,我最近提交了一個(gè) PR,支持在主線程中獲取子線程的 CPU 負(fù)載,大致的用法如下。
const worker = new Worker(...);
await worker.cpuUsage();這樣我們就可以通過 process 的 worker 事件獲取每個(gè) worker(或者通過 diagnostics_channel),從而獲取 worker 的 CPU 負(fù)載,不需要在每個(gè)線程里注入代碼。實(shí)現(xiàn)如下。
const { Worker } = require('worker_threads');
process.on('worker', (worker) => {
setInterval(async () => {
const data = await worker.cpuUsage();
console.log(data);
}, 1000);
});
new Worker("setInterval(() => {}, 10000)", { eval:true });上面代碼就可以統(tǒng)一獲取所有線程的 CPU 負(fù)載,實(shí)現(xiàn)簡單并且邏輯解耦。
最后介紹下實(shí)現(xiàn)細(xì)節(jié)。
cpuUsage() {
const taker = this[kHandle]?.cpuUsage();
return new Promise((resolve, reject) => {
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
taker.ondone = (err, current) => {
if (err !== null) {
return reject(err);
}
resolve({
user: current.user,
system: current.system,
});
};
});
}因?yàn)椴僮魇窃谀康木€程完成的,所以實(shí)現(xiàn)上采用的是異步方式,同步會阻塞調(diào)用 cpuUsage 的線程,完全沒有必要。cpuUsage 依賴 C++ 層的實(shí)現(xiàn)。
void Worker::CpuUsage(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
Environment* env = w->env();
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
Local<Object> wrap;
if (!env->worker_cpu_usage_taker_template()
->NewInstance(env->context())
.ToLocal(&wrap)) {
return;
}
BaseObjectPtr<WorkerCpuUsageTaker> taker =
MakeDetachedBaseObject<WorkerCpuUsageTaker>(env, wrap);
// 給子線程提交一個(gè)任務(wù)
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
env](Environment* worker_env) mutable {
auto cpu_usage_stats = std::make_unique<uv_rusage_t>();
// 在子線程執(zhí)行 uv_getrusage_thread 獲取其 CPU 負(fù)載
int err = uv_getrusage_thread(cpu_usage_stats.get());
// 獲取完畢,給調(diào)用線程提交一個(gè)任務(wù)
env->SetImmediateThreadsafe(
[taker = std::move(taker),
cpu_usage_stats = std::move(cpu_usage_stats),
err = err](Environment* env) mutable {
Local<Value> argv[] = {
Null(isolate),
Undefined(isolate),
};
if (err) {
argv[0] = UVException(
isolate, err, "uv_getrusage_thread", nullptr, nullptr, nullptr);
} else {
Local<Name> names[] = {
FIXED_ONE_BYTE_STRING(isolate, "user"),
FIXED_ONE_BYTE_STRING(isolate, "system"),
};
Local<Value> values[] = {
Number::New(isolate,
1e6 * cpu_usage_stats->ru_utime.tv_sec +
cpu_usage_stats->ru_utime.tv_usec),
Number::New(isolate,
1e6 * cpu_usage_stats->ru_stime.tv_sec +
cpu_usage_stats->ru_stime.tv_usec),
};
argv[1] = Object::New(
isolate, Null(isolate), names, values, arraysize(names));
}
// 調(diào)用者線程執(zhí)行 JS 回調(diào),即 JS 的 ondone
taker->MakeCallback(env->ondone_string(), arraysize(argv), argv);
},
CallbackFlags::kUnrefed);
});
if (scheduled) {
args.GetReturnValue().Set(wrap);
}
}C++ 的實(shí)現(xiàn)有一點(diǎn)復(fù)雜,主要是因?yàn)樯婕暗蕉嗑€程之前的操作,有興趣的同學(xué)可以參考 https://github.com/nodejs/node/pull/59177。



























