parent
f88d05d4fd
commit
9cafa0c4c4
653
hsbench.go
653
hsbench.go
|
@ -6,6 +6,7 @@ package main
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"code.cloudfoundry.org/bytefmt"
|
||||
"crypto/hmac"
|
||||
"crypto/md5"
|
||||
"crypto/sha1"
|
||||
|
@ -15,6 +16,10 @@ import (
|
|||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
|
@ -29,11 +34,6 @@ import (
|
|||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"code.cloudfoundry.org/bytefmt"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
)
|
||||
|
||||
// Global variables
|
||||
|
@ -136,82 +136,82 @@ func setSignature(req *http.Request) {
|
|||
}
|
||||
|
||||
type IntervalStats struct {
|
||||
loop int
|
||||
name string
|
||||
mode string
|
||||
bytes int64
|
||||
slowdowns int64
|
||||
intervalNano int64
|
||||
latNano []int64
|
||||
loop int
|
||||
name string
|
||||
mode string
|
||||
bytes int64
|
||||
slowdowns int64
|
||||
intervalNano int64
|
||||
latNano []int64
|
||||
}
|
||||
|
||||
func (is *IntervalStats) makeOutputStats() OutputStats {
|
||||
// Compute and log the stats
|
||||
ops := len(is.latNano)
|
||||
totalLat := int64(0);
|
||||
minLat := float64(0);
|
||||
maxLat := float64(0);
|
||||
NinetyNineLat := float64(0);
|
||||
avgLat := float64(0);
|
||||
if ops > 0 {
|
||||
minLat = float64(is.latNano[0]) / 1000000
|
||||
maxLat = float64(is.latNano[ops - 1]) / 1000000
|
||||
for i := range is.latNano {
|
||||
totalLat += is.latNano[i]
|
||||
}
|
||||
avgLat = float64(totalLat) / float64(ops) / 1000000
|
||||
NintyNineLatNano := is.latNano[int64(math.Round(0.99*float64(ops))) - 1]
|
||||
NinetyNineLat = float64(NintyNineLatNano) / 1000000
|
||||
}
|
||||
seconds := float64(is.intervalNano) / 1000000000
|
||||
mbps := float64(is.bytes) / seconds / bytefmt.MEGABYTE
|
||||
iops := float64(ops) / seconds
|
||||
// Compute and log the stats
|
||||
ops := len(is.latNano)
|
||||
totalLat := int64(0)
|
||||
minLat := float64(0)
|
||||
maxLat := float64(0)
|
||||
NinetyNineLat := float64(0)
|
||||
avgLat := float64(0)
|
||||
if ops > 0 {
|
||||
minLat = float64(is.latNano[0]) / 1000000
|
||||
maxLat = float64(is.latNano[ops-1]) / 1000000
|
||||
for i := range is.latNano {
|
||||
totalLat += is.latNano[i]
|
||||
}
|
||||
avgLat = float64(totalLat) / float64(ops) / 1000000
|
||||
NintyNineLatNano := is.latNano[int64(math.Round(0.99*float64(ops)))-1]
|
||||
NinetyNineLat = float64(NintyNineLatNano) / 1000000
|
||||
}
|
||||
seconds := float64(is.intervalNano) / 1000000000
|
||||
mbps := float64(is.bytes) / seconds / bytefmt.MEGABYTE
|
||||
iops := float64(ops) / seconds
|
||||
|
||||
return OutputStats{
|
||||
is.loop,
|
||||
is.name,
|
||||
seconds,
|
||||
is.mode,
|
||||
ops,
|
||||
mbps,
|
||||
iops,
|
||||
minLat,
|
||||
avgLat,
|
||||
NinetyNineLat,
|
||||
maxLat,
|
||||
is.slowdowns}
|
||||
return OutputStats{
|
||||
is.loop,
|
||||
is.name,
|
||||
seconds,
|
||||
is.mode,
|
||||
ops,
|
||||
mbps,
|
||||
iops,
|
||||
minLat,
|
||||
avgLat,
|
||||
NinetyNineLat,
|
||||
maxLat,
|
||||
is.slowdowns}
|
||||
}
|
||||
|
||||
type OutputStats struct {
|
||||
Loop int
|
||||
IntervalName string
|
||||
Seconds float64
|
||||
Mode string
|
||||
Ops int
|
||||
Mbps float64
|
||||
Iops float64
|
||||
MinLat float64
|
||||
AvgLat float64
|
||||
NinetyNineLat float64
|
||||
MaxLat float64
|
||||
Slowdowns int64
|
||||
Loop int
|
||||
IntervalName string
|
||||
Seconds float64
|
||||
Mode string
|
||||
Ops int
|
||||
Mbps float64
|
||||
Iops float64
|
||||
MinLat float64
|
||||
AvgLat float64
|
||||
NinetyNineLat float64
|
||||
MaxLat float64
|
||||
Slowdowns int64
|
||||
}
|
||||
|
||||
func (o *OutputStats) log() {
|
||||
log.Printf(
|
||||
"Loop: %d, Int: %s, Dur(s): %.1f, Mode: %s, Ops: %d, MB/s: %.2f, IO/s: %.0f, Lat(ms): [ min: %.1f, avg: %.1f, 99%%: %.1f, max: %.1f ], Slowdowns: %d",
|
||||
o.Loop,
|
||||
o.IntervalName,
|
||||
o.Seconds,
|
||||
o.Mode,
|
||||
o.Ops,
|
||||
o.Mbps,
|
||||
o.Iops,
|
||||
o.MinLat,
|
||||
o.AvgLat,
|
||||
o.NinetyNineLat,
|
||||
o.MaxLat,
|
||||
o.Slowdowns)
|
||||
"Loop: %d, Int: %s, Dur(s): %.1f, Mode: %s, Ops: %d, MB/s: %.2f, IO/s: %.0f, Lat(ms): [ min: %.1f, avg: %.1f, 99%%: %.1f, max: %.1f ], Slowdowns: %d",
|
||||
o.Loop,
|
||||
o.IntervalName,
|
||||
o.Seconds,
|
||||
o.Mode,
|
||||
o.Ops,
|
||||
o.Mbps,
|
||||
o.Iops,
|
||||
o.MinLat,
|
||||
o.AvgLat,
|
||||
o.NinetyNineLat,
|
||||
o.MaxLat,
|
||||
o.Slowdowns)
|
||||
}
|
||||
|
||||
func (o *OutputStats) csv_header(w *csv.Writer) {
|
||||
|
@ -242,7 +242,7 @@ func (o *OutputStats) csv(w *csv.Writer) {
|
|||
log.Fatal("OutputStats Passed nil csv writer")
|
||||
}
|
||||
|
||||
s := []string {
|
||||
s := []string{
|
||||
strconv.Itoa(o.Loop),
|
||||
o.IntervalName,
|
||||
strconv.FormatFloat(o.Seconds, 'f', 2, 64),
|
||||
|
@ -257,7 +257,7 @@ func (o *OutputStats) csv(w *csv.Writer) {
|
|||
strconv.FormatInt(o.Slowdowns, 10)}
|
||||
|
||||
if err := w.Write(s); err != nil {
|
||||
log.Fatal("Error writing to CSV writer: ",err)
|
||||
log.Fatal("Error writing to CSV writer: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -277,9 +277,9 @@ func (o *OutputStats) json(jfile *os.File) {
|
|||
}
|
||||
|
||||
type ThreadStats struct {
|
||||
start int64
|
||||
start int64
|
||||
curInterval int64
|
||||
intervals []IntervalStats
|
||||
intervals []IntervalStats
|
||||
}
|
||||
|
||||
func makeThreadStats(s int64, loop int, mode string, intervalNano int64) ThreadStats {
|
||||
|
@ -293,8 +293,8 @@ func (ts *ThreadStats) updateIntervals(loop int, mode string, intervalNano int64
|
|||
if intervalNano < 0 {
|
||||
return ts.curInterval
|
||||
}
|
||||
for ts.start + intervalNano*(ts.curInterval+1) < time.Now().UnixNano() {
|
||||
ts.curInterval++
|
||||
for ts.start+intervalNano*(ts.curInterval+1) < time.Now().UnixNano() {
|
||||
ts.curInterval++
|
||||
ts.intervals = append(
|
||||
ts.intervals,
|
||||
IntervalStats{
|
||||
|
@ -319,9 +319,9 @@ type Stats struct {
|
|||
// The loop we are in
|
||||
loop int
|
||||
// Test mode being run
|
||||
mode string
|
||||
mode string
|
||||
// start time in nanoseconds
|
||||
startNano int64
|
||||
startNano int64
|
||||
// end time in nanoseconds
|
||||
endNano int64
|
||||
// Duration in nanoseconds for each interval
|
||||
|
@ -345,73 +345,73 @@ func makeStats(loop int, mode string, threads int, intervalNano int64) Stats {
|
|||
}
|
||||
|
||||
func (stats *Stats) makeOutputStats(i int64) (OutputStats, bool) {
|
||||
// Check bounds first
|
||||
if stats.intervalNano < 0 || i < 0 {
|
||||
return OutputStats{}, false
|
||||
}
|
||||
// Not safe to log if not all writers have completed.
|
||||
value, ok := stats.intervalCompletions.Load(i)
|
||||
if !ok {
|
||||
return OutputStats{}, false
|
||||
}
|
||||
cp, ok := value.(*int32)
|
||||
if !ok {
|
||||
return OutputStats{}, false
|
||||
}
|
||||
count := atomic.LoadInt32(cp)
|
||||
if count < int32(stats.threads) {
|
||||
return OutputStats{}, false
|
||||
}
|
||||
// Check bounds first
|
||||
if stats.intervalNano < 0 || i < 0 {
|
||||
return OutputStats{}, false
|
||||
}
|
||||
// Not safe to log if not all writers have completed.
|
||||
value, ok := stats.intervalCompletions.Load(i)
|
||||
if !ok {
|
||||
return OutputStats{}, false
|
||||
}
|
||||
cp, ok := value.(*int32)
|
||||
if !ok {
|
||||
return OutputStats{}, false
|
||||
}
|
||||
count := atomic.LoadInt32(cp)
|
||||
if count < int32(stats.threads) {
|
||||
return OutputStats{}, false
|
||||
}
|
||||
|
||||
bytes := int64(0)
|
||||
ops := int64(0)
|
||||
slowdowns := int64(0)
|
||||
bytes := int64(0)
|
||||
ops := int64(0)
|
||||
slowdowns := int64(0)
|
||||
|
||||
for t := 0; t < stats.threads; t++ {
|
||||
bytes += stats.threadStats[t].intervals[i].bytes
|
||||
ops += int64(len(stats.threadStats[t].intervals[i].latNano))
|
||||
slowdowns += stats.threadStats[t].intervals[i].slowdowns
|
||||
}
|
||||
// Aggregate the per-thread Latency slice
|
||||
tmpLat := make([]int64, ops)
|
||||
var c int
|
||||
for t := 0; t < stats.threads; t++ {
|
||||
c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano)
|
||||
}
|
||||
sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] })
|
||||
is := IntervalStats{stats.loop, strconv.FormatInt(i, 10), stats.mode, bytes, slowdowns, stats.intervalNano, tmpLat}
|
||||
for t := 0; t < stats.threads; t++ {
|
||||
bytes += stats.threadStats[t].intervals[i].bytes
|
||||
ops += int64(len(stats.threadStats[t].intervals[i].latNano))
|
||||
slowdowns += stats.threadStats[t].intervals[i].slowdowns
|
||||
}
|
||||
// Aggregate the per-thread Latency slice
|
||||
tmpLat := make([]int64, ops)
|
||||
var c int
|
||||
for t := 0; t < stats.threads; t++ {
|
||||
c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano)
|
||||
}
|
||||
sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] })
|
||||
is := IntervalStats{stats.loop, strconv.FormatInt(i, 10), stats.mode, bytes, slowdowns, stats.intervalNano, tmpLat}
|
||||
return is.makeOutputStats(), true
|
||||
}
|
||||
|
||||
func (stats *Stats) makeTotalStats() (OutputStats, bool) {
|
||||
// Not safe to log if not all writers have completed.
|
||||
completions := atomic.LoadInt32(&stats.completions)
|
||||
if (completions < int32(threads)) {
|
||||
log.Printf("log, completions: %d", completions)
|
||||
return OutputStats{}, false
|
||||
}
|
||||
// Not safe to log if not all writers have completed.
|
||||
completions := atomic.LoadInt32(&stats.completions)
|
||||
if completions < int32(threads) {
|
||||
log.Printf("log, completions: %d", completions)
|
||||
return OutputStats{}, false
|
||||
}
|
||||
|
||||
bytes := int64(0)
|
||||
ops := int64(0)
|
||||
slowdowns := int64(0);
|
||||
bytes := int64(0)
|
||||
ops := int64(0)
|
||||
slowdowns := int64(0)
|
||||
|
||||
for t := 0; t < stats.threads; t++ {
|
||||
for i := 0; i < len(stats.threadStats[t].intervals); i++ {
|
||||
bytes += stats.threadStats[t].intervals[i].bytes
|
||||
ops += int64(len(stats.threadStats[t].intervals[i].latNano))
|
||||
slowdowns += stats.threadStats[t].intervals[i].slowdowns
|
||||
}
|
||||
bytes += stats.threadStats[t].intervals[i].bytes
|
||||
ops += int64(len(stats.threadStats[t].intervals[i].latNano))
|
||||
slowdowns += stats.threadStats[t].intervals[i].slowdowns
|
||||
}
|
||||
}
|
||||
// Aggregate the per-thread Latency slice
|
||||
tmpLat := make([]int64, ops)
|
||||
var c int
|
||||
for t := 0; t < stats.threads; t++ {
|
||||
for i := 0; i < len(stats.threadStats[t].intervals); i++ {
|
||||
c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano)
|
||||
}
|
||||
// Aggregate the per-thread Latency slice
|
||||
tmpLat := make([]int64, ops)
|
||||
var c int
|
||||
for t := 0; t < stats.threads; t++ {
|
||||
for i := 0; i < len(stats.threadStats[t].intervals); i++ {
|
||||
c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano)
|
||||
}
|
||||
}
|
||||
sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] })
|
||||
is := IntervalStats{stats.loop, "TOTAL", stats.mode, bytes, slowdowns, stats.endNano - stats.startNano, tmpLat}
|
||||
sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] })
|
||||
is := IntervalStats{stats.loop, "TOTAL", stats.mode, bytes, slowdowns, stats.endNano - stats.startNano, tmpLat}
|
||||
return is.makeOutputStats(), true
|
||||
}
|
||||
|
||||
|
@ -477,15 +477,15 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
|
|||
if duration_secs > -1 && time.Now().After(endtime) {
|
||||
break
|
||||
}
|
||||
objnum := atomic.AddInt64(&op_counter, 1)
|
||||
bucket_num := objnum % int64(bucket_count)
|
||||
objnum := atomic.AddInt64(&op_counter, 1)
|
||||
bucket_num := objnum % int64(bucket_count)
|
||||
if object_count > -1 && objnum >= object_count {
|
||||
objnum = atomic.AddInt64(&op_counter, -1)
|
||||
break
|
||||
}
|
||||
fileobj := bytes.NewReader(object_data)
|
||||
|
||||
key := fmt.Sprintf("%s%012d", object_prefix, objnum)
|
||||
key := fmt.Sprintf("%s%012d", object_prefix, objnum)
|
||||
r := &s3.PutObjectInput{
|
||||
Bucket: &buckets[bucket_num],
|
||||
Key: &key,
|
||||
|
@ -496,12 +496,12 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
|
|||
// Disable payload checksum calculation (very expensive)
|
||||
req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
|
||||
err := req.Send()
|
||||
end := time.Now().UnixNano()
|
||||
stats.updateIntervals(thread_num)
|
||||
end := time.Now().UnixNano()
|
||||
stats.updateIntervals(thread_num)
|
||||
|
||||
if err != nil {
|
||||
errcnt++
|
||||
stats.addSlowDown(thread_num);
|
||||
stats.addSlowDown(thread_num)
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
log.Printf("upload err", err)
|
||||
} else {
|
||||
|
@ -512,7 +512,7 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
|
|||
break
|
||||
}
|
||||
}
|
||||
stats.finish(thread_num)
|
||||
stats.finish(thread_num)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
}
|
||||
|
||||
|
@ -521,46 +521,46 @@ func runDownload(thread_num int, fendtime time.Time, stats *Stats) {
|
|||
svc := s3.New(session.New(), cfg)
|
||||
for {
|
||||
if duration_secs > -1 && time.Now().After(endtime) {
|
||||
break
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
objnum := atomic.AddInt64(&op_counter, 1)
|
||||
if object_count > -1 && objnum >= object_count {
|
||||
if object_count > -1 && objnum >= object_count {
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
break
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
bucket_num := objnum % int64(bucket_count)
|
||||
key := fmt.Sprintf("%s%012d", object_prefix, objnum)
|
||||
r := &s3.GetObjectInput{
|
||||
Bucket: &buckets[bucket_num],
|
||||
Key: &key,
|
||||
}
|
||||
key := fmt.Sprintf("%s%012d", object_prefix, objnum)
|
||||
r := &s3.GetObjectInput{
|
||||
Bucket: &buckets[bucket_num],
|
||||
Key: &key,
|
||||
}
|
||||
|
||||
start := time.Now().UnixNano()
|
||||
req, resp := svc.GetObjectRequest(r)
|
||||
err := req.Send()
|
||||
req, resp := svc.GetObjectRequest(r)
|
||||
err := req.Send()
|
||||
end := time.Now().UnixNano()
|
||||
stats.updateIntervals(thread_num)
|
||||
|
||||
if err != nil {
|
||||
errcnt++
|
||||
stats.addSlowDown(thread_num);
|
||||
log.Printf("download err", err)
|
||||
if err != nil {
|
||||
errcnt++
|
||||
stats.addSlowDown(thread_num)
|
||||
log.Printf("download err", err)
|
||||
} else {
|
||||
// Update the stats
|
||||
stats.addOp(thread_num, object_size, end-start)
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
_, err = io.Copy(ioutil.Discard, resp.Body)
|
||||
}
|
||||
if errcnt > 2 {
|
||||
break
|
||||
}
|
||||
if err == nil {
|
||||
_, err = io.Copy(ioutil.Discard, resp.Body)
|
||||
}
|
||||
if errcnt > 2 {
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
stats.finish(thread_num)
|
||||
stats.finish(thread_num)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
}
|
||||
|
||||
|
@ -569,43 +569,43 @@ func runDelete(thread_num int, stats *Stats) {
|
|||
svc := s3.New(session.New(), cfg)
|
||||
|
||||
for {
|
||||
if duration_secs > -1 && time.Now().After(endtime) {
|
||||
break
|
||||
}
|
||||
|
||||
objnum := atomic.AddInt64(&op_counter, 1)
|
||||
if object_count > -1 && objnum >= object_count {
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
if duration_secs > -1 && time.Now().After(endtime) {
|
||||
break
|
||||
}
|
||||
|
||||
bucket_num := objnum % int64(bucket_count)
|
||||
objnum := atomic.AddInt64(&op_counter, 1)
|
||||
if object_count > -1 && objnum >= object_count {
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
break
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s%012d", object_prefix, objnum)
|
||||
r := &s3.DeleteObjectInput{
|
||||
Bucket: &buckets[bucket_num],
|
||||
Key: &key,
|
||||
}
|
||||
bucket_num := objnum % int64(bucket_count)
|
||||
|
||||
key := fmt.Sprintf("%s%012d", object_prefix, objnum)
|
||||
r := &s3.DeleteObjectInput{
|
||||
Bucket: &buckets[bucket_num],
|
||||
Key: &key,
|
||||
}
|
||||
|
||||
start := time.Now().UnixNano()
|
||||
req, out := svc.DeleteObjectRequest(r)
|
||||
err := req.Send()
|
||||
req, out := svc.DeleteObjectRequest(r)
|
||||
err := req.Send()
|
||||
end := time.Now().UnixNano()
|
||||
stats.updateIntervals(thread_num)
|
||||
|
||||
if err != nil {
|
||||
errcnt++
|
||||
stats.addSlowDown(thread_num);
|
||||
log.Printf("delete err", err, "out", out.String())
|
||||
} else {
|
||||
if err != nil {
|
||||
errcnt++
|
||||
stats.addSlowDown(thread_num)
|
||||
log.Printf("delete err", err, "out", out.String())
|
||||
} else {
|
||||
// Update the stats
|
||||
stats.addOp(thread_num, object_size, end-start)
|
||||
}
|
||||
if errcnt > 2 {
|
||||
if errcnt > 2 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
stats.finish(thread_num)
|
||||
stats.finish(thread_num)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
}
|
||||
|
||||
|
@ -615,9 +615,9 @@ func runBucketDelete(thread_num int, stats *Stats) {
|
|||
for {
|
||||
bucket_num := atomic.AddInt64(&op_counter, 1)
|
||||
if bucket_num >= bucket_count {
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
break
|
||||
}
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
break
|
||||
}
|
||||
r := &s3.DeleteBucketInput{
|
||||
Bucket: &buckets[bucket_num],
|
||||
}
|
||||
|
@ -627,156 +627,156 @@ func runBucketDelete(thread_num int, stats *Stats) {
|
|||
end := time.Now().UnixNano()
|
||||
stats.updateIntervals(thread_num)
|
||||
|
||||
if err != nil {
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
stats.addOp(thread_num, 0, end-start)
|
||||
}
|
||||
stats.finish(thread_num)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
}
|
||||
stats.addOp(thread_num, 0, end-start)
|
||||
}
|
||||
stats.finish(thread_num)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
}
|
||||
|
||||
var cfg *aws.Config
|
||||
|
||||
func runBucketsInit(thread_num int, stats *Stats) {
|
||||
svc := s3.New(session.New(), cfg)
|
||||
svc := s3.New(session.New(), cfg)
|
||||
|
||||
for {
|
||||
bucket_num := atomic.AddInt64(&op_counter, 1)
|
||||
bucket_num := atomic.AddInt64(&op_counter, 1)
|
||||
if bucket_num >= bucket_count {
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
break
|
||||
}
|
||||
start := time.Now().UnixNano()
|
||||
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
|
||||
_, err := svc.CreateBucket(in)
|
||||
end := time.Now().UnixNano()
|
||||
stats.updateIntervals(thread_num)
|
||||
start := time.Now().UnixNano()
|
||||
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
|
||||
_, err := svc.CreateBucket(in)
|
||||
end := time.Now().UnixNano()
|
||||
stats.updateIntervals(thread_num)
|
||||
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) &&
|
||||
!strings.Contains(err.Error(), "BucketAlreadyExists") {
|
||||
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err)
|
||||
}
|
||||
if !strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) &&
|
||||
!strings.Contains(err.Error(), "BucketAlreadyExists") {
|
||||
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err)
|
||||
}
|
||||
}
|
||||
stats.addOp(thread_num, 0, end-start)
|
||||
stats.addOp(thread_num, 0, end-start)
|
||||
}
|
||||
stats.finish(thread_num)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
stats.finish(thread_num)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
}
|
||||
|
||||
func runBucketsClear(thread_num int, stats *Stats) {
|
||||
svc := s3.New(session.New(), cfg)
|
||||
svc := s3.New(session.New(), cfg)
|
||||
|
||||
for {
|
||||
bucket_num := atomic.AddInt64(&op_counter, 1)
|
||||
if bucket_num >= bucket_count {
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
break
|
||||
}
|
||||
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
|
||||
if err != nil {
|
||||
for {
|
||||
bucket_num := atomic.AddInt64(&op_counter, 1)
|
||||
if bucket_num >= bucket_count {
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
break
|
||||
}
|
||||
n := len(out.Contents)
|
||||
for n > 0 {
|
||||
for _, v := range out.Contents {
|
||||
}
|
||||
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
n := len(out.Contents)
|
||||
for n > 0 {
|
||||
for _, v := range out.Contents {
|
||||
start := time.Now().UnixNano()
|
||||
svc.DeleteObject(&s3.DeleteObjectInput{
|
||||
Bucket: &buckets[bucket_num],
|
||||
Key: v.Key,
|
||||
})
|
||||
svc.DeleteObject(&s3.DeleteObjectInput{
|
||||
Bucket: &buckets[bucket_num],
|
||||
Key: v.Key,
|
||||
})
|
||||
end := time.Now().UnixNano()
|
||||
stats.updateIntervals(thread_num)
|
||||
stats.addOp(thread_num, *v.Size, end-start)
|
||||
stats.updateIntervals(thread_num)
|
||||
stats.addOp(thread_num, *v.Size, end-start)
|
||||
|
||||
}
|
||||
out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
|
||||
if err != nil {
|
||||
}
|
||||
out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
n = len(out.Contents)
|
||||
}
|
||||
}
|
||||
stats.finish(thread_num)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
}
|
||||
n = len(out.Contents)
|
||||
}
|
||||
}
|
||||
stats.finish(thread_num)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
}
|
||||
|
||||
func runWrapper(loop int, r rune) []OutputStats {
|
||||
op_counter = -1
|
||||
running_threads = int64(threads)
|
||||
intervalNano := int64(interval*1000000000)
|
||||
endtime = time.Now().Add(time.Second * time.Duration(duration_secs))
|
||||
running_threads = int64(threads)
|
||||
intervalNano := int64(interval * 1000000000)
|
||||
endtime = time.Now().Add(time.Second * time.Duration(duration_secs))
|
||||
var stats Stats
|
||||
|
||||
// If we perviously set the object count after running a put
|
||||
// test, set the object count back to -1 for the new put test.
|
||||
// If we perviously set the object count after running a put
|
||||
// test, set the object count back to -1 for the new put test.
|
||||
if r == 'p' && object_count_flag {
|
||||
object_count = -1
|
||||
object_count_flag = false
|
||||
}
|
||||
object_count = -1
|
||||
object_count_flag = false
|
||||
}
|
||||
|
||||
switch r {
|
||||
case 'c':
|
||||
log.Printf("Running Loop %d BUCKET CLEAR TEST", loop)
|
||||
stats = makeStats(loop, "BCLR", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runBucketsClear(n, &stats);
|
||||
}
|
||||
case 'x':
|
||||
log.Printf("Running Loop %d BUCKET DELETE TEST", loop)
|
||||
stats = makeStats(loop, "BDEL", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runBucketDelete(n, &stats);
|
||||
}
|
||||
case 'c':
|
||||
log.Printf("Running Loop %d BUCKET CLEAR TEST", loop)
|
||||
stats = makeStats(loop, "BCLR", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runBucketsClear(n, &stats)
|
||||
}
|
||||
case 'x':
|
||||
log.Printf("Running Loop %d BUCKET DELETE TEST", loop)
|
||||
stats = makeStats(loop, "BDEL", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runBucketDelete(n, &stats)
|
||||
}
|
||||
case 'i':
|
||||
log.Printf("Running Loop %d BUCKET INIT TEST", loop)
|
||||
stats = makeStats(loop, "BINIT", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runBucketsInit(n, &stats);
|
||||
go runBucketsInit(n, &stats)
|
||||
}
|
||||
case 'p':
|
||||
log.Printf("Running Loop %d OBJECT PUT TEST", loop)
|
||||
stats = makeStats(loop, "PUT", threads, intervalNano)
|
||||
stats = makeStats(loop, "PUT", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runUpload(n, endtime, &stats);
|
||||
go runUpload(n, endtime, &stats)
|
||||
}
|
||||
case 'g':
|
||||
log.Printf("Running Loop %d OBJECT GET TEST", loop)
|
||||
stats = makeStats(loop, "GET", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runDownload(n, endtime, &stats);
|
||||
case 'g':
|
||||
log.Printf("Running Loop %d OBJECT GET TEST", loop)
|
||||
stats = makeStats(loop, "GET", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runDownload(n, endtime, &stats)
|
||||
}
|
||||
case 'd':
|
||||
log.Printf("Running Loop %d OBJECT DELETE TEST", loop)
|
||||
stats = makeStats(loop, "DEL", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runDelete(n, &stats);
|
||||
}
|
||||
}
|
||||
case 'd':
|
||||
log.Printf("Running Loop %d OBJECT DELETE TEST", loop)
|
||||
stats = makeStats(loop, "DEL", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runDelete(n, &stats)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for it to finish
|
||||
for atomic.LoadInt64(&running_threads) > 0 {
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
// Wait for it to finish
|
||||
for atomic.LoadInt64(&running_threads) > 0 {
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
|
||||
// If the user didn't set the object_count, we can set it here
|
||||
// to limit subsequent get/del tests to valid objects only.
|
||||
// If the user didn't set the object_count, we can set it here
|
||||
// to limit subsequent get/del tests to valid objects only.
|
||||
if r == 'p' && object_count < 0 {
|
||||
object_count = op_counter + 1
|
||||
object_count_flag = true
|
||||
}
|
||||
object_count_flag = true
|
||||
}
|
||||
|
||||
// Create the Output Stats
|
||||
os := make([]OutputStats, 0)
|
||||
for i := int64(0); i >= 0; i++ {
|
||||
if o, ok := stats.makeOutputStats(i); ok {
|
||||
os = append(os, o)
|
||||
os = append(os, o)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if o, ok := stats.makeTotalStats(); ok {
|
||||
if o, ok := stats.makeTotalStats(); ok {
|
||||
o.log()
|
||||
os = append(os, o)
|
||||
}
|
||||
|
@ -789,22 +789,22 @@ func init() {
|
|||
myflag.StringVar(&access_key, "a", os.Getenv("AWS_ACCESS_KEY_ID"), "Access key")
|
||||
myflag.StringVar(&secret_key, "s", os.Getenv("AWS_SECRET_ACCESS_KEY"), "Secret key")
|
||||
myflag.StringVar(&url_host, "u", os.Getenv("AWS_HOST"), "URL for host with method prefix")
|
||||
myflag.StringVar(&object_prefix, "op", "", "Prefix for objects")
|
||||
myflag.StringVar(&object_prefix, "op", "", "Prefix for objects")
|
||||
myflag.StringVar(&bucket_prefix, "bp", "hotsauce_bench", "Prefix for buckets")
|
||||
myflag.StringVar(®ion, "r", "us-east-1", "Region for testing")
|
||||
myflag.StringVar(&modes, "m", "cxipgdx", "Run modes in order. See NOTES for more info")
|
||||
myflag.StringVar(&modes, "m", "cxipgdx", "Run modes in order. See NOTES for more info")
|
||||
myflag.StringVar(&output, "o", "", "Write CSV output to this file")
|
||||
myflag.StringVar(&json_output, "j", "", "Write JSON output to this file")
|
||||
myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>")
|
||||
myflag.Int64Var(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
|
||||
myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>")
|
||||
myflag.Int64Var(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
|
||||
myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>")
|
||||
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
|
||||
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
|
||||
myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G")
|
||||
myflag.Float64Var(&interval, "ri", 1.0, "Number of seconds between report intervals")
|
||||
// define custom usage output with notes
|
||||
notes :=
|
||||
`
|
||||
notes :=
|
||||
`
|
||||
NOTES:
|
||||
- Valid mode types for the -m mode string are:
|
||||
c: clear all existing objects from buckets (requires lookups)
|
||||
|
@ -823,7 +823,7 @@ NOTES:
|
|||
fmt.Fprintf(flag.CommandLine.Output(), "\nUSAGE: %s [OPTIONS]\n\n", os.Args[0])
|
||||
fmt.Fprintf(flag.CommandLine.Output(), "OPTIONS:\n")
|
||||
myflag.PrintDefaults()
|
||||
fmt.Fprintf(flag.CommandLine.Output(), notes);
|
||||
fmt.Fprintf(flag.CommandLine.Output(), notes)
|
||||
}
|
||||
|
||||
if err := myflag.Parse(os.Args[1:]); err != nil {
|
||||
|
@ -845,13 +845,12 @@ NOTES:
|
|||
}
|
||||
invalid_mode := false
|
||||
for _, r := range modes {
|
||||
if (
|
||||
r != 'i' &&
|
||||
r != 'c' &&
|
||||
r != 'p' &&
|
||||
r != 'g' &&
|
||||
r != 'd' &&
|
||||
r != 'x') {
|
||||
if r != 'i' &&
|
||||
r != 'c' &&
|
||||
r != 'p' &&
|
||||
r != 'g' &&
|
||||
r != 'd' &&
|
||||
r != 'x' {
|
||||
s := fmt.Sprintf("Invalid mode '%s' passed to -m", string(r))
|
||||
log.Printf(s)
|
||||
invalid_mode = true
|
||||
|
@ -869,12 +868,12 @@ NOTES:
|
|||
}
|
||||
|
||||
func initData() {
|
||||
// Initialize data for the bucket
|
||||
object_data = make([]byte, object_size)
|
||||
rand.Read(object_data)
|
||||
hasher := md5.New()
|
||||
hasher.Write(object_data)
|
||||
object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
|
||||
// Initialize data for the bucket
|
||||
object_data = make([]byte, object_size)
|
||||
rand.Read(object_data)
|
||||
hasher := md5.New()
|
||||
hasher.Write(object_data)
|
||||
object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -911,51 +910,51 @@ func main() {
|
|||
initData()
|
||||
|
||||
// Setup the slice of buckets
|
||||
for i := int64(0); i < bucket_count; i++ {
|
||||
for i := int64(0); i < bucket_count; i++ {
|
||||
buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i))
|
||||
}
|
||||
|
||||
// Loop running the tests
|
||||
oStats := make([]OutputStats, 0)
|
||||
for loop := 0; loop < loops; loop++ {
|
||||
for _, r := range modes {
|
||||
oStats = append(oStats, runWrapper(loop, r)...)
|
||||
}
|
||||
}
|
||||
// Loop running the tests
|
||||
oStats := make([]OutputStats, 0)
|
||||
for loop := 0; loop < loops; loop++ {
|
||||
for _, r := range modes {
|
||||
oStats = append(oStats, runWrapper(loop, r)...)
|
||||
}
|
||||
}
|
||||
|
||||
// Write CSV Output
|
||||
if output != "" {
|
||||
file, err := os.OpenFile(output, os.O_CREATE|os.O_WRONLY, 0777)
|
||||
file, err := os.OpenFile(output, os.O_CREATE|os.O_WRONLY, 0777)
|
||||
defer file.Close()
|
||||
if err != nil {
|
||||
log.Fatal("Could not open CSV file for writing.")
|
||||
} else {
|
||||
csvWriter := csv.NewWriter(file)
|
||||
for i, o := range oStats {
|
||||
for i, o := range oStats {
|
||||
if i == 0 {
|
||||
o.csv_header(csvWriter)
|
||||
}
|
||||
o.csv(csvWriter)
|
||||
}
|
||||
csvWriter.Flush()
|
||||
o.csv(csvWriter)
|
||||
}
|
||||
csvWriter.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
// Write JSON output
|
||||
if json_output != "" {
|
||||
file, err := os.OpenFile(json_output, os.O_CREATE|os.O_WRONLY, 0777)
|
||||
defer file.Close()
|
||||
if err != nil {
|
||||
log.Fatal("Could not open JSON file for writing.")
|
||||
}
|
||||
defer file.Close()
|
||||
if err != nil {
|
||||
log.Fatal("Could not open JSON file for writing.")
|
||||
}
|
||||
data, err := json.Marshal(oStats)
|
||||
if err != nil {
|
||||
log.Fatal("Error marshaling JSON: ", err)
|
||||
}
|
||||
_, err = file.Write(data)
|
||||
if err != nil {
|
||||
log.Fatal("Error writing to JSON file: ", err)
|
||||
}
|
||||
file.Sync()
|
||||
if err != nil {
|
||||
log.Fatal("Error marshaling JSON: ", err)
|
||||
}
|
||||
_, err = file.Write(data)
|
||||
if err != nil {
|
||||
log.Fatal("Error writing to JSON file: ", err)
|
||||
}
|
||||
file.Sync()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue