clientv3: Expose clientv3/examples close to the code.

Many of the tests had missing '// Output:' comment, so were not
runnable. They required fining.
release-3.5
Piotr Tabor 2020-10-06 22:36:16 +02:00
parent dd45d04b2d
commit f67956cb7a
23 changed files with 995 additions and 717 deletions

1
.words
View File

@ -108,6 +108,7 @@ uncontended
unfreed unfreed
unlisting unlisting
unprefixed unprefixed
WatchProgressNotifyInterval
WAL WAL
WithBackoff WithBackoff
WithDialer WithDialer

View File

@ -0,0 +1 @@
../tests/integration/clientv3/examples/example_auth_test.go

View File

@ -0,0 +1 @@
../tests/integration/clientv3/examples/example_cluster_test.go

1
clientv3/example_kv_test.go Symbolic link
View File

@ -0,0 +1 @@
../tests/integration/clientv3/examples/example_kv_test.go

View File

@ -0,0 +1 @@
../tests/integration/clientv3/examples/example_lease_test.go

View File

@ -0,0 +1 @@
../tests/integration/clientv3/examples/example_maintenance_test.go

View File

@ -0,0 +1 @@
../tests/integration/clientv3/examples/example_metrics_test.go

1
clientv3/example_test.go Symbolic link
View File

@ -0,0 +1 @@
../tests/integration/clientv3/examples/example_test.go

View File

@ -0,0 +1 @@
../tests/integration/clientv3/examples/example_watch_test.go

41
clientv3/main_test.go Normal file
View File

@ -0,0 +1,41 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3_test
import (
"testing"
"time"
"go.etcd.io/etcd/v3/pkg/testutil"
)
const (
dialTimeout = 5 * time.Second
requestTimeout = 10 * time.Second
)
func exampleEndpoints() []string { return nil }
func forUnitTestsRunInMockedContext(mocking func(), example func()) {
mocking()
// TODO: Call 'example' when mocking() provides realistic mocking of transport.
// The real testing logic of examples gets executed
// as part of ./tests/integration/clientv3/integration/...
}
func TestMain(m *testing.M) {
testutil.MustTestMainWithLeakDetection(m)
}

View File

