Connect fails when using merge #325

Open
dalianzhu opened this issue Aug 3, 2021 · 0 comments
Labels
question Question regarding how RxGo is working etc.

dalianzhu commented Aug 3, 2021

I created a source and derived a new stream from it, using rxgo.WithPublishStrategy() and Connect. The result does not match my expectation, and sometimes the program gets stuck. Am I understanding Connect correctly?
code:

package main

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	// This works well with Defer, but how do I use Create and Connect?
	obSource := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
		for _, v := range []int{1, 2, 3} {
			next <- rxgo.Of(v)
		}
	}}, rxgo.WithPublishStrategy())

	// ob2 is a second stream derived from the same published source.
	ob2 := obSource.Map(func(c context.Context, i interface{}) (interface{}, error) {
		return fmt.Sprintf("ob2:%v", i), nil
	})

	var wg sync.WaitGroup
	wg.Add(1)
	// Merge the source with the derived stream and log every item.
	rxgo.Merge([]rxgo.Observable{
		obSource,
		ob2,
	}).Map(func(c context.Context, i interface{}) (interface{}, error) {
		log.Printf("map:%v\n", i)
		return i, nil
	}).DoOnCompleted(func() {
		wg.Done()
	})
	// time.Sleep(time.Millisecond) // Adding this line makes it work. Is there a better way?
	obSource.Connect(context.Background())
	wg.Wait()
}

result:

expected:
2021/08/03 22:44:07 map:ob2:1
2021/08/03 22:44:07 map:1
2021/08/03 22:44:07 map:ob2:2
2021/08/03 22:44:07 map:2
2021/08/03 22:44:07 map:ob2:3
2021/08/03 22:44:07 map:3

actual (the output is random, and sometimes the program gets stuck):
2021/08/03 22:41:48 map:ob2:1
2021/08/03 22:41:48 map:2
2021/08/03 22:41:48 map:ob2:2
2021/08/03 22:41:48 map:3
2021/08/03 22:41:48 map:ob2:3
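
One possible reading of the missing items, not confirmed by the RxGo maintainers: with a published (hot) source, items only reach subscribers that are already registered when they are emitted, so the result depends on whether every subscription is in place before Connect runs. That would match the observation that a short time.Sleep before Connect makes it work. The sketch below uses only the Go standard library to illustrate that general behaviour. The hub type and its subscribe, connect, and collect helpers are hypothetical names invented for illustration; they are not RxGo APIs and do not reflect how RxGo is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a hypothetical stand-in for a connectable (hot) source.
// It is NOT RxGo code; it only models "items go to whoever is
// subscribed at the moment they are emitted".
type hub struct {
	mu   sync.Mutex
	subs []chan int
	done bool
}

// subscribe registers a listener with a buffer of size buf.
// Once the hub has finished emitting, it returns an already-closed channel.
func (h *hub) subscribe(buf int) <-chan int {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan int, buf)
	if h.done {
		close(ch)
		return ch
	}
	h.subs = append(h.subs, ch)
	return ch
}

// connect emits every item to the subscribers registered so far,
// then closes their channels. Each subscriber buffer must hold
// len(items) values, otherwise the send below would block.
func (h *hub) connect(items []int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, v := range items {
		for _, ch := range h.subs {
			ch <- v
		}
	}
	for _, ch := range h.subs {
		close(ch)
	}
	h.done = true
}

// collect drains a channel into a slice.
func collect(ch <-chan int) []int {
	var out []int
	for v := range ch {
		out = append(out, v)
	}
	return out
}

func main() {
	items := []int{1, 2, 3}
	h := &hub{}

	early := h.subscribe(len(items)) // registered before connect
	h.connect(items)
	late := h.subscribe(len(items)) // registered after connect

	fmt.Println("early:", collect(early)) // early: [1 2 3]
	fmt.Println("late:", collect(late))   // late: []
}
```

In this model the early subscriber sees all three items and the late one sees none. If RxGo behaves similarly, the reliable fix would be a way to know that all subscriptions are established before calling Connect, rather than sleeping for an arbitrary duration.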
@dalianzhu dalianzhu added the question Question regarding how RxGo is working etc. label Aug 3, 2021