Discussion:
[lxc-devel] [lxd/master] [WIP] Cluster https address support
freeekanayaka on Github
2018-11-28 12:37:15 UTC
Permalink
From 15ea50bb1401a67b5d0151dfdc051e3b314f6bfa Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <***@canonical.com>
Date: Mon, 26 Nov 2018 11:06:02 +0100
Subject: [PATCH 01/10] Add cluster.https_address to node configuration

Signed-off-by: Free Ekanayaka <***@canonical.com>
---
lxd/node/config.go | 25 +++++++++++++++++++++++++
lxd/node/config_test.go | 24 ++++++++++++++++++++++++
2 files changed, 49 insertions(+)

diff --git a/lxd/node/config.go b/lxd/node/config.go
index 186c8056aa..c67814c030 100644
--- a/lxd/node/config.go
+++ b/lxd/node/config.go
@@ -37,6 +37,12 @@ func (c *Config) HTTPSAddress() string {
return c.m.GetString("core.https_address")
}

+// ClusterAddress returns the value of the cluster.https_address config key:
+// the address and port this LXD node should use for cluster communication.
+func (c *Config) ClusterAddress() string {
+ return c.m.GetString("cluster.https_address")
+}
+
// DebugAddress returns the address and port to setup the pprof listener on
func (c *Config) DebugAddress() string {
return c.m.GetString("core.debug_address")
@@ -85,6 +91,22 @@ func HTTPSAddress(node *db.Node) (string, error) {
return config.HTTPSAddress(), nil
}

+// ClusterAddress is a convenience for loading the node configuration in a new
+// transaction and returning the value of cluster.https_address.
+func ClusterAddress(node *db.Node) (string, error) {
+ var config *Config
+ err := node.Transaction(func(tx *db.NodeTx) error {
+ var err error // declared separately so config below is assigned, not shadowed
+ config, err = ConfigLoad(tx)
+ return err
+ })
+ if err != nil {
+ return "", err
+ }
+
+ return config.ClusterAddress(), nil
+}
+
// DebugAddress is a convenience for loading the node configuration and
// returning the value of core.debug_address.
func DebugAddress(node *db.Node) (string, error) {
@@ -120,6 +142,9 @@ var ConfigSchema = config.Schema{
// Network address for this LXD server
"core.https_address": {},

+ // Network address for cluster communication
+ "cluster.https_address": {},
+
// Network address for the debug server
"core.debug_address": {},

diff --git a/lxd/node/config_test.go b/lxd/node/config_test.go
index 939fed176e..fb20433f97 100644
--- a/lxd/node/config_test.go
+++ b/lxd/node/config_test.go
@@ -118,3 +118,27 @@ func TestHTTPSAddress(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, "127.0.0.1:666", address)
}
+
+// The cluster.https_address config key is fetched from the db with a new
+// transaction: it is empty when unset, and reflects the value once stored.
+func TestClusterAddress(t *testing.T) {
+ nodeDB, cleanup := db.NewTestNode(t)
+ defer cleanup()
+
+ address, err := node.ClusterAddress(nodeDB)
+ require.NoError(t, err)
+ assert.Equal(t, "", address)
+
+ err = nodeDB.Transaction(func(tx *db.NodeTx) error {
+ config, err := node.ConfigLoad(tx)
+ require.NoError(t, err)
+ _, err = config.Replace(map[string]interface{}{"cluster.https_address": "127.0.0.1:666"})
+ require.NoError(t, err)
+ return nil
+ })
+ require.NoError(t, err)
+
+ address, err = node.ClusterAddress(nodeDB)
+ require.NoError(t, err)
+ assert.Equal(t, "127.0.0.1:666", address)
+}

From 7c9bc5f2185a66b4d89bd639da275f4fb924a8f4 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <***@canonical.com>
Date: Mon, 26 Nov 2018 11:29:18 +0100
Subject: [PATCH 02/10] Drive by: fix unit test not actually checking for error

Signed-off-by: Free Ekanayaka <***@canonical.com>
---
lxd/db/node/update_test.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lxd/db/node/update_test.go b/lxd/db/node/update_test.go
index 36d3d2136e..a99fd96dfc 100644
--- a/lxd/db/node/update_test.go
+++ b/lxd/db/node/update_test.go
@@ -28,7 +28,7 @@ func TestUpdateFromV36_DropTables(t *testing.T) {
require.NoError(t, err)

var current []string
- query.Transaction(db, func(tx *sql.Tx) error {
+ err = query.Transaction(db, func(tx *sql.Tx) error {
var err error
stmt := "SELECT name FROM sqlite_master WHERE type='table'"
current, err = query.SelectStrings(tx, stmt)

From 1f08c4d64e4afc2050f131f0faff8190d7964e42 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <***@canonical.com>
Date: Mon, 26 Nov 2018 11:29:38 +0100
Subject: [PATCH 03/10] Copy core.https_address to cluster.https_address when
upgrading clustered node

Signed-off-by: Free Ekanayaka <***@canonical.com>
---
lxd/db/node/update.go | 28 ++++++++++++++++++++++++
lxd/db/node/update_test.go | 44 ++++++++++++++++++++++++++++++++++++++
2 files changed, 72 insertions(+)

diff --git a/lxd/db/node/update.go b/lxd/db/node/update.go
index 65ba00f889..aeb23b25ee 100644
--- a/lxd/db/node/update.go
+++ b/lxd/db/node/update.go
@@ -8,9 +8,11 @@ import (
"strconv"
"strings"

+ "github.com/lxc/lxd/lxd/db/query"
"github.com/lxc/lxd/lxd/db/schema"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/logger"
+ "github.com/pkg/errors"
)

// Schema for the local database.
@@ -90,6 +92,7 @@ var updates = map[int]schema.Update{
35: updateFromV34,
36: updateFromV35,
37: updateFromV36,
+ 38: updateFromV37,
}

// UpdateFromPreClustering is the last schema version where clustering support
@@ -98,6 +101,31 @@ const UpdateFromPreClustering = 36

// Schema updates begin here

+// Copy core.https_address to cluster.https_address if this node is clustered,
+// i.e. if the raft_nodes table has at least one row.
+func updateFromV37(tx *sql.Tx) error {
+ count, err := query.Count(tx, "raft_nodes", "")
+ if err != nil {
+ return errors.Wrap(err, "Fetch count of Raft nodes")
+ }
+
+ if count == 0 {
+ // This node is not clustered, nothing to do.
+ return nil
+ }
+
+ // Copy the core.https_address config (no row is inserted if it is unset).
+ _, err = tx.Exec(`
+INSERT INTO config (key, value)
+ SELECT 'cluster.https_address', value FROM config WHERE key = 'core.https_address'
+`)
+ if err != nil {
+ return errors.Wrap(err, "Insert cluster.https_address config")
+ }
+
+ return nil
+}
+
// Add a raft_nodes table to be used when running in clustered mode. It lists
// the current nodes in the LXD cluster that are participating to the dqlite
// database Raft cluster.
diff --git a/lxd/db/node/update_test.go b/lxd/db/node/update_test.go
index a99fd96dfc..045e68ef83 100644
--- a/lxd/db/node/update_test.go
+++ b/lxd/db/node/update_test.go
@@ -43,3 +43,47 @@ func TestUpdateFromV36_DropTables(t *testing.T) {
assert.False(t, shared.StringInSlice(name, current))
}
}
+
+// If the node is clustered (raft_nodes has rows), the core.https_address
+// config gets copied to cluster.https_address.
+func TestUpdateFromV37_CopyCoreHTTPSAddress(t *testing.T) {
+ schema := node.Schema()
+ db, err := schema.ExerciseUpdate(38, func(db *sql.DB) {
+ _, err := db.Exec("INSERT INTO raft_nodes VALUES (1, '1.2.3.4:666')")
+ require.NoError(t, err)
+
+ _, err = db.Exec("INSERT INTO config VALUES (1, 'core.https_address', '1.2.3.4:666')")
+ require.NoError(t, err)
+ })
+ require.NoError(t, err)
+
+ var clusterAddress string
+ err = query.Transaction(db, func(tx *sql.Tx) error {
+ stmt := "SELECT value FROM config WHERE key='cluster.https_address'"
+ row := tx.QueryRow(stmt)
+ err := row.Scan(&clusterAddress)
+ return err
+ })
+ require.NoError(t, err)
+
+ assert.Equal(t, "1.2.3.4:666", clusterAddress) // testify order: expected, actual
+}
+
+// If the node is not clustered (no raft_nodes rows), core.https_address is not copied.
+func TestUpdateFromV37_NotClustered(t *testing.T) {
+ schema := node.Schema()
+ db, err := schema.ExerciseUpdate(38, func(db *sql.DB) {
+ _, err := db.Exec("INSERT INTO config VALUES (1, 'core.https_address', '1.2.3.4:666')")
+ require.NoError(t, err)
+ })
+ require.NoError(t, err)
+
+ var clusterAddress string
+ err = query.Transaction(db, func(tx *sql.Tx) error {
+ stmt := "SELECT value FROM config WHERE key='cluster.https_address'"
+ row := tx.QueryRow(stmt)
+ err := row.Scan(&clusterAddress)
+ return err
+ })
+ require.EqualError(t, err, "sql: no rows in result set") // no cluster.https_address row was inserted
+}

From c30dd1ddf59fa125bbbcf057367f1433a3ff0d3b Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <***@canonical.com>
Date: Mon, 26 Nov 2018 11:40:49 +0100
Subject: [PATCH 04/10] Modify node.DetermineRaftNode to look at
cluster.https_address

Signed-off-by: Free Ekanayaka <***@canonical.com>
---
lxd/node/raft.go | 34 ++++++++++++++++++----------------
lxd/node/raft_test.go | 16 ++++++++--------
2 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/lxd/node/raft.go b/lxd/node/raft.go
index 8b4605356a..0c39362484 100644
--- a/lxd/node/raft.go
+++ b/lxd/node/raft.go
@@ -1,37 +1,40 @@
package node

-import "github.com/lxc/lxd/lxd/db"
+import (
+ "github.com/lxc/lxd/lxd/db"
+)

// DetermineRaftNode figures out what raft node ID and address we have, if any.
//
-// This decision is based on the values of the core.https_address config key
+// This decision is based on the values of the cluster.https_address config key
// and on the rows in the raft_nodes table, both stored in the node-level
// SQLite database.
//
// The following rules are applied:
//
-// - If no core.https_address config key is set, this is a non-clustered node
+// - If no cluster.https_address config key is set, this is a non-clustered node
// and the returned RaftNode will have ID 1 but no address, to signal that
// the node should setup an in-memory raft cluster where the node itself
// is the only member and leader.
//
-// - If core.https_address config key is set, but there is no row in the
-// raft_nodes table, this is a non-clustered node as well, and same behavior
-// as the previous case applies.
+// - If cluster.https_address config key is set, but there is no row in the
+// raft_nodes table, this is a brand new clustered node that is joining a
+// cluster, and same behavior as the previous case applies.
//
-// - If core.https_address config key is set and there is at least one row in
-// the raft_nodes table, then this node is considered a raft node if
-// core.https_address matches one of the rows in raft_nodes. In that case,
-// the matching db.RaftNode row is returned, otherwise nil.
+// - If cluster.https_address config key is set and there is at least one row
+// in the raft_nodes table, this node is considered a raft node only if
+// cluster.https_address matches one of the rows in raft_nodes. In that
+// case, the matching db.RaftNode row is returned, otherwise nil (the node is
+// clustered but not listed in raft_nodes).
func DetermineRaftNode(tx *db.NodeTx) (*db.RaftNode, error) {
config, err := ConfigLoad(tx)
if err != nil {
return nil, err
}

- address := config.HTTPSAddress()
+ address := config.ClusterAddress()

- // If core.https_address is the empty string, then this LXD instance is
+ // If cluster.https_address is the empty string, then this LXD instance is
// not running in clustering mode.
if address == "" {
return &db.RaftNode{ID: 1}, nil
@@ -42,14 +45,13 @@ func DetermineRaftNode(tx *db.NodeTx) (*db.RaftNode, error) {
return nil, err
}

- // If core.https_address is set, but raft_nodes has no rows, this is
- // still an instance not running in clustering mode.
+ // If cluster.https_address is set but the raft_nodes table is not
+ // populated, this must be a joining node.
if len(nodes) == 0 {
return &db.RaftNode{ID: 1}, nil
}

- // If there is one or more row in raft_nodes, try to find a matching
- // one.
+ // Try to find a matching node.
for _, node := range nodes {
if node.Address == address {
return &node, nil
diff --git a/lxd/node/raft_test.go b/lxd/node/raft_test.go
index b376bdc3f2..c3bcbd7c98 100644
--- a/lxd/node/raft_test.go
+++ b/lxd/node/raft_test.go
@@ -11,40 +11,40 @@ import (
)

// The raft identity (ID and address) of a node depends on the value of
-// core.https_address and the entries of the raft_nodes table.
+// cluster.https_address and the entries of the raft_nodes table.
func TestDetermineRaftNode(t *testing.T) {
cases := []struct {
title string
- address string // Value of core.https_address
+ address string // Value of cluster.https_address
addresses []string // Entries in raft_nodes
node *db.RaftNode // Expected node value
}{
{
- `no core.https_address set`,
+ `no cluster.https_address set`,
"",
[]string{},
&db.RaftNode{ID: 1},
},
{
- `core.https_address set and and no raft_nodes rows`,
+ `cluster.https_address set and and no raft_nodes rows`,
"1.2.3.4:8443",
[]string{},
&db.RaftNode{ID: 1},
},
{
- `core.https_address set and matching the one and only raft_nodes row`,
+ `cluster.https_address set and matching the one and only raft_nodes row`,
"1.2.3.4:8443",
[]string{"1.2.3.4:8443"},
&db.RaftNode{ID: 1, Address: "1.2.3.4:8443"},
},
{
- `core.https_address set and matching one of many raft_nodes rows`,
+ `cluster.https_address set and matching one of many raft_nodes rows`,
"5.6.7.8:999",
[]string{"1.2.3.4:666", "5.6.7.8:999"},
&db.RaftNode{ID: 2, Address: "5.6.7.8:999"},
},
{
- `core.https_address set and no matching raft_nodes row`,
+ `core.cluster set and no matching raft_nodes row`,
"1.2.3.4:666",
[]string{"5.6.7.8:999"},
nil,
@@ -56,7 +56,7 @@ func TestDetermineRaftNode(t *testing.T) {
tx, cleanup := db.NewTestNodeTx(t)
defer cleanup()

- err := tx.UpdateConfig(map[string]string{"core.https_address": c.address})
+ err := tx.UpdateConfig(map[string]string{"cluster.https_address": c.address})
require.NoError(t, err)

for _, address := range c.addresses {

From dd13fde3407b6f2e6dd67f5f593749db6a4a1fef Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <***@canonical.com>
Date: Mon, 26 Nov 2018 11:59:55 +0100
Subject: [PATCH 05/10] Modify the lxd/cluster sub-package to look at
cluster.https_address

Signed-off-by: Free Ekanayaka <***@canonical.com>
---
lxd/cluster/heartbeat_test.go | 3 ++-
lxd/cluster/membership.go | 30 +++++++++++++++++-------------
lxd/cluster/membership_test.go | 34 ++++++++++++++++++++++++----------
lxd/cluster/notify.go | 4 ++--
lxd/cluster/notify_test.go | 8 ++++----
lxd/cluster/raft_test.go | 4 ++--
6 files changed, 51 insertions(+), 32 deletions(-)

diff --git a/lxd/cluster/heartbeat_test.go b/lxd/cluster/heartbeat_test.go
index 68c9883518..7e9f18e01e 100644
--- a/lxd/cluster/heartbeat_test.go
+++ b/lxd/cluster/heartbeat_test.go
@@ -246,7 +246,8 @@ func (f *heartbeatFixture) node() (*state.State, *cluster.Gateway, string) {

address := server.Listener.Addr().String()
mf := &membershipFixtures{t: f.t, state: state}
- mf.NetworkAddress(address)
+ mf.CoreAddress(address)
+ mf.ClusterAddress(address)

var err error
require.NoError(f.t, state.Cluster.Close())
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index b367fab5d5..5d1ffd72b9 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -25,7 +25,7 @@ import (
// Bootstrap turns a non-clustered LXD instance into the first (and leader)
// node of a new LXD cluster.
//
-// This instance must already have its core.https_address set and be listening
+// This instance must already have its cluster.https_address set and be listening
// on the associated network address.
func Bootstrap(state *state.State, gateway *Gateway, name string) error {
// Check parameters
@@ -38,23 +38,27 @@ func Bootstrap(state *state.State, gateway *Gateway, name string) error {
return err
}

- var address string
+ var networkAddress string // Regular REST API address
+ var clusterAddress string // Address for Raft and dqlite traffic
+
err = state.Node.Transaction(func(tx *db.NodeTx) error {
// Fetch current network address and raft nodes
config, err := node.ConfigLoad(tx)
if err != nil {
return errors.Wrap(err, "failed to fetch node configuration")
}
- address = config.HTTPSAddress()
+
+ networkAddress = config.HTTPSAddress()
+ clusterAddress = config.ClusterAddress()

// Make sure node-local database state is in order.
- err = membershipCheckNodeStateForBootstrapOrJoin(tx, address)
+ err = membershipCheckNodeStateForBootstrapOrJoin(tx, clusterAddress)
if err != nil {
return err
}

// Add ourselves as first raft node
- err = tx.RaftNodeFirst(address)
+ err = tx.RaftNodeFirst(clusterAddress)
if err != nil {
return errors.Wrap(err, "failed to insert first raft node")
}
@@ -74,7 +78,7 @@ func Bootstrap(state *state.State, gateway *Gateway, name string) error {
}

// Add ourselves to the nodes table.
- err = tx.NodeUpdate(1, name, address)
+ err = tx.NodeUpdate(1, name, networkAddress)
if err != nil {
return errors.Wrap(err, "failed to update cluster node")
}
@@ -226,7 +230,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
if err != nil {
return errors.Wrap(err, "failed to fetch node configuration")
}
- address = config.HTTPSAddress()
+ address = config.ClusterAddress()

// Make sure node-local database state is in order.
err = membershipCheckNodeStateForBootstrapOrJoin(tx, address)
@@ -824,24 +828,24 @@ func Enabled(node *db.Node) (bool, error) {

// Check that node-related preconditions are met for bootstrapping or joining a
// cluster.
-func membershipCheckNodeStateForBootstrapOrJoin(tx *db.NodeTx, address string) error {
+func membershipCheckNodeStateForBootstrapOrJoin(tx *db.NodeTx, clusterAddress string) error {
nodes, err := tx.RaftNodes()
if err != nil {
return errors.Wrap(err, "failed to fetch current raft nodes")
}

- hasNetworkAddress := address != ""
+ hasClusterAddress := clusterAddress != ""
hasRaftNodes := len(nodes) > 0

// Sanity check that we're not in an inconsistent situation, where no
- // network address is set, but still there are entries in the
+ // cluster address is set, but still there are entries in the
// raft_nodes table.
- if !hasNetworkAddress && hasRaftNodes {
+ if !hasClusterAddress && hasRaftNodes {
return fmt.Errorf("inconsistent state: found leftover entries in raft_nodes")
}

- if !hasNetworkAddress {
- return fmt.Errorf("no core.https_address config is set on this node")
+ if !hasClusterAddress {
+ return fmt.Errorf("no cluster.https_address config is set on this node")
}
if hasRaftNodes {
return fmt.Errorf("the node is already part of a cluster")
diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index a954381e2d..72c4c001d1 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -26,7 +26,8 @@ func TestBootstrap_UnmetPreconditions(t *testing.T) {
}{
{
func(f *membershipFixtures) {
- f.NetworkAddress("1.2.3.4:666")
+ f.ClusterAddress("1.2.3.4:666")
+ f.RaftNode("5.6.7.8:666")
filename := filepath.Join(f.state.OS.VarDir, "cluster.crt")
ioutil.WriteFile(filename, []byte{}, 0644)
},
@@ -34,11 +35,11 @@ func TestBootstrap_UnmetPreconditions(t *testing.T) {
},
{
func(*membershipFixtures) {},
- "no core.https_address config is set on this node",
+ "no cluster.https_address config is set on this node",
},
{
func(f *membershipFixtures) {
- f.NetworkAddress("1.2.3.4:666")
+ f.ClusterAddress("1.2.3.4:666")
f.RaftNode("5.6.7.8:666")
},
"the node is already part of a cluster",
@@ -51,7 +52,7 @@ func TestBootstrap_UnmetPreconditions(t *testing.T) {
},
{
func(f *membershipFixtures) {
- f.NetworkAddress("1.2.3.4:666")
+ f.ClusterAddress("1.2.3.4:666")
f.ClusterNode("5.6.7.8:666")
},
"inconsistent state: found leftover entries in nodes",
@@ -89,7 +90,8 @@ func TestBootstrap(t *testing.T) {

address := server.Listener.Addr().String()
f := &membershipFixtures{t: t, state: state}
- f.NetworkAddress(address)
+ f.CoreAddress(address)
+ f.ClusterAddress(address)

err := cluster.Bootstrap(state, gateway, "buzz")
require.NoError(t, err)
@@ -264,7 +266,8 @@ func TestJoin(t *testing.T) {
require.NoError(t, err)

targetF := &membershipFixtures{t: t, state: targetState}
- targetF.NetworkAddress(targetAddress)
+ targetF.CoreAddress(targetAddress)
+ targetF.ClusterAddress(targetAddress)

err = cluster.Bootstrap(targetState, targetGateway, "buzz")
require.NoError(t, err)
@@ -300,7 +303,7 @@ func TestJoin(t *testing.T) {
require.NoError(t, err)

f := &membershipFixtures{t: t, state: state}
- f.NetworkAddress(address)
+ f.ClusterAddress(address)

// Accept the joining node.
raftNodes, err := cluster.Accept(
@@ -387,7 +390,7 @@ func FLAKY_TestPromote(t *testing.T) {
"db.bin", store, targetAddress, "/unused/db/dir", 5*time.Second, dqlite.WithDialFunc(dialFunc))
require.NoError(t, err)
targetF := &membershipFixtures{t: t, state: targetState}
- targetF.NetworkAddress(targetAddress)
+ targetF.ClusterAddress(targetAddress)

err = cluster.Bootstrap(targetState, targetGateway, "buzz")
require.NoError(t, err)
@@ -403,7 +406,7 @@ func FLAKY_TestPromote(t *testing.T) {
address := server.Listener.Addr().String()
targetF.ClusterNode(address) // Add the non database node to the cluster database
f := &membershipFixtures{t: t, state: state}
- f.NetworkAddress(address)
+ f.ClusterAddress(address)
f.RaftNode(targetAddress) // Insert the leader in our local list of database nodes

gateway := newGateway(t, state.Node, targetCert)
@@ -445,7 +448,7 @@ type membershipFixtures struct {
}

// Set core.https_address to the given value.
-func (h *membershipFixtures) NetworkAddress(address string) {
+func (h *membershipFixtures) CoreAddress(address string) {
err := h.state.Node.Transaction(func(tx *db.NodeTx) error {
config := map[string]string{
"core.https_address": address,
@@ -455,6 +458,17 @@ func (h *membershipFixtures) NetworkAddress(address string) {
require.NoError(h.t, err)
}

+// ClusterAddress sets cluster.https_address to the given value, failing the test on error.
+func (h *membershipFixtures) ClusterAddress(address string) {
+ err := h.state.Node.Transaction(func(tx *db.NodeTx) error {
+ config := map[string]string{
+ "cluster.https_address": address,
+ }
+ return tx.UpdateConfig(config)
+ })
+ require.NoError(h.t, err)
+}
+
// Add the given address to the raft_nodes table.
func (h *membershipFixtures) RaftNode(address string) {
err := h.state.Node.Transaction(func(tx *db.NodeTx) error {
diff --git a/lxd/cluster/notify.go b/lxd/cluster/notify.go
index 6c20424558..1b6b5dce59 100644
--- a/lxd/cluster/notify.go
+++ b/lxd/cluster/notify.go
@@ -31,12 +31,12 @@ const (
// NewNotifier builds a Notifier that can be used to notify other peers using
// the given policy.
func NewNotifier(state *state.State, cert *shared.CertInfo, policy NotifierPolicy) (Notifier, error) {
- address, err := node.HTTPSAddress(state.Node)
+ address, err := node.ClusterAddress(state.Node)
if err != nil {
return nil, errors.Wrap(err, "failed to fetch node address")
}

- // Fast-track the case where we're not networked at all.
+ // Fast-track the case where we're not clustered at all.
if address == "" {
nullNotifier := func(func(lxd.ContainerServer) error) error { return nil }
return nullNotifier, nil
diff --git a/lxd/cluster/notify_test.go b/lxd/cluster/notify_test.go
index 55514f943f..848820619e 100644
--- a/lxd/cluster/notify_test.go
+++ b/lxd/cluster/notify_test.go
@@ -36,7 +36,7 @@ func TestNewNotifier(t *testing.T) {
hook := func(client lxd.ContainerServer) error {
server, _, err := client.GetServer()
require.NoError(t, err)
- peers <- server.Config["core.https_address"].(string)
+ peers <- server.Config["cluster.https_address"].(string)
return nil
}
assert.NoError(t, notifier(hook))
@@ -106,7 +106,7 @@ type notifyFixtures struct {
// return a cleanup function.
//
// The address of the first node spawned will be saved as local
-// core.https_address.
+// cluster.https_address.
func (h *notifyFixtures) Nodes(cert *shared.CertInfo, n int) func() {
servers := make([]*httptest.Server, n)
for i := 0; i < n; i++ {
@@ -135,7 +135,7 @@ func (h *notifyFixtures) Nodes(cert *shared.CertInfo, n int) func() {
config, err := node.ConfigLoad(tx)
require.NoError(h.t, err)
address := servers[0].Listener.Addr().String()
- values := map[string]interface{}{"core.https_address": address}
+ values := map[string]interface{}{"cluster.https_address": address}
_, err = config.Patch(values)
require.NoError(h.t, err)
return nil
@@ -187,7 +187,7 @@ func newRestServer(cert *shared.CertInfo) *httptest.Server {

mux.HandleFunc("/1.0/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
- config := map[string]interface{}{"core.https_address": server.Listener.Addr().String()}
+ config := map[string]interface{}{"cluster.https_address": server.Listener.Addr().String()}
metadata := api.ServerPut{Config: config}
util.WriteJSON(w, api.ResponseRaw{Metadata: metadata}, false)
})
diff --git a/lxd/cluster/raft_test.go b/lxd/cluster/raft_test.go
index 223127da57..56cae84936 100644
--- a/lxd/cluster/raft_test.go
+++ b/lxd/cluster/raft_test.go
@@ -119,13 +119,13 @@ func newRaft(t *testing.T, db *db.Node, cert *shared.CertInfo) *cluster.RaftInst
return instance
}

-// Set the core.https_address config key to the given address, and insert the
+// Set the cluster.https_address config key to the given address, and insert the
// address into the raft_nodes table.
//
// This effectively makes the node act as a database raft node.
func setRaftRole(t *testing.T, database *db.Node, address string) *dqlite.DatabaseServerStore {
require.NoError(t, database.Transaction(func(tx *db.NodeTx) error {
- err := tx.UpdateConfig(map[string]string{"core.https_address": address})
+ err := tx.UpdateConfig(map[string]string{"cluster.https_address": address})
if err != nil {
return err
}

From e218e862e0d2f85ff133fc53be14bca60c4fc2da Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <***@canonical.com>
Date: Mon, 26 Nov 2018 15:12:16 +0100
Subject: [PATCH 06/10] Support for optional cluster TCP address in the
endpoints sub-package

Signed-off-by: Free Ekanayaka <***@canonical.com>
---
lxd/api.go | 13 +++
lxd/daemon.go | 7 ++
lxd/endpoints/cluster.go | 106 +++++++++++++++++++++++
lxd/endpoints/cluster_test.go | 39 +++++++++
lxd/endpoints/endpoints.go | 49 +++++++++--
lxd/endpoints/endpoints_exported_test.go | 7 ++
lxd/endpoints/endpoints_test.go | 11 +--
lxd/endpoints/network.go | 7 ++
8 files changed, 229 insertions(+), 10 deletions(-)
create mode 100644 lxd/endpoints/cluster.go
create mode 100644 lxd/endpoints/cluster_test.go

diff --git a/lxd/api.go b/lxd/api.go
index 40c540155c..59cda6e0ca 100644
--- a/lxd/api.go
+++ b/lxd/api.go
@@ -46,6 +46,19 @@ func RestServer(d *Daemon) *http.Server {
return &http.Server{Handler: &lxdHttpServer{r: mux, d: d}}
}

+// ClusterServer builds an http.Server whose router only serves the
+// clustering endpoints exposed by the daemon's gateway.
+func ClusterServer(d *Daemon) *http.Server {
+ router := mux.NewRouter()
+ router.StrictSlash(false)
+
+ for route, handler := range d.gateway.HandlerFuncs() {
+ router.HandleFunc(route, handler)
+ }
+
+ return &http.Server{Handler: router}
+}
+
type lxdHttpServer struct {
r *mux.Router
d *Daemon
diff --git a/lxd/daemon.go b/lxd/daemon.go
index f0a78e6e74..1f82c98c4f 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -537,6 +537,11 @@ func (d *Daemon) init() error {
return errors.Wrap(err, "Failed to fetch node address")
}

+ clusterAddress, err := node.ClusterAddress(d.db)
+ if err != nil {
+ return errors.Wrap(err, "Failed to fetch cluster address")
+ }
+
debugAddress, err := node.DebugAddress(d.db)
if err != nil {
return errors.Wrap(err, "Failed to fetch debug address")
@@ -548,9 +553,11 @@ func (d *Daemon) init() error {
UnixSocket: d.UnixSocket(),
Cert: certInfo,
RestServer: RestServer(d),
+ ClusterServer: ClusterServer(d),
DevLxdServer: DevLxdServer(d),
LocalUnixSocketGroup: d.config.Group,
NetworkAddress: address,
+ ClusterAddress: clusterAddress,
DebugAddress: debugAddress,
}
d.endpoints, err = endpoints.Up(config)
diff --git a/lxd/endpoints/cluster.go b/lxd/endpoints/cluster.go
new file mode 100644
index 0000000000..6e2392894a
--- /dev/null
+++ b/lxd/endpoints/cluster.go
@@ -0,0 +1,106 @@
+package endpoints
+
+import (
+ "fmt"
+ "net"
+ "time"
+
+ "github.com/lxc/lxd/lxd/util"
+ "github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/logger"
+ "github.com/pkg/errors"
+)
+
+// ClusterAddress returns the address of the dedicated cluster endpoint, or an
+// empty string if there's no cluster endpoint.
+func (e *Endpoints) ClusterAddress() string {
+ e.mu.RLock()
+ defer e.mu.RUnlock()
+
+ listener := e.listeners[cluster]
+ if listener == nil {
+ return ""
+ }
+ return listener.Addr().String()
+}
+
+// ClusterUpdateAddress updates the address for the cluster endpoint, shutting
+// it down and restarting it.
+//
+// An empty address, or one equal to the network address, just closes the
+// dedicated cluster listener, so cluster traffic uses the network endpoint.
+// If listening on the new address fails, the previous cluster listener (if
+// there was one) is restored and the error is returned.
+func (e *Endpoints) ClusterUpdateAddress(address string) error {
+ networkAddress := e.NetworkAddress()
+ if networkAddress == "" {
+ return fmt.Errorf("Cannot set cluster address without network address")
+ }
+
+ if address != "" {
+ address = util.CanonicalNetworkAddress(address)
+ }
+
+ oldAddress := e.ClusterAddress()
+ if address == oldAddress {
+ return nil
+ }
+
+ logger.Infof("Update cluster address")
+
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ // Close the previous socket
+ e.closeListener(cluster)
+
+ // If turning off listening, or sharing the network endpoint, we're done
+ if address == "" || address == networkAddress {
+ return nil
+ }
+
+ // Listen on the given address, trying up to ten times 100ms apart.
+ getListener := func(address string) (net.Listener, error) {
+ var lastErr error
+ for i := 0; i < 10; i++ { // Ten retries over a second seems reasonable.
+ listener, err := net.Listen("tcp", address)
+ if err == nil {
+ return listener, nil
+ }
+
+ lastErr = err
+ time.Sleep(100 * time.Millisecond)
+ }
+
+ return nil, fmt.Errorf("cannot listen on https socket: %v", lastErr)
+ }
+
+ listener, err := getListener(address)
+ if err != nil {
+ // Attempt to revert to the previous address. Skip this when there was
+ // none: listening on "" would bind a random port on all interfaces.
+ if oldAddress != "" {
+ oldListener, errRevert := getListener(oldAddress)
+ if errRevert == nil {
+ e.listeners[cluster] = networkTLSListener(oldListener, e.cert)
+ e.serveHTTP(cluster)
+ }
+ }
+
+ return err
+ }
+
+ e.listeners[cluster] = networkTLSListener(listener, e.cert)
+ e.serveHTTP(cluster)
+
+ return nil
+}
+
+// clusterCreateListener listens on the canonicalized address and wraps the socket with networkTLSListener.
+func clusterCreateListener(address string, cert *shared.CertInfo) (net.Listener, error) {
+ tcpListener, err := net.Listen("tcp", util.CanonicalNetworkAddress(address))
+ if err != nil {
+ return nil, errors.Wrap(err, "Listen to cluster address")
+ }
+ return networkTLSListener(tcpListener, cert), nil
+}
diff --git a/lxd/endpoints/cluster_test.go b/lxd/endpoints/cluster_test.go
new file mode 100644
index 0000000000..04b8012431
--- /dev/null
+++ b/lxd/endpoints/cluster_test.go
@@ -0,0 +1,39 @@
+package endpoints_test
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// If both a network and a cluster address are set, and they differ, a new
+// cluster TCP socket will be created alongside the network one.
+func TestEndpoints_ClusterCreateTCPSocket(t *testing.T) {
+ endpoints, config, cleanup := newEndpoints(t)
+ defer cleanup()
+
+ config.NetworkAddress = "127.0.0.1:12345"
+ config.ClusterAddress = "127.0.0.1:54321"
+ require.NoError(t, endpoints.Up(config))
+
+ assert.NoError(t, httpGetOverTLSSocket(endpoints.NetworkAddressAndCert()))
+ assert.NoError(t, httpGetOverTLSSocket(endpoints.ClusterAddressAndCert()))
+}
+
+// When the cluster address is updated, any previous cluster socket gets closed
+// (the new address reuses port 54321, presumably forcing the old one's release).
+func TestEndpoints_ClusterUpdateAddress(t *testing.T) {
+ endpoints, config, cleanup := newEndpoints(t)
+ defer cleanup()
+
+ config.NetworkAddress = "127.0.0.1:12345"
+ config.ClusterAddress = "127.0.0.1:54321"
+ require.NoError(t, endpoints.Up(config))
+
+ // Use "localhost" instead of "127.0.0.1" just to make the address
+ // different and actually trigger an endpoint change.
+ require.NoError(t, endpoints.ClusterUpdateAddress("localhost:54321"))
+
+ assert.NoError(t, httpGetOverTLSSocket(endpoints.ClusterAddressAndCert()))
+}
diff --git a/lxd/endpoints/endpoints.go b/lxd/endpoints/endpoints.go
index 435af2550b..6c1c1df5e5 100644
--- a/lxd/endpoints/endpoints.go
+++ b/lxd/endpoints/endpoints.go
@@ -25,6 +25,10 @@ type Config struct {
// HTTP server handling requests for the LXD RESTful API.
RestServer *http.Server

+ // HTTP server handling clustering-related requests. Required by Up(), but
+ // only used if ClusterAddress is set and differs from NetworkAddress.
+ ClusterServer *http.Server
+
// HTTP server for the internal /dev/lxd API exposed to containers.
DevLxdServer *http.Server

@@ -47,6 +51,10 @@ type Config struct {
// It can be updated after the endpoints are up using UpdateNetworkAddress().
NetworkAddress string

+ // Optional dedicated network address for clustering traffic. If not
+ // set, NetworkAddress will be used.
+ ClusterAddress string
+
// DebugSetAddress sets the address for the pprof endpoint.
//
// It can be updated after the endpoints are up using UpdateDebugAddress().
@@ -91,21 +99,30 @@ type Config struct {
//
// The network endpoint socket will use TLS encryption, using the certificate
// keypair and CA passed via config.Cert.
+//
+// cluster endpoint (TCP socket with TLS)
+// -------------------------------------
+//
+// If a network address was set via config.ClusterAddress, then attach
+// config.ClusterServer to it.
func Up(config *Config) (*Endpoints, error) {
if config.Dir == "" {
- return nil, fmt.Errorf("no directory configured")
+ return nil, fmt.Errorf("No directory configured")
}
if config.UnixSocket == "" {
- return nil, fmt.Errorf("no unix socket configured")
+ return nil, fmt.Errorf("No unix socket configured")
}
if config.RestServer == nil {
- return nil, fmt.Errorf("no REST server configured")
+ return nil, fmt.Errorf("No REST server configured")
+ }
+ if config.ClusterServer == nil {
+ return nil, fmt.Errorf("No cluster server configured")
}
if config.DevLxdServer == nil {
- return nil, fmt.Errorf("no devlxd server configured")
+ return nil, fmt.Errorf("No devlxd server configured")
}
if config.Cert == nil {
- return nil, fmt.Errorf("no TLS certificate configured")
+ return nil, fmt.Errorf("No TLS certificate configured")
}

endpoints := &Endpoints{
@@ -147,6 +164,7 @@ func (e *Endpoints) up(config *Config) error {
local: config.RestServer,
network: config.RestServer,
pprof: pprofCreateServer(),
+ cluster: config.ClusterServer,
}
e.cert = config.Cert
e.inherited = map[kind]bool{}
@@ -185,6 +203,17 @@ func (e *Endpoints) up(config *Config) error {

// Errors here are not fatal and are just logged.
e.listeners[network] = networkCreateListener(config.NetworkAddress, e.cert)
+
+ if config.ClusterAddress != "" && config.ClusterAddress != config.NetworkAddress {
+ e.listeners[cluster], err = clusterCreateListener(config.ClusterAddress, e.cert)
+ if err != nil {
+ return err
+ }
+
+ logger.Infof("Starting cluster handler:")
+ e.serveHTTP(cluster)
+ }
+
}

if config.DebugAddress != "" {
@@ -225,6 +254,14 @@ func (e *Endpoints) Down() error {
}
}

+ if e.listeners[cluster] != nil {
+ logger.Infof("Stopping cluster handler:")
+ err := e.closeListener(cluster)
+ if err != nil {
+ return err
+ }
+ }
+
if e.listeners[devlxd] != nil {
logger.Infof("Stopping /dev/lxd handler:")
err := e.closeListener(devlxd)
@@ -322,6 +359,7 @@ const (
devlxd
network
pprof
+ cluster
)

// Human-readable descriptions of the various kinds of endpoints.
@@ -330,4 +368,5 @@ var descriptions = map[kind]string{
devlxd: "devlxd socket",
network: "TCP socket",
pprof: "pprof socket",
+ cluster: "cluster socket",
}
diff --git a/lxd/endpoints/endpoints_exported_test.go b/lxd/endpoints/endpoints_exported_test.go
index 82cc072d87..cc4802fee8 100644
--- a/lxd/endpoints/endpoints_exported_test.go
+++ b/lxd/endpoints/endpoints_exported_test.go
@@ -38,6 +38,13 @@ func (e *Endpoints) NetworkAddressAndCert() (string, *shared.CertInfo) {
return e.NetworkAddress(), e.cert
}

+// Return the cluster address and server certificate of the cluster
+// endpoint. This method is supposed to be used in conjunction with
+// the httpGetOverTLSSocket test helper.
+func (e *Endpoints) ClusterAddressAndCert() (string, *shared.CertInfo) {
+ return e.ClusterAddress(), e.cert
+}
+
// Set the file descriptor number marker that will be used when detecting
// socket activation. Needed because "go test" might open unrelated file
// descriptor starting at number 3.
diff --git a/lxd/endpoints/endpoints_test.go b/lxd/endpoints/endpoints_test.go
index a51f003edb..71577b5690 100644
--- a/lxd/endpoints/endpoints_test.go
+++ b/lxd/endpoints/endpoints_test.go
@@ -32,11 +32,12 @@ func newEndpoints(t *testing.T) (*endpoints.Endpoints, *endpoints.Config, func()
require.NoError(t, os.Mkdir(filepath.Join(dir, "devlxd"), 0755))

config := &endpoints.Config{
- Dir: dir,
- UnixSocket: filepath.Join(dir, "unix.socket"),
- RestServer: newServer(),
- DevLxdServer: newServer(),
- Cert: shared.TestingKeyPair(),
+ Dir: dir,
+ UnixSocket: filepath.Join(dir, "unix.socket"),
+ RestServer: newServer(),
+ ClusterServer: newServer(),
+ DevLxdServer: newServer(),
+ Cert: shared.TestingKeyPair(),
}
endpoints := endpoints.Unstarted()

diff --git a/lxd/endpoints/network.go b/lxd/endpoints/network.go
index 09abf72fb9..d262e266dd 100644
--- a/lxd/endpoints/network.go
+++ b/lxd/endpoints/network.go
@@ -133,6 +133,13 @@ func (e *Endpoints) NetworkUpdateCert(cert *shared.CertInfo) {
return
}
listener.(*networkListener).Config(cert)
+
+ // Update the cluster listener too, if enabled.
+ listener, ok = e.listeners[cluster]
+ if !ok {
+ return
+ }
+ listener.(*networkListener).Config(cert)
}

// Create a new net.Listener bound to the tcp socket of the network endpoint.

From 72237ca8b7ab7b3aadae22aa2d5fe4f98925359b Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <***@canonical.com>
Date: Mon, 26 Nov 2018 16:16:32 +0100
Subject: [PATCH 07/10] Make PUT /1.0 handle changes to cluster.https_address

Signed-off-by: Free Ekanayaka <***@canonical.com>
---
lxd/api_1.0.go | 57 +++++++++++++++++++++++++++++++++++++-------------
1 file changed, 43 insertions(+), 14 deletions(-)

diff --git a/lxd/api_1.0.go b/lxd/api_1.0.go
index eb0a47b69d..e38fe561de 100644
--- a/lxd/api_1.0.go
+++ b/lxd/api_1.0.go
@@ -1,6 +1,7 @@
package main

import (
+ "fmt"
"net/http"
"os"

@@ -304,6 +305,15 @@ func doApi10Update(d *Daemon, req api.ServerPut, patch bool) Response {
if err != nil {
return errors.Wrap(err, "Failed to load node config")
}
+
+ // We currently don't allow changing the cluster.https_address
+ // once it's set.
+ curClusterAddress := newNodeConfig.ClusterAddress()
+ newClusterAddress, ok := nodeValues["cluster.https_address"]
+ if ok && curClusterAddress != "" && newClusterAddress != curClusterAddress {
+ return fmt.Errorf("Changing cluster.https_address is currently not supported")
+ }
+
if patch {
nodeChanged, err = newNodeConfig.Patch(nodeValues)
} else {
@@ -410,22 +420,41 @@ func doApi10UpdateTriggers(d *Daemon, nodeChanged, clusterChanged map[string]str
}
}
}
- for key, value := range nodeChanged {
- switch key {
- case "maas.machine":
- maasChanged = true
- case "core.https_address":
- err := d.endpoints.NetworkUpdateAddress(value)
- if err != nil {
- return err
- }
- case "core.debug_address":
- err := d.endpoints.PprofUpdateAddress(value)
- if err != nil {
- return err
- }
+
+ // Look for changed values. We do it sequentially because some keys are
+ // correlated with others, and need to be processed first (for example
+ // core.https_address needs to be processed before
+ // cluster.https_address).
+
+ value, ok := nodeChanged["maas.machine"]
+ if ok {
+ maasChanged = true
+ }
+
+ value, ok = nodeChanged["core.https_address"]
+ if ok {
+ err := d.endpoints.NetworkUpdateAddress(value)
+ if err != nil {
+ return err
+ }
+ }
+
+ value, ok = nodeChanged["cluster.https_address"]
+ if ok {
+ err := d.endpoints.ClusterUpdateAddress(value)
+ if err != nil {
+ return err
}
}
+
+ value, ok = nodeChanged["core.debug_address"]
+ if ok {
+ err := d.endpoints.PprofUpdateAddress(value)
+ if err != nil {
+ return err
+ }
+ }
+
if maasChanged {
url, key := clusterConfig.MAASController()
machine := nodeConfig.MAASMachine()

From e2f7a3a96303f4b394281c12b34df39af772e64e Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <***@canonical.com>
Date: Wed, 28 Nov 2018 10:40:47 +0100
Subject: [PATCH 08/10] Update PUT /1.0/cluster to honor cluster.https_address
if provided

Signed-off-by: Free Ekanayaka <***@canonical.com>
---
lxd/api_cluster.go | 81 +++++++++++++++++++++---
lxd/api_cluster_test.go | 132 ++++++++++++++++++++++++++++++++++------
2 files changed, 184 insertions(+), 29 deletions(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 475367f268..09a021e8c3 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -207,6 +207,29 @@ func clusterPutBootstrap(d *Daemon, req api.ClusterPut) Response {
resources := map[string][]string{}
resources["cluster"] = []string{}

+ // If cluster.https_address is not set, default it to the value of
+ // core.https_address, even if the user didn't set it explicitly with
+ // a PUT /1.0 request.
+ d.db.Transaction(func(tx *db.NodeTx) error {
+ config, err := node.ConfigLoad(tx)
+
+ clusterAddress := config.ClusterAddress()
+ if clusterAddress != "" {
+ return nil
+ }
+
+ address := config.HTTPSAddress()
+
+ _, err = config.Patch(map[string]interface{}{
+ "cluster.https_address": address,
+ })
+ if err != nil {
+ return errors.Wrap(err, "Copy core.https_address to cluster.https_address")
+ }
+
+ return nil
+ })
+
op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationClusterBootstrap, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
@@ -224,6 +247,14 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) Response {
return BadRequest(fmt.Errorf("No target cluster node certificate provided"))
}

+ clusterAddress, err := node.ClusterAddress(d.db)
+ if err != nil {
+ return SmartError(err)
+ }
+ if clusterAddress != "" {
+ return BadRequest(fmt.Errorf("The cluster.https_address config key is set on this node"))
+ }
+
address, err := node.HTTPSAddress(d.db)
if err != nil {
return SmartError(err)
@@ -235,7 +266,17 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) Response {
}

// The user has provided a server address, and no networking
- // was setup on this node, let's do the job and open the port.
+ // was setup on this node, let's do the job and open the
+ // port. We'll use the same address both for the REST API and
+ // for clustering.
+
+ // First try to listen to the provided address. If we fail, we
+ // won't actually update the database config.
+ err = d.endpoints.NetworkUpdateAddress(req.ServerAddress)
+ if err != nil {
+ return SmartError(err)
+ }
+
err := d.db.Transaction(func(tx *db.NodeTx) error {
config, err := node.ConfigLoad(tx)
if err != nil {
@@ -243,7 +284,8 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) Response {
}

_, err = config.Patch(map[string]interface{}{
- "core.https_address": req.ServerAddress,
+ "core.https_address": req.ServerAddress,
+ "cluster.https_address": req.ServerAddress,
})
return err
})
@@ -251,15 +293,34 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) Response {
return SmartError(err)
}

- err = d.endpoints.NetworkUpdateAddress(req.ServerAddress)
- if err != nil {
- return SmartError(err)
- }
-
address = req.ServerAddress
} else {
- if req.ServerAddress != "" && req.ServerAddress != address {
- return BadRequest(fmt.Errorf("A different core.https_address is already set on this node"))
+ if req.ServerAddress != "" {
+ // The user has previously set core.https_address and
+ // is now providing a cluster address as well. If they
+ // differ we need to listen to it.
+ if req.ServerAddress != address {
+ err := d.endpoints.ClusterUpdateAddress(req.ServerAddress)
+ if err != nil {
+ return SmartError(err)
+ }
+ address = req.ServerAddress
+ }
+ }
+
+ // Update the cluster.https_address config key.
+ err := d.db.Transaction(func(tx *db.NodeTx) error {
+ config, err := node.ConfigLoad(tx)
+ if err != nil {
+ return errors.Wrap(err, "Failed to load cluster config")
+ }
+ _, err = config.Patch(map[string]interface{}{
+ "cluster.https_address": address,
+ })
+ return err
+ })
+ if err != nil {
+ return SmartError(err)
}
}

@@ -840,7 +901,7 @@ func internalClusterPostAccept(d *Daemon, r *http.Request) Response {

// Redirect all requests to the leader, which is the one with
// knowning what nodes are part of the raft cluster.
- address, err := node.HTTPSAddress(d.db)
+ address, err := node.ClusterAddress(d.db)
if err != nil {
return SmartError(err)
}
diff --git a/lxd/api_cluster_test.go b/lxd/api_cluster_test.go
index c284ce3509..b3147717e2 100644
--- a/lxd/api_cluster_test.go
+++ b/lxd/api_cluster_test.go
@@ -23,8 +23,11 @@ func TestCluster_Bootstrap(t *testing.T) {
daemon, cleanup := newDaemon(t)
defer cleanup()

+ // Simulate what happens when running "lxd init", where a PUT /1.0
+ // request is issued to set both core.https_address and
+ // cluster.https_address to the same value.
f := clusterFixture{t: t}
- f.EnableNetworking(daemon, "")
+ f.EnableNetworkingWithClusterAddress(daemon, "")

client := f.ClientUnix(daemon)

@@ -94,8 +97,15 @@ func TestCluster_Join(t *testing.T) {
f := clusterFixture{t: t}
passwords := []string{"sekret", ""}

+ // Enable networking on all daemons. The bootstrap daemon will also
+ // have its cluster.https_address set to the same value as
+ // core.https_address, so it can be bootstrapped.
for i, daemon := range daemons {
- f.EnableNetworking(daemon, passwords[i])
+ if i == 0 {
+ f.EnableNetworkingWithClusterAddress(daemon, passwords[i])
+ } else {
+ f.EnableNetworking(daemon, passwords[i])
+ }
}

// Bootstrap the cluster using the first node.
@@ -188,7 +198,7 @@ func TestCluster_JoinServerAddress(t *testing.T) {

f := clusterFixture{t: t}
password := "sekret"
- f.EnableNetworking(daemons[0], password)
+ f.EnableNetworkingWithClusterAddress(daemons[0], password)

// Bootstrap the cluster using the first node.
client := f.ClientUnix(daemons[0])
@@ -276,18 +286,69 @@ func TestCluster_JoinServerAddress(t *testing.T) {
assert.Equal(t, "buzz", node.ServerName)
}

-// If an LXD node is already for networking and the user asks to configure a
-// different server address, the request fails.
-func TestCluster_JoinServerAddressMismatch(t *testing.T) {
+// If the joining LXD node is already configured for networking, the user can
+// request a different port for clustering, using the ServerAddress
+// key.
+func TestCluster_JoinDifferentServerAddress(t *testing.T) {
daemons, cleanup := newDaemons(t, 2)
defer cleanup()

f := clusterFixture{t: t}
- passwords := []string{"sekret", ""}
+ password := "sekret"
+ f.EnableNetworkingWithClusterAddress(daemons[0], password)

- for i, daemon := range daemons {
- f.EnableNetworking(daemon, passwords[i])
+ // Bootstrap the cluster using the first node.
+ client := f.ClientUnix(daemons[0])
+ cluster := api.ClusterPut{}
+ cluster.ServerName = "buzz"
+ cluster.Enabled = true
+ op, err := client.UpdateCluster(cluster, "")
+ require.NoError(t, err)
+ require.NoError(t, op.Wait())
+
+ // Configure the second node for networking.
+ f.EnableNetworking(daemons[1], "")
+
+ // Make the second node join the cluster, specifying a dedicated
+ // cluster address.
+ port, err := shared.AllocatePort()
+ require.NoError(f.t, err)
+ serverAddress := fmt.Sprintf("127.0.0.1:%d", port)
+
+ f.RegisterCertificate(daemons[1], daemons[0], "rusp", "sekret")
+ address := daemons[0].endpoints.NetworkAddress()
+ cert := string(daemons[0].endpoints.NetworkPublicKey())
+ client = f.ClientUnix(daemons[1])
+ cluster = api.ClusterPut{
+ ClusterAddress: address,
+ ClusterCertificate: cert,
+ ServerAddress: serverAddress,
}
+ cluster.ServerName = "rusp"
+ cluster.Enabled = true
+ op, err = client.UpdateCluster(cluster, "")
+ require.NoError(t, err)
+ require.NoError(t, op.Wait())
+
+ // The GetClusterMembers client method returns both nodes.
+ nodes, err := client.GetClusterMembers()
+ require.NoError(t, err)
+ assert.Len(t, nodes, 2)
+ assert.Equal(t, "buzz", nodes[0].ServerName)
+ assert.Equal(t, "rusp", nodes[1].ServerName)
+ assert.Equal(t, "Online", nodes[0].Status)
+ assert.Equal(t, "Online", nodes[1].Status)
+}
+
+// If an LXD node is already configured for networking and the user asks to
+// use the same address as cluster address, the request still succeeds.
+func TestCluster_JoinSameServerAddress(t *testing.T) {
+ daemons, cleanup := newDaemons(t, 2)
+ defer cleanup()
+
+ f := clusterFixture{t: t}
+ f.EnableNetworkingWithClusterAddress(daemons[0], "sekret")
+ f.EnableNetworking(daemons[1], "")

// Bootstrap the cluster using the first node.
client := f.ClientUnix(daemons[0])
@@ -306,12 +367,12 @@ func TestCluster_JoinServerAddressMismatch(t *testing.T) {
cluster = api.ClusterPut{
ClusterAddress: address,
ClusterCertificate: cert,
- ServerAddress: "1.2.3.4",
+ ServerAddress: daemons[1].endpoints.NetworkAddress(),
}
cluster.ServerName = "rusp"
cluster.Enabled = true
_, err = client.UpdateCluster(cluster, "")
- assert.EqualError(t, err, "A different core.https_address is already set on this node")
+ assert.NoError(t, err)
}

// If the joining node hasn't added its certificate as trusted client
@@ -321,11 +382,8 @@ func TestCluster_JoinUnauthorized(t *testing.T) {
defer cleanup()

f := clusterFixture{t: t}
- passwords := []string{"sekret", ""}
-
- for i, daemon := range daemons {
- f.EnableNetworking(daemon, passwords[i])
- }
+ f.EnableNetworkingWithClusterAddress(daemons[0], "sekret")
+ f.EnableNetworking(daemons[1], "")

// Bootstrap the cluster using the first node.
client := f.ClientUnix(daemons[0])
@@ -534,11 +592,11 @@ type clusterFixture struct {
func (f *clusterFixture) FormCluster(daemons []*Daemon) {
f.daemons = daemons
for i, daemon := range daemons {
- password := ""
if i == 0 {
- password = "sekret"
+ f.EnableNetworkingWithClusterAddress(daemon, "sekret")
+ } else {
+ f.EnableNetworking(daemon, "")
}
- f.EnableNetworking(daemon, password)
}

// Bootstrap the cluster using the first node.
@@ -587,6 +645,42 @@ func (f *clusterFixture) EnableNetworking(daemon *Daemon, password string) {
require.NoError(f.t, client.UpdateServer(serverPut, ""))
}

+// Enable networking in the given daemon, and set cluster.https_address to the
+// same value as core.https_address. The password is optional and can be an
+// empty string.
+func (f *clusterFixture) EnableNetworkingWithClusterAddress(daemon *Daemon, password string) {
+ port, err := shared.AllocatePort()
+ require.NoError(f.t, err)
+
+ address := fmt.Sprintf("127.0.0.1:%d", port)
+
+ client := f.ClientUnix(daemon)
+ server, _, err := client.GetServer()
+ require.NoError(f.t, err)
+ serverPut := server.Writable()
+ serverPut.Config["core.https_address"] = address
+ serverPut.Config["core.trust_password"] = password
+ serverPut.Config["cluster.https_address"] = address
+
+ require.NoError(f.t, client.UpdateServer(serverPut, ""))
+}
+
+// Enable a dedicated cluster address in the given daemon.
+func (f *clusterFixture) EnableClusterAddress(daemon *Daemon) {
+ port, err := shared.AllocatePort()
+ require.NoError(f.t, err)
+
+ address := fmt.Sprintf("127.0.0.1:%d", port)
+
+ client := f.ClientUnix(daemon)
+ server, _, err := client.GetServer()
+ require.NoError(f.t, err)
+ serverPut := server.Writable()
+ serverPut.Config["cluster.https_address"] = address
+
+ require.NoError(f.t, client.UpdateServer(serverPut, ""))
+}
+
// Register daemon1's server certificate as daemon2's trusted certificate,
// using password authentication.
func (f *clusterFixture) RegisterCertificate(daemon1, daemon2 *Daemon, name, password string) {

From 18c57e15176e76e8bff476d42a70836bec7f7a15 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <***@canonical.com>
Date: Wed, 28 Nov 2018 10:41:25 +0100
Subject: [PATCH 09/10] Change "lxd init" to also set cluster.https_address
when bootstrapping a cluster

Signed-off-by: Free Ekanayaka <***@canonical.com>
---
lxd/main_init.go | 8 ++++++++
1 file changed, 8 insertions(+)

diff --git a/lxd/main_init.go b/lxd/main_init.go
index 7a855b1fd5..de793b6254 100644
--- a/lxd/main_init.go
+++ b/lxd/main_init.go
@@ -124,6 +124,14 @@ func (c *cmdInit) Run(cmd *cobra.Command, args []string) error {
}
}

+ // If clustering is enabled, and no cluster.https_address network address
+ // was specified, we fall back to core.https_address.
+ if config.Cluster != nil &&
+ config.Node.Config["core.https_address"] != nil &&
+ config.Node.Config["cluster.https_address"] == nil {
+ config.Node.Config["cluster.https_address"] = config.Node.Config["core.https_address"]
+ }
+
// Detect if the user has chosen to join a cluster using the new
// cluster join API format, and use the dedicated API if so.
if config.Cluster != nil && config.Cluster.ClusterAddress != "" && config.Cluster.ServerAddress != "" {

From 952be9814c83b5f3deaf473ffca0547228649982 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <***@canonical.com>
Date: Wed, 28 Nov 2018 13:32:12 +0100
Subject: [PATCH 10/10] Add integration tests exercising custom clustering
address

Signed-off-by: Free Ekanayaka <***@canonical.com>
---
test/includes/clustering.sh | 13 ++++++++++-
test/main.sh | 1 +
test/suites/clustering.sh | 45 +++++++++++++++++++++++++++++++++++++
3 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/test/includes/clustering.sh b/test/includes/clustering.sh
index 8da0034503..7b50d78e50 100644
--- a/test/includes/clustering.sh
+++ b/test/includes/clustering.sh
@@ -138,9 +138,13 @@ spawn_lxd_and_bootstrap_cluster() {
bridge="${2}"
LXD_DIR="${3}"
driver="dir"
- if [ "$#" -eq "4" ]; then
+ port=""
+ if [ "$#" -ge "4" ]; then
driver="${4}"
fi
+ if [ "$#" -ge "5" ]; then
+ port="${5}"
+ fi

echo "==> Spawn bootstrap cluster node in ${ns} with storage driver ${driver}"

@@ -153,6 +157,13 @@ spawn_lxd_and_bootstrap_cluster() {
config:
core.trust_password: sekret
core.https_address: 10.1.1.101:8443
+EOF
+ if [ "${port}" != "" ]; then
+ cat >> "${LXD_DIR}/preseed.yaml" <<EOF
+ cluster.https_address: 10.1.1.101:${port}
+EOF
+ fi
+ cat >> "${LXD_DIR}/preseed.yaml" <<EOF
images.auto_update_interval: 0
storage_pools:
- name: data
diff --git a/test/main.sh b/test/main.sh
index 1b160aa697..5ba756bf72 100755
--- a/test/main.sh
+++ b/test/main.sh
@@ -229,6 +229,7 @@ run_test test_clustering_profiles "clustering profiles"
run_test test_clustering_join_api "clustering join api"
run_test test_clustering_shutdown_nodes "clustering shutdown"
run_test test_clustering_projects "clustering projects"
+run_test test_clustering_address "clustering address"
#run_test test_clustering_upgrade "clustering upgrade"

# shellcheck disable=SC2034
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index 64ead072aa..e57c40bd56 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -1052,3 +1052,48 @@ test_clustering_projects() {
kill_lxd "${LXD_ONE_DIR}"
kill_lxd "${LXD_TWO_DIR}"
}
+
+test_clustering_address() {
+ setup_clustering_bridge
+ prefix="lxd$$"
+ bridge="${prefix}"
+
+ setup_clustering_netns 1
+ LXD_ONE_DIR=$(mktemp -d -p "${TEST_DIR}" XXX)
+ chmod +x "${LXD_ONE_DIR}"
+ ns1="${prefix}1"
+
+ # Bootstrap the first node using a custom cluster port
+ spawn_lxd_and_bootstrap_cluster "${ns1}" "${bridge}" "${LXD_ONE_DIR}" "dir" "8444"
+
+ # The bootstrap node appears in the list with its regular port
+ LXD_DIR="${LXD_ONE_DIR}" lxc cluster list | grep -q 8443
+
+ # The bootstrap node has opened a separate port for clustering traffic
+ address=$(LXD_DIR="${LXD_ONE_DIR}" lxc cluster show node1 | grep ^url: | cut -f 2 -d " " | sed "s/8443/8444/g")
+ curl -k -s --cert "${LXD_ONE_DIR}/cluster.crt" --key "${LXD_ONE_DIR}/cluster.key" -X HEAD "${address}/internal/database"
+
+ # Add a newline at the end of each line. YAML has weird rules.
+ cert=$(sed ':a;N;$!ba;s/\n/\n\n/g' "${LXD_ONE_DIR}/server.crt")
+
+ # Spawn a second node and make it join the cluster
+ setup_clustering_netns 2
+ LXD_TWO_DIR=$(mktemp -d -p "${TEST_DIR}" XXX)
+ chmod +x "${LXD_TWO_DIR}"
+ ns2="${prefix}2"
+ spawn_lxd_and_join_cluster "${ns2}" "${bridge}" "${cert}" 2 1 "${LXD_TWO_DIR}"
+
+ LXD_DIR="${LXD_ONE_DIR}" lxc cluster list
+
+ LXD_DIR="${LXD_TWO_DIR}" lxd shutdown
+ LXD_DIR="${LXD_ONE_DIR}" lxd shutdown
+ sleep 2
+ rm -f "${LXD_TWO_DIR}/unix.socket"
+ rm -f "${LXD_ONE_DIR}/unix.socket"
+
+ teardown_clustering_netns
+ teardown_clustering_bridge
+
+ kill_lxd "${LXD_ONE_DIR}"
+ kill_lxd "${LXD_TWO_DIR}"
+}

Loading...