PoC for Async streams support in Tyk (Kafka / MQTT / AMQP etc.) [DO NOT MERGE] #6288

Closed
wants to merge 2 commits into from

Conversation

lonelycode (Member) commented May 16, 2024

User description

Description

This PoC embeds the rather excellent Benthos streams server into Tyk and wraps its control API in Tyk's control API.

Benthos is a highly robust stream processor that can handle Kafka, AMQP, MQTT, WebSockets, Unix sockets, HTTP endpoints and much more as inputs and outputs. It also has a highly capable, pluggable middleware system that enables transformations, rate limiting, introspection, etc.

When run, this PoC spins up Tyk as normal, plus a Benthos server in streams mode (all part of the same binary, no separate processes here). It then scans the streams/active folder for stream configurations and loads them into the server.

You can use the Tyk API endpoints under /tyk/streams/ to add, update, and remove streams dynamically. They take as input the same YAML config format that Benthos uses (it's a straight pass-through). All configs are written to disk, so they are reloaded after a restart, just like apiDefs.

It's worth noting that Tyk speaks to Benthos via its REST API, which has pretty weak security. It is bound to localhost and does support basic auth, but I didn't have time to implement or experiment with that fully.

In this shape, the Benthos capability and the Tyk capability are still very decoupled. I think we need some kind of Benthos plugin to hook Tyk APIs in as an input to the Benthos server (so you can do HTTP -> event processing), and potentially the other way around (event -> WebSocket or event -> SSE), so that you can use traditional APIM on those endpoints.

Also required: a dashboard loader (should not be too hard).

As this server handles streams dynamically, it doesn't need hot-reload functionality so that complexity is managed.

Short video of it working: https://www.loom.com/share/9a1fb36840ca40059ab129a41e1bf68c?sid=9218a202-d5bc-4e02-91e3-2652d8e3ceae

API usage:

Create a stream (like in the video):

(Use PUT to update an existing stream, and GET on this URI to fetch a stream)

curl --request POST \
  --url http://localhost:80/tyk/streams/test \
  --header 'content-type: text/plain' \
  --header 'x-tyk-authorization: 352d20ee67be67f6340b4c0605b044b7' \
  --data '---
input:
  kafka:
    consumer_group: "main"
    addresses:
      - localhost:9092
    topics:
      - my_topic
pipeline:
  threads: 1
  processors:
    - mapping: root = content().uppercase()
output:
  label: "socket"
  socket:
    network: "unix"
    address: /tmp/benthos1.sock
    codec: lines
' 

List Streams:

curl --request GET \
  --url http://localhost:80/tyk/streams \
  --header 'x-tyk-authorization: 352d20ee67be67f6340b4c0605b044b7'

Delete Stream:


curl --request DELETE \
  --url http://localhost:80/tyk/streams/test \
  --header 'content-type: text/plain' \
  --header 'x-tyk-authorization: 352d20ee67be67f6340b4c0605b044b7'

PR Description updated to latest commit (e383125)

API Changes

--- prev.txt	2024-05-16 15:34:54.530972937 +0000
+++ current.txt	2024-05-16 15:34:47.158905916 +0000
@@ -3154,12 +3154,6 @@
 	// TransformResponseHeaders contains the configurations related to API level response header transformation.
 	// Tyk classic API definition: `global_response_headers`/`global_response_headers_remove`.
 	TransformResponseHeaders *TransformHeaders `bson:"transformResponseHeaders,omitempty" json:"transformResponseHeaders,omitempty"`
-
-	// ContextVariables contains the configuration related to Tyk context variables.
-	ContextVariables *ContextVariables `bson:"contextVariables,omitempty" json:"contextVariables,omitempty"`
-
-	// TrafficLogs contains the configurations related to API level log analytics.
-	TrafficLogs *TrafficLogs `bson:"trafficLogs,omitempty" json:"trafficLogs,omitempty"`
 }
     Global contains configuration that affects the whole API (all endpoints).
 
@@ -3993,6 +3987,9 @@
 	//
 	// Tyk classic API definition: `event_handlers`
 	EventHandlers EventHandlers `bson:"eventHandlers,omitempty" json:"eventHandlers,omitempty"`
+
+	// ContextVariables contains the configuration related to Tyk context variables.
+	ContextVariables *ContextVariables `bson:"contextVariables,omitempty" json:"contextVariables,omitempty"`
 }
     Server contains the configuration that sets Tyk up to receive requests from
     the client applications.
@@ -4219,19 +4216,6 @@
 func (i *TrackEndpoint) Fill(meta apidef.TrackEndpointMeta)
     Fill fills *TrackEndpoint receiver with data from apidef.TrackEndpointMeta.
 
-type TrafficLogs struct {
-	// Enabled enables traffic log analytics for the API.
-	// Tyk classic API definition: `do_not_track`.
-	Enabled bool `bson:"enabled" json:"enabled"`
-}
-    TrafficLogs holds configuration about API log analytics.
-
-func (t *TrafficLogs) ExtractTo(api *apidef.APIDefinition)
-    ExtractTo extracts *TrafficLogs into *apidef.APIDefinition.
-
-func (t *TrafficLogs) Fill(api apidef.APIDefinition)
-    Fill fills *TrafficLogs from apidef.APIDefinition.
-
 type TransformBody struct {
 	// Enabled activates transform request/request body middleware.
 	Enabled bool `bson:"enabled" json:"enabled"`
@@ -4573,6 +4557,36 @@
 func (x *XTykAPIGateway) Fill(api apidef.APIDefinition)
     Fill fills *XTykAPIGateway from apidef.APIDefinition.
 
+# Package: ./benthos-client
+
+package benthosClient // import "github.com/TykTechnologies/tyk/benthos-client"
+
+
+TYPES
+
+type Client struct {
+	BaseURL    string
+	Username   string
+	Password   string
+	HTTPClient *http.Client
+}
+
+func NewClient(baseURL, username, password string) *Client
+
+func (c *Client) CreateStream(streamID string, config []byte) (map[string]interface{}, error)
+
+func (c *Client) DeleteStream(streamID string) (map[string]interface{}, error)
+
+func (c *Client) GetReady() (map[string]interface{}, error)
+
+func (c *Client) GetStream(streamID string) (map[string]interface{}, error)
+
+func (c *Client) GetStreams() (map[string]interface{}, error)
+
+func (c *Client) PatchStream(streamID string, config []byte) (map[string]interface{}, error)
+
+func (c *Client) UpdateStream(streamID string, config []byte) (map[string]interface{}, error)
+
 # Package: ./certs
 
 package certs // import "github.com/TykTechnologies/tyk/certs"
@@ -5831,7 +5845,7 @@
 	// EnableFixedWindow enables fixed window rate limiting.
 	EnableFixedWindowRateLimiter bool `json:"enable_fixed_window_rate_limiter"`
 
-	// Redis based rate limiter with sliding log. Provides 100% rate limiting accuracy, but require two additional Redis roundtrip for each request.
+	// Redis based rate limiter with fixed window. Provides 100% rate limiting accuracy, but require two additional Redis roundtrip for each request.
 	EnableRedisRollingLimiter bool `json:"enable_redis_rolling_limiter"`
 
 	// To enable, set to `true`. The sentinel-based rate limiter delivers a smoother performance curve as rate-limit calculations happen off-thread, but a stricter time-out based cool-down for clients. For example, when a throttling action is triggered, they are required to cool-down for the period of the rate limit.
@@ -7106,11 +7120,8 @@
 const (
 	// QuotaKeyPrefix serves as a standard prefix for generating quota keys.
 	QuotaKeyPrefix = "quota-"
-
-	// RateLimitKeyPrefix serves as a standard prefix for generating rate limiter keys.
-	RateLimitKeyPrefix = rate.LimiterKeyPrefix
-
-	// SentinelRateLimitKeyPostfix is appended to the rate limiting key to combine into a sentinel key.
+	// RateLimitKeyPrefix serves as a standard prefix for generating rate limit keys.
+	RateLimitKeyPrefix          = "rate-limit-"
 	SentinelRateLimitKeyPostfix = ".BLOCKED"
 )
 const (
@@ -7996,6 +8007,8 @@
 type Gateway struct {
 	DefaultProxyMux *proxyMux
 
+	StreamClient *benthosClient.Client
+
 	DRLManager *drl.DRL
 
 	Analytics            RedisAnalyticsHandler
@@ -8109,6 +8122,8 @@
 
 func (gw *Gateway) LoadSampleAPI(def string) (spec *APISpec)
 
+func (gw *Gateway) LoadStreamsFromDisk() error
+
 func (gw *Gateway) MarshalJSON() ([]byte, error)
 
 func (gw *Gateway) MonitorApplicationInstrumentation()
@@ -8140,6 +8155,12 @@
 func (gw *Gateway) SetupNewRelic() (app newrelic.Application)
     SetupNewRelic creates new newrelic.Application instance
 
+func (gw *Gateway) StartStreamServer()
+
+func (gw *Gateway) StreamHandler(w http.ResponseWriter, r *http.Request)
+
+func (gw *Gateway) StreamListHandler(w http.ResponseWriter, r *http.Request)
+
 func (gw *Gateway) TykNewSingleHostReverseProxy(target *url.URL, spec *APISpec, logger *logrus.Entry) *ReverseProxy
     TykNewSingleHostReverseProxy returns a new ReverseProxy that rewrites URLs
     to the scheme, host, and base path provided in target. If the target's path
@@ -9709,7 +9730,7 @@
 
 func (l *SessionLimiter) Context() context.Context
 
-func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool, api *APISpec, dryRun bool) sessionFailReason
+func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool, globalConf *config.Config, api *APISpec, dryRun bool) sessionFailReason
     ForwardMessage will enforce rate limiting, returning a non-zero
     sessionFailReason if session limits have been exceeded. Key values to manage
     rate are Rate and Per, e.g. Rate of 10 messages Per 10 seconds
@@ -11349,6 +11370,23 @@
 
 func (v *Vault) Get(key string) (string, error)
 
+# Package: ./streams
+
+package streams // import "github.com/TykTechnologies/tyk/streams"
+
+
+TYPES
+
+type Server struct {
+	// Has unexported fields.
+}
+
+func New() *Server
+
+func (s *Server) Start()
+
+func (s *Server) Stop()
+
 # Package: ./tcp
 
 package tcp // import "github.com/TykTechnologies/tyk/tcp"

PR Review 🔍

⏱️ Estimated effort to review [1-5]

4, due to the integration of a new system (Benthos) within Tyk, the complexity of handling multiple stream protocols, and the security implications of the new endpoints. The PR involves significant changes across multiple files, including new API endpoints and background services, which increases the review complexity.

🧪 Relevant tests

No

⚡ Possible issues

Possible Bug: The StreamHandler method in gateway/streams_api.go does not handle the case where the HTTP method is not one of the expected ones (GET, POST, PUT, DELETE). This could lead to uninitialized obj and code variables, resulting in a potential server error.

Error Handling: In gateway/streams_api.go, the error from deleteStreamFile is directly returned if it occurs, which might expose internal path or file system details to the client. It's better to log this error internally and return a generic error message to the client.

🔒 Security concerns

- Sensitive information exposure: The configuration in streams/config.yaml sets basic_auth as disabled and includes a password hash and salt in the configuration file, which is insecure especially if the file gets exposed or mishandled. Additionally, the password hashing mechanism might not be secure enough depending on how it's implemented in the system.

Code feedback:
relevant file: gateway/streams_api.go
suggestion:

Handle unsupported HTTP methods in StreamHandler to avoid uninitialized variables. You can return a 405 Method Not Allowed status when an unexpected method is used. This change enhances the robustness of the API endpoint by ensuring all HTTP methods are explicitly handled. [important]

relevant line: switch r.Method {
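
A minimal sketch of the 405 handling suggested above, assuming a handler shaped like the one the review describes (a switch on r.Method that fills obj and code). The per-method logic is a placeholder, and streamHandler and statusFor are illustrative names rather than the PR's code.

```go
package main

import (
	"encoding/json"
	"net/http"
	"net/http/httptest"
)

// streamHandler is an illustrative stand-in for the PR's StreamHandler.
func streamHandler(w http.ResponseWriter, r *http.Request) {
	var (
		obj  interface{}
		code int
	)

	switch r.Method {
	case http.MethodGet, http.MethodPost, http.MethodPut, http.MethodDelete:
		// Placeholder for the real per-method logic.
		obj, code = map[string]string{"status": "ok"}, http.StatusOK
	default:
		// Reject anything else explicitly so obj and code are never used unset.
		w.Header().Set("Allow", "GET, POST, PUT, DELETE")
		http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	_ = json.NewEncoder(w).Encode(obj)
}

// statusFor runs the handler against an in-memory recorder and returns the
// status code a client would receive for the given method.
func statusFor(method string) int {
	rec := httptest.NewRecorder()
	streamHandler(rec, httptest.NewRequest(method, "/tyk/streams/test", nil))
	return rec.Code
}
```

With this shape, statusFor("PATCH") yields 405 while the four supported methods fall through to the normal response path.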

relevant file: gateway/streams_api.go
suggestion:

Modify the error handling in deleteStreamFile to log the error internally and return a generic error message to the client. This prevents potential exposure of sensitive file system details and improves the security posture of the application. [important]

relevant line: return err, http.StatusInternalServerError
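
One way to apply this suggestion is to log the detailed error on the server and send the client a fixed message. The sketch below assumes the standard library logger. The names writeDeleteError and demoDeleteError, the message text and the example path are all made up for illustration.

```go
package main

import (
	"errors"
	"log"
	"net/http"
	"net/http/httptest"
	"os"
)

// writeDeleteError logs the full error (which may contain file system paths)
// on the server side and returns only a generic message to the client.
func writeDeleteError(w http.ResponseWriter, streamID string, err error) {
	log.Printf("streams: failed to delete stream %q: %v", streamID, err)
	http.Error(w, "failed to delete stream", http.StatusInternalServerError)
}

// demoDeleteError feeds a path-bearing error through writeDeleteError and
// returns the status and body the client would see.
func demoDeleteError() (status int, body string) {
	// Hypothetical failure: the path below is invented for the example.
	err := &os.PathError{
		Op:   "remove",
		Path: "/opt/tyk/streams/active/test.yaml",
		Err:  errors.New("permission denied"),
	}
	rec := httptest.NewRecorder()
	writeDeleteError(rec, "test", err)
	return rec.Code, rec.Body.String()
}
```

The path from the underlying error ends up in the server log only; the response body carries the generic message.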

relevant file: streams/config.yaml
suggestion:

Consider enabling basic authentication and ensure that the password is not stored in a configuration file as a hash. Use environment variables or a secure vault solution to handle sensitive credentials. This change is crucial to prevent unauthorized access and potential leakage of sensitive information. [important]

relevant line: enabled: false

PR Code Suggestions ✨

Category    Suggestion    Score
Security
Enhance the security of password storage

To improve security, consider encrypting the password stored in the Client struct or use a
more secure method of authentication.

benthos-client/benthos_client.go [22]

-Password:   password,
+Password:   encrypt(password),
 
Suggestion importance[1-10]: 8

Why: Storing passwords in plain text within a struct can lead to security vulnerabilities. Encrypting the password or using a more secure authentication method is a valid and important suggestion.

8
Add rate limiting to stream handling endpoints to prevent abuse

Implement rate limiting or other protective measures in the StreamHandler and
StreamListHandler to prevent abuse of these endpoints.

gateway/streams_api.go [15]

 func (gw *Gateway) StreamHandler(w http.ResponseWriter, r *http.Request) {
+    if !rateLimiter.Allow() {
+        http.Error(w, "Too many requests", http.StatusTooManyRequests)
+        return
+    }
 
Suggestion importance[1-10]: 8

Why: Adding rate limiting to protect endpoints from abuse is a crucial security measure, especially for public-facing APIs. This suggestion is highly relevant and would help in safeguarding the system against potential abuse.

8
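
The rateLimiter in the suggested diff above is not defined anywhere in the PR. A minimal token-bucket sketch that would provide such an Allow() method, using only the standard library, might look like this; the type tokenBucket and the constructor newTokenBucket are illustrative names.

```go
package main

import (
	"sync"
	"time"
)

// tokenBucket is a small, illustrative rate limiter: it holds up to capacity
// tokens and refills them at refillPerSecond.
type tokenBucket struct {
	mu              sync.Mutex
	capacity        float64
	tokens          float64
	refillPerSecond float64
	last            time.Time
}

// newTokenBucket returns a bucket that starts full.
func newTokenBucket(capacity int, refillPerSecond float64) *tokenBucket {
	return &tokenBucket{
		capacity:        float64(capacity),
		tokens:          float64(capacity),
		refillPerSecond: refillPerSecond,
		last:            time.Now(),
	}
}

// Allow reports whether one more request may proceed, consuming a token if so.
func (b *tokenBucket) Allow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Refill according to the time elapsed since the previous call.
	now := time.Now()
	b.tokens += now.Sub(b.last).Seconds() * b.refillPerSecond
	if b.tokens > b.capacity {
		b.tokens = b.capacity
	}
	b.last = now

	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}
```

A handler would call Allow() on a shared bucket and answer with http.StatusTooManyRequests when it returns false, as in the suggested diff.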
Maintainability
Improve error logging for better traceability

Instead of printing the error directly in the doRequest function, consider logging it with
more context or handling it in a way that can be traced back more easily in larger
systems.

benthos-client/benthos_client.go [55]

-fmt.Println("error reading response: ", err)
+log.Errorf("Error reading response in doRequest: %v", err)
 
Suggestion importance[1-10]: 7

Why: The suggestion to improve error logging for better traceability is valid. Using structured logging instead of simple print statements can greatly enhance debugging and operational monitoring.

7

💥 CI tests failed 🙈

git-state

diff --git a/gateway/server.go b/gateway/server.go
index 8de5bac..3b3513c 100644
--- a/gateway/server.go
+++ b/gateway/server.go
@@ -32,7 +32,6 @@ import (
 	"github.com/TykTechnologies/tyk/streams"
 	"github.com/TykTechnologies/tyk/test"
 
-	benthosClient "github.com/TykTechnologies/tyk/benthos-client"
 	logstashHook "github.com/bshuster-repo/logrus-logstash-hook"
 	"github.com/evalphobia/logrus_sentry"
 	graylogHook "github.com/gemnasium/logrus-graylog-hook"
@@ -42,6 +41,8 @@ import (
 	"github.com/sirupsen/logrus"
 	logrus_syslog "github.com/sirupsen/logrus/hooks/syslog"
 
+	benthosClient "github.com/TykTechnologies/tyk/benthos-client"
+
 	"github.com/TykTechnologies/tyk/internal/uuid"
 
 	"github.com/TykTechnologies/again"

Please look at the run or in the Checks tab.

// try to open the stream file on disk
filename := fmt.Sprintf("%s.yaml", streamID)
filepath := path.Join("streams", "active", filename)
f, err := os.Open(filepath)

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (severity: High)

This path depends on a user-provided value.
if err != nil {
// if the file doesn't exist, create it
if os.IsNotExist(err) {
f, err = os.Create(filepath)

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (severity: High)

This path depends on a user-provided value.
func deleteStreamFile(streamID string) error {
filename := fmt.Sprintf("%s.yaml", streamID)
filepath := path.Join("streams", "active", filename)
err := os.Remove(filepath)

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (severity: High)

This path depends on a user-provided value.
buger added a commit that referenced this pull request May 18, 2024
@buger buger mentioned this pull request May 18, 2024
@buger buger closed this May 21, 2024