精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

用 Go 實現(xiàn)一個支持任務(wù)下發(fā)的后臺服務(wù):Gin + Machinery v2 完整實戰(zhàn)

開發(fā) 后端
本文將帶你一步一步使用Go語言,結(jié)合Gin框架與Machinery v2實現(xiàn)一個完整的 “任務(wù)下發(fā)服務(wù)”,并支持通過REST API發(fā)起任務(wù)。

在日常開發(fā)當(dāng)中,我們經(jīng)常希望通過消息通信機(jī)制來異步執(zhí)行任務(wù),例如發(fā)郵件、生成報表、風(fēng)控計算等。這種場景中,使用“任務(wù)隊列”框架來解耦主業(yè)務(wù)流程是一種最佳實踐。

本文將帶你一步一步使用Go語言,結(jié)合Gin框架與Machinery v2實現(xiàn)一個完整的 “任務(wù)下發(fā)服務(wù)”,并支持通過REST API發(fā)起任務(wù)。廢話不多說,開始今天的內(nèi)容吧,let's Go!!!

核心目標(biāo)

項目開始前我們先設(shè)定一個小目標(biāo),具體項如下所示:

  • 使用 Gin 提供一個 HTTP 接口,用于接收任務(wù)參數(shù)
  • 使用 Machinery v2 執(zhí)行后臺任務(wù)(通過 Redis 通信)
  • 使用消息隊列解耦 API 層與實際執(zhí)行邏輯

整體流程

用戶請求 -> Gin Server -> Redis Machinery -> Worker

用戶通過HTTP請求提交任務(wù)參數(shù),Gin服務(wù)將任務(wù)發(fā)送到Machinery任務(wù)隊列中,后續(xù)由Worker異步消費任務(wù)并執(zhí)行。

準(zhǔn)備工作

(1) 安裝 Redis

本地環(huán)境可以直接通過 Docker 運行:

docker run -d --name redis -p 6379:6379 redis

(2) 初始化 Go 項目

go mod init machinery-gin
go get github.com/gin-gonic/gin
go get github.com/RichardKnop/machinery/v2

(3) 創(chuàng)建對應(yīng)目錄和代碼文件

?  machinery-gin tree .
.
├── cmd
│   ├── api
│   │   └── main.go
│   └── worker
│       └── main.go
├── config
│   └── config.go
├── controller
│   └── task_controller.go
├── go.mod
├── go.sum
├── router
│   └── router.go
├── scheduler
│   └── manager.go
├── service
│   └── task_service.go
└── tasks
    ├── handler.go
    └── registry.go


10 directories, 11 files

核心模塊代碼實現(xiàn)

(1) config/config.go

package config


import "github.com/RichardKnop/machinery/v2/config"


func GetMachineryConfig() *config.Config {
	return &config.Config{
		Broker:        "redis://localhost:6379",
		DefaultQueue:  "machinery_tasks",
		ResultBackend: "redis://localhost:6379",
	}
}

(2) tasks/handler.go

package tasks


import (
	"fmt"
	"time"
)


func PrintMessage(msg string) error {
	fmt.Printf("?? Task Received: %s at %s\n", msg, time.Now().Format(time.RFC3339))
	return nil
}

(3) tasks/registry.go

package tasks


import "github.com/RichardKnop/machinery/v2"


func RegisterTasks(server *machinery.Server) error {
	return server.RegisterTasks(map[string]interface{}{
		"print_message": PrintMessage,
	})
}

(4) scheduler/manager.go

package scheduler


import (
	"sync"
	"time"


	"github.com/RichardKnop/machinery/v2"
	"github.com/RichardKnop/machinery/v2/tasks"
)


type ScheduledTask struct {
	Name     string
	Interval time.Duration
	Msg      string
	Paused   bool
	StopChan chan struct{}
}


var (
	tasksMap = make(map[string]*ScheduledTask)
	mu       sync.Mutex
)


