SpringBoot與Connect整合,實現訂單數據實時同步至Elasticsearch功能
作者:Java知識日歷
Kafka Connect 是一個強大的工具,簡化了 Kafka 與其他系統的數據集成過程。通過合理配置和使用 Kafka Connect,可以高效地實現數據的實時同步和處理,提高了數據流動的效率和可靠性。
Kafka Connect 是一個強大的工具,簡化了 Kafka 與其他系統的數據集成過程。通過合理配置和使用 Kafka Connect,可以高效地實現數據的實時同步和處理,提高了數據流動的效率和可靠性。
我們為什么選擇Kafka Connect?
- 減少開發工作量:Kafka Connect 提供了大量的預構建連接器,可以直接用于常見的數據源(如 MySQL、PostgreSQL、Elasticsearch 等),減少了自定義開發的工作量。
- 標準化流程:通過標準的配置文件進行配置,確保數據集成過程的一致性和可維護性。
- 分布式架構:Kafka Connect 支持分布式模式,可以水平擴展以處理大量數據,確保高吞吐量和低延遲。
- 容錯機制:在分布式模式下,多個 Worker 節點協同工作,即使某個節點失敗,其他節點也能繼續處理任務,提高系統的可靠性和可用性。
- 低延遲傳輸:Kafka Connect 可以實現實時的數據流式傳輸,確保數據從源頭到目的地的時間延遲最小化。
- 支持多種協議:Kafka Connect 支持多種數據格式和傳輸協議,滿足不同的實時數據需求。
- JMX 監控:Kafka Connect 集成了 JMX 監控,方便管理和調試。
- 開源免費:Kafka Connect 是 Apache 開源項目,免費使用且無許可費用。
關鍵組件
- Connector:負責管理與外部系統的交互。每個連接器可以有一個或多個任務。
- Task:實際執行數據導入導出工作的單元。
- Converter:負責在 Kafka 消息格式(通常是字節數組)和其他格式之間進行轉換。
- Transformations:在數據進入或離開 Kafka 之前對數據進行處理和轉換。
- Worker:運行連接器和任務的進程。在分布式模式下,有多個 Worker 協同工作。
配置Kafka Connect
定義Kafka Connect如何從Kafka主題讀取消息并將其寫入Elasticsearch。
name=order-sink-connector
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=orders
connection.url=http://localhost:9200
type.name=kafka-connect
key.ignore=true
schema.ignore=true代碼實操
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>配置Kafka生產者
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
publicclass KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
returnnew DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
returnnew KafkaTemplate<>(producerFactory());
}
}Controller
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/orders")
publicclass OrderController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping
public void sendOrder(@RequestBody String orderData) {
kafkaTemplate.send("orders", orderData);
}
}Application
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}測試
發送訂單數據
curl -X POST http://localhost:8080/api/orders -H "Content-Type: application/json" -d '{"orderId": "1", "productName": "Laptop", "quantity": 1, "price": 999.99}'驗證Elasticsearch中的數據
curl -X GET "http://localhost:9200/orders/_search?pretty"Respons
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "orders",
"_type" : "kafka-connect",
"_id" : "AWyGvZJxQhWqoOeTzFwA",
"_score" : 1.0,
"_source" : {
"orderId" : "1",
"productName" : "Laptop",
"quantity" : 1,
"price" : 999.99
}
}
]
}
}責任編輯:武曉燕
來源:
Java知識日歷





































