Data Lake (XII): Integration of Spark 3.1.2 and Iceberg 0.12.1
2022-07-19 10:08:00 【Lanson】
Integrating Spark 3.1.2 with Iceberg 0.12.1
Spark can operate on Iceberg data lakes. The Iceberg version used here is 0.12.1, which is also compatible with Spark 2.4. However, operating Iceberg from Spark 2.4 does not support DDL, adding partitions and partition transforms, Iceberg metadata queries, insert into/overwrite, and similar operations, so it is recommended to integrate Iceberg 0.12.1 with Spark 3.x. Here we use Spark 3.1.2.
I. Import Dependencies into the pom File
Create a Maven project in IDEA and add the following key dependencies to the pom file:
<!-- Configure the following to avoid the "-source 1.5 does not support lambda expressions" error when packaging under JDK 1.8 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Spark and Iceberg integration dependencies -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3</artifactId>
<version>0.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3-runtime</artifactId>
<version>0.12.1</version>
</dependency>
<!-- Avro format dependency -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<!-- Parquet format dependency -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.12.0</version>
</dependency>
<!-- SparkSQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- SparkSQL ON Hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- MySQL JDBC driver (optional)
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
-->
<!--SparkStreaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- SparkStreaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Package required to produce data to Kafka (optional)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
use this jar for compile and test only, not transitive:
<scope>provided</scope>
</dependency>
-->
<!-- Structured Streaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Scala package -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
II. Configure Catalogs in SparkSQL
The following operations mainly use SparkSQL to work with Iceberg. Spark supports two kinds of catalog configuration: hive and hadoop. With a Hive Catalog, Iceberg tables are stored under Hive's default warehouse path; with a Hadoop Catalog, you need to specify the storage path for the Iceberg-format tables yourself.
Specify the catalogs in the SparkSQL code:
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
// Specify a hive catalog; the catalog name is hive_prod
.config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hive_prod.type", "hive")
.config("spark.sql.catalog.hive_prod.uri", "thrift://node1:9083")
.config("iceberg.engine.hive.enabled", "true")
// Specify a hadoop catalog; the catalog name is hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
.getOrCreate()
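Optionally, one of these catalogs can be made the default so that the catalog prefix can be omitted from table names. Below is a minimal sketch, assuming the spark.sql.defaultCatalog setting is available in your Spark 3.1.x build (verify for your version); the application name is just an example:

import org.apache.spark.sql.SparkSession

// Sketch: the same hadoop catalog as above, plus a default catalog
// ("spark.sql.defaultCatalog" is an assumed Spark 3 setting; verify for your version).
val sparkDefault: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIcebergDefault")
  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
  .config("spark.sql.defaultCatalog", "hadoop_prod")
  .getOrCreate()

// With a default catalog set, "default.test" resolves to hadoop_prod.default.test
// (run this only after that table has been created in section IV).
sparkDefault.sql("select * from default.test").show()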
III. Use the Hive Catalog to Manage Iceberg Tables
When the Hive Catalog is used to manage Iceberg tables, table data is stored by default under the corresponding Hive warehouse directory, and a corresponding Iceberg table exists in Hive. SparkSQL here acts as a Hive client, so the property "iceberg.engine.hive.enabled" must additionally be set to true; otherwise the data in the corresponding Iceberg-format table cannot be queried from Hive.
1、 Create table
// Create a table. hive_prod: the specified catalog name; default: an existing Hive database; test: the name of the Iceberg table to create.
spark.sql(
"""
| create table if not exists hive_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)
Note:
1) When creating the table, the table name format is ${Hive database name}.${created Iceberg-format table name}.
2) After the table is created, the corresponding test table can be seen in Hive; it is created as a Hive external table, and the corresponding data directory can be seen under the matching Hive warehouse directory.

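According to the Iceberg Hive-integration documentation, the global iceberg.engine.hive.enabled setting shown in the SparkSession configuration above can also be replaced by a per-table switch. The following is only a hedged sketch (the table name test2 is a made-up example, and the engine.hive.enabled table property should be verified against the Iceberg 0.12.1 docs):

// Sketch: enable Hive engine support for a single table via a table property
// ("engine.hive.enabled" per the Iceberg Hive-integration docs; verify for 0.12.1).
spark.sql(
  """
    |create table if not exists hive_prod.default.test2 (id int,name string,age int)
    |using iceberg
    |tblproperties ('engine.hive.enabled'='true')
  """.stripMargin)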
2、 Insert data
// insert data
spark.sql(
"""
|insert into hive_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
3、 Query data
// Query data
spark.sql(
"""
|select * from hive_prod.default.test
""".stripMargin).show()
The result is as follows:

The data can also be queried from the corresponding test table in Hive:

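Besides SQL, the table can also be loaded as a DataFrame. A minimal sketch using the SparkSession built above:

// Load the Iceberg table as a DataFrame and query it with the DataFrame API.
val testDF = spark.table("hive_prod.default.test")
testDF.where("age >= 19").show()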
4、 Delete table
// Drop the table
spark.sql(
"""
|drop table hive_prod.default.test
""".stripMargin)
Note: after the table is dropped, the data is deleted, but the table directory still exists. To remove the data completely, the corresponding table directory also needs to be deleted.
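If the leftover table directory should also be removed, it can be deleted through the HDFS FileSystem API. A minimal sketch, assuming a typical Hive warehouse layout (the path /user/hive/warehouse/test is an assumed default; adjust it to your cluster):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Remove the directory that remains after dropping the table.
// The warehouse path below is an assumption; adjust it to your environment.
val fs = FileSystem.get(new URI("hdfs://mycluster"), new Configuration())
val tableDir = new Path("/user/hive/warehouse/test")
if (fs.exists(tableDir)) {
  fs.delete(tableDir, true) // recursive delete
}
fs.close()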
IV. Use the Hadoop Catalog to Manage Iceberg Tables
When using the Hadoop Catalog to manage tables, you need to specify the directory where the corresponding Iceberg data is stored.
1、 Create table
// Create a table. hadoop_prod: the specified Hadoop catalog name; default: the database name; test: the name of the Iceberg table to create.
spark.sql(
"""
| create table if not exists hadoop_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)
Note:
1) The created table name format is ${arbitrary database name}.${Iceberg-format table name}.
2) When the table is created, a table directory is created under the directory corresponding to hadoop_prod (the warehouse path configured above).

2、 Insert data
// insert data
spark.sql(
"""
|insert into hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
3、 Query data
spark.sql(
"""
|select * from hadoop_prod.default.test
""".stripMargin).show()

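Since a Hadoop-catalog table lives at a fixed HDFS path, it can also be loaded by path with the DataFrame reader. A minimal sketch, assuming the warehouse path configured earlier:

// Path-based read of the Hadoop-catalog table (the path assumes the warehouse configured above).
val dfByPath = spark.read.format("iceberg")
  .load("hdfs://mycluster/sparkoperateiceberg/default/test")
dfByPath.show()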
4、 Create a corresponding Hive table to map the data
Execute the following CREATE TABLE statement in Hive:
CREATE TABLE hdfs_iceberg (
id int,
name string,
age int
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/sparkoperateiceberg/default/test'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
Querying the "hdfs_iceberg" table in Hive returns the following data:

5、 Delete table
spark.sql(
"""
|drop table hadoop_prod.default.test
""".stripMargin)
Note: after the Iceberg table is dropped, the data is deleted, but the corresponding database directory still exists.