PyFlink: sinking a DataStream to Kafka (Python)

发布于:2024-05-06 ⋅ 阅读:(26) ⋅ 点赞:(0)

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction, MapFunction
import json
import re
import logging
import sys
from pyflink.datastream.state import ValueStateDescriptor, MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, TypeInformation,FlinkKafkaProducer
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from  pyflink.datastream.connectors import  DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
from datetime import datetime


# Send log records to stdout at INFO level.
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s-%(levelname)s-%(message)s")
logger = logging.getLogger(__name__)

# Mid-file imports kept from the original. DataStream is the only name here
# that the import section at the top of the file does not already provide.
from pyflink.datastream import DataStream, StreamExecutionEnvironment
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction, MapFunction
from pyflink.common.typeinfo import Types

# Create the StreamExecutionEnvironment exactly once. The original requested
# a second environment after configuring the first and rebound `env` to it,
# so the stream and the job were built on an object that never received the
# parallelism setting or the Kafka connector jar registered below.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.add_jars("file:///root/flink-sql-connector-kafka_2.11-1.14.4.jar")

# Source: text lines read from a TCP socket. The socket source is reached
# through the wrapped Java environment and the result is wrapped in a Python
# DataStream.
data = DataStream(env._j_stream_execution_environment.socketTextStream('192.168.137.201', 8899))

# Kafka connection settings used by the rest of the script.
TEST_KAFKA_SERVERS = "192.168.137.201:9092"
TEST_KAFKA_TOPIC = "test_topic_elink"
TEST_GROUP_ID = "pyflink_elink_midsys"


def get_kafka_customer_properties(kafka_servers: str, group_id: str):
    """Build the Kafka consumer configuration as a dict of string options.

    Args:
        kafka_servers: Value stored under ``bootstrap.servers``.
        group_id: Value stored under ``group.id``.

    Returns:
        A new dict mapping Kafka option names to string values.
    """
    string_deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    return {
        "bootstrap.servers": kafka_servers,
        "fetch.max.bytes": "67108864",
        "key.deserializer": string_deserializer,
        "value.deserializer": string_deserializer,
        # Kafka auto-commit is switched off. The value must stay the string
        # "false": the original author notes that passing a bool raises an error.
        "enable.auto.commit": "false",
        "group.id": group_id,
    }


# Consumer-side settings (deserializers, group id, auto-commit). Kept as a
# module-level name as in the original; not used by the producer below.
properties = get_kafka_customer_properties(TEST_KAFKA_SERVERS, TEST_GROUP_ID)

# Producer-side settings: only the broker address. Built from the shared
# constant instead of repeating the same address literal.
producer_properties = {
    'bootstrap.servers': TEST_KAFKA_SERVERS
}

# The original passed the consumer `properties` as producer_config and left
# `producer_properties` unused, handing consumer-only options such as
# group.id and the deserializers to the producer.
producer = FlinkKafkaProducer(
    topic=TEST_KAFKA_TOPIC,
    producer_config=producer_properties,
    serialization_schema=SimpleStringSchema()
)

# Write every record of the stream to the Kafka topic.
data.add_sink(producer)

# Also print each record, then submit the job.
data.print()
env.execute()