Storm Tutorial
题图来自网络
2 预备
教程中使用的示例来自 storm-starter 项目。推荐克隆这个项目到本地,并跟随这里面的例子学习。
3 Storm集群
从表面上看,Storm 集群和 Hadoop 集群很像。在 Hadoop 上运行的是 "MapReduce jobs",而在 Storm 上运行的是"topologies"。
"Jobs" 和 "topologies" 差异很大:一个最关键的区别是 MapReduce job 最终会运行完,而 topology 会一直处理(除非你杀掉这个拓扑进程)。
Storm 集群上有两种节点:master 节点和 worker 节点。
Master 节点上运行一个叫做 "Nimbus" 的守护进程(和 Hadoop 的 "JobTracker" 类似),Nimbus 负责在集群中分发代码,为工作节点分配任务,并监控故障。
Worker 节点上运行一个叫做 "Supervisor" 的守护进程。
Supervisor 监听 Nimbus 分发给自己的工作,启动或者停止 worker 进程。
每个 worker 进程执行拓扑的一个子集。正在运行的拓扑在不同的机器上运行着众多的 worker 节点。
Nimbus 和 Supervisors 之间的所有协调配合都是 Zookeeper 集群完成。
此外,Nimbus 守护进程 和 Supervisor 守护进程是快速失败且无状态的,所有的状态信息都由Zookeeper或者本地磁盘保存。
这意味着当你杀掉 -9 Nimbus
或者 Supervisors
,他们会立即开始重启,就像什么也没有发生过一样,这种设计使得 Storm 集群相当稳定。
4 拓扑结构
在 Storm 上做实时计算,需要创建 "topologies"。一个拓扑结构是一个计算过程图。拓扑结构中的每个节点包含一段处理逻辑,节点之间的连接线显示了数据在节点间如何传递。 运行一个拓扑结构是非常简单的。首先,将你所有的代码和依赖打包成一个 jar 文件。然后,你运行下面这段命令:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
这里运行的是 backtype.storm.MyTopology
这个类,传入了参数 arg1
和 arg2
。这个类的 main
方法定义了拓扑结构,并且将它提交给 Nimbus。Storm jar 负责连接 Nimbus,上传你的 jar 包。
由于拓扑定义采用的是 Thrift 结构,Nimbus 也是一个 Thrift 服务,所以可以用任意一种编程语言创建并提交拓扑结构。
上面的例子是基于 JVM 语言的最简单方法。更多关于启动停止拓扑结构的信息请参见 Running topologies on a production cluster 。
5 流
Storm 的核心抽象是 "stream"(流)。Stream 是元组 "tuple" 的无限序列。Storm 提供了一系列原语,以分布式和可靠的方式将流转换为一个新的流。例如,你可以将 tweets 流转换成热门话题流。 Storm 为流转换提供的最基本原语是 "spouts" 和 "bolts"。"spouts" 和 "bolts" 都提供了接口,你可以实现它们来运行自己的特定应用逻辑。 Spout 是流的输入源。例如,一个 spout 会读取一个 Kestrel 队列,然后作为一个新的流发出。或者一个 spout 可以连接到 Twitter API,然后作为tweets流发出。 Bolt 消耗所有的输入流,处理加工,再形成新的流发出。复杂的流转换,比如说从tweets流中计算出热门话题流,需要多个步骤,因此需要多个bolts。 Bolts 可以做各种操作:运行方法,过滤元组,流聚合,流连接,数据库会话等等。 Spouts 和 bolts 组成的网络结构被封装进 "topology"(拓扑是提交到 Storm 集群以运行的顶层抽象)。 一个拓扑结构是一个流转换图,它的每一个节点都是一个 spouts 或者 bolts。图中的边表示哪些 bolts 订阅哪些 streams。 当一个 spout 或者 bolt 发出一个元组到 stream,这个元组就会被发送到所有订阅了这个流的 bolts。
节点之间的链接展示了元组是如何传递的。例如,Spout A 和 Bolt B 之间有一个链接, Spout A 到 Bolt C 有一个链接, Bolt B 到Bolt C 有一个链接, 每当 Spout A 发出一个元组,这个元组就会被发送到 Bolt B 和 Bolt C。所有 Bolt B 的输出也都会发送到 Bolt C 。 一个 Storm 拓扑结构中的每个节点都是并行执行的。在你的拓扑中,你可以指定每个节点的并行度,然后 Storm 会在整个集群中产生指定数量的线程去执行。 一个拓扑结构会一直运行,除非你杀掉这个进程。Storm 会自动重新分配失败的任务。此外,Storm 能保证不会有数据丢失,就算机器宕机消息被丢弃。
6 数据模型
Storm 中使用 tuples(元组)作为数据模型。 元组是 key-value 列表,元组的一个字段可以是任何类型的对象。 开箱即用,Storm 支持所有的基本类型,字符串,字节数组。如果要使用一种新类型,你只需要让它实现 serializer 接口。 拓扑中的每个节点都必须声明它发出元组的输出域。例如,下面的 bolt 发出了2元组到域 "double" 和 "triple":
public class DoubleAndTripleBolt extends BaseRichBolt { private OutputCollectorBase _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); } }
declareOutputFields
方法声明了输出域 ["double", "triple"]
. DoubleAndTripleBolt
的其余代码会在接下来的章节说明。
7 一个简单的拓扑示例
我们来看一个简单的拓扑示例,更加深入的了解概念,看代码是如何成形。下面是来自 storm-starter 项目的例子 ExclamationTopology:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("words"); builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
上面这个 topology 包含一个 spout 和两个 bolts。spout 发出单词,每个 bolt 在输入的单词后面追加 "!!!" 字符串。 3个节点排列成一条线形结构,spout 发出元组给第一个bolt,第一个bolt发出给第二个bolt。 例如:
- spout 发出元组
["bob"]
和["john"]
- 然后第一个bolt发出
["bob!!!"]
和["john!!!"]
- 最后第二个bolt发出
["bob!!! !!!"]
和["john!!! !!!"]
这段代码用 setSpout 和 setBolt 方法定义了3个节点。
这2个方法都有3个入参,一个用户指定的id,一个包含处理逻辑的对象,以及你希望的节点并行数量。
Spout 的入参id是 "words" ,bolts 的入参id分别是"exclaim1" 和 "exclaim2".
包含处理逻辑的对象分别实现了接口 IRichSpout=、 =IRichBolt
。
最后一个参数,节点并行度,是可选的。它表示集群里有多少线程执行这个节点。如果忽略不填,Storm只会分配一个线程给那个节点。
setBolt 返回一个 InputDeclarer
对象,用来定义 bolt 的输入。
这里,组件 "exclaim1" 声明了它会读取所有 "words" 组件随机分发的元组,而组件 "exclaim2" 声明了它会读取所有 "exclaim1" 组件随机分发的元组。
"shuffle grouping" 的意思是元组会被随机地从输入任务分发给处理任务。有很多方法类分组数据。这些将在几个章节来说明。
如果你想让组件 "exclaim2" 读取所有 "exclaim1" 和 "words" 组件发出的元组,你可以像下面这样定义 "exclaim2" 组件:
builder.setBolt("exclaim2", new ExclamationBolt(), 5) .shuffleGrouping("words") .shuffleGrouping("exclaim1");
如你所见,bolt 可以链式的声明输入,来指定多个数据源。
我们深入挖掘下这个拓扑结构中 spouts 和 bolts 的实现。
Spouts 负责向拓扑结构中发出新消息。TestWordSpout 从字符串列表 ["nathan", "mike", "jackson", "golda", "bertels"] 中随机的发出一个单词作为一个1元组,频率是每100ms一次。
TestWordSpout
类 nextTuple()
方法的实现如下:
public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); }
如你所见,实现非常简单。
ExclamationBolt
类在输入单词后边追加字符串"!!!"。让我们看下ExclamationBolt的完整实现代码:
public static class ExclamationBolt implements IRichBolt { OutputCollector _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } @Override public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map getComponentConfiguration() { return null; } }
prepare
方法提供了一个 OutputCollector
用来发出元组。这个 bolt 可以在任何时间,从 prepare
, execute
, or cleanup
等方法中发出元组,甚至可以在另一个线程中异步发出。
这里的 prepare
方法只是简单地把 OutputCollector
保存为实例变量,以便后面在 execute
方法中使用。
execute
方法从 bolt 的一个输入中接收了一个元组。 ExclamationBolt 获得这个元组的第一个字段,然后追加上字符串"!!!",再向拓扑中发出。
如果你实现的bolt订阅了多个输入源,你可以通过 Tuple#getSourceComponent
方法找出当前 Tuple 是哪个组件发出的。
execute
方法还做了其它的事情,即输入的元组作为第一个参数往下传,最后一行又确认了输入元组。这些是 Storm's 可靠性 API 的一部分,它们会保证数据不丢失,将在本教程后面解释。
cleanup
方法会被调用,当 bolt 关闭再打开时,需要清理资源的话。无法保证本方法在分布式集群上能被调用:如果运行任务的机器扩容,就没办法调用这个方法。
cleanup
方法的目的是,当你以本地模式(在一个进程里模拟一个Storm集群)运行拓扑时,希望它能跑起来并且杀掉多个其它的拓扑进程且不造成任何资源泄漏。
declareOutputFields
方法声明了 ExclamationBolt
发出的一元组所在的域,叫做 "word"。
getComponentConfiguration
方法允许你配置各个方面,来决定组件如何运行。 Configuration 部分会对这个更高级的问题有更多说明。
cleanup
和 getComponentConfiguration
这类方法都需要在一个 bolt 类里面实现。你可以使用基类提供的默认实现,更简洁的定义自己的bolt。
ExclamationBolt 通过继承 BaseRichBolt 会写得更简洁,就像下面这样:
public static class ExclamationBolt extends BaseRichBolt { OutputCollector _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } @Override public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
8 在本地模式下运行上述示例
我们来看下如何在本地模式运行 ExclamationTopology
类,以及它怎样发挥作用。
Storm 有两种操作模式:本地模式+分布式模式。
在本地模式下,通过用线程模拟工作节点,Storm 在进程中完整地执行。本地模式对于开发和测试拓扑是非常有用的。
当你运行 storm-starter
项目里面的拓扑示例时,他们就是运行在本地模式下,而且你能看到每个组件发出的消息是怎样的。本地模式下运行拓扑的更多信息,请参考 Local mode。
在分布式模式下,Storm 在集群上运行。当你提交一个拓扑给主节点,你也同样要提交所有必要的代码以运行拓扑。主节点会分发你的代码,分配工作节点,运行拓扑。 如果工作节点挂了,主节点会重新分配工作节点。分布式模式下运行拓扑的更多信息,请参考 Running topologies on a production cluster 。
下面是一个本地模式下运行ExclamationTopology 类的例子:
Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2); //本地模式 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown();
首先,这段代码通过创建 LocalCluster
对象,定义了一个进程内的虚拟集群。接着它调用 submitTopology
方法提交一个拓扑到本地集群 LocalCluster 。
提交拓扑到分布式集群和到本地虚拟集群的方式是一样的。
submitTopology
方法有三个参数,第一个是拓扑名字,第二个是拓扑的配置,第三个是拓扑本身。
拓扑名字用来标识一个拓扑,这样后续可以杀掉这个进程。一个拓扑会一直运行下去除非你杀掉它的进程。
拓扑配置用来调配运行拓扑的各个方面。这里的2个设置是非常通用的:
- TOPOLOGY_WORKERS (用 setNumWorkers 方法设置) ,指定了集群分配多少个工作进程去执行拓扑。拓扑中的每个组件要执行尽可能多的线程。分配给组件的线程数量是通过
setBolt
和setSpout
方法配置的。这些线程存在于工作进程里面。每个工作进程包含了部分线程去执行部分组件。举个例子,你所有的组件一共指定了300个线程,配置里面指定了50个工作进程。每个工作进程会执行6个线程,每个线程可能属于不同的组件。你需要调整每个组件的并行度以及线程运行在工作进程的数量,来提高 Storm 拓扑的性能。 TOPOLOGY_DEBUG
(用setDebug
方法设置),如果设置成 =true=,Storm 会记录每一个组件每一次发出的每一条信息。在本地模式下调试拓扑时是很有用的,但是在分布式集群上运行拓扑时可能希望关掉它。
针对拓扑还有很多配置。更多详细配置请参考 the Javadoc for Config 。 学习搭建开发环境,在本地模式下运行你的拓扑,请参看 Creating a new Storm project 。
9 流分组
流分组告诉拓扑怎样在2个组件之间发送元组。记住,spouts 和 bolts 在集群上多任务并行执行。一个拓扑结构在任务级别上执行看起来就像下面这样:
当工作节点 Bolt A 的一个任务(线程)发出元组给工作节点 Bolt B,它到底要发送元组到 Bolt B 的哪个任务(线程)? "stream grouping" 回答了这个问题:告诉Storm怎样在任务集合之间发送元组。 在我们深入探究流分组的不同类型之前,我们先看看storm-starter项目里面的另一个拓扑。 WordCountTopology 读取一个 spout 给出的句子,再从 WordCountBolt 流出每个单词已经出现的次数。
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
SplitSentence 为每个句子中的每个单词发出一个元组,当它接收数据时。 WordCount 在内存中维护了一个从单词映射到单词个数的 map。 每当 WordCount 收到一个单词,他就会更新这个单词的个数,并且发出一个新的单词个数。 下面介绍几种不同的流分组:
最简单的分组叫做"shuffle grouping" ,它随机发送一个元组给一个任务。
WordCountTopology 中使用了这种流分组,将 RandomSentenceSpout
类产出的元组发送给 SplitSentence
。它能均匀分发元组给所有的 SplitSentence
。
一个更有趣的分组叫做"fields grouping"。SplitSentence 和 WordCount bolt之间使用了字段分组。对于 WordCount 的功能(同一个单词由同一个任务处理)来说是至关重要的。 否则,不止一个任务会看到同一个单词,导致它们计算出错误的单词个数。字段分组通过字段子集来组织一个数据流。这样就让同样的字段子集归属同一个任务。 WordCount 订阅了 SplitSentence 的输出流,通过按照"word"字段进行分组,同一个单词总是由同一个任务(线程)执行,这样就能产出正确的结果。 字段分组是实现流连接和流聚合以及其它功能的基础。在底层,字段分组由 哈希算法-除留余数法 来实现。
还有一些其它的流分组。更多详情请参考 Concepts 。
10 用其它编程语言定义Bolts
任何编程语言都可以定义 Bolts。其它语言(非 java )编写的 Bolts 是以子进程方式执行的,Storm 和子进程通信是基于输入输出之间的 JSON 消息。
这个通信协议只需要一个100行左右的适配器库支持,而且 Storm 自带了 Ruby, Python, 和 Fancy 的适配器库。
下面是一个出自 WordCountTopology
类的 SplitSentence bolt 定义:
public static class SplitSentence extends ShellBolt implements IRichBolt { public SplitSentence() { super("python", "splitsentence.py"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
SplitSentence 覆盖了 ShellBolt
,构造函数传入 splitsentence.py 参数,声明了会用 python 语言运行。下面是 splitsentence.py 的代码实现:
import storm class SplitSentenceBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word]) SplitSentenceBolt().run()
用其它语言编写 spouts 和 bolts 以及创建拓扑的更多信息,请参考 Using non-JVM languages with Storm 。
11 保证消息处理
在之前的教程中,我们跳过了发送元组的其他方面问题,这些方面都是 Storm 可靠性API的一部分: Storm 如何保证 spout 发出的每一条消息都被执行,以及作为一个用户应该怎样利用Storm可靠性功能的优势,请参考 Guaranteeing message processing 。
12 事务性拓扑
Storm 确保每一条消息在拓扑中至少被处理一次。一个经常被问到的问题是“基于 Storm 如何完成类似于计数的需求”? 至少处理一次不会造成计数过高么?Storm有一个特征,叫做"transactional topologies"事务性拓扑。它能在多次计算中达到恰好一次的消息语义。更多内容请参考 Guaranteeing Message Processing 。
13 分布式RPC
本教程展示了基于 Storm 如何进行基本的流计算。利用 Storm 原语你还可以做很多其他的事情。 Storm 最有意思的应用之一是 Distributed RPC(分布式RPC),这个应用并行运着密集的计算功能。更多 Distributed RPC 请参考 Distributed RPC 。
14 结论
本教程讲述了开发、测试、部署 Storm 拓扑的宽泛介绍。文档的其它部分深入讲解了试用 Storm 的各个方面。