Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

221 lines
8.0 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 (see README.md for details)
  3. #include "blockstore_impl.h"
  4. blockstore_journal_check_t::blockstore_journal_check_t(blockstore_impl_t *bs)
  5. {
  6. this->bs = bs;
  7. sectors_required = 0;
  8. next_pos = bs->journal.next_free;
  9. next_sector = bs->journal.cur_sector;
  10. first_sector = -1;
  11. next_in_pos = bs->journal.in_sector_pos;
  12. right_dir = next_pos >= bs->journal.used_start;
  13. }
  14. // Check if we can write <required> entries of <size> bytes and <data_after> data bytes after them to the journal
  15. int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries_required, int size, int data_after)
  16. {
  17. int required = entries_required;
  18. while (1)
  19. {
  20. int fits = bs->journal.no_same_sector_overwrites && bs->journal.sector_info[next_sector].written
  21. ? 0
  22. : (bs->journal.block_size - next_in_pos) / size;
  23. if (fits > 0)
  24. {
  25. if (first_sector == -1)
  26. {
  27. first_sector = next_sector;
  28. }
  29. required -= fits;
  30. next_in_pos += fits * size;
  31. sectors_required++;
  32. }
  33. else if (bs->journal.sector_info[next_sector].dirty)
  34. {
  35. // sectors_required is more like "sectors to write"
  36. sectors_required++;
  37. }
  38. if (required <= 0)
  39. {
  40. break;
  41. }
  42. next_pos = next_pos + bs->journal.block_size;
  43. if (next_pos >= bs->journal.len)
  44. {
  45. next_pos = bs->journal.block_size;
  46. right_dir = false;
  47. }
  48. next_in_pos = 0;
  49. next_sector = ((next_sector + 1) % bs->journal.sector_count);
  50. if (next_sector == first_sector)
  51. {
  52. // next_sector may wrap when all sectors are flushed and the incoming batch is too big
  53. // This is an error condition, we can't wait for anything in this case
  54. throw std::runtime_error(
  55. "Blockstore journal_sector_buffer_count="+std::to_string(bs->journal.sector_count)+
  56. " is too small for a batch of "+std::to_string(entries_required)+" entries of "+std::to_string(size)+" bytes"
  57. );
  58. }
  59. if (bs->journal.sector_info[next_sector].usage_count > 0 ||
  60. bs->journal.sector_info[next_sector].dirty)
  61. {
  62. // No memory buffer available. Wait for it.
  63. int used = 0, dirty = 0;
  64. for (int i = 0; i < bs->journal.sector_count; i++)
  65. {
  66. if (bs->journal.sector_info[i].dirty)
  67. {
  68. dirty++;
  69. used++;
  70. }
  71. if (bs->journal.sector_info[i].usage_count > 0)
  72. {
  73. used++;
  74. }
  75. }
  76. // In fact, it's even more rare than "ran out of journal space", so print a warning
  77. printf(
  78. "Ran out of journal sector buffers: %d/%lu buffers used (%d dirty), next buffer (%ld) is %s and flushed %lu times\n",
  79. used, bs->journal.sector_count, dirty, next_sector,
  80. bs->journal.sector_info[next_sector].dirty ? "dirty" : "not dirty",
  81. bs->journal.sector_info[next_sector].usage_count
  82. );
  83. PRIV(op)->wait_for = WAIT_JOURNAL_BUFFER;
  84. return 0;
  85. }
  86. }
  87. if (data_after > 0)
  88. {
  89. next_pos = next_pos + data_after;
  90. if (next_pos > bs->journal.len)
  91. {
  92. next_pos = bs->journal.block_size + data_after;
  93. right_dir = false;
  94. }
  95. }
  96. if (!right_dir && next_pos >= bs->journal.used_start-bs->journal.block_size)
  97. {
  98. // No space in the journal. Wait until used_start changes.
  99. printf(
  100. "Ran out of journal space (free space: %lu bytes, sectors to write: %d)\n",
  101. (bs->journal.next_free >= bs->journal.used_start
  102. ? bs->journal.len-bs->journal.block_size - (bs->journal.next_free-bs->journal.used_start)
  103. : bs->journal.used_start - bs->journal.next_free),
  104. sectors_required
  105. );
  106. PRIV(op)->wait_for = WAIT_JOURNAL;
  107. bs->flusher->request_trim();
  108. PRIV(op)->wait_detail = bs->journal.used_start;
  109. return 0;
  110. }
  111. return 1;
  112. }
  113. journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size)
  114. {
  115. if (journal.block_size - journal.in_sector_pos < size ||
  116. journal.no_same_sector_overwrites && journal.sector_info[journal.cur_sector].written)
  117. {
  118. assert(!journal.sector_info[journal.cur_sector].dirty);
  119. // Move to the next journal sector
  120. journal.sector_info[journal.cur_sector].written = false;
  121. if (journal.sector_info[journal.cur_sector].usage_count > 0)
  122. {
  123. // Also select next sector buffer in memory
  124. journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
  125. assert(!journal.sector_info[journal.cur_sector].usage_count);
  126. }
  127. else
  128. {
  129. journal.dirty_start = journal.next_free;
  130. }
  131. journal.sector_info[journal.cur_sector].offset = journal.next_free;
  132. journal.in_sector_pos = 0;
  133. journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size;
  134. memset(journal.inmemory
  135. ? journal.buffer + journal.sector_info[journal.cur_sector].offset
  136. : journal.sector_buf + journal.block_size*journal.cur_sector, 0, journal.block_size);
  137. }
  138. journal_entry *je = (struct journal_entry*)(
  139. (journal.inmemory
  140. ? journal.buffer + journal.sector_info[journal.cur_sector].offset
  141. : journal.sector_buf + journal.block_size*journal.cur_sector) + journal.in_sector_pos
  142. );
  143. journal.in_sector_pos += size;
  144. je->magic = JOURNAL_MAGIC;
  145. je->type = type;
  146. je->size = size;
  147. je->crc32_prev = journal.crc32_last;
  148. journal.sector_info[journal.cur_sector].dirty = true;
  149. return je;
  150. }
  151. void prepare_journal_sector_write(journal_t & journal, int cur_sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb)
  152. {
  153. journal.sector_info[cur_sector].dirty = false;
  154. journal.sector_info[cur_sector].written = true;
  155. journal.sector_info[cur_sector].usage_count++;
  156. ring_data_t *data = ((ring_data_t*)sqe->user_data);
  157. data->iov = (struct iovec){
  158. (journal.inmemory
  159. ? journal.buffer + journal.sector_info[cur_sector].offset
  160. : journal.sector_buf + journal.block_size*cur_sector),
  161. journal.block_size
  162. };
  163. data->callback = cb;
  164. my_uring_prep_writev(
  165. sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[cur_sector].offset
  166. );
  167. }
  168. journal_t::~journal_t()
  169. {
  170. if (sector_buf)
  171. free(sector_buf);
  172. if (sector_info)
  173. free(sector_info);
  174. if (buffer)
  175. free(buffer);
  176. sector_buf = NULL;
  177. sector_info = NULL;
  178. buffer = NULL;
  179. }
  180. uint64_t journal_t::get_trim_pos()
  181. {
  182. auto journal_used_it = used_sectors.lower_bound(used_start);
  183. #ifdef BLOCKSTORE_DEBUG
  184. printf(
  185. "Trimming journal (used_start=%08lx, next_free=%08lx, dirty_start=%08lx, new_start=%08lx, new_refcount=%ld)\n",
  186. used_start, next_free, dirty_start,
  187. journal_used_it == used_sectors.end() ? 0 : journal_used_it->first,
  188. journal_used_it == used_sectors.end() ? 0 : journal_used_it->second
  189. );
  190. #endif
  191. if (journal_used_it == used_sectors.end())
  192. {
  193. // Journal is cleared to its end, restart from the beginning
  194. journal_used_it = used_sectors.begin();
  195. if (journal_used_it == used_sectors.end())
  196. {
  197. // Journal is empty
  198. return next_free;
  199. }
  200. else
  201. {
  202. // next_free does not need updating during trim
  203. return journal_used_it->first;
  204. }
  205. }
  206. else if (journal_used_it->first > used_start)
  207. {
  208. // Journal is cleared up to <journal_used_it>
  209. return journal_used_it->first;
  210. }
  211. // Can't trim journal
  212. return used_start;
  213. }