精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

使用Kafka和MongoDB進(jìn)行Go異步處理

開(kāi)發(fā) 后端 其他數(shù)據(jù)庫(kù) Kafka MongoDB
在我前面的博客文章 “我的第一個(gè) Go 微服務(wù):使用 MongoDB 和 Docker 多階段構(gòu)建” 中,我創(chuàng)建了一個(gè) Go 微服務(wù)示例,它發(fā)布一個(gè) REST 式的 http 端點(diǎn),并將從 HTTP POST 中接收到的數(shù)據(jù)保存到 MongoDB 數(shù)據(jù)庫(kù)。

[[240575]]

在我前面的博客文章 “我的***個(gè) Go 微服務(wù):使用 MongoDB 和 Docker 多階段構(gòu)建” 中,我創(chuàng)建了一個(gè) Go 微服務(wù)示例,它發(fā)布一個(gè) REST 式的 http 端點(diǎn),并將從 HTTP POST 中接收到的數(shù)據(jù)保存到 MongoDB 數(shù)據(jù)庫(kù)。

在這個(gè)示例中,我將數(shù)據(jù)的保存和 MongoDB 分離,并創(chuàng)建另一個(gè)微服務(wù)去處理它。我還添加了 Kafka 為消息層服務(wù),這樣微服務(wù)就可以異步處理它自己關(guān)心的東西了。

如果你有時(shí)間去看,我將這個(gè)博客文章的整個(gè)過(guò)程錄制到 這個(gè)視頻中了 :)

下面是這個(gè)使用了兩個(gè)微服務(wù)的簡(jiǎn)單的異步處理示例的上層架構(gòu)圖。

rest-kafka-mongo-microservice-draw-io

rest-kafka-mongo-microservice-draw-io

微服務(wù) 1 —— 是一個(gè) REST 式微服務(wù),它從一個(gè) /POST http 調(diào)用中接收數(shù)據(jù)。接收到請(qǐng)求之后,它從 http 請(qǐng)求中檢索數(shù)據(jù),并將它保存到 Kafka。保存之后,它通過(guò) /POST 發(fā)送相同的數(shù)據(jù)去響應(yīng)調(diào)用者。

微服務(wù) 2 —— 是一個(gè)訂閱了 Kafka 中的一個(gè)主題的微服務(wù),微服務(wù) 1 的數(shù)據(jù)保存在該主題。一旦消息被微服務(wù)消費(fèi)之后,它接著保存數(shù)據(jù)到 MongoDB 中。

在你繼續(xù)之前,我們需要能夠去運(yùn)行這些微服務(wù)的幾件東西:

  1. 下載 Kafka —— 我使用的版本是 kafka_2.11-1.1.0
  2. 安裝 librdkafka —— 不幸的是,這個(gè)庫(kù)應(yīng)該在目標(biāo)系統(tǒng)中
  3. 安裝 Kafka Go 客戶(hù)端
  4. 運(yùn)行 MongoDB。你可以去看我的 以前的文章 中關(guān)于這一塊的內(nèi)容,那篇文章中我使用了一個(gè) MongoDB docker 鏡像。

我們開(kāi)始吧!

首先,啟動(dòng) Kafka,在你運(yùn)行 Kafka 服務(wù)器之前,你需要運(yùn)行 Zookeeper。下面是示例:

  1. $ cd /<download path>/kafka_2.11-1.1.0
  2. $ bin/zookeeper-server-start.sh config/zookeeper.properties

接著運(yùn)行 Kafka —— 我使用 9092 端口連接到 Kafka。如果你需要改變端口,只需要在 config/server.properties 中配置即可。如果你像我一樣是個(gè)新手,我建議你現(xiàn)在還是使用默認(rèn)端口。

  1. $ bin/kafka-server-start.sh config/server.properties

Kafka 跑起來(lái)之后,我們需要 MongoDB。它很簡(jiǎn)單,只需要使用這個(gè) docker-compose.yml 即可。

  1. version: '3'
  2. services:
  3. mongodb:
  4. image: mongo
  5. ports:
  6. - "27017:27017"
  7. volumes:
  8. - "mongodata:/data/db"
  9. networks:
  10. - network1
  11.  
  12. volumes:
  13. mongodata:
  14.  
  15. networks:
  16. network1:

使用 Docker Compose 去運(yùn)行 MongoDB docker 容器。

  1. docker-compose up

這里是微服務(wù) 1 的相關(guān)代碼。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

  1. func jobsPostHandler(w http.ResponseWriter, r *http.Request) {
  2.  
  3. //Retrieve body from http request
  4. b, err := ioutil.ReadAll(r.Body)
  5. defer r.Body.Close()
  6. if err != nil {
  7. panic(err)
  8. }
  9.  
  10. //Save data into Job struct
  11. var _job Job
  12. err = json.Unmarshal(b, &_job)
  13. if err != nil {
  14. http.Error(w, err.Error(), 500)
  15. return
  16. }
  17.  
  18. saveJobToKafka(_job)
  19.  
  20. //Convert job struct into json
  21. jsonString, err := json.Marshal(_job)
  22. if err != nil {
  23. http.Error(w, err.Error(), 500)
  24. return
  25. }
  26.  
  27. //Set content-type http header
  28. w.Header().Set("content-type", "application/json")
  29.  
  30. //Send back data as response
  31. w.Write(jsonString)
  32.  
  33. }
  34.  
  35. func saveJobToKafka(job Job) {
  36.  
  37. fmt.Println("save to kafka")
  38.  
  39. jsonString, err := json.Marshal(job)
  40.  
  41. jobString := string(jsonString)
  42. fmt.Print(jobString)
  43.  
  44. p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
  45. if err != nil {
  46. panic(err)
  47. }
  48.  
  49. // Produce messages to topic (asynchronously)
  50. topic := "jobs-topic1"
  51. for _, word := range []string{string(jobString)} {
  52. p.Produce(&kafka.Message{
  53. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  54. Value: []byte(word),
  55. }, nil)
  56. }
  57. }

這里是微服務(wù) 2 的代碼。在這個(gè)代碼中最重要的東西是從 Kafka 中消費(fèi)數(shù)據(jù),保存部分我已經(jīng)在前面的博客文章中討論過(guò)了。這里代碼的重點(diǎn)部分是從 Kafka 中消費(fèi)數(shù)據(jù):

kafka-to-mongo/kafka-mongo-sample.go

  1. func main() {
  2.  
  3. //Create MongoDB session
  4. session := initialiseMongo()
  5. mongoStore.session = session
  6.  
  7. receiveFromKafka()
  8.  
  9. }
  10.  
  11. func receiveFromKafka() {
  12.  
  13. fmt.Println("Start receiving from Kafka")
  14. c, err := kafka.NewConsumer(&kafka.ConfigMap{
  15. "bootstrap.servers": "localhost:9092",
  16. "group.id": "group-id-1",
  17. "auto.offset.reset": "earliest",
  18. })
  19.  
  20. if err != nil {
  21. panic(err)
  22. }
  23.  
  24. c.SubscribeTopics([]string{"jobs-topic1"}, nil)
  25.  
  26. for {
  27. msg, err := c.ReadMessage(-1)
  28.  
  29. if err == nil {
  30. fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))
  31. job := string(msg.Value)
  32. saveJobToMongo(job)
  33. } else {
  34. fmt.Printf("Consumer error: %v (%v)\n", err, msg)
  35. break
  36. }
  37. }
  38.  
  39. c.Close()
  40.  
  41. }
  42.  
  43. func saveJobToMongo(jobString string) {
  44.  
  45. fmt.Println("Save to MongoDB")
  46. col := mongoStore.session.DB(database).C(collection)
  47.  
  48. //Save data into Job struct
  49. var _job Job
  50. b := []byte(jobString)
  51. err := json.Unmarshal(b, &_job)
  52. if err != nil {
  53. panic(err)
  54. }
  55.  
  56. //Insert job into MongoDB
  57. errMongo := col.Insert(_job)
  58. if errMongo != nil {
  59. panic(errMongo)
  60. }
  61.  
  62. fmt.Printf("Saved to MongoDB : %s", jobString)
  63.  
  64. }

