// vitastor/src/nfs_proxy.cpp (302 lines, 9.4 KiB, C++)

// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Simplified NFS proxy
// Presents all images as files, stores small files directly in etcd
// Keeps image list in memory and thus is unsuitable for a lot of files
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <sys/poll.h>
#include <unistd.h>
#include <fcntl.h>
//#include <signal.h>
#include "libnfs-raw-mount.h"
#include "libnfs-raw-nfs.h"
#include "libnfs-raw-portmap.h"
#include "addr_util.h"
#include "base64.h"
#include "nfs_proxy.h"
// Name the program was invoked with; assigned from args[0] in main() and
// printed in the usage text produced by --help.
const char *exe_name = nullptr;
// Destroys the cluster client, the epoll manager and the ring loop, in that order
// (the reverse of the order in which run() creates them).
nfs_proxy_t::~nfs_proxy_t()
{
    // `delete` on a null pointer does nothing, so objects that were never
    // created need no explicit check
    delete cli;
    delete epmgr;
    delete ringloop;
}
// Builds a configuration object from command-line arguments.
//
// narg, args: argument count and vector as received by main(); args[0] is skipped.
//
// "-h" / "--help" prints the usage text and terminates the process with status 0.
// Every "--name" argument becomes cfg["name"]; its value is the following argument,
// except that "--json" and an option in the last position get the value "1".
// Arguments that do not start with "--" are ignored.
json11::Json::object nfs_proxy_t::parse_args(int narg, const char *args[])
{
    json11::Json::object cfg;
    int idx = 1;
    while (idx < narg)
    {
        const char *arg = args[idx];
        if (strcmp(arg, "-h") == 0 || strcmp(arg, "--help") == 0)
        {
            printf(
                "Vitastor NFS 3.0 proxy\n"
                "(c) Vitaliy Filippov, 2021-2022 (VNPL-1.1)\n\n"
                "USAGE:\n"
                " %s [--etcd_address ADDR] [OTHER OPTIONS]\n",
                exe_name
            );
            exit(0);
        }
        if (arg[0] == '-' && arg[1] == '-')
        {
            const char *opt = arg+2;
            const bool is_flag = strcmp(opt, "json") == 0 || idx == narg-1;
            if (is_flag)
            {
                cfg[opt] = "1";
            }
            else
            {
                // The option consumes the next argument as its value
                idx++;
                cfg[opt] = args[idx];
            }
        }
        idx++;
    }
    return cfg;
}
// Starts the proxy and runs its event loop.
//
// Sequence: create the ring loop, epoll manager and cluster client; install an inode
// change hook that keeps the name-hash lookup tables up to date; spin the loop until the
// client reports ready; bind the portmap (111) and NFS (2049) listening sockets; register
// portmap, NFS and mount services in the built-in portmapper table; daemonize unless
// "foreground" is set; then process events forever.
//
// cfg: configuration object. "bind_address" (defaults to "0.0.0.0") and "foreground" are
// read here; the whole object is also handed to cluster_client_t.
void nfs_proxy_t::run(json11::Json cfg)
{
    bind_address = cfg["bind_address"].string_value();
    if (bind_address == "")
        bind_address = "0.0.0.0";
    // Create client
    ringloop = new ring_loop_t(512);
    epmgr = new epoll_manager_t(ringloop);
    cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
    // We need inode name hashes for NFS handles to remain stateless and <= 64 bytes long
    dir_mod_rev[""] = 0;
    dir_ids[""] = 1;
    assert(cli->st_cli.on_inode_change_hook == NULL);
    cli->st_cli.on_inode_change_hook = [this](inode_t changed_inode, bool removed)
    {
        if (removed)
        {
            // Drop both directions of the hash <=> inode mapping
            auto ino_it = hash_by_inode.find(changed_inode);
            if (ino_it != hash_by_inode.end())
            {
                inode_by_hash.erase(ino_it->second);
                hash_by_inode.erase(ino_it);
            }
            // FIXME also calculate dir_mod_rev
            return;
        }
        auto & inode_cfg = cli->st_cli.inode_config.at(changed_inode);
        std::string name = inode_cfg.name;
        if (name_prefix != "")
        {
            // Only images whose name starts with the prefix are exposed; the prefix is stripped
            if (name.substr(0, name_prefix.size()) != name_prefix)
                return;
            name = name.substr(name_prefix.size());
        }
        // The root directory revision is the maximum revision seen so far
        auto & root_rev = dir_mod_rev[""];
        if (root_rev < inode_cfg.mod_revision)
            root_rev = inode_cfg.mod_revision;
        std::string hash = "S"+base64_encode(sha256(name));
        // Register every parent directory prefix of the name ("a", "a/b", ...).
        // Positions are kept as size_t and compared with npos so that they are never
        // truncated to int, and the search resumes in place instead of copying the tail.
        for (size_t pos = name.find('/'); pos != std::string::npos; pos = name.find('/', pos+1))
        {
            std::string dir = name.substr(0, pos);
            if (dir_ids.find(dir) == dir_ids.end())
                dir_ids[dir] = next_dir_id++;
            auto & dir_rev = dir_mod_rev[dir];
            if (dir_rev < inode_cfg.mod_revision)
                dir_rev = inode_cfg.mod_revision;
            dir_by_hash["S"+base64_encode(sha256(dir))] = dir;
        }
        auto hbi_it = hash_by_inode.find(changed_inode);
        if (hbi_it != hash_by_inode.end() && hbi_it->second != hash)
        {
            // inode had a different name, remove old hash=>inode pointer
            inode_by_hash.erase(hbi_it->second);
        }
        inode_by_hash[hash] = changed_inode;
        hash_by_inode[changed_inode] = hash;
    };
    // Load image metadata
    while (!cli->is_ready())
    {
        ringloop->loop();
        if (cli->is_ready())
            break;
        ringloop->wait();
    }
    // Create portmap socket
    int portmap_socket = create_and_bind_socket(bind_address, 111, 128, NULL);
    fcntl(portmap_socket, F_SETFL, fcntl(portmap_socket, F_GETFL, 0) | O_NONBLOCK);
    // Create NFS socket
    int nfs_socket = create_and_bind_socket(bind_address, 2049, 128, NULL);
    fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
    // Self-register portmap and NFS
    pmap.reg_ports.insert((portmap_id_t){
        .prog = PMAP_PROGRAM,
        .vers = PMAP_V2,
        .port = 111,
        .owner = "portmapper-service",
        .addr = "0.0.0.0.0.111",
    });
    pmap.reg_ports.insert((portmap_id_t){
        .prog = PMAP_PROGRAM,
        .vers = PMAP_V3,
        .port = 111,
        .owner = "portmapper-service",
        .addr = "0.0.0.0.0.111",
    });
    pmap.reg_ports.insert((portmap_id_t){
        .prog = NFS_PROGRAM,
        .vers = NFS_V3,
        .port = 2049,
        .owner = "nfs-server",
        .addr = "0.0.0.0.0.2049",
    });
    pmap.reg_ports.insert((portmap_id_t){
        .prog = MOUNT_PROGRAM,
        .vers = MOUNT_V3,
        .port = 2049,
        .owner = "rpc.mountd",
        .addr = "0.0.0.0.0.2049",
    });
    // Add FDs to epoll
    epmgr->tfd->set_fd_handler(portmap_socket, false, [this](int listen_fd, int epoll_events)
    {
        if (epoll_events & EPOLLRDHUP)
        {
            fprintf(stderr, "Listening portmap socket disconnected, exiting\n");
            exit(1);
        }
        else
        {
            do_accept(listen_fd);
        }
    });
    epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int listen_fd, int epoll_events)
    {
        if (epoll_events & EPOLLRDHUP)
        {
            // This is the NFS listener, so the message must not blame the portmap socket
            fprintf(stderr, "Listening NFS socket disconnected, exiting\n");
            exit(1);
        }
        else
        {
            do_accept(listen_fd);
        }
    });
    if (cfg["foreground"].is_null())
    {
        daemonize();
    }
    while (true)
    {
        ringloop->loop();
        ringloop->wait();
    }
    // NOTE: everything below is currently unreachable because the loop above has no
    // exit condition; it is kept for when a stop condition is introduced.
    /*// Sync at the end
    cluster_op_t *close_sync = new cluster_op_t;
    close_sync->opcode = OSD_OP_SYNC;
    close_sync->callback = [&stop](cluster_op_t *op)
    {
        stop = true;
        delete op;
    };
    cli->execute(close_sync);*/
    // Destroy the client
    delete cli;
    delete epmgr;
    delete ringloop;
    cli = NULL;
    epmgr = NULL;
    ringloop = NULL;
}
void nfs_proxy_t::do_accept(int listen_fd)
{
struct sockaddr_storage addr;
socklen_t addr_size = sizeof(addr);
int nfs_fd = 0;
while ((nfs_fd = accept(listen_fd, (struct sockaddr *)&addr, &addr_size)) >= 0)
{
fprintf(stderr, "New client %d: connection from %s\n", nfs_fd, addr_to_string(addr).c_str());
fcntl(nfs_fd, F_SETFL, fcntl(nfs_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1;
setsockopt(nfs_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
auto cli = new nfs_client_t();
cli->parent = this;
cli->nfs_fd = nfs_fd;
cli->rpc = rpc_init_server_context(nfs_fd);
if (!cli->rpc)
{
delete cli;
close(nfs_fd);
fprintf(stderr, "Failed to init libnfs server context\n");
exit(1);
}
// Use both portmap and NFS everywhere
rpc_register_service(cli->rpc, PMAP_PROGRAM, PMAP_V2, pmap.pmap2_pt.data(), pmap.pmap2_pt.size());
rpc_register_service(cli->rpc, PMAP_PROGRAM, PMAP_V3, pmap.pmap3_pt.data(), pmap.pmap3_pt.size());
rpc_register_service(cli->rpc, NFS_PROGRAM, NFS_V3, cli->nfs3_pt.data(), cli->nfs3_pt.size());
rpc_register_service(cli->rpc, MOUNT_PROGRAM, MOUNT_V3, cli->nfs3_mount_pt.data(), cli->nfs3_mount_pt.size());
epmgr->tfd->set_fd_handler(nfs_fd, true, [this, cli](int nfs_fd, int epoll_events)
{
// Handle incoming event
if (epoll_events & EPOLLRDHUP)
{
fprintf(stderr, "Client %d disconnected\n", nfs_fd);
epmgr->tfd->set_fd_handler(cli->nfs_fd, true, NULL);
delete cli;
close(nfs_fd);
return;
}
int revents = 0;
if (epoll_events & EPOLLIN)
revents |= POLLIN;
if (epoll_events & EPOLLOUT)
revents |= POLLOUT;
// Let libnfs process the event
if (rpc_service(cli->rpc, revents) < 0)
{
fprintf(stderr, "libnfs error: %s, disconnecting client %d\n", rpc_get_error(cli->rpc), nfs_fd);
epmgr->tfd->set_fd_handler(cli->nfs_fd, true, NULL);
delete cli;
close(nfs_fd);
return;
}
// FIXME Add/remove events based on rpc_which_events(rpc) ?
});
}
if (nfs_fd < 0 && errno != EAGAIN)
{
fprintf(stderr, "Failed to accept connection: %s\n", strerror(errno));
exit(1);
}
}
// Detaches the process from the terminal: double fork with setsid() in between,
// chdir to "/", and standard streams redirected to /dev/null.
//
// The calling process exits with status 0 in both parents; only the grandchild returns.
// A failed fork() is reported on stderr and terminates the process with status 1.
void nfs_proxy_t::daemonize()
{
    pid_t pid = fork();
    if (pid < 0)
    {
        // -1 must not be mistaken for "we are the parent"
        fprintf(stderr, "Failed to fork: %s\n", strerror(errno));
        exit(1);
    }
    if (pid > 0)
        exit(0);
    setsid();
    pid = fork();
    if (pid < 0)
    {
        fprintf(stderr, "Failed to fork: %s\n", strerror(errno));
        exit(1);
    }
    if (pid > 0)
        exit(0);
    if (chdir("/") != 0)
        fprintf(stderr, "Warning: Failed to chdir into /\n");
    // Open /dev/null first and dup2() it over the standard descriptors, so the result
    // does not depend on open() happening to return 0, 1 and 2
    int null_in = open("/dev/null", O_RDONLY);
    int null_out = open("/dev/null", O_WRONLY);
    if (null_in < 0 || null_out < 0)
    {
        fprintf(stderr, "Warning: Failed to open /dev/null, closing standard streams\n");
        if (null_in > 2)
            close(null_in);
        if (null_out > 2)
            close(null_out);
        close(0);
        close(1);
        close(2);
        return;
    }
    dup2(null_in, 0);
    dup2(null_out, 1);
    dup2(null_out, 2);
    // The temporary descriptors are only closed when they are not themselves 0..2
    if (null_in > 2)
        close(null_in);
    if (null_out > 2)
        close(null_out);
}
int main(int narg, const char *args[])
{
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
exe_name = args[0];
nfs_proxy_t *p = new nfs_proxy_t();
p->run(nfs_proxy_t::parse_args(narg, args));
delete p;
return 0;
}