Tag: sharding

Apache Beam and Dataflow sharding

A pipeline that writes the numbers 1–10000 as strings to Pub/Sub:

writingPipeline.apply(Create.of(IntStream.rangeClosed(1, 10000).mapToObj(String::valueOf).collect(Collectors.toList())))
    .apply(ParDo.of(new CreatePubsubMessageFn()))  // e.g. new PubsubMessage(word.getBytes(StandardCharsets.UTF_8), Collections.emptyMap())
    .apply(PubsubIO.writeMessages().to(writeOptions.getTopic()));
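Both pipelines depend on the same payload contract: each message body is the decimal string of one number. The stdlib-only sketch below shows that round trip with an explicit UTF-8 charset, so the writer and the reader do not rely on the platform default encoding. `PayloadCodec` and its methods are hypothetical names, and the Beam transforms themselves are left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper showing the payload contract shared by both pipelines:
// every Pub/Sub message body is the decimal string of one number, encoded as UTF-8.
class PayloadCodec {
    // The same 1..10000 input that the writing pipeline passes to Create.of(...).
    static List<String> inputs() {
        return IntStream.rangeClosed(1, 10000)
                .mapToObj(String::valueOf)
                .collect(Collectors.toList());
    }

    // Writer side: the bytes a CreatePubsubMessageFn-style function would use as the payload.
    static byte[] encode(String word) {
        return word.getBytes(StandardCharsets.UTF_8);
    }

    // Reader side: what the "Log Window" DoFn does before choosing a key.
    static int decode(byte[] payload) {
        return Integer.parseInt(new String(payload, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        List<String> words = inputs();
        int first = decode(encode(words.get(0)));
        int last = decode(encode(words.get(words.size() - 1)));
        System.out.println(words.size() + " payloads, first=" + first + ", last=" + last);
    }
}
```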

A streaming pipeline reads from Pub/Sub, applies a 5-second fixed window, assigns each element one of 4 keys by value range (a: 1–9, b: 10–99, c: 100–999, d: 1000–10000), and writes the elements with FileIO into .txt files with 4 shards:

readingPipeline.apply(PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(readOptions.getSubscription()))
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))))
    .apply("Log Window", ParDo.of(new DoFn<PubsubMessage,  KV<String, String>>() {
        @ProcessElement
        public void process(@Element PubsubMessage e, OutputReceiver<KV<String, String>> receiver, @Timestamp Instant ts, BoundedWindow w, PaneInfo p) {
            int i = Integer.parseInt(new String(e.getPayload(), StandardCharsets.UTF_8));
            String key = i < 10 ? "a" : i < 100 ? "b" : i < 1000 ? "c" : "d";
            KV<String, String> kv = KV.of(key, String.valueOf(i));
            LOGGER.info("[{}][Window] {}:{},ts={},w={},p={}", ipAddressAndThread(), key, i, ts, windowToString(w), p);
            receiver.output(kv);
        }
    }))
    .apply(FileIO
        .<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(subPath -> new MyFileNaming(subPath, ".txt"))
        .to(readOptions.getOutput())
        .withTempDirectory(readOptions.getTemp())
        .withNumShards(4)
    );
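The key ranges are very uneven, which matters for the results below. A small sketch counts how many of the 10000 numbers fall under each key, using the same conditional expression as the DoFn: d receives 9001 of them, so it is by far the hottest key. `KeyBuckets` is a hypothetical stand-alone class, not part of the pipeline.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone copy of the key rule used in the "Log Window" DoFn.
class KeyBuckets {
    static String keyFor(int i) {
        return i < 10 ? "a" : i < 100 ? "b" : i < 1000 ? "c" : "d";
    }

    // Counts how many of the numbers 1..max are assigned to each key (sorted by key).
    static Map<String, Integer> countsUpTo(int max) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 1; i <= max; i++) {
            counts.merge(keyFor(i), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countsUpTo(10000)); // prints {a=9, b=90, c=900, d=9001}
    }
}
```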

and the custom file naming class:

    static class MyFileNaming implements FileIO.Write.FileNaming {
        private final String subPath;
        private final String extension;

        public MyFileNaming(String subPath, String extension) {
            this.subPath = subPath;
            this.extension = extension;
        }

        @Override
        public String getFilename(BoundedWindow w, PaneInfo p, int numShards, int shardIndex, Compression compression) {
            String filename = String.format("%s-winMaxTs-%s-shard-%s-of-%s--%s-%s%s", subPath, w.maxTimestamp().toString().replace(":","_").replace(" ","_"), shardIndex, numShards, getLocalHostAddressSpaced(), currentThread().getName(), extension);
            return filename;
        }
    }
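To predict the file names without running the pipeline, here is a stdlib-only sketch of the same format string. Assumptions: `FileNames.fileName` is a hypothetical helper; `java.time.Instant` stands in for the Joda `Instant` returned by `maxTimestamp()` (both print `2023-01-20T10:48:04.999Z` for this value, though java.time omits a `.000` fraction that Joda keeps); and `getLocalHostAddressSpaced()` is assumed to replace the dots of the worker IP with underscores.

```java
import java.time.Instant;

// Hypothetical stdlib-only re-implementation of MyFileNaming.getFilename.
class FileNames {
    static String fileName(String subPath, Instant windowMaxTs, int shardIndex, int numShards,
                           String hostAddress, String threadName, String extension) {
        // Assumption: getLocalHostAddressSpaced() turns "10.128.0.112" into "10_128_0_112".
        String host = hostAddress.replace('.', '_');
        // Same character replacements as MyFileNaming, so the name is file-system friendly.
        String ts = windowMaxTs.toString().replace(":", "_").replace(" ", "_");
        return String.format("%s-winMaxTs-%s-shard-%s-of-%s--%s-%s%s",
                subPath, ts, shardIndex, numShards, host, threadName, extension);
    }

    public static void main(String[] args) {
        Instant maxTs = Instant.parse("2023-01-20T10:48:04.999Z");
        System.out.println(fileName("a", maxTs, 0, 4, "10.128.0.112", "Thread-17", ".txt"));
    }
}
```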

This generated 16 files for the single window: 4 shards for each of the 4 keys.

a-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-17.txt
a-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-16.txt
a-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-16.txt
a-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-16.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-17.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-16.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-16.txt
b-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-17.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-16.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-17.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-16.txt
c-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-17.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-16.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-18.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-17.txt
d-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-16.txt
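A quick way to check such a listing is to parse the names back and collect the shard indices per key. With `withNumShards(4)` and a single window, each key should map to {0, 1, 2, 3}. `ShardCheck` is a hypothetical helper, not part of Beam; it only understands the naming scheme of `MyFileNaming`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical checker (not part of Beam): parses names produced by MyFileNaming and
// collects, per key, the shard indices that were written.
class ShardCheck {
    // <key>-winMaxTs-<ts>-shard-<index>-of-<total>--<host>-<thread><extension>
    private static final Pattern NAME =
            Pattern.compile("(\\w+)-winMaxTs-(\\S+?)-shard-(\\d+)-of-(\\d+)--.*");

    static Map<String, Set<Integer>> shardsByKey(List<String> fileNames) {
        Map<String, Set<Integer>> byKey = new TreeMap<>();
        for (String name : fileNames) {
            Matcher m = NAME.matcher(name);
            if (!m.matches()) {
                throw new IllegalArgumentException("Unexpected file name: " + name);
            }
            byKey.computeIfAbsent(m.group(1), k -> new TreeSet<>())
                 .add(Integer.parseInt(m.group(3)));
        }
        return byKey;
    }

    // True when every key has exactly the shard indices 0..numShards-1.
    static boolean allShardsPresent(Map<String, Set<Integer>> byKey, int numShards) {
        Set<Integer> expected = new TreeSet<>();
        for (int i = 0; i < numShards; i++) {
            expected.add(i);
        }
        for (Set<Integer> shards : byKey.values()) {
            if (!shards.equals(expected)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> names = List.of(
                "a-winMaxTs-2023-01-20T10_48_04.999Z-shard-0-of-4--10_128_0_112-Thread-17.txt",
                "a-winMaxTs-2023-01-20T10_48_04.999Z-shard-1-of-4--10_128_0_112-Thread-16.txt",
                "a-winMaxTs-2023-01-20T10_48_04.999Z-shard-2-of-4--10_128_0_112-Thread-16.txt",
                "a-winMaxTs-2023-01-20T10_48_04.999Z-shard-3-of-4--10_128_0_112-Thread-16.txt");
        Map<String, Set<Integer>> byKey = shardsByKey(names);
        System.out.println(byKey + " complete=" + allShardsPresent(byKey, 4));
    }
}
```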

If the window were 2 seconds instead of 5, the elements could span two windows, giving 4 shards for the first window [2023-01-20T16_29_28.000, 2023-01-20T16_29_30.000) and 4 shards for the second window [2023-01-20T16_29_30.000, 2023-01-20T16_29_32.000), for example for key d:

d-winMaxTs-2023-01-20T16_29_29.999Z-shard-0-of-4--10_128_0_29-Thread-17.txt
d-winMaxTs-2023-01-20T16_29_29.999Z-shard-1-of-4--10_128_0_29-Thread-17.txt
d-winMaxTs-2023-01-20T16_29_29.999Z-shard-2-of-4--10_128_0_29-Thread-17.txt
d-winMaxTs-2023-01-20T16_29_29.999Z-shard-3-of-4--10_128_0_29-Thread-16.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-0-of-4--10_128_0_29-Thread-18.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-1-of-4--10_128_0_29-Thread-18.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-2-of-4--10_128_0_29-Thread-18.txt
d-winMaxTs-2023-01-20T16_29_31.999Z-shard-3-of-4--10_128_0_28-Thread-16.txt
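The window bounds in these names follow fixed-window arithmetic, as I understand Beam's `FixedWindows` with zero offset: the start is the element timestamp rounded down to a multiple of the window size (counted from the epoch), and `maxTimestamp()` is 1 ms before the exclusive end. The sketch below reproduces the two `winMaxTs` values above for 2-second windows. `FixedWindowMath` is hypothetical, it uses `java.time` instead of Beam's Joda types, and the input timestamps in `main` are made up.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of fixed (tumbling) window assignment with zero offset.
class FixedWindowMath {
    // Rounds the timestamp down to a multiple of the window size since the epoch.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long t = ts.toEpochMilli();
        return Instant.ofEpochMilli(t - Math.floorMod(t, sizeMs));
    }

    // The last millisecond that still belongs to the window [start, start + size).
    static Instant windowMaxTimestamp(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size).minusMillis(1);
    }

    public static void main(String[] args) {
        Duration twoSeconds = Duration.ofSeconds(2);
        // Made-up element timestamps on either side of the 16:29:30 boundary.
        for (String s : new String[] {"2023-01-20T16:29:29.500Z", "2023-01-20T16:29:30.100Z"}) {
            Instant ts = Instant.parse(s);
            Instant start = windowStart(ts, twoSeconds);
            System.out.println(s + " -> [" + start + ", " + start.plus(twoSeconds)
                    + "), maxTs=" + windowMaxTimestamp(ts, twoSeconds));
        }
    }
}
```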

Coming back to the 5-second window (which usually captures all elements),
when

.withNumShards(4)

is changed to:

.withNumShards(2)

2 shards are generated for each key for each window:

a-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_16-Thread-19.txt
a-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_15-Thread-18.txt
b-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_15-Thread-16.txt
b-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_16-Thread-19.txt
c-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_15-Thread-18.txt
c-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_16-Thread-16.txt
d-winMaxTs-2023-01-20T12_10_49.999Z-shard-0-of-2--10_128_0_16-Thread-19.txt
d-winMaxTs-2023-01-20T12_10_49.999Z-shard-1-of-2--10_128_0_16-Thread-16.txt
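With a fixed shard count, each key in each window is split into exactly `numShards` shards, so 4 keys × 2 shards gives the 8 files above. The sketch below illustrates one simple way to spread the elements of a single key and window over a fixed number of shards: round-robin from a starting offset. `RoundRobinSharding` is a hypothetical illustration of the idea, not Beam's internal sharding code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of fixed sharding: the elements of one key and window are
// dealt round-robin into numShards buckets, starting at a given offset. Each bucket would
// correspond to one output shard such as "a-winMaxTs-...-shard-1-of-2--...".
class RoundRobinSharding {
    static List<List<String>> assign(List<String> elements, int numShards, int startShard) {
        if (numShards <= 0) {
            throw new IllegalArgumentException("numShards must be positive for fixed sharding");
        }
        List<List<String>> shards = new ArrayList<>();
        for (int s = 0; s < numShards; s++) {
            shards.add(new ArrayList<>());
        }
        int shard = Math.floorMod(startShard, numShards);
        for (String element : elements) {
            shards.get(shard).add(element);
            shard = (shard + 1) % numShards; // move on to the next shard, wrapping around
        }
        return shards;
    }

    public static void main(String[] args) {
        // Key "a" holds the numbers 1..9 in the 5-second window.
        List<String> keyA = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9");
        List<List<String>> shards = assign(keyA, 2, 1);
        for (int i = 0; i < shards.size(); i++) {
            System.out.println("a shard-" + i + "-of-2: " + shards.get(i));
        }
    }
}
```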

When

.withNumShards(4)

is changed to:

.withNumShards(0)

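then the runner determines the number of shards per key and window (assuming Streaming Engine is used). In this run we got 20 files: 4 shards for each key in the window ending 10:27:34.999, plus 4 more shards for the hottest key, d, in the following window ending 10:27:39.999: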

a-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt
a-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-16.txt
a-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-16.txt
a-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-17.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-16.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-17.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt
b-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-17.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt
c-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt
d-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-0-of-4--10_128_0_7-Thread-19.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-1-of-4--10_128_0_6-Thread-16.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-2-of-4--10_128_0_7-Thread-19.txt
d-winMaxTs-2023-01-20T10_27_39.999Z-shard-3-of-4--10_128_0_6-Thread-16.txt
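Grouping these 20 names by window makes the split visible: 16 files belong to the window ending 10:27:34.999 and the remaining 4, all for key d, to the next window ending 10:27:39.999. A hypothetical helper (`FilesPerWindow`, not part of Beam) that derives these counts from the file names, assuming the `MyFileNaming` scheme:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Beam): counts output files per window and key,
// reading both from "<key>-winMaxTs-<ts>-shard-..." names produced by MyFileNaming.
class FilesPerWindow {
    // The 20 file names from the runner-determined sharding run above.
    static final List<String> LISTING = List.of(
        "a-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt",
        "a-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-16.txt",
        "a-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-16.txt",
        "a-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-17.txt",
        "b-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-16.txt",
        "b-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-17.txt",
        "b-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt",
        "b-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt",
        "c-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt",
        "c-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-17.txt",
        "c-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt",
        "c-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt",
        "d-winMaxTs-2023-01-20T10_27_34.999Z-shard-0-of-4--10_128_0_7-Thread-17.txt",
        "d-winMaxTs-2023-01-20T10_27_34.999Z-shard-1-of-4--10_128_0_7-Thread-16.txt",
        "d-winMaxTs-2023-01-20T10_27_34.999Z-shard-2-of-4--10_128_0_7-Thread-17.txt",
        "d-winMaxTs-2023-01-20T10_27_34.999Z-shard-3-of-4--10_128_0_7-Thread-16.txt",
        "d-winMaxTs-2023-01-20T10_27_39.999Z-shard-0-of-4--10_128_0_7-Thread-19.txt",
        "d-winMaxTs-2023-01-20T10_27_39.999Z-shard-1-of-4--10_128_0_6-Thread-16.txt",
        "d-winMaxTs-2023-01-20T10_27_39.999Z-shard-2-of-4--10_128_0_7-Thread-19.txt",
        "d-winMaxTs-2023-01-20T10_27_39.999Z-shard-3-of-4--10_128_0_6-Thread-16.txt");

    // Returns window max timestamp -> (key -> number of files), both sorted.
    static Map<String, Map<String, Integer>> count(List<String> fileNames) {
        Map<String, Map<String, Integer>> byWindow = new TreeMap<>();
        for (String name : fileNames) {
            int keyEnd = name.indexOf("-winMaxTs-");
            int tsStart = keyEnd + "-winMaxTs-".length();
            int tsEnd = name.indexOf("-shard-", tsStart);
            if (keyEnd < 0 || tsEnd < 0) {
                throw new IllegalArgumentException("Unexpected file name: " + name);
            }
            String key = name.substring(0, keyEnd);
            String window = name.substring(tsStart, tsEnd);
            byWindow.computeIfAbsent(window, w -> new TreeMap<>()).merge(key, 1, Integer::sum);
        }
        return byWindow;
    }

    public static void main(String[] args) {
        System.out.println(count(LISTING));
    }
}
```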