多線程事務怎么回滾?說用@Transactional可以回去等通知了!
兄弟們,今天咱們來聊一個能讓不少程序員在夜深人靜時抓頭發的問題 —— 多線程事務回滾。估計有不少剛入行的小伙伴會拍著胸脯說:“這還不簡單?加個 @Transactional 注解不就搞定了嘛!” 要是你也這么想,那我只能說:“兄弟,你還是先回去等通知吧,這事兒沒你想的那么簡單。”
一、@Transactional 是何方神圣?
在聊多線程事務回滾之前,咱們先得搞明白 @Transactional 這個注解到底是個啥。對于 Java 開發者來說,這個注解可太熟悉了,它是 Spring 框架提供的用于事務管理的注解,能讓我們在開發中輕松地實現事務的控制。
簡單來說,當我們在一個方法上加上 @Transactional 注解后,Spring 就會在這個方法執行前開啟一個事務,方法執行完如果沒有出現異常,就會自動提交事務;要是出現了異常,就會自動回滾事務。這操作看起來是不是特別省心?就像給方法加了個 “安全氣囊”,出問題了能自動保護數據。
比如說,我們有一個轉賬的方法:
@Transactional
public void transferMoney(Account from, Account to, BigDecimal amount) {
from.reduceMoney(amount);
to.addMoney(amount);
accountDao.update(from);
accountDao.update(to);
}在這個方法里,如果轉賬過程中出現了異常,比如余額不足之類的,@Transactional 就會讓整個轉賬操作回滾,不會出現錢從一個賬戶扣了,另一個賬戶卻沒收到的情況。這在單線程環境下,那是相當好用。
二、多線程一來,@Transactional 就 “歇菜”?
可一旦到了多線程環境,@Transactional 就有點力不從心了。為啥呢?咱們得從事務的本質和多線程的特性說起。
事務有四大特性,也就是 ACID:原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)、持久性(Durability)。其中原子性要求事務中的操作要么全部成功,要么全部失敗。而多線程的特點是多個線程并發執行,每個線程都有自己的執行路徑和上下文。
@Transactional 的事務管理是基于線程綁定的,它會把事務和當前線程綁定在一起。也就是說,每個線程都有自己獨立的事務,線程之間的事務是隔離開的。當我們在一個主線程里啟動了多個子線程,每個子線程都會有自己的事務,主線程的 @Transactional 可管不了子線程的事務。
咱們來看個例子:
@Service
public class OrderService {
@Autowired
private OrderDao orderDao;
@Autowired
private InventoryService inventoryService;
@Transactional
public void createOrder(Order order) {
// 保存訂單
orderDao.save(order);
// 開啟多線程扣減庫存
new Thread(() -> {
inventoryService.reduceInventory(order.getProductId(), order.getQuantity());
}).start();
// 模擬出現異常
int i = 1 / 0;
}
}
@Service
public class InventoryService {
@Autowired
private InventoryDao inventoryDao;
@Transactional
public void reduceInventory(Long productId, Integer quantity) {
inventoryDao.reduce(productId, quantity);
}
}在這個例子里,createOrder 方法加了 @Transactional 注解,里面啟動了一個子線程去調用 reduceInventory 方法,而 reduceInventory 方法也加了 @Transactional 注解。當 createOrder 方法執行到 “int i = 1 / 0” 時會拋出異常,按照咱們的預期,整個事務都應該回滾,訂單不保存,庫存也不扣減。可實際運行一下你會發現,訂單確實沒保存,因為主線程的事務回滾了。但庫存呢?居然被扣減了!這是因為子線程的事務和主線程的事務是分開的,主線程拋出異常回滾,并不會影響子線程的事務,子線程的操作已經提交了。
這就好比你和你朋友各自開了一個銀行賬戶,你轉賬時操作失誤,你的賬戶回滾了,但你朋友的賬戶如果已經收到錢了,可不會因為你的失誤再把錢退回來。
所以說,在多線程環境下,想靠 @Transactional 來實現事務回滾,那基本上是行不通的。
三、多線程事務回滾的難點在哪?
多線程事務回滾之所以讓人頭疼,主要是因為它面臨著幾個棘手的問題:
- 事務隔離性:如前面所說,每個線程的事務都是獨立的,彼此之間隔離開來。這本來是事務的一個優點,但在多線程協作完成一個業務時,就成了麻煩事。一個線程的事務出了問題,很難影響到其他線程的事務。
- 并發問題:多線程并發執行時,會出現各種不可預測的情況。比如多個線程同時操作同一份數據,可能會導致數據不一致。就像幾個人同時往一個銀行賬戶里存錢,操作順序不一樣,最后賬戶里的錢可能就不對了。
- 回滾時機難把握:就算我們能監測到某個線程出現了異常,想要讓其他線程都回滾,可其他線程可能已經執行完了自己的操作,甚至已經提交了事務,這時候再想回滾就晚了。
- 性能問題:如果為了實現多線程事務回滾,采用一些過于復雜的機制,可能會大大降低系統的性能。就像為了保證幾個小伙伴行動完全一致,每次都要等所有人都準備好了再動,效率肯定高不了。
四、那多線程事務回滾到底該咋整?
雖然多線程事務回滾很難,但也不是完全沒辦法解決。根據不同的業務場景,我們可以采用不同的方案。
(一)使用全局事務管理器
全局事務管理器就像是一個 “大管家”,可以管理多個線程的事務,協調它們一起提交或回滾。比較典型的就是 Java EE 中的 JTA(Java Transaction API),它可以管理分布式事務,當然也能應對多線程事務。
JTA 的核心是事務管理器(Transaction Manager)和資源管理器(Resource Manager)。事務管理器負責協調各個資源管理器,讓它們要么一起提交事務,要么一起回滾。
不過,使用 JTA 也有一些缺點。首先,它需要應用服務器的支持,比如 JBoss、WebLogic 等,而我們常用的 Tomcat 是不支持的。其次,JTA 的配置比較復雜,而且性能相對較差,因為它需要在多個資源之間進行協調。
如果你的項目是一個大型的分布式系統,并且對事務的一致性要求非常高,那可以考慮使用 JTA。但如果是普通的中小型項目,用 JTA 可能就有點 “殺雞用牛刀” 了。
(二)采用本地消息表
本地消息表是一種比較實用的方案,它的思路是將分布式事務(這里也包括多線程事務)轉化為本地事務來處理。
具體做法是這樣的:
- 當我們需要在多線程中執行一系列操作時,先在本地數據庫創建一張消息表,記錄每個操作的狀態。
- 每個線程執行完自己的操作后,不是直接提交事務,而是往消息表中插入一條消息,標記該操作已完成。
- 有一個專門的線程(或者定時任務)來監測消息表中的消息。如果所有線程的操作都完成了,就把所有事務提交;如果有任何一個線程的操作出現異常,就根據消息表中的記錄,讓所有已完成操作的線程進行回滾。
舉個例子,還是前面的訂單和庫存的例子。我們可以創建一張消息表,里面有訂單操作狀態和庫存操作狀態。主線程保存訂單,子線程扣減庫存。主線程執行完保存訂單操作后,往消息表插入一條訂單操作完成的消息;子線程執行完扣減庫存操作后,插入一條庫存操作完成的消息。
然后有一個監測線程,當它發現訂單操作失敗(比如主線程拋出異常),就會通知子線程進行回滾,把庫存加回去;如果庫存操作失敗,就通知主線程回滾,不保存訂單。
這種方案的優點是實現起來相對簡單,不需要依賴復雜的框架,而且對性能影響也不大。但它也有缺點,消息表需要我們自己維護,而且可能會出現消息表數據和業務數據不一致的情況,需要做好容錯處理。
(三)使用消息隊列
消息隊列也可以用來解決多線程事務回滾問題,它的核心思想是 “異步確保”。
具體步驟如下:
- 主線程執行自己的操作,如果成功,就往消息隊列里發送一條消息。
- 子線程從消息隊列中接收消息,然后執行自己的操作。如果子線程執行成功,就發送一個確認消息;如果失敗,就不發送或者發送一個失敗消息。
- 主線程如果沒有收到子線程的確認消息,就進行回滾操作。
還是拿訂單和庫存的例子來說,主線程先保存訂單,如果保存成功,就往消息隊列里發送一條 “扣減庫存” 的消息。子線程從消息隊列里收到消息后,執行扣減庫存的操作,如果成功,就發送一條 “庫存扣減成功” 的消息。主線程如果收到了這條確認消息,就提交事務;如果一段時間內沒收到,就回滾事務,把訂單刪除。
這種方案的優點是解耦了主線程和子線程,提高了系統的靈活性和可擴展性。但它也有不足,消息隊列可能會出現消息丟失、重復消費等問題,需要我們去解決。而且這種方式是異步的,可能會有一定的延遲。
(四)兩階段提交(2PC)
兩階段提交是分布式事務中常用的一種方案,也可以借鑒到多線程事務回滾中。它把事務的提交過程分為兩個階段:準備階段和提交階段。
- 準備階段:協調者(可以是主線程)向所有參與者(子線程)發送準備請求,問它們是否可以執行事務并提交。參與者執行事務,但不真正提交,而是告訴協調者自己是否準備好。
- 提交階段:如果所有參與者都準備好(返回 yes),協調者就向所有參與者發送提交請求,讓它們真正提交事務;如果有任何一個參與者沒準備好(返回 no),協調者就發送回滾請求,讓所有參與者回滾事務。
舉個形象的例子,就像一群人去聚餐,準備階段就是服務員問每個人 “菜都上齊了,可以開始吃了嗎?”,大家都表示可以。然后到了提交階段,服務員說 “那大家開始吃吧”,于是大家就開動了。如果有人說 “我的菜還沒上”,那服務員就會說 “那大家再等等,先別吃”。
在多線程事務中,主線程可以充當協調者,子線程充當參與者。主線程先讓各個子線程執行操作,但不提交,然后根據子線程的反饋決定是提交還是回滾。
這種方案能保證事務的一致性,但也有明顯的缺點。首先,性能比較差,因為所有參與者都要等待其他參與者的響應,在高并發場景下不太適用。其次,如果協調者出現故障,整個系統可能會陷入停滯狀態。
(五)TCC(Try-Confirm-Cancel)模式
TCC 模式是一種更靈活的事務解決方案,它把一個業務操作分為三個階段:
- Try 階段:嘗試執行業務,完成所有業務檢查,并且預留好必要的資源。比如扣減庫存的 Try 階段,就是檢查庫存是否充足,并把要扣減的庫存預留出來,不讓其他線程使用。
- Confirm 階段:如果所有業務的 Try 階段都成功了,就執行 Confirm 操作,真正地提交業務。比如把預留的庫存真正扣減掉。
- Cancel 階段:如果有任何一個業務的 Try 階段失敗了,就執行 Cancel 操作,釋放預留的資源,回滾到原來的狀態。比如把預留的庫存釋放出來。
在多線程場景下,每個線程負責一個業務的 Try 操作,然后主線程判斷所有線程的 Try 操作是否都成功。如果都成功,就通知所有線程執行 Confirm 操作;如果有失敗的,就通知所有線程執行 Cancel 操作。
TCC 模式的優點是靈活性高,可以根據具體的業務場景來實現 Try、Confirm 和 Cancel 操作,而且對性能的影響相對較小。但它的缺點也很明顯,需要我們為每個業務都實現這三個階段的邏輯,開發成本比較高,而且要考慮各種異常情況,比如 Confirm 或 Cancel 操作失敗該怎么辦。
就拿電商下單來說,Try 階段要檢查商品是否存在、庫存是否充足、用戶余額是否夠等,并預留庫存和金額;Confirm 階段就是實際扣減庫存和金額,創建訂單;Cancel 階段就是釋放預留的庫存和金額。
五、實戰案例:用本地消息表解決多線程事務回滾
前面說了這么多理論,接下來咱們就通過一個實戰案例,看看如何用本地消息表來解決多線程事務回滾問題。
假設我們有一個業務場景:用戶下單,需要同時完成兩個操作,保存訂單和扣減庫存,這兩個操作分別在兩個線程中執行,要求要么都成功,要么都失敗。
(一)創建數據庫表
首先,我們需要創建訂單表、庫存表和消息表。
訂單表(order):
CREATE TABLE `order` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_id` bigint NOT NULL,
`product_id` bigint NOT NULL,
`quantity` int NOT NULL,
`status` int NOT NULL COMMENT '0-未支付,1-已支付,2-取消',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;庫存表(inventory):
CREATE TABLE `inventory` (
`id` bigint NOT NULL AUTO_INCREMENT,
`product_id` bigint NOT NULL,
`quantity` int NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `idx_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;消息表(transaction_message):
CREATE TABLE `transaction_message` (
`id` bigint NOT NULL AUTO_INCREMENT,
`business_type` varchar(50) NOT NULL COMMENT '業務類型:ORDER-訂單,INVENTORY-庫存',
`business_id` bigint NOT NULL COMMENT '業務ID',
`status` int NOT NULL COMMENT '0-處理中,1-成功,2-失敗',
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;(二)編寫實體類
創建 Order、Inventory 和 TransactionMessage 實體類,對應上面的表結構。
Order 類:
public class Order {
private Long id;
private Long userId;
private Long productId;
private Integer quantity;
private Integer status;
// getter和setter方法
}Inventory 類:
public class Inventory {
private Long id;
private Long productId;
private Integer quantity;
// getter和setter方法
}TransactionMessage 類:
public class TransactionMessage {
private Long id;
private String businessType;
private Long businessId;
private Integer status;
private LocalDateTime createTime;
private LocalDateTime updateTime;
// getter和setter方法
}(三)編寫 DAO 層
使用 MyBatis 或 JPA 等持久層框架,編寫 OrderDao、InventoryDao 和 TransactionMessageDao,實現對數據庫的操作。
這里以 MyBatis 為例,OrderDao:
public interface OrderDao {
void save(Order order);
void delete(Long id);
}InventoryDao:
public interface InventoryDao {
Inventory getByProductId(Long productId);
void update(Inventory inventory);
}TransactionMessageDao:
public interface TransactionMessageDao {
void save(TransactionMessage message);
void updateStatus(TransactionMessage message);
List<TransactionMessage> getByBusinessTypesAndStatus(List<String> businessTypes, Integer status);
}(四)編寫 Service 層
訂單服務(OrderService):
@Service
publicclass OrderService {
@Autowired
private OrderDao orderDao;
@Autowired
private TransactionMessageDao messageDao;
@Transactional
public Long createOrder(Order order) {
// 保存訂單
orderDao.save(order);
// 往消息表插入訂單操作的消息,狀態為處理中
TransactionMessage message = new TransactionMessage();
message.setBusinessType("ORDER");
message.setBusinessId(order.getId());
message.setStatus(0);
message.setCreateTime(LocalDateTime.now());
message.setUpdateTime(LocalDateTime.now());
messageDao.save(message);
return order.getId();
}
@Transactional
public void rollbackOrder(Long orderId) {
// 刪除訂單
orderDao.delete(orderId);
// 更新消息表中訂單操作的狀態為失敗
TransactionMessage message = new TransactionMessage();
message.setBusinessType("ORDER");
message.setBusinessId(orderId);
message.setStatus(2);
message.setUpdateTime(LocalDateTime.now());
messageDao.updateStatus(message);
}
}庫存服務(InventoryService):
@Service
publicclass InventoryService {
@Autowired
private InventoryDao inventoryDao;
@Autowired
private TransactionMessageDao messageDao;
@Transactional
public Long reduceInventory(Long productId, Integer quantity) {
// 獲取庫存
Inventory inventory = inventoryDao.getByProductId(productId);
if (inventory == null || inventory.getQuantity() < quantity) {
thrownew RuntimeException("庫存不足");
}
// 扣減庫存
inventory.setQuantity(inventory.getQuantity() - quantity);
inventoryDao.update(inventory);
// 往消息表插入庫存操作的消息,狀態為處理中
TransactionMessage message = new TransactionMessage();
message.setBusinessType("INVENTORY");
message.setBusinessId(productId);
message.setStatus(0);
message.setCreateTime(LocalDateTime.now());
message.setUpdateTime(LocalDateTime.now());
messageDao.save(message);
return productId;
}
@Transactional
public void rollbackInventory(Long productId, Integer quantity) {
// 恢復庫存
Inventory inventory = inventoryDao.getByProductId(productId);
inventory.setQuantity(inventory.getQuantity() + quantity);
inventoryDao.update(inventory);
// 更新消息表中庫存操作的狀態為失敗
TransactionMessage message = new TransactionMessage();
message.setBusinessType("INVENTORY");
message.setBusinessId(productId);
message.setStatus(2);
message.setUpdateTime(LocalDateTime.now());
messageDao.updateStatus(message);
}
}事務協調服務(TransactionCoordinatorService):
@Service
publicclass TransactionCoordinatorService {
@Autowired
private OrderService orderService;
@Autowired
private InventoryService inventoryService;
@Autowired
private TransactionMessageDao messageDao;
public void processOrder(Order order, Integer quantity) {
// 用于存儲訂單ID和商品ID,方便回滾
Long orderId = null;
Long productId = order.getProductId();
boolean isSuccess = true;
// 線程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 用于獲取子線程的執行結果
CountDownLatch countDownLatch = new CountDownLatch(1);
// 存儲子線程是否執行成功
AtomicBoolean inventorySuccess = new AtomicBoolean(false);
try {
// 主線程創建訂單
orderId = orderService.createOrder(order);
// 子線程扣減庫存
executorService.submit(() -> {
try {
inventoryService.reduceInventory(productId, quantity);
inventorySuccess.set(true);
} catch (Exception e) {
e.printStackTrace();
inventorySuccess.set(false);
} finally {
countDownLatch.countDown();
}
});
// 等待子線程執行完成
countDownLatch.await();
// 檢查子線程是否執行成功
if (!inventorySuccess.get()) {
isSuccess = false;
} else {
// 檢查消息表中訂單和庫存的消息狀態
List<String> businessTypes = Arrays.asList("ORDER", "INVENTORY");
List<TransactionMessage> messages = messageDao.getByBusinessTypesAndStatus(businessTypes, 0);
if (messages.size() != 2) {
isSuccess = false;
} else {
// 更新消息狀態為成功
for (TransactionMessage message : messages) {
message.setStatus(1);
message.setUpdateTime(LocalDateTime.now());
messageDao.updateStatus(message);
}
}
}
} catch (Exception e) {
e.printStackTrace();
isSuccess = false;
} finally {
executorService.shutdown();
}
// 如果有任何失敗,進行回滾
if (!isSuccess) {
if (orderId != null) {
orderService.rollbackOrder(orderId);
}
// 回滾庫存
try {
Inventory inventory = inventoryDao.getByProductId(productId);
if (inventory != null) {
inventory.setQuantity(inventory.getQuantity() + quantity);
inventoryDao.update(inventory);
// 更新庫存消息狀態為失敗
TransactionMessage message = new TransactionMessage();
message.setBusinessType("INVENTORY");
message.setBusinessId(productId);
message.setStatus(2);
message.setUpdateTime(LocalDateTime.now());
messageDao.updateStatus(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}在這個服務中,我們使用了線程池來管理子線程,用 CountDownLatch 來等待子線程執行完成,用 AtomicBoolean 來存儲子線程的執行結果。首先,主線程創建訂單,并往消息表插入訂單操作的消息。然后子線程扣減庫存,并往消息表插入庫存操作的消息。主線程等待子線程執行完成后,檢查子線程是否執行成功,以及消息表中的消息狀態。如果一切正常,就更新消息狀態為成功;如果有任何失敗,就進行回滾,刪除訂單,恢復庫存,并更新消息狀態為失敗。
(五)編寫 Controller 層
@RestController
@RequestMapping("/order")
publicclass OrderController {
@Autowired
private TransactionCoordinatorService coordinatorService;
@PostMapping("/create")
public String createOrder(@RequestBody OrderParam param) {
Order order = new Order();
order.setUserId(param.getUserId());
order.setProductId(param.getProductId());
order.setQuantity(param.getQuantity());
order.setStatus(0);
coordinatorService.processOrder(order, param.getQuantity());
return"success";
}
staticclass OrderParam {
private Long userId;
private Long productId;
private Integer quantity;
// getter和setter方法
}
}(六)測試
啟動項目,發送一個 POST 請求到 “/order/create”,模擬用戶下單。我們可以故意制造一些異常,比如讓庫存不足,看看是否會回滾;或者讓子線程執行時拋出異常,看看訂單是否會被刪除。
通過這個案例,我們可以看到,用本地消息表確實可以實現多線程事務的回滾,雖然實現起來有點麻煩,但在中小型項目中還是比較實用的。
六、總結
多線程事務回滾是 Java 開發中一個比較復雜的問題,@Transactional 注解在這種場景下幾乎起不到作用。我們可以根據實際的業務場景,選擇合適的解決方案,比如本地消息表、消息隊列、TCC 模式等。
每種方案都有自己的優缺點,沒有絕對的最好,只有最適合的。在實際開發中,我們要綜合考慮系統的性能、一致性要求、開發成本等因素,選擇最適合當前項目的方案。
































