diff --git a/Makefile b/Makefile index dd0484be..5560bbab 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,7 @@ libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring OSD_OBJS := osd.o osd_secondary.o osd_receive.o osd_send.o osd_peering.o osd_flush.o osd_peering_pg.o \ - osd_primary.o osd_cluster.o osd_http.o osd_rmw.o json11.o timerfd_interval.o base64.o timerfd_manager.o + osd_primary.o osd_primary_subops.o osd_cluster.o osd_http.o osd_rmw.o json11.o timerfd_interval.o base64.o timerfd_manager.o base64.o: base64.cpp base64.h g++ $(CXXFLAGS) -c -o $@ $< osd_secondary.o: osd_secondary.cpp osd.h osd_ops.h ringloop.h @@ -53,7 +53,9 @@ osd_rmw.o: osd_rmw.cpp osd_rmw.h xor.h g++ $(CXXFLAGS) -c -o $@ $< osd_rmw_test: osd_rmw_test.cpp osd_rmw.cpp osd_rmw.h xor.h g++ $(CXXFLAGS) -o $@ $< -osd_primary.o: osd_primary.cpp osd.h osd_ops.h osd_peering_pg.h xor.h ringloop.h +osd_primary.o: osd_primary.cpp osd_primary.h osd_rmw.h osd.h osd_ops.h osd_peering_pg.h xor.h ringloop.h + g++ $(CXXFLAGS) -c -o $@ $< +osd_primary_subops.o: osd_primary_subops.cpp osd_primary.h osd_rmw.h osd.h osd_ops.h osd_peering_pg.h xor.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< osd.o: osd.cpp osd.h osd_http.h osd_ops.h osd_peering_pg.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< diff --git a/osd_primary.cpp b/osd_primary.cpp index f5a702b2..6ed74024 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -1,11 +1,4 @@ -#include "osd.h" -#include "osd_rmw.h" - -// FIXME: Split into more files - -#define SUBMIT_READ 0 -#define SUBMIT_RMW_READ 1 -#define SUBMIT_WRITE 2 +#include "osd_primary.h" // read: read directly or read paired stripe(s), reconstruct, return // write: read paired stripe(s), reconstruct, modify, calculate parity, write @@ -16,70 +9,6 @@ // // sync: sync peers, get unstable versions, stabilize them -struct unstable_osd_num_t -{ - osd_num_t osd_num; - int start, len; -}; - -struct osd_primary_op_data_t -{ - int st = 0; - pg_num_t pg_num; - object_id oid; - uint64_t target_ver; - uint64_t fact_ver = 0; - int n_subops = 0, done = 0, errors = 0, epipe = 0; - int degraded = 0, pg_size, pg_minsize; - osd_rmw_stripe_t *stripes; - osd_op_t *subops = NULL; - uint64_t *prev_set = NULL; - pg_osd_set_state_t *object_state = NULL; - - // for sync. oops, requires freeing - std::vector *unstable_write_osds = NULL; - pg_num_t *dirty_pgs = NULL; - int dirty_pg_count = 0; - obj_ver_id *unstable_writes = NULL; -}; - -void osd_t::finish_op(osd_op_t *cur_op, int retval) -{ - inflight_ops--; - if (cur_op->op_data && cur_op->op_data->pg_num > 0) - { - auto & pg = pgs[cur_op->op_data->pg_num]; - pg.inflight--; - assert(pg.inflight >= 0); - if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch) - { - finish_stop_pg(pg); - } - } - if (!cur_op->peer_fd) - { - // Copy lambda to be unaffected by `delete op` - std::function(cur_op->callback)(cur_op); - } - else - { - // FIXME add separate magic number - auto cl_it = clients.find(cur_op->peer_fd); - if (cl_it != clients.end()) - { - cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - cur_op->reply.hdr.id = cur_op->req.hdr.id; - cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; - cur_op->reply.hdr.retval = retval; - outbox_push(cl_it->second, cur_op); - } - else - { - delete cur_op; - } - } -} - bool osd_t::prepare_primary_rw(osd_op_t *cur_op) { // PG number is calculated from the offset @@ -120,7 +49,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) return true; } -uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state) +static uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state) { if (!(pg.state & (PG_HAS_INCOMPLETE | PG_HAS_DEGRADED | PG_HAS_MISPLACED))) { @@ -226,185 +155,6 @@ resume_2: finish_op(cur_op, cur_op->req.rw.len); } -void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op) -{ - bool w = submit_type == SUBMIT_WRITE; - osd_primary_op_data_t *op_data = cur_op->op_data; - osd_rmw_stripe_t *stripes = op_data->stripes; - // Allocate subops - int n_subops = 0, zero_read = -1; - for (int role = 0; role < pg_size; role++) - { - if (osd_set[role] == this->osd_num || osd_set[role] != 0 && zero_read == -1) - { - zero_read = role; - } - if (osd_set[role] != 0 && (w || stripes[role].read_end != 0)) - { - n_subops++; - } - } - if (!n_subops && submit_type == SUBMIT_RMW_READ) - { - n_subops = 1; - } - else - { - zero_read = -1; - } - uint64_t op_version = w ? op_data->fact_ver+1 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver); - osd_op_t *subops = new osd_op_t[n_subops]; - op_data->fact_ver = 0; - op_data->done = op_data->errors = 0; - op_data->n_subops = n_subops; - op_data->subops = subops; - int subop = 0; - for (int role = 0; role < pg_size; role++) - { - // We always submit zero-length writes to all replicas, even if the stripe is not modified - if (!(w || stripes[role].read_end != 0 || zero_read == role)) - { - continue; - } - osd_num_t role_osd_num = osd_set[role]; - if (role_osd_num != 0) - { - if (role_osd_num == this->osd_num) - { - subops[subop].bs_op = new blockstore_op_t({ - .opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ), - .callback = [cur_op, this](blockstore_op_t *subop) - { - if (subop->opcode == BS_OP_WRITE && subop->retval != subop->len) - { - // die - throw std::runtime_error("local write operation failed (retval = "+std::to_string(subop->retval)+")"); - } - handle_primary_subop( - subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ, - cur_op, subop->retval, subop->len, subop->version - ); - }, - .oid = { - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | role, - }, - .version = op_version, - .offset = w ? stripes[role].write_start : stripes[role].read_start, - .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, - .buf = w ? stripes[role].write_buf : stripes[role].read_buf, - }); - bs->enqueue_op(subops[subop].bs_op); - } - else - { - subops[subop].op_type = OSD_OP_OUT; - subops[subop].send_list.push_back(subops[subop].req.buf, OSD_PACKET_SIZE); - subops[subop].peer_fd = this->osd_peer_fds.at(role_osd_num); - subops[subop].req.sec_rw = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, - .opcode = (uint64_t)(w ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ), - }, - .oid = { - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | role, - }, - .version = op_version, - .offset = w ? stripes[role].write_start : stripes[role].read_start, - .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, - }; - subops[subop].buf = w ? stripes[role].write_buf : stripes[role].read_buf; - if (w && stripes[role].write_end > 0) - { - subops[subop].send_list.push_back(stripes[role].write_buf, stripes[role].write_end - stripes[role].write_start); - } - subops[subop].callback = [cur_op, this](osd_op_t *subop) - { - int fail_fd = subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE && - subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1; - // so it doesn't get freed - subop->buf = NULL; - handle_primary_subop( - subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval, - subop->req.sec_rw.len, subop->reply.sec_rw.version - ); - if (fail_fd >= 0) - { - // write operation failed, drop the connection - stop_client(fail_fd); - } - }; - outbox_push(clients[subops[subop].peer_fd], &subops[subop]); - } - subop++; - } - } -} - -void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version) -{ - osd_primary_op_data_t *op_data = cur_op->op_data; - if (retval != expected) - { - printf("%s subop failed: retval = %d (expected %d)\n", osd_op_names[opcode], retval, expected); - if (retval == -EPIPE) - { - op_data->epipe++; - } - op_data->errors++; - } - else - { - op_data->done++; - if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE) - { - if (op_data->fact_ver != 0 && op_data->fact_ver != version) - { - throw std::runtime_error("different fact_versions returned from subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver)); - } - op_data->fact_ver = version; - } - } - if ((op_data->errors + op_data->done) >= op_data->n_subops) - { - delete[] op_data->subops; - op_data->subops = NULL; - op_data->st++; - if (cur_op->req.hdr.opcode == OSD_OP_READ) - { - continue_primary_read(cur_op); - } - else if (cur_op->req.hdr.opcode == OSD_OP_WRITE) - { - continue_primary_write(cur_op); - } - else if (cur_op->req.hdr.opcode == OSD_OP_SYNC) - { - continue_primary_sync(cur_op); - } - else - { - throw std::runtime_error("BUG: unknown opcode"); - } - } -} - -void osd_t::pg_cancel_write_queue(pg_t & pg, object_id oid, int retval) -{ - auto st_it = pg.write_queue.find(oid), it = st_it; - while (it != pg.write_queue.end() && it->first == oid) - { - finish_op(it->second, retval); - it++; - } - if (st_it != it) - { - pg.write_queue.erase(st_it, it); - } -} - void osd_t::continue_primary_write(osd_op_t *cur_op) { if (!cur_op->op_data && !prepare_primary_rw(cur_op)) @@ -631,34 +381,6 @@ resume_7: } } -void osd_t::autosync() -{ - if (immediate_commit != IMMEDIATE_ALL && !autosync_op) - { - autosync_op = new osd_op_t(); - autosync_op->op_type = OSD_OP_IN; - autosync_op->req = { - .sync = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = 1, - .opcode = OSD_OP_SYNC, - }, - }, - }; - autosync_op->callback = [this](osd_op_t *op) - { - if (op->reply.hdr.retval < 0) - { - printf("Warning: automatic sync resulted in an error: %ld (%s)\n", -op->reply.hdr.retval, strerror(-op->reply.hdr.retval)); - } - delete autosync_op; - autosync_op = NULL; - }; - exec_op(autosync_op); - } -} - // Save and clear unstable_writes -> SYNC all -> STABLE all void osd_t::continue_primary_sync(osd_op_t *cur_op) { @@ -821,196 +543,3 @@ finish: goto resume_2; } } - -void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_state_t *object_state) -{ - osd_primary_op_data_t *op_data = cur_op->op_data; - int extra_chunks = 0; - for (auto chunk: object_state->osd_set) - { - if (chunk.osd_num != cur_set[chunk.role]) - { - extra_chunks++; - } - } - op_data->n_subops = extra_chunks; - op_data->done = op_data->errors = 0; - if (!extra_chunks) - { - return; - } - osd_op_t *subops = new osd_op_t[extra_chunks]; - op_data->subops = subops; - int i = 0; - for (auto chunk: object_state->osd_set) - { - if (chunk.osd_num != cur_set[chunk.role]) - { - if (chunk.osd_num == this->osd_num) - { - subops[i].bs_op = new blockstore_op_t({ - .opcode = BS_OP_DELETE, - .callback = [cur_op, this](blockstore_op_t *subop) - { - if (subop->retval != 0) - { - // die - throw std::runtime_error("local delete operation failed"); - } - handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->retval, 0, 0); - }, - .oid = { - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | chunk.role, - }, - // Same version as write - .version = op_data->fact_ver, - }); - bs->enqueue_op(subops[i].bs_op); - } - else - { - subops[i].op_type = OSD_OP_OUT; - subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); - subops[i].peer_fd = osd_peer_fds.at(chunk.osd_num); - subops[i].req.sec_del = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, - .opcode = OSD_OP_SECONDARY_DELETE, - }, - .oid = { - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | chunk.role, - }, - // Same version as write - .version = op_data->fact_ver, - }; - subops[i].callback = [cur_op, this](osd_op_t *subop) - { - int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : 0; - handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->reply.hdr.retval, 0, 0); - if (fail_fd >= 0) - { - // delete operation failed, drop the connection - stop_client(fail_fd); - } - }; - outbox_push(clients[subops[i].peer_fd], &subops[i]); - } - i++; - } - } -} - -void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) -{ - osd_primary_op_data_t *op_data = cur_op->op_data; - int n_osds = op_data->unstable_write_osds->size(); - osd_op_t *subops = new osd_op_t[n_osds]; - op_data->done = op_data->errors = 0; - op_data->n_subops = n_osds; - op_data->subops = subops; - for (int i = 0; i < n_osds; i++) - { - osd_num_t sync_osd = (*(op_data->unstable_write_osds))[i].osd_num; - if (sync_osd == this->osd_num) - { - subops[i].bs_op = new blockstore_op_t({ - .opcode = BS_OP_SYNC, - .callback = [cur_op, this](blockstore_op_t *subop) - { - if (subop->retval != 0) - { - // die - throw std::runtime_error("local sync operation failed"); - } - handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->retval, 0, 0); - }, - }); - bs->enqueue_op(subops[i].bs_op); - } - else - { - subops[i].op_type = OSD_OP_OUT; - subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); - subops[i].peer_fd = osd_peer_fds.at(sync_osd); - subops[i].req.sec_sync = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, - .opcode = OSD_OP_SECONDARY_SYNC, - }, - }; - subops[i].callback = [cur_op, this](osd_op_t *subop) - { - int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : 0; - handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval, 0, 0); - if (fail_fd >= 0) - { - // sync operation failed, drop the connection - stop_client(fail_fd); - } - }; - outbox_push(clients[subops[i].peer_fd], &subops[i]); - } - } -} - -void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) -{ - osd_primary_op_data_t *op_data = cur_op->op_data; - int n_osds = op_data->unstable_write_osds->size(); - osd_op_t *subops = new osd_op_t[n_osds]; - op_data->done = op_data->errors = 0; - op_data->n_subops = n_osds; - op_data->subops = subops; - for (int i = 0; i < n_osds; i++) - { - auto & stab_osd = (*(op_data->unstable_write_osds))[i]; - if (stab_osd.osd_num == this->osd_num) - { - subops[i].bs_op = new blockstore_op_t({ - .opcode = BS_OP_STABLE, - .callback = [cur_op, this](blockstore_op_t *subop) - { - if (subop->retval != 0) - { - // die - throw std::runtime_error("local stabilize operation failed"); - } - handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->retval, 0, 0); - }, - .len = (uint32_t)stab_osd.len, - .buf = (void*)(op_data->unstable_writes + stab_osd.start), - }); - bs->enqueue_op(subops[i].bs_op); - } - else - { - subops[i].op_type = OSD_OP_OUT; - subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); - subops[i].peer_fd = osd_peer_fds.at(stab_osd.osd_num); - subops[i].req.sec_stab = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, - .opcode = OSD_OP_SECONDARY_STABILIZE, - }, - .len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)), - }; - subops[i].send_list.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id)); - subops[i].callback = [cur_op, this](osd_op_t *subop) - { - int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : 0; - handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval, 0, 0); - if (fail_fd >= 0) - { - // sync operation failed, drop the connection - stop_client(fail_fd); - } - }; - outbox_push(clients[subops[i].peer_fd], &subops[i]); - } - } -} diff --git a/osd_primary.h b/osd_primary.h new file mode 100644 index 00000000..63e15259 --- /dev/null +++ b/osd_primary.h @@ -0,0 +1,35 @@ +#pragma once + +#include "osd.h" +#include "osd_rmw.h" + +#define SUBMIT_READ 0 +#define SUBMIT_RMW_READ 1 +#define SUBMIT_WRITE 2 + +struct unstable_osd_num_t +{ + osd_num_t osd_num; + int start, len; +}; + +struct osd_primary_op_data_t +{ + int st = 0; + pg_num_t pg_num; + object_id oid; + uint64_t target_ver; + uint64_t fact_ver = 0; + int n_subops = 0, done = 0, errors = 0, epipe = 0; + int degraded = 0, pg_size, pg_minsize; + osd_rmw_stripe_t *stripes; + osd_op_t *subops = NULL; + uint64_t *prev_set = NULL; + pg_osd_set_state_t *object_state = NULL; + + // for sync. oops, requires freeing + std::vector *unstable_write_osds = NULL; + pg_num_t *dirty_pgs = NULL; + int dirty_pg_count = 0; + obj_ver_id *unstable_writes = NULL; +}; diff --git a/osd_primary_subops.cpp b/osd_primary_subops.cpp new file mode 100644 index 00000000..9b704858 --- /dev/null +++ b/osd_primary_subops.cpp @@ -0,0 +1,442 @@ +#include "osd_primary.h" + +#define SUBMIT_READ 0 +#define SUBMIT_RMW_READ 1 +#define SUBMIT_WRITE 2 + +void osd_t::autosync() +{ + if (immediate_commit != IMMEDIATE_ALL && !autosync_op) + { + autosync_op = new osd_op_t(); + autosync_op->op_type = OSD_OP_IN; + autosync_op->req = { + .sync = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = 1, + .opcode = OSD_OP_SYNC, + }, + }, + }; + autosync_op->callback = [this](osd_op_t *op) + { + if (op->reply.hdr.retval < 0) + { + printf("Warning: automatic sync resulted in an error: %ld (%s)\n", -op->reply.hdr.retval, strerror(-op->reply.hdr.retval)); + } + delete autosync_op; + autosync_op = NULL; + }; + exec_op(autosync_op); + } +} + +void osd_t::finish_op(osd_op_t *cur_op, int retval) +{ + inflight_ops--; + if (cur_op->op_data && cur_op->op_data->pg_num > 0) + { + auto & pg = pgs[cur_op->op_data->pg_num]; + pg.inflight--; + assert(pg.inflight >= 0); + if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch) + { + finish_stop_pg(pg); + } + } + if (!cur_op->peer_fd) + { + // Copy lambda to be unaffected by `delete op` + std::function(cur_op->callback)(cur_op); + } + else + { + // FIXME add separate magic number + auto cl_it = clients.find(cur_op->peer_fd); + if (cl_it != clients.end()) + { + cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + cur_op->reply.hdr.id = cur_op->req.hdr.id; + cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; + cur_op->reply.hdr.retval = retval; + outbox_push(cl_it->second, cur_op); + } + else + { + delete cur_op; + } + } +} + +void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op) +{ + bool w = submit_type == SUBMIT_WRITE; + osd_primary_op_data_t *op_data = cur_op->op_data; + osd_rmw_stripe_t *stripes = op_data->stripes; + // Allocate subops + int n_subops = 0, zero_read = -1; + for (int role = 0; role < pg_size; role++) + { + if (osd_set[role] == this->osd_num || osd_set[role] != 0 && zero_read == -1) + { + zero_read = role; + } + if (osd_set[role] != 0 && (w || stripes[role].read_end != 0)) + { + n_subops++; + } + } + if (!n_subops && submit_type == SUBMIT_RMW_READ) + { + n_subops = 1; + } + else + { + zero_read = -1; + } + uint64_t op_version = w ? op_data->fact_ver+1 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver); + osd_op_t *subops = new osd_op_t[n_subops]; + op_data->fact_ver = 0; + op_data->done = op_data->errors = 0; + op_data->n_subops = n_subops; + op_data->subops = subops; + int subop = 0; + for (int role = 0; role < pg_size; role++) + { + // We always submit zero-length writes to all replicas, even if the stripe is not modified + if (!(w || stripes[role].read_end != 0 || zero_read == role)) + { + continue; + } + osd_num_t role_osd_num = osd_set[role]; + if (role_osd_num != 0) + { + if (role_osd_num == this->osd_num) + { + subops[subop].bs_op = new blockstore_op_t({ + .opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ), + .callback = [cur_op, this](blockstore_op_t *subop) + { + if (subop->opcode == BS_OP_WRITE && subop->retval != subop->len) + { + // die + throw std::runtime_error("local write operation failed (retval = "+std::to_string(subop->retval)+")"); + } + handle_primary_subop( + subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ, + cur_op, subop->retval, subop->len, subop->version + ); + }, + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | role, + }, + .version = op_version, + .offset = w ? stripes[role].write_start : stripes[role].read_start, + .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, + .buf = w ? stripes[role].write_buf : stripes[role].read_buf, + }); + bs->enqueue_op(subops[subop].bs_op); + } + else + { + subops[subop].op_type = OSD_OP_OUT; + subops[subop].send_list.push_back(subops[subop].req.buf, OSD_PACKET_SIZE); + subops[subop].peer_fd = this->osd_peer_fds.at(role_osd_num); + subops[subop].req.sec_rw = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = (uint64_t)(w ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ), + }, + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | role, + }, + .version = op_version, + .offset = w ? stripes[role].write_start : stripes[role].read_start, + .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, + }; + subops[subop].buf = w ? stripes[role].write_buf : stripes[role].read_buf; + if (w && stripes[role].write_end > 0) + { + subops[subop].send_list.push_back(stripes[role].write_buf, stripes[role].write_end - stripes[role].write_start); + } + subops[subop].callback = [cur_op, this](osd_op_t *subop) + { + int fail_fd = subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE && + subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1; + // so it doesn't get freed + subop->buf = NULL; + handle_primary_subop( + subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval, + subop->req.sec_rw.len, subop->reply.sec_rw.version + ); + if (fail_fd >= 0) + { + // write operation failed, drop the connection + stop_client(fail_fd); + } + }; + outbox_push(clients[subops[subop].peer_fd], &subops[subop]); + } + subop++; + } + } +} + +void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + if (retval != expected) + { + printf("%s subop failed: retval = %d (expected %d)\n", osd_op_names[opcode], retval, expected); + if (retval == -EPIPE) + { + op_data->epipe++; + } + op_data->errors++; + } + else + { + op_data->done++; + if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE) + { + if (op_data->fact_ver != 0 && op_data->fact_ver != version) + { + throw std::runtime_error("different fact_versions returned from subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver)); + } + op_data->fact_ver = version; + } + } + if ((op_data->errors + op_data->done) >= op_data->n_subops) + { + delete[] op_data->subops; + op_data->subops = NULL; + op_data->st++; + if (cur_op->req.hdr.opcode == OSD_OP_READ) + { + continue_primary_read(cur_op); + } + else if (cur_op->req.hdr.opcode == OSD_OP_WRITE) + { + continue_primary_write(cur_op); + } + else if (cur_op->req.hdr.opcode == OSD_OP_SYNC) + { + continue_primary_sync(cur_op); + } + else + { + throw std::runtime_error("BUG: unknown opcode"); + } + } +} + +void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_state_t *object_state) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + int extra_chunks = 0; + for (auto chunk: object_state->osd_set) + { + if (chunk.osd_num != cur_set[chunk.role]) + { + extra_chunks++; + } + } + op_data->n_subops = extra_chunks; + op_data->done = op_data->errors = 0; + if (!extra_chunks) + { + return; + } + osd_op_t *subops = new osd_op_t[extra_chunks]; + op_data->subops = subops; + int i = 0; + for (auto chunk: object_state->osd_set) + { + if (chunk.osd_num != cur_set[chunk.role]) + { + if (chunk.osd_num == this->osd_num) + { + subops[i].bs_op = new blockstore_op_t({ + .opcode = BS_OP_DELETE, + .callback = [cur_op, this](blockstore_op_t *subop) + { + if (subop->retval != 0) + { + // die + throw std::runtime_error("local delete operation failed"); + } + handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->retval, 0, 0); + }, + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | chunk.role, + }, + // Same version as write + .version = op_data->fact_ver, + }); + bs->enqueue_op(subops[i].bs_op); + } + else + { + subops[i].op_type = OSD_OP_OUT; + subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); + subops[i].peer_fd = osd_peer_fds.at(chunk.osd_num); + subops[i].req.sec_del = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = OSD_OP_SECONDARY_DELETE, + }, + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | chunk.role, + }, + // Same version as write + .version = op_data->fact_ver, + }; + subops[i].callback = [cur_op, this](osd_op_t *subop) + { + int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : 0; + handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->reply.hdr.retval, 0, 0); + if (fail_fd >= 0) + { + // delete operation failed, drop the connection + stop_client(fail_fd); + } + }; + outbox_push(clients[subops[i].peer_fd], &subops[i]); + } + i++; + } + } +} + +void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + int n_osds = op_data->unstable_write_osds->size(); + osd_op_t *subops = new osd_op_t[n_osds]; + op_data->done = op_data->errors = 0; + op_data->n_subops = n_osds; + op_data->subops = subops; + for (int i = 0; i < n_osds; i++) + { + osd_num_t sync_osd = (*(op_data->unstable_write_osds))[i].osd_num; + if (sync_osd == this->osd_num) + { + subops[i].bs_op = new blockstore_op_t({ + .opcode = BS_OP_SYNC, + .callback = [cur_op, this](blockstore_op_t *subop) + { + if (subop->retval != 0) + { + // die + throw std::runtime_error("local sync operation failed"); + } + handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->retval, 0, 0); + }, + }); + bs->enqueue_op(subops[i].bs_op); + } + else + { + subops[i].op_type = OSD_OP_OUT; + subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); + subops[i].peer_fd = osd_peer_fds.at(sync_osd); + subops[i].req.sec_sync = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = OSD_OP_SECONDARY_SYNC, + }, + }; + subops[i].callback = [cur_op, this](osd_op_t *subop) + { + int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : 0; + handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval, 0, 0); + if (fail_fd >= 0) + { + // sync operation failed, drop the connection + stop_client(fail_fd); + } + }; + outbox_push(clients[subops[i].peer_fd], &subops[i]); + } + } +} + +void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + int n_osds = op_data->unstable_write_osds->size(); + osd_op_t *subops = new osd_op_t[n_osds]; + op_data->done = op_data->errors = 0; + op_data->n_subops = n_osds; + op_data->subops = subops; + for (int i = 0; i < n_osds; i++) + { + auto & stab_osd = (*(op_data->unstable_write_osds))[i]; + if (stab_osd.osd_num == this->osd_num) + { + subops[i].bs_op = new blockstore_op_t({ + .opcode = BS_OP_STABLE, + .callback = [cur_op, this](blockstore_op_t *subop) + { + if (subop->retval != 0) + { + // die + throw std::runtime_error("local stabilize operation failed"); + } + handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->retval, 0, 0); + }, + .len = (uint32_t)stab_osd.len, + .buf = (void*)(op_data->unstable_writes + stab_osd.start), + }); + bs->enqueue_op(subops[i].bs_op); + } + else + { + subops[i].op_type = OSD_OP_OUT; + subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); + subops[i].peer_fd = osd_peer_fds.at(stab_osd.osd_num); + subops[i].req.sec_stab = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = OSD_OP_SECONDARY_STABILIZE, + }, + .len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)), + }; + subops[i].send_list.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id)); + subops[i].callback = [cur_op, this](osd_op_t *subop) + { + int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : 0; + handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval, 0, 0); + if (fail_fd >= 0) + { + // sync operation failed, drop the connection + stop_client(fail_fd); + } + }; + outbox_push(clients[subops[i].peer_fd], &subops[i]); + } + } +} + +void osd_t::pg_cancel_write_queue(pg_t & pg, object_id oid, int retval) +{ + auto st_it = pg.write_queue.find(oid), it = st_it; + while (it != pg.write_queue.end() && it->first == oid) + { + finish_op(it->second, retval); + it++; + } + if (st_it != it) + { + pg.write_queue.erase(st_it, it); + } +}