The User Defined Java Class Step

Today I would like to talk about the “User Defined Java Class” step, a.k.a. the UDJC step, introduced in Kettle 4.0. This step is incredibly versatile: it lets you put arbitrary processing code into the ETL without paying a performance penalty. This article shows how to use the step in different scenarios, explaining each of its features with a short example. The sample transformations for this article are available in the downloads section.

How does the UDJC step work?

Technically, the user defined java class step works by extending the class org.pentaho.di.trans.steps.userdefinedjavaclass.TransformClassBase, which you should check out to get an idea of what’s at your fingertips here. To have a look at the class you need to get the Kettle sources. Get a source package from SourceForge, or check out the source from SVN. Information about obtaining Kettle sources from SVN is available here.
The Java code you use in the UDJC step is compiled at transformation runtime in the context of a class derived from TransformClassBase. The TransformClassBase class is a generic step plugin class with nice convenience methods added on top. In the custom code you are free to use and override inherited fields and methods as you see fit. You can declare additional fields as you like, and even use import statements at the beginning of your code. The following imports are done automatically for you:

import org.pentaho.di.trans.steps.userdefinedjavaclass.*;
import org.pentaho.di.trans.step.*;
import org.pentaho.di.core.row.*;
import org.pentaho.di.core.*;
import org.pentaho.di.core.exception.*;

Assuming you’re already somewhat familiar with Kettle internals and you’re wondering how to conveniently access one thing or another from your code, check out the code snippets section on the left side of the step dialog. The samples are going to make your day :)

The following sections show how the UDJC step can be used in different scenarios.

A Simple Field Transformation

The first example is doing a trivial operation: just uppercasing a string field. Its purpose is to show how to set up the step for processing rows, and how to access input and output fields. If you’ve already been developing plugins for Kettle, this will look very familiar to you. Suppose the row stream contains a field named “testfield”, and the step defines a String output field named “uppercase”. The following code would uppercase the test field and write the result to the output field.

(Step code: see the first code listing following the sign-off at the end of the article.)

The processRow() function is called by Kettle to tell the step to try and process another input row. It is supposed to return true if the step is ready to process another row, and false if there is nothing more to do.

The getRow() function fetches the next row from any input steps. It is a blocking call. It waits for the previous step to provide a row, if necessary. It eventually returns an Object array representing the incoming row or null to indicate that there are no more rows to process.

What follows is a short (and useless looking) snippet involving the boolean field called “first”. It is a convenience flag provided by the base class to enable special processing when the first row is coming in, which may be useful if there’s some preparation you’d like to execute only once. Feel free to omit setting this to false if you’re not using it.

The call to createOutputRow() ensures that the row array is big enough to hold all output fields added by the step.

The get() method is a helper allowing name-based access to the input and output fields of the step. You need to specify the field type (Fields.In, Fields.Out, or Fields.Info) and the name of the field to get an instance of org.pentaho.di.trans.steps.userdefinedjavaclass.FieldHelper, which allows convenient access to the field’s data.

After the output field is set on the row, a call to putRow() passes the row to possible next steps.
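
To make the calling contract a bit more tangible, here is a small Kettle-free model of it. Everything in it (the class MiniUppercaseStep, its queue and its output list) is made up for illustration and is not part of the Kettle API. It only mimics the roles of getRow(), setOutputDone(), createOutputRow() and putRow() described above, assuming a single String field at index 0.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a step; none of these names are Kettle API.
class MiniUppercaseStep {
    private final Deque<Object[]> input = new ArrayDeque<>(); // plays the incoming row set
    final List<Object[]> output = new ArrayList<>();          // plays the next step
    boolean outputDone = false;

    MiniUppercaseStep(List<Object[]> rows) {
        input.addAll(rows);
    }

    // Mirrors the processRow() contract: true = call me again, false = finished.
    boolean processRow() {
        Object[] r = input.poll();           // like getRow(): null means no more input
        if (r == null) {
            outputDone = true;               // like setOutputDone()
            return false;
        }
        r = Arrays.copyOf(r, r.length + 1);  // like createOutputRow(): room for one more field
        r[r.length - 1] = ((String) r[0]).toUpperCase();
        output.add(r);                       // like putRow()
        return true;
    }

    // The engine's side of the contract: keep calling until the step reports it is done.
    static List<Object[]> run(List<Object[]> rows) {
        MiniUppercaseStep step = new MiniUppercaseStep(rows);
        while (step.processRow()) {
            // each call handles exactly one row
        }
        return step.output;
    }
}
```

The point of the model is that one call to processRow() handles one row, and the loop around it lives outside your code.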

This short sample shows all you need to know to do fast custom computation on incoming fields. The sample transformation for this code is uppercase.ktr.

Using Step Parameters

Suppose you have a nice piece of code, and you’d like it to become more generic. Step parameters may be a useful tool in this context. As an example I’d like to provide a regular expression and a field name as parameters. The step should check whether the specified field matches the regex and output a 1 or 0 to a result field.

(Step code: see the second code listing following the sign-off at the end of the article.)

The getParameter() method provides access to the parameters defined in the UI. Please note that the step parameter values may contain Kettle variables. Putting variables into parameters is a great way of making variable usage explicit. It certainly beats manually searching the code to find out which variables are used by the step.
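
If you are wondering what "a parameter value containing a variable" amounts to, the following plain Java sketch shows the general idea of ${NAME} substitution. It is not Kettle's implementation, and the names VariableSketch and substitute() are invented for this example. I am also assuming here that unknown variables are simply left untouched.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: a hand-rolled ${NAME} substitution, not Kettle's own code.
class VariableSketch {
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    static String substitute(String text, Map<String, String> variables) {
        Matcher m = VAR.matcher(text);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String value = variables.get(m.group(1));
            // assumption of this sketch: unknown variables stay as they are
            String replacement = (value != null) ? value : m.group(0);
            // quoteReplacement keeps '$' and '\' in the value from being interpreted
            m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(sb);
        return sb.toString();
    }
}
```

With a variable PREFIX set to ORD, a parameter value of ^${PREFIX}-\d+$ would become ^ORD-\d+$ before it is compiled into a Pattern.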

The sample transformation for this is parameters.ktr.

Working with Info Steps

Sometimes it’s necessary to combine the input of multiple steps, possibly assigning different roles to them. A stream lookup step is a classic example. This is where info steps come into play: they are input steps that are explicitly read from, and their rows are not returned by calls to getRow(). It’s easy to utilize info steps in a user defined java class step. Just attach them to the step and define them as info steps in the UDJC step UI. Reading rows from an info step is as easy as calling getRowFrom().

The sample transformation uses an info step to receive a list of regular expressions. It tests a field from the main stream for a match. If any of the regular expressions matches, the result field gets a 1. If none match it’s a 0. An additional output field captures which regular expression matched.

(Step code: see the third code listing following the sign-off at the end of the article.)

The call to findInfoRowSet() finds the row set to read from, based on the info step name defined in the UDJC step UI. Reading from info row sets is no different from reading from the main input row set. You just need to specify the row set explicitly and call getRowFrom().
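
The matching logic itself can be tried out without Kettle. Below is a minimal sketch with invented names (FirstMatchSketch, compileAll(), firstMatch()). It uses generics for readability, whereas the step code in this article sticks to raw collections with casts; I am assuming the compiler embedded in the step may not accept generics, so treat this as standalone Java rather than something to paste into the step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Illustration only: first-match-wins lookup over a list of regular expressions.
class FirstMatchSketch {
    // Compile once up front, as the step does when the first row arrives.
    static List<Pattern> compileAll(List<String> expressions) {
        List<Pattern> patterns = new ArrayList<>(expressions.size());
        for (String expression : expressions) {
            patterns.add(Pattern.compile(expression));
        }
        return patterns;
    }

    // Returns the source of the first pattern matching the WHOLE value, or null if none does.
    static String firstMatch(List<Pattern> patterns, String value) {
        for (Pattern p : patterns) {
            if (p.matcher(value).matches()) {
                return p.pattern();
            }
        }
        return null;
    }
}
```

Note that matches() only succeeds if the entire value matches the expression, so the order of the expressions coming from the info step decides which one is reported.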

The example transformation for using info steps is info_steps.ktr

Working with Target Steps

It is possible to direct rows to different target steps using the user defined java class step. In a regular case a call to putRow() takes care of passing on a row to the next step(s). Kettle takes care of the rest. Now if you’d like to direct rows to specific steps, you’d define all possible target steps and call putRowTo(), specifying the output row set explicitly instead. The following sample distributes rows randomly to two different target steps.

IMPORTANT NOTE: due to bug PDI-4712 (affects 4.0.0 and 4.0.1) you need to have at least as many info steps defined as you have target steps. Just define dummies to be info steps. They serve no purpose except ensuring that the step dialog does not blow up.

(Step code: see the fourth code listing following the sign-off at the end of the article.)

The method findTargetRowSet() finds the correct target row set by the name specified in the UDJC step UI. The returned row set can be written to by calling putRowTo().
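
The routing decision boils down to comparing a random number against a threshold. Here is a Kettle-free sketch of just that decision, using made-up names (RoutingSketch, route(), distribute()) and assuming a 35% share for the "low_probability" target. Taking the random number as an argument makes the rule easy to test.

```java
import java.util.Random;

// Illustration only: the routing rule, separated from any Kettle row handling.
class RoutingSketch {
    static final double LOW_SHARE = 0.35; // assumed share of rows for the first target

    // u is expected to be a random number in [0, 1), e.g. from Math.random()
    static String route(double u) {
        return (u < LOW_SHARE) ? "low_probability" : "high_probability";
    }

    // Counts how many of the given number of rows end up at each target: {low, high}
    static int[] distribute(int rows, Random rnd) {
        int[] counts = new int[2];
        for (int i = 0; i < rows; i++) {
            if ("low_probability".equals(route(rnd.nextDouble()))) {
                counts[0]++;
            } else {
                counts[1]++;
            }
        }
        return counts;
    }
}
```

Every row goes to exactly one target, so the two counts always add up to the number of input rows.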

The example transformation for this is target_steps.ktr

Error Handling

The UDJC step supports Kettle's error handling feature. To enable it, drag an outbound hop to the step that receives the error rows, then right click the UDJC step and select "Define Error Handling". Now you can configure the error step that receives the bad rows, and enter a few options and the names of the fields that hold extended error information. Diverting error rows from within the UDJC step is done by calling putError(), supplying additional information about the error(s) encountered. To demonstrate, the example transformation does a simple division. If the denominator is 0, the row is put to the error stream.
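
The arithmetic part of the example is easy to check in isolation. The sketch below uses an invented helper (DivisionSketch.divide()) and signals "this row belongs on the error stream" by returning null, which is just a convention of this sketch and stands in for the putError() call.

```java
// Illustration only: the division rule of the example, without any Kettle row handling.
class DivisionSketch {
    // Returns {quotient, remainder}, or null when the row should be diverted as an error.
    static long[] divide(long numerator, long denominator) {
        if (denominator == 0) {
            return null; // stands in for putError(...) in the step
        }
        return new long[] { numerator / denominator, numerator % denominator };
    }
}
```

Keep in mind that Java integer division truncates toward zero, so dividing -7 by 2 gives -3 with a remainder of -1.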

(Step code: see the fifth code listing following the sign-off at the end of the article.)

The demo transformation for error handling is error_handling.ktr

Accessing Database Connections

If the java step is supposed to do something with a database, you should probably use Kettle's facilities for obtaining a database connection. The following example uses the Kettle database connection named "TestDB". The incoming rows have a "table_name" field. The step checks whether the table exists and writes the result to an output field.

If you're planning to do non-trivial work with your databases in a user defined java class you should probably become familiar with the java package org.pentaho.di.core.database. Check out the source of existing DB related steps for examples on how to use classes from the database package.

(Step code: see the sixth code listing following the sign-off at the end of the article.)

In this sample the init() and dispose() methods of the step are overridden to create the database connection and to disconnect upon completion. The call to init() happens during transformation initialization time, before the first call to processRow(). The dispose() method is called once the transformation is finished. If there's any overarching initialization and clean up code in your steps, you may consider putting it into init() and dispose() respectively.
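
The order of these calls can be pictured with a tiny Kettle-free model. The class LifecycleSketch and its trace list are invented for this illustration, and the driver loop is my simplified assumption of what the engine does, not a copy of Kettle's code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: records the order in which the lifecycle methods are called.
class LifecycleSketch {
    final List<String> trace = new ArrayList<>();
    private int remaining;

    LifecycleSketch(int rows) {
        this.remaining = rows;
    }

    boolean init() {
        trace.add("init");        // acquire resources here, e.g. a database connection
        return true;
    }

    boolean processRow() {
        trace.add("processRow");
        if (remaining == 0) {
            return false;         // no more rows: the step is done
        }
        remaining--;
        return true;
    }

    void dispose() {
        trace.add("dispose");     // release whatever init() acquired
    }

    // Simplified driver (an assumption of this sketch): init once, loop, always dispose.
    static List<String> run(int rows) {
        LifecycleSketch step = new LifecycleSketch(rows);
        if (step.init()) {
            try {
                while (step.processRow()) {
                    // one row per call
                }
            } finally {
                step.dispose();
            }
        }
        return step.trace;
    }
}
```

For two input rows the trace reads init, processRow, processRow, processRow, dispose: the third processRow() call is the one that finds no more input and returns false.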

The example transformation for this is db_access.ktr


Implementing Input Steps

It is also possible to create a user defined java class that serves as an input step. In this case it is generating rows of its own instead of processing rows coming in from other steps. As an example I'd like to create a step that generates a row for each java system property.

(Step code: see the seventh code listing following the sign-off at the end of the article.)

In this code the step is not calling getRow() to get incoming rows, but initializes a list of properties on the first call to processRow(). The properties are written to the output stream one by one. Since there is no incoming row, the step creates one by calling RowDataUtil.allocateRowData() first. It then sets the field values and passes the row on to the next step.
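
The "one row per call" pattern of an input step can be sketched in plain Java as a small generator. The class PropertyRowGenerator and its nextRow() method are invented for this illustration. The generator takes a snapshot of the property names up front and hands out one {key, value} row per call, returning null when it has run out, much like the step returns false from processRow().

```java
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Illustration only: emits one {key, value} row per call, like an input step would.
class PropertyRowGenerator {
    private final Properties props;
    private final List<?> keys; // snapshot of the property names, taken once
    private int idx = 0;

    PropertyRowGenerator(Properties props) {
        this.props = props;
        this.keys = Collections.list(props.propertyNames());
    }

    // Returns the next row, or null once every key has been emitted.
    Object[] nextRow() {
        if (idx >= keys.size()) {
            return null;
        }
        Object key = keys.get(idx++);
        return new Object[] { key, props.get(key) };
    }
}
```

Passing System.getProperties() to the constructor gives the same rows the example step produces.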

The sample transformation for this is input_step.ktr

Downloads

Download the example transformations to follow along with the samples: udjc_samples.zip
All transformations were created using Kettle 4.0. Enjoy :)

Conclusion

This article explains how the user defined java class step can be used in different roles and scenarios. If you find that you need custom processing but the JavaScript step is not giving you the necessary performance or flexibility, you may consider using the user defined java class step instead. Be sure to also check out the samples folder that comes with Kettle. There are a few nice samples for the user defined java class step.

Happy Coding :)

Slawo

Step code for “A Simple Field Transformation” (uppercase.ktr):

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();

    if (r == null) {
        setOutputDone();
        return false;
    }

    if (first){
        first = false;
    }

    r = createOutputRow(r, data.outputRowMeta.size());
    
    // Get the value from an input field
    String test_value = get(Fields.In, "testfield").getString(r);
    
    // play around with it
    String uppercase_value = test_value.toUpperCase();
    
    // Set a value in a new output field
    get(Fields.Out, "uppercase").setValue(r, uppercase_value);

    // Send the row on to the next step.
    putRow(data.outputRowMeta, r);

    return true;
}
Step code for “Using Step Parameters” (parameters.ktr):

import java.util.regex.Pattern;

private Pattern p = null;
private FieldHelper fieldToTest = null;
private FieldHelper outputField = null;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();

    if (r == null) {
        setOutputDone();
        return false;
    }
    
    // prepare regex and field helpers
    if (first){
        first = false;

        String regexString = getParameter("regex");
        p = Pattern.compile(regexString);
        
        fieldToTest = get(Fields.In, getParameter("test_field"));
        outputField = get(Fields.Out, "result");
    }

    r = createOutputRow(r, data.outputRowMeta.size());
    
    // Get the value from an input field
    String test_value = fieldToTest.getString(r);

    // test for match and write result
    if (p.matcher(test_value).matches()){
        outputField.setValue(r, Long.valueOf(1));
    }
    else{
        outputField.setValue(r, Long.valueOf(0));
    }

    // Send the row on to the next step.
    putRow(data.outputRowMeta, r);

    return true;
}
Step code for “Working with Info Steps” (info_steps.ktr):

import java.util.regex.Pattern;
import java.util.*;

private FieldHelper resultField = null;
private FieldHelper matchField = null;
private FieldHelper inputField = null;
private ArrayList patterns = new ArrayList(20);
private ArrayList expressions = new ArrayList(20);

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();

    if (r == null) {
        setOutputDone();
        return false;
    }
    
    // prepare regex and field helpers
    if (first){
        first = false;
        // get the input and output fields
        resultField = get(Fields.Out, "result");
        matchField = get(Fields.Out, "matched_by");
        inputField = get(Fields.In, "value");
        
        // get all rows from the info stream and compile the regex field to patterns
        FieldHelper regexField = get(Fields.Info, "regex");
        RowSet infoStream = findInfoRowSet("expressions");
        
        Object[] infoRow = null;
        while((infoRow = getRowFrom(infoStream)) != null){
            String regexString = regexField.getString(infoRow);
            expressions.add(regexString);
            patterns.add(Pattern.compile(regexString));
        }

    }

    // get the value of the field to check
    String value = inputField.getString(r);

    // check if any pattern matches
    int matchFound = 0;
    String matchExpression = null;
    for(int i=0;i<patterns.size();i++){
        if (((Pattern) patterns.get(i)).matcher(value).matches()){
            matchFound = 1;
            matchExpression = (String)expressions.get(i);
            break;
        }
    }

    // write result to stream
    r = createOutputRow(r, data.outputRowMeta.size());
    resultField.setValue(r, Long.valueOf(matchFound));
    matchField.setValue(r, matchExpression);

    // Send the row on to the next step.
    putRow(data.outputRowMeta, r);

    return true;
}
Step code for “Working with Target Steps” (target_steps.ktr):

private RowSet lowProbStream = null;
private RowSet highProbStream = null;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();

    if (r == null) {
        setOutputDone();
        return false;
    }

    // look up the target row sets once, by the names defined in the step UI
    if (first){
        first = false;
        lowProbStream = findTargetRowSet("low_probability");
        highProbStream = findTargetRowSet("high_probability");
    }

    // Send roughly 35% of the rows to one target step and the rest to the other.
    if (Math.random() < 0.35){
        putRowTo(data.outputRowMeta, r, lowProbStream);
    }
    else{
        putRowTo(data.outputRowMeta, r, highProbStream);
    }

    return true;
}
Step code for “Error Handling” (error_handling.ktr):

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
	Object[] r = getRow();

	if (r == null) {
		setOutputDone();
		return false;
	}

    if (first){
        first = false;
    }

    r = createOutputRow(r, data.outputRowMeta.size());
	
    // Get the value from an input field
    Long numerator = get(Fields.In, "numerator").getInteger(r);
    Long denominator = get(Fields.In, "denominator").getInteger(r);

	// avoid dividing by 0
	if (denominator == 0){
		// putError is declared as follows:
		// public void putError(RowMetaInterface rowMeta, Object[] row, long nrErrors, String errorDescriptions, String fieldNames, String errorCodes)
		putError(data.outputRowMeta, r, 1, "Denominator must be different from 0", "denominator", "DIV_0");
		// get on with the next line
		return true;
	}

	long integer_division = numerator / denominator;
	long remainder = numerator % denominator;
	
	// write output fields
    get(Fields.Out, "integer_division").setValue(r, Long.valueOf(integer_division));
    get(Fields.Out, "remainder").setValue(r, Long.valueOf(remainder));

	// Send the row on to the next step.
    putRow(data.outputRowMeta, r);

	return true;
}
Step code for “Accessing Database Connections” (db_access.ktr):

import org.pentaho.di.core.database.Database;
import java.util.List;
import java.util.Arrays;

private Database db = null;
private FieldHelper outputField = null;
private FieldHelper tableField = null;
private List existingTables = null;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
	Object[] r = getRow();

	if (r == null) {
		setOutputDone();
		return false;
	}
	
	if (first){
		first = false;
		existingTables = Arrays.asList(db.getTablenames());
		tableField = get(Fields.In, "table_name");
		outputField = get(Fields.Out, "table_exists");
	}

    r = createOutputRow(r, data.outputRowMeta.size());

	if (existingTables.contains(tableField.getString(r))){
	    outputField.setValue(r, Long.valueOf(1));
	}
	else{
		outputField.setValue(r, Long.valueOf(0));
	}

	// Send the row on to the next step.
    putRow(data.outputRowMeta, r);

	return true;
}


public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface)
{

	if (parent.initImpl(stepMetaInterface, stepDataInterface)){

		try{

			db = new Database(this.parent, getTransMeta().findDatabase("TestDB"));
			db.shareVariablesWith(this.parent);
			db.connect();
			return true;
		}
		catch(KettleDatabaseException e){
			logError("Error connecting to TestDB: "+ e.getMessage());
			setErrors(1);
			stopAll();
		}
	
	}

	return false;
    
}

public void dispose(StepMetaInterface smi, StepDataInterface sdi)
{
	if (db != null) {
       	db.disconnect();
	}

    parent.disposeImpl(smi, sdi);
}
Step code for “Implementing Input Steps” (input_step.ktr):

import java.util.*;

private ArrayList keys = null;
private int idx = 0;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{

	if (first){
		first = false;
		// get the system property names, output is done one at a time later
		keys = Collections.list(System.getProperties().propertyNames());
		idx = 0;
	}

	if (idx >= keys.size()) {
		setOutputDone();
		return false;
	}

	// create a row
	Object[] r = RowDataUtil.allocateRowData(data.outputRowMeta.size());
    
    // Set key and value in a new output row
    get(Fields.Out, "key").setValue(r, keys.get(idx));
    get(Fields.Out, "value").setValue(r, System.getProperties().get(keys.get(idx)));

	idx++;

	// Send the row on to the next step.
    putRow(data.outputRowMeta, r);

	return true;
}

57 comments to The User Defined Java Class Step

  • What does it mean “delete fields”?

  • Kaushal

    Hi,
    Is it possible to extend the Code Snippets (for e.g. adding a reusable code snippet)

    Regards,
    Kaushal

  • Fred H.

    Thank you for this.
    However, I don’t understand how the outputRowMeta object is created in the input step example. If this was a step plugin implementation instead of a UDJC, how would the row metadata be defined?

  • Tomislav Urban

    Slawomir – Terrific article, thanks!

    I’m wondering if you’ve had any luck using the Binary output file type. I’m attempting to read some files from the file system into a BLOB field (PostgreSQL bytea). Everything works fine until I attempt to execute:

    get(Fields.Out, "filebytes").setValue(r, fileBytesIn);

    filebytes is defined as a Binary field in the output field list
    fileBytesIn is a byte[] resulting from:

    fis = new FileInputStream("C:\\mydir\\" + id.toString());

    and

    fileBytesIn = IOUtils.toByteArray(fis);

    Any thoughts?

  • Skidvd

    Sorry for the new user questions, but my googling and reading of online docs has not yet found an answer to this seemingly basic question.

    I have developed a Java based transform externally to Kettle. My class extends org.pentaho.di.core.exception.KettleException.TransformClassBase and overrides the processRow method. I put this class into a jar in the libext folder. However, within Kettle, I do not see how to configure the UDJC step to reference my class. I’m using 4.3.0 stable and it only appears to have the ability to add the class code directly into the step configuration. Is there a way to have a step reference existing external Java code on the classpath as I described above?

    TIA!

  • Jasper

    Thanks, it was very helpful to see some more real world examples. PDI is great, as long as you have examples.

  • Miguel

    I have a question, I hope you can help.

    How do I set a new Kettle variable (type: “system”, “root”, “parent”, “grandparent”)?

    Excuse my English.
