Browse Source

Fix write replay ordering when immediate_commit != all

Previous implementation didn't respect write ordering and could lead
to corrupted data when restarting writes after an OSD outage

Also rework cluster_client queueing logic and add tests for it to verify the correct behaviour
tags/v0.5.11
Vitaliy Filippov 1 month ago
parent
commit
a48e2bbf18
12 changed files with 957 additions and 440 deletions
  1. +14
    -4
      src/CMakeLists.txt
  2. +347
    -298
      src/cluster_client.cpp
  3. +23
    -18
      src/cluster_client.h
  4. +25
    -0
      src/etcd_state_client.cpp
  5. +3
    -2
      src/etcd_state_client.h
  6. +0
    -117
      src/messenger.cpp
  7. +1
    -1
      src/messenger.h
  8. +1
    -0
      src/mock/build.sh
  9. +44
    -0
      src/mock/messenger.cpp
  10. +25
    -0
      src/mock/ringloop.h
  11. +128
    -0
      src/msgr_stop.cpp
  12. +346
    -0
      src/test_cluster_client.cpp

+ 14
- 4
src/CMakeLists.txt View File

@@ -14,7 +14,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
endif()

add_definitions(-DVERSION="0.6-dev")
add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith)
add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -I ${CMAKE_SOURCE_DIR}/src)
if (${WITH_ASAN})
add_definitions(-fsanitize=address -fno-omit-frame-pointer)
add_link_options(-fsanitize=address -fno-omit-frame-pointer)
@@ -67,7 +67,7 @@ target_link_libraries(fio_vitastor_blk
add_executable(vitastor-osd
osd_main.cpp osd.cpp osd_secondary.cpp msgr_receive.cpp msgr_send.cpp osd_peering.cpp osd_flush.cpp osd_peering_pg.cpp
osd_primary.cpp osd_primary_sync.cpp osd_primary_write.cpp osd_primary_subops.cpp
etcd_state_client.cpp messenger.cpp msgr_op.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp
etcd_state_client.cpp messenger.cpp msgr_stop.cpp msgr_op.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp
osd_rmw.cpp base64.cpp timerfd_manager.cpp epoll_manager.cpp ../json11/json11.cpp
)
target_link_libraries(vitastor-osd
@@ -87,7 +87,7 @@ target_link_libraries(fio_vitastor_sec
# libvitastor_client.so
add_library(vitastor_client SHARED
cluster_client.cpp epoll_manager.cpp etcd_state_client.cpp
messenger.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp
messenger.cpp msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp
http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp
)
target_link_libraries(vitastor_client
@@ -162,7 +162,8 @@ target_link_libraries(osd_rmw_test Jerasure tcmalloc_minimal)

# stub_uring_osd
add_executable(stub_uring_osd
stub_uring_osd.cpp epoll_manager.cpp messenger.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp timerfd_manager.cpp ../json11/json11.cpp
stub_uring_osd.cpp epoll_manager.cpp messenger.cpp msgr_stop.cpp msgr_op.cpp
msgr_send.cpp msgr_receive.cpp ringloop.cpp timerfd_manager.cpp ../json11/json11.cpp
)
target_link_libraries(stub_uring_osd
${LIBURING_LIBRARIES}
@@ -176,6 +177,15 @@ target_link_libraries(osd_peering_pg_test tcmalloc_minimal)
# test_allocator
add_executable(test_allocator test_allocator.cpp allocator.cpp)

# test_cluster_client
add_executable(test_cluster_client
test_cluster_client.cpp
pg_states.cpp osd_ops.cpp cluster_client.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
etcd_state_client.cpp timerfd_manager.cpp ../json11/json11.cpp
)
target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__)
target_include_directories(test_cluster_client PUBLIC ${CMAKE_SOURCE_DIR}/src/mock)

## test_blockstore, test_shit
#add_executable(test_blockstore test_blockstore.cpp timerfd_interval.cpp)
#target_link_libraries(test_blockstore blockstore)


+ 347
- 298
src/cluster_client.cpp View File

@@ -5,6 +5,11 @@
#include <assert.h>
#include "cluster_client.h"

#define CACHE_DIRTY 1
#define CACHE_FLUSHING 2
#define CACHE_REPEATING 4
#define OP_FLUSH_BUFFER 2

cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config)
{
this->ringloop = ringloop;
@@ -21,39 +26,18 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
// peer_osd just connected
continue_ops();
}
else if (unsynced_writes.size())
else if (dirty_buffers.size())
{
// peer_osd just dropped connection
for (auto op: syncing_writes)
// determine WHICH dirty_buffers are now obsolete and repeat them
dirty_osds.erase(peer_osd);
for (auto & wr: dirty_buffers)
{
for (auto & part: op->parts)
if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) &&
!(wr.second.state & CACHE_REPEATING))
{
if (part.osd_num == peer_osd && part.done)
{
// repeat this operation
part.osd_num = 0;
part.done = false;
assert(!part.sent);
op->done_count--;
}
}
}
for (auto op: unsynced_writes)
{
for (auto & part: op->parts)
{
if (part.osd_num == peer_osd && part.done)
{
// repeat this operation
part.osd_num = 0;
part.done = false;
assert(!part.sent);
op->done_count--;
}
}
if (op->done_count < op->parts.size())
{
cur_ops.insert(op);
// FIXME: Flush in larger parts
flush_buffer(wr.first, wr.second);
}
}
continue_ops();
@@ -91,6 +75,11 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd

cluster_client_t::~cluster_client_t()
{
for (auto bp: dirty_buffers)
{
free(bp.second.buf);
}
dirty_buffers.clear();
if (ringloop)
{
ringloop->unregister_consumer(&consumer);
@@ -99,21 +88,64 @@ cluster_client_t::~cluster_client_t()

void cluster_client_t::continue_ops(bool up_retry)
{
for (auto op_it = cur_ops.begin(); op_it != cur_ops.end(); )
if (!pgs_loaded)
{
// We're offline
return;
}
bool has_flushes = false, has_writes = false;
int j = 0;
for (int i = 0; i < op_queue.size(); i++)
{
if ((*op_it)->up_wait)
bool rm = false;
if (!op_queue[i]->up_wait || up_retry)
{
if (up_retry)
op_queue[i]->up_wait = false;
if (op_queue[i]->opcode == OSD_OP_READ)
{
(*op_it)->up_wait = false;
continue_rw(*op_it++);
rm = continue_rw(op_queue[i]);
}
else if (op_queue[i]->opcode == OSD_OP_WRITE)
{
if (op_queue[i]->flags & OP_FLUSH_BUFFER)
{
rm = continue_rw(op_queue[i]);
if (!rm)
{
// Regular writes can't proceed before buffer flushes
has_flushes = true;
}
}
else if (!has_flushes)
{
rm = continue_rw(op_queue[i]);
}
if (!rm)
{
has_writes = true;
}
}
else if (op_queue[i]->opcode == OSD_OP_SYNC)
{
if (!has_writes)
{
// SYNC can't proceed before previous writes
rm = continue_sync(op_queue[i]);
if (!rm)
{
// Postpone writes until previous SYNC completes
// ...so dirty_writes can't contain anything newer than SYNC
has_flushes = true;
}
}
}
else
op_it++;
}
else
continue_rw(*op_it++);
if (!rm)
{
op_queue[j++] = op_queue[i];
}
}
op_queue.resize(j);
}

static uint32_t is_power_of_two(uint64_t value)
@@ -203,23 +235,10 @@ void cluster_client_t::on_change_hook(json11::Json::object & changes)
{
// At this point, all pool operations should have been suspended
// And now they have to be resliced!
for (auto op: cur_ops)
for (auto op: op_queue)
{
if (INODE_POOL(op->inode) == pool_item.first)
{
op->needs_reslice = true;
}
}
for (auto op: unsynced_writes)
{
if (INODE_POOL(op->inode) == pool_item.first)
{
op->needs_reslice = true;
}
}
for (auto op: syncing_writes)
{
if (INODE_POOL(op->inode) == pool_item.first)
if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ) &&
INODE_POOL(op->inode) == pool_item.first)
{
op->needs_reslice = true;
}
@@ -258,21 +277,15 @@ void cluster_client_t::on_ready(std::function<void(void)> fn)
/**
* How writes are synced when immediate_commit is false
*
* 1) accept up to <client_dirty_limit> write operations for execution,
* queue all subsequent writes into <next_writes>
* 2) accept exactly one SYNC, queue all subsequent SYNCs into <next_writes>, too
* 3) "continue" all accepted writes
*
* "Continue" WRITE:
* 1) if the operation is not a copy yet - copy it (required for replay)
* 2) if the operation is not sliced yet - slice it
* 3) if the operation doesn't require reslice - try to connect & send all remaining parts
* 4) if any of them fail due to disconnected peers or PGs not up, repeat after reconnecting or small timeout
* 5) if any of them fail due to other errors, fail the operation and forget it from the current "unsynced batch"
* 6) if PG count changes before all parts are done, wait for all in-progress parts to finish,
* 1) if the operation is not sliced yet - slice it
* 2) if the operation doesn't require reslice - try to connect & send all remaining parts
* 3) if any of them fail due to disconnected peers or PGs not up, repeat after reconnecting or small timeout
* 4) if any of them fail due to other errors, fail the operation and forget it from the current "unsynced batch"
* 5) if PG count changes before all parts are done, wait for all in-progress parts to finish,
* throw all results away, reslice and resubmit op
* 7) when all parts are done, try to "continue" the current SYNC
* 8) if the operation succeeds, but then some OSDs drop their connections, repeat
* 6) when all parts are done, try to "continue" the current SYNC
* 7) if the operation succeeds, but then some OSDs drop their connections, repeat
* parts from the current "unsynced batch" previously sent to those OSDs in any order
*
* "Continue" current SYNC:
@@ -282,181 +295,241 @@ void cluster_client_t::on_ready(std::function<void(void)> fn)
* 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes
* 5) if any of them fail due to other errors, fail the SYNC operation
*/

void cluster_client_t::execute(cluster_op_t *op)
{
if (!pgs_loaded)
{
// We're offline
offline_ops.push_back(op);
return;
}
op->retval = 0;
if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE ||
(op->opcode == OSD_OP_READ || op->opcode == OSD_OP_WRITE) && (!op->inode || !op->len ||
op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity))
if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
return;
}
if (op->opcode == OSD_OP_SYNC)
{
execute_sync(op);
return;
}
op->retval = 0;
if (op->opcode == OSD_OP_WRITE && !immediate_commit)
{
if (next_writes.size() > 0)
{
assert(cur_sync);
next_writes.push_back(op);
return;
}
if (queued_bytes >= client_dirty_limit)
if (dirty_bytes >= client_dirty_limit)
{
// Push an extra SYNC operation to flush previous writes
next_writes.push_back(op);
cluster_op_t *sync_op = new cluster_op_t;
sync_op->is_internal = true;
sync_op->opcode = OSD_OP_SYNC;
sync_op->callback = [](cluster_op_t* sync_op) {};
execute_sync(sync_op);
return;
sync_op->callback = [](cluster_op_t* sync_op)
{
delete sync_op;
};
op_queue.push_back(sync_op);
dirty_bytes = 0;
}
queued_bytes += op->len;
dirty_bytes += op->len;
}
else if (op->opcode == OSD_OP_SYNC)
{
dirty_bytes = 0;
}
cur_ops.insert(op);
continue_rw(op);
op_queue.push_back(op);
continue_ops();
}

void cluster_client_t::continue_rw(cluster_op_t *op)
void cluster_client_t::copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers)
{
pool_id_t pool_id = INODE_POOL(op->inode);
if (!pool_id)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
return;
// Save operation for replay when one of PGs goes out of sync
// (primary OSD drops our connection in this case)
auto dirty_it = dirty_buffers.lower_bound((object_id){
.inode = op->inode,
.stripe = op->offset,
});
while (dirty_it != dirty_buffers.begin())
{
dirty_it--;
if (dirty_it->first.inode != op->inode ||
(dirty_it->first.stripe + dirty_it->second.len) <= op->offset)
{
dirty_it++;
break;
}
}
if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() ||
st_cli.pool_config[pool_id].real_pg_count == 0)
uint64_t pos = op->offset, len = op->len, iov_idx = 0, iov_pos = 0;
while (len > 0)
{
// Postpone operations to unknown pools
return;
}
if (op->opcode == OSD_OP_WRITE && !immediate_commit && !op->is_internal)
{
// Save operation for replay when PG goes out of sync
// (primary OSD drops our connection in this case)
cluster_op_t *op_copy = new cluster_op_t();
op_copy->is_internal = true;
op_copy->orig_op = op;
op_copy->opcode = op->opcode;
op_copy->inode = op->inode;
op_copy->offset = op->offset;
op_copy->len = op->len;
op_copy->buf = malloc_or_die(op->len);
op_copy->iov.push_back(op_copy->buf, op->len);
op_copy->callback = [](cluster_op_t* op_copy)
{
if (op_copy->orig_op)
uint64_t new_len = 0;
if (dirty_it == dirty_buffers.end())
{
new_len = len;
}
else if (dirty_it->first.inode != op->inode || dirty_it->first.stripe > pos)
{
new_len = dirty_it->first.stripe - pos;
if (new_len > len)
{
// Acknowledge write and forget the original pointer
op_copy->orig_op->retval = op_copy->retval;
std::function<void(cluster_op_t*)>(op_copy->orig_op->callback)(op_copy->orig_op);
op_copy->orig_op = NULL;
new_len = len;
}
};
void *cur_buf = op_copy->buf;
for (int i = 0; i < op->iov.count; i++)
}
if (new_len > 0)
{
dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){
.inode = op->inode,
.stripe = pos,
}, (cluster_buffer_t){
.buf = malloc_or_die(new_len),
.len = new_len,
});
}
// FIXME: Split big buffers into smaller ones on overwrites. But this will require refcounting
dirty_it->second.state = CACHE_DIRTY;
uint64_t cur_len = (dirty_it->first.stripe + dirty_it->second.len - pos);
if (cur_len > len)
{
cur_len = len;
}
while (cur_len > 0 && iov_idx < op->iov.count)
{
memcpy(cur_buf, op->iov.buf[i].iov_base, op->iov.buf[i].iov_len);
cur_buf += op->iov.buf[i].iov_len;
unsigned iov_len = (op->iov.buf[iov_idx].iov_len - iov_pos);
if (iov_len <= cur_len)
{
memcpy(dirty_it->second.buf + pos - dirty_it->first.stripe,
op->iov.buf[iov_idx].iov_base + iov_pos, iov_len);
pos += iov_len;
len -= iov_len;
cur_len -= iov_len;
iov_pos = 0;
iov_idx++;
}
else
{
memcpy(dirty_it->second.buf + pos - dirty_it->first.stripe,
op->iov.buf[iov_idx].iov_base + iov_pos, cur_len);
pos += cur_len;
len -= cur_len;
iov_pos += cur_len;
cur_len = 0;
}
}
unsynced_writes.push_back(op_copy);
cur_ops.erase(op);
cur_ops.insert(op_copy);
op = op_copy;
dirty_it++;
}
if (!op->parts.size())
}

void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t & wr)
{
wr.state = CACHE_DIRTY | CACHE_REPEATING;
cluster_op_t *op = new cluster_op_t;
op->flags = OP_FLUSH_BUFFER;
op->opcode = OSD_OP_WRITE;
op->inode = oid.inode;
op->offset = oid.stripe;
op->len = wr.len;
op->iov.push_back(wr.buf, wr.len);
op->callback = [](cluster_op_t* op)
{
// Slice the operation into parts
slice_rw(op);
delete op;
};
op_queue.push_front(op);
}

int cluster_client_t::continue_rw(cluster_op_t *op)
{
if (op->state == 0)
goto resume_0;
else if (op->state == 1)
goto resume_1;
else if (op->state == 2)
goto resume_2;
else if (op->state == 3)
goto resume_3;
resume_0:
if (!op->len || op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
return 1;
}
if (!op->needs_reslice)
{
// Send unsent parts, if they're not subject to change
for (auto & op_part: op->parts)
pool_id_t pool_id = INODE_POOL(op->inode);
if (!pool_id)
{
if (!op_part.sent && !op_part.done)
{
try_send(op, &op_part);
}
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
return 1;
}
if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() ||
st_cli.pool_config[pool_id].real_pg_count == 0)
{
// Postpone operations to unknown pools
return 0;
}
}
if (!op->sent_count)
if (op->opcode == OSD_OP_WRITE)
{
if (op->done_count >= op->parts.size())
if (!immediate_commit)
{
// Finished successfully
// Even if the PG count has changed in meanwhile we treat it as success
// because if some operations were invalid for the new PG count we'd get errors
cur_ops.erase(op);
op->retval = op->len;
std::function<void(cluster_op_t*)>(op->callback)(op);
continue_sync();
return;
copy_write(op, dirty_buffers);
}
else if (op->retval != 0 && op->retval != -EPIPE)
}
resume_1:
// Slice the operation into parts
slice_rw(op);
op->needs_reslice = false;
resume_2:
// Send unsent parts, if they're not subject to change
op->state = 3;
for (int i = 0; i < op->parts.size(); i++)
{
if (!op->parts[i].sent && !op->parts[i].done)
{
// Fatal error (not -EPIPE)
cur_ops.erase(op);
if (!immediate_commit && op->opcode == OSD_OP_WRITE)
if (!try_send(op, i))
{
for (int i = 0; i < unsynced_writes.size(); i++)
{
if (unsynced_writes[i] == op)
{
unsynced_writes.erase(unsynced_writes.begin()+i, unsynced_writes.begin()+i+1);
break;
}
}
}
bool del = op->is_internal;
std::function<void(cluster_op_t*)>(op->callback)(op);
if (del)
{
if (op->buf)
free(op->buf);
delete op;
// We'll need to retry again
op->state = 2;
}
continue_sync();
return;
}
}
if (op->state == 2)
{
return 0;
}
resume_3:
if (op->sent_count > 0)
{
op->state = 3;
return 0;
}
if (op->done_count >= op->parts.size())
{
// Finished successfully
// Even if the PG count has changed in meanwhile we treat it as success
// because if some operations were invalid for the new PG count we'd get errors
op->retval = op->len;
std::function<void(cluster_op_t*)>(op->callback)(op);
return 1;
}
else if (op->retval != 0 && op->retval != -EPIPE)
{
// Fatal error (not -EPIPE)
std::function<void(cluster_op_t*)>(op->callback)(op);
return 1;
}
else
{
// -EPIPE - clear the error and retry
op->retval = 0;
if (op->needs_reslice)
{
op->parts.clear();
op->done_count = 0;
goto resume_1;
}
else
{
// -EPIPE or no error - clear the error
op->retval = 0;
if (op->needs_reslice)
{
op->parts.clear();
op->done_count = 0;
op->needs_reslice = false;
continue_rw(op);
}
goto resume_2;
}
}
return 0;
}

void cluster_client_t::slice_rw(cluster_op_t *op)
{
// Slice the request into individual object stripe requests
// Primary OSDs still operate individual stripes, but their size is multiplied by PG minsize in case of EC
auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)];
uint64_t pg_block_size = bs_block_size * (
pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks
);
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->inode));
uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
uint64_t pg_block_size = bs_block_size * pg_data_size;
uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size;
uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
op->retval = 0;
@@ -500,8 +573,28 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
}
}

bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part)
bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd)
{
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(inode));
uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
uint64_t pg_block_size = bs_block_size * pg_data_size;
uint64_t first_stripe = (offset / pg_block_size) * pg_block_size;
uint64_t last_stripe = ((offset + len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
{
pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
auto pg_it = pool_cfg.pg_config.find(pg_num);
if (pg_it != pool_cfg.pg_config.end() && pg_it->second.cur_primary == osd)
{
return true;
}
}
return false;
}

bool cluster_client_t::try_send(cluster_op_t *op, int i)
{
auto part = &op->parts[i];
auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)];
auto pg_it = pool_cfg.pg_config.find(part->pg_num);
if (pg_it != pool_cfg.pg_config.end() &&
@@ -545,129 +638,92 @@ bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part)
return false;
}

void cluster_client_t::execute_sync(cluster_op_t *op)
int cluster_client_t::continue_sync(cluster_op_t *op)
{
if (immediate_commit)
if (op->state == 1)
goto resume_1;
if (immediate_commit || !dirty_osds.size())
{
// Syncs are not required in the immediate_commit mode
// Sync is not required in the immediate_commit mode or if there are no dirty_osds
op->retval = 0;
std::function<void(cluster_op_t*)>(op->callback)(op);
}
else if (cur_sync != NULL)
{
next_writes.push_back(op);
}
else
{
cur_sync = op;
continue_sync();
}
}

void cluster_client_t::continue_sync()
{
if (!cur_sync || cur_sync->parts.size() > 0)
{
// Already submitted
return;
}
cur_sync->retval = 0;
std::set<osd_num_t> sync_osds;
for (auto prev_op: unsynced_writes)
{
if (prev_op->done_count < prev_op->parts.size())
{
// Writes not finished yet
return;
}
for (auto & part: prev_op->parts)
{
if (part.osd_num)
{
sync_osds.insert(part.osd_num);
}
}
}
if (!sync_osds.size())
{
// No dirty writes
finish_sync();
return;
return 1;
}
// Check that all OSD connections are still alive
for (auto sync_osd: sync_osds)
for (auto sync_osd: dirty_osds)
{
auto peer_it = msgr.osd_peer_fds.find(sync_osd);
if (peer_it == msgr.osd_peer_fds.end())
{
// SYNC is pointless to send to a non connected OSD
return;
return 0;
}
}
syncing_writes.swap(unsynced_writes);
// Post sync to affected OSDs
cur_sync->parts.resize(sync_osds.size());
int i = 0;
for (auto sync_osd: sync_osds)
for (auto & prev_op: dirty_buffers)
{
cur_sync->parts[i] = {
.parent = cur_sync,
.osd_num = sync_osd,
.sent = false,
.done = false,
};
send_sync(cur_sync, &cur_sync->parts[i]);
i++;
if (prev_op.second.state == CACHE_DIRTY)
{
prev_op.second.state = CACHE_FLUSHING;
}
}
}

void cluster_client_t::finish_sync()
{
int retval = cur_sync->retval;
if (retval != 0)
op->parts.resize(dirty_osds.size());
op->retval = 0;
{
for (auto op: syncing_writes)
int i = 0;
for (auto sync_osd: dirty_osds)
{
if (op->done_count < op->parts.size())
{
cur_ops.insert(op);
}
op->parts[i] = {
.parent = op,
.osd_num = sync_osd,
.sent = false,
.done = false,
};
send_sync(op, &op->parts[i]);
i++;
}
unsynced_writes.insert(unsynced_writes.begin(), syncing_writes.begin(), syncing_writes.end());
syncing_writes.clear();
}
if (retval == -EPIPE)
dirty_osds.clear();
resume_1:
if (op->sent_count > 0)
{
// Retry later
cur_sync->parts.clear();
cur_sync->retval = 0;
cur_sync->sent_count = 0;
cur_sync->done_count = 0;
return;
op->state = 1;
return 0;
}
std::function<void(cluster_op_t*)>(cur_sync->callback)(cur_sync);
if (!retval)
if (op->retval != 0)
{
for (auto op: syncing_writes)
for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); uw_it++)
{
assert(op->sent_count == 0);
if (op->is_internal)
if (uw_it->second.state == CACHE_FLUSHING)
{
if (op->buf)
free(op->buf);
delete op;
uw_it->second.state = CACHE_DIRTY;
}
}
syncing_writes.clear();
if (op->retval == -EPIPE)
{
// Retry later
op->parts.clear();
op->retval = 0;
op->sent_count = 0;
op->done_count = 0;
op->state = 0;
return 0;
}
}
cur_sync = NULL;
queued_bytes = 0;
std::vector<cluster_op_t*> next_wr_copy;
next_wr_copy.swap(next_writes);
for (auto next_op: next_wr_copy)
else
{
execute(next_op);
for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); )
{
if (uw_it->second.state == CACHE_FLUSHING)
{
free(uw_it->second.buf);
dirty_buffers.erase(uw_it++);
}
else
uw_it++;
}
}
std::function<void(cluster_op_t*)>(op->callback)(op);
return 1;
}