@ -303,6 +303,8 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
return ttl, err return ttl, err
} }
} }
// Throttle in case of e.g. connection problems.
time.Sleep(50 * time.Millisecond)
} }
if cctx.Err() == context.DeadlineExceeded { if cctx.Err() == context.DeadlineExceeded {

3
test
View File

@ -94,9 +94,6 @@ function unit_pass {
function integration_extra { function integration_extra {
if [ -z "${PKG}" ] ; then if [ -z "${PKG}" ] ; then
if [[ -z "${RUN_ARG[*]}" ]]; then
run_for_module "tests" go_test "./integration/..." "keep_going" : -timeout="${TIMEOUT:-5m}" "${COMMON_TEST_FLAGS[@]}" --run=Example "$@" || return $?
fi
run_for_module "." go_test "./contrib/raftexample" "keep_going" : -timeout="${TIMEOUT:-5m}" "${RUN_ARG[@]}" "${COMMON_TEST_FLAGS[@]}" "$@" || return $? run_for_module "." go_test "./contrib/raftexample" "keep_going" : -timeout="${TIMEOUT:-5m}" "${RUN_ARG[@]}" "${COMMON_TEST_FLAGS[@]}" "$@" || return $?
run_for_module "tests" go_test "./integration/v2store/..." "keep_going" : -tags v2v3 -timeout="${TIMEOUT:-5m}" "${RUN_ARG[@]}" "${COMMON_TEST_FLAGS[@]}" "$@" || return $? run_for_module "tests" go_test "./integration/v2store/..." "keep_going" : -tags v2v3 -timeout="${TIMEOUT:-5m}" "${RUN_ARG[@]}" "${COMMON_TEST_FLAGS[@]}" "$@" || return $?
else else

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package clientv3test package clientv3_test
import ( import (
"context" "context"
@ -22,92 +22,101 @@ import (
"go.etcd.io/etcd/v3/clientv3" "go.etcd.io/etcd/v3/clientv3"
) )
func mockAuth() {
fmt.Println(`etcdserver: permission denied`)
fmt.Println(`user u permission: key "foo", range end "zoo"`)
}
func ExampleAuth() { func ExampleAuth() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(
Endpoints: endpoints, mockAuth,
DialTimeout: dialTimeout, func() {
}) cli, err := clientv3.New(clientv3.Config{
if err != nil { Endpoints: exampleEndpoints(),
log.Fatal(err) DialTimeout: dialTimeout,
} })
defer cli.Close() if err != nil {
log.Fatal(err)
}
defer cli.Close()
if _, err = cli.RoleAdd(context.TODO(), "root"); err != nil { if _, err = cli.RoleAdd(context.TODO(), "root"); err != nil {
log.Fatal(err) log.Fatal(err)
} }
if _, err = cli.UserAdd(context.TODO(), "root", "123"); err != nil { if _, err = cli.UserAdd(context.TODO(), "root", "123"); err != nil {
log.Fatal(err) log.Fatal(err)
} }
if _, err = cli.UserGrantRole(context.TODO(), "root", "root"); err != nil { if _, err = cli.UserGrantRole(context.TODO(), "root", "root"); err != nil {
log.Fatal(err) log.Fatal(err)
} }
if _, err = cli.RoleAdd(context.TODO(), "r"); err != nil { if _, err = cli.RoleAdd(context.TODO(), "r"); err != nil {
log.Fatal(err) log.Fatal(err)
} }
if _, err = cli.RoleGrantPermission( if _, err = cli.RoleGrantPermission(
context.TODO(), context.TODO(),
"r", // role name "r", // role name
"foo", // key "foo", // key
"zoo", // range end "zoo", // range end
clientv3.PermissionType(clientv3.PermReadWrite), clientv3.PermissionType(clientv3.PermReadWrite),
); err != nil { ); err != nil {
log.Fatal(err) log.Fatal(err)
} }
if _, err = cli.UserAdd(context.TODO(), "u", "123"); err != nil { if _, err = cli.UserAdd(context.TODO(), "u", "123"); err != nil {
log.Fatal(err) log.Fatal(err)
} }
if _, err = cli.UserGrantRole(context.TODO(), "u", "r"); err != nil { if _, err = cli.UserGrantRole(context.TODO(), "u", "r"); err != nil {
log.Fatal(err) log.Fatal(err)
} }
if _, err = cli.AuthEnable(context.TODO()); err != nil { if _, err = cli.AuthEnable(context.TODO()); err != nil {
log.Fatal(err) log.Fatal(err)
} }
cliAuth, err := clientv3.New(clientv3.Config{ cliAuth, err := clientv3.New(clientv3.Config{
Endpoints: endpoints, Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout, DialTimeout: dialTimeout,
Username: "u", Username: "u",
Password: "123", Password: "123",
}) })
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer cliAuth.Close() defer cliAuth.Close()
if _, err = cliAuth.Put(context.TODO(), "foo1", "bar"); err != nil { if _, err = cliAuth.Put(context.TODO(), "foo1", "bar"); err != nil {
log.Fatal(err) log.Fatal(err)
} }
_, err = cliAuth.Txn(context.TODO()). _, err = cliAuth.Txn(context.TODO()).
If(clientv3.Compare(clientv3.Value("zoo1"), ">", "abc")). If(clientv3.Compare(clientv3.Value("zoo1"), ">", "abc")).
Then(clientv3.OpPut("zoo1", "XYZ")). Then(clientv3.OpPut("zoo1", "XYZ")).
Else(clientv3.OpPut("zoo1", "ABC")). Else(clientv3.OpPut("zoo1", "ABC")).
Commit() Commit()
fmt.Println(err) fmt.Println(err)
// now check the permission with the root account // now check the permission with the root account
rootCli, err := clientv3.New(clientv3.Config{ rootCli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints, Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout, DialTimeout: dialTimeout,
Username: "root", Username: "root",
Password: "123", Password: "123",
}) })
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer rootCli.Close() defer rootCli.Close()
resp, err := rootCli.RoleGet(context.TODO(), "r") resp, err := rootCli.RoleGet(context.TODO(), "r")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Printf("user u permission: key %q, range end %q\n", resp.Perm[0].Key, resp.Perm[0].RangeEnd) fmt.Printf("user u permission: key %q, range end %q\n", resp.Perm[0].Key, resp.Perm[0].RangeEnd)
if _, err = rootCli.AuthDisable(context.TODO()); err != nil { if _, err = rootCli.AuthDisable(context.TODO()); err != nil {
log.Fatal(err) log.Fatal(err)
} }
})
// Output: etcdserver: permission denied // Output: etcdserver: permission denied
// user u permission: key "foo", range end "zoo" // user u permission: key "foo", range end "zoo"
} }

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package clientv3test package clientv3_test
import ( import (
"context" "context"
@ -22,103 +22,160 @@ import (
"go.etcd.io/etcd/v3/clientv3" "go.etcd.io/etcd/v3/clientv3"
) )
func ExampleCluster_memberList() { func mockCluster_memberList() {
cli, err := clientv3.New(clientv3.Config{ fmt.Println("members: 3")
Endpoints: endpoints, }
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.MemberList(context.Background()) func ExampleCluster_memberList() {
if err != nil { forUnitTestsRunInMockedContext(mockCluster_memberList, func() {
log.Fatal(err) cli, err := clientv3.New(clientv3.Config{
} Endpoints: exampleEndpoints(),
fmt.Println("members:", len(resp.Members)) DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.MemberList(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Println("members:", len(resp.Members))
})
// Output: members: 3 // Output: members: 3
} }
func ExampleCluster_memberAdd() { func mockCluster_memberAdd() {
cli, err := clientv3.New(clientv3.Config{ fmt.Println("added member.PeerURLs: [http://localhost:32380]")
Endpoints: endpoints[:2], fmt.Println("members count: 4")
DialTimeout: dialTimeout, }
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
peerURLs := endpoints[2:] func ExampleCluster_memberAdd() {
mresp, err := cli.MemberAdd(context.Background(), peerURLs) forUnitTestsRunInMockedContext(mockCluster_memberAdd, func() {
if err != nil { cli, err := clientv3.New(clientv3.Config{
log.Fatal(err) Endpoints: exampleEndpoints(),
} DialTimeout: dialTimeout,
fmt.Println("added member.PeerURLs:", mresp.Member.PeerURLs) })
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// Add member 1:
mresp, err := cli.MemberAdd(context.Background(), []string{"http://localhost:32380"})
if err != nil {
log.Fatal(err)
}
fmt.Println("added member.PeerURLs:", mresp.Member.PeerURLs)
fmt.Println("members count:", len(mresp.Members))
// Restore original cluster state
_, err = cli.MemberRemove(context.Background(), mresp.Member.ID)
if err != nil {
log.Fatal(err)
}
})
// Output:
// added member.PeerURLs: [http://localhost:32380] // added member.PeerURLs: [http://localhost:32380]
// members count: 4
}
func mockCluster_memberAddAsLearner() {
fmt.Println("members count: 4")
fmt.Println("added member.IsLearner: true")
} }
func ExampleCluster_memberAddAsLearner() { func ExampleCluster_memberAddAsLearner() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockCluster_memberAddAsLearner, func() {
Endpoints: endpoints[:2], cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
}) DialTimeout: dialTimeout,
if err != nil { })
log.Fatal(err) if err != nil {
} log.Fatal(err)
defer cli.Close() }
defer cli.Close()
peerURLs := endpoints[2:] mresp, err := cli.MemberAddAsLearner(context.Background(), []string{"http://localhost:32381"})
mresp, err := cli.MemberAddAsLearner(context.Background(), peerURLs) if err != nil {
if err != nil { log.Fatal(err)
log.Fatal(err) }
}
fmt.Println("added member.PeerURLs:", mresp.Member.PeerURLs) // Restore original cluster state
fmt.Println("added member.IsLearner:", mresp.Member.IsLearner) _, err = cli.MemberRemove(context.Background(), mresp.Member.ID)
// added member.PeerURLs: [http://localhost:32380] if err != nil {
log.Fatal(err)
}
fmt.Println("members count:", len(mresp.Members))
fmt.Println("added member.IsLearner:", mresp.Member.IsLearner)
})
// Output:
// members count: 4
// added member.IsLearner: true // added member.IsLearner: true
} }
func mockCluster_memberRemove() {}
func ExampleCluster_memberRemove() { func ExampleCluster_memberRemove() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockCluster_memberRemove, func() {
Endpoints: endpoints[1:], cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.MemberList(context.Background())
if err != nil {
log.Fatal(err)
}
_, err = cli.MemberRemove(context.Background(), resp.Members[0].ID)
if err != nil {
log.Fatal(err)
}
// Restore original cluster:
_, err = cli.MemberAdd(context.Background(), resp.Members[0].PeerURLs)
if err != nil {
log.Fatal(err)
}
}) })
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.MemberList(context.Background())
if err != nil {
log.Fatal(err)
}
_, err = cli.MemberRemove(context.Background(), resp.Members[0].ID)
if err != nil {
log.Fatal(err)
}
} }
func mockCluster_memberUpdate() {}
func ExampleCluster_memberUpdate() { func ExampleCluster_memberUpdate() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockCluster_memberUpdate, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.MemberList(context.Background())
if err != nil {
log.Fatal(err)
}
peerURLs := []string{"http://localhost:12380"}
_, err = cli.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs)
if err != nil {
log.Fatal(err)
}
// Restore to mitigate impact on other tests:
_, err = cli.MemberUpdate(context.Background(), resp.Members[0].ID, resp.Members[0].PeerURLs)
if err != nil {
log.Fatal(err)
}
}) })
if err != nil { // Output:
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.MemberList(context.Background())
if err != nil {
log.Fatal(err)
}
peerURLs := []string{"http://localhost:12380"}
_, err = cli.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs)
if err != nil {
log.Fatal(err)
}
} }

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package clientv3test package clientv3_test
import ( import (
"context" "context"
@ -23,259 +23,312 @@ import (
"go.etcd.io/etcd/v3/clientv3" "go.etcd.io/etcd/v3/clientv3"
) )
func mockKV_put() {}
func ExampleKV_put() { func ExampleKV_put() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockKV_put, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
}) DialTimeout: dialTimeout,
if err != nil { })
log.Fatal(err) if err != nil {
} log.Fatal(err)
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, "sample_key", "sample_value")
cancel()
if err != nil {
log.Fatal(err)
}
}
func ExampleKV_putErrorHandling() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, "", "sample_value")
cancel()
if err != nil {
switch err {
case context.Canceled:
fmt.Printf("ctx is canceled by another routine: %v\n", err)
case context.DeadlineExceeded:
fmt.Printf("ctx is attached with a deadline is exceeded: %v\n", err)
case rpctypes.ErrEmptyKey:
fmt.Printf("client-side error: %v\n", err)
default:
fmt.Printf("bad cluster endpoints, which are not etcd servers: %v\n", err)
} }
} defer cli.Close()
// Output: client-side error: etcdserver: key is not provided
}
func ExampleKV_get() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
_, err = cli.Put(context.TODO(), "foo", "bar")
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "foo")
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
// Output: foo : bar
}
func ExampleKV_getWithRev() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
presp, err := cli.Put(context.TODO(), "foo", "bar1")
if err != nil {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar2")
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "foo", clientv3.WithRev(presp.Header.Revision))
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
// Output: foo : bar1
}
func ExampleKV_getSortedPrefix() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
for i := range make([]int, 3) {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, fmt.Sprintf("key_%d", i), "value") _, err = cli.Put(ctx, "sample_key", "sample_value")
cancel() cancel()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
} })
// Output:
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) func mockKV_putErrorHandling() {
resp, err := cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) fmt.Println("client-side error: etcdserver: key is not provided")
cancel() }
if err != nil {
log.Fatal(err) func ExampleKV_putErrorHandling() {
} forUnitTestsRunInMockedContext(mockKV_putErrorHandling, func() {
for _, ev := range resp.Kvs { cli, err := clientv3.New(clientv3.Config{
fmt.Printf("%s : %s\n", ev.Key, ev.Value) Endpoints: exampleEndpoints(),
} DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, "", "sample_value")
cancel()
if err != nil {
switch err {
case context.Canceled:
fmt.Printf("ctx is canceled by another routine: %v\n", err)
case context.DeadlineExceeded:
fmt.Printf("ctx is attached with a deadline is exceeded: %v\n", err)
case rpctypes.ErrEmptyKey:
fmt.Printf("client-side error: %v\n", err)
default:
fmt.Printf("bad cluster endpoints, which are not etcd servers: %v\n", err)
}
}
})
// Output: client-side error: etcdserver: key is not provided
}
func mockKV_get() {
fmt.Println("foo : bar")
}
func ExampleKV_get() {
forUnitTestsRunInMockedContext(mockKV_get, func() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
_, err = cli.Put(context.TODO(), "foo", "bar")
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "foo")
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
})
// Output: foo : bar
}
func mockKV_getWithRev() {
fmt.Println("foo : bar1")
}
func ExampleKV_getWithRev() {
forUnitTestsRunInMockedContext(mockKV_getWithRev, func() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
presp, err := cli.Put(context.TODO(), "foo", "bar1")
if err != nil {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar2")
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "foo", clientv3.WithRev(presp.Header.Revision))
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
})
// Output: foo : bar1
}
func mockKV_getSortedPrefix() {
fmt.Println(`key_2 : value`)
fmt.Println(`key_1 : value`)
fmt.Println(`key_0 : value`)
}
func ExampleKV_getSortedPrefix() {
forUnitTestsRunInMockedContext(mockKV_getSortedPrefix, func() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
for i := range make([]int, 3) {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, fmt.Sprintf("key_%d", i), "value")
cancel()
if err != nil {
log.Fatal(err)
}
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
})
// Output: // Output:
// key_2 : value // key_2 : value
// key_1 : value // key_1 : value
// key_0 : value // key_0 : value
} }
func mockKV_delete() {
fmt.Println("Deleted all keys: true")
}
func ExampleKV_delete() { func ExampleKV_delete() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockKV_delete, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
// count keys about to be deleted
gresp, err := cli.Get(ctx, "key", clientv3.WithPrefix())
if err != nil {
log.Fatal(err)
}
// delete the keys
dresp, err := cli.Delete(ctx, "key", clientv3.WithPrefix())
if err != nil {
log.Fatal(err)
}
fmt.Println("Deleted all keys:", int64(len(gresp.Kvs)) == dresp.Deleted)
}) })
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
// count keys about to be deleted
gresp, err := cli.Get(ctx, "key", clientv3.WithPrefix())
if err != nil {
log.Fatal(err)
}
// delete the keys
dresp, err := cli.Delete(ctx, "key", clientv3.WithPrefix())
if err != nil {
log.Fatal(err)
}
fmt.Println("Deleted all keys:", int64(len(gresp.Kvs)) == dresp.Deleted)
// Output: // Output:
// Deleted all keys: true // Deleted all keys: true
} }
func mockKV_compact() {}
func ExampleKV_compact() { func ExampleKV_compact() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockKV_compact, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "foo")
cancel()
if err != nil {
log.Fatal(err)
}
compRev := resp.Header.Revision // specify compact revision of your choice
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Compact(ctx, compRev)
cancel()
if err != nil {
log.Fatal(err)
}
}) })
if err != nil { // Output:
log.Fatal(err) }
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) func mockKV_txn() {
resp, err := cli.Get(ctx, "foo") fmt.Println("key : XYZ")
cancel()
if err != nil {
log.Fatal(err)
}
compRev := resp.Header.Revision // specify compact revision of your choice
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Compact(ctx, compRev)
cancel()
if err != nil {
log.Fatal(err)
}
} }
func ExampleKV_txn() { func ExampleKV_txn() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockKV_txn, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
kvc := clientv3.NewKV(cli)
_, err = kvc.Put(context.TODO(), "key", "xyz")
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = kvc.Txn(ctx).
// txn value comparisons are lexical
If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
// the "Then" runs, since "xyz" > "abc"
Then(clientv3.OpPut("key", "XYZ")).
// the "Else" does not run
Else(clientv3.OpPut("key", "ABC")).
Commit()
cancel()
if err != nil {
log.Fatal(err)
}
gresp, err := kvc.Get(context.TODO(), "key")
if err != nil {
log.Fatal(err)
}
for _, ev := range gresp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
}) })
if err != nil {
log.Fatal(err)
}
defer cli.Close()
kvc := clientv3.NewKV(cli)
_, err = kvc.Put(context.TODO(), "key", "xyz")
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = kvc.Txn(ctx).
// txn value comparisons are lexical
If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
// the "Then" runs, since "xyz" > "abc"
Then(clientv3.OpPut("key", "XYZ")).
// the "Else" does not run
Else(clientv3.OpPut("key", "ABC")).
Commit()
cancel()
if err != nil {
log.Fatal(err)
}
gresp, err := kvc.Get(context.TODO(), "key")
if err != nil {
log.Fatal(err)
}
for _, ev := range gresp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
// Output: key : XYZ // Output: key : XYZ
} }
func mockKV_do() {}
func ExampleKV_do() { func ExampleKV_do() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockKV_do, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
}) DialTimeout: dialTimeout,
if err != nil { })
log.Fatal(err) if err != nil {
}
defer cli.Close()
ops := []clientv3.Op{
clientv3.OpPut("put-key", "123"),
clientv3.OpGet("put-key"),
clientv3.OpPut("put-key", "456")}
for _, op := range ops {
if _, err := cli.Do(context.TODO(), op); err != nil {
log.Fatal(err) log.Fatal(err)
} }
} defer cli.Close()
ops := []clientv3.Op{
clientv3.OpPut("put-key", "123"),
clientv3.OpGet("put-key"),
clientv3.OpPut("put-key", "456")}
for _, op := range ops {
if _, err := cli.Do(context.TODO(), op); err != nil {
log.Fatal(err)
}
}
})
// Output:
} }

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package clientv3test package clientv3_test
import ( import (
"context" "context"
@ -22,120 +22,148 @@ import (
"go.etcd.io/etcd/v3/clientv3" "go.etcd.io/etcd/v3/clientv3"
) )
func mockLease_grant() {
}
func ExampleLease_grant() { func ExampleLease_grant() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockLease_grant, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// minimum lease TTL is 5-second
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
// after 5 seconds, the key 'foo' will be removed
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
}) })
if err != nil { //Output:
log.Fatal(err) }
}
defer cli.Close()
// minimum lease TTL is 5-second func mockLease_revoke() {
resp, err := cli.Grant(context.TODO(), 5) fmt.Println("number of keys: 0")
if err != nil {
log.Fatal(err)
}
// after 5 seconds, the key 'foo' will be removed
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
} }
func ExampleLease_revoke() { func ExampleLease_revoke() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockLease_revoke, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// revoking lease expires the key attached to its lease ID
_, err = cli.Revoke(context.TODO(), resp.ID)
if err != nil {
log.Fatal(err)
}
gresp, err := cli.Get(context.TODO(), "foo")
if err != nil {
log.Fatal(err)
}
fmt.Println("number of keys:", len(gresp.Kvs))
}) })
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// revoking lease expires the key attached to its lease ID
_, err = cli.Revoke(context.TODO(), resp.ID)
if err != nil {
log.Fatal(err)
}
gresp, err := cli.Get(context.TODO(), "foo")
if err != nil {
log.Fatal(err)
}
fmt.Println("number of keys:", len(gresp.Kvs))
// Output: number of keys: 0 // Output: number of keys: 0
} }
func mockLease_keepAlive() {
fmt.Println("ttl: 5")
}
func ExampleLease_keepAlive() { func ExampleLease_keepAlive() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockLease_keepAlive, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// the key 'foo' will be kept forever
ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
if kaerr != nil {
log.Fatal(kaerr)
}
ka := <-ch
if ka != nil {
fmt.Println("ttl:", ka.TTL)
} else {
fmt.Println("Unexpected NULL")
}
}) })
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// the key 'foo' will be kept forever
ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
if kaerr != nil {
log.Fatal(kaerr)
}
ka := <-ch
fmt.Println("ttl:", ka.TTL)
// Output: ttl: 5 // Output: ttl: 5
} }
func mockLease_keepAliveOnce() {
fmt.Println("ttl: 5")
}
func ExampleLease_keepAliveOnce() { func ExampleLease_keepAliveOnce() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockLease_keepAliveOnce, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// to renew the lease only once
ka, kaerr := cli.KeepAliveOnce(context.TODO(), resp.ID)
if kaerr != nil {
log.Fatal(kaerr)
}
fmt.Println("ttl:", ka.TTL)
}) })
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
// to renew the lease only once
ka, kaerr := cli.KeepAliveOnce(context.TODO(), resp.ID)
if kaerr != nil {
log.Fatal(kaerr)
}
fmt.Println("ttl:", ka.TTL)
// Output: ttl: 5 // Output: ttl: 5
} }

