Kafka发送消息以及兜底方案

发布于:2025-07-15 ⋅ 阅读:(16) ⋅ 点赞:(0)

kafka生产者发送消息以及兜底方案

yml基础配置

spring:
  kafka:
    properties:
      linger.ms: 50 #默认值:0毫秒,当消息发送比较频繁时,增加一些延迟可增加吞吐量和性能。
      #注意:当前消息符合at-least-once,自kafka1.0.0以后,为保证消息有序以及exactly once,这个配置可适当调大为5。
      max.in.flight.requests.per.connection: 1
    #默认值:5,设置为1即表示producer在connection上发送一条消息,至少要等到这条消息被broker确认收到才继续发送下一条,因此是有序的。
    bootstrap-servers: localhost:9092
    producer:
      retries: 0
      # 每次批量发送消息的数量
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    topic:
      name:
        xxx: xx

Kakfa生产者发送消息

kafka发送消息最基本的实现
代码如下

@Component
@Slf4j
public class ProducerComponent {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @Value("${xxx.xx}")
    private String url;

    public Boolean send(String topic, String content) {
        log.info("开始kafka推送...");
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, content);
        final Boolean[] ok = {true};
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(@NonNull Throwable throwable) {
                log.error("发送消息 [{}] 失败!", content, throwable);
                ok[0] = false;
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("发送消息[{}] 偏移量=[{}] 成功!", content, result.getRecordMetadata().offset());
            }
        });

        try {
            // 异步发送,设置等待最多10s
            future.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("等待推送kafka消息失败!", e);
            return false;
        }
        return ok[0];
    }
}

兜底方案

  1. 增加本地消息表,记录发送的消息,发送失败的消息用定时任务重新跑。
  2. 在kafka发送消息失败后,在失败回调中增加http调用,用http尝试去调用,保证在kafka异常时能保证能正常推送消息。

代码最终的实现

@Component
@Slf4j
public class ProducerComponent {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Autowired
    private RestTemplate restTemplate;
    @Value("${xxx.xx}")
    private String url;

    /**
     * @param eventMsg 消息内容,eventMsg可根据自己需求将入参修改成String或者其它对象
     * @param topic
     */
    public void sendMsg(EventMsg eventMsg, String topic) {
        try {
            eventMsg.setSendTime(new Date());
            eventMsg.setEventId(IdUtil.getSnowflakeNextId());
            String taskTitle = "[sent message] 任务ID:" + eventMsg.getEventId();
            String content = JSON.toJSONString(eventMsg);
            // todo 在发送消息前可以先把消息存入本地消息表
            log.info(taskTitle + "开始kafka推送...");
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, content);
            final Boolean[] ok = {true};
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(@NonNull Throwable throwable) {
                    log.error("{} sent message=[{}] failed!", taskTitle, content, throwable);
                    // kafka调用失败用http尝试调用
                    postHttpEventTrackMsg(eventMsg);
                    ok[0] = false;
                }

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    log.info("{} sent message=[{}] with offset=[{}] success!", taskTitle, content, result.getRecordMetadata().offset());
                }
            });

            try {
                // 因为是异步发送,所以我们等待,最多5s
                future.get(5, TimeUnit.SECONDS);
            } catch (Exception e) {
                log.error("{} waiting for kafka send finish failed!", taskTitle, e);
            }
            if (!ok[0]) {
                log.info("{} 处理失败,任务类型:{}", taskTitle, eventMsg.getEventCode());
                // todo something
            }
        } catch (Exception e) {
            log.error("[sent message] 任务ID:{},发送失败,任务类型:{},错误信息为:", eventMsg.getEventId(), eventMsg.getEventCode(), e);
        }
    }

    public void postHttpMsg(EventMsg eventMsg) {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<EventMsg> requestEntity = new HttpEntity<>(eventMsg, headers);
        Gson gson = new Gson();
        log.info("[sent message] 任务ID:{},Http调用 postHttpMsg-->sendMsg, 入参={}", eventMsg.getEventId(), gson.toJson(eventMsg));
        ResponseEntity<JSONObject> responseEntity;
        try {
            responseEntity = restTemplate.exchange(url, HttpMethod.POST, requestEntity, JSONObject.class);
            log.info("[sent message] 任务ID:{},Http调用 postHttpMsg-->sendMsg 发送完成,出参:{}", eventMsg.getEventId(), gson.toJson(responseEntity));
        } catch (Exception e) {
            log.error("[sent message] 任务ID:{},Http调用 postHttpMsg-->sendMsg 失败 errorMsg:{}", eventMsg.getEventId(), e.getMessage(), e);
        }
    }

}

网站公告

今日签到

点亮在社区的每一天
去签到