博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Streaming源码解读之No Receivers
阅读量:7075 次
发布时间:2019-06-28

本文共 1915 字,大约阅读时间需要 6 分钟。

hot3.png

有提到过Spark Streaming事务是如何保证exactly once的语义的。

从spark core程序来讲,读取固定数据来源比如hdfs中,spark只是做为一个计算框架。

而在流处理中,只是多了一个时间维度。

若在某一时刻,知道所需处理数据的来源,直接读取,而不用被动的接收(Receiver),那就是和普通的Spark 程序没什么差别了。

本文将着重Kafka中direct方式的读取,以切入,跟踪源码分析。

入口是KafkaUtils,先创建了一个回调函数定义,再获取到kafka集群,并获取到起始偏移量,最后创建一个DirectKafkaInputDStream,用于创建RDD。

// KafkaUtils.scala line 473  def createDirectStream[    K: ClassTag,    V: ClassTag,    KD <: Decoder[K]: ClassTag,    VD <: Decoder[V]: ClassTag] (      ssc: StreamingContext,      kafkaParams: Map[String, String],      topics: Set[String]  ): InputDStream[(K, V)] = {    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)    val kc = new KafkaCluster(kafkaParams)    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](      ssc, kafkaParams, fromOffsets, messageHandler)  }

KafkaCluster 在实例化时没有任何动作,只是单纯的创建对象。

接下来获取每个partition的偏移量,064013_12R3_120395.png

  1. 先获取设置的偏移量配置
  2. 获取分区,使用的是Either,即要么left要么right。此处left为抛出一个异常,right为返回TopicAndPartition Set。scala语法博大精深。064434_3pPW_120395.png064724_9v3Y_120395.png,后面的大量用到了Either。业务逻辑没什么复杂。
  3. 偏移量默认是最大的。065034_K1PZ_120395.png

DStream创建之后,整个DAG回溯的lineage如下:

DirectKafkaInputDStream -> MappedDStream > FlatMappedDStream -> MappedDStream -> ShuffledDStream -> ForEachDStream

当DAG回溯到DirectKafkaInputDStream时,会调用compute。创建KafkaRDD,并且将最新的偏移量保存,以便下次计算新的偏移量。

144945_5MVg_120395.png

从当RDD进入计算时,会调用compute。151114_V7rf_120395.png,此处的offsetRanges就是Kafka的TopicAndPartition和对应的偏移量。最后结果就是kafka有多少个partition,spark就会有多少个partition与之对应。

142347_vqAX_120395.png

 

直接抓Kafka数据的方式与Receiver的方式的对比:

  1. 实现方式
    1. Direct方式只是定时生产RDD,通过RDD 回溯至最早的KafkaRDD来获取数据,是很自然的做法
    2. Receiver是以RDD的形式封装Receiver,在Worker中启动后,未到时的数据存在内存队列中,定时触发来收割数据,放入Spark中
  2. 数据可靠性
    1. Direct数据存放在外部Kafka中,kafka自带副本。无需spark做冗余
    2. Receiver需要WAL来预防数据丢失,但是因为wal的批处理的特性,还是有可能丢失数据。
  3. 负载均衡
    1. Direct以RDD的形式,每个Duration产生的RDD都会在执行时动态最优。
    2. Receiver 与 Worker绑定,无法动态调整。
  4. 一致性保证
    1. direct借助kafka的ack,失败的消息会自动重试。
    2. 借助wal实现一致性。
  5. 对资源的合理使用
    1. direct方式数据都存在kafka,没冗余,没wal,
    2. receiver未收割的数据存在内存queue中,必要时要开启wal,至少多了1分副本。

 

转载于:https://my.oschina.net/corleone/blog/684847

你可能感兴趣的文章
【02】创建型-工厂方法
查看>>
Vue-cli(四) Vue文件
查看>>
NSA用OpenFlow,间谍机构的SDN轰趴
查看>>
iOS 网络--图片库本地选取
查看>>
GVIM中文乱码问题(文本及菜单乱码)
查看>>
zabbix_agentd 服务启动
查看>>
Yii2 如何关闭debug
查看>>
Oozie Bundle 规范
查看>>
VMWare下虚拟机NAT共享方式上网的配置说明
查看>>
NAT另类使用方式
查看>>
http之缓存的实现原理
查看>>
开启归档并更新归档目录
查看>>
Mac技巧之用键盘利器 Alfred 直接搜索 iTunes Store 或 Mac App Store 应用商店的方法
查看>>
C/MFC如何获得应用程序当前路径(整理)
查看>>
ES5新特性
查看>>
CentOS Mahout部署
查看>>
很简单,但很实用:数组键值的用途
查看>>
libstdc++.so.6: version `GLIBCXX_3.4.21'
查看>>
Ionic2构建iOS应用上传总结
查看>>
TIMESTAMP with ****问题连不上mysql
查看>>