Running Data Pipelines Locally Using Containerized Ceph S3, Kafka & NiFI

As the data world becomes more and more similar to the development with data sets being used as git repositories, the evolvement of treating data as it was a product and having a complete MDLC (Machine Learning Development Lifecycle), Data Engineers might want to test their data pipelines locally, on a small scale to see the nothing breaks, versions are completely aligned and that those data pipelines can be pushed to production.

With the help of the open-source world and containerization, Data Engineers are able to test pipelines locally, on their own computers, using containers — and later integrate them with the work of other data engineers.

In this demo, I’d like to share with you how you can spin up such a stack, that includes Red Hat Ceph Storage for S3 capabilities, NiFI for data flow & transformation, and Kafka as the event streaming engine, In order to create a data pipeline that transforms a given CSV file to Avro format, and sends it to Kafka.

A sample of the data set (30 rows total):

YearsExperience,Salary
1.1,39343.00
1.3,46205.00
1.5,37731.00
.
.
.

Let's get started!

Prerequisites

  • A computer running Podman/Docker
  • In the case of using Podman, Install the podman-docker package

Setting Up The Infrastructure

To run this demo, We’ll have to run three different containerized components, which are Red Hat Ceph Storage, Kafka and NiFi.

Before diving into each on of the components, let’s create a network so that all containers will be able to share subnets, DNS records, etc:

$ docker network create data-pipeline

For the sake of this demo, we won’t run those infrastructure services in their full form but will use single-node containers, as it is more lightweight and less overkill to use and implement locally.

Running A Single-Node Ceph Cluster For S3

In order to use S3 in our demo, we'll need to run a single-node Red Hat Ceph Storage implementation, that will expose the Ceph RGW S3 interface to other containers in the shared network.

Let’s first create local directories that will be used for the Ceph container to preserve configurations:

$ mkdir -p /data/etc/ceph/ 
$ mkdir -p /data/var/lib/ceph/

Now let’s run the Ceph cluster itself. This command creates a Ceph cluster, with hardened credentials (access-key and secret-key, both are nifi).

$ docker run -d --privileged --name ceph --net data-pipeline -e NETWORK_AUTO_DETECT=4 -v /data/var/lib/ceph:/var/lib/ceph:rw -v /data/etc/ceph:/etc/ceph:rw -e CEPH_DEMO_UID=nifi -e CEPH_DEMO_ACCESS_KEY=nifi -e CEPH_DEMO_SECRET_KEY=nifi -p 8080:8080 registry.redhat.io/rhceph-alpha/rhceph-5-rhel8@sha256:9aaea414e2c263216f3cdcb7a096f57c3adf6125ec9f4b0f5f65fa8c43987155 demo

Note: Make sure you have valid credentials to pull the Red Hat Ceph Storage container image. If you don't have it, you can use upstream Ceph images coming from the Ceph/Daemon repository.

Now, let’s install the awscli tool to interact with our S3 service:

$ pip3 install awscli

Now let’s configure our S3 client to use our created credentials:

aws configure
AWS Access Key ID [****************nifi]:
AWS Secret Access Key [****************nifi]:
Default region name [None]:
Default output format [None]:

Make sure you insert nifi to both AWS Access Key ID and AWS Secret Access Key. Press Enter continuously for completion.

Let’s create our S3 bucket that will be used by NiFi:

$ aws s3 mb s3://nifi --endpoint-url http://127.0.0.1:8080make_bucket: nifi

And now let’s upload the CSV file that is part of this git repo folder:

$ aws s3 cp paychecks.csv s3://nifi/ --endpoint-url http://127.0.0.1:8080upload: ./paychecks.csv to s3://nifi/paychecks.csv

Running A Single-Node Kafka Cluster

Now, we’ll run both Zookeeper and Broker as single-node containers.

Let’s start with Zookeeper:

$ docker run -d --name zookeeper --net data-pipeline -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 22181:2181 confluentinc/cp-zookeeper:latest

Now that we have it running, let’s spin up our Kafka broker:

$ docker run -d --name kafka --net data-pipeline -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 29092:29092 confluentinc/cp-kafka:latest

Great! We have our cluster up and running, now let’s create a topic that NiFi will use to write the transformed Avro file:

$ docker exec -it kafka kafka-topics --create --topic data-pipeline --replication-factor 1 --partitions 1 -bootstrap-server kafka:9092Created topic data-pipeline.

Running A Single-Node NiFi

To run our NiFi instance, we'll use the following command:

docker run -d --net data-pipeline --name nifi -d -p 8443:8443 apache/nifi:latest

Make sure that you see NiFi's login screen once the container is up (it takes a few seconds).

In order to log in to NiFi, search for the following in the logs:

docker logs nifi | grep -i generatedGenerated Username [xxxxxx]
Generated Password [xxxxxx]

Login to NiFi and see the following screen:

Configuring The Data Pipeline

In order to run our data pipeline, I’ve created a NiFi Template that was pre-created for you to use. You'll have minimal changes and then you'll be able to run your own pipeline locally.

Let’s start by uploading our template, press right-click then Upload Template:

Choose the CSV_--__Avro_Transformer.xml template that is being attached to the git repository and upload it to NiFi.

Now, let’s import our template and start dealing with the configuration:

you’ll see the following screen when importing it:

Next step, we’ll have to configure the created controllers for our data flow. To do so, make sure to right-click and then hit Configure:

Once it was opened, make sure to enable all controllers for our flow (the lightning button on the right of each controller):

Once we have all controllers enabled, we’ll have to change our S3 Endpoint URL, Access Key and Secret Key as NiFi doesn't save those as part of the template configuration.

To do so, press twice on both (one each time) ListS3 component, so as the FetchS3Object and navigate to the Properties tab:

Change both the access key and secret key to nifias was created in previous stages, and change the Endpoint Override URL to your Ceph's container IP address.

In order to find out what it is, you can use the following command:

$ docker inspect ceph | grep IPAddress
"IPAddress": "",
"IPAddress": "10.89.3.5",

Once you have changed those configs for both ListS3 and FetchS3Object processors, we can start our flow!

Running Our Data Flow

In order to run our Flow, make sure to hit right-click and press the Start button:

The flow will start reading data from our Ceph cluster, transform it and write it to our Kafka cluster. Verify that the flow has finished successfully:

We can see that there is a 424 Bytes file that has moved through all queues and reached our Kafka cluster.

Validating Our Transformation

Now let’s verify that data is indeed in our Kafka cluster, and that it was transformed to Avro format. To do so, we'll use kcat tool to read our Kafka cluster's content:

kcat -b localhost:29092 -t data-pipeline

And the result is:

Great! we have the transformed data in our Kafka topic.

Testing Our Flow’s Consistency

Let’s upload our second CSV file, which is called paychecks2.csv to our bucket.

As our flow should run every 60 seconds searching for new data, we should be able to see it after 60 seconds, in our Kafka topic.

Let’s upload the second file to our S3 bucket:

$ aws s3 cp paychecks2.csv s3://nifi/ --endpoint-url http://127.0.0.1:8080upload: ./paychecks2.csv to s3://nifi/paychecks2.csv

And wait 60 seconds until we see data in our Kafka topic:

As you can see, now the message we get from kcat is Reached end of topic at offset 60, which means another flow was run automatically.

Conclusion

As you can see, you can use containers locally on your computer to run automated data pipelines. This can make the life of a data engineer very easy for functional testing.

Hope you’ve enjoyed this demo, See ya next time :)

--

--

--

Sr. Solution Architect, Red Hat

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Why you should not use Lombok

OpenBOM: Parts and BOMs — what’s new?

Generating Google API key and Billing

OpenEBS Local PV Hostpath on Kubernetes

Intermediate: How to fetch Remote Configuration from Huawei AGC in Unity

Reduce Cost and Increase Productivity with Value Added IT Services from buzinessware — {link} -

Simple Spring Boot Service to Kubernetes Application: Step 17

Roman Numeral to Integer: How to solve using a HashMap

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Shon Paz

Shon Paz

Sr. Solution Architect, Red Hat

More from Medium

Deploying Apache Airflow on Kubernetes for local development

Real-time CDC streaming using Debezium, MySQL, and Redpanda

The redpanda and debezium logos

Program & Containerize Your Kafka Producer/Consumer In Minutes

Benchmark Our Redis Cluster