Discussion:
[lxc-devel] [lxd/master] client: convert EventListener to use api.Event
smibarber on Github
2018-12-08 06:27:19 UTC
Permalink
From 6f65bf1a67dc9a6cfd508a14d94ddfe3c8035571 Mon Sep 17 00:00:00 2001
From: Stephen Barber <***@chromium.org>
Date: Fri, 7 Dec 2018 18:39:48 -0800
Subject: [PATCH] client: convert EventListener to use api.Event

Improve EventListener's AddHandler function to use an api.Event
struct rather than an interface{}.

There's an asymmetry between the AddHandler functions for
Operation and EventListener, as the former provides an
api.Operation struct to its handler function, but the latter
gives just an interface{} that must be either be teased apart
with type assertions or marshaled back into JSON and
unmarshaled into the appropriate type.

Signed-off-by: Stephen Barber <***@chromium.org>
---
client/events.go | 6 ++++--
client/lxd_events.go | 13 ++++++------
client/operations.go | 47 ++++++++++---------------------------------
lxc/monitor.go | 17 ++--------------
lxd/cluster/events.go | 3 ++-
5 files changed, 25 insertions(+), 61 deletions(-)

diff --git a/client/events.go b/client/events.go
index d73dcc99db..b0361a5495 100644
--- a/client/events.go
+++ b/client/events.go
@@ -3,6 +3,8 @@ package lxd
import (
"fmt"
"sync"
+
+ "github.com/lxc/lxd/shared/api"
)

// The EventListener struct is used to interact with a LXD event stream
@@ -18,12 +20,12 @@ type EventListener struct {

// The EventTarget struct is returned to the caller of AddHandler and used in RemoveHandler
type EventTarget struct {
- function func(interface{})
+ function func(api.Event)
types []string
}

// AddHandler adds a function to be called whenever an event is received
-func (e *EventListener) AddHandler(types []string, function func(interface{})) (*EventTarget, error) {
+func (e *EventListener) AddHandler(types []string, function func(api.Event)) (*EventTarget, error) {
if function == nil {
return nil, fmt.Errorf("A valid function must be provided")
}
diff --git a/client/lxd_events.go b/client/lxd_events.go
index 15fc35cb6f..034ed7a4b9 100644
--- a/client/lxd_events.go
+++ b/client/lxd_events.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/api"
)

// Event handling functions
@@ -90,29 +91,27 @@ func (r *ProtocolLXD) GetEvents() (*EventListener, error) {
}

// Attempt to unpack the message
- message := make(map[string]interface{})
- err = json.Unmarshal(data, &message)
+ event := api.Event{}
+ err = json.Unmarshal(data, &event)
if err != nil {
continue
}

// Extract the message type
- _, ok := message["type"]
- if !ok {
+ if event.Type == "" {
continue
}
- messageType := message["type"].(string)

// Send the message to all handlers
r.eventListenersLock.Lock()
for _, listener := range r.eventListeners {
listener.targetsLock.Lock()
for _, target := range listener.targets {
- if target.types != nil && !shared.StringInSlice(messageType, target.types) {
+ if target.types != nil && !shared.StringInSlice(event.Type, target.types) {
continue
}

- go target.function(message)
+ go target.function(event)
}
listener.targetsLock.Unlock()
}
diff --git a/client/operations.go b/client/operations.go
index 63c6d227f9..99ef378043 100644
--- a/client/operations.go
+++ b/client/operations.go
@@ -40,13 +40,14 @@ func (op *operation) AddHandler(function func(api.Operation)) (*EventTarget, err
}

// Wrap the function to filter unwanted messages
- wrapped := func(data interface{}) {
- newOp := op.extractOperation(data)
- if newOp == nil {
+ wrapped := func(event api.Event) {
+ newOp := api.Operation{}
+ err := json.Unmarshal(event.Metadata, &newOp)
+ if err != nil || newOp.ID != op.ID {
return
}

- function(*newOp)
+ function(newOp)
}

return op.listener.AddHandler([]string{"operation"}, wrapped)
@@ -145,7 +146,7 @@ func (op *operation) setupListener() error {

// Setup the handler
chReady := make(chan bool)
- _, err := op.listener.AddHandler([]string{"operation"}, func(data interface{}) {
+ _, err := op.listener.AddHandler([]string{"operation"}, func(event api.Event) {
<-chReady

// We don't want concurrency while processing events
@@ -158,13 +159,14 @@ func (op *operation) setupListener() error {
}

// Get an operation struct out of this data
- newOp := op.extractOperation(data)
- if newOp == nil {
+ newOp := api.Operation{}
+ err := json.Unmarshal(event.Metadata, &newOp)
+ if err != nil || newOp.ID != op.ID {
return
}

// Update the struct
- op.Operation = *newOp
+ op.Operation = newOp

// And check if we're done
if op.StatusCode.IsFinal() {
@@ -243,33 +245,6 @@ func (op *operation) setupListener() error {
return nil
}

-func (op *operation) extractOperation(data interface{}) *api.Operation {
- // Extract the metadata
- meta, ok := data.(map[string]interface{})["metadata"]
- if !ok {
- return nil
- }
-
- // And attempt to decode it as JSON operation data
- encoded, err := json.Marshal(meta)
- if err != nil {
- return nil
- }
-
- newOp := api.Operation{}
- err = json.Unmarshal(encoded, &newOp)
- if err != nil {
- return nil
- }
-
- // And now check that it's what we want
- if newOp.ID != op.ID {
- return nil
- }
-
- return &newOp
-}
-
// The remoteOperation type represents an ongoing LXD operation between two servers
type remoteOperation struct {
targetOp Operation
@@ -295,7 +270,7 @@ func (op *remoteOperation) AddHandler(function func(api.Operation)) (*EventTarge
} else {
// Generate a mock EventTarget
target = &EventTarget{
- function: func(interface{}) { function(api.Operation{}) },
+ function: func(api.Event) { function(api.Operation{}) },
types: []string{"operation"},
}
}
diff --git a/lxc/monitor.go b/lxc/monitor.go
index a6aba202d8..cd9b5684ef 100644
--- a/lxc/monitor.go
+++ b/lxc/monitor.go
@@ -93,22 +93,9 @@ func (c *cmdMonitor) Run(cmd *cobra.Command, args []string) error {
}
}

- handler := func(message interface{}) {
+ handler := func(event api.Event) {
// Special handling for logging only output
if c.flagPretty && len(c.flagType) == 1 && shared.StringInSlice("logging", c.flagType) {
- render, err := json.Marshal(&message)
- if err != nil {
- fmt.Printf("error: %s\n", err)
- os.Exit(1)
- }
-
- event := api.Event{}
- err = json.Unmarshal(render, &event)
- if err != nil {
- fmt.Printf("error: %s\n", err)
- os.Exit(1)
- }
-
logEntry := api.EventLogging{}
err = json.Unmarshal(event.Metadata, &logEntry)
if err != nil {
@@ -144,7 +131,7 @@ func (c *cmdMonitor) Run(cmd *cobra.Command, args []string) error {
return
}

- render, err := yaml.Marshal(&message)
+ render, err := yaml.Marshal(&event)
if err != nil {
fmt.Printf("error: %s\n", err)
os.Exit(1)
diff --git a/lxd/cluster/events.go b/lxd/cluster/events.go
index 93cb77a91f..5201423bfc 100644
--- a/lxd/cluster/events.go
+++ b/lxd/cluster/events.go
@@ -8,6 +8,7 @@ import (
"github.com/lxc/lxd/lxd/endpoints"
"github.com/lxc/lxd/lxd/task"
"github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/api"
"github.com/lxc/lxd/shared/logger"
"golang.org/x/net/context"
)
@@ -99,7 +100,7 @@ func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster,
continue
}
logger.Debugf("Listening for events on node %s", node.Address)
- listener.AddHandler(nil, func(event interface{}) { f(node.ID, event) })
+ listener.AddHandler(nil, func(event api.Event) { f(node.ID, event) })
listeners[node.ID] = listener
}
for id, listener := range listeners {

Loading...