Browse Source

Verify data crc32 when reading journal

blocking-uring-test
Vitaliy Filippov 3 years ago
parent
commit
9260cd263a
  1. 1
      blockstore.cpp
  2. 2
      blockstore.h
  3. 196
      blockstore_init.cpp
  4. 19
      blockstore_init.h
  5. 1
      blockstore_journal.h

1
blockstore.cpp

@ -91,7 +91,6 @@ void blockstore::loop()
delete journal_init_reader;
journal_init_reader = NULL;
initialized = 10;
printf("journal read\n");
}
}
}

2
blockstore.h

@ -270,6 +270,8 @@ class blockstore
uint64_t meta_offset, meta_size, meta_area, meta_len;
uint64_t data_offset, data_size, data_len;
// FIXME: add readonly option
struct journal_t journal;
journal_flusher_t *flusher;

196
blockstore_init.cpp

@ -137,17 +137,18 @@ bool iszero(uint64_t *buf, int len)
void blockstore_init_journal::handle_event(ring_data_t *data1)
{
// Step 3: Read journal
if (data1->res < 0)
if (data1->res <= 0)
{
throw std::runtime_error(
std::string("read journal failed at offset ") + std::to_string(journal_pos) +
std::string(": ") + strerror(-data1->res)
);
}
done_pos = journal_pos;
done_buf = submitted;
done_len = data1->res;
done.push_back({
.buf = submitted_buf,
.pos = journal_pos,
.len = (uint64_t)data1->res,
});
journal_pos += data1->res;
if (journal_pos >= bs->journal.len)
{
@ -155,7 +156,7 @@ void blockstore_init_journal::handle_event(ring_data_t *data1)
journal_pos = 512;
wrapped = true;
}
submitted = 0;
submitted_buf = NULL;
}
#define GET_SQE() \
@ -174,21 +175,23 @@ int blockstore_init_journal::loop()
goto resume_3;
else if (wait_state == 4)
goto resume_4;
else if (wait_state == 5)
goto resume_5;
printf("Reading blockstore journal\n");
if (!bs->journal.inmemory)
{
journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE);
if (!journal_buffer)
submitted_buf = memalign(512, 1024);
if (!submitted_buf)
throw std::bad_alloc();
}
else
journal_buffer = bs->journal.buffer;
submitted_buf = bs->journal.buffer;
// Read first block of the journal
sqe = bs->get_sqe();
if (!sqe)
throw std::runtime_error("io_uring is full while trying to read journal");
data = ((ring_data_t*)sqe->user_data);
data->iov = { journal_buffer, 512 };
data->iov = { submitted_buf, 512 };
data->callback = simple_callback;
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
bs->ringloop->submit();
@ -199,7 +202,7 @@ resume_1:
wait_state = 1;
return 1;
}
if (iszero((uint64_t*)journal_buffer, 3))
if (iszero((uint64_t*)submitted_buf, 3))
{
// Journal is empty
// FIXME handle this wrapping to 512 better
@ -209,8 +212,8 @@ resume_1:
// Cool effect. Same operations result in journal replay.
// FIXME: Randomize initial crc32. Track crc32 when trimming.
GET_SQE();
memset(journal_buffer, 0, 1024);
*((journal_entry_start*)journal_buffer) = {
memset(submitted_buf, 0, 1024);
*((journal_entry_start*)submitted_buf) = {
.crc32 = 0,
.magic = JOURNAL_MAGIC,
.type = JE_START,
@ -218,8 +221,8 @@ resume_1:
.reserved = 0,
.journal_start = 512,
};
((journal_entry_start*)journal_buffer)->crc32 = je_crc32((journal_entry*)journal_buffer);
data->iov = (struct iovec){ journal_buffer, 1024 };
((journal_entry_start*)submitted_buf)->crc32 = je_crc32((journal_entry*)submitted_buf);
data->iov = (struct iovec){ submitted_buf, 1024 };
data->callback = simple_callback;
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
wait_count++;
@ -236,11 +239,13 @@ resume_1:
wait_state = 4;
return 1;
}
if (!bs->journal.inmemory)
free(submitted_buf);
}
else
{
// First block always contains a single JE_START entry
je_start = (journal_entry_start*)journal_buffer;
je_start = (journal_entry_start*)submitted_buf;
if (je_start->magic != JOURNAL_MAGIC ||
je_start->type != JE_START ||
je_start->size != sizeof(journal_entry_start) ||
@ -250,12 +255,15 @@ resume_1:
throw std::runtime_error("first entry of the journal is corrupt");
}
next_free = journal_pos = bs->journal.used_start = je_start->journal_start;
if (!bs->journal.inmemory)
free(submitted_buf);
submitted_buf = NULL;
crc32_last = 0;
// Read journal
while (1)
{
resume_2:
if (submitted)
if (submitted_buf)
{
wait_state = 2;
return 1;
@ -265,31 +273,71 @@ resume_1:
GET_SQE();
uint64_t end = bs->journal.len;
if (journal_pos < bs->journal.used_start)
{
end = bs->journal.used_start;
}
if (!bs->journal.inmemory)
submitted_buf = memalign(512, JOURNAL_BUFFER_SIZE);
else
submitted_buf = bs->journal.buffer + journal_pos;
data->iov = {
journal_buffer + (bs->journal.inmemory ? journal_pos : (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0)),
submitted_buf,
end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE,
};
data->callback = [this](ring_data_t *data1) { handle_event(data1); };
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos);
bs->ringloop->submit();
submitted = done_buf == 1 ? 2 : 1;
}
if (done_buf && handle_journal_part(journal_buffer + (bs->journal.inmemory
? done_pos
: (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE)), done_len) == 0)
while (done.size() > 0)
{
// journal ended. wait for the next read to complete, then stop
resume_3:
if (submitted)
handle_res = handle_journal_part(done[0].buf, done[0].pos, done[0].len);
if (handle_res == 0)
{
// journal ended
// zero out corrupted entry, if required
if (init_write_buf)
{
GET_SQE();
data->iov = { init_write_buf, 512 };
data->callback = simple_callback;
wait_count++;
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + init_write_sector);
bs->ringloop->submit();
resume_5:
if (wait_count > 0)
{
wait_state = 5;
return 1;
}
}
// wait for the next read to complete, then stop
resume_3:
if (submitted_buf)
{
wait_state = 3;
return 1;
}
// free buffers
if (!bs->journal.inmemory)
for (auto & e: done)
free(e.buf);
done.clear();
break;
}
else if (handle_res == 1)
{
wait_state = 3;
return 1;
// OK, remove it
if (!bs->journal.inmemory)
{
free(done[0].buf);
}
done.erase(done.begin());
}
else if (handle_res == 2)
{
// Need to wait for more reads
break;
}
}
if (!submitted)
if (!submitted_buf)
{
break;
}
@ -298,25 +346,30 @@ resume_1:
// Trim journal on start so we don't stall when all entries are older
bs->journal.trim();
printf("Journal entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count);
if (!bs->journal.inmemory)
{
free(journal_buffer);
}
bs->journal.crc32_last = crc32_last;
journal_buffer = NULL;
return 0;
}
int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, uint64_t len)
{
uint64_t proc_pos, pos;
if (continue_pos != 0)
{
proc_pos = (continue_pos / 512) * 512;
pos = continue_pos % 512;
continue_pos = 0;
goto resume;
}
while (next_free >= done_pos && next_free < done_pos+len)
{
uint64_t proc_pos = next_free, pos = 0;
proc_pos = next_free;
pos = 0;
next_free += 512;
if (next_free >= bs->journal.len)
{
next_free = 512;
}
resume:
while (pos < 512)
{
journal_entry *je = (journal_entry*)((uint8_t*)buf + proc_pos - done_pos + pos);
@ -335,12 +388,13 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
break;
}
}
started = true;
pos += je->size;
crc32_last = je->crc32;
if (je->type == JE_SMALL_WRITE)
{
#ifdef BLOCKSTORE_DEBUG
printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", je->small_write.oid.inode, je->small_write.oid.stripe, je->small_write.version, je->small_write.offset, je->small_write.len);
#endif
// oid, version, offset, len
uint64_t prev_free = next_free;
if (next_free + je->small_write.len > bs->journal.len)
{
// data continues from the beginning of the journal
@ -358,6 +412,45 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
snprintf(err, 1024, "BUG: calculated journal data offset (%lu) != stored journal data offset (%lu)", location, je->small_write.data_offset);
throw std::runtime_error(err);
}
uint32_t data_crc32 = 0;
if (location >= done_pos && location+je->small_write.len <= done_pos+len)
{
// data is within this buffer
data_crc32 = crc32c(0, buf + location - done_pos, je->small_write.len);
}
else
{
// this case is even more interesting because we must carry data crc32 check to next buffer(s)
uint64_t covered = 0;
for (int i = 0; i < done.size(); i++)
{
if (location+je->small_write.len > done[i].pos &&
location < done[i].pos+done[i].len)
{
uint64_t part_end = (location+je->small_write.len < done[i].pos+done[i].len
? location+je->small_write.len : done[i].pos+done[i].len);
uint64_t part_begin = (location < done[i].pos ? done[i].pos : location);
covered += part_end - part_begin;
data_crc32 = crc32c(data_crc32, done[i].buf + part_begin - done[i].pos, part_end - part_begin);
}
}
if (covered < je->small_write.len)
{
continue_pos = proc_pos+pos;
next_free = prev_free;
return 2;
}
}
if (data_crc32 != je->small_write.crc32_data)
{
// journal entry is corrupt, stop here
// interesting thing is that we must clear the corrupt entry if we're not readonly
memset(buf + proc_pos - done_pos + pos, 0, 512 - pos);
bs->journal.next_free = prev_free;
init_write_buf = buf + proc_pos - done_pos;
init_write_sector = proc_pos;
return 0;
}
auto clean_it = bs->clean_db.find(je->small_write.oid);
if (clean_it == bs->clean_db.end() ||
clean_it->second.version < je->big_write.version)
@ -366,9 +459,6 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
.oid = je->small_write.oid,
.version = je->small_write.version,
};
#ifdef BLOCKSTORE_DEBUG
printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", ov.oid.inode, ov.oid.stripe, ov.version, je->small_write.offset, je->small_write.len);
#endif
bs->dirty_db.emplace(ov, (dirty_entry){
.state = ST_J_SYNCED,
.flags = 0,
@ -387,6 +477,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
}
else if (je->type == JE_BIG_WRITE)
{
#ifdef BLOCKSTORE_DEBUG
printf("je_big_write oid=%lu:%lu ver=%lu\n", je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version);
#endif
auto clean_it = bs->clean_db.find(je->big_write.oid);
if (clean_it == bs->clean_db.end() ||
clean_it->second.version < je->big_write.version)
@ -396,9 +489,6 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
.oid = je->big_write.oid,
.version = je->big_write.version,
};
#ifdef BLOCKSTORE_DEBUG
printf("je_big_write oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
#endif
bs->dirty_db.emplace(ov, (dirty_entry){
.state = ST_D_META_SYNCED,
.flags = 0,
@ -418,6 +508,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
}
else if (je->type == JE_STABLE)
{
#ifdef BLOCKSTORE_DEBUG
printf("je_stable oid=%lu:%lu ver=%lu\n", je->stable.oid.inode, je->stable.oid.stripe, je->stable.version);
#endif
// oid, version
obj_ver_id ov = {
.oid = je->stable.oid,
@ -428,13 +521,10 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
{
// journal contains a legitimate STABLE entry for a non-existing dirty write
// this probably means that journal was trimmed between WRITTEN and STABLE entries
// skip for now. but FIXME: maybe warn about it in the future
// skip it
}
else
{
#ifdef BLOCKSTORE_DEBUG
printf("je_stable oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
#endif
while (1)
{
it->second.state = (it->second.state == ST_D_META_SYNCED
@ -456,6 +546,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
}
else if (je->type == JE_DELETE)
{
#ifdef BLOCKSTORE_DEBUG
printf("je_delete oid=%lu:%lu ver=%lu\n", je->del.oid.inode, je->del.oid.stripe, je->del.version);
#endif
// oid, version
obj_ver_id ov = {
.oid = je->del.oid,
@ -471,6 +564,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
});
bs->journal.used_sectors[proc_pos]++;
}
started = true;
pos += je->size;
crc32_last = je->crc32;
entries_loaded++;
}
}

19
blockstore_init.h

@ -18,23 +18,32 @@ public:
int loop();
};
struct bs_init_journal_done
{
void *buf;
uint64_t pos, len;
};
class blockstore_init_journal
{
blockstore *bs;
int wait_state = 0, wait_count = 0;
int wait_state = 0, wait_count = 0, handle_res = 0;
uint64_t entries_loaded = 0;
void *journal_buffer = NULL;
uint32_t crc32_last = 0;
bool started = false;
uint64_t done_pos = 0, journal_pos = 0;
uint64_t next_free = 512;
std::vector<bs_init_journal_done> done;
uint64_t journal_pos = 0;
uint64_t continue_pos = 0;
void *init_write_buf = NULL;
uint64_t init_write_sector = 0;
bool wrapped = false;
int submitted = 0, done_buf = 0, done_len = 0;
void *submitted_buf;
struct io_uring_sqe *sqe;
struct ring_data_t *data;
journal_entry_start *je_start;
std::function<void(ring_data_t*)> simple_callback;
int handle_journal_part(void *buf, uint64_t len);
int handle_journal_part(void *buf, uint64_t done_pos, uint64_t len);
void handle_event(ring_data_t *data);
public:
blockstore_init_journal(blockstore* bs);

1
blockstore_journal.h

@ -41,7 +41,6 @@ struct __attribute__((__packed__)) journal_entry_small_write
// data_offset is its offset within journal
uint64_t data_offset;
uint32_t crc32_data;
// FIXME verify data crc32c
};
struct __attribute__((__packed__)) journal_entry_big_write

Loading…
Cancel
Save