RabbitMQ-Anbindung - Tutorial

Last Update: 2018

Dieses Tutorial beschreibt das Verwenden von Profilen als AMQP-Client (wir stellen keinen Server zur Verfügung). Es muss ein externer AMQP-Server vorhanden sein. Hinweis: AMQP-Messages sind nicht zu verwechseln mit dem internen Lobster-Nachrichtentyp "Message". Das Prinzip ist hierbei zwar ähnlich, es handelt sich bei letzterem aber um eine internes und proprietäres Nachrichtenformat.

Für dieses Tutorium werden wir beispielhaft auf einem Windows-System einen AMQP-Server (RabbitMQ) installieren und zwei Profile zum Empfangen und Senden von AMQP-Nachrichten erstellen.

Aktuell werden die AMQP-Protokoll-Versionen 0.9.1 und 1.0 unterstützt.


Installation von RabbitMQ und Erlang


Bitte gehen Sie auf die Seite http://www.rabbitmq.com/install-windows.html (für andere Betriebssysteme gibt es dort Links zu entsprechenden Seiten).

Als Erstes müssen Sie Erlang herunterladen und installieren.

Danach laden Sie bitte den Installer für den RabbitMQ-Server herunter und führen diesen aus (für dieses Tutorial wurde die Version 3.4.3 verwendet). Durch den Installer wird ein RabbitMQ-Server-Dienst installiert. Der Dienst wird nach der Installation bereits laufen und Sie können die Default-Konfiguration für die Zwecke dieses Tutorials verwenden.


Management-Plugin aktivieren


images/download/attachments/137302208/AMQP_1_RabbitMQ_management-version-1-modificationdate-1684396075437-api-v2.png


Der RabbitMQ-Server stellt Ihnen eine Web-Oberfläche zum Management zur Verfügung, wie Sie das vielleicht bereits von ähnlichen Systemen kennen. Allerdings muss diese Web-Oberfläche, das Management-Plugin, erst aktiviert werden. Hierzu sehen Sie sich bitte die Seite http://www.rabbitmq.com/management.html an. Gehen Sie in das Installationsverzeichnis "RabbitMQ Server/rabbitmq_server-3.4.3/sbin/" und führen dort den Kommadozeilen-Befehl "rabbitmq-plugins enable rabbitmq_management" aus.

Wenn Sie alles korrekt durchgeführt haben, können Sie nun das Management-Plugin mit Ihrem Browser unter der URL "http://localhost:15672/" aufrufen. Die Default-Zugangsdaten sind "guest/guest". Sie erhalten eine Ansicht wie im obigen Screenshot.


Queue erstellen und eine Message einstellen


images/download/attachments/137302208/AMQP_3_Add_new_queue-version-1-modificationdate-1684396075429-api-v2.png


Als Nächstes werden wir eine Queue mit dem Namen "test" erstellen. Verwenden Sie dazu bitte alle Einstellungen, wie Sie im obigen Screenshot gezeigt sind.


Message einstellen


images/download/attachments/137302208/AMQP_4_New_test_message-version-1-modificationdate-1684396075445-api-v2.png


Noch befindet sich keine Nachricht (Message) in der neuen Queue, deshalb werden wir nun manuell mit der Management-Oberfläche eine Message in die Queue einstellen. Gehen Sie dabei bitte wie im obigen Screenshot vor.


AMQP-Alias erstellen


Zur Erstellung eines AMQP-Aliases siehe Abschnitt AMQP-Verbindungen.

Zur vereinfachten Darstellung werden wir uns auf das Nötigste beschränken und lediglich folgenden Eintrag verwenden.


images/download/attachments/137302208/735-version-1-modificationdate-1684396075425-api-v2.png


Profil mit Eingangsagent "AMQP" erstellen


images/download/attachments/137302208/736-version-1-modificationdate-1684396075423-api-v2.png

Nun erstellen wir ein neues Profil mit einem Eingangsagenten vom Typ "AMQP". Bitte verwenden Sie die Einstellungen des obigen Screenshots. Sie können im Profil die Option "Kein Mapping" (Seite "Basis-Daten") verwenden und den Ausgangsweg frei lassen. Dann speichern Sie das Profil und achten sie darauf, dass es aktiv ist! Spätestens nach einem Neustart des Integration Server wird die Verbindung zum AMQP-Server aufgebaut werden. Dort wird dann unser neues Profil als "Consumer" auf die Queue "test" hören.

Zu den Parametern


(1) Der Alias, den Sie eingerichtet haben.