void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part)
@@ -729,19 +785,12 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
else
{
// OK
dirty_osds.insert(part->osd_num);
part->done = true;
op->done_count++;
}
if (op->sent_count == 0)
{
if (op->opcode == OSD_OP_SYNC)
{
assert(op == cur_sync);
finish_sync();
}
else if (!op->up_wait)
{
continue_rw(op);
}
continue_ops();
}
}

+ 23
- 18
src/cluster_client.h View File

@@ -37,9 +37,10 @@ struct cluster_op_t
osd_op_buf_list_t iov;
std::function<void(cluster_op_t*)> callback;
protected:
int flags = 0;
int state = 0;
void *buf = NULL;
cluster_op_t *orig_op = NULL;
bool is_internal = false;
bool needs_reslice = false;
bool up_wait = false;
int sent_count = 0, done_count = 0;
@@ -47,6 +48,14 @@ protected:
friend class cluster_client_t;
};

struct cluster_buffer_t
{
void *buf;
uint64_t len;
int state;
};

// FIXME: Split into public and private interfaces
class cluster_client_t
{
timerfd_manager_t *tfd;
@@ -61,21 +70,16 @@ class cluster_client_t
int log_level;
int up_wait_retry_interval = 500; // ms

uint64_t op_id = 1;
ring_consumer_t consumer;
// operations currently in progress
std::set<cluster_op_t*> cur_ops;
int retry_timeout_id = 0;
// unsynced operations are copied in memory to allow replay when cluster isn't in the immediate_commit mode
// unsynced_writes are replayed in any order (because only the SYNC operation guarantees ordering)
std::vector<cluster_op_t*> unsynced_writes;
std::vector<cluster_op_t*> syncing_writes;
cluster_op_t* cur_sync = NULL;
std::vector<cluster_op_t*> next_writes;
uint64_t op_id = 1;
std::vector<cluster_op_t*> offline_ops;
uint64_t queued_bytes = 0;
std::deque<cluster_op_t*> op_queue;
std::map<object_id, cluster_buffer_t> dirty_buffers;
std::set<osd_num_t> dirty_osds;
uint64_t dirty_bytes = 0;

bool pgs_loaded = false;
ring_consumer_t consumer;
std::vector<std::function<void(void)>> on_ready_hooks;

public:
@@ -89,18 +93,19 @@ public:
bool is_ready();
void on_ready(std::function<void(void)> fn);

protected:
static void copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers);
void continue_ops(bool up_retry = false);
protected:
bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd);
void flush_buffer(const object_id & oid, cluster_buffer_t & wr);
void on_load_config_hook(json11::Json::object & config);
void on_load_pgs_hook(bool success);
void on_change_hook(json11::Json::object & changes);
void on_change_osd_state_hook(uint64_t peer_osd);
void continue_rw(cluster_op_t *op);
int continue_rw(cluster_op_t *op);
void slice_rw(cluster_op_t *op);
bool try_send(cluster_op_t *op, cluster_op_part_t *part);
void execute_sync(cluster_op_t *op);
void continue_sync();
void finish_sync();
bool try_send(cluster_op_t *op, int i);
int continue_sync(cluster_op_t *op);
void send_sync(cluster_op_t *op, cluster_op_part_t *part);
void handle_op_part(cluster_op_part_t *part);
};

+ 25
- 0
src/etcd_state_client.cpp View File

@@ -4,19 +4,24 @@
#include "osd_ops.h"
#include "pg_states.h"
#include "etcd_state_client.h"
#ifndef __MOCK__
#include "http_client.h"
#include "base64.h"
#endif

etcd_state_client_t::~etcd_state_client_t()
{
etcd_watches_initialised = -1;
#ifndef __MOCK__
if (etcd_watch_ws)
{
etcd_watch_ws->close();
etcd_watch_ws = NULL;
}
#endif
}

#ifndef __MOCK__
json_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
{
json_kv_t kv;
@@ -323,6 +328,26 @@ void etcd_state_client_t::load_pgs()
start_etcd_watcher();
});
}
#else
void etcd_state_client_t::parse_config(json11::Json & config)
{
}

void etcd_state_client_t::load_global_config()
{
json11::Json::object global_config;
on_load_config_hook(global_config);
}

void etcd_state_client_t::load_pgs()
{
}
#endif

void etcd_state_client_t::parse_state(const json_kv_t & kv)
{
parse_state(kv.key, kv.value);
}

