在对应用进行集群化的时候,对于应用如何规划所使用的cpu核心数量有一个不成文的规定:总核心数量-1。因为操作系统需要消耗一些cpu,如果我们的应用占用了全部的cpu,一旦os需要处理一些其他任务,就会因为没有空闲的核而强制进行上下文切换,会减慢应用的处理速度。

MapReduce模型

map操作中将每个单词设置为1
shuffle过程中将key相同的分到同一个reduce上

HDFS架构

NameNode和DataNode

NameNode和DataNode

blocksize:128M
130M会被拆分成2个block:128M+2M

NameNode:

  • 客户端请求
  • 元数据管理(文件名称、副本系数、block存放的DataNode)

DataNode:

  • 存储用户文件对应的数据块
  • 定期向NameNode发送心跳信息,汇报自身及其所有的block信息,健康状况

SecondNameNode在2.x中已经不是必须的了。

副本机制

hdfs-site.xml中的dfs.replication

  • replication factor:副本因子会被存放在NameNode中
  • 一个文件的所有block大小都一样,除了最后一个

Hadoop环境搭建

用户名hadoop,密码hadoop,/home/hadoop目录结构:

software:存放软件安装包
lib:开发的jar包
app:所有软件的安装目录,子目录tmp
data:测试数据目录
source:框架源码目录

http://archive.cloudera.com/cdh5/cdh/5/
https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh_package_tarball_57.html 2.6.0-cdh5.7.0
jdk7u51-linux-x64
scala2.11.8

修改主机名和hosts文件(/etc/sysconfig/network和etc/hosts)

主机名:hadoop001 - 192.168.100

/etc/sysconfig/network
NETWORKING=yes
HOSTNAME=hadoop001

配置ssh免密登陆(本步骤可以省略,但是重启hadoop进程的时候需要手动输入密码):

1
2
3
4
ssh-keygen -t rsa
cp ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys
# 可以测试是否可以免密登陆本机
ssh hadoop001

由于大数据开发会打开许多FD,修改最大打开文件数量和进程数量:

1
2
3
4
5
# 以下内容添加到 /etc/security/limits.conf 文件后面
* hard nofile 10000
* soft nofile 10000
* hard nproc 10000
* soft nproc 10000

核心配置文件

配置文件路径/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop

hadoop-env.sh修改JAVA_HOME
core-site.xml

1
2
3
4
5
6
7
8
9
10
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop001:8020</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/app/tmp</value>
</property>
</configuration>

hdfs-site.xml

1
2
3
4
5
6
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

格式化HDFS和启停

bin客户端脚本目录,sbin服务端脚本目录

1
2
./bin/hdfs namenode -format
./sbin/start-dfs.sh

使用jps -m命令验证是否启成功:

2042 SecondaryNameNode
1890 DataNode
1571 NameNode

关闭防火墙

1
2
3
chkconfig iptables off # 禁止开机自启动
chkconfig --list
/etc/init.d/iptables stop

可以访问http://hadoop001:50070。端口信息参见hdfs-default.xml的dfs.namenode.http-address,NameNode的WebUI。可以查看集群上的datanode信息,浏览hdfs上的文件,浏览日志信息。

HDFS Shell常用操作

1
2
3
4
5
6
7
8
9
hadoop fs -ls /
hadoop fs -mkdir /test
hadoop fs -mkdir -p /a/b
hadoop fs -ls -R /
hadoop fs -put hdfs.cmd /test
hadoop fs -text /test/hdfs.cmd
hadoop fs -get /test/hdfs.cmd a_tmp
hadoop fs -rm /test/hdfs.cmd
hadoop fs -rm -r /a

集群模式安装

在一台机器上进行安装

以下操作都在hadoop001上进行

1
2
3
4
5
# 在hadoop001上执行下面的代码会远程登录hadoop001,002,002,输入密码之后即可免密登陆
# ssh-copy-id -i ~/.ssh/id_rsa.pub "-p 10022 user@server" 解决默认端口问题
ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop001
ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop002
ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop003

分布式和伪分布式仅仅多了个yarn的resourcemanager的hostname,还有slave配置文件
hadoop001 yarn-site.xml

1
2
3
4
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop001</value>
</property>

slaves

1
2
3
hadoop001
hadoop002
hadoop003

将001上的安装包分发到其他节点

