You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I created a source and created a new stream based on this source, using rxgo.WithPublishStrategy() and connect, But the result does not match the expectation, sometimes it gets stuck. Did I understand Connect right?
code:
// yes , it works well with Defer, but how do I use create and connect
obSource := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
for _, v := range []int{1, 2, 3} {
next <- rxgo.Of(v)
}
}}, rxgo.WithPublishStrategy())
ob2 := obSource.Map(func(c context.Context, i interface{}) (interface{}, error) {
return fmt.Sprintf("ob2:%v", i), nil
})
var wg sync.WaitGroup
wg.Add(1)
rxgo.Merge([]rxgo.Observable{
obSource,
ob2,
}).Map(func(c context.Context, i interface{}) (interface{}, error) {
log.Printf("map:%v\n", i)
return i, nil
}).DoOnCompleted(func() {
wg.Done()
})
// time.Sleep(time.Millisecond) // Add this line and it's ok, Is there a better way?
obSource.Connect(context.Background())
wg.Wait()
result:
expected:
2021/08/03 22:44:07 map:ob2:1
2021/08/03 22:44:07 map:1
2021/08/03 22:44:07 map:ob2:2
2021/08/03 22:44:07 map:2
2021/08/03 22:44:07 map:ob2:3
2021/08/03 22:44:07 map:3
// The results are random and sometimes get stuck
2021/08/03 22:41:48 map:ob2:1
2021/08/03 22:41:48 map:2
2021/08/03 22:41:48 map:ob2:2
2021/08/03 22:41:48 map:3
2021/08/03 22:41:48 map:ob2:3
The text was updated successfully, but these errors were encountered:
I created a source and created a new stream based on this source, using rxgo.WithPublishStrategy() and connect, But the result does not match the expectation, sometimes it gets stuck. Did I understand
Connect
right?code:
result:
The text was updated successfully, but these errors were encountered: