Commit 316af161 authored by liaowenwu's avatar liaowenwu

修改错误时间格式的数据

parent 88805488
...@@ -162,6 +162,10 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple ...@@ -162,6 +162,10 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
if (d == null) { if (d == null) {
return ""; return "";
} }
String originalDateStr = dataObj.getString(columnKey);
if (originalDateStr != null && originalDateStr.startsWith("0000-00-00")) {
return "'0000-00-00" + ("DATETIME".equals(upperCase) ? " 00:00:00'" : "'");
}
LocalDateTime dateTime = LocalDateTime.ofInstant(d.toInstant(), ZoneId.systemDefault()); LocalDateTime dateTime = LocalDateTime.ofInstant(d.toInstant(), ZoneId.systemDefault());
String date = "DATETIME".equals(upperCase) ? DATETIME_FORMAT.format(dateTime) : DATE_FORMAT.format(dateTime); String date = "DATETIME".equals(upperCase) ? DATETIME_FORMAT.format(dateTime) : DATE_FORMAT.format(dateTime);
return String.format("'%s'",date); return String.format("'%s'",date);
......
...@@ -39,7 +39,7 @@ public class MysqlDataSlideSink extends RichSinkFunction<Tuple6<String,String,St ...@@ -39,7 +39,7 @@ public class MysqlDataSlideSink extends RichSinkFunction<Tuple6<String,String,St
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(8, 20, 60, executorService = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(QUEUE_CAPACITY), new LinkedBlockingDeque<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy()); new ThreadPoolExecutor.CallerRunsPolicy());
......
...@@ -15,7 +15,7 @@ public class EtlUtils { ...@@ -15,7 +15,7 @@ public class EtlUtils {
properties.setProperty("auto.offset.reset", "earliest"); properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password)); properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password));
properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-256");
properties.setProperty("fetch.max.bytes", "20971520"); //20M properties.setProperty("fetch.max.bytes", "20971520"); //20M
properties.setProperty("flink.consumer.max.fetch.size", "20971520");//20M properties.setProperty("flink.consumer.max.fetch.size", "20971520");//20M
properties.setProperty("session.timeout.ms", "60000"); properties.setProperty("session.timeout.ms", "60000");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment