请选择 进入手机版 | 继续访问电脑版

热点推荐

    查看: 51|回复: 1

    spark streaming总结

    [复制链接]
  • TA的每日心情
    开心
    昨天 00:08
  • 签到天数: 362 天

    [LV.8]以坛为家I

    5万

    主题

    5万

    帖子

    16万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    160408
    发表于 2020-12-10 07:46:50 | 显示全部楼层 |阅读模式
    1. 概述3 i' {6 _8 s/ _: h7 v0 w# D8 W

    % ]' L2 A( H$ l- U/ u$ ^对spark streaming的学习总结,学习资料酷玩 Spark- X7 ]- A1 b: O8 H
    目录:: S$ y: I" i% v: z' W' u

      $ G, v  E! m6 ~2 P" d
    • 概述/ Q9 k9 ]% |+ J8 q" d$ }% t
    • 实现原理
      / K  d! ^1 d7 I' |/ F! z
    • window操作& z, ?  J: j* e* x4 A' S
    2. 实现原理
    . X) O: J& ~; `+ r- ]2 ^1 {5 C
    2 u7 d+ K5 y/ Q
    # d0 }% @( \0 q; _; W. A

    " p* F9 ^$ w9 ?2 l& y" I( z如图,spark-streaming的入口是StreamingContext,spark-streaming基于spark core,分成了四个部分:
    * v& v6 [9 N) S" g
    1. DAG静态定义:从outputStreams倒推来形成整个RDD DAG8 t* G- T# b8 y
    2. JOB动态生成:根据DAG静态定义数据来生成可执行的JOB
    ' G; D* g+ Z# V" J3. 数据产生与导入:现在有两种模式,一种是receiver-based,一种是direct approach
    ) H3 q2 a# n! X, F: _/ H! `1 {4. 长时容错:由于spark streaming程序需要长时间运行,为了实现一些语义,如exactly-once、as least once,需要容错。spark streaming通过Checkpoint和WAL来容错
    , w$ ^' H4 K" A" G' `) L6 [
    下面来分别详述
    ) p3 d  [5 Y! a3 A6 q; p& {2.1 DAG静态定义
    3 M) p6 g! B- B$ a' S- q% Q: |$ I" L; I
      . v) A- a$ }  U  s$ j
    • spark streaming的运算逻辑由DStreamGraph来表达,它由DStream组成,DStream底层是持续产生数据的RDD。DStream根据输入与输出分为3种:- a  [% O( Y9 @2 Z' g( U' ]" G9 }
      3 V8 u, K; J" K: t& S
    • InputDStream:不是由其它的DStream转换过来的,表示数据的源头5 r3 a% L. o* Z0 H5 ^& q; I2 z' Q
    • outputDStream: 不能转换为新的DStream,只有一个,为ForeachDStream
      ) H  L8 K0 O# k) I+ d+ _
    • 其它:由其它的DStream转换过来,也能转换为新的DStream2 Z3 P+ ?9 O, B3 s, k1 {4 s
      ; k- V& o6 t( d8 P5 V' }/ q6 r
    • DAG静态定义由DStreamGraph实现,通过它能持续的生成RDD DAG模板;所有的InputDStream在构造时都会被加入到DStreamGraph的成员inputStreams:ArrayBuffer[InputDStream[_]]
      + P+ O# P) B$ f' j) V
    // from InputDStreamssc.graph.addInputStream(this)

      * j- O7 W' n+ U2 m9 s6 o( U( \
    • outputDStream只有一种,即ForeachDStream,在被构造完后会被注册到DStreamGraph的成员outputStreams:ArrayBuffer[InputDStream[_]]4 {1 O2 O7 S! P: ], N! B. O# q
    // from StreamingContextprivate def foreachRDD(      foreachFunc: (RDD[T], Time) => Unit,      displayInnerRDDOps: Boolean): Unit = {    new ForEachDStream(this,      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()  }private[streaming] def register(): DStream[T] = {    ssc.graph.addOutputStream(this)    this  }2.2 JOB动态生成* R, t, l# q5 h  g" r: U$ M1 ]
    ! @$ i9 i8 @6 G3 [
    ## StreamingContext的代码def this(conf: SparkConf, batchDuration: Duration) = {    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)  }在构造StreamingContext时,会指定batchDuration,它指定了分配数据的周期,在DStream中有个slideDuration属性(通常都是batchDuration,见InputDStream),它指定了JOB生成的周期;JOB动态生成由JobGenerator实现,它在构造时会构造一个定时器RecurringTimer,他会按照规定的周期batchDuration来发送GenerateJobs事件,来异步的调用JobGenerator生成Job;生成Job的步骤如下:& q. b1 a) r6 ]  \( d5 D* \
      + W, U, i& D4 j1 u( c) N
    • 分配数据:只针对于receiver-based的流,会通过ReceiverTracker来分配这个批次需要计算的数据;对于direct approach的流,数据在第2步通过它的compute方法获取3 s& z2 ^0 e, i5 x
    • 构建DAG:通过DStreamGraph的outputDStreams来反推构造出RDD DAG,数据由InputDStream来获取。对于一个批次,一个outputDStream对应一个Job,outputDStreams则对应由Job组成的JobSet。需要指出的是,这里的Job不是core里面的job,它实际相当于一个Runnable% d$ t) K" ^9 s& D  o1 T
      3.提交JOB:JobScheduler在线程池中遍历执行JobSet中的 Job,线程池的大小为spark.streaming.concurrentJobs(默认为1)。当系统处理数据的速率小于数据产生的速率时,则Job会放入到线程池的队列中,这样就产生了调度延迟,影响系统的稳定性. i8 e$ t/ _# l- Z2 E$ M" N
      4.执行spark任务:接着就是SparkContext来调度执行Job里的RDD DAG了
      ) j( M$ }6 W0 ~/ \5 h5 m; C. [
    2.3 数据产生与导入
    1 z+ R7 Z: u- }! A8 L- d& A& [  J/ V: h5 D
    % h9 U9 S* d& _: j) l, S这里只讲receiver-based的流,对于direct approach的流则需要查阅对应的InputDStream的compute方法了。
    ' {9 }0 `, b, v$ F/ I2 Freceiver-based的流通过ReceiverInputDStream来抽象,每一个ReceiverInputDStream的子类都有一个Receiver的实现,用来获取外部数据。# ]# [# u" P: X4 e
    2.3.1 Receiver的启动
    7 n0 t2 x; u( D. w% J+ ^( _* f# Z+ n2 b/ V$ Q. C) {# [3 C. ?

      ( I/ z# Q& @% x8 I3 L$ x
    • ReceiverTracker在被构造时,会通过DStreamGraph获取所有的ReceiverInputDStream,! ?$ B9 M, S) o& d. l$ W, R
    • 在ReceiverTracker被调用start方法时被启动,ReceiverInputDStream集合中的DStream则会获取自己的Receiver实列( C, ]6 u% V; z+ p: N
    • ReceiverTracker根据调度策略ReceiverSchedulingPolicy来获取Receiver执行的偏好位置,总的来说,ReceiverSchedulingPolicy采用轮询的方式将Receiver均匀的分布到Executor上执行' _; H( r# H5 h1 T* g; W
    • Receiver被封装到了ReceiverSupervisorImpl中被调用,这样Receiver专注于接收数据,ReceiverSupervisorImpl专注于存储数据
      0 e" t7 a) ~5 `  Q: B. `7 \: y
    • 每一个Receiver与它的偏好位置被构造成RDD,通过SparkContext来执行;也就是说每一个Receiver对应一个spark Job。为什么是job,而不是一个task,假设为task,且有多个receiver,一个reciever出错会导致整个任务失败,其它的也出错
      $ U7 e& X' T2 Y+ g' c8 k+ }0 R
    2.3.2 Receiver的执行2 L& f  `  t. _5 D, L0 O6 n

    0 V7 A, @4 s" l3 c$ h4 J7 `
    Receiver执行并获取数据的流程如上图,这里需要提醒读者2点:
    1 L& X8 _. t" N+ f. y% K

      * J' ?: A& D! h2 m
    • 在我们实现自己的Receiver时,onStart方法不能阻塞,里面的逻辑要在另一个线程里面执行,参考SocketReceiver
      : p! ]& N: n! t
    • 在我们实现自己的Receiver[T]时,如果需要backpressure,在调用ReceiverSupervisorImpl的store方法来存储数据时,要使用store(dataItem: T)
      ; w1 A4 v- L9 p8 U: Y
    • 当设置了spark.executor.cores与spark.task.cpus的值相同时,构造ReceiverInputDStream时的StorageLevel的副本数最好是2,以免在计算任务本地性时,没有PROCESS_LOCAL,增加额外的网络负载. K+ B) C6 B" u6 j
    • BlockGenerator每隔spark.streaming.blockInterval(默认200ms)来提交一次数据
      $ O! z$ E5 H$ _7 E' W0 I
    2.3.3 数据的导入
    0 H7 U; p, L' o/ g$ D5 d1 z2 B2 e! B9 h: R- K8 z/ Z
    在Receiver存储数据时,最终都需要调用pushAndReportBlock
    : w! v% V3 E9 Q: R  @ 0 && processingDelay > 0) {      // 本批次JobSet执行完成时间 - 上个批次JobSet执行完成时间,单位为s        val delaySinceUpdate = (time - latestTime).toDouble / 1000        // 在本批次任务执行的时间内,处理记录的速率(elements/second)        val processingRate = numElements.toDouble / processingDelay * 1000        // 以上个批次处理记录的速率为基准,获取速率误差        val error = latestRate - processingRate        //这是一个积分校正量,从理论上说为所有历次的error之和。       //这里的意思是:schedulingDelay不为0,说明以前处理的数据多了,      //超过了系统在`批次间隔时间内`能处理的数据,需要把这些多的数据量从下次获取中删掉,        // (in elements/second)        val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis        // 这项可以忽略,因为derivative 默认为0,单位为elements/(second ^ 2)        val dError = (error - latestError) / delaySinceUpdate      // 这个是计算公式,proportional 为比例增益,默认为1,     // integral为积分增益,默认为0.2,    //derivative为微分增益,默认为0        val newRate = (latestRate - proportional * error -                                    integral * historicalError -                                    derivative * dError).max(minRate)        latestTime = time        if (firstRun) {          latestRate = processingRate          latestError = 0D          firstRun = false          None        } else {          latestRate = newRate          latestError = error          Some(newRate)        }      } else {        None      }    }  }上面的数学原理我还没有搞懂,但是主要的目的是消除调度延时并尽可能增大吞吐量
    * x8 a. U$ c  E8 L1 w1 \1 ]2.4 长时容错# F8 d. c* B- x" ]. i

    . \) e- `  e% ]长时容错分为executor端的容错driver端的容错
    3 S" @4 |, ]" ^/ k' l0 U) C2.4.1 相关的配置选项
    9 V& H% U6 Q9 Y# c+ b( @( Q/ b( W
    " u9 E; l7 _/ m% }* `2 H配置项默认值描述spark.streaming.receiver.writeAheadLog.enablefalsereceiver是否启用WALspark.streaming.checkpoint.directorynullcheckpoint目录,也是WAL目录,也可以使用StreamingContext$checkpoint方法设置,方法优先spark.streaming.receiver.writeAheadLog.rollingIntervalSecs60sReciver WAL文件滚动的周期spark.streaming.driver.writeAheadLog.rollingIntervalSecs60sdriver WAL文件滚动的周期2.4.1 executor端的容错$ g3 j7 D6 j& k6 U. R0 m+ A- H
    / E; m* ~* D1 W$ H
    主要是对Receiver接收到的数据进行容错,容错类别如下:4 U; a/ q. [& q* C7 z8 V0 u

      ' v1 w6 F! o3 e4 r1 l
    • 热备:存放在Executor端的BlockManager中,存储级别通常为副本2,能为Executor挂掉容错
      6 v4 U! Y" @! \
    • 冷备:  通过WAL存放到hdfs中,能为整个任务失败容错,数据同时也会进行热备6 S& w" k/ P5 t% I0 Z$ N
    • 重放:数据源支持重放,如kafka,偏移量倒回到失败时的状态,其实这是driver端的容错
      % F& Z0 j7 l0 K- }- O! g
    • 忽略:数据可以丢失
      " E' v8 t9 x/ ]
    主要说一下冷备:
    % {4 l) W3 [& ?4 l
      1 Q5 v0 x. ~6 A
    • WAL输出文件: $checkpointDir/receivedData/$streamId/log-${写入开始时间}-${写入开始时间 + rollingIntervalSecs * 1000}
        R) ?' o3 l  z, y9 U% a; M8 Q
    • WAL输出文件每隔spark.streaming.receiver.writeAheadLog.rollingIntervalSecs滚动一次,避免文件过大9 q) l( o* l7 Y* u. g, D
    • 单条数据的存储格式为:序列化后长度+数据;返回一个FileBasedWriteAheadLogSegment,包含了4 d' U& S3 j1 C, g7 r

      . @- I  A: [$ l& a: a
    • 数据所在的文件
      / N% Q+ w$ ]3 N. }( q# N0 E( H
    • 数据所在的偏移量' g" n' q) Y" G9 X
    • 数据的长度
      5 G! P( d' F7 P  u7 C$ r3 L5 w
      0 `3 O4 s% g% g
    • 最后通知ReceiverTracker存储这些数据的元数据信息ReceivedBlockInfo,主要由如下几部分组成
      2 q2 h# F* H: T: I2 Q

      + s0 r4 o; P, Z0 ]
    • 在BlockManager的blockId, N$ [, q# g) V  B& E
    • 记录条数( e- Z: E' f. k& ]! @0 L
    • 上步返回的FileBasedWriteAheadLogSegment
      9 m" R1 [) @4 g

      + t  H4 I) k* e
    • 在恢复时,ReceiverInputDStream获取数据时,ReceiverTracker会将保留的数据给它,让它通过FileBasedWriteAheadLogSegment中的信息去读取数据。所以,一个ReceivedBlockInfo对应于一个Task( Z) R7 I/ Q7 w1 M* l! x9 K
    2.4.2 driver端的容错
    8 L/ g- @* M1 P2 I* `. j( B* A+ f4 M$ O; i
    数据的容错,一是对ReceivedBlockTracker接收到的元数据进行容错
    回复

    使用道具 举报

    该用户从未签到

    5

    主题

    120

    帖子

    245

    积分

    中级会员

    Rank: 3Rank: 3

    积分
    245
    发表于 7 天前 | 显示全部楼层
    学习了学习了学习了
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    快速回复 返回顶部 返回列表