c语言sscanf函数的用法是什么
277
2022-11-15
深入剖析 Delta Lake: schema validation
介绍
Delta Lake 是 Spark 的开源存储层,可提供 ACID 事务和其他数据管理功能,用于机器学习和其他大数据工作,其通过写和快照隔离之间的乐观并发控制,在写入数据期间提供一致性的读取,从而为构建在 HDFS 和云存储上的数据湖(data lakes)带来可靠性。Delta Lake 内置数据版本控制,方便读取历史数据以及版本回滚。
Spark 的每一个 DataFrame 都包含一个 schema,用来定义数据的形态,例如数据类型、列信息以及元数据。在 Delta Lake 中,表的 schema 通过 JSON 格式存储在事务日志中,在数据湖 delta lake 中会自动对插入(更新)的数据与目标表进行 schema 约束,下面就简单介绍一下
schema validation
schema validation 使用以下规则来确定 merge 操作是否可以执行
对于update 和 insert 操作,指定的目标列必须存在于目标增量表中对于updateAll 和 insertAll 操作 源数据集必须具有目标 delta lake 表的所有列,源数据集可以拥有的其它列,但是这些列在 merge 时会被忽略。对于update、insert、updateAll、insertAll操作,如果产生目标列的表达式生成的数据类型与 delta lake 表中的对应列不同,将尝试将它们强转为表中的类型
了解完 schema validation 的规则,我们来实际操作一下
schema validation作用
由于 Schema 约束是一种严格的校验,因此可以用于已清洗、转化完成的数据,保证数据不受污染。而对于一些日志数据,可能并不需要这么严格。
规则验证
首先 new 一个 sparkSession 对象和一个 keyVal 对象
= SparkSession.builder() .appName("delta-test") .master("local[4]") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate();
public class KeyVal implements Serializable { private String key; private String val; //ignore get、 set、constructor }
然后向 data/schema-table 路径插入两条数据
//table path String tablePath = "data/schema-table";List
执行结果为
+---+---+|key|val|+---+---+| 2| 2|| 1| 1|+---+---+
验证update/insert 操作
为了验证 update 、insert 操作的,我们进行了两次操作,第一次根据 update.key = target.key 来判断要插入哪些数据、更新哪些数据,插入的字段和更新的字段都是在目标 delta lake 表存在的 第二次我们打算插入 key=100、value=100 的数据,只不过插入的字段名从val 变成了 value
= DeltaTable.forPath(tablePath);System.out.println("初始数据");deltaTable.toDF().show();List
执行结果如下
验证字段多余和类型不匹配
在这里新建了一个 KeyVal2 对象,该对象相比较 KeyVal 多了一个 id 字段,在这里我打算把 key=1 的 val 更新为1000,插入一条 key=3,val=3 的数据。 请注意这句代码:dataset.selectExpr("id", "key", "cast(val as long)").as("update") 这句代码包含了此次的验证,新增了 id 字段,并且把 val 的 string 类型强转为了 long 类型
= DeltaTable.forPath(tablePath);System.out.println("初始数据");deltaTable.toDF().show();List
从结果中可以看到,程序正常执行了,并且把 key=1 的 val 更新为1000,插入了一条 key=3,val=3 的数据。而我们 KeyVal2 的 id 字段被忽略更新
验证updateAll、inesrtAll
为了验证updateAll、inesrtAll操作中 delta lake 表中的字段不存在于目标集合,我把验证多字段的例子中dataset.selectExpr("id", "key", "cast(val as long)").as("update")代码改为了dataset.selectExpr("key","val as value").as("update") ,也就是说把 val 字段替换为 value 字段
从异常中可以发现,报错内容是目标表中的 val 字段在源数据集 update 中不存在
看完这些有些同学该问了,如果我想通过源数据集的字段,自动向 delta lake 表中新增字段怎么办?当然也是可以的,delte lake 允许用户能够方便地修改表的当前 schema,来适应不断变化的数据。篇幅有限,下篇文章会介绍下 Automatic schema evolution
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~