Kafka (Eingangsagent)
Wichtiger Hinweis: Der Eingangsagent ist nur auswählbar, wenn mindestens eine Kafka-Verbindung eingerichtet ist.
Einstellungen
(1) Nur auf IS starten: Nur relevant für das Zusatzmodul Load Balancing, um das Profil auf einem bestimmten Node zu starten. Ist die Checkbox "Kann auf jedem Server getriggert werden" nicht gesetzt und ist zudem die Checkbox Profil darf nur in einer Instanz laufen nicht gesetzt und ist kein Wert in Feld Nur auf IS starten eingetragen, dann wird in einem Load-Balancing-System erzwungen, dass ein getriggerter Cronjob auf dem Working Node verbleibt, auf dem er getriggert wurde. Siehe auch Abschnitt Einstellungen für Profile (Load Balancing).
(2) Verbindungs-Alias: Der Kafka-Alias. Siehe Abschnitt Kafka-Verbindungen.
(3) Topic: Das Topic, von dem Nachrichten empfangen werden.
(4) Client-ID: Eine optionale ID eines Kafka-Consumers (in einer Consumer-Gruppe), die bei jeder Anforderung an einen Kafka-Broker übergeben wird.
(5) Send email notification for received tombstone message: Falls gesetzt, wird eine E-Mail gesendet, wenn ein Tombstone empfangen wird, ansonsten wird er übersprungen und ignoriert. Ein Tombstone hat immer einen Null-Payload, daher darf er nicht verarbeitet werden, da sonst das Mapping beim Parser einen Fehler erzeugt.
(6) Key Type: Der Datentyp des Schlüssels der Nachricht. Wichtiger Hinweis: Der Datentyp muss immer angegeben werden. Achten Sie darauf, dass Sie beim Senden und Empfangen immer übereinstimmende Typen verwenden. Wird z. B. eine Nachricht als Byte/String definiert und gesendet und dann als Integer/Byte gelesen, führt das zu einem Fehler und die Nachricht kann nicht gelesen werden. Lobster Integration als Consumer ist so lange blockiert, bis jemand diese falsche Nachricht vom Broker entfernt und verarbeitet so lange keine Nachrichten von diesem Topic!
(7) Data Type: Der Datentyp der Nachricht. Wichtiger Hinweis: Der Datentyp muss immer angegeben werden. Achten Sie darauf, dass Sie beim Senden und Empfangen immer übereinstimmende Typen verwenden. Wird z. B. eine Nachricht als Byte/String definiert und gesendet und dann als Integer/Byte gelesen, führt das zu einem Fehler und die Nachricht kann nicht gelesen werden. Lobster Integration als Consumer ist so lange blockiert, bis jemand diese falsche Nachricht vom Broker entfernt und verarbeitet so lange keine Nachrichten von diesem Topic! Wichtiger Hinweis: Wird der Datentyp AVRO verwendet, muss in (11) die Adresse zur Schema-Registry angegeben werden. Name: schema.registry.url, Wert: http://address:port
(8) Reset Timestamp: UTC-Timestamp. Damit kann eine Neuabholung aller Nachrichten, die jünger als der angegebene Zeitpunkt sind, ausgelöst werden. Der Parameter wird automatisch nach Abholung zurückgesetzt.
(9) Commit each record: Wenn gesetzt, dann wird jeder Record asynchron committed. Wenn nicht gesetzt, dann kann ein Intervall angegeben werden, nach wie vielen Records asynchron committed wird.
(10) Partition Assignment: Mit dieser Option ist es möglich, sich statisch an ausgewählte Partitionen zu hängen. Hierbei wird explizit keinerlei automatisches Rebalancing berücksichtigt.
(11) KafkaConsumer Properties: Über das Kontextmenü können weitere Consumer-Properties definiert werden. Hinweis: Die Property group.id definiert z. B. die Consumer-Gruppe und ist bei "subscribe" Pflicht. Lobster Integration erzeugt dieses Property pro Consumer als grp + <Hashcode des Topic> , wenn es fehlt.
(12) Nachrichten sammeln, Max. Wartezeit: Nachrichten werden in einem Buffer gesammelt. Das Profil wird mit den gesammelten Nachrichten gestartet, wenn entweder die maximale Anzahl an gesammelten Nachrichten erreicht ist oder die maximale Wartezeit. Wird das Profil gespeichert, werden Nachrichten im Buffer gelöscht. Hinweis: Es können hier auch manuell die gew ü nschten Sekunden eingegeben werden.
(13) Weiterzuleitender Wert: Wenn Nachrichten gesammelt werden, können Sie hier entscheiden, was zurückgegeben werden soll. Bei Alle müssen die einzelnen Nachrichten evtl. mit einem Trennzeichen verbunden werden.
(14) Zusätzliche (vorzeitige) Weiterleitung: Sind Bedingungen gesetzt und erfüllt (bei Funktionen muss die Funktionskette true ergeben), dann findet eine vorzeitige Weiterleitung statt.
System-Variablen und Header-Properties
Beim Empfang wird der Key einer Nachricht in der System-Variable MSG_CALL_KAFKA_KEY abgelegt (falls definiert) und steht damit im Mapping zu Verfügung. Header-Properties einer Nachricht können über System-Variablen der Form MSG_CALL_<KEYNAME> (alles großgeschrieben) ausgelesen werden.
Autoserialisierte Listen und Maps
Wenn Sie mit einem Profil mit Kafka-Antwortweg Nachrichten versenden (an einen Kafka-Sever), die von einem Profil mit Kafka-Eingangsagenten abgerufen werden, stehen autoserialisierte Listen und Maps (→ Präfix autoserialize_) automatisch im Profil mit dem Kafka-Eingangsagenten zur Verfügung, wenn im Profil mit dem Kafka-Antwortweg die Variable KAFKA_ADD_AUTOSERIALZE_DATA des Typs Boolean mit dem Wert true angelegt ist.
Beispiel:
Profil 1 hat einen Kafka-Antwortweg, die Variable KAFKA_ADD_AUTOSERIALZE_DATA=true und legt in Phase 3 die Map autoserialize_mymap an.
Profil 2 hat einen Kafka-Eingangsagenten, der die von Profil 1 erzeugte Nachricht abruft. In Phase 3 kann nun direkt auf die Map autoserialize_mymap zugefriffen werden.