问答

kafka中某个topic的分区消息大量积压,怎么处理

作者:admin 2021-07-30 我要评论

由于消费者组中一个分区只能被一个消费者消费。所以如果增加消费者组的形式来增加消费者的话,它们的offset也是不一样的,所以没法保证消费被有序的消费完了 ###...

在说正事之前,我要推荐一个福利:你还在原价购买阿里云、腾讯云、华为云服务器吗?那太亏啦!来这里,新购、升级、续费都打折,能够为您省60%的钱呢!2核4G企业级云服务器低至69元/年,点击进去看看吧>>>)

由于消费者组中一个分区只能被一个消费者消费。所以如果增加消费者组的形式来增加消费者的话,它们的offset也是不一样的,所以没法保证消费被有序的消费完了

###

请允许我直接做一个假设,在消息中存放了一个user_id, 可以将消息重新发送一次,大致模型如下
image.png

  • 补上一段代码, 这里就不使用kafka进行说明,采用Queue 进行展示
package org.huifer.queue;

import java.util.LinkedList;
import java.util.Queue;

public class QueueBigData {

  static Queue<Data> sourceData = new LinkedList<Data>();


  static Queue<Data> u1 = new LinkedList<Data>();
  static Queue<Data> u2 = new LinkedList<Data>();
  static Queue<Data> u3 = new LinkedList<Data>();

  public static void main(String[] args) {
    sourceData.add(new Data(1, "1号用户的第 1 个行为"));
    sourceData.add(new Data(2, "2号用户的第 1 个行为"));
    sourceData.add(new Data(3, "3号用户的第 1 个行为"));
    sourceData.add(new Data(1, "1号用户的第 2 个行为"));
    sourceData.add(new Data(4, "4号用户的第 1 个行为"));

    // 设置处理行为
    IListener listener = event -> {
      if (event instanceof SourceEvent) {
        System.out.println(((SourceEvent) event).info + " : " + ((SourceEvent) event).msg);
      }
    };

    for (Data sourceDatum : sourceData) {
      Integer userId = sourceDatum.getUserId();
      if (1 % userId == 0) {
        u1.add(sourceDatum);
        event(listener, sourceDatum, "处理1号特征的数据");
      } else if (2 % userId == 0) {
        u2.add(sourceDatum);
        event(listener, sourceDatum, "处理2号特征的数据");
      } else if (3 % userId == 0) {
        u3.add(sourceDatum);
        event(listener, sourceDatum, "处理3号特征的数据");
      }
    }

    System.out.println();
  }

  private static void event(IListener listener, Data sourceDatum, String info) {
    SourceEvent event = new SourceEvent(sourceDatum.getMsg(), info);
    event.setListener(listener);
    event.event();
  }

  public interface IListener {

    void doEvent(IEvent event);
  }

  public interface IEvent {

    void setListener(IListener listener);
  }

  private static class Data {

    private Integer userId;
    private String msg;

    public Data() {
    }

    public Data(Integer userId, String msg) {
      this.userId = userId;
      this.msg = msg;
    }

    @Override
    public String toString() {
      return "{\"Data\":{"
          + "\"userId\":"
          + userId
          + ",\"msg\":\""
          + msg + '\"'
          + "}}";

    }

    public Integer getUserId() {
      return userId;
    }

    public void setUserId(Integer userId) {
      this.userId = userId;
    }

    public String getMsg() {
      return msg;
    }

    public void setMsg(String msg) {
      this.msg = msg;
    }
  }

  public static class SourceEvent implements IEvent {

    private final String msg;
    private final String info;
    private IListener listener;

    public SourceEvent(String msg, String info) {
      this.msg = msg;
      this.info = info;
    }

    @Override
    public void setListener(IListener listener) {
      this.listener = listener;
    }

    public void event() {
      listener.doEvent(this);
    }
  }

}

版权声明:本文转载自网络,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本站转载出于传播更多优秀技术知识之目的,如有侵权请联系QQ/微信:153890879删除

相关文章
  • kafka中某个topic的分区消息大量积压,

    kafka中某个topic的分区消息大量积压,

  •  js export function参数后面的冒号为

    js export function参数后面的冒号为

  • 饼图中重写LabelLine

    饼图中重写LabelLine

  • vue functional 组件,导致渲染死循环

    vue functional 组件,导致渲染死循环

腾讯云代理商
海外云服务器