func AddScheduledTask(server *machinery.Server, name, msg string, interval time.Duration) {
	mu.Lock()
	defer mu.Unlock()


	if _, exists := tasksMap[name]; exists {
		return
	}


	t := &ScheduledTask{
		Name:     name,
		Interval: interval,
		Msg:      msg,
		StopChan: make(chan struct{}),
	}
	tasksMap[name] = t


	go func(task *ScheduledTask) {
		ticker := time.NewTicker(task.Interval)
		defer ticker.Stop()


		for {
			select {
			case <-ticker.C:
				if !task.Paused {
					signature := &tasks.Signature{
						Name: "print_message",
						Args: []tasks.Arg{
							{Type: "string", Value: task.Msg},
						},
					}
					server.SendTask(signature)
				}
			case <-task.StopChan:
				return
			}
		}
	}(t)
}


func PauseTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		task.Paused = true
	}
}


func ResumeTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		task.Paused = false
	}
}


func StopTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		close(task.StopChan)
		delete(tasksMap, name)
	}
}

(5) service/task_service.go

package service


import (
	"time"


	"github.com/RichardKnop/machinery/v2"
	"machinery-gin/scheduler"
)


func ScheduleNewTask(server *machinery.Server, name, msg string, intervalSec int) {
	scheduler.AddScheduledTask(server, name, msg, time.Duration(intervalSec)*time.Second)
}


func PauseScheduledTask(name string) {
	scheduler.PauseTask(name)
}


func ResumeScheduledTask(name string) {
	scheduler.ResumeTask(name)
}


func StopScheduledTask(name string) {
	scheduler.StopTask(name)
}

(6) controller/task_controller.go

package controller


import (
	"net/http"


	"github.com/RichardKnop/machinery/v2"
	"github.com/gin-gonic/gin"
	"machinery-gin/service"
)


func TaskHandler(server *machinery.Server) gin.HandlerFunc {
	return func(c *gin.Context) {
		var req struct {
			Name     string `json:"name"`
			Interval int    `json:"interval"`
			Msg      string `json:"msg"`
		}


		if err := c.ShouldBindJSON(&req); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}


		service.ScheduleNewTask(server, req.Name, req.Msg, req.Interval)
		c.JSON(http.StatusOK, gin.H{"status": "task scheduled"})
	}
}


func PauseHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.PauseScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "paused"})
	}
}


func ResumeHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.ResumeScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "resumed"})
	}
}


func StopHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.StopScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "stopped"})
	}
}

(7) router/router.go

package routes


import (
	"github.com/RichardKnop/machinery/v2"
	"github.com/gin-gonic/gin"
	"machinery-gin/controller"
)


func SetupRouter(server *machinery.Server) *gin.Engine {
	r := gin.Default()


	r.POST("/task/start", controller.TaskHandler(server))
	r.POST("/task/pause", controller.PauseHandler())
	r.POST("/task/resume", controller.ResumeHandler())
	r.POST("/task/stop", controller.StopHandler())


	return r
}

(8) cmd/api/main.go (啟動Gin API服務(wù))

package main


import (
	server "github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"machinery-gin/config"
	"machinery-gin/router"
	"machinery-gin/tasks"
)


func main() {
	cfg := config.GetMachineryConfig()


	broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
	backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	machineryServer := server.NewServer(cfg, broker, backend, lock)


	_ = tasks.RegisterTasks(machineryServer)
	r := routes.SetupRouter(machineryServer)
	r.Run(":9311")
}

(9) cmd/worker/main.go (啟動Worker消費者)

package main


import (
	server "github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"machinery-gin/config"
	"machinery-gin/tasks"
)


func main() {
	cfg := config.GetMachineryConfig()
	broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
	backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	machineryServer := server.NewServer(cfg, broker, backend, lock)
	_ = tasks.RegisterTasks(machineryServer)


	worker := machineryServer.NewWorker("worker_name", 10)
	_ = worker.Launch()
}

測試程序

啟動 API 服務(wù)和 Worker:

go run cmd/api/main.go
go run cmd/worker/main.go

測試命令如下所示:

?  ~ curl -X POST http://localhost:9311/task/start -H 'Content-Type: application/json' -d '{
  "name": "hello-task",
  "interval": 5,
  "msg": "Hello from Machinery"
}'
{"status":"task scheduled"}%                                                       
 ?  ~ curl -X POST http://localhost:9311/task/pause\?name\=hello-task
{"status":"paused"}%
?  ~ curl -X POST http://localhost:9311/task/resume\?name\=hello-task
{"status":"resumed"}%                                                                 
?  ~ curl -X POST http://localhost:9311/task/stop\?name\=hello-task
{"status":"stopped"}%

測試結(jié)果如下所示:

總結(jié)

我們已經(jīng)實現(xiàn)了任務(wù)的“下發(fā)與執(zhí)行”,“暫停/恢復(fù)”后續(xù)可以進(jìn)一步擴(kuò)展:

  • 支持任務(wù)列表,任務(wù)詳情
  • 支持“周期定時任務(wù)(調(diào)度器)”
  • 支持任務(wù)執(zhí)行狀態(tài)查詢/UI管理面板
責(zé)任編輯:趙寧寧 來源: 馬嘍編程筆記
相關(guān)推薦

2025-09-15 08:49:44

GoJSONAPI

2024-01-08 08:36:29

HTTPGo代理服務(wù)器

2022-05-22 13:55:30

Go 語言

2025-05-20 09:39:57

GogRPC微服務(wù)

2024-01-02 13:58:04

GoREST API語言

2024-05-10 08:47:22

標(biāo)準(zhǔn)庫v2Go

2024-03-15 15:20:10

并發(fā)服務(wù)IP

2025-03-06 08:54:24

泛型類型MapGo1

2010-08-06 14:07:21

RIP V2

2010-08-05 17:00:04

RIP V2協(xié)議

2014-04-14 15:54:00

print()Web服務(wù)器

2022-03-06 19:57:50

狀態(tài)機(jī)easyfsm項目

2023-05-10 08:05:41

GoWeb應(yīng)用

2012-04-24 18:10:56

華為E5

2017-05-08 15:00:20

H5代碼服務(wù)器

2021-09-27 09:55:06

Chrome瀏覽器Manifest V2

2020-07-03 10:21:48

Go框架Docker

2021-08-23 15:14:09

Linuxat命令任務(wù)

2023-02-26 01:37:57

goORM代碼

2023-03-01 09:39:40

調(diào)度系統(tǒng)
點贊
收藏

51CTO技術(shù)棧公眾號

