前置条件

storm 1.1.1

zookeeper,分布式协调服务,HA。

kafka和logstash的整合

1
2
kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties
kafka-topics.sh --create --zookeeper hadoop001:2181 --replication-factor 1 --partitions 1 --topic logstash_topic

logstash2.4.1
file_kafka.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
input {
file {
path => "/tmp/access.log"
}
}

output {
kafka {
codec => json
topic_id => "logstash_topic"
bootstrap_servers => "hadoop001:9092"
batch_size => 1
}
}

./bin/logstash -f file_kafka.conf

架构和部署

storm架构

主:nimbus,task的指派和分发、资源的分配
从:supervisor,执行任务的具体部分。启动和停止多个worker(配置指定)。一个topology可以运行在多个worker上,也可以通过配置来指定

主从是无状态的,元数据存放在zk中。

worker:运行具体组件逻辑(Spout & Bolt)的进程

=================

task:Spout和Bolt中的每一个线程
executor:Spout和Bolt可能共享一个线程

Nimbus从1.0.0是HA的了。

安装指南

编辑conf/storm-env.sh导出JAVA_HOME。

1
2
3
4
5
6
7
8
9
10
11
# zk
nohup storm dev-zookeeper &
# nimbus
nohup storm nimbus &
# ui (默认8080端口)
nohup storm ui &
# supervisor
nohup storm supervisor &
# logviewer(通过uid查看日志)
nohup storm logviewer &
# 杀掉进程使用kill -9

通过访问storm web ui可以发现启动了2个nimbus,主要是为了HA。

提交任务

1
storm jar ~/lib/storm-1.0.jar io.github.ClusterSumStormTopology
1
2
3
4
# 查看正在运行的topology
storm list
# 杀掉topology
storm kill ClusterSumStormTopology

并行度

并行度

  • 一个worker进程执行的是一个topo的子集
  • 一个worker进程会自动1-n个Executor线程来执行一个topo的component
  • 一个运行的topo就是由集群中的多台物理机上的多个worker进程组成

executor是一个被worker进程启动的单独线程,每个executor只会运行1个topo的1个component
task是最终运行spout或者bolt的最小执行单元

默认:
一个supervisor节点启动4个worker进程,启动4个slots(supervisor.slots.ports)。
每个topo默认启动一个worker进程
每个worker进程会启动一个executor
每个executor启动一个task

提交作业之后执行jps -l

1
2
3
4
5
6
7207 org.apache.storm.ui.core
7288 org.apache.storm.daemon.logviewer
7803 org.apache.storm.daemon.worker
7164 org.apache.storm.daemon.nimbus
7229 org.apache.storm.daemon.supervisor.Supervisor
7789 org.apache.storm.LogWriter

分组策略

A Stream grouping defines how that stream should be partitioned among the bolt’s tasks.

Streaming Groupings

DRPC

常见的序列化方式avro,thrift

参考资料

Storm内部原理分析
Storm并行度