Tips
Go
(18条消息) Go语言自学系列 | golang包_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之channel的遍历_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之select switch_COCOgsta的博客-CSDN博客_golang select switch
(18条消息) Go语言自学系列 | golang并发编程之runtime包_COCOgsta的博客-CSDN博客_golang runtime包
(18条消息) Go语言自学系列 | golang接口值类型接收者和指针类型接收者_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之Timer_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang方法_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之WaitGroup实现同步_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang构造函数_COCOgsta的博客-CSDN博客_golang 构造函数
(18条消息) Go语言自学系列 | golang方法接收者类型_COCOgsta的博客-CSDN博客_golang 方法接收者
(18条消息) Go语言自学系列 | golang接口_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang接口和类型的关系_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang结构体_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang结构体_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang标准库os模块 - File文件读操作_COCOgsta的博客-CSDN博客_golang os.file
(18条消息) Go语言自学系列 | golang继承_COCOgsta的博客-CSDN博客_golang 继承
(18条消息) Go语言自学系列 | golang嵌套结构体_COCOgsta的博客-CSDN博客_golang 结构体嵌套
(18条消息) Go语言自学系列 | golang并发编程之Mutex互斥锁实现同步_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发变成之通道channel_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之原子操作详解_COCOgsta的博客-CSDN博客_golang 原子操作
(18条消息) Go语言自学系列 | golang并发编程之原子变量的引入_COCOgsta的博客-CSDN博客_go 原子变量
(18条消息) Go语言自学系列 | golang并发编程之协程_COCOgsta的博客-CSDN博客_golang 协程 并发
(18条消息) Go语言自学系列 | golang接口嵌套_COCOgsta的博客-CSDN博客_golang 接口嵌套
(18条消息) Go语言自学系列 | golang包管理工具go module_COCOgsta的博客-CSDN博客_golang 包管理器
(18条消息) Go语言自学系列 | golang标准库os模块 - File文件写操作_COCOgsta的博客-CSDN博客_go os模块
(18条消息) Go语言自学系列 | golang结构体的初始化_COCOgsta的博客-CSDN博客_golang 结构体初始化
(18条消息) Go语言自学系列 | golang通过接口实现OCP设计原则_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang标准库os包进程相关操作_COCOgsta的博客-CSDN博客_golang os包
(18条消息) Go语言自学系列 | golang标准库ioutil包_COCOgsta的博客-CSDN博客_golang ioutil
(18条消息) Go语言自学系列 | golang标准库os模块 - 文件目录相关_COCOgsta的博客-CSDN博客_go语言os库
Golang技术栈,Golang文章、教程、视频分享!
(18条消息) Go语言自学系列 | golang结构体指针_COCOgsta的博客-CSDN博客_golang 结构体指针
Ansible
太厉害了,终于有人能把Ansible讲的明明白白了,建议收藏_互联网老辛
ansible.cfg配置详解
Docker
Docker部署
linux安装docker和Docker Compose
linux 安装 docker
Docker中安装Docker遇到的问题处理
Docker常用命令
docker常用命令小结
docker 彻底卸载
Docker pull 时报错:Get https://registry-1.docker.io/v2/library/mysql: net/http: TLS handshake timeout
Docker 拉镜像无法访问 registry-x.docker.io 问题(Centos7)
docker 容器内没有权限
Linux中关闭selinux的方法是什么?
docker run 生成 docker-compose
Docker覆盖网络部署
docker pull后台拉取镜像
docker hub
Redis
Redis 集群别乱搭,这才是正确的姿势
linux_离线_redis安装
怎么实现Redis的高可用?(主从、哨兵、集群) - 雨点的名字 - 博客园
redis集群离线安装
always-show-logo yes
Redis集群搭建及原理
[ERR] Node 172.168.63.202:7001 is not empty. Either the nodealready knows other nodes (check with CLUSTER NODES) or contains some - 亲爱的不二999 - 博客园
Redis daemonize介绍
redis 下载地址
Redis的redis.conf配置注释详解(三) - 云+社区 - 腾讯云
Redis的redis.conf配置注释详解(一) - 云+社区 - 腾讯云
Redis的redis.conf配置注释详解(二) - 云+社区 - 腾讯云
Redis的redis.conf配置注释详解(四) - 云+社区 - 腾讯云
Linux
在终端连接ssh的断开关闭退出的方法
漏洞扫描 - 灰信网(软件开发博客聚合)
find 命令的参数详解
vim 编辑器搜索功能
非root安装rpm时,mockbuild does not exist
Using a SSH password instead of a key is not possible because Host Key checking
(9条消息) 安全扫描5353端口mDNS服务漏洞问题_NamiJava的博客-CSDN博客_5353端口
Linux中使用rpm命令安装rpm包
ssh-copy-id非22端口的使用方法
How To Resolve SSH Weak Key Exchange Algorithms on CentOS7 or RHEL7 - infotechys.com
Linux cp 命令
yum 下载全量依赖 rpm 包及离线安装(终极解决方案) - 叨叨软件测试 - 博客园
How To Resolve SSH Weak Key Exchange Algorithms on CentOS7 or RHEL7 - infotechys.com
RPM zlib 下载地址
运维架构网站
欢迎来到 Jinja2
/usr/local/bin/ss-server -uv -c /etc/shadowsocks-libev/config.json -f /var/run/s
ruby 安装Openssl 默认安装位置
Linux 常用命令学习 | 菜鸟教程
linux 重命名文件和文件夹
linux命令快速指南
ipvsadm
Linux 下查找日志中的关键字
Linux 切割大 log 日志
CentOS7 关于网络的设置
rsync 命令_Linux rsync 命令用法详解:远程数据同步工具
linux 可视化界面安装
[问题已处理]-执行yum卡住无响应
GCC/G++升级高版本
ELK
Docker部署ELK
ELK+kafka+filebeat+Prometheus+Grafana - SegmentFault 思否
(9条消息) Elasticsearch设置账号密码_huas_xq的博客-CSDN博客_elasticsearch设置密码
Elasticsearch 7.X 性能优化
Elasticsearch-滚动更新
Elasticsearch 的内存优化_大数据系统
Elasticsearch之yml配置文件
ES 索引为Yellow状态
Logstash:Grok filter 入门
logstash grok 多项匹配
Mysql
Mysql相关Tip
基于ShardingJDBC实现数据库读写分离 - 墨天轮
MySQL-MHA高可用方案
京东三面:我要查询千万级数据量的表,怎么操作?
OpenStack
(16条消息) openstack项目中遇到的各种问题总结 其二(云主机迁移、ceph及扩展分区)_weixin_34104341的博客-CSDN博客
OpenStack组件介绍
百度大佬OpenStack流程
openstack各组件介绍
OpenStack生产实际问题总结(一)
OpenStack Train版离线部署
使用Packstack搭建OpenStack
K8S
K8S部署
K8S 集群部署
kubeadm 重新 init 和 join-pudn.com
Kubernetes 实战总结 - 阿里云 ECS 自建 K8S 集群 Kubernetes 实战总结 - 自定义 Prometheus
【K8S实战系列-清理篇1】k8s docker 删除没用的资源
Flannel Pod Bug汇总
Java
Jdk 部署
JDK部署
java线程池ThreadPoolExecutor类使用详解 - bigfan - 博客园
ShardingJDBC实现多数据库节点分库分表 - 墨天轮
Maven Repository: Search/Browse/Explore
其他
Git在阿里,我们如何管理代码分支?
chrome F12调试网页出现Paused in debugger
体验IntelliJ IDEA的远程开发(Remote Development) - 掘金
Idea远程调试
PDF转MD
强哥分享干货
优秀开源项目集合
vercel 配合Github 搭建项目Doc门户
如何用 Github Issues 写技术博客?
Idea 2021.3 Maven 3.8.1 报错 Blocked mirror for repositories 解决
列出maven依赖
[2022-09 持续更新] 谷歌 google 镜像 / Sci-Hub 可用网址 / Github 镜像可用网址总结
阿里云ECS迁移
linux访问github
一文教你使用 Docker 启动并安装 Nacos-腾讯云开发者社区-腾讯云
Nginx
Nginx 部署
Nginx 部署安装
Nginx反向代理cookie丢失的问题_longzhoufeng的博客-CSDN博客_nginx 代理后cookie丢失
Linux 系统 Https 证书生成与Nginx配置 https
数据仓库
实时数仓
松果出行 x StarRocks:实时数仓新范式的实践之路
实时数据仓库的一些分层和分层需要处理的事情,以及数据流向
湖仓一体电商项目
湖仓一体电商项目(一):项目背景和架构介绍
湖仓一体电商项目(二):项目使用技术及版本和基础环境准备
湖仓一体电商项目(三):3万字带你从头开始搭建12个大数据项目基础组件
数仓笔记
数仓学习总结
数仓常用平台和框架
数仓学习笔记
数仓技术选型
尚硅谷教程
尚硅谷学习笔记
尚硅谷所有已知的课件资料
尚硅谷大数据项目之尚品汇(11数据质量管理V4.0)
尚硅谷大数据项目之尚品汇(10元数据管理AtlasV4.0)
尚硅谷大数据项目之尚品汇(9权限管理RangerV4.0)
尚硅谷大数据项目之尚品汇(8安全环境实战V4.0)
尚硅谷大数据项目之尚品汇(7用户认证KerberosV4.1)
尚硅谷大数据项目之尚品汇(6集群监控ZabbixV4.1)
尚硅谷大数据项目之尚品汇(5即席查询PrestoKylinV4.0)
尚硅谷大数据项目之尚品汇(4可视化报表SupersetV4.0)
尚硅谷大数据项目之尚品汇(3数据仓库系统)V4.2.0
尚硅谷大数据项目之尚品汇(2业务数据采集平台)V4.1.0
尚硅谷大数据项目之尚品汇(1用户行为采集平台)V4.1.0
数仓治理
数据中台 元数据规范
数据中台的那些 “经验与陷阱”
2万字详解数据仓库数据指标数据治理体系建设方法论
数据仓库,为什么需要分层建设和管理? | 人人都是产品经理
网易数帆数据治理演进
数仓技术
一文看懂大数据生态圈完整知识体系
阿里云—升舱 - 数据仓库升级白皮书
最全企业级数仓建设迭代版(4W字建议收藏)
基于Hue,Dolphinscheduler,HIVE分析数据仓库层级实现及项目需求案例实践分析
详解数据仓库分层架构
数据仓库技术细节
大数据平台组件介绍
总览 2016-2021 年全球机器学习、人工智能和大数据行业技术地图
Apache DolphinScheduler 3.0.0 正式版发布!
数据仓库面试题——介绍下数据仓库
数据仓库为什么要分层,各层的作用是什么
Databend v0.8 发布,基于 Rust 开发的现代化云数据仓库 - OSCHINA - 中文开源技术交流社区
数据中台
数据中台设计
大数据同步工具之 FlinkCDC/Canal/Debezium 对比
有数数据开发平台文档
Shell
Linux Shell 命令参数
shell 脚本编程
一篇教会你写 90% 的 Shell 脚本
Kibana
Kibana 查询语言(KQL)
Kibana:在 Kibana 中的四种表格制作方式
Kafka
Kafka部署
canal 动态监控 Mysql,将 binlog 日志解析后,把采集到的数据发送到 Kafka
OpenApi
OpenAPI 标准规范,了解一下?
OpenApi学术论文
贵阳市政府数据开放平台设计与实现
OpenAPI简介
开放平台:运营模式与技术架构研究综述
管理
技术部门Leader是不是一定要技术大牛担任?
华为管理体系流程介绍
DevOps
*Ops
XOps 已经成为一个流行的术语 - 它是什么?
Practical Linux DevOps
Jenkins 2.x实践指南 (翟志军)
Jenkins 2权威指南 ((美)布伦特·莱斯特(Brent Laster)
DevOps组件高可用的思路
KeepAlived
VIP + KEEPALIVED + LVS 遇到Connection Peer的问题的解决
MinIO
MinIO部署
Minio 分布式集群搭建部署
Minio 入门系列【16】Minio 分片上传文件 putObject 接口流程源码分析
MinioAPI 浅入及问题
部署 minio 兼容 aws S3 模式
超详细分布式对象存储 MinIO 实战教程
Hadoop
Hadoop 部署
Hadoop集群部署
windows 搭建 hadoop 环境(解决 HADOOP_HOME and hadoop.home.dir are unset
Hadoop 集群搭建和简单应用(参考下文)
Hadoop 启动 NameNode 报错 ERROR: Cannot set priority of namenode process 2639
jps 命令查看 DataNode 进程不见了 (hadoop3.0 亲测可用)
hadoop 报错: Operation category READ is not supported in state standby
Spark
Spark 部署
Spark 集群部署
spark 心跳超时分析 Cannot receive any reply in 120 seconds
Spark学习笔记
apache spark - Failed to find data source: parquet, when building with sbt assembly
Spark Thrift Server 架构和原理介绍
InLong
InLong 部署
Apache InLong部署文档
安装部署 - Docker 部署 - 《Apache InLong v1.2 中文文档》 - 书栈网 · BookStack
基于 Apache Flink SQL 的 InLong Sort ETL 方案解析
关于 Apache Pulsar 在 Apache InLong 接入数据
zookeeper
zookeeper 部署
使用 Docker 搭建 Zookeeper 集群
美团技术团队
StarRocks
StarRocks技术白皮书(在线版)
JuiceFS
AI 场景存储优化:云知声超算平台基于 JuiceFS 的存储实践
JuiceFS 在 Elasticsearch/ClickHouse 温冷数据存储中的实践
JuiceFS format
元数据备份和恢复 | JuiceFS Document Center
JuiceFS 元数据引擎选型指南
Apache Hudi 使用文件聚类功能 (Clustering) 解决小文件过多的问题
普罗米修斯
k8s 之 Prometheus(普罗米修斯)监控,简单梳理下 K8S 监控流程
k8s 部署 - 使用helm3部署监控prometheus(普罗米修斯),从零到有,一文搞定
k8s 部署 - 使用 helm3 部署监控 prometheus(普罗米修斯),从零到有,一文搞定
k8s 部署 - 如何完善 k8s 中 Prometheus(普罗米修斯)监控项目呢?
k8s 部署 - k8s 中 Prometheus(普罗米修斯)的大屏展示 Grafana + 监控报警
zabbix
一文带你掌握 Zabbix 监控系统
Stream Collectors
Nvidia
Nvidia API
CUDA Nvidia驱动安装
NVIDIA驱动失效简单解决方案:NVIDIA-SMI has failed because it couldn‘t communicate with the NVIDIA driver.
ubuntu 20 CUDA12.1安装流程
nvidia开启持久化模式
nvidia-smi 开启持久化
Harbor
Harbor部署文档
Docker 爆出 it doesn't contain any IP SANs
pandoc
其他知识
大模型
COS 597G (Fall 2022): Understanding Large Language Models
如何优雅的使用各类LLM
ChatGLM3在线搜索功能升级
当ChatGLM3能用搜索引擎时
OCR神器,PDF、数学公式都能转
Stable Diffusion 动画animatediff-cli-prompt-travel
基于ERNIE Bot自定义虚拟数字人生成
pika负面提示词
开通GPT4的方式
GPT4网站
低价开通GPT Plus
大模型应用场景分享
AppAgent AutoGPT变体
机器学习
最大似然估计
权衡偏差(Bias)和方差(Variance)以最小化均方误差(Mean Squared Error, MSE)
伯努利分布
方差计算公式
均值的高斯分布估计
没有免费午餐定理
贝叶斯误差
非参数模型
最近邻回归
表示容量
最优容量
权重衰减
正则化项
Sora
Sora官方提示词
看完32篇论文,你大概就知道Sora如何炼成? |【经纬低调出品】
Sora论文
Sora 物理悖谬的几何解释
Sora 技术栈讨论
RAG垂直落地
DB-GPT与TeleChat-7B搭建相关RAG知识库
ChatWithRTX
ChatRTX安装教程
ChatWithRTX 踩坑记录
ChatWithRTX 使用其他量化模型
ChatWithRTX介绍
RAG 相关资料
英伟达—大模型结合 RAG 构建客服场景自动问答
又一大模型技术开源!有道自研RAG引擎QAnything正式开放下载
收藏!RAG入门参考资料开源大总结:RAG综述、介绍、比较、预处理、RAG Embedding等
RAG调研
解决现代RAG实际生产问题
解决现代 RAG 系统中的生产问题-II
Modular RAG and RAG Flow: Part Ⅰ
Modular RAG and RAG Flow: Part II
先进的Retriever技术来增强你的RAGs
高级RAG — 使用假设文档嵌入 (HyDE) 改进检索
提升 RAG:选择最佳嵌入和 Reranker 模型
LangGraph
增强型RAG:re-rank
LightRAG:使用 PyTorch 为 LLM 应用程序提供支持
模型训练
GPU相关资料
[教程] conda安装简明教程(基于miniconda和Windows)
PyTorch CUDA对应版本 | PyTorch
资料
李一舟课程全集
零碎资料
苹果各服共享ID
数据中心网络技术概览
华为大模型训练学习笔记
百度AIGC工程师认证考试答案(可换取工信部证书)
百度智能云生成式AI认证工程师 考试和证书查询指南
深入理解 Megatron-LM(1)基础知识
QAnything
接入QAnything的AI问答知识库,可私有化部署的企业级WIKI知识库
wsl --update失效Error code: Wsl/UpdatePackage/0x80240438的解决办法
Docker Desktop 启动docker engine一直转圈解决方法
win10开启了hyper-v,docker 启动还是报错 docker desktop windows hypervisor is not present
WSL虚拟磁盘过大,ext4迁移 Windows 中创建软链接和硬链接
WSL2切换默认的Linux子系统
Windows的WSL子系统,自动开启sshd服务
新版docker desktop设置wsl(使用windown的子系统)
WSL 开启ssh
Windows安装网易开源QAnything打造智能客服系统
芯片
国内互联网大厂自研芯片梳理
超算平台—算力供应商
Linux 磁盘扩容
Linux使用growpart工具进行磁盘热扩容(非LVM扩容方式)
关于centos7 扩容提示no tools available to resize disk with 'gpt' - o夜雨随风o - 博客园
(小插曲)neo4j配置apoc插件后检查版本发现:Unknown function ‘apoc.version‘ “EXPLAIN RETURN apoc.version()“
vfio-pci与igb_uio映射硬件资源到DPDK的流程分析
KubeVirt
vnc server配置、启动、重启与连接 - 王约翰 - 博客园
虚拟机Bug解决方案
kubevirt 如何通过CDI上传镜像文件
在 K8S 上也能跑 VM!KubeVirt 簡介與建立(部署篇) | Cloud Solutions
KubeVirt 04:容器化数据导入 – 小菜园
Python
安装 flash_attn
手把手教你在linux上安装pytorch与cuda
AI
在启智社区基于PyTorch运行国产算力卡的模型训练实验
Scaling law
免费的GPT3.5 API
AI Engineer Roadmap & Resources 🤖
模型排行
-
+
首页
canal 动态监控 Mysql,将 binlog 日志解析后,把采集到的数据发送到 Kafka
> 本文由 [简悦 SimpRead](http://ksria.com/simpread/) 转码, 原文地址 [www.tuicool.com](https://www.tuicool.com/articles/b22ui2E) 生产者要将发送的数据转化为字节数组才能通过网络发动给 Kafka,对于一些简单的数据,Kafka 自带了一些序列化工具。 ``` //创建生产者实例 private static Producer<String , String> createProducer(){ Properties properties = new Properties(); properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap); properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper); properties.put("serializer.class" , StringEncoder.class.getName()); return new Producer<String, String>(new ProducerConfig(properties)); } ``` 在通常的微服务中,服务之间需要频繁的传递各种负责的数据结构,但是 kafka 仅仅支持简单的类型如 String,Integer。于是我们在服务之间使用 JSONObject,因为 JSON 可以很容易的转化为 String,而 String 的序列化和反序列化已经被支持。 ``` JSONObject jsonObject = new JSONObject(); jsonObject.put("logFileName", logFileName); jsonObject.put("logFileOffset", logFileOffset); jsonObject.put("dbName", dbName); jsonObject.put("tableName", tableName); jsonObject.put("eventType", eventType); jsonObject.put("columnValueList", columnValueList); jsonObject.put("emptyCount", emptyCount); jsonObject.put("timestamp", timestamp); //拼接所有binlog解析的字段 String data = JSON.toJSONString(jsonObject); // 解析后的数据发送到kafka KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data); ``` ResourceBundle 类是用来读取 propertise 资源文件的,可以在初始化时把配置项全部一次读入,并保存在静态成员变量中。避免每次需要的时候才去读取相关配置文件的 class,I/O 速度慢,容易造成性能上的瓶颈。 ``` //读取application.properties文件 private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application"); public static String canalHost= resourceBundle.getString("canal.host"); public static String canalPort = resourceBundle.getString("canal.port"); public static String canalInstance = resourceBundle.getString("canal.instance"); public static String mysqlUsername = resourceBundle.getString("mysql.username"); public static String mysqlPassword= resourceBundle.getString("mysql.password"); public static String kafkaBootstrap= resourceBundle.getString("kafka.bootstrap.servers"); public static String kafkaZookeeper= resourceBundle.getString("kafka.zookeeper.connect"); public static String kafkaInput = resourceBundle.getString("kafka.input.topic"); ``` #### 完整代码 ``` #pom文件 <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.24</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.1</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <!--对象和json 互相转换的--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.44</version> </dependency> ``` ``` import java.util.Locale; import java.util.ResourceBundle; /** * 配置文件的公共类 */ public class GlobalConfigUtil { //读取application.properties文件 private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application"); public static String canalHost= resourceBundle.getString("canal.host"); public static String canalPort = resourceBundle.getString("canal.port"); public static String canalInstance = resourceBundle.getString("canal.instance"); public static String mysqlUsername = resourceBundle.getString("mysql.username"); public static String mysqlPassword= resourceBundle.getString("mysql.password"); public static String kafkaBootstrap= resourceBundle.getString("kafka.bootstrap.servers"); public static String kafkaZookeeper= resourceBundle.getString("kafka.zookeeper.connect"); public static String kafkaInput = resourceBundle.getString("kafka.input.topic"); public static void main(String[] args) { System.out.println(canalHost); } } ``` ``` import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; import java.util.Properties; /** * Kafka生产消息工具类 */ public class KafkaSender { private String topic; public KafkaSender(String topic){ super(); this.topic = topic; } /** * 发送消息到Kafka指定topic * * @param topic topic名字 * @param key 键值 * @param data 数据 */ public static void sendMessage(String topic , String key , String data){ Producer<String, String> producer = createProducer(); producer.send(new KeyedMessage<String , String>(topic , key , data)); } /** * 创建生产者实例 * @return */ private static Producer<String , String> createProducer(){ Properties properties = new Properties(); properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap); properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper); properties.put("serializer.class" , StringEncoder.class.getName()); return new Producer<String, String>(new ProducerConfig(properties)); } } ``` ``` import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.UUID; /** * Canal解析binlog日志工具类 */ public class CanalClient { static class ColumnValuePair { private String columnName; private String columnValue; private Boolean isValid; public ColumnValuePair(String columnName, String columnValue, Boolean isValid) { this.columnName = columnName; this.columnValue = columnValue; this.isValid = isValid; } public String getColumnName() { return columnName; } public void setColumnName(String columnName) { this.columnName = columnName; } public String getColumnValue() { return columnValue; } public void setColumnValue(String columnValue) { this.columnValue = columnValue; } public Boolean getIsValid() { return isValid; } public void setIsValid(Boolean isValid) { this.isValid = isValid; } } /** * 获取Canal连接 * * @param host 主机名 * @param port 端口号 * @param instance Canal实例名 * @param username 用户名 * @param password 密码 * @return Canal连接器 */ public static CanalConnector getConn(String host, int port, String instance, String username, String password) { CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password); return canalConnector; } /** * 解析Binlog日志 * * @param entries Binlog消息实体 * @param emptyCount 操作的序号 */ public static void analysis(List<CanalEntry.Entry> entries, int emptyCount) { for (CanalEntry.Entry entry : entries) { // 只解析mysql事务的操作,其他的不解析 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } // 那么解析binlog CanalEntry.RowChange rowChange = null; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { e.printStackTrace(); } // 获取操作类型字段(增加 删除 修改) CanalEntry.EventType eventType = rowChange.getEventType(); // 获取binlog文件名称 String logfileName = entry.getHeader().getLogfileName(); // 读取当前操作在binlog文件的位置 long logfileOffset = entry.getHeader().getLogfileOffset(); // 获取当前操作所属的数据库 String dbName = entry.getHeader().getSchemaName(); // 获取当前操作所属的表 String tableName = entry.getHeader().getTableName();//当前操作的是哪一张表 long timestamp = entry.getHeader().getExecuteTime();//执行时间 // 解析操作的行数据 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { // 删除操作 if (eventType == CanalEntry.EventType.DELETE) { // 获取删除之前的所有列数据 dataDetails(rowData.getBeforeColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp); } // 新增操作 else if (eventType == CanalEntry.EventType.INSERT) { // 获取新增之后的所有列数据 dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp); } // 更新操作 else { // 获取更新之后的所有列数据 dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp); } } } } /** * 解析具体一条Binlog消息的数据 * * @param columns 当前行所有的列数据 * @param logFileName binlog文件名 * @param logFileOffset 当前操作在binlog中的位置 * @param dbName 当前操作所属数据库名称 * @param tableName 当前操作所属表名称 * @param eventType 当前操作类型(新增、修改、删除) * @param emptyCount 操作的序号 */ private static void dataDetails(List<CanalEntry.Column> columns, String logFileName, Long logFileOffset, String dbName, String tableName, CanalEntry.EventType eventType, int emptyCount, long timestamp) { // 找到当前那些列发生了改变 以及改变的值 List<ColumnValuePair> columnValueList = new ArrayList<ColumnValuePair>(); for (CanalEntry.Column column : columns) { ColumnValuePair columnValuePair = new ColumnValuePair(column.getName(), column.getValue(), column.getUpdated()); columnValueList.add(columnValuePair); } String key = UUID.randomUUID().toString(); JSONObject jsonObject = new JSONObject(); // jsonObject.put("logFileName", logFileName); // jsonObject.put("logFileOffset", logFileOffset); jsonObject.put("dbName", dbName); jsonObject.put("tableName", tableName); jsonObject.put("eventType", eventType); jsonObject.put("columnValueList", columnValueList); // jsonObject.put("emptyCount", emptyCount); // jsonObject.put("timestamp", timestamp); // 拼接所有binlog解析的字段 String data = JSON.toJSONString(jsonObject); System.out.println("【JSON】" + data); // 解析后的数据发送到kafka KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data); } /** * 客户端入口方法 * @param args */ public static void main(String[] args) { // 加载配置文件 String host = GlobalConfigUtil.canalHost; int port = Integer.parseInt(GlobalConfigUtil.canalPort); String instance = GlobalConfigUtil.canalInstance; String username = GlobalConfigUtil.mysqlUsername; String password = GlobalConfigUtil.mysqlPassword; // 获取Canal连接 CanalConnector conn = getConn(host, port, instance, username, password); // 从binlog中读取数据 int batchSize = 100; int emptyCount = 1; try { conn.connect(); conn.subscribe(".*..*"); conn.rollback(); int totalCount = 120; //循环次数 while (emptyCount < totalCount) { // 获取数据 Message message = conn.getWithoutAck(batchSize); long id = message.getId(); int size = message.getEntries().size(); if (id == -1 || size == 0) { emptyCount=0; //没有读取到任何数据 System.out.println("目前没有读取到任何数据..."); } else { //有数据,那么解析binlog日志 analysis(message.getEntries(), emptyCount); emptyCount++; } // 确认消息 conn.ack(message.getId()); } } catch (Exception e) { e.printStackTrace(); } finally { conn.disconnect(); } } } ``` ``` #application.properties, 以下请更改为自已的数据库信息 canal.host=xxx.xx.xxx.xxx canal.port=11111 canal.instance=example mysql.username=root mysql.password=xxxxxx kafka.bootstrap.servers = xxx.xx.xxx.xxx:9092 kafka.zookeeper.connect = xxx.xx.xxx.xxx:2182 kafka.input.topic=test ``` 具体代码请移步: [SimpleMysqlCanalKafkaSample](https://github.com/ShawnVanorGit/hello_mysql_canal_kafka)
yg9538
Aug. 2, 2022, 1:32 p.m.
387
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
PDF文档
PDF文档(打印)
分享
链接
类型
密码
更新密码