Clear second sector of the journal, init iov for callbacks

blocking-uring-test
Vitaliy Filippov 2019-11-21 22:04:44 +03:00
parent 201eeb8516
commit 7e87290fca
3 changed files with 16 additions and 6 deletions

View File

@ -342,12 +342,14 @@ resume_0:
// Sync batch is ready. Do it. // Sync batch is ready. Do it.
await_sqe(9); await_sqe(9);
data->callback = simple_callback; data->callback = simple_callback;
data->iov = { 0 };
my_uring_prep_fsync(sqe, bs->data_fd, 0); my_uring_prep_fsync(sqe, bs->data_fd, 0);
wait_count++; wait_count++;
if (bs->meta_fd != bs->data_fd) if (bs->meta_fd != bs->data_fd)
{ {
await_sqe(10); await_sqe(10);
data->callback = simple_callback; data->callback = simple_callback;
data->iov = { 0 };
my_uring_prep_fsync(sqe, bs->meta_fd, 0); my_uring_prep_fsync(sqe, bs->meta_fd, 0);
wait_count++; wait_count++;
} }

View File

@ -109,6 +109,7 @@ blockstore_init_journal::blockstore_init_journal(blockstore *bs)
this->bs = bs; this->bs = bs;
simple_callback = [this](ring_data_t *data1) simple_callback = [this](ring_data_t *data1)
{ {
printf("%d %d\n", data1->res, data1->iov.iov_len);
if (data1->res != data1->iov.iov_len) if (data1->res != data1->iov.iov_len)
{ {
throw std::runtime_error(std::string("I/O operation failed while reading journal: ") + strerror(-data1->res)); throw std::runtime_error(std::string("I/O operation failed while reading journal: ") + strerror(-data1->res));
@ -184,29 +185,33 @@ resume_1:
if (iszero((uint64_t*)journal_buffer, 3)) if (iszero((uint64_t*)journal_buffer, 3))
{ {
// Journal is empty // Journal is empty
// FIXME handle this wrapping to 512 better ... and align it to 4096 // FIXME handle this wrapping to 512 better
bs->journal.used_start = 512; bs->journal.used_start = 512;
bs->journal.next_free = 512; bs->journal.next_free = 512;
// Initialize journal "superblock" // Initialize journal "superblock" and the first block
// Cool effect. Same operations result in journal replay.
// FIXME: Randomize initial crc32. Track crc32 when trimming.
GET_SQE(); GET_SQE();
memset(journal_buffer, 0, 512); memset(journal_buffer, 0, 1024);
*((journal_entry_start*)journal_buffer) = { *((journal_entry_start*)journal_buffer) = {
.crc32 = 0, .crc32 = 0,
.magic = JOURNAL_MAGIC, .magic = JOURNAL_MAGIC,
.type = JE_START, .type = JE_START,
.size = sizeof(journal_entry_start), .size = sizeof(journal_entry_start),
.reserved = 0, .reserved = 0,
.journal_start = bs->journal.used_start, .journal_start = 512,
}; };
((journal_entry_start*)journal_buffer)->crc32 = je_crc32((journal_entry*)journal_buffer); ((journal_entry_start*)journal_buffer)->crc32 = je_crc32((journal_entry*)journal_buffer);
data->iov = (struct iovec){ journal_buffer, 512 }; data->iov = (struct iovec){ journal_buffer, 1024 };
data->callback = simple_callback; data->callback = simple_callback;
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
wait_count++; wait_count++;
GET_SQE(); GET_SQE();
my_uring_prep_fsync(sqe, bs->journal.fd, 0); my_uring_prep_fsync(sqe, bs->journal.fd, 0);
data->iov = { 0 };
data->callback = simple_callback; data->callback = simple_callback;
wait_count++; wait_count++;
printf("Resetting journal\n");
bs->ringloop->submit(); bs->ringloop->submit();
resume_4: resume_4:
if (wait_count > 0) if (wait_count > 0)

View File

@ -43,6 +43,7 @@ int blockstore::continue_sync(blockstore_operation *op)
// No big writes, just fsync the journal // No big writes, just fsync the journal
BS_SUBMIT_GET_SQE(sqe, data); BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, journal.fd, 0); my_uring_prep_fsync(sqe, journal.fd, 0);
data->iov = { 0 };
data->callback = cb; data->callback = cb;
op->pending_ops = 1; op->pending_ops = 1;
op->sync_state = SYNC_JOURNAL_SYNC_SENT; op->sync_state = SYNC_JOURNAL_SYNC_SENT;
@ -52,6 +53,7 @@ int blockstore::continue_sync(blockstore_operation *op)
// 1st step: fsync data // 1st step: fsync data
BS_SUBMIT_GET_SQE(sqe, data); BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, data_fd, 0); my_uring_prep_fsync(sqe, data_fd, 0);
data->iov = { 0 };
data->callback = cb; data->callback = cb;
op->pending_ops = 1; op->pending_ops = 1;
op->sync_state = SYNC_DATA_SYNC_SENT; op->sync_state = SYNC_DATA_SYNC_SENT;
@ -98,6 +100,7 @@ int blockstore::continue_sync(blockstore_operation *op)
// ... And a journal fsync // ... And a journal fsync
my_uring_prep_fsync(sqe[s], journal.fd, 0); my_uring_prep_fsync(sqe[s], journal.fd, 0);
struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data); struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data);
data->iov = { 0 };
data->callback = cb; data->callback = cb;
op->pending_ops = 1 + s; op->pending_ops = 1 + s;
op->sync_state = SYNC_JOURNAL_SYNC_SENT; op->sync_state = SYNC_JOURNAL_SYNC_SENT;
@ -112,7 +115,7 @@ int blockstore::continue_sync(blockstore_operation *op)
void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op) void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op)
{ {
if (data->res < data->iov.iov_len) if (data->res != data->iov.iov_len)
{ {
throw std::runtime_error( throw std::runtime_error(
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+