docker for mac

brew update
brew install docker docker-machine
#docker-machine create default
#docker-machine ls
#docker-machine rm -y default 
#brew install hyperkit
brew install --cask virtualbox
docker-machine create --driver virtualbox default

if you get errors like the ones below, run VBoxManage list hostonlyifs and remove the listed host-only adapters:

 (default) Found a new host-only adapter: "vboxnet3"
 Error creating machine: Error in driver during machine creation: Error setting up host only network on machine start: /usr/local/bin/VBoxManage hostonlyif 
 ipconfig #vboxnet3 --ip 192.168.99.1 --netmask 255.255.255.0 failed:
 VBoxManage: error: Code E_ACCESSDENIED (0x80070005) - Access denied (extended info not available)
 VBoxManage: error: Context: "EnableStaticIPConfig(Bstr(pszIp).raw(), Bstr(pszNetmask).raw())" at line 242 of file VBoxManageHostonly.cpp
VBoxManage list hostonlyifs
VBoxManage hostonlyif remove vboxnet0
...
VBoxManage hostonlyif remove vboxnet3

then

sudo mkdir /etc/vbox
sudo bash -c "echo '* 0.0.0.0/0 ::/0' > /etc/vbox/networks.conf"
docker-machine create -d virtualbox default

wait until '(default) Waiting for an IP…' completes

eval $(docker-machine env default)
docker-machine restart
eval "$(docker-machine env default)"
docker ps

Go to the VirtualBox Manager UI, select the default machine -> Settings -> Network -> Advanced -> Port Forwarding -> click the plus icon to add a new forwarding rule: set both host port and guest port to 8080 -> OK

alternatively

docker-machine stop default 
VBoxManage modifyvm "default" --natpf1 "myapp,tcp,,8080,,8080"
docker-machine start default
eval $(docker-machine env default)

if docker ps returns:

Get "https://192.168.99.100:2376/v1.24/containers/json": dial tcp 192.168.99.100:2376: i/o timeout

then execute:

docker-machine stop default 
docker-machine start default
eval $(docker-machine env default)

You need to run eval $(docker-machine env default) in each new terminal window to export the Docker environment variables and be able to connect:

export DOCKER_TLS_VERIFY="1"
export DOCKER_HOST="tcp://192.168.99.100:2376"
export DOCKER_CERT_PATH="/Users/me/.docker/machine/machines/default"
export DOCKER_MACHINE_NAME="default"
# Run this command to configure your shell: 
# eval $(docker-machine env)

In your browser go to http://localhost:8080

source:
https://apple.stackexchange.com/questions/373888/how-do-i-start-the-docker-daemon-on-macos
https://stackoverflow.com/questions/70281938/docker-machine-unable-to-create-a-machine-on-macos-vboxmanage-returning-e-acces
https://stackoverflow.com/questions/36286305/how-do-i-forward-a-docker-machine-port-to-my-host-port-on-osx


Install and run Airflow 1.10.15 dags in docker

me@ubuntu-vm:~/dev$ cat Dockerfile
FROM debian:buster-slim

RUN apt update && apt upgrade -y \
 && apt install sudo locales -y \
 && echo "tzdata tzdata/Areas select Europe" | debconf-set-selections \
 && echo "tzdata tzdata/Zones/Europe select Warsaw" | debconf-set-selections \
 && rm -f /etc/localtime /etc/timezone \
 && dpkg-reconfigure -f noninteractive tzdata \
 && rm -rf /var/lib/apt/lists/* && localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8 \
 && export LANG=en_US.utf8 \
 && adduser airflow \
 && echo "airflow ALL=NOPASSWD: ALL" > /etc/sudoers.d/airflow

USER airflow

RUN sudo apt update -y \
 && sudo apt install --no-install-recommends freetds-bin libffi6 libsasl2-2 libsasl2-modules libssl1.1 locales lsb-release sasl2-bin sqlite3 unixodbc -y \
 && sudo apt install python3 python3-pip -y \
 && python3 -m pip install --upgrade pip==20.2.4 \
 && AIRFLOW_VERSION=1.10.15 \
 && PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)" \
 && CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt" \
 && pip3 install "apache-airflow==${AIRFLOW_VERSION}" apache-airflow-backport-providers-apache-beam==2021.3.13 apache-airflow-backport-providers-google==2021.3.3 --constraint "${CONSTRAINT_URL}"

RUN PATH=$PATH:~/.local/bin \
 && airflow db init \
 && mkdir ~/airflow/dags

COPY entrypoint.sh /entrypoint.sh

RUN sudo chown airflow:airflow /entrypoint.sh && sudo chmod u+x /entrypoint.sh

ENV PATH="${PATH}:~/.local/bin"

EXPOSE 8080

ENTRYPOINT ["/entrypoint.sh"]
me@ubuntu-vm:~/dev$ cat entrypoint.sh
#!/bin/bash
PATH=$PATH:~/.local/bin
echo $PATH
nohup airflow webserver $* >> ~/airflow/logs/webserver.logs &
nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &
sleep 3
tail -f ~/airflow/logs/webserver.logs ~/airflow/logs/scheduler.logs

build an image named debian-buster-slim-airflow-1.10.15 based on Dockerfile

me@ubuntu-vm:~/dev$ docker build -t debian-buster-slim-airflow-1.10.15 .
Sending build context to Docker daemon  6.656kB
Step 1/9 : FROM debian:buster-slim
 ---> d6c0854744f0
Step 2/9 : RUN apt update && apt upgrade -y  && apt install sudo locales -y  && echo "tzdata tzdata/Areas select Europe" | debconf-set-selections  && echo "tzdata tzdata/Zones/Europe select Warsaw" | debconf-set-selections  && rm -f /etc/localtime /etc/timezone  && dpkg-reconfigure -f noninteractive tzdata  && rm -rf /var/lib/apt/lists/* && localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8  && export LANG=en_US.utf8  && adduser airflow  && echo "airflow ALL=NOPASSWD: ALL" > /etc/sudoers.d/airflow
 ---> Using cache
 ---> b67e9c4a416c
Step 3/9 : USER airflow
 ---> Using cache
 ---> 8d4a0a9223f3
Step 4/9 : RUN sudo apt update -y  && sudo apt install --no-install-recommends freetds-bin libffi6 libsasl2-2 libsasl2-modules libssl1.1 locales lsb-release sasl2-bin sqlite3 unixodbc -y  && sudo apt install python3 python3-pip -y  && python3 -m pip install --upgrade pip==20.2.4  && AIRFLOW_VERSION=1.10.15  && PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"  && CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"  && pip3 install "apache-airflow==${AIRFLOW_VERSION}" apache-airflow-backport-providers-apache-beam==2021.3.13 apache-airflow-backport-providers-google==2021.3.3 --constraint "${CONSTRAINT_URL}"
 ---> Using cache
 ---> 80063fa7c6ab
Step 5/9 : RUN PATH=$PATH:~/.local/bin  && airflow db init  && mkdir ~/airflow/dags
 ---> Using cache
 ---> 70a9522a549f
Step 6/9 : COPY entrypoint.sh /entrypoint.sh
 ---> ea6f9f014cac
Step 7/9 : RUN sudo chown airflow:airflow /entrypoint.sh && sudo chmod u+x /entrypoint.sh
 ---> Running in 4bee043480f3
Removing intermediate container 4bee043480f3
 ---> b4309a30238b
Step 8/9 : EXPOSE 8080
 ---> Running in 9a11a6f7e4d6
Removing intermediate container 9a11a6f7e4d6
 ---> 2bcf6cbcffbb
Step 9/9 : ENTRYPOINT ["/entrypoint.sh"]
 ---> Running in 3f581c95d16b
Removing intermediate container 3f581c95d16b
 ---> cabafe595172
Successfully built cabafe595172
Successfully tagged airflow:latest

create and run a container from the debian-buster-slim-airflow-1.10.15 image:

me@ubuntu-vm:~/dev$ docker run -d  -p 8080:8080 -v $(pwd)/share:/share debian-buster-slim-airflow-1.10.15
6a49147124e6a4689b2de59bbd28498bcff9ab5a6b3d06719859526dd0249255
me@ubuntu-vm:~/dev$ docker logs 6a49147124e6a4689b2de59bbd28498bcff9ab5a6b3d06719859526dd0249255
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/airflow/.local/bin
[2022-09-25 21:13:32 +0200] [23] [INFO] Starting gunicorn 20.0.4
[2022-09-25 21:13:32 +0200] [23] [INFO] Listening at: http://0.0.0.0:8080 (23)
[2022-09-25 21:13:32 +0200] [23] [INFO] Using worker: sync
[2022-09-25 21:13:32 +0200] [26] [INFO] Booting worker with pid: 26
[2022-09-25 21:13:32 +0200] [27] [INFO] Booting worker with pid: 27
==> /home/airflow/airflow/logs/webserver.logs  /home/airflow/airflow/logs/scheduler.logs <==
[2022-09-25 21:13:31,622] {__init__.py:50} INFO - Using executor SequentialExecutor
[2022-09-25 21:13:31,628] {scheduler_job.py:1351} INFO - Starting the scheduler
[2022-09-25 21:13:31,628] {scheduler_job.py:1359} INFO - Running execute loop for -1 seconds
[2022-09-25 21:13:31,628] {scheduler_job.py:1360} INFO - Processing each file at most -1 times
[2022-09-25 21:13:31,628] {scheduler_job.py:1363} INFO - Searching for files in /home/airflow/airflow/dags
[2022-09-25 21:13:31,631] {scheduler_job.py:1365} INFO - There are 25 files in /home/airflow/airflow/dags
me@ubuntu-vm:~/dev$ docker ps
CONTAINER ID   IMAGE     COMMAND            CREATED          STATUS          PORTS                                       NAMES
e2370e7ec521   airflow   "/entrypoint.sh"   46 seconds ago   Up 45 seconds   0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   musing_franklin
me@ubuntu-vm:~/dev$ docker exec -t musing_franklin whoami
airflow

me@ubuntu-vm:~/dev$ docker exec -t musing_franklin bash -c 'echo $PATH'
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:~/.local/bin

me@ubuntu-vm:~/dev$ docker exec -t musing_franklin echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin
(here $PATH is expanded by the host shell before docker exec runs, so the host's PATH is printed, not the container's)

docker exec -t musing_franklin bash -c 'ls -la /share'
docker exec -t musing_franklin bash -c 'sudo chmod +r /share/bartek_exception_branch_dag.py && cp /share/bartek_exception_branch_dag.py ~/airflow/dags/'

docker exec -t musing_franklin /home/airflow/.local/bin/airflow dags list
docker exec -t musing_franklin bash -c 'airflow dags list'
docker exec -t musing_franklin bash -c 'airflow dags unpause bartek_exception_branch_dag'
#docker exec -t musing_franklin bash -c 'rm ~/airflow/dags/bartek_branch_dag.py'
docker exec -t musing_franklin bash -c 'airflow dags trigger bartek_exception_branch_dag'

to list containers including stopped:

docker ps -a

to stop running container using container name or container id:

docker stop musing_franklin

to start a stopped container using container name or container id:

docker start musing_franklin

to delete a container:

docker rm musing_franklin

install bash completion and docker completion:

brew install bash-completion

curl -XGET https://raw.githubusercontent.com/docker/cli/master/contrib/completion/bash/docker > $(brew --prefix)/etc/bash_completion.d/docker

then open a new tab to get autocompletion: typing docker start mus + TAB completes to

docker start musing_franklin

Dataflow cancelling and draining streaming beam pipeline

1. Drain pipeline – the pipeline stops accepting new traffic and tries to finish processing the data accepted before the drain signal

locals {
  ts = formatdate("YYYY_MM_DD__hh_mm_ss", timestamp())
}

a) wait until the streaming pipeline fully drains, then start a new pipeline (the new pipeline's name is dynamically calculated)

resource "google_dataflow_job" "job" {
  job_name = "myjob${replace(local.ts, local.ts, "")}"
  skip_wait_on_job_termination = false # default false if not present 
  on_delete = "drain"
}
JOB_ID                                   NAME                     TYPE       CREATION_TIME        STATE    REGION
2022-09-21_07_20_10-4421241458695032459  bartek-mypubsubtogcsjob  Streaming  2022-09-21 14:20:12  Running  us-central1

module.dataflow_classic_template_job.google_dataflow_job.job[0]: Destroying... [id=2022-09-21_07_20_10-442...]
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Still destroying... [id=2022-09-21_07_20_10-442..., 10s elapsed]
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Still destroying... [id=2022-09-21_07_20_10-442..., 20s elapsed]
...
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Still destroying... [id=2022-09-21_07_20_10-442..., 3m10s elapsed]

JOB_ID                                   NAME                     TYPE       CREATION_TIME        STATE     REGION
2022-09-21_07_20_10-442...               bartek-mypubsubtogcsjob  Streaming  2022-09-21 14:20:12  Draining  us-central1

module.dataflow_classic_template_job.google_dataflow_job.job[0]: Destruction complete after 3m39s
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Creating...
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Creation complete after 2s [id=2022-09-21_07_36_15-164...]

JOB_ID                                   NAME                     TYPE       CREATION_TIME        STATE    REGION
2022-09-21_07_20_10-442...               bartek-mypubsubtogcsjob  Streaming  2022-09-21 14:20:12  Drained  us-central1
2022-09-21_07_36_15-164...               bartek-mypubsubtogcsjob  Streaming  2022-09-21 14:36:16  Running  us-central1

// or

resource "google_dataflow_job" "job" {
  job_name = "myjob-${local.ts}" 
  on_delete = "drain"
}
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Destroying... [id=2022-09-21_10_15_50-481...]
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Still destroying... [id=2022-09-21_10_15_50-481... 10s elapsed]
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Still destroying... [id=2022-09-21_10_15_50-481..., 20s elapsed]

JOB_ID                                    NAME                                          TYPE       CREATION_TIME        STATE     REGION
2022-09-21_10_15_50-481...                bartek-mypubsubtogcsjob-2022_09_21__17_15_43  Streaming  2022-09-21 17:15:51  Draining  us-central1

module.dataflow_classic_template_job.google_dataflow_job.job[0]: Still destroying... [id=2022-09-21_10_15_50-481..., 3m40s elapsed]
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Destruction complete after 3m50s
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Creating...
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Creation complete after 2s [id=2022-09-21_10_26_05-505...]

JOB_ID                                    NAME                                          TYPE       CREATION_TIME        STATE    REGION
2022-09-21_10_26_05-505...                bartek-mypubsubtogcsjob-2022_09_21__17_22_15  Streaming  2022-09-21 17:26:06  Running  us-central1
2022-09-21_10_15_50-481...                bartek-mypubsubtogcsjob-2022_09_21__17_15_43  Streaming  2022-09-21 17:15:51  Drained  us-central1

b) immediately start a new job with the same static name as the existing one; the existing job finishes asynchronously with status Updated

resource "google_dataflow_job" "job" {
  job_name                        = local.job_base_name
  # regardless of skip_wait_on_job_termination (whether it is missing, set to false or to true)
  on_delete = "drain"
}
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Modifying... [id=2022-09-21_08_22_00-287...]
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Still modifying... [id=2022-09-21_08_22_00-287..., 10s elapsed]
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Still modifying... [id=2022-09-21_08_22_00-287..., 20s elapsed]

JOB_ID                                   NAME                     TYPE       CREATION_TIME        STATE    REGION
2022-09-21_08_27_30-403...               bartek-mypubsubtogcsjob  Streaming  2022-09-21 15:27:31  Pending  us-central1
2022-09-21_08_22_00-287...               bartek-mypubsubtogcsjob  Streaming  2022-09-21 15:22:00  Running  us-central1

module.dataflow_classic_template_job.google_dataflow_job.job[0]: Still modifying... [id=2022-09-21_08_22_00-287..., 3m10s elapsed]
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Modifications complete after 3m17s [id=2022-09-21_08_27_30-403...]

JOB_ID                                   NAME                     TYPE       CREATION_TIME        STATE    REGION
2022-09-21_08_27_30-403...               bartek-mypubsubtogcsjob  Streaming  2022-09-21 15:27:31  Running  us-central1
2022-09-21_08_22_00-287...               bartek-mypubsubtogcsjob  Streaming  2022-09-21 15:22:00  Updated  us-central1

c) Error – job name already exists – the existing job starts draining, but the new one fails to start because it resolves to the same name while the old job is still active

resource "google_dataflow_job" "job" {
  job_name = "myjob${replace(local.ts, local.ts, "")}"
  skip_wait_on_job_termination = true
  on_delete = "drain"
}
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Destroying... [id=2022-09-21_09_19_32-103...]
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Destruction complete after 8s
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Creating...

Error: googleapi: Error 409: (f5bb8c1b1a52...): The workflow could not be created. Causes: (4ed84c0ff929dc...): 
There is already an active job named bartek-mypubsubtogcsjob. 
If you want to  submit a second job, try again by setting a different name., alreadyExists

d) immediately start a new job with a different name and drain the existing job asynchronously

resource "google_dataflow_job" "job" {
  job_name = "myjob-${local.ts}"
  skip_wait_on_job_termination = true
  on_delete = "drain"
}
JOB_ID                                    NAME                                          TYPE       CREATION_TIME        STATE    REGION
2022-09-21_09_55_59-142...                bartek-mypubsubtogcsjob-2022_09_21__16_55_51  Streaming  2022-09-21 16:55:59  Running  us-central1

module.dataflow_classic_template_job.google_dataflow_job.job[0]: Destroying... [id=2022-09-21_09_55_59-142...]
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Destruction complete after 7s
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Creating...
module.dataflow_classic_template_job.google_dataflow_job.job[0]: Creation complete after 3s [id=2022-09-21_10_05_40-161...]

JOB_ID                                    NAME                                          TYPE       CREATION_TIME        STATE     REGION
2022-09-21_10_05_40-161...                bartek-mypubsubtogcsjob-2022_09_21__17_05_32  Streaming  2022-09-21 17:05:41  Running   us-central1
2022-09-21_09_55_59-142...                bartek-mypubsubtogcsjob-2022_09_21__16_55_51  Streaming  2022-09-21 16:55:59  Draining  us-central1

JOB_ID                                    NAME                                          TYPE       CREATION_TIME        STATE    REGION
2022-09-21_10_05_40-161...                bartek-mypubsubtogcsjob-2022_09_21__17_05_32  Streaming  2022-09-21 17:05:41  Running  us-central1
2022-09-21_09_55_59-142...                bartek-mypubsubtogcsjob-2022_09_21__16_55_51  Streaming  2022-09-21 16:55:59  Drained  us-central1

2. Cancel streaming pipeline – stops the pipeline immediately; in-flight messages may be lost:

resource "google_dataflow_job" "job" {
  on_delete = "cancel"
...
}

Airflow schedule dag to run once a month at specific date and time and on demand

Goal: schedule an Airflow DAG to run a few minutes from now and then at the same time each following month.

bartek_basic_dag.py:

import pendulum
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# pip install apache-airflow==1.10.15

with DAG(dag_id='bartek_basic_dag',
         schedule_interval="_SCHEDULE_INTERVAL_",
         start_date=pendulum.parse('_START_DATE_', tz='UTC'),

         # optional below
         default_args={},
         description='Bartek basic dag description',
         tags=["bartek"],
         ) as dag:
    t1 = BashOperator(task_id='hello_bash_operator', bash_command='echo "Bartek" from bash')

We need to:
– set the minute in the cron expression to the current minute +5 minutes, and
– set the start date to the current date -1 month +5 minutes,
because Airflow triggers a run at the end of the schedule interval. With a 1-month interval the first run therefore fires 5 minutes from now, with run id scheduled__2022-08-17T08:09:00+00:00 (a date in the past). A pendulum-based sketch of the same calculation follows the listing below.

 echo "Current local date time: $(date '+%Y-%m-%dT%H:%M:%S%z')" && \
 echo "Current UTC   date time: $(date -u '+%Y-%m-%dT%H:%M:%S%z')" && \
 cat bartek_basic_dag.py | \
 sed "s/_SCHEDULE_INTERVAL_/$(date -u -v +5M '+%M %H %d') * */" | \
 sed "s/_START_DATE_/$(date -u -v -1m -v +5M +%Y-%m-%dT%H:%M:00)/"
Current local date time: 2022-09-17T10:04:00+0200
Current UTC   date time: 2022-09-17T08:04:00+0000
import pendulum
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# pip install apache-airflow==1.10.15

with DAG(dag_id='bartek_basic_dag',
         schedule_interval="09 08 17 * *",
         start_date=pendulum.parse('2022-08-17T08:09:00', tz='UTC'),

         # optional below
         default_args={},
         description='Bartek basic dag description',
         tags=["bartek"],
         ) as dag:
    t1 = BashOperator(task_id='hello_bash_operator', bash_command='echo "Bartek" from bash')
Airflow simple dag scheduled once a month
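The same values can be computed portably in Python with pendulum (which the DAG above already imports); a minimal sketch, not part of the original walkthrough:

import pendulum

# Sketch: compute the values substituted for _SCHEDULE_INTERVAL_ and _START_DATE_
# so that the first scheduled run fires about 5 minutes from now. Airflow triggers
# a run at the END of the schedule interval, hence start_date is pushed one month back.
first_run = pendulum.now('UTC').add(minutes=5)

schedule_interval = first_run.format('mm HH DD') + ' * *'                   # e.g. "09 08 17 * *"
start_date = first_run.subtract(months=1).format('YYYY-MM-DD[T]HH:mm:00')   # e.g. "2022-08-17T08:09:00"

print(schedule_interval)
print(start_date)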

bigquery

1. One row with array of struct

bq query --nouse_legacy_sql 'SELECT [STRUCT(1 as id, "bob" as name, 2 as seq),STRUCT(1 as id, "bob" as name, 1 as seq),STRUCT(1 as id, "bob" as name, 3 as seq)] as data'

SELECT [
  STRUCT(1 as id, "bob" as name, 2 as seq),
  STRUCT(1 as id, "bob" as name, 1 as seq), 
  STRUCT(1 as id, "bob" as name, 3 as seq)
] as data
+---------------------------------------------------------------------------------------------------------+
|                                                  data                                                   |
+---------------------------------------------------------------------------------------------------------+
| [{"id":"1","name":"bob","seq":"2"},{"id":"1","name":"bob","seq":"1"},{"id":"1","name":"bob","seq":"3"}] |
+---------------------------------------------------------------------------------------------------------+

2. Flatten 3-element array into 3 rows with struct each

SELECT * FROM UNNEST(
  [
    STRUCT(1 as id, "bob" as name, 2 as seq),
    STRUCT(1 as id, "bob" as name, 1 as seq),
    STRUCT(1 as id, "bob" as name, 3 as seq)
  ]
)
+----+------+-----+
| id | name | seq |
+----+------+-----+
|  1 | bob  |   2 |
|  1 | bob  |   1 |
|  1 | bob  |   3 |
+----+------+-----+

3. Flatten 3-element array into 3 rows and query a particular field of those rows

SELECT name FROM UNNEST([STRUCT(1 as id, "bob" as name, 2 as seq),STRUCT(1 as id, "bob" as name, 1 as seq),STRUCT(1 as id, "bob" as name, 3 as seq)])
+------+
| name |
+------+
| bob  |
| bob  |
| bob  |
+------+

4. WITH temp table returning one row with an array – select a field of the first array element

WITH tmp AS (
  SELECT [STRUCT(1 as id, "bob" as name, 2 as seq),STRUCT(1 as id, "bob" as name, 1 as seq),STRUCT(1 as id, "bob" as name, 3 as seq)] 
as data)
SELECT data[OFFSET(0)].id FROM tmp;
+----+
| id |
+----+
|  1 |
+----+

5. WITH temp table returning rows of structs – select fields

WITH tmp AS (
  SELECT * FROM UNNEST([STRUCT(1 as id, "bob" as name, 2 as seq),STRUCT(1 as id, "bob" as name, 1 as seq),STRUCT(1 as id, "bob" as name, 3 as seq)])
)
SELECT id, name, seq FROM tmp
+----+------+-----+
| id | name | seq |
+----+------+-----+
|  1 | bob  |   2 |
|  1 | bob  |   1 |
|  1 | bob  |   3 |
+----+------+-----+ 

6. Subselect

SELECT id, name, seq FROM (
  SELECT * FROM UNNEST([STRUCT(1 as id, "bob" as name, 2 as seq),STRUCT(1 as id, "bob" as name, 1 as seq),STRUCT(1 as id, "bob" as name, 3 as seq)])
AS S)
+----+------+-----+
| id | name | seq |
+----+------+-----+
|  1 | bob  |   2 |
|  1 | bob  |   1 |
|  1 | bob  |   3 |
+----+------+-----+

7. select the row with the biggest field value among duplicates
Data:

SELECT * FROM UNNEST(
  [
    STRUCT(1 as id, "foo" as name, 2 as seq),
    STRUCT(2 as id, "bar" as name, 2 as seq),
    STRUCT(1 as id, "foo" as name, 1 as seq),
    STRUCT(1 as id, "foo" as name, 3 as seq),
    STRUCT(2 as id, "bar" as name, 1 as seq)
  ]
)
+----+------+-----+
| id | name | seq |
+----+------+-----+
|  1 | foo  |   2 |
|  2 | bar  |   2 |
|  1 | foo  |   1 |
|  1 | foo  |   3 |
|  2 | bar  |   1 |
+----+------+-----+

where id is the column that identifies duplicates

Expected result:

+----+------+-----+
| id | name | seq |
+----+------+-----+
|  2 | bar  |   2 |
|  1 | foo  |   3 |
+----+------+-----+
WITH tmp AS (
  SELECT * FROM UNNEST(
    [
      STRUCT(1 as id, "foo" as name, 2 as seq),
      STRUCT(2 as id, "bar" as name, 2 as seq),
      STRUCT(1 as id, "foo" as name, 1 as seq),
      STRUCT(1 as id, "foo" as name, 3 as seq),
      STRUCT(2 as id, "bar" as name, 1 as seq)
    ]
  )
)
SELECT id, name, seq
FROM (
  SELECT
      *,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY seq DESC) row_number
  FROM tmp
)
WHERE row_number = 1

8. merge

CREATE OR REPLACE TABLE bartek_dataset.tmp_table (id INT64, name STRING, seq INT64);
MERGE bartek_dataset.tmp_table T 
USING 
(
  SELECT id, name, seq
  FROM 
  (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY seq DESC) row_number
    FROM 
    (
      SELECT * FROM UNNEST
        (
          [
            STRUCT(1 as id, "foo" as name, 2 as seq),
            STRUCT(2 as id, "bar" as name, 2 as seq),
            STRUCT(1 as id, "foo" as name, 1 as seq),
            STRUCT(1 as id, "foo" as name, 3 as seq),
            STRUCT(2 as id, "bar" as name, 1 as seq)
          ]
        )
    )
  )
  WHERE row_number = 1
) S
ON T.id = S.id
WHEN NOT MATCHED THEN
  INSERT (id, name, seq) VALUES(S.id, S.name, S.seq);
SELECT * FROM bartek_dataset.tmp_table;
--TRUNCATE TABLE bartek_dataset.tmp_table;

9. insert into from select

INSERT INTO bartek_dataset.tmp_table 
(
  SELECT * FROM UNNEST
  (
    [
      STRUCT(1 as id, "foo" as name, 2 as seq),
      STRUCT(1 as id, "foo" as name, 1 as seq)
    ]
  )
);

10. create table as select

CREATE TABLE bartek_dataset.tmp_table2 AS
(
  SELECT * FROM UNNEST
  (
    [
      STRUCT(1 as id, "foo" as name, 2 as seq),
      STRUCT(1 as id, "foo" as name, 1 as seq)
    ]
  )
)

airflow 1.10.15 installation on ubuntu 18.04, 20.04 and 22.04 with google and apache beam providers

sudo apt-get update && sudo apt-get upgrade -y && sudo apt-get clean -y && sudo apt-get autoremove -y && sudo apt-get autoclean -y && sudo apt clean -y && sudo apt autoremove -y && sudo apt autoclean -y


sudo apt-get install build-essential -y

# ubuntu 18.04 and 20.04
sudo apt-get install -y --no-install-recommends freetds-bin krb5-user ldap-utils libffi6 libsasl2-2 libsasl2-modules libssl1.1 locales  lsb-release sasl2-bin sqlite3 unixodbc

# ubuntu 22.04
sudo apt-get install -y --no-install-recommends freetds-bin krb5-user ldap-utils libffi-dev libsasl2-2 libsasl2-modules libssl-dev locales  lsb-release sasl2-bin sqlite3 unixodbc libncurses-dev build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python3-openssl git


sudo apt-get install curl apt-transport-https ca-certificates gnupg -y
echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key --keyring /usr/share/keyrings/cloud.google.gpg add -
sudo apt-get update && sudo apt-get install google-cloud-sdk -y

cd
tar xzf /media/sf_Downloads/.ssh.tgz
cd .config
tar xzf /media/sf_Downloads/gcloud.tgz
cd

sudo apt install python3-pip -y
pip3 -V
alias "pip=pip3"

# ubuntu 18.04 and 20.04
# python3 -m pip install --upgrade pip==20.2.4

# ubuntu 22.04 with pip3 --use-deprecated legacy-resolver
python3 -m pip install --upgrade pip


# install other python versions:
curl https://pyenv.run | bash

echo '# pyenv' >> ~/.bashrc
echo 'export PATH="$HOME/.pyenv/bin:$PATH"' >> ~/.bashrc
echo 'eval "$(pyenv init --path)"' >> ~/.bashrc
echo 'eval "$(pyenv virtualenv-init -)"' >> ~/.bashrc
. ~/.bashrc

#pyenv versions
pyenv install 3.8.13

# Per project usage
pyenv local 3.8.13

# https://airflow.apache.org/docs/apache-airflow/1.10.15/installation.html
AIRFLOW_VERSION=1.10.15
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
echo $PYTHON_VERSION
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
echo $CONSTRAINT_URL

# ubuntu 18.04 and 20.04
# pip3 install "apache-airflow==${AIRFLOW_VERSION}" apache-airflow-backport-providers-apache-beam==2021.3.13 apache-airflow-backport-providers-google==2021.3.3 --constraint "${CONSTRAINT_URL}"

# ubuntu 22.04 with pip3 --use-deprecated legacy-resolver
pip3 --use-deprecated legacy-resolver install "apache-airflow==${AIRFLOW_VERSION}" apache-airflow-backport-providers-apache-beam==2021.3.13 apache-airflow-backport-providers-google==2021.3.3 --constraint "${CONSTRAINT_URL}" 


#pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
#pip3 install apache-airflow-backport-providers-apache-beam==2021.3.13 --constraint "${CONSTRAINT_URL}"
#pip3 install apache-airflow-backport-providers-google==2021.3.3 --constraint "${CONSTRAINT_URL}"

#pip3 install apache-beam[gcp]==2.27.0 --constraint "${CONSTRAINT_URL}"
#pip3 install "apache-airflow[gcp]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

export GOOGLE_CLOUD_PROJECT=...


PATH=$PATH:~/.local/bin
airflow db init
nohup airflow webserver $* >> ~/airflow/logs/webserver.logs &
#nohup airflow worker $* >> ~/airflow/logs/worker.logs &
nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &

cd ~/airflow
mkdir dags
mkdir logs
mkdir plugins

cp /media/sf_Downloads/bartek_dag.py dags/
sleep 30

airflow list_dags
airflow dags unpause bartek_dag
airflow dags trigger -c "{ \"a\":1 }" bartek_dag
#airflow dags trigger bartek_dag

firefox http://localhost:8080

#pip3 uninstall -y -r <(pip3 freeze)


sudo apt install python3-pip -y
pip3 -V
alias "pip=pip3"
python3 -m pip install --upgrade pip==20.2.4
AIRFLOW_VERSION=1.10.15
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
echo $PYTHON_VERSION
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
echo $CONSTRAINT_URL
pip3 install "apache-airflow==${AIRFLOW_VERSION}" apache-airflow-backport-providers-apache-beam==2021.3.13 apache-airflow-backport-providers-google==2021.3.3 --constraint "${CONSTRAINT_URL}"
PATH=$PATH:~/.local/bin
airflow db init
nohup airflow webserver $* >> ~/airflow/logs/webserver.logs &
nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &
cd ~/airflow
mkdir dags
mkdir logs
mkdir plugins

cp /media/sf_Downloads/bartek_basic_dag.py dags/
sleep 30

airflow list_dags
airflow dags unpause bartek_dag

Python – syntax vs Java, installation, IDE Intellij setup, pyenv, virtualenv, pip, apache beam like api, pytest

src/main/python/my_python.py:

def main():
    print("Hello World!")


if __name__ == "__main__":
    main()
me@MacBook:~/dev/my-apache-beam-dataflow$ python3 -V
Python 3.9.9
me@MacBook:~/dev/my-apache-beam-dataflow$ python3 src/main/python/my_python.py
Hello World!

Intellij Setup:
1. File/Project Structure/Platform Settings/SDKs/+/Add Python SDK/ -> Python SDK home path: /usr/local/bin/python3.9
2. File/Project Structure/Platform Settings/Facets/+/Add Python/ -> Python Interpreter: choose Python 3.9 /usr/local/bin/python3.9

Add other Python file(s) with helper method(s):
src/main/python/my_lib.py:

def hello(name: str):
    print(f"hello {name}")

Import the hello method from my_lib.py and use it in the main file:

from my_lib import hello

def main():
    print("Hello World!")
    hello("bob")

If the import statement shows errors in IntelliJ, right-click the python folder (in src/main) -> Mark Directory as -> Sources Root
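The section title also mentions pytest; a minimal sketch (assuming pytest is installed and a test file placed next to my_lib.py, e.g. src/main/python/test_my_lib.py) that verifies the hello helper via the built-in capsys fixture:

# test_my_lib.py
from my_lib import hello


def test_hello_prints_greeting(capsys):
    hello("bob")
    # hello() prints rather than returns, so capture stdout
    assert capsys.readouterr().out == "hello bob\n"

Run it with pytest src/main/python (path assumed from the layout above).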

List and dict

    # list
    my_list = ['a', 'b']
    print(f"my_list={my_list}")

    for e in my_list:
        print(f"e={e}")
    for i in range(len(my_list)):
        print(f"my_list[{i}]={my_list[i]}")

    my_transformed_list = list(map(lambda s: s.upper(), my_list))
    print(f"my_transformed_list={my_transformed_list}")

    # possible but not recommended (use def instead); requires: from typing import Any, Callable
    to_upper_lambda: Callable[[Any], Any] = lambda s: s.upper()

    my_transformed_list2 = list(map(to_upper_lambda, my_list))
    print(f"my_transformed_list2={my_transformed_list2}")

    my_list2 = ['a', 1]
    print(f"my_list2={my_list2}")

    def process(el):
        return f"{el}{el}"

    my_new_list = [f"{e}{e}" for e in my_list]
    my_new_list2 = [process(e) for e in my_list]  # same as above
    print(f"my_new_list={my_new_list}")  # my_new_list

    my_empty_list = list()
    print(f"my_empty_list={my_empty_list}")

    def to_upper_fn(s):
        return s.upper()

    my_transformed_list3 = list(map(to_upper_fn, my_list))
    print(f"my_transformed_list3={my_transformed_list3}")

    my_filtered_list = list(filter(lambda s: s == 'a', my_list))
    print(f"my_filtered_list={my_filtered_list}")
my_list=['a', 'b']
e=a
e=b
my_list[0]=a
my_list[1]=b
my_transformed_list=['A', 'B']
my_transformed_list2=['A', 'B']
my_list2=['a', 1]
my_new_list=['aa', 'bb']
my_empty_list=[]
my_transformed_list3=['A', 'B']
my_filtered_list=['a']
    # dict (aka map)
    my_dict = {'a': 1, 'b': "abc"}
    print(f"my_dict={my_dict}")
    print(f"my_dict['a']={my_dict['a']}")
    for k in my_dict:
        print(k, '->', my_dict[k])

    # noinspection PyTypeChecker
    for k, v in my_dict.items():
        print(k, '->', v)

    my_dict2 = {"boy": 1, "girl": 2, "men": 18, "woman": 17}
    print(f"my_dict2={my_dict2.items()}")
    my_sorted_dict2 = sorted(my_dict2.items(), key=lambda ele: ele[1])
    print(f"my_sorted_dict2={my_sorted_dict2}")

    my_empty_dict = dict()
    print(f"my_empty_dict={my_empty_dict}")  
my_dict={'a': 1, 'b': 'abc'}
my_dict['a']=1
a -> 1
b -> abc
a -> 1
b -> abc
my_dict2=dict_items([('boy', 1), ('girl', 2), ('men', 18), ('woman', 17)])
my_sorted_dict2=[('boy', 1), ('girl', 2), ('woman', 17), ('men', 18)]
my_empty_dict={}

Java way:

    public static void main(String[] args) {
        // List
        List<String> myListOldWay = Arrays.asList("a", "b");
        List<String> myList = List.of("a", "b");// since Java 9
        System.out.println("myList=" + myList);
        myList.forEach(e -> System.out.println("e=" + e));

        List<String> myTransformedList
                = myList.stream().map(String::toUpperCase).collect(Collectors.toList()); // equivalent: map(e -> e.toUpperCase())
        System.out.println("myTransformedList=" + myTransformedList);

        List<String> myFilteredList = myList.stream().filter(e -> e.equals("a")).collect(Collectors.toList());
        System.out.println("myFilteredList=" + myFilteredList);

        List<Object> myObjectList = Arrays.asList("a", 1);
        List<String> emptyList = Collections.emptyList();
        System.out.println("emptyList=" + emptyList);

        // Map
        Map<String, Integer> myMap = Map.of("a", 1, "b", 2); // since java 9
        System.out.println("myMap=" + myMap);
        myMap.forEach((k, v) -> System.out.println(k + "->" + v));
        myMap.keySet().forEach(k -> System.out.println(k + "->" + myMap.get(k)));
    }
myList=[a, b]
e=a
e=b
myTransformedList=[A, B]
myFilteredList=[a]
emptyList=[]
myMap={b=2, a=1}
b->2
a->1
b->2
a->1

Inheritance and Callable/def/lambda

from typing import List, Callable


class Processor:
    def process(self, elements: List[str]):
        raise NotImplementedError


class NamedProcessor(Processor):
    def __init__(self, name):
        self.name = name

    def process(self, elements: List[str]):
        name = self.name if self.name else self.__class__.__name__
        print(f"Processing {len(elements)} element(s) by {name}")
        return elements

    def __rrshift__(self, name):
        self.name = name
        return self


class CallableProcessor(NamedProcessor):
    def __init__(self, fn: Callable[[str], str], name=None):
        super(CallableProcessor, self).__init__(name)
        self.fn = fn

    def process(self, elements: List[str]):
        elements2 = super().process(elements)
        return list(map(self.fn, elements2))


class ToLowerCase:
    def __call__(self, s: str):
        return s.lower()


def to_upper_case(s: str):
    return s.upper()


def main():
    upper_elements = CallableProcessor(to_upper_case).process(['a', 'b'])  # or CallableProcessor(lambda s: s.lower()))
    print(f"upper_elements={upper_elements}")
    lower__elements = ("To lower case processor" >> CallableProcessor(ToLowerCase())).process(['X', 'Y'])
    print(f"lower__elements={lower__elements}")


if __name__ == '__main__':
    main()

Processing 2 element(s) by CallableProcessor
upper_elements=['A', 'B']
Processing 2 element(s) by To lower case processor
lower__elements=['x', 'y']

*args and **kwargs


def my_args(*args):
    return args


def my_list(li):
    return li


def my_kwargs(**kwargs):
    return kwargs


def my_dict(d):
    return d


def my_arg_with_args(a, *args):
    return a, args


def my_arg_opt_arg_with_args(a, b=None, *args):
    return a, b, args


def my_args_kwargs(*args, **kwargs):
    return args, kwargs


def my_arg_opt_arg_with_args_kwargs(a, b=None, *args, **kwargs):
    return a, b, args, kwargs


def my_print(val):
    print(f"{val}, len={len(val)}")


if __name__ == '__main__':
    my_print(my_args())  # (), len=0
    my_print(my_args('x', 1, True))  # ('x', 1, True), len=3

    my_print(my_list([]))  # [], len=0 <-- difference
    my_print(my_list(list()))  # [], len=0 <-- difference
    my_print(my_list(['x', 1, True]))  # ['x', 1, True], len=3 <-- difference

    my_print(my_kwargs())  # {}, len=0
    my_print(my_kwargs(x='a', y=11, z=True))  # {'x': 'a', 'y': 11, 'z': True}, len=3

    my_print(my_dict(dict()))  # {}, len=0
    my_print(my_dict(dict(x='a', y=11, z=True)))  # {'x': 'a', 'y': 11, 'z': True}, len=3

    my_print(my_dict({}))  # {}, len=0
    my_print(my_dict({'x': 'a', 'y': 11, 'z': True}))  # {'x': 'a', 'y': 11, 'z': True}, len=3

    my_print(my_arg_with_args("a", "b", "c"))  # ('a', ('b', 'c')), len=2
    my_print(my_arg_opt_arg_with_args("a", "b", "c"))  # ('a', 'b', ('c',)), len=3
    my_print(my_args_kwargs('a', 'b', a=1, b=2))  # (('a', 'b'), {'a': 1, 'b': 2}), len=2
    my_print(my_arg_opt_arg_with_args_kwargs("a", "b", "c", x=1, y=2))  # ('a', 'b', ('c',), {'x': 1, 'y': 2}), len=4

tuple

def my_tuple(a, b):
    return a, b


if __name__ == '__main__':
    t = my_tuple('a', 1)
    print(f"t={t}")
    x, y = my_tuple('a', 1)
    print(f"x={x}, y={y}")
t=('a', 1)
x=a, y=1

if else statement

def print_conditionally(_flag):
    v = 1 if _flag else 0
    print(f"v={v}")


if __name__ == '__main__':
    print_conditionally(0)  # 0
    print_conditionally(None)  # 0
    print_conditionally(False)  # 0
    print_conditionally("")  # 0
    print_conditionally([])  # 0
    print_conditionally(())  # 0
    print_conditionally(list())  # 0
    print_conditionally(dict())  # 0
    print('-----------------------')
    print_conditionally(1)  # 1
    print_conditionally("a")  # 1
    print_conditionally(True)  # 1
    print_conditionally(['a'])  # 1
    print_conditionally(('a', 'b'))  # 1
    print_conditionally({'a': 1})  # 1

PGP GPG encryption and integration with GCP KMS and Secret Manager


rm -rf bartek-dev-vars.conf
rm -rf bartek*.asc
rm -rf hello.txt*

cat > bartek-dev-vars.conf <<EOF
name=bartek
email=bartek@example.com
comment="bartek dev"
passphrase=bartekdev123
public_key=bartek-pp-public.asc
private_key=bartek-pp-private.asc
kms_project=bartek-project-123
kms_location=us
kms_keyring=bartek-key-ring
kms_key=bartek-key
EOF

source bartek-dev-vars.conf

rm pp-gpg.conf
cat > pp-gpg.conf <<EOF
    %echo Generating OpenPGP RSA keys
    Key-Type: RSA
    Key-Length: 4096
    Subkey-Type: RSA
    Subkey-Length: 4096
    Name-Real: ${name}
    Name-Comment: ${comment}
    Name-Email: ${email}
    Expire-Date: 0
    Passphrase: ${passphrase}
    %commit
    %echo done
EOF

echo -e "\nGenerating public and private gpg key:"
rm -rf ~/.gnupg
gpg --verbose --batch --full-generate-key pp-gpg.conf

echo -e "\nExporting public and private gpg key to file:"
gpg --output ${public_key}  --armor --export ${email}
gpg --output ${private_key} --armor --batch --pinentry-mode=loopback --yes --passphrase "${passphrase}" --export-secret-key ${email}
ls -ls *.asc


echo -e "\Importing public key from file:"
rm -rf ~/.gnupg
gpg --batch --pinentry-mode=loopback --yes --import ${public_key}

echo -e "\Encrypting text file using public key:"
echo "hello world" > hello.txt
gpg --output hello.txt.gpg --encrypt --recipient ${email} --trust-model always hello.txt


echo -e "\Importing private key from file:"
rm -rf ~/.gnupg
gpg --batch --pinentry-mode=loopback --yes --passphrase "${passphrase}" --import ${private_key}


echo -e "\Decrypting encrypted file using private key:"
echo -e "\n\nDecrypted text:"
gpg --batch --pinentry-mode=loopback --yes --passphrase "${passphrase}" --decrypt hello.txt.gpg


echo -e "\Encrypting private key using GCP KMS:"
echo -e "\n\nEncrypted private key:"
gcloud kms encrypt --project ${kms_project} --location ${kms_location} --keyring ${kms_keyring} --key ${kms_key} --plaintext-file=${private_key} --ciphertext-file - | base64 -w 0


echo -e "\Encrypting private key passphrase using GCP KMS:"
echo -e "\n\nEncrypted private key passphrase:"

echo -n "${passphrase}" | gcloud kms encrypt --project ${kms_project} --location ${kms_location} --keyring ${kms_keyring} --key ${kms_key} --plaintext-file - --ciphertext-file - | base64 -w 0


echo -e "\n\nDONE\n"




GZip compression and decompression on the fly with callback message consumer

import org.junit.Test;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import static org.junit.Assert.assertEquals;

public class GZipDecompressionOnTheFlyTest {

    @Test
    public void testDecompression() throws IOException {
        // given
        List<String> receivedMessages = new ArrayList<>();

        String multiRootElementXml = "<ABC><D e=\"123\"/></ABC><ABC><D e=\"456\"/></ABC>";
        byte[] gzippedBytes = gzipCompress(multiRootElementXml);
        Consumer<String> messageReceiver = message -> receivedMessages.add("Received: " + message);

        // when
        try (InputStream is = new BufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(gzippedBytes)));
             SplitInputStream fwdStream = new SplitInputStream(is, "<ABC", "</ABC>")) {
            byte[] splitBytes;
            while ((splitBytes = fwdStream.readFwdAsBytes()) != null) {
                String singleRootXml = new String(splitBytes);
                messageReceiver.accept(singleRootXml);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        // then
        assertEquals(Arrays.asList(
                "Received: <ABC><D e=\"123\"/></ABC>",
                "Received: <ABC><D e=\"456\"/></ABC>"), receivedMessages);

    }

    private byte[] gzipCompress(String s) throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream(bytes.length);
        try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteStream)) {
            gzipOutputStream.write(bytes);
        }
        return byteStream.toByteArray();
    }

    private static class SplitInputStream implements AutoCloseable {

        byte[] startTag;

        byte[] endTag;

        private InputStream is;

        private static final int MAX_BYTES_READ = 1024 * 1024;

        public SplitInputStream(InputStream is, String startTag, String endTag) {
            this.is = is;
            this.startTag = startTag.getBytes(StandardCharsets.UTF_8);
            this.endTag = endTag.getBytes(StandardCharsets.UTF_8);
        }

        public byte[] readFwdAsBytes() throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream(1024 * 128);
            if (readUntilMatch(startTag, out, true)) {
                out.write(startTag);
                if (readUntilMatch(endTag, out, false)) {
                    return out.toByteArray();
                }
            }
            return null;
        }

        private boolean readUntilMatch(byte[] needle, ByteArrayOutputStream dataRead, boolean skipData) throws IOException {
            int i = 0;
            int pos = -1;
            while (true) {
                pos++;
                int b = is.read();
                // end of file:
                if (b == -1) {
                    return false;
                }

                // skip or save to buffer:
                if (!skipData) {
                    dataRead.write(b);
                }

                // check if we're matching:
                if (b == needle[i]) {
                    i++;
                    if (i >= needle.length) {
                        return true;
                    }
                } else {
                    i = 0;
                }

                // TODO: verify if condition: pos >= MAX_BYTES_READ is necessary
                // see if we've passed the stop point:
                if (skipData && i == 0 && pos >= MAX_BYTES_READ) {
                    return false;
                }
            }
        }

        @Override
        public void close() throws Exception {
            is.close();
        }
    }
}

java streams basics

1. Filter out nulls

By default, nulls are not filtered out:

        List<String> withExplicitNulls = Stream.of("a", null, "c", null)
                .peek(s -> System.out.println("s=" + s)) // execute lambda and return input
                .collect(Collectors.toList());
        System.out.println(withExplicitNulls); // [a, null, c, null]

and neither are nulls returned by the mapping function:

        List<String> withNulls = Stream.of("a", "b", "c", "d").map(TestStream::toRandomNullableUpperString).collect(Collectors.toList());
        System.out.println(withNulls); // [null, B, null, D]

    private static String toRandomNullableUpperString(String s) {
        return new Random().nextBoolean() ? s.toUpperCase() : null;
    }

Remove nulls with filter(Objects::nonNull):

        List<String> withoutNulls = Stream.of("a", "b", "c", "d")
                .map(TestStream::toRandomNullableUpperString)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        System.out.println(withoutNulls); // [C, D]

or remove nulls by wrapping each element with Optional.ofNullable(s).stream() inside flatMap (Java 9+):

        List<String> withExplicitNulls = Stream.of("a", null, "c", null)
                .flatMap(s -> {
                    System.out.println("s=" + s); // s=a, s=null, s=c, s=null
                    return Optional.ofNullable(s).stream();
                })
                .collect(Collectors.toList());
        System.out.println(withExplicitNulls); // [a, c]

2. FlatMap stream of string lists to string list

        List<String> collect =
                Stream.of(
                        Arrays.asList("a", "aa"),
                        Arrays.asList("b", "bb")
                )
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        System.out.println(collect); 
[a, aa, b, bb]

3. Nullable string array to string list:

        String[] nullableArray = new Random().nextBoolean() ?  new String[] { "a", "b"} : null;
        List<String> strings = Optional.ofNullable(nullableArray).map(Arrays::asList).orElse(Collections.emptyList());
        System.out.println(strings);
[]
// or
[a, b]

4. FlatMap method returning optional in lambda:

    public static void main(String[] args) {
        // java 9
        //Stream<Integer> integerStream = Stream.of("a", "1", "b", "2").map(TestStream::toNumber).flatMap(Optional::stream);

        // java 8
        Stream<Integer> integerStream = Stream.of("a", "1", "b", "2").map(TestStream::toNumber).flatMap(o -> o.isPresent() ? Stream.of(o.get()) : Stream.empty());

        System.out.println(integerStream.collect(Collectors.toList()));
    }

    private static Optional<Integer> toNumber(String s) {
        try {
            return Optional.of(Integer.parseInt(s));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
[1, 2]