A simple example of using PySpark to read from and write to MongoDB:
from pyspark.sql import SparkSession
from pyspark import SparkConf
import os
# Configure environment variables for the local JDK, Hadoop, Spark, and Python interpreter
os.environ["JAVA_HOME"] = r"C:\app\Java\jdk-1.8"
os.environ["HADOOP_HOME"] = r"C:\app\hadoop-3.3.5"
os.environ["SPARK_HOME"] = r"C:\app\spark-3.5.4-bin-hadoop3"
os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"
# MongoDB connection settings
mongo_uri = "mongodb://localhost:27017"
database = "test_db"
collection = "test_clolection"
# Spark configuration
conf = SparkConf()
# conf.set("spark.sql.shuffle.partitions", "512")  # number of shuffle partitions
# conf.set("spark.default.parallelism", "36")  # default parallelism
conf.set("spark.driver.memory", "16g")  # driver memory
conf.set("spark.executor.memory", "16g")  # executor memory
conf.set("spark.driver.maxResultSize", "3g")  # max total size of results collected to the driver
conf.set("spark.driver.host", "127.0.0.1")  # driver host (forces IPv4 loopback)
conf.set("spark.driver.bindAddress", "127.0.0.1")  # driver bind address
conf.set("spark.network.timeout", "6000s")  # network timeout
conf.set("spark.executor.heartbeatInterval", "600s")  # executor heartbeat interval
# MongoDB Spark Connector jar and its driver dependencies
conf.set("spark.jars",
         ",".join([
             r"C:\app\spark-3.5.4-bin-hadoop3\jars\mongo-spark-connector_2.12-10.3.0.jar",
             r"C:\app\spark-3.5.4-bin-hadoop3\jars\mongodb-driver-core-4.9.1.jar",
             r"C:\app\spark-3.5.4-bin-hadoop3\jars\mongodb-driver-sync-4.9.1.jar",
             r"C:\app\spark-3.5.4-bin-hadoop3\jars\bson-4.9.1.jar"
         ])
)
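# Alternative (a sketch, assuming Maven Central is reachable from this machine):
# let Spark resolve the connector and its MongoDB driver dependencies automatically
# instead of listing local jar files:
# conf.set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0")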
conf.set("spark.mongodb.read.connection.uri", mongo_uri)
conf.set("spark.mongodb.write.connection.uri", mongo_uri)
# Build the SparkSession from the configuration above
spark = SparkSession.builder \
    .appName("Spark_MongoDB") \
    .config(conf=conf) \
    .getOrCreate()
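# Optional: reduce console log noise while experimenting locally
spark.sparkContext.setLogLevel("WARN")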
# Test writing to and reading from MongoDB
try:
    # Sample data
    data = [{"name": "Eva", "age": 28}, {"name": "Frank", "age": 35}]
    df = spark.createDataFrame(data)
    # Write the DataFrame to MongoDB
    df.write.format("mongodb") \
        .option("database", database) \
        .option("collection", collection) \
        .mode("append").save()
    # Read the data back, filtering server-side with an aggregation pipeline
    df_read = spark.read.format("mongodb") \
        .option("database", database) \
        .option("collection", collection) \
        .option("aggregation.pipeline", '[{"$match": {"name": {"$in": ["Frank"]}}}]') \
        .load()
    df_read.show()
except Exception as e:
    print("Error:", e)
finally:
    spark.stop()
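As a quick follow-up check, the documents written above can also be read back without an aggregation pipeline and filtered with an ordinary DataFrame predicate. This is a minimal sketch, not part of the original script; it assumes the same local mongod on port 27017, the same test_db/test_collection names, and that the connector can be resolved from Maven via spark.jars.packages instead of local jar paths:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("Spark_MongoDB_check") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0") \
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017") \
    .getOrCreate()

# Load the whole collection, then filter on the Spark side with a column predicate
df_check = spark.read.format("mongodb") \
    .option("database", "test_db") \
    .option("collection", "test_collection") \
    .load()

df_check.filter(col("age") >= 30).select("name", "age").show()

spark.stop()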