Spark,SparkSQL操作Mysql, 创建数据库和表

发布于:2025-05-20 ⋅ 阅读:(14) ⋅ 点赞:(0)

一、环境准备

1. 下载 MySQL 驱动

- 下载  mysql-connector-java-x.x.x.jar ,放入 Spark 目录的  jars  文件夹(或提交任务时通过  --jars  参数指定)。

2. 启动 SparkSession

scala

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()

  .appName("Spark MySQL Example")

  .master("local[*]") // 本地模式,集群需调整

  .getOrCreate()

 

二、通过 SparkSQL 创建 MySQL 数据库

方式 1:使用原生 JDBC 执行 SQL

scala

import java.sql.{DriverManager, Connection}

// 注册 MySQL 驱动(可选,部分版本需手动注册)

Class.forName("com.mysql.cj.jdbc.Driver")

// 创建数据库连接

val conn: Connection = DriverManager.getConnection(

  "jdbc:mysql://localhost:3306/", // MySQL 连接地址(不带数据库名)

  "root", // 用户名

  "your_password" // 密码

)

// 执行创建数据库语句

val statement = conn.createStatement()

statement.executeUpdate("CREATE DATABASE IF NOT EXISTS test_db")

statement.close()

conn.close()

 

方式 2:通过 SparkSQL 直接提交命令(需支持 JDBC 执行)

scala

spark.sql(

  s"""

     |CREATE DATABASE test_db

     |USING jdbc

     |OPTIONS (

     | url 'jdbc:mysql://localhost:3306/',

     | driver 'com.mysql.cj.jdbc.Driver',

     | user 'root',

     | password 'your_password'

     |)

     |""".stripMargin

)

 

三、通过 SparkSQL 创建 MySQL 表

步骤 1:定义表结构(Schema)

scala

import org.apache.spark.sql.types._

val schema = StructType(

  Seq(

    StructField("id", IntegerType, nullable = false),

    StructField("name", StringType, nullable = true),

    StructField("age", IntegerType, nullable = true),

    StructField("create_time", TimestampType, nullable = true)

  )

)

 

步骤 2:创建表(通过 JDBC 写入空数据触发建表)

scala

// 创建空 DataFrame

val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

// 写入 MySQL 表(若表不存在则自动创建)

emptyDF.write.format("jdbc")

  .option("url", "jdbc:mysql://localhost:3306/test_db") // 指定数据库

  .option("dbtable", "users") // 表名

  .option("user", "root")

  .option("password", "your_password")

  .option("driver", "com.mysql.cj.jdbc.Driver")

  .mode("overwrite") // 覆盖模式(可根据需求改为 append/ignore)

  .save()

 

关键参数说明

-  dbtable :需指定 完整表名(如  test_db.users ,若已指定数据库可省略前缀)。

- 表结构需与 MySQL 数据类型匹配(如  StringType  →  VARCHAR , IntegerType  →  INT )。

 

四、验证表是否创建成功

1. 通过 SparkSQL 查询表结构

scala

spark.read.jdbc(

  "jdbc:mysql://localhost:3306/test_db",

  "users",

  Map("user" -> "root", "password" -> "your_password", "driver" -> "com.mysql.cj.jdbc.Driver")

).printSchema()

 

2. 直接登录 MySQL 查看

bash

mysql -uroot -p

USE test_db;

SHOW TABLES;

DESCRIBE users;

 

注意事项

1. 驱动版本匹配

- MySQL 8.0+ 需使用  mysql-connector-java-8.0+ ,低版本数据库需对应低版本驱动。

2. 权限问题

- 确保 MySQL 用户有创建数据库和表的权限(如  GRANT CREATE ON test_db.* TO 'user'@'localhost'; )。

3. 数据类型映射

- Spark 与 MySQL 数据类型需手动映射(例如  TimestampType  →  DATETIME ),避免自动创建表时类型错误。


网站公告

今日签到

点亮在社区的每一天
去签到