Skip to content

Commit

Permalink
feat: Read handler
Browse files Browse the repository at this point in the history
  • Loading branch information
dpattmann committed Dec 1, 2020
1 parent ddbebf8 commit 9520e82
Show file tree
Hide file tree
Showing 8 changed files with 303 additions and 56 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ remote_write:
- url: "http://prometheus-timestream-adapter:9201/write"
```
### Why is there no remote_reader?
:warning: **There is a very early remote_reader version!** [AWS Timestream](https://aws.amazon.com/timestream) has a very powerful [query language](https://docs.aws.amazon.com/timestream/latest/developerguide/reference.html) and there is a [Grafana Plugin](https://grafana.com/grafana/plugins/grafana-timestream-datasource) supporting Timestream as a datasource. However, there is a very basic reader implementation.
[AWS Timestream](https://aws.amazon.com/timestream) has a very powerful [query language](https://docs.aws.amazon.com/timestream/latest/developerguide/reference.html) and there is a [Grafana Plugin](https://grafana.com/grafana/plugins/grafana-timestream-datasource) supporting Timestream as a datasource. However, this is the reason why I don't think a reader implementation is needed.
```yaml
remote_read:
- url: "http://prometheus-timestream-adapter:9201/read"
```
## FAQ
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/snappy v0.0.2
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.8.0
github.com/prometheus/common v0.14.0
github.com/prometheus/prometheus v2.5.0+incompatible
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.4.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
57 changes: 50 additions & 7 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
package main

import (
"io/ioutil"
"net/http"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"go.uber.org/zap"
"io/ioutil"
"net/http"
)

func writeHandler(logger *zap.SugaredLogger, ad adapter) http.HandlerFunc {
Expand All @@ -51,14 +50,58 @@ func writeHandler(logger *zap.SugaredLogger, ad adapter) http.HandlerFunc {
return
}

records := protoToRecords(logger, &req)
receivedSamples.Add(float64(len(records)))

err = sendRecords(logger, ad, records)
err = ad.Write(&req)

if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Error writing to AWS Timestream"))
}
}
}

func readHandler(logger *zap.SugaredLogger, ad adapter) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
logger.Errorw("Read error", "err", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
logger.Errorw("Decode error", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

var req prompb.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
logger.Errorw("Unmarshal error", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

var resp *prompb.ReadResponse
resp, err = ad.Read(&req)
if err != nil {
logger.Warnw("Error executing query", "query", req, "err", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

data, err := proto.Marshal(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/x-protobuf")
w.Header().Set("Content-Encoding", "snappy")

compressed = snappy.Encode(nil, data)
if _, err := w.Write(compressed); err != nil {
logger.Warn("Error writing response", "err", err)
}
}
}
9 changes: 7 additions & 2 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go/service/timestreamquery/timestreamqueryiface"
"github.com/aws/aws-sdk-go/service/timestreamwrite"
"github.com/aws/aws-sdk-go/service/timestreamwrite/timestreamwriteiface"
"github.com/gogo/protobuf/proto"
Expand All @@ -39,6 +40,10 @@ type TimeStreamWriterMock struct {
timestreamwriteiface.TimestreamWriteAPI
}

type TimeStreamQueryMock struct {
timestreamqueryiface.TimestreamQueryAPI
}

func (t TimeStreamWriterMock) WriteRecords(input *timestreamwrite.WriteRecordsInput) (*timestreamwrite.WriteRecordsOutput, error) {
for _, i := range input.Records {
if *i.MeasureName == "sample_name_error" {
Expand Down Expand Up @@ -118,8 +123,8 @@ func Test_writeHandler(t *testing.T) {
assert.NoError(t, err)

rr := httptest.NewRecorder()
ad := newTimeStreamAdapter(zap.NewNop().Sugar(), cfg, TimeStreamWriterMock{})
writeHandler(zap.NewNop().Sugar(), ad).ServeHTTP(rr, req)
ad := newTimeStreamAdapter(zap.NewNop().Sugar(), cfg, TimeStreamWriterMock{}, TimeStreamQueryMock{})
writeHandler(zap.NewNop().Sugar(), &ad).ServeHTTP(rr, req)

assert.Equal(t, tt.want, rr.Code)
})
Expand Down
34 changes: 14 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@
package main

import (
"go.uber.org/zap"
"log"
"net/http"
"os"
"time"

"github.com/aws/aws-sdk-go/service/timestreamwrite"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/prometheus/prompb"
flag "github.com/spf13/pflag"
"go.uber.org/zap"
)

type config struct {
Expand All @@ -40,6 +39,7 @@ type config struct {
tlsCert string
tlsKey string
tls bool
debug bool
}

var (
Expand Down Expand Up @@ -89,13 +89,19 @@ func init() {
flag.StringVar(&cfg.tlsCert, "tlsCert", "tls.cert", "")
flag.StringVar(&cfg.tlsKey, "tlsKey", "tls.key", "")
flag.BoolVar(&cfg.tls, "tls", false, "")
flag.BoolVar(&cfg.debug, "debug", false, "")

flag.Parse()
}

func main() {
zapConfig := zap.NewProductionConfig()
zapConfig.DisableStacktrace = true

if cfg.debug {
zapConfig.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
}

sugarLogger, err := zapConfig.Build()

if err != nil {
Expand All @@ -105,39 +111,27 @@ func main() {
defer sugarLogger.Sync() // flushes buffer, if any
sugar := sugarLogger.Sugar()

timeStreamAdapter := newTimeStreamAdapter(sugar, cfg, nil)
if err := serve(sugar, cfg.listenAddr, timeStreamAdapter); err != nil {
timeStreamAdapter := newTimeStreamAdapter(sugar, cfg, nil, nil)
if err := serve(sugar, cfg.listenAddr, &timeStreamAdapter); err != nil {
sugar.Errorw("Failed to listen", "addr", cfg.listenAddr, "err", err)
os.Exit(1)
}
}

type adapter interface {
Write(records []*timestreamwrite.Record) error
Write(records *prompb.WriteRequest) error
Read(request *prompb.ReadRequest) (*prompb.ReadResponse, error)
Name() string
}

func serve(logger *zap.SugaredLogger, addr string, ad adapter) error {
http.Handle(cfg.telemetryPath, promhttp.Handler())
http.Handle("/write", writeHandler(logger, ad))
http.Handle("/read", readHandler(logger, ad))

if cfg.tls {
return http.ListenAndServeTLS(addr, cfg.tlsCert, cfg.tlsKey, nil)
}

return http.ListenAndServe(addr, nil)
}

func sendRecords(logger *zap.SugaredLogger, ad adapter, records []*timestreamwrite.Record) (err error) {
begin := time.Now()
err = ad.Write(records)
duration := time.Since(begin).Seconds()
if err != nil {
logger.Warnw("Error sending samples to remote storage", "err", err, "storage", ad.Name(), "num_samples", len(records))
failedSamples.WithLabelValues(ad.Name()).Add(float64(len(records)))
}
sentSamples.WithLabelValues(ad.Name()).Add(float64(len(records)))
sentBatchDuration.WithLabelValues(ad.Name()).Observe(duration)

return
}
Loading

0 comments on commit 9520e82

Please sign in to comment.