Wednesday, May 23, 2012

Exception handling in SOA Suite 10g and SOA Suite 11g

Introduction

Sometimes, the longer you think about how to solve a problem, the less complex the solution becomes. Error handling in SOA Suite 11g is one of those cases. It is tempting to implement your own mechanism for exception/error handling (for example http://javaoraclesoa.blogspot.com/2012/05/re-enqueueing-faulted-bpel-messages.html), although SOA Suite already includes an extensive fault management framework. In this post I describe the method used in SOA Suite 10g to implement fault policies using a custom Java class, and then implement a similar exception handling mechanism in Oracle SOA Suite 11g.

Marcel Bellinga provided most of the code in the example below.

Challenges to tackle

Some of the challenges involved in implementing exception handling:
- How do I make it easy for the people monitoring and maintaining the application to detect and recover from errors?
- How do I make sure no messages are lost?
- How do I make sure the order in which messages are offered to the application does not change when exceptions occur?
- How do I prevent 'hammering' a system (continuously retrying faulted messages)?

With these questions in mind, the following solution provides a good option.

A bit of background

Oracle BPEL 10g has the option to use fault-policies and fault-bindings (and to use custom Java classes in the policies). These are placed on the application server and referred to by a BPEL process in its bpel.xml. See http://docs.oracle.com/cd/E14101_01/doc.1013/e15342/bpelrn.htm#BABCHCED.

Oracle SOA Suite 11g has (in addition to the method described above) the option to deploy custom Java classes, fault-policies and fault-bindings as part of the composite to the application server. This mechanism makes it easier to use the fault management framework on a per-composite basis. See http://docs.oracle.com/cd/E12839_01/integration.1111/e10224/bp_faults.htm

Keep in mind when using the fault management framework that the fault-policies are triggered before any catch branch defined in a BPEL process. If you want the catch branch to be activated, the policy has to include the action that rethrows the fault.

Solution in short

The solution for handling faults, taking the above questions into account, works as follows:
- in Oracle BPEL 10g, a custom Java class and a specific policy XML file are deployed on the application server
- the bpel.xml file refers to the policy defined in the specific policy XML file
- the custom Java class first deactivates the activation agents of the process and then retires the process (avoiding the issue that messages are picked up while the process is already retired, causing loss of messages)
- the faulted message is put in manual recovery mode so the error hospital can be used to recover (retry) the message after the problem is fixed
- once the problem is fixed, the process can be activated again
- the ORABPEL schema tables can be monitored for messages which can be recovered, or to alert someone that something has gone wrong and a recovery action is required

In Oracle SOA Suite 11g the method is similar, with a few differences: the activation agents do not need to be deactivated explicitly, the API calls are a bit different (due to the SCA implementation), and in this example the error handling is deployed as part of the composite. See http://mazanatti.info/index.php?/archives/75-SOA-Fault-Framework-Creating-and-using-a-Java-action-fault-policy.html for an example of how to deploy custom Java code centrally on the server.

Implementation

Implementation BPEL 10g exception handling

Custom Java action

Create a new Java project and include the orabpel.jar from your BPEL distribution in the root folder of your project. Update the project libraries to include this jar.
Create a new Java class. I've used the following:

package testapi;

import com.oracle.bpel.client.BPELProcessMetaData;
import com.oracle.bpel.client.IBPELProcessConstants;
import com.oracle.bpel.client.IBPELProcessHandle;
import com.oracle.bpel.client.Locator;
import com.oracle.bpel.client.config.faultpolicy.IFaultRecoveryContext;
import com.oracle.bpel.client.config.faultpolicy.IFaultRecoveryJavaClass;

public class RetireProcess implements IFaultRecoveryJavaClass {
    public RetireProcess() {
    }

    /**
     * This method is called by the BPEL Error Hospital framework when this
     * action is selected as retrySuccessAction (with the retry option) or
     * when this action is selected as successor in the human intervention
     * screen in the BPEL Console.
     *
     * @param iFaultRecoveryContext
     */
    public void handleRetrySuccess(IFaultRecoveryContext iFaultRecoveryContext) {
        System.out.println("RetireProcess RetrySuccess start");
        setLifeCycle(iFaultRecoveryContext,
                     IBPELProcessConstants.LIFECYCLE_ACTIVE);
        System.out.println("RetireProcess RetrySuccess end");
    }

    /**
     * This method is called by the BPEL Error Hospital framework when this
     * class is configured as action in the fault handling policy.
     *
     * @param iFaultRecoveryContext
     * @return String that can be used to influence choice for next action (not used in this case)
     */
    public String handleBPELFault(IFaultRecoveryContext iFaultRecoveryContext) {
        System.out.println("RetireProcess HandleFault start");
        setLifeCycle(iFaultRecoveryContext,
                     IBPELProcessConstants.LIFECYCLE_RETIRED);
        System.out.println("RetireProcess HandleFault end");
        return null;
    }

    private void setLifeCycle(IFaultRecoveryContext iFaultRecoveryContext,
                              int status) {
        IBPELProcessHandle procHandle = null;
        Locator loc = null;
        BPELProcessMetaData bpelProcessMetadata = null;
        String processName;
        String revision;

        try {
            processName = iFaultRecoveryContext.getProcessId().getProcessId();
            revision = iFaultRecoveryContext.getProcessId().getRevisionTag();
            // Get the Locator instance.
            loc = iFaultRecoveryContext.getLocator();
            // Look up the process. The revision is optional.
            if (revision == null || revision.trim().equals("")) {
                procHandle = loc.lookupProcess(processName);
            } else {
                procHandle = loc.lookupProcess(processName, revision);
            }
            if (procHandle == null) {
                throw new Exception("Unable to find process: " + processName);
            }

            System.out.println("RetireProcess changing lifecycle");
            // Get the metadata of the process.
            bpelProcessMetadata = procHandle.getMetaData();
            if (bpelProcessMetadata.getLifecycle() != status) {
                /*
                 * Set the lifecycle to the requested status (retired or active).
                 * Use setState(IBPELProcessConstants.STATE_OFF) to change process state to off.
                 */
                bpelProcessMetadata.setLifecycle(status);
                System.out.println("RetireProcess lifecycle set in metadata");

                // Stop the activation agents when retiring; start them again when activating.
                if (status == IBPELProcessConstants.LIFECYCLE_RETIRED) {
                    procHandle.stopAllActivationAgents();
                } else {
                    procHandle.startAllActivationAgents();
                }
                /*
                 * Finally update the process with the modified metadata.
                 */
                procHandle.updateMetaData(bpelProcessMetadata);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}



Noteworthy here is the method used to retire the process: obtain a locator, use the locator to get a process handle, use the process handle to get the metadata, and update the metadata. The process handle can also be used to stop the activation agents. Compile the project using JDK 1.5.0.06.

Place the compiled class in:
[ORACLE_HOME]/bpel/system/classes/

Fault policy and fault binding

Create a fault policy, for example:

<?xml version="1.0" encoding="UTF-8"?>
<faultPolicy version="2.0.1" id="RetireProcessPolicy" xmlns:env="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns="http://schemas.oracle.com/bpel/faultpolicy" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <!-- This section describes fault conditions. Build more conditions with faultName, test and action -->
    <Conditions>
        <!-- Fault ALL -->
        <faultName>
            <condition>
                <action ref="RetireProcess"/>
            </condition>
        </faultName>
    </Conditions>
    <Actions>
        <!-- This action will attempt 8 retries at increasing intervals of 2, 4, 8, 16, 32, 64, 128, and 256 seconds. -->
        <Action id="ora-retry">
            <retry>
                <retryCount>8</retryCount>
                <retryInterval>2</retryInterval>
                <retryFailureAction ref="ora-terminate"/>
                <exponentialBackoff/>
            </retry>
        </Action>
        <!-- This action will cause a replay scope fault -->
        <Action id="ora-replay-scope">
            <replayScope/>
        </Action>
        <!-- This action will bubble up the fault -->
        <Action id="ora-rethrow-fault">
            <rethrowFault/>
        </Action>
        <!-- This action will mark the work item as pending recovery from the console -->
        <Action id="ora-human-intervention">
            <humanIntervention/>
        </Action>
        <!-- This action will cause the instance to terminate-->
        <Action id="ora-terminate">
            <abort/>
        </Action>
        <Action id="RetireProcess">
            <javaAction className="testapi.RetireProcess" defaultAction="ora-human-intervention"/>
        </Action>
    </Actions>
</faultPolicy>
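
The ora-retry action in the policy above is not referenced by the condition, but its comment describes the exponential backoff: with retryCount 8 and retryInterval 2, the waits are 2, 4, 8, ... 256 seconds. As a minimal sketch of that arithmetic (plain Java, not the framework's own implementation; the class and method names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models the doubling wait times described in the policy comment.
class RetryBackoffSketch {

    /** Returns the wait in seconds before each retry attempt, doubling every time. */
    static List<Long> retryIntervals(int retryCount, long retryIntervalSeconds) {
        List<Long> intervals = new ArrayList<Long>();
        long wait = retryIntervalSeconds;
        for (int attempt = 1; attempt <= retryCount; attempt++) {
            intervals.add(wait);
            wait *= 2; // exponential backoff: each wait is twice the previous one
        }
        return intervals;
    }

    public static void main(String[] args) {
        // Values taken from the ora-retry action: retryCount=8, retryInterval=2
        System.out.println(retryIntervals(8, 2)); // prints [2, 4, 8, 16, 32, 64, 128, 256]
    }
}
```

Adding these up shows why a retry action alone can keep a faulted instance busy for minutes, which is one reason to prefer retiring the process over repeated retries.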


Place the fault policy in:
[ORACLE_HOME]/bpel/domains/{domain}/config/fault-policies

Create a reference to the fault policy in the bpel.xml of the process (below </activationAgents>):
     <faultPolicyBindings>
         <process faultPolicy="RetireProcessPolicy"/>
         <partnerLink faultPolicy="RetireProcessPolicy"/>
      </faultPolicyBindings>


Noteworthy in this policy is the defaultAction. My custom Java class returns null, which triggers the defaultAction, set here to ora-human-intervention. As a result the failed invoke becomes visible in the error hospital (Activities tab in the process manager). From the error hospital it is also possible to specify an on-retry-success method to be executed (by clicking the specific error).
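
To make the role of the defaultAction concrete, here is a small model of how the next action could be chosen from the value a Java action returns. This is a sketch under assumptions, not the framework's code: the mapping from return values to actions is hypothetical (the policy above defines none), and the class and method names are invented.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only; the real selection logic lives inside the fault management framework.
class JavaActionResolutionSketch {

    /**
     * Picks the next action id: a mapped action when the returned value is known,
     * otherwise the defaultAction (this also covers a null return value).
     */
    static String resolveNextAction(String returnedValue,
                                    Map<String, String> returnValueToAction,
                                    String defaultAction) {
        if (returnedValue != null && returnValueToAction.containsKey(returnedValue)) {
            return returnValueToAction.get(returnedValue);
        }
        return defaultAction;
    }

    public static void main(String[] args) {
        // The policy in this post has no mappings and the class returns null.
        Map<String, String> noMappings = new HashMap<String, String>();
        System.out.println(resolveNextAction(null, noMappings, "ora-human-intervention"));
    }
}
```

With no mappings and a null return value, the model ends up at ora-human-intervention, which matches the behavior described above.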







Result

When an error occurs, the failed messages arrive (in order) in the error hospital (usually a small number before the process is retired). The process instances which have faulted remain open, and the process is retired. You can retry the activities to check whether the error is fixed. If the error is fixed, the process can be activated again and normal operation resumes. This way the order of messages is guaranteed and the failing action is not hammered with useless retries. Because the process can simply be activated again once the problem is fixed, a lot of manual re-offering of messages is avoided.

Implementation BPEL 11g exception handling

The 11g implementation is very similar to the 10g implementation. Deployment does not require any server-side configuration. You can download the example project here: http://dl.dropbox.com/u/6693935/blog/TestExceptionHandling.zip. The example project requires the setup described in http://javaoraclesoa.blogspot.com/2012/05/re-enqueueing-faulted-bpel-messages.html. Your MDS configuration might differ from mine; if you encounter errors when importing or deploying the project, remove the MDS entries that are not relevant for your configuration from the .adf/META-INF/adf-config.xml file.

Custom Java class


I've used the following Java class (created in SCA-INF/src). No additional project configuration (like including libraries) is required in 11g.


package ms.testapp.exceptionhandling;

import com.collaxa.cube.engine.fp.BPELFaultRecoveryContextImpl;

import java.util.logging.Logger;

import oracle.integration.platform.faultpolicy.IFaultRecoveryContext;
import oracle.integration.platform.faultpolicy.IFaultRecoveryJavaClass;
import oracle.soa.management.facade.Composite;
import oracle.soa.management.facade.Locator;
import oracle.soa.management.facade.LocatorFactory;

public class RetireProcess implements IFaultRecoveryJavaClass {
    private static final Logger logger = Logger.getLogger(RetireProcess.class.getName());

    public RetireProcess() {
        super();
    }

    // Nothing to do after a successful retry in this example.
    public void handleRetrySuccess(IFaultRecoveryContext iFaultRecoveryContext) {
    }

    // Retires the composite that owns the faulted BPEL process.
    // Returning null makes the fault policy fall back to its defaultAction.
    public String handleFault(IFaultRecoveryContext iFaultRecoveryContext) {
        System.out.println("handleFault started");
        BPELFaultRecoveryContextImpl bpelCtx =
            (BPELFaultRecoveryContextImpl)iFaultRecoveryContext;
        try {
            Locator loc = LocatorFactory.createLocator();
            System.out.println("locator obtained");
            Composite comp = loc.lookupComposite(bpelCtx.getProcessDN().getCompositeDN());
            System.out.println("composite found");
            comp.retire();
            //bpelCtx.addAuditTrailEntry("retired " + comp.getDN());
            System.out.println("process retired");
            logger.info("retired " + comp.getDN());
        } catch (Exception e) {
            System.out.println("fault in handler");
            //bpelCtx.addAuditTrailEntry("Error in FaultHandler " + RetireProcess.class.getName());
            logger.severe("Error in FaultHandler " + RetireProcess.class.getName());
            e.printStackTrace();
        }
        return null;
    }
}

Fault policy and fault binding

My fault-policy file is called fault-policies.xml (the composite picks up that file by default, but a different file can be specified in composite.xml if required) and it looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<faultPolicies xmlns="http://schemas.oracle.com/bpel/faultpolicy"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <faultPolicy version="2.1.3" id="ConnectionFaults">
    <Conditions>
      <faultName>
        <condition>
          <action ref="handle-fault-through-custom-java"/>
        </condition>
      </faultName>
    </Conditions>
    <Actions>
      <Action id="handle-fault-through-custom-java">
        <javaAction className="ms.testapp.exceptionhandling.RetireProcess"
                    defaultAction="ora-human-intervention">
        </javaAction>
      </Action>
      <Action id="ora-rethrow-fault">
        <rethrowFault/>
      </Action>
      <Action id="ora-human-intervention">
        <humanIntervention/>
      </Action>
    </Actions>
  </faultPolicy>
</faultPolicies>


My fault-bindings.xml looks like this:



<?xml version="1.0" encoding="UTF-8"?>
<faultPolicyBindings version="2.0.1"
                     xmlns="http://schemas.oracle.com/bpel/faultpolicy"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <composite  faultPolicy="ConnectionFaults"/>
</faultPolicyBindings>



These files are placed in the same folder as the composite.xml.

Result

The behavior in 11g is similar to the behavior described for 10g. One thing to notice is that the API works at composite level, and I've not found a way to directly stop or start the activation agents. I did not, however, encounter the 10g error where the JCA adapter tried to start a retired process.
First the correct situation. Use the test script below to enqueue a message.

DECLARE
  queue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  recipients          DBMS_AQ.aq$_recipient_list_t;
  message_id RAW(16);
  message SYS.XMLType;
BEGIN
  recipients(1) := sys.aq$_agent('EXCEPTIONTEST', NULL, NULL);
  message_properties.recipient_list := recipients;
  message := sys.XMLType.createXML('<itemCollectionArray xmlns:msg_out="http://test.ms/itemcollections" xmlns="http://test.ms/itemcollections"><msg_out:itemsCollection><msg_out:item><msg_out:name>Name</msg_out:name><msg_out:value>Piet</msg_out:value></msg_out:item></msg_out:itemsCollection></itemCollectionArray>');
  DBMS_AQ.ENQUEUE( queue_name => 'TESTUSER.TEST_SOURCE_QUEUE',
                   enqueue_options => queue_options,
                   message_properties => message_properties,
                   payload => message,
                   msgid => message_id);
  COMMIT;
END;

The result is a correct execution of the process.
Next, disable the TEST_TARGET_QUEUE.


Again submit a test message and confirm the error handler has activated in the Enterprise Manager.

Conclusion

Error handling in SOA Suite 11g is more extensive (has more options) than error handling in SOA Suite 10g. SOA Suite 11g also lets you deploy the fault handling (policies, bindings and custom Java classes) as part of the composite, whereas in SOA Suite 10g these had to be placed on the application server. The API has changed considerably going from 10g to 11g; the most significant changes have been caused by the implementation of the SCA framework. SOA Suite 11g makes it a lot easier to use the Java API.

Another lesson learned is to think about error handling very early on in a project. Do not start with the implementation that seems logical to a single developer, but discuss the different options and requirements with the customer and other developers. In this case a relatively simple solution using standard Oracle functionality meets many requirements. However, if the purpose is to bill as many hours as possible and to tackle every requirement as a new change, then this solution is not for you!

Friday, May 4, 2012

Re-enqueueing faulted BPEL messages using Oracle AQ

Introduction

Exception handling is an important topic to consider when using middleware solutions to link different systems together. Often, for example, the 24/7 database turns out to be more like 23/7 (no 100% up-time), or database packages a composite depends on get changed without the SOA developer being informed. This can prevent BPEL processes from completing successfully.

In a development environment, this is no big deal but in a production environment, where possibly large numbers of messages are processed, you'd better make sure you've thought about how to deal with for example unreachable databases. You don't want to lose messages or have a hard time restoring the faulted messages.

The pattern below provides an option for error handling using Advanced Queues (AQ). It uses an error queue to store messages which have gone wrong in BPEL and provides an easy mechanism to offer the failed messages to the process again.

The pattern involves three queues. Messages are read from the SourceQueue. A database procedure is called to enrich the source message. If enrichment fails, the message is put on an ErrorQueue and the process is terminated. If all goes well, the resulting message is put on a TargetQueue. Messages from the ErrorQueue can be re-enqueued on the SourceQueue to reinitiate processing of failed messages.
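
The flow between the three queues can be sketched with plain in-memory queues. This is only a model of the pattern, assuming a made-up enrichment step and invented class and method names; the real implementation uses Oracle AQ and a BPEL process.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative in-memory model of the SourceQueue / TargetQueue / ErrorQueue pattern.
class ThreeQueuePatternSketch {
    final Deque<String> sourceQueue = new ArrayDeque<String>();
    final Deque<String> targetQueue = new ArrayDeque<String>();
    final Deque<String> errorQueue = new ArrayDeque<String>();

    // Stands in for the database procedure being valid or invalid.
    boolean enrichmentAvailable = true;

    /** Hypothetical enrichment step; fails while the procedure is unavailable. */
    String enrich(String message) {
        if (!enrichmentAvailable) {
            throw new IllegalStateException("enrichment procedure is not available");
        }
        return message + " (enriched)";
    }

    /** Reads every message from the source queue and routes it to target or error. */
    void processSourceQueue() {
        String message;
        while ((message = sourceQueue.poll()) != null) {
            try {
                targetQueue.add(enrich(message));
            } catch (RuntimeException e) {
                errorQueue.add(message); // keep the original message for a later retry
            }
        }
    }

    /** Puts all failed messages back on the source queue, preserving their order. */
    void reEnqueueErrors() {
        String message;
        while ((message = errorQueue.poll()) != null) {
            sourceQueue.add(message);
        }
    }

    public static void main(String[] args) {
        ThreeQueuePatternSketch demo = new ThreeQueuePatternSketch();
        demo.sourceQueue.add("m1");
        demo.enrichmentAvailable = false;
        demo.processSourceQueue();          // m1 ends up on the error queue
        demo.enrichmentAvailable = true;    // the problem is fixed
        demo.reEnqueueErrors();
        demo.processSourceQueue();          // m1 now reaches the target queue
        System.out.println("target=" + demo.targetQueue + " error=" + demo.errorQueue);
    }
}
```

Note that the error queue stores the original message, not the half-processed one, so re-offering it starts processing from scratch.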

It is suggested that the ErrorQueue and the SourceQueue are in the same database; if the message can be picked up from the source and the process is started, you can be pretty sure the source is available.

An additional benefit of using this pattern is that Oracle Advanced Queues can be used for throttling BPEL processing when BPEL is misused for batch processing.

Setup

Database

I've used http://docs.oracle.com/cd/B10501_01/appdev.920/a96587/apexampl.htm#33919 as a reference to put the PL/SQL AQ code together.

Grants

First create a test user in your database. I've called this user 'testuser'. Then grant the user the privileges required for Advanced Queueing.

Execute the following as the system user:
GRANT EXECUTE ON DBMS_AQADM TO testuser;
GRANT Aq_administrator_role TO testuser;

Create queue tables and queues

BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'TEST_SOURCE_QUEUE_QT',
       queue_payload_type => 'SYS.XMLType',
       multiple_consumers => TRUE);
END;
/
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'TEST_TARGET_QUEUE_QT',
       queue_payload_type => 'SYS.XMLType',
       multiple_consumers => TRUE);
END;
/
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'TEST_ERROR_QUEUE_QT',
       queue_payload_type => 'SYS.XMLType',
       multiple_consumers => TRUE);
END;
/
begin
DBMS_AQADM.CREATE_QUEUE ('TEST_SOURCE_QUEUE', 'TEST_SOURCE_QUEUE_QT');
DBMS_AQADM.CREATE_QUEUE ('TEST_TARGET_QUEUE', 'TEST_TARGET_QUEUE_QT');
DBMS_AQADM.CREATE_QUEUE ('TEST_ERROR_QUEUE', 'TEST_ERROR_QUEUE_QT');
DBMS_AQADM.START_QUEUE ('TEST_SOURCE_QUEUE');
DBMS_AQADM.START_QUEUE ('TEST_TARGET_QUEUE');
DBMS_AQADM.START_QUEUE ('TEST_ERROR_QUEUE');
end;

Now you've created three queue tables and three queues using those tables. I've made the queues multiconsumer to keep some flexibility for a later stage. Multiconsumer queues allow different parties to produce and consume messages from the queue without interfering with each other. Especially for an error queue, this can come in handy.

You don't have to register subscribers to the queue since that's done automatically upon deployment of the BPEL process (in SOA Suite 11.1.1.6 on an 11.2 database). If you're running older software, you can add subscribers with a script like:
begin
DBMS_AQADM.ADD_SUBSCRIBER ('TESTUSER.TEST_SOURCE_QUEUE',sys.aq$_agent('EXCEPTIONTEST', null, null));
end;

I've used a small database package to simulate an often encountered error; the procedure I want to call is not valid. I wanted to use a database call for the example and was not interested in the functionality of the package.

CREATE OR REPLACE PACKAGE "TESTUSER"."SOA_TEST" AS
  function getsystimestamp return timestamp;
END SOA_TEST;
/
create or replace
PACKAGE BODY SOA_TEST AS
  function getsystimestamp return timestamp AS
  BEGIN
    RETURN systimestamp;
  END getsystimestamp;
END SOA_TEST;

BPEL

The configuration of the database adapter should be familiar and will not be described in detail here. Configure the database connection in the WebLogic console: add a datasource, go to the DbAdapter configuration, add a connection factory that refers to the datasource you just created, and update the DbAdapter configuration. You should also add a connection factory for the AqAdapter referencing the same datasource. It's a good idea to use a datasource which only supports local transactions and is not XA capable. This will avoid some issues.

The below screenshots should be self-explanatory. The process can be downloaded here;
http://dl.dropbox.com/u/6693935/blog/ExceptionDemo.zip




Demonstration

Without error

First I offer a message on the source queue:

DECLARE
  queue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  recipients          DBMS_AQ.aq$_recipient_list_t;
  message_id RAW(16);
  message SYS.XMLType;
BEGIN
  recipients(1) := sys.aq$_agent('SOURCEQUEUETEST', NULL, NULL);
  message_properties.recipient_list := recipients;
  message := sys.XMLType.createXML('<itemCollectionArray xmlns:msg_out="http://test.ms/itemcollections" xmlns="http://test.ms/itemcollections"><msg_out:itemsCollection><msg_out:item><msg_out:name>Name</msg_out:name><msg_out:value>Piet</msg_out:value></msg_out:item></msg_out:itemsCollection></itemCollectionArray>');
  DBMS_AQ.ENQUEUE( queue_name => 'TESTUSER.TEST_SOURCE_QUEUE',
                   enqueue_options => queue_options,
                   message_properties => message_properties,
                   payload => message,
                   msgid => message_id);
  COMMIT;
END;

I confirm that the message is processed successfully by looking at the Enterprise Manager console and at the TargetQueue.


With error

I invalidated the SOA_TEST.getsystimestamp function by adding invalid code and recompiling the package. Then I executed the same procedure as in the 'without error' situation. As expected, my process faulted. The CatchAll caught the exception, put the message on the error queue and terminated the process.


Restoring the faulted messages

The messages on the ErrorQueue can be restored by putting them on the SourceQueue after the problem is fixed. First fix the problem by making the package compilable again. Then execute the following:

DECLARE
  dequeue_options DBMS_AQ.dequeue_options_t;
  message_properties_d DBMS_AQ.message_properties_t;
  message_handle_d RAW(16);
  MESSAGE sys.XMLType;
  no_messages EXCEPTION;
  enqueue_options DBMS_AQ.enqueue_options_t;
  message_properties_e DBMS_AQ.message_properties_t;
  recipients DBMS_AQ.aq$_recipient_list_t;
  message_handle_e RAW(16);
  pragma exception_init (no_messages, -25228);
BEGIN
  recipients(1)                       := sys.aq$_agent('SOURCEQUEUETEST', NULL, NULL);
  message_properties_e.recipient_list := recipients;
  dequeue_options.wait                := DBMS_AQ.NO_WAIT;
  dequeue_options.consumer_name       := 'ERRORQUEUETEST';
  dequeue_options.navigation          := dbms_aq.FIRST_MESSAGE;
  LOOP
    DBMS_AQ.DEQUEUE(queue_name => 'TESTUSER.TEST_ERROR_QUEUE', dequeue_options => dequeue_options, message_properties => message_properties_d, payload => MESSAGE, msgid => message_handle_d);
    DBMS_AQ.ENQUEUE(queue_name => 'TESTUSER.TEST_SOURCE_QUEUE', enqueue_options => enqueue_options, message_properties => message_properties_e, payload => MESSAGE, msgid => message_handle_e);
    dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
  END LOOP;
EXCEPTION
WHEN no_messages THEN
  DBMS_OUTPUT.PUT_LINE ('No more messages for ERRORQUEUETEST');
  COMMIT;
END;

Confirm that the message is picked up by BPEL, successfully processed and put in the TargetQueue. If the problem is not fixed, the message will be put back on the ErrorQueue. Since there's only one commit at the end, the dequeued and re-enqueued messages only become visible after all messages have been moved. This avoids loops such as ErrorQueue -> (re-enqueue) SourceQueue -> (new error in BPEL) -> ErrorQueue and so forth.
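
Why the single commit matters can be shown with a small model in which enqueued messages stay invisible to consumers until the commit. This is a simplified sketch with invented names (StagedQueue, moveAll); it only models enqueue visibility, and the real behavior is provided by Oracle AQ's transactional enqueue and dequeue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative model: messages enqueued in a transaction are not visible until commit().
class SingleCommitSketch {

    static class StagedQueue {
        private final Deque<String> visible = new ArrayDeque<String>();
        private final List<String> staged = new ArrayList<String>();

        void enqueue(String message) { staged.add(message); }  // not yet visible
        String dequeue() { return visible.poll(); }            // sees committed messages only
        void commit() { visible.addAll(staged); staged.clear(); }
        int visibleSize() { return visible.size(); }
    }

    /** Moves every visible message from the error queue to the source queue, committing once. */
    static int moveAll(StagedQueue errorQueue, StagedQueue sourceQueue) {
        int moved = 0;
        String message;
        while ((message = errorQueue.dequeue()) != null) {
            sourceQueue.enqueue(message);
            moved++;
            // A consumer of sourceQueue cannot see the message yet, so it cannot
            // fault on it and push it back onto errorQueue while this loop runs.
        }
        sourceQueue.commit(); // all moved messages become visible together
        return moved;
    }

    public static void main(String[] args) {
        StagedQueue errorQueue = new StagedQueue();
        StagedQueue sourceQueue = new StagedQueue();
        errorQueue.enqueue("m1");
        errorQueue.enqueue("m2");
        errorQueue.commit();
        int moved = moveAll(errorQueue, sourceQueue);
        System.out.println("moved=" + moved + " visibleOnSource=" + sourceQueue.visibleSize());
    }
}
```

Because the loop only ever sees messages that were on the error queue before it started, it is guaranteed to terminate even if the underlying problem has not been fixed.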