Linux 多線程的信號處理機制
信號機制是操作系統中非常重要的部分,它可以用于進程/線程間通信、控制進程線程的行為和處理高優的任務,GDB、CPU Profile 采集、Go 搶占式調度都依賴信號機制。信號的大致原理不是很復雜,在單進程單線程環境中,信號機制類似一個訂閱發布模式,用戶注冊信號處理函數,收到信號時,操作系統執行對應的函數,但是在多線程環境下,里面的邏輯就變得復雜了,比如說我們是否可以單獨給線程發送信號,應該在哪個線程中注冊信號處理函數,給進程發送信號時哪個線程會處理等等。本文從 Linux 源碼角度分析信號的實現原理。
進程的信號原理
首先從早期的內核代碼(1.2.13)看一下信號的實現,因為早期的代碼易于我們理解原理。我們知道 Linux 不區分進程線程,統一使用 task_struct 來表示,task_struct 中有幾個字段和信號機制相關。
unsigned long signal; // 當前收到的信號,每一 bit 對應一個信號,比如 0b10 對應信號 2
unsigned long blocked; // 屏蔽這些信號,即不處理這些信號
/*
struct sigaction {
__sighandler_t sa_handler; // 處理函數
sigset_t sa_mask;
unsigned long sa_flags;
void (*sa_restorer)(void);
};
*/
struct sigaction sigaction[32]; // 信號對應的處理函數,和信號的值一一對應,比如信號 1 對應數組 第一個元素了解了和信號相關的數據結構后,接著從注冊信號、發送信號、處理信號幾個方面分析信號的實現。
注冊信號
asmlinkage int sys_sigaction(int signum, const struct sigaction * action,
struct sigaction * oldaction)
{
struct sigaction new_sa, *p;
// current 表示當前調用進程,p 指向 signum 對應的處理函數
p = signum - 1 + current->sigaction;
if (action) {
// 復制內存
memcpy_fromfs(&new_sa, action, sizeof(struct sigaction));
}
// 返回舊的處理函數
if (oldaction) {
int err = verify_area(VERIFY_WRITE, oldaction, sizeof(*oldaction));
if (err)
return err;
memcpy_tofs(oldaction, p, sizeof(struct sigaction));
}
// 設置新的處理函數
if (action) {
*p = new_sa;
check_pending(signum);
}
return0;
}注冊信號就是在 task_struct 的 sigaction 中記錄信號對應的處理函數。
發送信號
我們應該都試過用 kill 命令給某個進程發送信息,我們也可以通過操作系統底層提供的 kill 系統調用給進程發送信息。
asmlinkage int sys_kill(int pid,int sig)
{
int err, retval = 0, count = 0;
// 如果沒有傳pid,則給該進程所在組所有進程發該信號
if (!pid)
return(kill_pg(current->pgrp,sig,0));
// 如果pid等于-1,則給除了自己和0進程外的所有進程發該信號
if (pid == -1) {
struct task_struct * p;
for_each_task(p) {
if (p->pid > 1 && p != current) {
++count;
if ((err = send_sig(sig,p,0)) != -EPERM)
retval = err;
}
}
return(count ? retval : -ESRCH);
}
// 如果pid等于除-1外的負數,則取絕對值后,給該進程組發該信號
if (pid < 0)
return(kill_pg(-pid,sig,0));
/* Normal kill */
// 否則給某個進程發該信號
return(kill_proc(pid,sig,0));
}sys_kill 支持多種場景,這里我們只關注給指定進程發送的部分。
int kill_proc(int pid, int sig, int priv)
{
struct task_struct *p;
// 遍歷進程列表,找到對應的進程,然后發送信號
for_each_task(p) {
if (p && p->pid == pid)
return send_sig(sig,p,priv);
}
return(-ESRCH);
}
int send_sig(unsigned long sig,struct task_struct * p,int priv)
{
// 設置對應的位為 1
p->signal |= 1 << (sig-1);
return0;
}發送信號的實現很簡單,就是在 task_struct 的 signal 字段置對應的為 1,表示收到了該信息。從這里可以看到重復發生信號可能會發生覆蓋,最終只會處理一次,不過現在的內核版本已經支持記錄每一個收到的信號了。
處理信號
剛才看到,發送信號時只是打了個標記,并沒有處理該信號,也就是執行用戶注冊的函數。那么什么時候才會處理呢?處理的時機有幾個,比如中斷處理后、系統調用結束后。下面是系統調用后處理信號的邏輯。
_system_call:
pushl %eax # save orig_eax
SAVE_ALL
// 執行系統調用
movl _sys_call_table(,%eax,4),%eax
call *%eax
movl %eax,EAX(%esp) # save the return value
movl errno(%ebx),%edx
negl %edx
je ret_from_sys_call
ret_from_sys_call:
// 當天進程結構體賦值到 eax
movl _current,%eax
// 判斷是否有信號并且沒有被屏蔽
movl blocked(%eax),%ecx
movl %ecx,%ebx
notl %ecx
andl signal(%eax),%ecx
// 非 0 說明有信號需要處理
jne signal_return
signal_return:
movl %esp,%ecx
pushl %ecx
pushl %ebx
// 處理信號
call _do_signal
popl %ebx
popl %ebx接著看一些 do_signal 的實現。
asmlinkage int do_signal(unsigned long oldmask, struct pt_regs * regs)
{
// mask 等于 blocked 取反,表示沒有被屏蔽的信號
unsignedlong mask = ~current->blocked;
unsignedlong handler_signal = 0;
unsignedlong *frame = NULL;
unsignedlong eip = 0;
unsignedlong signr;
struct sigaction * sa;
// 收集需要處理的信號
while ((signr = current->signal & mask)) {
// 獲取
__asm__("bsf %3,%1\n\t"
"btrl %1,%0"
:"=m" (current->signal),"=r" (signr)
:"0" (current->signal), "1" (signr));
signr++;
// 哪些信息需要處理
handler_signal |= 1 << (signr-1);
// 執行當前信號時需要屏蔽的信息,取反再與得到最終需要處理的信息
mask &= ~sa->sa_mask;
}
// 當前的指令地址
eip = regs->eip;
frame = (unsignedlong *) regs->esp;
signr = 1;
sa = current->sigaction;
// 逐個信號處理
for (mask = 1 ; mask ; sa++,signr++,mask += mask) {
// 構造棧內存布局
setup_frame(sa,&frame,eip,regs,signr,oldmask);
// do_signal 執行完畢后執行的執行
eip = (unsignedlong) sa->sa_handler;
current->blocked |= sa->sa_mask;
oldmask |= sa->sa_mask;
}
// 設置信息的棧地址和指令
regs->esp = (unsignedlong) frame;
regs->eip = eip; /* "return" to the first handler */
return1;
}
void setup_frame(struct sigaction * sa,
unsigned long ** fp, // 當前的棧地址
unsigned long eip,
struct pt_regs * regs,
int signr,
unsigned long oldmask)
{
unsignedlong * frame;
frame = *fp;
// ...
put_fs_long(signr, frame+1);
put_fs_long(eip, frame+16);// 執行完處理函數后的回跳地址
put_fs_long(regs->cs, frame+17);
put_fs_long(regs->eflags, frame+18);
put_fs_long(regs->esp, frame+19);
put_fs_long(regs->ss, frame+20);
put_fs_long(0x0000b858, CODE(0)); /* popl %eax ; movl $,%eax */
put_fs_long(0x80cd0000, CODE(4)); /* int $0x80 */
// 恢復現場,回到正常流程。
put_fs_long(__NR_sigreturn, CODE(2));
}信號處理的過程涉及到的東西比較復雜,如果大家對函數調用時棧的布局了解的話應該會比較好理解,大概就是在當前的棧上進行內存布局,讓所有的處理函數連成一條執行鏈,然后 do_signal 執行完后會從第一個執行函數開始執行,一直執行到最后一個,最終恢復現場回到正常執行流程。Go 的搶占式調度同樣是用了類似的原理,就是在信號處理函數里修改棧內存注入自定義的函數,信號處理完成后,執行自定義的函數實現搶占。
多線程的信號原理
通過剛才的介紹,大概了解了信號處理的過程和原理。但是多線程中的情況有一點不一樣,在 Linux 中,可以給進程或線程發送信號,線程有自己的接收信號和信號屏蔽字,但是進程內的所有線程共享信號處理函數,另外線程還會共享進程收到的信號。
圖片
下面通過內核源碼看一下實現(2.6.11.1),該版本代碼中和信號相關的字段如下。
/*
struct signal_struct {
// 進程級的信號,多個線程共享
struct sigpending shared_pending;
}
*/
struct signal_struct *signal; // 進程級信號
struct sighand_struct *sighand; // 進程級信號處理函數
sigset_t blocked, real_blocked; // 線程級信號屏蔽字
/*
// 同一個信號可能收到多次,在 list 中排隊,通過 signal 表示收到了什么信號
struct sigpending {
struct list_head list; // 收到的所有信號
sigset_t signal; // 收到了哪些信號
};
*/
struct sigpending pending; // 線程級收到的信號
unsigned long sas_ss_sp; // 信號處理函數的棧
size_t sas_ss_size;創建線程
當通過 clone 創建線程時,會對上面的字段進行處理。
asmlinkage int sys_clone(struct pt_regs regs)
{
return do_fork(...);
}
long do_fork(...)
{
struct task_struct *p;
// 復制內容
p = copy_process(clone_flags, stack_start, regs, stack_size, parent_tidptr, child_tidptr, pid);
return pid;
}
static task_t *copy_process(...)
{
int retval;
struct task_struct *p = NULL;
// 創建線程必須設置 CLONE_SIGHAN,共享信號處理函數
if ((clone_flags & CLONE_THREAD) && !(clone_flags & CLONE_SIGHAND))
return ERR_PTR(-EINVAL);
// 獲取一個新的 task_struct 結構體,內容復制當前進程的
p = dup_task_struct(current);
// 初始化信號相關字段
clear_tsk_thread_flag(p, TIF_SIGPENDING);
init_sigpending(&p->pending);
/*
// 引用計數加一
if (clone_flags & (CLONE_SIGHAND | CLONE_THREAD)) {
atomic_inc(¤t->sighand->count);
return 0;
}
*/
copy_sighand(clone_flags, p));
/*
// 引用計數加一
if (clone_flags & CLONE_THREAD) {
atomic_inc(¤t->signal->count);
atomic_inc(¤t->signal->live);
return 0;
}
*/
copy_signal(clone_flags, p));
}可以每個線程都有自己接收的信號(p->pending 字段),但是進程級的數據結構只是引用計數加一,也就是說它們是多個線程共享的。
發送信號
接著看給線程發送信號時的過程。
// tgid 為線程組 id,即進程 id,pid 為線程 id,即 tid
asmlinkage long sys_tgkill(int tgid, int pid, int sig)
{
struct siginfo info;
int error;
struct task_struct *p;
info.si_signo = sig;
info.si_errno = 0;
info.si_code = SI_TKILL;
info.si_pid = current->tgid;
info.si_uid = current->uid;
// 根據線程 id 找到對應的 task_struct
p = find_task_by_pid(pid);
error = -ESRCH;
// 只能給同進程的線程發
if (p && (p->tgid == tgid)) {
if (sig && p->sighand) {
error = specific_send_sig_info(sig, &info, p);
}
}
return error;
}
static int specific_send_sig_info(int sig, struct siginfo *info, struct task_struct *t)
{
int ret = 0;
ret = send_signal(sig, info, t, &t->pending);
return ret;
}
static int send_signal(int sig, struct siginfo *info, struct task_struct *t,
struct sigpending *signals)
{
struct sigqueue * q = NULL;
int ret = 0;
// 分配一個 struct sigqueue,表示一個信號
q = __sigqueue_alloc(t, GFP_ATOMIC);
if (q) {
// 插入 task_struct 結構體 pending 字段的隊列,即線程級的信號
list_add_tail(&q->list, &signals->list);
// ...
}
// 設置 bitmap,表示收到該信號
sigaddset(&signals->signal, sig);
}給線程發送信號,最后就是在 task_struct 的 pending 字段記錄置位并把信號信息加入隊列中。接下來,再看下通過 sys_kill 給進程發送信號的流程。
asmlinkage long sys_kill(int pid, int sig)
{
struct siginfo info;
info.si_signo = sig;
info.si_errno = 0;
info.si_code = SI_USER;
info.si_pid = current->tgid;
info.si_uid = current->uid;
return kill_something_info(sig, &info, pid);
}
static int kill_something_info(int sig, struct siginfo *info, int pid)
{
return kill_proc_info(sig, info, pid);
}
int kill_proc_info(int sig, struct siginfo *info, pid_t pid)
{
int error;
struct task_struct *p;
read_lock(&tasklist_lock);
// 通過 pid 找到進程結構體
p = find_task_by_pid(pid);
error = -ESRCH;
if (p)
error = group_send_sig_info(sig, info, p);
read_unlock(&tasklist_lock);
return error;
}
int group_send_sig_info(int sig, struct siginfo *info, struct task_struct *p)
{
unsignedlong flags;
int ret;
if (!ret && sig && p->sighand) {
ret = __group_send_sig_info(sig, info, p);
}
return ret;
}
staticint __group_send_sig_info(int sig, struct siginfo *info, struct task_struct *p)
{
int ret = 0;
ret = send_signal(sig, info, p, &p->signal->shared_pending);
__group_complete_signal(sig, p);
return0;
}給進程發送信號的流程和線程的類似,最終都是調 send_signal,但是有一些區別,給線程發送時 send_signal 的最后一個參數是 p->pending,給進程發送時參數是 p->signal->shared_pending,所以分別是在不同的字段記錄了接收到的信號,另外還有一個核心的邏輯在__group_complete_signal 中。
// p 沒有屏蔽該信號,p 不是退出狀態,p 是當前 task_struct 或沒有待處理的信號
#define wants_signal(sig, p, mask) \
(!sigismember(&(p)->blocked, sig) \
&& !((p)->state & mask) \
&& !((p)->flags & PF_EXITING) \
&& (task_curr(p) || !signal_pending(p)))
staticvoid __group_complete_signal(int sig, struct task_struct *p)
{
unsignedint mask;
struct task_struct *t;
// p 適合處理該信號則給 p
if (wants_signal(sig, p, mask))
t = p;
elseif (thread_group_empty(p))
/*
* There is just one thread and it does not need to be woken.
* It will dequeue unblocked signals before it runs again.
*/
return;
else {
/*
* Otherwise try to find a suitable thread.
*/
t = p->signal->curr_target;
if (t == NULL)
/* restart balancing at this thread */
t = p->signal->curr_target = p;
// 遍歷進程下的線程看哪個適合處理
while (!wants_signal(sig, t, mask)) {
t = next_thread(t);
if (t == p->signal->curr_target)
/*
* No thread needs to be woken.
* Any eligible threads will see
* the signal in the queue soon.
*/
return;
}
p->signal->curr_target = t;
}
// 喚醒選擇的線程處理信號
signal_wake_up(t, ....);
return;
}給線程發送信號時,給哪個線程發就在哪個線程的上下文執行信號處理函數,但是給進程發信號時,Linux 會根據情況選擇一個適合處理該信號的線程,然后喚醒它處理。
處理信號
最后再看下信號處理的過程。
void do_notify_resume(struct pt_regs *regs, sigset_t *oldset,
__u32 thread_info_flags)
{
if (thread_info_flags & _TIF_SIGPENDING)
do_signal(regs,oldset);
}
int fastcall do_signal(struct pt_regs *regs, sigset_t *oldset)
{
siginfo_t info;
int signr;
struct k_sigaction ka;
// 獲取一個信號
signr = get_signal_to_deliver(&info, &ka, regs, NULL);
if (signr > 0) {
// 處理信號
handle_signal(signr, &info, &ka, oldset, regs);
return1;
}
return0;
}處理信號的過程和前面的介紹類似,修改棧內存布局,注入信號處理函數,執行完 do_notify_resume 后開始執行信號處理函數,最終返回再返回正常流程。這里主要看一下獲取信號的流程。
int get_signal_to_deliver(siginfo_t *info, struct k_sigaction *return_ka,
struct pt_regs *regs, void *cookie)
{
sigset_t *mask = ¤t->blocked;
int signr = 0;
relock:
spin_lock_irq(¤t->sighand->siglock);
for (;;) {
struct k_sigaction *ka;
// 獲取一個信號
signr = dequeue_signal(current, mask, info);
// 設置信號處理函數
ka = ¤t->sighand->action[signr-1];
}
return signr;
}
int dequeue_signal(struct task_struct *tsk, sigset_t *mask, siginfo_t *info)
{
// 先從線程自己的信號字段獲取
int signr = __dequeue_signal(&tsk->pending, mask, info);
// 沒有再從進程的信號獲取
if (!signr)
signr = __dequeue_signal(&tsk->signal->shared_pending,
mask, info);
return signr;
}
staticint __dequeue_signal(struct sigpending *pending, sigset_t *mask,
siginfo_t *info)
{
int sig = 0;
// 獲取下一個待處理的信息
sig = next_signal(pending, mask);
if (sig) {
// 處理了該信號,修改相關的數據結構
if (!collect_signal(sig, pending, info))
sig = 0;
}
recalc_sigpending();
return sig;
}
static inline int collect_signal(int sig, struct sigpending *list, siginfo_t *info)
{
struct sigqueue *q, *first = NULL;
int still_pending = 0;
// 從信號隊列中獲取一個信號,并判斷是否還有該信號需要處理,因為一類信號可能會收到多個
list_for_each_entry(q, &list->list, list) {
if (q->info.si_signo == sig) {
if (first) {
still_pending = 1;
break;
}
first = q;
}
}
if (first) {
// 移出隊列
list_del_init(&first->list);
copy_siginfo(info, &first->info);
__sigqueue_free(first);
// 如果沒有該類信號則把信號 bitmap 置 0
if (!still_pending)
sigdelset(&list->signal, sig);
}
return1;
}總結
通過前面的分析可以看到,在多線程環境中,哪個線程設置處理函數并不重要,因為都是進程內共享的,重要的是給線程還是進程發送信號,當給線程發送信號時,會在線程獨有的信號字段記錄收到的信號,該線程會在自己的執行上下文調用信號處理函數,當給進程發送信號時,會在所有線程都共享的字段中記錄收到的信號,而這個信號給哪個線程處理是不確定的,操作系統會根據情況選擇一個線程并喚醒它,然后在該線程的執行上下文處理信號時,會先判斷有沒有收到線程級的信號,如果沒有的話再判斷是否有進程級的信號,然后進行處理。























