Flink 单流：Kafka 写入 MySQL
配置参考
安装kafka
进入容器
# Open an interactive shell inside the container named "kafka".
docker exec -it kafka /bin/bash
# Use an absolute path so the command does not depend on the shell's starting directory.
cd /opt/kafka
创建一个主题
bin/kafka-topics.sh --create --zookeeper 192.168.43.50:2181 --replication-factor 1 --partitions 1 --topic flink_test
启动flink sql client
bin/sql-client.sh embedded
sink mysql 创建语句
-- MySQL target table: one row per day_time holding the summed amount.
-- day_time is the primary key, matching the key declared on the Flink-side mysql_sink table.
-- No AUTO_INCREMENT table option: the table has no auto-increment column.
CREATE TABLE test_result (
  `day_time` varchar(64) NOT NULL,
  `total_amount` bigint DEFAULT NULL,
  PRIMARY KEY (`day_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
配置语句
-- Flink source table reading JSON records from the Kafka topic flink_test.
-- Reads from the earliest offset; missing fields and unparsable records are tolerated
-- rather than failing the job (see the two json.* options below).
CREATE TABLE kafka_source (
    id       BIGINT,
    -- STRING rather than bare VARCHAR: in Flink SQL, VARCHAR without a length is VARCHAR(1).
    day_time STRING,
    -- Spelling matches the "amnount" key used in the JSON messages; do not rename one without the other.
    amnount  BIGINT,
    -- Processing-time attribute computed by Flink, not read from the message.
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink_test',
    'properties.bootstrap.servers' = '192.168.43.50:9092',
    'properties.group.id' = 'flink_gp_test1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
-- Flink table mapped onto the MySQL table test_result through the JDBC connector.
-- day_time is declared as a non-enforced primary key.
CREATE TABLE mysql_sink (
    day_time     STRING,
    total_amount BIGINT,
    PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://192.168.43.50:3306/test?characterEncoding=UTF-8',
    'table-name' = 'test_result',
    'username'   = 'root',
    'password'   = '123456'
);
-- Sum amnount per day_time from kafka_source and write the totals into mysql_sink.
INSERT INTO mysql_sink
SELECT
    day_time,
    SUM(amnount) AS total_amount
FROM kafka_source
GROUP BY
    day_time;
运行一个生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test
发送数据
{"day_time": "20211001","id": 7,"amnount":2}
{"day_time": "20211002","id": 7,"amnount":20}
{"day_time": "20211003","id": 7,"amnount":30}
{"day_time": "20211004","id": 7,"amnount":40}
{"day_time": "20211005","id": 7,"amnount":50}
{"day_time": "20211006","id": 7,"amnount":60}
{"day_time": "20211007","id": 7,"amnount":70}
{"day_time": "20211008","id": 7,"amnount":80}
{"day_time": "20211009","id": 7,"amnount":90}
{"day_time": "20211010","id": 7,"amnount":20}
{"day_time": "20211010","id": 7,"amnount":30}
{"day_time": "20211010","id": 7,"amnount":40}
mysql test_result 表数据
20211001 2
20211002 20
20211003 30
20211004 40
20211005 50
20211006 60
20211007 70
20211008 80
20211009 90
20211010 90
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