脚本宝典收集整理的这篇文章主要介绍了SparkStreaming接收Flume数据方式一:Push模式,脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
Push模式说明
Flume 被 用 于 在 Flume agents 之 间 推 送 数 据 . 在 这 种 方 式 下 ,Spark Streaming 可以很方便的建立一个 receiver,起到一个 Avro agent 的作用.Flume 可以将数据推送到改 receiver.
spooldir
memory
avro
# 定义agent的三大组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#定义source
a1.sources.r1.tyPE=spooldir
a1.sources.r1.spoolDir=/training/nginx/LOGs/flumeLogs
a1.sources.r1.fileHeader=true
# 定义sink
a1.sinks.k1.type=avro
a1.sinks.k1.channel=c1
a1.sinks.k1.hostname=192.168.88.1
a1.sinks.k1.port=1234
# 定义channel
a1.channels.c1.type=;memory
a1.channels.c1.capacITy=10000
a1.channels.c1.transactionCapacity= 100
# 定义三者关系
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
程序功能说明:
编写程序,代码如下:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FlumeDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumeDemo")
val streamingContext = new StreamingContext(sparkConf, Seconds(2))
// 192.168.88.1为Windows中虚拟机网卡vmnet8的IP地址
val flumeEvent: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createstream(streamingContext, "192.168.88.1", 2345,StorageLevel.MEMORY_AND_disk_SER)
// 查看接收到的原始数据
val flumeDStream: DStream[String] = flumeEvent.map(e => {
PRintln(new String(e.event.getBody.array()))
new String(e.event.getBody.array())
})
// 分词,组合
val mapDStream: DStream[(Int,Int)] = flumeDStream.map(x => {
val strings = x.trim.split(",")
val salary = strings(5).toInt
val deptNo = strings(7).toInt
(deptNo,salary)
})
// 实现统计
val resultDStream: DStream[(Int, Int)] = mapDStream.reduceByKey(_ + _)
// 打印结果
resultDStream.print()
// 启动实时计算
streamingContext.start()
// 等待计算结束
streamingContext.awaitTermination()
}
}
特别注意:
程序中的IP地址为Windows上的虚拟机网卡即vmnet8的IP地址,如下图所示:
实验成功关键
:需关闭Windows的防火墙,代码中的IP地址为Windows中虚拟机网卡vmnet8的IP地址
,虚拟机中flume启动后需要访问到该地址,故需要关闭Windows防火墙。关闭后,如下图所示:
运行上述编写的程序,如下所示:
启动Flume,如下命令:
flume-ng agent -n a1 -f conf/a3.conf -DFlume.root.logger=INFO,console
启动成功后,如下所示:
将文件emp.csv(如没有,则先上传到虚拟机中)复制到Fume的source所对应的目录下(/training/nginx/logs/flumeLogs),如下所示:
cp /root/emp.csv /training/nginx/logs/flumeLogs/
查看程序是否读取到数据并完成数据的处理,结果如下图所示:
以上是脚本宝典为你收集整理的SparkStreaming接收Flume数据方式一:Push模式全部内容,希望文章能够帮你解决SparkStreaming接收Flume数据方式一:Push模式所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。