Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

576 lines
16 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 (see README.md for details)
  3. #define _LARGEFILE64_SOURCE
  4. #include <sys/types.h>
  5. #include <sys/ioctl.h>
  6. #include <sys/stat.h>
  7. #include <sys/time.h>
  8. #include <fcntl.h>
  9. #include <unistd.h>
  10. #include <stdint.h>
  11. #include <malloc.h>
  12. #include <linux/fs.h>
  13. #include <string.h>
  14. #include <errno.h>
  15. #include <assert.h>
  16. #include <stdio.h>
  17. #include <liburing.h>
  18. #include <math.h>
  19. #include <sys/socket.h>
  20. #include <sys/epoll.h>
  21. #include <netinet/in.h>
  22. #include <arpa/inet.h>
  23. #include <map>
  24. #include <vector>
  25. #include <deque>
  26. #include <algorithm>
  27. #include "blockstore.h"
  28. #include "blockstore_impl.h"
  29. #include "osd_peering_pg.cpp"
  30. //#include "cpp-btree/btree_map.h"
  31. static int setup_context(unsigned entries, struct io_uring *ring)
  32. {
  33. int ret = io_uring_queue_init(entries, ring, 0);
  34. if (ret < 0)
  35. {
  36. fprintf(stderr, "queue_init: %s\n", strerror(-ret));
  37. return -1;
  38. }
  39. return 0;
  40. }
  41. static void test_write(struct io_uring *ring, int fd)
  42. {
  43. struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  44. assert(sqe);
  45. uint8_t *buf = (uint8_t*)memalign(512, 1024*1024*1024);
  46. struct iovec iov = { buf, 1024*1024*1024 };
  47. io_uring_prep_writev(sqe, fd, &iov, 1, 0);
  48. io_uring_sqe_set_data(sqe, buf);
  49. io_uring_submit_and_wait(ring, 1);
  50. struct io_uring_cqe *cqe;
  51. io_uring_peek_cqe(ring, &cqe);
  52. int ret = cqe->res;
  53. //int ret = writev(fd, &iov, 1);
  54. if (ret < 0)
  55. printf("cqe failed: %d %s\n", ret, strerror(-ret));
  56. else
  57. printf("result: %d user_data: %lld -> %lld\n", ret, sqe->user_data, cqe->user_data);
  58. io_uring_cqe_seen(ring, cqe);
  59. free(buf);
  60. }
  61. int main00(int argc, char *argv[])
  62. {
  63. // queue with random removal: vector is best :D
  64. // deque: 8.1s
  65. // vector: 6.6s
  66. // list: 9.3s
  67. for (int i = 0; i < 10000; i++)
  68. {
  69. std::list<int> q;
  70. for (int i = 0; i < 20480; i++)
  71. {
  72. for (auto it = q.begin(); it != q.end();)
  73. {
  74. if (rand() < RAND_MAX/2)
  75. {
  76. //q.erase(it); -> for deque and vector
  77. auto p = it++;
  78. q.erase(p);
  79. }
  80. else
  81. it++;
  82. }
  83. q.push_back(rand());
  84. }
  85. }
  86. return 0;
  87. }
  88. int main01(int argc, char *argv[])
  89. {
  90. // deque: 2.091s
  91. // vector: 18.733s
  92. // list: 5.216s
  93. // good, at least in this test deque is fine
  94. for (int i = 0; i < 10000; i++)
  95. {
  96. std::deque<int> q;
  97. for (int i = 0; i < 20480; i++)
  98. {
  99. int r = rand();
  100. if (r < RAND_MAX/4 && q.size() > 0)
  101. q.pop_front();
  102. //q.erase(q.begin());
  103. else
  104. q.push_back(r);
  105. }
  106. }
  107. return 0;
  108. }
  109. int main_vec(int argc, char *argv[])
  110. {
  111. // vector: 16 elements -> 0.047s, 256 elements -> 1.622s, 1024 elements -> 16.087s, 2048 elements -> 55.8s
  112. for (int i = 0; i < 100000; i++)
  113. {
  114. std::vector<iovec> v;
  115. for (int i = 0; i < 2048; i++)
  116. {
  117. int r = rand();
  118. auto it = v.begin();
  119. for (; it != v.end(); it++)
  120. if (it->iov_len >= r)
  121. break;
  122. v.insert(it, (iovec){ .iov_base = 0, .iov_len = (size_t)r });
  123. }
  124. }
  125. return 0;
  126. }
  127. int main_map(int argc, char *argv[])
  128. {
  129. // map: 16 elements -> 0.105s, 256 elements -> 2.634s, 1024 elements -> 12.55s, 2048 elements -> 27.475s
  130. // conclustion: vector is better in fulfill_read
  131. for (int i = 0; i < 100000; i++)
  132. {
  133. std::map<int,iovec> v;
  134. for (int i = 0; i < 2048; i++)
  135. {
  136. int r = rand();
  137. v[r] = (iovec){ .iov_base = 0, .iov_len = (size_t)r };
  138. }
  139. }
  140. return 0;
  141. }
  142. int main0(int argc, char *argv[])
  143. {
  144. // std::map 5M entries monotone -> 2.115s, random -> 8.782s
  145. // btree_map 5M entries monotone -> 0.458s, random -> 5.429s
  146. // absl::btree_map 5M entries random -> 5.09s
  147. // sparse_hash_map 5M entries -> 2.193s, random -> 2.586s
  148. btree::btree_map<obj_ver_id, dirty_entry> dirty_db;
  149. //std::map<obj_ver_id, dirty_entry> dirty_db;
  150. //spp::sparse_hash_map<obj_ver_id, dirty_entry, obj_ver_hash> dirty_db;
  151. for (int i = 0; i < 5000000; i++)
  152. {
  153. dirty_db[(obj_ver_id){
  154. .oid = (object_id){
  155. .inode = (uint64_t)rand(),
  156. .stripe = (uint64_t)i,
  157. },
  158. .version = 1,
  159. }] = (dirty_entry){
  160. .state = BS_ST_SYNCED | BS_ST_BIG_WRITE,
  161. .flags = 0,
  162. .location = (uint64_t)i << 17,
  163. .offset = 0,
  164. .len = 1 << 17,
  165. };
  166. }
  167. return 0;
  168. }
  169. int main1(int argc, char *argv[])
  170. {
  171. std::vector<uint64_t> v1, v2;
  172. v1.reserve(10000);
  173. v2.reserve(10000);
  174. for (int i = 0; i < 10000; i++)
  175. {
  176. v1.push_back(i);
  177. v2.push_back(i);
  178. }
  179. for (int i = 0; i < 100000; i++)
  180. {
  181. // haha (core i5-2500 | i7-6800HQ)
  182. // vector 10000 items: 4.37/100000 | 3.66
  183. // vector 100000 items: 9.68/10000 | 0.95
  184. // deque 10000 items: 28.432/100000
  185. // list 10000 items: 320.695/100000
  186. std::vector<uint64_t> v3;
  187. v3.insert(v3.end(), v1.begin(), v1.end());
  188. v3.insert(v3.end(), v2.begin(), v2.end());
  189. }
  190. return 0;
  191. }
  192. int main02(int argc, char *argv[])
  193. {
  194. std::map<int, std::string> strs;
  195. strs.emplace(12, "str");
  196. auto it = strs.upper_bound(13);
  197. //printf("s = %d %s %d\n", it->first, it->second.c_str(), it == strs.begin());
  198. it--;
  199. printf("%d\n", it == strs.end());
  200. //printf("s = %d %s\n", it->first, it->second.c_str());
  201. struct io_uring ring;
  202. int fd = open("/dev/loop0", O_RDWR | O_DIRECT, 0644);
  203. if (fd < 0)
  204. {
  205. perror("open infile");
  206. return 1;
  207. }
  208. if (setup_context(32, &ring))
  209. return 1;
  210. test_write(&ring, fd);
  211. close(fd);
  212. io_uring_queue_exit(&ring);
  213. return 0;
  214. }
  215. int main03(int argc, char *argv[])
  216. {
  217. int listen_fd = socket(AF_INET, SOCK_STREAM, 0), enable = 1;
  218. assert(listen_fd >= 0);
  219. setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
  220. struct sockaddr_in bind_addr;
  221. assert(inet_pton(AF_INET, "0.0.0.0", &bind_addr.sin_addr) == 1);
  222. bind_addr.sin_family = AF_INET;
  223. bind_addr.sin_port = htons(13892);
  224. int r = bind(listen_fd, (sockaddr*)&bind_addr, sizeof(bind_addr));
  225. if (r)
  226. {
  227. perror("bind");
  228. return 1;
  229. }
  230. assert(listen(listen_fd, 128) == 0);
  231. struct sockaddr_in peer_addr;
  232. socklen_t peer_addr_size = sizeof(peer_addr);
  233. int peer_fd = accept(listen_fd, (sockaddr*)&peer_addr, &peer_addr_size);
  234. assert(peer_fd >= 0);
  235. //fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
  236. struct io_uring ring;
  237. assert(setup_context(32, &ring) == 0);
  238. void *buf = memalign(512, 4096*1024);
  239. struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
  240. assert(sqe);
  241. struct iovec iov = { buf, 4096*1024 };
  242. struct msghdr msg = { 0 };
  243. msg.msg_iov = &iov;
  244. msg.msg_iovlen = 1;
  245. io_uring_prep_recvmsg(sqe, peer_fd, &msg, 0);
  246. io_uring_sqe_set_data(sqe, buf);
  247. io_uring_submit_and_wait(&ring, 1);
  248. struct io_uring_cqe *cqe;
  249. io_uring_peek_cqe(&ring, &cqe);
  250. int ret = cqe->res;
  251. printf("cqe result: %d\n", ret);
  252. // ok, io_uring's sendmsg always reads as much data as is available and finishes
  253. io_uring_cqe_seen(&ring, cqe);
  254. close(peer_fd);
  255. close(listen_fd);
  256. io_uring_queue_exit(&ring);
  257. return 0;
  258. }
  259. int main04(int argc, char *argv[])
  260. {
  261. /*spp::sparse_hash_set<obj_ver_id> osd1, osd2;
  262. // fill takes 18.9 s
  263. for (int i = 0; i < 1024*1024*8*2; i++)
  264. {
  265. obj_ver_id ovid = { { rand() % 500, rand() }, rand() };
  266. osd1.insert(ovid);
  267. osd2.insert(ovid);
  268. }
  269. for (int i = 0; i < 50000; i++)
  270. {
  271. obj_ver_id ovid = { { rand() % 500, rand() }, rand() };
  272. osd1.insert(ovid);
  273. ovid = { { rand() % 500, rand() }, rand() };
  274. osd2.insert(ovid);
  275. }
  276. // diff takes only 2.3 s
  277. spp::sparse_hash_set<obj_ver_id> osd1diff;
  278. for (obj_ver_id e: osd1)
  279. {
  280. auto it = osd2.find(e);
  281. if (it != osd2.end())
  282. osd2.erase(it);
  283. else
  284. osd1diff.insert(e);
  285. }*/
  286. // fill vector takes 2 s
  287. std::vector<obj_ver_role> to_sort;
  288. to_sort.resize(1024*1024*8*2*3);
  289. printf("Filling\n");
  290. for (int i = 0; i < 1024*1024*8*2*3; i++)
  291. {
  292. to_sort[i] = {
  293. .oid = (object_id){
  294. .inode = (uint64_t)(rand() % 500),
  295. .stripe = (uint64_t)rand(),
  296. },
  297. .version = (uint64_t)rand(),
  298. .osd_num = (uint64_t)(rand() % 16),
  299. };
  300. }
  301. printf("Sorting\n");
  302. // sorting the whole array takes 7 s
  303. // sorting in 3 parts... almost the same, 6 s
  304. std::sort(to_sort.begin(), to_sort.end());
  305. return 0;
  306. }
  307. uint64_t jumphash(uint64_t key, int count)
  308. {
  309. uint64_t b = 0;
  310. uint64_t seed = key;
  311. for (int j = 1; j < count; j++)
  312. {
  313. seed = 2862933555777941757ull*seed + 3037000493ull; // LCPRNG
  314. if (seed < (UINT64_MAX / (j+1)))
  315. {
  316. b = j;
  317. }
  318. }
  319. return b;
  320. }
  321. void jumphash_prepare(int count, uint64_t *out_weights, uint64_t *in_weights)
  322. {
  323. if (count <= 0)
  324. {
  325. return;
  326. }
  327. uint64_t total_weight = in_weights[0];
  328. out_weights[0] = UINT64_MAX;
  329. for (int j = 1; j < count; j++)
  330. {
  331. total_weight += in_weights[j];
  332. out_weights[j] = UINT64_MAX / total_weight * in_weights[j];
  333. }
  334. }
  335. uint64_t jumphash_weights(uint64_t key, int count, uint64_t *prepared_weights)
  336. {
  337. uint64_t b = 0;
  338. uint64_t seed = key;
  339. for (int j = 1; j < count; j++)
  340. {
  341. seed = 2862933555777941757ull*seed + 3037000493ull; // LCPRNG
  342. if (seed < prepared_weights[j])
  343. {
  344. b = j;
  345. }
  346. }
  347. return b;
  348. }
  349. void jumphash3(uint64_t key, int count, uint64_t *weights, uint64_t *r)
  350. {
  351. r[0] = 0;
  352. r[1] = 1;
  353. r[2] = 2;
  354. uint64_t total_weight = weights[0]+weights[1]+weights[2];
  355. uint64_t seed = key;
  356. for (int j = 3; j < count; j++)
  357. {
  358. seed = 2862933555777941757ull*seed + 3037000493ull; // LCPRNG
  359. total_weight += weights[j];
  360. if (seed < UINT64_MAX*1.0*weights[j]/total_weight)
  361. r[0] = j;
  362. else
  363. {
  364. seed = 2862933555777941757ull*seed + 3037000493ull; // LCPRNG
  365. if (seed < UINT64_MAX*1.0*weights[j]/total_weight)
  366. r[1] = j;
  367. else
  368. {
  369. seed = 2862933555777941757ull*seed + 3037000493ull; // LCPRNG
  370. if (seed < UINT64_MAX*1.0*weights[j]/total_weight)
  371. r[2] = j;
  372. }
  373. }
  374. }
  375. }
  376. uint64_t crush(uint64_t key, int count, uint64_t *weights)
  377. {
  378. uint64_t b = 0;
  379. uint64_t seed = 0;
  380. uint64_t max = 0;
  381. for (int j = 0; j < count; j++)
  382. {
  383. seed = (key + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
  384. seed ^= (j + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
  385. seed = 2862933555777941757ull*seed + 3037000493ull; // LCPRNG
  386. seed = -log(((double)seed) / (1ul << 32) / (1ul << 32)) * weights[j];
  387. if (seed > max)
  388. {
  389. max = seed;
  390. b = j;
  391. }
  392. }
  393. return b;
  394. }
  395. void crush3(uint64_t key, int count, uint64_t *weights, uint64_t *r, uint64_t total_weight)
  396. {
  397. uint64_t seed = 0;
  398. uint64_t max = 0;
  399. for (int k1 = 0; k1 < count; k1++)
  400. {
  401. for (int k2 = k1+1; k2 < count; k2++)
  402. {
  403. if (k2 == k1)
  404. {
  405. continue;
  406. }
  407. for (int k3 = k2+1; k3 < count; k3++)
  408. {
  409. if (k3 == k1 || k3 == k2)
  410. {
  411. continue;
  412. }
  413. seed = (key + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
  414. seed ^= (k1 + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
  415. seed ^= (k2 + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
  416. seed ^= (k3 + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
  417. seed = 2862933555777941757ull*seed + 3037000493ull; // LCPRNG
  418. //seed = ((double)seed) / (1ul << 32) / (1ul << 32) * (weights[k1] + weights[k2] + weights[k3]);
  419. seed = ((double)seed) / (1ul << 32) / (1ul << 32) * (1 -
  420. (1 - 1.0*weights[k1]/total_weight)*
  421. (1 - 1.0*weights[k2]/total_weight)*
  422. (1 - 1.0*weights[k3]/total_weight)
  423. ) * UINT64_MAX;
  424. if (seed > max)
  425. {
  426. r[0] = k1;
  427. r[1] = k2;
  428. r[2] = k3;
  429. max = seed;
  430. }
  431. }
  432. }
  433. }
  434. }
  435. int main(int argc, char *argv[])
  436. {
  437. int host_count = 6;
  438. uint64_t host_weights[] = {
  439. 34609*3,
  440. 34931*3,
  441. 35850+36387+35859,
  442. 36387,
  443. 36387*2,
  444. 36387,
  445. };
  446. /*int osd_count[] = { 3, 3, 3, 1, 2 };
  447. uint64_t osd_weights[][3] = {
  448. { 34609, 34609, 34609 },
  449. { 34931, 34931, 34931 },
  450. { 35850, 36387, 35859 },
  451. { 36387 },
  452. { 36387, 36387 },
  453. };*/
  454. uint64_t total_weight = 0;
  455. for (int i = 0; i < host_count; i++)
  456. {
  457. total_weight += host_weights[i];
  458. }
  459. uint64_t host_weights_prepared[host_count];
  460. jumphash_prepare(host_count, host_weights_prepared, host_weights);
  461. uint64_t total_pgs[host_count] = { 0 };
  462. int pg_count = 256;
  463. double uniformity[pg_count] = { 0 };
  464. for (uint64_t pg = 1; pg <= pg_count; pg++)
  465. {
  466. uint64_t r[3];
  467. /*
  468. // Select first host
  469. //r[0] = jumphash_weights(pg, host_count, host_weights_prepared);
  470. r[0] = crush(pg, host_count, host_weights);
  471. // Select second host
  472. uint64_t seed = pg;
  473. r[1] = r[0];
  474. while (r[1] == r[0])
  475. {
  476. seed = 2862933555777941757ull*seed + 3037000493ull; // LCPRNG
  477. //r[1] = jumphash_weights(seed, host_count, host_weights_prepared);
  478. r[1] = crush(seed, host_count, host_weights);
  479. }
  480. // Select third host
  481. seed = pg;
  482. r[2] = r[0];
  483. while (r[2] == r[0] || r[2] == r[1])
  484. {
  485. seed = 2862933555777941757ull*seed + 3037000493ull; // LCPRNG
  486. //r[2] = jumphash_weights(seed, host_count, host_weights_prepared);
  487. r[2] = crush(seed, host_count, host_weights);
  488. }
  489. */
  490. /*
  491. // Select second host
  492. uint64_t host_weights1[host_count];
  493. for (int i = 0; i < r[0]; i++)
  494. host_weights1[i] = host_weights[i];
  495. for (int i = r[0]+1; i < host_count; i++)
  496. host_weights1[i-1] = host_weights[i];
  497. r[1] = crush(pg, host_count-1, host_weights1);
  498. // Select third host
  499. for (int i = r[1]+1; i < host_count-1; i++)
  500. host_weights1[i-1] = host_weights[i];
  501. r[2] = crush(pg, host_count-2, host_weights1);
  502. // Transform numbers
  503. r[2] = r[2] >= r[1] ? 1+r[2] : r[2];
  504. r[2] = r[2] >= r[0] ? 1+r[2] : r[2];
  505. r[1] = r[1] >= r[0] ? 1+r[1] : r[1];
  506. */
  507. crush3(pg, host_count, host_weights, r, total_weight);
  508. uint64_t shift = (2862933555777941757ull*pg + 3037000493ull) % host_count;
  509. if (shift == 1)
  510. {
  511. uint64_t tmp;
  512. tmp = r[0];
  513. r[0] = r[1];
  514. r[1] = r[2];
  515. r[2] = tmp;
  516. }
  517. else if (shift == 2)
  518. {
  519. uint64_t tmp;
  520. tmp = r[0];
  521. r[0] = r[2];
  522. r[2] = r[1];
  523. r[1] = tmp;
  524. }
  525. total_pgs[r[0]]++;
  526. total_pgs[r[1]]++;
  527. total_pgs[r[2]]++;
  528. double u = 0;
  529. for (int i = 0; i < host_count; i++)
  530. {
  531. double d = abs(1 - total_pgs[i]/3.0/pg * total_weight/host_weights[i]);
  532. u += d;
  533. }
  534. uniformity[pg-1] = u/host_count;
  535. printf("pg %lu: hosts %lu, %lu, %lu ; avg deviation = %.2f\n", pg, r[0], r[1], r[2], u/host_count);
  536. }
  537. printf("total PGs: ");
  538. for (int i = 0; i < host_count; i++)
  539. {
  540. printf(i > 0 ? ", %lu (%.2f)" : "%lu (%.2f)", total_pgs[i], total_pgs[i]/3.0/pg_count * total_weight/host_weights[i]);
  541. }
  542. printf("\n");
  543. return 0;
  544. }