Simplified distributed block storage with strong consistency, like in Ceph

// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)

#include "blockstore_impl.h"
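
// Snapshot of the current journal write position; check_available() advances this
// snapshot to test whether a batch of entries fits, without touching the journal itself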
blockstore_journal_check_t::blockstore_journal_check_t(blockstore_impl_t *bs)
{
    this->bs = bs;
    sectors_to_write = 0;
    next_pos = bs->journal.next_free;
    next_sector = bs->journal.cur_sector;
    first_sector = -1;
    next_in_pos = bs->journal.in_sector_pos;
    right_dir = next_pos >= bs->journal.used_start;
}

// Check if we can write <required> entries of <size> bytes and <data_after> data bytes after them to the journal
int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries_required, int size, int data_after)
{
    int required = entries_required;
    while (1)
    {
        int fits = bs->journal.no_same_sector_overwrites && next_pos == bs->journal.next_free && bs->journal.sector_info[next_sector].written
            ? 0
            : (bs->journal.block_size - next_in_pos) / size;
        if (fits > 0)
        {
            if (fits > required)
            {
                fits = required;
            }
            if (first_sector == -1)
            {
                first_sector = next_sector;
            }
            required -= fits;
            next_in_pos += fits * size;
            sectors_to_write++;
        }
        else if (bs->journal.sector_info[next_sector].dirty)
        {
            sectors_to_write++;
        }
        if (required <= 0)
        {
            break;
        }
        next_pos = next_pos + bs->journal.block_size;
        if (next_pos >= bs->journal.len)
        {
            next_pos = bs->journal.block_size;
            right_dir = false;
        }
        next_in_pos = 0;
        next_sector = ((next_sector + 1) % bs->journal.sector_count);
        if (next_sector == first_sector)
        {
            // next_sector may wrap when all sectors are flushed and the incoming batch is too big
            // This is an error condition, we can't wait for anything in this case
            throw std::runtime_error(
                "Blockstore journal_sector_buffer_count="+std::to_string(bs->journal.sector_count)+
                " is too small for a batch of "+std::to_string(entries_required)+" entries of "+std::to_string(size)+" bytes"
            );
        }
        if (bs->journal.sector_info[next_sector].flush_count > 0 ||
            bs->journal.sector_info[next_sector].dirty)
        {
            // No memory buffer available. Wait for it.
            int used = 0, dirty = 0;
            for (int i = 0; i < bs->journal.sector_count; i++)
            {
                if (bs->journal.sector_info[i].dirty)
                {
                    dirty++;
                    used++;
                }
                if (bs->journal.sector_info[i].flush_count > 0)
                {
                    used++;
                }
            }
            // In fact, it's even more rare than "ran out of journal space", so print a warning
            printf(
                "Ran out of journal sector buffers: %d/%lu buffers used (%d dirty), next buffer (%ld)"
                " is %s and flushed %lu times. Consider increasing \'journal_sector_buffer_count\'\n",
                used, bs->journal.sector_count, dirty, next_sector,
                bs->journal.sector_info[next_sector].dirty ? "dirty" : "not dirty",
                bs->journal.sector_info[next_sector].flush_count
            );
            PRIV(op)->wait_for = WAIT_JOURNAL_BUFFER;
            return 0;
        }
    }
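    // The entries are followed by <data_after> bytes of raw data - check that they fit too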
    if (data_after > 0)
    {
        next_pos = next_pos + data_after;
        if (next_pos > bs->journal.len)
        {
            next_pos = bs->journal.block_size + data_after;
            right_dir = false;
        }
    }
    if (!right_dir && next_pos >= bs->journal.used_start-bs->journal.block_size)
    {
        // No space in the journal. Wait until used_start changes.
        printf(
            "Ran out of journal space (used_start=%08lx, next_free=%08lx, dirty_start=%08lx)\n",
            bs->journal.used_start, bs->journal.next_free, bs->journal.dirty_start
        );
        PRIV(op)->wait_for = WAIT_JOURNAL;
        bs->flusher->request_trim();
        PRIV(op)->wait_detail = bs->journal.used_start;
        return 0;
    }
    return 1;
}
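
// Reserve <size> bytes in the current journal sector buffer and return a pointer to the
// new entry, moving to the next sector first if the entry doesn't fit into the current one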
journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size)
{
    if (!journal.entry_fits(size))
    {
        assert(!journal.sector_info[journal.cur_sector].dirty);
        // Move to the next journal sector
        if (journal.sector_info[journal.cur_sector].flush_count > 0)
        {
            // Also select next sector buffer in memory
            journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
            assert(!journal.sector_info[journal.cur_sector].flush_count);
        }
        else
        {
            journal.dirty_start = journal.next_free;
        }
        journal.sector_info[journal.cur_sector].written = false;
        journal.sector_info[journal.cur_sector].offset = journal.next_free;
        journal.in_sector_pos = 0;
        journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size;
        memset(journal.inmemory
            ? journal.buffer + journal.sector_info[journal.cur_sector].offset
            : journal.sector_buf + journal.block_size*journal.cur_sector, 0, journal.block_size);
    }
    journal_entry *je = (struct journal_entry*)(
        (journal.inmemory
            ? journal.buffer + journal.sector_info[journal.cur_sector].offset
            : journal.sector_buf + journal.block_size*journal.cur_sector) + journal.in_sector_pos
    );
    journal.in_sector_pos += size;
    je->magic = JOURNAL_MAGIC;
    je->type = type;
    je->size = size;
    je->crc32_prev = journal.crc32_last;
    journal.sector_info[journal.cur_sector].dirty = true;
    return je;
}
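
// Fill an io_uring SQE with a write of journal sector <cur_sector> at its on-disk offset
// and mark the sector as being flushed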
void prepare_journal_sector_write(journal_t & journal, int cur_sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb)
{
    journal.sector_info[cur_sector].dirty = false;
    journal.sector_info[cur_sector].written = true;
    journal.sector_info[cur_sector].flush_count++;
    ring_data_t *data = ((ring_data_t*)sqe->user_data);
    data->iov = (struct iovec){
        (journal.inmemory
            ? journal.buffer + journal.sector_info[cur_sector].offset
            : journal.sector_buf + journal.block_size*cur_sector),
        journal.block_size
    };
    data->callback = cb;
    my_uring_prep_writev(
        sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[cur_sector].offset
    );
}
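
// Free the journal's in-memory buffers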
journal_t::~journal_t()
{
    if (sector_buf)
        free(sector_buf);
    if (sector_info)
        free(sector_info);
    if (buffer)
        free(buffer);
    sector_buf = NULL;
    sector_info = NULL;
    buffer = NULL;
}
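
// Compute the position up to which the journal can be trimmed: the offset of the first
// journal sector that is still referenced, or next_free if the journal is completely empty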
uint64_t journal_t::get_trim_pos()
{
    auto journal_used_it = used_sectors.lower_bound(used_start);
#ifdef BLOCKSTORE_DEBUG
    printf(
        "Trimming journal (used_start=%08lx, next_free=%08lx, dirty_start=%08lx, new_start=%08lx, new_refcount=%ld)\n",
        used_start, next_free, dirty_start,
        journal_used_it == used_sectors.end() ? 0 : journal_used_it->first,
        journal_used_it == used_sectors.end() ? 0 : journal_used_it->second
    );
#endif
    if (journal_used_it == used_sectors.end())
    {
        // Journal is cleared to its end, restart from the beginning
        journal_used_it = used_sectors.begin();
        if (journal_used_it == used_sectors.end())
        {
            // Journal is empty
            return next_free;
        }
        else
        {
            // next_free does not need updating during trim
            return journal_used_it->first;
        }
    }
    else if (journal_used_it->first > used_start)
    {
        // Journal is cleared up to <journal_used_it>
        return journal_used_it->first;
    }
    // Can't trim journal
    return used_start;
}