QQ个性网:专注于分享免费的QQ个性内容

关于我们| 网站公告| 广告服务| 联系我们| 网站地图

搜索
编程 JavaScript Java C++ Python SQL C Io ML COBOL Racket APL OCaml ABC Sed Bash Visual Basic Modula-2 Logo Delphi IDL Groovy Julia REXX Chapel X10 Forth Eiffel C# Go Rust PHP Swift Kotlin R Dart Perl Ruby TypeScript MATLAB Shell Lua Scala Objective-C F# Haskell Elixir Lisp Prolog Ada Fortran Erlang Scheme Smalltalk ABAP D ActionScript Tcl AWK IDL J PostScript IDL PL/SQL PowerShell

大数据之谜Spark基础篇,Spark实现WordCount实例内幕详解

日期:2025/04/01 06:46来源:未知 人气:57

导读:Spark实现WordCount实例内幕详解先回顾一下前面几节,我们已经讲解Spark的作用与优劣、基本原理与核心RDD特征。在有这些基础后,我们来进入一个实战,基于最典型的WordCount实例来综合运用前面所学到的知识点,并进一步详解。对学习技术方面比较喜欢使用图来分析知识点,有图有真相。首先,编写第一个Spark应用程序 ,我们是如何建立起来的,其入口在哪里呢,需要创建两......

Spark实现WordCount实例内幕详解

先回顾一下前面几节,我们已经讲解Spark的作用与优劣、基本原理与核心RDD特征。在有这些基础后,我们来进入一个实战,基于最典型的WordCount实例来综合运用前面所学到的知识点,并进一步详解。对学习技术方面比较喜欢使用图来分析知识点,有图有真相。

首先,编写第一个Spark应用程序 ,我们是如何建立起来的,其入口在哪里呢,需要创建两个对象。

一: val conf = new SparkConf()

.setAppName("WordCount")

.setMaster("local")

创建SparkConf对象,设置Spark应用的配置信息。setAppName() 设置Spark应用程序在运行中的名字;如果是集群运行,就可以在监控页面直观看到我们运行的job任务。setMaster() 设置运行模式、是本地运行,设置为local即可;如果是集群运行,就可以设置程序要连接的Spark集群的master节点的url。

二:val sc = new SparkContext(conf)

创建SparkContext对象, 在Spark中,SparkContext是Spark所有功能的一个入口,你无论是用java、scala,甚至是python编写,都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括调度器(DAGSchedule、TaskScheduler),还会去Spark Master节点上进行注册等。所以SparkContext在Spark应用中是很重要的一个对象。

Spark实现WordCount实例执行流程图

图1-Spark实现WordC

我们先看图中是由两大部分组成,一是Spark基于内存分布式计算集群,二是下面的Scala代码部分。

现在假设我们HDFS上有一个数据文件data.txt文件,需要对其进行WordCount统计计算,如果有对各种算子不了解的同学,也不要紧,代码结合运行流程图一步一步分享。

第A步:val lines = sc.textFile("hdfs://") ,主要功能是加载HDFS中的数据文件进入Spark本地或是集群计算,这里我们使用的是SparkContext的textFile算子,加载后的数据将以每行记录组成元素,元素类型为String。这时我们假设数据被分为3个RDD进入Spark集群的不同节点1、2、3。RDD由Hadoop中HDFS数据文件转变为MappedRDD,这也是创建RDD的一种方式。

第B步:val words = lines.flatMap { line => line.split(" ") } ,主要是对MappedRDD中的每个元素(也就是每一行)进行操作。这里使用transformation中的flatMap算子,作用是可以将一个map数据集转变成为flap数据集,即数据扁平化处理。这里也就是将输入文件的每一行数据,按空格(" ")进行拆分,得到单词数组,再将数组进行扁平化后形成单词字符串,在flatMapRDD中。RDD由MappedRDD转变为flatMappedRDD,这又是创建RDD的一种方式,由RDD创建RDD。

第C步:val pairs = words.map { word => (word, 1) },主要是将第二步的单词数组flatMapRDD中的数据进行标记,即每个行的格式由单个单词转变成<K,V>的形式。这里使用的是map算子,Java编写中略有不同,是mapToPair算子。RDD由flatMappedRDD转变为MapPartitoinsRDD。

第D步:val wordCounts = pairs.reduceByKey { + },主要是将第三步产生的pairs元素的不同RDD中相同key值拉到一起进行value的归并操作。这里你发现一个地方吗?就是RDD由flatMappedRDD转变为MapPartitoinsRDD的时候Hello对应的value变为了2,这是reduceByKey 算子的特性,会在总的归并之前先对每个RDD进行相同可以对应的value执行merge操作(也就是归并相同可以的value),这也是在代码性能优化中优先reduceByKey的原因 ,而不选择GroupByKey。这时RDD由MapPartitoinsRDD转变为shuffleRDD又转变成MappedRDD。

