Add OSD_OP_SEC_READ_BMP: bulk bitmap read from a secondary OSD

Protocol, as implemented by the hunks below:
  * request  - osd_op_sec_read_bmp_t header followed by an obj_ver_id array of
               sec_read_bmp.len bytes;
  * reply    - osd_reply_sec_read_bmp_t header whose retval is the payload
               length in bytes, followed by one {version, bitmap} entry per
               requested object.

Per-file summary:
  * msgr_receive.cpp  - handle_op_hdr receives the request payload into a
                        freshly allocated buffer; handle_reply_hdr receives the
                        reply payload the same way, and only validates the
                        length of READ / SEC_READ replies when retval >= 0.
  * msgr_send.cpp     - outbox_push queues the request or reply payload.
  * osd.cpp           - the new opcode is allowed in read-only mode.
  * osd_ops.{h,cpp}   - opcode number, name and wire structures.
  * osd_secondary.cpp - exec_secondary answers the request synchronously
                        through bs->read_bitmap().

Fixes applied while restoring this patch:
  * osd_secondary.cpp: the reply buffer is walked with a uint8_t* cursor;
    the previous code did arithmetic on void*, which is a GNU extension and
    not valid ISO C++.
  * msgr_send.cpp: the NULL outbox entry is pushed only together with an
    iovec, so an empty payload no longer adds an outbox entry that has no
    matching send_list entry.

Review notes:
  * Line breaks and indentation were reconstructed from a whitespace-collapsed
    copy; apply with "git apply --ignore-space-change" if context lines do not
    match exactly.
  * The post-image blob ids on the "index" lines of msgr_send.cpp and
    osd_secondary.cpp predate the two fixes above and are informational only.
  * NOTE(review): handle_op_hdr allocates sec_read_bmp.len bytes exactly as
    received from the peer; no upper bound is visible in this patch - confirm
    that one is enforced elsewhere.
  * NOTE(review): exec_secondary stores the entry count in an int; a request
    length above INT_MAX * sizeof(obj_ver_id) would be narrowed - confirm the
    request size is bounded before it gets here.

diff --git a/src/msgr_receive.cpp b/src/msgr_receive.cpp
index afcd2a0c..8be182cf 100644
--- a/src/msgr_receive.cpp
+++ b/src/msgr_receive.cpp
@@ -232,6 +232,15 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
         }
         cl->read_remaining = cur_op->req.sec_stab.len;
     }
+    else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
+    {
+        if (cur_op->req.sec_read_bmp.len > 0)
+        {
+            cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_read_bmp.len);
+            cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_read_bmp.len);
+        }
+        cl->read_remaining = cur_op->req.sec_read_bmp.len;
+    }
     else if (cur_op->req.hdr.opcode == OSD_OP_READ)
     {
         cl->read_remaining = 0;
@@ -277,17 +286,19 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
     {
         // Read data. In this case we assume that the buffer is preallocated by the caller (!)
         unsigned bmp_len = (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->reply.sec_rw.attr_len : op->reply.rw.bitmap_len);
-        if (op->reply.hdr.retval != (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->req.sec_rw.len : op->req.rw.len) ||
-            bmp_len > op->bitmap_len)
+        unsigned expected_size = (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->req.sec_rw.len : op->req.rw.len);
+        if (op->reply.hdr.retval >= 0 && (op->reply.hdr.retval != expected_size || bmp_len > op->bitmap_len))
         {
             // Check reply length to not overflow the buffer
-            printf("Client %d read reply of different length\n", cl->peer_fd);
+            printf("Client %d read reply of different length: expected %u+%u, got %ld+%u\n",
+                cl->peer_fd, expected_size, op->bitmap_len, op->reply.hdr.retval, bmp_len);
             cl->sent_ops[op->req.hdr.id] = op;
             stop_client(cl->peer_fd);
             return false;
         }
-        if (bmp_len > 0)
+        if (op->reply.hdr.retval >= 0 && bmp_len > 0)
         {
+            assert(op->bitmap);
             cl->recv_list.push_back(op->bitmap, bmp_len);
         }
         if (op->reply.hdr.retval > 0)
@@ -314,6 +325,16 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
         op->buf = memalign_or_die(MEM_ALIGNMENT, cl->read_remaining);
         cl->recv_list.push_back(op->buf, cl->read_remaining);
     }
+    else if (op->reply.hdr.opcode == OSD_OP_SEC_READ_BMP && op->reply.hdr.retval > 0)
+    {
+        assert(!op->iov.count);
+        delete cl->read_op;
+        cl->read_op = op;
+        cl->read_state = CL_READ_REPLY_DATA;
+        cl->read_remaining = op->reply.hdr.retval;
+        op->buf = memalign_or_die(MEM_ALIGNMENT, cl->read_remaining);
+        cl->recv_list.push_back(op->buf, cl->read_remaining);
+    }
     else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0)
     {
         assert(!op->iov.count);
diff --git a/src/msgr_send.cpp b/src/msgr_send.cpp
index 115d5edc..8bdaf197 100644
--- a/src/msgr_send.cpp
+++ b/src/msgr_send.cpp
@@ -87,6 +87,21 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
             to_outbox.push_back(NULL);
         }
     }
+    if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
+    {
+        // Incoming op: send the reply payload (retval bytes). Outgoing op: send the request payload.
+        // NOTE(review): to_outbox looks index-aligned with to_send_list, so NULL is pushed only with an iovec - confirm
+        if (cur_op->op_type == OSD_OP_IN && cur_op->reply.hdr.retval > 0)
+        {
+            to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->reply.hdr.retval });
+            to_outbox.push_back(NULL);
+        }
+        else if (cur_op->op_type == OSD_OP_OUT && cur_op->req.sec_read_bmp.len > 0)
+        {
+            to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->req.sec_read_bmp.len });
+            to_outbox.push_back(NULL);
+        }
+    }
     if (cur_op->op_type == OSD_OP_IN)
     {
         // To free it later
diff --git a/src/osd.cpp b/src/osd.cpp
index d470c72f..523f9c11 100644
--- a/src/osd.cpp
+++ b/src/osd.cpp
@@ -231,6 +231,7 @@ void osd_t::exec_op(osd_op_t *cur_op)
         cur_op->req.hdr.opcode != OSD_OP_SEC_READ &&
         cur_op->req.hdr.opcode != OSD_OP_SEC_LIST &&
         cur_op->req.hdr.opcode != OSD_OP_READ &&
+        cur_op->req.hdr.opcode != OSD_OP_SEC_READ_BMP &&
         cur_op->req.hdr.opcode != OSD_OP_SHOW_CONFIG)
     {
         // Readonly mode
diff --git a/src/osd_ops.cpp b/src/osd_ops.cpp
index 0d8b1214..e2aa7bce 100644
--- a/src/osd_ops.cpp
+++ b/src/osd_ops.cpp
@@ -20,4 +20,5 @@ const char* osd_op_names[] = {
     "primary_sync",
     "primary_delete",
     "ping",
+    "sec_read_bmp",
 };
diff --git a/src/osd_ops.h b/src/osd_ops.h
index 1d81f3af..94b185d0 100644
--- a/src/osd_ops.h
+++ b/src/osd_ops.h
@@ -28,7 +28,8 @@
 #define OSD_OP_SYNC 13
 #define OSD_OP_DELETE 14
 #define OSD_OP_PING 15
-#define OSD_OP_MAX 15
+#define OSD_OP_SEC_READ_BMP 16
+#define OSD_OP_MAX 16
 // Alignment & limit for read/write operations
 #ifndef MEM_ALIGNMENT
 #define MEM_ALIGNMENT 512
@@ -128,6 +129,20 @@ struct __attribute__((__packed__)) osd_reply_sec_stab_t
 };
 typedef osd_reply_sec_stab_t osd_reply_sec_rollback_t;
 
+// bulk read bitmaps from a secondary OSD
+struct __attribute__((__packed__)) osd_op_sec_read_bmp_t
+{
+    osd_op_header_t header;
+    // obj_ver_id array length in bytes
+    uint64_t len;
+};
+
+struct __attribute__((__packed__)) osd_reply_sec_read_bmp_t
+{
+    // retval is payload length in bytes. payload is {version,bitmap}[]
+    osd_reply_header_t header;
+};
+
 // show configuration
 struct __attribute__((__packed__)) osd_op_show_config_t
 {
@@ -198,6 +213,7 @@ union osd_any_op_t
     osd_op_sec_del_t sec_del;
     osd_op_sec_sync_t sec_sync;
     osd_op_sec_stab_t sec_stab;
+    osd_op_sec_read_bmp_t sec_read_bmp;
     osd_op_sec_list_t sec_list;
     osd_op_show_config_t show_conf;
     osd_op_rw_t rw;
@@ -212,6 +228,7 @@ union osd_any_reply_t
     osd_reply_sec_del_t sec_del;
     osd_reply_sec_sync_t sec_sync;
     osd_reply_sec_stab_t sec_stab;
+    osd_reply_sec_read_bmp_t sec_read_bmp;
     osd_reply_sec_list_t sec_list;
     osd_reply_show_config_t show_conf;
     osd_reply_rw_t rw;
diff --git a/src/osd_secondary.cpp b/src/osd_secondary.cpp
index 02331c51..5f4fb46f 100644
--- a/src/osd_secondary.cpp
+++ b/src/osd_secondary.cpp
@@ -44,6 +44,30 @@ void osd_t::secondary_op_callback(osd_op_t *op)
 
 void osd_t::exec_secondary(osd_op_t *cur_op)
 {
+    if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
+    {
+        // One reply entry is a 64-bit version followed by clean_entry_bitmap_size bitmap bytes
+        const size_t entry_size = sizeof(uint64_t) + clean_entry_bitmap_size;
+        int n = cur_op->req.sec_read_bmp.len / sizeof(obj_ver_id);
+        if (n > 0)
+        {
+            obj_ver_id *ov = (obj_ver_id*)cur_op->buf;
+            void *reply_buf = malloc_or_die(n * entry_size);
+            // Byte cursor: arithmetic on void* is a GNU extension, not valid ISO C++
+            uint8_t *cur_buf = (uint8_t*)reply_buf;
+            for (int i = 0; i < n; i++)
+            {
+                // Version slot is at the start of the entry, the bitmap area right after it
+                bs->read_bitmap(ov[i].oid, ov[i].version, cur_buf + sizeof(uint64_t), (uint64_t*)cur_buf);
+                cur_buf += entry_size;
+            }
+            // Swap the request payload for the reply payload
+            free(cur_op->buf);
+            cur_op->buf = reply_buf;
+        }
+        finish_op(cur_op, n * entry_size);
+        return;
+    }
     cur_op->bs_op = new blockstore_op_t();
     cur_op->bs_op->callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
     cur_op->bs_op->opcode = (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ? BS_OP_READ