s3-benchmark: Implement user defined run modes.

Signed-off-by: Mark Nelson <mnelson@redhat.com>
master
Mark Nelson 2019-08-12 15:06:57 -04:00
parent 4689735d41
commit c89808646f
1 changed files with 144 additions and 78 deletions

View File

@ -32,7 +32,7 @@ import (
)
// Global variables
var access_key, secret_key, url_host, bucket_prefix, object_prefix, region, sizeArg string
var access_key, secret_key, url_host, bucket_prefix, object_prefix, region, modes, sizeArg string
var buckets []string
var duration_secs, threads, loops, bucket_count int
var object_data []byte
@ -309,6 +309,100 @@ func runDelete(thread_num int) {
var cfg *aws.Config
func init_buckets(loop int) {
// Initialize data for the bucket
object_data = make([]byte, object_size)
rand.Read(object_data)
hasher := md5.New()
hasher.Write(object_data)
object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
// Create the buckets and delete all the objects
starttime := time.Now()
for i := 0; i < bucket_count; i++ {
createBucket(i, true)
deleteAllObjects(i)
}
init_time := time.Now().Sub(starttime).Seconds()
logit(fmt.Sprintf("Loop %d: INIT time %.1f secs, buckets = %d, speed = %.1f buckets/sec.",
loop, init_time, bucket_count, float64(bucket_count)/init_time))
}
func run_put(loop int) float64 {
// reset counters
upload_count = 0
upload_slowdown_count = 0
running_threads = int64(threads)
starttime := time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 0; n < threads; n++ {
go runUpload(n)
}
// Wait for it to finish
for atomic.LoadInt64(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
upload_time := upload_finish.Sub(starttime).Seconds()
bps := float64(uint64(upload_count)*object_size) / upload_time
logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d",
loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count))
return bps / bytefmt.MEGABYTE
}
func run_get(loop int) float64 {
// reset counters
download_count = 0
download_slowdown_count = 0
running_threads = int64(threads)
starttime := time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 0; n < threads; n++ {
go runDownload(n)
}
// Wait for it to finish
for atomic.LoadInt64(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
download_time := download_finish.Sub(starttime).Seconds()
bps := float64(uint64(download_count)*object_size) / download_time
logit(fmt.Sprintf("Loop %d: GET time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d",
loop, download_time, download_count, bytefmt.ByteSize(uint64(bps)), float64(download_count)/download_time, download_slowdown_count))
return bps / bytefmt.MEGABYTE
}
func run_delete(loop int) float64 {
// reset counters
delete_count = 0
delete_slowdown_count = 0
running_threads = int64(threads)
starttime := time.Now()
for n := 0; n < threads; n++ {
go runDelete(n)
}
// Wait for it to finish
for atomic.LoadInt64(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
delete_time := delete_finish.Sub(starttime).Seconds()
bps := float64(uint64(delete_count)*object_size) / delete_time
logit(fmt.Sprintf("Loop %d: DELETE time %.1f secs, %.1f deletes/sec. Slowdowns = %d",
loop, delete_time, float64(upload_count)/delete_time, delete_slowdown_count))
return bps / bytefmt.MEGABYTE
}
func init() {
// Parse command line
myflag := flag.NewFlagSet("myflag", flag.ExitOnError)
@ -318,12 +412,36 @@ func init() {
myflag.StringVar(&object_prefix, "op", "", "Prefix for objects")
myflag.StringVar(&bucket_prefix, "bp", "hotsauce_bench", "Prefix for buckets")
myflag.StringVar(&region, "r", "us-east-1", "Region for testing")
myflag.StringVar(&modes, "m", "ipgd", "Run modes in order. See NOTES for more info")
myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>")
myflag.IntVar(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>")
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G")
// define custom usage output with notes
notes :=
`
NOTES:
- Valid mode types for the -m mode string are:
i: initialize buckets and clear any existing objects
p: put objects in buckets
g: get objects from buckets
d: delete objects from buckets
These modes are processed in-order and can be repeated, ie "ippgd" will
initialize the buckets, put the objects, reput the objects, get the
objects, and then delete the objects. The repeat flag will repeat this
whole process the specified number of times.
`
myflag.Usage = func() {
fmt.Fprintf(flag.CommandLine.Output(), "\nUSAGE: %s [OPTIONS]\n\n", os.Args[0])
fmt.Fprintf(flag.CommandLine.Output(), "OPTIONS:\n")
myflag.PrintDefaults()
fmt.Fprintf(flag.CommandLine.Output(), notes);
}
if err := myflag.Parse(os.Args[1:]); err != nil {
os.Exit(1)
}
@ -341,6 +459,17 @@ func init() {
if url_host == "" {
log.Fatal("Missing argument -s for host endpoint.")
}
invalid_mode := false
for _, r := range modes {
if (r != 'i' && r != 'p' && r != 'g' && r != 'd') {
s := fmt.Sprintf("Invalid mode '%s' passed to -m", string(r))
log.Printf(s)
invalid_mode = true
}
}
if invalid_mode {
log.Fatal("Invalid modes passed to -m, see help for details.")
}
var err error
if object_size, err = bytefmt.ToBytes(sizeArg); err != nil {
log.Fatalf("Invalid -z argument for object size: %v", err)
@ -365,89 +494,26 @@ func main() {
logit(fmt.Sprintf("Parameters: url=%s, bucket_prefix=%s, bucket_count=%d, region=%s, duration=%d, threads=%d, loops=%d, size=%s",
url_host, bucket_prefix, bucket_count, region, duration_secs, threads, loops, sizeArg))
// Initialize data for the bucket
object_data = make([]byte, object_size)
rand.Read(object_data)
hasher := md5.New()
hasher.Write(object_data)
object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
// Create the buckets and delete all the objects
// Setup the slice of buckets
for i := 0; i < bucket_count; i++ {
buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i))
createBucket(i, true)
deleteAllObjects(i)
}
buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i))
}
var uploadspeed, downloadspeed float64
// Loop running the tests
for loop := 1; loop <= loops; loop++ {
// reset counters
upload_count = 0
upload_slowdown_count = 0
download_count = 0
download_slowdown_count = 0
delete_count = 0
delete_slowdown_count = 0
// Run the upload case
running_threads = int64(threads)
starttime := time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 0; n < threads; n++ {
go runUpload(n)
for _, r := range modes {
switch r {
case 'i':
init_buckets(loop);
case 'p':
uploadspeed = run_put(loop);
case 'g':
downloadspeed = run_get(loop);
case 'd':
run_delete(loop);
}
}
// Wait for it to finish
for atomic.LoadInt64(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
upload_time := upload_finish.Sub(starttime).Seconds()
bps := float64(uint64(upload_count)*object_size) / upload_time
logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d",
loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count))
uploadspeed = bps / bytefmt.MEGABYTE
// Run the download case
running_threads = int64(threads)
starttime = time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 0; n < threads; n++ {
go runDownload(n)
}
// Wait for it to finish
for atomic.LoadInt64(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
download_time := download_finish.Sub(starttime).Seconds()
bps = float64(uint64(download_count)*object_size) / download_time
logit(fmt.Sprintf("Loop %d: GET time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d",
loop, download_time, download_count, bytefmt.ByteSize(uint64(bps)), float64(download_count)/download_time, download_slowdown_count))
downloadspeed = bps / bytefmt.MEGABYTE
// Run the delete case
running_threads = int64(threads)
starttime = time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 0; n < threads; n++ {
go runDelete(n)
}
// Wait for it to finish
for atomic.LoadInt64(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
delete_time := delete_finish.Sub(starttime).Seconds()
logit(fmt.Sprintf("Loop %d: DELETE time %.1f secs, %.1f deletes/sec. Slowdowns = %d",
loop, delete_time, float64(upload_count)/delete_time, delete_slowdown_count))
}
// All done