Tips
Go
(18条消息) Go语言自学系列 | golang包_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之channel的遍历_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之select switch_COCOgsta的博客-CSDN博客_golang select switch
(18条消息) Go语言自学系列 | golang并发编程之runtime包_COCOgsta的博客-CSDN博客_golang runtime包
(18条消息) Go语言自学系列 | golang接口值类型接收者和指针类型接收者_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之Timer_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang方法_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之WaitGroup实现同步_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang构造函数_COCOgsta的博客-CSDN博客_golang 构造函数
(18条消息) Go语言自学系列 | golang方法接收者类型_COCOgsta的博客-CSDN博客_golang 方法接收者
(18条消息) Go语言自学系列 | golang接口_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang接口和类型的关系_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang结构体_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang结构体_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang标准库os模块 - File文件读操作_COCOgsta的博客-CSDN博客_golang os.file
(18条消息) Go语言自学系列 | golang继承_COCOgsta的博客-CSDN博客_golang 继承
(18条消息) Go语言自学系列 | golang嵌套结构体_COCOgsta的博客-CSDN博客_golang 结构体嵌套
(18条消息) Go语言自学系列 | golang并发编程之Mutex互斥锁实现同步_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发变成之通道channel_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之原子操作详解_COCOgsta的博客-CSDN博客_golang 原子操作
(18条消息) Go语言自学系列 | golang并发编程之原子变量的引入_COCOgsta的博客-CSDN博客_go 原子变量
(18条消息) Go语言自学系列 | golang并发编程之协程_COCOgsta的博客-CSDN博客_golang 协程 并发
(18条消息) Go语言自学系列 | golang接口嵌套_COCOgsta的博客-CSDN博客_golang 接口嵌套
(18条消息) Go语言自学系列 | golang包管理工具go module_COCOgsta的博客-CSDN博客_golang 包管理器
(18条消息) Go语言自学系列 | golang标准库os模块 - File文件写操作_COCOgsta的博客-CSDN博客_go os模块
(18条消息) Go语言自学系列 | golang结构体的初始化_COCOgsta的博客-CSDN博客_golang 结构体初始化
(18条消息) Go语言自学系列 | golang通过接口实现OCP设计原则_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang标准库os包进程相关操作_COCOgsta的博客-CSDN博客_golang os包
(18条消息) Go语言自学系列 | golang标准库ioutil包_COCOgsta的博客-CSDN博客_golang ioutil
(18条消息) Go语言自学系列 | golang标准库os模块 - 文件目录相关_COCOgsta的博客-CSDN博客_go语言os库
Golang技术栈,Golang文章、教程、视频分享!
(18条消息) Go语言自学系列 | golang结构体指针_COCOgsta的博客-CSDN博客_golang 结构体指针
Ansible
太厉害了,终于有人能把Ansible讲的明明白白了,建议收藏_互联网老辛
ansible.cfg配置详解
Docker
Docker部署
linux安装docker和Docker Compose
linux 安装 docker
Docker中安装Docker遇到的问题处理
Docker常用命令
docker常用命令小结
docker 彻底卸载
Docker pull 时报错:Get https://registry-1.docker.io/v2/library/mysql: net/http: TLS handshake timeout
Docker 拉镜像无法访问 registry-x.docker.io 问题(Centos7)
docker 容器内没有权限
Linux中关闭selinux的方法是什么?
docker run 生成 docker-compose
Docker覆盖网络部署
docker pull后台拉取镜像
docker hub
Redis
Redis 集群别乱搭,这才是正确的姿势
linux_离线_redis安装
怎么实现Redis的高可用?(主从、哨兵、集群) - 雨点的名字 - 博客园
redis集群离线安装
always-show-logo yes
Redis集群搭建及原理
[ERR] Node 172.168.63.202:7001 is not empty. Either the nodealready knows other nodes (check with CLUSTER NODES) or contains some - 亲爱的不二999 - 博客园
Redis daemonize介绍
redis 下载地址
Redis的redis.conf配置注释详解(三) - 云+社区 - 腾讯云
Redis的redis.conf配置注释详解(一) - 云+社区 - 腾讯云
Redis的redis.conf配置注释详解(二) - 云+社区 - 腾讯云
Redis的redis.conf配置注释详解(四) - 云+社区 - 腾讯云
Linux
在终端连接ssh的断开关闭退出的方法
漏洞扫描 - 灰信网(软件开发博客聚合)
find 命令的参数详解
vim 编辑器搜索功能
非root安装rpm时,mockbuild does not exist
Using a SSH password instead of a key is not possible because Host Key checking
(9条消息) 安全扫描5353端口mDNS服务漏洞问题_NamiJava的博客-CSDN博客_5353端口
Linux中使用rpm命令安装rpm包
ssh-copy-id非22端口的使用方法
How To Resolve SSH Weak Key Exchange Algorithms on CentOS7 or RHEL7 - infotechys.com
Linux cp 命令
yum 下载全量依赖 rpm 包及离线安装(终极解决方案) - 叨叨软件测试 - 博客园
How To Resolve SSH Weak Key Exchange Algorithms on CentOS7 or RHEL7 - infotechys.com
RPM zlib 下载地址
运维架构网站
欢迎来到 Jinja2
/usr/local/bin/ss-server -uv -c /etc/shadowsocks-libev/config.json -f /var/run/s
ruby 安装Openssl 默认安装位置
Linux 常用命令学习 | 菜鸟教程
linux 重命名文件和文件夹
linux命令快速指南
ipvsadm
Linux 下查找日志中的关键字
Linux 切割大 log 日志
CentOS7 关于网络的设置
rsync 命令_Linux rsync 命令用法详解:远程数据同步工具
linux 可视化界面安装
[问题已处理]-执行yum卡住无响应
GCC/G++升级高版本
ELK
Docker部署ELK
ELK+kafka+filebeat+Prometheus+Grafana - SegmentFault 思否
(9条消息) Elasticsearch设置账号密码_huas_xq的博客-CSDN博客_elasticsearch设置密码
Elasticsearch 7.X 性能优化
Elasticsearch-滚动更新
Elasticsearch 的内存优化_大数据系统
Elasticsearch之yml配置文件
ES 索引为Yellow状态
Logstash:Grok filter 入门
logstash grok 多项匹配
Mysql
Mysql相关Tip
基于ShardingJDBC实现数据库读写分离 - 墨天轮
MySQL-MHA高可用方案
京东三面:我要查询千万级数据量的表,怎么操作?
OpenStack
(16条消息) openstack项目中遇到的各种问题总结 其二(云主机迁移、ceph及扩展分区)_weixin_34104341的博客-CSDN博客
OpenStack组件介绍
百度大佬OpenStack流程
openstack各组件介绍
OpenStack生产实际问题总结(一)
OpenStack Train版离线部署
使用Packstack搭建OpenStack
K8S
K8S部署
K8S 集群部署
kubeadm 重新 init 和 join-pudn.com
Kubernetes 实战总结 - 阿里云 ECS 自建 K8S 集群 Kubernetes 实战总结 - 自定义 Prometheus
【K8S实战系列-清理篇1】k8s docker 删除没用的资源
Flannel Pod Bug汇总
Java
Jdk 部署
JDK部署
java线程池ThreadPoolExecutor类使用详解 - bigfan - 博客园
ShardingJDBC实现多数据库节点分库分表 - 墨天轮
Maven Repository: Search/Browse/Explore
其他
Git在阿里,我们如何管理代码分支?
chrome F12调试网页出现Paused in debugger
体验IntelliJ IDEA的远程开发(Remote Development) - 掘金
Idea远程调试
PDF转MD
强哥分享干货
优秀开源项目集合
vercel 配合Github 搭建项目Doc门户
如何用 Github Issues 写技术博客?
Idea 2021.3 Maven 3.8.1 报错 Blocked mirror for repositories 解决
列出maven依赖
[2022-09 持续更新] 谷歌 google 镜像 / Sci-Hub 可用网址 / Github 镜像可用网址总结
阿里云ECS迁移
linux访问github
一文教你使用 Docker 启动并安装 Nacos-腾讯云开发者社区-腾讯云
Nginx
Nginx 部署
Nginx 部署安装
Nginx反向代理cookie丢失的问题_longzhoufeng的博客-CSDN博客_nginx 代理后cookie丢失
Linux 系统 Https 证书生成与Nginx配置 https
数据仓库
实时数仓
松果出行 x StarRocks:实时数仓新范式的实践之路
实时数据仓库的一些分层和分层需要处理的事情,以及数据流向
湖仓一体电商项目
湖仓一体电商项目(一):项目背景和架构介绍
湖仓一体电商项目(二):项目使用技术及版本和基础环境准备
湖仓一体电商项目(三):3万字带你从头开始搭建12个大数据项目基础组件
数仓笔记
数仓学习总结
数仓常用平台和框架
数仓学习笔记
数仓技术选型
尚硅谷教程
尚硅谷学习笔记
尚硅谷所有已知的课件资料
尚硅谷大数据项目之尚品汇(11数据质量管理V4.0)
尚硅谷大数据项目之尚品汇(10元数据管理AtlasV4.0)
尚硅谷大数据项目之尚品汇(9权限管理RangerV4.0)
尚硅谷大数据项目之尚品汇(8安全环境实战V4.0)
尚硅谷大数据项目之尚品汇(7用户认证KerberosV4.1)
尚硅谷大数据项目之尚品汇(6集群监控ZabbixV4.1)
尚硅谷大数据项目之尚品汇(5即席查询PrestoKylinV4.0)
尚硅谷大数据项目之尚品汇(4可视化报表SupersetV4.0)
尚硅谷大数据项目之尚品汇(3数据仓库系统)V4.2.0
尚硅谷大数据项目之尚品汇(2业务数据采集平台)V4.1.0
尚硅谷大数据项目之尚品汇(1用户行为采集平台)V4.1.0
数仓治理
数据中台 元数据规范
数据中台的那些 “经验与陷阱”
2万字详解数据仓库数据指标数据治理体系建设方法论
数据仓库,为什么需要分层建设和管理? | 人人都是产品经理
网易数帆数据治理演进
数仓技术
一文看懂大数据生态圈完整知识体系
阿里云—升舱 - 数据仓库升级白皮书
最全企业级数仓建设迭代版(4W字建议收藏)
基于Hue,Dolphinscheduler,HIVE分析数据仓库层级实现及项目需求案例实践分析
详解数据仓库分层架构
数据仓库技术细节
大数据平台组件介绍
总览 2016-2021 年全球机器学习、人工智能和大数据行业技术地图
Apache DolphinScheduler 3.0.0 正式版发布!
数据仓库面试题——介绍下数据仓库
数据仓库为什么要分层,各层的作用是什么
Databend v0.8 发布,基于 Rust 开发的现代化云数据仓库 - OSCHINA - 中文开源技术交流社区
数据中台
数据中台设计
大数据同步工具之 FlinkCDC/Canal/Debezium 对比
有数数据开发平台文档
Shell
Linux Shell 命令参数
shell 脚本编程
一篇教会你写 90% 的 Shell 脚本
Kibana
Kibana 查询语言(KQL)
Kibana:在 Kibana 中的四种表格制作方式
Kafka
Kafka部署
canal 动态监控 Mysql,将 binlog 日志解析后,把采集到的数据发送到 Kafka
OpenApi
OpenAPI 标准规范,了解一下?
OpenApi学术论文
贵阳市政府数据开放平台设计与实现
OpenAPI简介
开放平台:运营模式与技术架构研究综述
管理
技术部门Leader是不是一定要技术大牛担任?
华为管理体系流程介绍
DevOps
*Ops
XOps 已经成为一个流行的术语 - 它是什么?
Practical Linux DevOps
Jenkins 2.x实践指南 (翟志军)
Jenkins 2权威指南 ((美)布伦特·莱斯特(Brent Laster)
DevOps组件高可用的思路
KeepAlived
VIP + KEEPALIVED + LVS 遇到Connection Peer的问题的解决
MinIO
MinIO部署
Minio 分布式集群搭建部署
Minio 入门系列【16】Minio 分片上传文件 putObject 接口流程源码分析
MinioAPI 浅入及问题
部署 minio 兼容 aws S3 模式
超详细分布式对象存储 MinIO 实战教程
Hadoop
Hadoop 部署
Hadoop集群部署
windows 搭建 hadoop 环境(解决 HADOOP_HOME and hadoop.home.dir are unset
Hadoop 集群搭建和简单应用(参考下文)
Hadoop 启动 NameNode 报错 ERROR: Cannot set priority of namenode process 2639
jps 命令查看 DataNode 进程不见了 (hadoop3.0 亲测可用)
hadoop 报错: Operation category READ is not supported in state standby
Spark
Spark 部署
Spark 集群部署
spark 心跳超时分析 Cannot receive any reply in 120 seconds
Spark学习笔记
apache spark - Failed to find data source: parquet, when building with sbt assembly
Spark Thrift Server 架构和原理介绍
InLong
InLong 部署
Apache InLong部署文档
安装部署 - Docker 部署 - 《Apache InLong v1.2 中文文档》 - 书栈网 · BookStack
基于 Apache Flink SQL 的 InLong Sort ETL 方案解析
关于 Apache Pulsar 在 Apache InLong 接入数据
zookeeper
zookeeper 部署
使用 Docker 搭建 Zookeeper 集群
美团技术团队
StarRocks
StarRocks技术白皮书(在线版)
JuiceFS
AI 场景存储优化:云知声超算平台基于 JuiceFS 的存储实践
JuiceFS 在 Elasticsearch/ClickHouse 温冷数据存储中的实践
JuiceFS format
元数据备份和恢复 | JuiceFS Document Center
JuiceFS 元数据引擎选型指南
Apache Hudi 使用文件聚类功能 (Clustering) 解决小文件过多的问题
普罗米修斯
k8s 之 Prometheus(普罗米修斯)监控,简单梳理下 K8S 监控流程
k8s 部署 - 使用helm3部署监控prometheus(普罗米修斯),从零到有,一文搞定
k8s 部署 - 使用 helm3 部署监控 prometheus(普罗米修斯),从零到有,一文搞定
k8s 部署 - 如何完善 k8s 中 Prometheus(普罗米修斯)监控项目呢?
k8s 部署 - k8s 中 Prometheus(普罗米修斯)的大屏展示 Grafana + 监控报警
zabbix
一文带你掌握 Zabbix 监控系统
Stream Collectors
Nvidia
Nvidia API
CUDA Nvidia驱动安装
NVIDIA驱动失效简单解决方案:NVIDIA-SMI has failed because it couldn‘t communicate with the NVIDIA driver.
ubuntu 20 CUDA12.1安装流程
nvidia开启持久化模式
nvidia-smi 开启持久化
Harbor
Harbor部署文档
Docker 爆出 it doesn't contain any IP SANs
pandoc
其他知识
大模型
COS 597G (Fall 2022): Understanding Large Language Models
如何优雅的使用各类LLM
ChatGLM3在线搜索功能升级
当ChatGLM3能用搜索引擎时
OCR神器,PDF、数学公式都能转
Stable Diffusion 动画animatediff-cli-prompt-travel
基于ERNIE Bot自定义虚拟数字人生成
pika负面提示词
开通GPT4的方式
GPT4网站
低价开通GPT Plus
大模型应用场景分享
AppAgent AutoGPT变体
机器学习
最大似然估计
权衡偏差(Bias)和方差(Variance)以最小化均方误差(Mean Squared Error, MSE)
伯努利分布
方差计算公式
均值的高斯分布估计
没有免费午餐定理
贝叶斯误差
非参数模型
最近邻回归
表示容量
最优容量
权重衰减
正则化项
Sora
Sora官方提示词
看完32篇论文,你大概就知道Sora如何炼成? |【经纬低调出品】
Sora论文
Sora 物理悖谬的几何解释
Sora 技术栈讨论
RAG垂直落地
DB-GPT与TeleChat-7B搭建相关RAG知识库
ChatWithRTX
ChatRTX安装教程
ChatWithRTX 踩坑记录
ChatWithRTX 使用其他量化模型
ChatWithRTX介绍
RAG 相关资料
英伟达—大模型结合 RAG 构建客服场景自动问答
又一大模型技术开源!有道自研RAG引擎QAnything正式开放下载
收藏!RAG入门参考资料开源大总结:RAG综述、介绍、比较、预处理、RAG Embedding等
RAG调研
解决现代RAG实际生产问题
解决现代 RAG 系统中的生产问题-II
Modular RAG and RAG Flow: Part Ⅰ
Modular RAG and RAG Flow: Part II
先进的Retriever技术来增强你的RAGs
高级RAG — 使用假设文档嵌入 (HyDE) 改进检索
提升 RAG:选择最佳嵌入和 Reranker 模型
LangGraph
增强型RAG:re-rank
LightRAG:使用 PyTorch 为 LLM 应用程序提供支持
模型训练
GPU相关资料
[教程] conda安装简明教程(基于miniconda和Windows)
PyTorch CUDA对应版本 | PyTorch
资料
李一舟课程全集
零碎资料
苹果各服共享ID
数据中心网络技术概览
华为大模型训练学习笔记
百度AIGC工程师认证考试答案(可换取工信部证书)
百度智能云生成式AI认证工程师 考试和证书查询指南
深入理解 Megatron-LM(1)基础知识
QAnything
接入QAnything的AI问答知识库,可私有化部署的企业级WIKI知识库
wsl --update失效Error code: Wsl/UpdatePackage/0x80240438的解决办法
Docker Desktop 启动docker engine一直转圈解决方法
win10开启了hyper-v,docker 启动还是报错 docker desktop windows hypervisor is not present
WSL虚拟磁盘过大,ext4迁移 Windows 中创建软链接和硬链接
WSL2切换默认的Linux子系统
Windows的WSL子系统,自动开启sshd服务
新版docker desktop设置wsl(使用windown的子系统)
WSL 开启ssh
Windows安装网易开源QAnything打造智能客服系统
芯片
国内互联网大厂自研芯片梳理
超算平台—算力供应商
Linux 磁盘扩容
Linux使用growpart工具进行磁盘热扩容(非LVM扩容方式)
关于centos7 扩容提示no tools available to resize disk with 'gpt' - o夜雨随风o - 博客园
(小插曲)neo4j配置apoc插件后检查版本发现:Unknown function ‘apoc.version‘ “EXPLAIN RETURN apoc.version()“
vfio-pci与igb_uio映射硬件资源到DPDK的流程分析
KubeVirt
vnc server配置、启动、重启与连接 - 王约翰 - 博客园
虚拟机Bug解决方案
kubevirt 如何通过CDI上传镜像文件
在 K8S 上也能跑 VM!KubeVirt 簡介與建立(部署篇) | Cloud Solutions
KubeVirt 04:容器化数据导入 – 小菜园
Python
安装 flash_attn
手把手教你在linux上安装pytorch与cuda
AI
在启智社区基于PyTorch运行国产算力卡的模型训练实验
Scaling law
免费的GPT3.5 API
AI Engineer Roadmap & Resources 🤖
模型排行
edk2
K8S删除Evicted状态的pod
docker 中启动 docker
远程本地多用户桌面1.17(一种不让电脑跟你抢键鼠的思路) - 哔哩哔哩
-
+
首页
大数据同步工具之 FlinkCDC/Canal/Debezium 对比
> 本文由 [简悦 SimpRead](http://ksria.com/simpread/) 转码, 原文地址 [blog.csdn.net](https://blog.csdn.net/u011250186/article/details/124808632) 《【硬刚[大数据](https://so.csdn.net/so/search?q=%E5%A4%A7%E6%95%B0%E6%8D%AE&spm=1001.2101.3001.7020)之学习路线篇】从零到大数据专家的学习指南 (全面升级版)》 前言 数据准实时复制(CDC)是目前行内实时数据需求大量使用的技术,随着国产化的需求,我们也逐步考虑基于开源产品进行准实时数据同步工具的相关开发,逐步实现对商业产品的替代。本文把市面上常见的几种开源产品,Canal、Debezium、Flink CDC 从原理和适用做了对比,供大家参考。 本文首发微信公众号《import_bigdata》 Debezium Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong. Debezium 是一种 CDC(Change Data Capture)工具,工作原理类似大家所熟知的 Canal, DataBus, Maxwell 等,是通过抽取数据库日志来获取变更。 Debezium 最初设计成一个 Kafka Connect 的 Source Plugin,目前开发者虽致力于将其与 Kafka Connect 解耦,但当前的代码实现还未变动。下图引自 Debeizum 官方文档,可以看到一个 Debezium 在一个完整 CDC 系统中的位置。 ![](https://img-blog.csdnimg.cn/img_convert/8a613ee1ddc6500f8d778f63e8550404.png) Kafka Connect 为 Source Plugin 提供了一系列的编程接口,最主要的就是要实现 SourceTask 的 poll 方法,其返回 List<SourceRecord> 将会被以最少一次语义的方式投递至 Kafka。 Debezium MySQL 架构 ![](https://img-blog.csdnimg.cn/img_convert/e278a3eaac6a4cb7e979326e1b9d21a6.png) Debezium 抽取原理 Reader 体系构成了 MySQL 模块中代码的主线,我们的分析从 Reader 开始。 ![](https://img-blog.csdnimg.cn/img_convert/ff270e5ee0cbfb2c3e300f49351d4d88.png) Reader 继承关系 从名字上应该可以看出,真正主要的是 SnapshotReader 和 BinlogReader,分别实现了对 MySQL 数据的全量读取和增量读取,他们继承于 AbstractReader,里面封装了共用逻辑,下图是 AbstractReader 的内部设计。 ![](https://img-blog.csdnimg.cn/img_convert/42b773a5818f6c534305e374a1c251fd.png) 可以看到,AbstractReader 在实现时,并没有直接将 enqueue 喂进来的 record 投递进 Kafka,而是通过一个内存阻塞队列 BlockingQueue 进行了解耦,这种设计有诸多好处: 职责解耦 如上的图中,在喂入 BlockingQueue 之前,要根据条件判断是否接受该 record;在向 Kafka 投递 record 之前,判断 task 的 running 状态。这样把同类的功能限定在特定的位置。 线程隔离 BlockingQueue 是一个线程安全的阻塞队列,通过 BlockingQueue 实现的生产者消费者模型,是可以跑在不同的线程里的,这样避免局部的阻塞带来的整体的干扰。如上图中的右侧,消费者会定期判断 running 标志位,若 running 被 stop 信号置为了 false,可以立刻停止整个 task, 而不会因 MySQL IO 阻塞延迟相应。 Single 与 Batch 的互相转化 Enqueue record 是单条的投递 record,drain_to 是批量的消费 records。这个用法也可以反过来,实现 batch 到 single 的转化。 可能你还知道阿里开源的另一个 MySQL CDC 工具 canal,他只负责 stream 过程,并没有处理 snapshot 过程,这也是 debezium 相较于 canal 的一个优势。 对于 Debezium 来说,基本沿用了官方搭建从库的这一思路,让我们看下官方文档描述的详细步骤。 MySQL 连接器每次获取快照的时候会执行以下的步骤: 获取一个全局读锁,从而阻塞住其他数据库客户端的写操作。 开启一个可重复读语义的事务,来保证后续的在同一个事务内读操作都是在一个一致性快照中完成的。 读取 binlog 的当前位置。 读取连接器中配置的数据库和表的模式(schema)信息。 释放全局读锁,允许其他的数据库客户端对数据库进行写操作。 (可选)把 DDL 改变事件写入模式改变 topic(schema change topic),包括所有的必要的 DROP 和 CREATEDDL 语句。 扫描所有数据库的表,并且为每一个表产生一个和特定表相关的 kafka topic 创建事件(即为每一个表创建一个 kafka topic)。 提交事务。 记录连接器成功完成快照任务时的连接器偏移量。 部署 基于 Kafka Connect 最常见的架构是通过 Apache Kafka Connect 部署 Debezium。Kafka Connect 为在 Kafka 和外部存储系统之间系统数据提供了一种可靠且可伸缩性的方式。它为 Connector 插件提供了一组 API 和一个运行时:Connect 负责运行这些插件,它们则负责移动数据。通过 Kafka Connect 可以快速实现 Source Connector 和 Sink Connector 进行交互构造一个低延迟的数据 Pipeline: Source Connector(例如,Debezium):将记录发送到 Kafka Sink Connector:将 Kafka Topic 中的记录发送到其他系统 ![](https://img-blog.csdnimg.cn/img_convert/5fd97ae0481aad796be2522e6f63eacf.png) 如上图所示,部署了 MySQL 和 PostgresSQL 的 Debezium Connector 以捕获这两种类型数据库的变更。每个 Debezium Connector 都会与其源数据库建立连接: MySQL Connector 使用客户端库来访问 binlog。 PostgreSQL Connector 从逻辑副本流中读取数据。 除了 Kafka Broker 之外,Kafka Connect 也作为一个单独的服务运行。默认情况下,数据库表的变更会写入名称与表名称对应的 Kafka Topic 中。如果需要,您可以通过配置 Debezium 的 Topic 路由转换来调整目标 Topic 名称。例如,您可以: 将记录路由到名称与表名不同的 Topic 中 将多个表的变更事件记录流式传输到一个 Topic 中 变更事件记录在 Apache Kafka 中后,Kafka Connect 生态系统中的不同 Sink Connector 可以将记录流式传输到其他系统、数据库,例如 Elasticsearch、数据仓库、分析系统或者缓存(例如 Infinispan)。 Debezium Server 另一种部署 Debezium 的方法是使用 Debezium Server。Debezium Server 是一个可配置的、随时可用的应用程序,可以将变更事件从源数据库流式传输到各种消息中间件上。 下图展示了基于 Debezium Server 的变更数据捕获 Pipeline 架构: ![](https://img-blog.csdnimg.cn/img_convert/2d7f2ba37819d3378f5486d12dca4a50.png) Debezium Server 配置使用 Debezium Source Connector 来捕获源数据库中的变更。变更事件可以序列化为不同的格式,例如 JSON 或 Apache Avro,然后发送到各种消息中间件,例如 Amazon Kinesis、Google Cloud Pub/Sub 或 Apache Pulsar。 嵌入式引擎 使用 Debezium Connector 的另一种方法是嵌入式引擎。在这种情况下,Debezium 不会通过 Kafka Connect 运行,而是作为嵌入到您自定义 Java 应用程序中的库运行。这对于在您的应用程序本身内获取变更事件非常有帮助,无需部署完整的 Kafka 和 Kafka Connect 集群,也不用将变更流式传输到 Amazon Kinesis 等消息中间件上。 特性 Debezium 是一组用于 Apache Kafka Connect 的 Source Connector。每个 Connector 都通过使用该数据库的变更数据捕获 (CDC) 功能从不同的数据库中获取变更。与其他方法(例如轮询或双重写入)不同,Debezium 的实现基于日志的 CDC: 确保捕获所有的数据变更。 以极低的延迟生成变更事件,同时避免因为频繁轮询导致 CPU 使用率增加。例如,对于 MySQL 或 PostgreSQL,延迟在毫秒范围内。 不需要更改您的数据模型,例如 ‘Last Updated’ 列。 可以捕获删除操作。 可以捕获旧记录状态以及其他元数据,例如,事务 ID,具体取决于数据库的功能和配置。 Flink CDC 2020 年 7 月提交了第一个 commit,这是基于个人兴趣孵化的项目; 2020 年 7 中旬支持了 MySQL-CDC; 2020 年 7 月末支持了 Postgres-CDC; 一年的时间,该项目在 GitHub 上的 star 数已经超过 800。 ![](https://img-blog.csdnimg.cn/img_convert/ead76ed9ea3c5a9e82d130f7be06d005.png) Flink CDC 发展 Flink CDC 底层封装了 Debezium, Debezium 同步一张表分为两个阶段: 全量阶段:查询当前表中所有记录; 增量阶段:从 binlog 消费变更数据。 大部分用户使用的场景都是全量 + 增量同步,加锁是发生在全量阶段,目的是为了确定全量阶段的初始位点,保证增量 + 全量实现一条不多,一条不少,从而保证数据一致性。从下图中我们可以分析全局锁和表锁的一些加锁流程,左边红色线条是锁的生命周期,右边是 MySQL 开启可重复读事务的生命周期。 ![](https://img-blog.csdnimg.cn/img_convert/41feb4e846e063ffe99c0736d3837bca.png) 以全局锁为例,首先是获取一个锁,然后再去开启可重复读的事务。这里锁住操作是读取 binlog 的起始位置和当前表的 schema。这样做的目的是保证 binlog 的起始位置和读取到的当前 schema 是可以对应上的,因为表的 schema 是会改变的,比如如删除列或者增加列。在读取这两个信息后,SnapshotReader 会在可重复读事务里读取全量数据,在全量数据读取完成后,会启动 BinlogReader 从读取的 binlog 起始位置开始增量读取,从而保证全量数据 + 增量数据的无缝衔接。 表锁是全局锁的退化版,因为全局锁的权限会比较高,因此在某些场景,用户只有表锁。表锁锁的时间会更长,因为表锁有个特征:锁提前释放了可重复读的事务默认会提交,所以锁需要等到全量数据读完后才能释放。 经过上面分析,接下来看看这些锁到底会造成怎样严重的后果: ![](https://img-blog.csdnimg.cn/img_convert/653ec1f741350723498d0975998c36e7.png) Flink CDC 1.x 可以不加锁,能够满足大部分场景,但牺牲了一定的数据准确性。Flink CDC 1.x 默认加全局锁,虽然能保证数据一致性,但存在上述 hang 住数据的风险。 Flink CDC 1.x 得到了很多用户在社区的反馈,主要归纳为三个: ![](https://img-blog.csdnimg.cn/img_convert/067ebfbe466313d27128be50c49666b5.png) 全量 + 增量读取的过程需要保证所有数据的一致性,因此需要通过加锁保证,但是加锁在数据库层面上是一个十分高危的操作。底层 Debezium 在保证数据一致性时,需要对读取的库或表加锁,全局锁可能导致数据库锁住,表级锁会锁住表的读,DBA 一般不给锁权限。 不支持水平扩展,因为 Flink CDC 底层是基于 Debezium,起架构是单节点,所以 Flink CDC 只支持单并发。在全量阶段读取阶段,如果表非常大 (亿级别),读取时间在小时甚至天级别,用户不能通过增加资源去提升作业速度。 全量读取阶段不支持 checkpoint:CDC 读取分为两个阶段,全量读取和增量读取,目前全量读取阶段是不支持 checkpoint 的,因此会存在一个问题:当我们同步全量数据时,假设需要 5 个小时,当我们同步了 4 小时的时候作业失败,这时候就需要重新开始,再读取 5 个小时。 通过上面的分析,可以知道 2.0 的设计方案,核心要解决上述的三个问题,即支持无锁、水平扩展、checkpoint。 ![](https://img-blog.csdnimg.cn/img_convert/5d57ceebcd1240bb0fbe639c928bbd4e.png) 目前,Flink CDC 2.0 也已经正式发布,此次的核心改进和提升包括: 并发读取,全量数据的读取性能可以水平扩展; 全程无锁,不对线上业务产生锁的风险; 断点续传,支持全量阶段的 checkpoint。 本文发自微信公众号《import_bigdata》 Canal canal [kə'næl],译意为水道 / 管道 / 沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。 基于日志增量订阅和消费的业务包括: 数据库镜像 数据库实时备份 索引构建和实时维护 (拆分异构索引、倒排索引等) 业务 cache 刷新 带业务逻辑的增量数据处理 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。 工作原理 MySQL 主备复制原理 ![](https://img-blog.csdnimg.cn/img_convert/e4e199059a524d5e16326def9f340907.png) MySQL master 将数据变更写入二进制日志 (binary log, 其中记录叫做二进制日志事件 binary log events,可以通过 show binlog events 进行查看) MySQL slave 将 master 的 binary log events 拷贝到它的中继日志 (relay log) MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据 canal 工作原理 canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave, 向 MySQL master 发送 dump 协议 MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal) canal 解析 binary log 对象 (原始为 byte 流) Binlog 获取详解 Binlog 发送接收流程,流程如下图所示: ![](https://img-blog.csdnimg.cn/img_convert/a0252132b7e15ede73535d1f3d9ad684.png) 首先,我们需要伪造一个 slave,向 master 注册,这样 master 才会发送 binlog event。注册很简单,就是向 master 发送 COM_REGISTER_SLAVE 命令,带上 slave 相关信息。这里需要注意,因为在 MySQL 的 replication topology 中,都需要使用一个唯一的 server id 来区别标示不同的 server 实例,所以这里我们伪造的 slave 也需要一个唯一的 server id。 接着实现 binlog 的 dump。MySQL 只支持一种 binlog dump 方式,也就是指定 binlog filename + position,向 master 发送 COM_BINLOG_DUMP 命令。在发送 dump 命令的时候,我们可以指定 flag 为 BINLOG_DUMP_NON_BLOCK,这样 master 在没有可发送的 binlog event 之后,就会返回一个 EOF package。不过通常对于 slave 来说,一直把连接挂着可能更好,这样能更及时收到新产生的 binlog event。 Dump 命令包图如下所示: ![](https://img-blog.csdnimg.cn/img_convert/9127f400c5d8e1f4b8b404dc50dfd6d1.png) 如上图所示, 在报文中塞入 binlogPosition 和 binlogFileName 即可让 master 从相应的位置发送 binlog event。 canal 结构 ![](https://img-blog.csdnimg.cn/img_convert/e3943fcc725aeabe34bf05cbe55b9592.png) 说明: server 代表一个 canal 运行实例,对应于一个 jvm,也可以理解为一个进程 instance 对应于一个数据队列 (1 个 server 对应 1..n 个 instance),每一个数据队列可以理解为一个数据库实例。 Server 设计 ![](https://img-blog.csdnimg.cn/img_convert/fdfb02129daa92ccbc58d5e4f643c533.png) server 代表了一个 canal 的运行实例,为了方便组件化使用,特意抽象了 Embeded(嵌入式) / Netty(网络访问) 的两种实现 Embeded : 对 latency 和可用性都有比较高的要求,自己又能 hold 住分布式的相关技术 (比如 failover) Netty : 基于 netty 封装了一层网络协议,由 canal server 保证其可用性,采用的 pull 模型,当然 latency 会稍微打点折扣,不过这个也视情况而定。(阿里系的 notify 和 metaq,典型的 push/pull 模型,目前也逐步的在向 pull 模型靠拢,push 在数据量大的时候会有一些问题) Instance 设计 ![](https://img-blog.csdnimg.cn/img_convert/71605fa53f9a0c9c467986253b61764e.png) instance 代表了一个实际运行的数据队列,包括了 EventPaser,EventSink,EventStore 等组件。 抽象了 CanalInstanceGenerator,主要是考虑配置的管理方式: manager 方式:和你自己的内部 web console/manager 系统进行对接。(目前主要是公司内部使用,Otter 采用这种方式) spring 方式:基于 spring xml + properties 进行定义,构建 spring 配置. 下面是 canalServer 和 instance 如何运行: canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() { public CanalInstance generate(String destination) { Canal canal = canalConfigClient.findCanal(destination); // 此处省略部分代码 大致逻辑是设置 canal 一些属性 CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, filter) { protected CanalHAController initHaController() { HAMode haMode = parameters.getHaMode(); if (haMode.isMedia()) { return new MediaHAController(parameters.getMediaGroup(), parameters.getDbUsername(), parameters.getDbPassword(), parameters.getDefaultDatabaseName()); } else { return super.initHaController(); } } protected void startEventParserInternal(CanalEventParser parser, boolean isGroup) { // 大致逻辑是 设置支持的类型 // 初始化设置 MysqlEventParser 的主库信息,这处抽象不好,目前只支持 mysql } }; return instance; } }); canalServer.start(); // 启动 canalServer canalServer.start(destination);// 启动对应 instance this.clientIdentity = new ClientIdentity(destination, pipeline.getParameters().getMainstemClientId(), filter); canalServer.subscribe(clientIdentity);// 发起一次订阅,当监听到 instance 配置时,调用 generate 方法注入新的 instance instance 模块: eventParser (数据源接入,模拟 slave 协议和 master 进行交互,协议解析) eventSink (Parser 和 Store 链接器,进行数据过滤,加工,分发的工作) eventStore (数据存储) metaManager (增量订阅 & 消费信息管理器) EventParser 设计 大致过程: ![](https://img-blog.csdnimg.cn/img_convert/03952d8bb6e1427fb8f4bd6c29f51e59.png) 整个 parser 过程大致可分为几步: Connection 获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的 binlog 位点) Connection 建立链接,发送 BINLOG_DUMP 指令 // 0. write command number // 1. write 4 bytes bin-log position to start at // 2. write 2 bytes bin-log flags // 3. write 4 bytes server id of the slave // 4. write bin-log file name Mysql 开始推送 Binaly Log 接收到的 Binaly Log 的通过 Binlog parser 进行协议解析,补充一些特定信息 (补充字段名字,字段类型,主键信息,unsigned 类型处理) 传递给 EventSink 模块进行数据存储,是一个阻塞操作,直到存储成功 存储成功后,由 CanalLogPositionManager 定时记录 Binaly Log 位置 EventSink 设计 ![](https://img-blog.csdnimg.cn/img_convert/0d7be0200aa79757b89a20e295fa34df.png) 说明: 数据过滤:支持通配符的过滤模式,表名,字段内容等 数据路由 / 分发:解决 1:n (1 个 parser 对应多个 store 的模式) 数据归并:解决 n:1 (多个 parser 对应 1 个 store) 数据加工:在进入 store 之前进行额外的处理,比如 join 数据 1:n 业务 为了合理的利用数据库资源, 一般常见的业务都是按照 schema 进行隔离,然后在 mysql 上层或者 dao 这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过 cobar/tddl 来解决数据源路由问题。 所以,一般一个数据库实例上,会部署多个 schema,每个 schema 会有由 1 个或者多个业务方关注。 数据 n:1 业务 同样,当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个 store 进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。 所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳 / 全局 id 进行排序归并。 EventStore 设计 目前仅实现了 Memory 内存模式,后续计划增加本地 file 存储,mixed 混合模式。 借鉴了 Disruptor 的 RingBuffer 的实现思路 RingBuffer 设计: ![](https://img-blog.csdnimg.cn/img_convert/a97e559b20d605fdea3681ec17ad4e2b.png) 定义了 3 个 cursor Put : Sink 模块进行数据存储的最后一次写入位置 Get : 数据订阅获取的最后一次提取位置 Ack : 数据消费成功的最后一次消费位置 借鉴 Disruptor 的 RingBuffer 的实现,将 RingBuffer 拉直来看: ![](https://img-blog.csdnimg.cn/img_convert/577c73b32f9f0bbc74b9c7d833c5a54f.png) 实现说明: Put/Get/Ack cursor 用于递增,采用 long 型存储 buffer 的 get 操作,通过取余或者与操作。(与操作:cusor & (size - 1) , size 需要为 2 的指数,效率比较高) HA 机制设计 canal 的 ha 分为两部分,canal server 和 canal client 分别有对应的 ha 实现 canal server: 为了减少对 mysql dump 的请求,不同 server 上的 instance 要求同一时间只能有一个处于 running,其他的处于 standby 状态. canal client: 为了保证有序性,一份 instance 同一时间只能由一个 canal client 进行 get/ack/rollback 操作,否则客户端接收无法保证有序。 整个 HA 机制的控制主要是依赖了 zookeeper 的几个特性,watcher 和 EPHEMERAL 节点 (和 session 生命周期绑定),可以看下我之前 zookeeper 的相关文章。 Canal Server: ![](https://img-blog.csdnimg.cn/img_convert/f6b75eb16d653c6721b4d450624e9f31.png) 大致步骤: canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动) 创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态 一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤 1 的操作,重新选出一个 canal server 启动 instance canal client 每次进行 connect 时,会首先向 zookeeper 询问当前是谁启动了 canal instance,然后和其建立链接,一旦链接不可用,会重新尝试 connect Canal Client 的方式和 canal server 方式类似,也是利用 zookeeper 的抢占 EPHEMERAL 节点的方式进行控制。 本文发自微信公众号《import_bigdata》 总结 CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种: 基于查询的 CDC: 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据; 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更; 不保障实时性,基于离线调度存在天然的延迟。 基于日志的 CDC: 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源; 保障数据一致性,因为 binlog 文件包含了所有历史变更明细; 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。 对比常见的开源 CDC 方案,我们可以发现: ![](https://img-blog.csdnimg.cn/img_convert/48bf319ac5e265056e0b661c34cfa933.png) 对比增量同步能力: - 基于日志的方式,可以很好的做到增量同步; - 而基于查询的方式是很难做到增量同步的。 对比全量同步能力,基于查询或者日志的 CDC 方案基本都支持,除了 Canal。 而对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。 在数据转换 / 数据清洗能力上,当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合? 在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据; 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。 另外,在生态方面,这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。
yg9538
2022年8月12日 21:48
1123
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
PDF文档
PDF文档(打印)
分享
链接
类型
密码
更新密码