当前位置:首页>看资讯 >门窗常识>现代操作系统中,一般采用()和()两种存储管理方法(现代操作系统由哪几个关键元素构成)

现代操作系统中,一般采用()和()两种存储管理方法(现代操作系统由哪几个关键元素构成)

2023-04-30 来源:断桥铝门窗责任编辑:断桥铝门窗 浏览数:8 门窗网

核心提示:因为现代操作系统是多处理器计算的架构,必然更容易遇到多个进程,多个线程访问共享数据的情况,如下图所示:图中每一种颜色代表一种竞态情况,主要归结为三类:进程与进程之间:单核上的抢占,多核上的SMP;进程与中断之间:中断又包含了上半部与下半部,中断总是能打断进程的执行流;中断与中断之间:外设的中断可以路由到不同的CPU上,它们之间也可能带来竞态;这时候就需要一种同步机制来保护并发访问的内存数据。本系列文章分为两部分,这一章主要讨论原子操作,自旋锁,信号量和互斥锁原子操作原子操作是在执行结束前不可打断的操作,也

现代操作系统中的原子操作——进程与中断

因为现代操作系统是多处理器计算的架构,必然更容易遇到多个进程,多个线程访问共享数据的情况,如下图所示:图中每一种颜色代表一种竞态情况,主要归结为三类:进程与进程之间:单核上的抢占,多核上的SMP;进程与中断之间:中断又包含了上半部与下半部,中断总是能打断进程的执行流;中断与中断之间:外设的中断可以路由到不同的CPU上,它们之间也可能带来竞态;这时候就需要一种同步机制来保护并发访问的内存数据。
本系列文章分为两部分,这一章主要讨论原子操作,自旋锁,信号量和互斥锁原子操作原子操作是在执行结束前不可打断的操作,也是最小的执行单位。
以 arm 平台为例,原子操作的 API 包括如下:int atomic_read(atomic_t *v)读操作void atomic_set(atomic_t *v, int i)设置变量void atomic_add(int i, atomic_t *v)增加 ivoid atomic_sub(int i, atomic_t *v)减少 ivoid atomic_inc(atomic_t *v)增加 1void atomic_dec(atomic_t *v)减少 1void atomic_inc_and_test(atomic_t *v)加 1 是否为 0void atomic_dec_and_test(atomic_t *v)减 1 是否为 0void atomic_add_negative(int i, atomic_t *v)加 i 是否为负void atomic_add_return(int i, atomic_t *v)增加 i 返回结果void atomic_sub_return(int i, atomic_t *v)减少 i 返回结果void atomic_inc_return(int i, atomic_t *v)加 1 返回void atomic_dec_return(int i, atomic_t *v)减 1 返回原子操作通常是内联函数,往往是通过内嵌汇编指令来实现的,如果某个函数本身就是原子的,它往往被定义成一个宏,以下为例。
#define ATOMIC_OP(op, c_op, asm_op) static inline void atomic_##op(int i, atomic_t *v) { unsigned long tmp; int result; prefetchw(&v->counter); __asm__ __volatile__("@ atomic_" #op "n" "1: ldrex %0, [%3]n" " " #asm_op " %0, %0, %4n" " strex %1, %0, [%3]n" " teq %1, #0n" " bne 1b" : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) : "r" (&v->counter), "Ir" (i) : "cc"); } 可见原子操作的原子性依赖于 ldrex 与 strex 实现,ldrex 读取数据时会进行独占标记,防止其他内核路径访问,直至调用 strex 完成写入后清除标记。
ldrex 和 strex 指令,是将单纯的更新内存的原子操作分成了两个独立的步骤:ldrex 用来读取内存中的值,并标记对该段内存的独占访问:ldrex Rx, [Ry]读取寄存器 Ry 指向的4字节内存值,将其保存到 Rx 寄存器中,同时标记对 Ry 指向内存区域的独占访问。
如果执行 ldrex 指令的时候发现已经被标记为独占访问了,并不会对指令的执行产生影响。
更多Linux内核视频教程文档资料免费领取后台私信怕【内核】自行获取。
strex 在更新内存数值时,会检查该段内存是否已经被标记为独占访问,并以此来决定是否更新内存中的值:strex Rx, Ry, [Rz]如果执行这条指令的时候发现已经被标记为独占访问了,则将寄存器 Ry 中的值更新到寄存器 Rz 指向的内存,并将寄存器 Rx 设置成 0。
指令执行成功后,会将独占访问标记位清除。
如果执行这条指令的时候发现没有设置独占标记,则不会更新内存,且将寄存器 Rx 的值设置成 1。
ARM 内部的实现如下所示,这里不再赘述。
自旋锁 spin_lockLinux内核中最常见的锁是自旋锁,自旋锁最多只能被一个可执行线程持有。
如果一个线程试图获取一个已被持有的自旋锁,这个线程会进行循环——旋转等待(会浪费处理器时间)锁重新可用。
自旋锁持有期间不可被抢占。
另一种处理锁争用的方式:让等待线程睡眠,直到锁重新可用时再唤醒它,这样处理器不必循环等待,可以去执行其他代码,但是这会有两次明显的上下文切换的开销,信号量便提供了这种锁机制。
自旋锁的使用接口如下:API说明spin_lock()获取指定的自旋锁spin_lock_irq()禁止本地中断并获取指定的锁spin_lock_irqsave()保存本地中断当前状态,禁止本地中断,获取指定的锁spin_unlock()释放指定的锁spin_unlock_irq()释放指定的锁,并激活本地中断spin_unlock_irqrestore()释放指定的锁,并让本地中断恢复以前状态spin_lock_init()动态初始化指定的锁spin_trylock()试图获取指定的锁,成功返回0,否则返回非0spin_is_locked()测试指定的锁是否已被占用,已被占用返回非0,否则返回0以 spin_lock 为例看下它的用法:DEFINE_SPINLOCK(mr_lock); spin_lock(&mr_lock); spin_unlock(&mr_lock); static inline void arch_spin_lock(arch_spinlock_t *lock) { unsigned int tmp; arch_spinlock_t lockval, newval; asm volatile( ARM64_LSE_ATOMIC_INSN( " prfm pstl1strm, %3n" "1: ldaxr %w0, %3n" " add %w1, %w0, %w5n" " stxr %w2, %w1, %3n" " cbnz %w2, 1bn", " mov %w2, %w5n" " ldadda %w2, %w0, %3n" __nops(3) ) " eor %w1, %w0, %w0, ror #16n" " cbz %w1, 3fn" " sevln" "2: wfen" " ldaxrh %w2, %4n" " eor %w1, %w2, %w0, lsr #16n" " cbnz %w1, 2bn" "3:" : "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock) : "Q" (lock->owner), "I" (1 << TICKET_SHIFT) : "memory"); } static inline void arch_spin_unlock(arch_spinlock_t *lock) { unsigned long tmp; asm volatile(ARM64_LSE_ATOMIC_INSN( " ldrh %w1, %0n" " add %w1, %w1, #1n" " stlrh %w1, %0", " mov %w1, #1n" " staddlh %w1, %0n" __nops(1)) : "=Q" (lock->owner), "=&r" (tmp) : : "memory"); } 上边的代码中,核心逻辑在于 asm volatile() 内联汇编中,有很多独占的操作指令,只有基于指令的独占操作,才能保证软件上的互斥。
把核心逻辑翻译成 C 语言:可以看出,Linux 中针对每一个 spin_lock 有两个计数。
分别是 next 和 owner(初始值为0)。
进程 A 申请锁时,会判断 next 和 owner 的值是否相等。
如果相等就代表锁可以申请成功,否则原地自旋。
直到 owner 和 next 的值相等才会退出自旋。
信号量 Semaphore信号量是在多线程环境下使用的一种措施,它负责协调各个进程,以保证他们能够正确、合理地使用公共资源。
它和 spin_lock 最大的不同之处就是:无法获取信号量的进程可以睡眠,因此会导致系统调度。
信号量的定义如下:struct semaphore { raw_spinlock_t lock; //利用自旋锁同步 unsigned int count; //用于资源计数 struct list_head wait_list; //等待队列 };信号量在创建时设置一个初始值 count,用于表示当前可用的资源数。
一个任务要想访问共享资源,首先必须得到信号量,获取信号量的操作方式为 count - 1。
若当前 count 为负数,表明无法获得信号量,该任务必须挂起在该信号量的等待队列等待;若当前 count 为非负数,表示可获得信号量,因而可立刻访问被该信号量保护的共享资源。
当任务访问完被信号量保护的共享资源后,必须释放信号量,释放信号量是操作 count + 1,如果加一后的 count 为非正数,表明有任务等待,则唤醒所有等待该信号量的任务。
了解了信号量的结构与定义,接下来我们看下常用的信号量接口:API说明DEFINE_SEMAPHORE(name)声明信号量并初始化为 1void sema_init(struct semaphore *sem, int val)声明信号量并初始化为 valdown获得信号量,task 不可被中断,除非是致命信号down_interruptible获得信号量,task 可被中断down_trylock能够获得信号量时,count --,否则立刻返回,不加入 waitlistdown_killable获得信号量,task 可被 killup释放信号量这里我们看下最核心的两个实现 down 和 up。
downdown 用于调用者获得信号量,若 count 大于0,说明资源可用,将其减一即可。
void down(struct semaphore *sem) { unsigned long flags; raw_spin_lock_irqsave(&sem->lock, flags); if (likely(sem->count > 0)) sem->count--; else __down(sem); raw_spin_unlock_irqrestore(&sem->lock, flags); } EXPORT_SYMBOL(down);若 count < 0,调用函数 __down(),将 task 加入等待队列,并进入等待队列,并进入调度循环等待,直至其被 __up 唤醒,或者因超时以被移除等待队列。
static inline int __sched __down_common(struct semaphore *sem, long state, long timeout) { struct semaphore_waiter waiter; list_add_tail(&waiter.list, &sem->wait_list); waiter.task = current; waiter.up = false; for (;;) { if (signal_pending_state(state, current)) goto interrupted; if (unlikely(timeout <= 0)) goto timed_out; __set_current_state(state); raw_spin_unlock_irq(&sem->lock); timeout = schedule_timeout(timeout); raw_spin_lock_irq(&sem->lock); if (waiter.up) return 0; } timed_out: list_del(&waiter.list); return -ETIME; interrupted: list_del(&waiter.list); return -EINTR; } upup 用于调用者释放信号量,若 waitlist 为空,说明无等待任务,count + 1,该信号量可用。
void up(struct semaphore *sem) { unsigned long flags; raw_spin_lock_irqsave(&sem->lock, flags); if (likely(list_empty(&sem->wait_list))) sem->count++; else __up(sem); raw_spin_unlock_irqrestore(&sem->lock, flags); } EXPORT_SYMBOL(up); 若 waitlist 非空,将 task 从等待队列移除,并唤醒该 task,对应 __down 条件。
static noinline void __sched __up(struct semaphore *sem) { struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list); list_del(&waiter->list); waiter->up = true; wake_up_process(waiter->task); } 互斥锁 mutexLinux 内核中,还有一种类似信号量的同步机制叫做互斥锁。
互斥锁类似于 count 等于 1 的信号量。
所以说信号量是在多个进程/线程访问某个公共资源的时候,进行保护的一种机制。
而互斥锁是单个进程/线程访问某个公共资源的一种保护,于互斥操作。
互斥锁有一个特殊的地方:只有持锁者才能解锁。
如下图所示:用一句话来讲信号量和互斥锁的区别,就是信号量用于线程的同步,互斥锁用于线程的互斥。
互斥锁的结构体定义:struct mutex { atomic_long_t owner; //互斥锁的持有者 spinlock_t wait_lock; //利用自旋锁同步 #ifdef CONFIG_MUTEX_SPIN_ON_OWNER struct optimistic_spin_queue osq; #endif struct list_head wait_list; //等待队列 ...... }; 其常用的接口如下所示:API说明DEFINE_MUTEX(name)静态声明互斥锁并初始化解锁状态mutex_init(mutex)动态声明互斥锁并初始化解锁状态void mutex_destroy(struct mutex *lock)销毁该互斥锁bool mutex_is_locked(struct mutex *lock)判断互斥锁是否被锁住mutex_lock获得锁,task 不可被中断mutex_unlock解锁mutex_trylock尝试获得锁,不能加锁则立刻返回mutex_lock_interruptible获得锁,task 可以被中断mutex_lock_killable获得锁,task 可以被中断mutex_lock_io获得锁,在该 task 等待琐时,它会被调度器标记为 io 等待状态上面讲的自旋锁,信号量和互斥锁的实现,都是使用了原子操作指令。
由于原子操作会 lock,当线程在多个 CPU 上争抢进入临界区的时候,都会操作那个在多个 CPU 之间共享的数据 lock。
CPU 0 操作了 lock,为了数据的一致性,CPU 0 的操作会导致其他 CPU 的 L1 中的 lock 变成 invalid,在随后的来自其他 CPU 对 lock 的访问会导致 L1 cache miss(更准确的说是communication cache miss),必须从下一个 level 的 cache 中获取。
这就会使缓存一致性变得很糟,导致性能下降。
所以内核提供一种新的同步方式:RCU(读-复制-更新)。
RCU 解决了什么RCU 是读写锁的高性能版本,它的核心理念是读者访问的同时,写者可以更新访问对象的副本,但写者需要等待所有读者完成访问之后,才能删除老对象。
读者没有任何同步开销,而写者的同步开销则取决于使用的写者间同步机制。
RCU 适用于需要频繁的读取数据,而相应修改数据并不多的情景,例如在文件系统中,经常需要查找定位目录,而对目录的修改相对来说并不多,这就是 RCU 发挥作用的最佳场景。
RCU 例子RCU 常用的接口如下图所示:API说明rcu_read_lock标记读者进入读端临界区rcu_read_unlock标记读者退出临界区synchronize_rcu同步RCU,即所有的读者已经完成读端临界区,写者才可以继续下一步操作。
由于该函数将阻塞写者,只能在进程上下文中使用call_rcu把回调函数 func 注册到RCU回调函数链上,然后立即返回rcu_assign_pointer用于RCU指针赋值rcu_dereference用于RCU指针取值list_add_rcu向RCU注册一个链表结构list_del_rcu从RCU移除一个链表结构为了更好的理解,在剖析 RCU 之前先看一个例子:#include <linux/kernel.h> #include <linux/module.h> #include <linux/init.h> #include <linux/slab.h> #include <linux/spinlock.h> #include <linux/rcupdate.h> #include <linux/kthread.h> #include <linux/delay.h> struct foo { int a; struct rcu_head rcu; }; static struct foo *g_ptr; static int myrcu_reader_thread1(void *data) //读者线程1 { struct foo *p1 = NULL; while (1) { if(kthread_should_stop()) break; msleep(20); rcu_read_lock(); mdelay(200); p1 = rcu_dereference(g_ptr); if (p1) printk("%s: read a=%dn", __func__, p1->a); rcu_read_unlock(); } return 0; } static int myrcu_reader_thread2(void *data) //读者线程2 { struct foo *p2 = NULL; while (1) { if(kthread_should_stop()) break; msleep(30); rcu_read_lock(); mdelay(100); p2 = rcu_dereference(g_ptr); if (p2) printk("%s: read a=%dn", __func__, p2->a); rcu_read_unlock(); } return 0; } static void myrcu_del(struct rcu_head *rh) //回收处理操作 { struct foo *p = container_of(rh, struct foo, rcu); printk("%s: a=%dn", __func__, p->a); kfree(p); } static int myrcu_writer_thread(void *p) //写者线程 { struct foo *old; struct foo *new_ptr; int value = (unsigned long)p; while (1) { if(kthread_should_stop()) break; msleep(250); new_ptr = kmalloc(sizeof (struct foo), GFP_KERNEL); old = g_ptr; *new_ptr = *old; new_ptr->a = value; rcu_assign_pointer(g_ptr, new_ptr); call_rcu(&old->rcu, myrcu_del); printk("%s: write to new %dn", __func__, value); value++; } return 0; } static struct task_struct *reader_thread1; static struct task_struct *reader_thread2; static struct task_struct *writer_thread; static int __init my_test_init(void) { int value = 5; printk("figo: my module initn"); g_ptr = kzalloc(sizeof (struct foo), GFP_KERNEL); reader_thread1 = kthread_run(myrcu_reader_thread1, NULL, "rcu_reader1"); reader_thread2 = kthread_run(myrcu_reader_thread2, NULL, "rcu_reader2"); writer_thread = kthread_run(myrcu_writer_thread, (void *)(unsigned long)value, "rcu_writer"); return 0; } static void __exit my_test_exit(void) { printk("goodbyen"); kthread_stop(reader_thread1); kthread_stop(reader_thread2); kthread_stop(writer_thread); if (g_ptr) kfree(g_ptr); } MODULE_LICENSE("GPL"); module_init(my_test_init); module_exit(my_test_exit); 执行结果是:myrcu_reader_thread2: read a=0myrcu_reader_thread1: read a=0myrcu_reader_thread2: read a=0myrcu_writer_thread: write to new 5myrcu_reader_thread2: read a=5myrcu_reader_thread1: read a=5myrcu_del: a=0RCU 原理可以用下面一张图来总结,当写线程 myrcu_writer_thread 写完后,会更新到另外两个读线程 myrcu_reader_thread1 和 myrcu_reader_thread2。
读线程像是订阅者,一旦写线程对临界区有更新,写线程就像发布者一样通知到订阅者那里,如下图所示。
写者在拷贝副本修改后进行 update 时,首先把旧的临界资源数据移除(Removal);然后把旧的数据进行回收(Reclamation)。
结合 API 实现就是,首先使用 rcu_assign_pointer 来移除旧的指针指向,指向更新后的临界资源;然后使用 synchronize_rcu 或 call_rcu 来启动 Reclaimer,对旧的临界资源进行回收(其中 synchronize_rcu 表示同步等待回收,call_rcu 表示异步回收)。
为了确保没有读者正在访问要回收的临界资源,Reclaimer 需要等待所有的读者退出临界区,这个等待的时间叫做宽限期(Grace Period)。
Grace Period中间的黄色部分代表的就是 Grace Period,中文叫做宽限期,从 Removal 到 Reclamation,中间就隔了一个宽限期,只有当宽限期结束后,才会触发回收的工作。
宽限期的结束代表着 Reader 都已经退出了临界区,因此回收工作也就是安全的操作了。
宽限期是否结束,与 CPU 的执行状态检测有关,也就是检测静止状态 Quiescent Status。
Quiescent StatusQuiescent Status,用于描述 CPU 的执行状态。
当某个 CPU 正在访问 RCU 保护的临界区时,认为是活动的状态,而当它离开了临界区后,则认为它是静止的状态。
当所有的 CPU 都至少经历过一次 Quiescent Status 后,宽限期将结束并触发回收工作。
因为 rcu_read_lock 和 rcu_read_unlock 分别是关闭抢占和打开抢占,如下所示:static inline void __rcu_read_lock(void) { preempt_disable(); }static inline void __rcu_read_unlock(void) { preempt_enable(); }所以发生抢占,就说明不在 rcu_read_lock 和 rcu_read_unlock 之间,即已经完成访问或者还未开始访问。
Linux 同步方式的总结机制等待机制优缺场景原子操作无;ldrex 与 strex 实现内存独占访问性能相当高;场景受限资源计数自旋锁忙等待;唯一持有多处理器下性能优异;临界区时间长会浪费中断上下文信号量睡眠等待(阻塞);多数持有相对灵活,适用于复杂情况;耗时长情况复杂且耗时长的情景;比如内核与用户空间的交互互斥锁睡眠等待(阻塞);优先自旋等待;唯一持有较信号量高效,适用于复杂场景;存在若干限制条件满足使用条件下,互斥锁优先于信号量RCU绝大部分为读而只有极少部分为写的情况下,它是非常高效的;但延后释放内存会造成内存开销,写者阻塞比较严重读多写少的情况下,对内存消耗不敏感的情况下,满足 RCU 条件的情况下,优先于读写锁使用;对于动态分配数据结构这类引用计数的机制,也有高性能的表现。

打赏
分享到:
0相关评论
阅读上文 >> 姣忔棩鍗曡瘝绉疮(姣忔棩鍗曡瘝)
阅读下文 >> 勇闯英格兰与两名壮汉的博弈(英格兰勇夺世界杯)

大家喜欢看的

  • 品牌
  • 资讯
  • 展会
  • 视频
  • 图片
  • 供应
  • 求购
  • 商城

版权与免责声明:

凡注明稿件来源的内容均为转载稿或由企业用户注册发布,本网转载出于传递更多信息的目的;如转载稿涉及版权问题,请作者联系我们,同时对于用户评论等信息,本网并不意味着赞同其观点或证实其内容的真实性;


本文地址:http://www.menchuang.net/news/149292.html

转载本站原创文章请注明来源:门窗网

推荐新闻

更多

行业专题

更多行业专题

微信“扫一扫”
即可分享此文章

友情链接

门窗网 【测试站】(c)2008-2022 MenChuang.net SYSTEM All Rights Reserved

服务热线: ICP备案号:陕ICP备2022013085号