博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka模拟生产-消费者以及自定义分区
阅读量:6066 次
发布时间:2019-06-20

本文共 4311 字,大约阅读时间需要 14 分钟。

hot3.png

基本概念

kafka中的重要角色

  broker:一台kafka服务器就是一个broker,一个集群可有多个broker,一个broker可以容纳多个topic
  topic:可以理解为一个消息队列的名字
  partition:分区,为了实现扩展性,一个topic可以分布到多个broker上,一个topic可以被分成多个partition,partition中的每条消息 都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。也就是说,一个topic在集群中可以有多个partition 。kafka有Key Hash算法和Round Robin算法两种分区策略。
  producer:消息的生产者,是向kafka发消息的客户端
  consumer:消息消费者,向broker取消息的客户端
  offset:偏移量,用来记录consumer消费消息的位置
  Consumer Group:消费组,消息系统有两类,一是广播,二是订阅发布。

编码实现

  创建一个生产者

package sancen.kafkaimport java.util.Propertiesimport kafka.producer.{KeyedMessage, Producer, ProducerConfig}/**  * 类名  ProducerDemo  * 作者   彭三青  * 创建时间  2018-11-26 9:49  * 版本  1.0  * 描述: $ 实现一个生产者,把模拟数据发送到kafka集群  */object ProducerDemo {  def main(args: Array[String]): Unit = {    // 定义一个接收数据的topic    val topic = "test"    // 创建配置信息    val props = new Properties()    // 指定序列化类    props.put("serializer.class", "kafka.serializer.StringEncoder")    // 指定kafka列表    props.put("metadata.broker.list", "SC01:9092, SC01:9092, SC03:9092")    // 设置发送数据后的响应方式    props.put("request.required.acks", "0")    // 指定分区器    // props.put("partitioner.class", "kafka.producer.DefaultPartitioner    // 自定义分区器    props.put("partitioner.class", "day01.kafka.CustomPartitioner")    // 创建producer对象    val config: ProducerConfig = new ProducerConfig(props)    // 创建生产者对象    val producer: Producer[String, String] = new Producer(config)    // 模拟数据    for(i <- 1 to 10000){      val msg = s"$i : producer send data"      producer.send(new KeyedMessage[String, String](topic, msg)) //key偏移量,也可以给空 v实际的数据      Thread.sleep(500)    }  }}

  创建消费者

package sancen.kafkaimport java.util.Propertiesimport java.util.concurrent.{ExecutorService, Executors}import kafka.consumer._import scala.collection.mutable/**  * 类名  ConsumerDemo  * 作者   彭三青  * 创建时间  2018-11-26 10:08  * 版本  1.0  * 描述: $ 创建一个Consumer消费kafka的数据  */class ConsumerDemo(val consumer: String, val stream: KafkaStream[Array[Byte], Array[Byte]]) extends Runnable{  override def run(): Unit = {    val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()    while (it.hasNext()){      val data = it.next()      val topic = data.topic      val partition = data.partition      val offset = data.offset      val msg: String = new String(data.message())      println(s"Consumer:$consumer, topic:$topic, partiton:$partition, offset:$offset, msg:$msg")    }  }}object ConsumerDemo {  def main(args: Array[String]): Unit = {    // 定义获取的topic    val topic = "test"    // 定义一个map,用来存储多个topic key:topic名称,value:指定线程数用来获取topic的数据    val topics = new mutable.HashMap[String, Int]() // 要求就要传一个map,可以放一个或者多个topic    topics.put(topic, 2)    // 配置信息    val props = new Properties()    // 指定consumer组名    props.put("group.id", "group02")    // 指定zk列表    props.put("zookeeper.connect", "SC01:2181,SC02:2181,SC03:2181")    // 指定offset异常时需要获取的offset值    props.put("auto.offset.reset", "smallest")    // 创建配置信息    val config = new ConsumerConfig(props)    // 创建consumer对象    val consumer: ConsumerConnector = Consumer.create(config)    // 获取数据,返回的map类型中key:topic名称,value:topic对应的数据    val streams: collection.Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = consumer.createMessageStreams(topics)    // 获取指定topic的数据    val stream: Option[List[KafkaStream[Array[Byte], Array[Byte]]]] = streams.get(topic)    // 创建固定大小的线程池    val pool: ExecutorService = Executors.newFixedThreadPool(3)     for(i <- 0 until stream.size){       pool.execute(new ConsumerDemo(s"Consumer:$i", stream.get(i)))     }  }}

  创建自定义分区类

package sancen.kafkaimport kafka.producer.Partitionerimport kafka.utils.VerifiablePropertiesimport org.apache.kafka.common.utils.Utils/**  * 类名  CustomPartitioner  * 作者   彭三青  * 创建时间  2018-11-26 20:29  * 版本  1.0  * 描述: $  */// 要实现自定义分区器必须要继承Partitionerclass CustomPartitioner(props: VerifiableProperties) extends Partitioner{  override def partition(key: Any, numPartitions: Int): Int = {    Utils.abs(key.hashCode) % numPartitions  }}

程序测试

  后台启动kafka集群

kafka-server-start.sh kafka_2.11-0.9.0.1/config/server.properties &

  在kafka集群上创建一个名为test的topic,指定分区为2,一般一个topic对应一个分区

kafka-topics.sh --create --zookeeper SC01:2181 --replication-factor 2 --partitions 2 --topic test

  分别运行ProducerDemo和ConsumerDemo则可以在ConsumerDemo端窗口打印出信息

在这里插入图片描述

转载于:https://my.oschina.net/u/3875806/blog/2962794

你可能感兴趣的文章
2017 Hackatari Codeathon B. 2Trees(深搜)(想法)
查看>>
单词统计
查看>>
输入一个数字计算圆的面积
查看>>
在Delphi中隐藏程序进程
查看>>
AngularJS PhoneCat代码分析
查看>>
MEF元数据应用说明
查看>>
maven错误解决:编码GBK的不可映射字符
查看>>
2016/4/19 反射
查看>>
SharePoint Wiki发布页面的“保存冲突”
查看>>
oracle 10g 数据库与客户端冲突导致实例创建无监听问题
查看>>
Delphi中读取文本文件的方法(实例一)
查看>>
Linux常用命令
查看>>
Android开源代码解读の使用TelephonyManager获取移动网络信息
查看>>
想说一点东西。。。。
查看>>
css知多少(8)——float上篇
查看>>
NLB网路负载均衡管理器详解
查看>>
水平添加滚动条
查看>>
PHP中”单例模式“实例讲解
查看>>
VS2008查看dll导出函数
查看>>
VM EBS R12迁移,启动APTier . AutoConfig错误
查看>>