futex同步机制分析之三内核实现

一、源码引入

前两篇从应用分析到了库,本篇到内核中看看,futex到底何方神圣?(Linux3.1.1) 先看一下futex.c和futex.h(kennel/futex.c linux/futex.h):
/**
 * struct futex_q - The hashed futex queue entry, one per waiting task
 * @list:		priority-sorted list of tasks waiting on this futex
 * @task:		the task waiting on the futex
 * @lock_ptr:		the hash bucket lock
 * @key:		the key the futex is hashed on
 * @pi_state:		optional priority inheritance state
 * @rt_waiter:		rt_waiter storage for use with requeue_pi
 * @requeue_pi_key:	the requeue_pi target futex key
 * @bitset:		bitset for the optional bitmasked wakeup
 *
 * We use this hashed waitqueue, instead of a normal wait_queue_t, so
 * we can wake only the relevant ones (hashed queues may be shared).
 *
 * A futex_q has a woken state, just like tasks have TASK_RUNNING.
 * It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0.
 * The order of wakeup is always to make the first condition true, then
 * the second.
 *
 * PI futexes are typically woken before they are removed from the hash list via
 * the rt_mutex code. See unqueue_me_pi().
 */
struct futex_q {
	struct plist_node list; //内核中的链表节点,用于将数据进行管理

	struct task_struct \*task;//futex相关联的指针队列-线程或者进程
	spinlock_t \*lock_ptr;//自旋锁指针
	union futex_key key;//查找futex的key
	struct futex_pi_state \*pi_state;  //控制优先级继承
	struct rt_mutex_waiter \*rt_waiter;
	union futex_key \*requeue_pi_key;
	u32 bitset; //常用的掩码匹配
};
英文注释写得非常清楚,画蛇添足,又增加了中文注释。
/*
 * Priority Inheritance state:
 */
struct futex_pi_state {
	/*
	 * list of 'owned' pi_state instances - these have to be
	 * cleaned up in do_exit() if the task exits prematurely:
	 */
	struct list_head list;

	/*
	 * The PI object:
	 \*/
	struct rt_mutex pi_mutex;

	struct task_struct \*owner;
	atomic_t refcount;

	union futex_key key;
};
//key是一个共用体
union futex_key {
	struct {
		unsigned long pgoff;
		struct inode \*inode;
		int offset;
	} shared;//futex地址结构体--不同进程间区别用
	struct {
		unsigned long address;
		struct mm_struct \*mm;
		int offset;
	} private;//地址结构体--同一进程间区别
	struct {
		unsigned long word;
		void \*ptr;
		int offset;
	} both;//共用结构体
};
它们在使用时上面结构体中的相关task会被挂载到一个哈希桶中:
/*
 * Hash buckets are shared by all the futex_keys that hash to the same
 * location.  Each key may have multiple futex_q structures, one for each task
 * waiting on a futex.
 */
struct futex_hash_bucket {
	spinlock_t lock;
	struct plist_head chain;
};

static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];
区别它们的办法就是通过上面的key来查找相关的futex的。看一下相关的futex进程应用模型:

futex同步机制分析之三内核实现

futex的应用是要有一个init函数的:

static int __init futex_init(void)
{
	u32 curval;
	int i;

	/*
	 * This will fail and we want it. Some arch implementations do
	 * runtime detection of the futex_atomic_cmpxchg_inatomic()
	 * functionality. We want to know that before we call in any
	 * of the complex code paths. Also we want to prevent
	 * registration of robust lists in that case. NULL is
	 * guaranteed to fault and we get -EFAULT on functional
	 * implementation, the non-functional ones will return
	 * -ENOSYS.
	 \*/
	if (cmpxchg_futex_value_locked(&curval, NULL, 0, 0) == -EFAULT)
		futex_cmpxchg_enabled = 1;

	for (i = 0; i < ARRAY_SIZE(futex_queues); i++) {
		plist_head_init(&futex_queues[i].chain);
		spin_lock_init(&futex_queues[i].lock);
	}

	return 0;
}
在前面的第二篇glibc的分析中提到了三种futex,在这里都可以找到,主要说明pi-futex: pi-futex,也就是Priority Inheritance,优先级继承,如果学习过操作系统原理的一定知道“优先级反转”这个名词,它就是用来解决优先级反转的一种办法。 看一下相关的代码,这里不全列出,如有兴趣可参看内核的代码:
static int wake_futex_pi(u32 \__user *uaddr, u32 uval, struct futex_q *this)
{
	struct task_struct *new_owner;
	struct futex_pi_state *pi_state = this->pi_state;
	u32 curval, newval;

	if (!pi_state)
		return -EINVAL;

	/*
	 * If current does not own the pi_state then the futex is
	 * inconsistent and user space fiddled with the futex value.
	 */
	if (pi_state->owner != current)
		return -EINVAL;

	raw_spin_lock(&pi_state->pi_mutex.wait_lock);
	new_owner = rt_mutex_next_owner(&pi_state->pi_mutex);

	/*
	 * It is possible that the next waiter (the one that brought
	 * this owner to the kernel) timed out and is no longer
	 * waiting on the lock.
	 */
	if (!new_owner)
		new_owner = this->task;

	/*
	 * We pass it to the next owner. (The WAITERS bit is always
	 * kept enabled while there is PI state around. We must also
	 * preserve the owner died bit.)
	 */
	if (!(uval & FUTEX_OWNER_DIED)) {
		int ret = 0;

		newval = FUTEX_WAITERS | task_pid_vnr(new_owner);

		if (cmpxchg_futex_value_locked(&curval, uaddr, uval, newval))
			ret = -EFAULT;
		else if (curval != uval)
			ret = -EINVAL;
		if (ret) {
			raw_spin_unlock(&pi_state->pi_mutex.wait_lock);
			return ret;
		}
	}

	raw_spin_lock_irq(&pi_state->owner->pi_lock);
	WARN_ON(list_empty(&pi_state->list));
	list_del_init(&pi_state->list);
	raw_spin_unlock_irq(&pi_state->owner->pi_lock);

	raw_spin_lock_irq(&new_owner->pi_lock);
	WARN_ON(!list_empty(&pi_state->list));
	list_add(&pi_state->list, &new_owner->pi_state_list);
	pi_state->owner = new_owner;
	raw_spin_unlock_irq(&new_owner->pi_lock);

	raw_spin_unlock(&pi_state->pi_mutex.wait_lock);
	rt_mutex_unlock(&pi_state->pi_mutex);

	return 0;
}
static int unlock_futex_pi(u32 __user *uaddr, u32 uval)
{
	u32 oldval;

	/*
	 * There is no waiter, so we unlock the futex. The owner died
	 * bit has not to be preserved here. We are the owner:
	 */
	if (cmpxchg_futex_value_locked(&oldval, uaddr, uval, 0))
		return -EFAULT;
	if (oldval != uval)
		return -EAGAIN;

	return 0;
}
/**
 * requeue_pi_wake_futex() - Wake a task that acquired the lock during requeue
 * @q:		the futex_q
 * @key:	the key of the requeue target futex
 * @hb:		the hash_bucket of the requeue target futex
 *
 * During futex_requeue, with requeue_pi=1, it is possible to acquire the
 * target futex if it is uncontended or via a lock steal.  Set the futex_q key
 * to the requeue target futex so the waiter can detect the wakeup on the right
 * futex, but remove it from the hb and NULL the rt_waiter so it can detect
 * atomic lock acquisition.  Set the q->lock_ptr to the requeue target hb->lock
 * to protect access to the pi_state to fixup the owner later.  Must be called
 * with both q->lock_ptr and hb->lock held.
 \*/
static inline
void requeue_pi_wake_futex(struct futex_q \*q, union futex_key \*key,
			   struct futex_hash_bucket \*hb)
{
	get_futex_key_refs(key);
	q->key = \*key;

	__unqueue_futex(q);

	WARN_ON(!q->rt_waiter);
	q->rt_waiter = NULL;

	q->lock_ptr = &hb->lock;

	wake_up_state(q->task, TASK_NORMAL);
}

从这里可以看出,内核针对不同的场景,设计了三种不同的futex,而上面的pi-futex正是用来处理优先级反转的一种,优先级反转是一种进程调用现象,在进程(线程)管理中,低优先级的进程(线程)掌握了高优先级的进程的同步资源,那么就需要处理一下优先级来达到尽快让高优先级的进程(线程)执行,解决优先级翻转问题有优先级天花板(priority ceiling)和优先级继承(priority inheritance)两种办法。内核中使用的是后者,即临时把低优先级的进程提高优先级,参与高优先级的共同执行,待释放资源后再将其降低到原来的优先级。 这里需要重点提一下requeue这个功能,这是为了更好的支持pthread_cond的broad_cast,用过条件变量的同学们可能都知道,这玩意儿一般会和一个mutex共同来使用,保证线程操作的顺序(底层通过cond和mutex的从属关系来达到),即它包含了两次等待,或两个队列等待。一个是cond信号的等待(队列),另一个是它所属的mutex锁的等待(队列)。这使得一次cond wait必须先释放mutex,然后依次等待cond信号和mutex。那这样的结果就是会造成两次的等待和唤醒,可是在广播唤醒时,只需唤醒一个线程就可以了,不需要惊醒所有的线程来竞争。所以这个requeue就是用来优化这部分的。其实就是将两个等待的唤醒队列优化成一次。 所以说,不读内核源码,你对线程的理解总是浮在上面的,无法掌握其本质。

