riscv-spinlock-rwlock

Introduce riscv spinlock and rwlock.

spin lock

spin lock是,当无法获得锁的时候,会原地打转,等待资源,直到获得锁之后,才会往下走,进去临界区。

spinlock提供三个函数:初始化锁,加锁,解锁。

1
2
3
4
spinlock_t lock;
spin_lock_init(&lock);
spin_lock(&lock);
spin_unlock(&unlock);

另外还有几个函数变种

1
2
3
4
spin_lock_irq()        // 在使用spinlock的同时禁止中断,释放时打开中断
spin_lock_irqsave() // 在使用spinlock的同时禁止中断,并将中断状态保存起来,释放时恢复中断状态
spin_lock_bh() // 禁止中断下半部使用spin_is_locked接口
spin_is_locked() // 查询是否上锁

变种都是在基础函数之上做的扩展,我们只分析基础函数。

spinlock_t 定义在 arch/riscv/include/asm/spinlock_types.h

1
2
3
typedef struct {
volatile unsigned int lock;
} arch_spinlock_t;

从定义可以看出,riscv架构下的spinlock并没有保证公平抢锁,只是定义了一个变量而已。这里和arm是不一样的。arm中spinlock是有定义owner和next变量,用来对锁请求进行排序,从而保证先到先得。而riscv的实现目前还是在锁释放的时刻,所有请求一起去抢,谁抢到是谁的,这种机制。

lock和unlock函数在arch/riscv/include/asm/spinlock.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/*
* Simple spin lock operations. These provide no fairness guarantees.
*/

/* FIXME: Replace this with a ticket lock, like MIPS. */

#define arch_spin_is_locked(x) (READ_ONCE((x)->lock) != 0)

static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
smp_store_release(&lock->lock, 0);
}

static inline int arch_spin_trylock(arch_spinlock_t *lock)
{
int tmp = 1, busy;

__asm__ __volatile__ (
" amoswap.w %0, %2, %1\n"
RISCV_ACQUIRE_BARRIER
: "=r" (busy), "+A" (lock->lock)
: "r" (tmp)
: "memory");

return !busy;
}

static inline void arch_spin_lock(arch_spinlock_t *lock)
{
while (1) {
if (arch_spin_is_locked(lock))
continue;

if (arch_spin_trylock(lock))
break;
}
}

可以看出,riscv架构下的spinlock是使用amoswap指令完成的。

再看下_raw_spin_lock反汇编。

其核心主要是这几条指令:

1
2
3
4
5
6
7
8
9
     li a4, 1
fde: lw a5,0(a0) // load锁的值
...
bnez a5, fde // 判断锁是否被其他人占用,如果占用,回到fde重新load
amoswap.w a5, a4(a0) // 如果没有占用,尝试抢锁,将1写入锁,并将锁内的值读回来
fence r.rw // 加入barrier保证amo被别的核看见
...
bnez a5, fde // 判断锁是否抢到,如果没抢到,回到fde重新等待
... // 抢到锁了,进入临界区

再看下解锁的反汇编:

其核心就是一条store指令。

1
2
fence rw, w
sw zero, 0(a0)

举一个双核抢锁的例子来说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
hart0                 hart 1
lw a5,0(a0) // h0 load, a5=0
lw a5,0(a0) // h1 load, a5=0
bnez a5,fde // h0 判断a5等于0,不跳转
bnez a5,fde // h1 判断a5等于0,不跳转
amoswap.w a5,a4(a0) // h0 将1写入锁中,将锁中的0读回来,放到a5里
amoswap.w a5,a4(a0) // h1 将1写入锁中,将锁中的1读回来,放到a5里
bnez a5, fde // h0 判断a5等于0,不跳转,进入临界区
bnez a5, fde // h1 判断a5不等于0,跳转,重新抢锁
....
lw a5,0(a0) // h1 load, a5=1
bnez a5, fde // h1 判断a5不等于0,跳转
.... ....
sw zero, 0(a0) // h0 释放锁,将0写入锁中
lw a5,0(a0) // h1 load, a5=0
bnez a5, fde // h1 判断a5等于0,不跳转
amoswap.w a5,a4(a0) // h1 将1写入锁中,将锁中的0读回来,放到a5里
bnez a5, fde // h1 判断a5等于0,不跳转,进入临界区
...
sw zero, 0(a0) // h1 释放锁,将0写入锁中

rw lock

上面的spinlock是在只要有资源请求之前,就需要抢到锁,再进行操作。而在很多场景下,只需要对资源进行读,而不需要写,这些读请求,理论上是可以允许多个人同时进行读取的,而只有写才需要只允许一个人写。读写锁rwlock就是为这些需求定制的。可以达到提高并发性能的目的。

加锁逻辑是:

  • 如果没有读写请求时,读和写都能抢到锁,但只能其中一个抢到
  • 如果有读请求时,写不能进入,新来的读可以进入
  • 如果有写请求时,新来的写和读都不能进入
  • 如果有一个或多个读请求时,写不能抢占,要等到所有读都释放,写才可以进入

解锁逻辑时:

  • 写解锁时,所有读写请求同时抢占,谁抢到是谁的
  • 读解锁时,如果还有读请求在锁中,只有读可以继续抢占;如果没有读请求了,读和写都可以抢占。

基础的函数是:

1
2
3
4
read_lock()
write_lock()
read_unlock()
write_unlock()

当然,也有一些变种,就不详细列举了。

arch_rwlock_t 定义在 arch/riscv/include/asm/spinlock_types.h

1
2
3
typedef struct {
volatile unsigned int lock;
} arch_rwlock_t;

从定义可以看出,也是只定义了一个lock变量。

lock和unlock函数在arch/riscv/include/asm/spinlock.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
static inline void arch_read_lock(arch_rwlock_t *lock)
{
int tmp;

__asm__ __volatile__(
"1: lr.w %1, %0\n"
" bltz %1, 1b\n"
" addi %1, %1, 1\n"
" sc.w %1, %1, %0\n"
" bnez %1, 1b\n"
RISCV_ACQUIRE_BARRIER
: "+A" (lock->lock), "=&r" (tmp)
:: "memory");
}

static inline void arch_write_lock(arch_rwlock_t *lock)
{
int tmp;

__asm__ __volatile__(
"1: lr.w %1, %0\n"
" bnez %1, 1b\n"
" li %1, -1\n"
" sc.w %1, %1, %0\n"
" bnez %1, 1b\n"
RISCV_ACQUIRE_BARRIER
: "+A" (lock->lock), "=&r" (tmp)
:: "memory");
}

static inline int arch_read_trylock(arch_rwlock_t *lock)
{
int busy;

__asm__ __volatile__(
"1: lr.w %1, %0\n"
" bltz %1, 1f\n"
" addi %1, %1, 1\n"
" sc.w %1, %1, %0\n"
" bnez %1, 1b\n"
RISCV_ACQUIRE_BARRIER
"1:\n"
: "+A" (lock->lock), "=&r" (busy)
:: "memory");

return !busy;
}

static inline int arch_write_trylock(arch_rwlock_t *lock)
{
int busy;

__asm__ __volatile__(
"1: lr.w %1, %0\n"
" bnez %1, 1f\n"
" li %1, -1\n"
" sc.w %1, %1, %0\n"
" bnez %1, 1b\n"
RISCV_ACQUIRE_BARRIER
"1:\n"
: "+A" (lock->lock), "=&r" (busy)
:: "memory");

return !busy;
}

static inline void arch_read_unlock(arch_rwlock_t *lock)
{
__asm__ __volatile__(
RISCV_RELEASE_BARRIER
" amoadd.w x0, %1, %0\n"
: "+A" (lock->lock)
: "r" (-1)
: "memory");
}

static inline void arch_write_unlock(arch_rwlock_t *lock)
{
smp_store_release(&lock->lock, 0);
}

主要分析几个基础函数。

read_lock反汇编如下:

write_lock反汇编如下:

read unlock 反汇编如下:

write unlock反汇编如下:

从上述代码可以分析出如下点:

  • lock=0,表示没有人占用,read/write都可以抢
  • lock>0,表示read占用了,lock的值就代表有多少个read占用了
  • lock=-1,表示write占用了,只能有一个write占用,所以只可能是-1
  • read lock时
    • 如果lock<0就要等,表示有write占用住了
    • 如果lock<=0,可以进入,将lock+1写入lock,表示read多了一个。
    • 如果sc成功,进入临界区
    • 如果sc失败,重新走一次流程
  • write lock时
    • 如果lock不等于0,表示有write或者read占住了,要等
    • 如果lock等于0,将-1写入lock
    • 如果sc成功,表示占住了锁,进入临界区
    • 如果sc失败,表示没抢到锁,重新走一次流程
  • read unlock时
    • 使用amoadd指令,将lock-1,写入lock,表示释放掉一个read源
  • write unlock时
    • 使用sw指令,将lock写为0,表示释放掉write源

Update

在2022/3/21之后的linux代码,已经将上述代码删除了,合并到了架构统一的代码中了。

commit id: 19bc59bbed

spinlock

