Java开发中使用Kafka实现客户端程序的方法
来源:爱站网时间:2020-07-25编辑:网友分享
Kafka作为流数据平台,为开发者提供了三个客户端,这篇文章主要是利用Kafka的API创建一个生产者和消费者,操作是很简单的,今天爱站技术频道小编就为大家带来了Java开发中使用Kafka实现客户端程序的方法,看到最后大家都会感到很惊讶。
Kafka作为流数据平台,为开发者提供了三个客户端,这篇文章主要是利用Kafka的API创建一个生产者和消费者,操作是很简单的,今天爱站技术频道小编就为大家带来了Java开发中使用Kafka实现客户端程序的方法,看到最后大家都会感到很惊讶。
一、创建配置类Config
这个类很简单,只是存放了两个常量,一个是话题TOPIC,一个是线程数THREADS
package com.lya.kafka; /** * 配置项 * @author liuyazhuang * */ public class Config { /** * 话题 */ public static final String TOPIC = "wordcount"; /** * 线程数 */ public static final Integer THREADS = 1; }
二、编程生产者类ProducerDemo
这个类的主要作用就是向Kafka写入相应的消息,并且将消息写入wordcount话题。
package com.lya.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 生产者实例 * @author liuyazhuang * */ public class ProducerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("zk.connect", "192.168.209.121:2181"); props.put("metadata.broker.list","192.168.209.121:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("zk.connectiontimeout.ms", "15000"); ProducerConfig config = new ProducerConfig(props); Producerproducer = new Producer (config); // 发送业务消息 // 读取文件 读取内存数据库 读socket端口 for (int i = 1; i (Config.TOPIC, "this number ===>>> " + i)); } } }
三、编写消息者类ConsumerDemo
这个类的主要作用就是消费Kafka中wordcount话题的消息。
package com.lya.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; /** * 消费者实例 * @author liuyazhuang * */ public class ConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("zookeeper.connect", "192.168.209.121:2181"); props.put("group.id", "1111"); props.put("auto.offset.reset", "smallest"); props.put("zk.connectiontimeout.ms", "15000"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config); MaptopicCountMap = new HashMap (); topicCountMap.put(Config.TOPIC, Config.THREADS); Map >> consumerMap = consumer.createMessageStreams(topicCountMap); List > streams = consumerMap.get(Config.TOPIC); for(final KafkaStream kafkaStream : streams){ new Thread(new Runnable() { @Override public void run() { for(MessageAndMetadata mm : kafkaStream){ String msg = new String(mm.message()); System.out.println(msg); } } }).start(); } } }
四、运行实例
首先,运行消费者类ConsumerDemo
运行结果如下:
没有打印任何信息。
此时,我们运行生产者类ProducerDemo
我们再次打开消费者的控制台查看如下:
打印出了生产者生产的消息。
至此,Kafka简单客户端编程实例结束。
Java开发中使用Kafka实现客户端程序的方法,今天就全部介绍到这里了,爱站技术频道小编认为,任何一个程序都有它的优点和缺点,在选择的时候一定要根据自己的需求去考虑,不要听信任何的谣言,只有合适自己的才是最好的。
下一篇:实现Java单个模式面板切换