Kafka (Input Agent)

Introduction: You can find a description of this phase in section Phase 1 (Introduction).

images/download/thumbnails/135168520/image2020-3-17_15-35-16-version-1-modificationdate-1680150602986-api-v2.png

images/download/attachments/135168520/1178-version-2-modificationdate-1680152919809-api-v2.png images/download/attachments/135168520/1179-version-1-modificationdate-1680150602957-api-v2.png images/download/attachments/135168520/1180-version-1-modificationdate-1680150602951-api-v2.png


Note: See also section File Names, File Patterns, Paths, System Constants and Variables.

(1) Only relevant for the add-on module Load Balancing to start the profile on a specific node. If checkbox Can be triggered on any server i s not set and the checkbox Profile may only run in one instance is also not set and no value is set in field Start on IS only, a triggered cronjob in a load balancing system is forced to remain on the Working Node on which it was triggered. See also section Settings for Profiles (Load Balancing).

(2) The Kafka alias. See section Kafka Connections.

(3) The topic from which messages are to be received.

(4) An optional identifier of a Kafka consumer (in a consumer group) that is passed to a Kafka broker with every request.

(5) If set, an email is sent when a tombstone is received, otherwise it is skipped and ignored. A tombstone always has a zero payload, so it must not be processed, otherwise the mapping will generate an error during parsing.

(6) The data type of the key of the message. Important note: The data type must always be specified. Make sure that you always use matching types when sending and receiving. If, for example, a message is defined and sent as Byte/String and then read as Integer/Byte, this leads to an error and the message cannot be read. Lobster_data as a consumer is blocked until someone removes this erroneous message from the broker and it does not process any messages from this topic!

(7) The data type of the message. Important note: The data type must always be specified. Make sure that you always use matching types when sending and receiving. If, for example, a message is defined and sent as Byte/String and then read as Integer/Byte, this leads to an error and the message cannot be read. Lobster_data as a consumer is blocked until someone removes this erroneous message from the broker and it does not process any messages from this topic! Important note: If the data type AVRO is used, the address to the schema registry must be specified in (11). Name: schema.registry.url, Value: http://address:port

(8) UTC timestamp. This can be used to trigger a new pickup of all messages that are younger than the specified time. The parameter is automatically reset after retrieval.

(9) If set, each record is committed (asynchronously). If not set, you can specify after how many records the commitment takes place.

(10) With this option it is possible to attach statically to selected partitions. Rebalancing is explicitly not considered.

(11) Additional consumer properties can be defined via the context menu. Note: The property group.id , for example, defines the consumer group and is mandatory for 'subscribe'. Lobster_data creates this property per consumer as grp + <hash code of topic> if it is not explicitly specified.

(12) and (13) Messages are collected in a buffer. The profile is started with the collected messages when either the maximum number of collected messages is reached or the maximum waiting time. If the profile is saved, messages in the buffer are deleted. Note: The desired seconds can also be entered manually.

(14) When messages are collected, you can decide here what should be returned. If All is selected, the individual messages may have to be connected with a delimiter character.

(15) If conditions are set and met (for functions, the function chain must return true), premature forwarding takes place.

System Variables and Header Properties


When receiving a message, the key of a message is stored in the system variable MSG_CALL_KAFKA_KEY (if defined) and is therefore available in the mapping. Header properties of a message can be read out using system variables of the form MSG_CALL_<KEYNAME> (everything in uppercase).

Autoserialized Lists and Maps


If you use a profile with a Kafka Response Route to send messages (to a Kafka server) that are retrieved by a profile with a Kafka Input Agent, autoserialized lists and maps (→ prefix autoserialize_) are automatically available in the profile with the Kafka Input Agent if the variable KAFKA_ADD_AUTOSERIALZE_DATA of type Boolean with the value true is created in the profile with the Kafka Response Route.

Example:

Profile 1 has a Kafka Response Route, a variable KAFKA_ADD_AUTOSERIALZE_DATA=true and creates the map autoserialize_mymap in phase 3.

Profile 2 has a Kafka Input Agent that retrieves the message sent by profile 1. In phase 3, the map autoserialize_mymap can be accessed directly.