
本文旨在解决在使用 Docker Compose 部署 Kafka 集群时,应用程序无法向 Kafka 主题发送消息的问题。我们将分析常见的配置错误,并提供修改建议,确保生产者能够正确连接到 Kafka Broker,从而成功发送消息。通过调整 Kafka 的监听器配置以及生产者端的 Broker 地址,可以有效解决此类连接问题。
问题分析
在使用 Docker Compose 部署 Kafka 集群时,生产者无法发送消息到 Kafka 主题,并出现类似 Topic general-events not present in metadata after 60000 ms 的错误,通常是由于以下原因导致:
Kafka Broker 地址配置错误: 生产者配置的 Broker 地址与 Kafka 实际监听的地址不匹配。网络配置问题: 容器之间的网络连接存在问题,导致生产者无法访问 Kafka Broker。Kafka 监听器配置不正确: Kafka 的监听器配置可能不允许从容器外部或特定的网络访问。
解决方案
针对上述问题,可以采取以下步骤进行排查和解决:
1. 检查 Kafka 监听器配置
Kafka 的 KAFKA_ADVERTISED_LISTENERS 环境变量至关重要,它决定了 Kafka Broker 如何向客户端公布自己的地址。确保该配置正确反映了客户端访问 Kafka 的方式。
在 docker-compose.yml 文件中,检查 Kafka 服务的 environment 部分:
kafka: image: confluentinc/cp-kafka:6.1.1 container_name: kafka depends_on: - zookeeper ports: - '9092:9092' expose: - '29092' environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1' KAFKA_MIN_INSYNC_REPLICAS: '1'
PLAINTEXT://kafka:29092:允许容器内部的其他服务通过 kafka 这个 hostname 和 29092 端口访问 Kafka Broker。PLAINTEXT_HOST://localhost:9092:允许宿主机通过 localhost 和 9092 端口访问 Kafka Broker。
注意事项:
如果你的生产者运行在 Docker 容器之外,确保 PLAINTEXT_HOST 配置正确,并且宿主机能够访问 Kafka Broker。如果你的生产者运行在 Docker 容器内部,但与 Kafka Broker 不在同一个 Docker 网络中,你需要配置一个可以从生产者容器访问的地址。
2. 修改生产者 Broker 地址
生产者需要使用正确的 Broker 地址才能成功连接到 Kafka 集群。如果 Kafka Broker 的地址发生变化,或者生产者使用了错误的地址,就会导致连接失败。
在生产者代码中,检查 bootstrap.servers 配置项。确保它指向正确的 Kafka Broker 地址。
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;import java.util.logging.Level;import java.util.logging.Logger;public class Producer implements Runnable { private static final Logger LOGGER = Logger.getLogger(Producer.class.getName()); private static final String TOPIC_NAME = "general-events"; private KafkaProducer kafkaProducer = null; private final String KAFKA_CLUSTER_ENV_VAR_NAME = "KAFKA_CLUSTER"; public Producer() { LOGGER.log(Level.INFO, "Kafka Producer running in thread {0}", Thread.currentThread().getName()); Properties kafkaProps = new Properties(); // 使用环境变量或默认值配置 Kafka Broker 地址 String defaultClusterValue = "kafka:29092"; // 修改为 kafka:29092,容器内部访问 String kafkaCluster = System.getProperty(KAFKA_CLUSTER_ENV_VAR_NAME, defaultClusterValue); LOGGER.log(Level.INFO, "Kafka cluster {0}", kafkaCluster); kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer"); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put(ProducerConfig.ACKS_CONFIG, "0"); this.kafkaProducer = new KafkaProducer(kafkaProps); }}
注意事项:
如果生产者运行在 Docker 容器内部,应该使用 Kafka 容器的 hostname (kafka) 和内部端口 (29092)。如果生产者运行在 Docker 容器外部,应该使用 localhost 和映射到宿主机的端口 (9092)。
3. 检查容器网络
确保生产者容器和 Kafka 容器在同一个 Docker 网络中。可以使用 docker network inspect 命令来检查容器的网络配置。
如果生产者容器和 Kafka 容器不在同一个网络中,可以使用 docker network connect 命令将生产者容器连接到 Kafka 容器所在的网络。
4. 验证 Topic 创建
虽然 init-kafka 服务创建了 topic,但仍建议再次确认 topic 是否成功创建。可以在 Kafka 容器内部执行以下命令:
docker exec -it kafka bashkafka-topics --bootstrap-server kafka:29092 --list
确认列表中包含 general-events topic。
总结
解决 Docker Compose 中 Kafka 消息发送失败的问题,关键在于确保 Kafka 的监听器配置正确,生产者使用正确的 Broker 地址,以及容器之间的网络连接正常。通过仔细检查这些配置,可以有效地解决此类连接问题,确保应用程序能够成功地向 Kafka 主题发送消息。
以上就是解决 Docker Compose 中 Kafka 消息发送失败问题的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/84041.html
微信扫一扫
支付宝扫一扫