@ -6,11 +6,11 @@ package vitastor
import (
"context"
"encoding/json"
"fmt"
"strings"
"bytes"
"strconv"
"time"
"fmt"
"os"
"os/exec"
"io/ioutil"
@ -21,8 +21,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.etcd.io/etcd/clientv3"
"github.com/container-storage-interface/spec/lib/go/csi"
)
@ -114,6 +112,34 @@ func GetConnectionParams(params map[string]string) (map[string]string, []string,
return ctxVars , etcdUrl , etcdPrefix
}
func invokeCLI ( ctxVars map [ string ] string , args [ ] string ) ( [ ] byte , error )
{
if ( ctxVars [ "etcdUrl" ] != "" )
{
args = append ( args , "--etcd_address" , ctxVars [ "etcdUrl" ] )
}
if ( ctxVars [ "etcdPrefix" ] != "" )
{
args = append ( args , "--etcd_prefix" , ctxVars [ "etcdPrefix" ] )
}
if ( ctxVars [ "configPath" ] != "" )
{
args = append ( args , "--config_path" , ctxVars [ "configPath" ] )
}
c := exec . Command ( "/usr/bin/vitastor-cli" , args ... )
var stdout , stderr bytes . Buffer
c . Stdout = & stdout
c . Stderr = & stderr
err := c . Run ( )
stderrStr := string ( stderr . Bytes ( ) )
if ( err != nil )
{
klog . Errorf ( "vitastor-cli %s failed: %s, status %s\n" , strings . Join ( args , " " ) , stderrStr , err )
return nil , status . Error ( codes . Internal , stderrStr + " (status " + err . Error ( ) + ")" )
}
return stdout . Bytes ( ) , nil
}
// Create the volume
func ( cs * ControllerServer ) CreateVolume ( ctx context . Context , req * csi . CreateVolumeRequest ) ( * csi . CreateVolumeResponse , error )
{
@ -146,128 +172,41 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
volSize = ( ( capRange . GetRequiredBytes ( ) + MB - 1 ) / MB ) * MB
}
// FIXME: The following should PROBABLY be implemented externally in a management tool
ctxVars , etcdUrl , etcdPrefix := GetConnectionParams ( req . Parameters )
ctxVars , etcdUrl , _ := GetConnectionParams ( req . Parameters )
if ( len ( etcdUrl ) == 0 )
{
return nil , status . Error ( codes . InvalidArgument , "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf" )
}
// Connect to etcd
cli , err := clientv3 . New ( clientv3 . Config {
DialTimeout : ETCD_TIMEOUT ,
Endpoints : etcdUrl ,
} )
// Create image using vitastor-cli
_ , err := invokeCLI ( ctxVars , [ ] string { "create" , volName , "-s" , fmt . Sprintf ( "%v" , volSize ) , "--pool" , fmt . Sprintf ( "%v" , poolId ) } )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "failed to connect to etcd at " + strings . Join ( etcdUrl , "," ) + ": " + err . Error ( ) )
}
defer cli . Close ( )
var imageId uint64 = 0
for
{
// Check if the image exists
ctx , cancel := context . WithTimeout ( context . Background ( ) , ETCD_TIMEOUT )
resp , err := cli . Get ( ctx , etcdPrefix + "/index/image/" + volName )
cancel ( )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "failed to read key from etcd: " + err . Error ( ) )
}
if ( len ( resp . Kvs ) > 0 )
if ( strings . Index ( err . Error ( ) , "already exists" ) > 0 )
{
kv := resp . Kvs [ 0 ]
var v InodeIndex
err := json . Unmarshal ( kv . Value , & v )
stat , err := invokeCLI ( ctxVars , [ ] string { "ls" , "--json" , volName } )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "invalid /index/image/" + volName + " key in etcd: " + err . Error ( ) )
return nil , err
}
poolId = v . PoolId
imageId = v . Id
inodeCfgKey := fmt . Sprintf ( "/config/inode/%d/%d" , poolId , imageId )
ctx , cancel := context . WithTimeout ( context . Background ( ) , ETCD_TIMEOUT )
resp , err := cli . Get ( ctx , etcdPrefix + inodeCfgKey )
cancel ( )
var inodeCfg [ ] InodeConfig
err = json . Unmarshal ( stat , & inodeCfg )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "failed to read key from etcd : " + err . Error ( ) )
return nil , status . Error ( codes . Internal , "Invalid JSON in vitastor-cli ls: " + err . Error ( ) )
}
if ( len ( resp . Kvs ) == 0 )
if ( len ( inodeCfg ) == 0 )
{
return nil , status . Error ( codes . Internal , "missing " + inodeCfgKey + " key in etcd " )
return nil , status . Error ( codes . Internal , "vitastor-cli create said that image already exists, but ls can't find it" )
}
var inodeCfg InodeConfig
err = json . Unmarshal ( resp . Kvs [ 0 ] . Value , & inodeCfg )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "invalid " + inodeCfgKey + " key in etcd: " + err . Error ( ) )
}
if ( inodeCfg . Size < uint64 ( volSize ) )
if ( inodeCfg [ 0 ] . Size < uint64 ( volSize ) )
{
return nil , status . Error ( codes . Internal , "image " + volName + " is already created, but size is less than expected" )
}
}
else
{
// Find a free ID
// Create image metadata in a transaction verifying that the image doesn't exist yet AND ID is still free
maxIdKey := fmt . Sprintf ( "%s/index/maxid/%d" , etcdPrefix , poolId )
ctx , cancel := context . WithTimeout ( context . Background ( ) , ETCD_TIMEOUT )
resp , err := cli . Get ( ctx , maxIdKey )
cancel ( )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "failed to read key from etcd: " + err . Error ( ) )
}
var modRev int64
var nextId uint64
if ( len ( resp . Kvs ) > 0 )
{
var err error
nextId , err = strconv . ParseUint ( string ( resp . Kvs [ 0 ] . Value ) , 10 , 64 )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , maxIdKey + " contains invalid ID" )
}
modRev = resp . Kvs [ 0 ] . ModRevision
nextId ++
}
else
{
nextId = 1
}
inodeIdxJson , _ := json . Marshal ( InodeIndex {
Id : nextId ,
PoolId : poolId ,
} )
inodeCfgJson , _ := json . Marshal ( InodeConfig {
Name : volName ,
Size : uint64 ( volSize ) ,
} )
ctx , cancel = context . WithTimeout ( context . Background ( ) , ETCD_TIMEOUT )
txnResp , err := cli . Txn ( ctx ) . If (
clientv3 . Compare ( clientv3 . ModRevision ( fmt . Sprintf ( "%s/index/maxid/%d" , etcdPrefix , poolId ) ) , "=" , modRev ) ,
clientv3 . Compare ( clientv3 . CreateRevision ( fmt . Sprintf ( "%s/index/image/%s" , etcdPrefix , volName ) ) , "=" , 0 ) ,
clientv3 . Compare ( clientv3 . CreateRevision ( fmt . Sprintf ( "%s/config/inode/%d/%d" , etcdPrefix , poolId , nextId ) ) , "=" , 0 ) ,
) . Then (
clientv3 . OpPut ( fmt . Sprintf ( "%s/index/maxid/%d" , etcdPrefix , poolId ) , fmt . Sprintf ( "%d" , nextId ) ) ,
clientv3 . OpPut ( fmt . Sprintf ( "%s/index/image/%s" , etcdPrefix , volName ) , string ( inodeIdxJson ) ) ,
clientv3 . OpPut ( fmt . Sprintf ( "%s/config/inode/%d/%d" , etcdPrefix , poolId , nextId ) , string ( inodeCfgJson ) ) ,
) . Commit ( )
cancel ( )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "failed to commit transaction in etcd: " + err . Error ( ) )
}
if ( txnResp . Succeeded )
{
imageId = nextId
break
}
// Start over if the transaction fails
return nil , err
}
}
@ -299,97 +238,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
volName := ctxVars [ "name" ]
_ , etcdUrl , etcdPrefix := GetConnectionParams ( ctxVars )
if ( len ( etcdUrl ) == 0 )
{
return nil , status . Error ( codes . InvalidArgument , "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf" )
}
ctxVars , _ , _ = GetConnectionParams ( ctxVars )
cli , err := clientv3 . New ( clientv3 . Config {
DialTimeout : ETCD_TIMEOUT ,
Endpoints : etcdUrl ,
} )
_ , err = invokeCLI ( ctxVars , [ ] string { "rm" , volName } )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "failed to connect to etcd at " + strings . Join ( etcdUrl , "," ) + ": " + err . Error ( ) )
}
defer cli . Close ( )
// Find inode by name
ctx , cancel := context . WithTimeout ( context . Background ( ) , ETCD_TIMEOUT )
resp , err := cli . Get ( ctx , etcdPrefix + "/index/image/" + volName )
cancel ( )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "failed to read key from etcd: " + err . Error ( ) )
}
if ( len ( resp . Kvs ) == 0 )
{
return nil , status . Error ( codes . NotFound , "volume " + volName + " does not exist" )
}
var idx InodeIndex
err = json . Unmarshal ( resp . Kvs [ 0 ] . Value , & idx )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "invalid /index/image/" + volName + " key in etcd: " + err . Error ( ) )
}
// Get inode config
inodeCfgKey := fmt . Sprintf ( "%s/config/inode/%d/%d" , etcdPrefix , idx . PoolId , idx . Id )
ctx , cancel = context . WithTimeout ( context . Background ( ) , ETCD_TIMEOUT )
resp , err = cli . Get ( ctx , inodeCfgKey )
cancel ( )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "failed to read key from etcd: " + err . Error ( ) )
}
if ( len ( resp . Kvs ) == 0 )
{
return nil , status . Error ( codes . NotFound , "volume " + volName + " does not exist" )
}
var inodeCfg InodeConfig
err = json . Unmarshal ( resp . Kvs [ 0 ] . Value , & inodeCfg )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "invalid " + inodeCfgKey + " key in etcd: " + err . Error ( ) )
}
// Delete inode data by invoking vitastor-cli
args := [ ] string {
"rm-data" , "--etcd_address" , strings . Join ( etcdUrl , "," ) ,
"--pool" , fmt . Sprintf ( "%d" , idx . PoolId ) ,
"--inode" , fmt . Sprintf ( "%d" , idx . Id ) ,
}
if ( ctxVars [ "configPath" ] != "" )
{
args = append ( args , "--config_path" , ctxVars [ "configPath" ] )
}
c := exec . Command ( "/usr/bin/vitastor-cli" , args ... )
var stderr bytes . Buffer
c . Stdout = nil
c . Stderr = & stderr
err = c . Run ( )
stderrStr := string ( stderr . Bytes ( ) )
if ( err != nil )
{
klog . Errorf ( "vitastor-cli rm-data failed: %s, status %s\n" , stderrStr , err )
return nil , status . Error ( codes . Internal , stderrStr + " (status " + err . Error ( ) + ")" )
}
// Delete inode config in etcd
ctx , cancel = context . WithTimeout ( context . Background ( ) , ETCD_TIMEOUT )
txnResp , err := cli . Txn ( ctx ) . Then (
clientv3 . OpDelete ( fmt . Sprintf ( "%s/index/image/%s" , etcdPrefix , volName ) ) ,
clientv3 . OpDelete ( fmt . Sprintf ( "%s/config/inode/%d/%d" , etcdPrefix , idx . PoolId , idx . Id ) ) ,
) . Commit ( )
cancel ( )
if ( err != nil )
{
return nil , status . Error ( codes . Internal , "failed to delete keys in etcd: " + err . Error ( ) )
}
if ( ! txnResp . Succeeded )
{
return nil , status . Error ( codes . Internal , "failed to delete keys in etcd: transaction failed" )
return nil , err
}
return & csi . DeleteVolumeResponse { } , nil