Issue
I have two DataStreams.
The first DataStream, `stream1`, aggregates data into 1-minute windows:
```java
DataStream<T> stream1 = input
    .keyBy(t -> t.id)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
    .aggregate(new SomeAggregate());
```
`stream1` produces the following windows:
| Window Id | Window Start | Window End |
|---|---|---|
| 1 | 00:59:00.000 | 00:59:59.999 |
| 2 | 00:00:00.000 | 00:00:59.999 |
| 3 | 00:01:00.000 | 00:01:59.999 |
| 4 | 00:02:00.000 | 00:02:59.999 |
| 5 | 00:03:00.000 | 00:03:59.999 |
| 6 | 00:04:00.000 | 00:04:59.999 |
| n | ... | ... |
The second DataStream, `stream2`, aggregates the `stream1` results into 5-minute windows:
```java
DataStream<T> stream2 = stream1
    .keyBy(t -> t.id)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(300)))
    .aggregate(new SomeAggregate());
```
`stream2` produces the following windows:
| Window Id | Window Start | Window End |
|---|---|---|
| 1 | 00:55:00.000 | 00:59:59.999 |
| 2 | 00:00:00.000 | 00:04:59.999 |
| n | ... | ... |
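With Flink's default zero offset, tumbling windows are aligned to the epoch, so every 1-minute window nests inside exactly one 5-minute window. The plain-Java sketch below (no Flink dependency) does the boundary arithmetic. Its `windowStart` is intended to match Flink's `TimeWindow.getWindowStartWithOffset(timestamp, 0, size)` for non-negative timestamps. The class and method names are hypothetical, and times are assumed to be milliseconds since midnight.

```java
public class TumblingWindowMath {
    // Start of the epoch-aligned tumbling window of length sizeMs containing timestampMs
    // (assumes zero offset, like Flink's default tumbling assigners).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Formats milliseconds since midnight as HH:MM:SS.mmm.
    static String hms(long ms) {
        return String.format("%02d:%02d:%02d.%03d",
                ms / 3_600_000, (ms / 60_000) % 60, (ms / 1_000) % 60, ms % 1_000);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Map each 1-minute window start to the 5-minute window that contains it.
        for (long start = 55 * minute; start < 65 * minute; start += minute) {
            long outer = windowStart(start, 5 * minute);
            System.out.println(hms(start) + " -> " + hms(outer) + " - " + hms(outer + 5 * minute - 1));
        }
    }
}
```

By this arithmetic alone, the windows starting 00:55 through 00:59 map to 00:55:00.000 - 00:59:59.999, and the window starting 01:00 begins a new 5-minute window.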
Question -
Why does the following `stream2` window
| Window Id | Window Start | Window End |
|---|---|---|
| 1 | 00:00:00.000 | 00:04:59.999 |
contain the 00:59:00.000 - 00:59:59.999 window (Id 1) from `stream1`? These are the `stream1` windows that ended up in it:
| Window Id | Window Start | Window End |
|---|---|---|
| 1 | 00:59:00.000 | 00:59:59.999 |
| 2 | 00:00:00.000 | 00:00:59.999 |
| 3 | 00:01:00.000 | 00:01:59.999 |
| 4 | 00:02:00.000 | 00:02:59.999 |
| 5 | 00:03:00.000 | 00:03:59.999 |
Shouldn't the 00:59:00.000 - 00:59:59.999 window belong to the 00:55:00.000 - 00:59:59.999 `stream2` window?
I was expecting to see these windows instead:
| Window Id | Window Start | Window End |
|---|---|---|
| 2 | 00:00:00.000 | 00:00:59.999 |
| 3 | 00:01:00.000 | 00:01:59.999 |
| 4 | 00:02:00.000 | 00:02:59.999 |
| 5 | 00:03:00.000 | 00:03:59.999 |
| 6 | 00:04:00.000 | 00:04:59.999 |
So I am not sure what I am doing wrong. Given a start X and an end Y, I only want to see windows whose start and end times fall between X and Y, not older or newer ones.
Solution
This is happening because you are using processing time windows.
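The `stream1` window for 00:59:00.000 to 00:59:59.999 is triggered as soon as the system clock reaches 01:00:00.000. A processing-time window assigner places each incoming record according to the wall-clock time at which it arrives, not according to the data it carries. By the time this window's result reaches the second window operator, the time is at least 01:00:00.000, so the result becomes part of the later window.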
To get consistent, deterministic results, use event time windows (in combination with watermarks).
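The difference can be sketched without Flink by simulating how the `stream2` assigner picks a window for the result of the `stream1` window 00:59:00.000 - 00:59:59.999. The sketch relies on two Flink behaviors. First, a window operator emits its result with the timestamp `window.maxTimestamp()`, which is the window end minus 1 ms. Second, a processing-time assigner ignores that timestamp and uses the current wall-clock time instead. It assumes epoch-aligned windows, times in milliseconds since midnight, and zero delay between operators, and it ignores watermarks entirely. The class and method names are hypothetical.

```java
public class WindowAssignmentDemo {
    static final long MINUTE = 60_000L;
    static final long FIVE_MINUTES = 5 * MINUTE;

    // Start of the epoch-aligned tumbling window of length sizeMs containing t.
    static long windowStart(long t, long sizeMs) {
        return t - Math.floorMod(t, sizeMs);
    }

    // Formats milliseconds since midnight as HH:MM:SS.mmm.
    static String hms(long ms) {
        return String.format("%02d:%02d:%02d.%03d",
                ms / 3_600_000, (ms / 60_000) % 60, (ms / 1_000) % 60, ms % 1_000);
    }

    // Processing time: the stream1 window fires when the clock reaches its end,
    // and stream2 assigns the result by the clock time it arrives (here: no delay).
    static long processingTimeStream2Start(long stream1Start) {
        long arrivalClockTime = stream1Start + MINUTE;
        return windowStart(arrivalClockTime, FIVE_MINUTES);
    }

    // Event time: the result carries window.maxTimestamp() = end - 1 ms,
    // and stream2 assigns it by that timestamp.
    static long eventTimeStream2Start(long stream1Start) {
        long resultTimestamp = stream1Start + MINUTE - 1;
        return windowStart(resultTimestamp, FIVE_MINUTES);
    }

    public static void main(String[] args) {
        long stream1Start = 59 * MINUTE; // window 00:59:00.000 - 00:59:59.999
        long p = processingTimeStream2Start(stream1Start);
        long e = eventTimeStream2Start(stream1Start);
        System.out.println("processing time: " + hms(p) + " - " + hms(p + FIVE_MINUTES - 1));
        System.out.println("event time:      " + hms(e) + " - " + hms(e + FIVE_MINUTES - 1));
    }
}
```

Running `main` prints 01:00:00.000 - 01:04:59.999 for processing time and 00:55:00.000 - 00:59:59.999 for event time. Any real delay between operators only pushes the processing-time case later, never earlier. In an actual Flink job, the event-time variant would also need timestamps and watermarks assigned upstream before the windows fire.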
Answered By - David Anderson