Skip to content

Commit

Permalink
Merge pull request #689 from luraproject/multiple_unsafe_req
Browse files Browse the repository at this point in the history
Allow multiple unsafe requests on the same endpoint
  • Loading branch information
kpacha authored Sep 28, 2023
2 parents 691640c + a77c39b commit 965dacf
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 160 deletions.
41 changes: 33 additions & 8 deletions proxy/merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package proxy
import (
"context"
"fmt"
"net/http"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -41,21 +42,26 @@ func NewMergeDataMiddleware(logger logging.Logger, endpointConfig *config.Endpoi
if len(next) != totalBackends {
panic(ErrNotEnoughProxies)
}
reqClone := func(r *Request) *Request { return r }

if hasUnsafeBackends(endpointConfig) {
reqClone = CloneRequest
}

if !isSequential {
return parallelMerge(serviceTimeout, combiner, next...)
return parallelMerge(reqClone, serviceTimeout, combiner, next...)
}

patterns := make([]string, len(endpointConfig.Backend))
for i, b := range endpointConfig.Backend {
patterns[i] = b.URLPattern
}
return sequentialMerge(patterns, serviceTimeout, combiner, next...)
return sequentialMerge(reqClone, patterns, serviceTimeout, combiner, next...)
}
}

func shouldRunSequentialMerger(endpointConfig *config.EndpointConfig) bool {
if v, ok := endpointConfig.ExtraConfig[Namespace]; ok {
func shouldRunSequentialMerger(cfg *config.EndpointConfig) bool {
if v, ok := cfg.ExtraConfig[Namespace]; ok {
if e, ok := v.(map[string]interface{}); ok {
if v, ok := e[isSequentialKey]; ok {
c, ok := v.(bool)
Expand All @@ -66,15 +72,32 @@ func shouldRunSequentialMerger(endpointConfig *config.EndpointConfig) bool {
return false
}

func parallelMerge(timeout time.Duration, rc ResponseCombiner, next ...Proxy) Proxy {
func hasUnsafeBackends(cfg *config.EndpointConfig) bool {
if len(cfg.Backend) == 1 {
return false
}

hasOneUnsafe := false
for _, b := range cfg.Backend {
if m := strings.ToUpper(b.Method); m != http.MethodGet && m != http.MethodHead {
if hasOneUnsafe {
return true
}
hasOneUnsafe = true
}
}
return false
}

func parallelMerge(reqCloner func(*Request) *Request, timeout time.Duration, rc ResponseCombiner, next ...Proxy) Proxy {
return func(ctx context.Context, request *Request) (*Response, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)

parts := make(chan *Response, len(next))
failed := make(chan error, len(next))

for _, n := range next {
go requestPart(localCtx, n, request, parts, failed)
go requestPart(localCtx, n, reqCloner(request), parts, failed)
}

acc := newIncrementalMergeAccumulator(len(next), rc)
Expand All @@ -95,7 +118,7 @@ func parallelMerge(timeout time.Duration, rc ResponseCombiner, next ...Proxy) Pr

var reMergeKey = regexp.MustCompile(`\{\{\.Resp(\d+)_([\d\w-_\.]+)\}\}`)

func sequentialMerge(patterns []string, timeout time.Duration, rc ResponseCombiner, next ...Proxy) Proxy {
func sequentialMerge(reqCloner func(*Request) *Request, patterns []string, timeout time.Duration, rc ResponseCombiner, next ...Proxy) Proxy {
return func(ctx context.Context, request *Request) (*Response, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)

Expand Down Expand Up @@ -164,7 +187,9 @@ func sequentialMerge(patterns []string, timeout time.Duration, rc ResponseCombin
}
}
}
sequentialRequestPart(localCtx, n, request, out, errCh)

sequentialRequestPart(localCtx, n, reqCloner(request), out, errCh)

select {
case err := <-errCh:
if i == 0 {
Expand Down
81 changes: 81 additions & 0 deletions proxy/stack_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,87 @@ func BenchmarkProxyStack_multi(b *testing.B) {
}
}

func BenchmarkProxyStack_multipost(b *testing.B) {
backendGET := &config.Backend{
ConcurrentCalls: 3,
Timeout: time.Duration(100) * time.Millisecond,
Host: []string{"supu:8080"},
Method: "GET",
URLPattern: "/a/{{.Tupu}}",
DenyList: []string{"map.aaaa"},
Mapping: map[string]string{"supu": "SUPUUUUU"},
}

backendPOST := &config.Backend{
ConcurrentCalls: 3,
Timeout: time.Duration(100) * time.Millisecond,
Host: []string{"supu:8080"},
Method: "POST",
URLPattern: "/a/{{.Tupu}}",
DenyList: []string{"map.aaaa"},
Mapping: map[string]string{"supu": "SUPUUUUU"},
}

request := &Request{
Method: "POST",
Body: newDummyReadCloser(""),
Params: map[string]string{"Tupu": "true"},
Headers: map[string][]string{},
}

for _, testCase := range [][]*config.Backend{
{backendGET},
{backendGET, backendPOST},
{backendGET, backendPOST, backendGET},
{backendGET, backendPOST, backendGET, backendPOST},
{backendGET, backendPOST, backendGET, backendPOST, backendPOST},
} {
b.Run(fmt.Sprintf("with %d backends", len(testCase)), func(b *testing.B) {

cfg := &config.EndpointConfig{
Backend: testCase,
}

backendProxy := make([]Proxy, len(cfg.Backend))

for i, backend := range cfg.Backend {
ef := NewEntityFormatter(backend)
backendProxy[i] = func(_ context.Context, _ *Request) (*Response, error) {
res := ef.Format(Response{
Data: map[string]interface{}{
"supu": 42,
"tupu": true,
"foo": "bar",
"map": map[string]interface{}{"aaaa": false},
"col": []interface{}{
map[string]interface{}{
"a": 1,
"b": 2,
},
},
},
IsComplete: true,
})
return &res, nil
}
backendProxy[i] = NewRoundRobinLoadBalancedMiddleware(backend)(backendProxy[i])
backendProxy[i] = NewConcurrentMiddleware(backend)(backendProxy[i])
backendProxy[i] = NewRequestBuilderMiddleware(backend)(backendProxy[i])
}
p := NewMergeDataMiddleware(logging.NoOp, cfg)(backendProxy...)
p = NewStaticMiddleware(logging.NoOp, cfg)(p)

var r *Response
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
r, _ = p(context.Background(), request)
}
result = r
})
}
}

func BenchmarkProxyStack_single_flatmap(b *testing.B) {
backend := &config.Backend{
ConcurrentCalls: 3,
Expand Down
29 changes: 13 additions & 16 deletions router/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,24 @@
package router

import (
"net/http"

"github.com/luraproject/lura/v2/config"
"github.com/luraproject/lura/v2/proxy"
)

func IsValidSequentialEndpoint(endpoint *config.EndpointConfig) bool {
if endpoint.ExtraConfig[proxy.Namespace] == nil {
return false
}
func IsValidSequentialEndpoint(_ *config.EndpointConfig) bool {
// if endpoint.ExtraConfig[proxy.Namespace] == nil {
// return false
// }

proxyCfg := endpoint.ExtraConfig[proxy.Namespace].(map[string]interface{})
if proxyCfg["sequential"] == false {
return false
}
// proxyCfg := endpoint.ExtraConfig[proxy.Namespace].(map[string]interface{})
// if proxyCfg["sequential"] == false {
// return false
// }

for i, backend := range endpoint.Backend {
if backend.Method != http.MethodGet && (i+1) != len(endpoint.Backend) {
return false
}
}
// for i, backend := range endpoint.Backend {
// if backend.Method != http.MethodGet && (i+1) != len(endpoint.Backend) {
// return false
// }
// }

return true
}
Loading

0 comments on commit 965dacf

Please sign in to comment.