1
2
3
4
scp -r ~/app hadoop@hadoop002:~/
scp -r ~/app hadoop@hadoop003:~/
scp ~/.bash_profile hadoop@hadoop002:~/
scp ~/.bash_profile hadoop@hadoop003:~/

MapReduce

核心概念:

使用Combiner加速WordCount
使用Partitioner指定reducer的输出

  • Split:交由MR作业处理的数据块,是MR中最小的计算单元;HDFS中的blocksize是HDFS中最小的存储单元,默认128M。默认情况下Split和hdfs中的block一一对应。
  • InputFormat:将输入数据进行分片(Split):InputSplit[] getSplit(JobConf job,int numSplits)TextInputFormat
  • OutputFormat:输出。
  • Combiner:可以看成Map节点本地的Reducer,可以预先对数据进行一部分归并,对于求和这样的操作可以有效减少网络传输。
  • Partitioner:有几个partition对应几个reducer作业的输出。
  • jobhistory:记录已经完成的MR信息到指定的HDFS目录下,默认是不开启的。

开启job history后可以通过yarn webui 查看

mapred-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop001:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop001:19888</value>
</property>
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>/history/done</value>
</property>
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/history/done_intermediate</value>
</property>

yarn-site.xml

1
2
3
4
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

启动history server

1
mr-jobhistory-daemon.sh start historyserver

HDFS不适合小文件的存储,如果小文件非常多则集群性能会非常低,namenode维护太多元数据。所以一般情况下集群上的小文件都会定期清理合并成大文件。MapReduce只适合离线批处理。

YARN简介

hadoop 1.x中JobTracker存在单点且职能过多:

  1. 资源管理
  2. 任务调度

TaskTracker和JobTacker之间通过心跳维护。资源利用率低,运维成本高。

架构

Yarn架构

1 * ResourceMananager + n * NodeManager

通常活跃的ResourceManager只有一个(可以做主备,只有主挂掉之后standby才会激活),负责整个集群资源的管理和调度。

  1. 处理客户端请求(启动/杀死)
  2. 启动/监控ApplicationMaster(一个作业对应一个AM)
  3. 监控NodeManager
  4. 系统的资源分配和调度

NodeManager整个集群中有N个,负责单个节点的资源管理和使用以及task的运行情况。

  1. 定期向RM汇报本节点的资源使用请求和各个Container的运行状态
  2. 接收并处理RM的container启停的各种命令
  3. 单个节点的资源管理和任务管理

ApplicationMaster每个应用/作业对应一个,负责应用程序的管理

  1. 数据切分
  2. 为应用程序向RM申请资源(container),并分配内部任务
  3. 与NM通信以启停task,task运行在container中
  4. task的监控和容错

Container对任务运行情况进行描述:cpu、memory

执行流程

作业在yarn上的执行流程

  1. 向yarn提交作业
  2. RM为该作业分配第一个container(AM)
  3. RM会与对应的NM通信,要求NM在这个container上启动应用的AM
  4. AM首先会向RM注册,然后AM将为各个任务申请资源并监控运行情况
  5. AM采用轮训的方式通过RPC协议向RM申请和领取资源
  6. AM申请到资源后便和对应的NM通信要求NM启动任务
  7. NM启动我们作业对应的task

环境搭建

etc/hadoop/mapred-site.xml

1
2
3
4
5
6
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>

etc/hadoop/yarn-site.xml

1
2
3
4
5
6
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
1
./sbin/start-yarn.sh

验证是否启动成功

jps

1610 NodeManager
1919 Jps
1532 ResourceManager

Yarn WebUI地址可以看到资源的使用情况。端口配置参见yarn-default.xml中的yarn.resourcemanager.webapp.address

提交mr作业到yarn上运行:wordcount

hadoop 默认提供了wordcount,位于/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar

1
2
3
4
5
6
7
hadoop fs -mkdir -p /input/wc
hadoop fs -put data/hello.txt /input/wc
hadoop fs -ls -R /
hadoop fs -text /input/wc/hello.txt

# 后面的两个参数指定输入文件和输出目录
hadoop jar app/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar wordcount /input/wc/hello.txt /output/wc
1
2
3
hadoop fs -ls -R /output/wc
# 查看作业运行结果
hadoop fs -text /output/wc/hello.txt/part-r-00000
1
hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 5

