Implement parallel deletes

master
Vitaliy Filippov 2020-08-12 14:35:58 +03:00
parent 6d7e98a45c
commit 2824649c37
1 changed file with 46 additions and 28 deletions


@@ -726,38 +726,48 @@ func runBucketsInit(thread_num int, stats *Stats) {
 	atomic.AddInt64(&running_threads, -1)
 }
-func runBucketsClear(thread_num int, stats *Stats) {
+type pagedObject struct {
+	bucket_num int64
+	key string
+	size int64
+}
+func runPagedList(wg *sync.WaitGroup, bucket_num int64, list chan<- pagedObject) {
+	svc := s3.New(session.New(), cfg)
+	svc.ListObjectsPages(
+		&s3.ListObjectsInput{
+			Bucket: &buckets[bucket_num],
+			MaxKeys: &max_keys,
+		},
+		func(page *s3.ListObjectsOutput, last bool) bool {
+			for _, v := range page.Contents {
+				list <- pagedObject{
+					bucket_num: bucket_num,
+					key: *v.Key,
+					size: *v.Size,
+				}
+			}
+			return true
+		})
+	wg.Done()
+}
+func runBucketsClear(list <-chan pagedObject, thread_num int, stats *Stats) {
 	svc := s3.New(session.New(), cfg)
 	for {
-		bucket_num := atomic.AddInt64(&op_counter, 1)
-		if bucket_num >= bucket_count {
-			atomic.AddInt64(&op_counter, -1)
-			break
-		}
-		out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
+		v := <-list
+		start := time.Now().UnixNano()
+		_, err := svc.DeleteObject(&s3.DeleteObjectInput{
+			Bucket: &buckets[v.bucket_num],
+			Key: &v.key,
+		})
+		end := time.Now().UnixNano()
+		stats.updateIntervals(thread_num)
 		if err != nil {
 			break
 		}
-		n := len(out.Contents)
-		for n > 0 {
-			for _, v := range out.Contents {
-				start := time.Now().UnixNano()
-				svc.DeleteObject(&s3.DeleteObjectInput{
-					Bucket: &buckets[bucket_num],
-					Key: v.Key,
-				})
-				end := time.Now().UnixNano()
-				stats.updateIntervals(thread_num)
-				stats.addOp(thread_num, *v.Size, end-start)
-			}
-			out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
-			if err != nil {
-				break
-			}
-			n = len(out.Contents)
-		}
+		stats.addOp(thread_num, v.size, end-start)
 	}
 	stats.finish(thread_num)
 	atomic.AddInt64(&running_threads, -1)
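
The hunk above turns the bucket-clear test into a producer/consumer pipeline: runPagedList pages through a bucket with ListObjectsPages and streams each key into a shared channel, while the runBucketsClear workers pull from that channel and issue DeleteObject calls in parallel. The second hunk below wires the two together in runWrapper. For reference, a minimal standalone sketch of the same pattern with aws-sdk-go v1 follows; the bucket names, worker count and range-based shutdown are illustrative assumptions, not part of this commit.

// Sketch: list-producer / delete-consumer pipeline (placeholder names, not from the commit).
package main

import (
	"log"
	"sync"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

type object struct {
	bucket string
	key    string
}

func main() {
	svc := s3.New(session.Must(session.NewSession()))

	buckets := []string{"bench-000", "bench-001"} // placeholder bucket names
	workers := 4                                  // placeholder worker count
	list := make(chan object, workers*2)

	// Producers: one listing goroutine per bucket streams keys into the channel.
	var producers sync.WaitGroup
	for _, b := range buckets {
		producers.Add(1)
		go func(bucket string) {
			defer producers.Done()
			err := svc.ListObjectsPages(
				&s3.ListObjectsInput{Bucket: aws.String(bucket), MaxKeys: aws.Int64(1000)},
				func(page *s3.ListObjectsOutput, last bool) bool {
					for _, v := range page.Contents {
						list <- object{bucket: bucket, key: *v.Key}
					}
					return true // keep paging
				})
			if err != nil {
				log.Printf("list %s: %v", bucket, err)
			}
		}(b)
	}

	// Close the channel once every bucket has been fully listed,
	// so the workers' range loops terminate cleanly.
	go func() {
		producers.Wait()
		close(list)
	}()

	// Consumers: delete workers drain the channel in parallel.
	var consumers sync.WaitGroup
	for i := 0; i < workers; i++ {
		consumers.Add(1)
		go func() {
			defer consumers.Done()
			for v := range list {
				if _, err := svc.DeleteObject(&s3.DeleteObjectInput{
					Bucket: aws.String(v.bucket),
					Key:    aws.String(v.key),
				}); err != nil {
					log.Printf("delete %s/%s: %v", v.bucket, v.key, err)
				}
			}
		}()
	}
	consumers.Wait()
}
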
@@ -781,9 +791,17 @@ func runWrapper(loop int, r rune) []OutputStats {
 	case 'c':
 		log.Printf("Running Loop %d BUCKET CLEAR TEST", loop)
 		stats = makeStats(loop, "BCLR", threads, intervalNano)
-		for n := 0; n < threads; n++ {
-			go runBucketsClear(n, &stats)
+		list := make(chan pagedObject, threads*2)
+		var wg = sync.WaitGroup{}
+		for b := int64(0); b < bucket_count; b++ {
+			wg.Add(1)
+			go runPagedList(&wg, b, list)
 		}
+		for n := 0; n < threads; n++ {
+			go runBucketsClear(list, n, &stats)
+		}
+		wg.Wait()
+		close(list)
 	case 'x':
 		log.Printf("Running Loop %d BUCKET DELETE TEST", loop)
 		stats = makeStats(loop, "BDEL", threads, intervalNano)