精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

SpringBoot分布式事務之可靠消息最終一致性

開發 前端
如果broker未收到消息(如果執行本地事務突然宕機了,相當執行本地事務(executeLocalTransaction)執行結果返回unknow,則和broker未收到確認消息的情況一樣處理。

環境:springboot2.3.9 + RocketMQ4.8.0

可靠消息最終一致性原理

圖片

  • 執行流程
  1. Producer發送Prepare message到broker。
  2. Prepare Message發送成功后開始執行本地事務。
  3. 如果本地事務執行成功的話則返回commit,如果執行失敗則返回rollback。(這個是在事務消息的回調方法里由開發者自己決定commit or rollback)
  4. Producer發送上一步的commit還是rollback到broker,這里有以下兩種情況:

1、如果broker收到了commit/rollback消息 :

如果收到了commit,則broker認為整個事務是沒問題的,執行成功的。那么會下發消息給Consumer端消費。

如果收到了rollback,則broker認為本地事務執行失敗了,broker將會刪除Half Message,不下發給Consumer端。

2、如果broker未收到消息(如果執行本地事務突然宕機了,相當執行本地事務(executeLocalTransaction)執行結果返回unknow,則和broker未收到確認消息的情況一樣處理。):

broker會定時回查本地事務的執行結果:如果回查結果是本地事務已經執行則返回commit,若未執行,則返回unknow。

Producer端回查的結果發送給Broker。Broker接收到的如果是commit,則broker視為整個事務執行成功,如果是rollback,則broker視為本地事務執行失敗,broker刪除Half Message,不下發給consumer。如果broker未接收到回查的結果(或者查到的是unknow),則broker會定時進行重復回查,以確保查到最終的事務結果。重復回查的時間間隔和次數都可配。

工程結構

圖片圖片

建立父子工程,兩個子項目account-manager,integral-manager。

依賴

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency>

Account子模塊

  • 配置文件
server:
  port: 8081
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: pack-mq
---
spring:
  jpa:
    generateDdl: false
    hibernate:
      ddlAuto: update
    openInView: true
    show-sql: true
---
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/account?serverTimeznotallow=GMT%2B8
    username: root
    password: ******
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
      autoCommit: true
      idleTimeout: 30000
      poolName: MasterDatabookHikariCP
      maxLifetime: 1800000
      connectionTimeout: 30000
      connectionTestQuery: SELECT 1
  • 業務實體類
// 用戶表
@Entity
@Table(name = "t_account")
public class Account {
  @Id
  private Long id;
  private String name ;
}
// 業務記錄表(用來查詢去重)
@Entity
@Table(name = "t_account_log")
public class AccountLog {
  @Id
  private Long txid;
  private Date createTime ;
}
  • DAO相關類
public interface AccountRepository extends JpaRepository<Account, Long> {
}
public interface AccountLogRepository extends JpaRepository<AccountLog, Long> {
}
  • Service相關類
@Resource
private AccountRepository accountRepository ;
@Resource
private AccountLogRepository accountLogRepository ;
  
// 該方法保存業務數據,同時保存操作記錄;操作記錄用來回查。
@Transactional
public boolean register(Account account) {
  accountRepository.save(account) ;
  AccountLog accountLog = new AccountLog(account.getId(), new Date()) ;
  accountLogRepository.save(accountLog) ;
  return true ;
}
  
public AccountLog existsTxId(Long txid) {
  return accountLogRepository.findById(txid).orElse(null) ;
}
  • 發送消息方法
@Resource
private RocketMQTemplate rocketMQTemplate ;
  
public String sendTx(String topic, String tags, Account account) {
  String uuid = UUID.randomUUID().toString().replaceAll("-", "") ;
  TransactionSendResult result =rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(account).
      setHeader("tx_id", uuid).build(), uuid) ;
  return result.getSendStatus().name() ;
}
  • 消息監聽(生產者監聽)
@RocketMQTransactionListener
public class ProducerMessageListener implements RocketMQLocalTransactionListener {
  
