Running Data Pipelines Locally Using Containerized Ceph S3, Kafka & NiFi
As the data world becomes more and more similar to the software development world, with data sets being used like git repositories, data being treated as if it were a product, and a complete MDLC (Machine Learning Development Lifecycle) evolving, Data Engineers might want to test their data pipelines locally, on a small scale, to see that nothing breaks, that versions are completely aligned, and that those data pipelines can be pushed to production.
With the help of the open-source world and containerization, Data Engineers are able to test pipelines locally, on their own computers, using containers — and later integrate them with the work of other data engineers.
In this demo, I’d like to share with you how you can spin up such a stack, which includes:
- Red Hat Ceph Storage for S3 capabilities,
- NiFi for data flow & transformation, and
- Kafka as the event streaming engine,
in order to create a data pipeline that transforms a given CSV file to Avro format and sends it to Kafka.
A sample of the data set (30 rows total):
Let's get started!
- A computer running Podman/Docker
- In the case of using Podman, install the
Setting Up The Infrastructure
To run this demo, we’ll have to run three different containerized components: Red Hat Ceph Storage, Kafka, and NiFi.
Before diving into each one of the components, let’s create a network so that all containers will be able to share subnets, DNS records, etc.:
$ docker network create data-pipeline
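If you want to confirm the network exists (and, later on, see which containers have joined it), an optional sanity check is:
$ docker network inspect data-pipeline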
For the sake of this demo, we won’t run these infrastructure services in their full form; instead, we’ll use single-node containers, which are more lightweight and far less overkill to run locally.
Running A Single-Node Ceph Cluster For S3
In order to use S3 in our demo, we'll need to run a single-node Red Hat Ceph Storage implementation that will expose the Ceph RGW S3 interface to other containers in the shared network.
Let’s first create local directories that will be used by the Ceph container to preserve configurations:
$ mkdir -p /data/etc/ceph/
$ mkdir -p /data/var/lib/ceph/
Now let’s run the Ceph cluster itself. This command creates a Ceph cluster with hardcoded credentials (an access key and a secret key, both set to nifi):
$ docker run -d --privileged --name ceph --net data-pipeline -e NETWORK_AUTO_DETECT=4 -v /data/var/lib/ceph:/var/lib/ceph:rw -v /data/etc/ceph:/etc/ceph:rw -e CEPH_DEMO_UID=nifi -e CEPH_DEMO_ACCESS_KEY=nifi -e CEPH_DEMO_SECRET_KEY=nifi -p 8080:8080 registry.redhat.io/rhceph-alpha/rhceph-5-rhel8@sha256:9aaea414e2c263216f3cdcb7a096f57c3adf6125ec9f4b0f5f65fa8c43987155 demo
Note: Make sure you have valid credentials to pull the Red Hat Ceph Storage container image. If you don't have them, you can use upstream Ceph images coming from the Ceph community instead.
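Once the container is up, an optional way to check that the RGW S3 endpoint is answering is a plain curl against it; an anonymous request should come back with an XML ListAllMyBucketsResult response:
$ curl -s http://127.0.0.1:8080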
Now, let’s install the awscli tool to interact with our Ceph cluster's S3 interface:
$ pip3 install awscli
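If you want to make sure the CLI was installed correctly before going on, a simple version check is enough:
$ aws --version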
Now let’s configure our S3 client to use our created credentials:
$ aws configure
AWS Access Key ID [****************nifi]:
AWS Secret Access Key [****************nifi]:
Default region name [None]:
Default output format [None]:
Make sure you insert nifi for both the AWS Access Key ID and the AWS Secret Access Key, and press Enter through the remaining prompts to complete the configuration.
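If you prefer to script this step rather than answer the interactive prompts, the same credentials can be set non-interactively; this is just an equivalent alternative to aws configure above:
$ aws configure set aws_access_key_id nifi
$ aws configure set aws_secret_access_key nifi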
Let’s create the S3 bucket that will be used by NiFi:
$ aws s3 mb s3://nifi --endpoint-url http://127.0.0.1:8080
make_bucket: nifi
And now let’s upload the CSV file that is part of this git repo folder:
$ aws s3 cp paychecks.csv s3://nifi/ --endpoint-url http://127.0.0.1:8080
upload: ./paychecks.csv to s3://nifi/paychecks.csv
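To double-check that the object actually landed in the bucket, you can list its contents against the same endpoint:
$ aws s3 ls s3://nifi/ --endpoint-url http://127.0.0.1:8080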
Running A Single-Node Kafka Cluster
Now, we’ll run both Zookeeper and the Kafka Broker as single-node containers.
Let’s start with Zookeeper:
$ docker run -d --name zookeeper --net data-pipeline -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 22181:2181 confluentinc/cp-zookeeper:latest
Now that we have it running, let’s spin up our Kafka broker:
$ docker run -d --name kafka --net data-pipeline -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 29092:29092 confluentinc/cp-kafka:latest
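Optionally, before creating topics, you can verify the broker is reachable by querying its API versions from inside the container:
$ docker exec -it kafka kafka-broker-api-versions --bootstrap-server kafka:9092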
Great! We have our cluster up and running. Now let’s create the topic that NiFi will use to write the transformed Avro data:
$ docker exec -it kafka kafka-topics --create --topic data-pipeline --replication-factor 1 --partitions 1 --bootstrap-server kafka:9092
Created topic data-pipeline.
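You can also describe the topic to confirm it was created with a single partition and a replication factor of 1:
$ docker exec -it kafka kafka-topics --describe --topic data-pipeline --bootstrap-server kafka:9092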
Running A Single-Node NiFi
To run our NiFi instance, we'll use the following command:
$ docker run -d --net data-pipeline --name nifi -p 8443:8443 apache/nifi:latest
Make sure that you see NiFi's login screen once the container is up (it takes a few seconds).
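If you'd rather poll from the terminal than refresh the browser, the following should eventually return 200 once the UI is serving (NiFi uses HTTPS with a self-signed certificate, hence the -k flag):
$ curl -k -s -o /dev/null -w "%{http_code}\n" https://localhost:8443/nifi/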
In order to log in to NiFi, search for the following in the logs:
$ docker logs nifi | grep -i generated
Generated Username [xxxxxx]
Generated Password [xxxxxx]
Use those credentials to log in to NiFi, and you should see the following screen:
Configuring The Data Pipeline
In order to run our data pipeline, I’ve pre-created a NiFi template for you to use. You'll only need to make minimal changes, and then you'll be able to run your own pipeline locally.
Let’s start by uploading our template: right-click on the canvas, choose Upload template, then select the CSV_--__Avro_Transformer.xml template that is attached to the git repository and upload it to NiFi.
Now, let’s import our template and start dealing with the configuration. You’ll see the following screen when importing it:
Next step, we’ll have to configure the created controllers for our data flow. To do so, right-click the canvas and then hit Configure, which opens the controller services settings:
Once it's opened, make sure to enable all controllers for our flow (the lightning button on the right of each controller):
Once we have all controllers enabled, we’ll have to change our S3 Endpoint URL, Access Key and Secret Key, as NiFi doesn't save those as part of the template configuration.
To do so, double-click each of the ListS3 and FetchS3Object processors (one at a time) and navigate to the Properties tab.
Change both the access key and secret key to nifi, as created in the previous stages, and change the Endpoint Override URL to your Ceph container's IP address.
In order to find out what it is, you can use the following command:
$ docker inspect ceph | grep IPAddress
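Since the container sits on the user-defined data-pipeline network, you can also extract the address with a Go template instead of grepping; this is just an alternative to the command above:
$ docker inspect -f '{{(index .NetworkSettings.Networks "data-pipeline").IPAddress}}' ceph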
Once you have changed those configs for both the ListS3 and FetchS3Object processors, we can start our flow!
Running Our Data Flow
In order to run our flow, right-click the canvas and press the Start button:
The flow will start reading data from our Ceph cluster, transform it, and write it to our Kafka cluster. Verify that the flow has finished successfully:
We can see that there is a 424-byte file that has moved through all queues and reached our Kafka cluster.
Validating Our Transformation
Now let’s verify that the data is indeed in our Kafka cluster, and that it was transformed to Avro format. To do so, we'll use the kcat tool to read our Kafka cluster's content:
$ kcat -b localhost:29092 -t data-pipeline
And the result is:
Great! We have the transformed data in our Kafka cluster.
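If you'd like a rough count of how many records were published (assuming, as the offsets later suggest, one Kafka record per CSV row), kcat can print one offset per message from the beginning of the topic and exit at the end:
$ kcat -b localhost:29092 -t data-pipeline -C -e -o beginning -q -f '%o\n' | wc -l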
Testing Our Flow’s Consistency
Let’s upload our second CSV file, which is called paychecks2.csv, to our bucket.
As our flow runs every 60 seconds searching for new data, we should be able to see the new data in our Kafka topic within about a minute.
Let’s upload the second file to our bucket:
$ aws s3 cp paychecks2.csv s3://nifi/ --endpoint-url http://127.0.0.1:8080
upload: ./paychecks2.csv to s3://nifi/paychecks2.csv
And wait 60 seconds until we see the new data in our topic:
As you can see, the message we now get from kcat is Reached end of topic at offset 60, which means another flow run happened automatically.
As demonstrated, you can use containers locally on your computer to run automated data pipelines. This can make a data engineer's life much easier when it comes to functional testing.
Hope you’ve enjoyed this demo. See ya next time :)