在scala中sparkSQL连接masql并添加新数据

发布于:2025-05-14 ⋅ 阅读:(11) ⋅ 点赞:(0)

以下是 Scala 中使用 Spark SQL 连接 MySQL 并添加数据的完整代码示例(纯文本):

 

1. 准备连接参数(需替换实际信息)

 

scala

val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8"  

val tableName = "users" // 目标表名  

val user = "root"  

val password = "your_password"  

val driverClass = "com.mysql.cj.jdbc.Driver" // MySQL 8+ 驱动类(5.x 用 com.mysql.jdbc.Driver)  

 

 

2. 创建 SparkSession

 

scala

import org.apache.spark.sql.SparkSession  

 

val spark = SparkSession.builder()  

  .appName("Spark SQL MySQL Insert")  

  .master("local[*]") // 单机模式,集群改为 "yarn" 等  

  .getOrCreate()  

 

 

3. 生成待插入数据(示例 DataFrame)

 

scala

import spark.implicits._  

 

// 示例数据:插入两条用户记录(假设表结构为 id INT, name STRING, age INT)  

val newData = Seq(  

  (3, "Alice", 28),  

  (4, "Bob", 30)  

).toDF("id", "name", "age")  

 

 

4. 写入数据到 MySQL(追加模式)

 

scala

newData.write.jdbc(  

  url = jdbcUrl,  

  table = tableName,  

  mode = "append", // 写入模式:append(追加)、overwrite(覆盖)等  

  properties = new java.util.Properties() {{  

    setProperty("user", user)  

    setProperty("password", password)  

    setProperty("driver", driverClass)  

  }}  

)  

 

 

关键说明

 

1. 写入模式(mode):

 

-  append :数据追加到现有表(表需存在)。

 

-  overwrite :覆盖现有表(需注意权限和数据安全)。

 

-  ignore :忽略重复数据(需表有唯一约束)。

 

-  failIfExists :表存在时抛出异常(默认模式)。

 

2. 表结构要求:

 

- 目标表需提前创建,字段类型需与 DataFrame 匹配(如  id  对应  INT , name  对应  VARCHAR )。

 

3. 驱动与版本适配:

 

- 若报  ClassNotFoundException ,检查驱动是否正确部署(通过  --jars  参数或放入  $SPARK_HOME/jars/ )。

 

- MySQL 5.x 和 8.x 驱动类名不同,需对应修改  driverClass 。

 

4. 批量写入优化:

 

- 可添加参数  ?rewriteBatchedStatements=true  到  jdbcUrl  中,提升批量插入性能:

scala

val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8&rewriteBatchedStatements=true"  

 

 

完整代码整合

 

scala

import org.apache.spark.sql.SparkSession  

import spark.implicits._  

 

object SparkMySQLInsert {  

  def main(args: Array[String]): Unit = {  

    // 连接参数  

    val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8"  

    val tableName = "users"  

    val user = "root"  

    val password = "your_password"  

    val driverClass = "com.mysql.cj.jdbc.Driver"  

 

    // 创建 SparkSession  

    val spark = SparkSession.builder()  

      .appName("Spark SQL MySQL Insert")  

      .master("local[*]")  

      .getOrCreate()  

 

    // 生成待插入数据  

    val newData = Seq(  

      (3, "Alice", 28),  

      (4, "Bob", 30)  

    ).toDF("id", "name", "age")  

 

    // 写入数据  

    newData.write.jdbc(  

      url = jdbcUrl,  

      table = tableName,  

      mode = "append",  

      properties = new java.util.Properties() {{  

        setProperty("user", user)  

        setProperty("password", password)  

        setProperty("driver", driverClass)  

      }}  

    )  

 

    spark.stop()  

  }  

}  

 

 

执行时需通过  spark-submit  命令提交,并指定 MySQL 驱动包:

 

bash

spark-submit --jars /path/to/mysql-connector-java.jar your_app.jar


网站公告

今日签到

点亮在社区的每一天
去签到