  @Resource
  private AccountService accountService ;


  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    try {
      Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
      accountService.register(account) ;
    } catch (Exception e) {
      e.printStackTrace() ;
      return RocketMQLocalTransactionState.ROLLBACK ;
    }
    return RocketMQLocalTransactionState.COMMIT ;
  }


  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  // 這里檢查本地事務是否執行成功
    try {
      Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
      System.out.println("執行查詢ID為:" + account.getId() + " 的數據是否存在") ;
      AccountLog accountLog = accountService.existsTxId(account.getId()) ;
      if (accountLog == null) {
        return RocketMQLocalTransactionState.UNKNOWN ;
      }
    } catch (Exception e) {
      e.printStackTrace() ;
      return RocketMQLocalTransactionState.UNKNOWN ;
    }
    return RocketMQLocalTransactionState.COMMIT ;
  }


}
  • Controller接口
@RestController
@RequestMapping("/accounts")
public class AccountController {
  @Resource
  private ProducerMessageService messageService ;
  @PostMapping("/send")
  public Object sendMessage(@RequestBody Account account) {
    return messageService.sendTx("tx-topic", "mks", account) ;
  }
}

Integral子模塊

  • 業務實體類
@Entity
@Table(name = "t_integral")
public class Integral {
  @Id
  private Long id;
  private Integer score ;
  private Long acccountId ;
}
  • DAO相關類
public interface IntegralRepository extends JpaRepository<Integral, Long> {
}
  • Service相關類
@Resource
private IntegralRepository integralRepository ;
  
@Transactional
public Integral saveIntegral(Integral integral) {
  return integralRepository.save(integral) ;
}
  • 消息監聽
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
@Component
public class IntegralMessageListener implements RocketMQListener<String> {


  @Resource
  private IntegralService integralService ;
  
  @SuppressWarnings("unchecked")
  @Override
  public void onMessage(String message) {
    System.out.println("Integral接收到消息:" + message) ;
    try {
      Map<String, Object> jsonMap = new JsonMapper().readValue(message, Map.class) ;
      Integer id = (Integer) jsonMap.get("id") ;
      integralService.saveIntegral(new Integral(1L, 1000, id + 0L)) ;
    } catch (Exception e) {
      throw new RuntimeException(e) ;
    }
  }


}

測試

分別啟動兩個子模塊

  • 初始數據表

圖片圖片


  • Postman測試

圖片圖片

Account模塊

圖片圖片

Integral模塊

圖片圖片


當子模塊Account執行本地事務發生錯誤時,事務會回滾并且刪除消息。子模塊Integral并不會收到消息。

責任編輯:武曉燕 來源: 實戰案例錦集
相關推薦

2021-06-16 08:33:02

分布式事務ACID

2022-07-21 06:54:28

微服務系統RocketMQ

2025-08-05 04:22:00

2022-12-19 19:12:17

分布式事務

2019-10-11 23:27:19

分布式一致性算法開發

2019-09-05 08:43:34

微服務分布式一致性數據共享

2021-11-22 16:30:30

分布式一致性分布式系統

2015-10-19 10:42:37

分布式一致性應用系統

2024-01-31 09:54:51

Redis分布式

2024-06-04 10:58:30

2020-05-11 10:30:57

2PC分布式協議

2017-09-21 10:59:36

分布式系統線性一致性測試

2024-11-28 10:56:55

2021-07-28 08:39:25

分布式架構系統

2021-06-03 15:27:31

RaftSOFAJRaft

2022-06-07 12:08:10

Paxos算法

2025-06-09 08:00:37

分布式文件系統

2021-06-06 12:45:41

分布式CAPBASE

2017-09-22 12:08:01

數據庫分布式系統互聯網

2020-10-28 11:15:24

EPaxos分布式性算法
點贊
收藏

51CTO技術棧公眾號

亚洲精品一区二区妖精| 国产乱码在线| 免费人成网站在线观看欧美高清| 中文字幕亚洲一区二区三区五十路| 亚洲欧美国产日韩综合| 中中文字幕av在线| 久久欧美一区二区| 91亚洲精品久久久久久久久久久久| 国产亚洲成人精品| 视频一区中文| 精品日韩成人av| 成人在线免费播放视频| 中文字幕在线播放网址| 久久久亚洲精品石原莉奈| 成人激情视频在线| 色一情一乱一伦一区二区三欧美 | 国产亚洲成av人片在线观看桃| 911福利视频| 在线天堂资源| 亚洲最色的网站| 亚洲欧洲一区二区在线观看| 天天操天天干天天舔| 久久66热re国产| 日韩av电影手机在线| 久久久国产精品黄毛片| 日韩在线视屏| 亚洲男人第一网站| 国内精品免费视频| 国产精品视频一区视频二区 | 东京干手机福利视频| 男女视频一区二区| 国产99在线|中文| 国产精品6666| 国产精品黄色| 久久亚洲精品小早川怜子66| b站大片免费直播| 九九热播视频在线精品6| 91精品国产91久久久久久一区二区| 无码少妇一区二区三区芒果| 高清在线视频不卡| 亚洲亚洲精品在线观看| 亚洲自拍偷拍一区二区三区| 在线视频1区2区| 国产午夜精品美女毛片视频| 蜜桃免费一区二区三区| 四虎在线视频免费观看| 国产91精品露脸国语对白| 亚洲自拍小视频| 国产强伦人妻毛片| 国精产品一区一区三区mba桃花| 国产精品专区一| 做爰视频毛片视频| 美女爽到高潮91| 国产精品人成电影| 亚洲天天综合网| 精品一二三四在线| 成人亲热视频网站| 国产又粗又长又黄| 国产一区二区三区黄视频 | 亚洲精品字幕| 97在线视频免费播放| 国产精品白浆一区二小说| 影音先锋亚洲精品| 97视频在线观看免费高清完整版在线观看 | 天天色综合社区| 91国拍精品国产粉嫩亚洲一区| 色国产精品一区在线观看| 国产精品人人妻人人爽人人牛| 欧美日韩精品免费观看视完整| 色一区在线观看| 爱情岛论坛亚洲首页入口章节| 国产黄色精品| 日韩一区二区三区在线视频| 久久精品无码专区| 五月国产精品| 在线播放国产一区中文字幕剧情欧美 | 妖精视频一区二区三区| 国产亚洲成精品久久| 成人欧美一区二区三区黑人一| 久久久久蜜桃| 久久久久久高潮国产精品视| 久久精品国产成人av| 青青草国产精品亚洲专区无| 亚洲自拍偷拍一区| 日韩三级电影网| 日韩一区欧美一区| 岛国大片在线播放| 精品成人av| 欧美一级日韩一级| 亚洲天堂网一区二区| 波多野结衣在线观看一区二区| 操人视频在线观看欧美| 日韩精品成人在线| 老司机午夜精品| 国产一级特黄a大片99| h网站在线免费观看| 亚洲精品视频免费看| 男女av免费观看| 日韩精品免费视频一区二区三区| 亚洲精品综合精品自拍| 亚洲二区在线播放| 久久九九国产| 成人国产1314www色视频| 高清福利在线观看| 亚洲国产精品久久一线不卡| 欧美一级裸体视频| 极品国产人妖chinesets亚洲人妖| 在线播放国产精品| 免费看日韩毛片| 国产精品综合一区二区三区| 日韩欧美一区二区三区四区| 日韩特级毛片| 欧美高清一级片在线| www黄色在线观看| 国产精品日韩精品欧美精品| 国产精品视频成人| 天堂中文字幕av| 自拍偷拍亚洲激情| 精品www久久久久奶水| 日韩精品免费视频一区二区三区 | 亚洲 欧美 激情 小说 另类| 中文字幕在线免费不卡| 草草久久久无码国产专区| 欧美午夜网站| 日韩性xxxx爱| 亚洲婷婷久久综合| 99久久99久久免费精品蜜臀| 黄色影视在线观看| 免费视频成人| 亚洲人成网站777色婷婷| 精品一区二区三区四| 精品亚洲免费视频| 亚洲人成影视在线观看| www.com.cn成人| 日韩av最新在线| 欧美亚洲天堂网| 国产91色综合久久免费分享| 在线观看免费黄色片| 国产原创一区| 神马久久桃色视频| 在线观看xxxx| 亚洲国产高清在线观看视频| 日本精品久久久久中文字幕| 色狼人综合干| 青草成人免费视频| 天堂a√中文在线| 亚洲电影一区二区| 国产视频精品视频| 影音先锋在线一区| 精品久久中出| 在线观看特色大片免费视频| 精品呦交小u女在线| 成人午夜视频在线播放| 91麻豆国产在线观看| 日本www在线播放| 日韩伦理一区二区三区| 国产成人精品最新| 成年人视频免费在线观看| 欧美性受xxxx| 影音先锋男人资源在线观看| 国产在线精品一区二区夜色| 白白操在线视频| 成人在线视频中文字幕| 欧美在线视频在线播放完整版免费观看 | 蜜桃视频在线观看免费视频| 亚洲精品美女视频| 亚洲精品毛片一区二区三区| 成人欧美一区二区三区黑人麻豆| 免费人成视频在线播放| 亚洲免费黄色| 日韩国产伦理| 国产在线视频你懂的| 亚洲xxxx视频| 亚洲黄色片视频| 亚洲人亚洲人成电影网站色| 在线观看的毛片| 色喇叭免费久久综合| 国产精品视频在线播放| 黄网站免费在线观看| 欧美一级淫片007| 国产性70yerg老太| 久久综合九色综合欧美就去吻| 激情综合网俺也去| 天天综合网网欲色| 国产精品污www一区二区三区| 高清在线视频不卡| 一区二区在线视频播放| 99久久久久成人国产免费| 亚洲国产精品一区二区久久恐怖片| 特级西西人体wwwww| 麻豆精品一区二区三区| 毛片av在线播放| 伊人春色精品| 亚洲一区美女视频在线观看免费| 麻豆免费在线| 日韩一区二区三区国产| 开心激情综合网| 在线观看视频一区| 国产小视频在线观看免费| 欧美国产综合一区二区| 稀缺呦国内精品呦| 蜜臀国产一区二区三区在线播放 | 精品亚洲国产视频| 亚洲精品国产精品国自产网站按摩| 亚洲精品ww久久久久久p站| 四虎永久免费影院| 国产成人在线视频网址| 欧美在线观看视频网站| 99精品国产99久久久久久福利| 亚洲精品一区二区毛豆| 另类春色校园亚洲| 亚洲影院高清在线| 国产精品久久乐| 91精品国产高清自在线| 在线观看电影av| 中文字幕在线日韩| 欧美日韩国产综合视频| 亚洲第一黄色网| www精品国产| 91.麻豆视频| 少妇无套内谢久久久久| 欧美日韩一区二区三区| 久久久久99精品成人片毛片| 中文字幕一区二区在线播放| 午夜时刻免费入口| 91麻豆产精品久久久久久| 精品无码av一区二区三区不卡| 久久精品国产**网站演员| 国产成人无码一二三区视频| 亚洲国产裸拍裸体视频在线观看乱了中文| ijzzijzzij亚洲大全| 久久中文字幕二区| 欧美一区二区在线| 少妇久久久久| 久久伦理网站| 日韩有码一区| 久久av二区| 欧美日韩导航| 久久国产手机看片| 激情av综合| 久久精品国产精品青草色艺| 国内精品国产成人国产三级粉色 | 久久久久无码精品| 韩国三级电影一区二区| 欧美美女一级片| 蜜臀av性久久久久蜜臀aⅴ流畅| 国产精品无码一本二本三本色| 亚洲日韩视频| 日韩av综合在线观看| 日韩午夜免费| 92看片淫黄大片一级| 久久一二三区| 韩国视频一区二区三区| 免费成人你懂的| 中文字幕22页| 国产一区在线视频| 国产性猛交96| 99久久精品国产精品久久| 人妻丰满熟妇av无码久久洗澡 | 国产午夜小视频| 黑人欧美xxxx| 免费无码国产精品| 欧美日韩另类国产亚洲欧美一级| 91精东传媒理伦片在线观看| 7777精品伊人久久久大香线蕉最新版| 国产成人av免费看| 亚洲国产精品久久91精品| 深夜福利视频在线观看| 亚洲人成电影网站| 求av网址在线观看| 欧美激情精品久久久| 亚洲美女炮图| 国产主播在线一区| 成人三级av在线| 日本婷婷久久久久久久久一区二区| 欧美日韩伦理在线免费| 欧美少妇一区二区三区| 日韩天堂av| 亚洲久久中文字幕| 风间由美性色一区二区三区| 亚洲av无码成人精品国产| 国产精品乱码一区二区三区软件 | 国产高清一区二区三区| 夜色77av精品影院| 亚洲第一页在线视频| 日韩午夜av| 亚洲一区二区三区观看| 99精品欧美一区| 蜜桃av免费在线观看| 亚洲一区二区三区中文字幕 | 久久se精品一区精品二区| 国产精久久久久| 国产农村妇女毛片精品久久麻豆| jizz亚洲少妇| 色综合久久久久综合99| 亚洲第一页在线观看| 一区二区三欧美| 91资源在线观看| 国产一区二区香蕉| 欧美影院天天5g天天爽| 一区二区三区三区在线| 日韩亚洲国产精品| 善良的小姨在线| 国产日韩精品一区二区浪潮av| 国产盗摄x88av| 欧美美女直播网站| 四虎影视2018在线播放alocalhost| 久久精品视频亚洲| 国产成人精品亚洲日本在线观看| 99热国产免费| 国产高清欧美| 午夜久久久精品| 久久影院视频免费| 中文字幕一区二区三区手机版| 欧美一区二区三区四区五区| 黄色国产在线| 8x海外华人永久免费日韩内陆视频| 国产激情综合| 一区二区日本伦理| 久久久蜜桃一区二区人| 天堂www中文在线资源| 亚洲老司机在线| 91在线视频国产| 中文字幕日韩在线视频| 日韩在线观看不卡| 鲁丝一区鲁丝二区鲁丝三区| 亚洲国产精品第一区二区三区| 性一交一黄一片| 一区二区在线看| av中文字幕播放| 精品国产区一区二区三区在线观看| 欧美日韩精品一区二区三区视频| 蜜桃传媒视频第一区入口在线看| 亚洲美女91| 亚洲视频在线播放免费| 亚洲高清免费观看高清完整版在线观看| 国产伦精品一区二区三区免.费| 中文字幕精品在线| 美女色狠狠久久| 亚洲精品视频一二三| 日韩专区一卡二卡| 夫妇交换中文字幕| 欧美艳星brazzers| 婷婷在线视频观看| 成人欧美在线观看| 91精品一区二区三区综合| 免费精品99久久国产综合精品应用| 中文字幕精品在线不卡| 自拍偷拍精品视频| 日韩有码在线视频| 99视频这里有精品| 日本国产中文字幕| 成人精品视频网站| 一级免费在线观看| 国产丝袜一区二区三区免费视频| 国产精品av一区二区三区| 日本一区不卡| 久草这里只有精品视频| av激情在线观看| 亚洲成人a**站| 丝袜美腿诱惑一区二区三区| 丝袜美腿玉足3d专区一区| 乱一区二区av| 国精品无码一区二区三区| 精品福利一区二区三区| 2022成人影院| 中文字幕乱码一区二区三区| 国产成人av一区| 免费av网站在线| 日韩有码在线观看| 国产精品网在线观看| 91av在线免费播放| 亚洲欧美日韩一区| 欧美熟妇乱码在线一区| 日韩av免费一区| 久久久久国产| 超碰97在线资源站| 欧美日韩高清一区二区| 蜜乳av一区| 日韩欧美一区二区在线观看 | 91制片厂在线| 欧美变态tickling挠脚心| 这里有精品可以观看| 一区二区三区不卡在线| 成人午夜激情在线| 无码人妻丰满熟妇精品| 操日韩av在线电影| 亚洲成a人片77777在线播放| 免费成年人高清视频| 午夜精品福利一区二区三区av| 国产69久久| 精品国产乱码久久久久久108| 美洲天堂一区二卡三卡四卡视频| 久久久久久久蜜桃| 中文字幕在线看视频国产欧美在线看完整| 亚洲精品一区二区三区中文字幕| 成人性做爰aaa片免费看不忠| 亚洲国产中文字幕在线视频综合| 成人高清免费观看mv| 国产精选在线观看91|