Thursday, June 21, 2012

Exception handling; fault management and priority messages

Introduction

Exception handling is often a topic which is paid too little attention to. Functional people often consider this a technical topic and technical people consider the handling of error situations to mostly be functional in nature. This difference in opinion can result in software going to a production environment which is difficult to maintain.

It is suggested to first read the following two articles to understand the background of this article;
http://javaoraclesoa.blogspot.nl/2012/05/exception-handling-in-soa-suite-10g-and.html
http://javaoraclesoa.blogspot.nl/2012/05/re-enqueueing-faulted-bpel-messages.html

The below article describes a combination of exception handling by using a custom Java class in a fault policy to retire the process and a BPEL catch branch to reenqueue a message with a higher priority.

The implementation will satisfy the following requirements;

If an error occurs
- the process will be retired to prevent future faults in case a service invocation fails (fault management framework and custom Java fault handler)
- the faulted message must be put on a queue so it can easily be re-offered to the process after the error has been resolved (catch branch in BPEL using AQ adapter to re-enqueue a message)
- the faulted message must be picked up before the other messages in the queue if the problem is fixed in order to retain the order of messages processed (AQ with sorting on priority)

The fault management framework

Summary

The fault management framework does not handle all exceptions which occur but only invoke exceptions. The fault management framework takes precedence over the BPEL catch branch. The fault management framework makes it easy to use custom Java fault handlers to for example retire a process.

The BPEL catch branch does not allow a custom Java class to handle the exception. You can however use Java embedding. See for an example; http://javaoraclesoa.blogspot.nl/2012/03/base64encode-and-base64decode-in-bpel.html. Mind that when an exception is rethrown in a Catch branch and a CatchAll branch is present on the same level, the exception is propagated to the higher level and not caught by the CatchAll branch!

Example

The following example will retire a process when errors occur in invoke activities. The rethrow action makes sure that after the process has been retired, the error is propagated to the catch activities in the BPEL process.

The following code can be put in the <SCA-project>/SCA-INF/src/ms/exceptiontest.soa.faultHandlers folder;

package ms.exceptiontest.soa.faultHandlers;

import com.collaxa.cube.engine.fp.BPELFaultRecoveryContextImpl;
import java.util.logging.Logger;
import oracle.integration.platform.faultpolicy.IFaultRecoveryContext;
import oracle.integration.platform.faultpolicy.IFaultRecoveryJavaClass;
import oracle.soa.management.facade.Composite;
import oracle.soa.management.facade.Locator;
import oracle.soa.management.facade.LocatorFactory;

public class retireFaultHandler implements IFaultRecoveryJavaClass {
    private final static Logger logger =
        Logger.getLogger(retireFaultHandler.class.getName());

    public void handleRetrySuccess(IFaultRecoveryContext iFaultRecoveryContext) {
    }

    public String handleFault(IFaultRecoveryContext iFaultCtx) {
        try {
            BPELFaultRecoveryContextImpl bpelCtx =
                (BPELFaultRecoveryContextImpl)iFaultCtx;
            Locator loc = LocatorFactory.createLocator();
            Composite comp =
                loc.lookupComposite(bpelCtx.getProcessDN().getCompositeDN());
            comp.retire();
            System.out.println("retired " + comp.getDN());

        } catch (Exception e) {
            logger.severe("Error in FaultHandler " +
                          retireFaultHandler.class.getName());
            e.printStackTrace();
        }
        return "OK";
    }
}

Then you can use the following fault-policies.xml

<?xml version="1.0" encoding="UTF-8"?>
<faultPolicies xmlns="http://schemas.oracle.com/bpel/faultpolicy"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <faultPolicy version="2.1.3" id="ConnectionFaults">
    <Conditions>
      <faultName>
        <condition>
          <action ref="retireProcess"/>
        </condition>
      </faultName>
    </Conditions>
    <Actions>
      <Action id="retireProcess">
        <javaAction className="ms.exceptiontest.soa.faultHandlers.retireFaultHandler"
                    defaultAction="ora-rethrow-fault">
          <returnValue value="OK" ref="ora-rethrow-fault"/>
        </javaAction>
      </Action>
      <!-- Generics -->
      <Action id="ora-terminate">
        <abort/>
      </Action>
      <Action id="ora-replay-scope">
        <replayScope/>
      </Action>
      <Action id="ora-rethrow-fault">
        <rethrowFault/>
      </Action>
      <Action id="ora-human-intervention">
        <humanIntervention/>
      </Action>
    </Actions>
  </faultPolicy>
</faultPolicies>

And fault-bindings.xml

<?xml version="1.0" encoding="UTF-8"?>
<faultPolicyBindings version="2.0.1"
                     xmlns="http://schemas.oracle.com/bpel/faultpolicy"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <composite faultPolicy="ConnectionFaults"/>
</faultPolicyBindings>

Re-enqueue messages using a higher priority

PL/SQL queue code

To be able to re-enqueue messages using a higher priority, a multiconsumer queue is created. Creating a multiconsumer queue is not required but allows more flexibility since multiple parties can become subscriber on the queue.

The queue uses a sort_list of priority,enq_time making sure messages with higher priority are dequeued first. The retention time is set to 1 week since even if messages are dequeued, it is handy if we can still find them to check their contents. The user holding the queues (and queue package), should have the following grants;

GRANT EXECUTE ON DBMS_AQADM TO testuser;
GRANT EXECUTE ON DBMS_AQ TO testuser;
GRANT AQ_ADMINISTRATOR_ROLE TO testuser;

--creating queue tables
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE( Queue_table => '"TESTUSER"."SOA_MULTI_QT"', Queue_payload_type => 'SYS.XMLTYPE', Sort_list => 'PRIORITY,ENQ_TIME', Multiple_consumers => TRUE, Compatible => '8.1.3');
END;

--creating queues
BEGIN
  DBMS_AQADM.CREATE_QUEUE( Queue_name => 'TESTUSER.SOA_MULTI_QUEUE', Queue_table => 'TESTUSER.SOA_MULTI_QT', Queue_type => 0, Max_retries => 5, Retry_delay => 0, Retention_time => '604800', dependency_tracking => FALSE, COMMENT => 'multi queue');
END;

--adding subscribers
begin
DBMS_AQADM.ADD_SUBSCRIBER ('TESTUSER.SOA_MULTI_QUEUE',sys.aq$_agent('FAULTTEST', null, null));
DBMS_AQADM.Start_queue( Queue_name => 'TESTUSER.SOA_MULTI_QUEUE',TRUE,TRUE );
end;

To drop the queue, queuetable, etc, the following can be used

BEGIN
  DBMS_AQADM.Stop_queue( Queue_name => 'TESTUSER.SOA_MULTI_QUEUE' );
  DBMS_AQADM.DROP_QUEUE( Queue_name => 'TESTUSER.SOA_MULTI_QUEUE' );
  DBMS_AQADM.DROP_QUEUE_TABLE( Queue_table => '"TESTUSER"."SOA_MULTI_QT"');
END;

To allow other users to enqueue messages without having to grant them rights to the DBMS_AQ package, the following package can be used;

CREATE OR REPLACE
PACKAGE soa_queue_pack
IS
type t_recipients_list IS TABLE OF VARCHAR2 (50); -- index by binary_integer;

PROCEDURE vul_multi_queue(
    p_queue_naam      IN VARCHAR2 ,
    p_xml_payload     IN xmltype ,
    p_priority        IN BINARY_INTEGER ,
    p_recipients_list IN t_recipients_list);
END soa_queue_pack;

create or replace
PACKAGE BODY soa_queue_pack
IS
PROCEDURE vul_multi_queue(
    p_queue_naam      IN VARCHAR2 ,
    p_xml_payload     IN xmltype ,
    p_priority        IN BINARY_INTEGER ,
    p_recipients_list IN t_recipients_list )
IS
  l_msg_aq raw(18);
  l_enq_opt dbms_aq.enqueue_options_t;
  l_msg_prop dbms_aq.message_properties_t;
  l_recipients_list dbms_aq.aq$_recipient_list_t;
