語言中的數據安全傳遞機制
在寫代碼時,我們經常需要把一些數據從一個地方傳遞到另一個地方,或者在某個地方定義一個數據,在另一個地方使用,我們通常使用的方式是顯式傳遞或全局變量。但是在某些場景中,顯式傳遞會變得非常麻煩,比如處理請求時打印日志時帶上 logid,而全局變量在多線程環境中會存在問題。為了解決這些問題,提供更好的開發方式,開發者探索了一些其他的數據安全傳遞的機制,本文介紹這方面的內容。
C/C++ 線程局部存儲(TLS)
全局變量是最簡單的數據傳遞方式,因為它對所有代碼可見,在單線程中,每次操作對后面操作來說都是可見,在多線程中,需要額外的鎖機制。而線程局部存儲讓全局變量看起來是對所有線程可見,但實際上是對單個線程可見,它對單個線程來說保持了全局的效果,方便數據傳遞,對多線程來保持了數據的安全,不會導致線程間的數據互相修改,因為這個數據只存在某個線程的自己分配的內存中。下面是一個 C++ 中使用線程局部存儲的例子。
#include <iostream>
#include <thread>
thread_localint number = 0;
void worker() {
std::cout << "number: " << ++number << std::endl;
}
int main() {
std::thread t1(worker);
t1.join();
std::thread t2(worker);
t2.join();
return0;
}上面的代碼輸出如下:
number: 1
number: 1上面代碼的執行過程是:
- 創建線程 1,執行 ++number,這時 number 的值是 1,然后線程 1 退出。
- 創建線程 2,執行 ++number,這時 number 的值仍然是 1,然后線程 2 退出。 可以看到多個線程對同一個變量進行修改,但是是互相獨立的,因為實際上這個變量的內存是存在單獨的線程中的。
接下來再看一下 C 語言中線程局部存儲的例子。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
pthread_key_t key;
void print() {
int *number = (int *)pthread_getspecific(key);
printf("print: %p, %d \n", pthread_self(), *number);
}
void * worker(void *arg) {
int number = rand();
printf("worker: %p, %d\n", pthread_self(), number);
if (pthread_setspecific(key, &number) != 0) {
perror("pthread_setspecific failed");
returnNULL;
}
print();
returnNULL;
}
int main() {
pthread_t threads[2];
if (pthread_key_create(&key, NULL) != 0) {
perror("pthread_key_create failed");
return1;
}
for (int i = 0; i < 2; i++) {
if (pthread_create(&threads[i], NULL, worker, NULL) != 0) {
perror("pthread_create failed");
return1;
}
}
for (int i = 0; i < 2; i++) {
pthread_join(threads[i], NULL);
}
pthread_key_delete(key);
return0;
}上面代碼輸出如下:
worker: 0x16ceef000, 16807
print: 0x16ceef000, 16807
worker: 0x16cf7b000, 282475249
print: 0x16cf7b000, 282475249C 語言中通過 pthread_key_create、pthread_setspecific 和 pthread_getspecific 三個 API 實現。
- pthread_key_create 創建一個 key,這個 key 在所有的線程中共享使用。
- pthread_setspecific 通過 key 設置線程中單獨的數據,這對其他線程是不可見的,因為它實際上是執行 pthread_setspecific 時把數據存在調用線程的內存空間中。
- pthread_getspecific 則是根據 key 從線程內存空間中獲取 pthread_setspecific 設置的數據。
- 如果有多個數據,則創建多個 key。
接著分析線程庫是如何實現線程局部存儲能力的,不同架構的實現不一樣這里的 x86_64。
下面是表示線程的數據結構。
typedef struct {
void *self; /* 指向 pthread */
} tcbhead_t;
// 表示一個線程的數據結構
struct pthread
{
union
{
tcbhead_t header;
}
pid_t tid;
int flags;
// 線程 TLS 相關的數據結構
struct pthread_key_data
{
uintptr_t seq;
void *data;
} specific_1stblock[PTHREAD_KEY_2NDLEVEL_SIZE];
// 和 specific_1stblock 類似
struct pthread_key_data *specific[PTHREAD_KEY_1STLEVEL_SIZE];
}下面是創建線程的代碼。
int __pthread_create_2_1 (pthread_t *newthread, constpthread_attr_t *attr,
void *(*start_routine) (void *), void *arg)
{
void *stackaddr = NULL;
size_t stacksize = 0;
struct pthread *pd = NULL;
// 分配一個 pthread 結構體
int err = allocate_stack (iattr, &pd, &stackaddr, &stacksize);
// header 是一個 tcbhead_t 結構體
#if TLS_TCB_AT_TP
/* Reference to the TCB itself. */
pd->header.self = pd;
/* Self-reference for TLS. */
pd->header.tcb = pd;
#endif
// 設置返回值
*newthread = (pthread_t) pd;
retval = create_thread (pd, iattr, &stopped_start, stackaddr,
stacksize, &thread_ran);
return retval;
}下面是 create_thread 的代碼。
static int create_thread (struct pthread *pd, const struct pthread_attr *attr,
bool *stopped_start, void *stackaddr,
size_t stacksize, bool *thread_ran)
{
// CLONE_SETTLS 設置 tls 相關
constint clone_flags = (CLONE_SETTLS | ...);
// x86_64 下實現如下
// # define TLS_DEFINE_INIT_TP(tp, pd) void *tp = (pd)
TLS_DEFINE_INIT_TP (tp, pd);
//
struct clone_args args =
{
.flags = clone_flags,
// ...
// 設置 FS 寄存器指向線程結構體
.tls = (uintptr_t) tp,
};
// 創建線程
int ret = __clone_internal (&args, &start_thread, pd);
return0;
}創建線程的過程中,線程庫調用 clone 系統調用時設置了 FS 寄存器指向 pthread 結構體,這是 tls 實現的基礎。
接著看使用 tls 的代碼。下面是 pthread_key_create 時創建的 key 相關的數據結構。
struct pthread_key_struct
{
// 偶數代表空虛的項
uintptr_t seq;
// 析構函數
void (*destr) (void *);
};
// 全局數據結構,用于 pthread_key_create
struct pthread_key_struct __pthread_keys[PTHREAD_KEYS_MAX];下面是 pthread_key_create 的代碼。
int ___pthread_key_create (pthread_key_t *key, void (*destr) (void *))
{
// 遍歷找到可用的 slot
for (size_t cnt = 0; cnt < PTHREAD_KEYS_MAX; ++cnt)
{
uintptr_t seq = __pthread_keys[cnt].seq;
// 空閑且可用, 設置該 slot 的 seq 為 偶數 + 1,即奇數,表示已占用,成功則返回 0,否則繼續找下一個
if (KEY_UNUSED (seq) && KEY_USABLE (seq) && ! atomic_compare_and_exchange_bool_acq (&__pthread_keys[cnt].seq, seq + 1, seq))
{
// 設置”析構函數“
__pthread_keys[cnt].destr = destr;
// 返回 key 給調用者
*key = cnt;
return0;
}
}
return EAGAIN;
}pthread_key_create 的實現很簡單,就是從一個全局數據結構數組中分配一個可用項,然后返回對應的索引,所有的線程通過這個索引在線程本身的數據結構 pthread 中存儲數據,從而達到數據只對某個線程可見的效果,即局部存儲。
接著看 pthread_setspecific 是如何設置數據的。
int ___pthread_setspecific (pthread_key_t key, constvoid *value)
{
struct pthread *self;
unsignedint idx1st;
unsignedint idx2nd;
struct pthread_key_data *level2;
unsignedint seq;
/*
// 從 FS 寄存器獲取當前線程對應的 pthread 結構體
# define THREAD_SELF \
({ struct pthread *__self; \
asm ("mov %%fs:%c1,%0" : "=r" (__self) \
: "i" (offsetof (struct pthread, header.self))); \
__self;})
# endif
*/
// 獲取當前線程對應的數據結構
self = THREAD_SELF;
/* Special case access to the first 2nd-level block. This is the
usual case. */
if (__glibc_likely (key < PTHREAD_KEY_2NDLEVEL_SIZE))
{
// 從當前線程數據結構的某個字段中獲取對應的項
level2 = &self->specific_1stblock[key];
}
else
{
// ...
}
// 記錄線程內可見的數據
level2->seq = seq;
level2->data = (void *) value;
return0;
}pthread_setspecific 第一部通過 FS 寄存器獲取了當前調用線程的數據結構,然后在該數據結構中保存對應的數據即可。pthread_getspecific 的實現也和 pthread_setspecific 相似,代碼如下。
void * ___pthread_getspecific (pthread_key_t key)
{
struct pthread_key_data *data;
// 從當前線程結構體獲取對應 key 的數據
if (__glibc_likely (key < PTHREAD_KEY_2NDLEVEL_SIZE))
data = &THREAD_SELF->specific_1stblock[key];
else
{
// ...
}
// 返回對應的數據
return data->data;
}Go 協程局部存儲
Go 本身沒有提供協程局部存儲,Go 官方建議通過 context 傳遞需要的數據,這樣就形成了每個函數都需要顯式傳遞一個 context 變量的習慣。民間有一些自己實現了協程存儲的庫:
- https://github.com/modern-go/gls。
- https://chai2010.cn/advanced-go-programming-book/ch3-asm/ch3-08-goroutine-id.html)。
大致的實現原理如下。
package main
import (
"fmt"
"runtime/debug"
"strconv"
"strings"
"sync"
)
var gls struct {
m map[int64]map[any]any
sync.Mutex
}
func init() {
gls.m = make(map[int64]map[any]any)
}
// 獲取當前的協程 ID
func currentGID() int64 {
stack := strings.TrimPrefix(string(debug.Stack()), "goroutine")
idField := strings.Fields(stack)[0]
id, err := strconv.ParseInt(idField, 10, 64)
if err != nil {
panic(fmt.Errorf("can not get goroutine id: %v", err))
}
return id
}
func tls() map[any]any {
gls.Lock()
defer gls.Unlock()
goid := currentGID()
if m, _ := gls.m[goid]; m != nil {
return m
}
gls.m[goid] = make(map[any]any)
return gls.m[goid]
}
func Get(key any) any {
return tls()[key]
}
func Set(key any, v any) {
tls()[key] = v
}
func worker(idx int) {
// 獲取協程局部存儲
fmt.Printf("idx=%d, gid=%d, number=%d\n", idx, currentGID(), Get("number"))
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
gofunc(idx int) {
defer wg.Done()
defer worker(idx)
// 設置協程局部存儲
Set("number", idx)
}(i)
}
wg.Wait()
}協程局部存儲的實現原理是在一個全局數據結構中,以協程 ID 為 key,存儲的數據為值,這樣在協程內的任何地方都可以獲取到其他地方設置的值的。
Node.js 代碼間數據傳遞
Node.js 的數據傳遞場景,除了同步情況下函數間傳遞,還有異步時,在回調里使用其他地方設置的數據,比如處理一個請求的過程中,每個同步/異步函數打印日志時都帶上 logid。
async_hooks 是 Node.js 中用于實現代碼間數據傳遞的機制,尤其用于追蹤異步操作,它的實現有點復雜。下面簡單介紹其內容。async_hooks 有三個核心概念:
- async_id:對象對應的唯一 id。
- trigger_async_id:創建對象時,對應的父對象的 async_id,比如在 A 對象的回調中創建 B 對象,則 B 對象的 trigger_async_id 是 A 對象的 async_id。
- execution_async_resource:當作正在執行的對象,比如執行 setTimeout 回調時,當作正在執行的對象是 setTimeout 創建的 Timeout 對象。
啟動時 async_id 是 1,trigger_async_id 是 0,execution_async_resource 是空對象。下面以 setTimeout 為例看一下 async_hooks 的流程。
function setTimeout(callback, after, arg1, arg2, arg3) {
const timeout = new Timeout(callback, after, args, false, true);
return timeout;
}
function Timeout(callback, after, args, isRepeat, isRefed) {
initAsyncResource(this, 'Timeout');
}
function initAsyncResource(resource, type) {
// 獲取新的 async_id
const asyncId = resource[async_id_symbol] = newAsyncId();
// 在哪個對象回調中創建了該 Timeout 對象,獲取其 async_id,默認 1
const triggerAsyncId = resource[trigger_async_id_symbol] = getDefaultTriggerAsyncId();
// 是否設置了init鉤子,是則觸發回調
if (initHooksExist())
emitInit(asyncId, type, triggerAsyncId, resource);
}執行 setTimeout 時,Node.js 會創建一個 Timeout 對象,并設置 async_hooks 的相關 id 到 Timeout 對象中。然后觸發init鉤子。
function emitInit(asyncId, type, triggerAsyncId, resource) {
emitInitNative(asyncId, type, triggerAsyncId, resource);
}當定時器到期時,會設置當前的執行資源為 Timeout 對象,async_id 為 Timeout async_id,trigger_async_id 為 Timeout 的 trigger_async_id,然后執行 before 鉤子。
function emitBeforeScript(asyncId, triggerAsyncId, resource) {
pushAsyncContext(asyncId, triggerAsyncId, resource);
if (hasHooks(kBefore))
emitBeforeNative(asyncId);
}執行 before 鉤子時我們就可以通過相關 API 獲取當前執行的 async_id 或資源,即 Timeout 對象的信息。當執行完 before 鉤子后,接著執行定時器的回調。然后執行 after 鉤子。這就是 Node.js 中,異步資源的整個生命周期和 async_hooks 鉤子的關系。但是 Promise 的情況稍微不同,因為 Promise 是 V8 實現的,但是原理相似,Node.js 通過設置 V8 的 Promise 鉤子感知 Promise 的生命周期,比如創建,before、after、settled 等,從而執行 async_hooks 定義的鉤子即可。Node.js 大部份的異步場景都是類似的方式實現的。
async_hooks 可以讓我們知道資源間的關系已經生命周期,在這個基礎上 AsyncLocalStorage 基于 async_hooks 實現了資源間數據的傳遞。接下來看看 AsyncLocalStorage 的實現。
const storageList = [];
const storageHook = createHook({
init(asyncId, type, triggerAsyncId, resource) {
// resource 為新創建的對象
// currentResource 為創建該對象的對象
const currentResource = executionAsyncResource();
for (let i = 0; i < storageList.length; ++i) {
storageList[i]._propagate(resource, currentResource, type);
}
},
});
class AsyncLocalStorage {
constructor(options = {}) {
this.kResourceStore = Symbol('kResourceStore');
this._enable();
}
_enable() {
// 開啟,感知每個創建的資源
if (!this.enabled) {
this.enabled = true;
ArrayPrototypePush(storageList, this);
storageHook.enable();
}
}
// Propagate the context from a parent resource to a child one
_propagate(resource, triggerResource, type) {
// 新創建對象指向創建該新對象的對象的數據,比如 A 創建 B,則 B 指向 A 的數據。
const store = triggerResource[this.kResourceStore];
if (this.enabled) {
resource[this.kResourceStore] = store;
}
}
enterWith(store) {
this._enable();
// 設置當前執行對象指向新的數據
const resource = executionAsyncResource();
resource[this.kResourceStore] = store;
}
run(store, callback, ...args) {
this._enable();
// 當前對象
const resource = executionAsyncResource();
const oldStore = resource[this.kResourceStore];
// 設置當前對象指向新的數據
resource[this.kResourceStore] = store;
// 執行函數,執行完后恢復指向的數據
try {
return ReflectApply(callback, null, args);
} finally {
resource[this.kResourceStore] = oldStore;
}
}
getStore() {
if (this.enabled) {
// 獲取當前對象指向的數據
const resource = executionAsyncResource();
return resource[this.kResourceStore];
}
returnthis.#defaultValue;
}
}AsyncLocalStorage 通過 enterWith 或 run 函數修改當前執行對象指向的數據,然后通過 async_hooks 的 init 鉤子函數感知在當前執行對象執行過程中創建的新對象,并把數據指針復制到新創建的對象中,從而實現了數據的傳遞。比如在對象 A 執行過程中創建了 B,然后 B 執行過程中又創建了 C,則 C 也指向了 A 的數據,從而實現在后續的執行對象的回調時都能直接獲取到前面代碼設置的數據。下面看一些例子。
- 同步代碼。
const { executionAsyncResource, AsyncLocalStorage } = require('async_hooks');
const als = new AsyncLocalStorage();
function run() {
// 默認執行對象
console.log(als.getStore());
}
// 設置默認執行對象的數據
als.enterWith({number: 1});
run();- 基于回調的異步代碼。
const { AsyncLocalStorage } = require('async_hooks');
const als = new AsyncLocalStorage();
function run() {
// 默認執行對象
console.log(als.getStore());
// 創建 Timeout 對象,繼承了默認對象的數據({number: 1})
setTimeout(function() {
// 執行對象為 Timeout,拿到數據 {number: 1}
console.log(als.getStore());
}, 100);
}
// 設置默認執行對象的數據
als.enterWith({number: 1});
run();- 基于 Promise 的異步代碼。
const { AsyncLocalStorage } = require('async_hooks');
const als = new AsyncLocalStorage();
function run() {
// 創建 Promise 對象,繼承了默認對象的數據({number: 1})
returnnewPromise((resolve) => {
// 創建 Timeout 對象,繼承了默認對象的數據({number: 1})
setTimeout(resolve, 1000);
})
}
// 設置默認執行對象的數據
als.enterWith({number: 1});
run().then(function() {
// 執行對象為 Promise,獲取 Promise 從默認對象繼承來的數據 ({number: 1})
console.log(als.getStore());
});
(asyncfunction() {
await run();
// 執行對象為 Promise,獲取 Promise 從默認對象繼承來的數據 ({number: 1})
console.log(als.getStore());
})();在比較新的 Node.js 版本中,已經使用了 V8 的 SetContinuationPreservedEmbedderData 和 GetContinuationPreservedEmbedderData API 實現數據的傳遞,該 API 使用例子如下。
void SetContext(const FunctionCallbackInfo<Value> &info) {
Isolate *isolate = info.GetIsolate();
Local<Value> value = info[0].As<Object>();
isolate->GetCurrentContext()->SetContinuationPreservedEmbedderData(value);
}
void GetContext(const FunctionCallbackInfo<Value> &info) {
Isolate *isolate = info.GetIsolate();
info.GetReturnValue().Set(
isolate->GetCurrentContext()->GetContinuationPreservedEmbedderData());
}JS 層使用如下。
const { getContext, setContext }= require('./build/Release/addon');
// 設置數據
setContext({"value": 1});
// 執行時會同步記錄上面設置的數據并傳遞下去
Promise.resolve().then(() => {
// {"value": 1} 可以一直傳下去
console.log(getContext())
Promise.resolve().then(() => {
// {"value": 1} 可以一直傳下去
console.log(getContext())
})
});
// 修改數據不會影響上面數據的傳遞
setContext({"value": 2});不過 Node.j 中的邏輯比這個復雜,后續再單獨介紹。
總結
在不同的語言中,都有數據安全傳遞的場景,一是為了減少顯式傳遞的麻煩,比如 logId 在處理請求代碼中的傳遞,二是為了保持數據的安全,比如多個線程/請求間數據不能互相影響。但是不同語言的解決方式不太一樣,C/C++ 中的場景是線程級的數據傳遞和隔離,這個是通過底層 C 庫和操作系統提供的能力實現的。Go 中則是協程級的,需要通過獲取協程 ID,然后以協程 ID 為 key 進行數據的存取。Node.js 本身是單線程的,需要實現的是數據在同步代碼和異步操作間的安全傳遞,這個需要感知操作對象的生命周期,并在操作對象間把數據傳遞下去,從而保證在每個操作對象的上下文都能獲取到前面設置的數據。



























