在 Scala 中使用 Spark SQL 读取 CSV 文件并写入 MySQL 数据库是一个常见的数据处理任务。以下是实现这一功能的详细步骤和代码示例:
1. 环境准备
确保你已经安装了以下组件:
Apache Spark:用于数据处理。
MySQL 数据库:用于存储数据。
MySQL JDBC 驱动:用于连接 MySQL 数据库。
将 MySQL JDBC 驱动添加到 Spark 的依赖中。如果你使用的是 SBT 构建工具,可以在 build.sbt
文件中添加以下依赖:
scala
复制
libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.33"
2. 代码实现
以下是一个完整的 Scala 程序示例,展示如何读取 CSV 文件并将其写入 MySQL 数据库:
import org.apache.spark.sql.{SparkSession, DataFrame}
object CsvToMySQL {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("CsvToMySQL")
.master("local[*]") // 本地模式,生产环境中可以配置为集群地址
.getOrCreate()
// 设置日志级别
spark.sparkContext.setLogLevel("WARN")
// 读取 CSV 文件
val csvFilePath = "path/to/your/csvfile.csv" // 替换为你的 CSV 文件路径
val df: DataFrame = spark.read
.option("header", "true") // 假设 CSV 文件有表头
.option("inferSchema", "true") // 自动推断数据类型
.csv(csvFilePath)
// 查看读取的数据
df.show()
// 配置 MySQL 数据库连接信息
val jdbcUrl = "jdbc:mysql://localhost:3306/your_database" // 替换为你的数据库地址和数据库名
val jdbcUser = "your_username" // 替换为你的数据库用户名
val jdbcPassword = "your_password" // 替换为你的数据库密码
val jdbcTable = "your_table" // 替换为你的目标表名
// 写入 MySQL 数据库
df.write
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", jdbcTable)
.option("user", jdbcUser)
.option("password", jdbcPassword)
.mode("overwrite") // 如果表已存在,则覆盖
.save()
// 停止 SparkSession
spark.stop()
}
}
3. 代码说明
创建 SparkSession:
SparkSession.builder()
创建一个 SparkSession 构建器。.appName("CsvToMySQL")
设置应用程序名称。.master("local[*]")
设置为本地模式,使用所有可用的 CPU 核心。在生产环境中,可以配置为集群地址。
读取 CSV 文件:
使用
spark.read.csv()
方法读取 CSV 文件。.option("header", "true")
表示 CSV 文件的第一行是表头。.option("inferSchema", "true")
自动推断数据类型。
写入 MySQL 数据库:
使用
df.write.format("jdbc")
指定使用 JDBC 方式写入。.option("url", jdbcUrl)
设置 MySQL 数据库的连接 URL。.option("dbtable", jdbcTable)
设置目标表名。.option("user", jdbcUser)
和.option("password", jdbcPassword)
设置数据库用户名和密码。.mode("overwrite")
设置写入模式为覆盖。如果需要追加数据,可以使用.mode("append")
。
停止 SparkSession:
调用
spark.stop()
停止 SparkSession,释放资源。
4. 注意事项
CSV 文件路径:确保 CSV 文件路径正确,且 Spark 有权限访问。
MySQL 表结构:如果目标表不存在,Spark 会根据 DataFrame 的结构自动创建表。如果表已存在,确保表结构与 DataFrame 的结构一致。
JDBC 驱动:确保 MySQL JDBC 驱动已正确添加到 Spark 的依赖中。
通过以上步骤,你可以轻松地将 CSV 文件中的数据读取到 Spark 中,并将其写入 MySQL 数据库。