erasure code in ISA-L

纠删码简介

原理

对于纠删码(erasure code)的介绍有很多,在此不再赘述,在本文的第一个部分,我主要介绍我对纠删码,主要是RS码的理解。

纠删码的基本原理在于,构造一个行向量乘以数据矩阵,可以生成一个数据矩阵行的一个线性排列。显然,如果对于k个数据块的m个校验向量,只要这m个向量线性无关,那么在最多损失m个数据块的情况下,就可以通过这m个向量和剩余的k-m个数据块和m个校验快恢复所有k个数据块。

具体来说,假设我们需要恢复di1,…,dim几个块,那么我们需要删去(k+m)*k的矩阵中的第i1,…,im行,然后求出剩余矩阵的逆矩阵作为恢复矩阵。恢复矩阵的第i1,…,im行乘以剩余的k个数据块,就可以得到di1,…,dim的值。

需要注意的是,如果损失的是校验快,那么逆矩阵的对应行需要乘以原矩阵,才能生成恢复矩阵。

但是这样存在一个问题:如果按照常规的四则运算进行计算,那么在计算逆矩阵的时候,会出现大量的浮点数运算,这会导致计算量过大,因此需要寻找一种更加高效的计算方法。

有限域

因此,我们引入有限域(finite field)的概念。一般来说,我们在G(2^8)上进行运算,即在一个8位的字节上进行运算。这样,我们可以定义在0-255上的加法和乘法,使得这两种运算满足结合律、交换律、分配律等性质,同时也有减法和除法的定义。具体的定义可以参考这里

这样做的好处在于将数据的运算限制在一个字节的范围中。同时,加减法可以简单地转化为异或运算,乘除法可以转化为查表运算。

ISA-L

ISA-L(Intel(R) Intelligent Storage Acceleration Library)是”a collection of optimized low-level functions targeting storage applications”,它包括“Erasure codes - Fast block Reed-Solomon type erasure codes for any encode/decode matrix in GF(2^8).”

本文主要介绍ISA-L中的纠删码实现和优化。

乘法表的生成

在ISA-L中,乘法表是通过ec_init_tables函数生成的。这个函数的定义如下:

void ec_init_tables(int k, int rows, unsigned char *a, unsigned char *g_tbls)
{
	int i, j;

	for (i = 0; i < rows; i++) {
		for (j = 0; j < k; j++) {
			gf_vect_mul_init(*a++, g_tbls);
			g_tbls += 32;
		}
	}
}

这个函数给编码矩阵的后k行的每一个元素生成了32次幂表,我感觉有点过于占用存储空间了。

它调用了gf_vect_mul_init函数,这个函数的定义如下:

