
Proposal: A simple approach towards Stateless UDAF #16767

Open
fuyufjh opened this issue May 15, 2024 · 10 comments

fuyufjh (Contributor) commented May 15, 2024

Is your feature request related to a problem? Please describe.

Sharing some ideas about UDAF from our discussion with @wangrunji0408.

Stateless UDAF and stateful UDAF are very different in both implementation and use cases.

Describe the solution you'd like

We don’t really need to offer a framework for stateless UDAF. Instead, we can reuse the interface of UDF and accept an ARRAY as the input argument.

For example, suppose the user has provided such a UDF:

def my_simple_udaf(values: list[int]):
    ...

or

class MyEvent:
    def __init__(self, ts, attr1, attr2):
        self.ts = ts
        self.attr1 = attr1
        self.attr2 = attr2

def my_fancy_udaf(rows: list[MyEvent]):
    ...

Then they can use it not only as a normal UDF but also as a UDAF in an agg or window agg:

select my_udaf(a, [b, ...]) from t
select my_udaf(a, [b, ...]) over (...) from t
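For illustration, a complete (hypothetical) stateless UDAF of this shape might be a median, which genuinely needs the whole input at once. The name and signature below are illustrative only, not RisingWave's actual UDF API:

```python
# Hypothetical stateless UDAF: a plain function over the whole input array.
# A median cannot be computed incrementally without keeping all values anyway.
def my_median_udaf(values: list[int]) -> float:
    ordered = sorted(values)
    n = len(ordered)
    mid = n // 2
    if n % 2 == 1:
        # Odd count: the middle element is the median.
        return float(ordered[mid])
    # Even count: average the two middle elements.
    return (ordered[mid - 1] + ordered[mid]) / 2
```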

A trivial approach is to call array_agg() first and then call the UDF on the resulting array. However, that way both the result array and the input rows are persisted, resulting in 2 copies of the data in the state table. Besides, the array_agg() has to be written by users, which looks very unfriendly.

To make it more efficient and to make the syntax above just work, we need a special code path to handle these use cases.

Describe alternatives you've considered

No response

Additional context

No response

@github-actions github-actions bot added this to the release-1.10 milestone May 15, 2024
st1page (Contributor) commented May 15, 2024

Stateless UDAF is as simple as a normal UDF, except that it accepts multiple rows at a time

Yes, but that doesn't mean it has to physically receive all the data in one call, which can be fatal when there is a lot of input data (maybe millions of rows?). I think this is where the implementor of the stateless UDAF comes in to make the tradeoff, i.e., which data structure to buffer the data in, or whether to compact it or do intermediate computation ahead of time, depending on the semantics of the aggregation.

In other words, in my mind, the implementor of stateless UDAF should implement these interfaces to achieve pipelined computation

  • init() -> void
  • accumulate(input) -> void
  • finish()->result
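A minimal sketch of what an implementation against such an interface could look like, assuming a class-based registration (names are hypothetical, not RisingWave's actual API). Note the implementor buffers only what the semantics require:

```python
# Hypothetical pipelined stateless UDAF: count of distinct values.
# Instead of buffering every row, accumulate() keeps only a set.
class CountDistinct:
    def init(self) -> None:
        # Called once before any input arrives.
        self.seen = set()

    def accumulate(self, value) -> None:
        # Called per input row (or chunk); no state is returned or persisted.
        self.seen.add(value)

    def finish(self) -> int:
        # Called once at the end to produce the result.
        return len(self.seen)
```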

It is still much simpler than Stateful UDAF because it does not need to

  • pass a state in the init function
  • return the current result for each accumulate

(A comment from @xxchan was marked as resolved.)
xxchan (Member) commented May 15, 2024

If the performance of passing the whole input at once is acceptable, I guess persisting twice with array_agg() might also be acceptable.


both the result array and the input rows are persisted, resulting in 2 copies of data in the state table.

Wait, I don't get this. Why 2 copies? I think there is only one copy, for array_agg's state?

fuyufjh (Contributor, Author) commented May 15, 2024

In other words, in my mind, the implementor of stateless UDAF should implement these interfaces to achieve pipelined computation

  • init() -> void
  • accumulate(input) -> void
  • finish()->result

Keep in mind that stateless UDAF needs to cover the cases that cannot be made incremental. These might be more common than you imagine. Many streaming jobs were migrated from batch, where complicated algorithms are freely used and were not designed for incremental computation at all.

For a textbook example: combined with a session window, a UDAF accepts tens of events (browsing history, clicks, impressions, etc.), runs some statistical regression, and then outputs a score for the likelihood of scam.

For those algorithms that can be made incremental, risingwavelabs/arrow-udf#23 will solve it.

xxchan (Member) commented May 15, 2024

https://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html Spark UDAF is also such an “incremental” API. So I’m confused about what “cannot be incremental” and “stateless” really mean. 🤔

xxchan (Member) commented May 16, 2024

I just discussed with @st1page, and we clarified some points:

  • "stateless" actually means the use case for batch and EOWC. i.e., no state persisted (I think the terminology is confusing, see below.)

  • IMO whether batch or streaming, the user API should be the same.

    As we can see from Spark UDAF's interface, there are 3 types: IN, BUF and OUT. There should always be an intermediate value BUF (or "state", or "accumulator"), and aggregation is always "stateful" or "incremental" (though note that the user code itself is stateless). I guess @fuyufjh means OUT isn't always the same as BUF?

    finish(reduction: BUF): OUT
    merge(b1: BUF, b2: BUF): BUF
    reduce(b: BUF, a: IN): BUF
    zero: BUF

    • For example, avg's BUF is (sum, cnt), and finish=div.
    • In streaming, BUF is passed via rpc, and persisted in state store. And we call finish to pass the current OUT to downstream. (Note that finish can actually be called multiple times.)
    • (The largest difference) But sometimes BUF can be very large, and we only want such a UDAF to be used in batch or EOWC, and we don't want to pass it via RPC. This is why @st1page proposed the slightly different API (return BUF vs. return void, i.e., let user code manage the state). I proposed this can be done by the computation framework via a flag, so there's no need to distinguish the API for users.
  • Another difference is retract, but I think it doesn't matter a lot.
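The avg example above can be sketched in Python with Spark-style Aggregator methods, BUF = (sum, cnt) and finish = div. This is illustrative only; Spark's real Aggregator is a Scala/Java class:

```python
# Spark-style incremental aggregation for avg.
# BUF is the intermediate value (sum, cnt); OUT is a float.
Buf = tuple[float, int]

def zero() -> Buf:
    # The empty/initial BUF.
    return (0.0, 0)

def reduce(b: Buf, a: float) -> Buf:
    # Fold one input row into the BUF. (Shadows the builtin; sketch only.)
    s, cnt = b
    return (s + a, cnt + 1)

def merge(b1: Buf, b2: Buf) -> Buf:
    # Combine two partial BUFs (e.g., from parallel partitions).
    return (b1[0] + b2[0], b1[1] + b2[1])

def finish(b: Buf) -> float:
    # BUF -> OUT; for avg, finish = div.
    s, cnt = b
    return s / cnt if cnt else float("nan")
```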

xxchan (Member) commented May 16, 2024

Well, I think some algorithms indeed cannot be incremental, e.g., if you need to iterate over the whole input multiple times.

In this case, all computation is performed in finish, and BUF is just the whole input. Or are there other ways to do it in batch frameworks?


Ohhh, I finally understand: the API @fuyufjh proposed is equivalent to only having finish().

So there are 3 cases:

  1. small BUF. This is OK for streaming.
  2. large BUF, but incremental computation is possible in accumulate. That's what @st1page and I are thinking about. This is not good for streaming.
  3. the extreme case of 2: large BUF, and computation happens only in finish. That's the OP's case.
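Case 3 can be sketched like this: the BUF is just the whole input, and all the work happens in finish(). The example below (names hypothetical) computes the fraction of values above the mean, which needs the mean first and therefore cannot drop any row during accumulation:

```python
# Case 3 sketch: BUF = the whole input; compute only in finish().
class FracAboveMean:
    def init(self) -> None:
        self.rows: list[float] = []

    def accumulate(self, value: float) -> None:
        # The mean is unknown until the end, so every row must be kept.
        self.rows.append(value)

    def finish(self) -> float:
        mean = sum(self.rows) / len(self.rows)
        return sum(1 for x in self.rows if x > mean) / len(self.rows)
```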

fuyufjh (Contributor, Author) commented May 16, 2024

I guess @fuyufjh means OUT isn't always the same as BUF?

Yeah, of course. Rather than "isn't always", I tend to say "is almost never" for user-defined functions.

This is why @st1page proposed the slightly different API (return BUF vs return void, i.e., let user code manage state). I proposed this can be done by the computation framework via a flag and there's no need to distinguish the API for users.

I understand @st1page's concern, but that would be putting the cart before the horse. The users need to plug a non-incremental algorithm in here; that is the requirement.

If the only concern is about the amount of data, we can actually add some limitations, or use a streaming API, etc. Let's do that in the future if necessary.

I proposed this can be done by the computation framework via a flag and there's no need to distinguish the API for users.

Again, keep in mind that the algorithm is not incremental. You are basically saying that users should manually create a buffer and put rows into it on every accumulate() call. This sounds stupid for both users and us.

fuyufjh (Contributor, Author) commented May 16, 2024

Besides, this idea doesn't conflict with risingwavelabs/arrow-udf#23. I'd like to offer both to users and see their reactions.

xxchan (Member) commented May 16, 2024

You are basically saying that users should manually create a buffer and put rows into it for each accumulate() call. This sounds stupid for both users and us.

Yes, but I'm curious how users of batch frameworks like Spark do it, because that seems to be the only way with the UDAF API.
