Memory Order
借Wiki的定义,
Memory ordering describes the order of accesses to computer memory by a CPU. The term can refer either to the memory ordering generated by the compiler during compile time, or to the memory ordering generated by a CPU during runtime.
内存顺序可以指编译器乱序或者运行期乱序。
编译期乱序
编译器在编译的时候会尝试重排指令以执行优化。 比如gcc在这段的代码:https://godbolt.org/z/enPGxeMjf
int g_a = 0;
int g_b = 0;
void foo() {
g_a = g_b + 1;
g_b = 1;
}
void bar() {
while (g_b == 0) continue;
assert(g_a == 1);// maybe fail.
}
以上代码可能会被优化成如下汇编
foo():
mov eax, DWORD PTR g_b[rip] // int tmp = g_b;
mov DWORD PTR g_b[rip], 1 // g_b = 1;
add eax, 1 // tmp += 1
// g_a = tmp;
//(存在race condition),当g_b被赋值时,g_a可能还等于0
mov DWORD PTR g_a[rip], eax
ret
手动加上编译期fence后,cppreference指出不同于std::atomic_thread_fence
,std::atomic_signal_fence
不会生成运行期fence指令
void foo() {
g_a = g_b + 1;
std::atomic_signal_fence(std::memory_order_seq_cst);
g_b = 1;
}
// 编译出的汇编
foo():
mov eax, DWORD PTR g_b[rip] //int tmp = g_b;
add eax, 1 // tmp += 1;
mov DWORD PTR g_a[rip], eax //g_a = tmp;
mov DWORD PTR g_b[rip], 1 //g_b = 1;
ret
以上race condition
是从汇编推断的,实际运行了半个小时我也没触发。。可能因为指令太短了吧
再找一个必定触发的编期间重排的例子,大概来自Is it legal for a C++ optimizer to reorder calls to clock()?
例子的链接可以见: https://godbolt.org/z/9re9YTb9z
在MSVC2015
以后的cl都可以复现,在/O2
下
auto start = std::chrono::steady_clock::now();
int result = LongComputation(1000);
auto stop = std::chrono::steady_clock::now();
会被优化成
auto start = std::chrono::steady_clock::now();
auto stop = std::chrono::steady_clock::now();
int result = LongComputation(1000);
并且这个问题对于MSVC还更复杂一点。
如果LongComputation()
是一个编译期能看见其完全实现,且能确定其不会有副作用的情况下,加MemoryBarrier
不会阻挡MSVC对其进行重排序。
加入MemoryBarrier()
的例子可以见: https://godbolt.org/z/e5zP71vn1
int fib(int x)
{
if (x < 2) return 1;
int res = fib(x - 1) + fib(x - 2);
return res;
}
int main()
{
auto start = std::chrono::steady_clock::now();
MemoryBarrier(); // 即使加了屏障,编译器仍然会进行重排
std::atomic_signal_fence(std::memory_order_seq_cst);
int result = fib(42);
auto stop = std::chrono::steady_clock::now();
...
}
编译出来的汇编为
call _Query_perf_frequency
mov rbp, rax
call _Query_perf_counter
mov rdi, rax
lock or DWORD PTR [rsp], 0 ; MemoryBarrier()生成了运行期fence,然而没能阻挡编译期重排
npad 1
call _Query_perf_frequency
mov r14, rax
call _Query_perf_counter ; 连续执行两次Now后再执行fib
mov ecx, 41 ; 00000029H
mov rsi, rax
call int fib(int) ; fib
mov ecx, 40 ; 00000028H
mov ebx, eax
call int fib(int) ; fib
要完全阻挡MSVC的重排序可以考虑写作,对volatile变量的读写能阻挡编译器做编译期重排序,但是不是运行期(所以volatile变量不能在多线程里做同步,MSVC的扩展实现volatile带有acl-rel语义)
int tmp = fib(x - 1) + fib(x - 2);
int res = *(volatile int*)&(tmp);
运行期乱序
在现代CPU中,CPU可以乱序执行指令以充分发挥其计算能力,这个过程对编程者是透明的。 然而在多线程程序下,不同线程之间的变量读写往往具有一定的假定顺序,而CPU的乱序执行会导致错误。
Intel的资料里 Intel_Reordering_318147.pdf 第7页描述了4种内存读写情况(内存读写只有Load/Store两种操作,其组合只有4种):
- Load Load: Loads are not reordered with other loads.
- Store Store: Stores are not reordered with other stores.
- Load Store: Stores are not reordered with older loads.
- Store Load:Loads may be reordered with older stores to different locations but not with older stores to the same location
X86是强一致性内存模型,其只允许一种情况下出现运行期重排: Store-Load。
一个用于观察Store-Load
的程序可以见饶萌大佬的文章(https://zhuanlan.zhihu.com/p/41872203),以下例子修改自其文章
#include <atomic>
#include <bits/stdc++.h>
using namespace std;
inline void nocpufence(std::atomic<int>& x, std::atomic<int>& y, std::atomic<int>& r) {
x.store(1,std::memory_order_release);
r.store(y.load(std::memory_order_acquire),std::memory_order_release);
}
inline void cpufence(std::atomic<int>& x, std::atomic<int>& y, std::atomic<int>& r) {
x.store(1,std::memory_order_release);
std::atomic_thread_fence(std::memory_order_seq_cst);
r.store(y.load(std::memory_order_acquire),std::memory_order_release);
}
volatile int g_cnt = 0;
void threadfun(volatile int& loop_cnt, std::atomic<int>& x, std::atomic<int>& y, std::atomic<int>& r) {
while(true) {
while(g_cnt == loop_cnt) ;
asm volatile("" ::: "memory");
cpufence(x,y,r);
// nocpufence(x, y, r);
asm volatile("" ::: "memory");
loop_cnt++;
}
}
int main() {
alignas(64) volatile int cnt1 = 0;
alignas(64) volatile int cnt2 = 0;
alignas(64) std::atomic<int> x{0};
alignas(64) std::atomic<int> y{0};
alignas(64) std::atomic<int> r1{0};
alignas(64) std::atomic<int> r2{0};
thread thr1(threadfun, ref(cnt1), ref(x), ref(y), ref(r1));
thread thr2(threadfun, ref(cnt2), ref(y), ref(x), ref(r2));
int detected = 0;
while(true) {
// x = y = 0;
x.store(0,memory_order_release);
y.store(0,memory_order_release);
asm volatile("" ::: "memory");
g_cnt++;
while(cnt1 != g_cnt || cnt2 != g_cnt)
;
asm volatile("" ::: "memory");
if(r1 == 0 && r2 == 0) {
detected++;
cout << "bad, g_cnt: " << g_cnt << " detected: " << detected << endl;
}
}
return 0;
}
Seq_Cst
为了控制运行期的重排,需要编写对应平台的汇编指令以实现内存屏障阻止重排,其中x86下一种可行的汇编是mfence
指令。
为了跨平台,C++11抽象出了std::memory_order
概念,其主要包含relaxed
,release-consume/acquire
,seq_cst
几种概念。
enum memory_order {
memory_order_relaxed,
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel,
memory_order_seq_cst
};
其中memory_order_acq_rel
,memory_order_seq_cst
一般用于原子变量。
比如以下场景: (https://godbolt.org/z/zbTKaWjYv)
std::atomic<uint32_t> g_counter{0};
void foo()
{
g_counter.fetch_add(1, std::memory_order_acq_rel);
}
在RISC-V下的汇编为,可以明显看到fence
指令,
lui a5,%hi(g_counter)
li a4,1
addi a5,a5,%lo(g_counter)
fence iorw,ow; amoadd.w.aq zero,a4,0(a5)
lui a4,%hi(j)
lw a5,%lo(j)(a4)
addiw a5,a5,1
sw a5,%lo(j)(a4)
其在x86下为
foo():
lock add DWORD PTR g_counter[rip], 1
add DWORD PTR j[rip], 1
其中lock add
会锁总线,所以也会起到内存屏障的作用。
当把其内存序改成relaxed
以后,RISCV的gcc放弃了fence指令
void foo2()
{
g_counter.fetch_add(1, std::memory_order_relaxed);
j++;
}
foo2():
lui a5,%hi(g_counter) //把g_counter的高20位放到a5寄存器
li a4,1 // a4寄存器放1
addi a5,a5,%lo(g_counter) // addi只能加12位,把g_counter低12位补上
amoadd.w zero,a4,0(a5) // fence 指令不见了
lui a4,%hi(j)
lw a5,%lo(j)(a4)
addiw a5,a5,1
sw a5,%lo(j)(a4)
ret
Release-Acquire
此外,Release
和Acquire
需要成对使用,以在不同线程之间同步变量以及构成Memory-Order。Release-Consume
语义似乎没有编译器实现,忽略吧。
对x86而言只有seq_cst
必须加fence,其他的除了relaxed
以外主要起提示编译器禁止重排的作用。
C++中不同MemoryOrder到对应架构的Fence指令可以见(https://www.cl.cam.ac.uk/~pes20/cpp/cpp0xmappings.html)
可以看到只有在Store Seq-Cst
下必须有mfence/XCHG/Lock
相关的指令。
在Release-Acquire
语义中,同一个原子变量在一个线程A里通过Store(Release)
操作进行store,而在另一个线程B通过Load(Acquire)
操作读取,构成Release-Acquire
屏障。
在Release-Acquire
屏障中,任何在线程A Store(Release)
之前的Store
操作,无论是原子的还是非原子的,都在线程B Load(Acquire)
操作后确定可见。
对于X86说这是默认实现的,因此从汇编中看不出什么东西,然而在弱内存序的RISC-V/ARM中,会生产对应的屏障指令以构建符合期望的内存顺序。 一个例子可以见: https://godbolt.org/z/GW3bos5WW
std::atomic<std::string*> ptr;
int data;
void producer()
{
std::string* p = new std::string("Hello");
data = 42;
ptr.store(p, std::memory_order_release);
}
void consumer()
{
std::string* p2;
while (!(p2 = ptr.load(std::memory_order_acquire)))
;
assert(*p2 == "Hello"); // never fires
assert(data == 42); // never fires
}
可以看见在RV
的汇编中
sw a4,%lo(data)(a5)
lui a5,%hi(ptr[abi:cxx11])
addi a5,a5,%lo(ptr[abi:cxx11])
fence iorw,ow; amoswap.d zero,a0,0(a5) //屏障
ld ra,8(sp)
addi sp,sp,16
jr ra
以及在ARM的汇编中
movs r1, #42
str r5, [r3, #4]
str r1, [r2, #4]
dmb ish // Data Memory Barrier
在生成了对应的运行期屏障指令后,我们通过原子变量ptr
的读写操作建立了两个线程之间的同步关系,在consumer()
线程中当p2
读取成功时候,我们能够断定data
变量已经被写入。
此外,C++的原子变量的读写都会插入编译期屏障,编译器不会试图重排原子变量前后的读写操作(待实验)。
Lockfree SPSC-Queue
我们可以用Acquire-Release
语义构建无锁固定大小的SPSC
队列。
分析MengRao
大佬的代码: (https://github.com/MengRao/SPSC_Queue.git)
其SPSCQueue
为固定大小的单入单出队列,消息大小以128
字节对齐。
固定大小的SPSCQueue
可以被实现为RingBuffer的形式,由read_idx
和write_idx
在环上一直走。其中push/front
只关心write_idx
走到哪里,因此可以构成一个同步。pop/alloc
操作与read_idx
有关,构成另外一对同步。
其用法可以在multhread_q.cc中看到。
当写入消息时,其代码约为
struct Msg
{
int val_len;
uint64_t ts;
long val[4];
};
Msg* msg = q->alloc();
msg->val = ..
q->push();
读取消息时为
Msg* msg = q->front();
...
q->pop();
其中包含两个Acquire-Release
同步关系,一个是push
和front
之间
void push() {
((std::atomic<uint32_t>*)&write_idx)->store(write_idx + 1, std::memory_order_release);
}
T* front() {
if (read_idx == ((std::atomic<uint32_t>*)&write_idx)->load(std::memory_order_acquire)) {
return nullptr;
}
return &data[read_idx % CNT];
}
在消息写入线程中,调用push会更新write_idx
的数值,并且获得release
语义,而在读取线程中会调用load + acquire
语义,构成读写屏障。
在q->push()
之前,也就是填充msg
结构体的相关store操作,将会在读取线程load
以后立刻可见。
另外一对同步发生在alloc
和pop
之间。
T* alloc() {
if (write_idx - read_idx_cach == CNT) {
read_idx_cach = ((std::atomic<uint32_t>*)&read_idx)->load(std::memory_order_consume);
if (__builtin_expect(write_idx - read_idx_cach == CNT, 0)) { // no enough space
return nullptr;
}
}
return &data[write_idx % CNT];
}
void pop() {
((std::atomic<uint32_t>*)&read_idx)->store(read_idx + 1, std::memory_order_release);
}
其中alloc
是判断队列是否还有空的slot,而pop是移动read_idx
而让出新的slot。
在这里作者做了点优化,即缓存了read_idx
的值。
- 当
write_idx - read_idx < CNT
时,说明队列里还有至少1个slot,此时写的线程并不用关心到底还剩多少个slot,也即不关心read_idx究竟等于多少。 - 当
write_idx == read_idx_cache
,此时写的线程通过缓存的read_idx认为slot已经用尽。此时通过load(acquire)
语义获取真的read_index的值,判断是否真的已经耗尽。
pop
和alloc
的store-load
构成release-acquire
语义(没有编译器实现了consume语义,因此consume就等于acquire)。在写的线程read_idx->load
时,req_acq
语义确保其能看到最新的read_idx的值。