Twitter Storm進階初步設置
本篇Blog是一個簡單的Storm入門例子,目的讓讀者明白Storm是怎樣的運行機制。以及后續(xù)會放出的幾篇Storm高級特性以及最終將Storm融入Hadoop 2.x的YARN中。目的讀者是已經(jīng)進階大數(shù)據(jù)的Hadoop,Spark用戶,或者了解Storm想深入理解Storm的讀者用戶。
項目Pom(Storm jar沒有提交到Maven中央倉庫,需要在項目中加入下面的倉庫地址):
- <repositories>
- <repository>
- <id>central</id>
- <name>Maven Repository Switchboard</name>
- <layout>default</layout>
- <url>http://maven.oschina.net/content/groups/public/</url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- <repository>
- <id>clojars</id>
- <url>https://clojars.org/repo/</url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- <releases>
- <enabled>true</enabled>
- </releases>
- </repository>
- </repositories>
- <dependencies>
- <dependency>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
- <version>1.13</version>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.3.3</version>
- </dependency>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>clojure</artifactId>
- <version>1.5.1</version>
- </dependency>
- <dependency>
- <groupId>storm</groupId>
- <artifactId>storm</artifactId>
- <version>0.9.0.1</version>
- </dependency>
- <dependency>
- <groupId>storm</groupId>
- <artifactId>libthrift7</artifactId>
- <version>0.7.0</version>
- </dependency>
- </dependencies>
下面是一個Storm的HelloWord的例子,代碼有刪減,熟悉Storm的讀者自然能把代碼組織成一個完整的例子。
- public static void main(String[] args) {
- Config conf = new Config();
- conf.put(Config.STORM_LOCAL_DIR, "/Volumes/Study/data/storm");
- conf.put(Config.STORM_CLUSTER_MODE, "local");
- //conf.put("storm.local.mode.zmq", "false");
- conf.put("storm.zookeeper.root", "/storm");
- conf.put("storm.zookeeper.session.timeout", 50000);
- conf.put("storm.zookeeper.servers", "nowledgedata-n15");
- conf.put("storm.zookeeper.port", 2181);
- //conf.setDebug(true);
- //conf.setNumWorkers(2);
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("words", new TestWordSpout(), 2);
- builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
- .shuffleGrouping("words");
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- }
Config.STORM_LOCAL_DIR是配置一個本地路徑,Storm會在這個路徑寫入一些配置信息和臨時數(shù)據(jù)。
Config.STORM_CLUSTER_MODE是運行模式,local和distributed兩個選項,即本地模式和分布式模式。本地模式在運行時時多線程模擬的,開發(fā)測試用;分布式模式在分布式集群下是多進程的,真正的分布式。
Storm的Spout和Blot高可用是通過ZooKeeper協(xié)調(diào)的,storm.zookeeper.root是一個ZooKeeper地址,并且有對應的端口號
Debug是測試模式,有更詳細的日志信息。
TestWordSpout是一個Storm自帶的例子,用來隨機的產(chǎn)生new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”};列表中的字符串,用來提供數(shù)據(jù)源。
其中DefaultStringBolt的源碼:
- OutputCollector collector;
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
- public void execute(Tuple tuple) {
- log.info("rev a message: " + tuple.getString(0));
- collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
- collector.ack(tuple);
- }
運行日志:
- 10658 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
- 10658 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
- 10758 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
- 10758 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
- 10859 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
- 10859 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
- 10961 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
- 10961 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
- 11061 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
- 11062 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
- 11162 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
- 11163 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
數(shù)據(jù)由一個Storm叫做噴嘴(Spout,也相當一個水龍頭,能產(chǎn)生數(shù)據(jù)的來源端)產(chǎn)生,然后傳遞給后端一連串的的Blot,最終被轉(zhuǎn)換和消費。而Spout和Blot都是并行的,并行度都可以自己設置(本地運行是靠多線程模擬的)。如:
- builder.setSpout("words", new TestWordSpout(), 2);
- builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
噴嘴TestWordSpout的并行度是2,DefaultStringBolt的并行度是5.
從日志可以看出,數(shù)據(jù)經(jīng)過噴嘴到達預先定于的一個Blot,打印了日志。我測試代碼設置的并行度是5,日志中統(tǒng)計,確實是5個線程:
- Thread-29-exclaim2
- Thread-31-exclaim2
- Thread-26-exclaim2
- Thread-33-exclaim2
- Thread-35-exclaim2
關(guān)于Storm是是什么?這里有詳細的介紹。
借用OSC網(wǎng)友的話說,Hadoop就是商場里自動升降式的電梯,用戶需要排隊等待,選按樓層,然后到達;而Storm就像是自動扶梯,扶梯預先設置好運行后,來人就立即運走,目的地是明確的。
Storm按我的理解,Storm和Hadoop是完全不同的,設計上也沒有半點擬合的部分。Storm更像是我之前介紹過的Spring Integration,是一個數(shù)據(jù)流系統(tǒng)。它能把數(shù)據(jù)按照預設定的流程,把數(shù)據(jù)做各種轉(zhuǎn)換,傳遞,分解,合并,***數(shù)據(jù)到達后端存儲。只不過Storm是可以分布式,而且分布式的能力也是可以自己設置。
Storm的這種特性很適合大數(shù)據(jù)類的ETL系統(tǒng)開發(fā)。





















