20250330 PyFlink with Paimon

Published: 2025-03-28

1. Data lake

2. Installing PyFlink and Paimon locally

  • Python 3.11 is required

  • Install with pip:

python -m pip install apache-flink==1.20.1

  • These two JARs must be added manually
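One way to register JARs from a Python job is Flink's `pipeline.jars` option: a semicolon-separated list of `file://` URLs set on the table config before any DDL runs. The sketch below assumes the JARs are already downloaded to local disk; the paths and the helper names `build_pipeline_jars` and `create_table_env` are hypothetical, not the two JARs or code the author used.

```python
import os
from pathlib import Path
from typing import Iterable


def build_pipeline_jars(jar_paths: Iterable[str]) -> str:
    """Convert local JAR paths into the 'pipeline.jars' value: file:// URLs joined by ';'."""
    return ";".join(Path(os.path.abspath(p)).as_uri() for p in jar_paths)


def create_table_env(jar_paths: Iterable[str]):
    """Create a streaming TableEnvironment with the given JARs on its classpath."""
    # Imported lazily so build_pipeline_jars can be used without PyFlink installed.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.get_config().set("pipeline.jars", build_pipeline_jars(jar_paths))
    return t_env


if __name__ == "__main__":
    # Hypothetical locations: substitute the two JARs you actually downloaded.
    jars = ["/opt/flink-jars/first.jar", "/opt/flink-jars/second.jar"]
    print(build_pipeline_jars(jars))
```

Calling `create_table_env([...])` in place of the two `t_env` setup lines in the test code would register the JARs before the catalog is created.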

Test code:

import logging
import sys

from pyflink.table import EnvironmentSettings, TableEnvironment

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(message)s")


# Streaming-mode table environment; parallelism 1 keeps local output easy to read
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")


# CSV source with a single column: one word per line
my_source_ddl = """
    create table source (
        word STRING
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format("D:/PyCharmWorkspace/PaimonLakeP02/src/basic/words.csv")

print(t_env.execute_sql(my_source_ddl))


print(t_env.execute_sql("""
    -- if you're trying out Paimon in a distributed environment,
    -- the warehouse path should be set to a shared file system, such as HDFS or OSS
    CREATE CATALOG paimon_catalog WITH (
        'type'='paimon',
        'warehouse'='D:/PyCharmWorkspace/PaimonLakeP02/src/basic/paimon'
    );
"""))

print(t_env.execute_sql("""
    USE CATALOG paimon_catalog;
"""))

print(t_env.execute_sql("""
    -- create a word count table
    CREATE TABLE IF NOT EXISTS word_count (
        word STRING PRIMARY KEY NOT ENFORCED,
        cnt BIGINT
    );
"""))




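# Debug: preview the source table (fully qualified, because the current catalog is now paimon_catalog)
# t_env.sql_query("select word from default_catalog.default_database.source").execute().print()

# Count words from the CSV source and write them into the Paimon primary-key table;
# wait() blocks until the insert job finishes
stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql("""
insert into word_count
select word, count(1) as cnt
from default_catalog.default_database.source
group by word
""")
stmt_set.execute().wait()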

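# Print the table contents. In streaming mode a Paimon read is unbounded, so print()
# keeps waiting for new changes and the line after it may not be reached; stop the job
# manually, or run this query from a batch-mode TableEnvironment for a one-off read.
t_env.sql_query("select 'another print', * from word_count").execute().print()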

print("===========end==============")
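To check what `word_count` should contain, it helps to compute the expected counts independently. The sketch below writes a small sample CSV (one word per line, matching the single `word` column of `source`) and reproduces the `GROUP BY word` aggregation with `collections.Counter`. The sample words, file name, and helper names are made up for illustration.

```python
import csv
import tempfile
from collections import Counter
from pathlib import Path
from typing import Dict, Iterable


def write_words_csv(path: Path, words: Iterable[str]) -> None:
    """Write one word per line, matching the single STRING column of `source`."""
    with path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, lineterminator="\n")
        for word in words:
            writer.writerow([word])


def expected_word_counts(path: Path) -> Dict[str, int]:
    """Plain-Python equivalent of: SELECT word, COUNT(1) FROM source GROUP BY word."""
    with path.open(newline="", encoding="utf-8") as f:
        # Skip empty rows, which csv.reader yields as [].
        return dict(Counter(row[0] for row in csv.reader(f) if row))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        sample = Path(tmp) / "words.csv"
        write_words_csv(sample, ["hello", "flink", "hello", "paimon", "hello"])
        print(expected_word_counts(sample))
```

Because `word` is the primary key, Paimon keeps only the latest count per word, so after the insert finishes the rows of `word_count` should match this dictionary.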

The job started successfully:

Paimon's local data files:
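To inspect these files yourself, a short standard-library script can print the directory tree under the `warehouse` path from `CREATE CATALOG`. With a filesystem warehouse, Paimon stores each database as a `<database>.db` directory (here `default.db`, the catalog's default database) with one subdirectory per table, so `word_count` should appear two levels down. The helper name `list_tree` and its depth limit are illustrative choices.

```python
import os
from pathlib import Path
from typing import List


def list_tree(root: str, max_depth: int = 3) -> List[str]:
    """Return paths relative to root, up to max_depth components; dirs end with '/'."""
    base = Path(root)
    entries: List[str] = []
    for dirpath, dirnames, filenames in os.walk(base):
        rel = Path(dirpath).relative_to(base)
        depth = len(rel.parts)  # 0 for the root itself
        entries.extend((rel / d).as_posix() + "/" for d in dirnames)
        entries.extend((rel / f).as_posix() for f in filenames)
        if depth + 1 >= max_depth:
            dirnames.clear()  # prune: do not walk below max_depth levels
    return sorted(entries)


if __name__ == "__main__":
    # Same warehouse path as in CREATE CATALOG; adjust it for your machine.
    warehouse = "D:/PyCharmWorkspace/PaimonLakeP02/src/basic/paimon"
    if os.path.isdir(warehouse):
        for entry in list_tree(warehouse):
            print(entry)
    else:
        print(f"warehouse not found: {warehouse}")
```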

References

Installation guide: Quick Start | Apache Paimon

Download packages: Downloads | Apache Flink

Appendix: Problems encountered

1. Flink 2.0 + Paimon

// There is no matching Paimon library for Flink 2.0 yet, so the job fails with a Sink mismatch exception.
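Since the Flink 2.0 combination failed, a small guard at the top of the job can fail fast when the installed `apache-flink` package is not the 1.20 line installed by the pip command earlier. This is a minimal sketch using `importlib.metadata`; the helper names and the expected `(1, 20)` pair are assumptions taken from this setup, not part of PyFlink or Paimon.

```python
import re
import sys
from importlib import metadata
from typing import Tuple


def major_minor(version: str) -> Tuple[int, int]:
    """Extract (major, minor) from a version string such as '1.20.1'."""
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return int(match.group(1)), int(match.group(2))


def check_flink_version(expected: Tuple[int, int] = (1, 20)) -> str:
    """Raise RuntimeError unless the installed apache-flink matches expected major.minor."""
    try:
        installed = metadata.version("apache-flink")
    except metadata.PackageNotFoundError:
        raise RuntimeError("apache-flink is not installed") from None
    if major_minor(installed) != expected:
        raise RuntimeError(
            f"apache-flink {installed} found, but this job was tested with "
            f"{expected[0]}.{expected[1]}.x; a matching Paimon library may not exist"
        )
    return installed


if __name__ == "__main__":
    print(f"Python {sys.version_info.major}.{sys.version_info.minor}")
    try:
        print("apache-flink", check_flink_version())
    except RuntimeError as err:
        print("version check failed:", err)
```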