forked from FeatureBaseDB/featurebase
-
Notifications
You must be signed in to change notification settings - Fork 0
/
broadcast.go
176 lines (150 loc) · 5.2 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// Copyright 2017 Pilosa Corp.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pilosa
import (
"fmt"
"reflect"
"github.com/gogo/protobuf/proto"
"github.com/pilosa/pilosa/internal"
)
// NodeSet represents an interface for Node membership and inter-node communication.
type NodeSet interface {
// Returns a list of all Nodes in the cluster
Nodes() []*Node
// Open starts any network activity implemented by the NodeSet
Open() error
}
// StaticNodeSet represents a basic NodeSet for testing.
type StaticNodeSet struct {
nodes []*Node
}
// NewStaticNodeSet creates a statically defined NodeSet.
func NewStaticNodeSet() *StaticNodeSet {
return &StaticNodeSet{}
}
// Nodes implements the NodeSet interface and returns a list of nodes in the cluster.
func (s *StaticNodeSet) Nodes() []*Node {
return s.nodes
}
// Open implements the NodeSet interface to start network activity, but for a static NodeSet it does nothing.
func (s *StaticNodeSet) Open() error {
return nil
}
// Join sets the NodeSet nodes to the slice of Nodes passed in.
func (s *StaticNodeSet) Join(nodes []*Node) error {
s.nodes = nodes
return nil
}
// Broadcaster is an interface for broadcasting messages.
type Broadcaster interface {
SendSync(pb proto.Message) error
SendAsync(pb proto.Message) error
}
func init() {
NopBroadcaster = &nopBroadcaster{}
}
// NopBroadcaster represents a Broadcaster that doesn't do anything.
var NopBroadcaster Broadcaster
type nopBroadcaster struct{}
// SendSync A no-op implemenetation of Broadcaster SendSync method.
func (c *nopBroadcaster) SendSync(pb proto.Message) error {
return nil
}
// SendAsync A no-op implemenetation of Broadcaster SendAsync method.
func (c *nopBroadcaster) SendAsync(pb proto.Message) error {
return nil
}
// BroadcastHandler is the interface for the pilosa object which knows how to
// handle broadcast messages. (Hint: this is implemented by pilosa.Server)
type BroadcastHandler interface {
ReceiveMessage(pb proto.Message) error
}
// BroadcastReceiver is the interface for the object which will listen for and
// decode broadcast messages before passing them to pilosa to handle. The
// implementation of this could be an http server which listens for messages,
// gets the protobuf payload, and then passes it to
// BroadcastHandler.ReceiveMessage.
type BroadcastReceiver interface {
// Start starts listening for broadcast messages - it should return
// immediately, spawning a goroutine if necessary.
Start(BroadcastHandler) error
}
type nopBroadcastReceiver struct{}
func (n *nopBroadcastReceiver) Start(b BroadcastHandler) error { return nil }
// NopBroadcastReceiver is a no-op implementation of the BroadcastReceiver.
var NopBroadcastReceiver = &nopBroadcastReceiver{}
// Broadcast message types.
const (
MessageTypeCreateSlice = 1
MessageTypeCreateIndex = 2
MessageTypeDeleteIndex = 3
MessageTypeCreateFrame = 4
MessageTypeDeleteFrame = 5
MessageTypeCreateInputDefinition = 6
MessageTypeDeleteInputDefinition = 7
)
// MarshalMessage encodes the protobuf message into a byte slice.
func MarshalMessage(m proto.Message) ([]byte, error) {
var typ uint8
switch obj := m.(type) {
case *internal.CreateSliceMessage:
typ = MessageTypeCreateSlice
case *internal.CreateIndexMessage:
typ = MessageTypeCreateIndex
case *internal.DeleteIndexMessage:
typ = MessageTypeDeleteIndex
case *internal.CreateFrameMessage:
typ = MessageTypeCreateFrame
case *internal.DeleteFrameMessage:
typ = MessageTypeDeleteFrame
case *internal.CreateInputDefinitionMessage:
typ = MessageTypeCreateInputDefinition
case *internal.DeleteInputDefinitionMessage:
typ = MessageTypeDeleteInputDefinition
default:
return nil, fmt.Errorf("message type not implemented for marshalling: %s", reflect.TypeOf(obj))
}
buf, err := proto.Marshal(m)
if err != nil {
return nil, err
}
return append([]byte{typ}, buf...), nil
}
// UnmarshalMessage decodes the byte slice into a protobuf message.
func UnmarshalMessage(buf []byte) (proto.Message, error) {
typ, buf := buf[0], buf[1:]
var m proto.Message
switch typ {
case MessageTypeCreateSlice:
m = &internal.CreateSliceMessage{}
case MessageTypeCreateIndex:
m = &internal.CreateIndexMessage{}
case MessageTypeDeleteIndex:
m = &internal.DeleteIndexMessage{}
case MessageTypeCreateFrame:
m = &internal.CreateFrameMessage{}
case MessageTypeDeleteFrame:
m = &internal.DeleteFrameMessage{}
case MessageTypeCreateInputDefinition:
m = &internal.CreateInputDefinitionMessage{}
case MessageTypeDeleteInputDefinition:
m = &internal.DeleteInputDefinitionMessage{}
default:
return nil, fmt.Errorf("invalid message type: %d", typ)
}
if err := proto.Unmarshal(buf, m); err != nil {
return nil, err
}
return m, nil
}