您的位置:首页技术文章
文章详情页

JAVA spark创建DataFrame的方法

【字号: 日期:2022-08-25 17:52:40浏览:5作者:猪猪

述说正传,接下来开始说正事。

以前用Python和Scala操作Spark的时候比较多,毕竟Python和Scala代码写起来要简洁很多。

今天一起来看看Java版本怎么创建DataFrame,代码写起来其实差不多,毕竟公用同一套API。测试数据可以参考我之前的文章。

先来总结下Spark的一般流程:

1,先创建Spark基础变量,spark,sc

2,加载数据,rdd.textFile,spark.read.csv/json等

3,数据处理,mapPartition, map,filter,reduce等一系列transformation操作

4,数据保存,saveAstextFile,或者其他DataFrame方法

祭出代码

package dev.java;import dev.utils.Utils;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.types.StructType;import scala.Tuple2;import java.util.List;public class Spark1 { private static final String fileData = 'seed'; private static final String fileSave = 'result'; private static SparkSession spark = SparkSession.builder().appName('Java-Spark').master('local[*]').config('spark.default.parallelism', 100).config('spark.sql.shuffle.partitions', 100).config('spark.driver.maxResultSize', '3g').getOrCreate(); private static JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); public static void main(String[] args) { Utils.delete(fileSave); // t1(); } private static void t1() { JavaRDD<Row> rdd = sc.textFile(fileData).map(v -> { String[] parts = v.split('t'); return RowFactory.create(parts[0], Long.parseLong(parts[1]));}).filter(v -> v.getLong(1) >= 10000).sortBy(v -> v.getLong(1), false, 100).coalesce(2); Dataset<Row> df = spark.createDataFrame(rdd, StructType.fromDDL('title string, qty long')); df.write().csv(fileSave); spark.stop(); }}

以上就是JAVA操作spark创建DataFrame的方法的详细内容,更多关于JAVA Spark 创建DataFrame的资料请关注好吧啦网其它相关文章!

标签: Java
相关文章: