diff --git a/Makefile b/Makefile index 6c9dc837..9504f9aa 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ clean: crc32c.o: crc32c.c g++ -g -O3 -fPIC -c -o $@ $< %.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h - g++ -g -O3 -Wall -Wno-sign-compare -Wno-parentheses -fPIC -c -o $@ $< + g++ -g -O3 -Wall -Wno-sign-compare -Wno-parentheses -Wno-pointer-arith -fPIC -c -o $@ $< test: test.cpp g++ -g -O3 -o test -luring test.cpp test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp diff --git a/blockstore.cpp b/blockstore.cpp index 27639bfc..4cbcaee0 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -54,8 +54,6 @@ blockstore::~blockstore() close(meta_fd); if (journal.fd >= 0 && journal.fd != meta_fd) close(journal.fd); - free(journal.sector_buf); - free(journal.sector_info); } bool blockstore::is_started() diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 6c96b0a4..29709517 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -10,7 +10,7 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs) sync_threshold = flusher_count == 1 ? 1 : flusher_count/2; journal_trim_interval = sync_threshold; journal_trim_counter = 0; - journal_superblock = (uint8_t*)memalign(512, 512); + journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(512, 512); co = new journal_flusher_co[flusher_count]; for (int i = 0; i < flusher_count; i++) { @@ -48,7 +48,8 @@ journal_flusher_co::journal_flusher_co() journal_flusher_t::~journal_flusher_t() { - free(journal_superblock); + if (!bs->journal.inmemory) + free(journal_superblock); delete[] co; } @@ -176,6 +177,7 @@ resume_0: flusher->active_until_sync++; v.clear(); wait_count = 0; + copy_count = 0; clean_loc = UINT64_MAX; skip_copy = false; while (1) @@ -183,7 +185,6 @@ resume_0: if (dirty_it->second.state == ST_J_STABLE && !skip_copy) { // First we submit all reads - // FIXME: Introduce a (default) mode where we'll keep the whole journal in memory instead of re-reading data during flush offset = dirty_it->second.offset; len = dirty_it->second.len; it = v.begin(); @@ -194,15 +195,26 @@ resume_0: break; if (it == v.end() || it->offset > offset) { + submit_offset = dirty_it->second.location + offset - dirty_it->second.offset; submit_len = it == v.end() || it->offset >= offset+len ? len : it->offset-offset; - await_sqe(1); it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) }); - data->iov = (struct iovec){ v.back().buf, (size_t)submit_len }; - data->callback = simple_callback_r; - my_uring_prep_readv( - sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset - dirty_it->second.offset - ); - wait_count++; + copy_count++; + if (bs->journal.inmemory) + { + // Take it from memory + memcpy(v.back().buf, bs->journal.buffer + submit_offset, submit_len); + } + else + { + // Read it from disk + await_sqe(1); + data->iov = (struct iovec){ v.back().buf, (size_t)submit_len }; + data->callback = simple_callback_r; + my_uring_prep_readv( + sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset + ); + wait_count++; + } } if (it == v.end() || it->offset+it->len >= offset+len) { @@ -238,7 +250,7 @@ resume_0: break; } } - if (wait_count == 0 && clean_loc == UINT64_MAX) + if (copy_count == 0 && clean_loc == UINT64_MAX) { // Nothing to flush flusher->active_flushers--; diff --git a/blockstore_flush.h b/blockstore_flush.h index 93f1aed2..a8fca3dd 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -33,7 +33,8 @@ class journal_flusher_co std::map::iterator dirty_it, dirty_start, dirty_end; std::vector v; std::vector::iterator it; - uint64_t offset, len, submit_len, clean_loc, old_clean_loc, meta_sector, meta_pos; + int copy_count; + uint64_t offset, len, submit_offset, submit_len, clean_loc, old_clean_loc, meta_sector, meta_pos; std::map::iterator meta_it; std::map::iterator repeat_it; std::map::iterator journal_used_it; @@ -56,7 +57,7 @@ class journal_flusher_t friend class journal_flusher_co; int journal_trim_counter, journal_trim_interval; - uint8_t* journal_superblock; + void* journal_superblock; int active_flushers, active_until_sync; std::list syncs; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 491a3941..9638f522 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -167,7 +167,14 @@ int blockstore_init_journal::loop() goto resume_3; else if (wait_state == 4) goto resume_4; - journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE); + if (!bs->journal.inmemory) + { + journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE); + if (!journal_buffer) + throw std::bad_alloc(); + } + else + journal_buffer = bs->journal.buffer; // Read first block of the journal sqe = bs->get_sqe(); if (!sqe) @@ -254,7 +261,7 @@ resume_1: end = bs->journal.used_start; } data->iov = { - journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0), + journal_buffer + (bs->journal.inmemory ? journal_pos : (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0)), end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE, }; data->callback = [this](ring_data_t *data1) { handle_event(data1); }; @@ -262,7 +269,9 @@ resume_1: bs->ringloop->submit(); submitted = done_buf == 1 ? 2 : 1; } - if (done_buf && handle_journal_part(journal_buffer + (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE), done_len) == 0) + if (done_buf && handle_journal_part(journal_buffer + (bs->journal.inmemory + ? done_pos + : (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE)), done_len) == 0) { // journal ended. wait for the next read to complete, then stop resume_3: @@ -279,7 +288,10 @@ resume_1: } } printf("Journal entries loaded: %d\n", entries_loaded); - free(journal_buffer); + if (!bs->journal.inmemory) + { + free(journal_buffer); + } bs->journal.crc32_last = crc32_last; journal_buffer = NULL; return 0; diff --git a/blockstore_init.h b/blockstore_init.h index 65feb671..db9ee77c 100644 --- a/blockstore_init.h +++ b/blockstore_init.h @@ -22,7 +22,7 @@ class blockstore_init_journal blockstore *bs; int wait_state = 0, wait_count = 0; int entries_loaded = 0; - uint8_t *journal_buffer = NULL; + void *journal_buffer = NULL; uint32_t crc32_last = 0; bool started = false; uint64_t done_pos = 0, journal_pos = 0; diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp index 0c5451e7..b2c3bb19 100644 --- a/blockstore_journal.cpp +++ b/blockstore_journal.cpp @@ -76,10 +76,14 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, journal.sector_info[journal.cur_sector].offset = journal.next_free; journal.in_sector_pos = 0; journal.next_free = (journal.next_free+512) < journal.len ? journal.next_free + 512 : 512; - memset(journal.sector_buf + 512*journal.cur_sector, 0, 512); + memset(journal.inmemory + ? journal.buffer + journal.sector_info[journal.cur_sector].offset + : journal.sector_buf + 512*journal.cur_sector, 0, 512); } journal_entry *je = (struct journal_entry*)( - journal.sector_buf + 512*journal.cur_sector + journal.in_sector_pos + (journal.inmemory + ? journal.buffer + journal.sector_info[journal.cur_sector].offset + : journal.sector_buf + 512*journal.cur_sector) + journal.in_sector_pos ); journal.in_sector_pos += size; je->magic = JOURNAL_MAGIC; @@ -93,9 +97,27 @@ void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::f { journal.sector_info[journal.cur_sector].usage_count++; ring_data_t *data = ((ring_data_t*)sqe->user_data); - data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 }; + data->iov = (struct iovec){ + (journal.inmemory + ? journal.buffer + journal.sector_info[journal.cur_sector].offset + : journal.sector_buf + 512*journal.cur_sector), + 512 + }; data->callback = cb; my_uring_prep_writev( sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset ); } + +journal_t::~journal_t() +{ + if (sector_buf) + free(sector_buf); + if (sector_info) + free(sector_info); + if (buffer) + free(buffer); + sector_buf = NULL; + sector_info = NULL; + buffer = NULL; +} diff --git a/blockstore_journal.h b/blockstore_journal.h index 7773c8c5..ac2a3661 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -114,6 +114,8 @@ struct journal_t { int fd; uint64_t device_size; + bool inmemory = false; + void *buffer = NULL; uint64_t offset, len; uint64_t next_free = 512; @@ -121,8 +123,8 @@ struct journal_t uint32_t crc32_last = 0; // Current sector(s) used for writing - uint8_t *sector_buf; - journal_sector_info_t *sector_info; + void *sector_buf = NULL; + journal_sector_info_t *sector_info = NULL; uint64_t sector_count; int cur_sector = 0; int in_sector_pos = 512; // no free space because sector is initially unmapped @@ -130,6 +132,8 @@ struct journal_t // Used sector map // May use ~ 80 MB per 1 GB of used journal space in the worst case std::map used_sectors; + + ~journal_t(); }; struct blockstore_journal_check_t diff --git a/blockstore_open.cpp b/blockstore_open.cpp index 4bd6d5b2..8e0b70ee 100644 --- a/blockstore_open.cpp +++ b/blockstore_open.cpp @@ -61,6 +61,12 @@ void blockstore::calc_lengths(blockstore_config_t & config) { throw std::runtime_error("Journal is too small"); } + if (journal.inmemory) + { + journal.buffer = memalign(512, journal.len); + if (!journal.buffer) + throw std::bad_alloc(); + } } void check_size(int fd, uint64_t *size, std::string name) @@ -171,10 +177,20 @@ void blockstore::open_journal(blockstore_config_t & config) { journal.sector_count = 32; } - journal.sector_buf = (uint8_t*)memalign(512, journal.sector_count * 512); journal.sector_info = (journal_sector_info_t*)calloc(journal.sector_count, sizeof(journal_sector_info_t)); - if (!journal.sector_buf || !journal.sector_info) + if (!journal.sector_info) { throw std::bad_alloc(); } + if (config["journal_inmemory"] == "false") + { + journal.inmemory = false; + journal.sector_buf = (uint8_t*)memalign(512, journal.sector_count * 512); + if (!journal.sector_buf) + throw std::bad_alloc(); + } + else + { + journal.inmemory = true; + } } diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 1226e2a3..16359611 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -18,6 +18,16 @@ int blockstore::fulfill_read_push(blockstore_operation *op, uint64_t &fulfilled, memset((uint8_t*)op->buf + cur_start - op->offset, 0, cur_end - cur_start); return 1; } + if (journal.inmemory && IS_JOURNAL(item_state)) + { + iovec v = { + (uint8_t*)op->buf + cur_start - op->offset, + cur_end - cur_start + }; + op->read_vec[cur_start] = v; + memcpy(v.iov_base, journal.buffer + item_location + cur_start - item_start, v.iov_len); + return 1; + } BS_SUBMIT_GET_SQE(sqe, data); data->iov = (struct iovec){ (uint8_t*)op->buf + cur_start - op->offset, @@ -25,6 +35,7 @@ int blockstore::fulfill_read_push(blockstore_operation *op, uint64_t &fulfilled, }; // FIXME: use simple std::vector instead of map for read_vec op->read_vec[cur_start] = data->iov; + op->pending_ops++; my_uring_prep_readv( sqe, IS_JOURNAL(item_state) ? journal.fd : data_fd, @@ -90,6 +101,7 @@ int blockstore::dequeue_read(blockstore_operation *read_op) return 1; } uint64_t fulfilled = 0; + read_op->pending_ops = 0; if (dirty_found) { while (dirty_it->first.oid == read_op->oid) @@ -137,7 +149,6 @@ int blockstore::dequeue_read(blockstore_operation *read_op) return 1; } read_op->retval = 0; - read_op->pending_ops = read_op->read_vec.size(); return 1; } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index c21922e8..ee0857a6 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -137,6 +137,11 @@ int blockstore::dequeue_write(blockstore_operation *op) prepare_journal_sector_write(journal, sqe1, cb); op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector; // Prepare journal data write + if (journal.inmemory) + { + // Copy data + memcpy(journal.buffer + journal.next_free, op->buf, op->len); + } data2->iov = (struct iovec){ op->buf, op->len }; data2->callback = cb; my_uring_prep_writev(