久久视频在线直播| 久久亚洲国产精品尤物| 日本午夜精品久久久| 国产三级欧美三级日产三级99| 久久成人精品电影| 亚洲色偷偷色噜噜狠狠99网| 日本黄色一级视频| 蜜桃精品视频| 国产精品人妖ts系列视频| 久久久久久久久久国产精品| 性生活免费在线观看| 亚洲av电影一区| 欧美三级视频| 日韩一区二区电影网| 天堂资源在线亚洲资源| 亚洲午夜18毛片在线看| 澳门成人av| 樱桃国产成人精品视频| 国产在线高清精品| av在线播放中文字幕| 美女日韩欧美| 91丨国产丨九色丨pron| 国内精品一区二区三区四区| 极品人妻一区二区| 在线免费av导航| 亚洲女同另类| 7777精品伊人久久久大香线蕉最新版| 深田咏美在线x99av| av手机免费看| 欧美日本不卡| 在线播放国产一区二区三区| 青青青国产在线视频| 国产精品秘入口| 人人狠狠综合久久亚洲| 在线观看不卡av| 挪威xxxx性hd极品| 精品一区二区三区中文字幕 | 精品亚洲乱码一区二区| 日韩欧美一区二区三区在线观看| 91色综合久久久久婷婷| 91中文字精品一区二区| 国产一级视频在线播放| 欧美亚洲国产日韩| 欧美大片在线观看一区二区| 日韩成人三级视频| 欧美熟妇另类久久久久久不卡| 亚洲国产一区二区精品专区| 日韩国产精品视频| 欧美成人免费高清视频| 99se视频在线观看| 国产成人一级电影| 欧美中文在线视频| 你懂得在线观看| 国产劲爆久久| 欧美在线一区二区| 久久免费一级片| 西西人体44www大胆无码| 懂色av一区二区三区免费看| 日韩**中文字幕毛片| 国产精品久久久久久成人| 欧美猛男男男激情videos| 欧美日韩日日夜夜| 久艹在线免费观看| 免费在线国产| 国产精品伊人色| 欧美一区三区三区高中清蜜桃| 欧美a级片免费看| 久久国产电影| 亚洲第一视频网| 亚洲一区二区三区四区五区xx| 手机电影在线观看| 久久久精品国产免费观看同学| 91九色在线视频| 中文字幕第四页| 久久精品毛片| 欧美华人在线视频| 国产午夜精品久久久久久久久| 狠狠综合久久av一区二区蜜桃 | 国产精品毛片无遮挡高清| 亚洲欧美日本国产有色 | 亚洲福利小视频| 好吊一区二区三区视频| 日韩一区中文| 欧美日韩中文字幕综合视频| 米仓穗香在线观看| 国产区美女在线| 亚洲欧美日韩中文字幕一区二区三区| 久久这里精品国产99丫e6| 国产美女www爽爽爽视频| 日韩经典一区二区| 97精品视频在线观看| 欧美一级特黄高清视频| 亚洲欧美在线专区| 午夜精品一区二区三区在线视| 成人h动漫精品一区二区下载| 久久国产人妖系列| 国产va免费精品高清在线观看| 久久精品视频久久| 母乳一区在线观看| 亚洲va欧美va在线观看| 亚洲综合免费视频| 日韩成人一区二区三区在线观看| 成人免费网视频| 国产一区二区在线不卡| 免费人成精品欧美精品| 国产精品wwwwww| 国产熟妇一区二区三区四区| 黑人巨大精品欧美一区| 91精品久久久久| 黄色av网站免费在线观看| 国产不卡在线播放| www.久久久| 精品女同一区二区三区| 久久99精品一区二区三区 | 欧美激情亚洲综合| 亚洲第一区色| 5278欧美一区二区三区| 97人妻人人澡人人爽人人精品 | 96sao精品视频在线观看| 欧美女优在线| 亚洲成人你懂的| 69sex久久精品国产麻豆| 美洲精品一卡2卡三卡4卡四卡| 亚洲激情图片一区| 免费看污黄网站| 婷婷国产精品| 一区二区三区四区视频| 久草视频免费播放| 99精品国产在热久久下载| 91高清视频免费| www.欧美色| 国产**成人网毛片九色| 四虎永久免费网站| 韩国日本一区| 日韩欧美一区二区在线视频| 亚洲一级理论片| 日产国产欧美视频一区精品| 久久香蕉综合色| 免费在线小视频| 欧美色图一区二区三区| 亚洲色图欧美自拍| 粉嫩av一区二区| 美女扒开尿口让男人操亚洲视频网站 | av日韩一区| 制服.丝袜.亚洲.另类.中文| 国产精品20p| 天天超碰亚洲| 午夜精品久久久99热福利| 99精品免费观看| 亚洲欧美日韩人成在线播放| 嫩草视频免费在线观看| gogo人体一区| 欧美激情一区二区三区久久久| 99在线观看免费| 亚洲综合区在线| 已婚少妇美妙人妻系列| 日韩精品社区| 日本国产精品视频| 国产在线色视频| 一区二区免费看| 日韩精品――色哟哟| 国产乱码精品一区二区亚洲| 国产成人av网址| 成年在线电影| 精品久久久久久久久久国产| 色综合五月婷婷| 亚洲激情中文| 国产精品扒开腿做| 北条麻妃在线| 在线播放亚洲一区| 欧美黑人一级片| 蜜臀va亚洲va欧美va天堂| 亚洲欧美日韩综合一区| а天堂中文最新一区二区三区| 蜜臀久久99精品久久久久久宅男 | 最新日韩三级| 日韩视频免费观看高清完整版在线观看| 裸体武打性艳史| 日本亚洲三级在线| 正在播放国产精品| 向日葵视频成人app网址| 国产香蕉精品视频一区二区三区| 国产精品theporn动漫| 9人人澡人人爽人人精品| 少妇熟女一区二区| 另类中文字幕国产精品| 日韩精品视频中文在线观看| 国产伦理片在线观看| 在线看片一区| 日韩av大全| 亚洲色图官网| 国产视频亚洲视频| 亚洲一卡二卡在线观看| 一区二区三区丝袜| 天天躁日日躁aaaa视频| 国产美女在线观看一区| 亚洲免费久久| 懂色av一区二区| 国产精品日韩在线观看| 国产大片在线免费观看| 欧美一区二视频| 欧美视频www| 26uuu亚洲综合色| 久久久久久久久久毛片| 美女诱惑一区| 国产肉体ⅹxxx137大胆| 日本午夜一区| 国产精品视频精品| 超碰在线免费播放| 欧美一区二区视频在线观看2022| 麻豆久久久久久久久久| 亚洲免费观看在线观看| 在线观看中文av| 视频一区二区三区中文字幕| 日本不卡二区| 懂色aⅴ精品一区二区三区| 国产香蕉精品视频一区二区三区| 国产高潮在线观看| 欧美三级电影一区| 亚洲精品男人的天堂| 亚洲尤物在线视频观看| 日韩一区二区三区四区视频| 91麻豆免费看片| 国产精品成人免费一区久久羞羞| 韩国久久久久| 精品欧美国产一区二区三区不卡| 毛片在线网站| 久久久久久久久久久av| 成人无遮挡免费网站视频在线观看| 一区二区中文字幕| 国产一级免费在线观看| 日韩精品一区二区视频| 四虎永久在线观看| 一本久道久久综合中文字幕| 青青青视频在线播放| 国产色爱av资源综合区| 好吊日免费视频| 久久超碰97人人做人人爱| 无码无遮挡又大又爽又黄的视频| 一本久道久久综合狠狠爱| 久久综合久久久久| 综合天天久久| 欧美a级黄色大片| 91精品精品| 强伦女教师2:伦理在线观看| 色综合天天爱| 在线观看亚洲视频啊啊啊啊| 91青青国产在线观看精品| 99三级在线| 亚洲精品一区国产| 国产成人拍精品视频午夜网站| 中文字幕乱码在线播放| 欧美在线国产精品| 伊人久久综合一区二区| 国产91精品青草社区| 韩国成人漫画| 国产精品久久久久久超碰| 91大神在线观看线路一区| 国产精品久久久久久久7电影| 国产精品4hu.www| 91最新在线免费观看| 97久久综合区小说区图片区| 国产精品国产精品国产专区不卡| 日产精品一区| 国产精品嫩草视频| 精品国产一区二| 国产精品久久久久久久久久直播 | 日本不卡视频在线播放| 蜜桃视频成人m3u8| 成人日韩在线电影| jizzjizzjizz欧美| 日本视频一区二区在线观看| 久久国产成人午夜av影院宅| 精品一区二区三区毛片| 欧美精选一区| 久久综合久久色| 国产乱人伦偷精品视频免下载 | 国产亚洲欧洲997久久综合 | 91官网在线免费观看| 久久久国产精品黄毛片| 国产欧美日本一区视频| 少妇激情一区二区三区视频| 六月丁香婷婷色狠狠久久| 三级网站免费看| 美国三级日本三级久久99| 91极品视频在线观看| 欧美亚洲视频| 水蜜桃色314在线观看| 女人香蕉久久**毛片精品| 亚洲精品9999| 国产精品www.| 黄色三级视频在线| 成人在线视频首页| 国产一二三四区在线| 久久网站热最新地址| 性欧美18—19sex性高清| 久久综合成人精品亚洲另类欧美| 国产一区第一页| 五月天精品一区二区三区| 国产男人搡女人免费视频| 色天天综合久久久久综合片| 国产免费福利视频| 亚洲乱码国产乱码精品精天堂| 色婷婷在线视频| 色偷偷偷亚洲综合网另类| 飘雪影视在线观看免费观看 | 欧美四级电影网| 手机看片久久久| 精品久久久久久久久久久| 国产精品一区二区人人爽| 亚洲精品在线91| 蜜桃传媒在线观看免费进入| 91精品国产综合久久久久久蜜臀 | 99久热这里只有精品视频免费观看| 日本午夜精品一区二区| 影音先锋在线一区| 香蕉视频xxx| 国产精品系列在线| 男女全黄做爰文章| 狠狠久久亚洲欧美专区| 黄色污污网站在线观看| 精品少妇一区二区三区视频免付费| www.爱爱.com| 日韩亚洲成人av在线| 超碰在线观看免费| 国产欧美一区二区三区久久人妖| 香蕉久久精品| aa在线观看视频| 成人禁用看黄a在线| 一区二区三区少妇| 一区二区三区在线观看欧美| 一起草av在线| 在线免费观看羞羞视频一区二区| 在线最新版中文在线| 精品国产免费人成电影在线观...| 欧美精品观看| 国产精品欧美性爱| 亚洲欧美另类综合偷拍| 国产美女无遮挡永久免费| 色偷偷88888欧美精品久久久| 日韩在线影院| 色播亚洲婷婷| 蜜臀av一区二区在线观看| 国产真人真事毛片视频| 欧美综合一区二区三区| 国产一级在线| 国产精品中文在线| av毛片精品| 久久久久久久9| a美女胸又www黄视频久久| 日韩精品成人一区| 欧美日韩在线三级| 最新av网站在线观看| 国产噜噜噜噜久久久久久久久| 四虎8848精品成人免费网站| 91 视频免费观看| 26uuu另类欧美亚洲曰本| 国产www在线| 亚洲日本中文字幕免费在线不卡| 宅男在线观看免费高清网站| caoporn国产精品免费公开| 亚洲精品色图| 亚洲永久精品ww.7491进入| 亚洲欧美一区二区三区国产精品 | 亚洲一区三区在线观看| 黄页网站一区| 亚洲视频 中文字幕| 岛国av一区二区三区| 国产尤物视频在线| 91久久精品美女高潮| 欧美日韩精品| 精品少妇人妻一区二区黑料社区| 欧洲精品在线观看| 黄网站视频在线观看| 国产不卡av在线| 久久久久久久久丰满| 亚洲成人精品在线播放| 精品久久久在线观看| 1024免费在线视频| 国产精品一级久久久| 日本一本中文字幕| 麻豆av免费在线观看| 日韩高清av在线| 免费日韩电影| 五月天男人天堂| 成人精品在线视频观看| 无码人妻精品一区二区50| 久久激情视频免费观看| 一区二区三区电影大全| 亚洲欧美电影在线观看| 国产mv日韩mv欧美| 少妇一级淫片日本| 欧美日韩第一视频| av在线精品| 欧美老熟妇喷水| 成人av在线一区二区三区| 中文字幕精品无| 欧美激情aaaa| 日韩欧美1区| 国产伦精品一区二区三区妓女|