void gf_vect_mul_init(unsigned char c, unsigned char *tbl)
{
	unsigned char c2 = (c << 1) ^ ((c & 0x80) ? 0x1d : 0);	//Mult by GF{2}
	unsigned char c4 = (c2 << 1) ^ ((c2 & 0x80) ? 0x1d : 0);	//Mult by GF{2}
	unsigned char c8 = (c4 << 1) ^ ((c4 & 0x80) ? 0x1d : 0);	//Mult by GF{2}

#if (__WORDSIZE == 64 || _WIN64 || __x86_64__) && (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__)
	unsigned long long v1, v2, v4, v8, *t;
	unsigned long long v10, v20, v40, v80;
	unsigned char c17, c18, c20, c24;

	t = (unsigned long long *)tbl;

	v1 = c * 0x0100010001000100ull;
	v2 = c2 * 0x0101000001010000ull;
	v4 = c4 * 0x0101010100000000ull;
	v8 = c8 * 0x0101010101010101ull;

	v4 = v1 ^ v2 ^ v4;
	t[0] = v4;
	t[1] = v8 ^ v4;

	c17 = (c8 << 1) ^ ((c8 & 0x80) ? 0x1d : 0);	//Mult by GF{2}
	c18 = (c17 << 1) ^ ((c17 & 0x80) ? 0x1d : 0);	//Mult by GF{2}
	c20 = (c18 << 1) ^ ((c18 & 0x80) ? 0x1d : 0);	//Mult by GF{2}
	c24 = (c20 << 1) ^ ((c20 & 0x80) ? 0x1d : 0);	//Mult by GF{2}

	v10 = c17 * 0x0100010001000100ull;
	v20 = c18 * 0x0101000001010000ull;
	v40 = c20 * 0x0101010100000000ull;
	v80 = c24 * 0x0101010101010101ull;

	v40 = v10 ^ v20 ^ v40;
	t[2] = v40;
	t[3] = v80 ^ v40;

#else // 32-bit or other
	unsigned char c3, c5, c6, c7, c9, c10, c11, c12, c13, c14, c15;
	unsigned char c17, c18, c19, c20, c21, c22, c23, c24, c25, c26, c27, c28, c29, c30,
	    c31;

	c3 = c2 ^ c;
	c5 = c4 ^ c;
	c6 = c4 ^ c2;
	c7 = c4 ^ c3;

	c9 = c8 ^ c;
	c10 = c8 ^ c2;
	c11 = c8 ^ c3;
	c12 = c8 ^ c4;
	c13 = c8 ^ c5;
	c14 = c8 ^ c6;
	c15 = c8 ^ c7;

	tbl[0] = 0;
	tbl[1] = c;
	tbl[2] = c2;
	tbl[3] = c3;
	tbl[4] = c4;
	tbl[5] = c5;
	tbl[6] = c6;
	tbl[7] = c7;
	tbl[8] = c8;
	tbl[9] = c9;
	tbl[10] = c10;
	tbl[11] = c11;
	tbl[12] = c12;
	tbl[13] = c13;
	tbl[14] = c14;
	tbl[15] = c15; 

	c17 = (c8 << 1) ^ ((c8 & 0x80) ? 0x1d : 0);	//Mult by GF{2}
	c18 = (c17 << 1) ^ ((c17 & 0x80) ? 0x1d : 0);	//Mult by GF{2}
	c19 = c18 ^ c17;
	c20 = (c18 << 1) ^ ((c18 & 0x80) ? 0x1d : 0);	//Mult by GF{2}
	c21 = c20 ^ c17;
	c22 = c20 ^ c18;
	c23 = c20 ^ c19;
	c24 = (c20 << 1) ^ ((c20 & 0x80) ? 0x1d : 0);	//Mult by GF{2}
	c25 = c24 ^ c17;
	c26 = c24 ^ c18;
	c27 = c24 ^ c19;
	c28 = c24 ^ c20;
	c29 = c24 ^ c21;
	c30 = c24 ^ c22;
	c31 = c24 ^ c23;

	tbl[16] = 0;
	tbl[17] = c17;
	tbl[18] = c18;
	tbl[19] = c19;
	tbl[20] = c20;
	tbl[21] = c21;
	tbl[22] = c22;
	tbl[23] = c23;
	tbl[24] = c24;
	tbl[25] = c25;
	tbl[26] = c26;
	tbl[27] = c27;
	tbl[28] = c28;
	tbl[29] = c29;
	tbl[30] = c30;
	tbl[31] = c31;

#endif //__WORDSIZE == 64 || _WIN64 || __x86_64__
}

可以看出,对于64位小端的机器,这个函数使用了64位的整数来进行计算,而对于其它系统则是八位的数据。这样可以增加计算和赋值的效率。

但是,有几个地方我还不是很清楚:

1.为什么tbl[16]0?

2.为什么c21 = c20 ^ c17而不是c21 = c20 ^ c?

3.为什么tbl的大小是32?

接着来看看。

数据的编码与更新

ec_encode_data_base函数中,数据的编码是通过gf_vect_mul函数实现的。这个函数的定义如下:

void ec_encode_data_base(int len, int srcs, int dests, unsigned char *v,
			 unsigned char **src, unsigned char **dest)
{
	int i, j, l;
	unsigned char s;

	for (l = 0; l < dests; l++) {
		for (i = 0; i < len; i++) {
			s = 0;
			for (j = 0; j < srcs; j++)
				s ^= gf_mul(src[j][i], v[j * 32 + l * srcs * 32 + 1]);

			dest[l][i] = s;
		}
	}
}

