Showing posts with label AQ. Show all posts

Thursday, July 25, 2013

Oracle SOA Blackbelt training June 2013 Berlin

The Oracle SOA Blackbelt training in Berlin in June of 2013 has provided me with some valuable new insights into various topics related to Oracle SOA Suite. Below are some examples of things which I found interesting to share. These are mostly not literally from the slides but written down in my own words. Some examples have been expanded a bit by additional resources I've found. This is not a complete list as the training covers quite a lot of material. I've focused on topics which can relatively easily be implemented or considered. The training also covered a lot of background which is harder to summarize and make concrete in practices/suggestions. The topics are varied and not written down in a particular order.

SOA Best practices

The presentation on SOA best practices contained a lot of good suggestions. These are a few of them.

Problem; Overuse of dehydration causes a lot of overhead. Examples; synchronous non-idempotent services, multiple mid-process receives, dehydrate/wait activities in processes.
Recommendations; Avoid chattiness, design services to be idempotent and, if possible, avoid asynchronous services (callbacks cause thread/transaction overhead).

Problem; Usage of FlowN where N is unconstrained can cause resource problems and lack of control.
Recommendation; Do not base N in FlowN on the data. Design the process using the driver / worker pattern (driver hands small chunks to the worker and the worker processes this). This can for example be implemented by using queues for decoupling/performance.
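
The driver / worker idea can be illustrated outside of BPEL. The following is a minimal Python sketch, not something from the training: the names run_driver, chunk_size and worker_count are made up for this example, an in-memory queue stands in for a real queue such as AQ or JMS, and doubling each item is only a placeholder for the actual work. The point it shows is that the number of parallel workers is fixed up front and does not depend on the size of the data.

```python
import queue
import threading


def run_driver(items, chunk_size=10, worker_count=4):
    """Driver: split items into small chunks and hand them to a fixed pool of workers."""
    work = queue.Queue()
    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            chunk = work.get()
            if chunk is None:  # sentinel: the driver has no more work
                work.task_done()
                return
            processed = [item * 2 for item in chunk]  # placeholder for the real work
            with results_lock:
                results.extend(processed)
            work.task_done()

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in threads:
        thread.start()

    # The amount of data only determines the number of chunks, not the parallelism.
    for start in range(0, len(items), chunk_size):
        work.put(items[start:start + chunk_size])
    for _ in threads:
        work.put(None)  # one sentinel per worker

    work.join()
    for thread in threads:
        thread.join()
    return results
```

With 25 items, a chunk size of 10 and 3 workers, the driver creates 3 chunks, and never more than 3 chunks are processed at the same time, however large the input becomes.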

Problem; Asynchronous services cause overhead. This can become a problem if there are large numbers of asynchronous processes waiting for a response since for every callback, a new thread/transaction is needed and a callback needs to be matched to a correlation table which takes longer if there are a lot of open processes.
Recommendation; Design processes to be synchronous as much as possible. Avoid nesting of asynchronous processes. Also avoid synchronous processes calling asynchronous processes.

Problem; A single BPEL process does batch processing of a large number of messages. This takes a lot of memory and causes a lot of overhead for storing audit information.
Recommendations; Put the work to be done in a separate BPEL process and optimize this process. Design for worst case scenarios. Implement retry mechanisms in fault policies. Implement your own scheduling mechanism to spread the load. If no message level processing is needed, ODI might be an option.

Problem; Scope variables are dehydrated and when the variables become large, this causes overhead.
Recommendations; Use local variables whenever possible. Assign portions of the message to scope variables.

BPEL is meant for service orchestration. It's not a procedural programming language.
Recommendations; Use declarative constructs instead of elaborate custom constructions. Use the skip condition instead of if statements. Use assertions before and after invokes. Use pick activities to time responses.

Problem; Identifying BPEL processes can be difficult due to lack of business content in the EM views.
Recommendation; Set the composite title to a business value. It is possible to search for this name. Business transaction keys and sensors (both have to be custom implemented) can be used to identify a flow instead of only the ECID, since the ECID cannot always be traced back to business context.

Diagnostics

I usually tend to look in the log files and in the Enterprise Manager if something goes wrong. There are however several other options;
- creating dumps. The following provides a nice overview; http://docs.oracle.com/cd/E25178_01/admin.1111/e10226/soacompapp_diag.htm#BABJFIFG
- collect info from the MBean browser; oracle.as.soainfra.bpm; Server/bpel:CubeDispatcher ReadXMLDispatcherTrace and oracle.as.soainfra.bpel; Server/BPELEngine:SyncProcessStats and AsyncProcessStats

Database growth (BPEL/BPM)

A concern for managing SOA Suite installations is the growth in database size. It is essential to think about a cleaning strategy. What I learned during the training is that with some programming practices the amount of information saved can also be reduced.

What causes growth;
- creating process instances
- updates to a message payload (workflow)
- asynchronous operations
- process scopes, task assignments. looping back to tasks
- audit (entry and exit of scope and model elements)

Thus it can be more efficient to build larger processes instead of multiple smaller ones. Also scoping and the use of a lot of model elements cause the database size to increase. For example, it could be (untested) more efficient (database space wise) to create a single assign activity with a lot of actions in it instead of several smaller assign activities.

Authorization, authentication and policies

Oracle Platform Security Services provides an abstraction layer to authorization/authentication/role/group providers. In Oracle BPM, users/groups are used but also application roles. The users/groups can be stored by LDAP providers. If there are multiple LDAP providers, these providers can be virtualized by Oracle Virtual Directory (OVD). When OVD is not available at a customer, libOVD can be used to provide a limited lightweight alternative. See for example http://fusionsecurity.blogspot.nl/2012/06/libovd-when-and-how.html. Application roles are stored in a policy store which can also be LDAP based. See for example; http://docs.oracle.com/cd/E12839_01/core.1111/e10043/cfgauthr.htm. Another option is to have it database based. See for example; http://redstack.wordpress.com/2011/10/29/soa11g-database-as-a-policy-store/. Identities can be queried via the browser, for example at; http://localhost:7001/integration/services/IdentityService/identity.

Transactions

The Blackbelt training contained a presentation on the BPEL Engine internals. This provided some additional points to pay attention to when developing. There are 4 'types' of BPEL processes. These 4 types can be categorized by 2 properties; synchronous or asynchronous and durable or transient. The different types behave differently with respect to transactions and threads. This has consequences for exception handling/propagation. The following should be avoided; a transient asynchronous process and a durable synchronous process. Transaction semantics can also have consequences for performance. See for example; http://javaoraclesoa.blogspot.nl/2013/06/oracle-soa-11g-bpel-transaction.html

The Event Delivery Network

Events can be published with different settings; guaranteed delivery and once-and-only-once (OAOO). With the guaranteed delivery setting, local transactions are used (non-XA), the EDN_EVENT_QUEUE is used and there is the possibility of duplicate messages (in case of catastrophic failures). Also, the dequeue transaction is committed when all subscribers have received the message. Retries are not possible in case one subscriber fails to pick up the message. The once-and-only-once setting works differently. A second queue is used; EDN_OAOO_QUEUE. Each subscriber picks up the message in its own transaction. An XA connection is used with global transactions and the dequeue action can be retried.

The EDN can be debugged in several ways. The EDN servlet can be used; http://<host_name>:<port_number>/soa-infra/events/edn-db-log (when EDN is AQ based, which is the default). This servlet uses the EDN_LOG_MESSAGES table in the SOAINFRA schema. The following loggers are related; oracle.integration.platform.blocks.event, oracle.integration.platform.blocks.event.saq and oracle.integration.platform.blocks.event.jms. In the Enterprise Manager, the log level can be tuned. The delivery of messages can be paused by setting the 'Paused' property of oracle.as.soainfra.config/EDNConfig:edn in the System MBean browser.

Local Invocation Optimization

If certain criteria are met, the SOAP/HTTP layer can be skipped when calling a service. The criteria are; the processes have to be on the same server and the client/server policies must allow it (this is a property in the policy file). The same server requirement has implications for the use of load balancers in cluster configurations. Check the following part of the documentation for more details on how the 'same server check' is performed; http://docs.oracle.com/cd/E28271_01/admin.1111/e10226/soainfra_config.htm. You should also check out https://forums.oracle.com/thread/2302988 for some more information on how to make sure local optimization is used in clustering/load balancing setups. A recommendation is to avoid too many small processes since this increases complexity (in order to achieve local optimization) and overhead.

BPEL fault handling best practices

The following best practices were mentioned in the training (I've rephrased them for brevity);
- always have a catch-all block (selectionFailure for example cannot be caught by using a fault policy)
- use named exceptions for business faults
- when using fault policies, always have a default action
- rethrow faults from fault policies in order to catch them in a BPEL process (when no fault handling action has been defined in the policy)
- notify the source system something has gone wrong
- think about how enabling automatic recovery can impact transactions and business functionality. Automatic recovery can also be disabled; http://www.albinsblog.com/2011/10/oracle-soa-suite-11g-disabling-auto.html#.Ue_sxm3JWVA
- for asynchronous processes, after a mid-process receive, check the response (which can contain business faults) and terminate the process on error after sending a message to a notification service

MDS

An efficient way to use the MDS is to use a local file-based repository during development and use the database-based MDS during runtime. MDS configuration is stored in adf-config.xml files. MDS files can be referred to by a path; <Store_Root>/<Partition>/<Namespace>/<Resource>. When an MDS object changes, all dependent resources need to be recompiled. The MDS can be used to avoid server startup issues due to dependencies. See; https://blogs.oracle.com/aia/entry/aia_11g_best_practices_for_dec.

Mediator

The Mediator is the only product which has an out of the box resequencer to provide ordering of messages. See for example; http://docs.oracle.com/cd/E17904_01/integration.1111/e10224/med_resequencer.htm for details. Also it currently is the only component supporting Schematron based validations (see for example; http://beatechnologies.wordpress.com/2011/04/06/using-schematron-in-oracle-soa-suite-11g-for-validating-xml/). The Mediator has a Heartbeat infrastructure; if in a clustered environment one instance of a Mediator fails (for whatever reason) this is detected and the other node in the cluster will process the message. Information on the message 'lease' is stored in the table MEDIATOR_CONTAINERID_LEASE. A message is locked when processing starts and released afterwards. The Heartbeat framework can be configured. See; http://docs.oracle.com/cd/E14571_01/integration.1111/e10226/med_config.htm#BABEDHBJ. Sequential routing rules are executed in a single transaction and thread. Parallel routing rules use 3 types of threads; inbound threads, locker threads and worker threads. Each worker thread uses its own transaction. Parallel routing rules can be debugged by looking at the MEDIATOR_DEFERRED_MESSAGE table. One row is one message in a parallel routing rule.

Oracle Service Bus

Local transport can be used for internal service chaining (for example when calling reusable components). Local transport services cannot be invoked from outside the service bus and are not published to a UDDI. See for example http://docs.oracle.com/cd/E23943_01/dev.1111/e15866/local.htm. The split/join pattern which can be implemented in the OSB uses a BEA BPEL implementation and works in-memory (not persistent). The service bus is minimalistic; by default, most features are turned off and the focus is on performance/high throughput. Oracle BPEL on the other hand is more maximalistic; if you want high performance there, you should turn things off.

Cube engine internals

Work items for the same instance are not allowed to execute concurrently. This implies that the parallel execution in for example for-each/while loops is 'simulated' and not truly parallel. This knowledge helps in understanding the behaviour of the nonBlockingInvoke setting. See; http://docs.oracle.com/cd/E23943_01/core.1111/e10108/bpel.htm#ASPER99890. The nonBlockingInvoke setting creates new threads for invocations but the invocations are still executed in sequence. In practice, this leads to performance degradation.

Sunday, May 19, 2013

How to deal with services that don't support concurrency? Offer requests one at a time.

When developing service orchestrations using Oracle SOA Suite, an often encountered problem is dealing with unreliable services. These can be services which cannot handle multiple simultaneous requests (don't support concurrency) or don't have 100% availability (usually due to nightly batches or scheduled maintenance). One way to work with these services is having a good error handling or retry mechanism in place. For example, I've previously described a fault handling mechanism based on using Advanced Queues (AQ); http://javaoraclesoa.blogspot.nl/2012/06/exception-handling-fault-management-and.html. Using this mechanism, you can maintain the order of processing for messages and retry faulted messages. It would however be better if we could avoid faults. In case a service does not support concurrency (because of for example platform limitations or statefulness), messages will have to be offered one at a time.

If the service has a quick response time, you can make the process which picks up messages from an AQ synchronous and thus have only one running process at a time. This has been described at; http://mazanatti.info/index.php?/archives/81-SOA-Suite-11g-truly-singleton-with-AQ-Adapter.html. It's a recommended read.

In this blog post I'll describe a mechanism which can be used if a synchronous solution would not suffice, for example in long running processes. The purpose of this blog post is to illustrate a mechanism and its components. It should not be used as-is in a production environment but tweaked to business requirements.

Implementation

I'll describe a database based mechanism which consists of several components;
- A database table holding process 'state'. In this example, CREATED, ENQUEUED, RUNNING, DONE
- A DBMS_SCHEDULER job which polls for changes. In my experience this is more stable than using the DbAdapter to do the same.
- A priority AQ to offer messages to BPEL in a specific order and allow loose coupling/flexibility/error handling mechanisms. In my experience this is very reliable.
- A BPEL process consuming AQ messages and calling the service which doesn't support concurrency. There should be only one running instance of this process at a time.


I've created a process state table which holds the process states and provides state history. I've also created a view on this table which only displays the current state. There is a column in the table PROC_NAME. This corresponds to the subscriber used in the BPEL process.

Every minute, a database job polls for records with state CREATED. If such a record is found and no other processes are in state ENQUEUED or RUNNING, a new message is enqueued. I've split the states ENQUEUED and RUNNING to be able to identify which messages have been picked up by the BPEL process and which haven't. There should only be one process in state RUNNING at a time.
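
The decision taken in a single polling run can be sketched in a few lines. This is a minimal Python illustration of the rule only, not the PL/SQL from the download; the function name poll_once and the in-memory dictionary are assumptions made for this example, and picking the lowest process id first is assumed from the insertion-order behaviour described in this post.

```python
def poll_once(current_states):
    """One polling run: return the id of the process to enqueue, or None.

    current_states maps a process id to its current state
    (CREATED, ENQUEUED, RUNNING or DONE).
    """
    # Nothing is enqueued while another process is waiting on the queue or running.
    busy = any(state in ("ENQUEUED", "RUNNING") for state in current_states.values())
    if busy:
        return None

    # Assumption: the oldest record (lowest id) is offered first.
    waiting = sorted(pid for pid, state in current_states.items() if state == "CREATED")
    if not waiting:
        return None

    next_id = waiting[0]
    current_states[next_id] = "ENQUEUED"  # stands in for the actual enqueue on the AQ
    return next_id
```

Calling poll_once repeatedly on three CREATED records enqueues the first one and then returns None until that record has reached the state DONE.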

I've created a simple HelloWorld BPEL process. This process polls for messages on the AQ. It picks up the message and informs the database that it has picked up a message (sets the state to RUNNING). Next I've stubbed calling a service with a wait of one minute. After the period is over, the state is set to DONE. The process looks as follows;



At the end of this post you can download the code. To run the example however, the database needs to have a user TESTUSER with the correct grants to allow queueing/dequeueing (see supplied script). Also in Weblogic server, there needs to be a JDBC datasource configured and a connection factory (eis/AQ/testuser) defined in the AqAdapter. You can find an example for configuring the DbAdapter at http://kiransaravi.blogspot.nl/2012/08/configuring-dbadapter-its-datasource.html. Configuration for the AqAdapter is very similar.

Running the example

First you need to create the table, trigger, AQ, package, DBMS_SCHEDULER job. This can be done by executing the supplied script.

To start testing the mechanism you can execute the following;

begin
insert into ms_process(proc_name,proc_comment) values('HELLOWORLD','Added new record for test 1');
insert into ms_process(proc_name,proc_comment) values('HELLOWORLD','Added new record for test 2');
insert into ms_process(proc_name,proc_comment) values('HELLOWORLD','Added new record for test 3');
commit;
end;

This will insert 3 records in the process table. These messages will be picked up in order. For implementations in larger applications I recommend using the PROC_SEQ field in the process table to obtain the required information for processing.

After a couple of minutes, you can see the following in the process state table;


As you can see, the messages were created at approximately the same time. The messages are picked up in order of insertion (based on ProcessId). Also as can be seen from the table, when a process is running (the period between state RUNNING and DONE), no other processes are running; there is no overlap in time.
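
The 'no overlap in time' property can also be checked mechanically by comparing the RUNNING and DONE timestamps of each process. The sketch below is Python with hypothetical timestamps (they are not the values from the screenshot); has_overlap is a helper name invented for this example.

```python
from datetime import datetime


def has_overlap(intervals):
    """Return True if any two (running_at, done_at) intervals overlap."""
    ordered = sorted(intervals)
    # After sorting on start time it is enough to compare neighbours.
    for previous, following in zip(ordered, ordered[1:]):
        previous_done = previous[1]
        following_running = following[0]
        if following_running < previous_done:
            return True
    return False


def at(clock):
    """Parse a HH:MM:SS string; only used to keep the example data readable."""
    return datetime.strptime(clock, "%H:%M:%S")


# Hypothetical (RUNNING, DONE) timestamps for three processes.
sequential_runs = [
    (at("10:01:05"), at("10:02:06")),
    (at("10:03:04"), at("10:04:05")),
    (at("10:05:03"), at("10:06:04")),
]
print(has_overlap(sequential_runs))
```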

After processing, the process view indicates the latest process state for every process. All processes are done.


In the Enterprise Manager, three processes have been executed and completed.


AQ in a clustered environment

In a clustered environment you have to keep in mind that in an 11.2 database, AQ messages can be picked up twice from the same queue under load. Since this would break the mechanism, I suggest applying the workaround described below.

Bug: 13729601
Added: 20-February-2012
Platform: All
The dequeuer returns the same message in multiple threads in high concurrency environments when Oracle database 11.2 is used. This means that some messages are dequeued more than once. For example, in Oracle SOA Suite, if Service 1 suddenly raises a large number of business events that are subscribed to by Service 2, duplicate instances of Service 2 triggered by the same event may be seen in an intermittent fashion. The same behavior is not observed with a 10.2.0.5 database or in an 11.2 database with event10852 level 16384 set to disable the 11.2 dequeue optimizations.

Workaround: Perform the following steps:

    Log in to the 11.2 database:
    CONNECT /AS SYSDBA

    Specify the following SQL command in SQL*Plus to disable the 11.2 dequeue optimizations:
    SQL> alter system set event='10852 trace name context forever, level 16384' scope=spfile;

Considerations

The mechanism described can be used to avoid parallel execution of processes, even when the processes are long running and synchronous execution is not an option.

Polling

The mechanism contains polling components; the DBMS_SCHEDULER job and the AqAdapter. This has two major drawbacks;
- it will cause load even when the system is idle
- it allows a period between finishing of a process and starting of the next process

You could consider starting the BPEL process actively from the database (thus avoiding polling) by using for example UTL_DBWS (see for example http://orasoa.blogspot.nl/2006/11/calling-bpel-process-with-utldbws.html). This however requires that the URL of the BPEL process is known in the database and that the ACL (Access Control List) is configured correctly. Also error handling should be reconsidered. The overhead of polling is minor. If a delay of 1 minute + default AqAdapter polling frequency is acceptable, a solution based on the described mechanism can be considered. Also, the DBMS_SCHEDULER job polling frequency can be reduced and the AqAdapter polling behavior can be tweaked to reduce the lost time between polls.

Chaining

Ending the process with a polling action which initiates processing of the next message is not advisable since it raises several new questions;
- what to do if there are no messages waiting? Having a polling mechanism together with this mechanism might break the 'only one process is running at the same time'-rule
- what to do in case of errors, when the chain is broken?

Retiring/activating

I've tried a mechanism which would retire a process at the start and then reactivate it after completion. This would prevent more than one process from running at the same time. This appeared not to be a solid mechanism. Retiring and activating a process takes time in which new messages could be picked up. Also using the Oracle SOA API during process execution adversely affects performance.

Efficiently determining the current state

I've not tested this solution with a large number of processes. I think in that case I should reconsider how to keep a process history and get to the current state efficiently in a polling run. Most likely I'd use two tables. One for the current state which I would update and another separate table for the history which I would fill with PL/SQL triggers on the current state table.
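
To make the two-table idea concrete, here is a minimal sketch. It uses Python with an in-memory SQLite database purely so the example can be run anywhere; in the setup described in this post these would be Oracle tables with PL/SQL triggers. The table names proc_current and proc_history, the trigger names and the helper set_state are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE proc_current (
    proc_id   INTEGER PRIMARY KEY,
    proc_name TEXT NOT NULL,
    state     TEXT NOT NULL
);
CREATE TABLE proc_history (
    proc_id    INTEGER NOT NULL,
    state      TEXT NOT NULL,
    changed_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Every insert or state change on the current table is copied to the history table.
CREATE TRIGGER proc_current_ins AFTER INSERT ON proc_current
BEGIN
    INSERT INTO proc_history (proc_id, state) VALUES (NEW.proc_id, NEW.state);
END;
CREATE TRIGGER proc_current_upd AFTER UPDATE OF state ON proc_current
BEGIN
    INSERT INTO proc_history (proc_id, state) VALUES (NEW.proc_id, NEW.state);
END;
""")


def set_state(proc_id, state):
    """Update only the current state; the trigger takes care of the history."""
    conn.execute("UPDATE proc_current SET state = ? WHERE proc_id = ?", (state, proc_id))


conn.execute(
    "INSERT INTO proc_current (proc_id, proc_name, state) VALUES (1, 'HELLOWORLD', 'CREATED')"
)
for new_state in ("ENQUEUED", "RUNNING", "DONE"):
    set_state(1, new_state)

# The polling run only has to read the small current-state table.
current = conn.execute("SELECT state FROM proc_current WHERE proc_id = 1").fetchone()[0]
history = [
    row[0]
    for row in conn.execute("SELECT state FROM proc_history WHERE proc_id = 1 ORDER BY rowid")
]
print(current, history)
```

The polling query then touches one row per process, while the history table can grow (and be purged) independently.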

Download

You can download the BPEL process here; https://dl.dropboxusercontent.com/u/6693935/blog/HelloWorldAQProcState.zip

The database code can be downloaded here (you might want to neatify it if for example you like CDM);
https://dl.dropboxusercontent.com/u/6693935/blog/processstate.txt

Monday, February 4, 2013

Comparing Weblogic filestore backed JMS with Oracle AQ backed JMS

In this blog post I will look at two methods of creating JMS queues with durable subscribers in Oracle Weblogic Server 11g. One backed by a persistent file store and the other backed by an Oracle AQ implementation. I will compare several properties of these implementations such as configuration required, performance and maintaining subscribers. I'll describe the issues I encountered during the setup and how I've solved them. I've performed the tests on the Oracle supplied Virtualbox image; http://www.oracle.com/technetwork/middleware/soasuite/learnmore/vmsoa-172279.html

Configuration

First I'll briefly describe the different implementations used. Then I'll show how the queues can be tested using SOAP UI.

AQ implementation

To set up the JMS implementation which is backed by an Oracle AQ, the following steps need to be performed (these are described in more detail in section 8.4.8 of http://docs.oracle.com/cd/E21764_01/integration.1111/e10231/adptr_jms.htm#BABBBGGB);

In the database
- create user
- grant user required rights
- create multi consumer queue table
- create queue using queue table
- start created queue

In the Weblogic server
- create datasource
- create JMSModule
- create foreign server inside JMSModule
- configure foreign server for using datasource and specify queue (destination)
- create connection factory inside foreign server
- create durable subscriptions (using PL/SQL scripts)

Filestore implementation

This is described on; http://middlewaremagic.com/weblogic/?p=4403

The steps to perform are in short;
- create a FileStore
- create JMSServer
- create JMSModule
- create connection factory inside JMSModule
- create subdeployment inside JMSModule
- create topic using FileStore and target to subdeployment
- create durable subscriptions (using Weblogic console)

Using SOAP UI to test the implementations

This is described on; http://learnwithpavan.wordpress.com/2012/02/06/hermesjms-configuration-for-weblogic-11g/. Below are some pointers and screenshots to help get this working.

HermesJMS

First configure HermesJMS; add weblogic.jar (from <MIDDLEWARE_HOME>\wlserver_10.3\server\lib) to the HermesJMS classpath in the hermes.bat file.

Next under the providers tab when creating a session, add the weblogic.jar again.


Then restart HermesJMS and add Weblogic as loader and specify the properties.

To be able to use this configuration in SOAP UI, check the tutorial on; http://www.soapui.org/JMS/working-with-jms-messages.html

To configure the two implementations in Hermes, you need to specify the connection factory in the binding. For the Oracle AQ implementation this is defined as Remote JNDI name under the foreign server in the created JMSModule.


For the Filestore implementation, this is defined directly under the JMSModule as JNDI name. This is of course if you followed the mentioned descriptions.


I encountered in HermesJMS the following error;
weblogic hermesjms javax.jms.JMSException: Could not create InitialContext: t3://127.0.0.1:7001: Bootstrap to 127.0.0.1/127.0.0.1:7001 failed. It is likely that the remote side declared peer gone on this JVM

This was caused by a network connection which could not be correctly established in my local setup. To avoid similar issues (VM running locally, DHCP server and suspend/resume actions of the VM, misconfigured hosts file, bridged VM network adapter, Windows firewall, etc...), I decided to install SOAP UI inside the VM which worked fine.

I found that the binding setting in which the connectionfactory was specified, did not cause different results when running Discover in HermesJMS (see below).


SOAP UI

In SOAP UI you can add a JMS endpoint to an existing service by right-clicking the binding and selecting 'Add JMS endpoint'. Here you can use the HermesJMS configuration which you have created earlier.

SOAP UI does not need to have weblogic.jar in the classpath. It determines the classpath required based on the HermesJMS config.


After the endpoint is added, it can be tested the same way as a regular webservice. You can check if the service can publish and receive to/from the created topic.


The above part is for the Filestore JMS. When trying the AQ JMS however I got the following exception;

weblogic.jms.common.InvalidDestinationException: [JMSClientExceptions:055090]Foreign destination

This was caused by setting the wrong connection factory in the HermesJMS configuration. I changed the binding value to aqjms/testForeignConnectionFactory (the connection factory defined in the foreign server configuration). Then I encountered various other errors which apparently were already encountered by others (http://jianmingli.com/wp/?p=2950). Also I later found that Oracle has provided a working, well documented, example of this; http://www.oracle.com/technetwork/middleware/weblogic/aq-jms-demo-171733.zip. Checking the documentation in the example is worthwhile!

At first I got the following exception; java.lang.UnsupportedOperationException: Remote JDBC disabled

I fixed it as follows; locate setDomainEnv.sh, set WLS_JDBC_REMOTE_ENABLED to true and restart the server (see for example; https://forums.oracle.com/forums/thread.jspa?threadID=989569).

The next error I encountered was;

Caused by: java.rmi.MarshalException: error marshalling return; nested exception is:
    java.io.NotSerializableException: oracle.jdbc.driver.T4CConnection
    at weblogic.rjvm.ResponseImpl.unmarshalReturn(ResponseImpl.java:237)
    at weblogic.rmi.internal.BasicRemoteRef.invoke(BasicRemoteRef.java:223)
    ... 19 more
Caused by: java.io.NotSerializableException: oracle.jdbc.driver.T4CConnection
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)

This is fixed by setting the JDBC properties directly in the foreign server instead of using a JNDI lookup to obtain the datasource.


Then I was able to send a message in HermesJMS. The next error I encountered is in SOAP UI;

Fri Feb 01 06:36:08 PST 2013:ERROR:oracle.jms.AQjmsException: ORA-24047: invalid agent name durableSubscriptionaqjms/TestX, agent name should be of the form NAME
ORA-06512: at "SYS.DBMS_AQADM_SYS", line 6270
ORA-06512: at line 1
ORA-06512: at "SYS.DBMS_AQJMS", line 129
ORA-06512: at line 1

I decided to add a subscriber in the database;

DECLARE
   subscriber          sys.aq$_agent;
BEGIN
   subscriber := sys.aq$_agent('SUBSCRIBER', null, null);
   DBMS_AQADM.ADD_SUBSCRIBER(
      queue_name         => 'JMSTOPIC',
      subscriber         =>  subscriber);
END;
/
COMMIT;

If you forget to commit, the below part will make SOAP UI hang!

Specify the subscriber while enqueueing;


Finally it worked... Now I could compare and test the different implementations.

Results

There are of course a lot of properties which can be compared. I chose performance, management of durable subscribers and monitoring of messages. I didn't look into them in much detail but it should give some idea of the major differences between the implementations.

Performance

AQ JMS

Load test in SOAP UI. First I tested the AQ JMS implementation.

With 50 threads; avg response; 449.82ms, failure rate; 90.17%


With one thread; avg response; 76.05ms, failure rate; 0%


More than one thread led to slow responses and a high error rate. I believe the failure rate can be fixed with some tweaking of settings but I haven't looked into it.

Filestore JMS

I should be honest here. I've set the Client ID policy to unrestricted and the Subscription Sharing Policy to Sharable. See below.


With 50 threads; avg response 58.79ms, failure rate; 0.018%


With one thread; avg response 20.37ms, failure rate; 0%


Maintaining the subscribers

When using AQ JMS, the durable subscribers can be maintained by using the Oracle Enterprise Manager and by using PL/SQL scripts. When using the Filestore implementation, the durable subscribers can be maintained from the Weblogic console and by using WLST.

AQ JMS
Support from the Oracle Enterprise manager; http://docs.oracle.com/cd/B28359_01/server.111/b28420/manage.htm#i1005928

Adding subscribers can also be achieved by scripting them with PL/SQL;

Adding;
DECLARE
   subscriber          sys.aq$_agent;
BEGIN
   subscriber := sys.aq$_agent('SUBSCRIBER', null, null);
   DBMS_AQADM.ADD_SUBSCRIBER(
      queue_name         => 'JMSTOPIC',
      subscriber         =>  subscriber);
END;

/

Removing;
DECLARE
   subscriber       sys.aq$_agent;
BEGIN
   subscriber := sys.aq$_agent ('SUBSCRIBER', null, null);
   DBMS_AQADM.REMOVE_SUBSCRIBER(
      queue_name => 'JMSTOPIC',
      subscriber => subscriber);
END;
/


Filestore JMS
Weblogic interface

This can also be scripted by using WLST, e.g. http://www.javamonamour.org/2011/09/wlst-deleting-durable-subscribers.html

Checking messages

AQ JMS
AQ JMS messages can be checked by looking at the queue tables and views;


Also the queue behavior can be altered using for example SQLDeveloper (http://www.oracle.com/technetwork/developer-tools/sql-developer/overview/index.html).

Filestore JMS
Messages can be viewed from the Weblogic console. The behavior of the queue can also be altered from the console.

Conclusion

Initial configuration required
AQ JMS requires configuration in the Oracle database and in the Weblogic console. A Filestore JMS implementation requires only (and less) configuration in the Weblogic server. Configuration of the testing tools (SOAP UI/HermesJMS) required more work for the AQ JMS implementation than for the Filestore JMS implementation, mainly due to the errors encountered. If you know exactly how you have to configure SOAP UI/HermesJMS, the work required on the client side is similar.

Performance
The performance of the Filestore JMS implementation was significantly better when using 50 threads; the average response was about 7.7 times faster (58.79ms versus 449.82ms). When using one thread, the Filestore JMS implementation was about 3.7 times faster (20.37ms versus 76.05ms). The failure rate for the AQ JMS implementation when using 50 threads was very high. This is likely due to a configuration error or wrong test case.
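
For reference, the ratios follow directly from the average response times reported in the Results section. The snippet below is just that arithmetic written out in Python; the variable names are made up for this example.

```python
# Average response times in milliseconds, taken from the SOAP UI load tests above.
aq_ms = {"50 threads": 449.82, "1 thread": 76.05}
filestore_ms = {"50 threads": 58.79, "1 thread": 20.37}

# How many times slower AQ JMS responded compared to Filestore JMS.
ratios = {label: round(aq_ms[label] / filestore_ms[label], 2) for label in aq_ms}

for label, ratio in ratios.items():
    print(label, ratio)
```

Keep in mind that the 50 thread figure for AQ JMS is an average over a run in which most requests failed, so it says little about the throughput of a correctly configured setup.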

Subscribers and managing messages
Managing subscribers is similar for both implementations in complexity, GUI support and scripting options. Messages can be managed similarly in both implementations. I've not looked into error handling and re-enqueueing options.

Thursday, June 21, 2012

Exception handling; fault management and priority messages

Introduction

Exception handling is a topic which often gets too little attention. Functional people often consider it a technical topic, while technical people consider the handling of error situations to be mostly functional in nature. This difference of opinion can result in software which is difficult to maintain going to a production environment.

It is suggested to first read the following two articles to understand the background of this article;
http://javaoraclesoa.blogspot.nl/2012/05/exception-handling-in-soa-suite-10g-and.html
http://javaoraclesoa.blogspot.nl/2012/05/re-enqueueing-faulted-bpel-messages.html

This article describes a combination of two exception handling mechanisms: a custom Java class in a fault policy which retires the process, and a BPEL catch branch which re-enqueues the message with a higher priority.

The implementation will satisfy the following requirements;

If an error occurs
- the process will be retired to prevent future faults in case a service invocation fails (fault management framework and custom Java fault handler)
- the faulted message must be put on a queue so it can easily be re-offered to the process after the error has been resolved (catch branch in BPEL using AQ adapter to re-enqueue a message)
- the faulted message must be picked up before the other messages in the queue if the problem is fixed in order to retain the order of messages processed (AQ with sorting on priority)
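
The third requirement relies on the queue being sorted on priority first and enqueue time second. The sketch below is a plain Java model of that ordering and not the Oracle AQ API; the class, the payload names and the priority values 5 and 1 are made up for illustration. It assumes, as in AQ, that a smaller priority number means a higher priority;

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative in-memory model of a queue sorted on PRIORITY, ENQ_TIME.
// This is not the Oracle AQ API; all names are hypothetical.
final class PriorityOrderSketch {

    static final class Message {
        final String payload;
        final int priority; // assumption: smaller number = higher priority
        final long enqSeq;  // stands in for the enqueue time

        Message(String payload, int priority, long enqSeq) {
            this.payload = payload;
            this.priority = priority;
            this.enqSeq = enqSeq;
        }
    }

    // Order on priority first; messages with equal priority keep enqueue order.
    private final PriorityQueue<Message> queue = new PriorityQueue<>(
        Comparator.comparingInt((Message m) -> m.priority)
                  .thenComparingLong(m -> m.enqSeq));
    private long nextSeq = 0;

    void enqueue(String payload, int priority) {
        queue.add(new Message(payload, priority, nextSeq++));
    }

    String dequeue() {
        Message m = queue.poll();
        return m == null ? null : m.payload;
    }

    // Two normal messages are waiting when a faulted one is re-enqueued.
    static List<String> demo() {
        PriorityOrderSketch q = new PriorityOrderSketch();
        q.enqueue("order-1", 5);
        q.enqueue("order-2", 5);
        q.enqueue("order-0 (re-enqueued after fault)", 1);
        List<String> dequeued = new ArrayList<>();
        String payload;
        while ((payload = q.dequeue()) != null) {
            dequeued.add(payload);
        }
        return dequeued;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Although the faulted message is enqueued last, it is dequeued first, which is what retains the original processing order.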

The fault management framework

Summary

The fault management framework does not handle all exceptions which occur, but only faults raised by invoke activities. For those faults, the fault management framework takes precedence over the BPEL catch branch. The framework makes it easy to use custom Java fault handlers, for example to retire a process.

The BPEL catch branch does not allow a custom Java class to handle the exception. You can however use Java embedding. For an example, see http://javaoraclesoa.blogspot.nl/2012/03/base64encode-and-base64decode-in-bpel.html. Mind that when an exception is rethrown in a Catch branch and a CatchAll branch is present on the same level, the exception is propagated to the higher level and not caught by the CatchAll branch!
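
Java try/catch blocks behave the same way with regard to rethrowing, which may help to remember the BPEL behavior. The sketch below is only an analogy in plain Java and not BPEL or SOA Suite code; the class and exception names are hypothetical;

```java
import java.util.ArrayList;
import java.util.List;

// Plain Java analogy for rethrowing from a Catch branch; not BPEL code.
final class CatchRethrowSketch {

    // Hypothetical stand-in for a fault raised by a failing invoke.
    static final class RemoteFault extends Exception {
        RemoteFault(String message) {
            super(message);
        }
    }

    // Inner scope with a specific catch and a catch-all on the same level.
    static void innerScope(List<String> trace) throws RemoteFault {
        try {
            throw new RemoteFault("invoke failed");
        } catch (RemoteFault fault) {
            trace.add("inner Catch");
            throw fault; // rethrow: leaves this scope entirely
        } catch (Exception other) {
            // Not reached for the rethrown fault.
            trace.add("inner CatchAll");
        }
    }

    // Outer scope: this is where the rethrown fault ends up.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        try {
            innerScope(trace);
        } catch (Exception e) {
            trace.add("outer CatchAll: " + e.getMessage());
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The trace contains the inner Catch and the outer CatchAll, but never the inner CatchAll.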

Example

The following example will retire a process when errors occur in invoke activities. The rethrow action makes sure that after the process has been retired, the error is propagated to the catch activities in the BPEL process.

The following code can be put in the <SCA-project>/SCA-INF/src/ms/exceptiontest/soa/faultHandlers folder (the folder structure has to match the package name);

package ms.exceptiontest.soa.faultHandlers;

import com.collaxa.cube.engine.fp.BPELFaultRecoveryContextImpl;
import java.util.logging.Logger;
import oracle.integration.platform.faultpolicy.IFaultRecoveryContext;
import oracle.integration.platform.faultpolicy.IFaultRecoveryJavaClass;
import oracle.soa.management.facade.Composite;
import oracle.soa.management.facade.Locator;
import oracle.soa.management.facade.LocatorFactory;

public class retireFaultHandler implements IFaultRecoveryJavaClass {
    private final static Logger logger =
        Logger.getLogger(retireFaultHandler.class.getName());

    public void handleRetrySuccess(IFaultRecoveryContext iFaultRecoveryContext) {
    }

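    public String handleFault(IFaultRecoveryContext iFaultCtx) {
        try {
            // The policy is bound to a BPEL composite, so a BPEL context is expected here
            BPELFaultRecoveryContextImpl bpelCtx =
                (BPELFaultRecoveryContextImpl)iFaultCtx;
            // Look up the composite the faulted process belongs to and retire it
            Locator loc = LocatorFactory.createLocator();
            Composite comp =
                loc.lookupComposite(bpelCtx.getProcessDN().getCompositeDN());
            comp.retire();
            logger.info("retired " + comp.getDN());
        } catch (Exception e) {
            // Retiring failed; log the cause and still let the policy rethrow the fault
            logger.log(java.util.logging.Level.SEVERE,
                       "Error in FaultHandler " + retireFaultHandler.class.getName(), e);
        }
        // "OK" is mapped to ora-rethrow-fault in fault-policies.xml
        return "OK";
    }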
}

Then you can use the following fault-policies.xml

<?xml version="1.0" encoding="UTF-8"?>
<faultPolicies xmlns="http://schemas.oracle.com/bpel/faultpolicy"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <faultPolicy version="2.1.3" id="ConnectionFaults">
    <Conditions>
      <faultName>
        <condition>
          <action ref="retireProcess"/>
        </condition>
      </faultName>
    </Conditions>
    <Actions>
      <Action id="retireProcess">
        <javaAction className="ms.exceptiontest.soa.faultHandlers.retireFaultHandler"
                    defaultAction="ora-rethrow-fault">
          <returnValue value="OK" ref="ora-rethrow-fault"/>
        </javaAction>
      </Action>
      <!-- Generics -->
      <Action id="ora-terminate">
        <abort/>
      </Action>
      <Action id="ora-replay-scope">
        <replayScope/>
      </Action>
      <Action id="ora-rethrow-fault">
        <rethrowFault/>
      </Action>
      <Action id="ora-human-intervention">
        <humanIntervention/>
      </Action>
    </Actions>
  </faultPolicy>
</faultPolicies>

And fault-bindings.xml

<?xml version="1.0" encoding="UTF-8"?>
<faultPolicyBindings version="2.0.1"
                     xmlns="http://schemas.oracle.com/bpel/faultpolicy"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <composite faultPolicy="ConnectionFaults"/>
</faultPolicyBindings>
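
The javaAction element in fault-policies.xml ties the String returned by handleFault to a follow-up action: a matching returnValue selects its ref, anything else falls back to the defaultAction. The sketch below only models that lookup in plain Java to make the semantics concrete. It is not how the fault management framework is implemented, and the class name and the return value "MANUAL" are hypothetical;

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model of the returnValue/defaultAction lookup of a javaAction.
// Not Oracle code; class and method names are hypothetical.
final class JavaActionMappingSketch {

    private final String defaultAction;
    private final Map<String, String> returnValueToAction = new HashMap<>();

    JavaActionMappingSketch(String defaultAction) {
        this.defaultAction = defaultAction;
    }

    // Corresponds to one <returnValue value="..." ref="..."/> element.
    JavaActionMappingSketch map(String returnValue, String actionRef) {
        returnValueToAction.put(returnValue, actionRef);
        return this;
    }

    // Unknown return values fall back to the default action.
    String resolve(String returnedByHandler) {
        return returnValueToAction.getOrDefault(returnedByHandler, defaultAction);
    }

    static String demo() {
        JavaActionMappingSketch action =
            new JavaActionMappingSketch("ora-rethrow-fault")
                .map("OK", "ora-rethrow-fault")
                // hypothetical extra mapping, not part of the policy above
                .map("MANUAL", "ora-human-intervention");
        return action.resolve("OK") + "," + action.resolve("MANUAL") + ","
            + action.resolve("SOMETHING_ELSE");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the policy above both the "OK" return value and the default action point to ora-rethrow-fault, so the fault always reaches the BPEL catch branches after the handler has run.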

Re-enqueue messages using a higher priority

PL/SQL queue code

To be able to re-enqueue messages using a higher priority, a multiconsumer queue is created. Creating a multiconsumer queue is not required but allows more flexibility since multiple parties can become subscriber on the queue.

The queue uses a sort_list of priority,enq_time making sure messages with higher priority are dequeued first. The retention time is set to 1 week since even if messages are dequeued, it is handy if we can still find them to check their contents. The user holding the queues (and queue package), should have the following grants;

GRANT EXECUTE ON DBMS_AQADM TO testuser;
GRANT EXECUTE ON DBMS_AQ TO testuser;
GRANT AQ_ADMINISTRATOR_ROLE TO testuser;

--creating queue tables
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE( Queue_table => '"TESTUSER"."SOA_MULTI_QT"', Queue_payload_type => 'SYS.XMLTYPE', Sort_list => 'PRIORITY,ENQ_TIME', Multiple_consumers => TRUE, Compatible => '8.1.3');
END;
/

--creating queues (retention time is 604800 seconds = 1 week)
BEGIN
  DBMS_AQADM.CREATE_QUEUE( Queue_name => 'TESTUSER.SOA_MULTI_QUEUE', Queue_table => 'TESTUSER.SOA_MULTI_QT', Queue_type => 0, Max_retries => 5, Retry_delay => 0, Retention_time => 604800, dependency_tracking => FALSE, COMMENT => 'multi queue');
END;
/

--adding subscribers and starting the queue
--(positional arguments may not follow named ones, so all START_QUEUE arguments are named)
BEGIN
  DBMS_AQADM.ADD_SUBSCRIBER ('TESTUSER.SOA_MULTI_QUEUE',sys.aq$_agent('FAULTTEST', null, null));
  DBMS_AQADM.START_QUEUE( Queue_name => 'TESTUSER.SOA_MULTI_QUEUE', Enqueue => TRUE, Dequeue => TRUE );
END;
/

To drop the queue, queuetable, etc, the following can be used

BEGIN
  DBMS_AQADM.Stop_queue( Queue_name => 'TESTUSER.SOA_MULTI_QUEUE' );
  DBMS_AQADM.DROP_QUEUE( Queue_name => 'TESTUSER.SOA_MULTI_QUEUE' );
  DBMS_AQADM.DROP_QUEUE_TABLE( Queue_table => '"TESTUSER"."SOA_MULTI_QT"');
END;

To allow other users to enqueue messages without having to grant them rights to the DBMS_AQ package, the following package can be used;

CREATE OR REPLACE
PACKAGE soa_queue_pack
IS
type t_recipients_list IS TABLE OF VARCHAR2 (50); -- index by binary_integer;

PROCEDURE vul_multi_queue(
    p_queue_naam      IN VARCHAR2 ,
    p_xml_payload     IN xmltype ,
    p_priority        IN BINARY_INTEGER ,
    p_recipients_list IN t_recipients_list);
END soa_queue_pack;
/

create or replace
PACKAGE BODY soa_queue_pack
IS
PROCEDURE vul_multi_queue(
    p_queue_naam      IN VARCHAR2 ,
    p_xml_payload     IN xmltype ,
    p_priority        IN BINARY_INTEGER ,
    p_recipients_list IN t_recipients_list )
IS
  l_msg_id          RAW(16); -- message id returned by the enqueue
  l_enq_opt         dbms_aq.enqueue_options_t;
  l_msg_prop        dbms_aq.message_properties_t;
  l_recipients_list dbms_aq.aq$_recipient_list_t;
BEGIN
  -- convert the consumer names to AQ agents; COUNT is 0 for an empty list, so the loop is skipped
  FOR i IN 1 .. p_recipients_list.COUNT
  LOOP
    l_recipients_list(i) := sys.aq$_agent(p_recipients_list(i),NULL,NULL);
  END LOOP;
  l_msg_prop.priority       := p_priority;
  l_msg_prop.recipient_list := l_recipients_list;
  -- no commit here: the enqueue is part of the caller's transaction
  dbms_aq.enqueue(p_queue_naam, l_enq_opt, l_msg_prop, p_xml_payload, l_msg_id);
END vul_multi_queue;
END soa_queue_pack;
/

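Other users can then do the following to enqueue messages ('testuser' is the user who owns the package and queues). The enqueue is part of the caller's transaction, so the message only becomes visible to consumers once that transaction is committed;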

DECLARE
l_recipients          testuser.soa_queue_pack.t_recipients_list;
l_message_id RAW(16);
l_message SYS.XMLType;
BEGIN
l_recipients := testuser.soa_queue_pack.t_recipients_list('FAULTTEST');
l_message := sys.XMLType.createXML('<itemCollectionArray xmlns:msg_out="http://test.ms/itemcollections" xmlns="http://test.ms/itemcollections"><msg_out:itemsCollection><msg_out:item><msg_out:name>name</msg_out:name><msg_out:value>Piet</msg_out:value></msg_out:item></msg_out:itemsCollection></itemCollectionArray>');
testuser.soa_queue_pack.vul_multi_queue('TESTUSER.SOA_MULTI_QUEUE',l_message,5,l_recipients);
END;

This can for example be used in a PL/SQL table trigger in a different schema to enqueue a message on a certain status change or insert.

BPEL dequeueing and enqueueing

In BPEL you can specify the consumer as a property in the AQ wizard for the dequeue operation;


For enqueueing, you can specify the enqueue priority, for example to make sure that messages which caused errors are picked up first once the problem is fixed.


To make sure no loops are caused and to throttle the speed at which messages are processed by the AQ adapter, the following properties can be used in the composite.xml;

  <service name="soa_multi_queue_AQ"
           ui:wsdlLocation="soa_multi_queue_AQ.wsdl">
    <interface.wsdl interface="http://xmlns.oracle.com/pcbpel/adapter/aq/bpel/multiqueue/soa_multi_queue_AQ#wsdl.interface(Dequeue_ptt)"/>
    <binding.jca config="soa_multi_queue_AQ_aq.jca"/>
    <property name="minimumDelayBetweenMessages">10000</property>
    <property name="adapter.aq.dequeue.threads" type="xs:string" many="false">1</property>
  </service>

With the above settings, a single thread picks up at most 1 message every 10 seconds from the queue.
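
To get a feeling for what such a throttle means for throughput, the arithmetic can be written down as below. This is a back-of-the-envelope model and not adapter code. It assumes one dequeue thread and negligible processing time, so the result is an upper bound; the class and method names are hypothetical;

```java
// Back-of-the-envelope model of the throttling settings; not adapter code.
// Assumes a single dequeue thread and negligible processing time.
final class ThrottleSketch {

    static final long MILLIS_PER_HOUR = 3_600_000L;

    // Upper bound on the number of messages picked up per hour.
    static long maxMessagesPerHour(long minimumDelayBetweenMessagesMillis) {
        if (minimumDelayBetweenMessagesMillis <= 0) {
            throw new IllegalArgumentException("delay must be positive");
        }
        return MILLIS_PER_HOUR / minimumDelayBetweenMessagesMillis;
    }

    // Earliest pick-up time (in ms after the first message) of each message.
    static long[] earliestPickupMillis(int messageCount, long minimumDelayMillis) {
        long[] times = new long[messageCount];
        for (int i = 0; i < messageCount; i++) {
            times[i] = i * minimumDelayMillis;
        }
        return times;
    }

    public static void main(String[] args) {
        System.out.println(maxMessagesPerHour(10000));
        System.out.println(java.util.Arrays.toString(earliestPickupMillis(3, 10000)));
    }
}
```

With a delay of 10000 ms this gives at most 360 messages per hour, which is worth checking against the expected message volume before using the setting in production.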

Demonstration

The following BPEL process has been created;
This process calls a HelloWorld webservice. This webservice can be shut down or retired in order to simulate a RemoteException (failure to invoke the webservice). The fault policy will call the custom Java action to retire the process. After retirement, the fault is rethrown and caught by the CatchAll branch. This will re-enqueue the message with a higher priority. Upon fixing the error and activating the process again, the faulted message is picked up first and processed correctly.

First a message is put on the queue by executing;
DECLARE
l_recipients          testuser.soa_queue_pack.t_recipients_list;
l_message_id RAW(16);
l_message SYS.XMLType;
BEGIN
l_recipients := testuser.soa_queue_pack.t_recipients_list('FAULTTEST');
l_message := sys.XMLType.createXML('<itemCollectionArray xmlns:msg_out="http://test.ms/itemcollections" xmlns="http://test.ms/itemcollections"><msg_out:itemsCollection><msg_out:item><msg_out:name>name</msg_out:name><msg_out:value>Piet</msg_out:value></msg_out:item></msg_out:itemsCollection></itemCollectionArray>');
testuser.soa_queue_pack.vul_multi_queue('TESTUSER.SOA_MULTI_QUEUE',l_message,5,l_recipients);
END;

Then confirm the message is processed successfully;

Next, disable the HelloWorld service by retiring it and enqueue a message again. Check how CallHelloWorld handles the exception. The process is retired and the message is re-enqueued with a higher priority.



You can also confirm that after enabling the HelloWorld service and activating the CallHelloWorld process, it first picks up the faulted message. You can download the sample (BPEL/Java code) at; http://dl.dropbox.com/u/6693935/blog/ExceptionTest.zip

Friday, May 4, 2012

Re-enqueueing faulted BPEL messages using Oracle AQ

Introduction

Exception handling is an important topic to consider when using middleware solutions to link different systems together. Often, for example, the 24/7 database turns out to be more like 23/7 (no 100% up-time), or database packages a composite depends on get changed without the SOA developer being informed. This can prevent BPEL processes from completing successfully.

In a development environment, this is no big deal but in a production environment, where possibly large numbers of messages are processed, you'd better make sure you've thought about how to deal with for example unreachable databases. You don't want to lose messages or have a hard time restoring the faulted messages.

The below pattern provides an option for error handling using Advanced Queues (AQ). It uses an error queue to store messages which have gone wrong in BPEL and allows for an easy mechanism to offer the failed messages again to the process.

The pattern involves three queues. Messages are read from the SourceQueue. A database procedure is called to enrich the source message. If enrichment fails, the message is put on an ErrorQueue and the process is terminated. If all goes well, the resulting message is put on a TargetQueue. Messages from the ErrorQueue can be re-enqueued on the SourceQueue to reinitiate processing of failed messages.
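
The flow of a message through the three queues can be sketched in plain Java as below. The queues are simple in-memory collections and the enrichment step is a function which may throw, so this only models the routing and is not AQ or BPEL code; all names are hypothetical;

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.UnaryOperator;

// In-memory model of the SourceQueue / TargetQueue / ErrorQueue routing.
// Not AQ or BPEL code; all names are hypothetical.
final class ErrorQueuePatternSketch {

    final Deque<String> source = new ArrayDeque<>();
    final Deque<String> target = new ArrayDeque<>();
    final Deque<String> error = new ArrayDeque<>();

    // Process everything on the source queue; enrichment may fail.
    void processSource(UnaryOperator<String> enrich) {
        String message;
        while ((message = source.poll()) != null) {
            try {
                target.add(enrich.apply(message));
            } catch (RuntimeException e) {
                // Keep the original message so it can be offered again later.
                error.add(message);
            }
        }
    }

    // Offer all failed messages to the process again.
    void reEnqueueErrors() {
        String message;
        while ((message = error.poll()) != null) {
            source.add(message);
        }
    }

    static String demo() {
        ErrorQueuePatternSketch queues = new ErrorQueuePatternSketch();
        queues.source.add("m1");
        // First attempt: the enrichment is broken.
        queues.processSource(m -> {
            throw new IllegalStateException("package invalid");
        });
        String afterFault = "error=" + queues.error + " target=" + queues.target;
        // The problem is fixed and the failed message is offered again.
        queues.reEnqueueErrors();
        queues.processSource(m -> m + "+timestamp");
        return afterFault + " | error=" + queues.error + " target=" + queues.target;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The important property is that the error queue receives the original, unenriched message, so re-offering it is the same as offering it for the first time.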

It is suggested that the ErrorQueue and the SourceQueue are in the same database; if the message can be picked up from the source and the process is started, you can be pretty sure the source is available.

An additional benefit of using this pattern is that Oracle Advanced Queues can be used for throttling BPEL processing when BPEL is misused for batch processing.

Setup

Database

I've used http://docs.oracle.com/cd/B10501_01/appdev.920/a96587/apexampl.htm#33919 as a reference to put the PL/SQL AQ code together.

Grants

First create a test user in your database. I've called this user 'testuser'. Then grant the user the required privileges to be able to do some Advanced Queueing;

Execute the following as the system user;
GRANT EXECUTE ON DBMS_AQADM TO testuser;
GRANT Aq_administrator_role TO testuser;

Create queue tables and queues

BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'TEST_SOURCE_QUEUE_QT',
       queue_payload_type => 'SYS.XMLType',
       multiple_consumers => TRUE);
END;
/
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'TEST_TARGET_QUEUE_QT',
       queue_payload_type => 'SYS.XMLType',
       multiple_consumers => TRUE);
END;
/
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'TEST_ERROR_QUEUE_QT',
       queue_payload_type => 'SYS.XMLType',
       multiple_consumers => TRUE);
END;
/
begin
DBMS_AQADM.CREATE_QUEUE ('TEST_SOURCE_QUEUE', 'TEST_SOURCE_QUEUE_QT');
DBMS_AQADM.CREATE_QUEUE ('TEST_TARGET_QUEUE', 'TEST_TARGET_QUEUE_QT');
DBMS_AQADM.CREATE_QUEUE ('TEST_ERROR_QUEUE', 'TEST_ERROR_QUEUE_QT');
DBMS_AQADM.START_QUEUE ('TEST_SOURCE_QUEUE');
DBMS_AQADM.START_QUEUE ('TEST_TARGET_QUEUE');
DBMS_AQADM.START_QUEUE ('TEST_ERROR_QUEUE');
end;
/

Now you've created three queue tables and three queues using those tables. I've made the queues multiconsumer for additional flexibility at a later stage. Multiconsumer queues allow different parties to produce and consume messages from the queue without interfering with each other. Especially for an error queue, this can come in handy.
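
What "without interfering with each other" means can be modelled as every subscriber having its own view of the queue. The sketch below is a simplified in-memory model in plain Java and not the AQ implementation; the class and the subscriber names BPEL_PROCESS and AUDIT are hypothetical;

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of a multiconsumer queue: one view per subscriber.
// Not the AQ implementation; all names are hypothetical.
final class MultiConsumerSketch {

    private final Map<String, Deque<String>> viewPerSubscriber = new LinkedHashMap<>();

    void addSubscriber(String name) {
        viewPerSubscriber.putIfAbsent(name, new ArrayDeque<>());
    }

    // Every subscriber registered at enqueue time gets to see the message.
    void enqueue(String payload) {
        for (Deque<String> view : viewPerSubscriber.values()) {
            view.add(payload);
        }
    }

    // Dequeueing as one consumer does not affect the other consumers.
    String dequeue(String consumer) {
        Deque<String> view = viewPerSubscriber.get(consumer);
        return view == null ? null : view.poll();
    }

    static String demo() {
        MultiConsumerSketch queue = new MultiConsumerSketch();
        queue.addSubscriber("BPEL_PROCESS");
        queue.addSubscriber("AUDIT");
        queue.enqueue("m1");
        String first = queue.dequeue("BPEL_PROCESS");  // consumes m1 for BPEL_PROCESS
        String second = queue.dequeue("AUDIT");        // AUDIT still sees m1
        String third = queue.dequeue("BPEL_PROCESS");  // nothing left for BPEL_PROCESS
        return first + "," + second + "," + third;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```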

You don't have to register subscribers to the queue since that's done automatically upon deployment of the BPEL process (in SOA Suite 11.1.1.6 on an 11.2 database). If you're running older software, you can add subscribers with a script like;
begin
DBMS_AQADM.ADD_SUBSCRIBER ('TESTUSER.TEST_SOURCE_QUEUE',sys.aq$_agent('EXCEPTIONTEST', null, null));
end;
/

I've used a small database package to simulate an often encountered error; the procedure I want to call is not valid. I wanted to use a database call for the example and was not interested in the functionality of the package.

CREATE OR REPLACE PACKAGE "TESTUSER"."SOA_TEST" AS
  function getsystimestamp return timestamp;
END SOA_TEST;
/
create or replace
PACKAGE BODY SOA_TEST AS
  function getsystimestamp return timestamp AS
  BEGIN
    RETURN systimestamp;
  END getsystimestamp;
END SOA_TEST;

BPEL

The configuration of the database adapter should be familiar and will not be described in detail here. In short, configure the database connection in the Weblogic console: add a datasource, go to the DbAdapter configuration, add a connection factory which refers to the newly created datasource and update the DbAdapter deployment. You should also add a connection factory for the AqAdapter referencing the same datasource. It's a good idea to use a datasource which only supports local transactions and is not XA capable. This will avoid some issues.

The below screenshots should be self-explanatory. The process can be downloaded here;
http://dl.dropbox.com/u/6693935/blog/ExceptionDemo.zip




Demonstration

Without error

First I offer a message on the source queue;

DECLARE
  queue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  recipients          DBMS_AQ.aq$_recipient_list_t;
  message_id RAW(16);
  message SYS.XMLType;
BEGIN
  recipients(1) := sys.aq$_agent('SOURCEQUEUETEST', NULL, NULL);
  message_properties.recipient_list := recipients;
  message := sys.XMLType.createXML('<itemCollectionArray xmlns:msg_out="http://test.ms/itemcollections" xmlns="http://test.ms/itemcollections"><msg_out:itemsCollection><msg_out:item><msg_out:name>Name</msg_out:name><msg_out:value>Piet</msg_out:value></msg_out:item></msg_out:itemsCollection></itemCollectionArray>');
  DBMS_AQ.ENQUEUE( queue_name => 'TESTUSER.TEST_SOURCE_QUEUE',
                   enqueue_options => queue_options,
                   message_properties => message_properties,
                   payload => message,
                   msgid => message_id);
  COMMIT;
END;

I confirm that the message is processed successfully by looking at the Enterprise Manager console

And by looking at the TargetQueue


With error

I invalidated the SOA_TEST.getsystimestamp function by adding invalid code and recompiling the package. Then I executed the same procedure as in the 'without error' situation. As expected, my process faulted. The CatchAll caught the exception, put the message on the error queue and terminated the process.


Restoring the faulted messages

The messages on the ErrorQueue can be restored by putting them on the SourceQueue after the problem is fixed. First fix the problem by making the package compilable again. Then execute the following;

DECLARE
  dequeue_options DBMS_AQ.dequeue_options_t;
  message_properties_d DBMS_AQ.message_properties_t;
  message_handle_d RAW(16);
  MESSAGE sys.XMLType;
  no_messages EXCEPTION;
  enqueue_options DBMS_AQ.enqueue_options_t;
  message_properties_e DBMS_AQ.message_properties_t;
  recipients DBMS_AQ.aq$_recipient_list_t;
  message_handle_e RAW(16);
  pragma exception_init (no_messages, -25228);
BEGIN
  recipients(1)                       := sys.aq$_agent('SOURCEQUEUETEST', NULL, NULL);
  message_properties_e.recipient_list := recipients;
  dequeue_options.wait                := DBMS_AQ.NO_WAIT;
  dequeue_options.consumer_name       := 'ERRORQUEUETEST';
  dequeue_options.navigation          := dbms_aq.FIRST_MESSAGE;
  LOOP
    DBMS_AQ.DEQUEUE(queue_name => 'TESTUSER.TEST_ERROR_QUEUE', dequeue_options => dequeue_options, message_properties => message_properties_d, payload => MESSAGE, msgid => message_handle_d);
    DBMS_AQ.ENQUEUE(queue_name => 'TESTUSER.TEST_SOURCE_QUEUE', enqueue_options => enqueue_options, message_properties => message_properties_e, payload => MESSAGE, msgid => message_handle_e);
    dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
  END LOOP;
EXCEPTION
WHEN no_messages THEN
  DBMS_OUTPUT.PUT_LINE ('No more messages for ERRORQUEUETEST');
  COMMIT;
END;

Confirm that the message is picked up by BPEL, successfully processed and put on the TargetQueue. If the problem is not fixed, the message will be put back on the ErrorQueue. Since there's only one commit at the end, the re-enqueued messages only become visible on the SourceQueue after all messages have been moved from the ErrorQueue. This avoids loops such as ErrorQueue -> (re-enqueue) SourceQueue -> (new error in BPEL) -> ErrorQueue and so forth.
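
Why the single commit matters can be shown with a small simulation. It compares moving messages with a commit per message against one commit at the end, while the consumer is still broken and immediately sends every visible message back to the error queue. This is a plain Java model under those assumptions and not AQ code; the names and the safety cap are hypothetical;

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simulation of the re-enqueue loop with a consumer which is still broken.
// Not AQ code; names and the safety cap are hypothetical.
final class SingleCommitSketch {

    // Returns the number of loop iterations needed to empty the error queue,
    // or safetyCap if the loop would otherwise never end.
    static int moveAll(int messagesOnErrorQueue, boolean commitPerMessage, int safetyCap) {
        Deque<String> errorQueue = new ArrayDeque<>();
        Deque<String> visibleSourceQueue = new ArrayDeque<>();
        List<String> uncommitted = new ArrayList<>();
        for (int i = 0; i < messagesOnErrorQueue; i++) {
            errorQueue.add("m" + i);
        }
        int iterations = 0;
        String message;
        while (iterations < safetyCap && (message = errorQueue.poll()) != null) {
            iterations++;
            if (commitPerMessage) {
                // The message is visible at once; the broken consumer fails
                // and puts it straight back on the error queue.
                visibleSourceQueue.add(message);
                errorQueue.add(visibleSourceQueue.poll());
            } else {
                // Invisible to the consumer until the commit at the end.
                uncommitted.add(message);
            }
        }
        visibleSourceQueue.addAll(uncommitted); // the single commit
        return iterations;
    }

    public static void main(String[] args) {
        System.out.println(moveAll(3, false, 100));
        System.out.println(moveAll(3, true, 100));
    }
}
```

With one commit at the end, the loop finishes after exactly as many iterations as there were messages. With a commit per message it only stops because of the safety cap.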