基于 RDD 的 API基于 DataFrame/Dataset 的 APISpark 中有本质的不同,主要体现在以下几个方面:


1. 数据表示方式

RDD DataFrame/Dataset
非结构化/半结构化,存储的是原始的 Java/Scala/Python 对象 结构化,数据以列式存储(类似于关系型数据库的表)
没有内置的 Schema(字段名和类型) 自带 Schema(字段名 + 数据类型)
数据以 (key, value) 等基本形式存储 数据以 Row 对象存储,并带有列名

示例:

  • RDD("apple", 3)(只是一个元组,Spark 不知道它的含义)
  • DataFrameRow(word="apple", count=3)(明确知道 word 是字符串,count 是整数)

2. 优化方式

RDD DataFrame/Dataset
无优化,直接按代码逻辑执行 Catalyst 优化器 自动优化执行计划(如谓词下推、列裁剪、Join 优化等)
需要手动优化(如 persist()repartition() 自动优化存储格式(如 Parquet 列式存储)
Tungsten 二进制存储(较晚引入) 默认使用 Tungsten 二进制格式,减少序列化开销

示例:

1
2
3
4
5
# RDD 版本(无优化)
rdd.filter(lambda x: x > 10).map(lambda x: x * 2).collect()

# DataFrame 版本(Catalyst 优化)
df.filter(df["value"] > 10).select(df["value"] * 2).collect()

DataFrame 的优化器可能会合并 filterselect,减少数据扫描量。


3. API 风格

RDD DataFrame/Dataset
函数式编程mapfilterreduceByKey 声明式 SQL 风格selectwheregroupBy
需要手动编写 lambda 表达式 可以使用 SQL 表达式 或 DSL(领域特定语言)
适合复杂业务逻辑 适合结构化数据处理

示例:

1
2
3
4
5
# RDD 方式(函数式)
rdd.map(lambda x: (x[0], x[1] * 2)).filter(lambda x: x[1] > 10)

# DataFrame 方式(SQL 风格)
df.select("word", df["count"] * 2).where(df["count"] > 10)

4. 执行效率

RDD DataFrame/Dataset
JVM 对象序列化(较慢) Tungsten 二进制格式(减少序列化开销)
无执行计划优化 Catalyst 优化器 生成最优执行计划
适合低层次控制 适合大规模数据分析

性能对比:

  • DataFrame 通常比 RDD 快 2-10 倍(特别是聚合、Join 等操作)。
  • RDD 更适合 细粒度控制(如自定义分区、复杂计算)。

5. 适用场景

RDD DataFrame/Dataset
非结构化数据(文本、日志) 结构化/半结构化数据(JSON、CSV、Parquet)
需要自定义计算逻辑 需要 SQL 查询或聚合分析
低层次 API(如机器学习底层) 高层次 API(如 Spark SQL、MLlib)

总结

特性 RDD DataFrame
数据格式 原始对象 结构化(带 Schema)
优化方式 无优化 Catalyst + Tungsten
API 风格 函数式 SQL/DSL
执行效率 较慢 较快(优化后)
适用场景 非结构化数据、复杂逻辑 结构化数据、分析查询

推荐:

  • 优先使用 DataFrame/Dataset(性能更好,代码更简洁)。
  • 仅在需要精细控制时用 RDD(如自定义分区、复杂业务逻辑)。