Commit 33b2743c authored by liaowenwu's avatar liaowenwu

修改连接数

parent bcb2bc6c
package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.api.java.tuple.Tuple4;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public abstract class AbstractDbHandler implements ProcHandler{
protected JSONObject value;
protected EnvProperties dbInfoMap;
public AbstractDbHandler(JSONObject data, EnvProperties dbInfoMap) {
this.value = data;
this.dbInfoMap = dbInfoMap;
}
@Override
public Map<String, Object> handler() throws Exception {
String type = value.getString("type");
JSONObject mysqlType = value.getJSONObject("mysqlType");
String table = value.getString("table");
JSONArray pkNames = value.getJSONArray("pkNames");
Set<String> pkNameSet = new HashSet<>();
long ts = value.getLong("ts") == null ? System.currentTimeMillis() : value.getLong("ts");
if(CollUtil.isNotEmpty(pkNames)){
pkNames.forEach(name -> pkNameSet.add(String.valueOf(name)));
}
String excueteSql;
JSONObject dataObj = value.getJSONArray("data").getJSONObject(0);
Boolean logicalDelete = MapUtil.getBool(dbInfoMap, "logical_delete", false);
if(logicalDelete){
mysqlType.put("is_del", "int");
dataObj.put("is_del", "DELETE".equals(type) ? 1 : 0);
}
StringBuilder groupKeyBuilder = new StringBuilder(table);
StringBuilder pkColumns = new StringBuilder();
StringBuilder pkColumnVals = new StringBuilder();
for (String pk : pkNameSet) {
String pkValue = getValueString(dataObj, pk, mysqlType.getString(pk));
groupKeyBuilder.append("-").append(pkValue);
pkColumns.append(pk).append(",");
pkColumnVals.append(pkValue).append("-");
}
String groupKey = groupKeyBuilder.toString();
//添加分表参数
String shardingRule = dataObj.getString("shardingRule");
if (StrUtil.isNotBlank(shardingRule)) {
Map<String,Object> map = JSON.parseObject(shardingRule, Map.class);
String strategy = MapUtil.getStr(map, "strategy");
Map<String,Object> strategyMap = JSON.parseObject(strategy, Map.class);
int i = MapUtil.getInt(strategyMap, "sharding-count");
if (i > 1){
String str = MapUtil.getStr(strategyMap, "sharding-column");
Integer val = dataObj.getInteger(str);
val = val == null ? 1 : val;
table = table.concat("_").concat(String.valueOf(val % i));
}
}
if("INSERT".equals(type) || "UPDATE".equals(type)){
excueteSql = tranferInsertSql(table,dataObj,mysqlType,pkNameSet);
} else {
excueteSql = logicalDelete ? tranferInsertSql(table,dataObj,mysqlType,pkNameSet) : transferDeleteSql(table,dataObj,mysqlType,pkNameSet);
}
Map<String,Object> ret = MapUtil.newHashMap();
ret.put(MAIN_KEY, Tuple4.of(excueteSql,groupKey,ts, type));
ret.put(SLIDE_KEY, buildLogData(type, table, pkColumns, pkColumnVals, ts, value.toJSONString()));
return ret;
}
protected abstract String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet);
protected abstract String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet);
}
package com.dsk.flink.dsc.common.function;
import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.HashMap;
import java.util.Map;
/**
* 重构代码
* @author lww
* @date 2025-01-14
*/
public class DataProcFunction extends ProcessFunction<JSONObject, Tuple4<String, String, Long, String>> {
private static final Map<String, ProcHandlerFactory> FACTORY_MAP = new HashMap<>();
private final EnvProperties dbInfoMap;
private final OutputTag<Tuple6<String,String,String,String,String,Long>> logSlideTag;
static {
FACTORY_MAP.put("mysql", new MySQLHandlerFactory());
FACTORY_MAP.put("pg", new PgHandlerFactory());
}
public DataProcFunction(EnvProperties envProps, OutputTag<Tuple6<String,String,String,String,String,Long>> logSlideTag) {
this.dbInfoMap = envProps;
this.logSlideTag = logSlideTag;
}
@Override
public void processElement(JSONObject value, Context ctx, Collector<Tuple4<String, String, Long, String>> out) throws Exception {
ProcHandlerFactory factory = FACTORY_MAP.get(dbInfoMap.getDb_type());
Map<String, Object> handler = factory.createDbHandler(value,dbInfoMap).handler();
if (MapUtil.getBool(dbInfoMap, "log_enable", false)){
ctx.output(logSlideTag,(Tuple6<String,String,String,String,String,Long>)handler.get("slide"));
}
out.collect((Tuple4<String, String, Long, String>)handler.get("main"));
}
}
package com.dsk.flink.dsc.common.function;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class MySQLHandler extends AbstractDbHandler{
public MySQLHandler(JSONObject data, EnvProperties dbInfoMap) {
super(data, dbInfoMap);
}
@Override
protected String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType,Set<String> pkNameSet) {
Set<String> columnSet = mysqlType.keySet();
StringBuilder sb = new StringBuilder("REPLACE INTO ").append(table).append(" (");
List<String> valueList = new ArrayList<>();
for (String s : columnSet) {
sb.append("`").append(s).append("`,");
valueList.add(getValueString(dataObj, s, mysqlType.getString(s)));
}
sb.setLength(sb.length() - 1);
sb.append(") values (");
sb.append(String.join(",", valueList));
sb.append(")");
return sb.toString();
}
@Override
protected String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
StringBuilder whereClauseBuilder = new StringBuilder();
for (String pk : pkNameSet) {
if (whereClauseBuilder.length() > 0) {
whereClauseBuilder.append(" and ");
}
whereClauseBuilder.append(pk).append(" = ").append(getValueString(dataObj, pk, mysqlType.getString(pk)));
}
return String.format("DELETE FROM %s WHERE %s",table, whereClauseBuilder);
}
}
package com.dsk.flink.dsc.common.function;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
/**
* mysql
* @author lww
* @date 2025/12/16
*/
public class MySQLHandlerFactory implements ProcHandlerFactory{
@Override
public ProcHandler createDbHandler(JSONObject val, EnvProperties dbInfoMap) {
return new MySQLHandler(val, dbInfoMap);
}
}
package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.api.java.tuple.Tuple4;
import java.util.*;
public class PgHandler extends AbstractDbHandler{
public PgHandler(JSONObject data, EnvProperties dbInfoMap) {
super(data, dbInfoMap);
}
@Override
protected String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
Set<String> columnSet = mysqlType.keySet();
StringBuilder sb = new StringBuilder("INSERT INTO ").append(table).append(" (");
List<String> valueList = new ArrayList<>();
for (String s : columnSet) {
sb.append("`").append(s).append("`,");
valueList.add(getValueString(dataObj, s, mysqlType.getString(s)));
}
sb.setLength(sb.length() - 1);
sb.append(") values (");
sb.append(String.join(",", valueList));
sb.append(") ON CONFLICT (");
sb.append(CollUtil.join(pkNameSet,",")).append(") DO UPDATE SET ");
for (String s : columnSet) {
sb.append(s).append("= EXCLUDED.").append(s).append(",");
}
sb.setLength(sb.length() - 1);
return sb.toString();
}
@Override
protected String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
StringBuilder whereClauseBuilder = new StringBuilder();
for (String pk : pkNameSet) {
if (whereClauseBuilder.length() > 0) {
whereClauseBuilder.append(" and ");
}
whereClauseBuilder.append(pk).append(" = ").append(getValueString(dataObj, pk, mysqlType.getString(pk)));
}
return String.format("DELETE FROM %s WHERE %s",table, whereClauseBuilder);
}
}
package com.dsk.flink.dsc.common.function;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
/**
* postgreSQL
* @author lww
* @date 2025/12/16
*/
public class PgHandlerFactory implements ProcHandlerFactory{
@Override
public ProcHandler createDbHandler(JSONObject val, EnvProperties dbInfoMap) {
return new PgHandler(val, dbInfoMap);
}
}
package com.dsk.flink.dsc.common.function;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple6;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
public interface ProcHandler {
String MAIN_KEY = "main";
String SLIDE_KEY = "slide";
DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
Map<String,Integer> STR_SQL_TYPE = initMap();
static Map<String,Integer> initMap() {
Map<String,Integer> STR_SQL_TYPE = MapUtil.newHashMap();
STR_SQL_TYPE.put("VARCHAR",1);
STR_SQL_TYPE.put("CHAR",1);
STR_SQL_TYPE.put("TINYBLOB",1);
STR_SQL_TYPE.put("BLOB",1);
STR_SQL_TYPE.put("MEDIUMBLOB",1);
STR_SQL_TYPE.put("LONGBLOB",1);
STR_SQL_TYPE.put("TINYTEXT",1);
STR_SQL_TYPE.put("TEXT",1);
STR_SQL_TYPE.put("MEDIUMTEXT",1);
STR_SQL_TYPE.put("LONGTEXT",1);
STR_SQL_TYPE.put("TIME",1);
STR_SQL_TYPE.put("JSON",1);
return Collections.unmodifiableMap(STR_SQL_TYPE);
}
default String getValueString(JSONObject dataObj, String columnKey, String mysqlType){
if(null == dataObj.get(columnKey)){
return "null";
}
String upperCase = mysqlType.toUpperCase();
//需要处理成字符串加引号的类型
if(STR_SQL_TYPE.containsKey(upperCase)){
String step1 = StrUtil.replace(dataObj.getString(columnKey), "\\", "\\\\");
return "'" + StrUtil.replace(step1, "'", "\\'") + "'";
//return String.format("'%s'", dataObj.getString(columnKey).replace("\\","\\\\").replace("'", "\\'") );
}
//时间字段处理
if("DATE".equals(upperCase)
|| "DATETIME".equals(upperCase)
|| "TIMESTAMP".equals(upperCase)){
Date d = dataObj.getDate(columnKey);
if (d == null) {
return "";
}
if ("TIMESTAMP".equals(upperCase)) {
upperCase = "DATETIME";
}
String originalDateStr = dataObj.getString(columnKey);
if (originalDateStr != null && originalDateStr.startsWith("000")) {
return "'0000-00-00" + ("DATETIME".equals(upperCase) ? " 00:00:00'" : "'");
}
LocalDateTime dateTime = LocalDateTime.ofInstant(d.toInstant(), ZoneId.systemDefault());
String date = "DATETIME".equals(upperCase) ? DATETIME_FORMAT.format(dateTime) : DATE_FORMAT.format(dateTime);
return String.format("'%s'",date);
}
return dataObj.getString(columnKey);
}
default Tuple6<String,String,String,String,String,Long> buildLogData(String type, String table, StringBuilder pkColumns, StringBuilder pkValues, long ts, String dataJsonStr) {
if (pkColumns.length() > 0) {
pkColumns.setLength(pkColumns.length()-1);
pkValues.setLength(pkValues.length()-1);
}
String step1 = StrUtil.replace(dataJsonStr, "\\", "\\\\");
String step2 = StrUtil.replace(step1, "'", "\\'");
return Tuple6.of(table,type, pkColumns.toString(), pkValues.toString().replace("'",""),step2,ts);
}
Map<String,Object> handler() throws Exception;
}
package com.dsk.flink.dsc.common.function;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
public interface ProcHandlerFactory {
ProcHandler createDbHandler(JSONObject val, EnvProperties dbInfoMap);
}
...@@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil; ...@@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.common.function.DataProcFunction;
import com.dsk.flink.dsc.common.function.MysqlDataTransferFunction; import com.dsk.flink.dsc.common.function.MysqlDataTransferFunction;
import com.dsk.flink.dsc.common.sink.MysqlDataSlideSink; import com.dsk.flink.dsc.common.sink.MysqlDataSlideSink;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSink; import com.dsk.flink.dsc.common.sink.MysqlDataTransferSink;
...@@ -110,7 +111,7 @@ public class SyncCustomerDataSource { ...@@ -110,7 +111,7 @@ public class SyncCustomerDataSource {
OutputTag<Tuple6<String,String,String,String,String,Long>> logSlideTag = new OutputTag<Tuple6<String,String,String,String,String,Long>>("log_slide") {}; OutputTag<Tuple6<String,String,String,String,String,Long>> logSlideTag = new OutputTag<Tuple6<String,String,String,String,String,Long>>("log_slide") {};
SingleOutputStreamOperator<Tuple4<String, String, Long, String>> slide = tsGroupStream SingleOutputStreamOperator<Tuple4<String, String, Long, String>> slide = tsGroupStream
.process(new MysqlDataTransferFunction(envProps,logSlideTag)) .process(new DataProcFunction(envProps,logSlideTag))
.name("dsc-sql") .name("dsc-sql")
.uid("dsc-sql"); .uid("dsc-sql");
......
...@@ -117,6 +117,17 @@ public class EnvProperties extends Properties { ...@@ -117,6 +117,17 @@ public class EnvProperties extends Properties {
String log_enable; String log_enable;
String db_type;
public String getDb_type() {
String t = db_type == null ? this.getProperty("db_type") : db_type;
return t == null ? "mysql" : t;
}
public void setDb_type(String db_type) {
this.db_type = db_type;
}
public String getLog_enable() { public String getLog_enable() {
return logical_delete == null ? this.getProperty("logical_delete") : logical_delete; return logical_delete == null ? this.getProperty("logical_delete") : logical_delete;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment