diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index 3100a70ea..a1a4b5b88 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -5,6 +5,9 @@ #include #include "cluster_client.h" +#define PART_SENT 1 +#define PART_DONE 2 +#define PART_ERROR 4 #define CACHE_DIRTY 1 #define CACHE_FLUSHING 2 #define CACHE_REPEATING 4 @@ -30,7 +33,6 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd { // peer_osd just dropped connection // determine WHICH dirty_buffers are now obsolete and repeat them - dirty_osds.erase(peer_osd); for (auto & wr: dirty_buffers) { if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) && @@ -97,49 +99,42 @@ void cluster_client_t::continue_ops(bool up_retry) int j = 0; for (int i = 0; i < op_queue.size(); i++) { - bool rm = false; + bool rm = false, is_flush = op_queue[i]->flags & OP_FLUSH_BUFFER; + auto opcode = op_queue[i]->opcode; if (!op_queue[i]->up_wait || up_retry) { op_queue[i]->up_wait = false; - if (op_queue[i]->opcode == OSD_OP_READ) + if (opcode == OSD_OP_READ || opcode == OSD_OP_WRITE) { - rm = continue_rw(op_queue[i]); - } - else if (op_queue[i]->opcode == OSD_OP_WRITE) - { - if (op_queue[i]->flags & OP_FLUSH_BUFFER) - { - rm = continue_rw(op_queue[i]); - if (!rm) - { - // Regular writes can't proceed before buffer flushes - has_flushes = true; - } - } - else if (!has_flushes) + if (is_flush || !has_flushes) { + // Regular writes can't proceed before buffer flushes rm = continue_rw(op_queue[i]); } - if (!rm) - { - has_writes = true; - } } - else if (op_queue[i]->opcode == OSD_OP_SYNC) + else if (opcode == OSD_OP_SYNC) { if (!has_writes) { // SYNC can't proceed before previous writes rm = continue_sync(op_queue[i]); - if (!rm) - { - // Postpone writes until previous SYNC completes - // ...so dirty_writes can't contain anything newer than SYNC - has_flushes = true; - } } } } + if (opcode == OSD_OP_WRITE) + { + has_writes = has_writes || !rm; + if (is_flush) + { + has_flushes = has_writes || !rm; + } + } + else if (opcode == OSD_OP_SYNC) + { + // Postpone writes until previous SYNC completes + // ...so dirty_writes can't contain anything newer than SYNC + has_flushes = has_writes || !rm; + } if (!rm) { op_queue[j++] = op_queue[i]; @@ -185,13 +180,26 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config) // Cluster-wide immediate_commit mode immediate_commit = true; } + if (config.find("client_max_dirty_bytes") != config.end()) + { + client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value(); + } else if (config.find("client_dirty_limit") != config.end()) { - client_dirty_limit = config["client_dirty_limit"].uint64_value(); + // Old name + client_max_dirty_bytes = config["client_dirty_limit"].uint64_value(); } - if (!client_dirty_limit) + if (config.find("client_max_dirty_ops") != config.end()) { - client_dirty_limit = DEFAULT_CLIENT_DIRTY_LIMIT; + client_max_dirty_ops = config["client_max_dirty_ops"].uint64_value(); + } + if (!client_max_dirty_bytes) + { + client_max_dirty_bytes = DEFAULT_CLIENT_MAX_DIRTY_BYTES; + } + if (!client_max_dirty_ops) + { + client_max_dirty_ops = DEFAULT_CLIENT_MAX_DIRTY_OPS; } up_wait_retry_interval = config["up_wait_retry_interval"].uint64_value(); if (!up_wait_retry_interval) @@ -306,7 +314,7 @@ void cluster_client_t::execute(cluster_op_t *op) op->retval = 0; if (op->opcode == OSD_OP_WRITE && !immediate_commit) { - if (dirty_bytes >= client_dirty_limit) + if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops) { // Push an extra SYNC operation to flush previous writes cluster_op_t *sync_op = new cluster_op_t; @@ -317,12 +325,15 @@ void cluster_client_t::execute(cluster_op_t *op) }; op_queue.push_back(sync_op); dirty_bytes = 0; + dirty_ops = 0; } dirty_bytes += op->len; + dirty_ops++; } else if (op->opcode == OSD_OP_SYNC) { dirty_bytes = 0; + dirty_ops = 0; } op_queue.push_back(op); continue_ops(); @@ -457,7 +468,7 @@ resume_0: } if (op->opcode == OSD_OP_WRITE) { - if (!immediate_commit) + if (!immediate_commit && !(op->flags & OP_FLUSH_BUFFER)) { copy_write(op, dirty_buffers); } @@ -469,13 +480,33 @@ resume_1: resume_2: // Send unsent parts, if they're not subject to change op->state = 3; + if (op->needs_reslice) + { + for (int i = 0; i < op->parts.size(); i++) + { + if (!(op->parts[i].flags & PART_SENT) && op->retval) + { + op->retval = -EPIPE; + } + } + goto resume_3; + } for (int i = 0; i < op->parts.size(); i++) { - if (!op->parts[i].sent && !op->parts[i].done) + if (!(op->parts[i].flags & PART_SENT)) { if (!try_send(op, i)) { // We'll need to retry again + op->up_wait = true; + if (!retry_timeout_id) + { + retry_timeout_id = tfd->set_timer(up_wait_retry_interval, false, [this](int) + { + retry_timeout_id = 0; + continue_ops(true); + }); + } op->state = 2; } } @@ -485,7 +516,7 @@ resume_2: return 0; } resume_3: - if (op->sent_count > 0) + if (op->inflight_count > 0) { op->state = 3; return 0; @@ -517,6 +548,10 @@ resume_3: } else { + for (int i = 0; i < op->parts.size(); i++) + { + op->parts[i].flags = 0; + } goto resume_2; } } @@ -548,8 +583,7 @@ void cluster_client_t::slice_rw(cluster_op_t *op) .offset = begin, .len = (uint32_t)(end - begin), .pg_num = pg_num, - .sent = false, - .done = false, + .flags = 0, }; int left = end-begin; while (left > 0 && iov_idx < op->iov.count) @@ -606,8 +640,8 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i) { int peer_fd = peer_it->second; part->osd_num = primary_osd; - part->sent = true; - op->sent_count++; + part->flags |= PART_SENT; + op->inflight_count++; part->op = (osd_op_t){ .op_type = OSD_OP_OUT, .peer_fd = peer_fd, @@ -675,8 +709,7 @@ int cluster_client_t::continue_sync(cluster_op_t *op) op->parts[i] = { .parent = op, .osd_num = sync_osd, - .sent = false, - .done = false, + .flags = 0, }; send_sync(op, &op->parts[i]); i++; @@ -684,7 +717,7 @@ int cluster_client_t::continue_sync(cluster_op_t *op) } dirty_osds.clear(); resume_1: - if (op->sent_count > 0) + if (op->inflight_count > 0) { op->state = 1; return 0; @@ -703,7 +736,7 @@ resume_1: // Retry later op->parts.clear(); op->retval = 0; - op->sent_count = 0; + op->inflight_count = 0; op->done_count = 0; op->state = 0; return 0; @@ -730,8 +763,8 @@ void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part) { auto peer_it = msgr.osd_peer_fds.find(part->osd_num); assert(peer_it != msgr.osd_peer_fds.end()); - part->sent = true; - op->sent_count++; + part->flags |= PART_SENT; + op->inflight_count++; part->op = (osd_op_t){ .op_type = OSD_OP_OUT, .peer_fd = peer_it->second, @@ -753,8 +786,7 @@ void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part) void cluster_client_t::handle_op_part(cluster_op_part_t *part) { cluster_op_t *op = part->parent; - part->sent = false; - op->sent_count--; + op->inflight_count--; int expected = part->op.req.hdr.opcode == OSD_OP_SYNC ? 0 : part->op.req.rw.len; if (part->op.reply.hdr.retval != expected) { @@ -763,9 +795,9 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) "%s operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n", osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected ); - msgr.stop_client(part->op.peer_fd); if (part->op.reply.hdr.retval == -EPIPE) { + // Mark op->up_wait = true before stopping the client op->up_wait = true; if (!retry_timeout_id) { @@ -781,15 +813,17 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) // Don't overwrite other errors with -EPIPE op->retval = part->op.reply.hdr.retval; } + msgr.stop_client(part->op.peer_fd); + part->flags |= PART_ERROR; } else { // OK dirty_osds.insert(part->osd_num); - part->done = true; + part->flags |= PART_DONE; op->done_count++; } - if (op->sent_count == 0) + if (op->inflight_count == 0) { continue_ops(); } diff --git a/src/cluster_client.h b/src/cluster_client.h index 0bce1d561..dfa8aa7a2 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -10,7 +10,8 @@ #define MAX_BLOCK_SIZE 128*1024*1024 #define DEFAULT_DISK_ALIGNMENT 4096 #define DEFAULT_BITMAP_GRANULARITY 4096 -#define DEFAULT_CLIENT_DIRTY_LIMIT 32*1024*1024 +#define DEFAULT_CLIENT_MAX_DIRTY_BYTES 32*1024*1024 +#define DEFAULT_CLIENT_MAX_DIRTY_OPS 1024 struct cluster_op_t; @@ -22,8 +23,7 @@ struct cluster_op_part_t pg_num_t pg_num; osd_num_t osd_num; osd_op_buf_list_t iov; - bool sent; - bool done; + unsigned flags; osd_op_t op; }; @@ -43,7 +43,7 @@ protected: cluster_op_t *orig_op = NULL; bool needs_reslice = false; bool up_wait = false; - int sent_count = 0, done_count = 0; + int inflight_count = 0, done_count = 0; std::vector parts; friend class cluster_client_t; }; @@ -66,7 +66,8 @@ class cluster_client_t std::map pg_counts; bool immediate_commit = false; // FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory. - uint64_t client_dirty_limit = 0; + uint64_t client_max_dirty_bytes = 0; + uint64_t client_max_dirty_ops = 0; int log_level; int up_wait_retry_interval = 500; // ms @@ -76,7 +77,7 @@ class cluster_client_t std::deque op_queue; std::map dirty_buffers; std::set dirty_osds; - uint64_t dirty_bytes = 0; + uint64_t dirty_bytes = 0, dirty_ops = 0; bool pgs_loaded = false; ring_consumer_t consumer; diff --git a/src/messenger.cpp b/src/messenger.cpp index 5854c100e..2aa5f11a0 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -180,23 +180,12 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer on_connect_peer(peer_osd, -errno); return; } - int timeout_id = -1; - if (peer_connect_timeout > 0) - { - timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) - { - osd_num_t peer_osd = clients.at(peer_fd)->osd_num; - stop_client(peer_fd, true); - on_connect_peer(peer_osd, -EIO); - return; - }); - } clients[peer_fd] = new osd_client_t((osd_client_t){ .peer_addr = addr, .peer_port = peer_port, .peer_fd = peer_fd, .peer_state = PEER_CONNECTING, - .connect_timeout_id = timeout_id, + .connect_timeout_id = -1, .osd_num = peer_osd, .in_buf = malloc_or_die(receive_buffer_size), }); @@ -205,6 +194,16 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer // Either OUT (connected) or HUP handle_connect_epoll(peer_fd); }); + if (peer_connect_timeout > 0) + { + clients[peer_fd]->connect_timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) + { + osd_num_t peer_osd = clients.at(peer_fd)->osd_num; + stop_client(peer_fd, true); + on_connect_peer(peer_osd, -EIO); + return; + }); + } } void osd_messenger_t::handle_connect_epoll(int peer_fd) diff --git a/src/msgr_stop.cpp b/src/msgr_stop.cpp index e4e400459..57a8f5859 100644 --- a/src/msgr_stop.cpp +++ b/src/msgr_stop.cpp @@ -8,12 +8,19 @@ void osd_messenger_t::cancel_osd_ops(osd_client_t *cl) { + std::vector cancel_ops; + cancel_ops.resize(cl->sent_ops.size()); + int i = 0; for (auto p: cl->sent_ops) { - cancel_op(p.second); + cancel_ops[i++] = p.second; } cl->sent_ops.clear(); cl->outbox.clear(); + for (auto op: cancel_ops) + { + cancel_op(op); + } } void osd_messenger_t::cancel_op(osd_op_t *op) diff --git a/src/test_cluster_client.cpp b/src/test_cluster_client.cpp index b89eef5e3..91446ad72 100644 --- a/src/test_cluster_client.cpp +++ b/src/test_cluster_client.cpp @@ -124,6 +124,15 @@ void pretend_disconnected(cluster_client_t *cli, osd_num_t osd_num) cli->msgr.stop_client(cli->msgr.osd_peer_fds.at(osd_num)); } +void check_disconnected(cluster_client_t *cli, osd_num_t osd_num) +{ + if (cli->msgr.osd_peer_fds.find(osd_num) != cli->msgr.osd_peer_fds.end()) + { + printf("OSD %lu not disconnected as it ought to be\n", osd_num); + assert(0); + } +} + void check_op_count(cluster_client_t *cli, osd_num_t osd_num, int ops) { int peer_fd = cli->msgr.osd_peer_fds.at(osd_num); @@ -152,20 +161,20 @@ osd_op_t *find_op(cluster_client_t *cli, osd_num_t osd_num, uint64_t opcode, uin return NULL; } -void pretend_op_completed(cluster_client_t *cli, osd_op_t *op, int retval) +void pretend_op_completed(cluster_client_t *cli, osd_op_t *op, int64_t retval) { assert(op); printf("Pretend completed %s %lx+%x\n", op->req.hdr.opcode == OSD_OP_SYNC ? "sync" : (op->req.hdr.opcode == OSD_OP_WRITE ? "write" : "read"), op->req.rw.offset, op->req.rw.len); uint64_t op_id = op->req.hdr.id; int peer_fd = op->peer_fd; + cli->msgr.clients[peer_fd]->sent_ops.erase(op_id); op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; op->reply.hdr.id = op->req.hdr.id; op->reply.hdr.opcode = op->req.hdr.opcode; op->reply.hdr.retval = retval < 0 ? retval : (op->req.hdr.opcode == OSD_OP_SYNC ? 0 : op->req.rw.len); // Copy lambda to be unaffected by `delete op` std::function(op->callback)(op); - cli->msgr.clients[peer_fd]->sent_ops.erase(op_id); } void test1() @@ -177,6 +186,7 @@ void test1() int *r1 = test_write(cli, 0, 4096, 0x55); configure_single_pg_pool(cli); pretend_connected(cli, 1); + cli->continue_ops(true); can_complete(r1); check_op_count(cli, 1, 1); pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0); @@ -184,6 +194,8 @@ void test1() pretend_disconnected(cli, 1); int *r2 = test_sync(cli); pretend_connected(cli, 1); + check_op_count(cli, 1, 0); + cli->continue_ops(true); check_op_count(cli, 1, 1); pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0); check_op_count(cli, 1, 1); @@ -226,8 +238,8 @@ void test1() r1 = test_write(cli, 0x10000, 0x4000, 0x58); pretend_disconnected(cli, 1); - cli->continue_ops(true); pretend_connected(cli, 1); + cli->continue_ops(true); // Check replay { @@ -260,8 +272,11 @@ void test1() assert(offset == op->req.rw.offset+op->req.rw.len); replay_ops.push_back(op); } - assert(replay_start == 0); - assert(replay_end == 0x14000); + if (replay_start != 0 || replay_end != 0x14000) + { + printf("Write replay: range mismatch: %lx-%lx\n", replay_start, replay_end); + assert(0); + } for (auto op: replay_ops) { pretend_op_completed(cli, op, 0); @@ -273,6 +288,28 @@ void test1() pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x10000, 0x4000), 0); check_completed(r1); check_op_count(cli, 1, 0); + + // Check sync + r2 = test_sync(cli); + can_complete(r2); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0); + check_completed(r2); + + // Check disconnect during write + r1 = test_write(cli, 0, 4096, 0x59); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), -EPIPE); + check_disconnected(cli, 1); + pretend_connected(cli, 1); + check_op_count(cli, 1, 0); + cli->continue_ops(true); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0); + check_op_count(cli, 1, 1); + can_complete(r1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0); + check_completed(r1); + // Free client delete cli; delete tfd; diff --git a/src/timerfd_manager.cpp b/src/timerfd_manager.cpp index ba5b0f24c..a2c71f837 100644 --- a/src/timerfd_manager.cpp +++ b/src/timerfd_manager.cpp @@ -121,7 +121,7 @@ again: exp.it_value.tv_sec--; exp.it_value.tv_nsec += 1000000000; } - if (exp.it_value.tv_sec < 0 || !exp.it_value.tv_sec && !exp.it_value.tv_nsec) + if (exp.it_value.tv_sec < 0 || exp.it_value.tv_sec == 0 && exp.it_value.tv_nsec <= 0) { // It already happened trigger_nearest(); @@ -159,6 +159,6 @@ void timerfd_manager_t::trigger_nearest() { timers.erase(timers.begin()+nearest, timers.begin()+nearest+1); } - cb(nearest_id); nearest = -1; + cb(nearest_id); }