Since I installed Spark 3.5.6, the matching PySpark release needs to be installed:

pip install pyspark==3.5.6
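To confirm that the client library matches the cluster version, a quick check (a hypothetical one-liner, not part of the original setup):

python -c "import pyspark; print(pyspark.__version__)"   # expect 3.5.6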
Reading data from PostgreSQL with PySpark
import time

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import to_json, struct

# Build a session against the standalone cluster; spark.jars ships the
# PostgreSQL JDBC driver to the driver and executors.
builder: SparkSession.Builder = SparkSession.builder
session: SparkSession = builder.appName("Python Spark SQL data source example") \
    .config("spark.jars", r"C:\Users\84977\Downloads\postgresql-42.7.6.jar") \
    .master("spark://192.168.220.132:7077") \
    .getOrCreate()
last_max_id = 0  # highest id seen in the previous poll
page_size = 2

while True:
    # Passing a query through "dbtable" requires a parenthesized
    # subquery with an alias.
    query = f"""
        (SELECT * FROM public.complexjson
         WHERE id > {last_max_id}
         ORDER BY id ASC
         LIMIT {page_size}) as t
    """
    df: DataFrame = session.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.220.130:32222/postgresdb") \
        .option("dbtable", query) \
        .option("driver", "org.postgresql.Driver") \
        .option("user", "postgresadmin") \
        .option("password", "admin123") \
        .load()

    if df.count() > 0:
        df.show(truncate=False)
        # Serialize each row into a single JSON string column.
        json_df = df.select(to_json(struct("*")).alias("json"))
        for row in json_df.collect():
            print(row["json"])
        # Update last_max_id only when rows arrived; on an empty page the
        # max aggregate returns NULL and would break the next WHERE clause.
        max_id = df.agg({"id": "max"}).collect()[0][0]
        last_max_id = max_id

    time.sleep(10)  # poll every 10 seconds
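Note that count(), show(), and the two collect() calls each re-run the page query over JDBC. One way to cut the round trips is to collect the page once and derive both the JSON output and the next last_max_id locally. A minimal sketch of an alternative loop body, under the same assumptions (the page is bounded by LIMIT, so collecting it on the driver is safe):

    # Alternative body for the `while` loop, after `df` is loaded:
    # one JDBC read per poll instead of four.
    page = df.select(to_json(struct("*")).alias("json"), "id").collect()
    if page:
        for row in page:
            print(row["json"])
        # The page is bounded by LIMIT, so max(id) is cheap to compute locally.
        last_max_id = max(row["id"] for row in page)
    time.sleep(10)

For bulk loads of larger tables (as opposed to this incremental poll), Spark's JDBC source can also split a numeric column range across parallel connections via its partitioning options. A sketch assuming the same table and credentials; the bounds below are illustrative values, not taken from the original:

    # Parallel JDBC read: Spark splits [lowerBound, upperBound] on "id"
    # into numPartitions slices and issues one query per slice.
    parallel_df = session.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.220.130:32222/postgresdb") \
        .option("dbtable", "public.complexjson") \
        .option("driver", "org.postgresql.Driver") \
        .option("user", "postgresadmin") \
        .option("password", "admin123") \
        .option("partitionColumn", "id") \
        .option("lowerBound", "1") \
        .option("upperBound", "100000") \
        .option("numPartitions", "4") \
        .load()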