View File

@ -12,51 +12,56 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package clientv3test package clientv3_test
import ( import (
"context" "context"
"fmt"
"log" "log"
"go.etcd.io/etcd/v3/clientv3" "go.etcd.io/etcd/v3/clientv3"
) )
func ExampleMaintenance_status() { func mockMaintenance_status() {}
for _, ep := range endpoints {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
resp, err := cli.Status(context.Background(), ep) func ExampleMaintenance_status() {
if err != nil { forUnitTestsRunInMockedContext(mockMaintenance_status, func() {
log.Fatal(err) for _, ep := range exampleEndpoints() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
_, err = cli.Status(context.Background(), ep)
if err != nil {
log.Fatal(err)
}
} }
fmt.Printf("endpoint: %s / Leader: %v\n", ep, resp.Header.MemberId == resp.Leader) })
} // Output:
// endpoint: localhost:2379 / Leader: false
// endpoint: localhost:22379 / Leader: false
// endpoint: localhost:32379 / Leader: true
} }
func mockMaintenance_defragment() {}
func ExampleMaintenance_defragment() { func ExampleMaintenance_defragment() {
for _, ep := range endpoints { forUnitTestsRunInMockedContext(mockMaintenance_defragment, func() {
cli, err := clientv3.New(clientv3.Config{ for _, ep := range exampleEndpoints() {
Endpoints: []string{ep}, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: []string{ep},
}) DialTimeout: dialTimeout,
if err != nil { })
log.Fatal(err) if err != nil {
} log.Fatal(err)
defer cli.Close() }
defer cli.Close()
if _, err = cli.Defragment(context.TODO(), ep); err != nil { if _, err = cli.Defragment(context.TODO(), ep); err != nil {
log.Fatal(err) log.Fatal(err)
}
} }
} })
// Output:
} }

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package clientv3test package clientv3_test
import ( import (
"context" "context"
@ -30,56 +30,62 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
func mockClient_metrics() {
fmt.Println(`grpc_client_started_total{grpc_method="Range",grpc_service="etcdserverpb.KV",grpc_type="unary"} 1`)
}
func ExampleClient_metrics() { func ExampleClient_metrics() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockClient_metrics, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialOptions: []grpc.DialOption{ Endpoints: exampleEndpoints(),
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor), DialOptions: []grpc.DialOption{
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor), grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
}, grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
}) },
if err != nil { })
log.Fatal(err) if err != nil {
} log.Fatal(err)
defer cli.Close()
// get a key so it shows up in the metrics as a range RPC
cli.Get(context.TODO(), "test_key")
// listen for all Prometheus metrics
ln, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal(err)
}
donec := make(chan struct{})
go func() {
defer close(donec)
http.Serve(ln, promhttp.Handler())
}()
defer func() {
ln.Close()
<-donec
}()
// make an http request to fetch all Prometheus metrics
url := "http://" + ln.Addr().String() + "/metrics"
resp, err := http.Get(url)
if err != nil {
log.Fatalf("fetch error: %v", err)
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Fatalf("fetch error: reading %s: %v", url, err)
}
// confirm range request in metrics
for _, l := range strings.Split(string(b), "\n") {
if strings.Contains(l, `grpc_client_started_total{grpc_method="Range"`) {
fmt.Println(l)
break
} }
} defer cli.Close()
// get a key so it shows up in the metrics as a range RPC
cli.Get(context.TODO(), "test_key")
// listen for all Prometheus metrics
ln, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal(err)
}
donec := make(chan struct{})
go func() {
defer close(donec)
http.Serve(ln, promhttp.Handler())
}()
defer func() {
ln.Close()
<-donec
}()
// make an http request to fetch all Prometheus metrics
url := "http://" + ln.Addr().String() + "/metrics"
resp, err := http.Get(url)
if err != nil {
log.Fatalf("fetch error: %v", err)
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Fatalf("fetch error: reading %s: %v", url, err)
}
// confirm range request in metrics
for _, l := range strings.Split(string(b), "\n") {
if strings.Contains(l, `grpc_client_started_total{grpc_method="Range"`) {
fmt.Println(l)
break
}
}
})
// Output: // Output:
// grpc_client_started_total{grpc_method="Range",grpc_service="etcdserverpb.KV",grpc_type="unary"} 1 // grpc_client_started_total{grpc_method="Range",grpc_service="etcdserverpb.KV",grpc_type="unary"} 1
} }

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package clientv3test package clientv3_test
import ( import (
"context" "context"
@ -21,50 +21,58 @@ import (
"log" "log"
) )
func ExampleConfig_insecure() { func mockConfig_insecure() {}
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close() // make sure to close the client
_, err = cli.Put(context.TODO(), "foo", "bar") func ExampleConfig_insecure() {
if err != nil { forUnitTestsRunInMockedContext(mockConfig_insecure, func() {
log.Fatal(err) cli, err := clientv3.New(clientv3.Config{
} Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close() // make sure to close the client
_, err = cli.Put(context.TODO(), "foo", "bar")
if err != nil {
log.Fatal(err)
}
})
// Without the line below the test is not being executed // Without the line below the test is not being executed
// Output: // Output:
} }
func mockConfig_withTLS() {}
func ExampleConfig_withTLS() { func ExampleConfig_withTLS() {
tlsInfo := transport.TLSInfo{ forUnitTestsRunInMockedContext(mockConfig_withTLS, func() {
CertFile: "/tmp/test-certs/test-name-1.pem", tlsInfo := transport.TLSInfo{
KeyFile: "/tmp/test-certs/test-name-1-key.pem", CertFile: "/tmp/test-certs/test-name-1.pem",
TrustedCAFile: "/tmp/test-certs/trusted-ca.pem", KeyFile: "/tmp/test-certs/test-name-1-key.pem",
} TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",
tlsConfig, err := tlsInfo.ClientConfig() }
if err != nil { tlsConfig, err := tlsInfo.ClientConfig()
log.Fatal(err) if err != nil {
} log.Fatal(err)
cli, err := clientv3.New(clientv3.Config{ }
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
TLS: tlsConfig, DialTimeout: dialTimeout,
}) TLS: tlsConfig,
if err != nil { })
log.Fatal(err) if err != nil {
} log.Fatal(err)
defer cli.Close() // make sure to close the client }
defer cli.Close() // make sure to close the client
_, err = cli.Put(context.TODO(), "foo", "bar") _, err = cli.Put(context.TODO(), "foo", "bar")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
})
// Without the line below the test is not being executed // Without the line below the test is not being executed
// Output: // Output:
} }

View File

@ -12,89 +12,148 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package clientv3test package clientv3_test
import ( import (
"context" "context"
"fmt" "fmt"
"log" "log"
"time"
"go.etcd.io/etcd/v3/clientv3" "go.etcd.io/etcd/v3/clientv3"
) )
func ExampleWatcher_watch() { func mockWatcher_watch() {
cli, err := clientv3.New(clientv3.Config{ fmt.Println(`PUT "foo" : "bar"`)
Endpoints: endpoints, }
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
rch := cli.Watch(context.Background(), "foo") func ExampleWatcher_watch() {
for wresp := range rch { forUnitTestsRunInMockedContext(mockWatcher_watch, func() {
for _, ev := range wresp.Events { cli, err := clientv3.New(clientv3.Config{
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
} }
} defer cli.Close()
rch := cli.Watch(context.Background(), "foo")
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
})
// PUT "foo" : "bar" // PUT "foo" : "bar"
} }
func ExampleWatcher_watchWithPrefix() { func mockWatcher_watchWithPrefix() {
cli, err := clientv3.New(clientv3.Config{ fmt.Println(`PUT "foo1" : "bar"`)
Endpoints: endpoints, }
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
rch := cli.Watch(context.Background(), "foo", clientv3.WithPrefix()) func ExampleWatcher_watchWithPrefix() {
for wresp := range rch { forUnitTestsRunInMockedContext(mockWatcher_watchWithPrefix, func() {
for _, ev := range wresp.Events { cli, err := clientv3.New(clientv3.Config{
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
} }
} defer cli.Close()
rch := cli.Watch(context.Background(), "foo", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
})
// PUT "foo1" : "bar" // PUT "foo1" : "bar"
} }
func mockWatcher_watchWithRange() {
fmt.Println(`PUT "foo1" : "bar1"`)
fmt.Println(`PUT "foo2" : "bar2"`)
fmt.Println(`PUT "foo3" : "bar3"`)
}
func ExampleWatcher_watchWithRange() { func ExampleWatcher_watchWithRange() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockWatcher_watchWithRange, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
}) DialTimeout: dialTimeout,
if err != nil { })
log.Fatal(err) if err != nil {
} log.Fatal(err)
defer cli.Close()
// watches within ['foo1', 'foo4'), in lexicographical order
rch := cli.Watch(context.Background(), "foo1", clientv3.WithRange("foo4"))
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
} }
} defer cli.Close()
// PUT "foo1" : "bar"
// PUT "foo2" : "bar" // watches within ['foo1', 'foo4'), in lexicographical order
// PUT "foo3" : "bar" rch := cli.Watch(context.Background(), "foo1", clientv3.WithRange("foo4"))
go func() {
cli.Put(context.Background(), "foo1", "bar1")
cli.Put(context.Background(), "foo5", "bar5")
cli.Put(context.Background(), "foo2", "bar2")
cli.Put(context.Background(), "foo3", "bar3")
}()
i := 0
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
i++
if i == 3 {
// After 3 messages we are done.
cli.Delete(context.Background(), "foo", clientv3.WithPrefix())
cli.Close()
return
}
}
}
})
// Output:
// PUT "foo1" : "bar1"
// PUT "foo2" : "bar2"
// PUT "foo3" : "bar3"
}
func mockWatcher_watchWithProgressNotify() {
fmt.Println(`wresp.IsProgressNotify: true`)
} }
func ExampleWatcher_watchWithProgressNotify() { func ExampleWatcher_watchWithProgressNotify() {
cli, err := clientv3.New(clientv3.Config{ forUnitTestsRunInMockedContext(mockWatcher_watchWithProgressNotify, func() {
Endpoints: endpoints, cli, err := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout, Endpoints: exampleEndpoints(),
}) DialTimeout: dialTimeout,
if err != nil { })
log.Fatal(err) if err != nil {
} log.Fatal(err)
}
rch := cli.Watch(context.Background(), "foo", clientv3.WithProgressNotify()) rch := cli.Watch(context.Background(), "foo", clientv3.WithProgressNotify())
wresp := <-rch closedch := make(chan bool)
fmt.Printf("wresp.Header.Revision: %d\n", wresp.Header.Revision) go func() {
fmt.Println("wresp.IsProgressNotify:", wresp.IsProgressNotify()) // This assumes that cluster is configured with frequent WatchProgressNotifyInterval
// wresp.Header.Revision: 0 // e.g. WatchProgressNotifyInterval: 200 * time.Millisecond.
time.Sleep(time.Second)
err := cli.Close()
if err != nil {
log.Fatal(err)
}
close(closedch)
}()
wresp := <-rch
fmt.Println("wresp.IsProgressNotify:", wresp.IsProgressNotify())
<-closedch
})
// TODO: Rather wresp.IsProgressNotify: true should be expected
// Output:
// wresp.IsProgressNotify: true // wresp.IsProgressNotify: true
} }

