Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

405 lines
14 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 (see README.md for details)
  3. #include <sys/socket.h>
  4. #include <sys/poll.h>
  5. #include <netinet/in.h>
  6. #include <netinet/tcp.h>
  7. #include <arpa/inet.h>
  8. #include "osd.h"
  9. osd_t::osd_t(blockstore_config_t & config, ring_loop_t *ringloop)
  10. {
  11. bs_block_size = strtoull(config["block_size"].c_str(), NULL, 10);
  12. bs_bitmap_granularity = strtoull(config["bitmap_granularity"].c_str(), NULL, 10);
  13. if (!bs_block_size)
  14. bs_block_size = DEFAULT_BLOCK_SIZE;
  15. if (!bs_bitmap_granularity)
  16. bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
  17. // Force external bitmap size
  18. entry_attr_size = bs_block_size / bs_bitmap_granularity / 8;
  19. config["entry_attr_size"] = entry_attr_size;
  20. this->config = config;
  21. this->ringloop = ringloop;
  22. // FIXME: Create Blockstore from on-disk superblock config and check it against the OSD cluster config
  23. this->bs = new blockstore_t(config, ringloop);
  24. parse_config(config);
  25. epmgr = new epoll_manager_t(ringloop);
  26. this->tfd = epmgr->tfd;
  27. this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
  28. {
  29. print_stats();
  30. });
  31. this->tfd->set_timer(slow_log_interval*1000, true, [this](int timer_id)
  32. {
  33. print_slow();
  34. });
  35. c_cli.tfd = this->tfd;
  36. c_cli.ringloop = this->ringloop;
  37. c_cli.exec_op = [this](osd_op_t *op) { exec_op(op); };
  38. c_cli.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); };
  39. init_cluster();
  40. consumer.loop = [this]() { loop(); };
  41. ringloop->register_consumer(&consumer);
  42. }
  43. osd_t::~osd_t()
  44. {
  45. ringloop->unregister_consumer(&consumer);
  46. delete epmgr;
  47. delete bs;
  48. close(listen_fd);
  49. }
  50. void osd_t::parse_config(blockstore_config_t & config)
  51. {
  52. if (config.find("log_level") == config.end())
  53. config["log_level"] = "1";
  54. // Initial startup configuration
  55. json11::Json json_config = json11::Json(config);
  56. st_cli.parse_config(json_config);
  57. etcd_report_interval = strtoull(config["etcd_report_interval"].c_str(), NULL, 10);
  58. if (etcd_report_interval <= 0)
  59. etcd_report_interval = 30;
  60. osd_num = strtoull(config["osd_num"].c_str(), NULL, 10);
  61. if (!osd_num)
  62. throw std::runtime_error("osd_num is required in the configuration");
  63. c_cli.osd_num = osd_num;
  64. run_primary = config["run_primary"] != "false" && config["run_primary"] != "0" && config["run_primary"] != "no";
  65. // Cluster configuration
  66. bind_address = config["bind_address"];
  67. if (bind_address == "")
  68. bind_address = "0.0.0.0";
  69. bind_port = stoull_full(config["bind_port"]);
  70. if (bind_port <= 0 || bind_port > 65535)
  71. bind_port = 0;
  72. if (config["immediate_commit"] == "all")
  73. immediate_commit = IMMEDIATE_ALL;
  74. else if (config["immediate_commit"] == "small")
  75. immediate_commit = IMMEDIATE_SMALL;
  76. if (config.find("autosync_interval") != config.end())
  77. {
  78. autosync_interval = strtoull(config["autosync_interval"].c_str(), NULL, 10);
  79. if (autosync_interval > MAX_AUTOSYNC_INTERVAL)
  80. autosync_interval = DEFAULT_AUTOSYNC_INTERVAL;
  81. }
  82. if (config.find("client_queue_depth") != config.end())
  83. {
  84. client_queue_depth = strtoull(config["client_queue_depth"].c_str(), NULL, 10);
  85. if (client_queue_depth < 128)
  86. client_queue_depth = 128;
  87. }
  88. recovery_queue_depth = strtoull(config["recovery_queue_depth"].c_str(), NULL, 10);
  89. if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE)
  90. recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
  91. if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes")
  92. readonly = true;
  93. print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10);
  94. if (!print_stats_interval)
  95. print_stats_interval = 3;
  96. slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10);
  97. if (!slow_log_interval)
  98. slow_log_interval = 10;
  99. c_cli.peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10);
  100. if (!c_cli.peer_connect_interval)
  101. c_cli.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
  102. c_cli.peer_connect_timeout = strtoull(config["peer_connect_timeout"].c_str(), NULL, 10);
  103. if (!c_cli.peer_connect_timeout)
  104. c_cli.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
  105. log_level = strtoull(config["log_level"].c_str(), NULL, 10);
  106. c_cli.log_level = log_level;
  107. }
  108. void osd_t::bind_socket()
  109. {
  110. listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  111. if (listen_fd < 0)
  112. {
  113. throw std::runtime_error(std::string("socket: ") + strerror(errno));
  114. }
  115. int enable = 1;
  116. setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
  117. sockaddr_in addr;
  118. int r;
  119. if ((r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1)
  120. {
  121. close(listen_fd);
  122. throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support"));
  123. }
  124. addr.sin_family = AF_INET;
  125. addr.sin_port = htons(bind_port);
  126. if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
  127. {
  128. close(listen_fd);
  129. throw std::runtime_error(std::string("bind: ") + strerror(errno));
  130. }
  131. if (bind_port == 0)
  132. {
  133. socklen_t len = sizeof(addr);
  134. if (getsockname(listen_fd, (sockaddr *)&addr, &len) == -1)
  135. {
  136. close(listen_fd);
  137. throw std::runtime_error(std::string("getsockname: ") + strerror(errno));
  138. }
  139. listening_port = ntohs(addr.sin_port);
  140. }
  141. else
  142. {
  143. listening_port = bind_port;
  144. }
  145. if (listen(listen_fd, listen_backlog) < 0)
  146. {
  147. close(listen_fd);
  148. throw std::runtime_error(std::string("listen: ") + strerror(errno));
  149. }
  150. fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
  151. epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
  152. {
  153. c_cli.accept_connections(listen_fd);
  154. });
  155. }
  156. bool osd_t::shutdown()
  157. {
  158. stopping = true;
  159. if (inflight_ops > 0)
  160. {
  161. return false;
  162. }
  163. return !bs || bs->is_safe_to_stop();
  164. }
  165. void osd_t::loop()
  166. {
  167. handle_peers();
  168. c_cli.read_requests();
  169. c_cli.send_replies();
  170. ringloop->submit();
  171. }
  172. void osd_t::exec_op(osd_op_t *cur_op)
  173. {
  174. clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin);
  175. if (stopping)
  176. {
  177. // Throw operation away
  178. delete cur_op;
  179. return;
  180. }
  181. inflight_ops++;
  182. if (cur_op->req.hdr.magic != SECONDARY_OSD_OP_MAGIC ||
  183. cur_op->req.hdr.opcode < OSD_OP_MIN || cur_op->req.hdr.opcode > OSD_OP_MAX ||
  184. ((cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
  185. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
  186. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) &&
  187. (cur_op->req.sec_rw.len > OSD_RW_MAX ||
  188. cur_op->req.sec_rw.len % bs_bitmap_granularity ||
  189. cur_op->req.sec_rw.offset % bs_bitmap_granularity)) ||
  190. ((cur_op->req.hdr.opcode == OSD_OP_READ ||
  191. cur_op->req.hdr.opcode == OSD_OP_WRITE ||
  192. cur_op->req.hdr.opcode == OSD_OP_DELETE) &&
  193. (cur_op->req.rw.len > OSD_RW_MAX ||
  194. cur_op->req.rw.len % bs_bitmap_granularity ||
  195. cur_op->req.rw.offset % bs_bitmap_granularity)))
  196. {
  197. // Bad command
  198. finish_op(cur_op, -EINVAL);
  199. return;
  200. }
  201. if (readonly &&
  202. cur_op->req.hdr.opcode != OSD_OP_SEC_READ &&
  203. cur_op->req.hdr.opcode != OSD_OP_SEC_LIST &&
  204. cur_op->req.hdr.opcode != OSD_OP_READ &&
  205. cur_op->req.hdr.opcode != OSD_OP_SHOW_CONFIG)
  206. {
  207. // Readonly mode
  208. finish_op(cur_op, -EROFS);
  209. return;
  210. }
  211. if (cur_op->req.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL)
  212. {
  213. exec_sync_stab_all(cur_op);
  214. }
  215. else if (cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
  216. {
  217. exec_show_config(cur_op);
  218. }
  219. else if (cur_op->req.hdr.opcode == OSD_OP_READ)
  220. {
  221. continue_primary_read(cur_op);
  222. }
  223. else if (cur_op->req.hdr.opcode == OSD_OP_WRITE)
  224. {
  225. continue_primary_write(cur_op);
  226. }
  227. else if (cur_op->req.hdr.opcode == OSD_OP_SYNC)
  228. {
  229. continue_primary_sync(cur_op);
  230. }
  231. else if (cur_op->req.hdr.opcode == OSD_OP_DELETE)
  232. {
  233. continue_primary_del(cur_op);
  234. }
  235. else
  236. {
  237. exec_secondary(cur_op);
  238. }
  239. }
  240. void osd_t::reset_stats()
  241. {
  242. c_cli.stats = { 0 };
  243. prev_stats = { 0 };
  244. memset(recovery_stat_count, 0, sizeof(recovery_stat_count));
  245. memset(recovery_stat_bytes, 0, sizeof(recovery_stat_bytes));
  246. }
  247. void osd_t::print_stats()
  248. {
  249. for (int i = 0; i <= OSD_OP_MAX; i++)
  250. {
  251. if (c_cli.stats.op_stat_count[i] != prev_stats.op_stat_count[i])
  252. {
  253. uint64_t avg = (c_cli.stats.op_stat_sum[i] - prev_stats.op_stat_sum[i])/(c_cli.stats.op_stat_count[i] - prev_stats.op_stat_count[i]);
  254. uint64_t bw = (c_cli.stats.op_stat_bytes[i] - prev_stats.op_stat_bytes[i]) / print_stats_interval;
  255. if (c_cli.stats.op_stat_bytes[i] != 0)
  256. {
  257. printf(
  258. "[OSD %lu] avg latency for op %d (%s): %lu us, B/W: %.2f %s\n", osd_num, i, osd_op_names[i], avg,
  259. (bw > 1024*1024*1024 ? bw/1024.0/1024/1024 : (bw > 1024*1024 ? bw/1024.0/1024 : bw/1024.0)),
  260. (bw > 1024*1024*1024 ? "GB/s" : (bw > 1024*1024 ? "MB/s" : "KB/s"))
  261. );
  262. }
  263. else
  264. {
  265. printf("[OSD %lu] avg latency for op %d (%s): %lu us\n", osd_num, i, osd_op_names[i], avg);
  266. }
  267. prev_stats.op_stat_count[i] = c_cli.stats.op_stat_count[i];
  268. prev_stats.op_stat_sum[i] = c_cli.stats.op_stat_sum[i];
  269. prev_stats.op_stat_bytes[i] = c_cli.stats.op_stat_bytes[i];
  270. }
  271. }
  272. for (int i = 0; i <= OSD_OP_MAX; i++)
  273. {
  274. if (c_cli.stats.subop_stat_count[i] != prev_stats.subop_stat_count[i])
  275. {
  276. uint64_t avg = (c_cli.stats.subop_stat_sum[i] - prev_stats.subop_stat_sum[i])/(c_cli.stats.subop_stat_count[i] - prev_stats.subop_stat_count[i]);
  277. printf("[OSD %lu] avg latency for subop %d (%s): %ld us\n", osd_num, i, osd_op_names[i], avg);
  278. prev_stats.subop_stat_count[i] = c_cli.stats.subop_stat_count[i];
  279. prev_stats.subop_stat_sum[i] = c_cli.stats.subop_stat_sum[i];
  280. }
  281. }
  282. for (int i = 0; i < 2; i++)
  283. {
  284. if (recovery_stat_count[0][i] != recovery_stat_count[1][i])
  285. {
  286. uint64_t bw = (recovery_stat_bytes[0][i] - recovery_stat_bytes[1][i]) / print_stats_interval;
  287. printf(
  288. "[OSD %lu] %s recovery: %.1f op/s, B/W: %.2f %s\n", osd_num, recovery_stat_names[i],
  289. (recovery_stat_count[0][i] - recovery_stat_count[1][i]) * 1.0 / print_stats_interval,
  290. (bw > 1024*1024*1024 ? bw/1024.0/1024/1024 : (bw > 1024*1024 ? bw/1024.0/1024 : bw/1024.0)),
  291. (bw > 1024*1024*1024 ? "GB/s" : (bw > 1024*1024 ? "MB/s" : "KB/s"))
  292. );
  293. recovery_stat_count[1][i] = recovery_stat_count[0][i];
  294. recovery_stat_bytes[1][i] = recovery_stat_bytes[0][i];
  295. }
  296. }
  297. if (incomplete_objects > 0)
  298. {
  299. printf("[OSD %lu] %lu object(s) incomplete\n", osd_num, incomplete_objects);
  300. }
  301. if (degraded_objects > 0)
  302. {
  303. printf("[OSD %lu] %lu object(s) degraded\n", osd_num, degraded_objects);
  304. }
  305. if (misplaced_objects > 0)
  306. {
  307. printf("[OSD %lu] %lu object(s) misplaced\n", osd_num, misplaced_objects);
  308. }
  309. }
  310. void osd_t::print_slow()
  311. {
  312. char alloc[1024];
  313. timespec now;
  314. clock_gettime(CLOCK_REALTIME, &now);
  315. for (auto & kv: c_cli.clients)
  316. {
  317. for (auto op: kv.second->received_ops)
  318. {
  319. if ((now.tv_sec - op->tv_begin.tv_sec) >= slow_log_interval)
  320. {
  321. int l = sizeof(alloc), n;
  322. char *buf = alloc;
  323. #define bufprintf(s, ...) { n = snprintf(buf, l, s, __VA_ARGS__); n = n < 0 ? 0 : n; buf += n; l -= n; }
  324. bufprintf("[OSD %lu] Slow op", osd_num);
  325. if (kv.second->osd_num)
  326. {
  327. bufprintf(" from peer OSD %lu (client %d)", kv.second->osd_num, kv.second->peer_fd);
  328. }
  329. else
  330. {
  331. bufprintf(" from client %d", kv.second->peer_fd);
  332. }
  333. bufprintf(": %s id=%lu", osd_op_names[op->req.hdr.opcode], op->req.hdr.id);
  334. if (op->req.hdr.opcode == OSD_OP_SEC_READ || op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
  335. op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE || op->req.hdr.opcode == OSD_OP_SEC_DELETE)
  336. {
  337. bufprintf(" %lx:%lx v", op->req.sec_rw.oid.inode, op->req.sec_rw.oid.stripe);
  338. if (op->req.sec_rw.version == UINT64_MAX)
  339. {
  340. bufprintf("%s", "max");
  341. }
  342. else
  343. {
  344. bufprintf("%lu", op->req.sec_rw.version);
  345. }
  346. if (op->req.hdr.opcode != OSD_OP_SEC_DELETE)
  347. {
  348. bufprintf(" offset=%x len=%x", op->req.sec_rw.offset, op->req.sec_rw.len);
  349. }
  350. }
  351. else if (op->req.hdr.opcode == OSD_OP_SEC_STABILIZE || op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)
  352. {
  353. for (uint64_t i = 0; i < op->req.sec_stab.len; i += sizeof(obj_ver_id))
  354. {
  355. obj_ver_id *ov = (obj_ver_id*)(op->buf + i);
  356. bufprintf(i == 0 ? " %lx:%lx v%lu" : ", %lx:%lx v%lu", ov->oid.inode, ov->oid.stripe, ov->version);
  357. }
  358. }
  359. else if (op->req.hdr.opcode == OSD_OP_SEC_LIST)
  360. {
  361. bufprintf(
  362. " inode=%lx-%lx pg=%u/%u, stripe=%lu",
  363. op->req.sec_list.min_inode, op->req.sec_list.max_inode,
  364. op->req.sec_list.list_pg, op->req.sec_list.pg_count,
  365. op->req.sec_list.pg_stripe_size
  366. );
  367. }
  368. else if (op->req.hdr.opcode == OSD_OP_READ || op->req.hdr.opcode == OSD_OP_WRITE ||
  369. op->req.hdr.opcode == OSD_OP_DELETE)
  370. {
  371. bufprintf(" inode=%lx offset=%lx len=%x", op->req.rw.inode, op->req.rw.offset, op->req.rw.len);
  372. }
  373. #undef bufprintf
  374. printf("%s\n", alloc);
  375. }
  376. }
  377. }
  378. }