Q&A

Spring Boot + Flink: processing Kafka data and dynamically writing it out to different HDFS files

Author: admin · 2021-04-17



package cn.nucarf.tianyan.service.dwd;

import cn.nucarf.tianyan.config.ProYml;
import cn.nucarf.tianyan.config.SinkHDFS;
import cn.nucarf.tianyan.pojo.bean.AllocationBean;
import cn.nucarf.tianyan.pojo.event.AllocationEvent;
import cn.nucarf.tianyan.pojo.eventmessage.AllocationAndRecovery;
import cn.nucarf.tianyan.service.dwd.Impl.StreamInputImpl;
import cn.nucarf.tianyan.utill.DateU;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.Serializable;
import java.util.List;
import java.util.Properties;

/**
 * Transfers: allocation records, allocation and recovery tables.
 */
@Service
public class DwdAllocation implements StreamInputImpl, Serializable {

    private static final String kafkaTopic = "WJY_ALLO_RECYC_LOG";

    // Cut-over point between live and historical events, in epoch millis.
    private static final Long TIME = 1617206400000L;

    // NOTE: this field is read exactly once, when the sink is built in
    // addSink(...) below; reassigning it inside flatMap does NOT re-route
    // records written later to a different HDFS path. See the Bucketer
    // sketch after this class for a per-record alternative.
    private static String CREATE_GMT = "2020-05-03";

    @Autowired
    SinkHDFS sinkHDFS;
    @Autowired
    ProYml proYml;

    @Override
    public void DsPro(StreamExecutionEnvironment env, Properties properties) {
        DataStreamSource<String> stream = env.addSource(
                new FlinkKafkaConsumer<>(kafkaTopic, new SimpleStringSchema(), properties));

        stream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                AllocationBean allocationBean = JSON.parseObject(s, AllocationBean.class);
                List<AllocationEvent> events = allocationBean.getEvent();
                for (AllocationEvent e : events) {
                    // && binds tighter than ||: keep every event after TIME, plus
                    // historical "his_data_json" backfill events from before TIME.
                    if (e.getEventTime() > TIME
                            || (e.getEventTime() < TIME && e.getEventName().equals("his_data_json"))) {
                        AllocationAndRecovery eventMessage = e.getEventMessage();
                        CREATE_GMT = DateU.getDate(eventMessage.getCreateGmt());
                        // Serialize the message, then emit only its field values.
                        String line = JSON.toJSONString(eventMessage);
                        String arr = JSON.parseObject(line).values().toString();
                        collector.collect(arr);
                    }
                }
            }
        })
        // CREATE_GMT is evaluated here once, at job-graph construction time.
        .addSink(sinkHDFS.sinkHDFS(proYml.getDwdAlloRecycRecord(), CREATE_GMT));
    }
}
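For completeness: the Properties object handed to DsPro is assembled elsewhere in the project (the commented-out proYml.getAutoCommitInterval() call suggests values are bound from application.yml via ProYml). FlinkKafkaConsumer needs at least a broker list and a consumer group; a minimal sketch with placeholder values, not the project's actual configuration:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kafka01:9092");  // placeholder broker address
properties.setProperty("group.id", "dwd-allocation");          // hypothetical consumer group
// auto.commit.interval.ms could be wired from proYml.getAutoCommitInterval()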
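Why the records do not land in different files: CREATE_GMT is captured once when sinkHDFS.sinkHDFS(...) builds the sink, before any Kafka record is processed, so every record goes to the path derived from whatever date the static field held at startup. To route each record by its own createGmt, the bucket decision has to happen inside the sink. With the BucketingSink that the original imports pull in, that means a custom Bucketer. A minimal sketch, assuming the flatMap is changed to emit the full eventMessage JSON (JSON.toJSONString(eventMessage)) rather than just its values, and that the serialized field is named createGmt:

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

/** Chooses the HDFS subdirectory per record from the record's own createGmt. */
public class CreateGmtBucketer implements Bucketer<String> {
    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        // Assumption: element is the eventMessage JSON carrying a createGmt field,
        // and DateU.getDate accepts the raw value, as in the original flatMap.
        Long createGmt = JSON.parseObject(element).getLong("createGmt");
        String date = DateU.getDate(createGmt);
        return new Path(basePath, date);  // e.g. <base>/2021-04-01
    }
}

Wired in place of the sinkHDFS.sinkHDFS(...) call (the batch settings are illustrative, not taken from the post):

BucketingSink<String> sink = new BucketingSink<>(proYml.getDwdAlloRecycRecord());
sink.setBucketer(new CreateGmtBucketer());
sink.setBatchSize(1024L * 1024 * 128);           // roll a new part file at ~128 MB
sink.setBatchRolloverInterval(20 * 60 * 1000L);  // ...or at latest every 20 minutes
// then: stream.flatMap(...).addSink(sink);

On newer Flink versions BucketingSink is deprecated in favor of StreamingFileSink, where a custom BucketAssigner plays exactly the role of the Bucketer above.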


Problem description

Where the problem comes from, and my own approach

Related code

Paste the code as text (please, no screenshots)

What result did you expect? And what error message do you actually see?


