Often when a user subscribes to a topic in Pulsar, they are only interested in the latest value of each message key. If they read through the whole message backlog for this information, they would end up reading a lot of messages which would later be occluded by a subsequent message. This wastes time and resources.
To solve this problem Apache Pulsar 2.0 has introduced Topic Compaction. Topic compaction is a process that runs in the Pulsar broker to build a snapshot of the latest value for each key in a topic. In contrast to other similar systems, Pulsar Topic Compaction is non-destructive. The original backlog is still available for the users who want it. Pulsar gives the user complete control over when Topic Compaction occurs, by allowing them to trigger it manually via a REST endpoint.
The topic compaction process reads through the backlog of the topic, retaining only the latest message for each key in a compacted backlog. If the latest payload for a key is empty, then that key is considered empty and no message is retained for that key.
Consider the example of a stock ticker. Each time the price of a stock changes, a message is written to the topic which contains two pieces of information: the new price of the stock in the message payload and the symbol of the stock whose price has changed as the message key.
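To make the semantics concrete, here is a minimal, self-contained sketch of the latest-value-per-key rule described above. It is illustrative only, not Pulsar's actual implementation: the `Msg` record and the stock symbols are made up for the example, and the "empty payload deletes the key" behavior models the rule described in the previous section.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    // A keyed message: the stock symbol is the key, the price is the payload.
    record Msg(String key, String payload) {}

    // Keep only the latest payload per key; an empty payload removes the key.
    static Map<String, String> compact(List<Msg> backlog) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Msg m : backlog) {
            if (m.payload().isEmpty()) {
                latest.remove(m.key()); // empty payload acts as a delete
            } else {
                latest.put(m.key(), m.payload());
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Msg> ticks = List.of(
            new Msg("AAPL", "170.10"),
            new Msg("MSFT", "300.50"),
            new Msg("AAPL", "171.25"), // occludes the earlier AAPL tick
            new Msg("MSFT", ""));      // delete: MSFT vanishes from the view
        System.out.println(compact(ticks)); // {AAPL=171.25}
    }
}
```

A consumer of this compacted view sees one message per live key, however many price updates were written to the full backlog.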
Such a topic can have two kinds of consumer. One consumer is interested in how the stock price has changed over time, perhaps so that they can apply some machine learning or even simply present the stock history to the user. These would read from the full backlog.
Other consumers, however, are only interested in the latest price of a stock, for example to calculate the total value of a user's portfolio. These consumers would read from the compacted backlog.
Figure 1. A topic's full backlog and its compacted backlog
To catch up to the latest topic state quickly, by reading from the compacted backlog of the topic, the client must specify readCompacted(true) when creating a consumer.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("persistent://finance/stockmarkets/NASDAQ")
        .readCompacted(true)
        .subscriptionName("portfolio-value")
        .subscribe();
If the client wants to read the full backlog of the topic, they can specify readCompacted(false) or leave out any specification of readCompacted, as the default is to read the full backlog.
Whether reading from the compacted backlog or the full backlog, compaction has no effect on subscription cursors, because message IDs do not change. For example, given the topic in the diagram above, suppose a client reads from the full backlog (as always happens if a topic has not yet been compacted) as far as position 3 and then disconnects for a while. If compaction occurs and the client then reconnects, specifying that it wants to read from the compacted backlog, it will start reading from position 6 in the compacted backlog.
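The cursor behavior can be sketched with a small model. This is not Pulsar internals, just an illustration of the invariant: compacted entries keep their original message IDs, so a cursor simply resumes at the next surviving ID (the IDs and keys below are made up and differ from the diagram's).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CursorSketch {
    record Msg(long id, String key, String payload) {}

    // Compacted backlog: the latest message per key, in original-ID order.
    static List<Msg> compact(List<Msg> backlog) {
        Map<String, Msg> latest = new LinkedHashMap<>();
        for (Msg m : backlog) latest.put(m.key(), m);
        return latest.values().stream()
                .sorted((a, b) -> Long.compare(a.id(), b.id()))
                .toList();
    }

    // A cursor that had read up to lastReadId resumes at the first
    // surviving message with a larger ID; -1 means nothing left to read.
    static long resumeFrom(List<Msg> compacted, long lastReadId) {
        for (Msg m : compacted) {
            if (m.id() > lastReadId) return m.id();
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Msg> backlog = List.of(
            new Msg(1, "AAPL", "170.10"),
            new Msg(2, "MSFT", "300.50"),
            new Msg(3, "AAPL", "171.25"),
            new Msg(4, "GOOG", "135.00"),
            new Msg(5, "AAPL", "172.00"),
            new Msg(6, "MSFT", "301.10"));
        // The cursor had read up to ID 3 before compaction ran; the
        // surviving IDs are 4, 5 and 6, so it resumes at 4.
        System.out.println(resumeFrom(compact(backlog), 3)); // prints 4
    }
}
```

Because IDs are stable, the same subscription can switch between `readCompacted(true)` and `readCompacted(false)` without losing its place.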
Compaction on a topic is currently triggered through a REST API call. There is a command line utility that makes this call for you. For example, to start compaction on the topic used above, run:
bin/pulsar-admin topics compact persistent://finance/stockmarkets/NASDAQ
Triggering compaction only starts the process; it does not block until compaction has completed. The status of the compaction process can be checked with a separate call:
bin/pulsar-admin topics compaction-status persistent://finance/stockmarkets/NASDAQ
You can also wait for compaction to complete by adding the -w flag to the compaction-status subcommand.
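The same REST endpoints can be driven programmatically through the Pulsar admin client. A minimal sketch, assuming a broker whose admin service listens on `http://localhost:8080` (adjust the URL for your deployment):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.LongRunningProcessStatus;

public class TriggerCompaction {
    public static void main(String[] args) throws Exception {
        String topic = "persistent://finance/stockmarkets/NASDAQ";

        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // assumed admin URL
                .build()) {
            // Start compaction; this returns immediately, like the CLI.
            admin.topics().triggerCompaction(topic);

            // Poll the long-running process status (NOT_RUN, RUNNING,
            // SUCCESS or ERROR), as compaction-status does.
            LongRunningProcessStatus status =
                    admin.topics().compactionStatus(topic);
            System.out.println(status.status);
        }
    }
}
```

This is what the `pulsar-admin` commands above do under the hood: each subcommand maps onto one of these admin REST calls.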
Topic compaction in Pulsar respects the retention period of the data in the topic. If messages in the full backlog are deleted because the retention period has passed, those messages will also become unavailable in the compacted backlog. It is possible to configure infinite retention on a topic for cases where you never want messages to be removed.
We are in the process of adding support to allow users to configure compaction to run automatically once a topic reaches a certain size threshold. We plan to release this functionality in Pulsar 2.1.
----------------------------------------------------
Thanks!
Ivan Kelly