驱动程序中的并发与控制(一)
驱动程序中的并发与控制(二)
驱动程序中的并发与控制(三)
信号量(semaphore)
相对于自旋锁,信号量的最大特点是允许调用它的线程进入睡眠状态。
信号量的定义与初始化
/* include\linux\semaphore.h */
struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
其中,lock是个自旋锁变量,用于实现对信号量的另一个成员count的原子操作。无符号整型变量count用于表示通过该信号量允许进入临界区进程的个数。
wait_list用于管理所有在该信号量上睡眠的进程,无法获得该信号量的进程将进入睡眠状态。
如果驱动程序中定义了一个struct semaphore型的信号量变量,需要注意的是不要直接对该变量的成员进行赋值,而应该使用sema_init函数来初始化该信号量。sema_init函数定义如下:
/* include\linux\semaphore.h */
static inline void sema_init(struct semaphore *sem, int val)
{
static struct lock_class_key __key;
*sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, 0);
}
初始化主要通过__SEMAPHORE_INITIALIZER宏完成:
#define __SEMAPHORE_INITIALIZER(name, n) \
{ \
.lock = __RAW_SPIN_LOCK_UNLOCKED((name).lock), \
.count = n, \
.wait_list = LIST_HEAD_INIT((name).wait_list), \
}
所以sema_init(struct semaphore *sem, int val)调用会把信号量sem的lock值设定为解锁状态,count值设定为函数的调用参数val,同时初始化wait_list链表头。
down操作
信号量上的主要操作是DOWN和UP,在Linux内核中对信号量的DOWN操作有:
void down(struct semaphore *sem)
int down_interruptible(struct semaphore *sem)
int down_killable(struct semaphore *sem)
int down_trylock(struct semaphore *sem)
int down_timeout(struct semaphore *sem, long timeout)
上面这些函数中,驱动程序使用最频繁的是down_interruptible函数,这里重点讨论该函数,之后再对其他DOWN操作的功能作概述性的描述。
down_interruptible函数的定义如下:
/* kernel/locking/semaphore.c */
int down_interruptible(struct semaphore *sem)
{
unsigned long flags;
int result = 0;
raw_spin_lock_irqsave(&sem->lock, flags); //自旋锁上锁,保证对sem->count操作的原子性
if (likely(sem->count > 0)) // sem->count大于0表示有资源,允许获得锁,就将count值减1
sem->count--;
else
result = __down_interruptible(sem); //无资源,无法获得锁则进入睡眠
raw_spin_unlock_irqrestore(&sem->lock, flags);
return result;
}
static noinline int __sched __down_interruptible(struct semaphore *sem)
{
//TASK_INTERRUPTIBLE表示进程能被中断唤醒
return __down_common(sem, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
static inline int __sched __down_common(struct semaphore *sem, long state,
long timeout)
{
struct task_struct *task = current;
struct semaphore_waiter waiter;
/* 通过一个struct semaphore_waiter变量waiter,把当前进程放
到信号量sem的成员变量wait_list所管理的队列中 */
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = task;
waiter.up = false;
for (;;) {
if (signal_pending_state(state, task)) //判定是否被用户空间发送的信号所中断
goto interrupted;
if (unlikely(timeout <= 0)) //超时引起的唤醒
goto timed_out;
__set_task_state(task, state); //将当前进程设置为TASK_INTERRUPTIBLE
raw_spin_unlock_irq(&sem->lock); //释放spinlock,否则其他进程无法释放信号量
timeout = schedule_timeout(timeout); //主动启动调度
raw_spin_lock_irq(&sem->lock); //被唤醒后,获得spinlock
/* 如果waiter.up不为0,说明进程在信号量sem的wait_list队列中被该信号量的UP操作所唤醒,进程可以获得信号量,返回0 */
if (waiter.up)
return 0;
}
timed_out:
list_del(&waiter.list);
return -ETIME;
interrupted:
list_del(&waiter.list);
return -EINTR;
}
调用down_interruptible函数成功获得信号量的话,返回值为0,如果进程是因为被用户空间发送的信号所中断或者是超时引起的唤醒,则返回相应的错误代码。除了调用down_interruptible函数获取信号量外,还可以调用down函数等。与down_interruptible相比,down函数是不可中断的,这意味着调用它的进程如果无法获得信号量,将一直处于睡眠状态直到有别的进程释放了该信号量。从用户空间的角度,如果应用程序阻塞在了驱动程序的down函数中,将无法通过一些强制措施比如按Ctrl+D 组合键等来结束该进程。因此,除非必要,否则驱动程序中应该避免使用down函数。
void down(struct semaphore *sem)
/* 睡眠的进程可以因收到一些致命性信号(fatal signal)被唤醒而导致获取信号量的操作被中断,在驱动程序中极少使用 */
int down_killable(struct semaphore *sem)
/* 进程试图获得信号量,但若无法获得信号量则直接返冋1,而不进入睡眠状态,返冋0意味着函数的调用者已经获得了信号量 */
int down_trylock(struct semaphore *sem)
/* 函数在无法获得信号量的情况下将进入睡眠状态,但是处于这种睡眠状态有时间限制, 如果在jiffies指明的时间到期时函数依然无法获得信号量,则将返回错误码-ETIME,在到期前进程的睡眠状态为TASK_UNINTERRUPTIBLE, 成功获得信号量的函数返回0 */
int down_timeout(struct semaphore *sem, long timeout)
UP操作
相对众多版本的DOWN操作,Linux下只有一个UP函数:
/* kernel/locking/semaphore.c */
void up(struct semaphore *sem)
{
unsigned long flags;
raw_spin_lock_irqsave(&sem->lock, flags);
/* 如果信号量sem的wait_list队列为空,则表明没有其他进程正在等待该信号量,那么只要把sem的count加1即可 */
if (likely(list_empty(&sem->wait_list)))
sem->count++;
else
__up(sem); /* 如果wait_list队列不为空,则说明有其他进程正睡眠在wait_list上等待该信号量,此时调用__up(sem)来唤醒进程 */
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
static noinline void __sched __up(struct semaphore *sem)
{
struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
struct semaphore_waiter, list);
list_del(&waiter->list); //从链表中删除
waiter->up = true;
wake_up_process(waiter->task); //唤醒进程
}
为了让讨论具体化,下面以一个例子来说明,假设一个信号量sem的count=2,说明允许有两个进程进入临界区,假设有进程A、B、C、D和E先后调用down_interruptible来获得信号量,那么进程A和B将得到信号量进入临界区,C、D和E将睡眠在sem的wait_list中。此时的情形如下图所示:
__up函数首先用list_first_entiy取得sem->wait_list链表上的第一个waiter节点C,然后将其从链表中删除,waiter->up = true , 最后调用wake_up_process来唤醒waiter C上的进程C。这样进程C将从之前down_interruptible调用中的timeout = schedule_timeout(timeout)处醒来,waiter->up = true, down_interruptible返回0,进程C获得信号量,进程D和E继续等待直到有进程释放信号量或者被用户空间中断掉。
即使不是信号量的拥有者,也可以调用up函数来释放一个信号量,这点与下面介绍的mutex是不同的。
互斥锁mutex
用count=1的信号量实现的互斥方法还不是Linux下经典的用法,Linux内核针对count=1的信号量重新定义了一个新的数据结构struct mutex,—般都称其为互斥锁或者互斥体。
互斥锁的定义与初始化
互斥锁mutex的概念本来就来自semaphore,struct mutex和struct semaphore并没有本质的不同:
/* include/linux/mutex.h */
struct mutex {
/* 1: unlocked, 0: locked, negative: locked, possible waiters */
atomic_t count;
spinlock_t wait_lock;
struct list_head wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
struct task_struct *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
如同struct semaphore一样,对struct mutex的初始化不能直接通过操作其成员变量的方式进行,而应该利用内核提供的宏或者函数。
# define mutex_init(mutex) \
do { \
static struct lock_class_key __key; \
\
__mutex_init((mutex), #mutex, &__key); \
} while (0)
/* kernel/locking/mutex.c */
void __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
atomic_set(&lock->count, 1);
spin_lock_init(&lock->wait_lock);
INIT_LIST_HEAD(&lock->wait_list);
mutex_clear_owner(lock);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
osq_lock_init(&lock->osq);
#endif
debug_mutex_init(lock, name, key);
}
互斥锁的DOWN操作
互斥锁mutex上的DOWN操作在Linux内核中为mutex_lock函数,定义如下:
void __sched mutex_lock(struct mutex *lock)
{
might_sleep();
/*
* The locking fastpath is the 1->0 transition from
* 'unlocked' into 'locked' state.
*/
__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
mutex_set_owner(lock);
}
函数的设计思想体现在__mutex_fastpath_lock和__mutex_lock_slowpath两条主线上,__mutex_fastpath_lock用来快速判断当前可否获得互斥锁,如果成功获得锁,则函数直接返回,否则进入到__mutex_lock_slowpath函数中。
__mutex_fastpath_lock是一平台相关函数,下面以ARM处理器(ARMv6及其以上指令集)为例,分析其代码实现:
static inline void __mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
//对原子变量减1,判断是否小于0,如果小于0表示未获得锁
if (unlikely(atomic_dec_return_acquire(count) < 0))
fail_fn(count); //未获得锁,执行__mutex_lock_slowpath
}
__visible void __sched __mutex_lock_slowpath(atomic_t *lock_count)
{
struct mutex *lock = container_of(lock_count, struct mutex, count);
__mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
NULL, _RET_IP_, NULL, 0);
}
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct task_struct *task = current;
struct mutex_waiter waiter;
unsigned long flags;
int ret;
//use_ww_ctx为0,不走该分支
if (use_ww_ctx) {
struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
return -EALREADY;
}
//禁止抢占
preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
/* got the lock, yay! */
preempt_enable();
return 0;
}
//锁上自旋锁
spin_lock_mutex(&lock->wait_lock, flags);
/*如果count为1表示此时有进程释放mutex,把count设为0,代表当前锁被我持有,然后跳出等待执行临界区代码,这里要注意了,如果mutex的wait_lock有进程睡眠的话,那么进程在释放mutex时会唤醒mutex的wait_lock链表的第一个进程,但是当前mutex被我持有,所以唤醒的进程要判断count的值,如果为0,那么表示有人在访问临界资源了,那么继续睡眠*/
if (!mutex_is_locked(lock) &&
(atomic_xchg_acquire(&lock->count, 0) == 1))
goto skip_wait;
debug_mutex_lock_common(lock, &waiter);
debug_mutex_add_waiter(lock, &waiter, task);
/* 把当前进程放入mutex的wait_list */
list_add_tail(&waiter.list, &lock->wait_list);
waiter.task = task;
lock_contended(&lock->dep_map, ip);
for (;;) {
/* 如果count为0,代表着此时有一个进程在访问临界资源,将count设置-1,表示有别的进程访问临界资源。
如果count为1,表示当前metex无进程在竞争,那么将count设为-1,获得mutex,退出访问临界资源
*/
if (atomic_read(&lock->count) >= 0 &&
(atomic_xchg_acquire(&lock->count, -1) == 1))
break;
//被信号唤醒,那就退出
if (unlikely(signal_pending_state(state, task))) {
ret = -EINTR;
goto err;
}
if (use_ww_ctx && ww_ctx->acquired > 0) {
ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);
if (ret)
goto err;
}
//设置进程状态为TASK_UNINTERRUPTIBLE
__set_task_state(task, state);
//释放自旋锁让别的进程可以访问wait_lock
spin_unlock_mutex(&lock->wait_lock, flags);
schedule_preempt_disabled(); //进程睡眠:
spin_lock_mutex(&lock->wait_lock, flags);
}
__set_task_state(task, TASK_RUNNING); //设置进程状态为TASK_RUNNING
//将进程从mutex的wait_list中删除
mutex_remove_waiter(lock, &waiter, task);
/* 如何wait_list没有其他进程等待了,则count设为0 */
if (likely(list_empty(&lock->wait_list)))
atomic_set(&lock->count, 0);
debug_mutex_free_waiter(&waiter);
skip_wait:
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip);
mutex_set_owner(lock);
if (use_ww_ctx) {
struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
ww_mutex_set_context_slowpath(ww, ww_ctx);
}
spin_unlock_mutex(&lock->wait_lock, flags);
preempt_enable(); //使能抢占
return 0;
err:
mutex_remove_waiter(lock, &waiter, task);
spin_unlock_mutex(&lock->wait_lock, flags);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, 1, ip);
preempt_enable();
return ret;
}
在Linux源码中,__mutex_lock_slowpath函数与信号量DOWN操作中的down函数非常相似,不过__mutex_lock_slowpath在把当前进程放入mutex的wait_list之前会试图多次询问mutex中的count是否为1,也就是说当前进程在进入wait_list之前会多次考察别的进程是否已经释放了这个互斥锁。这主要基于这样一个事实:拥有互斥锁的进程总是会在尽可能短的时间里释放。如果别的进程已经释放了该互斥锁,那么当前进程将可以获得该互斥锁而没有必要再去睡眠。
互斥锁的UP操作
互斥锁的UP操作为mutex_unlock,函数定义如下:
void __sched mutex_unlock(struct mutex *lock)
{
/*
* The unlocking fastpath is the 0->1 transition from 'locked'
* into 'unlocked' state:
*/
#ifndef CONFIG_DEBUG_MUTEXES
/*
* When debugging is enabled we must not clear the owner before time,
* the slow path will always be taken, and that clears the owner field
* after verifying that it was indeed current.
*/
mutex_clear_owner(lock);
#endif
__mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}
和mutex_lock函数一样,mutex_lock函数也有两条主线:__mutex_fastpath_unlock和__mutex_unlock_slowpath,分别用于对互斥锁的快速和慢速解锁操作。
__mutex_fastpath_unlock的定义如下:
static inline void
__mutex_fastpath_unlock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
//count加1,如果大于0,表示无人等待mutex,直接返回
if (unlikely(atomic_inc_return_release(count) <= 0))
fail_fn(count); //唤醒别的进程
}
__visible void
__mutex_unlock_slowpath(atomic_t *lock_count)
{
struct mutex *lock = container_of(lock_count, struct mutex, count);
__mutex_unlock_common_slowpath(lock, 1);
}
static inline void
__mutex_unlock_common_slowpath(struct mutex *lock, int nested)
{
unsigned long flags;
WAKE_Q(wake_q);
if (__mutex_slowpath_needs_to_unlock())
atomic_set(&lock->count, 1); //将count设为1
spin_lock_mutex(&lock->wait_lock, flags);
mutex_release(&lock->dep_map, nested, _RET_IP_);
debug_mutex_unlock(lock);
//从wait_list取出第一个进程
if (!list_empty(&lock->wait_list)) {
/* get the first entry from the wait-list: */
struct mutex_waiter *waiter =
list_entry(lock->wait_list.next,
struct mutex_waiter, list);
debug_mutex_wake_waiter(lock, waiter);
wake_q_add(&wake_q, waiter->task);
}
spin_unlock_mutex(&lock->wait_lock, flags);
wake_up_q(&wake_q); //唤醒进程
}