Kafka
优化
增加吞吐量
// mark - 增加吞吐量 start
// batch.size:批次大小,默认16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// mark - 增加吞吐量 end
ack应答级别
- acks=0 这一操作提供了一个最低的延迟,partition的leader副本接收到消息还没有写入磁盘就已经返回ack,当leader故障时有可能丢失数据;
- acks=1 partition的leader副本落盘后返回ack,如果在follower副本同步数据之前leader故障,那么将会丢失数据;
- acks-1
(all) partition的leader和follower副本全部落盘成功后才返回ack。但是如果在follower副本同步完成后,leader副本所在节点发送ack之前,leader副本发生故障,那么会造成数据重复。
// 设置acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数retries,默认是int最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
问题
VM option ‘UseG1GC’ is experimental and must be enabled via -XX:+UnlockExperimentalVMOptions
Error: VM option 'UseG1GC' is experimental and must be enabled via -XX:+UnlockExperimentalVMOptions.
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
根据报错信息,到kafka的bin目录下,打开kafka-run-class.sh,删除包含UseG1GC的这一句,之后继续重新启动kafka即可: