我对你的敬仰犹如滔滔江水连绵不绝,又如黄河泛滥一发不可收拾。 – 周星驰

流处理概述

环境配置

参考spark-sql中的环境配置,目录结构同上,jdk和spark版本升级如下:

jdk:1.8
spark:2.2

实时流处理框架和架构

  • Strom: 真实时,来一条处理一条
  • Spark Streaming: 基于自定义时间间隔(2s,3s)的批量处理。微批处理
  • Kafka:国内使用较少
  • Flink:离线批处理 + 实时

实时流处理架构

为什么要将Flume收集的日志放入Kafka而不是直接丢给实时处理系统?

一般情况下业务有高峰期和低峰期,高峰期如果大量日志直接命中实时流处理系统,流处理系统可能扛不住压力,所以一般情况下会加一层MQ用来缓冲。

实时流处理的应用

电信行业,如果用户流量快用完的时候发短信订购流量包
电商行业,双十一大屏;搜索A商品的时候可能会向你推荐B商品,因为A和B可能有关联关系

大数据分析可以提高转化率。

分布式日志收集框架Flume

解决什么问题

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.

有多台服务器上运行着我们的应用程序,各个机器上有os日志,web server日志,application的日志。假设我们需要将这些日志要收集到hadoop集群上就需要定时执行scp。实时性和容错性得不到保证,文本文件的格式一般需要压缩传输。这些问题flume都帮我们解决了。只需要配置文件就OK了。类似框架还可以使用LogStash。

架构和核心组件

架构和核心组件

source:采集
channel:聚集,类似于数据的缓存池,临时存放的地方
sink:输出

常用架构

环境部署

下载解压,配置系统环境变量:

1
2
export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.6-bin
PATH=$FLUME_HOME/bin:$PATH

配置文件在conf目录下,cp flume-env.sh.template flume-env.sh,导出$JAVA_HOME.

从指定端口收集数据到控制台

一个简单的例子

a1:agent的名称
r1:source的名称
k1:sink的名称
c1:channel的名称

example.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop001
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/example.conf --name a1 -Dflume.root.logger=INFO,console

使用telnet连接到hadoop001 44444并发送数据,可以看到agent输出。

1
(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }

Event是Flume数据传输的基本单环,Event = 可选的header + byte array

实时监控文件采集新增数据到控制台

agent选型:exec source + memory channel + logger sink

example.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

将A服务器上的日志实时采集到B服务器

一个agent的输出作为另一个agent的输入

exec source + memory channel + avro sink
avro source + memory channel + logger sink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# exec-memory-avro.conf
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoop001
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

# avro-memory-logger.conf 这个要先启动
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel

avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = hadoop001
avro-memory-logger.sources.avro-source.port = 44444

avro-memory-logger.sinks.logger-sink.type = logger

avro-memory-logger.channels.memory-channel.type = memory

avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel

消息队列Kafka

架构及核心概念

Kafka架构

每一条消息记录由key,value和timestamp组成。topic用于给消息记录打标签,消费者可以按照指定的标签进行消费。

部署和使用

先安装zookeeper,配置$ZK_HOMEcp zoo_sample.cfg zoo.cfg,配置dataDir=/home/hadoop/app/tmp/zk

使用./zkServer.sh start启动服务器之后就可以使用./zkCli.sh连接到服务器。zk-cli命令ls /ls /zookeeper

kafka版本kafka_2.11-0.9.0.0,下载解压并配置KAFKA_HOME。

单节点单broker

config/server.properties

1
2
3
4
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/home/hadoop/app/tmp/kafka-logs
zookeeper.connect=hadoop001:2181
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 启动kafka
kafka-server-start.sh /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties
# 创建topic
kafka-topics.sh --create --zookeeper hadoop001:2181 --replication-factor 1 --partitions 1 --topic hello_topic
# 查看所有topic
kafka-topics.sh --list --zookeeper hadoop001:2181

# 生产消息
kafka-console-producer.sh --broker-list hadoop001:9092 --topic hello_topic
# 启动一个消费者消费消息(注意官方文档中的--bootstrap-server要换成--zookeeper)
kafka-console-consumer.sh --zookeeper hadoop001:2181 --topic hello_topic --from-beginning

# 查看描述
kafka-topics.sh --describe --zookeeper hadoop001:2181

启动生产者和消费者之后就可以在生产者所在tty发送消息了,从而消费者可以收到消息。上面的cmd中消费者加了参数--from-beginning,消费者重启之后以前的消息会再次收到。

单节点多broker

在单台服务器上配置多台kafka。

将server.properties复制为server-1.properties,server-2.properties,server-3.properties并修改对应的broker.id,listeners,log.dirs

1
2
3
4
5
6
7
8
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

# 因为此时启动了3个broker所以replication-factor为3
kafka-topics.sh --create --zookeeper hadoop001:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
kafka-console-producer.sh --broker-list hadoop001:9093,hadoop001:9094,hadoop001:9095 --topic my-replicated-topic
kafka-console-consumer.sh --zookeeper hadoop001:2181 --topic my-replicated-topic

容错性测试

1
2
3
[hadoop@hadoop001 my-replicated-topic-0]$ kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

上面的描述信息显示1号broker是主,现在使用kill -9干掉2号(pid可以使用jps -m查看)

1
2
3
[hadoop@hadoop001 my-replicated-topic-0]$ kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,3

Isr中2不见了。接下来干掉1号broker。可以发现消费者终端中出现了warning,但是发的消息还是可以收到。

1
2
3
[hadoop@hadoop001 my-replicated-topic-0]$ kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 3 Replicas: 1,2,3 Isr: 3

Leader由1变成了3,因此只要有一个broker正常就可以了。

Kafka高级特性

消息事务

为什么要支持事务?满足:“读取-处理-写入”模式。

数据传输的事务定义:

最多一次:消息不会被重复发送,最多被传输一次(可能一次也不传输)
最少一次:消息不会被漏发送,最少被传输一次(可能被重复传输),目前多数消息系统采用
精确的一次:Exactly Once,不会被漏传输也不会重复传输,每个消息都被传输一次而且仅仅被传输一次,这是大家所期望的。

Kafka保证事务的机制:

  • 内部重试:Producer幂等处理
  • 多分区原子写入
  • 避免僵尸实例

零拷贝

java nio中的channel.transforTo()方法,底层系统调用是sendfile。将磁盘上的一个文件块发送到网络上需要经过这样的几步:

  1. OS将数据从磁盘读入到内核空间的页缓存
  2. 应用程序将数据从内核空间读入到用户空间的缓存中
  3. 应用程序将数据写回到内核空间的socket缓存中
  4. OS将数据从socket缓冲区复制到网卡缓冲区,以便将数据发送出去

以上数据经过了4次拷贝,如果采用零拷贝机制则只有以下几步:

  1. OS将数据从磁盘读入到内核空间的页缓存
  2. 将数据的位置和长度信息的描述符增加内核空间(socket缓冲区)
  3. OS将数据从内核拷贝到网卡缓冲区,以便将数据经过网络发出

数据只经过了2次拷贝!零拷贝指的是:内核空间和用户空间之间的拷贝次数是0。

Kafka零拷贝

整合Flume完成数据采集

Flume cdh版本的soure-sink配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# avro-memory-kafka.conf
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = hadoop001
avro-memory-kafka.sources.avro-source.port = 44444

avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop001:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

avro-memory-kafka.channels.memory-channel.type = memory

avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
1
2
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf --name avro-memory-kafka -Dflume.root.logger=INFO,console
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf --name exec-memory-avro -Dflume.root.logger=INFO,console

Spark-Streaming环境搭建

下载scala2.11.8配置$SCALA_HOME环境变量。

Hbase的配置

配置$HBASE_HOME系统变量。

修改conf/hbase-env.sh中的JAVA_HOME并设置HBASE_MANAGES_ZK=false
修改conf/hbase-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop001:8020/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop001:2181</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/hadoop/app/tmp</value>
</property>

修改conf/regionservers为hadoop001

启动bin/start-hbase.sh,WebUI地址http://hadoop001:60010

启动hbase客户端

1
2
3
4
5
6
./bin/hbase shell
version
status
create 'member','info','address'
list
describe 'member'

Spark-Streaming核心概念和编程

入门

one stack to rule them all.一栈式解决。

官网的例子

使用spark-submit提交(生产)

1
2
3
nc -lk 9999
spark-submit --master local[2] --class org.apache.spark.examples.streaming.NetworkWordCount --name NetworkWordCount /home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar hadoop001 9999
# nc中输入的一行数据会实时在spark-shell中进行wordcount展示

使用spark-shell提交(开发)

1
spark-shell --master local[2]

将下面的代码粘贴到spark-shell中即可:

1
2
3
4
5
6
7
8
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("hadoop001", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

Spark Streaming接收到实时数据流,把数据按照指定时间段切成一片片的小数据块,把小数据块交给SparkEngine处理。

SparkStreaming工作原理

Spark应用程序运行在Driver端,Driver端会要求Executor启动一些接收器,Receiver启动之后会将InputStream拆分成小的Block存放在内存中(如果设置了多副本的话还会拷贝到其他机器),之后Receiver会将block信息高速StreamingContext,每隔指定的时间周期StreamingContext会通知SparkContext应该启动一些jobs了,接下来SparkContext将jobs分发到Executor执行。

DStreams:

Discretized Streams (DStreams): a DStream is represented by a continuous series of RDDs, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable, distributed dataset.

InputStreams & Receivers:

Every input DStream (except file stream, discussed later in this section) is associated with a Receiver (Scala doc, Java doc) object which receives the data from a source and stores it in Spark’s memory for processing.

处理Socket数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* SparkStreaming处理Socket数据
* 测试:nc -lk 6789 并输入数据
*/
object NetworkWordCount {

// Exception in thread "main" java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class;
// 在pom中添加com.fasterxml.jackson依赖(jar包冲突)
// java.lang.NoClassDefFoundError: net/jpountz/util/SafeUtils
//
def main(args: Array[String]) {
// 设置为local或者local[1]无法执行,因为只有一个线程获取不到资源(从socket接收数据需要启动一个Receiver)
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 6789)
val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
result.print()

ssc.start()
ssc.awaitTermination()

}
}

处理文件数据

1
2
3
4
5
6
7
8
9
10
11
12
13
object FileWordCount {
def main(args: Array[String]) {
// 不需要Receiver,因此local没有问题
val sparkConf = new SparkConf().setMaster("local").setAppName("FileWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
// tmp/ss目录中的文件一旦被生成,向文件中追加内容将不会被处理
val lines = ssc.textFileStream("file:///tmp/ss")
val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}

window:定时进行一个时间段内的数据处理。