SparkSQL操作Mysql

发布于:2025-05-15 ⋅ 阅读:(17) ⋅ 点赞:(0)
(一创建数据库和表

接下来,我们去创建一个新的数据库,数据表,并插入一条数据。

参考代码如下

-- 创建数据库

CREATE DATABASE spark;
-- 使用数据库

USE spark;

-- 创建表

create table person(id int, name char(20), age int);

-- 插入示例数据

insert into person values(1, 'jam', 20), (2,'judi', 21);

-- 查看所有数据
select * from person;

-- 退出
quit

提醒:use spark;的作用是使用当前数据库;

(二Spark连接MySQL数据库
  1. 新建项目,或者使用之前的项目也可以。
  2. 修改pom.xml文件。

补充三个依赖:

(1)scala-library 是 Scala 语言的基础库,是编写 Scala 程序的必要条件。

(2)spark-sql_2.12 提供了 Spark SQL 的功能,用于高效的数据处理和分析。

       (3)mysql-connector-java 提供了与 MySQL 数据库交互的能力。

 <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.1</version>
         </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>

请注意,这里并没没有单独添加spark_core的依赖,因为在spark-sql中已经包含了spark_core。

3.写Spark程序连接mysql

核心步骤:

  1. 创建Properties对象,设置数据库的用户名和密码
  2. 使用spark.read.jbdc方法,连接数据库

参考代码如下:

impport org.apache.spark.sql.SparkSession
import java.util.Properties
object SparkMySQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkMySQL").master("local[*]").getOrCreate()

    // 创建properties对象,设置连接mysql的用户名和密码
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "000000")

    // 读取mysql数据
    val df = spark.read.jdbc("jdbc:mysql://hadoop100:3306/spark", "person", prop)
    df.show()
    spark.stop()
  }
(三)Spark添加数据到mysql

前面演示了数据的查询,现在来看看添加数据到mysql。

核心方法:dataFrame.write.mode("append").jdbc()。

import org.apache.spark.sql.SparkSession
import java.util.Properties
object SparkMySQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkMySQL").master("local[*]").getOrCreate()

    // 创建properties对象,设置连接mysql的用户名和密码
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "000000")
    // 插入一条数据到数据库
    val data = Seq(("3", "zhangsan", "30"))
    val df2 = spark.createDataFrame(data).toDF("id", "name", "age")
    df2.write.mode("append").jdbc("jdbc:mysql://hadoop100:3306/spark", "person", prop)
  }
}

上面的代码运行完成之后,切换到finalshell中的mysql端,查看效果。


网站公告

今日签到

点亮在社区的每一天
去签到