c语言sscanf函数的用法是什么
284
2022-11-14
(1)通过FlinkSQL将数据写入mysql demo
FlinkSQL的出现,极大程度上降低了Flink的编程门槛,更加容易理解和掌握使用。今天将自己的笔记分享出来,希望能帮助在这方面有需要的朋友。
(1)首先引入POM依赖:
(2)编写代码
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode() //.useOldPlanner() // flink .useBlinkPlanner() // blink .build(); StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings); String ddl = "CREATE TABLE flinksinksds(\r\n" + "componentname STRING,\r\n" + "componentcount INT,\r\n" + "componentsum INT\r\n" + ") WITH(\r\n" + "'connector.type'='jdbc',\r\n" + "'connector.driver' = 'com.mysql.cj.jdbc.Driver'," + "'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',\r\n" + "'connector.table'='flinksink',\r\n" + "'connector.username'='root',\r\n" + "'connector.password'='root',\r\n" + "'connector.write.flush.max-rows'='1'\r\n" + ")"; System.err.println(ddl); ste.executeSql(ddl); String insert = "insert into flinksinksds(componentname,componentcount,componentsum)" + "values('1024', 1 , 2 )"; ste.executeSql(insert); env.execute(); System.exit(0);}
(3)执行结果:
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~