一、环境准备
1. 下载 MySQL 驱动
- 下载 mysql-connector-java-x.x.x.jar ,放入 Spark 目录的 jars 文件夹(或提交任务时通过 --jars 参数指定)。
2. 启动 SparkSession
scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Spark MySQL Example")
.master("local[*]") // 本地模式,集群需调整
.getOrCreate()
二、通过 SparkSQL 创建 MySQL 数据库
方式 1:使用原生 JDBC 执行 SQL
scala
import java.sql.{DriverManager, Connection}
// 注册 MySQL 驱动(可选,部分版本需手动注册)
Class.forName("com.mysql.cj.jdbc.Driver")
// 创建数据库连接
val conn: Connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/", // MySQL 连接地址(不带数据库名)
"root", // 用户名
"your_password" // 密码
)
// 执行创建数据库语句
val statement = conn.createStatement()
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS test_db")
statement.close()
conn.close()
方式 2:通过 SparkSQL 直接提交命令(需支持 JDBC 执行)
scala
spark.sql(
s"""
|CREATE DATABASE test_db
|USING jdbc
|OPTIONS (
| url 'jdbc:mysql://localhost:3306/',
| driver 'com.mysql.cj.jdbc.Driver',
| user 'root',
| password 'your_password'
|)
|""".stripMargin
)
三、通过 SparkSQL 创建 MySQL 表
步骤 1:定义表结构(Schema)
scala
import org.apache.spark.sql.types._
val schema = StructType(
Seq(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("create_time", TimestampType, nullable = true)
)
)
步骤 2:创建表(通过 JDBC 写入空数据触发建表)
scala
// 创建空 DataFrame
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
// 写入 MySQL 表(若表不存在则自动创建)
emptyDF.write.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/test_db") // 指定数据库
.option("dbtable", "users") // 表名
.option("user", "root")
.option("password", "your_password")
.option("driver", "com.mysql.cj.jdbc.Driver")
.mode("overwrite") // 覆盖模式(可根据需求改为 append/ignore)
.save()
关键参数说明
- dbtable :需指定 完整表名(如 test_db.users ,若已指定数据库可省略前缀)。
- 表结构需与 MySQL 数据类型匹配(如 StringType → VARCHAR , IntegerType → INT )。
四、验证表是否创建成功
1. 通过 SparkSQL 查询表结构
scala
spark.read.jdbc(
"jdbc:mysql://localhost:3306/test_db",
"users",
Map("user" -> "root", "password" -> "your_password", "driver" -> "com.mysql.cj.jdbc.Driver")
).printSchema()
2. 直接登录 MySQL 查看
bash
mysql -uroot -p
USE test_db;
SHOW TABLES;
DESCRIBE users;
注意事项
1. 驱动版本匹配
- MySQL 8.0+ 需使用 mysql-connector-java-8.0+ ,低版本数据库需对应低版本驱动。
2. 权限问题
- 确保 MySQL 用户有创建数据库和表的权限(如 GRANT CREATE ON test_db.* TO 'user'@'localhost'; )。
3. 数据类型映射
- Spark 与 MySQL 数据类型需手动映射(例如 TimestampType → DATETIME ),避免自动创建表时类型错误。