您的位置:首页技术文章
文章详情页

Java API如何实现向Hive批量导入数据

浏览:6日期:2022-08-08 18:06:32
Java API实现向Hive批量导入数据

Java程序中产生的数据,如果导入oracle或者mysql库,可以通过jdbc连接insert批量操作完成,但是当前版本的hive并不支持批量insert操作,因为需要先将结果数据写入hdfs文件,然后插入Hive表中。

package com.enn.idcard; import java.io.IOException;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import java.sql.Statement;import java.util.ArrayList;import java.util.List; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;/** * <p>Description: </p> * @author kangkaia * @date 2017年12月26日 下午1:42:24 */public class HiveJdbc { public static void main(String[] args) throws IOException { List<List> argList = new ArrayList<List>();List<String> arg = new ArrayList<String>();arg.add('12345');arg.add('m');argList.add(arg);arg = new ArrayList<String>();arg.add('54321');arg.add('f');argList.add(arg);//System.out.println(argList.toString());String dst = '/test/kk.txt';createFile(dst,argList);loadData2Hive(dst); } /** * 将数据插入hdfs中,用于load到hive表中,默认分隔符是'001' * @param dst * @param contents * @throws IOException */ public static void createFile(String dst , List<List> argList) throws IOException{Configuration conf = new Configuration();FileSystem fs = FileSystem.get(conf);Path dstPath = new Path(dst); //目标路径//打开一个输出流FSDataOutputStream outputStream = fs.create(dstPath);StringBuffer sb = new StringBuffer();for(List<String> arg:argList){for(String value:arg){sb.append(value).append('001');}sb.deleteCharAt(sb.length() - 4);//去掉最后一个分隔符sb.append('n');}sb.deleteCharAt(sb.length() - 2);//去掉最后一个换行符byte[] contents = sb.toString().getBytes();outputStream.write(contents);outputStream.close();fs.close();System.out.println('文件创建成功!'); } /** * 将HDFS文件load到hive表中 * @param dst */ public static void loadData2Hive(String dst) { String JDBC_DRIVER = 'org.apache.hive.jdbc.HiveDriver'; String CONNECTION_URL = 'jdbc:hive2://server-13:10000/default;auth=noSasl'; String username = 'admin';String password = 'admin';Connection con = null;try {Class.forName(JDBC_DRIVER);con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);Statement stmt = con.createStatement();String sql = ' load data inpath ’'+dst+'’ into table population.population_information ';stmt.execute(sql);System.out.println('loadData到Hive表成功!');} catch (SQLException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}finally {// 关闭rs、ps和conif(con != null){try {con.close();} catch (SQLException e) {e.printStackTrace();}}}} }注意:

本例使用mvn搭建,conf配置文件放在src/main/resources目录下。

Hive提供的默认文件存储格式有textfile、sequencefile、rcfile等。用户也可以通过实现接口来自定义输入输的文件格式。

在实际应用中,textfile由于无压缩,磁盘及解析的开销都很大,一般很少使用。Sequencefile以键值对的形式存储的二进制的格式,其支持针对记录级别和块级别的压缩。rcfile是一种行列结合的存储方式(text file和sequencefile都是行表[row table]),其保证同一条记录在同一个hdfs块中,块以列式存储。一般而言,对于OLTP而言,行表优势大于列表,对于OLAP而言,列表的优势大于行表,特别容易想到当做聚合操作时,列表的复杂度将会比行表小的多,虽然单独rcfile的列运算不一定总是存在的,但是rcfile的高压缩率确实减少文件大小,因此实际应用中,rcfile总是成为不二的选择,达观数据平台在选择文件存储格式时也大量选择了rcfile方案。

通过hdfs导入hive的表默认是textfile格式的,因此可以改变存储格式,具体方法是先创建sequencefile、rcfile等格式的空表,然后重新插入数据即可。

insert overwrite table seqfile_table select * from textfile_table; ……insert overwrite table rcfile_table select * from textfile_table;java 批量插入hive中转在HDFS

稍微修改了下,这文章是通过将数据存盘后,加载到HIVE.

模拟数据放到HDFS然后加载到HIVE,请大家记得添加HIVE JDBC依赖否则会报错。

加载前的数据表最好用外部表,否则会drop表的时候元数据会一起删除!

<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>1.1.0</version> </dependency>

代码

import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import java.sql.Statement;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;public class Demo { public static void main(String[] args) throws Exception { List<List> argList = new ArrayList<List>();List<String> arg = new ArrayList<String>();arg.add('12345');arg.add('m');argList.add(arg);arg = new ArrayList<String>();arg.add('54321');arg.add('f');argList.add(arg);//System.out.println(argList.toString());String dst = '/test/kk.txt';createFile(dst,argList);//loadData2Hive(dst); } /** * 将数据插入hdfs中,用于load到hive表中,默认分隔符是'|' * @param dst * @param contents * @throws IOException * @throws Exception * @throws InterruptedException */ public static void createFile(String dst , List<List> argList) throws IOException, InterruptedException, Exception{Configuration conf = new Configuration();FileSystem fs = FileSystem.get(new URI('hdfs://hadoop:9000'),conf,'root');Path dstPath = new Path(dst); //目标路径//打开一个输出流FSDataOutputStream outputStream = fs.create(dstPath);StringBuffer sb = new StringBuffer();for(List<String> arg:argList){for(String value:arg){sb.append(value).append('|');}sb.deleteCharAt(sb.length() - 1);//去掉最后一个分隔符sb.append('n');}byte[] contents = sb.toString().getBytes();outputStream.write(contents);outputStream.flush();;outputStream.close();fs.close();System.out.println('文件创建成功!'); } /** * 将HDFS文件load到hive表中 * @param dst */ public static void loadData2Hive(String dst) { String JDBC_DRIVER = 'org.apache.hive.jdbc.HiveDriver'; String CONNECTION_URL = 'jdbc:hive2://hadoop:10000/default'; String username = 'root';String password = 'root';Connection con = null;try {Class.forName(JDBC_DRIVER);con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);Statement stmt = con.createStatement();String sql = ' load data inpath ’'+dst+'’ into table test ';//test 为插入的表stmt.execute(sql);System.out.println('loadData到Hive表成功!');} catch (SQLException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}finally {// 关闭rs、ps和conif(con != null){try {con.close();} catch (SQLException e) {e.printStackTrace();}}}} }

以上为个人经验,希望能给大家一个参考,也希望大家多多支持好吧啦网。

标签: Java
相关文章: