精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Loki 源碼分析之日志寫入

開發 前端
這里面我們嘗試對 Loki 的源碼進行一些簡單的分析,由于有很多模塊和實現細節,這里我們主要是對核心功能進行分析,希望對大家有所幫助。本文首先對日志的寫入過程進行簡單分析。

 [[403097]]

前面我們介紹了 Loki 的一些基本使用配置,但是對 Loki 還是了解不夠深入,官方文檔寫得較為凌亂,而且沒有跟上新版本,為了能夠對 Loki 有一個更深入的認識,做到有的放矢,這里面我們嘗試對 Loki 的源碼進行一些簡單的分析,由于有很多模塊和實現細節,這里我們主要是對核心功能進行分析,希望對大家有所幫助。本文首先對日志的寫入過程進行簡單分析。

Distributor Push API

Promtail 通過 Loki 的 Push API 接口推送日志數據,該接口在初始化 Distributor 的時候進行初始化,在控制器基礎上包裝了兩個中間件,其中的 HTTPAuthMiddleware 就是獲取租戶 ID,如果開啟了認證配置,則從 X-Scope-OrgID 這個請求 Header 頭里面獲取,如果沒有配置則用默認的 fake 代替。

  1. // pkg/loki/modules.go 
  2. func (t *Loki) initDistributor() (services.Service, error) { 
  3.  ...... 
  4.  if t.cfg.Target != All { 
  5.   logproto.RegisterPusherServer(t.Server.GRPC, t.distributor) 
  6.  } 
  7.  
  8.  pushHandler := middleware.Merge( 
  9.   serverutil.RecoveryHTTPMiddleware, 
  10.   t.HTTPAuthMiddleware, 
  11.  ).Wrap(http.HandlerFunc(t.distributor.PushHandler)) 
  12.  
  13.  t.Server.HTTP.Handle("/api/prom/push", pushHandler) 
  14.  t.Server.HTTP.Handle("/loki/api/v1/push", pushHandler) 
  15.  return t.distributor, nil 

Push API 處理器實現如下所示,首先通過 ParseRequest 函數將 Http 請求轉換成 logproto.PushRequest,然后直接調用 Distributor 下面的 Push 函數來推送日志數據:

  1. // pkg/distributor/http.go 
  2.  
  3. // PushHandler 從 HTTP body 中讀取一個 snappy 壓縮的 proto 
  4. func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { 
  5.  logger := util_log.WithContext(r.Context(), util_log.Logger) 
  6.  userID, _ := user.ExtractOrgID(r.Context()) 
  7.  req, err := ParseRequest(logger, userID, r) 
  8.  ...... 
  9.  _, err = d.Push(r.Context(), req) 
  10.  ...... 
  11.  
  12. func ParseRequest(logger gokit.Logger, userID string, r *http.Request) (*logproto.PushRequest, error) { 
  13.  var body lokiutil.SizeReader 
  14.  contentEncoding := r.Header.Get(contentEnc) 
  15.  switch contentEncoding { 
  16.  case ""
  17.   body = lokiutil.NewSizeReader(r.Body) 
  18.  case "snappy"
  19.   body = lokiutil.NewSizeReader(r.Body) 
  20.  case "gzip"
  21.   gzipReader, err := gzip.NewReader(r.Body) 
  22.   if err != nil { 
  23.    return nil, err 
  24.   } 
  25.   defer gzipReader.Close() 
  26.   body = lokiutil.NewSizeReader(gzipReader) 
  27.  default
  28.   return nil, fmt.Errorf("Content-Encoding %q not supported", contentEncoding) 
  29.  } 
  30.  
  31.  contentType := r.Header.Get(contentType) 
  32.  var req logproto.PushRequest 
  33.  ...... 
  34.  switch contentType { 
  35.  case applicationJSON: 
  36.   var err error 
  37.   if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 { 
  38.    err = unmarshal.DecodePushRequest(body, &req) 
  39.   } else { 
  40.    err = unmarshal_legacy.DecodePushRequest(body, &req) 
  41.   } 
  42.   if err != nil { 
  43.    return nil, err 
  44.   } 
  45.  default
  46.   // When no content-type header is set or when it is set to 
  47.   // `application/x-protobuf`: expect snappy compression. 
  48.   if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil { 
  49.    return nil, err 
  50.   } 
  51.  } 
  52.  return &req, nil 

首先我們先了解下 PushRequest 的結構,PushRequest 就是一個 Stream 集合:

  1. // pkg/logproto/logproto.pb.go 
  2. type PushRequest struct { 
  3.  Streams []Stream `protobuf:"bytes,1,rep,name=streams,proto3,customtype=Stream" json:"streams"
  4.  
  5. // pkg/logproto/types.go 
  6. // Stream 流包含一個唯一的標簽集,作為一個字符串,然后還包含一組日志條目 
  7. type Stream struct { 
  8.  Labels  string  `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"
  9.  Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"
  10.  
  11. // Entry 是一個帶有時間戳的日志條目 
  12. type Entry struct { 
  13.  Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"
  14.  Line      string    `protobuf:"bytes,2,opt,name=line,proto3" json:"line"

 

然后查看 Distributor 下的 Push 函數實現:

  1. // pkg/distributor/distributor.go 
  2. // Push 日志流集合 
  3. func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { 
  4.  // 獲取租戶ID 
  5.  userID, err := user.ExtractOrgID(ctx) 
  6.  ...... 
  7.  
  8.  // 首先把請求平鋪成一個樣本的列表 
  9.  streams := make([]streamTracker, 0, len(req.Streams)) 
  10.  keys := make([]uint32, 0, len(req.Streams)) 
  11.  var validationErr error 
  12.  validatedSamplesSize := 0 
  13.  validatedSamplesCount := 0 
  14.  
  15.  validationContext := d.validator.getValidationContextFor(userID) 
  16.  
  17.  for _, stream := range req.Streams { 
  18.   // 解析日志流標簽 
  19.   stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream) 
  20.   ...... 
  21.   n := 0 
  22.   for _, entry := range stream.Entries { 
  23.    // 校驗一個日志Entry實體 
  24.    if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil { 
  25.     validationErr = err 
  26.     continue 
  27.    } 
  28.    stream.Entries[n] = entry 
  29.    n++ 
  30.    // 校驗成功的樣本大小和個數 
  31.    validatedSamplesSize += len(entry.Line) 
  32.    validatedSamplesCount++ 
  33.   } 
  34.   // 去掉校驗失敗的實體 
  35.   stream.Entries = stream.Entries[:n] 
  36.  
  37.   if len(stream.Entries) == 0 { 
  38.    continue 
  39.   } 
  40.   // 為當前日志流生成用于hash換的token值 
  41.   keys = append(keys, util.TokenFor(userID, stream.Labels)) 
  42.   streams = append(streams, streamTracker{ 
  43.    stream: stream, 
  44.   }) 
  45.  } 
  46.  
  47.  if len(streams) == 0 { 
  48.   return &logproto.PushResponse{}, validationErr 
  49.  } 
  50.  
  51.  now := time.Now() 
  52.  // 每個租戶有一個限速器,判斷可以正常傳輸的日志大小是否應該被限制 
  53.  if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) { 
  54.   // 返回429表明客戶端被限速了 
  55.   ...... 
  56.   return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize) 
  57.  } 
  58.  
  59.  const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck 
  60.  var descs [maxExpectedReplicationSet]ring.InstanceDesc 
  61.  
  62.  samplesByIngester := map[string][]*streamTracker{} 
  63.  ingesterDescs := map[string]ring.InstanceDesc{} 
  64.  for i, key := range keys { 
  65.   // ReplicationSet 描述了一個指定的鍵與哪些 Ingesters 進行對話,以及可以容忍多少個錯誤 
  66.   // 根據 label hash 到 hash 環上獲取對應的 ingester 節點,一個節點可能有多個對等的 ingester 副本來做 HA 
  67.   replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0], nil, nil) 
  68.   ...... 
  69.   // 最小成功的實例樹 
  70.   streams[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors 
  71.   // 可容忍的最大故障實例數 
  72.   streams[i].maxFailures = replicationSet.MaxErrors 
  73.   // 將 Stream 按對應的 ingester 進行分組 
  74.   for _, ingester := range replicationSet.Ingesters { 
  75.    // 配置每個 ingester 副本對應的日志流數據 
  76.    samplesByIngester[ingester.Addr] = append(samplesByIngester[ingester.Addr], &streams[i]) 
  77.    ingesterDescs[ingester.Addr] = ingester 
  78.   } 
  79.  } 
  80.  
  81.  tracker := pushTracker{ 
  82.   done: make(chan struct{}), 
  83.   err:  make(chan error), 
  84.  } 
  85.  tracker.samplesPending.Store(int32(len(streams))) 
  86.  // 循環Ingesters 
  87.  for ingester, samples := range samplesByIngester { 
  88.   // 讓ingester并行處理通過hash環對應的日志流列表 
  89.   go func(ingester ring.InstanceDesc, samples []*streamTracker) { 
  90.    ...... 
  91.    // 將日志流樣本數據下發給對應的 ingester 節點 
  92.    d.sendSamples(localCtx, ingester, samples, &tracker) 
  93.   }(ingesterDescs[ingester], samples) 
  94.  } 
  95.  ...... 

Push 函數的核心就是根據日志流的標簽來計算一個 Token 值,根據這個 Token 值去哈希環上獲取對應的處理日志的 Ingester 實例,然后并行通過 Ingester 處理日志流數據,通過 sendSamples 函數為單個 ingester 去發送日志樣本數據:

  1. // pkg/distributor/distributor.go 
  2.  
  3. func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { 
  4.  err := d.sendSamplesErr(ctx, ingester, streamTrackers) 
  5.  ...... 
  6.  
  7. func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error { 
  8.  // 根據 ingester 地址獲取 client 
  9.  c, err := d.pool.GetClientFor(ingester.Addr) 
  10.  ...... 
  11.  // 重新構造 PushRequest 
  12.  req := &logproto.PushRequest{ 
  13.   Streams: make([]logproto.Stream, len(streams)), 
  14.  } 
  15.  for i, s := range streams { 
  16.   req.Streams[i] = s.stream 
  17.  } 
  18.  // 通過 Ingester 客戶端請求數據 
  19.  _, err = c.(logproto.PusherClient).Push(ctx, req) 
  20.  ...... 

Ingester 寫入日志

Ingester 客戶端中的 Push 函數實際上就是一個 gRPC 服務的客戶端:

  1. // pkg/ingester/ingester.go 
  2.  
  3. // Push 實現 logproto.Pusher. 
  4. func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { 
  5.  // 獲取租戶ID 
  6.  instanceID, err := user.ExtractOrgID(ctx) 
  7.  ...... 
  8.  // 根據租戶ID獲取 instance 對象 
  9.  instance := i.getOrCreateInstance(instanceID) 
  10.  // 直接調用 instance 對象 Push 數據 
  11.  err = instance.Push(ctx, req) 
  12.  return &logproto.PushResponse{}, err 

instance 下的 Push 函數:

  1. // pkg/ingester/instance.go 
  2.  
  3. func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { 
  4.  record := recordPool.GetRecord() 
  5.  record.UserID = i.instanceID 
  6.  defer recordPool.PutRecord(record) 
  7.  
  8.  i.streamsMtx.Lock() 
  9.  defer i.streamsMtx.Unlock() 
  10.  
  11.  var appendErr error 
  12.  for _, s := range req.Streams { 
  13.   // 獲取一個 stream 對象 
  14.   stream, err := i.getOrCreateStream(s, false, record) 
  15.   if err != nil { 
  16.    appendErr = err 
  17.    continue 
  18.   } 
  19.   // 真正用于數據處理的是 stream 對象中的 Push 函數 
  20.   if _, err := stream.Push(ctx, s.Entries, record); err != nil { 
  21.    appendErr = err 
  22.    continue 
  23.   } 
  24.  } 
  25.  ...... 
  26.  return appendErr 
  27.  
  28. func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, record *WALRecord) (*stream, error) { 
  29.  if lock { 
  30.   i.streamsMtx.Lock() 
  31.   defer i.streamsMtx.Unlock() 
  32.  } 
  33.  // 如果 streams 中包含當前標簽列表對應的 stream 對象,則直接返回 
  34.  stream, ok := i.streams[pushReqStream.Labels] 
  35.  if ok { 
  36.   return stream, nil 
  37.  } 
  38.  // record 只在重放 WAL 時為 nil 
  39.  // 我們不希望在重放 WAL 后丟掉數據 
  40.  // 為 instance 降低 stream 流限制 
  41.  var err error 
  42.  if record != nil { 
  43.   // 限流器判斷 
  44.   // AssertMaxStreamsPerUser 確保與當前輸入的流數量沒有達到限制 
  45.   err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams)) 
  46.  } 
  47.  ...... 
  48.  // 解析日志流標簽集 
  49.  labels, err := logql.ParseLabels(pushReqStream.Labels) 
  50.  ...... 
  51.  // 獲取對應標簽集的指紋 
  52.  fp := i.getHashForLabels(labels) 
  53.  // 重新實例化一個 stream 對象,這里還會維護日志流的倒排索引 
  54.  sortedLabels := i.index.Add(client.FromLabelsToLabelAdapters(labels), fp) 
  55.  stream = newStream(i.cfg, fp, sortedLabels, i.metrics) 
  56.  // 將stream設置到streams中去 
  57.  i.streams[pushReqStream.Labels] = stream 
  58.  i.streamsByFP[fp] = stream 
  59.  
  60.  // 當重放 wal 的時候 record 是 nil (我們不希望在重放時重寫 wal entries). 
  61.  if record != nil { 
  62.   record.Series = append(record.Series, tsdb_record.RefSeries{ 
  63.    Ref:    uint64(fp), 
  64.    Labels: sortedLabels, 
  65.   }) 
  66.  } else { 
  67.   // 如果 record 為 nil,這就是一個 WAL 恢復 
  68.   i.metrics.recoveredStreamsTotal.Inc() 
  69.  } 
  70.  ...... 
  71.  i.addTailersToNewStream(stream) 
  72.  return stream, nil 

這個里面涉及到 WAL 這一塊的設計,比較復雜,我們可以先看 stream 下面的 Push 函數實現,主要就是將收到的 []Entry 先 Append 到內存中的 Chunk 流([]chunkDesc) 中:

  1. // pkg/ingester/stream.go 
  2. func (s *stream) Push(ctx context.Context, entries []logproto.Entry, record *WALRecord) (int, error) { 
  3.  s.chunkMtx.Lock() 
  4.  defer s.chunkMtx.Unlock() 
  5.  var bytesAdded int 
  6.  prevNumChunks := len(s.chunks) 
  7.  var lastChunkTimestamp time.Time 
  8.  // 如果之前的 chunks 列表為空,則創建一個新的 chunk 
  9.  if prevNumChunks == 0 { 
  10.   s.chunks = append(s.chunks, chunkDesc{ 
  11.    chunk: s.NewChunk(), 
  12.   }) 
  13.   chunksCreatedTotal.Inc() 
  14.  } else { 
  15.   // 獲取最新一個chunk的日志時間戳 
  16.   _, lastChunkTimestamp = s.chunks[len(s.chunks)-1].chunk.Bounds() 
  17.  } 
  18.  
  19.  var storedEntries []logproto.Entry 
  20.  failedEntriesWithError := []entryWithError{} 
  21.  
  22.  for i := range entries { 
  23.   // 如果這個日志條目與我們最后 append 的一行的時間戳和內容相匹配,則忽略它 
  24.   if entries[i].Timestamp.Equal(s.lastLine.ts) && entries[i].Line == s.lastLine.content { 
  25.    continue 
  26.   } 
  27.  
  28.   // 最新的一個 chunk 
  29.   chunk := &s.chunks[len(s.chunks)-1] 
  30.   // 如果當前chunk已經關閉 或者 已經達到設置的最大 Chunk 大小 
  31.   if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) { 
  32.    // 如果 chunk 沒有更多的空間,則調用 Close 來以確保 head block 中的數據都被切割和壓縮。 
  33.    err := chunk.chunk.Close() 
  34.    ...... 
  35.    chunk.closed = true 
  36.    ...... 
  37.    // Append 一個新的 Chunk 
  38.    s.chunks = append(s.chunks, chunkDesc{ 
  39.     chunk: s.NewChunk(), 
  40.    }) 
  41.    chunk = &s.chunks[len(s.chunks)-1] 
  42.    lastChunkTimestamp = time.Time{} 
  43.   } 
  44.   // 往 chunk 里面 Append 日志數據 
  45.   if err := chunk.chunk.Append(&entries[i]); err != nil { 
  46.    failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err}) 
  47.   } else { 
  48.    // 存儲添加到 chunk 中的日志數據 
  49.    storedEntries = append(storedEntries, entries[i]) 
  50.    // 配置最后日志行的數據 
  51.    lastChunkTimestamp = entries[i].Timestamp 
  52.    s.lastLine.ts = lastChunkTimestamp 
  53.    s.lastLine.content = entries[i].Line 
  54.    // 累計大小 
  55.    bytesAdded += len(entries[i].Line) 
  56.   } 
  57.   chunk.lastUpdated = time.Now() 
  58.  } 
  59.  
  60.  if len(storedEntries) != 0 { 
  61.   // 當重放 wal 的時候 record 將為 nil(我們不希望在重放的時候重寫wal日志條目) 
  62.   if record != nil { 
  63.    record.AddEntries(uint64(s.fp), storedEntries...) 
  64.   } 
  65.   // 后續是用與tail日志的處理 
  66.   ...... 
  67.  } 
  68.  ...... 
  69.  // 如果新增了chunks 
  70.  if len(s.chunks) != prevNumChunks { 
  71.   memoryChunks.Add(float64(len(s.chunks) - prevNumChunks)) 
  72.  } 
  73.  return bytesAdded, nil 

Chunk 其實就是多條日志構成的壓縮包,將日志壓成 Chunk 的可以直接存入對象存儲, 一個 Chunk 到達指定大小之前會不斷 Append 新的日志到里面,而在達到大小之后, Chunk 就會關閉等待持久化(強制持久化也會關閉 Chunk, 比如關閉 ingester 實例時就會關閉所有的 Chunk 并持久化)。Chunk 的大小控制很重要:

  • 假如 Chunk 容量過小: 首先是導致壓縮效率不高,同時也會增加整體的 Chunk 數量, 導致倒排索引過大,最后, 對象存儲的操作次數也會變多, 帶來額外的性能開銷
  • 假如 Chunk 過大: 一個 Chunk 的 open 時間會更長, 占用額外的內存空間, 同時, 也增加了丟數據的風險,Chunk 過大也會導致查詢讀放大

(圖片來源: https://aleiwu.com/post/grafana-loki/)

在將日志流追加到 Chunk 中過后,在 Ingester 初始化時會啟動兩個循環去處理 Chunk 數據,分別從 chunks 數據取出存入優先級隊列,另外一個循環定期檢查從內存中刪除已經持久化過后的數據。

首先是 Ingester 中定義了一個 flushQueues 屬性,是一個優先級隊列數組,該隊列中存放的是 flushOp:

  1. // pkg/ingester/ingester.go 
  2. type Ingester struct { 
  3.  services.Service 
  4.  ...... 
  5.  // 每個 flush 線程一個隊列,指紋用來選擇隊列 
  6.  flushQueues     []*util.PriorityQueue  // 優先級隊列數組 
  7.  flushQueuesDone sync.WaitGroup 
  8.  ...... 
  9.  
  10. // pkg/ingester/flush.go 
  11. // 優先級隊列中存放的數據 
  12. type flushOp struct { 
  13.  from      model.Time 
  14.  userID    string 
  15.  fp        model.Fingerprint 
  16.  immediate bool 

在初始化 Ingester 的時候會根據傳遞的 ConcurrentFlushes 參數來實例化 flushQueues的大?。?/p>

  1. // pkg/ingester/ingester.go 
  2. func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) { 
  3.  ...... 
  4.  i := &Ingester{ 
  5.   ...... 
  6.   flushQueues:           make([]*util.PriorityQueue, cfg.ConcurrentFlushes), 
  7.   ...... 
  8.  } 
  9.  ...... 
  10.  i.Service = services.NewBasicService(i.starting, i.running, i.stopping) 
  11.  return i, nil 

然后通過 services.NewBasicService 實例化 Service 的時候指定了服務的 Starting、Running、Stopping 3 個狀態,在其中的 staring 狀態函數中會啟動協程去消費優先級隊列中的數據

  1. // pkg/ingester/ingester.go 
  2. func (i *Ingester) starting(ctx context.Context) error { 
  3.  // todo,如果開啟了 WAL 的處理 
  4.  ...... 
  5.  // 初始化 flushQueues 
  6.  i.InitFlushQueues() 
  7.  ...... 
  8.  // 啟動循環檢查chunk數據 
  9.  i.loopDone.Add(1) 
  10.  go i.loop() 
  11.  return nil 

初始化 flushQueues 實現如下所示,其中 flushQueuesDone 是一個 WaitGroup,根據配置的并發數量并發執行 flushLoop 操作:

  1. // pkg/ingester/flush.go 
  2. func (i *Ingester) InitFlushQueues() { 
  3.  i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) 
  4.  for j := 0; j < i.cfg.ConcurrentFlushes; j++ { 
  5.   // 為每個協程構造一個優先級隊列 
  6.   i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength) 
  7.   go i.flushLoop(j) 
  8.  } 

每一個優先級隊列循環消費數據:

  1. // pkg/ingester/flush.go 
  2. func (i *Ingester) flushLoop(j int) { 
  3.  ...... 
  4.  for { 
  5.   // 從隊列中根據優先級取出數據 
  6.   o := i.flushQueues[j].Dequeue() 
  7.   if o == nil { 
  8.    return 
  9.   } 
  10.   op := o.(*flushOp) 
  11.   // 執行真正的刷新用戶序列數據 
  12.   err := i.flushUserSeries(op.userID, op.fp, op.immediate) 
  13.   ...... 
  14.   // 如果退出時刷新失敗了,把失敗的操作放回到隊列中去。 
  15.   if op.immediate && err != nil { 
  16.    op.from = op.from.Add(flushBackoff) 
  17.    i.flushQueues[j].Enqueue(op) 
  18.   } 
  19.  } 

刷新用戶的序列操作,也就是要保存到存儲中去:

  1. // pkg/ingester/flush.go 
  2. // 根據用戶ID刷新用戶日志序列 
  3. func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error { 
  4.  instance, ok := i.getInstanceByID(userID) 
  5.  ...... 
  6.  // 根據instance和fp指紋數據獲取需要刷新的chunks 
  7.  chunks, labels, chunkMtx := i.collectChunksToFlush(instance, fp, immediate) 
  8.  ...... 
  9.  // 執行真正的刷新 chunks 操作 
  10.  err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx) 
  11.  ...... 
  12.  
  13. // 收集需要刷新的 chunks 
  14. func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels, *sync.RWMutex) { 
  15.  instance.streamsMtx.Lock() 
  16.  // 根據指紋數據獲取 stream 
  17.  stream, ok := instance.streamsByFP[fp] 
  18.  instance.streamsMtx.Unlock() 
  19.  if !ok { 
  20.   return nil, nil, nil 
  21.  } 
  22.  
  23.  var result []*chunkDesc 
  24.  stream.chunkMtx.Lock() 
  25.  defer stream.chunkMtx.Unlock() 
  26.  // 循環所有chunks 
  27.  for j := range stream.chunks { 
  28.   // 判斷是否應該刷新當前chunk 
  29.   shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j]) 
  30.   if immediate || shouldFlush { 
  31.    // 確保不再對該塊進行寫操作(如果沒有關閉,則設置為關閉狀態) 
  32.    if !stream.chunks[j].closed { 
  33.     stream.chunks[j].closed = true 
  34.    } 
  35.    // 如果該 chunk 還沒有被成功刷新,則刷新這個塊 
  36.    if stream.chunks[j].flushed.IsZero() { 
  37.     result = append(result, &stream.chunks[j]) 
  38.     ...... 
  39.    } 
  40.   } 
  41.  } 
  42.  return result, stream.labels, &stream.chunkMtx 

下面是判斷一個具體的 chunk 是否應該被刷新的邏輯:

  1. // pkg/ingester/flush.go 
  2. func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (bool, string) { 
  3.  // chunk關閉了也應該刷新了 
  4.  if chunk.closed { 
  5.   if chunk.synced { 
  6.    return true, flushReasonSynced 
  7.   } 
  8.   return true, flushReasonFull 
  9.  } 
  10.  // chunk最后更新的時間超過了配置的 chunk 空閑時間 MaxChunkIdle 
  11.  if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle { 
  12.   return true, flushReasonIdle 
  13.  } 
  14.  
  15.  // chunk的邊界時間操過了配置的 chunk  最大時間 MaxChunkAge 
  16.  if fromto := chunk.chunk.Bounds(); to.Sub(from) > i.cfg.MaxChunkAge { 
  17.   return true, flushReasonMaxAge 
  18.  } 
  19.  return false"" 

真正將 chunks 數據刷新保存到存儲中是 flushChunks 函數實現的:

  1. // pkg/ingester/flush.go 
  2. func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, chunkMtx sync.Locker) error { 
  3.  ...... 
  4.  wireChunks := make([]chunk.Chunk, len(cs)) 
  5.  // 下面的匿名函數用于生成保存到存儲中的chunk數據 
  6.  err = func() error { 
  7.   chunkMtx.Lock() 
  8.   defer chunkMtx.Unlock() 
  9.  
  10.   for j, c := range cs { 
  11.    if err := c.chunk.Close(); err != nil { 
  12.     return err 
  13.    } 
  14.    firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds()) 
  15.    ch := chunk.NewChunk( 
  16.     userID, fp, metric, 
  17.     chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize), 
  18.     firstTime, 
  19.     lastTime, 
  20.    ) 
  21.  
  22.    chunkSize := c.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header 
  23.    start := time.Now() 
  24.    if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkSize))); err != nil { 
  25.     return err 
  26.    } 
  27.    wireChunks[j] = ch 
  28.   } 
  29.   return nil 
  30.  }() 
  31.  
  32.  
  33.  // 通過 store 接口保存 chunk 數據 
  34.  if err := i.store.Put(ctx, wireChunks); err != nil { 
  35.   return err 
  36.  } 
  37.  
  38.  ...... 
  39.  
  40.  chunkMtx.Lock() 
  41.  defer chunkMtx.Unlock() 
  42.  for i, wc := range wireChunks { 
  43.   // flush 成功,寫入刷新時間 
  44.   cs[i].flushed = time.Now() 
  45.   // 下是一些監控數據更新 
  46.   ...... 
  47.  } 
  48.  
  49.  return nil 

chunk 數據被寫入到存儲后,還有有一個協程會去定時清理本地的這些 chunk 數據,在上面的 Ingester 的 staring 函數中最后有一個 go i.loop(),在這個 loop() 函數中會每隔 FlushCheckPeriod(默認 30s,可以通過 --ingester.flush-check-period 進行配置)時間就會去去調用 sweepUsers 函數進行垃圾回收:

  1. // pkg/ingester/ingester.go 
  2. func (i *Ingester) loop() { 
  3.  defer i.loopDone.Done() 
  4.  
  5.  flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod) 
  6.  defer flushTicker.Stop() 
  7.  
  8.  for { 
  9.   select { 
  10.   case <-flushTicker.C: 
  11.    i.sweepUsers(falsetrue
  12.   case <-i.loopQuit: 
  13.    return 
  14.   } 
  15.  } 

sweepUsers 函數用于執行將日志流數據加入到優先級隊列中,并對沒有序列的用戶進行垃圾回收:

  1. // pkg/ingester/flush.go 
  2. // sweepUsers 定期執行 flush 操作,并對沒有序列的用戶進行垃圾回收 
  3. func (i *Ingester) sweepUsers(immediate, mayRemoveStreams bool) { 
  4.  instances := i.getInstances() 
  5.  for _, instance := range instances { 
  6.   i.sweepInstance(instance, immediate, mayRemoveStreams) 
  7.  } 
  8.  
  9. func (i *Ingester) sweepInstance(instance *instance, immediate, mayRemoveStreams bool) { 
  10.  instance.streamsMtx.Lock() 
  11.  defer instance.streamsMtx.Unlock() 
  12.  for _, stream := range instance.streams { 
  13.   i.sweepStream(instance, stream, immediate) 
  14.   i.removeFlushedChunks(instance, stream, mayRemoveStreams) 
  15.  } 
  16.  
  17. // must hold streamsMtx 
  18. func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool) { 
  19.  stream.chunkMtx.RLock() 
  20.  defer stream.chunkMtx.RUnlock() 
  21.  if len(stream.chunks) == 0 { 
  22.   return 
  23.  } 
  24.  // 最新的chunk 
  25.  lastChunk := stream.chunks[len(stream.chunks)-1] 
  26.  // 判斷是否應該被flush 
  27.  shouldFlush, _ := i.shouldFlushChunk(&lastChunk) 
  28.  // 如果只有一個chunk并且不是強制持久化切最新的chunk還不應該被flush,則直接返回 
  29.  if len(stream.chunks) == 1 && !immediate && !shouldFlush { 
  30.   return 
  31.  } 
  32.  // 根據指紋獲取用與處理的優先級隊列索引 
  33.  flushQueueIndex := int(uint64(stream.fp) % uint64(i.cfg.ConcurrentFlushes)) 
  34.  firstTime, _ := stream.chunks[0].chunk.Bounds() 
  35.  // 加入到優先級隊列中去 
  36.  i.flushQueues[flushQueueIndex].Enqueue(&flushOp{ 
  37.   model.TimeFromUnixNano(firstTime.UnixNano()), instance.instanceID, 
  38.   stream.fp, immediate, 
  39.  }) 
  40.  
  41. // 移除已經flush過后的chunks數據 
  42. func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRemoveStream bool) { 
  43.  now := time.Now() 
  44.  
  45.  stream.chunkMtx.Lock() 
  46.  defer stream.chunkMtx.Unlock() 
  47.  prevNumChunks := len(stream.chunks) 
  48.  var subtracted int 
  49.  for len(stream.chunks) > 0 { 
  50.   // 如果chunk還沒有被刷新到存儲 或者 chunk被刷新到存儲到現在的時間還沒操過 RetainPeriod(默認15分鐘,可以通過--ingester.chunks-retain-period 進行配置)則忽略 
  51.   if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod { 
  52.    break 
  53.   } 
  54.   subtracted += stream.chunks[0].chunk.UncompressedSize() 
  55.   // 刪除引用,以便該塊可以被垃圾回收起來 
  56.   stream.chunks[0].chunk = nil 
  57.   // 移除chunk 
  58.   stream.chunks = stream.chunks[1:] 
  59.  } 
  60.  ...... 
  61.  // 如果stream中的所有chunk都被清空了,則清空該 stream 的相關數據 
  62.  if mayRemoveStream && len(stream.chunks) == 0 { 
  63.   delete(instance.streamsByFP, stream.fp) 
  64.   delete(instance.streams, stream.labelsString) 
  65.   instance.index.Delete(stream.labels, stream.fp) 
  66.   ...... 
  67.  } 

關于存儲或者查詢等模塊的實現在后文再繼續探索,包括 WAL 的實現也較為復雜。

 

責任編輯:姜華 來源: k8s技術圈
相關推薦

2024-02-04 00:00:00

Loki性能查詢

2010-09-14 10:46:59

2011-02-22 16:23:20

VSFTPD

2025-02-10 02:00:00

2011-08-15 11:31:27

iPhone開發日志

2014-04-21 15:53:59

iOS開源項目CocoaLumber

2023-12-25 11:18:12

OpenTeleme應用日志Loki

2022-12-29 08:00:26

Loki網絡設備

2022-06-28 08:40:16

LokiPromtail日志報警

2021-05-18 07:30:36

開發Spring Boot日志

2021-09-13 08:20:13

Loki日志系統

2023-01-04 08:21:02

Loki配置日志

2024-03-11 00:01:00

PromtailLoki服務器

2024-02-01 09:48:17

2022-06-13 11:33:59

RedoMySQL

2023-12-05 07:21:17

IstioEnvoy

2011-03-15 11:33:18

iptables

2014-08-26 11:11:57

AsyncHttpCl源碼分析

2020-12-22 09:17:49

日志Loki服務

2025-03-26 08:01:18

點贊
收藏

51CTO技術棧公眾號

欧美xxxhd| 91国产免费视频| 欧美日韩一区二区三区四区不卡| 亚洲国产一二三| 精品久久sese| 日本欧美www| 国产精品97| 精品国产髙清在线看国产毛片| 日本美女爱爱视频| 亚洲 小说区 图片区 都市| 久久深夜福利| 麻豆成人在线看| 先锋资源av在线| h1515四虎成人| 一区二区视频在线| 欧美日韩精品久久| 国产精品一区二区人人爽| 亚洲国产精品一区| 中文字幕日韩免费视频| 欧美69精品久久久久久不卡| 手机在线免费看av| 成人av先锋影音| 国产精品成av人在线视午夜片| 久久久久人妻一区精品色| 北条麻妃一区二区三区在线观看| 亚洲精品乱码久久久久久 | 五月婷婷婷婷婷| 亚洲电影二区| 亚洲国产aⅴ成人精品无吗| 视频二区一区| 国产精品国产三级国产普通话对白 | 国产理论视频在线观看| 国产亚洲福利| 色偷偷av一区二区三区| 亚洲av无码一区二区三区观看 | 欧美激情在线观看| 污污视频网站在线免费观看| 哺乳挤奶一区二区三区免费看| 欧美日韩一区成人| www.亚洲视频.com| 最爽无遮挡行房视频在线| 国产女人18毛片水真多成人如厕| 国产精品v欧美精品v日韩精品| 136福利视频导航| 日韩高清不卡一区二区| 午夜精品蜜臀一区二区三区免费| 日韩在线观看免| 伊人久久噜噜噜躁狠狠躁| 欧美精品久久久久久久久老牛影院| 成人国产在线看| 国产在线高清视频| 亚洲国产电影在线观看| 精品国产91亚洲一区二区三区www| 国产视频第二页| 久久se精品一区精品二区| 日产精品99久久久久久| 69视频免费在线观看| 亚洲第一黄色| 欧美激情一二区| 成人观看免费视频| 欧美成人一区二免费视频软件| 中文字幕无线精品亚洲乱码一区 | 国产精品一区二区在线播放 | 国产精品av一区二区三区 | 国产天堂av在线| 不卡在线一区| 亚洲色图第一页| av黄色免费网站| 九一国产精品| 一区三区二区视频| 一本在线免费视频| 91九色精品| 久久天天躁狠狠躁夜夜av| 337人体粉嫩噜噜噜| 精品福利久久久| 中文字幕一区电影| 激情无码人妻又粗又大| 91视频综合| 久久精品国产69国产精品亚洲| 中文字幕美女视频| 欧美午夜电影在线观看| 久久人人爽人人| 国产精品一区二区三区四| 一区二区日韩免费看| 日本亚洲欧美成人| 一区二区自拍偷拍| 国产麻豆视频一区二区| 国产精品久久久久久久天堂第1集 国产精品久久久久久久免费大片 国产精品久久久久久久久婷婷 | 日本精品免费一区二区三区| 最近中文字幕在线免费观看| 久久国产乱子精品免费女| 成人xxxxx| 韩国av永久免费| 久久亚洲欧美国产精品乐播 | 欧美精品一区二区三| 欧美精品激情在线观看| 久久久国产一级片| 少妇精品久久久一区二区| 中文字幕日本精品| 欧美另类videoxo高潮| 色综合天天综合网中文字幕| 久久久999精品| 亚洲综合网在线| 欧美激情成人| 久久久爽爽爽美女图片| 中文字幕视频网| 美女国产一区二区| 99re在线| 日韩精品视频无播放器在线看| 2021国产精品久久精品| 亚洲最大免费| 特级毛片在线| 在线视频亚洲一区| 精产国品一区二区三区| 欧美丝袜足交| 久久久精品视频成人| 国产又大又黑又粗免费视频| 天堂av在线一区| 成人国产精品免费视频| 亚洲AV无码一区二区三区少妇| 国产成人a级片| 日本一区二区三区www| h片在线免费| 欧美日韩亚洲一区二区| 91插插插影院| 国产精品日韩精品中文字幕| 欧美精品在线观看91| 青青视频在线免费观看| 成人性色生活片| 亚洲一区二区三区精品动漫| 女人让男人操自己视频在线观看| 欧美精品视频www在线观看| 国产情侣久久久久aⅴ免费| 日本久久精品| 国产69精品久久久久久| 中文字幕日韩第一页| 国产盗摄精品一区二区三区在线 | 91免费视频黄| 视频在线日韩| 日韩av在线最新| av激情在线观看| 性欧美长视频| 久久免费视频1| 国产精品偷拍| 日韩精品在线一区二区| 久久嫩草捆绑紧缚| 日产国产欧美视频一区精品| 久久综合狠狠综合久久综青草| 久久电影网站| 日韩欧美久久久| 国产av 一区二区三区| 久久成人精品无人区| 亚洲春色在线| 99久久亚洲国产日韩美女| 国产亚洲精品久久久久久牛牛 | 亚洲精选视频在线| 午夜剧场高清版免费观看 | 蜜桃视频一区二区三区在线观看| 蜜桃传媒视频麻豆一区| 国产va在线视频| 亚洲第一免费网站| 国产在线观看你懂的| 国产高清精品久久久久| 大片在线观看网站免费收看| 成人久久精品| 欧美成人全部免费| 国产高潮流白浆喷水视频| 亚洲乱码精品一二三四区日韩在线| 99精品视频国产| 欧美激情电影| 国产精品国产福利国产秒拍 | 国产一级做a爰片久久毛片男| 免费观看性欧美大片无片| 欧美www在线| 懂色av成人一区二区三区| 亚洲一区二区在线播放相泽| 国产午夜在线一区二区三区| 女人天堂亚洲aⅴ在线观看| 国产精品av网站| 成人三级黄色免费网站| 欧美三级日韩三级| 国产精品国产三级国产传播| 国产精品一区三区| 国产v片免费观看| 在线日本制服中文欧美| 国产精品普通话| 国产网站在线免费观看| 精品电影一区二区| 久草手机在线视频| 久久婷婷国产综合国色天香 | 视频一区在线播放| 亚洲韩国在线| 成人资源在线| 国产成人精品一区二区三区| 伊人在线视频| 精品久久久影院| 成人免费毛片男人用品| 国产精品伦一区二区三级视频| 亚洲制服在线观看| 午夜在线视频观看日韩17c| 图片区小说区区亚洲五月| 国产精品国产三级在线观看| 欧美精品激情blacked18| 国产综合在线观看| 欧美一区二区啪啪| 青青草免费观看视频| 中文字幕一区三区| 国产毛片毛片毛片毛片毛片毛片| 日韩精品亚洲专区| 路边理发店露脸熟妇泻火| 欧美国产不卡| 91精品久久久久久久久久久| 麻豆av在线播放| 夜夜躁日日躁狠狠久久88av| 99国产精品久久久久99打野战| 狠狠色狠狠色综合日日小说 | 亚洲国产成人tv| 能直接看的av| 99久久精品免费看| 小明看看成人免费视频| 午夜在线视频一区二区区别| 亚洲精品国产suv一区88| 欧美最新另类人妖| 国产亚洲二区| 人人九九精品视频| 国产精品丝袜久久久久久不卡| 2020国产在线视频| 中文字幕久热精品在线视频| 亚洲色欧美另类| 欧美成人a∨高清免费观看| 中文字幕乱码视频| 欧美日韩美女在线| 手机av在线看| 中文字幕av一区 二区| 成人手机在线免费视频| 国产福利不卡视频| 亚洲久久中文字幕| 噜噜噜在线观看免费视频日韩| 成人免费网站入口| 亚洲国产精品日韩专区av有中文| 日韩国产在线一区| 亚洲欧洲美洲国产香蕉| 国产91视觉| 日韩国产在线不卡视频| 成人动漫网站在线观看| 精品69视频一区二区三区| 日本不卡免费高清视频| 午夜小视频在线观看| 亚洲全黄一级网站| 超碰在线人人干| 欧美一区二区人人喊爽| 99热这里是精品| 日韩一区二区三区观看| 国产美女www爽爽爽视频| 欧美精品123区| 国产口爆吞精一区二区| 91麻豆精品国产| 99热这里只有精品66| 日韩欧美高清一区| 亚洲精品综合久久| 精品国产乱码久久久久久闺蜜| 丰满熟妇人妻中文字幕| 亚洲成人aaa| 亚洲 小说区 图片区 都市| 国产视频一区在线| 久青青在线观看视频国产| 亚洲人成在线一二| 国产在线黄色| 亚洲欧美在线一区二区| 亚洲成人中文字幕在线| 欧美精品一区二区三| 欧美自拍偷拍一区二区| 亚洲男人的天堂网站| 天天操天天射天天舔| 亚洲白拍色综合图区| 亚洲黄色在线免费观看| 亚洲第一视频在线观看| 青青草免费在线| 中文字幕在线观看日韩| 久久综合之合合综合久久| 欧美黄色成人网| 亚洲涩涩在线| 国产精品69av| 中文成人在线| 国产精品区一区| 国产麻豆精品久久| 中文字幕成人一区| 亚洲私拍自拍| 六月激情综合网| 激情欧美日韩一区二区| www.com久久久| 国产精品亚洲视频| 中文字幕在线永久| 中文子幕无线码一区tr| 欧美做爰爽爽爽爽爽爽| 亚洲成人自拍网| 久久精品偷拍视频| 91精品福利在线一区二区三区 | 337p亚洲精品色噜噜噜| 亚洲成人黄色片| 亚洲人成伊人成综合网久久久| 色的视频在线免费看| 欧美大片免费观看| 自由日本语热亚洲人| 国产在线不卡精品| 精品91福利视频| 国产一区二区精品免费| 97国产成人高清在线观看| 4444亚洲人成无码网在线观看| 欧美中文日韩| 图片区偷拍区小说区| 日本一区二区三级电影在线观看| 麻豆视频在线观看| 91福利视频网站| 女人18毛片水真多18精品| 日韩亚洲欧美成人| 黄视频网站在线观看| 69堂成人精品视频免费| 精品国产视频| 国产情侣第一页| 日本在线不卡一区| 中文字幕第3页| 亚洲天堂av一区| 亚洲综合成人av| 亚洲精品中文字| 国产极品人妖在线观看| 成人做爽爽免费视频| 中国老熟女重囗味hdxx| 国产老妇另类xxxxx| 精品无人区无码乱码毛片国产| 午夜国产精品影院在线观看| 国产精品久久久久久久免费看 | 亚洲va欧美va天堂v国产综合| 亚洲天堂免费av| 中文字幕在线日韩| 欧美日韩免费看片| 久久精品综合一区| 影音先锋中文字幕一区| 岛国精品一区二区三区| 亚洲另类在线制服丝袜| 欧美成人手机视频| 欧美一级日韩一级| 免费a级毛片在线播放| 国产精品第3页| 欧洲专线二区三区| www.avtt| 国产99久久久国产精品潘金| www.99热| 欧美日韩在线精品一区二区三区激情| 五月天激情开心网| 午夜精品久久久99热福利| 97se亚洲| 欧美一级爱爱视频| 国产乱子伦视频一区二区三区| 毛片视频免费播放| 精品视频免费在线| 在线看av的网址| 国产日韩欧美91| 婷婷亚洲五月色综合| 看看黄色一级片| 国产精品福利电影一区二区三区四区| 天天天天天天天干| www日韩欧美| 激情五月综合婷婷| 免费的一级黄色片| 成人99免费视频| 天天干天天干天天干天天| 亚洲视频在线观看| 黄色欧美视频| 亚洲成人动漫在线| 福利一区二区在线观看| 日韩av综合在线| 精品少妇一区二区三区日产乱码| а天堂中文在线官网| 麻豆一区二区| 中文字幕剧情在线观看一区| 国产一区二区免费看| 国产亚洲色婷婷久久99精品| 日韩激情av在线播放| 国产精品蜜月aⅴ在线| 台湾无码一区二区| 久久久www成人免费毛片麻豆| 国产精品无码在线播放| 午夜精品一区二区三区av| 成人毛片免费看| 97中文字幕在线观看| 色婷婷狠狠综合| a天堂中文在线官网在线| 精品久久蜜桃| 国精品**一区二区三区在线蜜桃 | 精品99又大又爽又硬少妇毛片| 国产欧美va欧美va香蕉在| 亚洲午夜在线| 在线观看免费小视频| 精品免费国产二区三区| 亚洲第一会所| 精品这里只有精品| 亚洲日本在线a| 高清美女视频一区| 国产日韩久久|