logo

6.同步(锁、信号量)底层原理

作者
Modified on
Reading time
14 分钟阅读:..评论:..

前言

由于在接下来的学习内容中涉及到对信息、数据的同步,我们在开发操作系统的时候必须考虑到并发、多核情况下的同步问题,要想解决这些问题,实际应用中一般采用锁:原子变量、关中断、信号量、自旋锁。 那么其底层如何实现的呢? ——让我们跟着彭老师的讲解,一起学习! 以对一个全局变量进行操作为例,下述代码是一个中断处理函数和逻辑函数,它们都是对全局变量a进行操作

int a = 0; void interrupt_handle() { a++; } void thread_func() { a++; }

对于a++,在编译阶段一般是将其分为三步:

  • 将a存入某寄存器
  • 该寄存器值+1
  • 将寄存器的值刷新回内存

**但是,**如果在第二步完成之后突然发生中断了该如何?由时刻为标尺对a的值进行观测:

可以看到,由于中断的发生将thread_func的执行过程强行掐断,最终a的值为1,但其实它的值应该为2。 那么如何解决这个问题? 一是,将a++这个操作变成不可分割,即无法再拆解成3步执行,即是原子操作; 二是,在执行a++这个操作时,关闭中断,执行完毕之后再打开中断

原子操作

原子操作需要底层操作系统支持,X86 CPU支持许多原子指令C语言正是通过嵌入汇编代码调用这些原子指令来实现原子操作,而Java是在JVM层面对原子操作进行了实现。 另外,内嵌汇编代码编写格式的学习可参考:AT&T格式汇编代码,也可参考文末的补充部分进行理解。 实现原子读和原子写等:

//定义一个原子类型 typedef struct s_ATOMIC{ volatile s32_t a_count; //禁止编译器优化,使其每次都从内存中加载变量 }atomic_t; //原子读 static inline s32_t atomic_read(const atomic_t *v) { //x86平台取地址处是原子 return (*(volatile u32_t*)&(v)->a_count); } //原子写 static inline void atomic_write(atomic_t *v, int i) { //x86平台把一个值写入一个地址处也是原子的 v->a_count = i; } //原子加上一个整数 static inline void atomic_add(int i, atomic_t *v) { __asm__ __volatile__("lock;" "addl %1,%0" : "+m" (v->a_count) : "ir" (i)); } //原子减去一个整数 static inline void atomic_sub(int i, atomic_t *v) { __asm__ __volatile__("lock;" "subl %1,%0" : "+m" (v->a_count) : "ir" (i)); } //原子加1 static inline void atomic_inc(atomic_t *v) { __asm__ __volatile__("lock;" "incl %0" : "+m" (v->a_count)); } //原子减1 static inline void atomic_dec(atomic_t *v) { __asm__ __volatile__("lock;" "decl %0" : "+m" (v->a_count)); }

注意,“lock”前缀是多核CPU下保证数据同步的指令,该指令会锁住数据总线,防止其他CPU更改对应内存地址的值,单核CPU不需要,更详细内容查看内存屏障、validate

在使用原子操作的情况下,原本对全局变量的操作就可以改成:

atomic_t a = {0}; void interrupt_handle() { atomic_inc(&a); } void thread_func() { atomic_inc(&a); }

关中断

上述原子操作虽然可以完成同步操作,但是只能对付一些简单的单体变量,对于复杂的数据结构,如果使用原子操作,可想而知代码的复杂程度有多大。 在这个时候可以考虑通过关闭中断,从而实现相应的代码控制x86平台上的CPU关闭中断、开启中断指令是cli、sti,其主要是对CPU的eflags寄存的IF位进行设置,CPU据此来决定是否响应中断信号。 简单的关、开中断代码:

//关闭中断 void hal_cli() { __asm__ __volatile__("cli": : :"memory"); } //开启中断 void hal_sti() { __asm__ __volatile__("sti": : :"memory"); } //使用场景 void foo() { hal_cli(); //操作数据…… hal_sti(); } void bar() { hal_cli(); //操作数据…… hal_sti(); }

上述方式的问题就在于无法嵌套使用:

