对一个 topology,JStorm 最终会调度成一个或多个 worker,每个 worker 即为一个真正的操作系统执行进程,分布到一个集群的一台或者多台机器上并行执行。
而每个 worker 中,又可以有多个 task,分别代表一个执行线程。每个 task 就是上面提到的组件(component)的实现,要么是 spout 要么是 bolt 。
如上图所示, worker 是一个独立 JVM 的进程, 它其实是由 Supervisor 通过命令行执行 Worker#main 方法来启动. worker 进程内部, 运行着许多线程, 包括: Task 线程、序列化/反序列化线程等. 其对应的代码为: com.alibaba.jstorm.daemon.worker.Worker
注意: Storm 与 JStorm 的 worker 模型有所不同, JStorm 移除了 executor 的概念, 详见“JStorm 中 task 与 executor 的关系”.
启动
worker 有两种被启动的方式, 这个在 supervisor 一节已经提到.
- (本地调试) 调用 mk_worker 方法来启动
- (线上部署) 通过命令行调用 main, 在新的 JVM 中启动 Worker
在 mk_worker 中, 执行了如下操作:
- 打日志( sb 这个变量名我很喜欢)
- new Worker ,并设置必要的属性 ( 这里设置的属性真的太多了, 得再开一个章节细讲 ). 总的来说是以下事情:
- 算出当前worker会启动的task
- todo
- 执行 execute
在 execute 方法内, worker 完成了初始化和启动 task 的工作, 主要做了以下事情:- worker 之间互连并启动一个线程监控变化,如果worker任务变更会与启停 worker 重连
- 监控 topology 是否 active 并将这个状态赋给 storm-active-atom 变量,task 根据这个变量决定是否调用 spout 的 nextTuple
- worker 启动线程来执行具体的 tasks
让我们详细看一下 execute 方法内部
在第一步中: 创建并启动分发控制信息的线程
// create recv connection, reduce the count of netty client reconnect |
- 使用Disruptor的队列建立了一个属于worker的 recvControlQueue . 不过到目前为止,没有足够的信息告诉我们这个队列是在worker的生命周期中起到什么作用的.
- 对recvControlQueue 的性能指标进行跟踪( 使用dropwizard/metrics,详见jstorm metric )
- 使用netty创建了一个server, 对指定端口进行监听. NettyServer 会将收到的消息放入 recvControlQueue 和 deserializeQueues ( 忘了 deserializeQueues 是啥? 看下这篇“tuple 在 整个拓扑中的流转过程” )中
- 新建并启动了一个 VirtulPortCtrlDispatch , 这又是个啥东西呢. 似乎只是用来在启动的过程中, 向各个 task 发送控制信息.
上面所述的 recvControlQueue 中的元素会按照taskId 被分发到 VirtulPortCtrlDispatch 持有的 controlQueues 存放的task的 ctrlQueue 中.
也就是 recvControlQueue 中装的东东会被放到 VirtulPortCtrlDispatch.controlQueues 的 ctrlQueue中.那么问题来了, recvControlQueue 中的元素又是哪来的呢? (这个控制信息猜测是后续的)
在第二步中: 创建并启动”维护task之间连接”的线程
创建了一个 RefreshConnections .
为了配置这个类,需要从上下文中取出本次拓扑的拓扑结构,以及当前 worker 将会启动的 task 的taskid 列表 然后通过这两个信息算出当前 worker 会和那些 task 进行连接,不区分是本 worker 和其他 worker
主要工作:
该类会在 worker 的运行过程中,定时去做下面这些事:
维护 task 之间的通信(目前使用的是 netty ):包括新建连接,移除旧的连接。(因为 worker 是可能挂掉掉并重启的,此时 task 之间的链接就需要重新维护)
其中会去计算出 task 的下游 task 在哪个 worker 中. 然后启动 NettyClient 去 connect 下游 worke r 的 port.
在第三步中:创建并启动维护zk状态的线程
在第四步中:创建并启动发送控制信息的线程
DrainerCtrlRunable 这名字取得挺有意思的, 排水控制?
主要工作:
//从transferCtrlQueue里消费 |
这家伙会发送控制信息
TaskMessage message = new TaskMessage(TaskMessage.CONTROL_MESSAGE, targetTask, tupleMessage); |
第五步中:创建并启动心跳维护线程
??维护心跳,谁的心跳?怎么维护?为什么要维护?
第六步中:为worker创建一个metric统计的类
(如果后续datacore想要维护好性能的话,可以考虑加入这个东西来统计各个因子的数据。但是这个对代码改动还挺多的,因此还是看情况吧)
第七步中: 创建并启动task线程. (终于创建了task)
这里已经创建并启动了 task 线程.
第八步中:创建并启动n个序列化,反序列化线程。
这个n还挺讲究的,有专门的算法,代码先贴在下面,可以考虑下为啥要这么计算
最后一步: 初始化步骤结束
经过哐哐哐一通创建,现在有了好多个不同的线程,这些线程被塞到workData里,然后一起返回给了 mk_worker 方法的调用者。
sd.join是说,当worker所创建的所有线程都运行结束后,worker线程才结束.
那么到这里,worker已经完成了所有启动需要的操作.
运行时
todo