Pipeline writing numbers 1 – 10000 as strings to Pubsub:
    writingPipeline
        .apply(Create.of(IntStream.rangeClosed(1, 10000)
            .mapToObj(String::valueOf)
            .collect(Collectors.toList())))
        // CreatePubsubMessageFn outputs new PubsubMessage(word.getBytes(StandardCharsets.UTF_8), Collections.emptyMap())
        .apply(ParDo.of(new CreatePubsubMessageFn()))
        .apply(PubsubIO.writeMessages().to(writeOptions.getTopic()));
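The element list and the payload round trip can be checked without Beam. Below is a minimal plain-Java sketch; the class and method names are hypothetical, and the explicit UTF-8 charset is an assumption (the original code relies on the platform default, which yields the same bytes for these ASCII digits on common platforms).

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper mirroring what the writing pipeline publishes:
// one decimal string per number from 1 to 10000, encoded as the message payload.
class PayloadSketch {

    // Same element list that is passed to Create.of(...).
    static List<String> numbers() {
        return IntStream.rangeClosed(1, 10000)
                .mapToObj(String::valueOf)
                .collect(Collectors.toList());
    }

    // Encode with an explicit charset instead of the platform default (assumption).
    static byte[] encode(String word) {
        return word.getBytes(StandardCharsets.UTF_8);
    }

    // What the reading side does: decode the payload and parse the number.
    static int decode(byte[] payload) {
        return Integer.parseInt(new String(payload, StandardCharsets.UTF_8));
    }
}
```

Decoding with the same charset that was used for encoding keeps `Integer.parseInt` on the reading side independent of the worker's default locale settings.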
Streaming pipeline reading from Pubsub, applying a 5-second fixed window, grouping the elements into 4 groups (a: 1-9, b: 10-99, c: 100-999 and d: 1000-10000) and writing them with FileIO into .txt files with 4 shards:
    readingPipeline
        .apply(PubsubIO.readMessagesWithAttributesAndMessageId()
            .fromSubscription(readOptions.getSubscription()))
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))))
        .apply("Log Window", ParDo.of(new DoFn<PubsubMessage, KV<String, String>>() {
            @ProcessElement
            public void process(@Element PubsubMessage e,
                                OutputReceiver<KV<String, String>> receiver,
                                @Timestamp Instant ts,
                                BoundedWindow w,
                                PaneInfo p) {
                int i = Integer.parseInt(new String(e.getPayload()));
                // Group by number of digits: a: 1-9, b: 10-99, c: 100-999, d: 1000-10000
                String key = i < 10 ? "a" : i < 100 ? "b" : i < 1000 ? "c" : "d";
                KV<String, String> kv = KV.of(key, String.valueOf(i));
                // Log the element together with its timestamp, window and pane
                LOGGER.info("[{}][Window] {}:{},ts={},w={},p={}",
                    ipAddressAndThread(), key, i, ts, windowToString(w), p);
                receiver.output(kv);
            }
        }))
        .apply(FileIO
            .<String, KV<String, String>>writeDynamic()
            .by(KV::getKey)                                      // one destination per key
            .via(Contextful.fn(KV::getValue), TextIO.sink())     // write only the value
            .withDestinationCoder(StringUtf8Coder.of())
            .withNaming(subPath -> new MyFileNaming(subPath, ".txt"))
            .to(readOptions.getOutput())
            .withTempDirectory(readOptions.getTemp())
            .withNumShards(4)
        );
and the custom file naming:
    static class MyFileNaming implements FileIO.Write.FileNaming {
        private final String subPath;
        private final String extension;

        public MyFileNaming(String subPath, String extension) {
            this.subPath = subPath;
            this.extension = extension;
        }

        @Override
        public String getFilename(BoundedWindow w, PaneInfo p, int numShards, int shardIndex, Compression compression) {
            String filename = String.format("%s-winMaxTs-%s-shard-%s-of-%s--%s-%s%s",
                subPath,
                w.maxTimestamp().toString().replace(":", "_").replace(" ", "_"),
                shardIndex,
                numShards,
                getLocalHostAddressSpaced(),
                currentThread().getName(),
                extension);
            return filename;
        }
    }
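The file names can be reproduced without a running pipeline. This sketch mirrors `MyFileNaming.getFilename(...)` in plain Java; `java.time.Instant` stands in for the Joda `Instant` returned by `BoundedWindow.maxTimestamp()` (both print `2023-01-20T10:48:04.999Z` for this value), and `hostAddress`/`threadName` are hypothetical parameters replacing `getLocalHostAddressSpaced()` and `currentThread().getName()`.

```java
import java.time.Instant;

// Hypothetical plain-Java re-creation of MyFileNaming.getFilename(...), without Beam types.
class FileNameSketch {

    static String fileName(String subPath, Instant windowMaxTs, int shardIndex, int numShards,
                           String hostAddress, String threadName, String extension) {
        // Colons (and spaces) are replaced so the timestamp is safe to use in a file name.
        String ts = windowMaxTs.toString().replace(":", "_").replace(" ", "_");
        return String.format("%s-winMaxTs-%s-shard-%s-of-%s--%s-%s%s",
                subPath, ts, shardIndex, numShards, hostAddress, threadName, extension);
    }
}
```

For example, `FileNameSketch.fileName("a", Instant.parse("2023-01-20T10:48:04.999Z"), 0, 4, "10_128_0_112", "Thread-17", ".txt")` returns `a-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-17.txt`, the first name in the listing that follows.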
16 files were generated, 4 shards for each key:
a-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-17.txt
a-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-16.txt
a-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-16.txt
a-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-16.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-17.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-16.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-16.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-17.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-16.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-17.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-16.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-17.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-16.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-18.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-17.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-16.txt
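Although every key gets the same number of shards, the keys carry very different volumes. Reusing the key rule from the "Log Window" DoFn in a small plain-Java sketch (hypothetical class `KeyBuckets`) gives the number of elements per key for 1..10000:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper: the DoFn's key assignment, extracted so group sizes can be computed.
class KeyBuckets {

    // Same rule as the DoFn: 1-9 -> a, 10-99 -> b, 100-999 -> c, otherwise d.
    static String keyFor(int i) {
        return i < 10 ? "a" : i < 100 ? "b" : i < 1000 ? "c" : "d";
    }

    // Number of elements per key for the numbers 1..10000, sorted by key.
    static Map<String, Long> countsPerKey() {
        return IntStream.rangeClosed(1, 10000)
                .mapToObj(KeyBuckets::keyFor)
                .collect(Collectors.groupingBy(k -> k, TreeMap::new, Collectors.counting()));
    }
}
```

`countsPerKey()` returns `{a=9, b=90, c=900, d=9001}`, so key d receives about 90% of all elements and is by far the hottest key.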
If the window were 2 seconds instead of 5, we could get 4 shards for the
first window [2023-01-20T16_29_28.000, 2023-01-20T16_29_30.000) and 4 shards for the second window [2023-01-20T16_29_30.000, 2023-01-20T16_29_32.000), e.g. for key d:
d-winMaxTs-2023-01-20T16_29_29.999Z-shard-0-of-4--10_128_0_29-Thread-17.txt
d-winMaxTs-2023-01-20T16_29_29.999Z-shard-1-of-4--10_128_0_29-Thread-17.txt
d-winMaxTs-2023-01-20T16_29_29.999Z-shard-2-of-4--10_128_0_29-Thread-17.txt
d-winMaxTs-2023-01-20T16_29_29.999Z-shard-3-of-4--10_128_0_29-Thread-16.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-0-of-4--10_128_0_29-Thread-18.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-1-of-4--10_128_0_29-Thread-18.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-2-of-4--10_128_0_29-Thread-18.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-3-of-4--10_128_0_28-Thread-16.txt
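The window bounds in these names come from `FixedWindows` aligning windows to the epoch: a timestamp t falls into [t - (t mod size), t - (t mod size) + size), and the file name shows that window's max timestamp, i.e. its end minus 1 ms. A plain-Java sketch of this arithmetic, assuming a zero window offset and using `java.time` in place of Beam's Joda types (class name hypothetical):

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper reproducing fixed-window assignment (zero offset) for one timestamp.
class FixedWindowSketch {

    // Start of the window containing ts; windows are aligned to the epoch.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long t = ts.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch as well.
        return Instant.ofEpochMilli(t - Math.floorMod(t, sizeMs));
    }

    // Max timestamp of that window: the (exclusive) end minus one millisecond.
    static Instant windowMaxTimestamp(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size).minusMillis(1);
    }
}
```

With a 2-second size, an element stamped 16:29:28.500 lands in the window whose max timestamp is 16:29:29.999, and one stamped 16:29:30.000 lands in the next window (max 16:29:31.999), which matches the two groups of names above.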
Coming back to the 5-second window (which usually captures all the elements in a single window),
when
.withNumShards(4)
is changed to:
.withNumShards(2)
2 shards are generated for each key for each window:
a-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_16-Thread-19.txt
a-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_15-Thread-18.txt
b-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_15-Thread-16.txt
b-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_16-Thread-19.txt
c-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_15-Thread-18.txt
c-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_16-Thread-16.txt
d-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_16-Thread-19.txt
d-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_16-Thread-16.txt
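One way to see why every key fills all of its shards, even key a with only 9 elements, is a simplified sharding model: the first element goes to a random shard and each following element to the next shard, round-robin. This model is an assumption for illustration, not Beam's actual code, and the class name is hypothetical.

```java
import java.util.Random;

// Hypothetical, simplified model of fixed-count sharding for one key and window:
// start at a random shard, then assign elements round-robin.
class RoundRobinShardingSketch {

    // Returns how many of `elements` land in each of `numShards` shards.
    static int[] shardSizes(int elements, int numShards, Random random) {
        int[] sizes = new int[numShards];
        int shard = random.nextInt(numShards);
        for (int i = 0; i < elements; i++) {
            sizes[shard]++;
            shard = (shard + 1) % numShards;
        }
        return sizes;
    }
}
```

Under this model any key with at least `numShards` elements in a window produces exactly `numShards` files, e.g. key a (9 elements) splits 5/4 over 2 shards. A real runner may restart the rotation for each bundle, so actual shard sizes can be less even.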
When
.withNumShards(4)
is changed to:
.withNumShards(0)
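then the runner determines the number of shards per key (assuming Streaming Engine is used). In that case we get 20 files: 4 shards for each key in the first window, plus an additional 4 shards for the hottest key, d, in the next window (winMaxTs 2023-01-20T10_27_39.999Z):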
a-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt
a-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-16.txt
a-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-16.txt
a-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-17.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-16.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-17.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-17.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-0-of-4--10_128_0_7-Thread-19.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-1-of-4--10_128_0_6-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-2-of-4--10_128_0_7-Thread-19.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-3-of-4--10_128_0_6-Thread-16.txt
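Counting files per key and window by hand gets tedious with listings like this one. A small sketch (hypothetical class `ShardListingSketch`) parses names produced by `MyFileNaming` and counts files per (key, winMaxTs):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: group output file names by key and window max timestamp.
class ShardListingSketch {

    // <key>-winMaxTs-<ts>-shard-<i>-of-<n>--<host>-<thread><extension>
    private static final Pattern NAME =
            Pattern.compile("^(\\w+)-winMaxTs-(\\S+?)-shard-(\\d+)-of-(\\d+)--.*$");

    // Returns "key@winMaxTs" -> number of files, sorted by key.
    static Map<String, Integer> filesPerKeyAndWindow(List<String> fileNames) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String name : fileNames) {
            Matcher m = NAME.matcher(name);
            if (!m.matches()) {
                throw new IllegalArgumentException("Unexpected file name: " + name);
            }
            counts.merge(m.group(1) + "@" + m.group(2), 1, Integer::sum);
        }
        return counts;
    }
}
```

Fed the 20 names above, it returns four entries with 4 files each for winMaxTs 2023-01-20T10_27_34.999Z and a fifth entry, `d@2023-01-20T10_27_39.999Z`, also with 4 files.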