Merge pull request #6 from markhpc/wip-fmt

hsbench: gofmt to cleanup code
master
Mark Nelson 2019-08-19 10:56:29 -05:00 committed by GitHub
commit 457598e1b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 332 additions and 333 deletions

View File

@ -6,6 +6,7 @@ package main
import ( import (
"bytes" "bytes"
"code.cloudfoundry.org/bytefmt"
"crypto/hmac" "crypto/hmac"
"crypto/md5" "crypto/md5"
"crypto/sha1" "crypto/sha1"
@ -15,6 +16,10 @@ import (
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
@ -29,11 +34,6 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"code.cloudfoundry.org/bytefmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
) )
// Global variables // Global variables
@ -136,82 +136,82 @@ func setSignature(req *http.Request) {
} }
type IntervalStats struct { type IntervalStats struct {
loop int loop int
name string name string
mode string mode string
bytes int64 bytes int64
slowdowns int64 slowdowns int64
intervalNano int64 intervalNano int64
latNano []int64 latNano []int64
} }
func (is *IntervalStats) makeOutputStats() OutputStats { func (is *IntervalStats) makeOutputStats() OutputStats {
// Compute and log the stats // Compute and log the stats
ops := len(is.latNano) ops := len(is.latNano)
totalLat := int64(0); totalLat := int64(0)
minLat := float64(0); minLat := float64(0)
maxLat := float64(0); maxLat := float64(0)
NinetyNineLat := float64(0); NinetyNineLat := float64(0)
avgLat := float64(0); avgLat := float64(0)
if ops > 0 { if ops > 0 {
minLat = float64(is.latNano[0]) / 1000000 minLat = float64(is.latNano[0]) / 1000000
maxLat = float64(is.latNano[ops - 1]) / 1000000 maxLat = float64(is.latNano[ops-1]) / 1000000
for i := range is.latNano { for i := range is.latNano {
totalLat += is.latNano[i] totalLat += is.latNano[i]
} }
avgLat = float64(totalLat) / float64(ops) / 1000000 avgLat = float64(totalLat) / float64(ops) / 1000000
NintyNineLatNano := is.latNano[int64(math.Round(0.99*float64(ops))) - 1] NintyNineLatNano := is.latNano[int64(math.Round(0.99*float64(ops)))-1]
NinetyNineLat = float64(NintyNineLatNano) / 1000000 NinetyNineLat = float64(NintyNineLatNano) / 1000000
} }
seconds := float64(is.intervalNano) / 1000000000 seconds := float64(is.intervalNano) / 1000000000
mbps := float64(is.bytes) / seconds / bytefmt.MEGABYTE mbps := float64(is.bytes) / seconds / bytefmt.MEGABYTE
iops := float64(ops) / seconds iops := float64(ops) / seconds
return OutputStats{ return OutputStats{
is.loop, is.loop,
is.name, is.name,
seconds, seconds,
is.mode, is.mode,
ops, ops,
mbps, mbps,
iops, iops,
minLat, minLat,
avgLat, avgLat,
NinetyNineLat, NinetyNineLat,
maxLat, maxLat,
is.slowdowns} is.slowdowns}
} }
type OutputStats struct { type OutputStats struct {
Loop int Loop int
IntervalName string IntervalName string
Seconds float64 Seconds float64
Mode string Mode string
Ops int Ops int
Mbps float64 Mbps float64
Iops float64 Iops float64
MinLat float64 MinLat float64
AvgLat float64 AvgLat float64
NinetyNineLat float64 NinetyNineLat float64
MaxLat float64 MaxLat float64
Slowdowns int64 Slowdowns int64
} }
func (o *OutputStats) log() { func (o *OutputStats) log() {
log.Printf( log.Printf(
"Loop: %d, Int: %s, Dur(s): %.1f, Mode: %s, Ops: %d, MB/s: %.2f, IO/s: %.0f, Lat(ms): [ min: %.1f, avg: %.1f, 99%%: %.1f, max: %.1f ], Slowdowns: %d", "Loop: %d, Int: %s, Dur(s): %.1f, Mode: %s, Ops: %d, MB/s: %.2f, IO/s: %.0f, Lat(ms): [ min: %.1f, avg: %.1f, 99%%: %.1f, max: %.1f ], Slowdowns: %d",
o.Loop, o.Loop,
o.IntervalName, o.IntervalName,
o.Seconds, o.Seconds,
o.Mode, o.Mode,
o.Ops, o.Ops,
o.Mbps, o.Mbps,
o.Iops, o.Iops,
o.MinLat, o.MinLat,
o.AvgLat, o.AvgLat,
o.NinetyNineLat, o.NinetyNineLat,
o.MaxLat, o.MaxLat,
o.Slowdowns) o.Slowdowns)
} }
func (o *OutputStats) csv_header(w *csv.Writer) { func (o *OutputStats) csv_header(w *csv.Writer) {
@ -242,7 +242,7 @@ func (o *OutputStats) csv(w *csv.Writer) {
log.Fatal("OutputStats Passed nil csv writer") log.Fatal("OutputStats Passed nil csv writer")
} }
s := []string { s := []string{
strconv.Itoa(o.Loop), strconv.Itoa(o.Loop),
o.IntervalName, o.IntervalName,
strconv.FormatFloat(o.Seconds, 'f', 2, 64), strconv.FormatFloat(o.Seconds, 'f', 2, 64),
@ -257,7 +257,7 @@ func (o *OutputStats) csv(w *csv.Writer) {
strconv.FormatInt(o.Slowdowns, 10)} strconv.FormatInt(o.Slowdowns, 10)}
if err := w.Write(s); err != nil { if err := w.Write(s); err != nil {
log.Fatal("Error writing to CSV writer: ",err) log.Fatal("Error writing to CSV writer: ", err)
} }
} }
@ -273,13 +273,13 @@ func (o *OutputStats) json(jfile *os.File) {
_, err = jfile.WriteString(string(jdata) + "\n") _, err = jfile.WriteString(string(jdata) + "\n")
if err != nil { if err != nil {
log.Fatal("Error writing to JSON file: ", err) log.Fatal("Error writing to JSON file: ", err)
} }
} }
type ThreadStats struct { type ThreadStats struct {
start int64 start int64
curInterval int64 curInterval int64
intervals []IntervalStats intervals []IntervalStats
} }
func makeThreadStats(s int64, loop int, mode string, intervalNano int64) ThreadStats { func makeThreadStats(s int64, loop int, mode string, intervalNano int64) ThreadStats {
@ -293,8 +293,8 @@ func (ts *ThreadStats) updateIntervals(loop int, mode string, intervalNano int64
if intervalNano < 0 { if intervalNano < 0 {
return ts.curInterval return ts.curInterval
} }
for ts.start + intervalNano*(ts.curInterval+1) < time.Now().UnixNano() { for ts.start+intervalNano*(ts.curInterval+1) < time.Now().UnixNano() {
ts.curInterval++ ts.curInterval++
ts.intervals = append( ts.intervals = append(
ts.intervals, ts.intervals,
IntervalStats{ IntervalStats{
@ -318,10 +318,10 @@ type Stats struct {
threads int threads int
// The loop we are in // The loop we are in
loop int loop int
// Test mode being run // Test mode being run
mode string mode string
// start time in nanoseconds // start time in nanoseconds
startNano int64 startNano int64
// end time in nanoseconds // end time in nanoseconds
endNano int64 endNano int64
// Duration in nanoseconds for each interval // Duration in nanoseconds for each interval
@ -329,7 +329,7 @@ type Stats struct {
// Per-thread statistics // Per-thread statistics
threadStats []ThreadStats threadStats []ThreadStats
// a map of per-interval thread completion counters // a map of per-interval thread completion counters
intervalCompletions sync.Map intervalCompletions sync.Map
// a counter of how many threads have finished updating stats entirely // a counter of how many threads have finished updating stats entirely
completions int32 completions int32
} }
@ -345,73 +345,73 @@ func makeStats(loop int, mode string, threads int, intervalNano int64) Stats {
} }
func (stats *Stats) makeOutputStats(i int64) (OutputStats, bool) { func (stats *Stats) makeOutputStats(i int64) (OutputStats, bool) {
// Check bounds first // Check bounds first
if stats.intervalNano < 0 || i < 0 { if stats.intervalNano < 0 || i < 0 {
return OutputStats{}, false return OutputStats{}, false
} }
// Not safe to log if not all writers have completed. // Not safe to log if not all writers have completed.
value, ok := stats.intervalCompletions.Load(i) value, ok := stats.intervalCompletions.Load(i)
if !ok { if !ok {
return OutputStats{}, false return OutputStats{}, false
} }
cp, ok := value.(*int32) cp, ok := value.(*int32)
if !ok { if !ok {
return OutputStats{}, false return OutputStats{}, false
} }
count := atomic.LoadInt32(cp) count := atomic.LoadInt32(cp)
if count < int32(stats.threads) { if count < int32(stats.threads) {
return OutputStats{}, false return OutputStats{}, false
} }
bytes := int64(0) bytes := int64(0)
ops := int64(0) ops := int64(0)
slowdowns := int64(0) slowdowns := int64(0)
for t := 0; t < stats.threads; t++ { for t := 0; t < stats.threads; t++ {
bytes += stats.threadStats[t].intervals[i].bytes bytes += stats.threadStats[t].intervals[i].bytes
ops += int64(len(stats.threadStats[t].intervals[i].latNano)) ops += int64(len(stats.threadStats[t].intervals[i].latNano))
slowdowns += stats.threadStats[t].intervals[i].slowdowns slowdowns += stats.threadStats[t].intervals[i].slowdowns
} }
// Aggregate the per-thread Latency slice // Aggregate the per-thread Latency slice
tmpLat := make([]int64, ops) tmpLat := make([]int64, ops)
var c int var c int
for t := 0; t < stats.threads; t++ { for t := 0; t < stats.threads; t++ {
c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano) c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano)
} }
sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] }) sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] })
is := IntervalStats{stats.loop, strconv.FormatInt(i, 10), stats.mode, bytes, slowdowns, stats.intervalNano, tmpLat} is := IntervalStats{stats.loop, strconv.FormatInt(i, 10), stats.mode, bytes, slowdowns, stats.intervalNano, tmpLat}
return is.makeOutputStats(), true return is.makeOutputStats(), true
} }
func (stats *Stats) makeTotalStats() (OutputStats, bool) { func (stats *Stats) makeTotalStats() (OutputStats, bool) {
// Not safe to log if not all writers have completed. // Not safe to log if not all writers have completed.
completions := atomic.LoadInt32(&stats.completions) completions := atomic.LoadInt32(&stats.completions)
if (completions < int32(threads)) { if completions < int32(threads) {
log.Printf("log, completions: %d", completions) log.Printf("log, completions: %d", completions)
return OutputStats{}, false return OutputStats{}, false
} }
bytes := int64(0) bytes := int64(0)
ops := int64(0) ops := int64(0)
slowdowns := int64(0); slowdowns := int64(0)
for t := 0; t < stats.threads; t++ { for t := 0; t < stats.threads; t++ {
for i := 0; i < len(stats.threadStats[t].intervals); i++ { for i := 0; i < len(stats.threadStats[t].intervals); i++ {
bytes += stats.threadStats[t].intervals[i].bytes bytes += stats.threadStats[t].intervals[i].bytes
ops += int64(len(stats.threadStats[t].intervals[i].latNano)) ops += int64(len(stats.threadStats[t].intervals[i].latNano))
slowdowns += stats.threadStats[t].intervals[i].slowdowns slowdowns += stats.threadStats[t].intervals[i].slowdowns
} }
} }
// Aggregate the per-thread Latency slice // Aggregate the per-thread Latency slice
tmpLat := make([]int64, ops) tmpLat := make([]int64, ops)
var c int var c int
for t := 0; t < stats.threads; t++ { for t := 0; t < stats.threads; t++ {
for i := 0; i < len(stats.threadStats[t].intervals); i++ { for i := 0; i < len(stats.threadStats[t].intervals); i++ {
c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano) c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano)
} }
} }
sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] }) sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] })
is := IntervalStats{stats.loop, "TOTAL", stats.mode, bytes, slowdowns, stats.endNano - stats.startNano, tmpLat} is := IntervalStats{stats.loop, "TOTAL", stats.mode, bytes, slowdowns, stats.endNano - stats.startNano, tmpLat}
return is.makeOutputStats(), true return is.makeOutputStats(), true
} }
@ -477,15 +477,15 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
if duration_secs > -1 && time.Now().After(endtime) { if duration_secs > -1 && time.Now().After(endtime) {
break break
} }
objnum := atomic.AddInt64(&op_counter, 1) objnum := atomic.AddInt64(&op_counter, 1)
bucket_num := objnum % int64(bucket_count) bucket_num := objnum % int64(bucket_count)
if object_count > -1 && objnum >= object_count { if object_count > -1 && objnum >= object_count {
objnum = atomic.AddInt64(&op_counter, -1) objnum = atomic.AddInt64(&op_counter, -1)
break break
} }
fileobj := bytes.NewReader(object_data) fileobj := bytes.NewReader(object_data)
key := fmt.Sprintf("%s%012d", object_prefix, objnum) key := fmt.Sprintf("%s%012d", object_prefix, objnum)
r := &s3.PutObjectInput{ r := &s3.PutObjectInput{
Bucket: &buckets[bucket_num], Bucket: &buckets[bucket_num],
Key: &key, Key: &key,
@ -496,12 +496,12 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
// Disable payload checksum calculation (very expensive) // Disable payload checksum calculation (very expensive)
req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD") req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
err := req.Send() err := req.Send()
end := time.Now().UnixNano() end := time.Now().UnixNano()
stats.updateIntervals(thread_num) stats.updateIntervals(thread_num)
if err != nil { if err != nil {
errcnt++ errcnt++
stats.addSlowDown(thread_num); stats.addSlowDown(thread_num)
atomic.AddInt64(&op_counter, -1) atomic.AddInt64(&op_counter, -1)
log.Printf("upload err", err) log.Printf("upload err", err)
} else { } else {
@ -512,7 +512,7 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
break break
} }
} }
stats.finish(thread_num) stats.finish(thread_num)
atomic.AddInt64(&running_threads, -1) atomic.AddInt64(&running_threads, -1)
} }
@ -521,46 +521,46 @@ func runDownload(thread_num int, fendtime time.Time, stats *Stats) {
svc := s3.New(session.New(), cfg) svc := s3.New(session.New(), cfg)
for { for {
if duration_secs > -1 && time.Now().After(endtime) { if duration_secs > -1 && time.Now().After(endtime) {
break break
} }
objnum := atomic.AddInt64(&op_counter, 1) objnum := atomic.AddInt64(&op_counter, 1)
if object_count > -1 && objnum >= object_count { if object_count > -1 && objnum >= object_count {
atomic.AddInt64(&op_counter, -1) atomic.AddInt64(&op_counter, -1)
break break
} }
bucket_num := objnum % int64(bucket_count) bucket_num := objnum % int64(bucket_count)
key := fmt.Sprintf("%s%012d", object_prefix, objnum) key := fmt.Sprintf("%s%012d", object_prefix, objnum)
r := &s3.GetObjectInput{ r := &s3.GetObjectInput{
Bucket: &buckets[bucket_num], Bucket: &buckets[bucket_num],
Key: &key, Key: &key,
} }
start := time.Now().UnixNano() start := time.Now().UnixNano()
req, resp := svc.GetObjectRequest(r) req, resp := svc.GetObjectRequest(r)
err := req.Send() err := req.Send()
end := time.Now().UnixNano() end := time.Now().UnixNano()
stats.updateIntervals(thread_num) stats.updateIntervals(thread_num)
if err != nil { if err != nil {
errcnt++ errcnt++
stats.addSlowDown(thread_num); stats.addSlowDown(thread_num)
log.Printf("download err", err) log.Printf("download err", err)
} else { } else {
// Update the stats // Update the stats
stats.addOp(thread_num, object_size, end-start) stats.addOp(thread_num, object_size, end-start)
} }
if err == nil { if err == nil {
_, err = io.Copy(ioutil.Discard, resp.Body) _, err = io.Copy(ioutil.Discard, resp.Body)
} }
if errcnt > 2 { if errcnt > 2 {
break break
} }
} }
stats.finish(thread_num) stats.finish(thread_num)
atomic.AddInt64(&running_threads, -1) atomic.AddInt64(&running_threads, -1)
} }
@ -569,43 +569,43 @@ func runDelete(thread_num int, stats *Stats) {
svc := s3.New(session.New(), cfg) svc := s3.New(session.New(), cfg)
for { for {
if duration_secs > -1 && time.Now().After(endtime) { if duration_secs > -1 && time.Now().After(endtime) {
break
}
objnum := atomic.AddInt64(&op_counter, 1)
if object_count > -1 && objnum >= object_count {
atomic.AddInt64(&op_counter, -1)
break break
} }
bucket_num := objnum % int64(bucket_count) objnum := atomic.AddInt64(&op_counter, 1)
if object_count > -1 && objnum >= object_count {
atomic.AddInt64(&op_counter, -1)
break
}
key := fmt.Sprintf("%s%012d", object_prefix, objnum) bucket_num := objnum % int64(bucket_count)
r := &s3.DeleteObjectInput{
Bucket: &buckets[bucket_num], key := fmt.Sprintf("%s%012d", object_prefix, objnum)
Key: &key, r := &s3.DeleteObjectInput{
} Bucket: &buckets[bucket_num],
Key: &key,
}
start := time.Now().UnixNano() start := time.Now().UnixNano()
req, out := svc.DeleteObjectRequest(r) req, out := svc.DeleteObjectRequest(r)
err := req.Send() err := req.Send()
end := time.Now().UnixNano() end := time.Now().UnixNano()
stats.updateIntervals(thread_num) stats.updateIntervals(thread_num)
if err != nil { if err != nil {
errcnt++ errcnt++
stats.addSlowDown(thread_num); stats.addSlowDown(thread_num)
log.Printf("delete err", err, "out", out.String()) log.Printf("delete err", err, "out", out.String())
} else { } else {
// Update the stats // Update the stats
stats.addOp(thread_num, object_size, end-start) stats.addOp(thread_num, object_size, end-start)
} }
if errcnt > 2 { if errcnt > 2 {
break break
} }
} }
stats.finish(thread_num) stats.finish(thread_num)
atomic.AddInt64(&running_threads, -1) atomic.AddInt64(&running_threads, -1)
} }
@ -615,9 +615,9 @@ func runBucketDelete(thread_num int, stats *Stats) {
for { for {
bucket_num := atomic.AddInt64(&op_counter, 1) bucket_num := atomic.AddInt64(&op_counter, 1)
if bucket_num >= bucket_count { if bucket_num >= bucket_count {
atomic.AddInt64(&op_counter, -1) atomic.AddInt64(&op_counter, -1)
break break
} }
r := &s3.DeleteBucketInput{ r := &s3.DeleteBucketInput{
Bucket: &buckets[bucket_num], Bucket: &buckets[bucket_num],
} }
@ -627,156 +627,156 @@ func runBucketDelete(thread_num int, stats *Stats) {
end := time.Now().UnixNano() end := time.Now().UnixNano()
stats.updateIntervals(thread_num) stats.updateIntervals(thread_num)
if err != nil { if err != nil {
break break
} }
stats.addOp(thread_num, 0, end-start) stats.addOp(thread_num, 0, end-start)
} }
stats.finish(thread_num) stats.finish(thread_num)
atomic.AddInt64(&running_threads, -1) atomic.AddInt64(&running_threads, -1)
} }
var cfg *aws.Config var cfg *aws.Config
func runBucketsInit(thread_num int, stats *Stats) { func runBucketsInit(thread_num int, stats *Stats) {
svc := s3.New(session.New(), cfg) svc := s3.New(session.New(), cfg)
for { for {
bucket_num := atomic.AddInt64(&op_counter, 1) bucket_num := atomic.AddInt64(&op_counter, 1)
if bucket_num >= bucket_count { if bucket_num >= bucket_count {
atomic.AddInt64(&op_counter, -1) atomic.AddInt64(&op_counter, -1)
break break
} }
start := time.Now().UnixNano() start := time.Now().UnixNano()
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])} in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
_, err := svc.CreateBucket(in) _, err := svc.CreateBucket(in)
end := time.Now().UnixNano() end := time.Now().UnixNano()
stats.updateIntervals(thread_num) stats.updateIntervals(thread_num)
if err != nil { if err != nil {
if !strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) && if !strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) &&
!strings.Contains(err.Error(), "BucketAlreadyExists") { !strings.Contains(err.Error(), "BucketAlreadyExists") {
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err) log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err)
} }
} }
stats.addOp(thread_num, 0, end-start) stats.addOp(thread_num, 0, end-start)
} }
stats.finish(thread_num) stats.finish(thread_num)
atomic.AddInt64(&running_threads, -1) atomic.AddInt64(&running_threads, -1)
} }
func runBucketsClear(thread_num int, stats *Stats) { func runBucketsClear(thread_num int, stats *Stats) {
svc := s3.New(session.New(), cfg) svc := s3.New(session.New(), cfg)
for { for {
bucket_num := atomic.AddInt64(&op_counter, 1) bucket_num := atomic.AddInt64(&op_counter, 1)
if bucket_num >= bucket_count { if bucket_num >= bucket_count {
atomic.AddInt64(&op_counter, -1) atomic.AddInt64(&op_counter, -1)
break
}
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
if err != nil {
break break
} }
n := len(out.Contents) out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
for n > 0 { if err != nil {
for _, v := range out.Contents { break
}
n := len(out.Contents)
for n > 0 {
for _, v := range out.Contents {
start := time.Now().UnixNano() start := time.Now().UnixNano()
svc.DeleteObject(&s3.DeleteObjectInput{ svc.DeleteObject(&s3.DeleteObjectInput{
Bucket: &buckets[bucket_num], Bucket: &buckets[bucket_num],
Key: v.Key, Key: v.Key,
}) })
end := time.Now().UnixNano() end := time.Now().UnixNano()
stats.updateIntervals(thread_num) stats.updateIntervals(thread_num)
stats.addOp(thread_num, *v.Size, end-start) stats.addOp(thread_num, *v.Size, end-start)
} }
out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
if err != nil { if err != nil {
break break
} }
n = len(out.Contents) n = len(out.Contents)
} }
} }
stats.finish(thread_num) stats.finish(thread_num)
atomic.AddInt64(&running_threads, -1) atomic.AddInt64(&running_threads, -1)
} }
func runWrapper(loop int, r rune) []OutputStats { func runWrapper(loop int, r rune) []OutputStats {
op_counter = -1 op_counter = -1
running_threads = int64(threads) running_threads = int64(threads)
intervalNano := int64(interval*1000000000) intervalNano := int64(interval * 1000000000)
endtime = time.Now().Add(time.Second * time.Duration(duration_secs)) endtime = time.Now().Add(time.Second * time.Duration(duration_secs))
var stats Stats var stats Stats
// If we perviously set the object count after running a put // If we perviously set the object count after running a put
// test, set the object count back to -1 for the new put test. // test, set the object count back to -1 for the new put test.
if r == 'p' && object_count_flag { if r == 'p' && object_count_flag {
object_count = -1 object_count = -1
object_count_flag = false object_count_flag = false
} }
switch r { switch r {
case 'c': case 'c':
log.Printf("Running Loop %d BUCKET CLEAR TEST", loop) log.Printf("Running Loop %d BUCKET CLEAR TEST", loop)
stats = makeStats(loop, "BCLR", threads, intervalNano) stats = makeStats(loop, "BCLR", threads, intervalNano)
for n := 0; n < threads; n++ { for n := 0; n < threads; n++ {
go runBucketsClear(n, &stats); go runBucketsClear(n, &stats)
} }
case 'x': case 'x':
log.Printf("Running Loop %d BUCKET DELETE TEST", loop) log.Printf("Running Loop %d BUCKET DELETE TEST", loop)
stats = makeStats(loop, "BDEL", threads, intervalNano) stats = makeStats(loop, "BDEL", threads, intervalNano)
for n := 0; n < threads; n++ { for n := 0; n < threads; n++ {
go runBucketDelete(n, &stats); go runBucketDelete(n, &stats)
} }
case 'i': case 'i':
log.Printf("Running Loop %d BUCKET INIT TEST", loop) log.Printf("Running Loop %d BUCKET INIT TEST", loop)
stats = makeStats(loop, "BINIT", threads, intervalNano) stats = makeStats(loop, "BINIT", threads, intervalNano)
for n := 0; n < threads; n++ { for n := 0; n < threads; n++ {
go runBucketsInit(n, &stats); go runBucketsInit(n, &stats)
} }
case 'p': case 'p':
log.Printf("Running Loop %d OBJECT PUT TEST", loop) log.Printf("Running Loop %d OBJECT PUT TEST", loop)
stats = makeStats(loop, "PUT", threads, intervalNano) stats = makeStats(loop, "PUT", threads, intervalNano)
for n := 0; n < threads; n++ { for n := 0; n < threads; n++ {
go runUpload(n, endtime, &stats); go runUpload(n, endtime, &stats)
} }
case 'g': case 'g':
log.Printf("Running Loop %d OBJECT GET TEST", loop) log.Printf("Running Loop %d OBJECT GET TEST", loop)
stats = makeStats(loop, "GET", threads, intervalNano) stats = makeStats(loop, "GET", threads, intervalNano)
for n := 0; n < threads; n++ { for n := 0; n < threads; n++ {
go runDownload(n, endtime, &stats); go runDownload(n, endtime, &stats)
} }
case 'd': case 'd':
log.Printf("Running Loop %d OBJECT DELETE TEST", loop) log.Printf("Running Loop %d OBJECT DELETE TEST", loop)
stats = makeStats(loop, "DEL", threads, intervalNano) stats = makeStats(loop, "DEL", threads, intervalNano)
for n := 0; n < threads; n++ { for n := 0; n < threads; n++ {
go runDelete(n, &stats); go runDelete(n, &stats)
} }
} }
// Wait for it to finish // Wait for it to finish
for atomic.LoadInt64(&running_threads) > 0 { for atomic.LoadInt64(&running_threads) > 0 {
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
// If the user didn't set the object_count, we can set it here // If the user didn't set the object_count, we can set it here
// to limit subsequent get/del tests to valid objects only. // to limit subsequent get/del tests to valid objects only.
if r == 'p' && object_count < 0 { if r == 'p' && object_count < 0 {
object_count = op_counter + 1 object_count = op_counter + 1
object_count_flag = true object_count_flag = true
} }
// Create the Output Stats // Create the Output Stats
os := make([]OutputStats, 0) os := make([]OutputStats, 0)
for i := int64(0); i >= 0; i++ { for i := int64(0); i >= 0; i++ {
if o, ok := stats.makeOutputStats(i); ok { if o, ok := stats.makeOutputStats(i); ok {
os = append(os, o) os = append(os, o)
} else { } else {
break break
} }
} }
if o, ok := stats.makeTotalStats(); ok { if o, ok := stats.makeTotalStats(); ok {
o.log() o.log()
os = append(os, o) os = append(os, o)
} }
@ -789,22 +789,22 @@ func init() {
myflag.StringVar(&access_key, "a", os.Getenv("AWS_ACCESS_KEY_ID"), "Access key") myflag.StringVar(&access_key, "a", os.Getenv("AWS_ACCESS_KEY_ID"), "Access key")
myflag.StringVar(&secret_key, "s", os.Getenv("AWS_SECRET_ACCESS_KEY"), "Secret key") myflag.StringVar(&secret_key, "s", os.Getenv("AWS_SECRET_ACCESS_KEY"), "Secret key")
myflag.StringVar(&url_host, "u", os.Getenv("AWS_HOST"), "URL for host with method prefix") myflag.StringVar(&url_host, "u", os.Getenv("AWS_HOST"), "URL for host with method prefix")
myflag.StringVar(&object_prefix, "op", "", "Prefix for objects") myflag.StringVar(&object_prefix, "op", "", "Prefix for objects")
myflag.StringVar(&bucket_prefix, "bp", "hotsauce_bench", "Prefix for buckets") myflag.StringVar(&bucket_prefix, "bp", "hotsauce_bench", "Prefix for buckets")
myflag.StringVar(&region, "r", "us-east-1", "Region for testing") myflag.StringVar(&region, "r", "us-east-1", "Region for testing")
myflag.StringVar(&modes, "m", "cxipgdx", "Run modes in order. See NOTES for more info") myflag.StringVar(&modes, "m", "cxipgdx", "Run modes in order. See NOTES for more info")
myflag.StringVar(&output, "o", "", "Write CSV output to this file") myflag.StringVar(&output, "o", "", "Write CSV output to this file")
myflag.StringVar(&json_output, "j", "", "Write JSON output to this file") myflag.StringVar(&json_output, "j", "", "Write JSON output to this file")
myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>") myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>")
myflag.Int64Var(&bucket_count, "b", 1, "Number of buckets to distribute IOs across") myflag.Int64Var(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>") myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>")
myflag.IntVar(&threads, "t", 1, "Number of threads to run") myflag.IntVar(&threads, "t", 1, "Number of threads to run")
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test") myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G") myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G")
myflag.Float64Var(&interval, "ri", 1.0, "Number of seconds between report intervals") myflag.Float64Var(&interval, "ri", 1.0, "Number of seconds between report intervals")
// define custom usage output with notes // define custom usage output with notes
notes := notes :=
` `
NOTES: NOTES:
- Valid mode types for the -m mode string are: - Valid mode types for the -m mode string are:
c: clear all existing objects from buckets (requires lookups) c: clear all existing objects from buckets (requires lookups)
@ -823,7 +823,7 @@ NOTES:
fmt.Fprintf(flag.CommandLine.Output(), "\nUSAGE: %s [OPTIONS]\n\n", os.Args[0]) fmt.Fprintf(flag.CommandLine.Output(), "\nUSAGE: %s [OPTIONS]\n\n", os.Args[0])
fmt.Fprintf(flag.CommandLine.Output(), "OPTIONS:\n") fmt.Fprintf(flag.CommandLine.Output(), "OPTIONS:\n")
myflag.PrintDefaults() myflag.PrintDefaults()
fmt.Fprintf(flag.CommandLine.Output(), notes); fmt.Fprintf(flag.CommandLine.Output(), notes)
} }
if err := myflag.Parse(os.Args[1:]); err != nil { if err := myflag.Parse(os.Args[1:]); err != nil {
@ -845,13 +845,12 @@ NOTES:
} }
invalid_mode := false invalid_mode := false
for _, r := range modes { for _, r := range modes {
if ( if r != 'i' &&
r != 'i' && r != 'c' &&
r != 'c' && r != 'p' &&
r != 'p' && r != 'g' &&
r != 'g' && r != 'd' &&
r != 'd' && r != 'x' {
r != 'x') {
s := fmt.Sprintf("Invalid mode '%s' passed to -m", string(r)) s := fmt.Sprintf("Invalid mode '%s' passed to -m", string(r))
log.Printf(s) log.Printf(s)
invalid_mode = true invalid_mode = true
@ -859,7 +858,7 @@ NOTES:
} }
if invalid_mode { if invalid_mode {
log.Fatal("Invalid modes passed to -m, see help for details.") log.Fatal("Invalid modes passed to -m, see help for details.")
} }
var err error var err error
var size uint64 var size uint64
if size, err = bytefmt.ToBytes(sizeArg); err != nil { if size, err = bytefmt.ToBytes(sizeArg); err != nil {
@ -869,12 +868,12 @@ NOTES:
} }
func initData() { func initData() {
// Initialize data for the bucket // Initialize data for the bucket
object_data = make([]byte, object_size) object_data = make([]byte, object_size)
rand.Read(object_data) rand.Read(object_data)
hasher := md5.New() hasher := md5.New()
hasher.Write(object_data) hasher.Write(object_data)
object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil)) object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
} }
func main() { func main() {
@ -911,51 +910,51 @@ func main() {
initData() initData()
// Setup the slice of buckets // Setup the slice of buckets
for i := int64(0); i < bucket_count; i++ { for i := int64(0); i < bucket_count; i++ {
buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i)) buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i))
} }
// Loop running the tests // Loop running the tests
oStats := make([]OutputStats, 0) oStats := make([]OutputStats, 0)
for loop := 0; loop < loops; loop++ { for loop := 0; loop < loops; loop++ {
for _, r := range modes { for _, r := range modes {
oStats = append(oStats, runWrapper(loop, r)...) oStats = append(oStats, runWrapper(loop, r)...)
} }
} }
// Write CSV Output // Write CSV Output
if output != "" { if output != "" {
file, err := os.OpenFile(output, os.O_CREATE|os.O_WRONLY, 0777) file, err := os.OpenFile(output, os.O_CREATE|os.O_WRONLY, 0777)
defer file.Close() defer file.Close()
if err != nil { if err != nil {
log.Fatal("Could not open CSV file for writing.") log.Fatal("Could not open CSV file for writing.")
} else { } else {
csvWriter := csv.NewWriter(file) csvWriter := csv.NewWriter(file)
for i, o := range oStats { for i, o := range oStats {
if i == 0 { if i == 0 {
o.csv_header(csvWriter) o.csv_header(csvWriter)
} }
o.csv(csvWriter) o.csv(csvWriter)
} }
csvWriter.Flush() csvWriter.Flush()
} }
} }
// Write JSON output // Write JSON output
if json_output != "" { if json_output != "" {
file, err := os.OpenFile(json_output, os.O_CREATE|os.O_WRONLY, 0777) file, err := os.OpenFile(json_output, os.O_CREATE|os.O_WRONLY, 0777)
defer file.Close() defer file.Close()
if err != nil { if err != nil {
log.Fatal("Could not open JSON file for writing.") log.Fatal("Could not open JSON file for writing.")
} }
data, err := json.Marshal(oStats) data, err := json.Marshal(oStats)
if err != nil { if err != nil {
log.Fatal("Error marshaling JSON: ", err) log.Fatal("Error marshaling JSON: ", err)
} }
_, err = file.Write(data) _, err = file.Write(data)
if err != nil { if err != nil {
log.Fatal("Error writing to JSON file: ", err) log.Fatal("Error writing to JSON file: ", err)
} }
file.Sync() file.Sync()
} }
} }