-
Notifications
You must be signed in to change notification settings - Fork 7
/
invoke.go
136 lines (110 loc) · 3.96 KB
/
invoke.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Package invoke provides Lambda sync and async invocation helpers.
//
// The Sync() and Async() variants utilize the default Lambda client,
// while the InvokeSync() and InvokeAsync() variants may be passed
// a client in order to specify the region etc.
//
// Functions that do not take an explicit qualifier invoke DefaultAlias ("current").
package invoke
import (
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/pkg/errors"
)
// DefaultClient is the default Lambda client.
//
// It is used by Sync, SyncQualifier, Async and AsyncQualifier, and is built
// once at package initialization from a session created with an empty
// aws.NewConfig(). Being a package variable, it can be replaced by callers.
//
// NOTE(review): session.New is believed to be deprecated in aws-sdk-go in
// favour of session.NewSession — confirm before migrating, since that changes
// how session creation errors surface.
var DefaultClient = lambda.New(session.New(aws.NewConfig()))
// DefaultAlias is the alias for function invocations.
//
// InvokeSync and InvokeAsync (and, through them, Sync and Async) pass it as
// the qualifier. It is a variable, so its value is read at call time.
var DefaultAlias = "current"
// Lambda is the subset of the Lambda client used by this package: a single
// Invoke call. The Invoke* functions accept this interface, so any value
// with a matching Invoke method can be supplied in place of DefaultClient.
type Lambda interface {
// Invoke performs one function invocation described by the input.
Invoke(*lambda.InvokeInput) (*lambda.InvokeOutput, error)
}
// Error represents a Lambda error, i.e. an error reported by the invoked
// function rather than a failure to perform the invocation itself.
type Error struct {
	// Message is the error message returned from Lambda, decoded from the
	// "errorMessage" field of the response payload.
	Message string `json:"errorMessage"`

	// Handled specifies if the error was controlled or not.
	// For example a timeout is unhandled, while an error returned from
	// the function is handled.
	Handled bool
}

// Error implements the error interface. The message is prefixed with
// "handled: " or "unhandled: " according to the Handled field.
func (e *Error) Error() string {
	if e.Handled {
		return fmt.Sprintf("handled: %s", e.Message)
	}
	return fmt.Sprintf("unhandled: %s", e.Message)
}
// Sync invokes function `name` synchronously with the default client.
//
// It delegates to InvokeSync with DefaultClient, so the function is invoked
// at DefaultAlias. `in` is the request value and `out` receives the response;
// the returned error is whatever InvokeSync reports.
func Sync(name string, in, out interface{}) error {
return InvokeSync(DefaultClient, name, in, out)
}
// SyncQualifier invokes function `name` (version or alias specified by `qualifier`) synchronously with the default client.
//
// It delegates to InvokeSyncQualifier with DefaultClient. `in` is the request
// value and `out` receives the response; the returned error is whatever
// InvokeSyncQualifier reports.
func SyncQualifier(name, qualifier string, in, out interface{}) error {
return InvokeSyncQualifier(DefaultClient, name, qualifier, in, out)
}
// InvokeSync invokes function `name` synchronously with the given `client`.
//
// It delegates to InvokeSyncQualifier using DefaultAlias as the qualifier.
// `in` is the request value and `out` receives the response; the returned
// error is whatever InvokeSyncQualifier reports.
func InvokeSync(client Lambda, name string, in, out interface{}) error {
return InvokeSyncQualifier(client, name, DefaultAlias, in, out)
}
// InvokeSyncQualifier invokes function `name` (version or alias specified by `qualifier`) synchronously with the given `client`.
//
// `in` is marshalled to JSON and sent as the request payload. On success the
// response payload is unmarshalled into `out`, which should be a non-nil
// pointer to the value that receives the result.
//
// The returned error is one of:
//   - a wrapped error when `in` cannot be marshalled, the client call fails,
//     or a response payload cannot be unmarshalled;
//   - an *Error when the response carries a FunctionError, with Handled set
//     when that value is "Handled" and Message decoded from the payload.
func InvokeSyncQualifier(client Lambda, name, qualifier string, in, out interface{}) error {
	payload, err := json.Marshal(in)
	if err != nil {
		return errors.Wrap(err, "marshalling input")
	}

	res, err := client.Invoke(&lambda.InvokeInput{
		FunctionName: &name,
		Qualifier:    &qualifier,
		Payload:      payload,
	})
	if err != nil {
		return errors.Wrap(err, "invoking function")
	}

	if res.FunctionError != nil {
		fnErr := &Error{
			Handled: *res.FunctionError == "Handled",
		}

		// Decode into the struct itself rather than into a pointer to the
		// pointer: with a **Error target, a JSON `null` payload would reset
		// the pointer to nil and a typed-nil *Error would be returned, which
		// panics as soon as the caller calls Error() on it.
		if err := json.Unmarshal(res.Payload, fnErr); err != nil {
			return errors.Wrap(err, "unmarshalling error response")
		}

		return fnErr
	}

	// &out is deliberate: encoding/json follows the interface to the caller's
	// pointer, and a nil `out` is tolerated (the decoded value is discarded).
	if err := json.Unmarshal(res.Payload, &out); err != nil {
		return errors.Wrap(err, "unmarshalling response")
	}

	return nil
}
// Async invokes function `name` asynchronously with the default client.
//
// It delegates to InvokeAsync with DefaultClient, so the function is invoked
// at DefaultAlias. `in` is the request value; no response value is returned,
// only the error reported by InvokeAsync.
func Async(name string, in interface{}) error {
return InvokeAsync(DefaultClient, name, in)
}
// AsyncQualifier invokes function `name` (version or alias specified by `qualifier`) asynchronously with the default client.
//
// It delegates to InvokeAsyncQualifier with DefaultClient. `in` is the
// request value; no response value is returned, only the error reported by
// InvokeAsyncQualifier.
func AsyncQualifier(name, qualifier string, in interface{}) error {
return InvokeAsyncQualifier(DefaultClient, name, qualifier, in)
}
// InvokeAsync invokes function `name` asynchronously with the given `client`.
//
// It delegates to InvokeAsyncQualifier using DefaultAlias as the qualifier.
// `in` is the request value; no response value is returned, only the error
// reported by InvokeAsyncQualifier.
func InvokeAsync(client Lambda, name string, in interface{}) error {
return InvokeAsyncQualifier(client, name, DefaultAlias, in)
}
// InvokeAsyncQualifier invokes function `name` (version or alias specified by `qualifier`) asynchronously with the given `client`.
//
// `in` is marshalled to JSON and sent as the payload of an invocation whose
// InvocationType is "Event". The invocation output is discarded; only a
// marshalling failure or an error from the client call is reported, each
// wrapped with a short description of the failing step.
func InvokeAsyncQualifier(client Lambda, name, qualifier string, in interface{}) error {
	payload, marshalErr := json.Marshal(in)
	if marshalErr != nil {
		return errors.Wrap(marshalErr, "marshalling input")
	}

	// Assemble the request up front so the call site stays short.
	req := &lambda.InvokeInput{
		FunctionName:   &name,
		InvocationType: aws.String("Event"),
		Qualifier:      &qualifier,
		Payload:        payload,
	}

	if _, invokeErr := client.Invoke(req); invokeErr != nil {
		return errors.Wrap(invokeErr, "invoking function")
	}

	return nil
}