Monday, February 6, 2017

Oracle Service Bus: Produce messages to a Kafka topic

Oracle Service Bus is a powerful tool that provides features such as transformation, throttling, and virtualization of messages coming from different sources. A (recently open-sourced!) Kafka transport is available for Oracle Service Bus (see here). Oracle Service Bus can thus be used to do all kinds of interesting things to messages coming from Kafka topics. You can then produce the altered messages to other Kafka topics and create a decoupled processing chain. In this blog post I provide an example of how to use Oracle Service Bus to produce messages to a Kafka topic.

Messages from OSB to Kafka

First, perform the steps described here to set up the Service Bus with the Kafka transport. Also make sure you have a Kafka broker running.

Next, create a new Business Service (File, New, Business Service). The Kafka transport is not visible in the component palette since it is a custom transport. Then select Kafka as the transport.

In the Type screen, be sure to select Text as the request message and None as the response message.

Specify a Kafka bootstrap broker.
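
Before deploying, it can help to confirm that something is actually listening at the broker address you entered. Below is a minimal sketch of such a check. It assumes the address is given in the usual Kafka `host:port` form (comma-separated when there are several), and `localhost:9092` is only an assumed example address. The helper names `parse_bootstrap` and `is_listening` are made up for this illustration. Note that a successful TCP connect only shows that the port is open, not that a healthy Kafka broker is behind it.

```python
import socket


def parse_bootstrap(servers):
    """Split a comma-separated 'host:port' list into (host, port) tuples."""
    pairs = []
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas
        host, _, port = entry.rpartition(":")
        if not host or not port.isdigit():
            raise ValueError("expected host:port, got %r" % entry)
        pairs.append((host, int(port)))
    return pairs


def is_listening(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Assumed example address; replace it with your own bootstrap broker.
    for host, port in parse_bootstrap("localhost:9092"):
        print("%s:%d listening: %s" % (host, port, is_listening(host, port)))
```

If the check reports that nothing is listening, start the broker first; otherwise the Business Service will have nothing to connect to.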

The body needs to be of type {}Body. If you send plain text as the body to the Kafka transport, you will get the error message below:

<Error> <oracle.osb.pipeline.kernel.router> <ubuntu> <DefaultServer> <[STUCK] ExecuteThread: '22' for queue: 'weblogic.kernel.Default (self-tuning)'> <<anonymous>> <> <43b720fd-2b5a-4c93-a073-298db3e92689-00000132> <1486368879482> <[severity-value: 8] [rid: 0] [partition-id: 0] [partition-name: DOMAIN] > <OSB-382191> <SBProject/ProxyServicePipeline: Unhandled error caught by system-level error handler: OSB Assign action failed updating variable "body": [OSB-395105]The TokenIterator does not correspond to a single XmlObject value

If you send XML as the body of the message going to the transport, but not wrapped in an explicit SOAP body, you will get errors like the one below in the server log:

<Error> <oracle.osb.pipeline.kernel.router> <ubuntu> <DefaultServer> <[STUCK] ExecuteThread: '22' for queue: 'weblogic.kernel.Default (self-tuning)'> <<anonymous>> <> <43b720fd-2b5a-4c93-a073-298db3e92689-00000132> <1486368987002> <[severity-value: 8] [rid: 0] [partition-id: 0] [partition-name: DOMAIN] > <OSB-382191> <SBProject/ProxyServicePipeline: Unhandled error caught by system-level error handler: com.bea.wl Failed to set the value of context variable "body". Value must be an instance of {}Body.

As you can see, this causes stuck threads. To get a {}Body you can, for example, use an Assign action. In this case I'm replacing text in the input body and assigning the result to the output body, using <ns:Body xmlns:ns=''>{fn:replace($body,'Trump','Clinton')}</ns:Body>. This replaces Trump with Clinton.
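
To make clear what that expression produces, here is a rough Python equivalent, meant as an illustration only and not as something that runs inside Service Bus. It replaces the text and wraps the result in a Body element. The namespace used is the SOAP 1.1 envelope namespace, which is an assumption on my part since the snippet above shows an empty namespace URI. The function name `wrap_in_body` is also made up for this sketch. One difference to keep in mind: fn:replace treats its second argument as a regular expression, while Python's `str.replace` does a literal replacement. For a plain word like Trump the result is the same.

```python
import xml.etree.ElementTree as ET

# Assumption: the SOAP 1.1 envelope namespace. Substitute the namespace
# your own Service Bus project expects for the Body element.
SOAP_ENV_NS = "http://schemas.xmlsoap.org/soap/envelope/"


def wrap_in_body(text, old, new, ns=SOAP_ENV_NS):
    """Replace `old` with `new` in `text` and wrap the result in a Body element."""
    body = ET.Element("{%s}Body" % ns)  # Clark notation: {namespace}localname
    body.text = text.replace(old, new)  # literal replacement, not a regex
    return body


if __name__ == "__main__":
    body = wrap_in_body("President Trump", "Trump", "Clinton")
    # The serialized element is what gets assigned to the body variable.
    print(ET.tostring(body, encoding="unicode"))
    # The text content on its own.
    print(body.text)
```

The point of the wrapper is that the body variable has to hold a single XML element. A bare string, or XML without the Body element around it, triggers the errors shown above.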

When you check the output with a tool such as KafkaTool, you can see that the SOAP body wrapper is not propagated to the Kafka topic.


Oracle Service Bus processes individual messages. If you want to aggregate data or perform analytics on several messages, consider using Oracle Stream Analytics (OSA). It also has pattern recognition and several other interesting features. OSA is, however, not well suited to splitting up messages or performing more complicated actions on individual messages, such as transformations. For such a use case, use Oracle Service Bus.