Clustering in Kettle

This article introduces clustering concepts supported by Kettle a.k.a. PDI. If you need to replicate data to several physical databases, or would like to learn about scale-out options for record processing, this article may be for you. As usual, the downloads section has the demo transformations for this article.

Clustering storage

Suppose a data mart is populated by incremental loads. The data mart is supposed to be physically replicated in different data centers: Europe, North America and Asia. The applications accessing the data mart are configured to use any of the three database locations, preferring geographically closer locations for lower latency. Assuming that the database in question cannot provide this type of clustering transparently, the ETL process needs to consider all locations when loading the data mart.

Kettle’s support for clustering storage

If a DB system does not support clustering out of the box, the data mart loading process may implement clustering manually, loading records into three different databases at once.

It is of course possible to define three separate connections manually and use three separate steps for each DB write operation throughout the process, but there is a more convenient way of achieving this.

The clustering section of the DB connection dialog lets you define the databases Kettle should replicate to.

To make sure Kettle is replicating all statements to all databases of the cluster, the DB steps need to be partitioned with a partitioning schema that reflects the database partition ids, while using the “Mirror to all partitions” partitioner.

Please note that Spoon will also replicate any DDL (CREATE TABLE, CREATE INDEX etc.) generated via the SQL buttons to all databases in the cluster.

The demo transformation for this article has a clustered connection with three databases. The database definitions differ only in the database name, and they point to databases on a local MySQL instance. Some demo records are loaded into all three of them.
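If you’d like to recreate a similar setup locally, creating the three target databases is quick. The following is a minimal sketch; the database names and credentials are assumptions for illustration only, not the names used in the demo package.

# create three local MySQL databases to replicate into (names and credentials are assumptions)
mysql -u root -p -e "CREATE DATABASE datamart_eu; CREATE DATABASE datamart_na; CREATE DATABASE datamart_asia;"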

Clustering to scale out data processing

Kettle also supports clustering to scale out when processing large amounts of rows. Multiple Carte servers make up a processing cluster in this scenario. Transformations executed on a processing cluster designate (sequences of) steps to be executed in parallel on all cluster slaves.

Carte documentation

A Carte server is a lightweight HTTP server that is used to execute jobs and transformations remotely. Details are found on the Carte wiki page.

Kettle clustering terminology

I’d like to introduce a bit of terminology before moving on to explain the concepts related to processing clusters in Kettle. I will use the following terms with a clearly defined meaning. It’s easy to get lost in carte/server/master/slave lingo otherwise.

slave server: a running instance of Carte.
processing cluster: a collection of slave servers. Note: one and only one of the processing cluster’s slave servers needs to assume the role of “master”; the others assume the “slave” role.
cluster master: the slave server defined to act as “master” in a processing cluster.
cluster slave: a slave server defined to act as “slave” in a processing cluster.
cluster schema: the definition of a processing cluster in Spoon.

With the definitions out of the way, the following sections explain how to construct and execute a transformation on a processing cluster.

Setting up a clustering environment in Spoon

Setting up a processing cluster in Spoon is a two-step process. The slave servers need to be defined first. Once they are in place, a cluster schema is created to group them together, defining a proper processing cluster. Slave servers are often used on their own, without being part of any processing cluster. It’s perfectly possible (and common) to have a couple of them defined and to use them for remote execution individually. It is therefore necessary to select a few designated slave servers in what is called a cluster schema to form a processing cluster.

Before defining the slave servers in Spoon, I’d like to have them actually running, so I can verify the definitions. Starting up three slave servers is just a matter of changing into the Kettle directory and starting them using carte.sh (or Carte.bat on Windows).

./carte.sh localhost 8081

./carte.sh localhost 8082

./carte.sh localhost 8083

Each command will start a Carte server and bind it to the given network interface (localhost in this case) and the specified port.
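As a side note, Carte can also be started from a slave configuration file instead of host/port arguments, which comes in handy once the setup grows. The sketch below shows what such a startup might look like; the exact configuration format may differ between Kettle versions, so treat the element names as assumptions and consult the Carte wiki page for the authoritative reference.

# sketch: start the master from a configuration file instead of host/port arguments
# (element names are assumed; check the Carte wiki page for the exact format)
cat > carte-config-master.xml <<'EOF'
<slave_config>
  <slaveserver>
    <name>master</name>
    <hostname>localhost</hostname>
    <port>8081</port>
    <master>Y</master>
  </slaveserver>
</slave_config>
EOF
./carte.sh carte-config-master.xml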

It is easy to verify that the instances are running by pointing a browser at them. The following URLs can be used to connect to the Carte instances started above.

http://localhost:8081

http://localhost:8082

http://localhost:8083

The default username and password for Carte are cluster/cluster. Security configuration for Carte is documented on the Carte wiki page. After logging in, Carte displays a list of jobs and transformations registered with it. On a pristine Carte server, there will only be the “row generator test” transformation. It is safe to execute (or delete) the test transformation.
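If you prefer the command line over a browser, the status page can also be queried with curl. This is a small sketch assuming the default cluster/cluster credentials; adjust it if you changed the security configuration.

# sketch: query the Carte status page from the command line (default credentials assumed)
curl -u cluster:cluster http://localhost:8081/kettle/status/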

Assuming all slave servers are up and running, the next step is to define them in Spoon. For this tutorial, I’d like to set the slave server on 8081 to “master”. My slave servers will be called “master”, “slave a” and “slave b”.

I always check that the definitions are valid by selecting “Monitor” from the context menu of a slave server definition. Spoon will open a perspective that shows the jobs and transformations registered on the selected slave server together with their status. If errors pop up instead, the slave server is not running or the server definition is incorrect.

After all slaves are defined, they need to be grouped together in a cluster schema to form a processing cluster.

A processing cluster needs a range of TCP ports for communication between the slave servers. The port setting in the schema indicates the first port the processing cluster is going to use. Further ports with increasing numbers are used as required.

The sockets buffer size and flush interval can be used to tune the passing of rows across the network. The standard settings are reasonable defaults that should only be changed if you suspect network-related issues with the processing cluster. Check the “Sockets data compressed?” box if you’d like to compress/decompress rows that are passed between slave servers. Whether the gain in network throughput outweighs the cost of compressing and decompressing depends on a couple of factors, including CPU usage, network latency, and the average compression rate for your particular sets of data.

The dynamic cluster feature is useful if the number and location of available slave servers is variable, as is often the case in cloud deployments. Check Pentaho Kettle Solutions for a tutorial on how to set up clustering on the Amazon cloud.

Slave server and cluster schema definitions can be shared, just like connections, so it is not necessary to keep re-defining them for each transformation that uses clustering.

With a cluster schema in place, it is time to execute a transformation on the processing cluster.

The example problem

To illustrate the clustering features, I’d like to start from a simple transformation. The initial transformation processes a stream of clicks. The data source is a CSV file. Each record represents a link click and specifies the name of the page the link points to. The task of the transformation is to determine the click count for each page and output the click counts to a new file, sorted by page name. The following transformation is a solution to the task.
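If you’d like to follow along without the download package, a small input file is easy to fake. The file name, header and page names below are assumptions for illustration; the actual demo files are in the downloads section.

# sketch: generate a small demo click stream, one page name per line (file layout assumed)
echo "page" > input.txt
for i in $(seq 1 84); do echo "page_$((RANDOM % 10))" >> input.txt; done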

When running the transformation locally, or on a single slave server (execute remotely), it does the job and creates an output file as expected.

Preparing for running on a cluster

When running transformations on a cluster, Kettle implements a “master”/”slave” strategy. Each step of the transformation needs to be configured to run on the cluster master only (the default) or to be replicated to all cluster slaves and run in parallel. To mark a step for parallel execution on the cluster slaves, it’s necessary to apply the cluster schema to it. Select “Clusterings…” from a step’s context menu to apply the cluster schema to the step.

For the example, I chose to cluster the group by step only. Spoon indicates that the step is clustered (by displaying a C) and the number of nodes this step will be replicated to (x2).

After that it is as easy as selecting “Execute Clustered” from the launch dialog. The “Post”, “Prepare” and “Start” options should be turned on (they are on by default).

When looking at the resulting output file, however, it seems that something went wrong. There are multiple entries per page name in the output file. There should be only one, giving the click count for that page.

This anomaly is not unlike the one seen in the partitioning article. Each cluster slave executes its own independent copy of the “group by” step, and since records are distributed to the steps in a round-robin fashion by default, each copy may see clicks for any given page and sum them up. If a page is seen by both steps, two summary rows are generated for it.
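To make the symptom concrete, a broken output file might contain something like the following hypothetical excerpt (page names, counts and delimiter are made up for illustration). Note the two partial sums for “home” instead of a single total:

about;3
home;7
home;5
products;4
products;2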

To understand the problem entirely, it’s useful to learn about how exactly Kettle implements the parallel execution on a processing cluster.

The anatomy of a clustered transformation

Before looking at the details, a look into the log output of each slave server (master, slave a and slave b) may shed some light on what is going on. This is what it was for me:

Master log


INFO  11-10 18:46:51,793 - clustered_group_by (master) - Dispatching started for transformation [clustered_group_by (master)]
INFO  11-10 18:46:51,846 - Read Input - Opening file: /Users/slawo/Desktop/clustering_files/input.txt
INFO  11-10 18:46:51,854 - Read Input - Finished processing (I=85, O=0, R=0, W=84, U=1, E=0)
INFO  11-10 18:46:51,861 - Read Input - Server socket accepted for port [40001], reading from server slave b
INFO  11-10 18:46:51,869 - Read Input - Server socket accepted for port [40000], reading from server slave a
INFO  11-10 18:46:52,185 - Sort rows - Finished processing (I=24, O=0, R=0, W=24, U=0, E=0)
INFO  11-10 18:46:52,189 - Text file output - Finished processing (I=0, O=25, R=24, W=24, U=0, E=0)

Slave a log

INFO  11-10 18:46:51,828 - clustered_group_by (local processing cluster:slave a) - Dispatching started for transformation [clustered_group_by (local processing cluster:slave a)]
INFO  11-10 18:46:51,864 - Memory Group by - Server socket accepted for port [40002], reading from server master
INFO  11-10 18:46:52,029 - Memory Group by - Finished processing (I=42, O=0, R=1, W=12, U=0, E=0)

Slave b log

INFO  11-10 18:46:51,808 - clustered_group_by (local processing cluster:slave b) - Dispatching started for transformation [clustered_group_by (local processing cluster:slave b)]
INFO  11-10 18:46:51,853 - Memory Group by - Server socket accepted for port [40003], reading from server master
INFO  11-10 18:46:52,020 - Memory Group by - Finished processing (I=42, O=0, R=1, W=12, U=0, E=0)

Reading through the cluster master log, it seems as if the master started the transformation and the “Read Input” step established communication with slave a and slave b. Since the “Read Input” step is the non-clustered step preceding the clustered “group by” step, it needs to send its rows to the “group by” steps on the slaves over the network.

The log output of the cluster slaves indicates that the “group by” steps on each slave established communication back to the master. They need to send their rows to the “sort rows” step, which is executing on the master.

The big picture seems to be that the cluster master executes the main transformation, but each clustered step goes to all cluster slaves instead. Kettle establishes TCP connections between the individual steps on the master and the slaves that need to communicate, to ensure correct data flow. In fact, that’s exactly what happens. Kettle indeed generates intermediate transformations before running on a cluster: a master transformation that is executed on the master, and slave transformations that run on the cluster slaves. It is possible to view these transformations in Spoon by selecting “Show transformations” from the launch dialog. If the other three options (“Post”, “Prepare”, “Start”) are deselected, Kettle will just generate the intermediate transformations and open them in Spoon. For any non-trivial transformation it is a good idea to check the output and verify that it was indeed generated as expected.

The following transformations are generated for the example transformation:

master

The arrows on the steps represent network communications between the master and the slaves. The small number indicates how many steps rows are sent to or read from. In this case “Read Input” sends its output to two steps (one “group by” on each slave), and “Sort Rows” is receiving rows from two steps (again one “group by” on each slave).

slave a

slave b

The slave transformations are identical. They include only the “group by” step that communicates with the steps in the master transformation to receive and pass on rows.

Partitioning data to cluster slaves

The problem with this clustered setup is that each “group by” step works independently and may see any page name, which can result in two summary rows for a page. That’s exactly the same problem that occurs when launching multiple copies of the step. It is solved by configuring appropriate partitioning. To solve the issue at hand, I define a partitioning schema and apply it to the “group by” step. The partitioning schema will be dynamic in this case, which means it partitions into as many partitions as there are cluster slaves the step runs on.

With the “group by” step partitioned by the page field (division remainder), the results are what they should be. Each “group by” step receives a consistent set of page names now, thus eliminating the duplicate summary rows. Here’s the final transformation.

Bug Alert

Please note that PDI-6836 prevents the master transformation from receiving data from a step that has a partitioning rule applied to it. I therefore attached a dummy step and clustered it together with the “group by”. If that is not in place, a NullPointerException appears in the cluster master log and the transformation fails.

The final transformation is well behaved and scales to any number of cluster slaves. It illustrates the general principles behind clustering Kettle transformations, but it does not show the full palette of possibilities. The following sections cover additional information that helps you get the most out of a clustered environment.

Effective clustering

To make good use of a clustered environment, it’s good to know a few techniques that guide clustered transformation design. The following sections cover a few common practices. Samples are included in the downloads section.

Distributed sorting

When sorted output is required, it is often desirable to scale out the sorting operation to the cluster slaves. Each cluster slave sorts the rows it sees, but that leaves the problem of merging the sorted row streams from each cluster slave into a single sorted stream on the master. The sorted output from each slave can be merged very efficiently if the master collects the streams in a “Sorted Merge” step. The “Sorted Merge” step is designed to accept sorted row streams from any number of previous steps and to preserve order when merging them together.

Reading source data on cluster slaves

In the example transformations so far, the master was responsible for reading the data from the source and distributing the rows to the cluster slaves at some point. In many cases the source is either a file or a DB accessible by all slave servers. So if there is no compelling reason to have the master act as the single source reader, it is perfectly possible, and often also faster, to let the cluster slaves read the source data directly. Each cluster slave typically reads only a subset of the source data.

Reading source data on cluster slaves from text files

If the data source is a text file, each slave may read a subset of the data from the file. This is supported directly by the “CSV Input” and “Fixed file Input” steps. Just cluster the input step and check the “Running in parallel?” option. Each cluster slave will execute its own copy of the input step, but read only a portion of the data. The following transformation sorts the rows in the input file and writes them to an output file. Each cluster slave reads only a portion of the input file.

Reading source data on cluster slaves from a DB

If the data source is a database, the table input step should be clustered and select only a subset of the data. This is typically done by using Kettle variables to construct a suitable query. Each clustered step has a couple of Kettle variables it can use to determine the total number of cluster slaves involved in the run as well as its own number (starting with 0). The variables are as follows:

Internal.Cluster.Size: the total number of cluster slaves in the processing cluster
Internal.Slave.Transformation.Number: the number of the current cluster slave (starting with 0)

So for the example processing cluster (1 master, 2 cluster slaves) Internal.Cluster.Size will be 2, while Internal.Slave.Transformation.Number will be 0 on slave a and 1 on slave b.

Using the standard techniques to generate dynamic queries, it is now possible to let each slave select only a portion of the source data.

Consider the following SQL query:

SELECT sale_id, sale_date, sale_customer 
FROM sales 
WHERE sale_id % ${Internal.Cluster.Size} = ${Internal.Slave.Transformation.Number}

The field sale_id is used to do the partitioning here. The remainder of dividing sale_id by 2 will be either 0 (for even ids) or 1 (for odd ids). Consequently, the slave with id 0 will see only sales with even ids, while the one with id 1 will see only sales with odd ids. This simple mod partitioning scales to any number of cluster slaves. If the processing cluster had 20 cluster slaves, the same query would partition the rows among them depending on the remainder of the division by 20, so the data would be read by slaves 0-19 respectively. Conveniently, when running the transformation locally, Internal.Cluster.Size is 1 and Internal.Slave.Transformation.Number is 0, which allows the query above to succeed in this case as well.
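To sanity-check the partitioning logic outside of Kettle, the variables can be substituted with concrete values and the query run manually. In the sketch below, the table and columns come from the query above, while the database name and credentials are assumptions for illustration.

# sketch: preview what slave 0 of a 2-slave cluster would read (db name and credentials assumed)
mysql -u etl -p -D sales_db -e "SELECT sale_id, sale_date, sale_customer FROM sales WHERE sale_id % 2 = 0 LIMIT 10;"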

Create long “swim lanes” for your data

When designing steps in a clustered transformation, try to arrange them in such a way that you get long lines of clustered steps. This avoids passing rows between the master and the slaves. A long sequence of clustered steps is sometimes called a “swim lane”, as it allows processing to be parallelized completely. Each cluster slave works on its own “lane”. In fact, if the ETL task allows for it, consider implementing a totally parallel transformation that has all of its steps clustered. The sections above explain how to read source data in parallel. If there’s no need to combine the row stream on the master later on, don’t. The following sample transformation reads sales from a database, does some tax calculations on each record and updates the record in the DB. There is no need to pass rows to the master in this case.

Distribute or Copy?

There is a common pitfall when designing clustered transformations. A step on the master that needs to pass rows to steps on cluster slaves will distribute the rows among all cluster slaves by default. This is typically the desired mode of operation. The following transformation clusters a simple data stream, looking up a category field provided by the master in a data grid step.

While this looks innocent enough, it has a flaw. The data grid will have to pass its rows to two lookup steps. And since the default mode of operation is to distribute them, each lookup step will only see half of the data grid’s content.

When looking at the generated master transformation the problem becomes clear. Please note on this occasion that Kettle implements clustered info steps (steps that are actively read from by other steps) by explicitly putting writer and reader steps in place.

The data grid should be switched from distributing to copying its rows to fix the issue. This is set using the regular “Data movement” options accessible from the context menu of each step.

In situations like this, I often prefer to include the lookup data source on the cluster slaves too. This gets rid of additional row passing over the network, and simplifies the solution.

With all steps involved in the lookup replicated to all cluster slaves, each cluster slave is independent from the master when performing lookups, and the overhead of passing rows between master and slaves is reduced.

Bug Alert

If your Kettle version is affected by PDI-6837, clustering the data source step may be the only option anyway :)

Related Literature

“Pentaho Kettle Solutions” contains good information on clustering, partitioning and the use of Carte. It also has a nice tutorial on setting up a dynamic processing cluster in the cloud. If you are thinking about using Kettle for serious scale-out, be sure to have this book on your reading shelf.


Downloads

The download package contains several transformations illustrating the use cases discussed in this article. Enjoy :)

Comments and corrections are welcome!
Slawo

5 comments to Clustering in Kettle

  • sisibeibei

    Helpful!

  • Really interesting stuff. I was wondering what happens if you use clustering with a bulk loader. If the PDI job runs on one machine, is the FIFO pipe transmitted across the network to get to the non-local databases? Do you think there would be a performance advantage in this case in the use of the bulk loader or would the network traffic determine a bottleneck?

  • Slawomir Chodnicki

    Giovanni, the FIFO pipe usually exists between PDI input and a db-specific command line client tool. Each slave would have their own pipe and their own instance of the client tool running, and all of them would try to bulk-load into the same table. Assuming the db is not local to any of the slaves, each slave sends data over the network, which may be the first bottleneck.

    Another (and often more relevant) aspect is whether multiple instances of the db’s bulk load utility can write into the same table simultaneously. I would expect that most databases can not bulk-load concurrently into the same table. The point of bulk loading has usually been to get “one” big chunk of data into the db as quickly as possible.

    Best
    Slawo

  • Slawo,
    thanks for the reply. I don’t think I will need to run multiple instances of the bulk load utility, at least not on the same table. I was just wondering if it makes more sense to run the whole ETL job on separate machines or in a clustered way in this scenario. I guess I will have to test my connection.
    Thanks!

    Giovanni
