在对应用进行集群化的时候,对于应用如何规划所使用的cpu核心数量有一个不成文的规定:总核心数量-1。因为操作系统需要消耗一些cpu,如果我们的应用占用了全部的cpu,一旦os需要处理一些其他任务,就会因为没有空闲的核而强制进行上下文切换,会减慢应用的处理速度。
map操作中将每个单词设置为1 shuffle过程中将key相同的分到同一个reduce上
HDFS架构 NameNode和DataNode NameNode和DataNode
blocksize:128M 130M会被拆分成2个block:128M+2M
NameNode:
客户端请求
元数据管理(文件名称、副本系数、block存放的DataNode)
DataNode:
存储用户文件对应的数据块
定期向NameNode发送心跳信息,汇报自身及其所有的block信息,健康状况
SecondNameNode在2.x中已经不是必须的了。
副本机制 hdfs-site.xml中的dfs.replication
replication factor:副本因子会被存放在NameNode中
一个文件的所有block大小都一样,除了最后一个
Hadoop环境搭建 用户名hadoop,密码hadoop,/home/hadoop目录结构:
software:存放软件安装包 lib:开发的jar包 app:所有软件的安装目录,子目录tmp data:测试数据目录 source:框架源码目录
http://archive.cloudera.com/cdh5/cdh/5/ https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh_package_tarball_57.html 2.6.0-cdh5.7.0 jdk7u51-linux-x64 scala2.11.8
修改主机名和hosts文件(/etc/sysconfig/network和etc/hosts)
主机名:hadoop001 - 192.168.100
/etc/sysconfig/network NETWORKING=yes HOSTNAME=hadoop001
配置ssh免密登陆(本步骤可以省略,但是重启hadoop进程的时候需要手动输入密码):
1 2 3 4 ssh-keygen -t rsa cp ~/.ssh/id_rsa.pub ~/.ssh/authorized_keysssh hadoop001
由于大数据开发会打开许多FD,修改最大打开文件数量和进程数量:
1 2 3 4 5 * hard nofile 10000 * soft nofile 10000 * hard nproc 10000 * soft nproc 10000
核心配置文件 配置文件路径/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop
hadoop-env.sh修改JAVA_HOME core-site.xml
1 2 3 4 5 6 7 8 9 10 <configuration > <property > <name > fs.defaultFS</name > <value > hdfs://hadoop001:8020</value > </property > <property > <name > hadoop.tmp.dir</name > <value > /home/hadoop/app/tmp</value > </property > </configuration >
hdfs-site.xml
1 2 3 4 5 6 <configuration > <property > <name > dfs.replication</name > <value > 1</value > </property > </configuration >
格式化HDFS和启停 bin客户端脚本目录,sbin服务端脚本目录
1 2 ./bin/hdfs namenode -format ./sbin/start-dfs.sh
使用jps -m
命令验证是否启成功:
2042 SecondaryNameNode 1890 DataNode 1571 NameNode
关闭防火墙
1 2 3 chkconfig iptables off chkconfig --list /etc/init.d/iptables stop
可以访问http://hadoop001:50070 。端口信息参见hdfs-default.xml的dfs.namenode.http-address
,NameNode的WebUI。可以查看集群上的datanode信息,浏览hdfs上的文件,浏览日志信息。
HDFS Shell常用操作 1 2 3 4 5 6 7 8 9 hadoop fs -ls / hadoop fs -mkdir /test hadoop fs -mkdir -p /a/b hadoop fs -ls -R / hadoop fs -put hdfs.cmd /test hadoop fs -text /test/hdfs.cmd hadoop fs -get /test/hdfs.cmd a_tmp hadoop fs -rm /test/hdfs.cmd hadoop fs -rm -r /a
集群模式安装 在一台机器上进行安装 以下操作都在hadoop001上进行
1 2 3 4 5 ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop001 ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop002 ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop003
分布式和伪分布式仅仅多了个yarn的resourcemanager的hostname,还有slave配置文件 hadoop001 yarn-site.xml
1 2 3 4 <property > <name > yarn.resourcemanager.hostname</name > <value > hadoop001</value > </property >
slaves
1 2 3 hadoop001 hadoop002 hadoop003
将001上的安装包分发到其他节点 1 2 3 4 scp -r ~/app hadoop@hadoop002:~/ scp -r ~/app hadoop@hadoop003:~/ scp ~/.bash_profile hadoop@hadoop002:~/ scp ~/.bash_profile hadoop@hadoop003:~/
MapReduce 核心概念:
Split:交由MR作业处理的数据块,是MR中最小的计算单元;HDFS中的blocksize是HDFS中最小的存储单元,默认128M。默认情况下Split和hdfs中的block一一对应。
InputFormat:将输入数据进行分片(Split):InputSplit[] getSplit(JobConf job,int numSplits)
。TextInputFormat
OutputFormat:输出。
Combiner:可以看成Map节点本地的Reducer,可以预先对数据进行一部分归并,对于求和这样的操作可以有效减少网络传输。
Partitioner:有几个partition对应几个reducer作业的输出。
jobhistory:记录已经完成的MR信息到指定的HDFS目录下,默认是不开启的。
开启job history后可以通过yarn webui 查看
mapred-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 <property > <name > mapreduce.jobhistory.address</name > <value > hadoop001:10020</value > </property > <property > <name > mapreduce.jobhistory.webapp.address</name > <value > hadoop001:19888</value > </property > <property > <name > mapreduce.jobhistory.done-dir</name > <value > /history/done</value > </property > <property > <name > mapreduce.jobhistory.intermediate-done-dir</name > <value > /history/done_intermediate</value > </property >
yarn-site.xml
1 2 3 4 <property > <name > yarn.log-aggregation-enable</name > <value > true</value > </property >
启动history server
1 mr-jobhistory-daemon.sh start historyserver
HDFS不适合小文件的存储,如果小文件非常多则集群性能会非常低,namenode维护太多元数据。所以一般情况下集群上的小文件都会定期清理合并成大文件。MapReduce只适合离线批处理。
YARN简介 hadoop 1.x中JobTracker存在单点且职能过多:
资源管理
任务调度
TaskTracker和JobTacker之间通过心跳维护。资源利用率低,运维成本高。
架构
1 * ResourceMananager + n * NodeManager
通常活跃的ResourceManager只有一个(可以做主备,只有主挂掉之后standby才会激活),负责整个集群资源的管理和调度。
处理客户端请求(启动/杀死)
启动/监控ApplicationMaster(一个作业对应一个AM)
监控NodeManager
系统的资源分配和调度
NodeManager整个集群中有N个,负责单个节点的资源管理和使用以及task的运行情况。
定期向RM汇报本节点的资源使用请求和各个Container的运行状态
接收并处理RM的container启停的各种命令
单个节点的资源管理和任务管理
ApplicationMaster每个应用/作业对应一个,负责应用程序的管理
数据切分
为应用程序向RM申请资源(container),并分配内部任务
与NM通信以启停task,task运行在container中
task的监控和容错
Container对任务运行情况进行描述:cpu、memory
执行流程
向yarn提交作业
RM为该作业分配第一个container(AM)
RM会与对应的NM通信,要求NM在这个container上启动应用的AM
AM首先会向RM注册,然后AM将为各个任务申请资源并监控运行情况
AM采用轮训的方式通过RPC协议向RM申请和领取资源
AM申请到资源后便和对应的NM通信要求NM启动任务
NM启动我们作业对应的task
环境搭建 etc/hadoop/mapred-site.xml
1 2 3 4 5 6 <configuration > <property > <name > mapreduce.framework.name</name > <value > yarn</value > </property > </configuration >
etc/hadoop/yarn-site.xml
1 2 3 4 5 6 <configuration > <property > <name > yarn.nodemanager.aux-services</name > <value > mapreduce_shuffle</value > </property > </configuration >
验证是否启动成功
jps
1610 NodeManager 1919 Jps 1532 ResourceManager
Yarn WebUI地址 可以看到资源的使用情况。端口配置参见yarn-default.xml中的yarn.resourcemanager.webapp.address
提交mr作业到yarn上运行:wordcount
hadoop 默认提供了wordcount,位于/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar
1 2 3 4 5 6 7 hadoop fs -mkdir -p /input/wc hadoop fs -put data/hello.txt /input/wc hadoop fs -ls -R / hadoop fs -text /input/wc/hello.txt hadoop jar app/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar wordcount /input/wc/hello.txt /output/wc
1 2 3 hadoop fs -ls -R /output/wc hadoop fs -text /output/wc/hello.txt/part-r-00000
1 hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 5
大数据仓库HIVE
fb开发的构建与hadoop之上用于解决海量结构化的日志数据统计问题。定义了HQL(类SQL)
几乎所有的大数据场景都可以使用MapReduce来完成,HIVE的出现时将HQL转化为MapReduce作业。极大降低大数据技术在数据分析领域的门槛。
产生背景:
mapreduce编程的不便性
HDFS上的文件非结构化,不便于查询
通常用于离线数据处理,HIVE底层的执行引擎:MapReduce,Tez,Spark。hive on mapreduce的意思是hive的执行引擎是mapreduce。
压缩:gzip,lzo,snappy,bzip2 存储:TextFile,SequenceFile,RCFile,ORC,Parquet UDF:自定义函数
环境搭建 只需部署一个节点即可。
配置HIVE_HOME环境变量
1 2 export HIVE_HOME=/home/hadoop/app/hive-1.1.0-cdh5.7.0PATH=$HIVE_HOME /bin:$PATH
注意:需要事先安装mysql。yum install -y mysql-server mysql && service mysqld restart
。mysqladmin -u root password 123456
。5.1.73默认安装后root密码为空。配置mysql开机自启动在/etc/rc.d/rc.local
中添加service mysqld restart
。
1 2 3 cd /home/hadoop/app/hive-1.1.0-cdh5.7.0/confcp hive-env.sh.template hive-env.shtouch hive-site.xml
编辑hive-env.sh,设置HADOOP_HOME
hive-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 <?xml version="1.0" ?> <?xml-stylesheet type="text/xsl" href="configuration.xsl" ?> <configuration > <property > <name > javax.jdo.option.ConnectionURL</name > <value > jdbc:mysql://localhost:3306/sparksql?createDatabaseIfNotExist=true</value > </property > <property > <name > javax.jdo.option.ConnectionDriverName</name > <value > com.mysql.jdbc.Driver</value > </property > <property > <name > javax.jdo.option.ConnectionUserName</name > <value > root</value > </property > <property > <name > javax.jdo.option.ConnectionPassword</name > <value > 123456</value > </property > </configuration >
注意需要将mysql驱动放到$HIVE_HOME/lib
。
启动hive(确保mysql和hdfs已经启动了)
基本操作 1 2 3 4 5 6 7 8 create table hive_wordcount(context string);load data local inpath '/home/hadoop/data/hello.txt' into table hive_wordcount; select * from hive_wordcount;select word,count (1 ) from hive_wordcount lateral view explode(split(context,'\t' )) wc as word group by word;
hql提交执行之后会生成MR作业并在Yarn上运行,可以在Yarn的管理界面上看到。
1 2 3 4 5 6 7 8 create table emp(empno int ,ename string,job string,mgr int ,hiredate string,sal double ,comm double ,deptno int )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;create table dept(deptno int ,dname string,location string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;load data local inpath '/home/hadoop/data/emp.txt' into table emp; load data local inpath '/home/hadoop/data/dept.txt' into table dept; select deptno,count (1 ) from emp group by deptno;
Spark及其生态圈 为什么快?提供了DAG的执行引擎,支持数据流处理和内存计算。MapReduce中的Map和Reduce是基于进程的(需要维护进程的启动和销毁),而Spark中的job是基于线程池的。
MapReduce效率低下的原因:map的结果会落地到磁盘(不能充分发挥内存威力),经过shuffle然后reduce;每个job(map或者reduce)的粒度都是进程级别(JVM的启动和销毁代价很高)。
环境搭建 下载源码包spark-2.1.0.tgz解压后放到/home/hadoop/source
目录。文档地址http://spark.apache.org/docs/2.1.0/building-spark.html
查看源码包中的pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <properties > <java.version > 1.7</java.version > <maven.version > 3.3.9</maven.version > <sbt.project.name > spark</sbt.project.name > <slf4j.version > 1.7.16</slf4j.version > <log4j.version > 1.2.17</log4j.version > <hadoop.version > 2.2.0</hadoop.version > <protobuf.version > 2.5.0</protobuf.version > <yarn.version > ${hadoop.version}</yarn.version > </properties > <profile > <id > hadoop-2.6</id > <properties > <hadoop.version > 2.6.4</hadoop.version > <jets3t.version > 0.9.3</jets3t.version > <zookeeper.version > 3.4.6</zookeeper.version > <curator.version > 2.6.0</curator.version > </properties > </profile >
1 2 3 4 5 6 mvn -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0 -DskipTests clean package ./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0 -X
参考:
pom.xml添加cdh版的maven仓库
1 2 3 4 <repository > <id > cloudera</id > <url > http://repository.cloudera.com/artifactory/cloudera-repos/</url > </repository >
spark源码编译的那些坑
编译完成后会在当前spark源码目录下生成spark-2.1.0-bin-2.6.0-cdh5.7.0.tgz,接下来解压到安装目录即可
Local 模式 1 2 spark-shell --master local [2]
Spark context Web UI available at http://192.168.1.100:4040。
SparkShell local[2]的含义
standalone模式 1 cp spark-env.sh.template spark-env.sh
spark的standalone模式架构和hadoop的hdfs/yarn很类似,都是1*master + n*woker
.spark-env.sh配置:
1 2 3 4 SPARK_MASTER_HOST=hadoop001 SPARK_WORKER_CORES=2 SPARK_DAEMON_MEMORY=1g SPARK_WORKER_INSTANCES=2
worker启动报错
1 2 localhost: failed to launch: nice -n 0 /home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://hadoop001:7077 localhost: JAVA_HOME is not set
解决方案:sbin/spark-config.sh末尾添加:export JAVA_HOME=/home/hadoop/app/jdk1.7.0_51
通过spark-shell连接到SparkMaster
1 2 spark-shell --master spark://hadoop001:7077 spark-shell --help
spark-shell --jars /home/hadoop/software/mysql-connector-java-5.1.23-bin.jar --master local[2]
将配置好mysql数据源的hive-site.xml拷贝到spark的conf目录下,使用这个可以使用spark连接到hive。
连接完成之后查看http://192.168.1.200:8080/可以发现Running Applications中有了一个应用,用了4个cores,状态是RUNNING,如果此时再次启动一个spark-shell,由于申请不到core,所以不会运行,状态是WAITING。
wordcount 使用spark-shell连接到spark master在repl中执行:
1 2 3 val file = spark.sparkContext.textFile("file:///home/hadoop/data/wc.txt" )val wordCounts = file.flatMap(line => line.split("," )).map((word => (word, 1 ))).reduceByKey(_ + _)wordCounts.collect
开发中建议采用local模式,方便快捷,无需搭建集群。
Spark SQL概述 SQL on Hadoop的解决方案:
Hive: 将sql转化为mapreduce作业,metastore,Facebook
impala: cloudera,cdh(建议生产环境使用的hadoop版本),自己的守护进程执行,非mr,metastore
presto: 基于sql,facebook,京东
drill:sql,访问:hdfs,rdbms,json,hbase,mongodb,s3,hive
spark sql:dataframe/dataset api,metastore,支持外部数据源:hdfs,rdbms,json,hbase,mongodb,s3,hive
向spark提交应用程序
spark-sql的应用并不仅限于sql,还能访问hive,json,parquet文件。提供了sql,DataFrame,Dataset api,比spark-code中RDD更高级的api。
idea拷贝全类名,右键Copy Reference
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.sparkimport org.apache.spark.{SparkConf , SparkContext }import org.apache.spark.sql.SQLContext object SQLContextApp { def main (args: Array [String ]) { val path = args(0 ) val sparkConf = new SparkConf () val sc = new SparkContext (sparkConf) val sqlContext = new SQLContext (sc) val people = sqlContext.read.format("json" ).load(path) people.printSchema() people.show() sc.stop() } }
1 2 3 4 5 spark-submit \ --class com.spark.SQLContextApp \ --master local [2] \ /home/hadoop/lib/sql-1.0.jar \ file:///home/hadoop/source/spark-2.1.0/examples/src/main/resources/people.json
SparkSQL Shell的使用 SparkSQL连接Hive报错找不到表并且使用内置数据库Derby
spark-sql连接hive需要将hive-site.xml复制到spark/conf目录下
1 2 3 4 5 spark-shell --jars /home/hadoop/software/mysql-connector-java-5.1.23-bin.jar --master local [2] spark-sql --jars /home/hadoop/software/mysql-connector-java-5.1.23-bin.jar --master local [2]
查看执行计划 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 spark-sql> table t(key string,value string); spark-sql> explain extended select a.key * (2+3),b.value from t a join t b on a.key = b.key and a.key > 3; == Parsed Logical Plan == 'Project [unresolvedalias((' a.key * (2 + 3)), None), 'b.value] +- ' Join Inner, (('a.key = 'b.key) && ('a.key > 3 )) :- 'UnresolvedRelation `t`, a +- ' UnresolvedRelation `t`, b== Analyzed Logical Plan == (CAST(key AS DOUBLE) * CAST((2 + 3 ) AS DOUBLE)): double, value: string Project [(cast(key#55 as double) * cast((2 + 3 ) as double)) AS (CAST(key AS DOUBLE) * CAST((2 + 3 ) AS DOUBLE))#59, value#58] +- Join Inner, ((key#55 = key#57 ) && (cast(key#55 as double) > cast(3 as double))) :- SubqueryAlias a : +- MetastoreRelation default, t +- SubqueryAlias b +- MetastoreRelation default, t == Optimized Logical Plan == Project [(cast(key#55 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3 ) AS DOUBLE))#59, value#58] +- Join Inner, (key#55 = key#57) :- Project [key#55] : +- Filter (isnotnull(key#55) && (cast(key#55 as double) > 3.0)) : +- MetastoreRelation default, t +- Filter (isnotnull(key#57) && (cast(key#57 as double) > 3.0)) +- MetastoreRelation default, t == Physical Plan == *Project [(cast(key#55 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3 ) AS DOUBLE))#59, value#58] +- *SortMergeJoin [key#55], [key#57], Inner :- *Sort [key#55 ASC NULLS FIRST], false , 0 : +- Exchange hashpartitioning(key#55, 200) : +- *Filter (isnotnull(key#55) && (cast(key#55 as double) > 3.0)) : +- HiveTableScan [key#55], MetastoreRelation default, t +- *Sort [key#57 ASC NULLS FIRST], false , 0 +- Exchange hashpartitioning(key#57, 200) +- *Filter (isnotnull(key#57) && (cast(key#57 as double) > 3.0)) +- HiveTableScan [key#57, value#58], MetastoreRelation default, t Time taken: 0.252 seconds, Fetched 1 row(s) 18/10/18 21:22:20 INFO CliDriver: Time taken: 0.252 seconds, Fetched 1 row(s) spark-sql> 18/10/18 21:22:20 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 8) in 15 ms on localhost (executor driver) (1/1) 18/10/18 21:22:20 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool
ThriftServer & Beeline的使用 1 2 3 ./sbin/start-thriftserver.sh --master local [2] --jars /home/hadoop/software/mysql-connector-java-5.1.23-bin.jar ./bin/beeline -u jdbc:hive2://localhost:10000 -n hadoop
执行sql后查看http://192.168.1.100:4040/sqlserver/可以发现执行过的sql详细信息,对程序的调优非常有帮助。
ThriftServer和普通的spark-shell/spark-sql的不同:前者不管启动多少个客户端(beeline或者jdbc代码方式)都是一个spark application,只会在启动server的时候申请一次资源,多个客户端可以共享数据。
DataFrame & RDD
A Dataset is a distributed collection of data.A DataFrame is a Dataset organized into named columns.
DataFrame不是SparkSQL提出的,而是在R,Pandas语言中已有的。DataFrame是加了schame(列名,列类型,列值)的RDD。有了schema信息可以在编译的时候做更多的优化。
RDD: java/scala ==> jvm python ==> python runtime
DataFrame java/scala/python => Logic Plan
因此如果语言不同,使用RDD编程会有效率差异,而使用DataFrame则没有区别。
基本操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def main (args: Array [String ]) { val spark = SparkSession .builder().appName("DataFrameApp" ).master("local[2]" ).getOrCreate() val peopleDF = spark.read.format("json" ).load("file:///Users/yiihua-013/tmp/people.json" ) peopleDF.printSchema() peopleDF.show() peopleDF.select("name" ).show() peopleDF.select(peopleDF.col("age" ), (peopleDF.col("age" ) + 10 ).as("age2" )).show() peopleDF.filter(peopleDF.col("age" ) > 19 ).show() peopleDF.groupBy("age" ).count().show() spark.stop() }
DataFrame和RDD的互操作 DataFrame和RDD的互操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 object DataFrameRDDApp { def main (args: Array [String ]) { val spark = SparkSession .builder().appName("DataFrameApp" ).master("local[2]" ).getOrCreate() inferReflection(spark) spark.stop() } def program (spark: SparkSession ): Unit = { val rdd = spark.sparkContext.textFile("file:///Users/yiihua-013/Downloads/tmp/infos.txt" ) val rowRDD = rdd.map(_.split("," )) .map(line => Row (line(0 ).toInt, line(1 ), line(2 ).toInt)) val structType = StructType ( Array ( StructField ("id" , IntegerType , true ), StructField ("name" , StringType , true ), StructField ("age" , IntegerType , true ) ) ) val infoDF = spark.createDataFrame(rowRDD, structType) infoDF.printSchema() infoDF.show() } def inferReflection (spark: SparkSession ): Unit = { val rdd = spark.sparkContext.textFile("file:///Users/yiihua-013/Downloads/tmp/infos.txt" ) import spark.implicits._ val infoDF = rdd.map(_.split("," )) .map(line => Info (line(0 ).toInt, line(1 ), line(2 ).toInt)) .toDF() infoDF.show() infoDF.filter(infoDF.col("age" ) > 30 ).show() infoDF.createOrReplaceTempView("infos" ) spark.sql("select * from infos where age > 30" ).show() } case class Info (id: Int , name: String , age: Int ) }
反射:case class,需要事先知道字段和字段类型
编程:Row,事先不知道列
优先选择反射的方式。
Dataset > DataFrame > SQL,可以更早抛出错误,可以在编译时发现错误而不是在运行时。
列式存储的优点:每一列可以单独压缩,访问同列有更小的IO。
大数据调优(忽略掉一些不必要的信息):
列式存储,列裁剪
分区裁剪
使用min/max统计信息
条件下压到数据源(例如将where下压到jdbc)
其实DataFrame和SQL都会采用Catalyst引擎使用上面的策略进行优化,运行效率是一样的。
数据处理流程
数据采集:Flume,日志写入到HDFS
数据清洗:Spark,Hive,MapReduce或者其他分布式计算框架清理脏数据,清理之后的数据可以放在HDFS上(Hive,SparkSQL)
数据处理:Spark,Hive,MapReduce或者其他分布式计算框架,按照业务进行统计分析
处理结果入库:结果可以放在RDBMS,NoSQL
数据可视化展示:Echarts,HUE,Zeppelin
Spark on Yarn spark支持4种运行模式:
local:本地开发 standalone:spark自带的,如果一个集群是standalone的话,那么就需要在多台机器上部署spark环境,比较麻烦 yarn:生产环境推荐使用,统一使用yarn进行整个集群作业(MapReduce,Spark)的资源调度 mesos:国内用的不多
不管使用什么模式,spark应用程序的代码是一模一样的,只需要在提交的时候使用--master
参数即可指定运行模式即可。
client:Dirver运行在client端(提交spark作业的机器),client会和请求到的container进行通信来完成作业的调度和执行,client是不能退出的。日志信息在控制台输出,便于测试 cluster:Driver运行在ApplicationMaster中,client只要提交完作业就可以关掉,因为作业已经在spark上运行了。日志只能通过yarn logs -applicationId application_1540378428255_0002
来查看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --executor-memory 1G \ --total-executor-cores 1 \ /home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \ 4 ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --executor-memory 1G \ --total-executor-cores 1 \ /home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \ 4
大数据压缩格式的可分割性在hadoop中很重要如果支持分割,就可以分隔成多块并行执行task(执行作业时map的启动个数)。
bzip2压缩效果最好,但是压缩速度慢,支持分割
gzip压缩和解压比bzip2快,不支持分割
lzo压缩效果不如bzip2和gzip,但压缩和解压速度最好,支持分割
参数调优
1 2 spark.sql.shuffle.partitions=300 # 并行度 spark.sql.sources.partitionColumnTypeInference.enabled=false # 禁用分区字段类型推测
大数据处理神器Beam可以使用相同的代码可以运行在MR,Spark,Flink上。
常见问题: