From 235d15422c1b330f4f8c0292f20b5bcde4b4a7b1 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 3 Feb 2020 12:35:02 +0300 Subject: [PATCH] Mostly finish primary-OSD-read --- osd.h | 71 +++++++- osd_exec_secondary.cpp | 11 ++ osd_ops.h | 5 +- osd_peering.cpp | 6 +- osd_peering_pg.h | 11 +- osd_primary.cpp | 368 +++++++++++++++++++++++++---------------- osd_send.cpp | 65 +------- 7 files changed, 320 insertions(+), 217 deletions(-) diff --git a/osd.h b/osd.h index 24188b61b..e1fb29e9c 100644 --- a/osd.h +++ b/osd.h @@ -30,13 +30,63 @@ #define CL_WRITE_REPLY 2 #define CL_WRITE_DATA 3 #define MAX_EPOLL_EVENTS 16 +#define OSD_OP_INLINE_BUF_COUNT 4 #define PEER_CONNECTING 1 #define PEER_CONNECTED 2 //#define OSD_STUB -// FIXME: add types for pg_num and osd_num? +struct osd_op_buf_t +{ + void *buf; + int len; +}; + +struct osd_op_buf_list_t +{ + int count = 0, alloc = 0, sent = 0; + osd_op_buf_t *buf = NULL; + osd_op_buf_t inline_buf[OSD_OP_INLINE_BUF_COUNT]; + + ~osd_op_buf_list_t() + { + if (buf && buf != inline_buf) + { + free(buf); + } + } + + inline void push_back(void *nbuf, int len) + { + if (count >= alloc) + { + if (!alloc) + { + alloc = OSD_OP_INLINE_BUF_COUNT; + buf = inline_buf; + } + else if (buf == inline_buf) + { + int old = alloc; + alloc = ((alloc/16)*16 + 1); + buf = (osd_op_buf_t*)malloc(sizeof(osd_op_buf_t) * alloc); + memcpy(buf, inline_buf, sizeof(osd_op_buf_t)*old); + } + else + { + alloc = ((alloc/16)*16 + 1); + buf = (osd_op_buf_t*)realloc(buf, sizeof(osd_op_buf_t) * alloc); + } + } + buf[count++] = { .buf = nbuf, .len = len }; + } + + inline osd_op_buf_t & operator [] (int i) + { + return buf[i]; + } +}; struct osd_op_t { @@ -54,14 +104,17 @@ struct osd_op_t }; blockstore_op_t bs_op; void *buf = NULL; + void *op_data = NULL; std::function callback; + osd_op_buf_list_t send_list; + ~osd_op_t(); }; struct osd_peer_def_t { - uint64_t osd_num = 0; + osd_num_t osd_num = 0; std::string addr; int port = 0; time_t last_connect_attempt = 0; @@ -74,8 +127,7 @@ struct osd_client_t int peer_fd; int peer_state; std::function connect_callback; - // osd numbers begin with 1 - uint64_t osd_num = 0; + osd_num_t osd_num = 0; // Read state bool read_ready = false; @@ -103,11 +155,14 @@ struct osd_client_t int write_state = 0; }; +struct osd_primary_read_t; +struct osd_read_stripe_t; + class osd_t { // config - uint64_t osd_num = 1; // OSD numbers start with 1 + osd_num_t osd_num = 1; // OSD numbers start with 1 bool run_primary = false; std::vector peers; blockstore_config_t config; @@ -156,7 +211,7 @@ class osd_t void outbox_push(osd_client_t & cl, osd_op_t *op); // peer handling (primary OSD logic) - void connect_peer(unsigned osd_num, const char *peer_host, int peer_port, std::function callback); + void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback); void handle_connect_result(int peer_fd); void stop_client(int peer_fd); osd_peer_def_t parse_peer(std::string peer); @@ -178,6 +233,10 @@ class osd_t void exec_primary_write(osd_op_t *cur_op); void exec_primary_sync(osd_op_t *cur_op); void make_primary_reply(osd_op_t *op); + void finish_primary_op(osd_op_t *cur_op, int retval); + void handle_primary_read_subop(osd_op_t *cur_op, int ok); + int extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *target_set, int minsize, int size); + void submit_read_subops(int read_pg_size, const uint64_t* target_set, osd_op_t *cur_op); public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); ~osd_t(); diff --git a/osd_exec_secondary.cpp b/osd_exec_secondary.cpp index b48cfccc9..9c3c1889d 100644 --- a/osd_exec_secondary.cpp +++ b/osd_exec_secondary.cpp @@ -139,6 +139,7 @@ void osd_t::make_reply(osd_op_t *op) { std::string *str = (std::string*)op->buf; op->reply.hdr.retval = str->size()+1; + op->send_list.push_back((void*)str->c_str(), op->reply.hdr.retval); } else { @@ -150,4 +151,14 @@ void osd_t::make_reply(osd_op_t *op) else if (op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE) op->reply.sec_del.version = op->bs_op.version; } + if (op->op.hdr.opcode == OSD_OP_SECONDARY_READ && + op->reply.hdr.retval > 0) + { + op->send_list.push_back(op->buf, op->reply.hdr.retval); + } + else if (op->op.hdr.opcode == OSD_OP_SECONDARY_LIST && + op->reply.hdr.retval > 0) + { + op->send_list.push_back(op->buf, op->reply.hdr.retval * sizeof(obj_ver_id)); + } } diff --git a/osd_ops.h b/osd_ops.h index 936eb2f9a..ab0aa2bb8 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -26,6 +26,9 @@ #define OSD_RW_ALIGN 512 #define OSD_RW_MAX 64*1024*1024 +typedef uint64_t osd_num_t; +typedef uint32_t pg_num_t; + // common request and reply headers struct __attribute__((__packed__)) osd_op_header_t { @@ -128,7 +131,7 @@ struct __attribute__((__packed__)) osd_op_secondary_list_t { osd_op_header_t header; // placement group total number and total count - uint32_t pgnum, pgtotal; + pg_num_t pgnum, pgtotal; }; struct __attribute__((__packed__)) osd_reply_secondary_list_t diff --git a/osd_peering.cpp b/osd_peering.cpp index e12a9a4a4..6f724789e 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -43,7 +43,7 @@ osd_peer_def_t osd_t::parse_peer(std::string peer) return r; } -void osd_t::connect_peer(unsigned osd_num, const char *peer_host, int peer_port, std::function callback) +void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback) { struct sockaddr_in addr; int r; @@ -184,7 +184,7 @@ void osd_t::start_pg_peering(int pg_idx) auto & pg = pgs[pg_idx]; auto ps = pg.peering_state = new pg_peering_state_t(); { - uint64_t osd_num = this->osd_num; + osd_num_t osd_num = this->osd_num; osd_op_t *op = new osd_op_t(); op->op_type = 0; op->peer_fd = 0; @@ -211,7 +211,7 @@ void osd_t::start_pg_peering(int pg_idx) } for (int i = 0; i < peers.size(); i++) { - uint64_t osd_num = peers[i].osd_num; + osd_num_t osd_num = peers[i].osd_num; auto & cl = clients[osd_peer_fds[peers[i].osd_num]]; osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; diff --git a/osd_peering_pg.h b/osd_peering_pg.h index 5e7b25e12..00291bf13 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -3,6 +3,7 @@ #include #include "object_id.h" +#include "osd_ops.h" #include "sparsepp/sparsepp/spp.h" @@ -31,7 +32,7 @@ struct pg_obj_loc_t { uint64_t role; - uint64_t osd_num; + osd_num_t osd_num; bool stable; }; @@ -39,7 +40,7 @@ typedef std::vector pg_osd_set_t; struct pg_osd_set_state_t { - std::vector read_target; + std::vector read_target; pg_osd_set_t osd_set; uint64_t state = 0; uint64_t object_count = 0; @@ -55,7 +56,7 @@ struct pg_list_result_t struct pg_peering_state_t { // osd_num -> list result - spp::sparse_hash_map list_results; + spp::sparse_hash_map list_results; int list_done = 0; }; @@ -100,10 +101,10 @@ struct pg_t { int state; uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2; - uint64_t pg_num; + pg_num_t pg_num; uint64_t clean_count = 0; // target_set = (role => osd_num or UINT64_MAX if missing). role numbers start with zero - std::vector target_set; + std::vector target_set; // moved object map. by default, each object is considered to reside on the target_set. // this map stores all objects that differ. // it may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario diff --git a/osd_primary.cpp b/osd_primary.cpp index 2a2e05f31..084be9e40 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -1,192 +1,272 @@ #include "osd.h" #include "xor.h" +// read: read directly or read paired stripe(s), reconstruct, return +// write: read paired stripe(s), modify, write +// nuance: take care to read the same version from paired stripes! +// if there are no write requests in progress we're good (stripes must be in sync) +// and... remember the last readable version during a write request +// and... postpone other write requests to the same stripe until the completion of previous ones +// +// sync: sync peers, get unstable versions from somewhere, stabilize them + +struct off_len_t +{ + uint64_t offset, len; +}; + +struct osd_read_stripe_t +{ + uint64_t pos; + uint32_t start, end; + uint32_t real_start, real_end; +}; + +struct osd_primary_read_t +{ + pg_num_t pg_num; + object_id oid; + uint64_t target_ver; + int n_subops = 0, done = 0, errors = 0; + int degraded = 0, pg_size, pg_minsize; + osd_read_stripe_t *stripes; + osd_op_t *subops = NULL; +}; + +void osd_t::finish_primary_op(osd_op_t *cur_op, int retval) +{ + // FIXME add separate magics + cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + cur_op->reply.hdr.id = cur_op->op.hdr.id; + cur_op->reply.hdr.opcode = cur_op->op.hdr.opcode; + cur_op->reply.hdr.retval = retval; + outbox_push(this->clients[cur_op->peer_fd], cur_op); +} + void osd_t::exec_primary_read(osd_op_t *cur_op) { - // read: read directly or read paired stripe(s), reconstruct, return - // write: read paired stripe(s), modify, write - // nuance: take care to read the same version from paired stripes! - // if there are no write requests in progress we're good (stripes must be in sync) - // and... remember the last readable version during a write request - // and... postpone other write requests to the same stripe until the completion of previous ones - // - // sync: sync peers, get unstable versions from somewhere, stabilize them object_id oid = { .inode = cur_op->op.rw.inode, .stripe = (cur_op->op.rw.offset / (bs_block_size*2)) << STRIPE_SHIFT, }; - uint64_t start = cur_op->op.rw.offset, end = cur_op->op.rw.offset + cur_op->op.rw.len; - unsigned pg_num = (oid % pg_count); // FIXME +1 + uint64_t start = cur_op->op.rw.offset; + uint64_t end = cur_op->op.rw.offset + cur_op->op.rw.len; + pg_num_t pg_num = (oid % pg_count); // FIXME +1 if (((end - 1) / (bs_block_size*2)) != oid.stripe || (start % bs_disk_alignment) || (end % bs_disk_alignment) || pg_num > pgs.size()) { - // FIXME add separate magics - cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - cur_op->reply.hdr.id = cur_op->op.hdr.id; - cur_op->reply.hdr.opcode = cur_op->op.hdr.opcode; - cur_op->reply.hdr.retval = -EINVAL; - outbox_push(clients[cur_op->peer_fd], cur_op); + finish_primary_op(cur_op, -EINVAL); return; } - // role -> start, end - uint64_t reads[pgs[pg_num].pg_minsize*2] = { 0 }; + osd_primary_read_t *op_data = (osd_primary_read_t*)calloc( + sizeof(osd_primary_read_t) + sizeof(osd_read_stripe_t) * pgs[pg_num].pg_size, 1 + ); + osd_read_stripe_t *stripes = (op_data->stripes = ((osd_read_stripe_t*)(op_data+1))); + cur_op->op_data = op_data; for (int role = 0; role < pgs[pg_num].pg_minsize; role++) { if (start < (1+role)*bs_block_size && end > role*bs_block_size) { - reads[role*2] = start < role*bs_block_size ? 0 : start-role*bs_block_size; - reads[role*2+1] = end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size; + stripes[role].real_start = stripes[role].start + = start < role*bs_block_size ? 0 : start-role*bs_block_size; + stripes[role].end = stripes[role].real_end + = end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size; } } - auto vo_it = pgs[pg_num].ver_override.find(oid); - uint64_t target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it->second : UINT64_MAX; + { + auto vo_it = pgs[pg_num].ver_override.find(oid); + op_data->target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it->second : UINT64_MAX; + } if (pgs[pg_num].pg_cursize == 3) { // Fast happy-path - void *buf = memalign(MEM_ALIGNMENT, cur_op->op.rw.len); - + submit_read_subops(pgs[pg_num].pg_minsize, pgs[pg_num].target_set.data(), cur_op); + cur_op->send_list.push_back(cur_op->buf, cur_op->op.rw.len); } else { // PG is degraded - auto it = pgs[pg_num].obj_states.find(oid); - std::vector & target_set = (it != pgs[pg_num].obj_states.end() - ? it->second->read_target - : pgs[pg_num].target_set); - uint64_t real_reads[pgs[pg_num].pg_size*2] = { 0 }; - memcpy(real_reads, reads, sizeof(uint64_t)*pgs[pg_num].pg_minsize*2); - for (int role = 0; role < pgs[pg_num].pg_minsize; role++) + uint64_t* target_set; { - if (reads[role*2+1] != 0 && target_set[role] == 0) + auto it = pgs[pg_num].obj_states.find(oid); + target_set = (it != pgs[pg_num].obj_states.end() + ? it->second->read_target.data() + : pgs[pg_num].target_set.data()); + } + if (extend_missing_stripes(stripes, target_set, pgs[pg_num].pg_minsize, pgs[pg_num].pg_size) < 0) + { + free(op_data); + finish_primary_op(cur_op, -EIO); + return; + } + // Submit reads + submit_read_subops(pgs[pg_num].pg_size, target_set, cur_op); + op_data->pg_minsize = pgs[pg_num].pg_minsize; + op_data->pg_size = pgs[pg_num].pg_size; + op_data->degraded = 1; + } +} + +void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok) +{ + osd_primary_read_t *op_data = (osd_primary_read_t*)cur_op->op_data; + if (!ok) + op_data->errors++; + else + op_data->done++; + if ((op_data->errors + op_data->done) >= op_data->n_subops) + { + delete[] op_data->subops; + op_data->subops = NULL; + if (op_data->errors > 0) + { + free(op_data); + cur_op->op_data = NULL; + finish_primary_op(cur_op, -EIO); + return; + } + if (op_data->degraded) + { + // Reconstruct missing stripes + osd_read_stripe_t *stripes = op_data->stripes; + for (int role = 0; role < op_data->pg_minsize; role++) { - // Stripe is missing. Extend read to other stripes. - // We need at least pg_minsize stripes to recover the lost part. - int exist = 0; - for (int j = 0; j < pgs[pg_num].pg_size; j++) + if (stripes[role].end != 0 && stripes[role].real_end == 0) { - if (target_set[j] != 0) + int other = role == 0 ? 1 : 0; + int parity = op_data->pg_size-1; + memxor( + cur_op->buf + stripes[other].pos + (stripes[other].real_start - stripes[role].start), + cur_op->buf + stripes[parity].pos + (stripes[parity].real_start - stripes[role].start), + cur_op->buf + stripes[role].pos, stripes[role].end - stripes[role].start + ); + } + if (stripes[role].end != 0) + { + // Send buffer in parts to avoid copying + cur_op->send_list.push_back( + cur_op->buf + stripes[role].pos + (stripes[role].real_start - stripes[role].start), stripes[role].end + ); + } + } + } + free(op_data); + cur_op->op_data = NULL; + finish_primary_op(cur_op, cur_op->op.rw.len); + } +} + +int osd_t::extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *target_set, int minsize, int size) +{ + for (int role = 0; role < minsize; role++) + { + if (stripes[role*2+1].end != 0 && target_set[role] == 0) + { + // Stripe is missing. Extend read to other stripes. + // We need at least pg_minsize stripes to recover the lost part. + int exist = 0; + for (int j = 0; j < size; j++) + { + if (target_set[j] != 0) + { + if (stripes[j].real_end == 0 || j >= minsize) { - if (real_reads[j*2+1] == 0 || j >= pgs[pg_num].pg_minsize) - { - real_reads[j*2] = reads[role*2]; - real_reads[j*2+1] = reads[role*2+1]; - } - else - { - real_reads[j*2] = reads[j*2] < reads[role*2] ? reads[j*2] : reads[role*2]; - real_reads[j*2+1] = reads[j*2+1] > reads[role*2+1] ? reads[j*2+1] : reads[role*2+1]; - } - exist++; - if (exist >= pgs[pg_num].pg_minsize) - { - break; - } + stripes[j].real_start = stripes[role].start; + stripes[j].real_end = stripes[role].end; + } + else + { + stripes[j].real_start = stripes[j].start < stripes[role].start ? stripes[j].start : stripes[role].start; + stripes[j].real_end = stripes[j].end > stripes[role].end ? stripes[j].end : stripes[role].end; + } + exist++; + if (exist >= minsize) + { + break; } } - if (exist < pgs[pg_num].pg_minsize) - { - // Object is unreadable - cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - cur_op->reply.hdr.id = cur_op->op.hdr.id; - cur_op->reply.hdr.opcode = cur_op->op.hdr.opcode; - cur_op->reply.hdr.retval = -EIO; - outbox_push(clients[cur_op->peer_fd], cur_op); - return; - } + } + if (exist < minsize) + { + // Less than minsize stripes are available for this object + return -1; } } - uint64_t pos[pgs[pg_num].pg_size]; - uint64_t buf_size = 0; - int n_subops = 0; - for (int role = 0; role < pgs[pg_num].pg_size; role++) + } + return 0; +} + +void osd_t::submit_read_subops(int read_pg_size, const uint64_t* target_set, osd_op_t *cur_op) +{ + osd_primary_read_t *op_data = (osd_primary_read_t*)cur_op->op_data; + osd_read_stripe_t *stripes = op_data->stripes; + uint64_t buf_size = 0; + int n_subops = 0; + for (int role = 0; role < read_pg_size; role++) + { + if (stripes[role].real_end != 0) { - if (real_reads[role*2+1] != 0) - { - n_subops++; - pos[role] = buf_size; - buf_size += real_reads[role*2+1]; - } + n_subops++; + stripes[role].pos = buf_size; + buf_size += stripes[role].real_end - stripes[role].real_start; } - void *buf = memalign(MEM_ALIGNMENT, buf_size); - // Submit reads - osd_op_t read_ops[n_subops]; - int subop = 0; - int errors = 0, done = 0; - for (int role = 0; role < pgs[pg_num].pg_size; role++) + } + osd_op_t *subops = new osd_op_t[n_subops]; + cur_op->buf = memalign(MEM_ALIGNMENT, buf_size); + op_data->n_subops = n_subops; + op_data->subops = subops; + int subop = 0; + for (int role = 0; role < read_pg_size; role++) + { + auto role_osd_num = target_set[role]; + if (role_osd_num != 0) { - uint64_t role_osd_num = target_set[role]; - if (role_osd_num != 0) + if (role_osd_num == this->osd_num) { - if (role_osd_num == this->osd_num) - { - read_ops[subop].bs_op = { - .opcode = BS_OP_READ, - .callback = [&](blockstore_op_t *op) - { - if (op->retval < op->len) - errors++; - else - done++; - // continue op - }, - .oid = { - .inode = oid.inode, - .stripe = oid.stripe | role, - }, - .version = target_ver, - .offset = real_reads[role*2], - .len = real_reads[role*2+1] - real_reads[role*2], - .buf = buf + pos[role], - }; - bs->enqueue_op(&read_ops[subop].bs_op); - } - else - { - read_ops[subop].op_type = OSD_OP_OUT; - read_ops[subop].peer_fd = osd_peer_fds.at(role_osd_num); - read_ops[subop].op.sec_rw = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = next_subop_id++, - .opcode = OSD_OP_SECONDARY_READ, - }, - .oid = { - .inode = oid.inode, - .stripe = oid.stripe | role, - }, - .version = target_ver, - .offset = real_reads[role*2], - .len = real_reads[role*2+1] - real_reads[role*2], - }; - read_ops[subop].buf = buf + pos[role]; - read_ops[subop].callback = [&](osd_op_t *osd_subop) + subops[subop].bs_op = { + .opcode = BS_OP_READ, + .callback = [this, cur_op](blockstore_op_t *subop) { - if (osd_subop->reply.hdr.retval < osd_subop->op.sec_rw.len) - errors++; - else - done++; - // continue op - }; - } - subop++; + handle_primary_read_subop(cur_op, subop->retval == subop->len); + }, + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | role, + }, + .version = op_data->target_ver, + .offset = stripes[role].real_start, + .len = stripes[role].real_end - stripes[role].real_start, + .buf = cur_op->buf + stripes[role].pos, + }; + bs->enqueue_op(&subops[subop].bs_op); } - } - // Reconstruct missing stripes - for (int role = 0; role < pgs[pg_num].pg_minsize; role++) - { - if (reads[role*2+1] != 0 && target_set[role] == 0) + else { - int other = role == 0 ? 1 : 0; - int parity = pgs[pg_num].pg_size-1; - memxor( - buf + pos[other] + (real_reads[other*2]-reads[role*2]), - buf + pos[parity] + (real_reads[parity*2]-reads[role*2]), - buf + pos[role], reads[role*2+1]-reads[role*2] - ); + subops[subop].op_type = OSD_OP_OUT; + subops[subop].peer_fd = this->osd_peer_fds.at(role_osd_num); + subops[subop].op.sec_rw = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = OSD_OP_SECONDARY_READ, + }, + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | role, + }, + .version = op_data->target_ver, + .offset = stripes[role].real_start, + .len = stripes[role].real_end - stripes[role].real_start, + }; + subops[subop].buf = cur_op->buf + stripes[role].pos; + subops[subop].callback = [this, cur_op](osd_op_t *subop) + { + handle_primary_read_subop(cur_op, subop->reply.hdr.retval == subop->op.sec_rw.len); + }; } + subop++; } - // Send buffer in parts to avoid copying } } diff --git a/osd_send.cpp b/osd_send.cpp index aedac2024..85da80ba9 100644 --- a/osd_send.cpp +++ b/osd_send.cpp @@ -46,7 +46,7 @@ void osd_t::send_replies() cl.write_iov.iov_len = cl.write_remaining; cl.write_msg.msg_iov = &cl.write_iov; cl.write_msg.msg_iovlen = 1; - // FIXME: This is basically a busy-loop. It's probably better to add epoll here + // FIXME: This is basically a busy-loop. It may be better to add epoll here(?) data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data, peer_fd); }; my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); cl.write_state = cl.write_state | SQE_SENT; @@ -76,67 +76,16 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd) { cl.write_buf = NULL; osd_op_t *cur_op = cl.write_op; - if (cl.write_state == CL_WRITE_REPLY) + if (cur_op->send_list.sent < cur_op->send_list.count) { // Send data - if (cur_op->op_type == OSD_OP_IN) - { - if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ && - cur_op->reply.hdr.retval > 0) - { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->reply.hdr.retval; - cl.write_state = CL_WRITE_DATA; - } - else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST && - cur_op->reply.hdr.retval > 0) - { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->reply.hdr.retval * sizeof(obj_ver_id); - cl.write_state = CL_WRITE_DATA; - } - else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG && - cur_op->reply.hdr.retval > 0) - { - cl.write_buf = (void*)((std::string*)cur_op->buf)->c_str(); - cl.write_remaining = cur_op->reply.hdr.retval; - cl.write_state = CL_WRITE_DATA; - } - else - { - goto op_done; - } - } - else - { - if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) - { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->op.sec_rw.len; - cl.write_state = CL_WRITE_DATA; - } - else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE || - cur_op->op.hdr.opcode == OSD_OP_SECONDARY_ROLLBACK) - { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->op.sec_stab.len; - cl.write_state = CL_WRITE_DATA; - } - else if (cur_op->op.hdr.opcode == OSD_OP_WRITE) - { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->op.rw.len; - cl.write_state = CL_WRITE_DATA; - } - else - { - goto op_done; - } - } + cl.write_buf = cur_op->send_list[cur_op->send_list.sent].buf; + cl.write_remaining = cur_op->send_list[cur_op->send_list.sent].len; + cur_op->send_list.sent++; + cl.write_state = CL_WRITE_DATA; } - else if (cl.write_state == CL_WRITE_DATA) + else { - op_done: // Done if (cur_op->op_type == OSD_OP_IN) {