SparkSQL操作Mysql(2)

发布于:2025-05-16 ⋅ 阅读:(7) ⋅ 点赞:(0)
创建数据库和表

我们去创建一个新的数据库,数据表,并插入一条数据。

参考代码如下

-- 创建数据库

CREATE DATABASE spark;
-- 使用数据库

USE spark;

-- 创建表

create table person(id int, name char(20), age int);

-- 插入示例数据

insert into person values(1, 'jam', 20), (2,'judi', 21);

-- 查看所有数据
select * from person;

-- 退出
quit

提醒:use spark;的作用是使用当前数据库;

Spark连接MySQL数据库
  1. 新建项目,或者使用之前的项目也可以。
  2. 修改pom.xml文件。

补充三个依赖:

(1)scala-library 是 Scala 语言的基础库,是编写 Scala 程序的必要条件。

(2)spark-sql_2.12 提供了 Spark SQL 的功能,用于高效的数据处理和分析。

(3)mysql-connector-java 提供了与 MySQL 数据库交互的能力。

       <dependency>

            <groupId>org.scala-lang</groupId>

            <artifactId>scala-library</artifactId>

            <version>2.12.15</version>

        </dependency>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-sql_2.12</artifactId>

            <version>3.3.1</version>

         </dependency>

        <dependency>

            <groupId>mysql</groupId>

            <artifactId>mysql-connector-java</artifactId>

            <version>8.0.33</version>

        </dependency>

请注意,这里并没没有单独添加spark_core的依赖,因为在spark-sql中已经包含了spark_core。

  1. 写Spark程序连接mysql

核心步骤:

  1. 创建Properties对象,设置数据库的用户名和密码
  2. 使用spark.read.jbdc方法,连接数据库

参考代码如下:

impport org.apache.spark.sql.SparkSession

import java.util.Properties

object SparkMySQL {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("SparkMySQL").master("local[*]").getOrCreate()



    // 创建properties对象,设置连接mysql的用户名和密码

    val prop = new Properties()

    prop.setProperty("user", "root")

    prop.setProperty("password", "000000")



    // 读取mysql数据

    val df = spark.read.jdbc("jdbc:mysql://hadoop100:3306/spark", "person", prop)

    df.show()

    spark.stop()

  }
Spark添加数据到mysql

前面演示了数据的查询,现在来看看添加数据到mysql。

核心方法:dataFrame.write.mode("append").jdbc()。

import org.apache.spark.sql.SparkSession

import java.util.Properties

object SparkMySQL {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("SparkMySQL").master("local[*]").getOrCreate()



    // 创建properties对象,设置连接mysql的用户名和密码

    val prop = new Properties()

    prop.setProperty("user", "root")

    prop.setProperty("password", "000000")

    // 插入一条数据到数据库

    val data = Seq(("3", "zhangsan", "30"))

    val df2 = spark.createDataFrame(data).toDF("id", "name", "age")

    df2.write.mode("append").jdbc("jdbc:mysql://hadoop100:3306/spark", "person", prop)

  }

}


网站公告

今日签到

点亮在社区的每一天
去签到