reedsolomon-go/streaming_test.go

703 lines
16 KiB
Go

/**
* Unit tests for ReedSolomon Streaming API
*
* Copyright 2015, Klaus Post
*/
package reedsolomon
import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"testing"
)
// TestStreamEncoding encodes 10 random data shards into 3 parity shards using
// a stream encoder created by NewStream, verifies the result, and then checks
// the errors Encode returns for several malformed reader/writer sets.
func TestStreamEncoding(t *testing.T) {
	// Shrink the per-shard payload when the test runs with -short.
	shardLen := 10 << 20
	if testing.Short() {
		shardLen = 50000
	}

	enc, err := NewStream(10, 3, testOptions()...)
	if err != nil {
		t.Fatal(err)
	}

	rand.Seed(0)
	payload := randomBytes(10, shardLen)
	parityBufs := emptyBuffers(3)
	if err = enc.Encode(toReaders(toBuffers(payload)), toWriters(parityBufs)); err != nil {
		t.Fatal(err)
	}

	// Verification gets brand-new buffers over the same input bytes, followed
	// by the parity buffers that Encode just filled.
	verified, err := enc.Verify(append(toReaders(toBuffers(payload)), toReaders(parityBufs)...))
	if err != nil {
		t.Fatal(err)
	}
	if !verified {
		t.Fatal("Verification failed")
	}

	// expectErr reports a non-fatal failure when got differs from want.
	expectErr := func(want, got error) {
		t.Helper()
		if got != want {
			t.Errorf("expected %v, got %v", want, got)
		}
	}

	// Wrong number of inputs and outputs.
	expectErr(ErrTooFewShards, enc.Encode(toReaders(emptyBuffers(1)), toWriters(emptyBuffers(1))))
	// Right number of inputs, wrong number of outputs.
	expectErr(ErrTooFewShards, enc.Encode(toReaders(emptyBuffers(10)), toWriters(emptyBuffers(1))))
	// Correct counts, but every input is empty.
	expectErr(ErrShardNoData, enc.Encode(toReaders(emptyBuffers(10)), toWriters(emptyBuffers(3))))

	// One 123-byte input next to nine empty inputs.
	mismatched := emptyBuffers(10)
	mismatched[0] = randomBuffer(123)
	expectErr(ErrShardSize, enc.Encode(toReaders(mismatched), toWriters(emptyBuffers(3))))
}
// TestStreamEncodingConcurrent mirrors TestStreamEncoding but builds the
// encoder with NewStreamC, passing true for both boolean arguments
// (presumably concurrent reads and writes; confirm against NewStreamC).
func TestStreamEncodingConcurrent(t *testing.T) {
	// Shrink the per-shard payload when the test runs with -short.
	shardLen := 10 << 20
	if testing.Short() {
		shardLen = 50000
	}

	enc, err := NewStreamC(10, 3, true, true, testOptions()...)
	if err != nil {
		t.Fatal(err)
	}

	rand.Seed(0)
	payload := randomBytes(10, shardLen)
	parityBufs := emptyBuffers(3)
	if err = enc.Encode(toReaders(toBuffers(payload)), toWriters(parityBufs)); err != nil {
		t.Fatal(err)
	}

	// Verification gets brand-new buffers over the same input bytes, followed
	// by the parity buffers that Encode just filled.
	verified, err := enc.Verify(append(toReaders(toBuffers(payload)), toReaders(parityBufs)...))
	if err != nil {
		t.Fatal(err)
	}
	if !verified {
		t.Fatal("Verification failed")
	}

	// expectErr reports a non-fatal failure when got differs from want.
	expectErr := func(want, got error) {
		t.Helper()
		if got != want {
			t.Errorf("expected %v, got %v", want, got)
		}
	}

	// Wrong number of inputs and outputs.
	expectErr(ErrTooFewShards, enc.Encode(toReaders(emptyBuffers(1)), toWriters(emptyBuffers(1))))
	// Right number of inputs, wrong number of outputs.
	expectErr(ErrTooFewShards, enc.Encode(toReaders(emptyBuffers(10)), toWriters(emptyBuffers(1))))
	// Correct counts, but every input is empty.
	expectErr(ErrShardNoData, enc.Encode(toReaders(emptyBuffers(10)), toWriters(emptyBuffers(3))))

	// Two 123-byte inputs next to eight empty inputs.
	mismatched := emptyBuffers(10)
	mismatched[0] = randomBuffer(123)
	mismatched[1] = randomBuffer(123)
	expectErr(ErrShardSize, enc.Encode(toReaders(mismatched), toWriters(emptyBuffers(3))))
}
// TestStreamZeroParity exercises a stream encoder configured with 10 data
// shards and no parity shards: Encode, Verify and Reconstruct must all
// succeed.
func TestStreamZeroParity(t *testing.T) {
	// Shrink the per-shard payload when the test runs with -short.
	shardLen := 10 << 20
	if testing.Short() {
		shardLen = 50000
	}

	enc, err := NewStream(10, 0, testOptions()...)
	if err != nil {
		t.Fatal(err)
	}

	rand.Seed(0)
	payload := randomBytes(10, shardLen)

	// freshReaders wraps the payload in new buffers so that every call below
	// starts reading from the beginning of each shard.
	freshReaders := func() []io.Reader {
		return toReaders(toBuffers(payload))
	}

	// No parity shards, so the writer list is empty.
	if err = enc.Encode(freshReaders(), []io.Writer{}); err != nil {
		t.Fatal(err)
	}

	verified, err := enc.Verify(freshReaders())
	if err != nil {
		t.Fatal(err)
	}
	if !verified {
		t.Fatal("Verification failed")
	}

	// All shards are present and no output is requested; only the absence of
	// an error is checked here.
	if err = enc.Reconstruct(freshReaders(), nilWriters(10)); err != nil {
		t.Fatal(err)
	}
}
// TestStreamZeroParityConcurrent mirrors TestStreamZeroParity but builds the
// encoder with NewStreamC, passing true for both boolean arguments
// (presumably concurrent reads and writes; confirm against NewStreamC).
func TestStreamZeroParityConcurrent(t *testing.T) {
	// Shrink the per-shard payload when the test runs with -short.
	shardLen := 10 << 20
	if testing.Short() {
		shardLen = 50000
	}

	enc, err := NewStreamC(10, 0, true, true, testOptions()...)
	if err != nil {
		t.Fatal(err)
	}

	rand.Seed(0)
	payload := randomBytes(10, shardLen)

	// freshReaders wraps the payload in new buffers so that every call below
	// starts reading from the beginning of each shard.
	freshReaders := func() []io.Reader {
		return toReaders(toBuffers(payload))
	}

	// No parity shards, so the writer list is empty.
	if err = enc.Encode(freshReaders(), []io.Writer{}); err != nil {
		t.Fatal(err)
	}

	verified, err := enc.Verify(freshReaders())
	if err != nil {
		t.Fatal(err)
	}
	if !verified {
		t.Fatal("Verification failed")
	}

	// All shards are present and no output is requested; only the absence of
	// an error is checked here.
	if err = enc.Reconstruct(freshReaders(), nilWriters(10)); err != nil {
		t.Fatal(err)
	}
}
// randomBuffer returns a buffer whose initial contents are length bytes that
// have been passed through fillRandom.
func randomBuffer(length int) *bytes.Buffer {
	payload := make([]byte, length)
	fillRandom(payload)
	buf := bytes.NewBuffer(payload)
	return buf
}
// randomBytes returns n byte slices, each of the given length and each passed
// through fillRandom in index order.
func randomBytes(n, length int) [][]byte {
	out := make([][]byte, 0, n)
	for len(out) < n {
		shard := make([]byte, length)
		fillRandom(shard)
		out = append(out, shard)
	}
	return out
}
// toBuffers wraps every byte slice of in with bytes.NewBuffer, so each
// returned buffer starts out holding (and sharing storage with) the
// corresponding slice.
func toBuffers(in [][]byte) []*bytes.Buffer {
	wrapped := make([]*bytes.Buffer, 0, len(in))
	for _, raw := range in {
		wrapped = append(wrapped, bytes.NewBuffer(raw))
	}
	return wrapped
}
// toReaders returns the given buffers as a slice of io.Reader, preserving
// order. The returned readers are the very same buffer values.
func toReaders(in []*bytes.Buffer) []io.Reader {
	readers := make([]io.Reader, 0, len(in))
	for _, buf := range in {
		readers = append(readers, buf)
	}
	return readers
}
// toWriters returns the given buffers as a slice of io.Writer, preserving
// order. Writing to a returned writer appends to the matching buffer.
func toWriters(in []*bytes.Buffer) []io.Writer {
	writers := make([]io.Writer, 0, len(in))
	for _, buf := range in {
		writers = append(writers, buf)
	}
	return writers
}
// nilWriters returns a slice of n io.Writer values, all of them nil. The
// tests in this file pass it to Reconstruct when no shard output is wanted.
func nilWriters(n int) []io.Writer {
	// make zero-initialises the elements, and the zero io.Writer is nil.
	return make([]io.Writer, n)
}
// emptyBuffers returns n distinct, newly allocated, empty buffers.
func emptyBuffers(n int) []*bytes.Buffer {
	bufs := make([]*bytes.Buffer, 0, n)
	for len(bufs) < n {
		bufs = append(bufs, new(bytes.Buffer))
	}
	return bufs
}
// toBytes returns, for each buffer, the slice reported by its Bytes method,
// i.e. the unread portion of that buffer. No data is copied.
func toBytes(in []*bytes.Buffer) [][]byte {
	raw := make([][]byte, 0, len(in))
	for _, buf := range in {
		raw = append(raw, buf.Bytes())
	}
	return raw
}
// TestStreamReconstruct encodes 10 data shards into 3 parity shards and then
// drives Reconstruct through several scenarios: nothing missing, three
// shards missing and all requested, three missing with only the data shards
// requested, too many missing, and a set of malformed argument lists.
func TestStreamReconstruct(t *testing.T) {
	// Shrink the per-shard payload when the test runs with -short.
	shardLen := 10 << 20
	if testing.Short() {
		shardLen = 50000
	}

	enc, err := NewStream(10, 3, testOptions()...)
	if err != nil {
		t.Fatal(err)
	}

	rand.Seed(0)
	shards := randomBytes(10, shardLen)
	parityBufs := emptyBuffers(3)
	if err = enc.Encode(toReaders(toBuffers(shards)), toWriters(parityBufs)); err != nil {
		t.Fatal(err)
	}
	parity := toBytes(parityBufs)

	// allReaders builds fresh readers over the current data and parity bytes
	// (13 in total, data first).
	allReaders := func() []io.Reader {
		return append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
	}
	// blank returns a new empty buffer to receive a reconstructed shard.
	blank := func() io.Writer {
		return emptyBuffers(1)[0]
	}

	// fill is reused across the scenarios below; entries set in one scenario
	// remain in place unless explicitly overwritten.
	fill := make([]io.Writer, 13)

	// Scenario 1: every shard present, every fill entry nil.
	if err = enc.Reconstruct(allReaders(), fill); err != nil {
		t.Fatal(err)
	}

	// Scenario 2: 10 shards present; data shards 0 and 7 and parity shard 11
	// are missing and all three are requested.
	all := allReaders()
	for _, idx := range []int{0, 7, 11} {
		all[idx] = nil
		fill[idx] = blank()
	}
	if err = enc.Reconstruct(all, fill); err != nil {
		t.Fatal(err)
	}
	// Adopt the reconstructed bytes and check that the full set verifies.
	shards[0] = fill[0].(*bytes.Buffer).Bytes()
	shards[7] = fill[7].(*bytes.Buffer).Bytes()
	parity[1] = fill[11].(*bytes.Buffer).Bytes()

	verified, err := enc.Verify(allReaders())
	if err != nil {
		t.Fatal(err)
	}
	if !verified {
		t.Fatal("Verification failed")
	}

	// Scenario 3: the same three shards are missing, but only the two data
	// shards are requested; the parity entry is left nil.
	all = allReaders()
	all[0], all[7], all[11] = nil, nil, nil
	fill[0], fill[7], fill[11] = blank(), blank(), nil
	if err = enc.Reconstruct(all, fill); err != nil {
		t.Fatal(err)
	}
	if fill[11] != nil {
		t.Fatal("Unexpected parity block reconstructed")
	}

	// Scenario 4: only 9 shards present, which must be rejected.
	all = allReaders()
	for _, idx := range []int{0, 4, 7, 11} {
		all[idx] = nil
		fill[idx] = blank()
	}
	if err = enc.Reconstruct(all, fill); err != ErrTooFewShards {
		t.Errorf("expected %v, got %v", ErrTooFewShards, err)
	}

	// Malformed argument lists.
	if err = enc.Reconstruct(toReaders(emptyBuffers(3)), toWriters(emptyBuffers(3))); err != ErrTooFewShards {
		t.Errorf("expected %v, got %v", ErrTooFewShards, err)
	}
	if err = enc.Reconstruct(toReaders(emptyBuffers(13)), toWriters(emptyBuffers(3))); err != ErrTooFewShards {
		t.Errorf("expected %v, got %v", ErrTooFewShards, err)
	}
	// Every position supplies both a reader and a writer.
	if err = enc.Reconstruct(toReaders(emptyBuffers(13)), toWriters(emptyBuffers(13))); err != ErrReconstructMismatch {
		t.Errorf("expected %v, got %v", ErrReconstructMismatch, err)
	}
	// All readers present but empty, nothing requested.
	if err = enc.Reconstruct(toReaders(emptyBuffers(13)), nilWriters(13)); err != ErrShardNoData {
		t.Errorf("expected %v, got %v", ErrShardNoData, err)
	}
}
// TestStreamVerify encodes 10 data shards into 4 parity shards and checks
// that Verify accepts the untouched set, rejects it after a single parity or
// data byte has been inverted, and reports the expected errors for bad
// reader lists.
func TestStreamVerify(t *testing.T) {
	// Shrink the per-shard payload when the test runs with -short.
	shardLen := 10 << 20
	if testing.Short() {
		shardLen = 50000
	}

	enc, err := NewStream(10, 4, testOptions()...)
	if err != nil {
		t.Fatal(err)
	}

	shards := randomBytes(10, shardLen)
	parityBufs := emptyBuffers(4)
	if err = enc.Encode(toReaders(toBuffers(shards)), toWriters(parityBufs)); err != nil {
		t.Fatal(err)
	}
	// Grab the parity bytes before the parity buffers themselves are handed
	// to Verify as readers.
	parity := toBytes(parityBufs)

	// Untouched data plus the parity buffers must verify.
	valid, err := enc.Verify(append(toReaders(toBuffers(shards)), toReaders(parityBufs)...))
	if err != nil {
		t.Fatal(err)
	}
	if !valid {
		t.Fatal("Verification failed")
	}

	// Invert every bit of the parity byte 20000 positions from the end of
	// the first parity shard.
	pos := len(parity[0]) - 20000
	parity[0][pos] ^= 0xff
	valid, err = enc.Verify(append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...))
	if err != nil {
		t.Fatal(err)
	}
	if valid {
		t.Fatal("Verification did not fail")
	}

	// Re-encode into the same parity buffers.
	if err = enc.Encode(toReaders(toBuffers(shards)), toWriters(parityBufs)); err != nil {
		t.Fatal(err)
	}

	// Invert every bit of the data byte 30000 positions from the end of the
	// first data shard.
	pos = len(shards[0]) - 30000
	shards[0][pos] ^= 0xff
	valid, err = enc.Verify(append(toReaders(toBuffers(shards)), toReaders(parityBufs)...))
	if err != nil {
		t.Fatal(err)
	}
	if valid {
		t.Fatal("Verification did not fail")
	}

	// Too few readers (10 instead of 14).
	if _, err = enc.Verify(toReaders(emptyBuffers(10))); err != ErrTooFewShards {
		t.Errorf("expected %v, got %v", ErrTooFewShards, err)
	}
	// Correct count, but every reader is empty.
	if _, err = enc.Verify(toReaders(emptyBuffers(14))); err != ErrShardNoData {
		t.Errorf("expected %v, got %v", ErrShardNoData, err)
	}
}
// TestStreamOneEncode encodes five fixed 2-byte data shards with a 5+5 stream
// encoder and compares the first two bytes of each parity shard against
// known values. It then checks that Verify accepts the set and rejects it
// once a data byte has been changed.
func TestStreamOneEncode(t *testing.T) {
	codec, err := NewStream(5, 5, testOptions()...)
	if err != nil {
		t.Fatal(err)
	}
	shards := [][]byte{
		{0, 1},
		{4, 5},
		{2, 3},
		{6, 7},
		{8, 9},
	}
	parb := emptyBuffers(5)
	// A failed Encode would otherwise surface as an index-out-of-range panic
	// on the empty parity slices below, so fail the test explicitly.
	if err = codec.Encode(toReaders(toBuffers(shards)), toWriters(parb)); err != nil {
		t.Fatal(err)
	}
	parity := toBytes(parb)

	// Expected leading bytes of parity shards 5..9, in order.
	want := [][2]byte{
		{12, 13},
		{10, 11},
		{14, 15},
		{90, 91},
		{94, 95},
	}
	for i, w := range want {
		got := parity[i]
		if len(got) < 2 || got[0] != w[0] || got[1] != w[1] {
			t.Fatalf("shard %d mismatch", i+5)
		}
	}

	all := append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
	ok, err := codec.Verify(all)
	if err != nil {
		t.Fatal(err)
	}
	if !ok {
		t.Fatal("did not verify")
	}

	// Change one data byte; verification must now report a mismatch.
	shards[3][0]++
	all = append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
	ok, err = codec.Verify(all)
	if err != nil {
		t.Fatal(err)
	}
	if ok {
		t.Fatal("verify did not fail as expected")
	}
}
// benchmarkStreamEncode measures stream encoding of dataShards random shards
// of shardSize bytes each into parityShards parity outputs that are
// discarded. Throughput is reported over the data bytes only
// (shardSize * dataShards per iteration).
func benchmarkStreamEncode(b *testing.B, dataShards, parityShards, shardSize int) {
	r, err := NewStream(dataShards, parityShards, testOptions(WithAutoGoroutines(shardSize))...)
	if err != nil {
		b.Fatal(err)
	}

	rand.Seed(0)
	shards := make([][]byte, dataShards)
	for s := range shards {
		shards[s] = make([]byte, shardSize)
		fillRandom(shards[s])
	}

	// Build the discard writers before the timer is reset so that this setup
	// is not counted as part of the measured work.
	out := make([]io.Writer, parityShards)
	for i := range out {
		out[i] = ioutil.Discard
	}

	b.SetBytes(int64(shardSize * dataShards))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Fresh readers are needed on every iteration; they are built inside
		// the loop from the same underlying byte slices.
		err = r.Encode(toReaders(toBuffers(shards)), out)
		if err != nil {
			b.Fatal(err)
		}
	}
}
// BenchmarkStreamEncode10x2x10000 benchmarks stream encoding with 10 data
// shards and 2 parity shards of 10000 bytes each.
func BenchmarkStreamEncode10x2x10000(b *testing.B) {
benchmarkStreamEncode(b, 10, 2, 10000)
}
// BenchmarkStreamEncode100x20x10000 benchmarks stream encoding with 100 data
// shards and 20 parity shards of 10000 bytes each.
func BenchmarkStreamEncode100x20x10000(b *testing.B) {
benchmarkStreamEncode(b, 100, 20, 10000)
}
// BenchmarkStreamEncode17x3x1M benchmarks stream encoding with 17 data shards
// and 3 parity shards of 1 MiB (1024*1024 bytes) each.
func BenchmarkStreamEncode17x3x1M(b *testing.B) {
benchmarkStreamEncode(b, 17, 3, 1024*1024)
}
// BenchmarkStreamEncode10x4x16M benchmarks stream encoding with 10 data
// shards and 4 parity shards of 16 MiB (16*1024*1024 bytes) each.
func BenchmarkStreamEncode10x4x16M(b *testing.B) {
benchmarkStreamEncode(b, 10, 4, 16*1024*1024)
}
// BenchmarkStreamEncode5x2x1M benchmarks stream encoding with 5 data shards
// and 2 parity shards of 1 MiB (1024*1024 bytes) each.
func BenchmarkStreamEncode5x2x1M(b *testing.B) {
benchmarkStreamEncode(b, 5, 2, 1024*1024)
}
// BenchmarkStreamEncode10x2x1M benchmarks stream encoding with 10 data shards
// and 2 parity shards of 1 MiB (1024*1024 bytes) each.
func BenchmarkStreamEncode10x2x1M(b *testing.B) {
benchmarkStreamEncode(b, 10, 2, 1024*1024)
}
// BenchmarkStreamEncode10x4x1M benchmarks stream encoding with 10 data shards
// and 4 parity shards of 1 MiB (1024*1024 bytes) each.
func BenchmarkStreamEncode10x4x1M(b *testing.B) {
benchmarkStreamEncode(b, 10, 4, 1024*1024)
}
// BenchmarkStreamEncode50x20x1M benchmarks stream encoding with 50 data
// shards and 20 parity shards of 1 MiB (1024*1024 bytes) each.
func BenchmarkStreamEncode50x20x1M(b *testing.B) {
benchmarkStreamEncode(b, 50, 20, 1024*1024)
}
// BenchmarkStreamEncode17x3x16M benchmarks stream encoding with 17 data
// shards and 3 parity shards of 16 MiB (16*1024*1024 bytes) each.
func BenchmarkStreamEncode17x3x16M(b *testing.B) {
benchmarkStreamEncode(b, 17, 3, 16*1024*1024)
}
// benchmarkStreamVerify measures stream verification of dataShards random
// data shards plus parityShards matching parity shards, each shardSize bytes
// long. Throughput is reported over the data bytes only
// (shardSize * dataShards per iteration).
func benchmarkStreamVerify(b *testing.B, dataShards, parityShards, shardSize int) {
	r, err := NewStream(dataShards, parityShards, testOptions(WithAutoGoroutines(shardSize))...)
	if err != nil {
		b.Fatal(err)
	}

	// Only the data shards are allocated here; the parity entries are filled
	// in from the encoder output below.
	shards := make([][]byte, parityShards+dataShards)
	rand.Seed(0)
	for s := 0; s < dataShards; s++ {
		shards[s] = make([]byte, shardSize)
		fillRandom(shards[s])
	}

	// Encode into empty buffers and adopt their bytes as the parity shards.
	// Writing through bytes.NewBuffer over a non-empty slice appends after
	// the existing contents instead of overwriting them, which would leave
	// the parity slices in shards untouched.
	parity := emptyBuffers(parityShards)
	err = r.Encode(toReaders(toBuffers(shards[:dataShards])), toWriters(parity))
	if err != nil {
		b.Fatal(err)
	}
	for i, p := range parity {
		shards[dataShards+i] = p.Bytes()
	}

	b.SetBytes(int64(shardSize * dataShards))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ok, verr := r.Verify(toReaders(toBuffers(shards)))
		if verr != nil {
			b.Fatal(verr)
		}
		// The shards were just encoded, so a mismatch means the benchmark is
		// not measuring a valid verification pass.
		if !ok {
			b.Fatal("verification failed")
		}
	}
}
// BenchmarkStreamVerify10x2x10000 benchmarks stream verification with 10 data
// shards and 2 parity shards of 10000 bytes each.
func BenchmarkStreamVerify10x2x10000(b *testing.B) {
benchmarkStreamVerify(b, 10, 2, 10000)
}
// BenchmarkStreamVerify50x5x50000 benchmarks stream verification with 50 data
// shards and 5 parity shards of 100000 bytes each.
//
// NOTE(review): the function name says 50000 but the shard size passed below
// is 100000 — confirm which one is intended.
func BenchmarkStreamVerify50x5x50000(b *testing.B) {
benchmarkStreamVerify(b, 50, 5, 100000)
}
// BenchmarkStreamVerify10x2x1M benchmarks stream verification with 10 data
// shards and 2 parity shards of 1 MiB (1024*1024 bytes) each.
func BenchmarkStreamVerify10x2x1M(b *testing.B) {
benchmarkStreamVerify(b, 10, 2, 1024*1024)
}
// BenchmarkStreamVerify5x2x1M benchmarks stream verification with 5 data
// shards and 2 parity shards of 1 MiB (1024*1024 bytes) each.
func BenchmarkStreamVerify5x2x1M(b *testing.B) {
benchmarkStreamVerify(b, 5, 2, 1024*1024)
}
// BenchmarkStreamVerify10x4x1M benchmarks stream verification with 10 data
// shards and 4 parity shards of 1 MiB (1024*1024 bytes) each.
func BenchmarkStreamVerify10x4x1M(b *testing.B) {
benchmarkStreamVerify(b, 10, 4, 1024*1024)
}
// BenchmarkStreamVerify50x20x1M benchmarks stream verification with 50 data
// shards and 20 parity shards of 1 MiB (1024*1024 bytes) each.
func BenchmarkStreamVerify50x20x1M(b *testing.B) {
benchmarkStreamVerify(b, 50, 20, 1024*1024)
}
// BenchmarkStreamVerify10x4x16M benchmarks stream verification with 10 data
// shards and 4 parity shards of 16 MiB (16*1024*1024 bytes) each.
func BenchmarkStreamVerify10x4x16M(b *testing.B) {
benchmarkStreamVerify(b, 10, 4, 16*1024*1024)
}
// TestStreamSplitJoin splits 250000 random bytes across 5 data shards with a
// 5+3 stream encoder, joins them back together, and checks that the result
// equals the input. It also checks the errors Split and Join return for
// empty input, too few shards, a nil shard reader, and an oversized
// requested length.
func TestStreamSplitJoin(t *testing.T) {
	data := make([]byte, 250000)
	rand.Seed(0)
	fillRandom(data)

	enc, err := NewStream(5, 3, testOptions()...)
	if err != nil {
		t.Fatal(err)
	}

	split := emptyBuffers(5)
	err = enc.Split(bytes.NewBuffer(data), toWriters(split), int64(len(data)))
	if err != nil {
		t.Fatal(err)
	}
	splits := toBytes(split)

	// 250000 divides evenly by 5; this expectation needs revisiting if the
	// data size is changed to something that does not.
	expect := len(data) / 5
	if split[0].Len() != expect {
		t.Errorf("unexpected size. expected %d, got %d", expect, split[0].Len())
	}

	// Splitting zero bytes must be rejected.
	err = enc.Split(bytes.NewBuffer([]byte{}), toWriters(emptyBuffers(3)), 0)
	if err != ErrShortData {
		t.Errorf("expected %v, got %v", ErrShortData, err)
	}

	buf := new(bytes.Buffer)
	err = enc.Join(buf, toReaders(toBuffers(splits)), int64(len(data)))
	if err != nil {
		t.Fatal(err)
	}
	joined := buf.Bytes()
	if !bytes.Equal(joined, data) {
		// head limits the diagnostic output to at most 8 bytes without
		// slicing past the end of a short result.
		head := func(p []byte) []byte {
			if len(p) > 8 {
				return p[:8]
			}
			return p
		}
		t.Fatal("recovered data does not match original", head(joined), head(data), "... lengths:", len(joined), len(data))
	}

	// Fewer readers than data shards.
	err = enc.Join(buf, toReaders(emptyBuffers(2)), 0)
	if err != ErrTooFewShards {
		t.Errorf("expected %v, got %v", ErrTooFewShards, err)
	}

	// A nil reader must be reported as a StreamReadError that names the
	// offending stream index.
	bufs := toReaders(emptyBuffers(5))
	bufs[2] = nil
	err = enc.Join(buf, bufs, 0)
	if se, ok := err.(StreamReadError); ok {
		if se.Err != ErrShardNoData {
			t.Errorf("expected %v, got %v", ErrShardNoData, se.Err)
		}
		if se.Stream != 2 {
			t.Errorf("Expected error on stream 2, got %d", se.Stream)
		}
	} else {
		t.Errorf("expected error type %T, got %T", StreamReadError{}, err)
	}

	// Asking for one byte more than the shards contain.
	err = enc.Join(buf, toReaders(toBuffers(splits)), int64(len(data)+1))
	if err != ErrShortData {
		t.Errorf("expected %v, got %v", ErrShortData, err)
	}
}
// TestNewStream checks the error NewStream returns for a range of valid and
// invalid data/parity shard counts.
func TestNewStream(t *testing.T) {
	// newStreamCase pairs the shard counts passed to NewStream with the
	// error it is expected to return.
	type newStreamCase struct {
		data, parity int
		err          error
	}

	// Largest value representable by int on this platform.
	maxInt := int(^uint(0) >> 1)

	cases := []newStreamCase{
		{data: 127, parity: 127, err: nil},
		{data: 1, parity: 0, err: nil},
		{data: 256, parity: 256, err: ErrMaxShardNum},
		{data: 0, parity: 1, err: ErrInvShardNum},
		{data: 1, parity: -1, err: ErrInvShardNum},
		{data: 257, parity: 1, err: ErrMaxShardNum},
		// Per the original note: overflow causes r.Shards to be negative.
		{data: 256, parity: maxInt, err: errInvalidRowSize},
	}

	for i := range cases {
		tc := cases[i]
		if _, got := NewStream(tc.data, tc.parity, testOptions()...); got != tc.err {
			t.Errorf("New(%v, %v): expected %v, got %v", tc.data, tc.parity, tc.err, got)
		}
	}
}