AMQP/JMS (Input Agent)

Introduction: Phase 1.


The Input Agent AMQP can receive data from a message queue on a server that implements the Advanced Message Queuing Protocol (AMQP). This protocol is platform-independent and is supported by a large number of systems. Most JMS servers already have an AMQP plugin.

Lobster_data can access an AMQP server installed on the same or a different computer. Lobster_data therefore acts as the client.

Note: This Input Agent can also be used to read from JMS server queues.


[Screenshot: settings of the Input Agent AMQP. The numbers (1) to (6) below refer to it.]


(1) Only relevant for the add-on module Load Balancing to start the profile on a specific node.

(2) Selection of an AMQP or JMS alias. See section AMQP Connections.

(3) Selection of the message type (Subscriber, Topic, Routing or RPC). The style RPC is synchronous; the others are asynchronous.

Note: If you use style RPC, you must additionally use a Response Route of type Custom Class with the specific class PassBackDataResponse. The reason is that an RPC call expects a response for the calling client. This response is generated in Lobster_data by the mentioned Response Route class, returned from there to the Input Agent, and then passed on to the client. See also sections Third Possibility - HTTP Response Chain and AMQP Networks with Style RPC.

Note: If you use style Topic, the checkbox Persistent ('Durable') appears. Set this checkbox if (4) is a durable queue (such a queue survives a broker restart). If you are using ActiveMQ, you must specify a JMS Client ID in the connection. Otherwise an error occurs that you cannot detect directly, but only in the Server Logs under _data/error.logs in the following form:

...javax.jms.JMSException: You cannot create a durable subscriber without specifying a unique clientID on a Connection...
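Because this error only shows up in the log, it can help to search for it explicitly. The following is a minimal sketch, assuming the log content is already available as a list of text lines. The function name find_missing_client_id_errors and the sample lines are hypothetical and not part of Lobster_data.

```python
# Error text taken from the ActiveMQ exception message quoted above.
DURABLE_SUBSCRIBER_ERROR = (
    "You cannot create a durable subscriber without specifying a unique clientID"
)


def find_missing_client_id_errors(log_lines):
    """Return (line_number, line) pairs for lines containing the error text.

    Hypothetical helper: it only does a substring search on the given lines
    and makes no assumption about where the log file is stored.
    """
    return [
        (number, line.rstrip("\n"))
        for number, line in enumerate(log_lines, start=1)
        if DURABLE_SUBSCRIBER_ERROR in line
    ]


# Made-up sample lines for illustration only.
sample_lines = [
    "INFO profile started",
    "...javax.jms.JMSException: You cannot create a durable subscriber "
    "without specifying a unique clientID on a Connection...",
]

for number, line in find_missing_client_id_errors(sample_lines):
    print(f"line {number}: {line}")
```

If the search returns a hit, check whether a JMS Client ID is set in the connection used by the profile.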

(4) Name of the queue or topic to listen to. See also (3). Note: If an Azure Service Bus is used and Topic is selected in (3), the following syntax must be used here: <topic_name>/subscriptions/<subscription_name>, e.g. mytopic/subscriptions/mysubscription.
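The Azure Service Bus syntax can be illustrated with a small helper that assembles the value for field (4). This is only a sketch; the function name azure_topic_address is hypothetical, and the function does nothing beyond joining the two names as described above.

```python
def azure_topic_address(topic_name, subscription_name):
    """Build the value <topic_name>/subscriptions/<subscription_name>.

    Hypothetical helper for illustration; it only checks that both
    parts are non-empty and joins them with the fixed middle segment.
    """
    if not topic_name or not subscription_name:
        raise ValueError("topic name and subscription name must not be empty")
    return f"{topic_name}/subscriptions/{subscription_name}"


print(azure_topic_address("mytopic", "mysubscription"))
# prints: mytopic/subscriptions/mysubscription
```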

(5) If you need to filter the messages you want to receive, you can use a JMS API message selector, which allows you as a consumer to specify the messages you are interested in. See Oracle JMS Message Selectors and the section below.

(6) Here you can set the Consumer Delivery Acknowledgement behaviour, i.e. how Lobster_data confirms the receipt of a message to the message bus. If the checkbox is not set, the default behaviour applies, i.e. the receipt is acknowledged after the backup file of the profile job has been created (except for message type RPC). If the checkbox is set, a received message is only confirmed after the job has been completed successfully. Processing in the background (parallel processing and queueing of jobs) is not possible in that case. If the Response Routes have the checkbox Execute response in own thread set, the receipt of a message is always confirmed after phase 6.

Note: If a message is not confirmed by Lobster_data, the setting of the JMS/AMQP server takes effect and the message may be delivered again, or rejected and forwarded to the Dead Letter Queue. This allows you, similar to working with databases, to "commit" a transaction at the end of the processing in the profile if no error has occurred. In the event of an error, you can, for example, set up error-handling profiles that process the erroneous messages from the dead letter queue.

Note: A dead letter queue can also be created via an AMQP Response Route if the profile that uses this Response Route has a defined system variable AMQP_SYS_<name of the queue argument>. For RabbitMQ, for example, that would be AMQP_SYS_x-dead-letter-exchange.
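The acknowledgement rules in (6) can be summarised as a small decision function. This is a sketch of the description above, not of the actual implementation in Lobster_data. The function and parameter names are hypothetical. It assumes that the setting Execute response in own thread takes precedence, because the text says the receipt is then "always" confirmed after phase 6. The case of message type RPC with the checkbox not set is not described in this section and is returned as such.

```python
def acknowledgement_point(ack_after_success, response_in_own_thread, message_type):
    """Return when the receipt of a message is confirmed, as described in (6).

    ack_after_success:      state of the checkbox in (6)
    response_in_own_thread: Response Routes use 'Execute response in own thread'
    message_type:           'Subscriber', 'Topic', 'Routing' or 'RPC'
    """
    if response_in_own_thread:
        # Assumption: this rule overrides the checkbox ("always").
        return "after phase 6"
    if ack_after_success:
        return "after the job has completed successfully"
    if message_type == "RPC":
        # The default rule explicitly excludes RPC; no timing is given here.
        return "not specified in this section"
    return "after the backup file of the job has been created"


print(acknowledgement_point(False, False, "Topic"))
print(acknowledgement_point(True, False, "Topic"))
print(acknowledgement_point(True, True, "Topic"))
```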

Note


An AMQP message consists of a header and a body. The body contains the data to be transformed in the profile. The header may additionally contain parameters, each of which must have a name and a value. The header parameter NameX, for example, is available in the profile via the variable MSG_CALL_NAMEX, which must be declared in the profile first. The variable name is thus the prefix MSG_CALL_ followed by the parameter name in upper case. Specific examples: MSG_CALL_GROUPID, MSG_CALL_SUBJECT, MSG_CALL_MESSAGEID, MSG_CALL_CORRELATIONID.
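The naming rule for these variables can be written down in one line. The following sketch uses a hypothetical helper named profile_variable_name; it only shows how the variable name is derived and does not declare the variable in the profile.

```python
def profile_variable_name(header_parameter):
    """Derive the profile variable name for a header parameter.

    Rule from the text above: prefix MSG_CALL_ followed by the
    parameter name in upper case.
    """
    return "MSG_CALL_" + header_parameter.upper()


print(profile_variable_name("NameX"))  # prints: MSG_CALL_NAMEX
```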

JMS Message Selectors


JMS message selectors use a relatively complex language whose syntax resembles a SQL WHERE clause. A selector can filter on message headers and properties, but not on the message content. A few examples follow, because the Oracle documentation provides only a few.


| Element | Description | Example Selector |
| --- | --- | --- |
| Header fields | Any headers except JMSDestination, JMSExpiration and JMSReplyTo. | JMSPriority = 9 |
| Properties | Message properties that follow Java identifier naming. | releaseYear = 1982 |
| String literals | String literals in single quotes, duplicate to escape. | title = 'Sam''s' |
| Number literals | Numbers in Java syntax (int and double). | releaseYear = 1982 |
| Boolean literals | TRUE and FALSE. | isAvailable = TRUE |
| ( ) | Brackets. | (releaseYear < 1980) OR (releaseYear > 1989) |
| AND, OR, NOT | Logical operators. | (releaseYear < 1980) AND NOT (title = 'Thriller') |
| =, <>, <, <=, >, >= | Comparison operators. | (releaseYear < 1980) AND (title <> 'Thriller') |
| LIKE | String comparison with wildcards '_' (stands for a single character) and '%' (stands for any sequence of characters, including an empty sequence). Both wildcards can be anywhere in the string. | title LIKE 'Thrill%' |
| IN | Find value in set of strings. The IN operator works only on sets of strings, not on numbers. | title IN ('Off the wall', 'Thriller', 'Bad') |
| BETWEEN | Check whether number is in range (both numbers inclusive). | releaseYear BETWEEN 1980 AND 1989 |
| IS NULL, IS NOT NULL | Check whether value is null or not null. | releaseYear IS NOT NULL |
| *, +, -, / | Arithmetic operators. | releaseYear * 2 > 2000 - 20 |
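
The wildcard semantics of LIKE can be tried out locally before a selector is entered in (5). The following sketch translates a LIKE pattern into a regular expression. It is an illustration of the two wildcards only, not the matching code of any JMS broker. The function names are hypothetical, and the optional ESCAPE clause of the selector language is not handled.

```python
import re


def like_to_regex(pattern):
    """Translate a LIKE pattern into a compiled regular expression.

    '_' matches exactly one character, '%' matches any sequence of
    characters (including an empty one); all other characters are literal.
    """
    parts = []
    for character in pattern:
        if character == "_":
            parts.append(".")
        elif character == "%":
            parts.append(".*")
        else:
            parts.append(re.escape(character))
    return re.compile("".join(parts), re.DOTALL)


def like(value, pattern):
    """Return True if the whole value matches the LIKE pattern."""
    return like_to_regex(pattern).fullmatch(value) is not None


print(like("Thriller", "Thrill%"))  # prints: True
print(like("Bad", "B_d"))           # prints: True
print(like("Bad", "B_"))            # prints: False
```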


Note: Take a look at the Response Route AMQP on how to 'send messages to selectors', i.e. how to send messages that can be fetched by a specific selector.

Profile Suspension


If profiles with this Input Agent are suspended, no messages are accepted during the suspension; they remain on the broker. After the suspension is lifted, it can take up to 60 seconds for these messages to be processed.