大数据仓库HIVE

fb开发的构建与hadoop之上用于解决海量结构化的日志数据统计问题。定义了HQL(类SQL)

几乎所有的大数据场景都可以使用MapReduce来完成,HIVE的出现时将HQL转化为MapReduce作业。极大降低大数据技术在数据分析领域的门槛。

产生背景:

  1. mapreduce编程的不便性
  2. HDFS上的文件非结构化,不便于查询

通常用于离线数据处理,HIVE底层的执行引擎:MapReduce,Tez,Spark。hive on mapreduce的意思是hive的执行引擎是mapreduce。

压缩:gzip,lzo,snappy,bzip2
存储:TextFile,SequenceFile,RCFile,ORC,Parquet
UDF:自定义函数

环境搭建

只需部署一个节点即可。

配置HIVE_HOME环境变量

1
2
export HIVE_HOME=/home/hadoop/app/hive-1.1.0-cdh5.7.0
PATH=$HIVE_HOME/bin:$PATH

注意:需要事先安装mysql。yum install -y mysql-server mysql && service mysqld restartmysqladmin -u root password 123456。5.1.73默认安装后root密码为空。配置mysql开机自启动在/etc/rc.d/rc.local中添加service mysqld restart

1
2
3
cd /home/hadoop/app/hive-1.1.0-cdh5.7.0/conf
cp hive-env.sh.template hive-env.sh
touch hive-site.xml

编辑hive-env.sh,设置HADOOP_HOME

hive-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/sparksql?createDatabaseIfNotExist=true</value>
</property>

<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>

<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>

<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>

</configuration>

注意需要将mysql驱动放到$HIVE_HOME/lib

启动hive(确保mysql和hdfs已经启动了)

1
./bin/hive

基本操作

1
2
3
4
5
6
7
8
-- 建表
create table hive_wordcount(context string);
-- 加载数据
load data local inpath '/home/hadoop/data/hello.txt' into table hive_wordcount;
-- 查询
select * from hive_wordcount;
-- lateral view explode 将每行记录按照指定分隔符进行拆解
select word,count(1) from hive_wordcount lateral view explode(split(context,'\t')) wc as word group by word;

hql提交执行之后会生成MR作业并在Yarn上运行,可以在Yarn的管理界面上看到。