View File

@ -1,4 +1,4 @@
// Copyright 2016 The etcd Authors // Copyright 2017 The etcd Authors
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -12,67 +12,41 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package clientv3test package clientv3_test
import ( import (
"fmt"
"go.etcd.io/etcd/tests/v3/integration"
"go.etcd.io/etcd/v3/clientv3"
"go.etcd.io/etcd/v3/pkg/testutil"
"google.golang.org/grpc/grpclog"
"os" "os"
"strings"
"testing" "testing"
"time" "time"
"go.etcd.io/etcd/tests/v3/integration"
"go.etcd.io/etcd/v3/pkg/testutil"
) )
var ( const (
dialTimeout = 5 * time.Second dialTimeout = 5 * time.Second
requestTimeout = 10 * time.Second requestTimeout = 10 * time.Second
endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"}
) )
var lazyCluster = integration.NewLazyClusterWithConfig(
integration.ClusterConfig{
Size: 3,
WatchProgressNotifyInterval: 200 * time.Millisecond})
func exampleEndpoints() []string { return lazyCluster.EndpointsV3() }
func forUnitTestsRunInMockedContext(mocking func(), example func()) {
// For integration tests runs in the provided environment
example()
}
// TestMain sets up an etcd cluster if running the examples. // TestMain sets up an etcd cluster if running the examples.
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
useCluster, hasRunArg := false, false // default to running only Test* v := m.Run()
for _, arg := range os.Args { lazyCluster.Terminate()
if strings.HasPrefix(arg, "-test.run=") {
exp := strings.Split(arg, "=")[1]
useCluster = strings.Contains(exp, "Example")
hasRunArg = true
break
}
}
if !hasRunArg {
// force only running Test* if no args given to avoid leak false
// positives from having a long-running cluster for the examples.
os.Args = append(os.Args, "-test.run=Test")
}
var v int if v == 0 {
if useCluster { testutil.MustCheckLeakedGoroutine()
// Redirecting outputs to Stderr, such that they not interleave with examples outputs.
// Setting it once and before running any of the test such that it not data-races
// between HTTP servers running in different tests.
clientv3.SetLogger(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
cfg := integration.ClusterConfig{Size: 3}
clus := integration.NewClusterV3(nil, &cfg)
endpoints = make([]string, 3)
for i := range endpoints {
endpoints[i] = clus.Client(i).Endpoints()[0]
}
v = m.Run()
clus.Terminate(nil)
if err := testutil.CheckAfterTest(time.Second); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
os.Exit(1)
}
} else {
v = m.Run()
}
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
} }
os.Exit(v) os.Exit(v)
} }

