This article introduces the partitioning concepts supported by Kettle, a.k.a. PDI. If you need to partition records over several tables, or would like to increase the parallelism of your transformations, this article may be for you. The samples accompanying this article show how to use both table partitioning and row-stream partitioning.
What is Partitioning?
Partitioning at its core is a simple idea. If you have a set of items (think: records) and you distribute all the items from this set into distinct buckets (subsets), you’ve created a partition. More precisely: all the subsets you decided to create form a partition of the original set.
So partitioning just means putting items into distinct groups, making sure that each item is assigned to exactly one group and that no item is replicated across multiple groups.
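The definition above can be sketched in a few lines. This is only an illustration of the concept, not anything Kettle-specific; the bucket key (the month of a click date) is an assumption chosen for the example that follows.

```python
# Minimal sketch of partitioning: assign every record to exactly one bucket.
# The partitioning criterion here (month of the click date) is illustrative;
# any criterion works as long as it maps each item to exactly one group.
from collections import defaultdict
from datetime import date

records = [
    {"page": "/home", "clicked_at": date(2011, 1, 5)},
    {"page": "/news", "clicked_at": date(2011, 1, 17)},
    {"page": "/home", "clicked_at": date(2011, 2, 3)},
]

partitions = defaultdict(list)
for record in records:
    partitions[record["clicked_at"].month].append(record)

# Every record landed in exactly one bucket, and no record was duplicated.
assert sum(len(bucket) for bucket in partitions.values()) == len(records)
```

Together the buckets contain every record exactly once, which is precisely what makes them a partition of the original set.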
Partitioning is used most frequently in scale-out scenarios, for both storage and processing. The idea is to find some criterion by which to partition the data, so that the resulting groups are logically independent of each other. Here are some examples.
Suppose there is an awful lot of records coming in, like a click stream generated by a major website. The business requirement is to analyze the data on a monthly basis, keeping 12 months of historical data. The DBA may decide that she wants to keep the click streams in 12 distinct and relatively small tables for performance reasons. As long as the analysis is done on a monthly basis, considering only one month at a time, it may make sense to keep separate tables for the data. All queries then operate on one subset of the total data only: a single month, and thus on only one of the tables.
Some DB systems provide partitioning features out of the box. If they do, partitioning is usually implemented at the level of storage configuration, leaving the logical model of the database unaffected. In these DB systems a table may be configured for partitioned storage, saving different portions of the table on different disks for example. At the query level the table remains a single entity, and the DB system optimizes queries against this table to physically access only the storage fragments relevant to the query. As an example, “SELECT count(*) FROM table WHERE month=2” would be processed by the optimizer to physically access only one partition, and not even bother to look at any of the others.
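The pruning behavior described above can be sketched as a toy model. Nothing here corresponds to any real DB system’s API; it only shows the principle that the table looks like one entity while a query with a month predicate touches a single storage fragment.

```python
# Toy sketch of partition pruning: logically one table, physically one
# fragment per month. A query filtering on month reads only one fragment.
# All names here are illustrative, not any real database's API.
storage = {month: [] for month in range(1, 13)}  # one fragment per month

def insert(row):
    # rows are routed to the fragment for their month at write time
    storage[row["month"]].append(row)

def count_where_month(month):
    # the "optimizer" scans only the one relevant fragment
    return len(storage[month])

insert({"month": 2, "page": "/home"})
insert({"month": 2, "page": "/news"})
insert({"month": 3, "page": "/home"})

print(count_where_month(2))  # scans only fragment 2; prints 2
```

The caller never names a fragment; routing at write time and pruning at read time are both derived from the same partitioning criterion.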
Kettle support for manually partitioning to different tables
If a DB system does not support partitioning out of the box, the table loading process may implement partitioning manually, loading records for different months into different tables for example. Kettle’s table output step supports partitioning rows to different tables. Just configure it to accept the table name from a field, and it will output the rows to the appropriate table. Alternatively you may want to use the “partition by day” feature (“partition by month” is available since 4.2 too), in which case you need to supply a date field. Kettle will try to insert the record into the table “prefix_yyyyMMdd”, where “prefix” is the table name given in the configuration dialog.
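The naming rule described above is easy to sketch. The function name and prefix below are hypothetical; only the “prefix_yyyyMMdd” pattern comes from the article.

```python
# Sketch of the "partition by day" naming rule: the target table name is
# the configured prefix plus the record's date formatted as yyyyMMdd.
# target_table and the "clicks" prefix are illustrative names.
from datetime import date

def target_table(prefix, record_date):
    return "%s_%s" % (prefix, record_date.strftime("%Y%m%d"))

print(target_table("clicks", date(2011, 2, 14)))  # clicks_20110214
```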
Please note that Kettle never automatically creates any of the tables it writes to. As a general rule, Kettle does not execute any DDL without user intervention and confirmation. If you want to make sure that all necessary tables exist, just create them in a separate transformation. The example solutions in this article contain a demo of how to create the tables on the fly using a separate transformation.
Partitioning during data processing
Partitioning is useful during data processing too, especially in scenarios that try to increase parallelism. Suppose the machine running the ETL has 64 CPU cores, and it is running a transformation that processes a click stream, generating a click count per page name. A first solution to the task may be this transformation:
The records are retrieved and grouped, calculating the record count for each page name. This setup certainly does the job, but on a 64 core machine it can be improved to take advantage of the additional processing power. As it stands, the transformation starts only two threads, one for each step (let’s ignore the dummy helper). So two CPU cores are under heavy load, and the rest are idle. To leverage the idle cores, multiple copies of the “group by” step could be started. However, when run with multiple copies of the “group by” step, the transformation produces incorrect results. With three copies, for example, it suddenly generates multiple rows per page name.
The problem is caused by each copy of the “group by” step operating independently. Each copy calculates the click count for each page, not knowing whether the other copies are seeing clicks for the same page.
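The effect is easy to reproduce in a few lines. The sketch below deals rows to three independent counters round-robin, ignoring the page name, which is an assumption standing in for how rows would be distributed without partitioning.

```python
# Sketch of why independent "group by" copies emit duplicate result rows:
# rows are dealt round-robin to 3 copies, so each copy counts only its own
# share and may emit its own row for the same page name.
from collections import Counter

clicks = ["/home", "/news", "/home", "/home", "/news", "/about"]

copies = [Counter(), Counter(), Counter()]
for i, page in enumerate(clicks):
    copies[i % 3][page] += 1  # round-robin distribution ignores the page name

results = [row for counter in copies for row in counter.items()]
pages = [page for page, _ in results]
print(pages.count("/home"))  # "/home" appears in more than one copy's output
```

With the round-robin split, “/home” lands in two different copies, so the combined output contains two partial counts for it instead of one total.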
This is where partitioning becomes a useful concept. If the rows were partitioned in such a way that any given page name was consistently sent to the same copy of the “group by” step, the results would be correct.
Kettle’s support for partitioning row streams
Kettle’s partitioning feature can do just that. It is a two-step configuration process. First it’s necessary to define a partition schema. I’ll use three partitions as an example. The names of the partitions can be anything you like. A partition schema essentially captures how many subsets are created to partition the data. In other words, it defines how many ways the row stream is split when using this schema.
Next, the partitioning schema is applied to the “group by” step. By applying a partitioning schema to a step (right click the step, select “Partitioning”), a matching set of step copies is started automatically. So when applying a partitioning schema with three partitions, three copies of the step are launched. Kettle will ask for a partitioning method. The partitioning method defines the rule by which the rows are distributed across the copies. The most common method is “remainder of division”, using a sensible field to partition on. The partition schema contains three partitions, so to determine which step copy receives a record, the field’s value is divided by three. If the field is not an integer type, its hash code is divided by three instead. Depending on the remainder of the division (0, 1, or 2), the record is sent to the corresponding step copy.
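The “remainder of division” rule described above can be sketched as follows. Note that Kettle’s actual hash function for non-integer fields may differ; the fallback to Python’s `hash` is only an illustration of the principle.

```python
# Sketch of the "remainder of division" partitioning method: integer field
# values are divided directly, other types fall back to a hash code.
# (Kettle's real hash function may differ; this is only illustrative.)
NUM_PARTITIONS = 3  # matches the three-partition schema in the example

def partition_for(value):
    if isinstance(value, int):
        return value % NUM_PARTITIONS
    return hash(value) % NUM_PARTITIONS

# The same key always maps to the same step copy:
assert partition_for("/home") == partition_for("/home")
print(partition_for(7))  # 7 mod 3 -> step copy 1
```

The only property that matters for correct grouping is determinism: equal field values always yield the same remainder, so all rows for a given page name reach the same step copy.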
The revised transformation has three “group by” steps operating concurrently. The partitioning rule sends records for each page name to a consistent step copy, so it is safe to group by page name and calculate the stats. Please note that there are no guarantees as to which page name goes to which step copy. What is ensured however, is that any page name encountered is consistently forwarded to the same step copy.
Partitioning the row stream is a particularly useful concept when used in conjunction with clustering. If you run a transformation on a cluster, you may also be interested in ensuring consistent partitioning of records across the cluster nodes. Stay tuned for another article on clustering.
The sample files for this article contain samples for partitioning records to tables as well as partitioning the row stream.