erasure code in ISA-L
纠删码简介
原理
对于纠删码(erasure code)的介绍有很多,在此不再赘述,在本文的第一个部分,我主要介绍我对纠删码,主要是RS码的理解。
纠删码的基本原理在于,构造一个行向量乘以数据矩阵,可以生成一个数据矩阵行的一个线性排列。显然,如果对于k个数据块的m个校验向量,只要这m个向量线性无关,那么在最多损失m个数据块的情况下,就可以通过这m个向量和剩余的k-m个数据块和m个校验快恢复所有k个数据块。
具体来说,假设我们需要恢复di1,…,dim几个块,那么我们需要删去(k+m)*k的矩阵中的第i1,…,im行,然后求出剩余矩阵的逆矩阵作为恢复矩阵。恢复矩阵的第i1,…,im行乘以剩余的k个数据块,就可以得到di1,…,dim的值。
需要注意的是,如果损失的是校验快,那么逆矩阵的对应行需要乘以原矩阵,才能生成恢复矩阵。
但是这样存在一个问题:如果按照常规的四则运算进行计算,那么在计算逆矩阵的时候,会出现大量的浮点数运算,这会导致计算量过大,因此需要寻找一种更加高效的计算方法。
有限域
因此,我们引入有限域(finite field)的概念。一般来说,我们在G(2^8)上进行运算,即在一个8位的字节上进行运算。这样,我们可以定义在0-255上的加法和乘法,使得这两种运算满足结合律、交换律、分配律等性质,同时也有减法和除法的定义。具体的定义可以参考这里。
这样做的好处在于将数据的运算限制在一个字节的范围中。同时,加减法可以简单地转化为异或运算,乘除法可以转化为查表运算。
ISA-L
ISA-L(Intel(R) Intelligent Storage Acceleration Library)是”a collection of optimized low-level functions targeting storage applications”,它包括“Erasure codes - Fast block Reed-Solomon type erasure codes for any encode/decode matrix in GF(2^8).”
本文主要介绍ISA-L中的纠删码实现和优化。
乘法表的生成
在ISA-L中,乘法表是通过ec_init_tables
函数生成的。这个函数的定义如下:
void ec_init_tables(int k, int rows, unsigned char *a, unsigned char *g_tbls)
{
int i, j;
for (i = 0; i < rows; i++) {
for (j = 0; j < k; j++) {
gf_vect_mul_init(*a++, g_tbls);
g_tbls += 32;
}
}
}
这个函数给编码矩阵的后k
行的每一个元素生成了32次幂表,我感觉有点过于占用存储空间了。
它调用了gf_vect_mul_init
函数,这个函数的定义如下:
void gf_vect_mul_init(unsigned char c, unsigned char *tbl)
{
unsigned char c2 = (c << 1) ^ ((c & 0x80) ? 0x1d : 0); //Mult by GF{2}
unsigned char c4 = (c2 << 1) ^ ((c2 & 0x80) ? 0x1d : 0); //Mult by GF{2}
unsigned char c8 = (c4 << 1) ^ ((c4 & 0x80) ? 0x1d : 0); //Mult by GF{2}
#if (__WORDSIZE == 64 || _WIN64 || __x86_64__) && (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__)
unsigned long long v1, v2, v4, v8, *t;
unsigned long long v10, v20, v40, v80;
unsigned char c17, c18, c20, c24;
t = (unsigned long long *)tbl;
v1 = c * 0x0100010001000100ull;
v2 = c2 * 0x0101000001010000ull;
v4 = c4 * 0x0101010100000000ull;
v8 = c8 * 0x0101010101010101ull;
v4 = v1 ^ v2 ^ v4;
t[0] = v4;
t[1] = v8 ^ v4;
c17 = (c8 << 1) ^ ((c8 & 0x80) ? 0x1d : 0); //Mult by GF{2}
c18 = (c17 << 1) ^ ((c17 & 0x80) ? 0x1d : 0); //Mult by GF{2}
c20 = (c18 << 1) ^ ((c18 & 0x80) ? 0x1d : 0); //Mult by GF{2}
c24 = (c20 << 1) ^ ((c20 & 0x80) ? 0x1d : 0); //Mult by GF{2}
v10 = c17 * 0x0100010001000100ull;
v20 = c18 * 0x0101000001010000ull;
v40 = c20 * 0x0101010100000000ull;
v80 = c24 * 0x0101010101010101ull;
v40 = v10 ^ v20 ^ v40;
t[2] = v40;
t[3] = v80 ^ v40;
#else // 32-bit or other
unsigned char c3, c5, c6, c7, c9, c10, c11, c12, c13, c14, c15;
unsigned char c17, c18, c19, c20, c21, c22, c23, c24, c25, c26, c27, c28, c29, c30,
c31;
c3 = c2 ^ c;
c5 = c4 ^ c;
c6 = c4 ^ c2;
c7 = c4 ^ c3;
c9 = c8 ^ c;
c10 = c8 ^ c2;
c11 = c8 ^ c3;
c12 = c8 ^ c4;
c13 = c8 ^ c5;
c14 = c8 ^ c6;
c15 = c8 ^ c7;
tbl[0] = 0;
tbl[1] = c;
tbl[2] = c2;
tbl[3] = c3;
tbl[4] = c4;
tbl[5] = c5;
tbl[6] = c6;
tbl[7] = c7;
tbl[8] = c8;
tbl[9] = c9;
tbl[10] = c10;
tbl[11] = c11;
tbl[12] = c12;
tbl[13] = c13;
tbl[14] = c14;
tbl[15] = c15;
c17 = (c8 << 1) ^ ((c8 & 0x80) ? 0x1d : 0); //Mult by GF{2}
c18 = (c17 << 1) ^ ((c17 & 0x80) ? 0x1d : 0); //Mult by GF{2}
c19 = c18 ^ c17;
c20 = (c18 << 1) ^ ((c18 & 0x80) ? 0x1d : 0); //Mult by GF{2}
c21 = c20 ^ c17;
c22 = c20 ^ c18;
c23 = c20 ^ c19;
c24 = (c20 << 1) ^ ((c20 & 0x80) ? 0x1d : 0); //Mult by GF{2}
c25 = c24 ^ c17;
c26 = c24 ^ c18;
c27 = c24 ^ c19;
c28 = c24 ^ c20;
c29 = c24 ^ c21;
c30 = c24 ^ c22;
c31 = c24 ^ c23;
tbl[16] = 0;
tbl[17] = c17;
tbl[18] = c18;
tbl[19] = c19;
tbl[20] = c20;
tbl[21] = c21;
tbl[22] = c22;
tbl[23] = c23;
tbl[24] = c24;
tbl[25] = c25;
tbl[26] = c26;
tbl[27] = c27;
tbl[28] = c28;
tbl[29] = c29;
tbl[30] = c30;
tbl[31] = c31;
#endif //__WORDSIZE == 64 || _WIN64 || __x86_64__
}
可以看出,对于64位小端的机器,这个函数使用了64位的整数来进行计算,而对于其它系统则是八位的数据。这样可以增加计算和赋值的效率。
但是,有几个地方我还不是很清楚:
1.为什么tbl[16]
是0
?
2.为什么c21 = c20 ^ c17
而不是c21 = c20 ^ c
?
3.为什么tbl
的大小是32?
接着来看看。
数据的编码与更新
在ec_encode_data_base
函数中,数据的编码是通过gf_vect_mul
函数实现的。这个函数的定义如下:
void ec_encode_data_base(int len, int srcs, int dests, unsigned char *v,
unsigned char **src, unsigned char **dest)
{
int i, j, l;
unsigned char s;
for (l = 0; l < dests; l++) {
for (i = 0; i < len; i++) {
s = 0;
for (j = 0; j < srcs; j++)
s ^= gf_mul(src[j][i], v[j * 32 + l * srcs * 32 + 1]);
dest[l][i] = s;
}
}
}
可以看出,这个函数的实现非常简单,它只是对于每一个数据块,对于每一个字节,进行了一次异或运算。但是,它调用了gf_mul
函数,这个函数的定义如下:
unsigned char gf_mul(unsigned char a, unsigned char b)
{
#ifndef GF_LARGE_TABLES
int i;
if ((a == 0) || (b == 0))
return 0;
return gff_base[(i = gflog_base[a] + gflog_base[b]) > 254 ? i - 255 : i];
#else
return gf_mul_table_base[b * 256 + a];
#endif
}
其中,gff_base
和gflog_base
是在ec_base.h
中预先定义好的全局静态数组。
void ec_encode_data_update_base(int len, int k, int rows, int vec_i, unsigned char *v,
unsigned char *data, unsigned char **dest)
{
int i, l;
unsigned char s;
for (l = 0; l < rows; l++) {
for (i = 0; i < len; i++) {
s = dest[l][i];
s ^= gf_mul(data[i], v[vec_i * 32 + l * k * 32 + 1]);
dest[l][i] = s;
}
}
}
在ec_encode_data_update_base
函数中,实现数据的更新。它与ec_encode_data_base
函数的区别在于,它只更新了一个数据块。后面还实现了有限域上的点积和线性组合,这里不再赘述。
点积的汇编优化
ec_encode_data_sse
定义了SSE指令集下的点积函数,它的定义如下:
void ec_encode_data_sse(int len, int k, int rows, unsigned char *g_tbls, unsigned char **data,
unsigned char **coding)
{
if (len < 16) {
ec_encode_data_base(len, k, rows, g_tbls, data, coding);
return;
}
while (rows >= 6) {
gf_6vect_dot_prod_sse(len, k, g_tbls, data, coding);
g_tbls += 6 * k * 32;
coding += 6;
rows -= 6;
}
switch (rows) {
case 5:
gf_5vect_dot_prod_sse(len, k, g_tbls, data, coding);
break;
case 4:
gf_4vect_dot_prod_sse(len, k, g_tbls, data, coding);
break;
case 3:
gf_3vect_dot_prod_sse(len, k, g_tbls, data, coding);
break;
case 2:
gf_2vect_dot_prod_sse(len, k, g_tbls, data, coding);
break;
case 1:
gf_vect_dot_prod_sse(len, k, g_tbls, data, *coding);
break;
case 0:
break;
}
}
可以看出,数据量较小时直接使用ec_encode_data_base
函数,否则使用SSE指令集下的点积函数,一次最多处理6个数据块。
我们先来看看gf_vect_dot_prod_sse.asm
中的实现。这段代码前面一部分是一些宏定义,这里不再赘述。我们直接来看gf_vect_dot_prod_sse
函数的实现:
mk_global gf_vect_dot_prod_sse, function
func(gf_vect_dot_prod_sse)
FUNC_SAVE
SLDR len, len_m
sub len, 16
SSTR len_m, len
jl .return_fail
xor pos, pos
movdqa xmask0f, [mask0f] ;Load mask of lower nibble in each byte
这一部分是一些函数的初始化和参数加载。
.loop16:
pxor xp, xp
mov tmp, mul_array
xor vec_i, vec_i
这部分把xp
、vec_i
初始化为0,tmp
指向mul_array
指定的乘法表。
.next_vect:
mov ptr, [src+vec_i*PS]
movdqu xgft_lo, [tmp] ;Load array Cx{00}, Cx{01}, ..., Cx{0f}
movdqu xgft_hi, [tmp+16] ; " Cx{00}, Cx{10}, ..., Cx{f0}
XLDR x0, [ptr+pos] ;Get next source vector
add tmp, 32
add vec_i, 1
这部分将xgft_lo
和xgft_hi
分别加载为tmp
指向的乘法表的前16个和后16个字节,x0
加载为src
指向的数据块的pos
位置的字节,接着更新tmp
和vec_i
。
movdqa xtmpa, x0 ;Keep unshifted copy of src
psraw x0, 4 ;Shift to put high nibble into bits 4-0
pand x0, xmask0f ;Mask high src nibble in bits 4-0
pand xtmpa, xmask0f ;Mask low src nibble in bits 4-0
pshufb xgft_hi, x0 ;Lookup mul table of high nibble
pshufb xgft_lo, xtmpa ;Lookup mul table of low nibble
pxor xgft_hi, xgft_lo ;GF add high and low partials
pxor xp, xgft_hi ;xp += partial
这里是比较复杂的一部分。首先,我们先给出pshufb
指令的定义:
Synopsis
__m128i _mm_shuffle_epi8 (__m128i a, __m128i b)
Instruction:
pshufb xmm, xmm
CPUID Flags: SSSE3 Description
Shuffle packed 8-bit integers in a according to shuffle control mask in the corresponding 8-bit element of b, and store the results in dst.
Operation
FOR j := 0 to 15 i := j*8 IF b[i+7] == 1 dst[i+7:i] := 0 ELSE index[3:0] := b[i+3:i] dst[i+7:i] := a[index*8+7:index*8] FI ENDFOR
比较奇怪的一点是这里的SSE版本是SSSE3而不是头文件中定义的SSE4.1。
这里的pshufb
指令的作用是,每个循环考虑b
中的一个字节。如果这个字节的最后一位是1,那么dst
中对应的字节就是0。否则,b
中的前四位作为索引,a
中对应的字节作为dst
中的字节。
我们再会看汇编代码。首先,我们通过移位和掩码操作,将x0
中每个字节的高四位和低四位分别提取出来。然后,我们通过pshufb
指令,将x0
中每个字节的高四位和低四位分别作为索引,从乘法表中取出对应的字节,然后进行异或操作。
这也就是将原向量{x0[0],x0[1],...,x0[15]}
转化为{Cx{x0[0]},Cx{x0[1]},...,Cx{x0[15]}}
的过程,其中Cx{u}=c^u
,由乘法表给出。
SLDR vec, vec_m
cmp vec_i, vec
jl .next_vect
SLDR dest, dest_m
XSTR [dest+pos], xp
add pos, 16 ;Loop on 16 bytes at a time
SLDR len, len_m
cmp pos, len
jle .loop16
lea tmp, [len + 16]
cmp pos, tmp
je .return_pass
;; Tail len
mov pos, len ;Overlapped offset length-16
jmp .loop16 ;Do one more overlap pass
接下来,就是把获得的数据存入dest
指针指向的空间,和一些边界条件的判断。
注意这里有两层循环,外层循环是对于每一个源数据块,内层循环是对于每16个字节。
那么,我们现在可以总结一下这个函数的作用了。它的作用是,对于每一个数据块,对于每一个字节,将这个字节的高四位和低四位分别作为索引,从乘法表中取出对应的字节,然后进行异或操作,最后将结果存入dest
指针指向的空间。
为什么这样就可以实现点积呢?
我们先看一下C语言的实现:
void gf_vect_dot_prod_base(int len, int vlen, unsigned char *v,
unsigned char **src, unsigned char *dest)
{
int i, j;
unsigned char s;
for (i = 0; i < len; i++) {
s = 0;
for (j = 0; j < vlen; j++)
s ^= gf_mul(src[j][i], v[j * 32 + 1]);
dest[i] = s;
}
}
可以看出,这个函数的作用是,循环的将第i
个向量的第j
个字节和v[j*32+1]
进行乘法运算,然后进行异或操作,最后将结果存入dest
指针指向的空间。v[j*32+1]
是乘法表中的第j
行的第1
个元素,也就是Cx{1}
。