diff --git a/cluster_client.cpp b/cluster_client.cpp index 8d6601c7..6587be5a 100644 --- a/cluster_client.cpp +++ b/cluster_client.cpp @@ -9,10 +9,32 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd msgr.ringloop = ringloop; msgr.repeer_pgs = [this](osd_num_t peer_osd) { - // peer_osd just connected or dropped connection if (msgr.osd_peer_fds.find(peer_osd) != msgr.osd_peer_fds.end()) { - // really connected :) + // peer_osd just connected + continue_ops(); + } + else if (unsynced_writes.size()) + { + // peer_osd just dropped connection + for (auto op: unsynced_writes) + { + for (auto & part: op->parts) + { + if (part.osd_num == peer_osd && part.done) + { + // repeat this operation + part.osd_num = 0; + part.done = false; + assert(!part.sent); + op->done_count--; + } + } + if (op->done_count < op->parts.size()) + { + cur_ops.insert(op); + } + } continue_ops(); } }; @@ -30,40 +52,19 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd void cluster_client_t::continue_ops() { - for (auto op_it = unsent_ops.begin(); op_it != unsent_ops.end(); ) + if (retry_timeout_id) { - cluster_op_t *op = *op_it; - if (op->needs_reslice && !op->sent_count) - { - op->parts.clear(); - op->done_count = 0; - op->needs_reslice = false; - } - if (!op->parts.size()) - { - unsent_ops.erase(op_it++); - execute(op); - continue; - } - if (!op->needs_reslice) - { - for (auto & op_part: op->parts) - { - if (!op_part.sent && !op_part.done) - { - try_send(op, &op_part); - } - } - if (op->sent_count == op->parts.size() - op->done_count) - { - unsent_ops.erase(op_it++); - sent_ops.insert(op); - } - else - op_it++; - } - else - op_it++; + tfd->clear_timer(retry_timeout_id); + retry_timeout_id = 0; + } + if (!pg_count) + { + // Config is not loaded yet + return; + } + for (auto op_it = cur_ops.begin(); op_it != cur_ops.end(); ) + { + continue_rw(*op_it++); } } @@ -88,33 +89,53 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config) bs_disk_alignment = config["disk_alignment"].uint64_value(); bs_bitmap_granularity = config["bitmap_granularity"].uint64_value(); if (!bs_block_size) - bs_block_size = DEFAULT_BLOCK_SIZE; - if (!bs_disk_alignment) - bs_disk_alignment = DEFAULT_DISK_ALIGNMENT; - if (!bs_bitmap_granularity) - bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY; { - uint32_t block_order; - if ((block_order = is_power_of_two(bs_block_size)) >= 64 || bs_block_size < MIN_BLOCK_SIZE || bs_block_size >= MAX_BLOCK_SIZE) - throw std::runtime_error("Bad block size"); + bs_block_size = DEFAULT_BLOCK_SIZE; + } + if (!bs_disk_alignment) + { + bs_disk_alignment = DEFAULT_DISK_ALIGNMENT; + } + if (!bs_bitmap_granularity) + { + bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY; + } + uint32_t block_order; + if ((block_order = is_power_of_two(bs_block_size)) >= 64 || bs_block_size < MIN_BLOCK_SIZE || bs_block_size >= MAX_BLOCK_SIZE) + { + throw std::runtime_error("Bad block size"); } if (config.find("pg_stripe_size") != config.end()) { pg_stripe_size = config["pg_stripe_size"].uint64_value(); if (!pg_stripe_size) + { pg_stripe_size = DEFAULT_PG_STRIPE_SIZE; + } } if (config["immediate_commit"] == "all") { // Cluster-wide immediate_commit mode immediate_commit = true; } + else if (config.find("client_dirty_limit") != config.end()) + { + client_dirty_limit = config["client_dirty_limit"].uint64_value(); + if (!client_dirty_limit) + { + client_dirty_limit = DEFAULT_CLIENT_DIRTY_LIMIT; + } + } msgr.peer_connect_interval = config["peer_connect_interval"].uint64_value(); if (!msgr.peer_connect_interval) + { msgr.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; + } msgr.peer_connect_timeout = config["peer_connect_timeout"].uint64_value(); if (!msgr.peer_connect_timeout) + { msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; + } } void cluster_client_t::on_load_pgs_hook(bool success) @@ -122,7 +143,11 @@ void cluster_client_t::on_load_pgs_hook(bool success) if (success) { pg_count = st_cli.pg_config.size(); - continue_ops(); + for (auto op: offline_ops) + { + execute(op); + } + offline_ops.clear(); } } @@ -131,16 +156,24 @@ void cluster_client_t::on_change_hook(json11::Json::object & changes) if (pg_count != st_cli.pg_config.size()) { // At this point, all operations should be suspended - // And they need to be resliced! - for (auto op: unsent_ops) + // And they have to be resliced! + for (auto op: cur_ops) { op->needs_reslice = true; } - for (auto op: sent_ops) + for (auto op: unsynced_writes) { op->needs_reslice = true; } pg_count = st_cli.pg_config.size(); + if (pg_count) + { + for (auto op: offline_ops) + { + execute(op); + } + offline_ops.clear(); + } } continue_ops(); } @@ -153,53 +186,176 @@ void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd) } } -// FIXME: Implement OSD_OP_SYNC for immediate_commit == false +/** + * How writes are synced when immediate_commit is false + * + * WRITE: + * 1) copy op to the queue + * 2) slice op + * 3) connect & send all parts + * 4) if any of them fail due to disconnected peers, repeat after reconnecting + * 5) if any of them fail due to other errors, fail the operation + * 6) if PG count changes before all parts are done, wait for all in-progress parts to finish, + * throw all results away, reslice and resubmit op + * 7) when all parts are done, try to continue SYNCs + * + * SYNC: + * 1) add SYNC to the queue after writes + * 2) try to continue SYNCs + * + * Continue SYNCs: + * 1) check the queue for a SYNC ready for submission. if there is one: + * 2) slice it + * 3) connect & send all SYNC parts + * 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes + * 5) if any of them fail due to other errors, fail the SYNC operation + */ + void cluster_client_t::execute(cluster_op_t *op) { - if (op->opcode == OSD_OP_SYNC && immediate_commit) - { - // Syncs are not required in the immediate_commit mode - op->retval = 0; - std::function(op->callback)(op); - return; - } - if (op->opcode != OSD_OP_READ && op->opcode != OSD_OP_OUT || !op->inode || !op->len || - op->offset % bs_disk_alignment || op->len % bs_disk_alignment) + op->retval = 0; + if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE || + (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_WRITE) && (!op->inode || !op->len || + op->offset % bs_disk_alignment || op->len % bs_disk_alignment)) { op->retval = -EINVAL; std::function(op->callback)(op); return; } - if (!pg_stripe_size) + if (!pg_count) { - // Config is not loaded yet - unsent_ops.insert(op); + // Config is not loaded yet, retry after connecting to etcd + offline_ops.push_back(op); + return; + } + if (op->opcode == OSD_OP_SYNC) + { + execute_sync(op); return; } if (op->opcode == OSD_OP_WRITE && !immediate_commit) { - // Copy operation + if (next_writes.size() > 0) + { + next_writes.push_back(op); + return; + } + if (queued_bytes >= client_dirty_limit) + { + // Push an extra SYNC operation to flush previous writes + cluster_op_t *sync_op = new cluster_op_t; + sync_op->is_internal = true; + sync_op->opcode = OSD_OP_SYNC; + sync_op->callback = [](cluster_op_t* sync_op) {}; + execute_sync(sync_op); + next_writes.push_back(op); + queued_bytes = 0; + return; + } + queued_bytes += op->len; + } + cur_ops.insert(op); + continue_rw(op); +} + +void cluster_client_t::continue_rw(cluster_op_t *op) +{ + if (!pg_count) + { + return; + } + if (op->opcode == OSD_OP_WRITE && !immediate_commit && !op->is_internal) + { + // Save operation for replay when PG goes out of sync + // (primary OSD drops our connection in this case) cluster_op_t *op_copy = new cluster_op_t(); + op_copy->is_internal = true; + op_copy->orig_op = op; op_copy->opcode = op->opcode; op_copy->inode = op->inode; op_copy->offset = op->offset; op_copy->len = op->len; op_copy->buf = malloc(op->len); + op_copy->callback = [](cluster_op_t* op_copy) + { + if (op_copy->orig_op) + { + // Acknowledge write and forget the original pointer + op_copy->orig_op->retval = op_copy->retval; + std::function(op_copy->orig_op->callback)(op_copy->orig_op); + op_copy->orig_op = NULL; + } + }; memcpy(op_copy->buf, op->buf, op->len); - unsynced_ops.push_back(op_copy); - unsynced_bytes += op->len; - if (inmemory_commit) + unsynced_writes.push_back(op_copy); + cur_ops.erase(op); + cur_ops.insert(op_copy); + op = op_copy; + } + if (!op->parts.size()) + { + // Slice the operation into parts + slice_rw(op); + } + if (!op->needs_reslice) + { + // Send unsent parts, if they're not subject to change + for (auto & op_part: op->parts) { - // Immediately acknowledge write and continue with the copy - op->retval = op->len; - std::function(op->callback)(op); - op = op_copy; - } - if (unsynced_bytes >= inmemory_dirty_limit) - { - // Push an extra SYNC operation + if (!op_part.sent && !op_part.done) + { + try_send(op, &op_part); + } } } + if (!op->sent_count) + { + if (op->done_count >= op->parts.size()) + { + // Finished successfully + // Even if the PG count has changed in meanwhile we treat it as success + // because if some operations were invalid for the new PG count we'd get errors + cur_ops.erase(op); + op->retval = op->len; + std::function(op->callback)(op); + continue_sync(); + } + else if (op->retval != 0 && op->retval != -EPIPE) + { + // Fatal error (not -EPIPE) + cur_ops.erase(op); + if (!immediate_commit && op->opcode == OSD_OP_WRITE) + { + for (int i = 0; i < unsynced_writes.size(); i++) + { + if (unsynced_writes[i] == op) + { + unsynced_writes.erase(unsynced_writes.begin()+i, unsynced_writes.begin()+i+1); + break; + } + } + } + bool del = op->is_internal; + std::function(op->callback)(op); + if (del) + { + delete op; + } + continue_sync(); + } + else if (op->needs_reslice) + { + op->parts.clear(); + op->done_count = 0; + op->needs_reslice = false; + continue_rw(op); + } + // else -EPIPE + } +} + +void cluster_client_t::slice_rw(cluster_op_t *op) +{ // Slice the request into individual object stripe requests // Primary OSDs still operate individual stripes, but their size is multiplied by PG minsize in case of EC uint64_t pg_block_size = bs_block_size * pg_part_count; @@ -213,8 +369,8 @@ void cluster_client_t::execute(cluster_op_t *op) part_count++; } } + op->retval = 0; op->parts.resize(part_count); - bool resend = false; int i = 0; for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size) { @@ -231,22 +387,9 @@ void cluster_client_t::execute(cluster_op_t *op) .sent = false, .done = false, }; - if (!try_send(op, &op->parts[i])) - { - // Part needs to be sent later - resend = true; - } i++; } } - if (resend) - { - unsent_ops.insert(op); - } - else - { - sent_ops.insert(op); - } } bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part) @@ -286,7 +429,7 @@ bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part) { part->op.send_list.push_back(part->buf, part->len); } - else + else /* if (op->opcode == OSD_OP_READ) */ { part->op.buf = part->buf; } @@ -301,36 +444,163 @@ bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part) return false; } +void cluster_client_t::execute_sync(cluster_op_t *op) +{ + if (immediate_commit) + { + // Syncs are not required in the immediate_commit mode + op->retval = 0; + std::function(op->callback)(op); + } + else if (next_writes.size() > 0 || cur_sync != NULL) + { + // There are some writes postponed to be executed after SYNC + // Push this SYNC after them + next_writes.push_back(op); + } + else + { + cur_sync = op; + continue_sync(); + } +} + +void cluster_client_t::continue_sync() +{ + if (!cur_sync || cur_sync->parts.size() > 0) + { + // Already submitted + return; + } + cur_sync->retval = 0; + std::set sync_osds; + for (auto prev_op: unsynced_writes) + { + if (prev_op->done_count < prev_op->parts.size()) + { + // Writes not finished yet + return; + } + for (auto & part: prev_op->parts) + { + if (part.osd_num) + { + sync_osds.insert(part.osd_num); + } + } + } + if (!sync_osds.size()) + { + // No dirty writes + finish_sync(); + return; + } + // Check that all OSD connections are still alive + for (auto sync_osd: sync_osds) + { + auto peer_it = msgr.osd_peer_fds.find(sync_osd); + if (peer_it == msgr.osd_peer_fds.end()) + { + // SYNC is pointless to send to a non connected OSD + return; + } + } + // Post sync to affected OSDs + cur_sync->parts.resize(sync_osds.size()); + int i = 0; + for (auto sync_osd: sync_osds) + { + cur_sync->parts[i] = { + .parent = cur_sync, + .osd_num = sync_osd, + .sent = false, + .done = false, + }; + send_sync(cur_sync, &cur_sync->parts[i]); + i++; + } +} + +void cluster_client_t::finish_sync() +{ + if (cur_sync->retval == -EPIPE) + { + // Retry later + cur_sync->parts.clear(); + cur_sync->retval = 0; + cur_sync->sent_count = 0; + cur_sync->done_count = 0; + return; + } + std::function(cur_sync->callback)(cur_sync); + if (!cur_sync->retval) + { + for (auto op: unsynced_writes) + { + if (op->is_internal) + { + delete op; + } + } + unsynced_writes.clear(); + } + cur_sync = NULL; + int i; + for (i = 0; i < next_writes.size() && !cur_sync; i++) + { + execute(next_writes[i]); + } + next_writes.erase(next_writes.begin(), next_writes.begin()+i); +} + +void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part) +{ + auto peer_it = msgr.osd_peer_fds.find(part->osd_num); + assert(peer_it != msgr.osd_peer_fds.end()); + part->sent = true; + op->sent_count++; + part->op = { + .op_type = OSD_OP_OUT, + .peer_fd = peer_it->second, + .req = { + .hdr = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = op_id++, + .opcode = OSD_OP_SYNC, + }, + }, + .callback = [this, part](osd_op_t *op_part) + { + handle_op_part(part); + }, + }; + part->op.send_list.push_back(part->op.req.buf, OSD_PACKET_SIZE); + msgr.outbox_push(&part->op); +} + void cluster_client_t::handle_op_part(cluster_op_part_t *part) { cluster_op_t *op = part->parent; part->sent = false; op->sent_count--; part->op.buf = NULL; - if (part->op.reply.hdr.retval != part->op.req.rw.len) + int expected = part->op.req.hdr.opcode == OSD_OP_SYNC ? 0 : part->op.req.rw.len; + if (part->op.reply.hdr.retval != expected) { // Operation failed, retry printf( - "Operation part failed on OSD %lu: retval=%ld (expected %u), reconnecting\n", - part->osd_num, part->op.reply.hdr.retval, part->op.req.rw.len + "Operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n", + part->osd_num, part->op.reply.hdr.retval, expected ); msgr.stop_client(part->op.peer_fd); - if (op->sent_count == op->parts.size() - op->done_count - 1) + if (part->op.reply.hdr.retval && !retry_timeout_id) { - // Resend later when OSDs come up - // FIXME: Check for different types of errors - // FIXME: Repeat operations after a small timeout, for the case when OSD is coming up - sent_ops.erase(op); - unsent_ops.insert(op); + retry_timeout_id = tfd->set_timer(up_wait_retry_interval, false, [this](int) { retry_timeout_id = 0; continue_ops(); }); } - if (op->sent_count == 0 && op->needs_reslice) + if (!op->retval || op->retval == -EPIPE) { - // PG count has changed, reslice the operation - unsent_ops.erase(op); - op->parts.clear(); - op->done_count = 0; - op->needs_reslice = false; - execute(op); + // Don't overwrite other errors with -EPIPE + op->retval = part->op.reply.hdr.retval; } } else @@ -338,12 +608,17 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) // OK part->done = true; op->done_count++; - if (op->done_count >= op->parts.size()) + } + if (op->sent_count == 0) + { + if (op->opcode == OSD_OP_SYNC) { - // Finished! - sent_ops.erase(op); - op->retval = op->len; - std::function(op->callback)(op); + assert(op == cur_sync); + finish_sync(); + } + else + { + continue_rw(op); } } } diff --git a/cluster_client.h b/cluster_client.h index d12c0ae4..de4fa4b3 100644 --- a/cluster_client.h +++ b/cluster_client.h @@ -9,6 +9,7 @@ #define DEFAULT_PG_STRIPE_SIZE 4*1024*1024 #define DEFAULT_DISK_ALIGNMENT 4096 #define DEFAULT_BITMAP_GRANULARITY 4096 +#define DEFAULT_CLIENT_DIRTY_LIMIT 32*1024*1024 struct cluster_op_t; @@ -35,6 +36,8 @@ struct cluster_op_t void *buf; std::function callback; protected: + cluster_op_t *orig_op = NULL; + bool is_internal = false; bool needs_reslice = false; int sent_count = 0, done_count = 0; std::vector parts; @@ -53,17 +56,24 @@ class cluster_client_t uint64_t bs_bitmap_granularity = 0; uint64_t pg_count = 0; bool immediate_commit = false; - bool inmemory_commit = false; - uint64_t inmemory_dirty_limit = 32*1024*1024; + // FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory. + uint64_t client_dirty_limit = 0; int log_level; + int up_wait_retry_interval = 500; // ms uint64_t op_id = 1; etcd_state_client_t st_cli; osd_messenger_t msgr; - std::set sent_ops, unsent_ops; + // operations currently in progress + std::set cur_ops; + int retry_timeout_id = 0; // unsynced operations are copied in memory to allow replay when cluster isn't in the immediate_commit mode - std::vector unsynced_ops; - uint64_t unsynced_bytes = 0; + // unsynced_writes are replayed in any order (because only the SYNC operation guarantees ordering) + std::vector unsynced_writes; + cluster_op_t* cur_sync = NULL; + std::vector next_writes; + std::vector offline_ops; + uint64_t queued_bytes = 0; public: cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config); @@ -71,10 +81,16 @@ public: protected: void continue_ops(); - void on_load_config_hook(json11::Json::object & cfg); + void on_load_config_hook(json11::Json::object & config); void on_load_pgs_hook(bool success); void on_change_hook(json11::Json::object & changes); void on_change_osd_state_hook(uint64_t peer_osd); + void continue_rw(cluster_op_t *op); + void slice_rw(cluster_op_t *op); bool try_send(cluster_op_t *op, cluster_op_part_t *part); + void execute_sync(cluster_op_t *op); + void continue_sync(); + void finish_sync(); + void send_sync(cluster_op_t *op, cluster_op_part_t *part); void handle_op_part(cluster_op_part_t *part); }; diff --git a/messenger.h b/messenger.h index 9755d9de..723a0dd8 100644 --- a/messenger.h +++ b/messenger.h @@ -131,7 +131,7 @@ struct osd_client_t std::deque outbox; std::map sent_ops; - // PGs dirtied by this client's primary-writes (FIXME to drop the connection) + // PGs dirtied by this client's primary-writes std::set dirty_pgs; // Write state diff --git a/osd_peering.cpp b/osd_peering.cpp index cf16cd53..5e5f1614 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -133,6 +133,22 @@ void osd_t::start_pg_peering(pg_num_t pg_num) it++; } dirty_pgs.erase(pg.pg_num); + // Drop connections of clients who have this PG in dirty_pgs + if (immediate_commit != IMMEDIATE_ALL) + { + std::vector to_stop; + for (auto & cp: c_cli.clients) + { + if (cp.second.dirty_pgs.find(pg_num) != cp.second.dirty_pgs.end()) + { + to_stop.push_back(cp.first); + } + } + for (auto peer_fd: to_stop) + { + c_cli.stop_client(peer_fd); + } + } // Calculate current write OSD set pg.pg_cursize = 0; pg.cur_set.resize(pg.target_set.size());