@ -8,10 +8,12 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
this - > bs = bs ;
this - > flusher_count = flusher_count ;
dequeuing = false ;
trimming = false ;
active_flushers = 0 ;
syncing_flushers = 0 ;
// FIXME: allow to configure flusher_start_threshold and journal_trim_interval
flusher_start_threshold = bs - > journal_block_size / sizeof ( journal_entry_stable ) ;
journal_trim_interval = flusher_start_threshold ;
journal_trim_interval = 512 ;
journal_trim_counter = 0 ;
journal_superblock = bs - > journal . inmemory ? bs - > journal . buffer : memalign_or_die ( MEM_ALIGNMENT , bs - > journal_block_size ) ;
co = new journal_flusher_co [ flusher_count ] ;
@ -172,6 +174,12 @@ bool journal_flusher_co::loop()
goto resume_17 ;
else if ( wait_state = = 18 )
goto resume_18 ;
else if ( wait_state = = 19 )
goto resume_19 ;
else if ( wait_state = = 20 )
goto resume_20 ;
else if ( wait_state = = 21 )
goto resume_21 ;
resume_0 :
if ( ! flusher - > flush_queue . size ( ) | | ! flusher - > dequeuing )
{
@ -484,9 +492,18 @@ resume_1:
if ( ! ( ( + + flusher - > journal_trim_counter ) % flusher - > journal_trim_interval ) | | flusher - > trim_wanted > 0 )
{
flusher - > journal_trim_counter = 0 ;
if ( bs - > journal . trim ( ) )
new_trim_pos = bs - > journal . get_trim_pos ( ) ;
if ( new_trim_pos ! = bs - > journal . used_start )
{
// Update journal "superblock"
resume_19 :
// Wait for other coroutines trimming the journal, if any
if ( flusher - > trimming )
{
wait_state = 19 ;
return false ;
}
flusher - > trimming = true ;
// First update journal "superblock" and only then update <used_start> in memory
await_sqe ( 12 ) ;
* ( ( journal_entry_start * ) flusher - > journal_superblock ) = {
. crc32 = 0 ,
@ -494,7 +511,7 @@ resume_1:
. type = JE_START ,
. size = sizeof ( journal_entry_start ) ,
. reserved = 0 ,
. journal_start = bs - > journal . used_start ,
. journal_start = new_trim_pos ,
} ;
( ( journal_entry_start * ) flusher - > journal_superblock ) - > crc32 = je_crc32 ( ( journal_entry * ) flusher - > journal_superblock ) ;
data - > iov = ( struct iovec ) { flusher - > journal_superblock , bs - > journal_block_size } ;
@ -507,6 +524,24 @@ resume_1:
wait_state = 13 ;
return false ;
}
if ( ! bs - > disable_journal_fsync )
{
await_sqe ( 20 ) ;
my_uring_prep_fsync ( sqe , bs - > journal . fd , IORING_FSYNC_DATASYNC ) ;
data - > iov = { 0 } ;
data - > callback = simple_callback_w ;
resume_21 :
if ( wait_count > 0 )
{
wait_state = 21 ;
return false ;
}
}
bs - > journal . used_start = new_trim_pos ;
# ifdef BLOCKSTORE_DEBUG
printf ( " Journal trimmed to %08lx (next_free=%08lx) \n " , bs - > journal . used_start , bs - > journal . next_free ) ;
# endif
flusher - > trimming = false ;
}
}
// All done