可以看出,这个函数的实现非常简单,它只是对于每一个数据块,对于每一个字节,进行了一次异或运算。但是,它调用了gf_mul函数,这个函数的定义如下:

unsigned char gf_mul(unsigned char a, unsigned char b)
{
#ifndef GF_LARGE_TABLES
	int i;

	if ((a == 0) || (b == 0))
		return 0;

	return gff_base[(i = gflog_base[a] + gflog_base[b]) > 254 ? i - 255 : i];
#else
	return gf_mul_table_base[b * 256 + a];
#endif
}

其中,gff_basegflog_base是在ec_base.h中预先定义好的全局静态数组。

void ec_encode_data_update_base(int len, int k, int rows, int vec_i, unsigned char *v,
				unsigned char *data, unsigned char **dest)
{
	int i, l;
	unsigned char s;

	for (l = 0; l < rows; l++) {
		for (i = 0; i < len; i++) {
			s = dest[l][i];
			s ^= gf_mul(data[i], v[vec_i * 32 + l * k * 32 + 1]);

			dest[l][i] = s;
		}
	}
}

ec_encode_data_update_base函数中,实现数据的更新。它与ec_encode_data_base函数的区别在于,它只更新了一个数据块。后面还实现了有限域上的点积和线性组合,这里不再赘述。

点积的汇编优化

ec_encode_data_sse定义了SSE指令集下的点积函数,它的定义如下:

void ec_encode_data_sse(int len, int k, int rows, unsigned char *g_tbls, unsigned char **data,
			unsigned char **coding)
{

	if (len < 16) {
		ec_encode_data_base(len, k, rows, g_tbls, data, coding);
		return;
	}

	while (rows >= 6) {
		gf_6vect_dot_prod_sse(len, k, g_tbls, data, coding);
		g_tbls += 6 * k * 32;
		coding += 6;
		rows -= 6;
	}
	switch (rows) {
	case 5:
		gf_5vect_dot_prod_sse(len, k, g_tbls, data, coding);
		break;
	case 4:
		gf_4vect_dot_prod_sse(len, k, g_tbls, data, coding);
		break;
	case 3:
		gf_3vect_dot_prod_sse(len, k, g_tbls, data, coding);
		break;
	case 2:
		gf_2vect_dot_prod_sse(len, k, g_tbls, data, coding);
		break;
	case 1:
		gf_vect_dot_prod_sse(len, k, g_tbls, data, *coding);
		break;
	case 0:
		break;
	}

}

可以看出,数据量较小时直接使用ec_encode_data_base函数,否则使用SSE指令集下的点积函数,一次最多处理6个数据块。

我们先来看看gf_vect_dot_prod_sse.asm中的实现。这段代码前面一部分是一些宏定义,这里不再赘述。我们直接来看gf_vect_dot_prod_sse函数的实现:

mk_global gf_vect_dot_prod_sse, function
func(gf_vect_dot_prod_sse)
	FUNC_SAVE
	SLDR 	len, len_m
	sub	len, 16
	SSTR 	len_m, len
	jl	.return_fail
	xor	pos, pos
	movdqa	xmask0f, [mask0f]	;Load mask of lower nibble in each byte

这一部分是一些函数的初始化和参数加载。

.loop16:
	pxor	xp, xp
	mov	tmp, mul_array
	xor	vec_i, vec_i

这部分把xpvec_i初始化为0,tmp指向mul_array指定的乘法表。

.next_vect:

	mov	ptr, [src+vec_i*PS]
	movdqu	xgft_lo, [tmp]		;Load array Cx{00}, Cx{01}, ..., Cx{0f}
	movdqu	xgft_hi, [tmp+16]	;     "     Cx{00}, Cx{10}, ..., Cx{f0}
	XLDR	x0, [ptr+pos]		;Get next source vector

	add	tmp, 32
	add	vec_i, 1

