创建数据库和表
我们去创建一个新的数据库,数据表,并插入一条数据。
参考代码如下:
-- 创建数据库
CREATE DATABASE spark;
-- 使用数据库
USE spark;
-- 创建表
create table person(id int, name char(20), age int);
-- 插入示例数据
insert into person values(1, 'jam', 20), (2,'judi', 21);
-- 查看所有数据
select * from person;
-- 退出
quit
提醒:use spark;的作用是使用当前数据库;
Spark连接MySQL数据库
- 新建项目,或者使用之前的项目也可以。
- 修改pom.xml文件。
补充三个依赖:
(1)scala-library 是 Scala 语言的基础库,是编写 Scala 程序的必要条件。
(2)spark-sql_2.12 提供了 Spark SQL 的功能,用于高效的数据处理和分析。
(3)mysql-connector-java 提供了与 MySQL 数据库交互的能力。
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.15</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
请注意,这里并没没有单独添加spark_core的依赖,因为在spark-sql中已经包含了spark_core。
- 写Spark程序连接mysql
核心步骤:
- 创建Properties对象,设置数据库的用户名和密码
- 使用spark.read.jbdc方法,连接数据库
参考代码如下:
impport org.apache.spark.sql.SparkSession
import java.util.Properties
object SparkMySQL {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SparkMySQL").master("local[*]").getOrCreate()
// 创建properties对象,设置连接mysql的用户名和密码
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "000000")
// 读取mysql数据
val df = spark.read.jdbc("jdbc:mysql://hadoop100:3306/spark", "person", prop)
df.show()
spark.stop()
}
(四)Spark添加数据到mysql
前面演示了数据的查询,现在来看看添加数据到mysql。
核心方法:dataFrame.write.mode("append").jdbc()。
import org.apache.spark.sql.SparkSession
import java.util.Properties
object SparkMySQL {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SparkMySQL").master("local[*]").getOrCreate()
// 创建properties对象,设置连接mysql的用户名和密码
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "000000")
// 插入一条数据到数据库
val data = Seq(("3", "zhangsan", "30"))
val df2 = spark.createDataFrame(data).toDF("id", "name", "age")
df2.write.mode("append").jdbc("jdbc:mysql://hadoop100:3306/spark", "person", prop)
}
}