二、调用方式

看一下对外的接口:

SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val,
		struct timespec __user *, utime, u32 __user *, uaddr2,
		u32, val3)
{
	struct timespec ts;
	ktime_t t, *tp = NULL;
	u32 val2 = 0;
	int cmd = op & FUTEX_CMD_MASK;

	if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI ||
		      cmd == FUTEX_WAIT_BITSET ||
		      cmd == FUTEX_WAIT_REQUEUE_PI)) {
		if (copy_from_user(&ts, utime, sizeof(ts)) != 0)
			return -EFAULT;
		if (!timespec_valid(&ts))
			return -EINVAL;

		t = timespec_to_ktime(ts);
		if (cmd == FUTEX_WAIT)
			t = ktime_add_safe(ktime_get(), t);
		tp = &t;
	}
	/*
	 * requeue parameter in 'utime' if cmd == FUTEX_*_REQUEUE_*.
	 * number of waiters to wake in 'utime' if cmd == FUTEX_WAKE_OP.
	 */
	if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE ||
	    cmd == FUTEX_CMP_REQUEUE_PI || cmd == FUTEX_WAKE_OP)
		val2 = (u32) (unsigned long) utime;

	return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);
}
long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
		u32 __user *uaddr2, u32 val2, u32 val3)
{
	int ret = -ENOSYS, cmd = op & FUTEX_CMD_MASK;
	unsigned int flags = 0;

	if (!(op & FUTEX_PRIVATE_FLAG))
		flags |= FLAGS_SHARED;

	if (op & FUTEX_CLOCK_REALTIME) {
		flags |= FLAGS_CLOCKRT;
		if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI)
			return -ENOSYS;
	}

	switch (cmd) {
	case FUTEX_WAIT:
		val3 = FUTEX_BITSET_MATCH_ANY;
	case FUTEX_WAIT_BITSET:
		ret = futex_wait(uaddr, flags, val, timeout, val3);
		break;
	case FUTEX_WAKE:
		val3 = FUTEX_BITSET_MATCH_ANY;
	case FUTEX_WAKE_BITSET:
		ret = futex_wake(uaddr, flags, val, val3);
		break;
	case FUTEX_REQUEUE:
		ret = futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0);
		break;
	case FUTEX_CMP_REQUEUE:
		ret = futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 0);
		break;
	case FUTEX_WAKE_OP:
		ret = futex_wake_op(uaddr, flags, uaddr2, val, val2, val3);
		break;
	case FUTEX_LOCK_PI:
		if (futex_cmpxchg_enabled)
			ret = futex_lock_pi(uaddr, flags, val, timeout, 0);
		break;
	case FUTEX_UNLOCK_PI:
		if (futex_cmpxchg_enabled)
			ret = futex_unlock_pi(uaddr, flags);
		break;
	case FUTEX_TRYLOCK_PI:
		if (futex_cmpxchg_enabled)
			ret = futex_lock_pi(uaddr, flags, 0, timeout, 1);
		break;
	case FUTEX_WAIT_REQUEUE_PI:
		val3 = FUTEX_BITSET_MATCH_ANY;
		ret = futex_wait_requeue_pi(uaddr, flags, val, timeout, val3,
					    uaddr2);
		break;
	case FUTEX_CMP_REQUEUE_PI:
		ret = futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1);
		break;
	default:
		ret = -ENOSYS;
	}
	return ret;
}
通过代码可以看到,系统调用会调用do_futex,而它又会通过op来产生cmd来确定具体的调用的FUTEX的形式。因为此处不是分析内核源码,所以点到为止。 三篇futex的文章,写得有些粗糙,但是如果跟踪库和内核仔细分析下去,估计还得写好多,这就不是分析futex的本意了。

三、总结

前面分析了futex的各种优点,它有什么缺点没?有,而且很坑,搞JAVA的如果搞得深的都遇到过这个问题:

https://github.com/torvalds/linux/commit/76835b0ebf8a7fe85beb03c75121419a7dec52f0,在get_futex_key_refs这个函数里,老大们疏忽了。

还有这个:

https://ma.ttias.be/linux-futex_wait-bug/

还有一个是拒绝服务的漏洞,好像跟它也有关系,好东西,不可能啥都好,是不是?

futex同步机制分析之三内核实现