kafka生产者发送消息以及兜底方案
yml基础配置
spring:
  kafka:
    properties:
      # Default: 0 ms. When sends are frequent, a small delay lets the producer
      # batch messages, improving throughput and performance.
      linger.ms: 50
      # Default: 5. Set to 1 so the producer sends one message per connection and
      # waits for the broker to acknowledge it before sending the next — this keeps
      # messages ordered (current setup is at-least-once).
      # NOTE: since Kafka 1.0.0, with idempotence enabled this can safely be raised
      # to 5 while still guaranteeing ordering and exactly-once semantics.
      max.in.flight.requests.per.connection: 1
    bootstrap-servers: localhost:9092
    producer:
      retries: 0
      # Upper bound of a batch in BYTES (16 KB), not a message count.
      batch-size: 16384
      buffer-memory: 33554432
      # Serializers for the message key and value.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic:
  name:
    xxx: xx
Kafka生产者发送消息
kafka发送消息最基本的实现
代码如下
@Component
@Slf4j
public class ProducerComponent {

    // Typed template instead of the raw type — avoids unchecked warnings and
    // matches the ListenableFuture<SendResult<String, String>> it returns.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${xxx.xx}")
    private String url;

    /**
     * Sends {@code content} to the given Kafka topic and waits up to 10 seconds
     * for the broker acknowledgement.
     *
     * @param topic   target Kafka topic
     * @param content message payload
     * @return {@code true} if the broker acknowledged the send within the timeout,
     *         {@code false} on failure or timeout
     */
    public Boolean send(String topic, String content) {
        log.info("开始kafka推送...");
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, content);
        // Callback is used for logging only. The previous implementation also set a
        // shared Boolean[] flag here, which was racy: the callback may run on another
        // thread AFTER future.get() returns, so the flag could be read before it was
        // written. future.get() already throws when the send fails, so success can be
        // derived from get() alone.
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(@NonNull Throwable throwable) {
                log.error("发送消息 [{}] 失败!", content, throwable);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("发送消息[{}] 偏移量=[{}] 成功!", content, result.getRecordMetadata().offset());
            }
        });
        try {
            // The send is asynchronous; wait at most 10 s for the acknowledgement.
            future.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("等待推送kafka消息失败!", e);
            return false;
        } catch (Exception e) {
            log.error("等待推送kafka消息失败!", e);
            return false;
        }
        return true;
    }
}
兜底方案
- 增加本地消息表,记录发送的消息,发送失败的消息用定时任务重新跑。
- 在kafka发送消息失败后,在失败回调中增加http调用,用http尝试重新推送,保证在kafka异常时仍能正常推送消息。
代码最终的实现
@Component
@Slf4j
public class ProducerComponent {

    // Typed template instead of the raw type — matches the
    // ListenableFuture<SendResult<String, String>> returned by send().
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private RestTemplate restTemplate;

    @Value("${xxx.xx}")
    private String url;

    /**
     * Sends an event to Kafka with an HTTP fallback: when the Kafka send fails,
     * the failure callback retries delivery over HTTP via {@link #postHttpMsg}.
     *
     * @param eventMsg 消息内容,eventMsg可根据自己需求将入参修改成String或者其它对象
     * @param topic    target Kafka topic
     */
    public void sendMsg(EventMsg eventMsg, String topic) {
        try {
            eventMsg.setSendTime(new Date());
            eventMsg.setEventId(IdUtil.getSnowflakeNextId());
            String taskTitle = "[sent message] 任务ID:" + eventMsg.getEventId();
            String content = JSON.toJSONString(eventMsg);
            // todo 在发送消息前可以先把消息存入本地消息表
            log.info("{}开始kafka推送...", taskTitle);
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, content);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(@NonNull Throwable throwable) {
                    log.error("{} sent message=[{}] failed!", taskTitle, content, throwable);
                    // kafka调用失败用http尝试调用
                    // FIX: the original called postHttpEventTrackMsg(eventMsg), a method
                    // that does not exist in this class; the HTTP fallback is postHttpMsg.
                    postHttpMsg(eventMsg);
                }

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    log.info("{} sent message=[{}] with offset=[{}] success!", taskTitle, content, result.getRecordMetadata().offset());
                }
            });
            // The previous implementation tracked failure via a shared Boolean[] set in
            // onFailure — racy (the callback may run after get() returns) and it missed
            // the timeout case entirely: if get() threw, the flag stayed true and the
            // failure branch below was skipped. Derive success from get() itself instead.
            boolean acknowledged = false;
            try {
                // 因为是异步发送,所以我们等待,最多5s
                future.get(5, TimeUnit.SECONDS);
                acknowledged = true;
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe the interruption.
                Thread.currentThread().interrupt();
                log.error("{} waiting for kafka send finish failed!", taskTitle, e);
            } catch (Exception e) {
                log.error("{} waiting for kafka send finish failed!", taskTitle, e);
            }
            if (!acknowledged) {
                log.info("{} 处理失败,任务类型:{}", taskTitle, eventMsg.getEventCode());
                // todo something
            }
        } catch (Exception e) {
            log.error("[sent message] 任务ID:{},发送失败,任务类型:{},错误信息为:", eventMsg.getEventId(), eventMsg.getEventCode(), e);
        }
    }

    /**
     * HTTP fallback delivery: POSTs the event as JSON to the configured {@code url}.
     * Failures are logged and swallowed — this is a best-effort last resort invoked
     * from the Kafka failure callback.
     *
     * @param eventMsg event to deliver over HTTP
     */
    public void postHttpMsg(EventMsg eventMsg) {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<EventMsg> requestEntity = new HttpEntity<>(eventMsg, headers);
        Gson gson = new Gson();
        log.info("[sent message] 任务ID:{},Http调用 postHttpMsg-->sendMsg, 入参={}", eventMsg.getEventId(), gson.toJson(eventMsg));
        ResponseEntity<JSONObject> responseEntity;
        try {
            responseEntity = restTemplate.exchange(url, HttpMethod.POST, requestEntity, JSONObject.class);
            log.info("[sent message] 任务ID:{},Http调用 postHttpMsg-->sendMsg 发送完成,出参:{}", eventMsg.getEventId(), gson.toJson(responseEntity));
        } catch (Exception e) {
            log.error("[sent message] 任务ID:{},Http调用 postHttpMsg-->sendMsg 失败 errorMsg:{}", eventMsg.getEventId(), e.getMessage(), e);
        }
    }
}