void etcd_state_client_t::parse_state(const std::string & key, const json11::Json & value)
{


+ 3
- 2
src/etcd_state_client.h View File

@@ -57,6 +57,8 @@ struct websocket_t;
struct etcd_state_client_t
{
protected:
websocket_t *etcd_watch_ws = NULL;
uint64_t bs_block_size = DEFAULT_BLOCK_SIZE;
void add_etcd_url(std::string);
public:
std::vector<std::string> etcd_addresses;
@@ -66,8 +68,6 @@ public:

int etcd_watches_initialised = 0;
uint64_t etcd_watch_revision = 0;
websocket_t *etcd_watch_ws = NULL;
uint64_t bs_block_size = 0;
std::map<pool_id_t, pool_config_t> pool_config;
std::map<osd_num_t, json11::Json> peer_states;

@@ -84,6 +84,7 @@ public:
void start_etcd_watcher();
void load_global_config();
void load_pgs();
void parse_state(const json_kv_t & kv);
void parse_state(const std::string & key, const json11::Json & value);
void parse_config(json11::Json & config);
~etcd_state_client_t();


+ 0
- 117
src/messenger.cpp View File

@@ -357,123 +357,6 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
outbox_push(op);
}

void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
{
for (auto p: cl->sent_ops)
{
cancel_op(p.second);
}
cl->sent_ops.clear();
cl->outbox.clear();
}

void osd_messenger_t::cancel_op(osd_op_t *op)
{
if (op->op_type == OSD_OP_OUT)
{
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
op->reply.hdr.id = op->req.hdr.id;
op->reply.hdr.opcode = op->req.hdr.opcode;
op->reply.hdr.retval = -EPIPE;
// Copy lambda to be unaffected by `delete op`
std::function<void(osd_op_t*)>(op->callback)(op);
}
else
{
// This function is only called in stop_client(), so it's fine to destroy the operation
delete op;
}
}

void osd_messenger_t::stop_client(int peer_fd, bool force)
{
assert(peer_fd != 0);
auto it = clients.find(peer_fd);
if (it == clients.end())
{
return;
}
uint64_t repeer_osd = 0;
osd_client_t *cl = it->second;
if (cl->peer_state == PEER_CONNECTED)
{
if (cl->osd_num)
{
// Reload configuration from etcd when the connection is dropped
if (log_level > 0)
printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num);
repeer_osd = cl->osd_num;
}
else
{
if (log_level > 0)
printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd);
}
}
else if (!force)
{
return;
}
cl->peer_state = PEER_STOPPED;
clients.erase(it);
tfd->set_fd_handler(peer_fd, false, NULL);
if (cl->connect_timeout_id >= 0)
{
tfd->clear_timer(cl->connect_timeout_id);
cl->connect_timeout_id = -1;
}
if (cl->osd_num)
{
osd_peer_fds.erase(cl->osd_num);
}
if (cl->read_op)
{
if (cl->read_op->callback)
{
cancel_op(cl->read_op);
}
else
{
delete cl->read_op;
}
cl->read_op = NULL;
}
for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
{
if (*rit == peer_fd)
{
read_ready_clients.erase(rit);
break;
}
}
for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++)
{
if (*wit == peer_fd)
{
write_ready_clients.erase(wit);
break;
}
}
free(cl->in_buf);
cl->in_buf = NULL;
close(peer_fd);
if (repeer_osd)
{
// First repeer PGs as canceling OSD ops may push new operations
// and we need correct PG states when we do that
repeer_pgs(repeer_osd);
}
if (cl->osd_num)
{
// Cancel outbound operations
cancel_osd_ops(cl);
}
if (cl->refs <= 0)
{
delete cl;
}
}

void osd_messenger_t::accept_connections(int listen_fd)
{
// Accept new connections


+ 1
- 1
src/messenger.h View File

@@ -16,7 +16,7 @@
#include "json11/json11.hpp"
#include "msgr_op.h"
#include "timerfd_manager.h"
#include "ringloop.h"
#include <ringloop.h>

#define CL_READ_HDR 1
#define CL_READ_DATA 2


+ 1
- 0
src/mock/build.sh View File

@@ -0,0 +1 @@
g++ -D__MOCK__ -fsanitize=address -g -Wno-pointer-arith pg_states.cpp osd_ops.cpp test_cluster_client.cpp cluster_client.cpp msgr_op.cpp msgr_stop.cpp mock/messenger.cpp etcd_state_client.cpp timerfd_manager.cpp ../json11/json11.cpp -I mock -I . -I ..; ./a.out

+ 44
- 0
src/mock/messenger.cpp View File

@@ -0,0 +1,44 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)

#include <unistd.h>
#include <stdexcept>
#include <assert.h>

#include "messenger.h"

void osd_messenger_t::init()
{
}

osd_messenger_t::~osd_messenger_t()
{
while (clients.size() > 0)
{
stop_client(clients.begin()->first, true);
}
}

void osd_messenger_t::outbox_push(osd_op_t *cur_op)
{
clients[cur_op->peer_fd]->sent_ops[cur_op->req.hdr.id] = cur_op;
}

void osd_messenger_t::parse_config(const json11::Json & config)
{
}

void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
{
wanted_peers[peer_osd] = (osd_wanted_peer_t){
.port = 1,
};
}

void osd_messenger_t::read_requests()
{
}

void osd_messenger_t::send_replies()
{
}

+ 25
- 0
src/mock/ringloop.h View File

@@ -0,0 +1,25 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)

#pragma once

#include <functional>

struct ring_consumer_t
{
std::function<void(void)> loop;
};

class ring_loop_t
{
public:
void register_consumer(ring_consumer_t *consumer)
{
}
void unregister_consumer(ring_consumer_t *consumer)
{
}
void submit()
{
}
};

+ 128
- 0
src/msgr_stop.cpp View File

@@ -0,0 +1,128 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)

#include <unistd.h>
#include <assert.h>

#include "messenger.h"

void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
{
for (auto p: cl->sent_ops)
{
cancel_op(p.second);
}
cl->sent_ops.clear();
cl->outbox.clear();
}

void osd_messenger_t::cancel_op(osd_op_t *op)
{
if (op->op_type == OSD_OP_OUT)
{
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
op->reply.hdr.id = op->req.hdr.id;
op->reply.hdr.opcode = op->req.hdr.opcode;
op->reply.hdr.retval = -EPIPE;
// Copy lambda to be unaffected by `delete op`
std::function<void(osd_op_t*)>(op->callback)(op);
}
else
{
// This function is only called in stop_client(), so it's fine to destroy the operation
delete op;
}
}

void osd_messenger_t::stop_client(int peer_fd, bool force)
{
assert(peer_fd != 0);
auto it = clients.find(peer_fd);
if (it == clients.end())
{
return;
}
uint64_t repeer_osd = 0;
osd_client_t *cl = it->second;
if (cl->peer_state == PEER_CONNECTED)
{
if (cl->osd_num)
{
// Reload configuration from etcd when the connection is dropped
if (log_level > 0)
printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num);
repeer_osd = cl->osd_num;
}
else
{
if (log_level > 0)
printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd);
}
}
else if (!force)
{
return;
}
cl->peer_state = PEER_STOPPED;
clients.erase(it);
#ifndef __MOCK__
tfd->set_fd_handler(peer_fd, false, NULL);
if (cl->connect_timeout_id >= 0)
{
tfd->clear_timer(cl->connect_timeout_id);
cl->connect_timeout_id = -1;
}
#endif
if (cl->osd_num)
{
osd_peer_fds.erase(cl->osd_num);
}
if (cl->read_op)
{
if (cl->read_op->callback)
{
cancel_op(cl->read_op);
}
else
{
delete cl->read_op;
}
cl->read_op = NULL;
}
for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
{
if (*rit == peer_fd)
{
read_ready_clients.erase(rit);
break;
}
}
for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++)
{
if (*wit == peer_fd)
{
write_ready_clients.erase(wit);
break;
}
}
free(cl->in_buf);
cl->in_buf = NULL;
#ifndef __MOCK__
close(peer_fd);
#endif
if (repeer_osd)
{
// First repeer PGs as canceling OSD ops may push new operations
// and we need correct PG states when we do that
repeer_pgs(repeer_osd);
}
if (cl->osd_num)
{
// Cancel outbound operations
cancel_osd_ops(cl);
}
if (cl->refs <= 0)
{
delete cl;
}
}

+ 346
- 0
src/test_cluster_client.cpp View File

@@ -0,0 +1,346 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include "cluster_client.h"

void configure_single_pg_pool(cluster_client_t *cli)
{
cli->st_cli.on_load_pgs_hook(true);
cli->st_cli.parse_state((json_kv_t){
.key = "/config/pools",
.value = json11::Json::object {
{ "1", json11::Json::object {
{ "name", "hddpool" },
{ "scheme", "replicated" },
{ "pg_size", 2 },
{ "pg_minsize", 1 },
{ "pg_count", 1 },
{ "failure_domain", "osd" },
} }
},
});
cli->st_cli.parse_state((json_kv_t){
.key = "/config/pgs",
.value = json11::Json::object {
{ "items", json11::Json::object {
{ "1", json11::Json::object {
{ "1", json11::Json::object {
{ "osd_set", json11::Json::array { 1, 2 } },
{ "primary", 1 },
} }
} }
} }
},
});
cli->st_cli.parse_state((json_kv_t){
.key = "/pg/state/1/1",
.value = json11::Json::object {
{ "peers", json11::Json::array { 1, 2 } },
{ "primary", 1 },
{ "state", json11::Json::array { "active" } },
},
});
json11::Json::object changes;
cli->st_cli.on_change_hook(changes);
}

int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c)
{
printf("Post write %lx+%lx\n", offset, len);
int *r = new int;
*r = -1;
cluster_op_t *op = new cluster_op_t();
op->opcode = OSD_OP_WRITE;
op->inode = 0x1000000000001;
op->offset = offset;
op->len = len;
op->iov.push_back(malloc_or_die(len), len);
memset(op->iov.buf[0].iov_base, c, len);
op->callback = [r](cluster_op_t *op)
{
if (*r == -1)
printf("Error: Not allowed to complete yet\n");
assert(*r != -1);
*r = op->retval == op->len ? 1 : 0;
free(op->iov.buf[0].iov_base);
printf("Done write %lx+%lx r=%d\n", op->offset, op->len, op->retval);
delete op;
};
cli->execute(op);
return r;
}

int *test_sync(cluster_client_t *cli)
{
printf("Post sync\n");
int *r = new int;
*r = -1;
cluster_op_t *op = new cluster_op_t();
op->opcode = OSD_OP_SYNC;
op->callback = [r](cluster_op_t *op)
{
if (*r == -1)
printf("Error: Not allowed to complete yet\n");
assert(*r != -1);
*r = op->retval == 0 ? 1 : 0;
printf("Done sync r=%d\n", op->retval);
delete op;
};
cli->execute(op);
return r;
}

void can_complete(int *r)
{
// Allow the operation to proceed so the test verifies
// that it doesn't complete earlier than expected
*r = -2;
}

void check_completed(int *r)
{
assert(*r == 1);
delete r;
}

void pretend_connected(cluster_client_t *cli, osd_num_t osd_num)
{
printf("OSD %lu connected\n", osd_num);
int peer_fd = cli->msgr.clients.size() ? std::prev(cli->msgr.clients.end())->first+1 : 10;
cli->msgr.osd_peer_fds[osd_num] = peer_fd;
cli->msgr.clients[peer_fd] = new osd_client_t();
cli->msgr.clients[peer_fd]->osd_num = osd_num;
cli->msgr.clients[peer_fd]->peer_state = PEER_CONNECTED;
cli->msgr.wanted_peers.erase(osd_num);
cli->msgr.repeer_pgs(osd_num);
}

void pretend_disconnected(cluster_client_t *cli, osd_num_t osd_num)
{
printf("OSD %lu disconnected\n", osd_num);
cli->msgr.stop_client(cli->msgr.osd_peer_fds.at(osd_num));
}

void check_op_count(cluster_client_t *cli, osd_num_t osd_num, int ops)
{
int peer_fd = cli->msgr.osd_peer_fds.at(osd_num);
int real_ops = cli->msgr.clients[peer_fd]->sent_ops.size();
if (real_ops != ops)
{
printf("error: %d ops expected, but %d queued\n", ops, real_ops);
assert(0);
}
}

osd_op_t *find_op(cluster_client_t *cli, osd_num_t osd_num, uint64_t opcode, uint64_t offset, uint64_t len)
{
int peer_fd = cli->msgr.osd_peer_fds.at(osd_num);
auto op_it = cli->msgr.clients[peer_fd]->sent_ops.begin();
while (op_it != cli->msgr.clients[peer_fd]->sent_ops.end())
{
auto op = op_it->second;
if (op->req.hdr.opcode == opcode && (opcode == OSD_OP_SYNC ||
op->req.rw.inode == 0x1000000000001 && op->req.rw.offset == offset && op->req.rw.len == len))
{
return op;
}
op_it++;
}
return NULL;
}

