您的位置:首页技术文章
文章详情页

在python中使用pyspark读写Hive数据操作

【字号: 日期:2022-07-22 14:21:20浏览:9作者:猪猪

1、读Hive表数据

pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用SQL语句从hive里面查询需要的数据,代码如下:

from pyspark.sql import HiveContext,SparkSession _SPARK_HOST = 'spark://spark-master:7077'_APP_NAME = 'test'spark_session = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate() hive_context= HiveContext(spark_session ) # 生成查询的SQL语句,这个跟hive的查询语句一样,所以也可以加where等条件语句hive_database = 'database1'hive_table = 'test'hive_read = 'select * from {}.{}'.format(hive_database, hive_table) # 通过SQL语句在hive中查询的数据直接是dataframe的形式read_df = hive_context.sql(hive_read)

2 、将数据写入hive表

pyspark写hive表有两种方式:

(1)通过SQL语句生成表

from pyspark.sql import SparkSession, HiveContext _SPARK_HOST = 'spark://spark-master:7077'_APP_NAME = 'test' spark = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate() data = [ (1,'3','145'), (1,'4','146'), (1,'5','25'), (1,'6','26'), (2,'32','32'), (2,'8','134'), (2,'8','134'), (2,'9','137')]df = spark.createDataFrame(data, [’id’, 'test_id', ’camera_id’]) # method one,default是默认数据库的名字,write_test 是要写到default中数据表的名字df.registerTempTable(’test_hive’)sqlContext.sql('create table default.write_test select * from test_hive')

(2)saveastable的方式

# method two # 'overwrite'是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表# mode('append')是在原有表的基础上进行添加数据df.write.format('hive').mode('overwrite').saveAsTable(’default.write_test’)

tips:

spark用上面几种方式读写hive时,需要在提交任务时加上相应的配置,不然会报错:

spark-submit --conf spark.sql.catalogImplementation=hive test.py

补充知识:PySpark基于SHC框架读取HBase数据并转成DataFrame

一、首先需要将HBase目录lib下的jar包以及SHC的jar包复制到所有节点的Spark目录lib下

二、修改spark-defaults.conf 在spark.driver.extraClassPath和spark.executor.extraClassPath把上述jar包所在路径加进去

三、重启集群

四、代码

#/usr/bin/python#-*- coding:utf-8 ?*- from pyspark import SparkContextfrom pyspark.sql import SQLContext,HiveContext,SparkSessionfrom pyspark.sql.types import Row,StringType,StructField,StringType,IntegerTypefrom pyspark.sql.dataframe import DataFrame sc = SparkContext(appName='pyspark_hbase')sql_sc = SQLContext(sc) dep = 'org.apache.spark.sql.execution.datasources.hbase'#定义schemacatalog = '''{ 'table':{'namespace':'default', 'name':'teacher'}, 'rowkey':'key', 'columns':{ 'id':{'cf':'rowkey', 'col':'key', 'type':'string'}, 'name':{'cf':'teacherInfo', 'col':'name', 'type':'string'}, 'age':{'cf':'teacherInfo', 'col':'age', 'type':'string'}, 'gender':{'cf':'teacherInfo', 'col':'gender','type':'string'}, 'cat':{'cf':'teacherInfo', 'col':'cat','type':'string'}, 'tag':{'cf':'teacherInfo', 'col':'tag', 'type':'string'}, 'level':{'cf':'teacherInfo', 'col':'level','type':'string'} } }''' df = sql_sc.read.options(catalog = catalog).format(dep).load() print (’***************************************************************’)print (’***************************************************************’)print (’***************************************************************’)df.show()print (’***************************************************************’)print (’***************************************************************’)print (’***************************************************************’)sc.stop()

五、解释

数据来源参考请本人之前的文章,在此不做赘述

schema定义参考如图:

在python中使用pyspark读写Hive数据操作

六、结果

在python中使用pyspark读写Hive数据操作

以上这篇在python中使用pyspark读写Hive数据操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持好吧啦网。

标签: Python 编程
相关文章: