Hive&Spark知识汇总

发布时间:2022-06-21 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了Hive&Spark知识汇总脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

最近将之前所做项目查阅的资料进行汇总,比较琐碎,希望有些知识点或者想法能帮助到你。

参考文献地址:https://blog.csdn.net/u011412768/article/details/93404921https://bLOG.csdn.net/a2639491403/article/details/80044121https://www.jianshu.COM/p/cb0fec7a4f6dhttps://blog.csdn.net/weixin_38292570/article/details/108814678https://www.jianshu.com/p/604f5fd39ba6?appinstall=0https://www.cnblogs.com/lq0310/p/9842078.htMLhttps://www.jianshu.com/p/321034864bdb

spark笔记:a resilient distributed dataset (RDD)1. 实现对pyspark读取的hive数据内容,遍历分析DF = spark.SQL()对df进行collcet操作,它返回<list>,之后就可以遍历list参考代码1:From pyspark.sql import SparkSessionspark_session = SparkSession.builder .appName('knowlEdgedict-dataframe') .master('local') .getOrCreate()

df = spark_session.createDataFrame( schema=['id', 'query', 'cnt'], data=[ (1, '北京房价', 3456), (2, '三亚旅游', 789), (3, '美国大选', 12) ])df.show()dfc = df.collect()PRint(tyPE(dfc))for row in dfc: print(type(row), row, row.id, row['query'])

2.创建SparkSessionfrom pyspark.sql import SparkSessionspark = SparkSession.builder.appName("get-train-data").config("spark.sql.shuffle.partITions", 3200).enableHiveSupport().getOrCreate()sc = spark.sparkContexthc = HiveContext(sc)

&nbsp;

datas = ['9,test1,男,31', '10,test2,女,12']# 将datas数据进行map操作转换rdd = sc.parallelize(datas)rdd = rdd.map(lambda x:x.split('t'))rdd = rdd.map(lambda x:[int(a), int(b)])# 构建一个sch的类型sch = StructType([ StructField('geek_id', IntegerType(), True) StructField('expect_id', IntegerType(), True) ])# 将rdd,sch转换成dataFramedf = spark.createDataFrame(rdd, sch)# 创建一个视图,可以将数据df.createOrReplaceTempview('tmpv')# 查询这个视图spark.sql("select * from tmpv").show()# 将查询内容插入到该表中saprk.sql("insert overwrite table nlp.test select * from tmpv")

arsenal平台使用DAG连接图方式完成作业是有原因的,不同的数据表、特征抽取、合并计算等都是按照各个子功能来分工协作的多个功能放到一个任务中,很容易爆内存,该方式不可取

遍历操作:方法1:使用data.collect()构建data list,生成query score 部分,两个小时处理160w方法2:pyspark.RDD.toLocalIterator(), 该方法返回一个包含RDD中所有元素的迭代器,这个迭代器消耗的内存和这个RDD中最大分区的内存一样大。用这个函数可以方便地将RDD中的数据转换为一个迭代器,方便的进行遍历操作。list(df.toLocalIterator())

groupBy之后可以使用后期正常方法(https://www.jianshu.com/p/604f5fd39ba6?appinstall=0)gdf = df.grupBy(df.column_name)聚合函数 agg(), 聚合计算并返回为DataFrame

Spark中dataframe里data.drop()和data.na.drop()的区别1)data.drop() 丢弃列, data.na.drop() 丢弃行,返回一个丢弃含有null/NaN的行后的新dataframe

# 将pandas数据转换为spark的dataframe格式保存到hive中,pandas中的数据分隔符必须为“,”1. 将pd数据-> spark dataframespark = SparkSession.builder.appName("get-train-data").config("spark.sql.shuffle.partitions", 3200).enableHiveSupport().getOrCreate()data_py = spark.createDataFrame(df)2.创建一个表sql = "create table if not exists nlp_dev.spark_table_tmp (geek_id Bigint, exp_id bigint) Stored as parquet;"spark.sql(sql)3.向数据库中的表存入数据data_py.write.format("parquet").mode("overwrite").saveAsTable("nlp_dev.spark_table_tmp")

# PySpark中的DataFrame可以通过toPandas()函数转换成Python的Pandas DataFrame结构。这两者的主要区别是,pandas的操作都是在单个结点上执行的,而PySpark运行在多台机器上,因此在处理大量数据时,PySpark会比Pandas快数倍以上。# spark显示数据spark_df.select("c1").show()# pandas.DataFrame 转换成 spark.DataFramespark_df = spark.createDataFrame(df)# spark.DataFrame 转换成 pandas.DataFramepandas_df = spark_df.toPandas()# 使用csv方法从csv文件中读取并创建一个DataFrame对象,此时的file_path是hadoop上的路径df = spark.read.csv(file_path)df.printSchema()df.show()# 去重操作df.select('user_id').distinct().show()# dataFrame 转 rdddataframe = spark.createDataFrame(RDD)rdd1 = df.rddrdd2 = spark_df.rdd.map(lambda x:x)数据:11223,1223232,2444422233,539854spark数据转换为rdd后的形式:rdd1:[Row(11223=32, 12232=244442), Row(11223=2233, 12232=539854)]

数据存储在本地和数据库中的表里没有啥区别

# spark缺失数据处理fillna: df.na.fill()dropna: df.na.drop()# pyspark 的map-reduce操作。df.map(func); df.reduce(func), 返回类型:seqRDDs

 

在Spark SQL中,如果你想拥有一个临时的view,并想在不同的Session中共享,而且在application的运行周期内可用,那么就需要创建一个全局的临时view。并记得使用的时候加上global_temp作为前缀来引用它,因为全局的临时view是绑定到系统保留的数据库global_temp上。

① 创建一个普通的view和一个全局的view

df.createOrReplaceTempView("emp1")

df.createGlobalTempView("emp2")

② 在当前会话中执行查询,均可查询出结果。

spark.sql("select * from emp1").show

spark.sql("select * from global_temp.emp2").show

③ 开启一个新的会话,执行同样的查询

spark.newSession.sql("select * from emp1").show (运行出错)

spark.newSession.sql("select * from global_temp.emp2").show

cron定时任务0 15 10 ? * * 每天上午10:15触发

dataframe 转 rdd方法地址: https://blog.csdn.net/weixin_43668299/article/details/103847767

class pyspark.sql.Row:DataFrame中的一行,其中的字段可以像属性一样访问。Row可以用来通过使用命名参数来创建一个行对象,字典将按名称排序,学习地址:https://www.jianshu.com/p/103406e59cd3

pyspark的parallelize作用:分发一个本地python集合形成一个rddfrom pyspark.slq import Row: 作用是对等的将数据库中的样本映射到rdd数据,将每一个rdd数据封装到list中,方便读取,实现分布式操作(rdd数据不能直接显示,需要用collect(),行动操作对RDD进行计算,并把结果返回到驱动器程序中。)rdd.map 出来的数据: PythonRDD[90] at RDD at PythonRDD.scala:53, 类型是:<class 'pyspark.sql.types.Row'>,采用自定义函数返回类型也一定要是该类型rdd.map.collect() 出来的数据: [('a', 1), ('e', 1), ('b', 1), ('c', 1)]使用Map对原始的Row数据操作,出入lambda函数后,将Row转为字典,每个字段操作完成再封装成Row返回即可示例:def do_task(line): line = line.asDict() line["query"]=line["query"] + "_map" return Row(**line)

脚本宝典总结

以上是脚本宝典为你收集整理的Hive&Spark知识汇总全部内容,希望文章能够帮你解决Hive&Spark知识汇总所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。