BEGIN
  FOR i IN p_recipients_list.FIRST .. p_recipients_list.LAST
  LOOP
    l_recipients_list(i) := sys.aq$_agent(p_recipients_list(i),NULL,NULL);
  END LOOP;
  l_msg_prop.priority       := p_priority;
  l_msg_prop.recipient_list := l_recipients_list;
  dbms_aq.enqueue(p_queue_naam, l_enq_opt, l_msg_prop, p_xml_payload, l_msg_aq);
END;
END;

Other users can then do the following to enqueue messages ('testuser' is the user who owns the package and queues);

DECLARE
l_recipients          testuser.soa_queue_pack.t_recipients_list;
l_message_id RAW(16);
l_message SYS.XMLType;
BEGIN
l_recipients := testuser.soa_queue_pack.t_recipients_list('FAULTTEST');
l_message := sys.XMLType.createXML('<itemCollectionArray xmlns:msg_out="http://test.ms/itemcollections" xmlns="http://test.ms/itemcollections"><msg_out:itemsCollection><msg_out:item><msg_out:name>name</msg_out:name><msg_out:value>Piet</msg_out:value></msg_out:item></msg_out:itemsCollection></itemCollectionArray>');
testuser.soa_queue_pack.vul_multi_queue('TESTUSER.SOA_MULTI_QUEUE',l_message,5,l_recipients);
END;

This can for example be used in a PL/SQL table trigger in a different schema to enqueue a message on a certain status change or insert.

BPEL dequeueing and enqueueing

In BPEL you can specify the consumer a property in the AQ wizard for the dequeue operation;


For enqueueing, you can specify the enqueue priority to for example make sure messages causing errors, are picked up first when the problem is fixed.


To make sure no loops are caused and to throttle the speed at which messages are processed by the AQ adapter, the following properties can be used in the composite.xml;

  <service name="soa_multi_queue_AQ"
           ui:wsdlLocation="soa_multi_queue_AQ.wsdl">
    <interface.wsdl interface="http://xmlns.oracle.com/pcbpel/adapter/aq/bpel/multiqueue/soa_multi_queue_AQ#wsdl.interface(Dequeue_ptt)"/>
    <binding.jca config="soa_multi_queue_AQ_aq.jca"/>
    <property name="minimumDelayBetweenMessages">10000</property>
    <property name="adapter.aq.dequeue.threads" type="xs:string" many="false">1</property>
  </service>

The above setting causes 1 message every 10 seconds to be picked up from the queue.

Demonstration

The following BPEL process has been created;
This process calls an HelloWorld webservice. This webservice can be shutdown or retired in order to simulate a RemoteException (failure to invoke the webservice). The fault policy will call the custom Java action to retire the process. After retirement, the fault is rethrown and caught by the catchall block. This will re-enqueue the message with a higher priority. Upon fixing the error and enabling the process, the faulted message is picked up first and processed correctly.

First a message is put on the queue by executing;
DECLARE
l_recipients          testuser.soa_queue_pack.t_recipients_list;
l_message_id RAW(16);
l_message SYS.XMLType;
BEGIN
l_recipients := testuser.soa_queue_pack.t_recipients_list('FAULTTEST');
l_message := sys.XMLType.createXML('<itemCollectionArray xmlns:msg_out="http://test.ms/itemcollections" xmlns="http://test.ms/itemcollections"><msg_out:itemsCollection><msg_out:item><msg_out:name>name</msg_out:name><msg_out:value>Piet</msg_out:value></msg_out:item></msg_out:itemsCollection></itemCollectionArray>');
testuser.soa_queue_pack.vul_multi_queue('TESTUSER.SOA_MULTI_QUEUE',l_message,5,l_recipients);
END;

Then confirm the message is processed succesfully;

Next, disable the HelloWorld service by retiring it and re-enqueue a message. Check how CallHelloWorld handles the exception. The process is retired and the message is re-enqueued with a higher priority.



You can also confirm that after enabling the HelloWorld service and Activating the CallHelloWorld process, it first picks up the faulted message. You can download the sample (BPEL/Java code) at; http://dl.dropbox.com/u/6693935/blog/ExceptionTest.zip

No comments:

Post a Comment