第E步:wordCounts.foreach(wordCount => println("")),对结果数据进行action操作遍历输出到客户端控制台。这里使用foreach算子,该算子属于action类型,每个job任务必须要有一种叫做action类型的算子才能触发应用程序的执行。

图2-DAG经典结构图

DAG有向无环图

有看上一节的同学,应该知道我们提到RDD构建的有向无环图DAG概念,在图1中也有相应的体现,DAG是由RDD通过相关联、相互依赖构成的有向无环图调度器,DAG在调度时候会根据Shuffle阶段将一个job划分为不同的Stage,并且在创建SparkContext的时候就已经进行依赖实例化,每一个SparkContext对应创建一个DAGScheduler(有向无环图调度器),上图我们有一个Shuffle阶段,这样也就是把我们的WordCount任务Job划分为2个Stage,如红色虚线框Stage1和黑色虚线框Stage2。

可以深入了解一下Stage的划分与执行调度

划分Stage是从DAGScheduler中的handleJobSubmitted方法在一次Job任务中触发action算子的RDD生成finalStage。再由finalStage调用getParentStagesAndId方法往前递归RDD的依赖关系,以action算子前面RDD记录为一个Stage,一直往前查找,每当遇到Shuffle阶段,将会取Shuffle前一个的RDD划分为另一个新的Stage,递归执行查找直到最开始的RDD结束。

当这个Job任务RDD划分为Stage完成后,主要是通过DAGScheduler中的submitStage方法用来提交stage,这里有一个getMissingParentStages方法可获取finalStage,如果该Stage存在父调度,将也递归查找,直到所有Stage调度查到完毕,并记录到列表中,已提供运行时将从开始调度Stage进行执行。

Spark实现WordCount实例代码

Scala版:

Java版:

这里我们讲解的是Spark本地运行模式,其运行可以在eclipse或者其他支持的编译器直接调用main运行,也是我们在开发中常用来调试代码块的方式。这里讲解的Spark1.6.x版本。

public class WordCountLocal {

public static void main(String[] args) {

SparkConf conf = new SparkConf()

.setAppName("WordCountLocal")

.setMaster("local");

/** 二:创建SparkContext对象

在Spark中,使用不同语言或是编写不同类型的Spark应用程序,会有一定的不同之处,

上面分析的是使用scala语言,使用的就是原生态的SparkContext对象;

这时编写程序使用Java语言,使用的就是JavaSparkContext对象;

如果是开发Spark SQL程序,那么就是SQLContext、HiveContext对象;

如果是开发Spark Streaming程序,那么就是它独有的SparkContext

*/

JavaSparkContext sc = new JavaSparkContext(conf);

// 第A步:原理基本与Scala相同,只是加载数据后生成的RDD是JavaRDD

JavaRDD lines = sc.textFile("E://data.txt");

// 第B步: 原理基本与Scala相同,只是加载数据后生成的RDD是JavaRDD

JavaRDD words = lines.flatMap(new FlatMapFunction<String, String>() {

private static final long serialVersionUID = 1L;

@Override

public Iterable call(String line) throws Exception {

return Arrays.asList(line.split(" "));

}

});

// 第C步: 原理基本与Scala相同,只是加载数据后生成的RDD是JavaPairRDD

JavaPairRDD<String, Integer> pairs = words.mapToPair(

new PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override

public Tuple2<String, Integer> call(String word) throws Exception {

return new Tuple2<String, Integer>(word, 1);

}

});

// 第D步: 原理基本与Scala相同,只是加载数据后生成的RDD是JavaPairRDD

JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(

new Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

return v1 + v2;

}

});

// 第E步: 原理基本与Scala相同

wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {

private static final long serialVersionUID = 1L;

@Override

public void call(Tuple2<String, Integer> wordCount) throws Exception {

System.out.println(wordCount._1 + "--" + wordCount._2);

}

});

sc.close();

}

简易对比MapReduce实现WordCount应用程序

图-MapReduce实现WordCount执行流程图

MapReduce的执行八步曲

map阶段:

1,读取HDFS文件,并解析每一行成<行偏移量,行内容>(<k,v>)形式。

2,调用map()方法,并接收1中产生的<k,v>数据,进行处理计算得到新的数据结构<k,v>。

3,分区,是对2输出的数据进行分区。

4,Sort与Group,按照key进行排序;分组指的是相同key的value放到一个集合中。

5,归约,将分组后的数据进行归约。

reduce阶段:

6,reduce拉取数据,也就是Shuffle阶段。

7,reduce的合拼与排序,合拼是指将key对应的value数组进行合拼。

8,reduce输出结果数据。

希望对你我他有用

有不足之处请多多指教

欢迎留言评论!!!

欢迎关注大数据之谜

关于我们|网站公告|广告服务|联系我们| 网站地图

Copyright © 2002-2023 某某QQ个性网 版权所有 | 备案号:粤ICP备xxxxxxxx号

声明: 本站非腾讯QQ官方网站 所有软件和文章来自互联网 如有异议 请与本站联系 本站为非赢利性网站 不接受任何赞助和广告