void pretend_op_completed(cluster_client_t *cli, osd_op_t *op, int retval)
{
assert(op);
printf("Pretend completed %s %lx+%x\n", op->req.hdr.opcode == OSD_OP_SYNC
? "sync" : (op->req.hdr.opcode == OSD_OP_WRITE ? "write" : "read"), op->req.rw.offset, op->req.rw.len);
uint64_t op_id = op->req.hdr.id;
int peer_fd = op->peer_fd;
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
op->reply.hdr.id = op->req.hdr.id;
op->reply.hdr.opcode = op->req.hdr.opcode;
op->reply.hdr.retval = retval < 0 ? retval : (op->req.hdr.opcode == OSD_OP_SYNC ? 0 : op->req.rw.len);
// Copy lambda to be unaffected by `delete op`
std::function<void(osd_op_t*)>(op->callback)(op);
cli->msgr.clients[peer_fd]->sent_ops.erase(op_id);
}

void test1()
{
json11::Json config;
timerfd_manager_t *tfd = new timerfd_manager_t([](int fd, bool wr, std::function<void(int, int)> callback){});
cluster_client_t *cli = new cluster_client_t(NULL, tfd, config);

int *r1 = test_write(cli, 0, 4096, 0x55);
configure_single_pg_pool(cli);
pretend_connected(cli, 1);
can_complete(r1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
check_completed(r1);
pretend_disconnected(cli, 1);
int *r2 = test_sync(cli);
pretend_connected(cli, 1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
check_op_count(cli, 1, 1);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
check_completed(r2);
// Check that the client doesn't repeat operations once more
pretend_disconnected(cli, 1);
pretend_connected(cli, 1);
check_op_count(cli, 1, 0);

// Case:
// Write(1) -> Complete Write(1) -> Overwrite(2) -> Complete Write(2)
// -> Overwrite(3) -> Drop OSD connection -> Reestablish OSD connection
// -> Complete All Posted Writes -> Sync -> Complete Sync
// The resulting state of the block must be (3) over (2) over (1).
// I.e. the part overwritten by (3) must remain as in (3) and so on.

// More interesting case:
// Same, but both Write(2) and Write(3) must consist of two parts:
// one from an OSD 2 that drops connection and other from OSD 1 that doesn't.
// The idea is that if the whole Write(2) is repeated when OSD 2 drops connection
// then it may also overwrite a part in OSD 1 which shouldn't be overwritten.

// Another interesting case:
// A new operation added during replay (would also break with the previous implementation)

r1 = test_write(cli, 0, 0x10000, 0x56);
can_complete(r1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x10000), 0);
check_completed(r1);

r1 = test_write(cli, 0xE000, 0x4000, 0x57);
can_complete(r1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0xE000, 0x4000), 0);
check_completed(r1);

r1 = test_write(cli, 0x10000, 0x4000, 0x58);

pretend_disconnected(cli, 1);
cli->continue_ops(true);
pretend_connected(cli, 1);

// Check replay
{
uint64_t replay_start = UINT64_MAX;
uint64_t replay_end = 0;
std::vector<osd_op_t*> replay_ops;
auto osd_cl = cli->msgr.clients.at(cli->msgr.osd_peer_fds.at(1));
for (auto & op_p: osd_cl->sent_ops)
{
auto op = op_p.second;
assert(op->req.hdr.opcode == OSD_OP_WRITE);
uint64_t offset = op->req.rw.offset;
if (op->req.rw.offset < replay_start)
replay_start = op->req.rw.offset;
if (op->req.rw.offset+op->req.rw.len > replay_end)
replay_end = op->req.rw.offset+op->req.rw.len;
for (int buf_idx = 0; buf_idx < op->iov.count; buf_idx++)
{
for (int i = 0; i < op->iov.buf[buf_idx].iov_len; i++, offset++)
{
uint8_t c = offset < 0xE000 ? 0x56 : (offset < 0x10000 ? 0x57 : 0x58);
if (((uint8_t*)op->iov.buf[buf_idx].iov_base)[i] != c)
{
printf("Write replay: mismatch at %lu\n", offset-op->req.rw.offset);
goto fail;
}
}
}
fail:
assert(offset == op->req.rw.offset+op->req.rw.len);
replay_ops.push_back(op);
}
assert(replay_start == 0);
assert(replay_end == 0x14000);
for (auto op: replay_ops)
{
pretend_op_completed(cli, op, 0);
}
}
// Check that the following write finally proceeds
check_op_count(cli, 1, 1);
can_complete(r1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x10000, 0x4000), 0);
check_completed(r1);
check_op_count(cli, 1, 0);
// Free client
delete cli;
delete tfd;
printf("[ok] write replay test\n");
}

void test2()
{
std::map<object_id, cluster_buffer_t> unsynced_writes;
cluster_op_t *op = new cluster_op_t();
op->opcode = OSD_OP_WRITE;
op->inode = 1;
op->offset = 0;
op->len = 4096;
op->iov.push_back(malloc_or_die(4096*1024), 4096);
// 0-4k = 0x55
memset(op->iov.buf[0].iov_base, 0x55, op->iov.buf[0].iov_len);
cluster_client_t::copy_write(op, unsynced_writes);
// 8k-12k = 0x66
op->offset = 8192;
memset(op->iov.buf[0].iov_base, 0x66, op->iov.buf[0].iov_len);
cluster_client_t::copy_write(op, unsynced_writes);
// 4k-1M+4k = 0x77
op->len = op->iov.buf[0].iov_len = 1048576;
op->offset = 4096;
memset(op->iov.buf[0].iov_base, 0x77, op->iov.buf[0].iov_len);
cluster_client_t::copy_write(op, unsynced_writes);
// check it
assert(unsynced_writes.size() == 4);
auto uit = unsynced_writes.begin();
int i;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 0);
assert(uit->second.len == 4096);
for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x55; i++) {}
assert(i == uit->second.len);
uit++;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 4096);
assert(uit->second.len == 4096);
for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {}
assert(i == uit->second.len);
uit++;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 8192);
assert(uit->second.len == 4096);
for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {}
assert(i == uit->second.len);
uit++;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 12*1024);
assert(uit->second.len == 1016*1024);
for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {}
assert(i == uit->second.len);
uit++;
// free memory
free(op->iov.buf[0].iov_base);
delete op;
for (auto p: unsynced_writes)
{
free(p.second.buf);
}
printf("[ok] copy_write test\n");
}

int main(int narg, char *args[])
{
test1();
test2();
return 0;
}

Loading…
Cancel
Save