以下是 Scala 中使用 Spark SQL 连接 MySQL 并添加数据的完整代码示例(纯文本):
1. 准备连接参数(需替换实际信息)
scala
val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8"
val tableName = "users" // 目标表名
val user = "root"
val password = "your_password"
val driverClass = "com.mysql.cj.jdbc.Driver" // MySQL 8+ 驱动类(5.x 用 com.mysql.jdbc.Driver)
2. 创建 SparkSession
scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Spark SQL MySQL Insert")
.master("local[*]") // 单机模式,集群改为 "yarn" 等
.getOrCreate()
3. 生成待插入数据(示例 DataFrame)
scala
import spark.implicits._
// 示例数据:插入两条用户记录(假设表结构为 id INT, name STRING, age INT)
val newData = Seq(
(3, "Alice", 28),
(4, "Bob", 30)
).toDF("id", "name", "age")
4. 写入数据到 MySQL(追加模式)
scala
newData.write.jdbc(
url = jdbcUrl,
table = tableName,
mode = "append", // 写入模式:append(追加)、overwrite(覆盖)等
properties = new java.util.Properties() {{
setProperty("user", user)
setProperty("password", password)
setProperty("driver", driverClass)
}}
)
关键说明
1. 写入模式(mode):
- append :数据追加到现有表(表需存在)。
- overwrite :覆盖现有表(需注意权限和数据安全)。
- ignore :忽略重复数据(需表有唯一约束)。
- failIfExists :表存在时抛出异常(默认模式)。
2. 表结构要求:
- 目标表需提前创建,字段类型需与 DataFrame 匹配(如 id 对应 INT , name 对应 VARCHAR )。
3. 驱动与版本适配:
- 若报 ClassNotFoundException ,检查驱动是否正确部署(通过 --jars 参数或放入 $SPARK_HOME/jars/ )。
- MySQL 5.x 和 8.x 驱动类名不同,需对应修改 driverClass 。
4. 批量写入优化:
- 可添加参数 ?rewriteBatchedStatements=true 到 jdbcUrl 中,提升批量插入性能:
scala
val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8&rewriteBatchedStatements=true"
完整代码整合
scala
import org.apache.spark.sql.SparkSession
import spark.implicits._
object SparkMySQLInsert {
def main(args: Array[String]): Unit = {
// 连接参数
val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8"
val tableName = "users"
val user = "root"
val password = "your_password"
val driverClass = "com.mysql.cj.jdbc.Driver"
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Spark SQL MySQL Insert")
.master("local[*]")
.getOrCreate()
// 生成待插入数据
val newData = Seq(
(3, "Alice", 28),
(4, "Bob", 30)
).toDF("id", "name", "age")
// 写入数据
newData.write.jdbc(
url = jdbcUrl,
table = tableName,
mode = "append",
properties = new java.util.Properties() {{
setProperty("user", user)
setProperty("password", password)
setProperty("driver", driverClass)
}}
)
spark.stop()
}
}
执行时需通过 spark-submit 命令提交,并指定 MySQL 驱动包:
bash
spark-submit --jars /path/to/mysql-connector-java.jar your_app.jar