GCP Pub/Sub: published messages vs. publish requests

When each publish request is a batch request containing 10 messages, we get:

    // Driver: run the batch-publish example once every 250 ms until 4 * 120 = 480 runs have
    // succeeded (about two minutes at four runs per second).
    CountDownLatch countDownLatch = new CountDownLatch(4 * 120);
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

    try {
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            try {
                publishWithBatchSettingsExample(projectId, topicId);
                // Only successful runs are counted.
                countDownLatch.countDown();
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag so the pool thread can observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(interrupted);
            } catch (IOException | ExecutionException failure) {
                throw new RuntimeException(failure);
            }
        }, 10, 250, TimeUnit.MILLISECONDS);

        // A run that throws makes scheduleAtFixedRate suppress every later run, so the latch
        // would never reach zero and a plain await() would block forever. Wake up periodically
        // and surface the failure instead of hanging.
        while (!countDownLatch.await(1, TimeUnit.SECONDS)) {
            if (handle.isDone()) {
                try {
                    handle.get();
                } catch (ExecutionException failed) {
                    throw new IllegalStateException("Scheduled publish run failed", failed.getCause());
                }
            }
        }

        // Stop scheduling further runs; a run already in progress is allowed to finish.
        handle.cancel(false);
    } finally {
        // Always release the pool threads, otherwise they keep the JVM alive.
        scheduler.shutdown();
    }
    /**
     * Publishes ten small messages to a topic through a publisher configured with custom batching
     * settings, waits for all message-id futures to complete, logs how many messages were
     * published, and finally shuts the publisher down.
     *
     * <p>The publisher is always shut down once it has been created, including when publishing or
     * waiting on the futures fails.
     *
     * @param projectId project id used to build the topic name
     * @param topicId topic id used to build the topic name
     * @throws IOException if building the publisher fails
     * @throws ExecutionException if waiting on the publish futures reports a failed publish
     * @throws InterruptedException if the thread is interrupted while waiting on the futures or
     *     on publisher termination
     */
    private static void publishWithBatchSettingsExample(String projectId, String topicId)
            throws IOException, ExecutionException, InterruptedException {
        TopicName topicName = TopicName.of(projectId, topicId);

        // Batch settings control how the publisher batches messages
        long requestBytesThreshold = 5000L; // default : 1000 bytes
        long messageCountBatchSize = 10L; // default : 100 message

        Duration publishDelayThreshold = Duration.ofMillis(500); // default : 1 ms

        // A publish request is triggered by request size, message count or time since the last
        // publish, whichever condition is met first.
        BatchingSettings batchingSettings =
                BatchingSettings.newBuilder()
                        .setElementCountThreshold(messageCountBatchSize)
                        .setRequestByteThreshold(requestBytesThreshold)
                        .setDelayThreshold(publishDelayThreshold)
                        .build();

        // Create a publisher bound to the topic that uses the custom batching settings above.
        Publisher publisher =
                Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

        try {
            List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

            // Publish one message at a time; the publisher batches them automatically.
            for (int i = 0; i < 10; i++) {
                String message = "message " + i;
                ByteString data = ByteString.copyFromUtf8(message);
                PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

                // Once published, returns a server-assigned message id (unique within the topic)
                messageIdFutures.add(publisher.publish(pubsubMessage));
            }

            // Wait on any pending publish requests.
            List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

            LOGGER.info("Published " + messageIds.size() + " messages with batch settings.");
        } finally {
            // Shut down in finally so a failed publish cannot skip it and leak the publisher's
            // resources.
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

Leave a comment