Конфигурация Apache Kafka за NAT/LB

Чтобы сделать наш кластер Kafka доступным как из Интернета, так и из нашей частной сети, мы решили настроить Kafka следующим образом:

Private VIP:9000 => All brokers:9092 (topology query only)
Private VIP:9001 => Broker #1:9092
Private VIP:9002 => Broker #2:9092
...
Public VIP:9000 => All brokers:9092 (topology query only)
Public VIP:9001 => Broker #1:9092
Public VIP:9002 => Broker #2:9092
...

Таким образом мы настроили наш балансировщик нагрузки, а затем наши брокеры:

listeners=PLAINTEXT://<server_priv_ip>:9092
advertised.listeners=INTERNAL://<private_VIP>:9001,EXTERNAL://<public_vip>:9001
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL

Теперь, как и следовало ожидать, брокеры Kafka не запускаются:

ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: Each listener must have a different port, listeners: INTERNAL://<private_VIP>:9001,EXTERNAL://<public_vip>:9001

С моей точки зрения администратора, мой подход был совершенно логичным, хотя я ожидал, что производитель / потребители получат оба адреса, где бы они ни связывались с брокером. Использование одного и того же порта на 2 разных адресах кажется логичным и способствует ясности...

Первый вопрос: почему это не так?
Второй вопрос: как мне достичь своей цели, если не так? (большинство вариантов на столе)

2 ответа

Решение

Мы поговорили с экспертом Кафки, и вот что из этого получилось.

Проще говоря, хотя kafka знает имена слушателей, чтобы различать 2 слушателя, ему нужно знать, к какому клиенту, который подключается, он хочет подключиться и может сделать это только через входящий порт.

Также слушатели и рекламируемые слушатели отображаются друг на друга, если у них одинаковый порт, поэтому вам нужно, чтобы они совпадали.

В итоге вот что мы сделали:

listeners=INTERNAL://hostname:900N,EXTERNAL://hostname:910N,REPLICATION:hostname:9092
advertised.listeners==INTERNAL://vip:900N,EXTERNAL://vip:910N,REPLICATION:hostname:9092  
listeners.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,REPLICATION:PLAINTEXT
inter.listener.protocol=REPLICATION

Где "N" - идентификатор брокера (считая от 1 форвардов)

На Балансировщике нагрузки мы сопоставляем каждый порт VIP с IP-адресом брокера, а также с виртуальным IP-адресом на портах 9000(внутренняя сеть) и 9100 (внешняя сеть), которые сопоставляются с соответствующим прослушивателем каждого брокера.

Это немного излишне, но работает как положено (до тех пор, пока такие темы, как внутренние метаданные kafka, такие как __consumer_offset тиражируются)

Есть один сценарий, который недостижим в Кафке.

  1. В кластере настроены три сервера Kafka. Серверы Kafka имеют частные IP-адреса и доступны только из собственной VPN, скажем, KAFKA VPN.
  2. Существует один общедоступный IP-адрес, который доступен, когда компьютер подключен к KAFKA VPN + Office Network. Этот общедоступный IP-адрес используется только для NAT частных IP-адресов, чтобы люди могли получить к нему доступ через Интернет из сети Office.
  3. В Office Network есть один потребитель. Теперь этот потребитель может получать темы из Kafka, используя общедоступный IP-адрес, когда он подключен к KAFKA VPN.

Теперь я хочу подключить общедоступный IP-адрес, отключив KAFKA VPN. Это возможно ? Сеть работает нормально. Я проверил трассировку маршрута и обратную трассировку маршрута.

Я думаю, что есть какая-то проблема с конфигурацией производителя/сервера Kafka/потребителя.

Конфигурация NAT:: 10.XX.XX.XX:9092 -> AA.XX.XX.XX:9095 10.XX.XX.XY:9092 -> AA.XX.XX.XX:9093 10.XX.XX. XZ:9092 -> AA.XX.XX.XX:9094

Конфигурация производителя:: bootstrap-servers=10.XX.XX.XX:9092,10.XX.XX.XY:9092,10.XX.XX.XZ:9092

Конфигурация Kafka:: Listeners=SASL_SSL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9095 Advertised.listeners=SASL_SSL://10.XX.XX.XX:9092,EXTERNAL://AA.XX.XX.XX:9095 прослушиватель.security.protocol.map=SASL_SSL:SASL_SSL,EXTERNAL:SASL_SSL

Потребительская конфигурация:: sh kafka-console-consumer.sh --bootstrap-server AA.XX.XX.XX:9093,AA.XX.XX.XX:9094,AA.XX.XX.XX:9095 --topic test --consumer.config потребительские.свойства

Ошибка, которую я получаю, когда не подключен к KAFKA VPN::[2022-03-02 17:35:58,236] WARN [Consumer clientId=consumer-test_group-1, groupId=test_group] Соединение с узлом 2147483646 (10.XX) .XX.XX/10.XX.XX.XX:9092) не удалось установить. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient)[2022-03-02 17:36:26,421] WARN [Consumer clientId=consumer-test_group-1, groupId=test_group] Соединение с узлом 2 (10.XX.XX.XY/10.XX.XX.XY:9092) не удалось установить. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient)[2022-03-02 17:36:47,467] WARN [Consumer clientId=consumer-test_group-1, groupId=test_group] Соединение с узлом 3 (10.XX.XX.XZ/10.XX.XX.XZ:9092) не удалось установить. Брокер может быть недоступен. (org.apache.kafka.clients.NetworkClient

Теперь некоторые из вопросов::

  1. Даже если я использую общедоступный IP-адрес для подключения к Kafka, почему в журналах отображается частный IP-адрес?
  2. Должны ли серверы начальной загрузки у производителя и серверы начальной загрузки у потребителя быть абсолютно одинаковыми?
  3. Как я могу проверить ответ на запрос метаданных. Есть ли у Кафки какая-нибудь программа для проверки? Как и в случае с kafka-console-consumer.
  4. Как мне это исправить?
  • Рассмотрите свойства SASL и SSL и наличие сертификата.

Было бы здорово, если бы кто-нибудь помог мне в этом

Спасибо, Милан К.

Другие вопросы по тегам