以下是使用 Spark SQL 在 MySQL 中创建数据库和表的步骤(基于 Scala API):
1. 准备工作
- 添加 MySQL 驱动依赖
同前所述,需在 Spark 环境中引入 MySQL Connector JAR 包(如 mysql-connector-java-8.0.33.jar )。
- 获取 SparkSession
scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Spark SQL MySQL")
.master("local[*]") // 或集群地址
.getOrCreate()
2. 创建 MySQL 数据库
方式 1:通过原生 SQL 语句创建
scala
// 构建 MySQL 连接参数
val jdbcUrl = "jdbc:mysql://localhost:3306/?useSSL=false&useUnicode=true&characterEncoding=utf8" // 未指定数据库名
val user = "root"
val password = "your_password"
// 使用 Spark SQL 执行原生 MySQL 语句(需通过 JDBC 提交)
spark.sql(s"""
CREATE TABLE jdbc($jdbcUrl)
(statement 'CREATE DATABASE IF NOT EXISTS test_db')
""").show()
方式 2:通过 DataFrameWriter 间接创建(需先连接空数据库)
scala
// 连接到 MySQL 服务器(未指定数据库)
val emptyDbUrl = "jdbc:mysql://localhost:3306/?useSSL=false"
val createDbDf = spark.createDataFrame(Seq.empty[(String)]) // 空 DataFrame
createDbDf.write.format("jdbc")
.option("url", emptyDbUrl)
.option("dbtable", "(CREATE DATABASE IF NOT EXISTS test_db) AS dummy") // 执行建库语句
.option("user", user)
.option("password", password)
.mode("append")
.save()
3. 创建 MySQL 表
步骤 1:连接到目标数据库
scala
val dbUrl = "jdbc:mysql://localhost:3306/test_db?useSSL=false&characterEncoding=utf8"
步骤 2:定义表结构并创建表
方式 1:通过 Spark SQL 建表语句(DDL)
scala
// 定义表结构(DDL 语法需符合 MySQL 规范)
val createTableSql = """
CREATE TABLE IF NOT EXISTS users (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(50) NOT NULL,
age INT,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
// 执行建表语句(通过 JDBC 提交)
spark.sql(s"""
CREATE TABLE jdbc($dbUrl)
(statement '$createTableSql')
""").show()
方式 2:通过 DataFrame 模式推断创建表
scala
// 创建示例 DataFrame(定义表结构)
val data = Seq(
(1, "Alice", 25),
(2, "Bob", 30)
)
val schema = "id INT, name STRING, age INT"
val df = spark.createDataFrame(data, schema)
// 写入空表(自动创建表结构)
df.write.format("jdbc")
.option("url", dbUrl)
.option("dbtable", "users") // 表名
.option("user", user)
.option("password", password)
.option("driver", "com.mysql.cj.jdbc.Driver")
.mode("overwrite") // 若表不存在则创建,存在则覆盖
.save()
关键说明
1. 建库建表权限:需确保 MySQL 用户(如 root )具备 CREATE DATABASE 和 CREATE TABLE 权限。
2. DDL 语法兼容性:Spark SQL 通过 JDBC 执行的是 原生 MySQL 语句,需遵循 MySQL 的语法规则(如引擎、字符集设置)。
3. 表已存在处理:使用 CREATE TABLE IF NOT EXISTS 避免重复建表报错。
通过以上方法,可利用 Spark SQL 在 MySQL 中完成数据库和表的创建操作。