SpringBoot與BookKeeper整合,實(shí)現(xiàn)金融級(jí)別的日志存儲(chǔ)系統(tǒng)
作者:Java知識(shí)日歷
選擇Apache BookKeeper作為金融級(jí)日志存儲(chǔ)系統(tǒng)的核心組件,主要是因?yàn)樗邆涓咝阅堋⒏呖煽啃院土己玫目蓴U(kuò)展性,能夠有效滿(mǎn)足金融機(jī)構(gòu)對(duì)日志存儲(chǔ)的要求。
選擇Apache BookKeeper作為金融級(jí)日志存儲(chǔ)系統(tǒng)的核心組件,主要是因?yàn)樗邆涓咝阅堋⒏呖煽啃院土己玫目蓴U(kuò)展性,能夠有效滿(mǎn)足金融機(jī)構(gòu)對(duì)日志存儲(chǔ)的要求。
BookKeeper的優(yōu)勢(shì)
高吞吐量和低延遲
- 分布式架構(gòu): Apache BookKeeper采用分布式的架構(gòu)設(shè)計(jì),能夠支持高并發(fā)的寫(xiě)入和讀取操作。
- 批量寫(xiě)入: 支持批量寫(xiě)入日志條目,顯著提高寫(xiě)入效率。
- 異步I/O: 使用異步I/O操作,減少等待時(shí)間,提升整體性能。
數(shù)據(jù)一致性和持久性
- 強(qiáng)一致性保證: BookKeeper提供強(qiáng)一致性保證,確保所有寫(xiě)入的數(shù)據(jù)都能被正確讀取。
- 多副本復(fù)制: 數(shù)據(jù)在多個(gè)Bookies(BookKeeper節(jié)點(diǎn))上進(jìn)行多副本復(fù)制,防止單點(diǎn)故障導(dǎo)致的數(shù)據(jù)丟失。
- 自動(dòng)恢復(fù): 在節(jié)點(diǎn)故障時(shí),BookKeeper能夠自動(dòng)檢測(cè)并恢復(fù)數(shù)據(jù),確保系統(tǒng)的連續(xù)運(yùn)行。
水平擴(kuò)展能力
- 動(dòng)態(tài)擴(kuò)展: 可以通過(guò)增加Bookies來(lái)擴(kuò)展集群規(guī)模,適應(yīng)不斷增長(zhǎng)的業(yè)務(wù)需求。
- 負(fù)載均衡: 自動(dòng)分配負(fù)載,確保各節(jié)點(diǎn)之間的工作負(fù)載平衡,避免熱點(diǎn)問(wèn)題。
- 靈活性: 支持多種部署方式,包括本地部署、云部署等。
數(shù)據(jù)加密和訪(fǎng)問(wèn)控制
- 數(shù)據(jù)加密: 支持對(duì)存儲(chǔ)的日志數(shù)據(jù)進(jìn)行加密處理,防止未授權(quán)訪(fǎng)問(wèn)。
- 認(rèn)證和授權(quán): 提供細(xì)粒度的權(quán)限管理機(jī)制,限制不同角色的訪(fǎng)問(wèn)權(quán)限。
- 審計(jì)日志: 記錄所有對(duì)系統(tǒng)的訪(fǎng)問(wèn)和操作,便于追蹤和審計(jì)。
哪些公司采用了BookKeeper?
Intel
- 用途: Intel在其物聯(lián)網(wǎng)(IoT)解決方案中使用BookKeeper來(lái)收集和存儲(chǔ)傳感器數(shù)據(jù)。
- 優(yōu)勢(shì): 多副本復(fù)制和自動(dòng)恢復(fù)機(jī)制,確保數(shù)據(jù)的可靠性和完整性。
阿里巴巴集團(tuán)
- 用途: 阿里巴巴在多個(gè)核心系統(tǒng)中使用BookKeeper,包括交易日志存儲(chǔ)、監(jiān)控系統(tǒng)和大數(shù)據(jù)平臺(tái)。
- 優(yōu)勢(shì): 成熟的社區(qū)支持和與現(xiàn)有生態(tài)系統(tǒng)的良好集成,提升了開(kāi)發(fā)效率和系統(tǒng)穩(wěn)定性。
Baidu
- 用途: Baidu在其搜索引擎和推薦系統(tǒng)中使用BookKeeper來(lái)存儲(chǔ)大量的日志和索引數(shù)據(jù)。
- 優(yōu)勢(shì): 高效的數(shù)據(jù)檢索能力和靈活的配置選項(xiàng),適應(yīng)不同的應(yīng)用場(chǎng)景。
Microsoft Azure
- 用途: Microsoft Azure在其云平臺(tái)上使用BookKeeper來(lái)支持各種分布式系統(tǒng)和服務(wù)。
- 優(yōu)勢(shì): 高性能和可擴(kuò)展性,滿(mǎn)足不同規(guī)模的應(yīng)用需求。
PayPal
- 用途: PayPal使用BookKeeper來(lái)存儲(chǔ)支付交易日志,確保每一筆交易的完整記錄和快速查詢(xún)。
- 優(yōu)勢(shì): 數(shù)據(jù)加密和訪(fǎng)問(wèn)控制,保障金融數(shù)據(jù)的安全性。
Yahoo!
- 用途: Yahoo!在其多個(gè)分布式系統(tǒng)中使用BookKeeper,包括搜索引擎日志記錄和流處理系統(tǒng)。
- 優(yōu)勢(shì): 強(qiáng)一致性保證和高可用性,支持復(fù)雜的數(shù)據(jù)處理需求。
- 用途: Twitter在其基礎(chǔ)設(shè)施中使用BookKeeper來(lái)處理大量實(shí)時(shí)數(shù)據(jù)流,包括推文事件和用戶(hù)活動(dòng)日志。
- 優(yōu)勢(shì): 支持高并發(fā)寫(xiě)入和讀取操作,能夠應(yīng)對(duì)快速增長(zhǎng)的業(yè)務(wù)需求。
eBay
- 用途: eBay在其電商平臺(tái)中使用BookKeeper來(lái)存儲(chǔ)交易日志和其他關(guān)鍵數(shù)據(jù)。
- 優(yōu)勢(shì): 安全的數(shù)據(jù)加密和嚴(yán)格的訪(fǎng)問(wèn)控制,保護(hù)敏感信息。
記得啟動(dòng)ZooKeeper服務(wù)器
因?yàn)锽ookKeeper依賴(lài)于ZooKeeper來(lái)進(jìn)行元數(shù)據(jù)管理和協(xié)調(diào)!!!
我這邊的本地環(huán)境已運(yùn)行了ZooKeeper。
代碼實(shí)操
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/><!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>bookkeeper-springboot-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>bookkeeper-springboot-example</name>
<description>Demo project for Spring Boot and Apache BookKeeper integration</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Apache BookKeeper Client -->
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
<version>4.18.0</version>
</dependency>
<!-- Jackson Databind for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Lombok for reducing boilerplate code -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>application.properties
# ZooKeeper 連接字符串
bookkeeper.zk.connectString=localhost:2181
server.port=8080配置類(lèi)
package com.example.bookkeeperspringbootexample.config;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
import java.io.IOException;
@Configuration
publicclass BookKeeperConfig {
privatestaticfinal Logger logger = LoggerFactory.getLogger(BookKeeperConfig.class);
@Value("${bookkeeper.zk.connectString}")
private String zkConnectString;
private BookKeeper bookKeeper;
private LedgerHandle ledgerHandle;
/**
* 初始化BookKeeper客戶(hù)端
*
* @return BookKeeper實(shí)例
* @throws IOException 如果初始化失敗
*/
@Bean
public BookKeeper bookKeeper() throws IOException {
ClientConfiguration conf = new ClientConfiguration();
conf.setZkServers(zkConnectString);
bookKeeper = new BookKeeper(conf);
logger.info("BookKeeper客戶(hù)端已初始化。");
return bookKeeper;
}
/**
* 創(chuàng)建一個(gè)新的Ledger
*
* @param bookKeeper BookKeeper實(shí)例
* @return LedgerHandle實(shí)例
* @throws Exception 如果創(chuàng)建Ledger失敗
*/
@Bean
public LedgerHandle ledgerHandle(BookKeeper bookKeeper) throws Exception {
ledgerHandle = bookKeeper.createLedger(
BookKeeper.DigestType.CRC32,
"password".getBytes()
);
logger.info("Ledger已創(chuàng)建,ID: {}", ledgerHandle.getId());
return ledgerHandle;
}
/**
* 關(guān)閉BookKeeper客戶(hù)端和Ledger
*/
@PreDestroy
public void shutdown() throws InterruptedException, BookKeeper.BKException {
if (ledgerHandle != null) {
ledgerHandle.close();
logger.info("Ledger已關(guān)閉。");
}
if (bookKeeper != null) {
bookKeeper.close();
logger.info("BookKeeper客戶(hù)端已關(guān)閉。");
}
}
}交易的數(shù)據(jù)模型
package com.example.bookkeeperspringbootexample.model;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 表示交易的數(shù)據(jù)模型
*/
@Data
public class Transaction {
private Long transactionId; // 交易ID
private Double amount; // 交易金額
private LocalDateTime timestamp; // 時(shí)間戳
}服務(wù)類(lèi)
package com.example.bookkeeperspringbootexample.service;
import com.example.bookkeeperspringbootexample.model.Transaction;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@Service
publicclass BookKeeperService {
privatestaticfinal Logger logger = LoggerFactory.getLogger(BookKeeperService.class);
@Autowired
private LedgerHandle ledgerHandle;
@Autowired
private ObjectMapper objectMapper;
/**
* 異步添加交易到BookKeeper
*
* @param transaction 交易對(duì)象
* @return CompletableFuture<Long> 包含新條目的entryId
*/
public CompletableFuture<Long> addTransaction(Transaction transaction) {
try {
byte[] logData = objectMapper.writeValueAsBytes(transaction); // 將交易對(duì)象轉(zhuǎn)換為字節(jié)數(shù)組
return CompletableFuture.supplyAsync(() -> {
try {
long entryId = ledgerHandle.addEntry(logData); // 將字節(jié)數(shù)組添加到Ledger
logger.info("已添加交易,entryId: {}", entryId);
return entryId;
} catch (BKException | InterruptedException e) {
thrownew RuntimeException(e);
}
});
} catch (IOException e) {
thrownew RuntimeException(e);
}
}
/**
* 異步從BookKeeper讀取交易
*
* @param entryId 條目ID
* @return CompletableFuture<Transaction> 包含讀取的交易對(duì)象
*/
public CompletableFuture<Transaction> readTransaction(long entryId) {
return CompletableFuture.supplyAsync(() -> {
try {
LedgerSequence seq = ledgerHandle.readEntries(entryId, entryId); // 讀取指定entryId的條目
if (seq.hasMoreElements()) {
LedgerEntry entry = seq.nextElement(); // 獲取條目
byte[] data = entry.getEntryBytes(); // 獲取條目的字節(jié)數(shù)組
logger.info("已讀取交易,entryId: {}", entryId);
return objectMapper.readValue(data, Transaction.class); // 將字節(jié)數(shù)組轉(zhuǎn)換為交易對(duì)象
}
thrownew IllegalArgumentException("未找到ID為 " + entryId + " 的交易");
} catch (BKException | InterruptedException | ExecutionException | IOException e) {
thrownew RuntimeException(e);
}
});
}
}Controller
package com.example.bookkeeperspringbootexample.controller;
import com.example.bookkeeperspringbootexample.model.Transaction;
import com.example.bookkeeperspringbootexample.service.BookKeeperService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/transactions")
publicclass TransactionController {
@Autowired
private BookKeeperService bookKeeperService;
/**
* 添加新的交易
*
* @param transaction 交易對(duì)象
* @return ResponseEntity<Long> 包含新條目的entryId
*/
@PostMapping("/")
public ResponseEntity<Long> addTransaction(@RequestBody Transaction transaction) {
CompletableFuture<Long> futureEntryId = bookKeeperService.addTransaction(transaction); // 異步添加交易
try {
Long entryId = futureEntryId.get(); // 獲取結(jié)果
return ResponseEntity.ok(entryId); // 返回成功的HTTP響應(yīng)
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt(); // 中斷線(xiàn)程
return ResponseEntity.internalServerError().build(); // 返回內(nèi)部服務(wù)器錯(cuò)誤
}
}
/**
* 根據(jù)entryId讀取交易
*
* @param entryId 條目ID
* @return ResponseEntity<Transaction> 包含讀取的交易對(duì)象
*/
@GetMapping("/{entryId}")
public ResponseEntity<Transaction> getTransaction(@PathVariable long entryId) {
CompletableFuture<Transaction> futureTransaction = bookKeeperService.readTransaction(entryId); // 異步讀取交易
try {
Transaction transaction = futureTransaction.get(); // 獲取結(jié)果
return ResponseEntity.ok(transaction); // 返回成功的HTTP響應(yīng)
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt(); // 中斷線(xiàn)程
return ResponseEntity.notFound().build(); // 返回未找到資源
}
}
}Application
package com.example.bookkeeperspringbootexample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BookKeeperSpringBootExampleApplication {
public static void main(String[] args) {
SpringApplication.run(BookKeeperSpringBootExampleApplication.class, args);
}
}測(cè)試
添加交易
curl -X POST http://localhost:8080/transactions/ \
-H "Content-Type: application/json" \
-d '{"transactionId": 1, "amount": 100.50, "timestamp": "2025-03-19T21:36:06"}'Respons:
1讀取交易
curl -X GET http://localhost:8080/transactions/1Respons:
{"transactionId":1,"amount":100.5,"timestamp":"2025-03-19T21:36:06"}責(zé)任編輯:武曉燕
來(lái)源:
Java知識(shí)日歷
