1
2
3
4
5
6
7
8
create table emp(empno int,ename string,job string,mgr int,hiredate string,sal double,comm double,deptno int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
create table dept(deptno int,dname string,location string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

load data local inpath '/home/hadoop/data/emp.txt' into table emp;
load data local inpath '/home/hadoop/data/dept.txt' into table dept;

-- 求每个部门的人数
select deptno,count(1) from emp group by deptno;

Spark及其生态圈

为什么快?提供了DAG的执行引擎,支持数据流处理和内存计算。MapReduce中的Map和Reduce是基于进程的(需要维护进程的启动和销毁),而Spark中的job是基于线程池的。

MapReduce效率低下的原因:map的结果会落地到磁盘(不能充分发挥内存威力),经过shuffle然后reduce;每个job(map或者reduce)的粒度都是进程级别(JVM的启动和销毁代价很高)。

环境搭建

下载源码包spark-2.1.0.tgz解压后放到/home/hadoop/source目录。文档地址http://spark.apache.org/docs/2.1.0/building-spark.html

查看源码包中的pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<properties>
<java.version>1.7</java.version>
<maven.version>3.3.9</maven.version>
<sbt.project.name>spark</sbt.project.name>
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<hadoop.version>2.2.0</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
<yarn.version>${hadoop.version}</yarn.version>
</properties>

<profile>
<id>hadoop-2.6</id>
<properties>
<hadoop.version>2.6.4</hadoop.version>
<jets3t.version>0.9.3</jets3t.version>
<zookeeper.version>3.4.6</zookeeper.version>
<curator.version>2.6.0</curator.version>
</properties>
</profile>
1
2
3
4
5
6
# 通过命令行的-D参数覆盖pom.xml中的hadoop.version
# -P参数指定使用pom.xml中的那个profile的id
mvn -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0 -DskipTests clean package

# 编译一个可以部署的包(推荐使用)
./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0 -X

参考:

pom.xml添加cdh版的maven仓库

1
2
3
4
<repository>
<id>cloudera</id>
<url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

spark源码编译的那些坑

编译完成后会在当前spark源码目录下生成spark-2.1.0-bin-2.6.0-cdh5.7.0.tgz,接下来解压到安装目录即可

Local 模式

1
2
# 把SPARK_HOME添加到环境变量
spark-shell --master local[2]

Spark context Web UI available at http://192.168.1.100:4040。

SparkShell
local[2]的含义

standalone模式

1
cp spark-env.sh.template spark-env.sh

spark的standalone模式架构和hadoop的hdfs/yarn很类似,都是1*master + n*woker.spark-env.sh配置:

1
2
3
4
SPARK_MASTER_HOST=hadoop001
SPARK_WORKER_CORES=2
SPARK_DAEMON_MEMORY=1g
SPARK_WORKER_INSTANCES=2 # worker的数量
1
./sbin/start-all.sh

worker启动报错

1
2
localhost: failed to launch: nice -n 0 /home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://hadoop001:7077
localhost: JAVA_HOME is not set

解决方案:sbin/spark-config.sh末尾添加:export JAVA_HOME=/home/hadoop/app/jdk1.7.0_51

通过spark-shell连接到SparkMaster

1
2
spark-shell --master spark://hadoop001:7077
spark-shell --help

spark-shell --jars /home/hadoop/software/mysql-connector-java-5.1.23-bin.jar --master local[2]将配置好mysql数据源的hive-site.xml拷贝到spark的conf目录下,使用这个可以使用spark连接到hive。

连接完成之后查看http://192.168.1.200:8080/可以发现Running Applications中有了一个应用,用了4个cores,状态是RUNNING,如果此时再次启动一个spark-shell,由于申请不到core,所以不会运行,状态是WAITING。

wordcount

使用spark-shell连接到spark master在repl中执行:

1
2
3
val file = spark.sparkContext.textFile("file:///home/hadoop/data/wc.txt")
val wordCounts = file.flatMap(line => line.split(",")).map((word => (word, 1))).reduceByKey(_ + _)
wordCounts.collect

开发中建议采用local模式,方便快捷,无需搭建集群。

Spark SQL概述

SQL on Hadoop的解决方案:

  • Hive: 将sql转化为mapreduce作业,metastore,Facebook
  • impala: cloudera,cdh(建议生产环境使用的hadoop版本),自己的守护进程执行,非mr,metastore
  • presto: 基于sql,facebook,京东
  • drill:sql,访问:hdfs,rdbms,json,hbase,mongodb,s3,hive
  • spark sql:dataframe/dataset api,metastore,支持外部数据源:hdfs,rdbms,json,hbase,mongodb,s3,hive

向spark提交应用程序

spark-sql的应用并不仅限于sql,还能访问hive,json,parquet文件。提供了sql,DataFrame,Dataset api,比spark-code中RDD更高级的api。

idea拷贝全类名,右键Copy Reference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
* SQLContext的使用
*
* 编译打包 mvn clean package -DskipTests
* 上传jar包到服务器 scp target/sql-1.0.jar hadoop@hadoop001:~/lib
*/
object SQLContextApp {

def main(args: Array[String]) {

val path = args(0)

// 1.创建SparkContext
val sparkConf = new SparkConf()

// 在测试或者生产环境中,AppName和Master是我们通过脚本进行指定的
// sparkConf.setAppName("SQLContextApp").setMaster("local[2]")

val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

// 2.相关处理(json文件)
// json文件来源spark-2.1.0/examples/src/main/resources/people.json
val people = sqlContext.read.format("json").load(path)
people.printSchema()
people.show()
// 3.关闭资源
sc.stop()

}

}
1
2
3
4
5
spark-submit \
--class com.spark.SQLContextApp \
--master local[2] \
/home/hadoop/lib/sql-1.0.jar \
file:///home/hadoop/source/spark-2.1.0/examples/src/main/resources/people.json

SparkSQL Shell的使用

SparkSQL连接Hive报错找不到表并且使用内置数据库Derby

spark-sql连接hive需要将hive-site.xml复制到spark/conf目录下

1
2
3
4
5
# 启动完成后spark.sql("select * from emp e join dept d on e.deptno = d.deptno").show
spark-shell --jars /home/hadoop/software/mysql-connector-java-5.1.23-bin.jar --master local[2]

# 打开spark-sql shell连接到hive之后可以直接执行sql,和上面的等效
spark-sql --jars /home/hadoop/software/mysql-connector-java-5.1.23-bin.jar --master local[2]

查看执行计划

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
spark-sql> table t(key string,value string);
spark-sql> explain extended select a.key * (2+3),b.value from t a join t b on a.key = b.key and a.key > 3;
== Parsed Logical Plan ==
'Project [unresolvedalias(('a.key * (2 + 3)), None), 'b.value]
+- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3))
:- 'UnresolvedRelation `t`, a
+- 'UnresolvedRelation `t`, b

== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE)): double, value: string
Project [(cast(key#55 as double) * cast((2 + 3) as double)) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#59, value#58]
+- Join Inner, ((key#55 = key#57) && (cast(key#55 as double) > cast(3 as double)))
:- SubqueryAlias a
: +- MetastoreRelation default, t
+- SubqueryAlias b
+- MetastoreRelation default, t

== Optimized Logical Plan ==
Project [(cast(key#55 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#59, value#58]
+- Join Inner, (key#55 = key#57)
:- Project [key#55]
: +- Filter (isnotnull(key#55) && (cast(key#55 as double) > 3.0))
: +- MetastoreRelation default, t
+- Filter (isnotnull(key#57) && (cast(key#57 as double) > 3.0))
+- MetastoreRelation default, t

== Physical Plan ==
*Project [(cast(key#55 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#59, value#58]
+- *SortMergeJoin [key#55], [key#57], Inner
:- *Sort [key#55 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key#55, 200)
: +- *Filter (isnotnull(key#55) && (cast(key#55 as double) > 3.0))
: +- HiveTableScan [key#55], MetastoreRelation default, t
+- *Sort [key#57 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key#57, 200)
+- *Filter (isnotnull(key#57) && (cast(key#57 as double) > 3.0))
+- HiveTableScan [key#57, value#58], MetastoreRelation default, t
Time taken: 0.252 seconds, Fetched 1 row(s)
18/10/18 21:22:20 INFO CliDriver: Time taken: 0.252 seconds, Fetched 1 row(s)
spark-sql> 18/10/18 21:22:20 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 8) in 15 ms on localhost (executor driver) (1/1)
18/10/18 21:22:20 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool

Spark-SQL架构图

ThriftServer & Beeline的使用

1
2
3
./sbin/start-thriftserver.sh --master local[2] --jars /home/hadoop/software/mysql-connector-java-5.1.23-bin.jar
# 使用客户端beeline连接即可进行sql操作
./bin/beeline -u jdbc:hive2://localhost:10000 -n hadoop

执行sql后查看http://192.168.1.100:4040/sqlserver/可以发现执行过的sql详细信息,对程序的调优非常有帮助。

ThriftServer和普通的spark-shell/spark-sql的不同:前者不管启动多少个客户端(beeline或者jdbc代码方式)都是一个spark application,只会在启动server的时候申请一次资源,多个客户端可以共享数据。

DataFrame & RDD

A Dataset is a distributed collection of data.A DataFrame is a Dataset organized into named columns.

DataFrame不是SparkSQL提出的,而是在R,Pandas语言中已有的。DataFrame是加了schame(列名,列类型,列值)的RDD。有了schema信息可以在编译的时候做更多的优化。

RDD:
java/scala ==> jvm
python ==> python runtime

DataFrame
java/scala/python => Logic Plan

因此如果语言不同,使用RDD编程会有效率差异,而使用DataFrame则没有区别。

基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()
// load的参数path支持本地fs和分布式fs
// json => DataFrame
val peopleDF = spark.read.format("json").load("file:///Users/yiihua-013/tmp/people.json")
// show columns
peopleDF.printSchema()
// select * from table limit 20
peopleDF.show()
// 查询某列 select name from table
peopleDF.select("name").show()
// select name,age + 10 as age2 from table
peopleDF.select(peopleDF.col("age"), (peopleDF.col("age") + 10).as("age2")).show()
// select * from table where age > 19
peopleDF.filter(peopleDF.col("age") > 19).show()
// select age,count(1) from table group by age
peopleDF.groupBy("age").count().show()
spark.stop()
}

DataFrame和RDD的互操作

DataFrame和RDD的互操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
object DataFrameRDDApp {

// DataFrame 与 RDD的互操作
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()

inferReflection(spark)
// program(spark)

spark.stop()
}

/**
* 把文本文件通过编程的方式转换成DataFrame
*/
def program(spark: SparkSession): Unit = {
val rdd = spark.sparkContext.textFile("file:///Users/yiihua-013/Downloads/tmp/infos.txt")
val rowRDD = rdd.map(_.split(","))
.map(line => Row(line(0).toInt, line(1), line(2).toInt))
val structType = StructType(
Array(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)
)
)
val infoDF = spark.createDataFrame(rowRDD, structType)
infoDF.printSchema()
infoDF.show()
}

/**
* 使用反射来推断包含特定数据类型的RDD的元数据
*/
def inferReflection(spark: SparkSession): Unit = {
// RDD => DataFrame
val rdd = spark.sparkContext.textFile("file:///Users/yiihua-013/Downloads/tmp/infos.txt")
// 需要导入隐式转换
import spark.implicits._
val infoDF = rdd.map(_.split(","))
.map(line => Info(line(0).toInt, line(1), line(2).toInt))
.toDF()
infoDF.show()

// 通过DataFrame API
infoDF.filter(infoDF.col("age") > 30).show()

// 将DataFrame注册为临时表之后就可以通过sql的方式进行查询了
infoDF.createOrReplaceTempView("infos")
spark.sql("select * from infos where age > 30").show()
}

case class Info(id: Int, name: String, age: Int)

}
  • 反射:case class,需要事先知道字段和字段类型
  • 编程:Row,事先不知道列

优先选择反射的方式。

Dataset > DataFrame > SQL,可以更早抛出错误,可以在编译时发现错误而不是在运行时。

Dataset&DataFrame&SQL

列式存储示意图

列式存储的优点:每一列可以单独压缩,访问同列有更小的IO。

大数据调优(忽略掉一些不必要的信息):

  • 列式存储,列裁剪
  • 分区裁剪
  • 使用min/max统计信息
  • 条件下压到数据源(例如将where下压到jdbc)

其实DataFrame和SQL都会采用Catalyst引擎使用上面的策略进行优化,运行效率是一样的。

数据处理流程

  • 数据采集:Flume,日志写入到HDFS
  • 数据清洗:Spark,Hive,MapReduce或者其他分布式计算框架清理脏数据,清理之后的数据可以放在HDFS上(Hive,SparkSQL)
  • 数据处理:Spark,Hive,MapReduce或者其他分布式计算框架,按照业务进行统计分析
  • 处理结果入库:结果可以放在RDBMS,NoSQL
  • 数据可视化展示:Echarts,HUE,Zeppelin

Spark on Yarn

spark支持4种运行模式:

local:本地开发
standalone:spark自带的,如果一个集群是standalone的话,那么就需要在多台机器上部署spark环境,比较麻烦
yarn:生产环境推荐使用,统一使用yarn进行整个集群作业(MapReduce,Spark)的资源调度
mesos:国内用的不多

不管使用什么模式,spark应用程序的代码是一模一样的,只需要在提交的时候使用--master参数即可指定运行模式即可。

client:Dirver运行在client端(提交spark作业的机器),client会和请求到的container进行通信来完成作业的调度和执行,client是不能退出的。日志信息在控制台输出,便于测试
cluster:Driver运行在ApplicationMaster中,client只要提交完作业就可以关掉,因为作业已经在spark上运行了。日志只能通过yarn logs -applicationId application_1540378428255_0002来查看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--executor-memory 1G \
--total-executor-cores 1 \
/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \
4

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--executor-memory 1G \
--total-executor-cores 1 \
/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \
4

大数据压缩格式的可分割性在hadoop中很重要如果支持分割,就可以分隔成多块并行执行task(执行作业时map的启动个数)。

  • bzip2压缩效果最好,但是压缩速度慢,支持分割
  • gzip压缩和解压比bzip2快,不支持分割
  • lzo压缩效果不如bzip2和gzip,但压缩和解压速度最好,支持分割

参数调优

1
2
spark.sql.shuffle.partitions=300 # 并行度
spark.sql.sources.partitionColumnTypeInference.enabled=false # 禁用分区字段类型推测

大数据处理神器Beam可以使用相同的代码可以运行在MR,Spark,Flink上。

常见问题: