diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index 3682f08c..0a5c20f2 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -10,7 +10,7 @@ #define PART_ERROR 4 #define CACHE_DIRTY 1 #define CACHE_FLUSHING 2 -#define CACHE_REPEATING 4 +#define CACHE_REPEATING 3 #define OP_FLUSH_BUFFER 2 cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config) @@ -36,10 +36,10 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd for (auto & wr: dirty_buffers) { if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) && - !(wr.second.state & CACHE_REPEATING)) + wr.second.state != CACHE_REPEATING) { // FIXME: Flush in larger parts - flush_buffer(wr.first, wr.second); + flush_buffer(wr.first, &wr.second); } } continue_ops(); @@ -103,21 +103,22 @@ void cluster_client_t::continue_ops(bool up_retry) } restart: continuing_ops = 1; + op_queue_pos = 0; bool has_flushes = false, has_writes = false; - int j = 0; - for (int i = 0; i < op_queue.size(); i++) + while (op_queue_pos < op_queue.size()) { - bool rm = false, is_flush = op_queue[i]->flags & OP_FLUSH_BUFFER; - auto opcode = op_queue[i]->opcode; - if (!op_queue[i]->up_wait || up_retry) + auto op = op_queue[op_queue_pos]; + bool rm = false, is_flush = op->flags & OP_FLUSH_BUFFER; + auto opcode = op->opcode; + if (!op->up_wait || up_retry) { - op_queue[i]->up_wait = false; + op->up_wait = false; if (opcode == OSD_OP_READ || opcode == OSD_OP_WRITE) { if (is_flush || !has_flushes) { // Regular writes can't proceed before buffer flushes - rm = continue_rw(op_queue[i]); + rm = continue_rw(op); } } else if (opcode == OSD_OP_SYNC) @@ -125,7 +126,7 @@ restart: if (!has_writes) { // SYNC can't proceed before previous writes - rm = continue_sync(op_queue[i]); + rm = continue_sync(op); } } } @@ -143,20 +144,20 @@ restart: // ...so dirty_writes can't contain anything newer than SYNC has_flushes = has_writes || !rm; } - if (!rm) + if (rm) { - op_queue[j++] = op_queue[i]; + op_queue.erase(op_queue.begin()+op_queue_pos, op_queue.begin()+op_queue_pos+1); + } + else + { + op_queue_pos++; + } + if (continuing_ops == 2) + { + goto restart; } } - op_queue.resize(j); - if (continuing_ops == 2) - { - goto restart; - } - else - { - continuing_ops = 0; - } + continuing_ops = 0; } static uint32_t is_power_of_two(uint64_t value) @@ -433,21 +434,30 @@ void cluster_client_t::copy_write(cluster_op_t *op, std::mapstate = CACHE_REPEATING; cluster_op_t *op = new cluster_op_t; op->flags = OP_FLUSH_BUFFER; op->opcode = OSD_OP_WRITE; op->inode = oid.inode; op->offset = oid.stripe; - op->len = wr.len; - op->iov.push_back(wr.buf, wr.len); - op->callback = [](cluster_op_t* op) + op->len = wr->len; + op->iov.push_back(wr->buf, wr->len); + op->callback = [wr](cluster_op_t* op) { + if (wr->state == CACHE_REPEATING) + { + wr->state = CACHE_DIRTY; + } delete op; }; - op_queue.push_front(op); + op_queue.insert(op_queue.begin(), op); + if (continuing_ops) + { + continuing_ops = 2; + op_queue_pos++; + } } int cluster_client_t::continue_rw(cluster_op_t *op) diff --git a/src/cluster_client.h b/src/cluster_client.h index da7e22a5..83fdf323 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -74,7 +74,7 @@ class cluster_client_t int retry_timeout_id = 0; uint64_t op_id = 1; std::vector offline_ops; - std::deque op_queue; + std::vector op_queue; std::map dirty_buffers; std::set dirty_osds; uint64_t dirty_bytes = 0, dirty_ops = 0; @@ -83,6 +83,7 @@ class cluster_client_t ring_consumer_t consumer; std::vector> on_ready_hooks; int continuing_ops = 0; + int op_queue_pos = 0; public: etcd_state_client_t st_cli; @@ -99,7 +100,7 @@ public: void continue_ops(bool up_retry = false); protected: bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd); - void flush_buffer(const object_id & oid, cluster_buffer_t & wr); + void flush_buffer(const object_id & oid, cluster_buffer_t *wr); void on_load_config_hook(json11::Json::object & config); void on_load_pgs_hook(bool success); void on_change_hook(json11::Json::object & changes); diff --git a/src/test_cluster_client.cpp b/src/test_cluster_client.cpp index 91446ad7..5541634c 100644 --- a/src/test_cluster_client.cpp +++ b/src/test_cluster_client.cpp @@ -47,7 +47,7 @@ void configure_single_pg_pool(cluster_client_t *cli) cli->st_cli.on_change_hook(changes); } -int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c) +int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function cb = NULL) { printf("Post write %lx+%lx\n", offset, len); int *r = new int; @@ -59,7 +59,7 @@ int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c) op->len = len; op->iov.push_back(malloc_or_die(len), len); memset(op->iov.buf[0].iov_base, c, len); - op->callback = [r](cluster_op_t *op) + op->callback = [r, cb](cluster_op_t *op) { if (*r == -1) printf("Error: Not allowed to complete yet\n"); @@ -68,6 +68,8 @@ int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c) free(op->iov.buf[0].iov_base); printf("Done write %lx+%lx r=%d\n", op->offset, op->len, op->retval); delete op; + if (cb != NULL) + cb(); }; cli->execute(op); return r; @@ -310,6 +312,28 @@ void test1() pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0); check_completed(r1); + // Check disconnect inside operation callback (reenterability) + // Probably doesn't happen too often, but possible in theory + r1 = test_write(cli, 0, 0x1000, 0x60, [cli]() + { + pretend_disconnected(cli, 1); + }); + r2 = test_write(cli, 0x1000, 0x1000, 0x61); + check_op_count(cli, 1, 2); + can_complete(r1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0); + check_completed(r1); + check_disconnected(cli, 1); + pretend_connected(cli, 1); + cli->continue_ops(true); + check_op_count(cli, 1, 2); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0); + check_op_count(cli, 1, 1); + can_complete(r2); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0); + check_completed(r2); + // Free client delete cli; delete tfd;