From 3089b4b800d8423d9195f6af066427bc207c6779 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 17 Apr 2022 19:38:35 +0300 Subject: [PATCH] Use vitastor-cli instead of direct etcd interaction in the CSI driver --- csi/src/controllerserver.go | 235 +++++++----------------------------- 1 file changed, 44 insertions(+), 191 deletions(-) diff --git a/csi/src/controllerserver.go b/csi/src/controllerserver.go index 0e5a3033..ea869cac 100644 --- a/csi/src/controllerserver.go +++ b/csi/src/controllerserver.go @@ -10,7 +10,6 @@ import ( "bytes" "strconv" "time" - "fmt" "os" "os/exec" "io/ioutil" @@ -21,8 +20,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "go.etcd.io/etcd/clientv3" - "github.com/container-storage-interface/spec/lib/go/csi" ) @@ -114,6 +111,34 @@ func GetConnectionParams(params map[string]string) (map[string]string, []string, return ctxVars, etcdUrl, etcdPrefix } +func invokeCLI(ctxVars map[string]string, args []string) ([]byte, error) +{ + if (ctxVars["etcdUrl"] != "") + { + args = append(args, "--etcd_address", ctxVars["etcdUrl"]) + } + if (ctxVars["etcdPrefix"] != "") + { + args = append(args, "--etcd_prefix", ctxVars["etcdPrefix"]) + } + if (ctxVars["configPath"] != "") + { + args = append(args, "--config_path", ctxVars["configPath"]) + } + c := exec.Command("/usr/bin/vitastor-cli", args...) + var stdout, stderr bytes.Buffer + c.Stdout = &stdout + c.Stderr = &stderr + err := c.Run() + stderrStr := string(stderr.Bytes()) + if (err != nil) + { + klog.Errorf("vitastor-cli %s failed: %s, status %s\n", strings.Join(args, " "), stderrStr, err) + return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")") + } + return stdout.Bytes(), nil +} + // Create the volume func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { @@ -146,128 +171,41 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol volSize = ((capRange.GetRequiredBytes() + MB - 1) / MB) * MB } - // FIXME: The following should PROBABLY be implemented externally in a management tool - - ctxVars, etcdUrl, etcdPrefix := GetConnectionParams(req.Parameters) + ctxVars, etcdUrl, _ := GetConnectionParams(req.Parameters) if (len(etcdUrl) == 0) { return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf") } - // Connect to etcd - cli, err := clientv3.New(clientv3.Config{ - DialTimeout: ETCD_TIMEOUT, - Endpoints: etcdUrl, - }) + // Create image using vitastor-cli + _, err := invokeCLI(ctxVars, []string{ "create", volName, "-s", string(volSize), "--pool", string(poolId) }) if (err != nil) { - return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error()) - } - defer cli.Close() - - var imageId uint64 = 0 - for - { - // Check if the image exists - ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) - resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName) - cancel() - if (err != nil) + if (strings.Index(err.Error(), "already exists") > 0) { - return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) - } - if (len(resp.Kvs) > 0) - { - kv := resp.Kvs[0] - var v InodeIndex - err := json.Unmarshal(kv.Value, &v) + stat, err := invokeCLI(ctxVars, []string{ "ls", "--json", volName }) if (err != nil) { - return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error()) + return nil, err } - poolId = v.PoolId - imageId = v.Id - inodeCfgKey := fmt.Sprintf("/config/inode/%d/%d", poolId, imageId) - ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) - resp, err := cli.Get(ctx, etcdPrefix+inodeCfgKey) - cancel() + var inodeCfg []InodeConfig + err = json.Unmarshal(stat, &inodeCfg) if (err != nil) { - return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) + return nil, status.Error(codes.Internal, "Invalid JSON in vitastor-cli ls: "+err.Error()) } - if (len(resp.Kvs) == 0) + if (len(inodeCfg) == 0) { - return nil, status.Error(codes.Internal, "missing "+inodeCfgKey+" key in etcd") + return nil, status.Error(codes.Internal, "vitastor-cli create said that image already exists, but ls can't find it") } - var inodeCfg InodeConfig - err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg) - if (err != nil) - { - return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error()) - } - if (inodeCfg.Size < uint64(volSize)) + if (inodeCfg[0].Size < uint64(volSize)) { return nil, status.Error(codes.Internal, "image "+volName+" is already created, but size is less than expected") } } else { - // Find a free ID - // Create image metadata in a transaction verifying that the image doesn't exist yet AND ID is still free - maxIdKey := fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId) - ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) - resp, err := cli.Get(ctx, maxIdKey) - cancel() - if (err != nil) - { - return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) - } - var modRev int64 - var nextId uint64 - if (len(resp.Kvs) > 0) - { - var err error - nextId, err = strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64) - if (err != nil) - { - return nil, status.Error(codes.Internal, maxIdKey+" contains invalid ID") - } - modRev = resp.Kvs[0].ModRevision - nextId++ - } - else - { - nextId = 1 - } - inodeIdxJson, _ := json.Marshal(InodeIndex{ - Id: nextId, - PoolId: poolId, - }) - inodeCfgJson, _ := json.Marshal(InodeConfig{ - Name: volName, - Size: uint64(volSize), - }) - ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT) - txnResp, err := cli.Txn(ctx).If( - clientv3.Compare(clientv3.ModRevision(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId)), "=", modRev), - clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)), "=", 0), - clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId)), "=", 0), - ).Then( - clientv3.OpPut(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId), fmt.Sprintf("%d", nextId)), - clientv3.OpPut(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName), string(inodeIdxJson)), - clientv3.OpPut(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId), string(inodeCfgJson)), - ).Commit() - cancel() - if (err != nil) - { - return nil, status.Error(codes.Internal, "failed to commit transaction in etcd: "+err.Error()) - } - if (txnResp.Succeeded) - { - imageId = nextId - break - } - // Start over if the transaction fails + return nil, err } } @@ -299,97 +237,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } volName := ctxVars["name"] - _, etcdUrl, etcdPrefix := GetConnectionParams(ctxVars) - if (len(etcdUrl) == 0) - { - return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf") - } + ctxVars, _, _ = GetConnectionParams(ctxVars) - cli, err := clientv3.New(clientv3.Config{ - DialTimeout: ETCD_TIMEOUT, - Endpoints: etcdUrl, - }) + _, err = invokeCLI(ctxVars, []string{ "rm", volName }) if (err != nil) { - return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error()) - } - defer cli.Close() - - // Find inode by name - ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) - resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName) - cancel() - if (err != nil) - { - return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) - } - if (len(resp.Kvs) == 0) - { - return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist") - } - var idx InodeIndex - err = json.Unmarshal(resp.Kvs[0].Value, &idx) - if (err != nil) - { - return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error()) - } - - // Get inode config - inodeCfgKey := fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id) - ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT) - resp, err = cli.Get(ctx, inodeCfgKey) - cancel() - if (err != nil) - { - return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) - } - if (len(resp.Kvs) == 0) - { - return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist") - } - var inodeCfg InodeConfig - err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg) - if (err != nil) - { - return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error()) - } - - // Delete inode data by invoking vitastor-cli - args := []string{ - "rm-data", "--etcd_address", strings.Join(etcdUrl, ","), - "--pool", fmt.Sprintf("%d", idx.PoolId), - "--inode", fmt.Sprintf("%d", idx.Id), - } - if (ctxVars["configPath"] != "") - { - args = append(args, "--config_path", ctxVars["configPath"]) - } - c := exec.Command("/usr/bin/vitastor-cli", args...) - var stderr bytes.Buffer - c.Stdout = nil - c.Stderr = &stderr - err = c.Run() - stderrStr := string(stderr.Bytes()) - if (err != nil) - { - klog.Errorf("vitastor-cli rm-data failed: %s, status %s\n", stderrStr, err) - return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")") - } - - // Delete inode config in etcd - ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT) - txnResp, err := cli.Txn(ctx).Then( - clientv3.OpDelete(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)), - clientv3.OpDelete(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id)), - ).Commit() - cancel() - if (err != nil) - { - return nil, status.Error(codes.Internal, "failed to delete keys in etcd: "+err.Error()) - } - if (!txnResp.Succeeded) - { - return nil, status.Error(codes.Internal, "failed to delete keys in etcd: transaction failed") + return nil, err } return &csi.DeleteVolumeResponse{}, nil