Simplify handle_primary_subop() arguments
parent
46e111272f
commit
a56f8cd14e
2
osd.h
2
osd.h
|
@ -194,7 +194,7 @@ class osd_t
|
||||||
bool check_write_queue(osd_op_t *cur_op, pg_t & pg);
|
bool check_write_queue(osd_op_t *cur_op, pg_t & pg);
|
||||||
void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg);
|
void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg);
|
||||||
bool finalize_primary_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state);
|
bool finalize_primary_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state);
|
||||||
void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version);
|
void handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op);
|
||||||
void handle_primary_bs_subop(osd_op_t *subop);
|
void handle_primary_bs_subop(osd_op_t *subop);
|
||||||
void add_bs_subop_stats(osd_op_t *subop);
|
void add_bs_subop_stats(osd_op_t *subop);
|
||||||
void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
|
void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
|
||||||
|
|
|
@ -206,17 +206,6 @@ void pg_obj_state_check_t::finish_object()
|
||||||
if (log_level > 1)
|
if (log_level > 1)
|
||||||
{
|
{
|
||||||
printf("Object is incomplete: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
|
printf("Object is incomplete: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
|
||||||
for (int i = ver_start; i < ver_end; i++)
|
|
||||||
{
|
|
||||||
printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (log_level > 2)
|
|
||||||
{
|
|
||||||
for (int i = obj_start; i < obj_end; i++)
|
|
||||||
{
|
|
||||||
printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
state = OBJ_INCOMPLETE;
|
state = OBJ_INCOMPLETE;
|
||||||
pg->state = pg->state | PG_HAS_INCOMPLETE;
|
pg->state = pg->state | PG_HAS_INCOMPLETE;
|
||||||
|
@ -226,11 +215,21 @@ void pg_obj_state_check_t::finish_object()
|
||||||
if (log_level > 1)
|
if (log_level > 1)
|
||||||
{
|
{
|
||||||
printf("Object is degraded: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
|
printf("Object is degraded: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
|
||||||
for (int i = ver_start; i < ver_end; i++)
|
}
|
||||||
|
state = OBJ_DEGRADED;
|
||||||
|
pg->state = pg->state | PG_HAS_DEGRADED;
|
||||||
|
}
|
||||||
|
if (n_mismatched > 0)
|
||||||
{
|
{
|
||||||
printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
if (n_roles >= pg->pg_cursize && log_level > 1)
|
||||||
|
{
|
||||||
|
printf("Object is misplaced: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
|
||||||
}
|
}
|
||||||
|
state |= OBJ_MISPLACED;
|
||||||
|
pg->state = pg->state | PG_HAS_MISPLACED;
|
||||||
}
|
}
|
||||||
|
if (log_level > 1 && (n_roles < pg->pg_cursize || n_mismatched > 0))
|
||||||
|
{
|
||||||
if (log_level > 2)
|
if (log_level > 2)
|
||||||
{
|
{
|
||||||
for (int i = obj_start; i < obj_end; i++)
|
for (int i = obj_start; i < obj_end; i++)
|
||||||
|
@ -238,13 +237,13 @@ void pg_obj_state_check_t::finish_object()
|
||||||
printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
state = OBJ_DEGRADED;
|
else
|
||||||
pg->state = pg->state | PG_HAS_DEGRADED;
|
|
||||||
}
|
|
||||||
if (n_mismatched > 0)
|
|
||||||
{
|
{
|
||||||
state |= OBJ_MISPLACED;
|
for (int i = ver_start; i < ver_end; i++)
|
||||||
pg->state = pg->state | PG_HAS_MISPLACED;
|
{
|
||||||
|
printf("Target version present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pg->total_count++;
|
pg->total_count++;
|
||||||
if (state != 0 || ver_end < obj_end)
|
if (state != 0 || ver_end < obj_end)
|
||||||
|
|
|
@ -138,6 +138,13 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
|
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
|
||||||
.buf = w ? stripes[role].write_buf : stripes[role].read_buf,
|
.buf = w ? stripes[role].write_buf : stripes[role].read_buf,
|
||||||
});
|
});
|
||||||
|
#ifdef OSD_DEBUG
|
||||||
|
printf(
|
||||||
|
"Submit %s to local: %lu:%lu v%lu %u-%u\n", w ? "write" : "read",
|
||||||
|
op_data->oid.inode, op_data->oid.stripe | role, op_version,
|
||||||
|
subops[i].bs_op->offset, subops[i].bs_op->len
|
||||||
|
);
|
||||||
|
#endif
|
||||||
bs->enqueue_op(subops[i].bs_op);
|
bs->enqueue_op(subops[i].bs_op);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -159,6 +166,13 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
.offset = w ? stripes[role].write_start : stripes[role].read_start,
|
.offset = w ? stripes[role].write_start : stripes[role].read_start,
|
||||||
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
|
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
|
||||||
};
|
};
|
||||||
|
#ifdef OSD_DEBUG
|
||||||
|
printf(
|
||||||
|
"Submit %s to osd %lu: %lu:%lu v%lu %u-%u\n", w ? "write" : "read", role_osd_num,
|
||||||
|
op_data->oid.inode, op_data->oid.stripe | role, op_version,
|
||||||
|
subops[i].req.sec_rw.offset, subops[i].req.sec_rw.len
|
||||||
|
);
|
||||||
|
#endif
|
||||||
subops[i].buf = w ? stripes[role].write_buf : stripes[role].read_buf;
|
subops[i].buf = w ? stripes[role].write_buf : stripes[role].read_buf;
|
||||||
if (w && stripes[role].write_end > 0)
|
if (w && stripes[role].write_end > 0)
|
||||||
{
|
{
|
||||||
|
@ -170,10 +184,7 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1;
|
subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1;
|
||||||
// so it doesn't get freed
|
// so it doesn't get freed
|
||||||
subop->buf = NULL;
|
subop->buf = NULL;
|
||||||
handle_primary_subop(
|
handle_primary_subop(subop, cur_op);
|
||||||
subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval,
|
|
||||||
subop->req.sec_rw.len, subop->reply.sec_rw.version
|
|
||||||
);
|
|
||||||
if (fail_fd >= 0)
|
if (fail_fd >= 0)
|
||||||
{
|
{
|
||||||
// write operation failed, drop the connection
|
// write operation failed, drop the connection
|
||||||
|
@ -213,12 +224,16 @@ void osd_t::handle_primary_bs_subop(osd_op_t *subop)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
add_bs_subop_stats(subop);
|
add_bs_subop_stats(subop);
|
||||||
uint64_t opcode = bs_op_to_osd_op[bs_op->opcode];
|
subop->req.hdr.opcode = bs_op_to_osd_op[bs_op->opcode];
|
||||||
int retval = bs_op->retval;
|
subop->reply.hdr.retval = bs_op->retval;
|
||||||
uint64_t version = bs_op->version;
|
if (bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE)
|
||||||
|
{
|
||||||
|
subop->req.sec_rw.len = bs_op->len;
|
||||||
|
subop->reply.sec_rw.version = bs_op->version;
|
||||||
|
}
|
||||||
delete bs_op;
|
delete bs_op;
|
||||||
subop->bs_op = NULL;
|
subop->bs_op = NULL;
|
||||||
handle_primary_subop(opcode, cur_op, retval, expected, version);
|
handle_primary_subop(subop, cur_op);
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::add_bs_subop_stats(osd_op_t *subop)
|
void osd_t::add_bs_subop_stats(osd_op_t *subop)
|
||||||
|
@ -244,8 +259,12 @@ void osd_t::add_bs_subop_stats(osd_op_t *subop)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version)
|
void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
|
uint64_t opcode = subop->req.hdr.opcode;
|
||||||
|
int retval = subop->reply.hdr.retval;
|
||||||
|
int expected = opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE
|
||||||
|
? subop->req.sec_rw.len : 0;
|
||||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||||
if (retval != expected)
|
if (retval != expected)
|
||||||
{
|
{
|
||||||
|
@ -261,6 +280,12 @@ void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval,
|
||||||
op_data->done++;
|
op_data->done++;
|
||||||
if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE)
|
if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE)
|
||||||
{
|
{
|
||||||
|
uint64_t version = subop->reply.sec_rw.version;
|
||||||
|
#ifdef OSD_DEBUG
|
||||||
|
uint64_t peer_osd = c_cli.clients.find(subop->peer_fd) != c_cli.clients.end()
|
||||||
|
? c_cli.clients[subop->peer_fd].osd_num : osd_num;
|
||||||
|
printf("subop %lu from osd %lu: version = %lu\n", opcode, peer_osd, version);
|
||||||
|
#endif
|
||||||
if (op_data->fact_ver != 0 && op_data->fact_ver != version)
|
if (op_data->fact_ver != 0 && op_data->fact_ver != version)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(
|
throw std::runtime_error(
|
||||||
|
@ -380,7 +405,7 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os
|
||||||
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
||||||
{
|
{
|
||||||
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
|
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
|
||||||
handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->reply.hdr.retval, 0, 0);
|
handle_primary_subop(subop, cur_op);
|
||||||
if (fail_fd >= 0)
|
if (fail_fd >= 0)
|
||||||
{
|
{
|
||||||
// delete operation failed, drop the connection
|
// delete operation failed, drop the connection
|
||||||
|
@ -433,7 +458,7 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
|
||||||
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
||||||
{
|
{
|
||||||
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
|
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
|
||||||
handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval, 0, 0);
|
handle_primary_subop(subop, cur_op);
|
||||||
if (fail_fd >= 0)
|
if (fail_fd >= 0)
|
||||||
{
|
{
|
||||||
// sync operation failed, drop the connection
|
// sync operation failed, drop the connection
|
||||||
|
@ -488,7 +513,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
|
||||||
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
||||||
{
|
{
|
||||||
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
|
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
|
||||||
handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval, 0, 0);
|
handle_primary_subop(subop, cur_op);
|
||||||
if (fail_fd >= 0)
|
if (fail_fd >= 0)
|
||||||
{
|
{
|
||||||
// sync operation failed, drop the connection
|
// sync operation failed, drop the connection
|
||||||
|
|
73
osd_test.cpp
73
osd_test.cpp
|
@ -19,6 +19,8 @@
|
||||||
|
|
||||||
int connect_osd(const char *osd_address, int osd_port);
|
int connect_osd(const char *osd_address, int osd_port);
|
||||||
|
|
||||||
|
uint64_t test_read(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t offset, uint64_t len);
|
||||||
|
|
||||||
uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern);
|
uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern);
|
||||||
|
|
||||||
void* test_primary_read(int connect_fd, uint64_t inode, uint64_t offset, uint64_t len);
|
void* test_primary_read(int connect_fd, uint64_t inode, uint64_t offset, uint64_t len);
|
||||||
|
@ -105,7 +107,7 @@ int main3(int narg, char *args[])
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int narg, char *args[])
|
int main4(int narg, char *args[])
|
||||||
{
|
{
|
||||||
int connect_fd;
|
int connect_fd;
|
||||||
// Cluster write (sync not implemented yet)
|
// Cluster write (sync not implemented yet)
|
||||||
|
@ -117,6 +119,15 @@ int main(int narg, char *args[])
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int main(int narg, char *args[])
|
||||||
|
{
|
||||||
|
int connect_fd;
|
||||||
|
connect_fd = connect_osd("192.168.7.2", 43051);
|
||||||
|
test_read(connect_fd, 1, 1039663104, UINT64_MAX, 0, 128*1024);
|
||||||
|
close(connect_fd);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int connect_osd(const char *osd_address, int osd_port)
|
int connect_osd(const char *osd_address, int osd_port)
|
||||||
{
|
{
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
|
@ -167,6 +178,66 @@ bool check_reply(int r, osd_any_op_t & op, osd_any_reply_t & reply, int expected
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t test_read(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t offset, uint64_t len)
|
||||||
|
{
|
||||||
|
osd_any_op_t op;
|
||||||
|
osd_any_reply_t reply;
|
||||||
|
op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
|
||||||
|
op.hdr.id = 1;
|
||||||
|
op.hdr.opcode = OSD_OP_SECONDARY_READ;
|
||||||
|
op.sec_rw.oid = {
|
||||||
|
.inode = inode,
|
||||||
|
.stripe = stripe,
|
||||||
|
};
|
||||||
|
op.sec_rw.version = version;
|
||||||
|
op.sec_rw.offset = offset;
|
||||||
|
op.sec_rw.len = len;
|
||||||
|
void *data = memalign(MEM_ALIGNMENT, op.sec_rw.len);
|
||||||
|
write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE);
|
||||||
|
int r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
|
||||||
|
if (!check_reply(r, op, reply, op.sec_rw.len))
|
||||||
|
{
|
||||||
|
free(data);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
r = read_blocking(connect_fd, data, len);
|
||||||
|
if (r != len)
|
||||||
|
{
|
||||||
|
free(data);
|
||||||
|
perror("read data");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
free(data);
|
||||||
|
printf("Read %lu:%lu v%lu = v%lu\n", inode, stripe, version, reply.sec_rw.version);
|
||||||
|
op.hdr.opcode = OSD_OP_SECONDARY_LIST;
|
||||||
|
op.sec_list.list_pg = 1;
|
||||||
|
op.sec_list.pg_count = 1;
|
||||||
|
op.sec_list.pg_stripe_size = 4*1024*1024;
|
||||||
|
write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE);
|
||||||
|
r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
|
||||||
|
if (reply.hdr.retval < 0 || !check_reply(r, op, reply, reply.hdr.retval))
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
data = memalign(MEM_ALIGNMENT, sizeof(obj_ver_id)*reply.hdr.retval);
|
||||||
|
r = read_blocking(connect_fd, data, sizeof(obj_ver_id)*reply.hdr.retval);
|
||||||
|
if (r != sizeof(obj_ver_id)*reply.hdr.retval)
|
||||||
|
{
|
||||||
|
free(data);
|
||||||
|
perror("read data");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
obj_ver_id *ov = (obj_ver_id*)data;
|
||||||
|
for (int i = 0; i < reply.hdr.retval; i++)
|
||||||
|
{
|
||||||
|
if (ov[i].oid.inode == inode && (ov[i].oid.stripe & ~(4096-1)) == (stripe & ~(4096-1)))
|
||||||
|
{
|
||||||
|
printf("list: %lu:%lu v%lu stable=%d\n", ov[i].oid.inode, ov[i].oid.stripe, ov[i].version, i < reply.sec_list.stable_count ? 1 : 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern)
|
uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern)
|
||||||
{
|
{
|
||||||
osd_any_op_t op;
|
osd_any_op_t op;
|
||||||
|
|
Loading…
Reference in New Issue