脚本宝典收集整理的这篇文章主要介绍了Spark中的共享变量,脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
Application:基于Spark的应用程序,包含了driver程序和 集群上的executor
DriverPRogram:运行main函数并且新建SparkContext的程序
ClusterManager:在集群上获取资源的外部服务(例如 standalone,Mesos,Yarn )
WorkerNode:集群中任何可以运行应用用代码的节点
Executor:是在一个workernode上为某应用用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用用都有各自自独立的executors
Task:被送到某个executor上的执行单元
在Driver端定义:sc.longAccumulator 在算子内部进行累加 在Driver端汇总 累加器支持在所有不同节点之间进行累加计算
在Driver端广播:sc.broadcast() 在算子内部取用,不能进行修改 广播到每个Executor中 用完记得“销毁” 如果直接将数据封装task中,会产生很多副本,增加网络传输的数据量,降低效率,因为task的数量远大于Executor的数量
import java.lang
import org.apache.COMmons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object Demo18ShareVARiable {
def main(args: Array[String]): UnIT = {
val conf: SparkConf = new SparkConf().setAppName(" ").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val lines: RDD[String] = sc.textFile("spark/data/words2.txt")
//需求:
// 以词频统计WordCount程序为例,处理的数据word2.txt所示,包括非单词符号,
// 做WordCount的同时统计出特殊字符的数量
//创建一个计数器/累加器
val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
//定义一个特殊字符集合
val ruleList: List[String] = List(",", ".", "!", "#", "$", "%", "(", ")")
//将集合作为广播变量广播到各个节点
val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)
//TODO 2.transformation
val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
.flatMap(_.split("\s+"))
.filter(ch => {
//获取广播数据
val list: List[String] = broadcast.value
if (list.contains(ch)) { //如果是特殊字符
mycounter.add(1)
false
} else { //是单词
true
}
}).map((_, 1))
.reduceByKey(_ + _)
//TODO 3.sink/输出
wordcountResult.foreach(println)
val chresult: lang.Long = mycounter.value //特殊字符数量
println("特殊字符的数量:"+chResult)
}
}
以上是脚本宝典为你收集整理的Spark中的共享变量全部内容,希望文章能够帮你解决Spark中的共享变量所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。