diff --git a/README.md b/README.md index ecc2b36..693ee26 100644 --- a/README.md +++ b/README.md @@ -23,9 +23,12 @@ remote_write: - url: "http://prometheus-timestream-adapter:9201/write" ``` -### Why is there no remote_reader? +:warning: **There is a very early remote_reader version!** [AWS Timestream](https://aws.amazon.com/timestream) has a very powerful [query language](https://docs.aws.amazon.com/timestream/latest/developerguide/reference.html) and there is a [Grafana Plugin](https://grafana.com/grafana/plugins/grafana-timestream-datasource) supporting Timestream as a datasource. However, the reader implementation in this adapter is still very basic. -[AWS Timestream](https://aws.amazon.com/timestream) has a very powerful [query language](https://docs.aws.amazon.com/timestream/latest/developerguide/reference.html) and there is a [Grafana Plugin](https://grafana.com/grafana/plugins/grafana-timestream-datasource) supporting Timestream as a datasource. However, this is the reason why I don't think a reader implementation is needed. 
+```yaml +remote_read: + - url: "http://prometheus-timestream-adapter:9201/read" +``` ## FAQ diff --git a/go.mod b/go.mod index f0ca581..52cdc1a 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,9 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/snappy v0.0.2 github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.8.0 + github.com/prometheus/common v0.14.0 github.com/prometheus/prometheus v2.5.0+incompatible github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.4.0 diff --git a/go.sum b/go.sum index c68591e..b1af8c8 100644 --- a/go.sum +++ b/go.sum @@ -217,6 +217,7 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/handler.go b/handler.go index aeba255..2bbcb54 100644 --- a/handler.go +++ b/handler.go @@ -19,13 +19,12 @@ package main import ( - "io/ioutil" - "net/http" - "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/prometheus/prompb" "go.uber.org/zap" + "io/ioutil" + "net/http" ) func writeHandler(logger *zap.SugaredLogger, ad adapter) http.HandlerFunc { @@ -51,10 +50,7 @@ func writeHandler(logger *zap.SugaredLogger, ad adapter) http.HandlerFunc { return } - records := protoToRecords(logger, &req) - receivedSamples.Add(float64(len(records))) - - err = sendRecords(logger, ad, records) + err = ad.Write(&req) if 
err != nil { w.WriteHeader(http.StatusInternalServerError) @@ -62,3 +58,50 @@ func writeHandler(logger *zap.SugaredLogger, ad adapter) http.HandlerFunc { } } } + +func readHandler(logger *zap.SugaredLogger, ad adapter) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + logger.Errorw("Read error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + logger.Errorw("Decode error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var req prompb.ReadRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + logger.Errorw("Unmarshal error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var resp *prompb.ReadResponse + resp, err = ad.Read(&req) + if err != nil { + logger.Warnw("Error executing query", "query", req, "err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + data, err := proto.Marshal(resp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + + compressed = snappy.Encode(nil, data) + if _, err := w.Write(compressed); err != nil { + logger.Warn("Error writing response", "err", err) + } + } +} diff --git a/handler_test.go b/handler_test.go index 596458e..f10582e 100644 --- a/handler_test.go +++ b/handler_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go/service/timestreamquery/timestreamqueryiface" "github.com/aws/aws-sdk-go/service/timestreamwrite" "github.com/aws/aws-sdk-go/service/timestreamwrite/timestreamwriteiface" "github.com/gogo/protobuf/proto" @@ -39,6 +40,10 @@ type TimeStreamWriterMock struct { timestreamwriteiface.TimestreamWriteAPI } +type 
TimeStreamQueryMock struct { + timestreamqueryiface.TimestreamQueryAPI +} + func (t TimeStreamWriterMock) WriteRecords(input *timestreamwrite.WriteRecordsInput) (*timestreamwrite.WriteRecordsOutput, error) { for _, i := range input.Records { if *i.MeasureName == "sample_name_error" { @@ -118,8 +123,8 @@ func Test_writeHandler(t *testing.T) { assert.NoError(t, err) rr := httptest.NewRecorder() - ad := newTimeStreamAdapter(zap.NewNop().Sugar(), cfg, TimeStreamWriterMock{}) - writeHandler(zap.NewNop().Sugar(), ad).ServeHTTP(rr, req) + ad := newTimeStreamAdapter(zap.NewNop().Sugar(), cfg, TimeStreamWriterMock{}, TimeStreamQueryMock{}) + writeHandler(zap.NewNop().Sugar(), &ad).ServeHTTP(rr, req) assert.Equal(t, tt.want, rr.Code) }) diff --git a/main.go b/main.go index 8d47bbb..b520260 100644 --- a/main.go +++ b/main.go @@ -19,16 +19,15 @@ package main import ( + "go.uber.org/zap" "log" "net/http" "os" - "time" - "github.com/aws/aws-sdk-go/service/timestreamwrite" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/prometheus/prompb" flag "github.com/spf13/pflag" - "go.uber.org/zap" ) type config struct { @@ -40,6 +39,7 @@ type config struct { tlsCert string tlsKey string tls bool + debug bool } var ( @@ -89,6 +89,7 @@ func init() { flag.StringVar(&cfg.tlsCert, "tlsCert", "tls.cert", "") flag.StringVar(&cfg.tlsKey, "tlsKey", "tls.key", "") flag.BoolVar(&cfg.tls, "tls", false, "") + flag.BoolVar(&cfg.debug, "debug", false, "") flag.Parse() } @@ -96,6 +97,11 @@ func init() { func main() { zapConfig := zap.NewProductionConfig() zapConfig.DisableStacktrace = true + + if cfg.debug { + zapConfig.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + } + sugarLogger, err := zapConfig.Build() if err != nil { @@ -105,21 +111,23 @@ func main() { defer sugarLogger.Sync() // flushes buffer, if any sugar := sugarLogger.Sugar() - timeStreamAdapter := newTimeStreamAdapter(sugar, cfg, nil) - if err := serve(sugar, 
cfg.listenAddr, timeStreamAdapter); err != nil { + timeStreamAdapter := newTimeStreamAdapter(sugar, cfg, nil, nil) + if err := serve(sugar, cfg.listenAddr, &timeStreamAdapter); err != nil { sugar.Errorw("Failed to listen", "addr", cfg.listenAddr, "err", err) os.Exit(1) } } type adapter interface { - Write(records []*timestreamwrite.Record) error + Write(records *prompb.WriteRequest) error + Read(request *prompb.ReadRequest) (*prompb.ReadResponse, error) Name() string } func serve(logger *zap.SugaredLogger, addr string, ad adapter) error { http.Handle(cfg.telemetryPath, promhttp.Handler()) http.Handle("/write", writeHandler(logger, ad)) + http.Handle("/read", readHandler(logger, ad)) if cfg.tls { return http.ListenAndServeTLS(addr, cfg.tlsCert, cfg.tlsKey, nil) @@ -127,17 +135,3 @@ func serve(logger *zap.SugaredLogger, addr string, ad adapter) error { return http.ListenAndServe(addr, nil) } - -func sendRecords(logger *zap.SugaredLogger, ad adapter, records []*timestreamwrite.Record) (err error) { - begin := time.Now() - err = ad.Write(records) - duration := time.Since(begin).Seconds() - if err != nil { - logger.Warnw("Error sending samples to remote storage", "err", err, "storage", ad.Name(), "num_samples", len(records)) - failedSamples.WithLabelValues(ad.Name()).Add(float64(len(records))) - } - sentSamples.WithLabelValues(ad.Name()).Add(float64(len(records))) - sentBatchDuration.WithLabelValues(ad.Name()).Observe(duration) - - return -} diff --git a/timestream.go b/timestream.go index dc5b8fa..b5ac296 100644 --- a/timestream.go +++ b/timestream.go @@ -20,17 +20,23 @@ package main import ( "fmt" - "go.uber.org/zap" "math" "net" "net/http" + "strconv" + "strings" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/timestreamquery" + "github.com/aws/aws-sdk-go/service/timestreamquery/timestreamqueryiface" "github.com/aws/aws-sdk-go/service/timestreamwrite" 
"github.com/aws/aws-sdk-go/service/timestreamwrite/timestreamwriteiface" + "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" + "go.uber.org/zap" "golang.org/x/net/http2" ) @@ -39,15 +45,199 @@ type TimeSteamAdapter struct { tableName string databaseName string ttw timestreamwriteiface.TimestreamWriteAPI + ttq timestreamqueryiface.TimestreamQueryAPI } -func (t TimeSteamAdapter) Write(records []*timestreamwrite.Record) (err error) { +type queryTask struct { + query string + measureName string +} + +func (t *TimeSteamAdapter) Write(records *prompb.WriteRequest) (err error) { + begin := time.Now() + + wReq := t.protoToRecords(records) + receivedSamples.Add(float64(len(wReq))) + _, err = t.ttw.WriteRecords(×treamwrite.WriteRecordsInput{ DatabaseName: aws.String(t.databaseName), TableName: aws.String(t.tableName), - Records: records, + Records: wReq, }) + duration := time.Since(begin).Seconds() + if err != nil { + t.logger.Warnw("Error sending samples to remote storage", "err", err, "storage", t.Name(), "num_samples", len(wReq)) + failedSamples.WithLabelValues(t.Name()).Add(float64(len(wReq))) + } + sentSamples.WithLabelValues(t.Name()).Add(float64(len(wReq))) + sentBatchDuration.WithLabelValues(t.Name()).Observe(duration) + + return +} + +// BuildCommand generates the proper SQL for the query +func (t *TimeSteamAdapter) buildQuery(q *prompb.Query) (task *queryTask, err error) { + matchers := make([]string, 0, len(q.Matchers)) + task = new(queryTask) + for _, m := range q.Matchers { + // Metric Names + if m.Name == model.MetricNameLabel { + task.measureName = m.Value + switch m.Type { + case prompb.LabelMatcher_EQ: + matchers = append(matchers, fmt.Sprintf("measure_name = '%s'", m.Value)) + default: + return nil, errors.Errorf("unsupported match type %v", m.Type) + } + continue + } + + // Labels + switch m.Type { + case prompb.LabelMatcher_EQ: + matchers = append(matchers, fmt.Sprintf("%s = '%s'", m.Name, m.Value)) + 
case prompb.LabelMatcher_NEQ: + matchers = append(matchers, fmt.Sprintf("%s != '%s'", m.Name, m.Value)) + case prompb.LabelMatcher_RE: + matchers = append(matchers, fmt.Sprintf("%s LIKE '%s'", m.Name, m.Value)) + case prompb.LabelMatcher_NRE: + matchers = append(matchers, fmt.Sprintf("%s NOT LIKE '%s'", m.Name, m.Value)) + default: + return nil, errors.Errorf("unknown match type %v", m.Type) + } + } + + matchers = append(matchers, fmt.Sprintf("time BETWEEN from_milliseconds(%d) AND from_milliseconds(%d)", q.StartTimestampMs, q.EndTimestampMs)) + + dimensions, err := t.readDimension(task.measureName) + + if err != nil { + return + } + + task.query = fmt.Sprintf("SELECT %s, CREATE_TIME_SERIES(time, measure_value::double) AS %s FROM \"%s\".\"%s\" WHERE %s GROUP BY %s", + strings.Join(dimensions, ", "), task.measureName, cfg.databaseName, cfg.tableName, strings.Join(matchers, " AND "), strings.Join(dimensions, ", ")) + + t.logger.Debugw("Timestream read", "query", task.query) + + return +} + +func (t TimeSteamAdapter) readDimension(measureName string) (dimensions []string, err error) { + query := fmt.Sprintf("SHOW MEASURES FROM \"%s\".\"%s\" LIKE '%s'", cfg.databaseName, cfg.tableName, measureName) + + queryOutput, err := t.ttq.Query(×treamquery.QueryInput{QueryString: &query}) + if err != nil { + return + } + + for i, q := range queryOutput.ColumnInfo { + if *q.Name == "dimensions" { + for _, rv := range queryOutput.Rows[0].Data[i].ArrayValue { + for _, d := range rv.RowValue.Data { + if *d.ScalarValue != "varchar" { + dimensions = append(dimensions, *d.ScalarValue) + } + } + } + } + } + + return +} + +func (t *TimeSteamAdapter) handleQueryResult(qo *timestreamquery.QueryOutput, measureName string) (timeSeries []*prompb.TimeSeries, err error) { + for _, row := range qo.Rows { + var ts prompb.TimeSeries + + ts.Labels = append(ts.Labels, &prompb.Label{ + Name: model.MetricNameLabel, + Value: measureName, + }) + + for i, d := range row.Data { + if d.ScalarValue != nil { 
+ ts.Labels = append(ts.Labels, &prompb.Label{ + Name: *qo.ColumnInfo[i].Name, + Value: *d.ScalarValue, + }) + } + if d.TimeSeriesValue != nil { + for _, p := range d.TimeSeriesValue { + value, err := strconv.ParseFloat(*p.Value.ScalarValue, 64) + if err != nil { + continue + } + + s, err := time.Parse("2006-01-02 15:04:05.999999999", *p.Time) + + if err != nil { + continue + } + + t := s.UnixNano() / int64(time.Millisecond) + ts.Samples = append(ts.Samples, prompb.Sample{ + Value: value, + Timestamp: t, + }) + } + } + } + + timeSeries = append(timeSeries, &ts) + } + return +} + +func (t TimeSteamAdapter) query(q *prompb.Query) (result prompb.QueryResult, err error) { + task, err := t.buildQuery(q) + + if err != nil { + return + } + + input := ×treamquery.QueryInput{ + QueryString: &task.query, + } + + out, err := t.ttq.Query(input) + if err != nil { + return + } + + timeSeries, err := t.handleQueryResult(out, task.measureName) + + if err != nil { + return + } + + result = prompb.QueryResult{ + Timeseries: timeSeries, + } + + return +} + +func (t *TimeSteamAdapter) Read(request *prompb.ReadRequest) (response *prompb.ReadResponse, err error) { + var queryResult prompb.QueryResult + var queryResults []*prompb.QueryResult + + for _, q := range request.Queries { + queryResult, err = t.query(q) + + if err != nil { + return + } + + queryResults = append(queryResults, &queryResult) + + } + + response = &prompb.ReadResponse{ + Results: queryResults, + } + return } @@ -55,7 +245,7 @@ func (t TimeSteamAdapter) Name() string { return "prometheus-timestream-adapter" } -func newTimeStreamAdapter(logger *zap.SugaredLogger, cfg *config, writeSvc timestreamwriteiface.TimestreamWriteAPI) TimeSteamAdapter { +func newTimeStreamAdapter(logger *zap.SugaredLogger, cfg *config, writeSvc timestreamwriteiface.TimestreamWriteAPI, readSvc timestreamqueryiface.TimestreamQueryAPI) TimeSteamAdapter { tr := &http.Transport{ ResponseHeaderTimeout: 20 * time.Second, // Using DefaultTransport 
values for other parameters: https://golang.org/pkg/net/http/#RoundTripper @@ -73,20 +263,24 @@ func newTimeStreamAdapter(logger *zap.SugaredLogger, cfg *config, writeSvc times // So client makes HTTP/2 requests http2.ConfigureTransport(tr) - if writeSvc == nil { - writeSvc = timestreamwrite.New( - session.Must( - session.NewSession( - &aws.Config{ - Region: aws.String(cfg.awsRegion), - MaxRetries: aws.Int(10), - HTTPClient: &http.Client{ - Transport: tr, - }, - }, - ), - ), - ) + if writeSvc == nil || readSvc == nil { + sess := session.Must(session.NewSession( + &aws.Config{ + Region: aws.String(cfg.awsRegion), + MaxRetries: aws.Int(10), + HTTPClient: &http.Client{ + Transport: tr, + }, + }, + )) + + if writeSvc == nil { + writeSvc = timestreamwrite.New(sess) + } + + if readSvc == nil { + readSvc = timestreamquery.New(sess) + } } return TimeSteamAdapter{ @@ -94,12 +288,13 @@ func newTimeStreamAdapter(logger *zap.SugaredLogger, cfg *config, writeSvc times databaseName: cfg.databaseName, tableName: cfg.tableName, ttw: writeSvc, + ttq: readSvc, } } -func readLabels(labels []*prompb.Label) (dimensions []*timestreamwrite.Dimension, measureName string) { +func (t TimeSteamAdapter) readLabels(labels []*prompb.Label) (dimensions []*timestreamwrite.Dimension, measureName string) { for _, s := range labels { - if s.Name == "__name__" { + if s.Name == model.MetricNameLabel { measureName = s.Value continue } @@ -112,15 +307,15 @@ func readLabels(labels []*prompb.Label) (dimensions []*timestreamwrite.Dimension return } -func protoToRecords(logger *zap.SugaredLogger, req *prompb.WriteRequest) (records []*timestreamwrite.Record) { +func (t TimeSteamAdapter) protoToRecords(req *prompb.WriteRequest) (records []*timestreamwrite.Record) { for _, ts := range req.Timeseries { - dimensions, measureName := readLabels(ts.Labels) + dimensions, measureName := t.readLabels(ts.Labels) for _, s := range ts.Samples { switch { case math.IsNaN(s.Value) || math.IsInf(s.Value, 0): continue case 
len(measureName) >= 62: - logger.Warnw("Measure name exceeds the maximum supported length", "Measure name", measureName, "Measure name length", len(measureName)) + t.logger.Warnw("Measure name exceeds the maximum supported length", "Measure name", measureName, "Length", len(measureName)) continue } diff --git a/timestream_test.go b/timestream_test.go index 6eed3a1..080b6d0 100644 --- a/timestream_test.go +++ b/timestream_test.go @@ -29,6 +29,10 @@ import ( "go.uber.org/zap" ) +var ( + mockAdapter = newTimeStreamAdapter(zap.NewNop().Sugar(), cfg, TimeStreamWriterMock{}, TimeStreamQueryMock{}) +) + func Test_readLabels(t *testing.T) { type args struct { labels []*prompb.Label @@ -64,7 +68,7 @@ func Test_readLabels(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotDimensions, gotMeasureName := readLabels(tt.args.labels) + gotDimensions, gotMeasureName := mockAdapter.readLabels(tt.args.labels) if !reflect.DeepEqual(gotDimensions, tt.wantDimensions) { t.Errorf("readLabels() gotDimensions = %v, want %v", gotDimensions, tt.wantDimensions) } @@ -241,7 +245,7 @@ func Test_protoToRecords(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if gotRecords := protoToRecords(zap.NewNop().Sugar(), tt.args.req); !reflect.DeepEqual(gotRecords, tt.wantRecords) { + if gotRecords := mockAdapter.protoToRecords(tt.args.req); !reflect.DeepEqual(gotRecords, tt.wantRecords) { t.Errorf("protoToRecords() = %v, want %v", gotRecords, tt.wantRecords) } })