-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PoC for Async streams support in Tyk (Kafka / MQTT / AMQP etc.) [DO NOT MERGE] #6288
Conversation
PR Description updated to latest commit (e383125) |
API Changes --- prev.txt 2024-05-16 15:34:54.530972937 +0000
+++ current.txt 2024-05-16 15:34:47.158905916 +0000
@@ -3154,12 +3154,6 @@
// TransformResponseHeaders contains the configurations related to API level response header transformation.
// Tyk classic API definition: `global_response_headers`/`global_response_headers_remove`.
TransformResponseHeaders *TransformHeaders `bson:"transformResponseHeaders,omitempty" json:"transformResponseHeaders,omitempty"`
-
- // ContextVariables contains the configuration related to Tyk context variables.
- ContextVariables *ContextVariables `bson:"contextVariables,omitempty" json:"contextVariables,omitempty"`
-
- // TrafficLogs contains the configurations related to API level log analytics.
- TrafficLogs *TrafficLogs `bson:"trafficLogs,omitempty" json:"trafficLogs,omitempty"`
}
Global contains configuration that affects the whole API (all endpoints).
@@ -3993,6 +3987,9 @@
//
// Tyk classic API definition: `event_handlers`
EventHandlers EventHandlers `bson:"eventHandlers,omitempty" json:"eventHandlers,omitempty"`
+
+ // ContextVariables contains the configuration related to Tyk context variables.
+ ContextVariables *ContextVariables `bson:"contextVariables,omitempty" json:"contextVariables,omitempty"`
}
Server contains the configuration that sets Tyk up to receive requests from
the client applications.
@@ -4219,19 +4216,6 @@
func (i *TrackEndpoint) Fill(meta apidef.TrackEndpointMeta)
Fill fills *TrackEndpoint receiver with data from apidef.TrackEndpointMeta.
-type TrafficLogs struct {
- // Enabled enables traffic log analytics for the API.
- // Tyk classic API definition: `do_not_track`.
- Enabled bool `bson:"enabled" json:"enabled"`
-}
- TrafficLogs holds configuration about API log analytics.
-
-func (t *TrafficLogs) ExtractTo(api *apidef.APIDefinition)
- ExtractTo extracts *TrafficLogs into *apidef.APIDefinition.
-
-func (t *TrafficLogs) Fill(api apidef.APIDefinition)
- Fill fills *TrafficLogs from apidef.APIDefinition.
-
type TransformBody struct {
// Enabled activates transform request/request body middleware.
Enabled bool `bson:"enabled" json:"enabled"`
@@ -4573,6 +4557,36 @@
func (x *XTykAPIGateway) Fill(api apidef.APIDefinition)
Fill fills *XTykAPIGateway from apidef.APIDefinition.
+# Package: ./benthos-client
+
+package benthosClient // import "github.com/TykTechnologies/tyk/benthos-client"
+
+
+TYPES
+
+type Client struct {
+ BaseURL string
+ Username string
+ Password string
+ HTTPClient *http.Client
+}
+
+func NewClient(baseURL, username, password string) *Client
+
+func (c *Client) CreateStream(streamID string, config []byte) (map[string]interface{}, error)
+
+func (c *Client) DeleteStream(streamID string) (map[string]interface{}, error)
+
+func (c *Client) GetReady() (map[string]interface{}, error)
+
+func (c *Client) GetStream(streamID string) (map[string]interface{}, error)
+
+func (c *Client) GetStreams() (map[string]interface{}, error)
+
+func (c *Client) PatchStream(streamID string, config []byte) (map[string]interface{}, error)
+
+func (c *Client) UpdateStream(streamID string, config []byte) (map[string]interface{}, error)
+
# Package: ./certs
package certs // import "github.com/TykTechnologies/tyk/certs"
@@ -5831,7 +5845,7 @@
// EnableFixedWindow enables fixed window rate limiting.
EnableFixedWindowRateLimiter bool `json:"enable_fixed_window_rate_limiter"`
- // Redis based rate limiter with sliding log. Provides 100% rate limiting accuracy, but require two additional Redis roundtrip for each request.
+ // Redis based rate limiter with fixed window. Provides 100% rate limiting accuracy, but require two additional Redis roundtrip for each request.
EnableRedisRollingLimiter bool `json:"enable_redis_rolling_limiter"`
// To enable, set to `true`. The sentinel-based rate limiter delivers a smoother performance curve as rate-limit calculations happen off-thread, but a stricter time-out based cool-down for clients. For example, when a throttling action is triggered, they are required to cool-down for the period of the rate limit.
@@ -7106,11 +7120,8 @@
const (
// QuotaKeyPrefix serves as a standard prefix for generating quota keys.
QuotaKeyPrefix = "quota-"
-
- // RateLimitKeyPrefix serves as a standard prefix for generating rate limiter keys.
- RateLimitKeyPrefix = rate.LimiterKeyPrefix
-
- // SentinelRateLimitKeyPostfix is appended to the rate limiting key to combine into a sentinel key.
+ // RateLimitKeyPrefix serves as a standard prefix for generating rate limit keys.
+ RateLimitKeyPrefix = "rate-limit-"
SentinelRateLimitKeyPostfix = ".BLOCKED"
)
const (
@@ -7996,6 +8007,8 @@
type Gateway struct {
DefaultProxyMux *proxyMux
+ StreamClient *benthosClient.Client
+
DRLManager *drl.DRL
Analytics RedisAnalyticsHandler
@@ -8109,6 +8122,8 @@
func (gw *Gateway) LoadSampleAPI(def string) (spec *APISpec)
+func (gw *Gateway) LoadStreamsFromDisk() error
+
func (gw *Gateway) MarshalJSON() ([]byte, error)
func (gw *Gateway) MonitorApplicationInstrumentation()
@@ -8140,6 +8155,12 @@
func (gw *Gateway) SetupNewRelic() (app newrelic.Application)
SetupNewRelic creates new newrelic.Application instance
+func (gw *Gateway) StartStreamServer()
+
+func (gw *Gateway) StreamHandler(w http.ResponseWriter, r *http.Request)
+
+func (gw *Gateway) StreamListHandler(w http.ResponseWriter, r *http.Request)
+
func (gw *Gateway) TykNewSingleHostReverseProxy(target *url.URL, spec *APISpec, logger *logrus.Entry) *ReverseProxy
TykNewSingleHostReverseProxy returns a new ReverseProxy that rewrites URLs
to the scheme, host, and base path provided in target. If the target's path
@@ -9709,7 +9730,7 @@
func (l *SessionLimiter) Context() context.Context
-func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool, api *APISpec, dryRun bool) sessionFailReason
+func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool, globalConf *config.Config, api *APISpec, dryRun bool) sessionFailReason
ForwardMessage will enforce rate limiting, returning a non-zero
sessionFailReason if session limits have been exceeded. Key values to manage
rate are Rate and Per, e.g. Rate of 10 messages Per 10 seconds
@@ -11349,6 +11370,23 @@
func (v *Vault) Get(key string) (string, error)
+# Package: ./streams
+
+package streams // import "github.com/TykTechnologies/tyk/streams"
+
+
+TYPES
+
+type Server struct {
+ // Has unexported fields.
+}
+
+func New() *Server
+
+func (s *Server) Start()
+
+func (s *Server) Stop()
+
# Package: ./tcp
package tcp // import "github.com/TykTechnologies/tyk/tcp" |
PR Review 🔍
Code feedback:
|
PR Code Suggestions ✨
|
💥 CI tests failed 🙈git-statediff --git a/gateway/server.go b/gateway/server.go
index 8de5bac..3b3513c 100644
--- a/gateway/server.go
+++ b/gateway/server.go
@@ -32,7 +32,6 @@ import (
"github.com/TykTechnologies/tyk/streams"
"github.com/TykTechnologies/tyk/test"
- benthosClient "github.com/TykTechnologies/tyk/benthos-client"
logstashHook "github.com/bshuster-repo/logrus-logstash-hook"
"github.com/evalphobia/logrus_sentry"
graylogHook "github.com/gemnasium/logrus-graylog-hook"
@@ -42,6 +41,8 @@ import (
"github.com/sirupsen/logrus"
logrus_syslog "github.com/sirupsen/logrus/hooks/syslog"
+ benthosClient "github.com/TykTechnologies/tyk/benthos-client"
+
"github.com/TykTechnologies/tyk/internal/uuid"
"github.com/TykTechnologies/again" Please look at the run or in the Checks tab. |
// try to open the stream file on disk | ||
filename := fmt.Sprintf("%s.yaml", streamID) | ||
filepath := path.Join("streams", "active", filename) | ||
f, err := os.Open(filepath) |
Check failure
Code scanning / CodeQL
Uncontrolled data used in path expression High
user-provided value
if err != nil { | ||
// if the file doesn't exist, create it | ||
if os.IsNotExist(err) { | ||
f, err = os.Create(filepath) |
Check failure
Code scanning / CodeQL
Uncontrolled data used in path expression High
user-provided value
func deleteStreamFile(streamID string) error { | ||
filename := fmt.Sprintf("%s.yaml", streamID) | ||
filepath := path.Join("streams", "active", filename) | ||
err := os.Remove(filepath) |
Check failure
Code scanning / CodeQL
Uncontrolled data used in path expression High
User description
Description
This PoC embedds the rather excellent Benthos streams server into Tyk, and wraps it's control API into Tyk's control API.
Benthos is a highly-robust stream processor that can handle Kafka, AMQP, MQTT, Web sockets, Unix sockets, HTTP endpoints and much more as inputs and outputs, and has a highly capable, and pluggable middleware system to enable transformations, rate limiting, introspection etc.
This PoC, when run, will spin up Tyk as normal, as well as a Benthos server (all aprt of the same binary, no separate processes here) in streams mode. It will then scan the streams/active folder for stream configurations and load them into the server.
You can use the Tyk API endpoints under
/tyk/streams/
to add, update, and remove streams dynamically, they take as input the same yaml config format that benthos uses (it's a straight pass-through). All configs are written to disk so if you restart, they are reloaded - just like apiDefs.It's worth noting that Tyk speaks to Benthos via it's REST API, which has pretty weak security, it's bound to localhost, and it does support basic auth, but I didn't have time to implement or experiment fully.
In this shape, the benthos capability and Tyk capability are still very decoupled, I think we need some kind of benthos plugin to hook Tyk APIs as an input to the benthos server (so you can do HTTP -> Event processing), as well as potentially the other way around (event -> websocket or event -> SSE) so that you can use traditional APIM on those endpoints.
Also required: a dashboard loader (should not be too hard).
As this server handles streams dynamically, it doesn't need hot-reload functionality so that complexity is managed.
Short video of it working: https://www.loom.com/share/9a1fb36840ca40059ab129a41e1bf68c?sid=9218a202-d5bc-4e02-91e3-2652d8e3ceae
API usage:
Create a stream (like in the video):
(Use
PUT
to update an existing stream, andGET
on this URI to fetch a stream)List Streams:
Delete Stream: