Flink Streaming Data Ingestion Notes (3): Programming Sources to Read Data

Published: 2023-02-12

Contents

I. Flink Core Programming Overview

II. Environment

III. Source

(1) Setup

(2) Reading data from Java collections

(3) Reading data from files

Notes

(4) Reading data from a socket

(5) Reading data from Kafka

(6) Custom Sources

Key points:


I. Flink Core Programming Overview

        From the perspective of development steps, a Flink program breaks down into four main parts: Environment, Source, Transform, and Sink.


II. Environment

        Not much to say here; a couple of lines of code cover it all.

        Batch:

ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();

        Streaming:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

III. Source

(1) Setup

        To make the coding easier, we need one dependency and one JavaBean class:

<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
</dependency>
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Water level sensor: receives water level readings
 *
 * id: sensor id
 * ts: timestamp
 * vc: water level
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;
}
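Lombok generates the constructors, getters/setters, equals/hashCode, and toString at compile time. If you would rather not depend on Lombok, a hand-written equivalent of the same bean looks roughly like this (an illustrative sketch, not part of the original project; the class is renamed WaterSensorPlain to avoid clashing with the Lombok version):

```java
/** Plain-Java equivalent of the Lombok-annotated WaterSensor bean. */
public class WaterSensorPlain {
    private String id;   // sensor id
    private Long ts;     // timestamp
    private Integer vc;  // water level

    // what @NoArgsConstructor generates
    public WaterSensorPlain() { }

    // what @AllArgsConstructor generates
    public WaterSensorPlain(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    // @Data generates getters, setters, equals/hashCode, and toString
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }
    public Integer getVc() { return vc; }
    public void setVc(Integer vc) { this.vc = vc; }

    @Override
    public String toString() {
        return "WaterSensorPlain(id=" + id + ", ts=" + ts + ", vc=" + vc + ")";
    }
}
```

Note that Flink's POJO serialization rules require exactly this shape: a public class, a no-arg constructor, and public fields or getter/setter pairs, which is why the Lombok annotations above are a good fit.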

(2) Reading data from Java collections

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class SourceJava {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2. TODO Read data from a collection
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource = env.fromCollection(list);

        // Read data from individual elements
        DataStreamSource<String> dataStreamSource = env.fromElements("a", "b", "c", "d");

//        streamSource.print();
        dataStreamSource.print();
        env.execute();
    }
}

(3) Reading data from files

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Source_File {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2. TODO Read data from a file
        // .setParallelism(2) is optional; "input/niaiwowoaini.txt" is the file's location
        DataStreamSource<String> streamSource = env.readTextFile("input/niaiwowoaini.txt").setParallelism(2);

        streamSource.print();

        env.execute();
    }
}

Notes

The argument can be a file or a directory, a relative or an absolute path, and even a file on HDFS.

How a relative path is resolved depends on where the program runs: in a plain JVM, against the user.dir system property; in IDEA, against the project root; in standalone mode, against the cluster node's root directory.
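You can check for yourself which directory relative paths like "input/..." will resolve against (a quick illustrative snippet, not part of the original notes):

```java
/** Prints the base directory that relative file paths resolve against. */
public class ShowWorkingDir {
    public static String workingDir() {
        // user.dir is the JVM's working directory; in IDEA this is
        // typically the project root
        return System.getProperty("user.dir");
    }

    public static void main(String[] args) {
        System.out.println("user.dir = " + workingDir());
    }
}
```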

To read from HDFS, use a path like hdfs://hadoop102:8020/..... Since Flink does not ship Hadoop dependencies, add them to the pom:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>

(4) Reading data from a socket

DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);
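Under the hood, socketTextStream simply opens a TCP connection to the given host/port and splits the incoming bytes on newlines, so to test it you just need any server that emits newline-delimited text, e.g. `nc -lk 9999` on hadoop102. The stand-in below sketches that interaction with plain JDK sockets (illustrative only, outside Flink; port 0 picks a free port):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/** Minimal newline-delimited TCP server plus a client read, mimicking
 *  what `nc -lk 9999` provides and what socketTextStream consumes. */
public class TinyTextServer {
    public static String roundTrip(String line) throws IOException {
        try (ServerSocket server = new ServerSocket(0)) {
            Thread writer = new Thread(() -> {
                try (Socket s = server.accept();
                     PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
                    out.println(line); // one newline-delimited record
                } catch (IOException ignored) { }
            });
            writer.start();
            // This is the role socketTextStream plays: connect and read lines.
            try (Socket client = new Socket("localhost", server.getLocalPort());
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream()))) {
                return in.readLine();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("sensor_1,1000,42"));
    }
}
```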

(5) Reading data from Kafka

Add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>

There are two ways to write the Kafka read here.

The first (the one I prefer; the imports below cover both versions):


import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class Flink04_Source_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"520520");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        DataStreamSource<String> streamSource = env
                .addSource(new FlinkKafkaConsumer<String>("sensor", new SimpleStringSchema(), properties))
                .setParallelism(3);

        streamSource.print();
        env.execute();
    }
}

The second:

public class Source_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092")
                .setGroupId("520")
                .setTopics("sensor")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> streamSource = env
        .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "source-kafka");
        
        streamSource.print();
        env.execute();
    }
}

(6) Custom Sources

Key points:

        1. Implement the SourceFunction interface; if you want the source to run with a parallelism greater than 1, implement ParallelSourceFunction instead.

        2. Override the relevant methods: run() (the main emit logic) and cancel() (the stop logic).
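The run()/cancel() pair is the standard cooperative-cancellation pattern: cancel() is invoked from a different thread than run(), so the stop flag should be volatile for the write to be reliably visible. The same pattern in plain Java, outside Flink (an illustrative sketch, not the Flink API itself):

```java
/** Cooperative cancellation: run() loops on a volatile flag, cancel() flips it. */
public class CancellableLoop implements Runnable {
    // volatile: cancel() runs on another thread, so its write must be visible here
    private volatile boolean isRunning = true;
    private int emitted = 0;

    @Override
    public void run() {
        while (isRunning) {
            emitted++; // stand-in for ctx.collect(...)
        }
    }

    public void cancel() { isRunning = false; }

    public int emittedCount() { return emitted; }

    public static int demo() throws InterruptedException {
        CancellableLoop loop = new CancellableLoop();
        Thread t = new Thread(loop);
        t.start();
        Thread.sleep(50);   // let it emit for a while
        loop.cancel();      // what the framework does when the job is cancelled
        t.join();           // join guarantees we see the final emitted count
        return loop.emittedCount();
    }
}
```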


import com.atguigu.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class Source_Custom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> streamSource = env.addSource(new MySource());
        streamSource.print();
        env.execute();
    }

    //public static class MySource implements SourceFunction<WaterSensor>{
    public static class MySource implements ParallelSourceFunction<WaterSensor> {
        private Random random = new Random();
        // volatile so the write from cancel() (another thread) is visible to run()
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(new WaterSensor("sensor" + random.nextInt(1000),
                        System.currentTimeMillis(), random.nextInt(100)));
                // throttle: emit one record every 200 ms
                Thread.sleep(200);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

