What is PubsubIO watermark heuristic running on GCD?

Hey. I am trying to run a pipeline where I calculate the difference between messages that are published to pubsub with a 30 second pulse * (10K streams, each of which every 30 seconds). I do not care about the completeness of the completeness of the data, but I would like to understand what the watermark heuristic for PubsubIO (and if I can configure it) is to determine if I can ignore the late data with a fairly low loss.

* Note that the pubsub theme provides [persistent life] persistence in case we need to remove the pipeline, so it is important that the heuristic works well with an unfilled subscription.

Can someone explain how the watermark is calculated (if timestamplabel () is used) and how it can be configured, if at all?

+7
google-cloud-dataflow
source share
1 answer

Here is a brief description of how we calculate the PubSub watermark:

Our goal is to create a reasonable heuristic watermark for data sent to our streaming pipeline through PubSub. We make some assumptions about the source that sends the data to PubSub. In particular, we assume that the timestamps of the source data β€œbehave well,” in other words, we expect a limited number of timestamps on the source data before it is sent to PubSub. Any data that is sent with timestamps outside the allowed boundaries out of turn will be considered late data. In our current implementation, this binding of 10 seconds means reordering timestamps to 10 seconds before sending to pubsub will not generate late data. We call this value the evaluation area. The problem of creating a PubSub watermark then comes down to the fact that additional data will not be late due to transfer through PubSub.

What are the issues we are facing with PubSub? Since pubsub does not guarantee an order, we must have some additional metadata to know enough about the backlog. Fortunately, PubSub provides a back-log dimension in terms of "the oldest non-printable publication timestamp." This is not the same as the timestamp of the event of our message, since PubSub is independent of the metadata at the application level that is sent through it, but it is the timestamp when the message was received by PubSub.

While this dimension is like a watermark, it is not the same. We cannot just use the old obsolete publication timestamp as a watermark. These timestamps are not equal to the timestamps of the event, and in the case of sending historical (past) data, it can be arbitrarily deleted. The order for these timestamps may also be different, because, as mentioned above, we allow a limited number of reorders. However, we can use this as a measure of lag in order to find out enough information about the time stamps of the event present in the lag so that we can set a reasonable watermark as follows.

We call the subscription, which receives the data of the basic subscription. Looking at our basic subscription, we see that messages can fail. We put each message in our pubsub publish the time stamp "pt" and its time stamp "et". Please note that the two temporary domains may be unrelated.

Basic Subscription Timestamps

Some messages in the base subscription are not confirmed, creating a backlog. This may be due to the fact that they have not yet been delivered, or they may be delivered but not yet processed. Remember also that the exits from this subscription are distributed across several fragments. Thus, you can’t say, just looking at the basic subscription, what should be our watermark.

We are continuing to create a second metadata-only tracking subscription, which is used to effectively check the underlying subscription lag and accept minimum timestamps of events in the lag. By supporting a small or absent tracking subscription delay, we can check messages before the basic subsciptions oldest unakd.

enter image description here

We keep track of the tracking subscription, ensuring that pulling from this subscription is inexpensive. Conversely, if we lag behind a tracking subscription, we will stop promoting the watermark. To do this, we ensure that at least one of the following conditions is met:

  • Subscribing to a subscription is enough ahead of the base subscription, enough ahead means that the tracking subscription is ahead, at least in terms of rating range. This ensures that any limited reordering is taken into account within the estimation range.
  • Subscription to subscription is quite close to real time. In other words, there is no tracking subscription subscription.

We will report tracking subscription messages as soon as possible, as soon as we continue with the metadata about the posts and timestamps of the messages. We store this metadata in a sparse histogram format in order to minimize the amount of space used and the size of the long recording.

Finally, we guarantee that we have enough data to make a reasonable estimate of the watermark. We take a group of event timestamps with publication marks in the range

[ min ( base sub oldest unack'd, tracking sub oldest unack'd - 10 sec) , tracking sub oldest unack'd ] 

This ensures that we look at all the timestamps of the events in the lag, or if the lag is small, the most recent rating range to make a watermark rating.

Finally, the watermark value is calculated as the minimum time of the event in the strip.

Also observe that this method is correct, but creates a too conservative watermark. Since we review all messages before basic subscriptions that were the oldest unakd in the tracking subscription, we can include event timestamps in the watermark rating for messages that have already been confirmed.

In addition, there are several heuristics to ensure progress. The above method works well in the case of dense, often incoming data. In the case of rare or rare data, there may not be enough recent reports to build a reasonable estimate. If we did not see the subscription data in more than two minutes (and there is no lag), we will promote the watermark to almost real time. This ensures that the watermark and piping continue to move forward, even if no more messages are received.

All of the above ensures that as long as the reordering of the timestamp of the source event is not within the estimation range, there will be no additional data.

+8
source share

All Articles