Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

390 lines
12 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 (see README.md for details)
  3. #include <sys/types.h>
  4. #include <sys/socket.h>
  5. #include <netinet/in.h>
  6. #include <netinet/tcp.h>
  7. #include <arpa/inet.h>
  8. #include <string.h>
  9. #include <stdio.h>
  10. #include <unistd.h>
  11. #include <fcntl.h>
  12. #include <errno.h>
  13. #include <stdlib.h>
  14. #include <malloc.h>
  15. #include <stdexcept>
  16. #include "osd_ops.h"
  17. #include "rw_blocking.h"
  18. #include "test_pattern.h"
  19. int connect_osd(const char *osd_address, int osd_port);
  20. uint64_t test_read(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t offset, uint64_t len);
  21. uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern);
  22. void* test_primary_read(int connect_fd, uint64_t inode, uint64_t offset, uint64_t len);
  23. void test_primary_write(int connect_fd, uint64_t inode, uint64_t offset, uint64_t len, uint64_t pattern);
  24. void test_primary_sync(int connect_fd);
  25. void test_sync_stab_all(int connect_fd);
  26. void test_list_stab(int connect_fd);
  27. int main0(int narg, char *args[])
  28. {
  29. int connect_fd;
  30. // Prepare data for cluster read
  31. connect_fd = connect_osd("127.0.0.1", 11203);
  32. test_write(connect_fd, 2, 0, 1, PATTERN0);
  33. close(connect_fd);
  34. connect_fd = connect_osd("127.0.0.1", 11204);
  35. test_write(connect_fd, 2, 1, 1, PATTERN1);
  36. close(connect_fd);
  37. connect_fd = connect_osd("127.0.0.1", 11205);
  38. test_write(connect_fd, 2, 2, 1, PATTERN0^PATTERN1);
  39. close(connect_fd);
  40. return 0;
  41. }
  42. int main1(int narg, char *args[])
  43. {
  44. int connect_fd;
  45. void *data;
  46. // Cluster read
  47. connect_fd = connect_osd("127.0.0.1", 11203);
  48. data = test_primary_read(connect_fd, 2, 0, 128*1024);
  49. if (data)
  50. {
  51. check_pattern(data, 128*1024, PATTERN0);
  52. printf("inode=2 0-128K OK\n");
  53. free(data);
  54. }
  55. data = test_primary_read(connect_fd, 2, 0, 256*1024);
  56. if (data)
  57. {
  58. check_pattern(data, 128*1024, PATTERN0);
  59. check_pattern(data+128*1024, 128*1024, PATTERN1);
  60. printf("inode=2 0-256K OK\n");
  61. free(data);
  62. }
  63. close(connect_fd);
  64. return 0;
  65. }
  66. int main2(int narg, char *args[])
  67. {
  68. int connect_fd;
  69. // Cluster write (sync not implemented yet)
  70. connect_fd = connect_osd("127.0.0.1", 11203);
  71. test_primary_write(connect_fd, 2, 0, 128*1024, PATTERN0);
  72. test_primary_write(connect_fd, 2, 128*1024, 128*1024, PATTERN1);
  73. test_sync_stab_all(connect_fd);
  74. close(connect_fd);
  75. connect_fd = connect_osd("127.0.0.1", 11204);
  76. if (connect_fd >= 0)
  77. {
  78. test_sync_stab_all(connect_fd);
  79. close(connect_fd);
  80. }
  81. connect_fd = connect_osd("127.0.0.1", 11205);
  82. if (connect_fd >= 0)
  83. {
  84. test_sync_stab_all(connect_fd);
  85. close(connect_fd);
  86. }
  87. return 0;
  88. }
  89. int main3(int narg, char *args[])
  90. {
  91. int connect_fd;
  92. connect_fd = connect_osd("127.0.0.1", 11203);
  93. test_list_stab(connect_fd);
  94. close(connect_fd);
  95. return 0;
  96. }
  97. int main4(int narg, char *args[])
  98. {
  99. int connect_fd;
  100. // Cluster write (sync not implemented yet)
  101. connect_fd = connect_osd("127.0.0.1", 11203);
  102. test_primary_write(connect_fd, 2, 0, 128*1024, PATTERN0);
  103. test_primary_write(connect_fd, 2, 128*1024, 128*1024, PATTERN1);
  104. test_primary_sync(connect_fd);
  105. close(connect_fd);
  106. return 0;
  107. }
  108. int main(int narg, char *args[])
  109. {
  110. int connect_fd;
  111. connect_fd = connect_osd("192.168.7.2", 43051);
  112. test_read(connect_fd, 1, 1039663104, UINT64_MAX, 0, 128*1024);
  113. close(connect_fd);
  114. return 0;
  115. }
  116. int connect_osd(const char *osd_address, int osd_port)
  117. {
  118. struct sockaddr_in addr;
  119. int r;
  120. if ((r = inet_pton(AF_INET, osd_address, &addr.sin_addr)) != 1)
  121. {
  122. fprintf(stderr, "server address: %s%s\n", osd_address, r == 0 ? " is not valid" : ": no ipv4 support");
  123. return -1;
  124. }
  125. addr.sin_family = AF_INET;
  126. addr.sin_port = htons(osd_port);
  127. int connect_fd = socket(AF_INET, SOCK_STREAM, 0);
  128. if (connect_fd < 0)
  129. {
  130. perror("socket");
  131. return -1;
  132. }
  133. if (connect(connect_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
  134. {
  135. perror("connect");
  136. return -1;
  137. }
  138. int one = 1;
  139. setsockopt(connect_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
  140. return connect_fd;
  141. }
  142. bool check_reply(int r, osd_any_op_t & op, osd_any_reply_t & reply, int expected)
  143. {
  144. if (r != OSD_PACKET_SIZE)
  145. {
  146. printf("read failed\n");
  147. return false;
  148. }
  149. if (reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC ||
  150. reply.hdr.id != op.hdr.id || reply.hdr.opcode != op.hdr.opcode)
  151. {
  152. printf("bad reply: magic, id or opcode does not match request\n");
  153. return false;
  154. }
  155. if (expected >= 0 && reply.hdr.retval != expected)
  156. {
  157. printf("operation failed, retval=%ld\n", reply.hdr.retval);
  158. return false;
  159. }
  160. return true;
  161. }
  162. uint64_t test_read(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t offset, uint64_t len)
  163. {
  164. osd_any_op_t op;
  165. osd_any_reply_t reply;
  166. op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
  167. op.hdr.id = 1;
  168. op.hdr.opcode = OSD_OP_SEC_READ;
  169. op.sec_rw.oid = {
  170. .inode = inode,
  171. .stripe = stripe,
  172. };
  173. op.sec_rw.version = version;
  174. op.sec_rw.offset = offset;
  175. op.sec_rw.len = len;
  176. void *data = memalign(MEM_ALIGNMENT, op.sec_rw.len);
  177. write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE);
  178. int r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
  179. if (!check_reply(r, op, reply, op.sec_rw.len))
  180. {
  181. free(data);
  182. return 0;
  183. }
  184. r = read_blocking(connect_fd, data, len);
  185. if (r != len)
  186. {
  187. free(data);
  188. perror("read data");
  189. return 0;
  190. }
  191. free(data);
  192. printf("Read %lx:%lx v%lu = v%lu\n", inode, stripe, version, reply.sec_rw.version);
  193. op.hdr.opcode = OSD_OP_SEC_LIST;
  194. op.sec_list.list_pg = 1;
  195. op.sec_list.pg_count = 1;
  196. op.sec_list.pg_stripe_size = 4*1024*1024;
  197. write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE);
  198. r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
  199. if (reply.hdr.retval < 0 || !check_reply(r, op, reply, reply.hdr.retval))
  200. {
  201. return 0;
  202. }
  203. data = memalign(MEM_ALIGNMENT, sizeof(obj_ver_id)*reply.hdr.retval);
  204. r = read_blocking(connect_fd, data, sizeof(obj_ver_id)*reply.hdr.retval);
  205. if (r != sizeof(obj_ver_id)*reply.hdr.retval)
  206. {
  207. free(data);
  208. perror("read data");
  209. return 0;
  210. }
  211. obj_ver_id *ov = (obj_ver_id*)data;
  212. for (int i = 0; i < reply.hdr.retval; i++)
  213. {
  214. if (ov[i].oid.inode == inode && (ov[i].oid.stripe & ~(4096-1)) == (stripe & ~(4096-1)))
  215. {
  216. printf("list: %lx:%lx v%lu stable=%d\n", ov[i].oid.inode, ov[i].oid.stripe, ov[i].version, i < reply.sec_list.stable_count ? 1 : 0);
  217. }
  218. }
  219. return 0;
  220. }
  221. uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern)
  222. {
  223. osd_any_op_t op;
  224. osd_any_reply_t reply;
  225. op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
  226. op.hdr.id = 1;
  227. op.hdr.opcode = OSD_OP_SEC_WRITE;
  228. op.sec_rw.oid = {
  229. .inode = inode,
  230. .stripe = stripe,
  231. };
  232. op.sec_rw.version = version;
  233. op.sec_rw.offset = 0;
  234. op.sec_rw.len = 128*1024;
  235. void *data = memalign(MEM_ALIGNMENT, op.sec_rw.len);
  236. for (int i = 0; i < (op.sec_rw.len)/sizeof(uint64_t); i++)
  237. ((uint64_t*)data)[i] = pattern;
  238. write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE);
  239. write_blocking(connect_fd, data, op.sec_rw.len);
  240. int r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
  241. if (!check_reply(r, op, reply, op.sec_rw.len))
  242. {
  243. free(data);
  244. return 0;
  245. }
  246. version = reply.sec_rw.version;
  247. op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL;
  248. op.hdr.id = 2;
  249. write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE);
  250. r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
  251. if (!check_reply(r, op, reply, 0))
  252. {
  253. free(data);
  254. return 0;
  255. }
  256. free(data);
  257. return version;
  258. }
  259. void* test_primary_read(int connect_fd, uint64_t inode, uint64_t offset, uint64_t len)
  260. {
  261. osd_any_op_t op;
  262. osd_any_reply_t reply;
  263. op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
  264. op.hdr.id = 1;
  265. op.hdr.opcode = OSD_OP_READ;
  266. op.rw.inode = inode;
  267. op.rw.offset = offset;
  268. op.rw.len = len;
  269. void *data = memalign(MEM_ALIGNMENT, len);
  270. write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE);
  271. int r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
  272. if (!check_reply(r, op, reply, len))
  273. {
  274. free(data);
  275. return NULL;
  276. }
  277. r = read_blocking(connect_fd, data, len);
  278. if (r != len)
  279. {
  280. free(data);
  281. perror("read data");
  282. return NULL;
  283. }
  284. return data;
  285. }
  286. void test_primary_write(int connect_fd, uint64_t inode, uint64_t offset, uint64_t len, uint64_t pattern)
  287. {
  288. osd_any_op_t op;
  289. osd_any_reply_t reply;
  290. op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
  291. op.hdr.id = 1;
  292. op.hdr.opcode = OSD_OP_WRITE;
  293. op.rw.inode = inode;
  294. op.rw.offset = offset;
  295. op.rw.len = len;
  296. void *data = memalign(MEM_ALIGNMENT, len);
  297. set_pattern(data, len, pattern);
  298. write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE);
  299. write_blocking(connect_fd, data, len);
  300. free(data);
  301. int r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
  302. assert(check_reply(r, op, reply, len));
  303. }
  304. void test_primary_sync(int connect_fd)
  305. {
  306. osd_any_op_t op;
  307. osd_any_reply_t reply;
  308. op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
  309. op.hdr.id = 1;
  310. op.hdr.opcode = OSD_OP_SYNC;
  311. write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE);
  312. int r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
  313. assert(check_reply(r, op, reply, 0));
  314. }
  315. void test_sync_stab_all(int connect_fd)
  316. {
  317. osd_any_op_t op;
  318. osd_any_reply_t reply;
  319. op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
  320. op.hdr.id = 1;
  321. op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL;
  322. write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE);
  323. int r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
  324. assert(check_reply(r, op, reply, 0));
  325. }
  326. void test_list_stab(int connect_fd)
  327. {
  328. osd_any_op_t op;
  329. osd_any_reply_t reply;
  330. op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
  331. op.hdr.id = 1;
  332. op.hdr.opcode = OSD_OP_SEC_LIST;
  333. op.sec_list.pg_count = 0;
  334. assert(write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE) == OSD_PACKET_SIZE);
  335. int r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
  336. assert(check_reply(r, op, reply, -1));
  337. int total_count = reply.hdr.retval;
  338. int stable_count = reply.sec_list.stable_count;
  339. obj_ver_id *data = (obj_ver_id*)malloc(total_count * sizeof(obj_ver_id));
  340. assert(data);
  341. assert(read_blocking(connect_fd, data, total_count * sizeof(obj_ver_id)) == (total_count * sizeof(obj_ver_id)));
  342. int last_start = stable_count;
  343. for (int i = stable_count; i <= total_count; i++)
  344. {
  345. // Stabilize in portions of 32 entries
  346. if (i - last_start >= 32 || i == total_count)
  347. {
  348. op.hdr.opcode = OSD_OP_SEC_STABILIZE;
  349. op.sec_stab.len = sizeof(obj_ver_id) * (i - last_start);
  350. assert(write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE) == OSD_PACKET_SIZE);
  351. assert(write_blocking(connect_fd, data + last_start, op.sec_stab.len) == op.sec_stab.len);
  352. r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE);
  353. assert(check_reply(r, op, reply, 0));
  354. last_start = i;
  355. }
  356. }
  357. obj_ver_id *data2 = (obj_ver_id*)malloc(sizeof(obj_ver_id) * 32);
  358. assert(data2);
  359. free(data2);
  360. free(data);
  361. }