Fix some extra bugs and it seems now it is even able to trim the journal
parent
7e87290fca
commit
82a2b8e7d9
|
@ -172,7 +172,7 @@ resume_0:
|
||||||
data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
|
data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
|
||||||
data->callback = simple_callback;
|
data->callback = simple_callback;
|
||||||
my_uring_prep_readv(
|
my_uring_prep_readv(
|
||||||
sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset
|
sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset - dirty_it->second.offset
|
||||||
);
|
);
|
||||||
wait_count++;
|
wait_count++;
|
||||||
}
|
}
|
||||||
|
@ -185,7 +185,6 @@ resume_0:
|
||||||
else if (dirty_it->second.state == ST_D_STABLE)
|
else if (dirty_it->second.state == ST_D_STABLE)
|
||||||
{
|
{
|
||||||
// There is an unflushed big write. Copy small writes in its position
|
// There is an unflushed big write. Copy small writes in its position
|
||||||
printf("found ");
|
|
||||||
if (!skip_copy)
|
if (!skip_copy)
|
||||||
{
|
{
|
||||||
clean_loc = dirty_it->second.location;
|
clean_loc = dirty_it->second.location;
|
||||||
|
@ -274,10 +273,12 @@ resume_0:
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
meta_it->second.usage_count++;
|
meta_it->second.usage_count++;
|
||||||
wait_state = 3;
|
|
||||||
resume_3:
|
resume_3:
|
||||||
if (wait_count > 0)
|
if (wait_count > 0)
|
||||||
|
{
|
||||||
|
wait_state = 3;
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
// Reads completed, submit writes
|
// Reads completed, submit writes
|
||||||
for (it = v.begin(); it != v.end(); it++)
|
for (it = v.begin(); it != v.end(); it++)
|
||||||
{
|
{
|
||||||
|
@ -297,12 +298,12 @@ resume_0:
|
||||||
wait_state = 5;
|
wait_state = 5;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
*((clean_disk_entry*)meta_it->second.buf + meta_pos) = {
|
((clean_disk_entry*)meta_it->second.buf)[meta_pos] = {
|
||||||
.oid = cur.oid,
|
.oid = cur.oid,
|
||||||
.version = cur.version,
|
.version = cur.version,
|
||||||
};
|
};
|
||||||
// I consider unordered writes to data & metadata safe here
|
// I consider unordered writes to data & metadata safe here
|
||||||
// BUT it requires that journal entries even older than clean_db should be replayed after restart
|
// BUT it requires that journal entries even older than clean_db are replayed after restart
|
||||||
await_sqe(6);
|
await_sqe(6);
|
||||||
data->iov = (struct iovec){ meta_it->second.buf, 512 };
|
data->iov = (struct iovec){ meta_it->second.buf, 512 };
|
||||||
data->callback = simple_callback;
|
data->callback = simple_callback;
|
||||||
|
@ -310,10 +311,12 @@ resume_0:
|
||||||
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
|
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
|
||||||
);
|
);
|
||||||
wait_count++;
|
wait_count++;
|
||||||
wait_state = 7;
|
|
||||||
resume_7:
|
resume_7:
|
||||||
if (wait_count > 0)
|
if (wait_count > 0)
|
||||||
|
{
|
||||||
|
wait_state = 7;
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
// Done, free all buffers
|
// Done, free all buffers
|
||||||
meta_it->second.usage_count--;
|
meta_it->second.usage_count--;
|
||||||
if (meta_it->second.usage_count == 0)
|
if (meta_it->second.usage_count == 0)
|
||||||
|
@ -383,7 +386,7 @@ resume_0:
|
||||||
.location = clean_loc,
|
.location = clean_loc,
|
||||||
};
|
};
|
||||||
dirty_it = dirty_end;
|
dirty_it = dirty_end;
|
||||||
do
|
while (1)
|
||||||
{
|
{
|
||||||
if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
|
if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
|
||||||
{
|
{
|
||||||
|
@ -394,8 +397,16 @@ resume_0:
|
||||||
{
|
{
|
||||||
bs->journal.used_sectors.erase(dirty_it->second.journal_sector);
|
bs->journal.used_sectors.erase(dirty_it->second.journal_sector);
|
||||||
}
|
}
|
||||||
|
if (dirty_it == bs->dirty_db.begin())
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
dirty_it--;
|
dirty_it--;
|
||||||
} while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
|
if (dirty_it->first.oid != cur.oid)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
// Then, basically, remove everything up to the current version from dirty_db...
|
// Then, basically, remove everything up to the current version from dirty_db...
|
||||||
if (dirty_it->first.oid != cur.oid)
|
if (dirty_it->first.oid != cur.oid)
|
||||||
dirty_it++;
|
dirty_it++;
|
||||||
|
@ -417,6 +428,7 @@ resume_0:
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
bs->journal.used_start = journal_used_it->first;
|
bs->journal.used_start = journal_used_it->first;
|
||||||
|
// next_free does not need updating here
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (journal_used_it->first > bs->journal.used_start)
|
else if (journal_used_it->first > bs->journal.used_start)
|
||||||
|
@ -444,10 +456,12 @@ resume_0:
|
||||||
data->iov = (struct iovec){ flusher->journal_superblock, 512 };
|
data->iov = (struct iovec){ flusher->journal_superblock, 512 };
|
||||||
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
|
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
|
||||||
wait_count++;
|
wait_count++;
|
||||||
wait_state = 13;
|
|
||||||
resume_13:
|
resume_13:
|
||||||
if (wait_count > 0)
|
if (wait_count > 0)
|
||||||
|
{
|
||||||
|
wait_state = 13;
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
do_not_trim:
|
do_not_trim:
|
||||||
// All done
|
// All done
|
||||||
|
@ -458,8 +472,8 @@ resume_0:
|
||||||
{
|
{
|
||||||
// Requeue version
|
// Requeue version
|
||||||
flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
|
flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
|
||||||
|
flusher->sync_to_repeat.erase(repeat_it);
|
||||||
}
|
}
|
||||||
flusher->sync_to_repeat.erase(repeat_it);
|
|
||||||
goto resume_0;
|
goto resume_0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,7 @@ int blockstore_init_meta::loop()
|
||||||
int count = 512 / sizeof(clean_disk_entry);
|
int count = 512 / sizeof(clean_disk_entry);
|
||||||
for (int sector = 0; sector < done_len; sector += 512)
|
for (int sector = 0; sector < done_len; sector += 512)
|
||||||
{
|
{
|
||||||
clean_disk_entry *entries = (clean_disk_entry*)(metadata_buffer + (prev_done == 1 ? bs->metadata_buf_size : 0) + sector);
|
clean_disk_entry *entries = (clean_disk_entry*)(metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0) + sector);
|
||||||
// handle <count> entries
|
// handle <count> entries
|
||||||
handle_entries(entries, count, bs->block_order);
|
handle_entries(entries, count, bs->block_order);
|
||||||
done_cnt += count;
|
done_cnt += count;
|
||||||
|
@ -88,12 +88,13 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int
|
||||||
auto clean_it = bs->clean_db.find(entries[i].oid);
|
auto clean_it = bs->clean_db.find(entries[i].oid);
|
||||||
if (clean_it == end || clean_it->second.version < entries[i].version)
|
if (clean_it == end || clean_it->second.version < entries[i].version)
|
||||||
{
|
{
|
||||||
entries_loaded++;
|
|
||||||
if (clean_it != end)
|
if (clean_it != end)
|
||||||
{
|
{
|
||||||
// free the previous block
|
// free the previous block
|
||||||
allocator_set(bs->data_alloc, clean_it->second.version >> block_order, false);
|
allocator_set(bs->data_alloc, clean_it->second.version >> block_order, false);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
entries_loaded++;
|
||||||
allocator_set(bs->data_alloc, done_cnt+i, true);
|
allocator_set(bs->data_alloc, done_cnt+i, true);
|
||||||
bs->clean_db[entries[i].oid] = (struct clean_entry){
|
bs->clean_db[entries[i].oid] = (struct clean_entry){
|
||||||
.version = entries[i].version,
|
.version = entries[i].version,
|
||||||
|
@ -109,7 +110,6 @@ blockstore_init_journal::blockstore_init_journal(blockstore *bs)
|
||||||
this->bs = bs;
|
this->bs = bs;
|
||||||
simple_callback = [this](ring_data_t *data1)
|
simple_callback = [this](ring_data_t *data1)
|
||||||
{
|
{
|
||||||
printf("%d %d\n", data1->res, data1->iov.iov_len);
|
|
||||||
if (data1->res != data1->iov.iov_len)
|
if (data1->res != data1->iov.iov_len)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("I/O operation failed while reading journal: ") + strerror(-data1->res));
|
throw std::runtime_error(std::string("I/O operation failed while reading journal: ") + strerror(-data1->res));
|
||||||
|
@ -235,7 +235,7 @@ resume_1:
|
||||||
journal_pos = bs->journal.used_start = je_start->journal_start;
|
journal_pos = bs->journal.used_start = je_start->journal_start;
|
||||||
crc32_last = 0;
|
crc32_last = 0;
|
||||||
// Read journal
|
// Read journal
|
||||||
while (true)
|
while (1)
|
||||||
{
|
{
|
||||||
resume_2:
|
resume_2:
|
||||||
if (submitted)
|
if (submitted)
|
||||||
|
|
|
@ -122,7 +122,7 @@ struct journal_t
|
||||||
journal_sector_info_t *sector_info;
|
journal_sector_info_t *sector_info;
|
||||||
uint64_t sector_count;
|
uint64_t sector_count;
|
||||||
int cur_sector = 0;
|
int cur_sector = 0;
|
||||||
int in_sector_pos = 512; // no free space because sector is initially inmapped
|
int in_sector_pos = 512; // no free space because sector is initially unmapped
|
||||||
|
|
||||||
// Used sector map
|
// Used sector map
|
||||||
// May use ~ 80 MB per 1 GB of used journal space in the worst case
|
// May use ~ 80 MB per 1 GB of used journal space in the worst case
|
||||||
|
|
|
@ -126,7 +126,7 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
|
||||||
auto dirty_it = dirty_db.find(*v);
|
auto dirty_it = dirty_db.find(*v);
|
||||||
if (dirty_it != dirty_db.end())
|
if (dirty_it != dirty_db.end())
|
||||||
{
|
{
|
||||||
do
|
while (1)
|
||||||
{
|
{
|
||||||
if (dirty_it->second.state == ST_J_SYNCED)
|
if (dirty_it->second.state == ST_J_SYNCED)
|
||||||
{
|
{
|
||||||
|
@ -140,8 +140,16 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (dirty_it == dirty_db.begin())
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
dirty_it--;
|
dirty_it--;
|
||||||
} while (dirty_it != dirty_db.begin() && dirty_it->first.oid == v->oid);
|
if (dirty_it->first.oid != v->oid)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
flusher->queue_flush(*v);
|
flusher->queue_flush(*v);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,7 +138,7 @@ int main(int narg, char *args[])
|
||||||
};
|
};
|
||||||
|
|
||||||
ringloop->register_consumer(main_cons);
|
ringloop->register_consumer(main_cons);
|
||||||
while (true)
|
while (1)
|
||||||
{
|
{
|
||||||
ringloop->loop();
|
ringloop->loop();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue