Maven依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.9.0</version>
</dependency>
</dependencies>
Kafka 消费者 + InfluxDB 插入逻辑
package com.ruoyi.datainterface;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class KafkaToInfluxDB {
private static final String INFLUX_URL = "http://localhost:8086";
private static final String TOKEN = "your-influxdb-token";
private static final String ORG = "your-org";
private static final String BUCKET = "your-bucket";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "your-kafka-topic";
private static final String GROUP_ID = "your-consumer-group";
public static void main(String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create(INFLUX_URL, TOKEN.toCharArray(), ORG, BUCKET);
WriteApi writeApi = influxDBClient.getWriteApi();
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
Point point = parseMessageToPoint(record.value());
writeApi.writePoint(point);
System.out.println("Inserted: " + point.toLineProtocol());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
influxDBClient.close();
}
}
private static Point parseMessageToPoint(String message) {
String[] parts = message.split("\\|");
if (parts.length < 3) {
throw new IllegalArgumentException("Invalid message format: " + message);
}
String deviceId = parts[0];
String timestamp = parts[1];
Point point = Point.measurement("device_data")
.addTag("device_id", deviceId)
.addTag("timestamp",timestamp);
for (int i = 2; i < parts.length; i++) {
point.addField("point" + (i - 1), Double.parseDouble(parts[i]));
}
return point;
}
}