diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index 6bc2ff7e..7cf6419a 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -633,6 +633,13 @@ resume_1: // Slice the operation into parts slice_rw(op); op->needs_reslice = false; + if (op->opcode == OSD_OP_WRITE && op->version && op->parts.size() > 1) + { + // Atomic writes to multiple stripes are unsupported + op->retval = -EINVAL; + erase_op(op); + return 1; + } resume_2: // Send unsent parts, if they're not subject to change op->state = 3; @@ -922,6 +929,7 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i) .offset = part->offset, .len = part->len, .meta_revision = meta_rev, + .version = op->opcode == OSD_OP_WRITE ? op->version : 0, } }, .bitmap = op->opcode == OSD_OP_WRITE ? NULL : op->part_bitmaps + pg_bitmap_size*i, .bitmap_len = (unsigned)(op->opcode == OSD_OP_WRITE ? 0 : pg_bitmap_size), @@ -1072,10 +1080,6 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) if (part->op.reply.hdr.retval != expected) { // Operation failed, retry - fprintf( - stderr, "%s operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n", - osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected - ); if (part->op.reply.hdr.retval == -EPIPE) { // Mark op->up_wait = true before stopping the client @@ -1094,7 +1098,14 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) // Don't overwrite other errors with -EPIPE op->retval = part->op.reply.hdr.retval; } - msgr.stop_client(part->op.peer_fd); + if (op->retval != -EINTR && op->retval != -EIO) + { + fprintf( + stderr, "%s operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n", + osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected + ); + msgr.stop_client(part->op.peer_fd); + } part->flags |= PART_ERROR; } else @@ -1106,6 +1117,7 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) if (op->opcode == OSD_OP_READ) { copy_part_bitmap(op, part); + op->version = op->parts.size() == 1 ? part->op.reply.rw.version : 0; } } if (op->inflight_count == 0) diff --git a/src/cluster_client.h b/src/cluster_client.h index dcf3db32..456e6dd9 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -31,6 +31,9 @@ struct cluster_op_t uint64_t inode; uint64_t offset; uint64_t len; + // for reads and writes within a single object (stripe), + // reads can return current version and writes can use "CAS" semantics + uint64_t version = 0; int retval; osd_op_buf_list_t iov; std::function callback; diff --git a/src/messenger.cpp b/src/messenger.cpp index d25bfe1a..747402b6 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -261,7 +261,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer { osd_num_t peer_osd = clients.at(peer_fd)->osd_num; stop_client(peer_fd, true); - on_connect_peer(peer_osd, -EIO); + on_connect_peer(peer_osd, -EPIPE); return; }); } diff --git a/src/osd_ops.h b/src/osd_ops.h index e8078b71..98770701 100644 --- a/src/osd_ops.h +++ b/src/osd_ops.h @@ -191,6 +191,9 @@ struct __attribute__((__packed__)) osd_op_rw_t uint32_t flags; // inode metadata revision uint64_t meta_revision; + // object version for atomic "CAS" (compare-and-set) writes + // writes and deletes fail with -EINTR if object version differs from (version-1) + uint64_t version; }; struct __attribute__((__packed__)) osd_reply_rw_t @@ -199,6 +202,8 @@ struct __attribute__((__packed__)) osd_reply_rw_t // for reads: bitmap length uint32_t bitmap_len; uint32_t pad0; + // for reads: object version + uint64_t version; }; // sync to the primary OSD diff --git a/src/osd_primary.cpp b/src/osd_primary.cpp index 3a3e685e..52c463bb 100644 --- a/src/osd_primary.cpp +++ b/src/osd_primary.cpp @@ -222,6 +222,7 @@ resume_2: finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO); return; } + cur_op->reply.rw.version = op_data->fact_ver; cur_op->reply.rw.bitmap_len = op_data->pg_data_size * clean_entry_bitmap_size; if (op_data->degraded) { @@ -343,6 +344,12 @@ resume_3: pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); return; } + // Check CAS version + if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1)) + { + cur_op->reply.hdr.retval = -EINTR; + goto continue_others; + } // Save version override for parallel reads pg.ver_override[op_data->oid] = op_data->fact_ver; // Submit deletes @@ -370,6 +377,8 @@ resume_5: free_object_state(pg, &op_data->object_state); } pg.total_count--; + cur_op->reply.hdr.retval = 0; +continue_others: osd_op_t *next_op = NULL; auto next_it = pg.write_queue.find(op_data->oid); if (next_it != pg.write_queue.end() && next_it->second == cur_op) @@ -378,7 +387,7 @@ resume_5: if (next_it != pg.write_queue.end() && next_it->first == op_data->oid) next_op = next_it->second; } - finish_op(cur_op, cur_op->req.rw.len); + finish_op(cur_op, cur_op->reply.hdr.retval); if (next_op) { // Continue next write to the same object diff --git a/src/osd_primary_chain.cpp b/src/osd_primary_chain.cpp index e9a2fbfb..64f60845 100644 --- a/src/osd_primary_chain.cpp +++ b/src/osd_primary_chain.cpp @@ -65,7 +65,10 @@ int osd_t::read_bitmaps(osd_op_t *cur_op, pg_t & pg, int base_state) auto vo_it = pg.ver_override.find(cur_oid); auto read_version = (vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX); // Read bitmap synchronously from the local database - bs->read_bitmap(cur_oid, read_version, op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size, NULL); + bs->read_bitmap( + cur_oid, read_version, op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size, + !chain_num ? &cur_op->reply.rw.version : NULL + ); } } else @@ -228,7 +231,10 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg) // Read bitmap synchronously from the local database for (int j = prev; j <= i; j++) { - bs->read_bitmap((*bitmap_requests)[j].oid, (*bitmap_requests)[j].version, (*bitmap_requests)[j].bmp_buf, NULL); + bs->read_bitmap( + (*bitmap_requests)[j].oid, (*bitmap_requests)[j].version, (*bitmap_requests)[j].bmp_buf, + (*bitmap_requests)[j].oid.inode == cur_op->req.rw.inode ? &cur_op->reply.rw.version : NULL + ); } } else @@ -264,6 +270,10 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg) for (int j = prev; j <= i; j++) { memcpy((*bitmap_requests)[j].bmp_buf, cur_buf, clean_entry_bitmap_size); + if ((*bitmap_requests)[j].oid.inode == cur_op->req.rw.inode) + { + memcpy(&cur_op->reply.rw.version, cur_buf-8, 8); + } cur_buf += 8 + clean_entry_bitmap_size; } } diff --git a/src/osd_primary_write.cpp b/src/osd_primary_write.cpp index d1e08f81..386d3af6 100644 --- a/src/osd_primary_write.cpp +++ b/src/osd_primary_write.cpp @@ -96,6 +96,12 @@ resume_3: pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); return; } + // Check CAS version + if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1)) + { + cur_op->reply.hdr.retval = -EINTR; + goto continue_others; + } if (op_data->scheme == POOL_SCHEME_REPLICATED) { // Set bitmap bits @@ -265,7 +271,7 @@ continue_others: next_op = next_it->second; } // finish_op would invalidate next_it if it cleared pg.write_queue, but it doesn't do that :) - finish_op(cur_op, cur_op->req.rw.len); + finish_op(cur_op, cur_op->reply.hdr.retval); if (next_op) { // Continue next write to the same object