(2) Auswahl der "Nachrichten-Art": "Subscriber", "Topic", "Routing", "RPC". Der Style "RPC" ist synchron, die anderen asynchron. Details zu den verschiedenen Styles entnehmen Sie bitte den Tutorials auf https://www.rabbitmq.com/getstarted.html. In unserem Fall verwenden wir "Subscriber".

(3) Name der Queue. In unserem Fall test.

(4) Mit der Checkbox "Persistent" markieren Sie im Eingangsagenten, dass es sich bei der angegebenen Queue um eine "durable" Queue handelt. In unserem Beispiel haben wir eine "transient" Queue verwendet, deswegen verwenden wir die Option Persistent nicht. Details entnehmen Sie bitte auf der Seite https://www.rabbitmq.com/tutorials/amqp-concepts.html dem Abschnitt "Queues".


Neuer Consumer für Test-Queue


images/download/attachments/137302208/AMQP_5_Consumer-version-1-modificationdate-1684396075444-api-v2.png


Wenn Sie alles korrekt eingestellt haben, werden Sie in der Management-Oberfläche eine Ansicht wie im obigen Screenshot vorfinden. Sie sehen dort unser Profil als einen neuen Consumer für die Queue "test". Sollten Sie hier keinen neuen Consumer finden, dann hat etwas mit der Verbindung nicht funktioniert. Sehen Sie sich dazu bitte unten das Kapitel "Verbindungsfehler" an.


Erfolgreiches Profil im Control Center



Wenn der Verbindungsaufbau funktioniert hat, das Profil anlief und sich die Nachrichten von der Queue "test" abgeholt hat, werden Sie im Control Center einen Job vorfinden. Die erhaltenen Dateien können Sie in gewohnter Weise anzeigen lassen (und natürlich auch im Profil verarbeiten).


Profil mit Antwortweg "AMQP" erstellen


images/download/attachments/137302208/737-version-1-modificationdate-1684396075422-api-v2.png

Um Nachrichten (Messages) an unsere Queue "test" zu versenden, können wir ebenfalls ein Profil erstellen. Hierzu können Sie ein einfaches Mapping aufbauen oder einfach eine manuell hochgeladene Datei ohne Mapping durchreichen. Entscheidend ist lediglich der erstellte Ausgangsweg. Konfigurieren Sie diesen bitte wie im obigen Screenshot. Sollte der Verbindungsaufbau und das Einstellen in die Queue test funktioniert haben, können Sie das erneut über die Management-Oberfläche und das Control Center überprüfen. Sehen Sie dort Ihre Nachricht nicht, sehen Sie sich bitte das folgende Kapitel an.

Zu den Parametern


(1) Der Alias, den Sie eingerichtet haben.

(2) Auswahl des Typs: "Publish", "Routing", "Topic", "RPC". Der Style "RPC" ist synchron, die anderen asynchron. Details zu den verschiedenen Styles entnehmen Sie bitte den Tutorials auf https://www.rabbitmq.com/getstarted.html. In unserem Fall verwenden wir "Publish".

(3) Name der Queue. In unserem Fall "test".

(4) Mit "Persistent" markieren Sie im Ausgangsweg nicht nur, dass es sich bei der angegebenen Queue um eine "durable" Queue handelt, sondern auch, dass eine "persistent message" versendet wird. In unserem Beispiel haben wir eine transient Queue verwendet, deswegen verwenden wir die Option "Persistent" nicht. Details entnehmen Sie bitte auf der Seite https://www.rabbitmq.com/tutorials/amqp-concepts.html dem Abschnitt "Queues".


Verbindungsfehler


Um einem möglichen Verbindungsfehler nachzugehen, öffnen Sie bitte die Datei ./logs/services/error.log.

Ein möglicher Fehler wäre zum Beispiel, wenn Sie versuchen auf eine "durable" Queue zuzugreifen, ohne im Eingangsagenten vom Typ "AMQP" die Option "Persistent" gesetzt zu haben. Dann würden Sie eine Fehlermeldung folgender Art erhalten.


16:51:23 SYSTEM:DATAWIZARD:AMQP [AMQP Listener@AMQP] Exception in binding to AMQP queue, retrying in background:
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
...
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method
(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test' in vhost '/':
received 'false' but current is 'true', class-id=50, method-id=10), null, ""}
...


Natürlich könnte auch einfach der Dienst nicht laufen, das würde sich dann folgendermaßen äußern.


18:30:33 SYSTEM:DATAWIZARD:AMQPMANAGER Unable to connect to AMQP server with alias 'TestAMQP', retrying in background:
java.net.SocketTimeoutException: connect timed out
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
...