View File

@ -312,7 +312,8 @@ func (c *cluster) mustNewMember(t testing.TB) *member {
return m return m
} }
func (c *cluster) addMember(t testing.TB) { // addMember return PeerURLs of the added member.
func (c *cluster) addMember(t testing.TB) types.URLs {
m := c.mustNewMember(t) m := c.mustNewMember(t)
scheme := schemeFromTLSInfo(c.cfg.PeerTLS) scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
@ -327,7 +328,11 @@ func (c *cluster) addMember(t testing.TB) {
} }
} }
if err != nil { if err != nil {
t.Fatalf("add member failed on all members error: %v", err) if t != nil {
t.Fatalf("add member failed on all members error: %v", err)
} else {
log.Fatalf("add member failed on all members error: %v", err)
}
} }
m.InitialPeerURLsMap = types.URLsMap{} m.InitialPeerURLsMap = types.URLsMap{}
@ -342,6 +347,7 @@ func (c *cluster) addMember(t testing.TB) {
c.Members = append(c.Members, m) c.Members = append(c.Members, m)
// wait cluster to be stable to receive future client requests // wait cluster to be stable to receive future client requests
c.waitMembersMatch(t, c.HTTPMembers()) c.waitMembersMatch(t, c.HTTPMembers())
return m.PeerURLs
} }
func (c *cluster) addMemberByURL(t testing.TB, clientURL, peerURL string) error { func (c *cluster) addMemberByURL(t testing.TB, clientURL, peerURL string) error {
@ -360,8 +366,9 @@ func (c *cluster) addMemberByURL(t testing.TB, clientURL, peerURL string) error
return nil return nil
} }
func (c *cluster) AddMember(t testing.TB) { // AddMember return PeerURLs of the added member.
c.addMember(t) func (c *cluster) AddMember(t testing.TB) types.URLs {
return c.addMember(t)
} }
func (c *cluster) RemoveMember(t testing.TB, id uint64) { func (c *cluster) RemoveMember(t testing.TB, id uint64) {

View File

@ -23,13 +23,26 @@ import (
"go.etcd.io/etcd/v3/pkg/transport" "go.etcd.io/etcd/v3/pkg/transport"
) )
// Infrastructure to provision a single shared cluster for tests - only
// when its needed.
//
// See ./tests/integration/clientv3/examples/main_test.go for canonical usage.
// Please notice that the shared (LazyCluster's) state is preserved between
// testcases, so left-over state might has cross-testcase effects.
// Prefer dedicated clusters for substancial test-cases.
type LazyCluster interface { type LazyCluster interface {
// EndpointsV2 - call to this method might initialize the cluster. // EndpointsV2 - exposes connection points for client v2.
// Calls to this method might initialize the cluster.
EndpointsV2() []string EndpointsV2() []string
// EndpointsV2 - call to this method might initialize the cluster. // EndpointsV3 - exposes connection points for client v3.
// Calls to this method might initialize the cluster.
EndpointsV3() []string EndpointsV3() []string
// Cluster - calls to this method might initialize the cluster.
Cluster() *ClusterV3
// Transport - call to this method might initialize the cluster. // Transport - call to this method might initialize the cluster.
Transport() *http.Transport Transport() *http.Transport
@ -46,7 +59,13 @@ type lazyCluster struct {
// NewLazyCluster returns a new test cluster handler that gets created on the // NewLazyCluster returns a new test cluster handler that gets created on the
// first call to GetEndpoints() or GetTransport() // first call to GetEndpoints() or GetTransport()
func NewLazyCluster() LazyCluster { func NewLazyCluster() LazyCluster {
return &lazyCluster{cfg: ClusterConfig{Size: 1}} return NewLazyClusterWithConfig(ClusterConfig{Size: 1})
}
// NewLazyClusterWithConfig returns a new test cluster handler that gets created
// on the first call to GetEndpoints() or GetTransport()
func NewLazyClusterWithConfig(cfg ClusterConfig) LazyCluster {
return &lazyCluster{cfg: cfg}
} }
func (lc *lazyCluster) mustLazyInit() { func (lc *lazyCluster) mustLazyInit() {
@ -61,19 +80,23 @@ func (lc *lazyCluster) mustLazyInit() {
} }
func (lc *lazyCluster) Terminate() { func (lc *lazyCluster) Terminate() {
if lc != nil { if lc != nil && lc.cluster != nil {
lc.cluster.Terminate(nil) lc.cluster.Terminate(nil)
lc.cluster = nil
} }
} }
func (lc *lazyCluster) EndpointsV2() []string { func (lc *lazyCluster) EndpointsV2() []string {
lc.mustLazyInit() return []string{lc.Cluster().Members[0].URL()}
return []string{lc.cluster.Members[0].URL()}
} }
func (lc *lazyCluster) EndpointsV3() []string { func (lc *lazyCluster) EndpointsV3() []string {
return lc.Cluster().Client(0).Endpoints()
}
func (lc *lazyCluster) Cluster() *ClusterV3 {
lc.mustLazyInit() lc.mustLazyInit()
return []string{lc.cluster.Client(0).Endpoints()[0]} return lc.cluster
} }
func (lc *lazyCluster) Transport() *http.Transport { func (lc *lazyCluster) Transport() *http.Transport {