Kafka (Input Agent)
Important note: This Input Agent can only be selected if at least one Kafka connection has been set up.
Settings
(1) Start on IS only: Only relevant for the add-on module Load Balancing, to start the profile on a specific node. If the checkbox Can be triggered on any server is not set, the checkbox Profile may only run in one instance is not set either, and no value is set in field Start on IS only, a triggered cronjob in a load balancing system is forced to remain on the Working Node on which it was triggered. See also section Settings for profiles (Load Balancing).
(2) Connection alias: The Kafka alias. See section Kafka connections.
(3) Topic: The topic from which messages are to be received.
(4) Client ID: An optional identifier of a Kafka consumer (in a consumer group) that is passed to a Kafka broker with every request.
(5) Send email notification for received tombstone message: If set, an email notification is sent when a tombstone message is received; if not set, the tombstone is skipped and ignored silently. A tombstone always has a null payload and must therefore not be processed, otherwise the mapping would generate an error during parsing.
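For illustration only: in the standard Apache Kafka Java client (not Lobster Integration internals), a tombstone arrives as a record whose value() is null, which is why it must be filtered out before any parsing. A minimal sketch; broker address, group and topic names are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TombstoneCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "example-group");              // assumed consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                if (record.value() == null) {
                    // Tombstone: null payload, nothing to parse -> skip (and optionally notify).
                    continue;
                }
                System.out.println("Payload: " + record.value());
            }
        }
    }
}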
(6) Key Type: The data type of the key of the message. Important note: The data type must always be specified. Make sure that you always use matching types when sending and receiving. If, for example, a message is defined and sent as Byte/String and then read as Integer/Byte, this leads to an error and the message cannot be read. Lobster Integration as a consumer is then blocked and does not process any further messages from this topic until the erroneous message is removed from the broker!
(7) Data Type: The data type of the message. Important note: The data type must always be specified. Make sure that you always use matching types when sending and receiving. If, for example, a message is defined and sent as Byte/String and then read as Integer/Byte, this leads to an error and the message cannot be read. Lobster Integration as a consumer is then blocked and does not process any further messages from this topic until the erroneous message is removed from the broker! Important note: If the data type AVRO is used, the address of the schema registry must be specified in (11). Name: schema.registry.url, Value: http://address:port
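For orientation, the following sketch shows how key and data types map to deserializer settings in the standard Kafka Java client. Broker address, topic, group and the Confluent-style KafkaAvroDeserializer/schema.registry.url entries are illustrative assumptions, not Lobster Integration defaults:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TypedConsumerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "example-group");              // assumed consumer group
        // Key Type / Data Type must match what the producer used, otherwise records cannot be read.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // For AVRO payloads, a schema registry address is required in addition (see (11)):
        // props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // props.put("schema.registry.url", "http://address:port");
        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            // consumer.subscribe(...) and consumer.poll(...) as usual
        }
    }
}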
(8) Reset Timestamp: UTC timestamp. This can be used to trigger a new pickup of all messages that are newer than the specified time. The parameter is automatically reset after retrieval.
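Conceptually, re-reading all messages newer than a given UTC timestamp corresponds to seeking the consumer to the offsets returned by offsetsForTimes(). A minimal sketch using the standard Kafka Java client; broker, topic, partition and the timestamp value are placeholders:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class SeekToTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "example-group");              // assumed consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        long resetTimestamp = 1700000000000L;                // UTC milliseconds, example value
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("example-topic", 0);
            consumer.assign(java.util.Collections.singletonList(tp));
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(tp, resetTimestamp);
            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
            OffsetAndTimestamp oat = offsets.get(tp);
            if (oat != null) {
                consumer.seek(tp, oat.offset());             // next poll() starts at this offset
            }
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}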
(9) Commit each record: If set, each record is committed (asynchronously). If not set, you can specify after how many records a commit takes place.
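The two behaviours roughly correspond to the manual commit strategies of the Kafka consumer API: an asynchronous commit after every record versus a commit only after a number of records. A hedged sketch with auto-commit disabled; names and the batch size are illustrative:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitStrategies {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "example-group");              // assumed consumer group
        props.put("enable.auto.commit", "false");            // commits are done manually
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        int commitEvery = 50;                                 // illustrative batch size
        int uncommitted = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // process record ...
                    uncommitted++;
                    // Variant 1: commit each record asynchronously
                    // consumer.commitAsync();
                    // Variant 2: commit only after a number of records
                    if (uncommitted >= commitEvery) {
                        consumer.commitSync();
                        uncommitted = 0;
                    }
                }
            }
        }
    }
}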
(10) Partition Assignment: With this option, the consumer can be attached statically to selected partitions. Rebalancing is explicitly not performed in this case.
(11) KafkaConsumer Properties: Additional consumer properties can be defined via the context menu. Note: The property group.id, for example, defines the consumer group and is mandatory for 'subscribe'. Lobster Integration creates this property per consumer as grp + <hash code of topic> if it is not explicitly specified.
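Settings (10) and (11) relate to the two ways a Kafka consumer can attach to a topic: assign() pins it to fixed partitions without rebalancing, while subscribe() joins a consumer group, which is why group.id is mandatory there. A minimal sketch using the standard Kafka Java client with placeholder names:

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AttachModes {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Static partition assignment (setting (10)): no consumer group, no rebalancing.
        try (KafkaConsumer<String, String> fixed = new KafkaConsumer<>(props)) {
            fixed.assign(Arrays.asList(
                    new TopicPartition("example-topic", 0),
                    new TopicPartition("example-topic", 2)));
        }

        // Subscription (default): group.id is mandatory, partitions are rebalanced within the group.
        props.put("group.id", "example-group");
        try (KafkaConsumer<String, String> grouped = new KafkaConsumer<>(props)) {
            grouped.subscribe(Collections.singletonList("example-topic"));
        }
    }
}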
(12) Buffer messages, Max. wait time: Messages are collected in a buffer. The profile is started with the collected messages as soon as either the maximum number of collected messages is reached or the maximum waiting time has elapsed. If the profile is saved, messages in the buffer are deleted. Note: The desired number of seconds can also be entered manually.
(13) Value to forward: When messages are collected, you can decide here what should be forwarded. If "All" is selected, the individual messages may have to be joined with a delimiter character.
(14) Additional (premature) case to forward: If conditions are set and met (for functions, the function chain must return "true"), the buffered messages are forwarded prematurely.
System variables and header properties
When receiving a message, the key of the message is stored in the system variable MSG_CALL_KAFKA_KEY (if a key is defined) and is therefore available in the mapping. Header properties of a message can be read via system variables of the form "MSG_CALL_<KEYNAME>" (everything in uppercase).
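The key and header values exposed as system variables correspond to fields of the underlying Kafka record. A sketch of where they live in the standard Kafka Java client; broker, group and topic names are placeholders:

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class KeyAndHeaders {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "example-group");              // assumed consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                // Message key -> system variable MSG_CALL_KAFKA_KEY
                System.out.println("key = " + record.key());
                // Each header -> system variable MSG_CALL_<KEYNAME> (key name in uppercase)
                for (Header header : record.headers()) {
                    System.out.println(header.key() + " = "
                            + new String(header.value(), StandardCharsets.UTF_8));
                }
            }
        }
    }
}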
Autoserialized lists and maps
If you use a profile with a "Kafka" Response to send messages (to a Kafka server) that are retrieved by a profile with a Kafka Input Agent, autoserialized lists and maps (→ prefix autoserialize_) are automatically available in the profile with the Kafka Input Agent, provided the variable KAFKA_ADD_AUTOSERIALZE_DATA of type Boolean with the value true is created in the profile with the "Kafka" Response.
Example:
Profile 1 has a "Kafka" Response, a variable KAFKA_ADD_AUTOSERIALZE_DATA=true and creates the map autoserialize_mymap in phase 3.
Profile 2 has a "Kafka" Input Agent that retrieves the message sent by profile 1. In phase 3, the map autoserialize_mymap can be accessed directly.