// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)

#include <stdexcept>
#include <assert.h>
#include "cluster_client_impl.h"
#include "http_client.h" // json_is_true

cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json config)
{
    wb = new writeback_cache_t();
    cli_config = config.object_items();
    file_config = osd_messenger_t::read_config(config);
    config = osd_messenger_t::merge_configs(cli_config, file_config, etcd_global_config, {});

    this->ringloop = ringloop;
    this->tfd = tfd;

    msgr.osd_num = 0;
    msgr.tfd = tfd;
    msgr.ringloop = ringloop;
    msgr.repeer_pgs = [this](osd_num_t peer_osd)
    {
        if (msgr.osd_peer_fds.find(peer_osd) != msgr.osd_peer_fds.end())
        {
            // peer_osd just connected
            // retry operations waiting for connection immediately
            continue_ops(client_retry_interval);
            continue_lists();
            continue_raw_ops(peer_osd);
        }
        else
        {
            // peer_osd just dropped connection
            // determine WHICH dirty_buffers are now obsolete and repeat them
            if (wb->repeat_ops_for(this, peer_osd) > 0)
            {
                continue_ops();
            }
        }
    };
    msgr.exec_op = [this](osd_op_t *op)
    {
        // Garbage in
        fprintf(stderr, "Incoming garbage from peer %d\n", op->peer_fd);
        msgr.stop_client(op->peer_fd);
        delete op;
    };
    msgr.parse_config(config);

    st_cli.tfd = tfd;
    st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
    st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); };
    st_cli.on_change_hook = [this](std::map<std::string, etcd_kv_t> & changes) { on_change_hook(changes); };
    st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };
    st_cli.on_reload_hook = [this]() { st_cli.load_global_config(); };
    st_cli.parse_config(config);
    st_cli.load_global_config();

    scrap_buffer_size = SCRAP_BUFFER_SIZE;
    scrap_buffer = malloc_or_die(scrap_buffer_size);
}

cluster_client_t::~cluster_client_t()
{
    msgr.repeer_pgs = [](osd_num_t){};
    if (ringloop)
    {
        ringloop->unregister_consumer(&consumer);
    }
    free(scrap_buffer);
    delete wb;
    wb = NULL;
}

cluster_op_t::~cluster_op_t()
{
    if (buf)
    {
        free(buf);
        buf = NULL;
    }
    if (bitmap_buf)
    {
        free(bitmap_buf);
        part_bitmaps = NULL;
        bitmap_buf = NULL;
    }
}

void cluster_client_t::continue_raw_ops(osd_num_t peer_osd)
{
    auto it = raw_ops.find(peer_osd);
    while (it != raw_ops.end() && it->first == peer_osd)
    {
        auto op = it->second;
        op->op_type = OSD_OP_OUT;
        op->peer_fd = msgr.osd_peer_fds.at(peer_osd);
        msgr.outbox_push(op);
        raw_ops.erase(it++);
    }
}

void cluster_client_t::init_msgr()
{
    if (msgr_initialized)
        return;
    msgr.init();
    msgr_initialized = true;
    if (ringloop)
    {
        consumer.loop = [this]()
        {
            msgr.read_requests();
            msgr.send_replies();
            this->ringloop->submit();
        };
        ringloop->register_consumer(&consumer);
    }
}

void cluster_client_t::unshift_op(cluster_op_t *op)
{
    op->next = op_queue_head;
    if (op_queue_head)
    {
        op_queue_head->prev = op;
        op_queue_head = op;
    }
    else
        op_queue_tail = op_queue_head = op;
    inc_wait(op->opcode, op->flags, op->next, 1);
}

void cluster_client_t::calc_wait(cluster_op_t *op)
{
    op->prev_wait = 0;
    if (op->opcode == OSD_OP_WRITE)
    {
        for (auto prev = op->prev; prev; prev = prev->prev)
        {
            if (prev->opcode == OSD_OP_SYNC ||
                (prev->opcode == OSD_OP_WRITE && !(op->flags & OP_FLUSH_BUFFER) && (prev->flags & OP_FLUSH_BUFFER)))
            {
                op->prev_wait++;
            }
        }
        if (!op->prev_wait)
            continue_rw(op);
    }
    else if (op->opcode == OSD_OP_SYNC)
    {
        for (auto prev = op->prev; prev; prev = prev->prev)
        {
            if (prev->opcode == OSD_OP_SYNC ||
                (prev->opcode == OSD_OP_WRITE && (!(prev->flags & OP_IMMEDIATE_COMMIT) || enable_writeback)))
            {
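                // This SYNC has to commit the preceding write, so it waits for it.
                // Writes are only counted if they aren't already committed by the
                // OSD itself (no OP_IMMEDIATE_COMMIT) or may still sit in the
                // client writeback cache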
                op->prev_wait++;
            }
        }
        if (!op->prev_wait)
            continue_sync(op);
    }
    else /* if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP) */
    {
        continue_rw(op);
    }
}

void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc)
{
    if (opcode != OSD_OP_WRITE && opcode != OSD_OP_SYNC)
    {
        return;
    }
    cluster_op_t *bh_ops_local[32], **bh_ops = bh_ops_local;
    int bh_op_count = 0, bh_op_max = 32;
    while (next)
    {
        auto n2 = next->next;
        if (opcode == OSD_OP_WRITE
            ? ((next->opcode == OSD_OP_SYNC && (!(flags & OP_IMMEDIATE_COMMIT) || enable_writeback)) ||
                (next->opcode == OSD_OP_WRITE && (flags & OP_FLUSH_BUFFER) && !(next->flags & OP_FLUSH_BUFFER)))
            : (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE))
        {
            next->prev_wait += inc;
            assert(next->prev_wait >= 0);
            if (!next->prev_wait)
            {
                // Kind of std::vector with local "small vector optimisation"
                if (bh_op_count >= bh_op_max)
                {
                    bh_op_max *= 2;
                    cluster_op_t **n = (cluster_op_t**)malloc_or_die(sizeof(cluster_op_t*) * bh_op_max);
                    memcpy(n, bh_ops, sizeof(cluster_op_t*) * bh_op_count);
                    if (bh_ops != bh_ops_local)
                    {
                        free(bh_ops);
                    }
                    bh_ops = n;
                }
                bh_ops[bh_op_count++] = next;
            }
        }
        next = n2;
    }
    for (int i = 0; i < bh_op_count; i++)
    {
        cluster_op_t *next = bh_ops[i];
        if (next->opcode == OSD_OP_SYNC)
            continue_sync(next);
        else
            continue_rw(next);
    }
    if (bh_ops != bh_ops_local)
    {
        free(bh_ops);
    }
}

void cluster_client_t::erase_op(cluster_op_t *op)
{
    uint64_t opcode = op->opcode, flags = op->flags;
    cluster_op_t *next = op->next;
    if (op->prev)
        op->prev->next = op->next;
    if (op->next)
        op->next->prev = op->prev;
    if (op_queue_head == op)
        op_queue_head = op->next;
    if (op_queue_tail == op)
        op_queue_tail = op->prev;
    op->next = op->prev = NULL;
    if (flags & OP_FLUSH_BUFFER)
    {
        // Completed flushes change writeback buffer states,
        // so the callback should be run before inc_wait()
        // which may continue following SYNCs, but these SYNCs
        // should know about the changed buffer state
        // This is ugly but this is the way we do it
        auto cb = std::move(op->callback);
        cb(op);
    }
    if (!(flags & OP_IMMEDIATE_COMMIT) || enable_writeback)
    {
        inc_wait(opcode, flags, next, -1);
    }
    if (!(flags & OP_FLUSH_BUFFER))
    {
        // Call callback at the end to avoid inconsistencies in prev_wait
        // if the callback adds more operations itself
        auto cb = std::move(op->callback);
        cb(op);
    }
    if (flags & OP_FLUSH_BUFFER)
    {
        int i = 0;
        while (i < wb->writeback_overflow.size() && wb->writebacks_active < client_max_writeback_iodepth)
        {
            execute_internal(wb->writeback_overflow[i]);
            i++;
        }
        if (i > 0)
        {
            wb->writeback_overflow.erase(wb->writeback_overflow.begin(), wb->writeback_overflow.begin()+i);
        }
    }
}

void cluster_client_t::continue_ops(int time_passed)
{
    if (!pgs_loaded)
    {
        // We're offline
        return;
    }
    if (continuing_ops)
    {
        // Attempt to reenter the function
        return;
    }
    int reset_duration = 0;
restart:
    continuing_ops = 1;
    for (auto op = op_queue_head; op; )
    {
        cluster_op_t *next_op = op->next;
        if (op->retry_after && time_passed)
        {
            op->retry_after = op->retry_after > time_passed ? op->retry_after-time_passed : 0;
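            // The operation still has to wait - remember the smallest remaining
            // delay so the retry timer can be re-armed for exactly that long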
            if (op->retry_after && (!reset_duration || op->retry_after < reset_duration))
            {
                reset_duration = op->retry_after;
            }
        }
        if (!op->retry_after && !op->prev_wait)
        {
            if (op->opcode == OSD_OP_SYNC)
                continue_sync(op);
            else
                continue_rw(op);
        }
        op = next_op;
        if (continuing_ops == 2)
        {
            goto restart;
        }
    }
    continuing_ops = 0;
    reset_retry_timer(reset_duration);
}

void cluster_client_t::reset_retry_timer(int new_duration)
{
    if ((retry_timeout_duration && retry_timeout_duration <= new_duration) || !new_duration)
    {
        return;
    }
    if (retry_timeout_id)
    {
        tfd->clear_timer(retry_timeout_id);
    }
    retry_timeout_duration = new_duration;
    retry_timeout_id = tfd->set_timer(retry_timeout_duration, false, [this](int)
    {
        int time_passed = retry_timeout_duration;
        retry_timeout_id = 0;
        retry_timeout_duration = 0;
        continue_ops(time_passed);
    });
}

void cluster_client_t::on_load_config_hook(json11::Json::object & etcd_global_config)
{
    this->etcd_global_config = etcd_global_config;
    config = osd_messenger_t::merge_configs(cli_config, file_config, etcd_global_config, {});
    // client_max_dirty_bytes/client_dirty_limit
    if (config.find("client_max_dirty_bytes") != config.end())
    {
        client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value();
    }
    else if (config.find("client_dirty_limit") != config.end())
    {
        // Old name
        client_max_dirty_bytes = config["client_dirty_limit"].uint64_value();
    }
    else
        client_max_dirty_bytes = 0;
    if (!client_max_dirty_bytes)
    {
        client_max_dirty_bytes = DEFAULT_CLIENT_MAX_DIRTY_BYTES;
    }
    // client_max_dirty_ops
    client_max_dirty_ops = config["client_max_dirty_ops"].uint64_value();
    if (!client_max_dirty_ops)
    {
        client_max_dirty_ops = DEFAULT_CLIENT_MAX_DIRTY_OPS;
    }
    // client_enable_writeback
    enable_writeback = json_is_true(config["client_enable_writeback"]) &&
        json_is_true(config["client_writeback_allowed"]);
    // client_max_buffered_bytes
    client_max_buffered_bytes = config["client_max_buffered_bytes"].uint64_value();
    if (!client_max_buffered_bytes)
    {
        client_max_buffered_bytes = DEFAULT_CLIENT_MAX_BUFFERED_BYTES;
    }
    // client_max_buffered_ops
    client_max_buffered_ops = config["client_max_buffered_ops"].uint64_value();
    if (!client_max_buffered_ops)
    {
        client_max_buffered_ops = DEFAULT_CLIENT_MAX_BUFFERED_OPS;
    }
    // client_max_writeback_iodepth
    client_max_writeback_iodepth = config["client_max_writeback_iodepth"].uint64_value();
    if (!client_max_writeback_iodepth)
    {
        client_max_writeback_iodepth = DEFAULT_CLIENT_MAX_WRITEBACK_IODEPTH;
    }
    // client_retry_interval
    client_retry_interval = config["client_retry_interval"].uint64_value();
    if (!client_retry_interval)
    {
        client_retry_interval = 50;
    }
    else if (client_retry_interval < 10)
    {
        client_retry_interval = 10;
    }
    // client_eio_retry_interval
    client_eio_retry_interval = 1000;
    if (!config["client_eio_retry_interval"].is_null())
    {
        client_eio_retry_interval = config["client_eio_retry_interval"].uint64_value();
        if (client_eio_retry_interval && client_eio_retry_interval < 10)
        {
            client_eio_retry_interval = 10;
        }
    }
    // client_retry_enospc
    client_retry_enospc = config["client_retry_enospc"].is_null() ? true : config["client_retry_enospc"].bool_value();
    // log_level
    log_level = config["log_level"].uint64_value();
    msgr.parse_config(config);
    st_cli.parse_config(config);
    st_cli.load_pgs();
}

void cluster_client_t::on_load_pgs_hook(bool success)
{
    for (auto pool_item: st_cli.pool_config)
    {
        pg_counts[pool_item.first] = pool_item.second.real_pg_count;
    }
    pgs_loaded = true;
    for (auto fn: on_ready_hooks)
    {
        fn();
    }
    on_ready_hooks.clear();
    for (auto op: offline_ops)
    {
        execute(op);
    }
    offline_ops.clear();
    continue_ops();
}

void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes)
{
    for (auto pool_item: st_cli.pool_config)
    {
        if (pg_counts[pool_item.first] != pool_item.second.real_pg_count)
        {
            // At this point, all pool operations should have been suspended
            // And now they have to be resliced!
            for (auto op = op_queue_head; op; op = op->next)
            {
                if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ ||
                    op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP) &&
                    INODE_POOL(op->cur_inode) == pool_item.first)
                {
                    op->needs_reslice = true;
                }
            }
            pg_counts[pool_item.first] = pool_item.second.real_pg_count;
        }
    }
    continue_ops();
}

bool cluster_client_t::get_immediate_commit(uint64_t inode)
{
    if (enable_writeback)
        return false;
    pool_id_t pool_id = INODE_POOL(inode);
    if (!pool_id)
        return true;
    auto pool_it = st_cli.pool_config.find(pool_id);
    if (pool_it == st_cli.pool_config.end())
        return true;
    return pool_it->second.immediate_commit == IMMEDIATE_ALL;
}

void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd)
{
    if (msgr.wanted_peers.find(peer_osd) != msgr.wanted_peers.end())
    {
        msgr.connect_peer(peer_osd, st_cli.peer_states[peer_osd]);
    }
}

bool cluster_client_t::is_ready()
{
    return pgs_loaded;
}

void cluster_client_t::on_ready(std::function<void(void)> fn)
{
    if (pgs_loaded)
    {
        fn();
    }
    else
    {
        on_ready_hooks.push_back(fn);
    }
}

bool cluster_client_t::flush()
{
    if (!ringloop)
    {
        if (wb->writeback_queue.size())
        {
            wb->start_writebacks(this, 0);
            cluster_op_t *sync = new cluster_op_t;
            sync->opcode = OSD_OP_SYNC;
            sync->callback = [](cluster_op_t *sync)
            {
                delete sync;
            };
            execute(sync);
        }
        return op_queue_head == NULL;
    }
    bool sync_done = false;
    cluster_op_t *sync = new cluster_op_t;
    sync->opcode = OSD_OP_SYNC;
    sync->callback = [&sync_done](cluster_op_t *sync)
    {
        delete sync;
        sync_done = true;
    };
    execute(sync);
    while (!sync_done)
    {
        ringloop->loop();
        if (!sync_done)
            ringloop->wait();
    }
    return true;
}

/**
 * How writes are synced when immediate_commit is false
 *
 * "Continue" WRITE:
 * 1) if the operation is not sliced yet - slice it
 * 2) if the operation doesn't require reslice - try to connect & send all remaining parts
 * 3) if any of them fail due to disconnected peers or PGs not up, repeat after reconnecting or a small timeout
 * 4) if any of them fail due to other errors, fail the operation and forget it from the current "unsynced batch"
 * 5) if the PG count changes before all parts are done, wait for all in-progress parts to finish,
 *    throw all results away, reslice and resubmit the op
 * 6) when all parts are done, try to "continue" the current SYNC
 * 7) if the operation succeeds, but then some OSDs drop their connections, repeat
 *    parts from the current "unsynced batch" previously sent to those OSDs in any order
 *
 * "Continue" current SYNC:
 * 1) take all unsynced operations from the current batch
 * 2) check if all affected OSDs are still alive
 * 3) if yes, send all SYNCs; otherwise, leave the current SYNC as is
 * 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes
 * 5) if any of them fail due to other errors, fail the SYNC operation
 *
 * If writeback caching is turned on and the writeback limit is not exhausted,
 * the data is just copied and the write is immediately confirmed to the client.
 */
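/* A minimal usage sketch (illustrative only, not part of this file; it assumes
 * a ready cluster_client_t *cli and a 4096-byte aligned buffer `buf`, and the
 * pool/inode numbers are made up):
 *
 *     cluster_op_t *op = new cluster_op_t();
 *     op->opcode = OSD_OP_WRITE;
 *     op->inode = ((uint64_t)1 << (64-POOL_ID_BITS)) | 1; // pool 1, inode 1
 *     op->offset = 0;
 *     op->len = 4096;
 *     op->iov.push_back(buf, 4096);
 *     op->callback = [](cluster_op_t *op)
 *     {
 *         // op->retval == op->len on success, a negative error code otherwise
 *         delete op;
 *     };
 *     cli->execute(op);
 */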
void cluster_client_t::execute(cluster_op_t *op)
{
    if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_READ_BITMAP &&
        op->opcode != OSD_OP_READ_CHAIN_BITMAP && op->opcode != OSD_OP_WRITE)
    {
        op->retval = -EINVAL;
        auto cb = std::move(op->callback);
        cb(op);
        return;
    }
    if (!pgs_loaded)
    {
        offline_ops.push_back(op);
        return;
    }
    op->flags = op->flags & OSD_OP_IGNORE_READONLY; // the only allowed flag
    execute_internal(op);
}

void cluster_client_t::execute_internal(cluster_op_t *op)
{
    op->cur_inode = op->inode;
    op->retval = 0;
    // check alignment, readonly flag and so on
    if (!check_rw(op))
    {
        return;
    }
    if (op->opcode == OSD_OP_WRITE && enable_writeback && !(op->flags & OP_FLUSH_BUFFER) &&
        !op->version /* no CAS writeback */)
    {
        if (wb->writebacks_active >= client_max_writeback_iodepth)
        {
            // Writeback queue is full, postpone the operation
            wb->writeback_overflow.push_back(op);
            return;
        }
        // Just copy and acknowledge the operation
        wb->copy_write(op, CACHE_DIRTY);
        while (wb->writeback_bytes + op->len > client_max_buffered_bytes ||
            wb->writeback_queue_size > client_max_buffered_ops)
        {
            // Initiate some writeback (asynchronously)
            wb->start_writebacks(this, 1);
        }
        op->retval = op->len;
        auto cb = std::move(op->callback);
        cb(op);
        return;
    }
    if (op->opcode == OSD_OP_WRITE && !(op->flags & OP_IMMEDIATE_COMMIT))
    {
        if (!(op->flags & OP_FLUSH_BUFFER) && !op->version /* no CAS write-repeat */)
        {
            wb->copy_write(op, CACHE_WRITTEN);
        }
        if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops)
        {
            // Push an extra SYNC operation to flush previous writes
            cluster_op_t *sync_op = new cluster_op_t;
            sync_op->opcode = OSD_OP_SYNC;
            sync_op->flags = OP_FLUSH_BUFFER;
            sync_op->callback = [](cluster_op_t* sync_op)
            {
                delete sync_op;
            };
            execute_internal(sync_op);
        }
        dirty_bytes += op->len;
        dirty_ops++;
    }
    else if (op->opcode == OSD_OP_SYNC)
    {
        // Flush the whole write-back queue first
        if (!(op->flags & OP_FLUSH_BUFFER) && wb->writeback_overflow.size() > 0)
        {
            // Writeback queue is full, postpone the operation
            wb->writeback_overflow.push_back(op);
            return;
        }
        if (wb->writeback_queue.size())
        {
            wb->start_writebacks(this, 0);
        }
        dirty_bytes = 0;
        dirty_ops = 0;
    }
    op->prev = op_queue_tail;
    if (op_queue_tail)
    {
        op_queue_tail->next = op;
        op_queue_tail = op;
    }
    else
        op_queue_tail = op_queue_head = op;
    if (!(op->flags & OP_IMMEDIATE_COMMIT) || enable_writeback)
        calc_wait(op);
    else
    {
        if (op->opcode == OSD_OP_SYNC)
            continue_sync(op);
        else
            continue_rw(op);
    }
}

bool cluster_client_t::check_rw(cluster_op_t *op)
{
    if (op->opcode == OSD_OP_SYNC)
    {
        return true;
    }
    pool_id_t pool_id = INODE_POOL(op->cur_inode);
    if (!pool_id)
    {
        op->retval = -EINVAL;
        auto cb = std::move(op->callback);
        cb(op);
        return false;
    }
    auto pool_it = st_cli.pool_config.find(pool_id);
    if (pool_it == st_cli.pool_config.end() || pool_it->second.real_pg_count == 0)
    {
        // Pools are loaded, but this one is unknown
        op->retval = -EINVAL;
        auto cb = std::move(op->callback);
        cb(op);
        return false;
    }
    // Check alignment
    if ((!op->len && (op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP || op->opcode == OSD_OP_WRITE)) ||
        op->offset % pool_it->second.bitmap_granularity || op->len % pool_it->second.bitmap_granularity)
    {
        op->retval = -EINVAL;
        auto cb = std::move(op->callback);
        cb(op);
        return false;
    }
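    // Writes to pools where every OSD commits data to stable storage
    // immediately don't need a client-side SYNC, so mark them to skip
    // the dirty-tracking and sync machinery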
    if (pool_it->second.immediate_commit == IMMEDIATE_ALL)
    {
        op->flags |= OP_IMMEDIATE_COMMIT;
    }
    if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_DELETE) && !(op->flags & OSD_OP_IGNORE_READONLY))
    {
        auto ino_it = st_cli.inode_config.find(op->inode);
        if (ino_it != st_cli.inode_config.end() && ino_it->second.readonly)
        {
            op->retval = -EROFS;
            auto cb = std::move(op->callback);
            cb(op);
            return false;
        }
    }
    return true;
}

void cluster_client_t::execute_raw(osd_num_t osd_num, osd_op_t *op)
{
    auto fd_it = msgr.osd_peer_fds.find(osd_num);
    if (fd_it != msgr.osd_peer_fds.end())
    {
        op->op_type = OSD_OP_OUT;
        op->peer_fd = fd_it->second;
        msgr.outbox_push(op);
    }
    else
    {
        if (msgr.wanted_peers.find(osd_num) == msgr.wanted_peers.end())
            msgr.connect_peer(osd_num, st_cli.peer_states[osd_num]);
        raw_ops.emplace(osd_num, op);
    }
}

int cluster_client_t::continue_rw(cluster_op_t *op)
{
    if (op->state == 0)
        goto resume_0;
    else if (op->state == 1)
        goto resume_1;
    else if (op->state == 2)
        goto resume_2;
resume_0:
    // Slice the operation into parts
    slice_rw(op);
    op->needs_reslice = false;
    if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_DELETE) && op->version && op->parts.size() > 1)
    {
        // Atomic writes to multiple stripes are unsupported
        op->retval = -EINVAL;
        erase_op(op);
        return 1;
    }
resume_1:
    // Send unsent parts, if they're not subject to change
    op->state = 2;
    if (op->needs_reslice)
    {
        for (int i = 0; i < op->parts.size(); i++)
        {
            if (!(op->parts[i].flags & PART_SENT) && op->retval)
            {
                op->retval = -EPIPE;
            }
        }
        goto resume_2;
    }
    // Protect from try_send completing the operation immediately
    op->inflight_count++;
    for (int i = 0; i < op->parts.size(); i++)
    {
        if (!(op->parts[i].flags & PART_SENT))
        {
            if (!try_send(op, i))
            {
                // We'll need to retry again
                if (op->parts[i].flags & PART_RETRY)
                {
                    op->retry_after = client_retry_interval;
                    reset_retry_timer(client_retry_interval);
                }
                op->state = 1;
            }
        }
    }
    op->inflight_count--;
    if (op->state == 1)
    {
        // Some suboperations have to be resent
        return 0;
    }
resume_2:
    if (op->inflight_count > 0)
    {
        op->state = 2;
        return 0;
    }
    if (op->done_count >= op->parts.size())
    {
        // Finished successfully
        // Even if the PG count has changed in the meantime, we treat it as success
        // because if some operations were invalid for the new PG count we'd get errors
        if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_CHAIN_BITMAP)
        {
            // Check parent inode
            auto ino_it = st_cli.inode_config.find(op->cur_inode);
            while (ino_it != st_cli.inode_config.end() && ino_it->second.parent_id &&
                INODE_POOL(ino_it->second.parent_id) == INODE_POOL(op->cur_inode) &&
                // Check for loops
                ino_it->second.parent_id != op->inode)
            {
                // Skip parents from the same pool
                ino_it = st_cli.inode_config.find(ino_it->second.parent_id);
            }
            if (ino_it != st_cli.inode_config.end() && ino_it->second.parent_id &&
                ino_it->second.parent_id != op->inode)
            {
                // Continue reading from the parent inode
                op->cur_inode = ino_it->second.parent_id;
                op->parts.clear();
                op->done_count = 0;
                goto resume_0;
            }
        }
        op->retval = op->len;
        if (op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP)
        {
            auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->inode));
            op->retval = op->len / pool_cfg.bitmap_granularity;
        }
        erase_op(op);
        return 1;
    }
    else if (op->retval != 0 && !(op->flags & OP_FLUSH_BUFFER) &&
        op->retval != -EPIPE && (op->retval != -EIO || !client_eio_retry_interval) &&
        (op->retval != -ENOSPC || !client_retry_enospc))
    {
        // Fatal error (neither -EPIPE, -EIO nor -ENOSPC)
        erase_op(op);
        return 1;
    }
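    // Here the error is retryable: -EPIPE (peer disconnect or PG change),
    // or -EIO/-ENOSPC when client_eio_retry_interval/client_retry_enospc allow it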
    else
    {
        // Non-fatal error - clear the error and retry
        op->retval = 0;
        if (op->needs_reslice)
        {
            op->parts.clear();
            op->done_count = 0;
            goto resume_0;
        }
        else
        {
            for (int i = 0; i < op->parts.size(); i++)
            {
                if (!(op->parts[i].flags & PART_DONE))
                {
                    op->parts[i].flags = PART_RETRY;
                }
            }
            goto resume_1;
        }
    }
    return 0;
}

static void add_iov(int size, bool skip, cluster_op_t *op, int &iov_idx, size_t &iov_pos, osd_op_buf_list_t &iov, void *scrap, int scrap_len)
{
    int left = size;
    while (left > 0 && iov_idx < op->iov.count)
    {
        int cur_left = op->iov.buf[iov_idx].iov_len - iov_pos;
        if (cur_left < left)
        {
            if (!skip)
            {
                iov.push_back((uint8_t*)op->iov.buf[iov_idx].iov_base + iov_pos, cur_left);
            }
            left -= cur_left;
            iov_pos = 0;
            iov_idx++;
        }
        else
        {
            if (!skip)
            {
                iov.push_back((uint8_t*)op->iov.buf[iov_idx].iov_base + iov_pos, left);
            }
            iov_pos += left;
            left = 0;
        }
    }
    assert(left == 0);
    if (skip && scrap_len > 0)
    {
        // All skipped ranges are read into the same useless buffer
        left = size;
        while (left > 0)
        {
            int cur_left = scrap_len < left ? scrap_len : left;
            iov.push_back(scrap, cur_left);
            left -= cur_left;
        }
    }
}

void cluster_client_t::slice_rw(cluster_op_t *op)
{
    // Slice the request into individual object stripe requests
    // Primary OSDs still operate on individual stripes, but their size is multiplied by PG minsize in case of EC
    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode));
    uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
    uint64_t pg_block_size = pool_cfg.data_block_size * pg_data_size;
    uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size;
    uint64_t last_stripe = op->len > 0 ? ((op->offset + op->len - 1) / pg_block_size) * pg_block_size : first_stripe;
    op->retval = 0;
    op->parts.resize((last_stripe - first_stripe) / pg_block_size + 1);
    if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP)
    {
        // Allocate memory for the bitmap
        unsigned object_bitmap_size = ((op->len / pool_cfg.bitmap_granularity + 7) / 8);
        object_bitmap_size = (object_bitmap_size < 8 ? 8 : object_bitmap_size);
        unsigned bitmap_mem = object_bitmap_size + (pool_cfg.data_block_size / pool_cfg.bitmap_granularity / 8 * pg_data_size) * op->parts.size();
        if (!op->bitmap_buf || op->bitmap_buf_size < bitmap_mem)
        {
            op->bitmap_buf = realloc_or_die(op->bitmap_buf, bitmap_mem);
            op->part_bitmaps = (uint8_t*)op->bitmap_buf + object_bitmap_size;
            op->bitmap_buf_size = bitmap_mem;
        }
        memset(op->bitmap_buf, 0, bitmap_mem);
    }
    int iov_idx = 0;
    size_t iov_pos = 0;
    int i = 0;
    // We also have to return reads from CACHE_REPEATING buffers - they are not
    // guaranteed to be present on target OSDs at the moment of repeating
    // And we're also free to return data from other cached buffers just
    // because it's faster
    bool dirty_copied = wb->read_from_cache(op, pool_cfg.bitmap_granularity);
    for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
    {
        pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
        uint64_t begin = (op->offset < stripe ? stripe : op->offset);
        uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
            ? (stripe + pg_block_size) : (op->offset + op->len);
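        // begin..end is the intersection of the user's byte range with this stripe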
        op->parts[i].iov.reset();
        op->parts[i].flags = 0;
        if (op->cur_inode != op->inode || (op->opcode == OSD_OP_READ && dirty_copied))
        {
            // Read remaining parts from upper layers
            uint64_t prev = begin, cur = begin;
            bool skip_prev = true;
            while (cur < end)
            {
                unsigned bmp_loc = (cur - op->offset)/pool_cfg.bitmap_granularity;
                bool skip = (((*((uint8_t*)op->bitmap_buf + bmp_loc/8)) >> (bmp_loc%8)) & 0x1);
                if (skip_prev != skip)
                {
                    if (cur > prev)
                    {
                        if (prev == begin && skip_prev)
                        {
                            begin = cur;
                            // Just advance iov_idx & iov_pos
                            add_iov(cur-prev, true, op, iov_idx, iov_pos, op->parts[i].iov, NULL, 0);
                        }
                        else
                            add_iov(cur-prev, skip_prev, op, iov_idx, iov_pos, op->parts[i].iov, scrap_buffer, scrap_buffer_size);
                    }
                    skip_prev = skip;
                    prev = cur;
                }
                cur += pool_cfg.bitmap_granularity;
            }
            assert(cur > prev);
            if (skip_prev)
            {
                // Just advance iov_idx & iov_pos
                add_iov(end-prev, true, op, iov_idx, iov_pos, op->parts[i].iov, NULL, 0);
                end = prev;
            }
            else
                add_iov(cur-prev, skip_prev, op, iov_idx, iov_pos, op->parts[i].iov, scrap_buffer, scrap_buffer_size);
            if (end == begin)
            {
                op->done_count++;
                op->parts[i].flags = PART_DONE;
            }
        }
        else if (op->opcode != OSD_OP_READ_BITMAP && op->opcode != OSD_OP_READ_CHAIN_BITMAP && op->opcode != OSD_OP_DELETE)
        {
            add_iov(end-begin, false, op, iov_idx, iov_pos, op->parts[i].iov, NULL, 0);
        }
        op->parts[i].parent = op;
        op->parts[i].offset = begin;
        op->parts[i].len = op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP ||
            op->opcode == OSD_OP_DELETE ? 0 : (uint32_t)(end - begin);
        op->parts[i].pg_num = pg_num;
        op->parts[i].osd_num = 0;
        i++;
    }
}

bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd)
{
    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(inode));
    uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
    uint64_t pg_block_size = pool_cfg.data_block_size * pg_data_size;
    uint64_t first_stripe = (offset / pg_block_size) * pg_block_size;
    uint64_t last_stripe = len > 0 ? ((offset + len - 1) / pg_block_size) * pg_block_size : first_stripe;
    for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
    {
        pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
        auto pg_it = pool_cfg.pg_config.find(pg_num);
        if (pg_it != pool_cfg.pg_config.end() && pg_it->second.cur_primary == osd)
        {
            return true;
        }
    }
    return false;
}

bool cluster_client_t::try_send(cluster_op_t *op, int i)
{
    if (!msgr_initialized)
    {
        init_msgr();
    }
    auto part = &op->parts[i];
    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode));
    auto pg_it = pool_cfg.pg_config.find(part->pg_num);
    if (pg_it != pool_cfg.pg_config.end() && !pg_it->second.pause && pg_it->second.cur_primary)
    {
        osd_num_t primary_osd = pg_it->second.cur_primary;
        auto peer_it = msgr.osd_peer_fds.find(primary_osd);
        if (peer_it != msgr.osd_peer_fds.end())
        {
            int peer_fd = peer_it->second;
            part->osd_num = primary_osd;
            part->flags |= PART_SENT;
            op->inflight_count++;
            uint64_t pg_bitmap_size = (pool_cfg.data_block_size / pool_cfg.bitmap_granularity / 8) *
                (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
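            // For requests carrying data, pass the inode config modification
            // revision so the OSD can detect that the client's view of the
            // inode (e.g. its parent/snapshot chain) is out of date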
            uint64_t meta_rev = 0;
            if (op->opcode != OSD_OP_READ_BITMAP && op->opcode != OSD_OP_READ_CHAIN_BITMAP && op->opcode != OSD_OP_DELETE)
            {
                auto ino_it = st_cli.inode_config.find(op->inode);
                if (ino_it != st_cli.inode_config.end())
                    meta_rev = ino_it->second.mod_revision;
            }
            part->op = (osd_op_t){
                .op_type = OSD_OP_OUT,
                .peer_fd = peer_fd,
                .req = { .rw = {
                    .header = {
                        .magic = SECONDARY_OSD_OP_MAGIC,
                        .id = next_op_id(),
                        .opcode = op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP ? OSD_OP_READ : op->opcode,
                    },
                    .inode = op->cur_inode,
                    .offset = part->offset,
                    .len = part->len,
                    .meta_revision = meta_rev,
                    .version = op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_DELETE ? op->version : 0,
                } },
                .bitmap = (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP
                    ? (uint8_t*)op->part_bitmaps + pg_bitmap_size*i : NULL),
                .bitmap_len = (unsigned)(op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP
                    ? pg_bitmap_size : 0),
                .callback = [this, part](osd_op_t *op_part)
                {
                    handle_op_part(part);
                },
            };
            part->op.iov = part->iov;
            msgr.outbox_push(&part->op);
            return true;
        }
        else if (msgr.wanted_peers.find(primary_osd) == msgr.wanted_peers.end())
        {
            msgr.connect_peer(primary_osd, st_cli.peer_states[primary_osd]);
        }
    }
    return false;
}

int cluster_client_t::continue_sync(cluster_op_t *op)
{
    if (op->state == 1)
        goto resume_1;
    if (!dirty_osds.size())
    {
        // Sync is not required in the immediate_commit mode or if there are no dirty_osds
        op->retval = 0;
        erase_op(op);
        return 1;
    }
    // Check that all OSD connections are still alive
    for (auto do_it = dirty_osds.begin(); do_it != dirty_osds.end(); )
    {
        osd_num_t sync_osd = *do_it;
        auto peer_it = msgr.osd_peer_fds.find(sync_osd);
        if (peer_it == msgr.osd_peer_fds.end())
            dirty_osds.erase(do_it++);
        else
            do_it++;
    }
    // Post sync to affected OSDs
    wb->fsync_start();
    op->parts.resize(dirty_osds.size());
    op->retval = 0;
    {
        int i = 0;
        for (auto sync_osd: dirty_osds)
        {
            op->parts[i] = {
                .parent = op,
                .osd_num = sync_osd,
                .flags = 0,
            };
            send_sync(op, &op->parts[i]);
            i++;
        }
    }
    dirty_osds.clear();
resume_1:
    if (op->inflight_count > 0)
    {
        op->state = 1;
        return 0;
    }
    if (op->retval != 0)
    {
        wb->fsync_error();
        if (op->retval == -EPIPE || op->retval == -EIO || op->retval == -ENOSPC)
        {
            // Retry later
            op->parts.clear();
            op->retval = 0;
            op->inflight_count = 0;
            op->done_count = 0;
            op->state = 0;
            return 0;
        }
    }
    else
    {
        wb->fsync_ok();
    }
    erase_op(op);
    return 1;
}

void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part)
{
    auto peer_it = msgr.osd_peer_fds.find(part->osd_num);
    assert(peer_it != msgr.osd_peer_fds.end());
    part->flags |= PART_SENT;
    op->inflight_count++;
    part->op = (osd_op_t){
        .op_type = OSD_OP_OUT,
        .peer_fd = peer_it->second,
        .req = {
            .hdr = {
                .magic = SECONDARY_OSD_OP_MAGIC,
                .id = next_op_id(),
                .opcode = OSD_OP_SYNC,
            },
        },
        .callback = [this, part](osd_op_t *op_part)
        {
            handle_op_part(part);
        },
    };
    msgr.outbox_push(&part->op);
}

static inline void mem_or(void *res, const void *r2, unsigned int len)
{
    unsigned int i;
    for (i = 0; i < len; ++i)
    {
        // Hope the compiler vectorizes this
        ((uint8_t*)res)[i] = ((uint8_t*)res)[i] | ((uint8_t*)r2)[i];
    }
}

void cluster_client_t::handle_op_part(cluster_op_part_t *part)
{
    cluster_op_t *op = part->parent;
    int expected = part->op.req.hdr.opcode == OSD_OP_SYNC ? 0 : part->op.req.rw.len;
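    // A successful read/write/delete returns the requested length in retval,
    // a successful sync returns 0; anything else is treated as an error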
    if (part->op.reply.hdr.retval != expected)
    {
        // Operation failed, retry
        part->flags |= PART_ERROR;
        if (!op->retval || op->retval == -EPIPE || part->op.reply.hdr.retval == -EIO)
        {
            // Error priority: EIO > ENOSPC > EPIPE
            op->retval = part->op.reply.hdr.retval;
        }
        int stop_fd = -1;
        if (op->retval != -EINTR && op->retval != -EIO && op->retval != -ENOSPC)
        {
            stop_fd = part->op.peer_fd;
            if (op->retval != -EPIPE || log_level > 0)
            {
                fprintf(
                    stderr, "%s operation failed on OSD %ju: retval=%jd (expected %d), dropping connection\n",
                    osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected
                );
            }
        }
        else if (log_level > 0)
        {
            fprintf(
                stderr, "%s operation failed on OSD %ju: retval=%jd (expected %d)\n",
                osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected
            );
        }
        // All next things like timer, continue_sync/rw and stop_client may affect the operation again
        // So do all these things after modifying operation state, otherwise we may hit reentrancy bugs
        // FIXME postpone such things to set_immediate here to avoid bugs
        // Set op->retry_after to retry operation after a short pause (not immediately)
        if (!op->retry_after)
        {
            op->retry_after = op->retval != -EPIPE ? client_eio_retry_interval : client_retry_interval;
        }
        reset_retry_timer(op->retry_after);
        if (stop_fd >= 0)
        {
            msgr.stop_client(stop_fd);
        }
        op->inflight_count--;
        if (op->inflight_count == 0 && !op->retry_after)
        {
            if (op->opcode == OSD_OP_SYNC)
                continue_sync(op);
            else
                continue_rw(op);
        }
    }
    else
    {
        // OK
        op->inflight_count--;
        if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_DELETE) && !(op->flags & OP_IMMEDIATE_COMMIT))
            dirty_osds.insert(part->osd_num);
        part->flags |= PART_DONE;
        op->done_count++;
        if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP)
        {
            copy_part_bitmap(op, part);
            op->version = op->parts.size() == 1 ? part->op.reply.rw.version : 0;
        }
        else if (op->opcode == OSD_OP_WRITE)
        {
            op->version = op->parts.size() == 1 ? part->op.reply.rw.version : 0;
        }
        if (op->inflight_count == 0 && !op->retry_after)
        {
            if (op->opcode == OSD_OP_SYNC)
                continue_sync(op);
            else
                continue_rw(op);
        }
    }
}

void cluster_client_t::copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *part)
{
    // Copy (OR) bitmap
    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode));
    uint32_t pg_block_size = pool_cfg.data_block_size * (
        pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks
    );
    uint32_t object_offset = (part->op.req.rw.offset - op->offset) / pool_cfg.bitmap_granularity;
    uint32_t part_offset = (part->op.req.rw.offset % pg_block_size) / pool_cfg.bitmap_granularity;
    uint32_t op_len = op->len / pool_cfg.bitmap_granularity;
    uint32_t part_len = pg_block_size/pool_cfg.bitmap_granularity - part_offset;
    if (part_len > op_len-object_offset)
    {
        part_len = op_len-object_offset;
    }
    if (!(object_offset & 0x7) && !(part_offset & 0x7) && (part_len >= 8))
    {
        // Copy bytes
        mem_or((uint8_t*)op->bitmap_buf + object_offset/8, (uint8_t*)part->op.bitmap + part_offset/8, part_len/8);
        object_offset += (part_len & ~0x7);
        part_offset += (part_len & ~0x7);
        part_len = (part_len & 0x7);
    }
    while (part_len > 0)
    {
        // Copy bits
        (*((uint8_t*)op->bitmap_buf + (object_offset >> 3))) |= (
            (((*((uint8_t*)part->op.bitmap + (part_offset >> 3))) >> (part_offset & 0x7)) & 0x1) << (object_offset & 0x7)
        );
        part_offset++;
        object_offset++;
        part_len--;
    }
}

uint64_t cluster_client_t::next_op_id()
{
    return msgr.next_subop_id++;
}