Friday, May 4, 2012

Re-enqueueing faulted BPEL messages using Oracle AQ

Introduction

Exception handling is an important topic to consider when using Middleware solutions to link different systems together. Often for example the 24/7 database appears to be more like 23/7 (no 100% up-time) or database packages a composite depends on, get changed without the SOA developer being informed about it. This can cause BPEL processes not to be able to complete successfully.

In a development environment, this is no big deal but in a production environment, where possibly large numbers of messages are processed, you'd better make sure you've thought about how to deal with for example unreachable databases. You don't want to lose messages or have a hard time restoring the faulted messages.

The below pattern provides an option for error handling using Advanced Queues (AQ). It uses an error queue to store messages which have gone wrong in BPEL and allows for an easy mechanism to offer the failed messages again to the process.

The pattern involves three queues. Messages are read from the SourceQueue. A database procedure is called to enrich the source message. If enrichment fails, the message is put on an ErrorQueue and the process is terminated. If all goes well, the resulting message is put on a TargetQueue. Messages from the ErrorQueue can be re-enqueued on the SourceQueue to reinitiate processing of failed messages.

It is suggested that the ErrorQueue and the SourceQueue are in the same database; if the message can be picked up from the source and the process is started, you can be pretty sure the source is available.

An additional benefit of using this pattern is that Oracle Advanced Queues can be used for throttling BPEL processing when BPEL is misused for batch processing.

Setup

Database

I've used http://docs.oracle.com/cd/B10501_01/appdev.920/a96587/apexampl.htm#33919 as a reference to put the PL/SQL AQ code together.

Grants

First create a test user in your database. I've called this user 'testuser'. Then grant the user the required privileges to be able to do some Advanced Queueing;

Execute as system user the following;
GRANT EXECUTE ON DBMS_AQADM TO testuser;
GRANT Aq_administrator_role TO testuser;

Create queue tables and queues

BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'TEST_SOURCE_QUEUE_QT',
       queue_payload_type => 'SYS.XMLType',
       multiple_consumers => TRUE);
END;
/
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'TEST_TARGET_QUEUE_QT',
       queue_payload_type => 'SYS.XMLType',
       multiple_consumers => TRUE);
END;
/
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'TEST_ERROR_QUEUE_QT',
       queue_payload_type => 'SYS.XMLType',
       multiple_consumers => TRUE);
END;
/
begin
DBMS_AQADM.CREATE_QUEUE ('TEST_SOURCE_QUEUE', 'TEST_SOURCE_QUEUE_QT');
DBMS_AQADM.CREATE_QUEUE ('TEST_TARGET_QUEUE', 'TEST_TARGET_QUEUE_QT');
DBMS_AQADM.CREATE_QUEUE ('TEST_ERROR_QUEUE', 'TEST_ERROR_QUEUE_QT');
DBMS_AQADM.START_QUEUE ('TEST_SOURCE_QUEUE');
DBMS_AQADM.START_QUEUE ('TEST_TARGET_QUEUE');
DBMS_AQADM.START_QUEUE ('TEST_ERROR_QUEUE');
end;

Now you've created three queue tables and three queues using those tables. I've made the queues multiconsumer for additional flexibility maybe at a later stage. Multiconsumer queues allow for different parties to produce and consume messages from the queue without interfering with each other. Especially for an error queue, this can come in handy.

You don't have to register subscribers to the queue since that's done automatically upon deployment of the BPEL process (in SOA Suite 11.1.1.6 on an 11.2 database). If you're running older software, you can use a script like;
begin
DBMS_AQADM.ADD_SUBSCRIBER ('SOA_GDI.TEST_SOURCE_QUEUE',sys.aq$_agent('EXCEPTIONTEST', null, null));
end;
To add subscribers.

I've used a small database package to simulate an often encountered error; the procedure I want to call is not valid. I wanted to use a database call for the example and was not interested in the functionality of the package.

CREATE OR REPLACE PACKAGE "TESTUSER"."SOA_TEST" AS
  function getsystimestamp return timestamp;
END SOA_TEST;
/
create or replace
PACKAGE BODY SOA_TEST AS
  function getsystimestamp return timestamp AS
  BEGIN
    RETURN systimestamp;
  END getsystimestamp;
END SOA_TEST;

BPEL

The configuration of the database adapter should be familiar and will not be described in detail here. Configure the database connection in the Weblogic console (add a datasource, go to the DbAdapter configuration and add a connection factory. Refer to the just created datasource in the connection factory. update the DbAdapter configuration). You should also add a connection factory for the AqAdapter referencing the same datasource. It's a good idea to use a datasource which only supports local transactions and is not XA capable. This will avoid some issues.

The below screenshots should be self-explanatory. The process can be downloaded here;
http://dl.dropbox.com/u/6693935/blog/ExceptionDemo.zip




Demonstration

Without error

First I offer a message on the source queue;

DECLARE
  queue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  recipients          DBMS_AQ.aq$_recipient_list_t;
  message_id RAW(16);
  message SYS.XMLType;
BEGIN
  recipients(1) := sys.aq$_agent('SOURCEQUEUETEST', NULL, NULL);
  message_properties.recipient_list := recipients;
  message := sys.XMLType.createXML('<itemCollectionArray xmlns:msg_out="http://test.ms/itemcollections" xmlns="http://test.ms/itemcollections"><msg_out:itemsCollection><msg_out:item><msg_out:name>Name</msg_out:name><msg_out:value>Piet</msg_out:value></msg_out:item></msg_out:itemsCollection></itemCollectionArray>');
  DBMS_AQ.ENQUEUE( queue_name => 'TESTUSER.TEST_SOURCE_QUEUE',
                   enqueue_options => queue_options,
                   message_properties => message_properties,
                   payload => message,
                   msgid => message_id);
  COMMIT;
END;

I confirm that the message is processed successfully by looking at the Enterprise Manager console

And by looking at the TargetQueue


With error

I invalidated the SOA_TEST.gettimestamp function by adding invalid code and recompiling the package. Then I executed the same procedure as in the 'without error' situation. As expected, my process has faulted. The CatchAll caught the exception, put the message on the error queue and terminated the process.


Restoring the faulted messages

The messages on the ErrorQueue can be restored by putting them on the SourceQueue after the problem is fixed. First fix the problem by making the package compilable again. Then execute the following;

DECLARE
  dequeue_options DBMS_AQ.dequeue_options_t;
  message_properties_d DBMS_AQ.message_properties_t;
  message_handle_d RAW(16);
  MESSAGE sys.XMLType;
  no_messages EXCEPTION;
  enqueue_options DBMS_AQ.enqueue_options_t;
  message_properties_e DBMS_AQ.message_properties_t;
  recipients DBMS_AQ.aq$_recipient_list_t;
  message_handle_e RAW(16);
  pragma exception_init (no_messages, -25228);
BEGIN
  recipients(1)                       := sys.aq$_agent('SOURCEQUEUETEST', NULL, NULL);
  message_properties_e.recipient_list := recipients;
  dequeue_options.wait                := DBMS_AQ.NO_WAIT;
  dequeue_options.consumer_name       := 'ERRORQUEUETEST';
  dequeue_options.navigation          := dbms_aq.FIRST_MESSAGE;
  LOOP
    DBMS_AQ.DEQUEUE(queue_name => 'TESTUSER.TEST_ERROR_QUEUE', dequeue_options => dequeue_options, message_properties => message_properties_d, payload => MESSAGE, msgid => message_handle_d);
    DBMS_AQ.ENQUEUE(queue_name => 'TESTUSER.TEST_SOURCE_QUEUE', enqueue_options => enqueue_options, message_properties => message_properties_e, payload => MESSAGE, msgid => message_handle_e);
    dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
  END LOOP;
EXCEPTION
WHEN no_messages THEN
  DBMS_OUTPUT.PUT_LINE ('No more messages for ERRORQUEUETEST');
  COMMIT;
END;

Confirm that the message is picked up by BPEL and succesfully processed and put in the TargetQueue. If the problem is not fixed, the message will be put back again on the ErrorQueue. Since there's only one commit at the end, the messages will be dequeued and re-enqueued after all the messages are done. This avoids loops such as ErrorQueue -> (re-enqueue) SourceQueue -> (new error in BPEL) -> ErrorQueue and so forth.