@ -18,6 +18,20 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
else if ( unsynced_writes . size ( ) )
{
// peer_osd just dropped connection
for ( auto op : syncing_writes )
{
for ( auto & part : op - > parts )
{
if ( part . osd_num = = peer_osd & & part . done )
{
// repeat this operation
part . osd_num = 0 ;
part . done = false ;
assert ( ! part . sent ) ;
op - > done_count - - ;
}
}
}
for ( auto op : unsynced_writes )
{
for ( auto & part : op - > parts )
@ -206,6 +220,10 @@ void cluster_client_t::on_change_hook(json11::Json::object & changes)
{
op - > needs_reslice = true ;
}
for ( auto op : syncing_writes )
{
op - > needs_reslice = true ;
}
pg_count = st_cli . pg_config . size ( ) ;
if ( pg_count )
{
@ -281,19 +299,19 @@ void cluster_client_t::execute(cluster_op_t *op)
{
if ( next_writes . size ( ) > 0 )
{
assert ( cur_sync ) ;
next_writes . push_back ( op ) ;
return ;
}
if ( queued_bytes > = client_dirty_limit )
{
// Push an extra SYNC operation to flush previous writes
next_writes . push_back ( op ) ;
cluster_op_t * sync_op = new cluster_op_t ;
sync_op - > is_internal = true ;
sync_op - > opcode = OSD_OP_SYNC ;
sync_op - > callback = [ ] ( cluster_op_t * sync_op ) { } ;
execute_sync ( sync_op ) ;
next_writes . push_back ( op ) ;
queued_bytes = 0 ;
return ;
}
queued_bytes + = op - > len ;
@ -512,10 +530,8 @@ void cluster_client_t::execute_sync(cluster_op_t *op)
op - > retval = 0 ;
std : : function < void ( cluster_op_t * ) > ( op - > callback ) ( op ) ;
}
else if ( next_writes . size ( ) > 0 | | cur_sync ! = NULL )
else if ( cur_sync ! = NULL )
{
// There are some writes postponed to be executed after SYNC
// Push this SYNC after them
next_writes . push_back ( op ) ;
}
else
@ -565,6 +581,7 @@ void cluster_client_t::continue_sync()
return ;
}
}
syncing_writes . swap ( unsynced_writes ) ;
// Post sync to affected OSDs
cur_sync - > parts . resize ( sync_osds . size ( ) ) ;
int i = 0 ;
@ -584,6 +601,18 @@ void cluster_client_t::continue_sync()
void cluster_client_t : : finish_sync ( )
{
int retval = cur_sync - > retval ;
if ( retval ! = 0 )
{
for ( auto op : syncing_writes )
{
if ( op - > done_count < op - > parts . size ( ) )
{
cur_ops . insert ( op ) ;
}
}
unsynced_writes . insert ( unsynced_writes . begin ( ) , syncing_writes . begin ( ) , syncing_writes . end ( ) ) ;
syncing_writes . clear ( ) ;
}
if ( retval = = - EPIPE )
{
// Retry later
@ -596,8 +625,9 @@ void cluster_client_t::finish_sync()
std : : function < void ( cluster_op_t * ) > ( cur_sync - > callback ) ( cur_sync ) ;
if ( ! retval )
{
for ( auto op : unsynced _writes)
for ( auto op : syncing _writes)
{
assert ( op - > sent_count = = 0 ) ;
if ( op - > is_internal )
{
if ( op - > buf )
@ -605,24 +635,15 @@ void cluster_client_t::finish_sync()
delete op ;
}
}
unsynced _writes. clear ( ) ;
syncing _writes. clear ( ) ;
}
cur_sync = NULL ;
while ( next_writes . size ( ) > 0 )
queued_bytes = 0 ;
std : : vector < cluster_op_t * > next_wr_copy ;
next_wr_copy . swap ( next_writes ) ;
for ( auto next_op : next_wr_copy )
{
if ( next_writes [ 0 ] - > opcode = = OSD_OP_SYNC )
{
cur_sync = next_writes [ 0 ] ;
next_writes . erase ( next_writes . begin ( ) , next_writes . begin ( ) + 1 ) ;
continue_sync ( ) ;
}
else
{
auto wr = next_writes [ 0 ] ;
cur_ops . insert ( wr ) ;
next_writes . erase ( next_writes . begin ( ) , next_writes . begin ( ) + 1 ) ;
continue_rw ( wr ) ;
}
execute ( next_op ) ;
}
}