下面是一个完整的示例,展示如何使用SparkSQL读取CSV文件并将数据写入MySQL数据库。
1. 准备工作
首先确保你有:
1. 运行中的Spark环境
2. MySQL数据库连接信息
3. 适当的JDBC驱动
2. 示例代码
Scala版本
import org.apache.spark.sql.{SparkSession, SaveMode}
object CsvToMysql {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.appName("CSV to MySQL")
.master("local[*]") // 本地模式,生产环境去掉这行
.getOrCreate()
// 读取CSV文件
val df = spark.read
.option("header", "true") // 第一行作为header
.option("inferSchema", "true") // 自动推断数据类型
.csv("path/to/your/file.csv") // CSV文件路径
// 显示数据
df.show()
// MySQL连接配置
val jdbcUrl = "jdbc:mysql://localhost:3306/your_database"
val connectionProperties = new java.util.Properties()
connectionProperties.put("user", "your_username")
connectionProperties.put("password", "your_password")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
// 写入MySQL
df.write
.mode(SaveMode.Overwrite) // 如果表存在则覆盖
.jdbc(jdbcUrl, "your_table", connectionProperties)
spark.stop()
}
}
Python (PySpark)版本
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder \
.appName("CSV to MySQL") \
.config("spark.jars", "/path/to/mysql-connector-java-8.0.23.jar") \ # MySQL驱动路径
.getOrCreate()
# 读取CSV文件
df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("path/to/your/file.csv")
# 显示数据
df.show()
# MySQL连接配置
jdbc_url = "jdbc:mysql://localhost:3306/your_database"
connection_properties = {
"user": "your_username",
"password": "your_password",
"driver": "com.mysql.jdbc.Driver"
}
# 写入MySQL
df.write \
.mode("overwrite") \ # 可选: "append", "ignore", "error" (默认)
.jdbc(jdbc_url, "your_table", properties=connection_properties)
spark.stop()
3. 关键点说明
1. CSV读取选项:
`header`: 是否将第一行作为列名
`inferSchema`: 是否自动推断数据类型
其他可选参数:`delimiter`, `quote`, `escape`, `nullValue`等
2. 写入模式:
`overwrite`: 覆盖现有表
`append`: 追加数据
`ignore`: 表存在时不做任何操作
`error`或`errorifexists`(默认): 表存在时报错
3. MySQL连接:
需要MySQL JDBC驱动
驱动可以:
通过`--jars`参数在spark-submit时指定
在代码中通过`spark.jars`配置指定
放在Spark的`jars`目录下
4. 性能优化:
批量写入:`connectionProperties.put("batchsize", "10000")`
并行写入:`df.repartition(10).write...` (根据数据量调整分区数)
4. 运行方式
对于Scala项目,打包后使用spark-submit运行:
spark-submit --class CsvToMysql \
--jars /path/to/mysql-connector-java-8.0.23.jar \
your_application.jar
对于Python脚本:
spark-submit --jars /path/to/mysql-connector-java-8.0.23.jar \
your_script.py
5. 常见问题解决
1. 驱动类找不到:
①确保驱动jar包路径正确
②检查驱动版本与MySQL版本兼容
2. 连接拒绝:
①检查MySQL服务是否运行
②检查用户名密码是否正确
③检查MySQL是否允许远程连接
3. 权限问题:
确保数据库用户有创建表和写入数据的权限
4. 数据类型不匹配:
①可以在写入前使用`df.printSchema()`检查数据类型
②必要时使用`cast()`函数转换数据类型