Commit 40cd302b authored by liaowenwu's avatar liaowenwu

修改错误时间格式的数据

parent 316af161
...@@ -7,7 +7,7 @@ import com.alibaba.fastjson.JSON; ...@@ -7,7 +7,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties; import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
...@@ -22,7 +22,7 @@ import java.util.*; ...@@ -22,7 +22,7 @@ import java.util.*;
* @author lww * @author lww
* @date 2025-01-14 * @date 2025-01-14
*/ */
public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple3<String,String,Long>> { public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple4<String, String, Long, String>> {
private static final Map<String,Integer> STR_SQL_TYPE; private static final Map<String,Integer> STR_SQL_TYPE;
private final EnvProperties dbInfoMap; private final EnvProperties dbInfoMap;
...@@ -41,7 +41,7 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple ...@@ -41,7 +41,7 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
STR_SQL_TYPE.put("MEDIUMTEXT",1); STR_SQL_TYPE.put("MEDIUMTEXT",1);
STR_SQL_TYPE.put("LONGTEXT",1); STR_SQL_TYPE.put("LONGTEXT",1);
STR_SQL_TYPE.put("TIME",1); STR_SQL_TYPE.put("TIME",1);
STR_SQL_TYPE.put("TIMESTAMP",1); //STR_SQL_TYPE.put("TIMESTAMP",1);
STR_SQL_TYPE.put("JSON",1); STR_SQL_TYPE.put("JSON",1);
} }
...@@ -51,7 +51,7 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple ...@@ -51,7 +51,7 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
} }
@Override @Override
public void processElement(JSONObject value, Context ctx, Collector<Tuple3<String, String, Long>> out) throws Exception { public void processElement(JSONObject value, Context ctx, Collector<Tuple4<String, String, Long, String>> out) throws Exception {
//返回数据集合 //返回数据集合
String type = value.getString("type"); String type = value.getString("type");
JSONObject mysqlType = value.getJSONObject("mysqlType"); JSONObject mysqlType = value.getJSONObject("mysqlType");
...@@ -103,7 +103,7 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple ...@@ -103,7 +103,7 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
if (MapUtil.getBool(dbInfoMap, "log_enable", false)){ if (MapUtil.getBool(dbInfoMap, "log_enable", false)){
ctx.output(logSlideTag,buildLogData(type, table, pkColumns, pkColumnVals, ts, value.toJSONString())); ctx.output(logSlideTag,buildLogData(type, table, pkColumns, pkColumnVals, ts, value.toJSONString()));
} }
out.collect(Tuple3.of(excueteSql,groupKey,ts)); out.collect(Tuple4.of(excueteSql,groupKey,ts, type));
} }
private static Tuple6<String,String,String,String,String,Long> buildLogData(String type, String table, StringBuilder pkColumns, StringBuilder pkValues, long ts, String dataJsonStr) { private static Tuple6<String,String,String,String,String,Long> buildLogData(String type, String table, StringBuilder pkColumns, StringBuilder pkValues, long ts, String dataJsonStr) {
...@@ -157,13 +157,18 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple ...@@ -157,13 +157,18 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
} }
//时间字段处理 //时间字段处理
if("DATE".equals(upperCase) || "DATETIME".equals(upperCase)){ if("DATE".equals(upperCase)
|| "DATETIME".equals(upperCase)
|| "TIMESTAMP".equals(upperCase)){
Date d = dataObj.getDate(columnKey); Date d = dataObj.getDate(columnKey);
if (d == null) { if (d == null) {
return ""; return "";
} }
if ("TIMESTAMP".equals(upperCase)) {
upperCase = "DATETIME";
}
String originalDateStr = dataObj.getString(columnKey); String originalDateStr = dataObj.getString(columnKey);
if (originalDateStr != null && originalDateStr.startsWith("0000-00-00")) { if (originalDateStr != null && originalDateStr.startsWith("000")) {
return "'0000-00-00" + ("DATETIME".equals(upperCase) ? " 00:00:00'" : "'"); return "'0000-00-00" + ("DATETIME".equals(upperCase) ? " 00:00:00'" : "'");
} }
LocalDateTime dateTime = LocalDateTime.ofInstant(d.toInstant(), ZoneId.systemDefault()); LocalDateTime dateTime = LocalDateTime.ofInstant(d.toInstant(), ZoneId.systemDefault());
......
package com.dsk.flink.dsc.sync; package com.dsk.flink.dsc.sync;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
...@@ -13,7 +14,7 @@ import lombok.extern.slf4j.Slf4j; ...@@ -13,7 +14,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
...@@ -34,6 +35,7 @@ import org.apache.flink.util.OutputTag; ...@@ -34,6 +35,7 @@ import org.apache.flink.util.OutputTag;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
...@@ -87,12 +89,12 @@ public class SyncCustomerDataSource { ...@@ -87,12 +89,12 @@ public class SyncCustomerDataSource {
kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets); kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
} }
DataStreamSource<String> dscKafka = env DataStreamSource<String> dscKafka = env
.addSource(kafkaConsumer) .addSource(kafkaConsumer);
.setParallelism(1); //.setParallelism(1);
SingleOutputStreamOperator<JSONObject> tsGroupStream = dscKafka SingleOutputStreamOperator<JSONObject> tsGroupStream = dscKafka
.map(JSONObject::parseObject) .map(JSONObject::parseObject)
.setParallelism(1) //.setParallelism(1)
.name("dsc-json") .name("dsc-json")
.uid("dsc-json") .uid("dsc-json")
.filter(new FilterFunction<JSONObject>() { .filter(new FilterFunction<JSONObject>() {
...@@ -101,13 +103,13 @@ public class SyncCustomerDataSource { ...@@ -101,13 +103,13 @@ public class SyncCustomerDataSource {
return !value.getBoolean("isDdl") && !"TIDB_WATERMARK".equals(value.getString("type")); return !value.getBoolean("isDdl") && !"TIDB_WATERMARK".equals(value.getString("type"));
} }
}) })
.setParallelism(1) //.setParallelism(1)
.name("dsc-source") .name("dsc-source")
.uid("dsc-source"); .uid("dsc-source");
OutputTag<Tuple6<String,String,String,String,String,Long>> logSlideTag = new OutputTag<Tuple6<String,String,String,String,String,Long>>("log_slide") {}; OutputTag<Tuple6<String,String,String,String,String,Long>> logSlideTag = new OutputTag<Tuple6<String,String,String,String,String,Long>>("log_slide") {};
SingleOutputStreamOperator<Tuple3<String, String, Long>> slide = tsGroupStream SingleOutputStreamOperator<Tuple4<String, String, Long, String>> slide = tsGroupStream
.process(new MysqlDataTransferFunction(envProps,logSlideTag)) .process(new MysqlDataTransferFunction(envProps,logSlideTag))
.name("dsc-sql") .name("dsc-sql")
.uid("dsc-sql"); .uid("dsc-sql");
...@@ -115,19 +117,34 @@ public class SyncCustomerDataSource { ...@@ -115,19 +117,34 @@ public class SyncCustomerDataSource {
SingleOutputStreamOperator<String> groupWindowSqlResultStream = slide SingleOutputStreamOperator<String> groupWindowSqlResultStream = slide
.keyBy(value -> value.f1) .keyBy(value -> value.f1)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))) .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
.process(new ProcessWindowFunction<Tuple3<String, String, Long>, String, String, TimeWindow>() { .process(new ProcessWindowFunction<Tuple4<String, String, Long, String>, String, String, TimeWindow>() {
@Override @Override
public void process(String s, ProcessWindowFunction<Tuple3<String, String, Long>, String, String, public void process(String s, ProcessWindowFunction<Tuple4<String, String, Long, String>, String, String,
TimeWindow>.Context context, Iterable<Tuple3<String, String, Long>> elements, TimeWindow>.Context context, Iterable<Tuple4<String, String, Long, String>> elements,
Collector<String> out) throws Exception { Collector<String> out) throws Exception {
Tuple3<String, String, Long> maxTsElement = null; List<Tuple4<String, String, Long, String>> elementList = CollUtil.newArrayList(elements);
for (Tuple3<String, String, Long> element : elements) { elementList.sort((a, b) -> {
if (maxTsElement == null || element.f2 >= maxTsElement.f2) { if (!a.f2.equals(b.f2)) {
maxTsElement = element; return Long.compare(b.f2, a.f2);
} }
int orderA = getOpPriority(a.f3);
int orderB = getOpPriority(b.f3);
return Integer.compare(orderB, orderA);
});
if (!elementList.isEmpty()) {
out.collect(elementList.get(0).f0);
} }
if (maxTsElement!= null) { }
out.collect(maxTsElement.f0); private int getOpPriority(String opType) {
switch (opType) {
case "INSERT":
return 3;
case "UPDATE":
return 2;
case "DELETE":
return 1;
default:
return 0;
} }
} }
}) })
...@@ -140,37 +157,10 @@ public class SyncCustomerDataSource { ...@@ -140,37 +157,10 @@ public class SyncCustomerDataSource {
DataStream<Tuple6<String,String,String,String,String,Long>> sideOutput = slide.getSideOutput(logSlideTag); DataStream<Tuple6<String,String,String,String,String,Long>> sideOutput = slide.getSideOutput(logSlideTag);
/*sideOutput.addSink(JdbcSink.sink(
"INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values (?,?,?,?,?,?)",
(ps,t) -> {
ps.setString(1,t.f0);
ps.setString(2,t.f1);
ps.setString(3,t.f2);
ps.setString(4,t.f3);
ps.setString(5,t.f4);
ps.setLong(6,t.f5);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(3)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUrl(getSinkUrl(envProps))
.withUsername(envProps.getDb_username())
.withPassword(envProps.getDb_password())
.build()
)).uid("dsc-log")
.name("dsc-log");*/
sideOutput.addSink(new MysqlDataSlideSink(envProps)).uid("dsc-log") sideOutput.addSink(new MysqlDataSlideSink(envProps)).uid("dsc-log")
.name("dsc-log"); .name("dsc-log");
env.execute("dsc-client"); env.execute("dsc-client");
} }
/*private static String getSinkUrl(EnvProperties envProps) {
return String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
}*/
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment