精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

SeaweedFS 分布式文件系統(tǒng)源碼分析

開發(fā) 架構(gòu)
Master Server 支持多節(jié)點(奇數(shù))部署。使用 Raft 一致性算法來選舉 Leader 節(jié)點,這樣可以保證在 Leader 節(jié)點宕機的情況下,其他節(jié)點可以重新選舉出新的 Leader 節(jié)點,從而保證系統(tǒng)的高可用性。

本文基于 seaweedfs 3.46[1]

SeaweedFS 的架構(gòu)包括 Master Server、Volume Server 和 Filer Server 。

圖片

啟動 Master Server

啟動一個 Master Server 可以使用以下命令:

weed master -ip=127.0.0.1 -ip.bind=0.0.0.0

啟動入口以及所有的參數(shù)定義在 weed/command/master.go ,默認情況 http 監(jiān)聽端口使用 9333 ,grpc 監(jiān)聽端口則在 http 端口的基礎(chǔ)上加 10000 (所有組件的默認規(guī)則)即 19333 :

if *masterOption.portGrpc == 0 {
 *masterOption.portGrpc = 10000 + *masterOption.port
}

Master Server 支持多節(jié)點(奇數(shù))部署。使用 Raft 一致性算法來選舉 Leader 節(jié)點,這樣可以保證在 Leader 節(jié)點宕機的情況下,其他節(jié)點可以重新選舉出新的 Leader 節(jié)點,從而保證系統(tǒng)的高可用性。

如下,啟動一個由三個 Master Server 節(jié)點所組成的集群:

weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9333 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9334 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9335 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"

當 Master Server 啟動時,它會嘗試加入集群并參與 Leader 選舉。一旦選舉完成,Leader 節(jié)點將負責(zé)管理整個集群以及 Volume Server 。

首先會創(chuàng)建一個 Master Server 包裝的 weed_server.RaftServer 對象:

raftServer, err = weed_server.NewRaftServer(raftServerOption)
if raftServer == nil {
 glog.Fatalf("please verify %s is writable, see https://github.com/seaweedfs/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
}

在 weed_server.NewRaftServer() 方法中會創(chuàng)建好 Raft 節(jié)點所需的各種參數(shù)和對象,然后調(diào)用 github.com/seaweedfs/raft[2] 庫創(chuàng)建 RaftServer 對象并啟動 Raft 節(jié)點:

type RaftServer struct {
 // 存儲初始節(jié)點信息
 peers map[string]pb.ServerAddress
 // Raft 節(jié)點
 raftServer raft.Server
 // HashiCorp Raft 節(jié)點
 RaftHashicorp *hashicorpRaft.Raft
 // 用于管理 Raft 節(jié)點之間的通信
 TransportManager *transport.Manager
 // Raft 節(jié)點的數(shù)據(jù)目錄
 dataDir string
 // Raft 節(jié)點的地址
 serverAddr pb.ServerAddress
 // Raft 集群的拓撲結(jié)構(gòu)
 topo *topology.Topology
 // Raft 節(jié)點的 gRPC 服務(wù)
 *raft.GrpcServer
}

func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
 // 通過 option 創(chuàng)建一個 RaftServer 對象 s
 s := &RaftServer{
  peers:      option.Peers,
  serverAddr: option.ServerAddr,
  dataDir:    option.DataDir,
  topo:       option.Topo,
 }

 //...

 // 調(diào)用 github.com/seaweedfs/raft 庫,創(chuàng)建 RaftServer 對象
 s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")

 //...

 // 啟動 Raft 節(jié)點
 if err := s.raftServer.Start(); err != nil {
  return nil, err
 }

 // 將節(jié)點加入到 Raft 集群中
 for name, peer := range s.peers {
  if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
   return nil, err
  }
 }

 //...

 glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())

 return s, nil
}

最后,會打印出當前的 Leader 節(jié)點,如果對 Raft 選舉算法的處理細節(jié)感興趣,可以繼續(xù)深入 s.raftServer.Start() 的實現(xiàn)。

Raft 節(jié)點啟動成功后,Master Server 會注冊一些集群相關(guān)的接口,方便查看集群狀態(tài):

r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods("GET", "HEAD")
if *masterOption.raftHashicorp {
 r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET")
}

請求如下:

$ curl http://127.0.0.1:9333/cluster/status
{"IsLeader":true,"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9334"]}
$ curl http://127.0.0.1:9334/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9333"]}
$ curl http://127.0.0.1:9335/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9333","127.0.0.1:9334"]}

啟動 Volume Server

啟動一個 Volume Server 可以使用以下命令:

weed volume -mserver="127.0.0.1:9333" -dir=data -ip=127.0.0.1 -ip.bind=0.0.0.0

啟動入口以及所有的參數(shù)定義在 weed/command/volume.go ,默認情況 http 監(jiān)聽端口使用 8080 ,grpc 監(jiān)聽端口使用 18080 。

其中,-mserver 為 Master Server 連接地址,當需要連接的 Master Server 為集群時,可以將多個 Master Server 的連接地址用逗號分隔; -dir 則用來指定 Volume Server 存儲數(shù)據(jù)文件的目錄。

和 Master Server 不同,Volume Server 支持橫向擴展,其節(jié)點數(shù)量規(guī)模可以隨著數(shù)據(jù)量和性能需求的變化而隨時動態(tài)調(diào)整。

一旦 Volume Server 啟動后,就會與 Master Server 保持通信,匯報自身的狀態(tài),并根據(jù) Master Server 的指示執(zhí)行創(chuàng)建、刪除、修復(fù)等操作。

核心邏輯在 weed/server/volume_grpc_client_to_master.go 的 VolumeServer.doHeartbeat 方法。

首先會創(chuàng)建一個 Master Server 的 gRPC 連接客戶端,并使用該客戶端調(diào)用 SendHeartbeat 方法:

// 創(chuàng)建 Master Server 的 gRPC 連接客戶端
client := master_pb.NewSeaweedClient(grpcConnection)
// 調(diào)用 SendHeartbeat
stream, err := client.SendHeartbeat(ctx)
if err != nil {
 glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err)
 return "", err
}

SendHeartbeat 方法是一個雙向流式 RPC ,允許在一次調(diào)用中發(fā)送多個請求和響應(yīng),其 ProtoBuf 定義如下:

rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) {
}

接著創(chuàng)建一個 goroutine 用來處理從 Master Server 發(fā)送過來的 Heartbeat 請求:

go func() {
 for {
  // 從輸入流中讀取 Heartbeat 請求
  in, err := stream.Recv()
  if err != nil {
   doneChan <- err
   return
  }
  // ...

  // 如果 Heartbeat 請求中包含了卷大小限制,并且該限制和當前 Volume Server 中保存的限制不同
  if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
   // 將 Volume Server 中保存的限制更新為 Heartbeat 請求中的限制
   vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
   // 調(diào)用 vs.store.MaybeAdjustVolumeMax() 方法重新計算卷的最大容量
   if vs.store.MaybeAdjustVolumeMax() {
    // 如果計算結(jié)果發(fā)生了變化,則使用 stream.Send() 方法向 Master Server 發(fā)送 Heartbeat 響應(yīng)
    if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
     glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
     return
    }
   }
  }
  // 如果 Heartbeat 請求中包含了新的 Master Server 地址,并且該地址和當前地址不同
  if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() {
   // 通知主函數(shù)切換新的 Master Server 地址作為 Leader
   glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
   newLeader = pb.ServerAddress(in.GetLeader())
   doneChan <- nil
   return
  }
 }
}()

最后使用一個 for select 來監(jiān)聽來自 Volume Server 存儲層的四個通道:NewVolumesChan、NewEcShardsChan、DeletedVolumesChan 和 DeletedEcShardsChan。每當有新的卷或 EC 分片被創(chuàng)建或刪除時,會生成一個 Heartbeat 消息,并使用 stream.Send() 方法將其發(fā)送到 Master Server ,同時也會定期發(fā)送心跳消息給 Master Server :

for {
 select {
 // 有新的卷被創(chuàng)建
 case volumeMessage := <-vs.store.NewVolumesChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有新的 EC 分片被創(chuàng)建
 case ecShardMessage := <-vs.store.NewEcShardsChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
   erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有卷被刪除
 case volumeMessage := <-vs.store.DeletedVolumesChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有 EC 分片被刪除
 case ecShardMessage := <-vs.store.DeletedEcShardsChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
   erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 發(fā)送卷信息的心跳消息
 case <-volumeTickChan.C:
  glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
  vs.store.MaybeAdjustVolumeMax()
  if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
   glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
   return "", err
  }
 // 發(fā)送 EC 分片信息的心跳消息
 case <-ecShardTickChan.C:
  glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
  if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
   glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
   return "", err
  }
 // Volume Server 停止,退出監(jiān)聽
 case err = <-doneChan:
  return
 // 用于在 Volume Server 停止時發(fā)送最終的心跳消息
 case <-vs.stopChan:
  // ...
  glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
  if err = stream.Send(emptyBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
  return
 }
}

啟動 Filer Server

啟動一個 Filer Server 可以使用以下命令:

weed filer -s3 -master="127.0.0.1:9333" -ip=127.0.0.1 -ip.bind=0.0.0.0

啟動入口以及所有的參數(shù)定義在 weed/command/filer.go ,默認情況 http 監(jiān)聽端口使用 8888 ,grpc 監(jiān)聽端口使用 18888 。

在這里,-master 為 Master Server 連接地址,同樣地,當需要連接的 Master Server 為集群時,可以將多個 Master Server 的連接地址用逗號分隔; -s3 則代表要啟動 S3 網(wǎng)關(guān)功能,默認監(jiān)聽 8333 端口。

Filer Server 可以理解為一個文件管理器,通過向下對接 Volume Server 與 Master Server,對外提供豐富的功能與特性,除了自身提供的 API 接口,還支持擴展其它比如 POSIX ,WebDAV,S3 等的文件操作接口。

Filer Server 通過外部數(shù)據(jù)庫存儲文件的元數(shù)據(jù)信息。默認情況下,使用的是 leveldb ,支持替換為其它流行的數(shù)據(jù)庫,例如 Sqlite、MySql、Etcd 等,具體可以參考 wiki/Filer-Stores[3] 。

作為一個 API Server ,F(xiàn)iler Server 在架構(gòu)上就是一個服務(wù)端+數(shù)據(jù)庫模型,其節(jié)點的數(shù)量和規(guī)模可以根據(jù)不同的工作負載和使用情況進行優(yōu)化和調(diào)整。

上傳文件

首先分析 Filer Server 自身提供的 API 接口,上傳文件可以直接調(diào)用 :

$ curl -F "file_name=@test.txt" -X POST "http://127.0.0.1:8888"
{"name":"test.txt","size":14}

文件上傳的接口定義在 weed/server/filer_server_handlers_write.go 的 PostHandler 方法:

func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
 // 解析請求的目標路徑
 // ...
 // 解析請求的查詢參數(shù),用于確定文件的存儲位置和屬性
 // ...
 if query.Has("mv.from") {
  // 若查詢參數(shù)中出現(xiàn) mv.from ,則進行文件移動操作
  fs.move(ctx, w, r, so)
 } else {
  // 文件上傳操作,自動分塊
  fs.autoChunk(ctx, w, r, contentLength, so)
 }

 util.CloseRequest(r)
}

跟蹤到 fs.autoChunk 方法:

func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
 //...

 if r.Method == "POST" {
  // 上傳文件
  if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
   reply, err = fs.mkdir(ctx, w, r)
  } else {
   // 自動分塊上傳
   reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, contentLength, so)
  }
 } else {
  // 創(chuàng)建目錄
  reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
 }

 //...
}

繼續(xù)來到 fs.doPostAutoChunk 方法:

func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {

 // 讀取上傳的文件內(nèi)容
 multipartReader, multipartReaderErr := r.MultipartReader()
 if multipartReaderErr != nil {
  return nil, nil, multipartReaderErr
 }

 // 讀取第一個分塊,在這里,我們只需要讀取第一個分塊,即上傳文件的內(nèi)容的分塊
 part1, part1Err := multipartReader.NextPart()
 if part1Err != nil {
  return nil, nil, part1Err
 }

 // 獲取文件名和 Content-Type
 fileName := part1.FileName()
 if fileName != "" {
  fileName = path.Base(fileName)
 }
 contentType := part1.Header.Get("Content-Type")
 if contentType == "application/octet-stream" {
  contentType = ""
 }

 // 核心邏輯
 // 將上傳的文件內(nèi)容轉(zhuǎn)換為文件分塊,并返回文件分塊的相關(guān)信息
 fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
 if err != nil {
  return nil, nil, err
 }

 // 計算文件內(nèi)容的 MD5 值
 md5bytes = md5Hash.Sum(nil)
 // 保存文件元數(shù)據(jù)信息
 filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
 if replyerr != nil {
  fs.filer.DeleteChunks(fileChunks)
 }

 return
}

這些都比較好讀,繼續(xù)跟蹤到核心邏輯處 fs.uploadReaderToChunks ,方法內(nèi)首先會進行一些正確性校驗和必要變量的初始化,然后開啟一個循環(huán),不斷讀取數(shù)據(jù)并將其轉(zhuǎn)換為一個或多個 Chunk :

