Implement CAS writes
From now on, reads return the server-side object version number, and writes and deletes accept an additional "version" parameter. If it is set to a non-zero value, it is atomically compared with the object's current version plus 1, and the modification fails (with -EINTR) if they do not match. This feature opens the road to correct online flattening of snapshot layers and other interesting things.
allow-etcd-address-option
parent
f9fe72d40a
commit
891250d355
|
@ -633,6 +633,13 @@ resume_1:
|
||||||
// Slice the operation into parts
|
// Slice the operation into parts
|
||||||
slice_rw(op);
|
slice_rw(op);
|
||||||
op->needs_reslice = false;
|
op->needs_reslice = false;
|
||||||
|
if (op->opcode == OSD_OP_WRITE && op->version && op->parts.size() > 1)
|
||||||
|
{
|
||||||
|
// Atomic writes to multiple stripes are unsupported
|
||||||
|
op->retval = -EINVAL;
|
||||||
|
erase_op(op);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
resume_2:
|
resume_2:
|
||||||
// Send unsent parts, if they're not subject to change
|
// Send unsent parts, if they're not subject to change
|
||||||
op->state = 3;
|
op->state = 3;
|
||||||
|
@ -922,6 +929,7 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i)
|
||||||
.offset = part->offset,
|
.offset = part->offset,
|
||||||
.len = part->len,
|
.len = part->len,
|
||||||
.meta_revision = meta_rev,
|
.meta_revision = meta_rev,
|
||||||
|
.version = op->opcode == OSD_OP_WRITE ? op->version : 0,
|
||||||
} },
|
} },
|
||||||
.bitmap = op->opcode == OSD_OP_WRITE ? NULL : op->part_bitmaps + pg_bitmap_size*i,
|
.bitmap = op->opcode == OSD_OP_WRITE ? NULL : op->part_bitmaps + pg_bitmap_size*i,
|
||||||
.bitmap_len = (unsigned)(op->opcode == OSD_OP_WRITE ? 0 : pg_bitmap_size),
|
.bitmap_len = (unsigned)(op->opcode == OSD_OP_WRITE ? 0 : pg_bitmap_size),
|
||||||
|
@ -1072,10 +1080,6 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
|
||||||
if (part->op.reply.hdr.retval != expected)
|
if (part->op.reply.hdr.retval != expected)
|
||||||
{
|
{
|
||||||
// Operation failed, retry
|
// Operation failed, retry
|
||||||
fprintf(
|
|
||||||
stderr, "%s operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n",
|
|
||||||
osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected
|
|
||||||
);
|
|
||||||
if (part->op.reply.hdr.retval == -EPIPE)
|
if (part->op.reply.hdr.retval == -EPIPE)
|
||||||
{
|
{
|
||||||
// Mark op->up_wait = true before stopping the client
|
// Mark op->up_wait = true before stopping the client
|
||||||
|
@ -1094,7 +1098,14 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
|
||||||
// Don't overwrite other errors with -EPIPE
|
// Don't overwrite other errors with -EPIPE
|
||||||
op->retval = part->op.reply.hdr.retval;
|
op->retval = part->op.reply.hdr.retval;
|
||||||
}
|
}
|
||||||
msgr.stop_client(part->op.peer_fd);
|
if (op->retval != -EINTR && op->retval != -EIO)
|
||||||
|
{
|
||||||
|
fprintf(
|
||||||
|
stderr, "%s operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n",
|
||||||
|
osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected
|
||||||
|
);
|
||||||
|
msgr.stop_client(part->op.peer_fd);
|
||||||
|
}
|
||||||
part->flags |= PART_ERROR;
|
part->flags |= PART_ERROR;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1106,6 +1117,7 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
|
||||||
if (op->opcode == OSD_OP_READ)
|
if (op->opcode == OSD_OP_READ)
|
||||||
{
|
{
|
||||||
copy_part_bitmap(op, part);
|
copy_part_bitmap(op, part);
|
||||||
|
op->version = op->parts.size() == 1 ? part->op.reply.rw.version : 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (op->inflight_count == 0)
|
if (op->inflight_count == 0)
|
||||||
|
|
|
@ -31,6 +31,9 @@ struct cluster_op_t
|
||||||
uint64_t inode;
|
uint64_t inode;
|
||||||
uint64_t offset;
|
uint64_t offset;
|
||||||
uint64_t len;
|
uint64_t len;
|
||||||
|
// for reads and writes within a single object (stripe),
|
||||||
|
// reads can return current version and writes can use "CAS" semantics
|
||||||
|
uint64_t version = 0;
|
||||||
int retval;
|
int retval;
|
||||||
osd_op_buf_list_t iov;
|
osd_op_buf_list_t iov;
|
||||||
std::function<void(cluster_op_t*)> callback;
|
std::function<void(cluster_op_t*)> callback;
|
||||||
|
|
|
@ -261,7 +261,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
|
||||||
{
|
{
|
||||||
osd_num_t peer_osd = clients.at(peer_fd)->osd_num;
|
osd_num_t peer_osd = clients.at(peer_fd)->osd_num;
|
||||||
stop_client(peer_fd, true);
|
stop_client(peer_fd, true);
|
||||||
on_connect_peer(peer_osd, -EIO);
|
on_connect_peer(peer_osd, -EPIPE);
|
||||||
return;
|
return;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -191,6 +191,9 @@ struct __attribute__((__packed__)) osd_op_rw_t
|
||||||
uint32_t flags;
|
uint32_t flags;
|
||||||
// inode metadata revision
|
// inode metadata revision
|
||||||
uint64_t meta_revision;
|
uint64_t meta_revision;
|
||||||
|
// object version for atomic "CAS" (compare-and-set) writes
|
||||||
|
// writes and deletes fail with -EINTR if object version differs from (version-1)
|
||||||
|
uint64_t version;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct __attribute__((__packed__)) osd_reply_rw_t
|
struct __attribute__((__packed__)) osd_reply_rw_t
|
||||||
|
@ -199,6 +202,8 @@ struct __attribute__((__packed__)) osd_reply_rw_t
|
||||||
// for reads: bitmap length
|
// for reads: bitmap length
|
||||||
uint32_t bitmap_len;
|
uint32_t bitmap_len;
|
||||||
uint32_t pad0;
|
uint32_t pad0;
|
||||||
|
// for reads: object version
|
||||||
|
uint64_t version;
|
||||||
};
|
};
|
||||||
|
|
||||||
// sync to the primary OSD
|
// sync to the primary OSD
|
||||||
|
|
|
@ -222,6 +222,7 @@ resume_2:
|
||||||
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
|
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
cur_op->reply.rw.version = op_data->fact_ver;
|
||||||
cur_op->reply.rw.bitmap_len = op_data->pg_data_size * clean_entry_bitmap_size;
|
cur_op->reply.rw.bitmap_len = op_data->pg_data_size * clean_entry_bitmap_size;
|
||||||
if (op_data->degraded)
|
if (op_data->degraded)
|
||||||
{
|
{
|
||||||
|
@ -343,6 +344,12 @@ resume_3:
|
||||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// Check CAS version
|
||||||
|
if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1))
|
||||||
|
{
|
||||||
|
cur_op->reply.hdr.retval = -EINTR;
|
||||||
|
goto continue_others;
|
||||||
|
}
|
||||||
// Save version override for parallel reads
|
// Save version override for parallel reads
|
||||||
pg.ver_override[op_data->oid] = op_data->fact_ver;
|
pg.ver_override[op_data->oid] = op_data->fact_ver;
|
||||||
// Submit deletes
|
// Submit deletes
|
||||||
|
@ -370,6 +377,8 @@ resume_5:
|
||||||
free_object_state(pg, &op_data->object_state);
|
free_object_state(pg, &op_data->object_state);
|
||||||
}
|
}
|
||||||
pg.total_count--;
|
pg.total_count--;
|
||||||
|
cur_op->reply.hdr.retval = 0;
|
||||||
|
continue_others:
|
||||||
osd_op_t *next_op = NULL;
|
osd_op_t *next_op = NULL;
|
||||||
auto next_it = pg.write_queue.find(op_data->oid);
|
auto next_it = pg.write_queue.find(op_data->oid);
|
||||||
if (next_it != pg.write_queue.end() && next_it->second == cur_op)
|
if (next_it != pg.write_queue.end() && next_it->second == cur_op)
|
||||||
|
@ -378,7 +387,7 @@ resume_5:
|
||||||
if (next_it != pg.write_queue.end() && next_it->first == op_data->oid)
|
if (next_it != pg.write_queue.end() && next_it->first == op_data->oid)
|
||||||
next_op = next_it->second;
|
next_op = next_it->second;
|
||||||
}
|
}
|
||||||
finish_op(cur_op, cur_op->req.rw.len);
|
finish_op(cur_op, cur_op->reply.hdr.retval);
|
||||||
if (next_op)
|
if (next_op)
|
||||||
{
|
{
|
||||||
// Continue next write to the same object
|
// Continue next write to the same object
|
||||||
|
|
|
@ -65,7 +65,10 @@ int osd_t::read_bitmaps(osd_op_t *cur_op, pg_t & pg, int base_state)
|
||||||
auto vo_it = pg.ver_override.find(cur_oid);
|
auto vo_it = pg.ver_override.find(cur_oid);
|
||||||
auto read_version = (vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX);
|
auto read_version = (vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX);
|
||||||
// Read bitmap synchronously from the local database
|
// Read bitmap synchronously from the local database
|
||||||
bs->read_bitmap(cur_oid, read_version, op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size, NULL);
|
bs->read_bitmap(
|
||||||
|
cur_oid, read_version, op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size,
|
||||||
|
!chain_num ? &cur_op->reply.rw.version : NULL
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -228,7 +231,10 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
|
||||||
// Read bitmap synchronously from the local database
|
// Read bitmap synchronously from the local database
|
||||||
for (int j = prev; j <= i; j++)
|
for (int j = prev; j <= i; j++)
|
||||||
{
|
{
|
||||||
bs->read_bitmap((*bitmap_requests)[j].oid, (*bitmap_requests)[j].version, (*bitmap_requests)[j].bmp_buf, NULL);
|
bs->read_bitmap(
|
||||||
|
(*bitmap_requests)[j].oid, (*bitmap_requests)[j].version, (*bitmap_requests)[j].bmp_buf,
|
||||||
|
(*bitmap_requests)[j].oid.inode == cur_op->req.rw.inode ? &cur_op->reply.rw.version : NULL
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -264,6 +270,10 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
|
||||||
for (int j = prev; j <= i; j++)
|
for (int j = prev; j <= i; j++)
|
||||||
{
|
{
|
||||||
memcpy((*bitmap_requests)[j].bmp_buf, cur_buf, clean_entry_bitmap_size);
|
memcpy((*bitmap_requests)[j].bmp_buf, cur_buf, clean_entry_bitmap_size);
|
||||||
|
if ((*bitmap_requests)[j].oid.inode == cur_op->req.rw.inode)
|
||||||
|
{
|
||||||
|
memcpy(&cur_op->reply.rw.version, cur_buf-8, 8);
|
||||||
|
}
|
||||||
cur_buf += 8 + clean_entry_bitmap_size;
|
cur_buf += 8 + clean_entry_bitmap_size;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,6 +96,12 @@ resume_3:
|
||||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// Check CAS version
|
||||||
|
if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1))
|
||||||
|
{
|
||||||
|
cur_op->reply.hdr.retval = -EINTR;
|
||||||
|
goto continue_others;
|
||||||
|
}
|
||||||
if (op_data->scheme == POOL_SCHEME_REPLICATED)
|
if (op_data->scheme == POOL_SCHEME_REPLICATED)
|
||||||
{
|
{
|
||||||
// Set bitmap bits
|
// Set bitmap bits
|
||||||
|
@ -265,7 +271,7 @@ continue_others:
|
||||||
next_op = next_it->second;
|
next_op = next_it->second;
|
||||||
}
|
}
|
||||||
// finish_op would invalidate next_it if it cleared pg.write_queue, but it doesn't do that :)
|
// finish_op would invalidate next_it if it cleared pg.write_queue, but it doesn't do that :)
|
||||||
finish_op(cur_op, cur_op->req.rw.len);
|
finish_op(cur_op, cur_op->reply.hdr.retval);
|
||||||
if (next_op)
|
if (next_op)
|
||||||
{
|
{
|
||||||
// Continue next write to the same object
|
// Continue next write to the same object
|
||||||
|
|
Loading…
Reference in New Issue