void foo() { hal_cli(); //操作数据第一步…… hal_sti(); } void bar() { hal_cli(); foo(); //操作数据第二步…… hal_sti(); }

当从foo()函数返回到bar()时,这时中断已经被打开,但是bar()不知道!如果继续执行操作数据第二步,那么就可能因为其他线程函数的访问造成数据不一致问题。 为了让中断管理可以嵌套,就需要在关闭、开启中断的时候保存之前的状态,如下:

typedef u32_t cpuflg_t; static inline void hal_save_flags_cli(cpuflg_t* flags) { __asm__ __volatile__( "pushfl \t\n" //把eflags寄存器压入当前栈顶 "cli \t\n" //关闭中断 "popl %0 \t\n"//把当前栈顶弹出到flags为地址的内存中 : "=m"(*flags) : : "memory" ); } static inline void hal_restore_flags_sti(cpuflg_t* flags) { __asm__ __volatile__( "pushl %0 \t\n"//把flags为地址处的值寄存器压入当前栈顶 "popfl \t\n" //把当前栈顶弹出到flags寄存器中 : : "m"(*flags) : "memory" ); }

简单来说,在关闭中断的时候,把之前的状态存入地址为flag的内存中;在开启中断时,究竟是否开启中断,是由flag地址中存储的值来确定的!(即进行关闭中断操作之前的中断状态)

注:内存中的flag只是用来保存的,CPU是否中断由寄存器中的值而定。 注:注意区分中断与子程序调用的区别。 注:这里没有区分中断的优先级,但是实际的操作系统中,低级中断应该被高级中断所打断

自旋锁

中断完美解决了原子操作只能针对单体变量的情况,但是——中断只能控制单核CPU,在多核CPU的情况下,又会遇到并发冲突的问题了,这个时候就需要使用“自旋锁”。 自旋锁原理如下:

上述流程有一个必须保证的点:读取锁变量、判断加锁操作必须是原子操作,否则还是会造成并发错误 好在x86 提供了一个**原子交换指令:**xchg——让寄存器的值和内存空间的值进行交换。 根据上述流程图,将自旋锁实现如下:

//自旋锁结构 typedef struct { //volatile可以防止编译器优化 //保证其它代码始终从内存加载lock变量的值 volatile u32_t lock; } spinlock_t; //锁初始化函数 static inline void x86_spin_lock_init(spinlock_t * lock) { lock->lock = 0;//锁值初始化为0是未加锁状态 } //加锁函数 static inline void x86_spin_lock(spinlock_t * lock) { __asm__ __volatile__ ( "1: \n" "lock; xchg %0, %1 \n"//把值为1的寄存器和lock内存中的值进行交换 "cmpl $0, %0 \n" //用0和交换回来的值进行比较 "jnz 2f \n" //不等于0则跳转后面2标号处运行 "jmp 3f \n" //若等于0则跳转后面3标号处返回 "2: \n" "cmpl $0, %1 \n"//用0和lock内存中的值进行比较 "jne 2b \n"//若不等于0则跳转到前面2标号处运行继续比较 "jmp 1b \n"//若等于0则跳转到前面1标号处运行,交换并加锁 "3: \n" : : "r"(1), "m"(*lock)); } //解锁函数 static inline void x86_spin_unlock(spinlock_t * lock) { __asm__ __volatile__( "movl $0, %0\n"//解锁把lock内存中的值设为0就行 : : "m"(*lock)); }

其中加锁函数的逻辑部分要好好理解,它通过转移指令形成了一个循环判断的逻辑,直到加锁才会退出。

注意,代码中汇编部分:   ** : "r"(1), "m"(*lock)**——系统分配一个寄存器,填入1;取内存地址为lock的值;而后xchg %0,%1即是将两者的值进行交换。

遗憾的是上述代码存在中断嵌套的问题:“如果一个CPU获取自旋锁后发生中断,中断代码里也尝试获取自旋锁,那么自旋锁永远不会被释放,发生死锁。” 关于自旋锁与中断的详细解释可以参考:Linux内核死锁,其中关于自旋锁与中断嵌套导致的一个经典场景叙述如下: “考虑下面的场景(中断上下文场景): 

  • 运行在CPU0上的**进程A在某个系统调用过程中访问了共享资源 R **
  • 运行在CPU1上的进程B在某个系统调用过程中也访问了共享资源 R 
  • 外设P的中断handler中也会访问共享资源 R 

在这样的场景下,使用spin lock可以保护访问共享资源R的临界区吗?  我们假设CPU0上的进程A持有spin lock进入临界区,这时候,外设P发生了中断事件,并且调度到了CPU1上执行,看起来没有什么问题,执行在CPU1上的handler会稍微等待一会CPU0上的进程A,等它离开临界区就会释放spin lock的,但是,如果外设P的中断事件被调度到了同一个CPU0上执行会怎么样? CPU0上的进程A在持有spin lock的状态下被中断上下文抢占,而抢占它的CPU0上的handler在进入临界区之前仍然会试图获取spin lock,悲剧发生了,CPU0上的P外设的中断handler永远的进入spin状态,这时候,CPU1上的进程B也不可避免在试图持有spin lock的时候失败而导致进入spin状态。 为了解决这样的问题,linux kernel采用了这样的办法:如果涉及到中断上下文的访问,spin lock需要和禁止本 CPU 上的中断联合使用。 ”

关于这一点的解决方式可以参考关中断,详细讲解请看:08 锁提示一下,获取自旋锁的时候,干脆把中断关闭了就好,这样就不会导致中断嵌套。 经修改后,可以实现关中断下获取自旋锁,以及恢复中断状态释放自旋锁

static inline void x86_spin_lock_disable_irq(spinlock_t * lock ,cpuflg_t* flags) { __asm__ __volatile__( "pushfq \n\t" "cli \n\t" "popq %0 \n\t" "1: \n\t" "lock; xchg %1, %2 \n\t" "cmpl $0,%1 \n\t" "jnz 2f \n\t" "jmp 3f \n" "2: \n\t" "cmpl $0,%2 \n\t" "jne 2b \n\t" "jmp 1b \n\t" "3: \n" :"=m"(*flags) : "r"(1), "m"(*lock)); } static inline void x86_spin_unlock_enabled_irq(spinlock_t* lock ,cpuflg_t* flags) { __asm__ __volatile__( "movl $0, %0\n\t" "pushq %1 \n\t" "popfq \n\t" : : "m"(*lock), "m"(*flags)); }

代码中的cpuflg表示当前的中断状态。

信号量

以上三种解决同步的方式都不适合长等待,利用自旋锁这种方式去获取需要一定时间准备的资源,并且会造成CPU的时间消耗。 试想,能不能有一种机制,当资源准备好了之后,提醒CPU去获取呢? 还真有,那就是在1965年由荷兰学者Edsger Dijkstra(没错,就是提出那个算法的男人)提出的信号量机制。 信号量机制由三个环节组成

  • 等待:程序等待资源准备好
  • 互斥:同时只有一个程序可以访问资源
  • 唤醒:资源准备好之后唤醒固定程序

由于需要等待、互斥等操作,拟定一个数据结构如下:

//等待链数据结构,用于挂载等待代码执行流(线程)的结构 //里面有用于挂载代码执行流的链表和计数器变量,后续会讲 typedef struct s_KWLST { spinlock_t wl_lock;//等待链表的锁 uint_t wl_tdnr;//计数器 list_h_t wl_list;//等待进程的链表 }kwlst_t; //信号量数据结构 typedef struct s_SEM { spinlock_t sem_lock;//维护sem_t自身数据的自旋锁 uint_t sem_flg;//信号量相关的标志 sint_t sem_count;//信号量计数值 kwlst_t sem_waitlst;//用于挂载等待代码执行流(线程)结构 }sem_t;

想想信号量一般是怎么使用的呢?

  1. 获取信号量

将信号量自身加锁如果信号值sem_count小于0,则将当前进程放入等待链;否则,对信号量执行“减一”,获取信号量成功,进入代码执行流程; 2.  代码执行 成功获取信号量之后,程序进行自己相应的操作逻辑。 3.  释放信号量 将信号量自身加锁,对信号值sem_count执行“加一”,如果大于0,则从等待链中唤醒一个进程;无论是否大于0,最终即完成信号量的释放 从上述流程可以看出,信号量其实就是一个“多人使用,用完放回”的场景。 根据以上分析一个简单的信号量实现如下:

//获取信号量 void krlsem_down(sem_t* sem) { cpuflg_t cpufg; start_step: //之前自旋锁的封装 krlspinlock_cli(&sem->sem_lock,&cpufg); if(sem->sem_count<1) {//如果信号量值小于1,则让代码执行流(线程)睡眠 krlwlst_wait(&sem->sem_waitlst); //之前自旋锁的封装 krlspinunlock_sti(&sem->sem_lock,&cpufg); //切换代码执行流,下次恢复执行时依然从下一行开始执行 //所以要goto开始处重新获取信号量进行判断 krlschedul(); goto start_step; } sem->sem_count--;//信号量值减1,表示成功获取信号量 //之前自旋锁的封装 krlspinunlock_sti(&sem->sem_lock,&cpufg); return; } //释放信号量 void krlsem_up(sem_t* sem) { cpuflg_t cpufg; //之前自旋锁的封装 krlspinlock_cli(&sem->sem_lock,&cpufg); sem->sem_count++;//释放信号量 if(sem->sem_count<1) {//如果小于1,则说数据结构出错了,挂起系统 krlspinunlock_sti(&sem->sem_lock,&cpufg); hal_sysdie("sem up err"); } //唤醒该信号量上所有等待的代码执行流(线程) krlwlst_allup(&sem->sem_waitlst); krlspinunlock_sti(&sem->sem_lock,&cpufg); krlsched_set_schedflgs(); return; }

其中krlschedul、krlwlst_wait、krlwlst_allup、krlsched_set_schedflgs是负责进程调度的相关函数,会在之后的进程章节进行讲解,敬请期待!

拓展:Linux的同步机制

Linux上实现数据同步主要有五个工具:原子变量、中断控制、自旋锁、信号量、读写锁。重点如下:

有关这部分的实现代码及讲解可以参考:09. Linux锁机制。

补充:GCC内嵌汇编代码

在 C 代码中嵌入汇编语句要比"纯粹"的汇编语言代码复杂得多,因为需要解决如何分配寄存器,以及如何与C代码中的变量相结合等问题。 通常嵌入到 C 代码中的汇编语句很难做到与其它部分没有任何关系,因此很多时候需要用到完整的内联汇编格式

__asm__("asm statements" : outputs : inputs : registers-modified);

内嵌汇编格式分为四个部分,其中每个部分用“:”符号来隔断,每个部分中的各个操作用“;”来区分。

  • asm statements:汇编代码部分,即指令部分,操作寄存器需要带上%,操作立即数需要带上$;
  • output:输出列表部分,包括寄存器、内存,=号来连接;
  • Input:输入列表部分,包括寄存器、内存、立即数等;
  • register-modified:寄存器修改列表部分,这些寄存器在汇编代码中使用了,但是不属于输入和输出列表,因此对寄存器进行说明,以便GCC能够采用相应的措施,比如不用它们来保存其他的数据。

我们以下面的代码为例:

int main() { int a = 10, b = 0; __asm__ __volatile__( ;这两行都是指令部分,%1%0分别表示操作第二个、第一个“样板”操作数 "movl %1, %%eax;\\n\\r" "movl %%eax, %0;" ;从输出部分开始计数,这就是第一个“样板”操作数:b ;不过标号为0 :"=r"(b) ;这是输入部分,第二个样板操作数:a,标号为1 :"r"(a) ;这是修改寄存器部分,由于用到了eax,告诉gcc在执行过程中该寄存器不能 ;保存其他值 :"%eax"); printf("Result: %d, %d\\n", a, b); }

上述代码完成操作:将a的值赋值给b。

参考链接

08 | 锁:并发操作中,解决数据同步的四种方法 AT&T格式汇编 信号量和条件变量的区别