From a48e2bbf18009c948db7570870c70ad2430e07fe Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 3 Apr 2021 14:51:52 +0300 Subject: [PATCH] Fix write replay ordering when immediate_commit != all Previous implementation didn't respect write ordering and could lead to corrupted data when restarting writes after an OSD outage Also rework cluster_client queueing logic and add tests for it to verify the correct behaviour --- src/CMakeLists.txt | 18 +- src/cluster_client.cpp | 683 +++++++++++++++++++----------------- src/cluster_client.h | 41 ++- src/etcd_state_client.cpp | 25 ++ src/etcd_state_client.h | 5 +- src/messenger.cpp | 117 ------ src/messenger.h | 2 +- src/mock/build.sh | 1 + src/mock/messenger.cpp | 44 +++ src/mock/ringloop.h | 25 ++ src/msgr_stop.cpp | 128 +++++++ src/test_cluster_client.cpp | 346 ++++++++++++++++++ 12 files changed, 976 insertions(+), 459 deletions(-) create mode 100644 src/mock/build.sh create mode 100644 src/mock/messenger.cpp create mode 100644 src/mock/ringloop.h create mode 100644 src/msgr_stop.cpp create mode 100644 src/test_cluster_client.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 004004405..b2b902de1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -14,7 +14,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$") endif() add_definitions(-DVERSION="0.6-dev") -add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith) +add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -I ${CMAKE_SOURCE_DIR}/src) if (${WITH_ASAN}) add_definitions(-fsanitize=address -fno-omit-frame-pointer) add_link_options(-fsanitize=address -fno-omit-frame-pointer) @@ -67,7 +67,7 @@ target_link_libraries(fio_vitastor_blk add_executable(vitastor-osd osd_main.cpp osd.cpp osd_secondary.cpp msgr_receive.cpp msgr_send.cpp osd_peering.cpp osd_flush.cpp osd_peering_pg.cpp osd_primary.cpp osd_primary_sync.cpp osd_primary_write.cpp osd_primary_subops.cpp - etcd_state_client.cpp messenger.cpp msgr_op.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp + etcd_state_client.cpp messenger.cpp msgr_stop.cpp msgr_op.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp osd_rmw.cpp base64.cpp timerfd_manager.cpp epoll_manager.cpp ../json11/json11.cpp ) target_link_libraries(vitastor-osd @@ -87,7 +87,7 @@ target_link_libraries(fio_vitastor_sec # libvitastor_client.so add_library(vitastor_client SHARED cluster_client.cpp epoll_manager.cpp etcd_state_client.cpp - messenger.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp + messenger.cpp msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp ) target_link_libraries(vitastor_client @@ -162,7 +162,8 @@ target_link_libraries(osd_rmw_test Jerasure tcmalloc_minimal) # stub_uring_osd add_executable(stub_uring_osd - stub_uring_osd.cpp epoll_manager.cpp messenger.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp timerfd_manager.cpp ../json11/json11.cpp + stub_uring_osd.cpp epoll_manager.cpp messenger.cpp msgr_stop.cpp msgr_op.cpp + msgr_send.cpp msgr_receive.cpp ringloop.cpp timerfd_manager.cpp ../json11/json11.cpp ) target_link_libraries(stub_uring_osd ${LIBURING_LIBRARIES} @@ -176,6 +177,15 @@ target_link_libraries(osd_peering_pg_test tcmalloc_minimal) # test_allocator add_executable(test_allocator test_allocator.cpp allocator.cpp) +# test_cluster_client +add_executable(test_cluster_client + test_cluster_client.cpp + pg_states.cpp osd_ops.cpp cluster_client.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp + etcd_state_client.cpp timerfd_manager.cpp ../json11/json11.cpp +) +target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__) +target_include_directories(test_cluster_client PUBLIC ${CMAKE_SOURCE_DIR}/src/mock) + ## test_blockstore, test_shit #add_executable(test_blockstore test_blockstore.cpp timerfd_interval.cpp) #target_link_libraries(test_blockstore blockstore) diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index dc0ea6270..3100a70ea 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -5,6 +5,11 @@ #include #include "cluster_client.h" +#define CACHE_DIRTY 1 +#define CACHE_FLUSHING 2 +#define CACHE_REPEATING 4 +#define OP_FLUSH_BUFFER 2 + cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config) { this->ringloop = ringloop; @@ -21,39 +26,18 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd // peer_osd just connected continue_ops(); } - else if (unsynced_writes.size()) + else if (dirty_buffers.size()) { // peer_osd just dropped connection - for (auto op: syncing_writes) + // determine WHICH dirty_buffers are now obsolete and repeat them + dirty_osds.erase(peer_osd); + for (auto & wr: dirty_buffers) { - for (auto & part: op->parts) + if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) && + !(wr.second.state & CACHE_REPEATING)) { - if (part.osd_num == peer_osd && part.done) - { - // repeat this operation - part.osd_num = 0; - part.done = false; - assert(!part.sent); - op->done_count--; - } - } - } - for (auto op: unsynced_writes) - { - for (auto & part: op->parts) - { - if (part.osd_num == peer_osd && part.done) - { - // repeat this operation - part.osd_num = 0; - part.done = false; - assert(!part.sent); - op->done_count--; - } - } - if (op->done_count < op->parts.size()) - { - cur_ops.insert(op); + // FIXME: Flush in larger parts + flush_buffer(wr.first, wr.second); } } continue_ops(); @@ -91,6 +75,11 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd cluster_client_t::~cluster_client_t() { + for (auto bp: dirty_buffers) + { + free(bp.second.buf); + } + dirty_buffers.clear(); if (ringloop) { ringloop->unregister_consumer(&consumer); @@ -99,21 +88,64 @@ cluster_client_t::~cluster_client_t() void cluster_client_t::continue_ops(bool up_retry) { - for (auto op_it = cur_ops.begin(); op_it != cur_ops.end(); ) + if (!pgs_loaded) { - if ((*op_it)->up_wait) - { - if (up_retry) - { - (*op_it)->up_wait = false; - continue_rw(*op_it++); - } - else - op_it++; - } - else - continue_rw(*op_it++); + // We're offline + return; } + bool has_flushes = false, has_writes = false; + int j = 0; + for (int i = 0; i < op_queue.size(); i++) + { + bool rm = false; + if (!op_queue[i]->up_wait || up_retry) + { + op_queue[i]->up_wait = false; + if (op_queue[i]->opcode == OSD_OP_READ) + { + rm = continue_rw(op_queue[i]); + } + else if (op_queue[i]->opcode == OSD_OP_WRITE) + { + if (op_queue[i]->flags & OP_FLUSH_BUFFER) + { + rm = continue_rw(op_queue[i]); + if (!rm) + { + // Regular writes can't proceed before buffer flushes + has_flushes = true; + } + } + else if (!has_flushes) + { + rm = continue_rw(op_queue[i]); + } + if (!rm) + { + has_writes = true; + } + } + else if (op_queue[i]->opcode == OSD_OP_SYNC) + { + if (!has_writes) + { + // SYNC can't proceed before previous writes + rm = continue_sync(op_queue[i]); + if (!rm) + { + // Postpone writes until previous SYNC completes + // ...so dirty_writes can't contain anything newer than SYNC + has_flushes = true; + } + } + } + } + if (!rm) + { + op_queue[j++] = op_queue[i]; + } + } + op_queue.resize(j); } static uint32_t is_power_of_two(uint64_t value) @@ -203,23 +235,10 @@ void cluster_client_t::on_change_hook(json11::Json::object & changes) { // At this point, all pool operations should have been suspended // And now they have to be resliced! - for (auto op: cur_ops) + for (auto op: op_queue) { - if (INODE_POOL(op->inode) == pool_item.first) - { - op->needs_reslice = true; - } - } - for (auto op: unsynced_writes) - { - if (INODE_POOL(op->inode) == pool_item.first) - { - op->needs_reslice = true; - } - } - for (auto op: syncing_writes) - { - if (INODE_POOL(op->inode) == pool_item.first) + if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ) && + INODE_POOL(op->inode) == pool_item.first) { op->needs_reslice = true; } @@ -258,21 +277,15 @@ void cluster_client_t::on_ready(std::function fn) /** * How writes are synced when immediate_commit is false * - * 1) accept up to write operations for execution, - * queue all subsequent writes into - * 2) accept exactly one SYNC, queue all subsequent SYNCs into , too - * 3) "continue" all accepted writes - * * "Continue" WRITE: - * 1) if the operation is not a copy yet - copy it (required for replay) - * 2) if the operation is not sliced yet - slice it - * 3) if the operation doesn't require reslice - try to connect & send all remaining parts - * 4) if any of them fail due to disconnected peers or PGs not up, repeat after reconnecting or small timeout - * 5) if any of them fail due to other errors, fail the operation and forget it from the current "unsynced batch" - * 6) if PG count changes before all parts are done, wait for all in-progress parts to finish, + * 1) if the operation is not sliced yet - slice it + * 2) if the operation doesn't require reslice - try to connect & send all remaining parts + * 3) if any of them fail due to disconnected peers or PGs not up, repeat after reconnecting or small timeout + * 4) if any of them fail due to other errors, fail the operation and forget it from the current "unsynced batch" + * 5) if PG count changes before all parts are done, wait for all in-progress parts to finish, * throw all results away, reslice and resubmit op - * 7) when all parts are done, try to "continue" the current SYNC - * 8) if the operation succeeds, but then some OSDs drop their connections, repeat + * 6) when all parts are done, try to "continue" the current SYNC + * 7) if the operation succeeds, but then some OSDs drop their connections, repeat * parts from the current "unsynced batch" previously sent to those OSDs in any order * * "Continue" current SYNC: @@ -282,181 +295,241 @@ void cluster_client_t::on_ready(std::function fn) * 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes * 5) if any of them fail due to other errors, fail the SYNC operation */ - void cluster_client_t::execute(cluster_op_t *op) { - if (!pgs_loaded) + if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE) { - // We're offline - offline_ops.push_back(op); + op->retval = -EINVAL; + std::function(op->callback)(op); return; } op->retval = 0; - if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE || - (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_WRITE) && (!op->inode || !op->len || - op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity)) - { - op->retval = -EINVAL; - std::function(op->callback)(op); - return; - } - if (op->opcode == OSD_OP_SYNC) - { - execute_sync(op); - return; - } if (op->opcode == OSD_OP_WRITE && !immediate_commit) { - if (next_writes.size() > 0) - { - assert(cur_sync); - next_writes.push_back(op); - return; - } - if (queued_bytes >= client_dirty_limit) + if (dirty_bytes >= client_dirty_limit) { // Push an extra SYNC operation to flush previous writes - next_writes.push_back(op); cluster_op_t *sync_op = new cluster_op_t; - sync_op->is_internal = true; sync_op->opcode = OSD_OP_SYNC; - sync_op->callback = [](cluster_op_t* sync_op) {}; - execute_sync(sync_op); - return; + sync_op->callback = [](cluster_op_t* sync_op) + { + delete sync_op; + }; + op_queue.push_back(sync_op); + dirty_bytes = 0; } - queued_bytes += op->len; + dirty_bytes += op->len; } - cur_ops.insert(op); - continue_rw(op); + else if (op->opcode == OSD_OP_SYNC) + { + dirty_bytes = 0; + } + op_queue.push_back(op); + continue_ops(); } -void cluster_client_t::continue_rw(cluster_op_t *op) +void cluster_client_t::copy_write(cluster_op_t *op, std::map & dirty_buffers) { - pool_id_t pool_id = INODE_POOL(op->inode); - if (!pool_id) + // Save operation for replay when one of PGs goes out of sync + // (primary OSD drops our connection in this case) + auto dirty_it = dirty_buffers.lower_bound((object_id){ + .inode = op->inode, + .stripe = op->offset, + }); + while (dirty_it != dirty_buffers.begin()) + { + dirty_it--; + if (dirty_it->first.inode != op->inode || + (dirty_it->first.stripe + dirty_it->second.len) <= op->offset) + { + dirty_it++; + break; + } + } + uint64_t pos = op->offset, len = op->len, iov_idx = 0, iov_pos = 0; + while (len > 0) + { + uint64_t new_len = 0; + if (dirty_it == dirty_buffers.end()) + { + new_len = len; + } + else if (dirty_it->first.inode != op->inode || dirty_it->first.stripe > pos) + { + new_len = dirty_it->first.stripe - pos; + if (new_len > len) + { + new_len = len; + } + } + if (new_len > 0) + { + dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){ + .inode = op->inode, + .stripe = pos, + }, (cluster_buffer_t){ + .buf = malloc_or_die(new_len), + .len = new_len, + }); + } + // FIXME: Split big buffers into smaller ones on overwrites. But this will require refcounting + dirty_it->second.state = CACHE_DIRTY; + uint64_t cur_len = (dirty_it->first.stripe + dirty_it->second.len - pos); + if (cur_len > len) + { + cur_len = len; + } + while (cur_len > 0 && iov_idx < op->iov.count) + { + unsigned iov_len = (op->iov.buf[iov_idx].iov_len - iov_pos); + if (iov_len <= cur_len) + { + memcpy(dirty_it->second.buf + pos - dirty_it->first.stripe, + op->iov.buf[iov_idx].iov_base + iov_pos, iov_len); + pos += iov_len; + len -= iov_len; + cur_len -= iov_len; + iov_pos = 0; + iov_idx++; + } + else + { + memcpy(dirty_it->second.buf + pos - dirty_it->first.stripe, + op->iov.buf[iov_idx].iov_base + iov_pos, cur_len); + pos += cur_len; + len -= cur_len; + iov_pos += cur_len; + cur_len = 0; + } + } + dirty_it++; + } +} + +void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t & wr) +{ + wr.state = CACHE_DIRTY | CACHE_REPEATING; + cluster_op_t *op = new cluster_op_t; + op->flags = OP_FLUSH_BUFFER; + op->opcode = OSD_OP_WRITE; + op->inode = oid.inode; + op->offset = oid.stripe; + op->len = wr.len; + op->iov.push_back(wr.buf, wr.len); + op->callback = [](cluster_op_t* op) + { + delete op; + }; + op_queue.push_front(op); +} + +int cluster_client_t::continue_rw(cluster_op_t *op) +{ + if (op->state == 0) + goto resume_0; + else if (op->state == 1) + goto resume_1; + else if (op->state == 2) + goto resume_2; + else if (op->state == 3) + goto resume_3; +resume_0: + if (!op->len || op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity) { op->retval = -EINVAL; std::function(op->callback)(op); - return; + return 1; } - if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() || - st_cli.pool_config[pool_id].real_pg_count == 0) { - // Postpone operations to unknown pools - return; - } - if (op->opcode == OSD_OP_WRITE && !immediate_commit && !op->is_internal) - { - // Save operation for replay when PG goes out of sync - // (primary OSD drops our connection in this case) - cluster_op_t *op_copy = new cluster_op_t(); - op_copy->is_internal = true; - op_copy->orig_op = op; - op_copy->opcode = op->opcode; - op_copy->inode = op->inode; - op_copy->offset = op->offset; - op_copy->len = op->len; - op_copy->buf = malloc_or_die(op->len); - op_copy->iov.push_back(op_copy->buf, op->len); - op_copy->callback = [](cluster_op_t* op_copy) + pool_id_t pool_id = INODE_POOL(op->inode); + if (!pool_id) { - if (op_copy->orig_op) - { - // Acknowledge write and forget the original pointer - op_copy->orig_op->retval = op_copy->retval; - std::function(op_copy->orig_op->callback)(op_copy->orig_op); - op_copy->orig_op = NULL; - } - }; - void *cur_buf = op_copy->buf; - for (int i = 0; i < op->iov.count; i++) - { - memcpy(cur_buf, op->iov.buf[i].iov_base, op->iov.buf[i].iov_len); - cur_buf += op->iov.buf[i].iov_len; - } - unsynced_writes.push_back(op_copy); - cur_ops.erase(op); - cur_ops.insert(op_copy); - op = op_copy; - } - if (!op->parts.size()) - { - // Slice the operation into parts - slice_rw(op); - } - if (!op->needs_reslice) - { - // Send unsent parts, if they're not subject to change - for (auto & op_part: op->parts) - { - if (!op_part.sent && !op_part.done) - { - try_send(op, &op_part); - } - } - } - if (!op->sent_count) - { - if (op->done_count >= op->parts.size()) - { - // Finished successfully - // Even if the PG count has changed in meanwhile we treat it as success - // because if some operations were invalid for the new PG count we'd get errors - cur_ops.erase(op); - op->retval = op->len; + op->retval = -EINVAL; std::function(op->callback)(op); - continue_sync(); - return; + return 1; } - else if (op->retval != 0 && op->retval != -EPIPE) + if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() || + st_cli.pool_config[pool_id].real_pg_count == 0) { - // Fatal error (not -EPIPE) - cur_ops.erase(op); - if (!immediate_commit && op->opcode == OSD_OP_WRITE) + // Postpone operations to unknown pools + return 0; + } + } + if (op->opcode == OSD_OP_WRITE) + { + if (!immediate_commit) + { + copy_write(op, dirty_buffers); + } + } +resume_1: + // Slice the operation into parts + slice_rw(op); + op->needs_reslice = false; +resume_2: + // Send unsent parts, if they're not subject to change + op->state = 3; + for (int i = 0; i < op->parts.size(); i++) + { + if (!op->parts[i].sent && !op->parts[i].done) + { + if (!try_send(op, i)) { - for (int i = 0; i < unsynced_writes.size(); i++) - { - if (unsynced_writes[i] == op) - { - unsynced_writes.erase(unsynced_writes.begin()+i, unsynced_writes.begin()+i+1); - break; - } - } + // We'll need to retry again + op->state = 2; } - bool del = op->is_internal; - std::function(op->callback)(op); - if (del) - { - if (op->buf) - free(op->buf); - delete op; - } - continue_sync(); - return; + } + } + if (op->state == 2) + { + return 0; + } +resume_3: + if (op->sent_count > 0) + { + op->state = 3; + return 0; + } + if (op->done_count >= op->parts.size()) + { + // Finished successfully + // Even if the PG count has changed in meanwhile we treat it as success + // because if some operations were invalid for the new PG count we'd get errors + op->retval = op->len; + std::function(op->callback)(op); + return 1; + } + else if (op->retval != 0 && op->retval != -EPIPE) + { + // Fatal error (not -EPIPE) + std::function(op->callback)(op); + return 1; + } + else + { + // -EPIPE - clear the error and retry + op->retval = 0; + if (op->needs_reslice) + { + op->parts.clear(); + op->done_count = 0; + goto resume_1; } else { - // -EPIPE or no error - clear the error - op->retval = 0; - if (op->needs_reslice) - { - op->parts.clear(); - op->done_count = 0; - op->needs_reslice = false; - continue_rw(op); - } + goto resume_2; } } + return 0; } void cluster_client_t::slice_rw(cluster_op_t *op) { // Slice the request into individual object stripe requests // Primary OSDs still operate individual stripes, but their size is multiplied by PG minsize in case of EC - auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)]; - uint64_t pg_block_size = bs_block_size * ( - pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks - ); + auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->inode)); + uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks); + uint64_t pg_block_size = bs_block_size * pg_data_size; uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size; uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size; op->retval = 0; @@ -500,8 +573,28 @@ void cluster_client_t::slice_rw(cluster_op_t *op) } } -bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part) +bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd) { + auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(inode)); + uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks); + uint64_t pg_block_size = bs_block_size * pg_data_size; + uint64_t first_stripe = (offset / pg_block_size) * pg_block_size; + uint64_t last_stripe = ((offset + len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size; + for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size) + { + pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg() + auto pg_it = pool_cfg.pg_config.find(pg_num); + if (pg_it != pool_cfg.pg_config.end() && pg_it->second.cur_primary == osd) + { + return true; + } + } + return false; +} + +bool cluster_client_t::try_send(cluster_op_t *op, int i) +{ + auto part = &op->parts[i]; auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)]; auto pg_it = pool_cfg.pg_config.find(part->pg_num); if (pg_it != pool_cfg.pg_config.end() && @@ -545,129 +638,92 @@ bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part) return false; } -void cluster_client_t::execute_sync(cluster_op_t *op) +int cluster_client_t::continue_sync(cluster_op_t *op) { - if (immediate_commit) + if (op->state == 1) + goto resume_1; + if (immediate_commit || !dirty_osds.size()) { - // Syncs are not required in the immediate_commit mode + // Sync is not required in the immediate_commit mode or if there are no dirty_osds op->retval = 0; std::function(op->callback)(op); - } - else if (cur_sync != NULL) - { - next_writes.push_back(op); - } - else - { - cur_sync = op; - continue_sync(); - } -} - -void cluster_client_t::continue_sync() -{ - if (!cur_sync || cur_sync->parts.size() > 0) - { - // Already submitted - return; - } - cur_sync->retval = 0; - std::set sync_osds; - for (auto prev_op: unsynced_writes) - { - if (prev_op->done_count < prev_op->parts.size()) - { - // Writes not finished yet - return; - } - for (auto & part: prev_op->parts) - { - if (part.osd_num) - { - sync_osds.insert(part.osd_num); - } - } - } - if (!sync_osds.size()) - { - // No dirty writes - finish_sync(); - return; + return 1; } // Check that all OSD connections are still alive - for (auto sync_osd: sync_osds) + for (auto sync_osd: dirty_osds) { auto peer_it = msgr.osd_peer_fds.find(sync_osd); if (peer_it == msgr.osd_peer_fds.end()) { - // SYNC is pointless to send to a non connected OSD - return; + return 0; } } - syncing_writes.swap(unsynced_writes); // Post sync to affected OSDs - cur_sync->parts.resize(sync_osds.size()); - int i = 0; - for (auto sync_osd: sync_osds) + for (auto & prev_op: dirty_buffers) { - cur_sync->parts[i] = { - .parent = cur_sync, - .osd_num = sync_osd, - .sent = false, - .done = false, - }; - send_sync(cur_sync, &cur_sync->parts[i]); - i++; - } -} - -void cluster_client_t::finish_sync() -{ - int retval = cur_sync->retval; - if (retval != 0) - { - for (auto op: syncing_writes) + if (prev_op.second.state == CACHE_DIRTY) { - if (op->done_count < op->parts.size()) + prev_op.second.state = CACHE_FLUSHING; + } + } + op->parts.resize(dirty_osds.size()); + op->retval = 0; + { + int i = 0; + for (auto sync_osd: dirty_osds) + { + op->parts[i] = { + .parent = op, + .osd_num = sync_osd, + .sent = false, + .done = false, + }; + send_sync(op, &op->parts[i]); + i++; + } + } + dirty_osds.clear(); +resume_1: + if (op->sent_count > 0) + { + op->state = 1; + return 0; + } + if (op->retval != 0) + { + for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); uw_it++) + { + if (uw_it->second.state == CACHE_FLUSHING) { - cur_ops.insert(op); + uw_it->second.state = CACHE_DIRTY; } } - unsynced_writes.insert(unsynced_writes.begin(), syncing_writes.begin(), syncing_writes.end()); - syncing_writes.clear(); - } - if (retval == -EPIPE) - { - // Retry later - cur_sync->parts.clear(); - cur_sync->retval = 0; - cur_sync->sent_count = 0; - cur_sync->done_count = 0; - return; - } - std::function(cur_sync->callback)(cur_sync); - if (!retval) - { - for (auto op: syncing_writes) + if (op->retval == -EPIPE) { - assert(op->sent_count == 0); - if (op->is_internal) - { - if (op->buf) - free(op->buf); - delete op; - } + // Retry later + op->parts.clear(); + op->retval = 0; + op->sent_count = 0; + op->done_count = 0; + op->state = 0; + return 0; } - syncing_writes.clear(); } - cur_sync = NULL; - queued_bytes = 0; - std::vector next_wr_copy; - next_wr_copy.swap(next_writes); - for (auto next_op: next_wr_copy) + else { - execute(next_op); + for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); ) + { + if (uw_it->second.state == CACHE_FLUSHING) + { + free(uw_it->second.buf); + dirty_buffers.erase(uw_it++); + } + else + uw_it++; + } } + std::function(op->callback)(op); + return 1; } void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part) @@ -729,19 +785,12 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) else { // OK + dirty_osds.insert(part->osd_num); part->done = true; op->done_count++; } if (op->sent_count == 0) { - if (op->opcode == OSD_OP_SYNC) - { - assert(op == cur_sync); - finish_sync(); - } - else if (!op->up_wait) - { - continue_rw(op); - } + continue_ops(); } } diff --git a/src/cluster_client.h b/src/cluster_client.h index 9bf61748e..0bce1d561 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -37,9 +37,10 @@ struct cluster_op_t osd_op_buf_list_t iov; std::function callback; protected: + int flags = 0; + int state = 0; void *buf = NULL; cluster_op_t *orig_op = NULL; - bool is_internal = false; bool needs_reslice = false; bool up_wait = false; int sent_count = 0, done_count = 0; @@ -47,6 +48,14 @@ protected: friend class cluster_client_t; }; +struct cluster_buffer_t +{ + void *buf; + uint64_t len; + int state; +}; + +// FIXME: Split into public and private interfaces class cluster_client_t { timerfd_manager_t *tfd; @@ -61,21 +70,16 @@ class cluster_client_t int log_level; int up_wait_retry_interval = 500; // ms - uint64_t op_id = 1; - ring_consumer_t consumer; - // operations currently in progress - std::set cur_ops; int retry_timeout_id = 0; - // unsynced operations are copied in memory to allow replay when cluster isn't in the immediate_commit mode - // unsynced_writes are replayed in any order (because only the SYNC operation guarantees ordering) - std::vector unsynced_writes; - std::vector syncing_writes; - cluster_op_t* cur_sync = NULL; - std::vector next_writes; + uint64_t op_id = 1; std::vector offline_ops; - uint64_t queued_bytes = 0; + std::deque op_queue; + std::map dirty_buffers; + std::set dirty_osds; + uint64_t dirty_bytes = 0; bool pgs_loaded = false; + ring_consumer_t consumer; std::vector> on_ready_hooks; public: @@ -89,18 +93,19 @@ public: bool is_ready(); void on_ready(std::function fn); -protected: + static void copy_write(cluster_op_t *op, std::map & dirty_buffers); void continue_ops(bool up_retry = false); +protected: + bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd); + void flush_buffer(const object_id & oid, cluster_buffer_t & wr); void on_load_config_hook(json11::Json::object & config); void on_load_pgs_hook(bool success); void on_change_hook(json11::Json::object & changes); void on_change_osd_state_hook(uint64_t peer_osd); - void continue_rw(cluster_op_t *op); + int continue_rw(cluster_op_t *op); void slice_rw(cluster_op_t *op); - bool try_send(cluster_op_t *op, cluster_op_part_t *part); - void execute_sync(cluster_op_t *op); - void continue_sync(); - void finish_sync(); + bool try_send(cluster_op_t *op, int i); + int continue_sync(cluster_op_t *op); void send_sync(cluster_op_t *op, cluster_op_part_t *part); void handle_op_part(cluster_op_part_t *part); }; diff --git a/src/etcd_state_client.cpp b/src/etcd_state_client.cpp index cafdffb06..c260bc321 100644 --- a/src/etcd_state_client.cpp +++ b/src/etcd_state_client.cpp @@ -4,19 +4,24 @@ #include "osd_ops.h" #include "pg_states.h" #include "etcd_state_client.h" +#ifndef __MOCK__ #include "http_client.h" #include "base64.h" +#endif etcd_state_client_t::~etcd_state_client_t() { etcd_watches_initialised = -1; +#ifndef __MOCK__ if (etcd_watch_ws) { etcd_watch_ws->close(); etcd_watch_ws = NULL; } +#endif } +#ifndef __MOCK__ json_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json) { json_kv_t kv; @@ -323,6 +328,26 @@ void etcd_state_client_t::load_pgs() start_etcd_watcher(); }); } +#else +void etcd_state_client_t::parse_config(json11::Json & config) +{ +} + +void etcd_state_client_t::load_global_config() +{ + json11::Json::object global_config; + on_load_config_hook(global_config); +} + +void etcd_state_client_t::load_pgs() +{ +} +#endif + +void etcd_state_client_t::parse_state(const json_kv_t & kv) +{ + parse_state(kv.key, kv.value); +} void etcd_state_client_t::parse_state(const std::string & key, const json11::Json & value) { diff --git a/src/etcd_state_client.h b/src/etcd_state_client.h index 1fb3fe222..4f412c9be 100644 --- a/src/etcd_state_client.h +++ b/src/etcd_state_client.h @@ -57,6 +57,8 @@ struct websocket_t; struct etcd_state_client_t { protected: + websocket_t *etcd_watch_ws = NULL; + uint64_t bs_block_size = DEFAULT_BLOCK_SIZE; void add_etcd_url(std::string); public: std::vector etcd_addresses; @@ -66,8 +68,6 @@ public: int etcd_watches_initialised = 0; uint64_t etcd_watch_revision = 0; - websocket_t *etcd_watch_ws = NULL; - uint64_t bs_block_size = 0; std::map pool_config; std::map peer_states; @@ -84,6 +84,7 @@ public: void start_etcd_watcher(); void load_global_config(); void load_pgs(); + void parse_state(const json_kv_t & kv); void parse_state(const std::string & key, const json11::Json & value); void parse_config(json11::Json & config); ~etcd_state_client_t(); diff --git a/src/messenger.cpp b/src/messenger.cpp index fec99c81f..5e75754fd 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -357,123 +357,6 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) outbox_push(op); } -void osd_messenger_t::cancel_osd_ops(osd_client_t *cl) -{ - for (auto p: cl->sent_ops) - { - cancel_op(p.second); - } - cl->sent_ops.clear(); - cl->outbox.clear(); -} - -void osd_messenger_t::cancel_op(osd_op_t *op) -{ - if (op->op_type == OSD_OP_OUT) - { - op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - op->reply.hdr.id = op->req.hdr.id; - op->reply.hdr.opcode = op->req.hdr.opcode; - op->reply.hdr.retval = -EPIPE; - // Copy lambda to be unaffected by `delete op` - std::function(op->callback)(op); - } - else - { - // This function is only called in stop_client(), so it's fine to destroy the operation - delete op; - } -} - -void osd_messenger_t::stop_client(int peer_fd, bool force) -{ - assert(peer_fd != 0); - auto it = clients.find(peer_fd); - if (it == clients.end()) - { - return; - } - uint64_t repeer_osd = 0; - osd_client_t *cl = it->second; - if (cl->peer_state == PEER_CONNECTED) - { - if (cl->osd_num) - { - // Reload configuration from etcd when the connection is dropped - if (log_level > 0) - printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num); - repeer_osd = cl->osd_num; - } - else - { - if (log_level > 0) - printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); - } - } - else if (!force) - { - return; - } - cl->peer_state = PEER_STOPPED; - clients.erase(it); - tfd->set_fd_handler(peer_fd, false, NULL); - if (cl->connect_timeout_id >= 0) - { - tfd->clear_timer(cl->connect_timeout_id); - cl->connect_timeout_id = -1; - } - if (cl->osd_num) - { - osd_peer_fds.erase(cl->osd_num); - } - if (cl->read_op) - { - if (cl->read_op->callback) - { - cancel_op(cl->read_op); - } - else - { - delete cl->read_op; - } - cl->read_op = NULL; - } - for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) - { - if (*rit == peer_fd) - { - read_ready_clients.erase(rit); - break; - } - } - for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++) - { - if (*wit == peer_fd) - { - write_ready_clients.erase(wit); - break; - } - } - free(cl->in_buf); - cl->in_buf = NULL; - close(peer_fd); - if (repeer_osd) - { - // First repeer PGs as canceling OSD ops may push new operations - // and we need correct PG states when we do that - repeer_pgs(repeer_osd); - } - if (cl->osd_num) - { - // Cancel outbound operations - cancel_osd_ops(cl); - } - if (cl->refs <= 0) - { - delete cl; - } -} - void osd_messenger_t::accept_connections(int listen_fd) { // Accept new connections diff --git a/src/messenger.h b/src/messenger.h index 2c246bc4c..0c367970b 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -16,7 +16,7 @@ #include "json11/json11.hpp" #include "msgr_op.h" #include "timerfd_manager.h" -#include "ringloop.h" +#include #define CL_READ_HDR 1 #define CL_READ_DATA 2 diff --git a/src/mock/build.sh b/src/mock/build.sh new file mode 100644 index 000000000..dd92e1aac --- /dev/null +++ b/src/mock/build.sh @@ -0,0 +1 @@ +g++ -D__MOCK__ -fsanitize=address -g -Wno-pointer-arith pg_states.cpp osd_ops.cpp test_cluster_client.cpp cluster_client.cpp msgr_op.cpp msgr_stop.cpp mock/messenger.cpp etcd_state_client.cpp timerfd_manager.cpp ../json11/json11.cpp -I mock -I . -I ..; ./a.out \ No newline at end of file diff --git a/src/mock/messenger.cpp b/src/mock/messenger.cpp new file mode 100644 index 000000000..0fcd3cbde --- /dev/null +++ b/src/mock/messenger.cpp @@ -0,0 +1,44 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#include +#include +#include + +#include "messenger.h" + +void osd_messenger_t::init() +{ +} + +osd_messenger_t::~osd_messenger_t() +{ + while (clients.size() > 0) + { + stop_client(clients.begin()->first, true); + } +} + +void osd_messenger_t::outbox_push(osd_op_t *cur_op) +{ + clients[cur_op->peer_fd]->sent_ops[cur_op->req.hdr.id] = cur_op; +} + +void osd_messenger_t::parse_config(const json11::Json & config) +{ +} + +void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state) +{ + wanted_peers[peer_osd] = (osd_wanted_peer_t){ + .port = 1, + }; +} + +void osd_messenger_t::read_requests() +{ +} + +void osd_messenger_t::send_replies() +{ +} diff --git a/src/mock/ringloop.h b/src/mock/ringloop.h new file mode 100644 index 000000000..37b290503 --- /dev/null +++ b/src/mock/ringloop.h @@ -0,0 +1,25 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#pragma once + +#include + +struct ring_consumer_t +{ + std::function loop; +}; + +class ring_loop_t +{ +public: + void register_consumer(ring_consumer_t *consumer) + { + } + void unregister_consumer(ring_consumer_t *consumer) + { + } + void submit() + { + } +}; diff --git a/src/msgr_stop.cpp b/src/msgr_stop.cpp new file mode 100644 index 000000000..e4e400459 --- /dev/null +++ b/src/msgr_stop.cpp @@ -0,0 +1,128 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#include +#include + +#include "messenger.h" + +void osd_messenger_t::cancel_osd_ops(osd_client_t *cl) +{ + for (auto p: cl->sent_ops) + { + cancel_op(p.second); + } + cl->sent_ops.clear(); + cl->outbox.clear(); +} + +void osd_messenger_t::cancel_op(osd_op_t *op) +{ + if (op->op_type == OSD_OP_OUT) + { + op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + op->reply.hdr.id = op->req.hdr.id; + op->reply.hdr.opcode = op->req.hdr.opcode; + op->reply.hdr.retval = -EPIPE; + // Copy lambda to be unaffected by `delete op` + std::function(op->callback)(op); + } + else + { + // This function is only called in stop_client(), so it's fine to destroy the operation + delete op; + } +} + +void osd_messenger_t::stop_client(int peer_fd, bool force) +{ + assert(peer_fd != 0); + auto it = clients.find(peer_fd); + if (it == clients.end()) + { + return; + } + uint64_t repeer_osd = 0; + osd_client_t *cl = it->second; + if (cl->peer_state == PEER_CONNECTED) + { + if (cl->osd_num) + { + // Reload configuration from etcd when the connection is dropped + if (log_level > 0) + printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num); + repeer_osd = cl->osd_num; + } + else + { + if (log_level > 0) + printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); + } + } + else if (!force) + { + return; + } + cl->peer_state = PEER_STOPPED; + clients.erase(it); +#ifndef __MOCK__ + tfd->set_fd_handler(peer_fd, false, NULL); + if (cl->connect_timeout_id >= 0) + { + tfd->clear_timer(cl->connect_timeout_id); + cl->connect_timeout_id = -1; + } +#endif + if (cl->osd_num) + { + osd_peer_fds.erase(cl->osd_num); + } + if (cl->read_op) + { + if (cl->read_op->callback) + { + cancel_op(cl->read_op); + } + else + { + delete cl->read_op; + } + cl->read_op = NULL; + } + for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) + { + if (*rit == peer_fd) + { + read_ready_clients.erase(rit); + break; + } + } + for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++) + { + if (*wit == peer_fd) + { + write_ready_clients.erase(wit); + break; + } + } + free(cl->in_buf); + cl->in_buf = NULL; +#ifndef __MOCK__ + close(peer_fd); +#endif + if (repeer_osd) + { + // First repeer PGs as canceling OSD ops may push new operations + // and we need correct PG states when we do that + repeer_pgs(repeer_osd); + } + if (cl->osd_num) + { + // Cancel outbound operations + cancel_osd_ops(cl); + } + if (cl->refs <= 0) + { + delete cl; + } +} diff --git a/src/test_cluster_client.cpp b/src/test_cluster_client.cpp new file mode 100644 index 000000000..b89eef5e3 --- /dev/null +++ b/src/test_cluster_client.cpp @@ -0,0 +1,346 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 (see README.md for details) + +#include +#include +#include +#include "cluster_client.h" + +void configure_single_pg_pool(cluster_client_t *cli) +{ + cli->st_cli.on_load_pgs_hook(true); + cli->st_cli.parse_state((json_kv_t){ + .key = "/config/pools", + .value = json11::Json::object { + { "1", json11::Json::object { + { "name", "hddpool" }, + { "scheme", "replicated" }, + { "pg_size", 2 }, + { "pg_minsize", 1 }, + { "pg_count", 1 }, + { "failure_domain", "osd" }, + } } + }, + }); + cli->st_cli.parse_state((json_kv_t){ + .key = "/config/pgs", + .value = json11::Json::object { + { "items", json11::Json::object { + { "1", json11::Json::object { + { "1", json11::Json::object { + { "osd_set", json11::Json::array { 1, 2 } }, + { "primary", 1 }, + } } + } } + } } + }, + }); + cli->st_cli.parse_state((json_kv_t){ + .key = "/pg/state/1/1", + .value = json11::Json::object { + { "peers", json11::Json::array { 1, 2 } }, + { "primary", 1 }, + { "state", json11::Json::array { "active" } }, + }, + }); + json11::Json::object changes; + cli->st_cli.on_change_hook(changes); +} + +int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c) +{ + printf("Post write %lx+%lx\n", offset, len); + int *r = new int; + *r = -1; + cluster_op_t *op = new cluster_op_t(); + op->opcode = OSD_OP_WRITE; + op->inode = 0x1000000000001; + op->offset = offset; + op->len = len; + op->iov.push_back(malloc_or_die(len), len); + memset(op->iov.buf[0].iov_base, c, len); + op->callback = [r](cluster_op_t *op) + { + if (*r == -1) + printf("Error: Not allowed to complete yet\n"); + assert(*r != -1); + *r = op->retval == op->len ? 1 : 0; + free(op->iov.buf[0].iov_base); + printf("Done write %lx+%lx r=%d\n", op->offset, op->len, op->retval); + delete op; + }; + cli->execute(op); + return r; +} + +int *test_sync(cluster_client_t *cli) +{ + printf("Post sync\n"); + int *r = new int; + *r = -1; + cluster_op_t *op = new cluster_op_t(); + op->opcode = OSD_OP_SYNC; + op->callback = [r](cluster_op_t *op) + { + if (*r == -1) + printf("Error: Not allowed to complete yet\n"); + assert(*r != -1); + *r = op->retval == 0 ? 1 : 0; + printf("Done sync r=%d\n", op->retval); + delete op; + }; + cli->execute(op); + return r; +} + +void can_complete(int *r) +{ + // Allow the operation to proceed so the test verifies + // that it doesn't complete earlier than expected + *r = -2; +} + +void check_completed(int *r) +{ + assert(*r == 1); + delete r; +} + +void pretend_connected(cluster_client_t *cli, osd_num_t osd_num) +{ + printf("OSD %lu connected\n", osd_num); + int peer_fd = cli->msgr.clients.size() ? std::prev(cli->msgr.clients.end())->first+1 : 10; + cli->msgr.osd_peer_fds[osd_num] = peer_fd; + cli->msgr.clients[peer_fd] = new osd_client_t(); + cli->msgr.clients[peer_fd]->osd_num = osd_num; + cli->msgr.clients[peer_fd]->peer_state = PEER_CONNECTED; + cli->msgr.wanted_peers.erase(osd_num); + cli->msgr.repeer_pgs(osd_num); +} + +void pretend_disconnected(cluster_client_t *cli, osd_num_t osd_num) +{ + printf("OSD %lu disconnected\n", osd_num); + cli->msgr.stop_client(cli->msgr.osd_peer_fds.at(osd_num)); +} + +void check_op_count(cluster_client_t *cli, osd_num_t osd_num, int ops) +{ + int peer_fd = cli->msgr.osd_peer_fds.at(osd_num); + int real_ops = cli->msgr.clients[peer_fd]->sent_ops.size(); + if (real_ops != ops) + { + printf("error: %d ops expected, but %d queued\n", ops, real_ops); + assert(0); + } +} + +osd_op_t *find_op(cluster_client_t *cli, osd_num_t osd_num, uint64_t opcode, uint64_t offset, uint64_t len) +{ + int peer_fd = cli->msgr.osd_peer_fds.at(osd_num); + auto op_it = cli->msgr.clients[peer_fd]->sent_ops.begin(); + while (op_it != cli->msgr.clients[peer_fd]->sent_ops.end()) + { + auto op = op_it->second; + if (op->req.hdr.opcode == opcode && (opcode == OSD_OP_SYNC || + op->req.rw.inode == 0x1000000000001 && op->req.rw.offset == offset && op->req.rw.len == len)) + { + return op; + } + op_it++; + } + return NULL; +} + +void pretend_op_completed(cluster_client_t *cli, osd_op_t *op, int retval) +{ + assert(op); + printf("Pretend completed %s %lx+%x\n", op->req.hdr.opcode == OSD_OP_SYNC + ? "sync" : (op->req.hdr.opcode == OSD_OP_WRITE ? "write" : "read"), op->req.rw.offset, op->req.rw.len); + uint64_t op_id = op->req.hdr.id; + int peer_fd = op->peer_fd; + op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + op->reply.hdr.id = op->req.hdr.id; + op->reply.hdr.opcode = op->req.hdr.opcode; + op->reply.hdr.retval = retval < 0 ? retval : (op->req.hdr.opcode == OSD_OP_SYNC ? 0 : op->req.rw.len); + // Copy lambda to be unaffected by `delete op` + std::function(op->callback)(op); + cli->msgr.clients[peer_fd]->sent_ops.erase(op_id); +} + +void test1() +{ + json11::Json config; + timerfd_manager_t *tfd = new timerfd_manager_t([](int fd, bool wr, std::function callback){}); + cluster_client_t *cli = new cluster_client_t(NULL, tfd, config); + + int *r1 = test_write(cli, 0, 4096, 0x55); + configure_single_pg_pool(cli); + pretend_connected(cli, 1); + can_complete(r1); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0); + check_completed(r1); + pretend_disconnected(cli, 1); + int *r2 = test_sync(cli); + pretend_connected(cli, 1); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0); + check_op_count(cli, 1, 1); + can_complete(r2); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0); + check_completed(r2); + // Check that the client doesn't repeat operations once more + pretend_disconnected(cli, 1); + pretend_connected(cli, 1); + check_op_count(cli, 1, 0); + + // Case: + // Write(1) -> Complete Write(1) -> Overwrite(2) -> Complete Write(2) + // -> Overwrite(3) -> Drop OSD connection -> Reestablish OSD connection + // -> Complete All Posted Writes -> Sync -> Complete Sync + // The resulting state of the block must be (3) over (2) over (1). + // I.e. the part overwritten by (3) must remain as in (3) and so on. + + // More interesting case: + // Same, but both Write(2) and Write(3) must consist of two parts: + // one from an OSD 2 that drops connection and other from OSD 1 that doesn't. + // The idea is that if the whole Write(2) is repeated when OSD 2 drops connection + // then it may also overwrite a part in OSD 1 which shouldn't be overwritten. + + // Another interesting case: + // A new operation added during replay (would also break with the previous implementation) + + r1 = test_write(cli, 0, 0x10000, 0x56); + can_complete(r1); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x10000), 0); + check_completed(r1); + + r1 = test_write(cli, 0xE000, 0x4000, 0x57); + can_complete(r1); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0xE000, 0x4000), 0); + check_completed(r1); + + r1 = test_write(cli, 0x10000, 0x4000, 0x58); + + pretend_disconnected(cli, 1); + cli->continue_ops(true); + pretend_connected(cli, 1); + + // Check replay + { + uint64_t replay_start = UINT64_MAX; + uint64_t replay_end = 0; + std::vector replay_ops; + auto osd_cl = cli->msgr.clients.at(cli->msgr.osd_peer_fds.at(1)); + for (auto & op_p: osd_cl->sent_ops) + { + auto op = op_p.second; + assert(op->req.hdr.opcode == OSD_OP_WRITE); + uint64_t offset = op->req.rw.offset; + if (op->req.rw.offset < replay_start) + replay_start = op->req.rw.offset; + if (op->req.rw.offset+op->req.rw.len > replay_end) + replay_end = op->req.rw.offset+op->req.rw.len; + for (int buf_idx = 0; buf_idx < op->iov.count; buf_idx++) + { + for (int i = 0; i < op->iov.buf[buf_idx].iov_len; i++, offset++) + { + uint8_t c = offset < 0xE000 ? 0x56 : (offset < 0x10000 ? 0x57 : 0x58); + if (((uint8_t*)op->iov.buf[buf_idx].iov_base)[i] != c) + { + printf("Write replay: mismatch at %lu\n", offset-op->req.rw.offset); + goto fail; + } + } + } + fail: + assert(offset == op->req.rw.offset+op->req.rw.len); + replay_ops.push_back(op); + } + assert(replay_start == 0); + assert(replay_end == 0x14000); + for (auto op: replay_ops) + { + pretend_op_completed(cli, op, 0); + } + } + // Check that the following write finally proceeds + check_op_count(cli, 1, 1); + can_complete(r1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x10000, 0x4000), 0); + check_completed(r1); + check_op_count(cli, 1, 0); + // Free client + delete cli; + delete tfd; + printf("[ok] write replay test\n"); +} + +void test2() +{ + std::map unsynced_writes; + cluster_op_t *op = new cluster_op_t(); + op->opcode = OSD_OP_WRITE; + op->inode = 1; + op->offset = 0; + op->len = 4096; + op->iov.push_back(malloc_or_die(4096*1024), 4096); + // 0-4k = 0x55 + memset(op->iov.buf[0].iov_base, 0x55, op->iov.buf[0].iov_len); + cluster_client_t::copy_write(op, unsynced_writes); + // 8k-12k = 0x66 + op->offset = 8192; + memset(op->iov.buf[0].iov_base, 0x66, op->iov.buf[0].iov_len); + cluster_client_t::copy_write(op, unsynced_writes); + // 4k-1M+4k = 0x77 + op->len = op->iov.buf[0].iov_len = 1048576; + op->offset = 4096; + memset(op->iov.buf[0].iov_base, 0x77, op->iov.buf[0].iov_len); + cluster_client_t::copy_write(op, unsynced_writes); + // check it + assert(unsynced_writes.size() == 4); + auto uit = unsynced_writes.begin(); + int i; + assert(uit->first.inode == 1); + assert(uit->first.stripe == 0); + assert(uit->second.len == 4096); + for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x55; i++) {} + assert(i == uit->second.len); + uit++; + assert(uit->first.inode == 1); + assert(uit->first.stripe == 4096); + assert(uit->second.len == 4096); + for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {} + assert(i == uit->second.len); + uit++; + assert(uit->first.inode == 1); + assert(uit->first.stripe == 8192); + assert(uit->second.len == 4096); + for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {} + assert(i == uit->second.len); + uit++; + assert(uit->first.inode == 1); + assert(uit->first.stripe == 12*1024); + assert(uit->second.len == 1016*1024); + for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {} + assert(i == uit->second.len); + uit++; + // free memory + free(op->iov.buf[0].iov_base); + delete op; + for (auto p: unsynced_writes) + { + free(p.second.buf); + } + printf("[ok] copy_write test\n"); +} + +int main(int narg, char *args[]) +{ + test1(); + test2(); + return 0; +}