@@ -5,6 +5,7 @@
 #include <assert.h>
 #include "cluster_client.h"
 
+#define SCRAP_BUFFER_SIZE 4*1024*1024
 #define PART_SENT 1
 #define PART_DONE 2
 #define PART_ERROR 4
@@ -63,10 +64,8 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
     st_cli.parse_config(config);
     st_cli.load_global_config();
 
-    // Temporary implementation: discard all bitmaps
-    // It will be of course replaced by the implementation of snapshots
-    scrap_bitmap_size = 4096;
-    scrap_bitmap = malloc_or_die(scrap_bitmap_size);
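+    // Scrap buffer: skipped ranges of layered reads are read into it and then discarded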
+    scrap_buffer_size = SCRAP_BUFFER_SIZE;
+    scrap_buffer = malloc_or_die(scrap_buffer_size);
 
     if (ringloop)
     {
@@ -91,7 +90,22 @@ cluster_client_t::~cluster_client_t()
     {
         ringloop->unregister_consumer(&consumer);
     }
-    free(scrap_bitmap);
+    free(scrap_buffer);
 }
 
+cluster_op_t::~cluster_op_t()
+{
+    if (buf)
+    {
+        free(buf);
+        buf = NULL;
+    }
+    if (bitmap_buf)
+    {
+        free(bitmap_buf);
+        part_bitmaps = NULL;
+        bitmap_buf = NULL;
+    }
+}
+
 void cluster_client_t::continue_ops(bool up_retry)
@@ -193,6 +207,7 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
     {
         bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
     }
+    bs_bitmap_size = bs_block_size / bs_bitmap_granularity / 8;
     uint32_t block_order;
     if ((block_order = is_power_of_two(bs_block_size)) >= 64 || bs_block_size < MIN_BLOCK_SIZE || bs_block_size >= MAX_BLOCK_SIZE)
     {
@@ -269,7 +284,7 @@ void cluster_client_t::on_change_hook(json11::Json::object & changes)
             for (auto op: op_queue)
             {
                 if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ) &&
-                    INODE_POOL(op->inode) == pool_item.first)
+                    INODE_POOL(op->cur_inode) == pool_item.first)
                 {
                     op->needs_reslice = true;
                 }
@@ -334,6 +349,7 @@ void cluster_client_t::execute(cluster_op_t *op)
         std::function<void(cluster_op_t*)>(op->callback)(op);
         return;
     }
+    op->cur_inode = op->inode;
     op->retval = 0;
     if (op->opcode == OSD_OP_WRITE && !immediate_commit)
     {
@@ -446,7 +462,7 @@ void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr)
     cluster_op_t *op = new cluster_op_t;
     op->flags = OP_FLUSH_BUFFER;
     op->opcode = OSD_OP_WRITE;
-    op->inode = oid.inode;
+    op->cur_inode = op->inode = oid.inode;
     op->offset = oid.stripe;
     op->len = wr->len;
     op->iov.push_back(wr->buf, wr->len);
@@ -484,7 +500,7 @@ resume_0:
         return 1;
     }
     {
-        pool_id_t pool_id = INODE_POOL(op->inode);
+        pool_id_t pool_id = INODE_POOL(op->cur_inode);
         if (!pool_id)
         {
             op->retval = -EINVAL;
@@ -500,6 +516,13 @@ resume_0:
     }
     if (op->opcode == OSD_OP_WRITE)
     {
+        auto ino_it = st_cli.inode_config.find(op->inode);
+        if (ino_it != st_cli.inode_config.end() && ino_it->second.readonly)
+        {
+            op->retval = -EINVAL;
+            std::function<void(cluster_op_t*)>(op->callback)(op);
+            return 1;
+        }
         if (!immediate_commit && !(op->flags & OP_FLUSH_BUFFER))
         {
             copy_write(op, dirty_buffers);
@@ -558,6 +581,22 @@ resume_3:
         // Finished successfully
         // Even if the PG count has changed in meanwhile we treat it as success
        // because if some operations were invalid for the new PG count we'd get errors
+        bool is_read = op->opcode == OSD_OP_READ;
+        if (is_read)
+        {
+            // Check parent inode
+            auto ino_it = st_cli.inode_config.find(op->cur_inode);
+            if (ino_it != st_cli.inode_config.end() &&
+                ino_it->second.parent_id)
+            {
+                // Continue reading from the parent inode
+                // FIXME: This obviously requires optimizations for long snapshot chains
+                op->cur_inode = ino_it->second.parent_id;
+                op->parts.clear();
+                op->done_count = 0;
+                goto resume_1;
+            }
+        }
         op->retval = op->len;
         std::function<void(cluster_op_t*)>(op->callback)(op);
         return 1;
@@ -590,51 +629,130 @@ resume_3:
     return 0;
 }
 
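+// Add the next <size> bytes of the operation's buffers to <iov>; with skip, only advance the position and, if a scrap buffer is given, read the skipped range into it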
+static void add_iov(int size, bool skip, cluster_op_t *op, int &iov_idx, size_t &iov_pos, osd_op_buf_list_t &iov, void *scrap, int scrap_len)
+{
+    int left = size;
+    while (left > 0 && iov_idx < op->iov.count)
+    {
+        int cur_left = op->iov.buf[iov_idx].iov_len - iov_pos;
+        if (cur_left < left)
+        {
+            if (!skip)
+            {
+                iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, cur_left);
+            }
+            left -= cur_left;
+            iov_pos = 0;
+            iov_idx++;
+        }
+        else
+        {
+            if (!skip)
+            {
+                iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, left);
+            }
+            iov_pos += left;
+            left = 0;
+        }
+    }
+    assert(left == 0);
+    if (skip && scrap_len > 0)
+    {
+        // All skipped ranges are read into the same useless buffer
+        left = size;
+        while (left > 0)
+        {
+            int cur_left = scrap_len < left ? scrap_len : left;
+            iov.push_back(scrap, cur_left);
+            left -= cur_left;
+        }
+    }
+}
+
 void cluster_client_t::slice_rw(cluster_op_t *op)
 {
     // Slice the request into individual object stripe requests
     // Primary OSDs still operate individual stripes, but their size is multiplied by PG minsize in case of EC
-    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->inode));
+    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode));
     uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
     uint64_t pg_block_size = bs_block_size * pg_data_size;
     uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size;
     uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
     op->retval = 0;
     op->parts.resize((last_stripe - first_stripe) / pg_block_size + 1);
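+    // For reads, (re)allocate the bitmap of already-read granules plus a reply bitmap for every part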
+    if (op->opcode == OSD_OP_READ)
+    {
+        // Allocate memory for the bitmap
+        unsigned object_bitmap_size = ((op->len / bs_bitmap_granularity + 7) / 8);
+        object_bitmap_size = (object_bitmap_size < 8 ? 8 : object_bitmap_size);
+        unsigned bitmap_mem = object_bitmap_size + (bs_bitmap_size * pg_data_size) * op->parts.size();
+        if (op->bitmap_buf_size < bitmap_mem)
+        {
+            op->bitmap_buf = realloc_or_die(op->bitmap_buf, bitmap_mem);
+            if (!op->bitmap_buf_size)
+            {
+                // First allocation
+                memset(op->bitmap_buf, 0, object_bitmap_size);
+            }
+            op->part_bitmaps = op->bitmap_buf + object_bitmap_size;
+            op->bitmap_buf_size = bitmap_mem;
+        }
+    }
     int iov_idx = 0;
     size_t iov_pos = 0;
     int i = 0;
     for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
     {
-        pg_num_t pg_num = (op->inode + stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1;
+        pg_num_t pg_num = (op->cur_inode + stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
         uint64_t begin = (op->offset < stripe ? stripe : op->offset);
         uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
             ? (stripe + pg_block_size) : (op->offset + op->len);
-        op->parts[i] = (cluster_op_part_t){
-            .parent = op,
-            .offset = begin,
-            .len = (uint32_t)(end - begin),
-            .pg_num = pg_num,
-            .flags = 0,
-        };
-        int left = end-begin;
-        while (left > 0 && iov_idx < op->iov.count)
+        op->parts[i].iov.reset();
+        if (op->cur_inode != op->inode)
         {
-            if (op->iov.buf[iov_idx].iov_len - iov_pos < left)
+            // Read remaining parts from upper layers
+            uint64_t prev = begin, cur = begin;
+            bool skip_prev = true;
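+            // Each run of granules in the same state (already read or not yet read) becomes one add_iov() call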
+            while (cur < end)
             {
-                op->parts[i].iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, op->iov.buf[iov_idx].iov_len - iov_pos);
-                left -= (op->iov.buf[iov_idx].iov_len - iov_pos);
-                iov_pos = 0;
-                iov_idx++;
+                unsigned bmp_loc = (cur - op->offset)/bs_bitmap_granularity;
+                bool skip = (((*(uint8_t*)(op->bitmap_buf + bmp_loc/8)) >> (bmp_loc%8)) & 0x1);
+                if (skip_prev != skip)
+                {
+                    if (cur > prev)
+                    {
+                        if (prev == begin && skip_prev)
+                        {
+                            begin = cur;
+                            // Just advance iov_idx & iov_pos
+                            add_iov(cur-prev, true, op, iov_idx, iov_pos, op->parts[i].iov, NULL, 0);
+                        }
+                        else
+                            add_iov(cur-prev, skip_prev, op, iov_idx, iov_pos, op->parts[i].iov, scrap_buffer, scrap_buffer_size);
+                    }
+                    skip_prev = skip;
+                    prev = cur;
+                }
+                cur += bs_bitmap_granularity;
             }
+            assert(cur > prev);
+            if (skip_prev)
+                end = prev;
             else
-            {
-                op->parts[i].iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, left);
-                iov_pos += left;
-                left = 0;
-            }
+                add_iov(cur-prev, skip_prev, op, iov_idx, iov_pos, op->parts[i].iov, scrap_buffer, scrap_buffer_size);
+            if (end == begin)
+                op->done_count++;
         }
-        assert(left == 0);
+        else
+        {
+            add_iov(end-begin, false, op, iov_idx, iov_pos, op->parts[i].iov, NULL, 0);
+        }
+        op->parts[i].parent = op;
+        op->parts[i].offset = begin;
+        op->parts[i].len = (uint32_t)(end - begin);
+        op->parts[i].pg_num = pg_num;
+        op->parts[i].osd_num = 0;
+        op->parts[i].flags = 0;
         i++;
     }
 }
@@ -661,7 +779,7 @@ bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len
 bool cluster_client_t::try_send(cluster_op_t *op, int i)
 {
     auto part = &op->parts[i];
-    auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)];
+    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode));
     auto pg_it = pool_cfg.pg_config.find(part->pg_num);
     if (pg_it != pool_cfg.pg_config.end() &&
         !pg_it->second.pause && pg_it->second.cur_primary)
@@ -674,6 +792,9 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i)
             part->osd_num = primary_osd;
             part->flags |= PART_SENT;
             op->inflight_count++;
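+            // The reply bitmap of one part covers a whole PG block (1 data chunk for replicated pools, pg_size-parity_chunks for EC)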
+            uint64_t pg_bitmap_size = bs_bitmap_size * (
+                pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks
+            );
             part->op = (osd_op_t){
                 .op_type = OSD_OP_OUT,
                 .peer_fd = peer_fd,
@@ -683,12 +804,12 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i)
                     .id = op_id++,
                     .opcode = op->opcode,
                 },
-                .inode = op->inode,
+                .inode = op->cur_inode,
                 .offset = part->offset,
                 .len = part->len,
             } },
-            .bitmap = scrap_bitmap,
-            .bitmap_len = scrap_bitmap_size,
+            .bitmap = op->opcode == OSD_OP_WRITE ? NULL : op->part_bitmaps + pg_bitmap_size*i,
+            .bitmap_len = (unsigned)(op->opcode == OSD_OP_WRITE ? 0 : pg_bitmap_size),
             .callback = [this, part](osd_op_t *op_part)
             {
                 handle_op_part(part);
@@ -817,6 +938,16 @@ void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part)
     msgr.outbox_push(&part->op);
 }
 
+static inline void mem_or(void *res, const void *r2, unsigned int len)
+{
+    unsigned int i;
+    for (i = 0; i < len; ++i)
+    {
+        // Hope the compiler vectorizes this
+        ((uint8_t*)res)[i] = ((uint8_t*)res)[i] | ((uint8_t*)r2)[i];
+    }
+}
+
 void cluster_client_t::handle_op_part(cluster_op_part_t *part)
 {
     cluster_op_t *op = part->parent;
@@ -856,9 +987,43 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
         dirty_osds.insert(part->osd_num);
         part->flags |= PART_DONE;
         op->done_count++;
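+        // Merge (OR) the part's bitmap into the operation's bitmap so the already-read granules can be skipped when reading parent layers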
+        if (op->opcode == OSD_OP_READ)
+        {
+            copy_part_bitmap(op, part);
+        }
     }
     if (op->inflight_count == 0)
     {
         continue_ops();
     }
 }
+
+void cluster_client_t::copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *part)
+{
+    // Copy (OR) bitmap
+    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode));
+    uint32_t pg_block_size = bs_block_size * (
+        pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks
+    );
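+    // object_offset and part_offset are bit positions in the operation's and the part's bitmaps respectively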
+    uint32_t object_offset = (part->op.req.rw.offset - op->offset) / bs_bitmap_granularity;
+    uint32_t part_offset = (part->op.req.rw.offset % pg_block_size) / bs_bitmap_granularity;
+    uint32_t part_len = part->op.req.rw.len / bs_bitmap_granularity;
+    if (!(object_offset & 0x7) && !(part_offset & 0x7) && (part_len >= 8))
+    {
+        // Copy bytes
+        mem_or(op->bitmap_buf + object_offset/8, part->op.bitmap + part_offset/8, part_len/8);
+        object_offset += (part_len & ~0x7);
+        part_offset += (part_len & ~0x7);
+        part_len = (part_len & 0x7);
+    }
+    while (part_len > 0)
+    {
+        // Copy bits
+        (*(uint8_t*)(op->bitmap_buf + (object_offset >> 3))) |= (
+            (((*(uint8_t*)(part->op.bitmap + (part_offset >> 3))) >> (part_offset & 0x7)) & 0x1) << (object_offset & 0x7)
+        );
+        part_offset++;
+        object_offset++;
+        part_len--;
+    }
+}