这部分将xgft_loxgft_hi分别加载为tmp指向的乘法表的前16个和后16个字节,x0加载为src指向的数据块的pos位置的字节,接着更新tmpvec_i

	movdqa	xtmpa, x0		;Keep unshifted copy of src
	psraw	x0, 4			;Shift to put high nibble into bits 4-0
	pand	x0, xmask0f		;Mask high src nibble in bits 4-0
	pand	xtmpa, xmask0f		;Mask low src nibble in bits 4-0

	pshufb	xgft_hi, x0		;Lookup mul table of high nibble
	pshufb	xgft_lo, xtmpa		;Lookup mul table of low nibble
	pxor	xgft_hi, xgft_lo	;GF add high and low partials
	pxor	xp, xgft_hi		;xp += partial

这里是比较复杂的一部分。首先,我们先给出pshufb指令的定义

Synopsis

__m128i _mm_shuffle_epi8 (__m128i a, __m128i b)

Instruction:

pshufb xmm, xmm

CPUID Flags: SSSE3 Description

Shuffle packed 8-bit integers in a according to shuffle control mask in the corresponding 8-bit element of b, and store the results in dst.

Operation

FOR j := 0 to 15
	i := j*8
	IF b[i+7] == 1
		dst[i+7:i] := 0
	ELSE
		index[3:0] := b[i+3:i]
		dst[i+7:i] := a[index*8+7:index*8]
	FI
ENDFOR

比较奇怪的一点是这里的SSE版本是SSSE3而不是头文件中定义的SSE4.1

这里的pshufb指令的作用是,每个循环考虑b中的一个字节。如果这个字节的最后一位是1,那么dst中对应的字节就是0。否则,b中的前四位作为索引,a中对应的字节作为dst中的字节。

我们再会看汇编代码。首先,我们通过移位和掩码操作,将x0中每个字节的高四位和低四位分别提取出来。然后,我们通过pshufb指令,将x0中每个字节的高四位和低四位分别作为索引,从乘法表中取出对应的字节,然后进行异或操作。

这也就是将原向量{x0[0],x0[1],...,x0[15]}转化为{Cx{x0[0]},Cx{x0[1]},...,Cx{x0[15]}}的过程,其中Cx{u}=c^u,由乘法表给出。

	SLDR 	vec, vec_m
	cmp	vec_i, vec
	jl	.next_vect

	SLDR 	dest, dest_m
	XSTR	[dest+pos], xp

	add	pos, 16			;Loop on 16 bytes at a time
	SLDR 	len, len_m
	cmp	pos, len
	jle	.loop16

	lea	tmp, [len + 16]
	cmp	pos, tmp
	je	.return_pass

	;; Tail len
	mov	pos, len	;Overlapped offset length-16
	jmp	.loop16		;Do one more overlap pass

接下来,就是把获得的数据存入dest指针指向的空间,和一些边界条件的判断。

注意这里有两层循环,外层循环是对于每一个源数据块,内层循环是对于每16个字节。

那么,我们现在可以总结一下这个函数的作用了。它的作用是,对于每一个数据块,对于每一个字节,将这个字节的高四位和低四位分别作为索引,从乘法表中取出对应的字节,然后进行异或操作,最后将结果存入dest指针指向的空间。

为什么这样就可以实现点积呢?

我们先看一下C语言的实现:

void gf_vect_dot_prod_base(int len, int vlen, unsigned char *v,
			   unsigned char **src, unsigned char *dest)
{
	int i, j;
	unsigned char s;
	for (i = 0; i < len; i++) {
		s = 0;
		for (j = 0; j < vlen; j++)
			s ^= gf_mul(src[j][i], v[j * 32 + 1]);

		dest[i] = s;
	}
}

可以看出,这个函数的作用是,循环的将第i个向量的第j个字节和v[j*32+1]进行乘法运算,然后进行异或操作,最后将结果存入dest指针指向的空间。v[j*32+1]是乘法表中的第j行的第1个元素,也就是Cx{1}

results matching ""

    No results matching ""