我們來(lái)演示一下,運(yùn)行微服務(wù) 1。確保 Kafka 已經(jīng)運(yùn)行了。

  1. $ go run rest-kafka-sample.go

我使用 Postman 向微服務(wù) 1 發(fā)送數(shù)據(jù)。

Screenshot-2018-04-29-22.20.33

Screenshot-2018-04-29-22.20.33

這里是日志,你可以在微服務(wù) 1 中看到。當(dāng)你看到這些的時(shí)候,說(shuō)明已經(jīng)接收到了來(lái)自 Postman 發(fā)送的數(shù)據(jù),并且已經(jīng)保存到了 Kafka。

Screenshot-2018-04-29-22.22.00

Screenshot-2018-04-29-22.22.00

因?yàn)槲覀兩形催\(yùn)行微服務(wù) 2,數(shù)據(jù)被微服務(wù) 1 只保存在了 Kafka。我們來(lái)消費(fèi)它并通過(guò)運(yùn)行的微服務(wù) 2 來(lái)將它保存到 MongoDB。

  1. $ go run kafka-mongo-sample.go

現(xiàn)在,你將在微服務(wù) 2 上看到消費(fèi)的數(shù)據(jù),并將它保存到了 MongoDB。

Screenshot-2018-04-29-22.24.15

Screenshot-2018-04-29-22.24.15

檢查一下數(shù)據(jù)是否保存到了 MongoDB。如果有數(shù)據(jù),我們成功了!

Screenshot-2018-04-29-22.26.39

Screenshot-2018-04-29-22.26.39

完整的源代碼可以在這里找到:

https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice 

責(zé)任編輯:龐桂玉 來(lái)源: Linux中國(guó)
相關(guān)推薦

2023-10-11 14:37:21

工具開(kāi)發(fā)

2015-12-11 13:39:56

GoiOSAndroid

2021-06-15 15:03:21

MongoDBNode.jsCRUD

2023-11-08 15:04:55

事務(wù)GORM

2023-10-30 23:25:48

FuturesGo語(yǔ)言

2015-06-16 11:06:42

JavaCompletable

2024-02-07 11:44:20

NestJSRxJS異步編程

2021-04-26 05:33:54

Python異步編程

2023-09-27 15:34:48

數(shù)據(jù)編程

2023-11-06 08:01:09

Go同步異步

2023-10-28 16:22:21

Go接口

2021-11-29 22:59:34

Go Dockertest集成

2023-06-15 13:01:07

JavaPythonJavaScript

2012-04-19 10:04:20

ibmdw

2018-09-11 09:41:19

2019-07-02 14:05:23

Go語(yǔ)言高并發(fā)

2022-05-05 08:13:16

Go數(shù)組類(lèi)型

2022-08-12 08:38:52

FFmpegLinux命令

2024-01-15 06:05:05

DockerGol ang應(yīng)用程序

2009-02-27 17:15:05

XMLDOMXPath
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

99色这里只有精品| 国产免费一区二区| 男人晚上看的视频| 一区二区日韩| 狠狠躁夜夜躁人人躁婷婷91| 欧美日本韩国在线| 国产精品久久久久久在线| 欧美一区网站| 精品中文视频在线| 日日干日日操日日射| 国产激情视频在线看| 日本一区二区不卡视频| 99蜜桃在线观看免费视频网站| 99久热在线精品996热是什么| 日韩欧美高清| 亚洲精品www久久久| 成人黄色一级大片| 深夜福利视频一区二区| 亚洲精品国产精华液| 日本一区二区久久精品| 动漫av一区二区三区| 日本sm残虐另类| 97精品国产aⅴ7777| 人人艹在线视频| 国产欧美日韩| 亚洲国产日韩欧美在线99| 91丨九色丨蝌蚪| 88xx成人永久免费观看| 亚洲国产日日夜夜| 中文字幕第50页| 成年人视频在线免费观看| 99re8在线精品视频免费播放| 成人av电影天堂| 亚洲高清视频免费观看| 国产精品老牛| 久久久久国产视频| 日本妇女毛茸茸| 日韩在线理论| 国产一区二区三区在线视频 | 欧美va在线播放| 九九热在线免费| 欧美大胆性生话| 婷婷六月综合网| 欧美深夜福利视频| 白白色在线观看| 亚洲精品国产无天堂网2021| 亚洲a∨一区二区三区| 神马久久高清| 91香蕉视频在线| 国语精品中文字幕| 日本美女一级视频| 成人一区在线观看| 成人精品一二区| 精品人妻一区二区三区三区四区| 看片网站欧美日韩| 91精品国产综合久久香蕉的用户体验| 国模私拍一区二区| 丝袜脚交一区二区| 国产成人中文字幕| 中国黄色一级视频| 久久福利资源站| 成人国产精品色哟哟| 91黄色在线视频| 精品中文av资源站在线观看| 亚洲aⅴ日韩av电影在线观看 | 麻豆成人小视频| 亚洲色大成网站www| 91麻豆福利精品推荐| 欧美亚洲另类在线一区二区三区| 欧美色视频免费| 久久久久久久久久久黄色| 日韩欧美第二区在线观看| 国产系列在线观看| 国产精品毛片a∨一区二区三区| 亚洲国产精品一区二区第四页av| 永久免费av在线| 亚洲免费观看在线视频| 一区二区三区一级片| 中文字幕在线播放网址| 亚洲v中文字幕| 久久精品香蕉视频| 亚洲精品第一| 日韩欧美自拍偷拍| 成人免费无码大片a毛片| 亚洲日本三级| 久久夜色精品亚洲噜噜国产mv| 青青草手机在线观看| 尤物精品在线| 国产精品夜间视频香蕉| www.激情五月.com| 97久久精品人人做人人爽| 亚洲色图自拍| av资源中文在线| 欧美日韩激情视频| 五月婷婷丁香色| 福利在线一区| 最新亚洲国产精品| 日本特黄特色aaa大片免费| 蜜桃av综合| 91久久国产自产拍夜夜嗨| 婷婷开心激情网| 国产精品免费看片| 国产一线二线三线女| 欧美精品总汇| 精品国产乱码久久| av在线播放中文字幕| 激情婷婷欧美| 国产精品一区二区三| 搡老岳熟女国产熟妇| 国产精品福利电影一区二区三区四区| 妞干网视频在线观看| 国产91在线精品| 日韩国产精品视频| 国产精品久久久久久久精| 国产精品美女| 99视频在线免费观看| 国产www.大片在线| 激情久久av一区av二区av三区| 老司机久久精品| 国产一区二区区别| 国自在线精品视频| a级片在线视频| 亚洲国产精品99久久久久久久久| 五十路熟女丰满大屁股| 精品久久国产一区| 中文字幕日韩视频| 伊人中文字幕在线观看| youjizz国产精品| 黄色录像特级片| 成人四虎影院| 亚洲丝袜一区在线| 韩国av中文字幕| 成人av电影免费在线播放| 五月天在线免费视频| 成人精品高清在线视频| 亚洲欧美中文日韩v在线观看| 国产大片aaa| 国产99精品视频| 91嫩草国产丨精品入口麻豆| 欧美大片网站| 中文字幕亚洲专区| 免费精品一区二区| 国产亚洲欧洲一区高清在线观看| 无罩大乳的熟妇正在播放| 狠狠一区二区三区| 久久露脸国产精品| 高清毛片aaaaaaaaa片| 亚洲美女淫视频| 99视频在线观看视频| 欧美日韩精品一本二本三本| 444亚洲人体| 伊人福利在线| 精品国产第一区二区三区观看体验| 麻豆明星ai换脸视频| 狠狠色2019综合网| 51xx午夜影福利| jizzjizzjizz欧美| 久久久久成人网| 色婷婷av一区二区三| 天天色天天操综合| 魔女鞋交玉足榨精调教| 亚洲一卡久久| 日韩电影免费观看在| 成人久久网站| 久久精品人人做人人爽| 不卡视频免费在线观看| 亚洲综合视频在线| 亚洲熟女乱综合一区二区三区| 国产午夜精品一区二区三区欧美 | 中文字幕在线播放一区| 亚洲一区日本| 日本成人三级| 欧美亚洲黄色| 久久久久久综合网天天| 视频一区二区在线播放| 欧美视频日韩视频| 久久久久99精品成人片毛片| 91碰在线视频| 奇米视频7777| 99pao成人国产永久免费视频| 欧美一区1区三区3区公司| 国产在视频一区二区三区吞精| 久久亚洲春色中文字幕| 手机亚洲第一页| 欧美日韩国产首页在线观看| 看片网站在线观看| 久久综合99re88久久爱| 午夜精品中文字幕| 亚洲三级视频| 亚洲国产一区二区精品视频 | 久久精品国产第一区二区三区| 中文字幕色呦呦| 亚洲福利天堂| 亚洲伊人久久综合| 日本成人三级电影| 色与欲影视天天看综合网| 亚洲人妻一区二区| 欧美一区二区三区喷汁尤物| 成人午夜视频在线播放| 亚洲精品日韩一| 亚洲精品国产熟女久久久| 国产激情一区二区三区四区| 久久久久免费精品| 今天的高清视频免费播放成人| 午夜视频久久久| 噜噜噜狠狠夜夜躁精品仙踪林| 国产精品综合网站| 在线看片福利| 欧美精品电影在线| 色多多视频在线观看| 亚洲精品理论电影| www.色视频| 欧美绝品在线观看成人午夜影视| av大全在线观看| 一区二区免费视频| 久久国产高清视频| 日本一区二区免费在线观看视频| 大乳护士喂奶hd| 国产精品资源站在线| a在线观看免费视频| 毛片一区二区| 国产午夜大地久久| 欧美激情无毛| 蜜桃视频成人在线观看| 国产视频三区四区| 美国一区二区三区在线播放 | 日本精品600av| www.亚洲成人| 国产一级片在线播放| 亚洲精品动漫久久久久| 亚洲爱爱综合网| 欧美丰满高潮xxxx喷水动漫| 中文字幕日韩三级| 欧美性xxxxhd| 国产三级av片| 色综合天天视频在线观看 | 精精国产xxxx视频在线| 亚洲午夜性刺激影院| 男同在线观看| 亚洲天堂成人在线视频| 欧美少妇另类| 亚洲欧洲成视频免费观看| 色吊丝在线永久观看最新版本| 亚洲国产精品999| 内射后入在线观看一区| 精品国产亚洲一区二区三区在线观看 | 国产专区在线播放| 亚洲欧美国产另类| 免费动漫网站在线观看| 亚洲人永久免费| 成a人片在线观看www视频| 在线观看精品自拍私拍| 最新97超碰在线| 久久精品视频播放| 在线观看三级视频| 欧美激情精品久久久久久| 日本不卡影院| 国语自产精品视频在线看抢先版图片 | 国产性生交xxxxx免费| 日本在线播放一区二区三区| chinese少妇国语对白| 天堂成人国产精品一区| 五月婷婷六月丁香激情| 久久精品国产成人一区二区三区| aaaaaaaa毛片| 国产精品一区免费视频| 无码人妻精品一区二区三| www.亚洲在线| 久久丫精品忘忧草西安产品| 中文一区二区在线观看| 777777国产7777777| 亚洲永久免费av| 伊人手机在线视频| 欧洲视频一区二区| 国产日韩精品suv| 亚洲电影免费观看| 国产原创av在线| 久久久精品一区二区三区| 午夜小视频在线观看| 2020国产精品视频| 素人啪啪色综合| 99久热re在线精品996热视频 | 精彩视频一区二区三区| 国偷自产av一区二区三区麻豆| jlzzjlzz亚洲日本少妇| 国产又大又粗又爽的毛片| 亚洲欧美在线视频| 精品在线播放视频| 欧美日韩精品系列| 国内爆初菊对白视频| 中文字幕视频在线免费欧美日韩综合在线看 | 少妇精品久久久一区二区三区| 亚洲欧美电影在线观看| 亚洲激情影院| 五月激情五月婷婷| 91麻豆.com| 黄色一级片中国| 欧美中文字幕一二三区视频| 草草视频在线播放| 亚洲人成网站777色婷婷| 韩国成人免费视频| 国产精品视频在线观看| 成人h动漫免费观看网站| 制服诱惑一区| 六月丁香综合| 国产一级黄色录像| 亚洲三级在线免费观看| 无码免费一区二区三区| 精品国产一区二区三区不卡| 成人免费高清在线播放| 亚州欧美日韩中文视频| 日韩五码电影| 日韩videos| 国产欧美三级| 国产伦理在线观看| 国产精品福利影院| 国产免费a视频| 亚洲精品福利视频| www视频在线免费观看 | 波多野结衣一区二区三区免费视频| 日本高清久久一区二区三区| 亚洲国产美女| 下面一进一出好爽视频| 国产精品美女久久久久高潮| 男人午夜免费视频| 日韩国产精品视频| 51av在线| 国产日韩欧美二区| 欧美日韩mv| 国产精品igao网网址不卡| 中文字幕中文字幕一区| a片在线免费观看| 精品中文视频在线| 欧美久久天堂| 久久99精品久久久久久秒播放器 | 国产精品成人免费| 夜夜爽妓女8888视频免费观看| 精品亚洲一区二区三区在线播放 | 一本高清dvd不卡在线观看| 色综合免费视频| 久久人91精品久久久久久不卡| 试看120秒一区二区三区| 黄频视频在线观看| 久久aⅴ国产欧美74aaa| 成人性生交大片免费看无遮挡aⅴ| 日本丶国产丶欧美色综合| 香蕉视频黄色片| 91高清视频免费| 日韩丝袜视频| 中文字幕无码不卡免费视频| 2021中文字幕一区亚洲| 黄色在线免费观看| 国产亚洲人成网站在线观看| 欧美日韩不卡| 手机在线观看国产精品| 美女看a上一区| 永久免费看mv网站入口| 欧美一级欧美三级| 欧美巨大xxxx做受沙滩| 国产精品免费一区二区三区四区| 91久久亚洲| 白丝女仆被免费网站| 欧美性videosxxxxx| 久操视频在线观看| 丁香五月网久久综合| 999亚洲国产精| 免费在线观看a视频| 7777精品伊人久久久大香线蕉完整版| jizz性欧美| 国产一区二区不卡视频| 久久资源在线| 国产亚洲精品久久久久久豆腐| 欧美一区二区三区婷婷月色| 国内小视频在线看| 裸模一区二区三区免费| 美女视频免费一区| 亚洲色婷婷一区二区三区| 亚洲精品久久久久中文字幕二区 | a视频免费观看| 欧美在线视频你懂得| 91麻豆国产福利在线观看宅福利| 国产乱码精品一区二区三区不卡| 久久精品成人| 日韩av手机在线免费观看| 欧美大胆一级视频| 色豆豆成人网| 成人在线免费观看网址| 99久久精品久久久久久清纯| 黄色大全在线观看| 欧美大学生性色视频| 综合伊思人在钱三区| а 天堂 在线| 欧美日韩在线另类| 国产丝袜在线| 欧美日韩综合久久| 成人永久免费视频| 怡春院在线视频| 91国内在线视频| 91精品国产成人观看| 玖玖爱在线观看|