Processes used
Several BPEL processes are used in this example. They can be downloaded from: https://dl.dropboxusercontent.com/u/6693935/blog/JMSTest.zip
First I will briefly describe the different services.
problemService is an unreliable process which contains a small piece of Java embedding. 50% of the time this process fails and throws a SOAP exception.
JMSProject is used to produce a message to the JMSTopic. The selector 'All' is used when producing the message.
clientA receives messages from the topic with selector 'All' or 'A'. This process calls the unreliable service problemService. When the call does not succeed, the process retires itself (using the fault management framework) and produces a message with selector 'Aerror'.
clientB receives messages from the topic with selector 'All' or 'B'.
clientAhandleError receives messages from the topic with selector 'Aerror'. It is meant to simulate a reporting activity or similar. It then produces the same message again with selector 'A'.
The above picture shows how the processes are related. Topic indicates a JMS topic; the other boxes are the processes. An arrow from the Topic to a process indicates that the process consumes messages, and the text outside the parentheses at the arrow indicates the JMS message selector used. An arrow from a process to the Topic indicates that the process produces a message; the selector set when producing is shown in the text near the arrow. The number in parentheses indicates the order of the steps in an example flow in which an error occurs in the problemService.
When a message is produced by JMSProject (1), clientA and clientB will both pick up this message (2) since selector 'All' is used. clientA then calls the problemService. If the call succeeds, clientA is done and the result is the following:
If the call fails, however, clientA is retired and will pick up no new messages. clientB can still pick up new messages. In addition, clientA produces a message with selector 'Aerror' (3). clientAhandleError (4) picks up this message and produces it again on the topic with selector 'A' (5). clientB will not receive this message, and clientA will not pick it up yet since it is retired. When clientA is reactivated, the flow restarts from the point where clientA receives the message (6). It will again call the problemService. In the sample flow below, the call succeeds the second time.
If it fails again and clientA is reactivated, the flow looks as follows (in this run it took several retries):
As can be seen, the errors and restart actions are part of the same flow and can be related to each other. This is in contrast to the previously described AQ error handling method; see http://javaoraclesoa.blogspot.nl/2012/06/exception-handling-fault-management-and.html
Also, when using this method, the state of the durable JMS Topic subscribers reflects the state of the BPEL processes, and messages can be viewed from the WebLogic console. No PL/SQL is required to manage the topic and its messages.
I've added a 'test' subscriber to the topic, since this allows me to inspect the messages on the topic if they are not picked up by any of the other subscribers.
Configuration
In order to achieve the above described behavior, some configuration is required.
JMS configuration introduction
The process of defining a JMS server, topic and connection factories is described on: http://middlewaremagic.com/weblogic/?p=4403
First a filestore is created. This filestore is used to create a JMS server. Next a JMS module is created. Inside the module, a sub-deployment is created which is targeted to the JMS server. A JMS topic is created which is targeted to the sub-deployment. In order to allow the module to be connected to, a connection factory is created.
In the JMSAdapter, a connection factory is created which refers to the JMSModule connection factory. In BPEL you can refer to the JMSAdapter connection factory to allow the BPEL process to send messages to the topic.
The below picture illustrates this (my simplified interpretation)
JMS configuration
Create a JMS server
Create a filestore and create a server using it.
Create and configure JMS Module
One topic is created and one connection factory.
By default, the topic uses FIFO ordering based on JMSMessageId. This is suitable in most cases and performs best. If, however, new messages arrive in the period between picking up a message and re-enqueueing it after an error, the order might be lost. To achieve better ordering, the JMSPriority can be used. This can be set in the JMSAdapter configuration when producing messages (JMSProject, clientA, clientAhandleError in this example). The JMS module needs to have a Destination Sort Key system resource configured based on the JMSPriority. This sort key needs to be used in the configuration of the topic (configuration, general). See http://www.esentri.com/blog/2014/03/31/processing-prioritized-messages-weblogic-jms-queue/ for the required configuration and screenshots. In this example, this has not been implemented.
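Had the priority-based ordering been implemented, the relevant part of the JMS module descriptor might look roughly like the fragment below. This is only a sketch under assumptions: the names PriorityKey, TestTopic, TestSubDeployment and jms/TestTopic are hypothetical, and the exact element order should be checked against a descriptor generated by the WebLogic console.

```xml
<!-- Sketch of a JMS module descriptor fragment; all names are hypothetical. -->
<!-- Sort key: order messages on JMSPriority, highest priority first. -->
<destination-key name="PriorityKey">
  <property>JMSPriority</property>
  <key-type>Int</key-type>
  <sort-order>Descending</sort-order>
</destination-key>

<!-- Topic that uses the sort key instead of the default FIFO ordering. -->
<topic name="TestTopic">
  <sub-deployment-name>TestSubDeployment</sub-deployment-name>
  <destination-key>PriorityKey</destination-key>
  <jndi-name>jms/TestTopic</jndi-name>
</topic>
```

With a key like this, a message that is produced again after an error could be given a higher priority than newly arriving messages, so that it is offered to the subscriber first.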
The connection factory can define client properties. Subscription sharing policy allows several clients with the same Client Id to register for messages. This comes in handy when working in a cluster. Multiple hosts can be registered to the same topic this way using the same connection factory. Specifying the Client Id allows restricting clients on the connection factory. It is usual to leave this field empty. The Client Id is passed using the connection factory defined in the JMSAdapter configuration.
JMS Adapter configuration
A single connection factory is used in the JMS module. However, the JMSAdapter connection factory specifies the client (Client Id) that is used on the JMS module connection factory, so a separate JMSAdapter connection factory is required for every JMS client.
The durable subscribers on the topic are defined by the clients and the clients are defined in the connection factory of the JMS Adapter. See https://blogs.oracle.com/ateamsoab2b/entry/configure_oracle_soa_jmsadatper_to
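As an illustration, a JMSAdapter connection factory for one client could be defined along the following lines in the adapter's deployment plan or weblogic-ra.xml. This is a hedged sketch: the JNDI names eis/jms/TopicClientA and jms/TestConnectionFactory and the client id clientA are hypothetical, and passing the Client Id through FactoryProperties is my assumption based on the approach in the post linked above.

```xml
<!-- Sketch: one JMSAdapter connection instance per JMS client. -->
<!-- All JNDI names and the ClientID value are hypothetical. -->
<connection-instance>
  <jndi-name>eis/jms/TopicClientA</jndi-name>
  <connection-properties>
    <properties>
      <!-- Points to the connection factory defined in the JMS module -->
      <property>
        <name>ConnectionFactoryLocation</name>
        <value>jms/TestConnectionFactory</value>
      </property>
      <!-- Assumption: the Client Id is passed as a factory property -->
      <property>
        <name>FactoryProperties</name>
        <value>ClientID=clientA</value>
      </property>
      <!-- The destination is a topic, not a queue -->
      <property>
        <name>IsTopic</name>
        <value>true</value>
      </property>
    </properties>
  </connection-properties>
</connection-instance>
```

A second client (clientB, for example) would get its own connection instance with a different JNDI name and a different ClientID value, while pointing to the same JMS module connection factory.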
The following exception can occur if the ClientId is not specified. In case of errors, look at the AdminServer log file!
ERRJMS_ERR_CR_TOPIC_CONS.
Unable to create Topic consumer due to JMSException.
Please examine the log file to determine the problem.
at oracle.tip.adapter.jms.JMS.JMSConnection.createConsumer(JMSConnection.java:559)
at oracle.tip.adapter.jms.JMS.JMSConnection.createConsumer(JMSConnection.java:493)
at oracle.tip.adapter.jms.JMS.JMSMessageConsumer.createConsumer(JMSMessageConsumer.java:347)
at oracle.tip.adapter.jms.JMS.JMSMessageConsumer.init(JMSMessageConsumer.java:921)
at oracle.tip.adapter.jms.inbound.JmsConsumer.init(JmsConsumer.java:941)
at oracle.tip.adapter.jms.JmsDDEndpoint.onDestinationsAvailable(JmsDDEndpoint.java:178)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper$2.run(JMSDestinationAvailabilityHelper.java:386)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.callOutListener(JMSDestinationAvailabilityHelper.java:402)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.onDDMembershipChange(JMSDestinationAvailabilityHelper.java:383)
at weblogic.jms.common.CDS$DD2Listener.run(CDS.java:1263)
at weblogic.work.SelfTuningWorkManagerImpl$WorkAdapterImpl.run(SelfTuningWorkManagerImpl.java:545)
at weblogic.work.ExecuteThread.execute(ExecuteThread.java:256)
at weblogic.work.ExecuteThread.run(ExecuteThread.java:221)
Caused by: weblogic.jms.common.IllegalStateException: [JMSClientExceptions:055032]An attempt was made to create a named consumer (A) on a connection with no clientID
In the BPEL processes, some adapter configuration is required.
Produce a message
The below image shows the adapter configuration for producing messages.
In order to produce messages with a specific header, so we can use a message selector on it, we use the following BPEL code. Note that this is BPEL 2.0 code. For BPEL 1.1 code, look at: http://learn-oraclesoa.blogspot.nl/2013/06/getting-and-setting-jms-header.html
<invoke name="InvokeJMS" partnerLink="WriteJMS"
        portType="ns1:Produce_Message_ptt" operation="Produce_Message"
        inputVariable="InvokeJMS_Produce_Message_InputVariable"
        bpelx:invokeAsDetail="no">
  <bpelx:toProperties>
    <bpelx:toProperty name="jca.jms.JMSProperty.selector">'All'</bpelx:toProperty>
  </bpelx:toProperties>
</invoke>
Consume a message
For consuming messages, the durable subscriber and the message selector are specified. For clientA, the selector matches 'All' or 'A'. This allows the error handler to send specific messages to this process, and also allows a common producer to send messages to this process and to other processes at the same time.
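The consume side for clientA could end up in the adapter's .jca file roughly as sketched below. The adapter name, port type, operation, JNDI names and subscriber name are hypothetical placeholders; the selector expression assumes the custom header is called selector, as set by jca.jms.JMSProperty.selector when producing.

```xml
<!-- Sketch of a consume-side .jca file for clientA; names are hypothetical. -->
<adapter-config name="ConsumeJMS" adapter="JMS Adapter"
                xmlns="http://platform.integration.oracle/blocks/adapter/fw/metadata">
  <!-- JMSAdapter connection factory that carries the Client Id of clientA -->
  <connection-factory location="eis/jms/TopicClientA"/>
  <endpoint-activation portType="Consume_Message_ptt" operation="Consume_Message">
    <activation-spec className="oracle.tip.adapter.jms.inbound.JmsConsumeActivationSpec">
      <property name="DestinationName" value="jms/TestTopic"/>
      <!-- Name of the durable subscription as it appears on the topic -->
      <property name="DurableSubscriber" value="A"/>
      <!-- Pick up common messages and messages meant for clientA only -->
      <property name="MessageSelector" value="selector = 'All' OR selector = 'A'"/>
      <property name="PayloadType" value="TextMessage"/>
      <property name="UseMessageListener" value="false"/>
    </activation-spec>
  </endpoint-activation>
</adapter-config>
```

For clientAhandleError the same structure would apply, with its own connection factory and subscriber name and a selector such as selector = 'Aerror'.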
Conclusion
The initial purpose of this blog post was to try to create an error handling mechanism, similar to the one using a multi consumer AQ (http://javaoraclesoa.blogspot.nl/2012/06/exception-handling-fault-management-and.html), by using a JMS Topic. In order to achieve this, I needed to implement several other interesting techniques:
- custom JMS headers in BPEL 2.0
- JMS message selectors
- configuration from BPEL to the Topic
This is most likely the most important gain from this exercise. The described method has several benefits and drawbacks.
Benefits
- A JMS topic is used with a file-based persistent store. This is a relatively lightweight implementation which performs reliably and is fast (see http://javaoraclesoa.blogspot.nl/2013/02/comparing-weblogic-filestore-backed-jms.html).
- The message flow can be traced in the EM Fusion Middleware Control and messages can be viewed in the WebLogic console. No PL/SQL is required.
Drawbacks
- A separate connection factory needs to be created if a new client is added.
- There is quite a lot of configuration required. If done manually, errors will occur. If automated, it will require quite some scripting.
- Managing processing of messages is done at the client (BPEL process) level by retiring/activating processes. The topic can be deactivated as a whole but this will influence every producer/consumer.
Limitations
An important limitation is of course that if you want to continue processing messages based on a specific field in the message contents, error handling at process level might be too coarse-grained and another solution is required. This solution also does not prevent parallel execution of process instances after messages have been picked up; only the order in which messages are picked up is guaranteed. The solution described at http://javaoraclesoa.blogspot.nl/2013/05/how-to-deal-with-services-that-dont.html can be used to prevent parallel execution.
Of course, 'guaranteed' in the topic of this post is relative and not every situation has been tested.