spinlock的代码被移到了:include/asm-generic/spinlock.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
{
u32 val = atomic_fetch_add(1<<16, lock);
u16 ticket = val >> 16;

if (ticket == (u16)val)
return;

/*
* atomic_cond_read_acquire() is RCpc, but rather than defining a
* custom cond_read_rcsc() here we just emit a full fence. We only
* need the prior reads before subsequent writes ordering from
* smb_mb(), but as atomic_cond_read_acquire() just emits reads and we
* have no outstanding writes due to the atomic_fetch_add() the extra
* orderings are free.
*/
atomic_cond_read_acquire(lock, ticket == (u16)VAL);
smp_mb();
}
static __always_inline void arch_spin_unlock(arch_spinlock_t *lock)
{
u16 *ptr = (u16 *)lock + IS_ENABLED(CONFIG_CPU_BIG_ENDIAN);
u32 val = atomic_read(lock);

smp_store_release(ptr, (u16)val + 1);
}
static __always_inline int arch_spin_is_locked(arch_spinlock_t *lock)
{
u32 val = atomic_read(lock);

return ((val >> 16) != (val & 0xffff));
}

这套代码和之前的机制完全不一样了。这里使用的是amoadd来实现的。工作过程是

  • 使用高16bits表示allocated的个数,使用低16bits表示de-allocated的个数。
    • 如果高16bits和低16bits相等,表示没有人占用
    • 如果高16bits和低16bits不相等,表示有人占住了锁。
  • lock的过程
    • 将lock的高16bits增加1
    • 再读取lock的值
      • 如果原来的高16bits和低16bits相等,表示本次我抢到了锁,直接返回
      • 如果不相等,表示有其他人和我一起抢锁,我没抢到,进入循环,一直读取lock的值,直到低16bits和原来的高16bits相等,说明我等到了锁。
    • 可以看出,这样实现,是可以对抢锁的源进行排序的。比之前的代码好。
  • unlock的过程
    • 读取lock的值,将低16bits加1。

举个例子:

1
2
3
4
5
6
7
8
9
10
hart0                  hart1
val=atomic_fetch_add // h0: lock=1_0, val=0
val=atomic_fetch_add // h1: lock=2_0, val=1
val==lock[15:0] // h0: 抢到锁
.... // h0: 进入临界区
val!=lock[15:0] // h1: 没抢到锁,等待
atomic_cond_read_acquire // h1: 等待
lock[15:0]++ // h0: lock=2_1
atomic_cond_read_acquire // h1: lock[15:0] == 1, 得到锁
... // h1: 进入临界区

rwlock

rwlock的代码被移到了:include/asm-generic/qspinlock.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* queued_read_lock - acquire read lock of a queued rwlock
* @lock: Pointer to queued rwlock structure
*/
static inline void queued_read_lock(struct qrwlock *lock)
{
int cnts;

cnts = atomic_add_return_acquire(_QR_BIAS, &lock->cnts);
if (likely(!(cnts & _QW_WMASK)))
return;

/* The slowpath will decrement the reader count, if necessary. */
queued_read_lock_slowpath(lock);
}

/**
* queued_write_lock - acquire write lock of a queued rwlock
* @lock : Pointer to queued rwlock structure
*/
static inline void queued_write_lock(struct qrwlock *lock)
{
int cnts = 0;
/* Optimize for the unfair lock case where the fair flag is 0. */
if (likely(atomic_try_cmpxchg_acquire(&lock->cnts, &cnts, _QW_LOCKED)))
return;

queued_write_lock_slowpath(lock);
}

/**
* queued_read_unlock - release read lock of a queued rwlock
* @lock : Pointer to queued rwlock structure
*/
static inline void queued_read_unlock(struct qrwlock *lock)
{
/*
* Atomically decrement the reader count
*/
(void)atomic_sub_return_release(_QR_BIAS, &lock->cnts);
}

/**
* queued_write_unlock - release write lock of a queued rwlock
* @lock : Pointer to queued rwlock structure
*/
static inline void queued_write_unlock(struct qrwlock *lock)
{
smp_store_release(&lock->wlocked, 0);
}

和原来的代码机制类似。工作过程是:

  • lock[8:0] 给write用,lock[31:9]给read用

  • read_lock时,将lock[31:9] + 1。

  • read_unlock时,将lock[31:9] - 1。

  • write lock时,将lock[8:0]写为0xff

  • write unlock时,将lock[8:0]写为0

  • 当read/write lock获取失败的时候,会进入slowpath,slowpath中会配合一个spinlock来完成。

    代码在:kernel/locking/qrwlock.c