func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
 // ...一系列操作
 // 進行一些正確性校驗和必要變量的初始化

 for {

  // 使用對象池機制限制 bytes.Buffer 對象的數(shù)量,優(yōu)化內(nèi)存占用
  bytesBufferLimitCond.L.Lock()
  for atomic.LoadInt64(&bytesBufferCounter) >= 4 {
   glog.V(4).Infof("waiting for byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))
   bytesBufferLimitCond.Wait()
  }
  atomic.AddInt64(&bytesBufferCounter, 1)
  bytesBufferLimitCond.L.Unlock()

  bytesBuffer := bufPool.Get().(*bytes.Buffer)
  glog.V(4).Infof("received byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))

  // 【關(guān)鍵】分塊操作,每個塊就是一個 bytes.Buffer
  // 根據(jù) chunkSize 從 partReader 中讀取數(shù)據(jù),并將讀取的數(shù)據(jù)保存到 bytes.Buffer 對象中
  limitedReader := io.LimitReader(partReader, int64(chunkSize))

  bytesBuffer.Reset()

  dataSize, err := bytesBuffer.ReadFrom(limitedReader)

  // 處理讀取數(shù)據(jù)時可能出現(xiàn)的錯誤,以及在讀取完整個文件時的處理
  // ...

  wg.Add(1)
  // 開啟 goroutine 處理
  go func(offset int64) {
   defer func() {
    // 將 bytes.Buffer 對象歸還對象池
    bufPool.Put(bytesBuffer)
    atomic.AddInt64(&bytesBufferCounter, -1)
    // 通知其他 goroutine 可以使用更多的 bytes.Buffer 對象
    bytesBufferLimitCond.Signal()
    wg.Done()
   }()

   // 【關(guān)鍵】上傳數(shù)據(jù)塊
   chunks, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)

   if toChunkErr != nil {
    // 記錄上傳錯誤
    uploadErrLock.Lock()
    if uploadErr == nil {
     uploadErr = toChunkErr
    }
    uploadErrLock.Unlock()
   }
   if chunks != nil {
    fileChunksLock.Lock()
    fileChunksSize := len(fileChunks) + len(chunks)
    for _, chunk := range chunks {
     // 【關(guān)鍵】將當前上傳的數(shù)據(jù)塊添加到 fileChunks 列表中
     fileChunks = append(fileChunks, chunk)
     glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
    }
    fileChunksLock.Unlock()
   }
  }(chunkOffset)

  // 更新已經(jīng)讀取的數(shù)據(jù)塊的大小
  chunkOffset = chunkOffset + dataSize

  if dataSize < int64(chunkSize) {
   // 已經(jīng)讀取完整個文件
   break
  }
 }

 wg.Wait()

 if uploadErr != nil {
  // 上傳出錯,刪除 fileChunks
  fs.filer.DeleteChunks(fileChunks)
  return nil, md5Hash, 0, uploadErr, nil
 }
 // 【關(guān)鍵】對已經(jīng)上傳的數(shù)據(jù)塊,即 fileChunks 進行排序,以便后續(xù)可以正確地進行數(shù)據(jù)合并
 slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) bool {
  return a.Offset < b.Offset
 })
 // 返回 fileChunks 給調(diào)用方保存
 return fileChunks, md5Hash, chunkOffset, nil, smallContent
}

文件的分塊操作都是在 Filer Server 完成的。而其中上傳數(shù)據(jù)塊的 fs.dataToChunk 方法會與 Master Server 進行交互。

該方法首先會調(diào)用 fs.assignNewFileInfo 向 Master Server 請求分配一個新的文件 ID(fid)以及上傳 URL :

fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
if uploadErr != nil {
 // ...
 return uploadErr
}

然后使用分配的 fid 調(diào)用上傳 URL 上傳數(shù)據(jù)塊:

uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
 // ...
 return uploadErr
}

這個由 Master Server 所分配的上傳 URL ,實際就是 Volume Server 的上傳地址,例 http://127.0.0.1:8080/14,1f343c431d ,其中 14,1f343c431d 就是文件 ID ,其實這個文件 ID 更準確地說應(yīng)該是代表一個數(shù)據(jù)塊的文件 ID。

SeaweedFS 會根據(jù) maxMB 參數(shù),來把文件拆分成多個塊存儲,默認大小是 4MB 。即一個 100MB 大小的文件,上傳到 SeaweedFS 后會被分成 25 個塊存儲,也就是申請分配了 25 個文件 ID 。

f.maxMB = cmdFiler.Flag.Int("maxMB", 4, "split files larger than the limit")

到這里,總算捋清流程了。

那還有一個 S3 接口的文件上傳呢?

不用擔(dān)心,SeaweedFS S3 只是做了一個 API 的代理轉(zhuǎn)發(fā),依舊轉(zhuǎn)發(fā)到 Filer Server 自身提供的 API 接口,邏輯依舊和上面一致,代碼位置在 weed/s3api/s3api_object_handlers.go :

// 這里的 uploadUrl 實際就是 Filer Server 的地址
// 例如在名稱為 test 的 S3 Bucket 中上傳 test.txt 文件
// 則 uploadUrl 為: http://127.0.0.1:8888/buckets/test/test.txt
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "")

下載文件

和上傳文件一樣,SeaweedFS S3 為文件下載做了一個代理轉(zhuǎn)發(fā),轉(zhuǎn)發(fā)到 Filer Server 自身提供的 API 接口:

// 這里的 destUrl 實際就是 Filer Server 的地址
// 例如要下載 test Bucket 中的 test.txt 文件
// 則 destUrl 為: http://127.0.0.1:8888/buckets/test/test.txt
s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)

所以,當下載一個文件時:

$ curl http://127.0.0.1:8888/test.txt
hello test.txt

直接來看 weed/server/filer_server_handlers_read.go 的 GetOrHeadHandler 接口:

func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {

 // ...
 // 從 URL 中獲取文件或文件夾路徑

 // 根據(jù)文件或文件夾的完整路徑從元數(shù)據(jù)數(shù)據(jù)庫中查找出 Entry 記錄(即文件的元數(shù)據(jù)信息)

 // 若是文件夾,則列出文件夾下的文件
 // ...

 // 如果指定了 metadata=true 參數(shù),則直接返回文件或文件夾的元數(shù)據(jù)信息
 if query.Get("metadata") == "true" {
  // ...
  return
 }

 // 減少服務(wù)器帶寬
 // 通過 Etag 資源標識對比資源是否發(fā)生變化
 etag := filer.ETagEntry(entry)
 if checkPreconditions(w, r, entry) {
  // 如果資源未發(fā)生改變,則返回 304 Not Modified 響應(yīng),不返回具體的資源
  // 客戶端可以直接讀取緩存中的數(shù)據(jù)
  return
 }

 // 設(shè)置 ETag 標識到響應(yīng)頭
 setEtag(w, etag)

 // ...

 // 這里是用來處理獲取圖片文件的邏輯
 if rangeReq := r.Header.Get("Range"); rangeReq == "" {
  // ...
 }

 // 獲取普通文件核心邏輯
 processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
  // 偏移量從請求頭中獲取,例 Range: bytes=100-199
  // 若無指定偏移量,默認為 0
  // 判斷請求的范圍是否在文件的內(nèi)容大小范圍內(nèi)
  if offset+size <= int64(len(entry.Content)) {
   // ...
   return err
  }
  // 從元數(shù)據(jù)數(shù)據(jù)庫獲取到的chunks信息
  chunks := entry.GetChunks()
  // 判斷文件是否只存在于遠程存儲中,例如 AWS S3 、Google Cloud Storage 等
  if entry.IsInRemoteOnly() {
   // 將遠程對象緩存到本地集群,并更新新的chunks
   // ...
  }

  // 【核心】開始讀取文件并寫入 HTTP 響應(yīng)
  // MasterClient :Master 節(jié)點的客戶端
  // chunks :要讀取的文件數(shù)據(jù)塊列表
  // offset :請求的文件內(nèi)容的起始位置
  // size :請求的文件內(nèi)容的大小
  // DownloadMaxBytesPs :下載速率的限制,單位是字節(jié)/秒
  err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
  if err != nil {
   stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc()
   glog.Errorf("failed to stream content %s: %v", r.URL, err)
  }
  return err
 })
}

根據(jù)代碼,我們可以直接通過 metadata=true 查詢參數(shù)查看文件的元數(shù)據(jù)信息:

$ curl http://127.0.0.1:8888/test.txt?metadata=true
{"FullPath":"/test.txt","Mtime":"2023-04-23T17:18:37+08:00","Crtime":"2023-04-23T17:18:37+08:00","Mode":432,"Uid":4294967295,"Gid":4294967295,"Mime":"text/plain","TtlSec":0,"UserName":"","GroupNames":null,"SymlinkTarget":"","Md5":"wuSNy045Bd4p8mTjIc40cg==","FileSize":14,"Rdev":0,"Inode":0,"Extended":null,"chunks":[{"file_id":"14,1f343c431d","size":14,"modified_ts_ns":1682241517592601300,"e_tag":"wuSNy045Bd4p8mTjIc40cg==","fid":{"volume_id":14,"file_key":31,"cookie":876364573},"is_compressed":true}],"HardLinkId":null,"HardLinkCounter":0,"Content":null,"Remote":null,"Quota":0}

其中最重要的就是 chunks 信息,里面定義了該文件的所有數(shù)據(jù)塊信息,只要把所有數(shù)據(jù)塊拼湊一起,就可以還原出整個文件。文件大小的原因,這里剛好只有一個塊,其文件 ID 為 14,1f343c431d 。

繼續(xù)解讀文件下載的核心方法 filer.StreamContentWithThrottler ,首先獲取所有文件 ID 所對應(yīng)的 URL 列表:

// 將 chunks 轉(zhuǎn)換為視圖列表 chunkViews
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)

fileId2Url := make(map[string][]string)

// 通過 chunkViews.Front() 獲取 chunkViews 列表的頭部元素,然后在每次迭代中將 x 移動到下一個元素,直到遍歷完整個列表
for x := chunkViews.Front(); x != nil; x = x.Next {
 // 從 x.Value 中獲取 chunkView 對象
 chunkView := x.Value
 var urlStrings []string
 var err error
 // 獲取 chunkView 對應(yīng)的文件 ID 的 URL 列表,并將 URL 列表存儲在 urlStrings 變量中
 // 在分布式系統(tǒng)中,網(wǎng)絡(luò)故障和其他因素可能導(dǎo)致某些請求失敗,因此需要多次嘗試獲取 URL 列表,以提高獲取成功的概率
 for _, backoff := range getLookupFileIdBackoffSchedule {
  urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  if err == nil && len(urlStrings) > 0 {
   break
  }
  glog.V(4).Infof("waiting for chunk: %s", chunkView.FileId)
  time.Sleep(backoff)
 }
 // 錯誤處理
 // ...
 fileId2Url[chunkView.FileId] = urlStrings
}

然后,通過獲取到的 URL 列表下載文件的所有 chunk :

// 下載速度限制器
downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
remaining := size
// 通過遍歷 chunkViews 列表來下載每個 chunk
for x := chunkViews.Front(); x != nil; x = x.Next {
 chunkView := x.Value
 // 檢查文件偏移量
 if offset < chunkView.ViewOffset {
  // ...
 }
 urlStrings := fileId2Url[chunkView.FileId]
 start := time.Now()
 // 【核心】從 URL 列表中讀取 chunkView 的數(shù)據(jù),并將數(shù)據(jù)寫入到 writer 中給到客戶端
 err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
 // 更新文件偏移量
 offset += int64(chunkView.ViewSize)
 // 更新剩余數(shù)據(jù)大小
 remaining -= int64(chunkView.ViewSize)
 // ...
}
// 檢查文件的所有數(shù)據(jù)是否都已經(jīng)成功下載
if remaining > 0 {
 glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
 err := writeZero(writer, remaining)
 if err != nil {
  return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
 }
}

可以總結(jié)出,下載文件本質(zhì)也是和 Master Server 交互,通過文件 ID 獲取到對應(yīng) Volume Server 的數(shù)據(jù)塊下載地址列表,按照列表順序請求下載數(shù)據(jù)塊,最后重新整合成了一個完整的文件返回給客戶端。

最后,附上文件下載的流程:

圖片

參考資料

[1]seaweedfs 3.46: https://github.com/seaweedfs/seaweedfs/tree/3.46

[2]github.com/seaweedfs/raft: https://github.com/seaweedfs/raft/tree/v1.1.0

[3]wiki/Filer-Stores: https://github.com/seaweedfs/seaweedfs/wiki/Filer-Stores

責(zé)任編輯:武曉燕 來源: gopher云原生
相關(guān)推薦

2010-11-01 05:50:46

分布式文件系統(tǒng)

2017-10-17 08:33:31

存儲系統(tǒng)分布式

2010-11-15 13:24:07

分布式文件系統(tǒng)

2013-01-07 10:29:31

大數(shù)據(jù)

2012-08-31 16:04:11

HDFS分布式文件系統(tǒng)

2013-06-18 14:00:59

HDFS分布式文件系統(tǒng)

2012-09-19 15:05:24

MogileFS分布式文件系統(tǒng)

2010-06-04 18:45:43

Hadoop分布式文件

2012-09-19 13:43:13

OpenAFS分布式文件系統(tǒng)

2013-05-27 14:46:06

文件系統(tǒng)分布式文件系統(tǒng)

2011-07-15 17:48:27

Platform

2011-03-16 14:23:38

分布式文件

2012-10-09 16:43:47

FastDFS分布式文件系統(tǒng)

2012-05-10 14:04:07

分布式文件系統(tǒng)架構(gòu)

2012-05-10 15:23:53

分布式文件系統(tǒng)測試

2020-01-03 08:33:57

Ceph硬件系統(tǒng)

2022-09-13 07:51:08

JuiceFS分布式文件系統(tǒng)

2012-07-20 14:40:22

2013-01-07 10:42:43

HDFS

2014-03-12 17:40:07

GlusterFS分布式文件系統(tǒng)
點贊
收藏

51CTO技術(shù)棧公眾號

国产乱国产乱老熟| 亚洲电影中文字幕| 欧美高清视频一区二区| 一级特黄性色生活片| 亚洲国产精品无码久久| 91免费精品| 欧美国产日韩a欧美在线观看 | 97久久精品午夜一区二区| 免费成人深夜天涯网站| 美女网站在线看| 国产激情视频一区二区在线观看 | 成人自拍小视频| 欧美三级精品| 久久亚洲春色中文字幕久久久| 欧美国产日本高清在线| 日本一级免费视频| 在线视频超级| 91丝袜美腿高跟国产极品老师| 欧美激情视频一区二区| 日本高清黄色片| 国产精品色呦| 天天色 色综合| 麻豆视频成人| 亚洲国产成人精品女人久久| 国产欧美日韩| 欧美色偷偷大香| 亚洲精品成人三区| 91九色蝌蚪91por成人| 日韩欧美在线中字| 制服丝袜成人动漫| 特级西西人体www高清大胆| 国产精品高潮呻吟AV无码| 99久久激情| 欧美tickling挠脚心丨vk| 久久人人爽人人爽人人av| 亚洲国产精品一| 国产一区二区91| 欧美疯狂性受xxxxx另类| 欧美aaa级片| 亚洲日韩中文字幕一区| 亚洲精品亚洲人成人网| 亚洲一区中文字幕在线观看| 人妻人人澡人人添人人爽| 全球中文成人在线| 亚洲综合成人在线视频| 久久99久久精品国产| 中文字幕一区二区三区四区欧美| 久久国产亚洲| 国产亚洲一区二区精品| 操人视频免费看| 色爱综合区网| 夜夜爽av福利精品导航| 亚洲欧美中文字幕在线一区| 日本三级黄色网址| 麻豆网站在线看| 成人av免费观看| 国产精品永久免费观看| 农村妇女精品一区二区| 91精品亚洲| 亚洲精品短视频| 国产免费中文字幕| 性国裸体高清亚洲| 午夜伦理一区二区| 四虎影视永久免费在线观看一区二区三区| 国产精品人人爽| 狠狠色丁香婷婷综合久久片| 97在线视频免费播放| 在线观看一区二区三区四区| 韩国主播福利视频一区二区三区| 国产精品国产成人国产三级| 精品国产一区二区三| 国产精品女人久久久| 国产乱码精品一品二品| 国产精品69久久久久| 久久免费精彩视频| 欧美电影免费观看高清| 久久久精品一区| 法国空姐电影在线观看| 久久人人99| 欧美日韩成人精品| 亚洲一二三四五六区| 婷婷亚洲精品| 欧美成人福利视频| 三级视频网站在线观看| 亚洲一区二区三区免费| 欧美夫妻性生活| 午夜免费一区二区| 伊人亚洲精品| 亚洲国产福利在线| 大胸美女被爆操| 欧美1区视频| 日韩中文字幕精品视频| 无码国产69精品久久久久同性| gogo人体一区| 欧美一区二区在线看| 182午夜在线观看| 久久av偷拍| 欧美一区欧美二区| 欧美深性狂猛ⅹxxx深喉 | av中文资源在线| 成人av电影在线| 亚洲欧美日韩国产yyy| 激情网站在线| 国产精品美日韩| 日韩在线导航| 国产系列在线观看| 国产色综合一区| 精品日产一区2区三区黄免费 | 久久久亚洲成人| 欧美色图亚洲视频| 老司机精品久久| 日本国产精品视频| 香蕉免费毛片视频| 亚洲三级影院| 国内外成人免费激情在线视频网站| 无码人妻丰满熟妇精品| 成人动漫一区二区三区| 伊人婷婷久久| 里番在线观看网站| 国产精品视频线看| 蜜桃传媒一区二区三区| a国产在线视频| 欧美日韩在线另类| 免费裸体美女网站| 精品自拍视频| 欧美一区二区三区小说| 中字幕一区二区三区乱码| 成人av资源电影网站| 日韩在线观看免费全集电视剧网站| 毛片久久久久久| 美女网站久久| 成人a视频在线观看| 国产黄a三级三级三级| av色综合久久天堂av综合| 女同一区二区| 美女91在线看| 亚洲福利在线视频| 九九热国产精品视频| 亚洲看片一区| 国产精品v欧美精品∨日韩| 天天操天天干天天爽| 久久精品无码一区二区三区| 亚洲女人毛片| xxxxx.日韩| 欧美成人一区二区三区在线观看| 亚洲啪av永久无码精品放毛片| 精品一区二区男人吃奶| 国产亚洲视频在线| 久久精品视频5| 精品一区二区影视| 国产伦精品一区二区三区照片| 久青草国产在线| 亚洲欧美激情插| 国产又粗又长又爽又黄的视频| 国产区精品视频在线观看豆花| 九九综合九九综合| 亚洲va久久久噜噜噜无码久久| 一区二区三区日本| 午夜精品久久久内射近拍高清| 色噜噜成人av在线| 久久精品视频在线| 性一交一乱一精一晶| 亚洲一区二区视频| 孩娇小videos精品| 欧美激情成人| **亚洲第一综合导航网站| 色呦呦在线看| 日韩精品亚洲视频| 一级黄色录像视频| 日韩中文欧美在线| 国产精品亚洲不卡a| 91www在线| 亚洲伦理中文字幕| 久久久久97国产| 不卡的av在线| 四虎永久在线精品无码视频| 日韩av在线播放网址| 91社区国产高清| a天堂中文在线88| 欧美精品久久一区| 国产精品成人国产乱| 久久超碰97人人做人人爱| 中文字幕精品在线播放| 成人黄色免费观看| 久久伊人色综合| 中国精品一区二区| 久久美女高清视频| 日韩大片一区二区| 色棕色天天综合网| 午夜精品久久久久久久99黑人| 日本xxxx人| 一区二区三区精品在线观看| 特级西西人体wwwww| 黄页网站一区| 翡翠波斯猫1977年美国| 偷拍中文亚洲欧美动漫| 亚洲美腿欧美激情另类| 一区二区国产欧美| 婷婷中文字幕综合| 日韩激情小视频| 久久久.com| 岛国精品一区二区三区| 男女性色大片免费观看一区二区| 日韩高清专区| 中文字幕av一区二区三区四区| 国产极品jizzhd欧美| 日本成人不卡| 视频在线观看99| 日韩福利一区二区| 欧美三级欧美成人高清www| youjizz.com日本| 日韩二区三区在线观看| 大伊香蕉精品视频在线| 婷婷伊人综合| 午夜精品区一区二区三| 免费观看成人www动漫视频| 91成品人片a无限观看| 成年人黄视频在线观看| 日韩女同互慰一区二区| 最新黄色网址在线观看| 中文字幕欧美一区| 国产成人精品综合久久久久99| 欧美天堂亚洲电影院在线观看| 国产精品一区二区三区免费| **欧美日韩在线| 国产成人中文字幕| 在线观看爽视频| 欧美俄罗斯性视频| 9191在线播放| 亚洲免费一级电影| 日本美女一级片| 日韩欧美123| jlzzjlzzjlzz亚洲人| 欧美老年两性高潮| 中文字幕+乱码+中文乱码91| 精品高清一区二区三区| 日本午夜小视频| 国产性做久久久久久| 国产精品无码永久免费不卡| caoporn国产精品| 午夜男人的天堂| 成人免费毛片a| av丝袜天堂网| 欧美日韩一区自拍| 乱子伦一区二区| 亚洲人成免费网站| 欧美极品一区二区| 日本中文字幕视频一区| 国产精品美女久久| 美女精品导航| 久久久久久午夜| 电影av一区| 中文字幕精品—区二区| 欧美一区二区黄片| 亚洲成色777777在线观看影院| 亚洲大尺度网站| 亚洲成人激情图| 天天干视频在线| 亚洲免费高清视频| 粉嫩av在线播放| 精品国产一区二区三区久久久| 美女黄视频在线观看| 欧美精品在线观看| 国产色a在线| 中文字幕日韩欧美在线视频| 日本高清中文字幕在线| 国产视频久久久久| 九色在线免费| 欧美精品一区男女天堂| 中文字幕av无码一区二区三区| 欧美日韩免费高清一区色橹橹| 天天操天天摸天天干| 亚洲另类春色国产| 久久久久久久久久久97| 精品久久久视频| 中文字字幕在线观看| 日韩三级视频在线看| 欧美视频xxxx| 91精品国产综合久久久蜜臀图片| 精品人妻伦一区二区三区久久| 91福利视频在线| 欧美日韩综合在线观看| 在线观看亚洲专区| 亚洲精品77777| 91福利社在线观看| 99精品在线视频观看| 欧美少妇一区二区| 精品人妻一区二区三区蜜桃| 精品爽片免费看久久| 最新av网站在线观看 | 91欧美视频网站| 欧美亚洲国产日韩| 亚洲一区二区免费视频软件合集| 日韩在线影视| 亚洲欧洲日韩综合二区| 亚洲国内欧美| 激情黄色小视频| 美国一区二区三区在线播放| 欧美伦理视频在线观看| 国产乱码精品一区二区三区忘忧草 | 伊人情人网综合| 一本久久综合| 九色91porny| 中日韩免费视频中文字幕| 天天插天天操天天干| 在线综合亚洲欧美在线视频 | 久久精品福利视频| 亚洲v.com| 99视频日韩| 色播一区二区| 成人做爰66片免费看网站| 国产探花一区在线观看| 岛国大片在线播放| 狠狠色2019综合网| 中文字幕在线观看免费高清| 午夜精品一区在线观看| a视频免费在线观看| 中文字幕久热精品视频在线| 涩涩视频在线免费看| dy888夜精品国产专区| 97精品视频| 日韩肉感妇bbwbbwbbw| 91丨九色丨蝌蚪富婆spa| h色网站在线观看| 欧美日韩国产综合视频在线观看| 青青草超碰在线| 午夜精品免费视频| 一本一道久久a久久| 欧美做受777cos| 久久av老司机精品网站导航| 欧美丰满老妇熟乱xxxxyyy| 狠狠做深爱婷婷久久综合一区| 内射后入在线观看一区| 欧美激情喷水视频| 亚洲精品一区二区三区在线| 欧美性受xxxx黑人猛交88| 久国产精品韩国三级视频| 夫妇交换中文字幕| 欧美亚洲图片小说| 国产高清第一页| 久久精品久久久久久| 91精品网站在线观看| 中文字幕成人一区| 精品系列免费在线观看| 国产美女高潮视频| 亚洲福利国产精品| 久久久久精彩视频| 亚洲片国产一区一级在线观看| 超级碰碰久久| 日本一区二区精品| 欧美激情第8页| 无码人妻一区二区三区精品视频| 一区二区三区日韩在线观看| 性一交一乱一伧老太| 国语自产偷拍精品视频偷 | 日韩在线第一区| 久久综合综合久久综合| 中日韩一级黄色片| 日韩一二三区不卡| 久久大胆人体| 久久久久久草| 欧美a一区二区| 国产又黄又嫩又滑又白| 亚洲国产另类av| 亚洲 欧美 激情 另类| 九九热r在线视频精品| 影音先锋欧美激情| 欧美 国产 综合| 国产福利一区二区三区视频在线 | 欧美被日视频| 99久久精品免费看国产一区二区三区| 在线观看日韩av电影| 亚洲18在线看污www麻豆| 亚洲欧美激情在线| 人人妻人人玩人人澡人人爽| 日本国产一区二区三区| 久久中文视频| 无码av免费精品一区二区三区| 欧美性猛交xxxx免费看久久久| 国产大片在线免费观看| 97超碰在线播放| 视频一区在线视频| 99久久婷婷国产综合| 日韩av中文字幕在线播放| 在线观看的网站你懂的| 久久精品美女| 激情文学综合丁香| 国产精品100| 久久精品国产99国产精品澳门| 久久porn| 精品综合久久久久| 精品国产乱码久久久久久虫虫漫画| eeuss影院在线观看| 国产乱码精品一区二区三区日韩精品| 日韩福利视频导航| 免看一级a毛片一片成人不卡| 亚洲一二在线观看| 91国内精品| 日本美女视频一区| 色综合色狠狠天天综合色|