When one publish request is a batch request containing 10 messages then we get:
CountDownLatch countDownLatch = new CountDownLatch(4 * 120); ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> { try { publishWithBatchSettingsExample(projectId, topicId); countDownLatch.countDown(); } catch (IOException | ExecutionException | InterruptedException e) { throw new RuntimeException(e); } }, 10, 250, TimeUnit.MILLISECONDS); countDownLatch.await(); handle.cancel(false);
private static void publishWithBatchSettingsExample(String projectId, String topicId) throws IOException, ExecutionException, InterruptedException { TopicName topicName = TopicName.of(projectId, topicId); Publisher publisher = null; List<ApiFuture<String>> messageIdFutures = new ArrayList<>(); try { // Batch settings control how the publisher batches messages long requestBytesThreshold = 5000L; // default : 1000 bytes long messageCountBatchSize = 10L; // default : 100 message Duration publishDelayThreshold = Duration.ofMillis(500); // default : 1 ms // Publish request get triggered based on request size, messages count & time since last // publish, whichever condition is met first. BatchingSettings batchingSettings = BatchingSettings.newBuilder() .setElementCountThreshold(messageCountBatchSize) .setRequestByteThreshold(requestBytesThreshold) .setDelayThreshold(publishDelayThreshold) .build(); // Create a publisher instance with default settings bound to the topic publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build(); // schedule publishing one message at a time : messages get automatically batched for (int i = 0; i < 10; i++) { String message = "message " + i; ByteString data = ByteString.copyFromUtf8(message); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); // Once published, returns a server-assigned message id (unique within the topic) ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage); messageIdFutures.add(messageIdFuture); } } finally { // Wait on any pending publish requests. List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get(); LOGGER.info("Published " + messageIds.size() + " messages with batch settings."); if (publisher != null) { // When finished with the publisher, shutdown to free up resources. publisher.shutdown(); publisher.awaitTermination(1, TimeUnit.MINUTES); } } }