Tips
Go
(18条消息) Go语言自学系列 | golang包_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之channel的遍历_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之select switch_COCOgsta的博客-CSDN博客_golang select switch
(18条消息) Go语言自学系列 | golang并发编程之runtime包_COCOgsta的博客-CSDN博客_golang runtime包
(18条消息) Go语言自学系列 | golang接口值类型接收者和指针类型接收者_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之Timer_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang方法_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之WaitGroup实现同步_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang构造函数_COCOgsta的博客-CSDN博客_golang 构造函数
(18条消息) Go语言自学系列 | golang方法接收者类型_COCOgsta的博客-CSDN博客_golang 方法接收者
(18条消息) Go语言自学系列 | golang接口_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang接口和类型的关系_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang结构体_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang结构体_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang标准库os模块 - File文件读操作_COCOgsta的博客-CSDN博客_golang os.file
(18条消息) Go语言自学系列 | golang继承_COCOgsta的博客-CSDN博客_golang 继承
(18条消息) Go语言自学系列 | golang嵌套结构体_COCOgsta的博客-CSDN博客_golang 结构体嵌套
(18条消息) Go语言自学系列 | golang并发编程之Mutex互斥锁实现同步_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发变成之通道channel_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang并发编程之原子操作详解_COCOgsta的博客-CSDN博客_golang 原子操作
(18条消息) Go语言自学系列 | golang并发编程之原子变量的引入_COCOgsta的博客-CSDN博客_go 原子变量
(18条消息) Go语言自学系列 | golang并发编程之协程_COCOgsta的博客-CSDN博客_golang 协程 并发
(18条消息) Go语言自学系列 | golang接口嵌套_COCOgsta的博客-CSDN博客_golang 接口嵌套
(18条消息) Go语言自学系列 | golang包管理工具go module_COCOgsta的博客-CSDN博客_golang 包管理器
(18条消息) Go语言自学系列 | golang标准库os模块 - File文件写操作_COCOgsta的博客-CSDN博客_go os模块
(18条消息) Go语言自学系列 | golang结构体的初始化_COCOgsta的博客-CSDN博客_golang 结构体初始化
(18条消息) Go语言自学系列 | golang通过接口实现OCP设计原则_COCOgsta的博客-CSDN博客
(18条消息) Go语言自学系列 | golang标准库os包进程相关操作_COCOgsta的博客-CSDN博客_golang os包
(18条消息) Go语言自学系列 | golang标准库ioutil包_COCOgsta的博客-CSDN博客_golang ioutil
(18条消息) Go语言自学系列 | golang标准库os模块 - 文件目录相关_COCOgsta的博客-CSDN博客_go语言os库
Golang技术栈,Golang文章、教程、视频分享!
(18条消息) Go语言自学系列 | golang结构体指针_COCOgsta的博客-CSDN博客_golang 结构体指针
Ansible
太厉害了,终于有人能把Ansible讲的明明白白了,建议收藏_互联网老辛
ansible.cfg配置详解
Docker
Docker部署
linux安装docker和Docker Compose
linux 安装 docker
Docker中安装Docker遇到的问题处理
Docker常用命令
docker常用命令小结
docker 彻底卸载
Docker pull 时报错:Get https://registry-1.docker.io/v2/library/mysql: net/http: TLS handshake timeout
Docker 拉镜像无法访问 registry-x.docker.io 问题(Centos7)
docker 容器内没有权限
Linux中关闭selinux的方法是什么?
docker run 生成 docker-compose
Docker覆盖网络部署
docker pull后台拉取镜像
docker hub
Redis
Redis 集群别乱搭,这才是正确的姿势
linux_离线_redis安装
怎么实现Redis的高可用?(主从、哨兵、集群) - 雨点的名字 - 博客园
redis集群离线安装
always-show-logo yes
Redis集群搭建及原理
[ERR] Node 172.168.63.202:7001 is not empty. Either the nodealready knows other nodes (check with CLUSTER NODES) or contains some - 亲爱的不二999 - 博客园
Redis daemonize介绍
redis 下载地址
Redis的redis.conf配置注释详解(三) - 云+社区 - 腾讯云
Redis的redis.conf配置注释详解(一) - 云+社区 - 腾讯云
Redis的redis.conf配置注释详解(二) - 云+社区 - 腾讯云
Redis的redis.conf配置注释详解(四) - 云+社区 - 腾讯云
Linux
在终端连接ssh的断开关闭退出的方法
漏洞扫描 - 灰信网(软件开发博客聚合)
find 命令的参数详解
vim 编辑器搜索功能
非root安装rpm时,mockbuild does not exist
Using a SSH password instead of a key is not possible because Host Key checking
(9条消息) 安全扫描5353端口mDNS服务漏洞问题_NamiJava的博客-CSDN博客_5353端口
Linux中使用rpm命令安装rpm包
ssh-copy-id非22端口的使用方法
How To Resolve SSH Weak Key Exchange Algorithms on CentOS7 or RHEL7 - infotechys.com
Linux cp 命令
yum 下载全量依赖 rpm 包及离线安装(终极解决方案) - 叨叨软件测试 - 博客园
How To Resolve SSH Weak Key Exchange Algorithms on CentOS7 or RHEL7 - infotechys.com
RPM zlib 下载地址
运维架构网站
欢迎来到 Jinja2
/usr/local/bin/ss-server -uv -c /etc/shadowsocks-libev/config.json -f /var/run/s
ruby 安装Openssl 默认安装位置
Linux 常用命令学习 | 菜鸟教程
linux 重命名文件和文件夹
linux命令快速指南
ipvsadm
Linux 下查找日志中的关键字
Linux 切割大 log 日志
CentOS7 关于网络的设置
rsync 命令_Linux rsync 命令用法详解:远程数据同步工具
linux 可视化界面安装
[问题已处理]-执行yum卡住无响应
GCC/G++升级高版本
ELK
Docker部署ELK
ELK+kafka+filebeat+Prometheus+Grafana - SegmentFault 思否
(9条消息) Elasticsearch设置账号密码_huas_xq的博客-CSDN博客_elasticsearch设置密码
Elasticsearch 7.X 性能优化
Elasticsearch-滚动更新
Elasticsearch 的内存优化_大数据系统
Elasticsearch之yml配置文件
ES 索引为Yellow状态
Logstash:Grok filter 入门
logstash grok 多项匹配
Mysql
Mysql相关Tip
基于ShardingJDBC实现数据库读写分离 - 墨天轮
MySQL-MHA高可用方案
京东三面:我要查询千万级数据量的表,怎么操作?
OpenStack
(16条消息) openstack项目中遇到的各种问题总结 其二(云主机迁移、ceph及扩展分区)_weixin_34104341的博客-CSDN博客
OpenStack组件介绍
百度大佬OpenStack流程
openstack各组件介绍
OpenStack生产实际问题总结(一)
OpenStack Train版离线部署
使用Packstack搭建OpenStack
K8S
K8S部署
K8S 集群部署
kubeadm 重新 init 和 join-pudn.com
Kubernetes 实战总结 - 阿里云 ECS 自建 K8S 集群 Kubernetes 实战总结 - 自定义 Prometheus
【K8S实战系列-清理篇1】k8s docker 删除没用的资源
Flannel Pod Bug汇总
Java
Jdk 部署
JDK部署
java线程池ThreadPoolExecutor类使用详解 - bigfan - 博客园
ShardingJDBC实现多数据库节点分库分表 - 墨天轮
Maven Repository: Search/Browse/Explore
其他
Git在阿里,我们如何管理代码分支?
chrome F12调试网页出现Paused in debugger
体验IntelliJ IDEA的远程开发(Remote Development) - 掘金
Idea远程调试
PDF转MD
强哥分享干货
优秀开源项目集合
vercel 配合Github 搭建项目Doc门户
如何用 Github Issues 写技术博客?
Idea 2021.3 Maven 3.8.1 报错 Blocked mirror for repositories 解决
列出maven依赖
[2022-09 持续更新] 谷歌 google 镜像 / Sci-Hub 可用网址 / Github 镜像可用网址总结
阿里云ECS迁移
linux访问github
一文教你使用 Docker 启动并安装 Nacos-腾讯云开发者社区-腾讯云
Nginx
Nginx 部署
Nginx 部署安装
Nginx反向代理cookie丢失的问题_longzhoufeng的博客-CSDN博客_nginx 代理后cookie丢失
Linux 系统 Https 证书生成与Nginx配置 https
数据仓库
实时数仓
松果出行 x StarRocks:实时数仓新范式的实践之路
实时数据仓库的一些分层和分层需要处理的事情,以及数据流向
湖仓一体电商项目
湖仓一体电商项目(一):项目背景和架构介绍
湖仓一体电商项目(二):项目使用技术及版本和基础环境准备
湖仓一体电商项目(三):3万字带你从头开始搭建12个大数据项目基础组件
数仓笔记
数仓学习总结
数仓常用平台和框架
数仓学习笔记
数仓技术选型
尚硅谷教程
尚硅谷学习笔记
尚硅谷所有已知的课件资料
尚硅谷大数据项目之尚品汇(11数据质量管理V4.0)
尚硅谷大数据项目之尚品汇(10元数据管理AtlasV4.0)
尚硅谷大数据项目之尚品汇(9权限管理RangerV4.0)
尚硅谷大数据项目之尚品汇(8安全环境实战V4.0)
尚硅谷大数据项目之尚品汇(7用户认证KerberosV4.1)
尚硅谷大数据项目之尚品汇(6集群监控ZabbixV4.1)
尚硅谷大数据项目之尚品汇(5即席查询PrestoKylinV4.0)
尚硅谷大数据项目之尚品汇(4可视化报表SupersetV4.0)
尚硅谷大数据项目之尚品汇(3数据仓库系统)V4.2.0
尚硅谷大数据项目之尚品汇(2业务数据采集平台)V4.1.0
尚硅谷大数据项目之尚品汇(1用户行为采集平台)V4.1.0
数仓治理
数据中台 元数据规范
数据中台的那些 “经验与陷阱”
2万字详解数据仓库数据指标数据治理体系建设方法论
数据仓库,为什么需要分层建设和管理? | 人人都是产品经理
网易数帆数据治理演进
数仓技术
一文看懂大数据生态圈完整知识体系
阿里云—升舱 - 数据仓库升级白皮书
最全企业级数仓建设迭代版(4W字建议收藏)
基于Hue,Dolphinscheduler,HIVE分析数据仓库层级实现及项目需求案例实践分析
详解数据仓库分层架构
数据仓库技术细节
大数据平台组件介绍
总览 2016-2021 年全球机器学习、人工智能和大数据行业技术地图
Apache DolphinScheduler 3.0.0 正式版发布!
数据仓库面试题——介绍下数据仓库
数据仓库为什么要分层,各层的作用是什么
Databend v0.8 发布,基于 Rust 开发的现代化云数据仓库 - OSCHINA - 中文开源技术交流社区
数据中台
数据中台设计
大数据同步工具之 FlinkCDC/Canal/Debezium 对比
有数数据开发平台文档
Shell
Linux Shell 命令参数
shell 脚本编程
一篇教会你写 90% 的 Shell 脚本
Kibana
Kibana 查询语言(KQL)
Kibana:在 Kibana 中的四种表格制作方式
Kafka
Kafka部署
canal 动态监控 Mysql,将 binlog 日志解析后,把采集到的数据发送到 Kafka
OpenApi
OpenAPI 标准规范,了解一下?
OpenApi学术论文
贵阳市政府数据开放平台设计与实现
OpenAPI简介
开放平台:运营模式与技术架构研究综述
管理
技术部门Leader是不是一定要技术大牛担任?
华为管理体系流程介绍
DevOps
*Ops
XOps 已经成为一个流行的术语 - 它是什么?
Practical Linux DevOps
Jenkins 2.x实践指南 (翟志军)
Jenkins 2权威指南 ((美)布伦特·莱斯特(Brent Laster)
DevOps组件高可用的思路
KeepAlived
VIP + KEEPALIVED + LVS 遇到Connection Peer的问题的解决
MinIO
MinIO部署
Minio 分布式集群搭建部署
Minio 入门系列【16】Minio 分片上传文件 putObject 接口流程源码分析
MinioAPI 浅入及问题
部署 minio 兼容 aws S3 模式
超详细分布式对象存储 MinIO 实战教程
Hadoop
Hadoop 部署
Hadoop集群部署
windows 搭建 hadoop 环境(解决 HADOOP_HOME and hadoop.home.dir are unset
Hadoop 集群搭建和简单应用(参考下文)
Hadoop 启动 NameNode 报错 ERROR: Cannot set priority of namenode process 2639
jps 命令查看 DataNode 进程不见了 (hadoop3.0 亲测可用)
hadoop 报错: Operation category READ is not supported in state standby
Spark
Spark 部署
Spark 集群部署
spark 心跳超时分析 Cannot receive any reply in 120 seconds
Spark学习笔记
apache spark - Failed to find data source: parquet, when building with sbt assembly
Spark Thrift Server 架构和原理介绍
InLong
InLong 部署
Apache InLong部署文档
安装部署 - Docker 部署 - 《Apache InLong v1.2 中文文档》 - 书栈网 · BookStack
基于 Apache Flink SQL 的 InLong Sort ETL 方案解析
关于 Apache Pulsar 在 Apache InLong 接入数据
zookeeper
zookeeper 部署
使用 Docker 搭建 Zookeeper 集群
美团技术团队
StarRocks
StarRocks技术白皮书(在线版)
JuiceFS
AI 场景存储优化:云知声超算平台基于 JuiceFS 的存储实践
JuiceFS 在 Elasticsearch/ClickHouse 温冷数据存储中的实践
JuiceFS format
元数据备份和恢复 | JuiceFS Document Center
JuiceFS 元数据引擎选型指南
Apache Hudi 使用文件聚类功能 (Clustering) 解决小文件过多的问题
普罗米修斯
k8s 之 Prometheus(普罗米修斯)监控,简单梳理下 K8S 监控流程
k8s 部署 - 使用helm3部署监控prometheus(普罗米修斯),从零到有,一文搞定
k8s 部署 - 使用 helm3 部署监控 prometheus(普罗米修斯),从零到有,一文搞定
k8s 部署 - 如何完善 k8s 中 Prometheus(普罗米修斯)监控项目呢?
k8s 部署 - k8s 中 Prometheus(普罗米修斯)的大屏展示 Grafana + 监控报警
zabbix
一文带你掌握 Zabbix 监控系统
Stream Collectors
Nvidia
Nvidia API
CUDA Nvidia驱动安装
NVIDIA驱动失效简单解决方案:NVIDIA-SMI has failed because it couldn‘t communicate with the NVIDIA driver.
ubuntu 20 CUDA12.1安装流程
nvidia开启持久化模式
nvidia-smi 开启持久化
Harbor
Harbor部署文档
Docker 爆出 it doesn't contain any IP SANs
pandoc
其他知识
大模型
COS 597G (Fall 2022): Understanding Large Language Models
如何优雅的使用各类LLM
ChatGLM3在线搜索功能升级
当ChatGLM3能用搜索引擎时
OCR神器,PDF、数学公式都能转
Stable Diffusion 动画animatediff-cli-prompt-travel
基于ERNIE Bot自定义虚拟数字人生成
pika负面提示词
开通GPT4的方式
GPT4网站
低价开通GPT Plus
大模型应用场景分享
AppAgent AutoGPT变体
机器学习
最大似然估计
权衡偏差(Bias)和方差(Variance)以最小化均方误差(Mean Squared Error, MSE)
伯努利分布
方差计算公式
均值的高斯分布估计
没有免费午餐定理
贝叶斯误差
非参数模型
最近邻回归
表示容量
最优容量
权重衰减
正则化项
Sora
Sora官方提示词
看完32篇论文,你大概就知道Sora如何炼成? |【经纬低调出品】
Sora论文
Sora 物理悖谬的几何解释
Sora 技术栈讨论
RAG垂直落地
DB-GPT与TeleChat-7B搭建相关RAG知识库
ChatWithRTX
ChatRTX安装教程
ChatWithRTX 踩坑记录
ChatWithRTX 使用其他量化模型
ChatWithRTX介绍
RAG 相关资料
英伟达—大模型结合 RAG 构建客服场景自动问答
又一大模型技术开源!有道自研RAG引擎QAnything正式开放下载
收藏!RAG入门参考资料开源大总结:RAG综述、介绍、比较、预处理、RAG Embedding等
RAG调研
解决现代RAG实际生产问题
解决现代 RAG 系统中的生产问题-II
Modular RAG and RAG Flow: Part Ⅰ
Modular RAG and RAG Flow: Part II
先进的Retriever技术来增强你的RAGs
高级RAG — 使用假设文档嵌入 (HyDE) 改进检索
提升 RAG:选择最佳嵌入和 Reranker 模型
LangGraph
增强型RAG:re-rank
LightRAG:使用 PyTorch 为 LLM 应用程序提供支持
RAG 101:分块策略
模型训练
GPU相关资料
[教程] conda安装简明教程(基于miniconda和Windows)
PyTorch CUDA对应版本 | PyTorch
资料
李一舟课程全集
零碎资料
苹果各服共享ID
数据中心网络技术概览
华为大模型训练学习笔记
百度AIGC工程师认证考试答案(可换取工信部证书)
百度智能云生成式AI认证工程师 考试和证书查询指南
深入理解 Megatron-LM(1)基础知识
QAnything
接入QAnything的AI问答知识库,可私有化部署的企业级WIKI知识库
wsl --update失效Error code: Wsl/UpdatePackage/0x80240438的解决办法
Docker Desktop 启动docker engine一直转圈解决方法
win10开启了hyper-v,docker 启动还是报错 docker desktop windows hypervisor is not present
WSL虚拟磁盘过大,ext4迁移 Windows 中创建软链接和硬链接
WSL2切换默认的Linux子系统
Windows的WSL子系统,自动开启sshd服务
新版docker desktop设置wsl(使用windown的子系统)
WSL 开启ssh
Windows安装网易开源QAnything打造智能客服系统
芯片
国内互联网大厂自研芯片梳理
超算平台—算力供应商
Linux 磁盘扩容
Linux使用growpart工具进行磁盘热扩容(非LVM扩容方式)
关于centos7 扩容提示no tools available to resize disk with 'gpt' - o夜雨随风o - 博客园
(小插曲)neo4j配置apoc插件后检查版本发现:Unknown function ‘apoc.version‘ “EXPLAIN RETURN apoc.version()“
vfio-pci与igb_uio映射硬件资源到DPDK的流程分析
KubeVirt
vnc server配置、启动、重启与连接 - 王约翰 - 博客园
虚拟机Bug解决方案
kubevirt 如何通过CDI上传镜像文件
在 K8S 上也能跑 VM!KubeVirt 簡介與建立(部署篇) | Cloud Solutions
KubeVirt 04:容器化数据导入 – 小菜园
Python
安装 flash_attn
手把手教你在linux上安装pytorch与cuda
AI
在启智社区基于PyTorch运行国产算力卡的模型训练实验
Scaling law
免费的GPT3.5 API
AI Engineer Roadmap & Resources 🤖
模型排行
edk2
K8S删除Evicted状态的pod
docker 中启动 docker
远程本地多用户桌面1.17(一种不让电脑跟你抢键鼠的思路) - 哔哩哔哩
华为鲲鹏服务器(ARM架构)部署Prometheus
在Linux上安装配置Grafana_AI开发平台ModelArts_华为云
abrt-ccpp干崩服务器查询记录
kubevirt 中文社区
VNCServer 连接方法
Pod创建流程代码版本[kubelet篇]
[译]深入剖析 Kubernetes MutatingAdmissionWebhook-腾讯云开发者社区-腾讯云
[译]深入剖析 Kubernetes MutatingAdmissionWebhook-腾讯云开发者社区-腾讯云
深入理解 Kubernetes Admission Webhook-阳明的博客
CentOS7 安装 mbedtls和mbedtls-devel
docker in docker 启动命令
go 协程泄漏 pprof
-
+
首页
基于 Apache Flink SQL 的 InLong Sort ETL 方案解析
> 本文由 [简悦 SimpRead](http://ksria.com/simpread/) 转码, 原文地址 [mp.weixin.qq.com](https://mp.weixin.qq.com/s?__biz=Mzk0NDMwMTY1OQ==&mid=2247484562&idx=1&sn=50b12e54b0c382d1bf046c8e6022711d&chksm=c327f7a6f4507eb0dca99a53414f09dcc436fd9b1dec17a7f7c026a963ad17e82b3d7d8be342&scene=178&cur_album_id=2152999043877126148#rd) ▍背景 ======================= 随着 Apache InLong(incubating) 的用户和开发者逐渐增多,更丰富的使用场景和低成本运营诉求越来越强烈,其中,InLong 全链路增加 Transform(T)的需求反馈最多。经过 @yunqingmoswu、@EMsnap、@gong、@thexiay 社区开发者的调研和设计,完成了基于 Flink SQL 的 InLong Sort ETL 方案,本文将详细介绍该方案的实现细节。 首先,基于 Apache Flink SQL 主要有以下方面的考量: * Flink SQL 拥有强大的表达能力带来的高可扩展性、灵活性,基本上 Flink SQL 能支持社区大多数需求场景。当 Flink SQL 内置的函数不满足需求时,我们还可通过各种 UDF 来扩展。 * Flink SQL 相比 Flink 底层 API 实现开发成本更低,只有第一次需要实现 Flink SQL 的转换逻辑,后续可专注于 Flink SQL 能力本身的构建,比如扩展 Connector、自定义函数 UDF 等。 * 一般来说,Flink SQL 将更健壮、运行也将更稳定。原因在于 Flink SQL 屏蔽了 Flink 底层大量的细节,有强大的社区支持,并且经过大量用户的实践。 * 对用户来说,Flink SQL 也更加通俗易懂,特别是对使用过 SQL 用户来说,使用方式简单、熟悉,这有助于用户快速落地。 * 对于存量实时任务的迁移,如果其原本就是 SQL 类型的任务,尤其是 Flink SQL 任务,其迁移成本极低,部分情况下甚至都不用做任何改动。 注意:本方案的所有代码,可以参考 Apache InLong Sort 模块,所含功能可在即将发布的 1.2.0 版本中下载使用。 ********▍**********方案介绍** ### **2.1 方案需求** ### 该方案的主要需求,是完成的 InLong Sort 模块 Transform(T)能力,包括: <table><colgroup><col><col></colgroup><tbody><tr><th width="179"><strong>Transform</strong></th><th width="356"><p>说明</p></th></tr><tr><td width="40">字段分割</td><td width="356">将一个字段通过某个分割符分割为多个新的字段</td></tr><tr><td width="40">内容提取</td><td width="356">提取一个字段的一部分产生一个新的字段</td></tr><tr><td width="40">窗口内去重</td><td width="356">在一个时间窗口内对数据去重</td></tr><tr><td colspan="1" width="40">时间窗口聚合</td><td colspan="1" width="356">在一个时间窗口内对数据进行聚合操作</td></tr><tr><td colspan="1" width="40">字符串替换</td><td colspan="1" width="356">将替换一个字符串字段中的部分或全部内容</td></tr><tr><td colspan="1" width="40">值替换</td><td colspan="1" width="356">给定一个匹配值,如果该字段的值等于该值,则将其替换为目标值</td></tr><tr><td colspan="1" width="40">时间格式转换</td><td colspan="1" width="356">将一个字段的值转换为目标时间格式的字符串</td></tr><tr><td colspan="1" width="40">连接</td><td colspan="1" width="356">支持两表 Join</td></tr><tr><td colspan="1" width="40">数据过滤</td><td colspan="1" width="356">将满足过滤条件的数据舍弃或者保留</td></tr></tbody></table> **2.2** **使用场景** ---------------- 大数据集成的用户,在很多业务场景下都有数据转换、连接、过滤等 Transform 需求。 **2.3 设计目标** ------------ 本次设计需要达到以下目标: * 功能性:在 InLong Sort 现有架构、数据流模型下,覆盖基本的 Transform 能力,并具备快速扩张的能力。 * 兼容性:新的 InLong Sort 数据模型向前兼容,确保历史任务能够正常配置运行。 * 可维护性:InLong Sort 数据模型转 Flink SQL 只需实现一遍,后期有新增的功能需求时,这块不需要改动,哪怕有改动也是少量改动即可支持。 * 可扩展性:当出现开源 Flink Connector 或者内置 Flink SQL 函数不满足需求时,可通过自定义 Flink Connector、UDF 来实现其功能扩展。 **2.4 基本概念** ------------ 核心概念参照概要设计中的名词解释 <table><colgroup><col><col></colgroup><tbody><tr><th>名称</th><th>含义</th></tr><tr><td colspan="1">InLong Dashborad</td><td colspan="1">Inlong 前端管理界面</td></tr><tr><td>InLong Manager Client </td><td>将 Manager 当中的接口进行包装,供外部用户程序调用,不经过前端 InLong Dashboard</td></tr><tr><td colspan="1">InLong Manager Openapi</td><td colspan="1">Inlong manager 与外部系统调用接口</td></tr><tr><td colspan="1">InLong Manager metaData</td><td colspan="1">Inlong manager 元数据管理,包括 group、stream 纬度的元数据信息</td></tr><tr><td colspan="1">InLong Manager task manager</td><td colspan="1">Inlong manager 中管理数据源采集任务模块,管理 agent 的任务下发,指令下发,心跳上报</td></tr><tr><td>InLong Group</td><td>数据流组,包含多个数据流,一个 Group 代表一个数据接入</td></tr><tr><td>InLong Stream</td><td>数据流,一个数据流有具体的流向</td></tr><tr><td colspan="1">Stream Source</td><td colspan="1">流中有对应的采集端和 sink 端,本设计中只涉及到 stream source</td></tr><tr><td colspan="1">Stream Info</td><td colspan="1">Sort 中数据流向的抽象,包含该数据流的各种来源、转换、去向等</td></tr><tr><td colspan="1">Group Info</td><td colspan="1">Sort 中对数据流向的封装,一个 GroupInfo 可包含多个 Stream Info</td></tr><tr><td colspan="1">Node</td><td colspan="1">数据同步中数据源、数据转换、数据去向的抽象</td></tr><tr><td colspan="1">Extract Node</td><td colspan="1">数据同步的来源端抽象</td></tr><tr><td colspan="1">Load Node</td><td colspan="1">数据同步的去向端抽象</td></tr><tr><td colspan="1">MySQL Extract Node</td><td colspan="1">MySQL 数据来源抽象</td></tr><tr><td colspan="1">Kafka Load Node</td><td colspan="1">kafka 数据去向抽象</td></tr><tr><td colspan="1">Transform Node</td><td colspan="1">数据同步的转换过程抽象</td></tr><tr><td colspan="1">Aggregate Transform Node</td><td colspan="1">数据同步聚合类转换过程抽象</td></tr><tr><td colspan="1">Node Relation</td><td colspan="1">数据同步中各个节点关系抽象</td></tr><tr><td colspan="1">Field Relation </td><td colspan="1">数据同步中上下游节点字段间关系的抽象</td></tr><tr><td colspan="1">Function</td><td colspan="1">转换函数的抽象,即数据同步 T 中各个 T 能力实现的抽象</td></tr><tr><td colspan="1">Substring Function</td><td colspan="1">字符串截取函数的抽象</td></tr><tr><td colspan="1">Filter Function</td><td colspan="1">数据过滤函数的抽象</td></tr><tr><td colspan="1">Function Param</td><td colspan="1">函数的入参抽象</td></tr><tr><td colspan="1">Constant Param</td><td colspan="1">常量参数</td></tr><tr><td colspan="1">Field Info</td><td colspan="1">节点字段</td></tr><tr><td colspan="1">Meta FieldInfo</td><td colspan="1">节点元信息字段</td></tr></tbody></table> **2.5 领域模型** 本次设计主要涉及到以下实体: Group、Stream、GroupInfo、StreamInfo、Node、NodeRelation、FieldRelation、Function、FilterFunction、SubstringFunction、FunctionParam、FieldInfo、MetaFieldInfo、MySQLExtractNode、KafkaLoadNode 等 为了便于理解,本小节将对实体之间关系进行建模分析。领域模型的实体对应关系说明: * 一个 Group 对应一个 GroupInfo * 一个 Group 包含一个或者多个 Stream * 一个 Stream 对应一个 StreamInfo * 一个 GroupInfo 包含一个或者多个 StreamInfo * 一个 StreamInfo 包含多个 Node * 一个 StreamInfo 包含 1 个或者多个 NodeRelation * 一个 NodeRelation 包含 1 个或者多个 FieldRelation * 一个 NodeRelation 包含 0 个或者多个 FilterFunction * 一个 FieldRelation 包含 1 个 Function 或者一个 FieldInfo 作为来源字段,一个 FieldInfo 作为目标字段 * 一个 Function 包含 1 个或者多个 FunctionParam 上述关系由 UML 对象关系图可以表示为: ![](http://kmgy.top:9090/image/2022/9/12/640_repeat_1662971925625__154527.png) **2.6 功能用例图** ------------- ![](http://kmgy.top:9090/image/2022/9/12/640_repeat_1662971925626__511889.png) ********▍系统概要设计******** ========================== **3.1 系统架构图** ------------- ![](http://kmgy.top:9090/image/2022/9/12/640_repeat_1662971925623__828425.png) * Serialization: 序列化实现模块 * Deserialization: 反序列化实现模块 * Flink Source: 自定义 Flink source 实现模块 * Flink Sink: 自定义的 Flink sink 实现模块 * Transformation: 自定义的 Transform 实现模块 * GroupInfo: 对应 Inlong group * StreamInfo: 对应 inlong stream * Node: 对数据同步中数据来源、数据转换、数据去向的抽 * FlinkSQLParser: SQL 解析器 **3.2 InLong Sort 内部运行流程图** ![](http://kmgy.top:9090/image/2022/9/12/640_repeat_1662971925621__854968.png) **3.3 模块设计** ------------ 本次设计只对原有系统增加 Flink Connector、FlinkSQL Generator 两个模块,对 Data Model 模块有修改。 ### 3.3.1 模块结构 ![](http://kmgy.top:9090/image/2022/9/12/640_repeat_1662971925770__612946.png) 3.3.2 模块划分 重要模块划分说明: <table data-resize-percent="48.76140808344198"><colgroup><col data-resize-pixel="227" data-resize-percent="30.388219544846052" data-offset-left="40.5" data-offset-right="267.5"><col data-resize-pixel="520" data-resize-percent="69.61178045515395" data-offset-left="267.5" data-offset-right="787.5"></colgroup><tbody><tr><th>名称</th><th>说明</th></tr><tr><td>FlinkSQLParser</td><td>用于生成 FlinkSQL 核心类,包含 GroupInfo 的引用</td></tr><tr><td>GroupInfo</td><td>Sort 内部对 inlong group 的抽象,用于封装整个 inlong group 同步相关信息,包含对 List<StreamInfo> 的引用</td></tr><tr><td>StreamInfo</td><td>Sort 内部对 inlong stream 的抽象,用于封装 inlong stream 同步相关信息,包含 List<Node>、List<NodeRelation> 的引用</td></tr><tr><td colspan="1">Node</td><td colspan="1">同步节点的顶层接口,它的各个子类实现主要用于对同步数据源、转换节点的数据封装</td></tr><tr><td colspan="1">ExtractNode</td><td colspan="1">数据 extract 节点抽象, 继承自 Node</td></tr><tr><td colspan="1">LoadNode</td><td colspan="1">数据 load 节点抽象, 继承自 Node</td></tr><tr><td colspan="1">TransformNode</td><td colspan="1">数据转换节点抽象,继承自 Node</td></tr><tr><td colspan="1">NodeRelation</td><td colspan="1">定义节点间的关系</td></tr><tr><td colspan="1">FieldRelation</td><td colspan="1">定义节点间字段的关系</td></tr><tr><td colspan="1">Function</td><td colspan="1">T 能力执行函数的抽象</td></tr><tr><td colspan="1">FilterFunction</td><td colspan="1">用于数据过滤的 Function 抽象, 继承自 Function</td></tr><tr><td colspan="1">SubstringFunction</td><td colspan="1">用于字符串截取 Function 抽象, 继承自 Function</td></tr><tr><td colspan="1">FunctionParam</td><td colspan="1">用于函数参数的抽象</td></tr><tr><td colspan="1">ConstantParam</td><td colspan="1">函数常量参数的封装,继承自 FunctionParam</td></tr><tr><td colspan="1">FieldInfo</td><td colspan="1">节点字段的封装,也可做函数入参使用, 继承自 FunctionParam</td></tr><tr><td colspan="1">MetaFieldInfo</td><td colspan="1">内置字段的封装,目前主要用于 canal-json 的元数据字段场景, 继承自 FieldInfo</td></tr></tbody></table> ▍系统详细设计 ======================= 下面具体以 MySQL 同步数据到 Kafka 为例来说明 SQL 生成的原理 **4.1 Node 生成 SQL** -------------------- ### 4.1.1 ExtractNode 生成 SQL 节点配置为:nodeconfig1 ``` private Node buildMySQLExtractNode() { List<FieldInfo> fields = Arrays.asList( new FieldInfo("name", new StringFormatInfo()), new FieldInfo("age", new IntFormatInfo())); return new MySqlExtractNode("1", "mysql_input", fields, null, null, "id", Collections.singletonList("tableName"), "localhost", "root", "password", "inlong", null, null, null, null); } ``` 生成的 SQL 为:ss ``` CREATE TABLE `mysql_1` (`name` string,`age` int) with ('connector' = 'mysql-cdc-inlong', 'hostname' = 'localhost', 'username' = 'root', 'password' = 'password', 'database-name' = 'inlong', 'table-name' = 'tableName') ``` ### 4.1.2 TransformNode 生成 SQL 节点配置为:nodeconfig2 ``` List<FilterFunction> filters = Arrays.asList( new SingleValueFilterFunction(EmptyOperator.getInstance(), new FieldInfo("age", new IntFormatInfo()), LessThanOperator.getInstance(), new ConstantParam(25)), new SingleValueFilterFunction(AndOperator.getInstance(), new FieldInfo("age", new IntFormatInfo()), MoreThanOrEqualOperator.getInstance(), new ConstantParam(18)) ); ``` 生成的 SQL 为:ss2 ``` SELECT `name` AS `name`,`age` AS `age` FROM `mysql_1` WHERE `age` < 25 AND `age` >= 18 ``` ### 4.1.3 LoadNode 生成 SQL 节点配置为:nodeconfig3 ``` private Node buildKafkaLoadNode(FilterStrategy filterStrategy) { List<FieldInfo> fields = Arrays.asList( new FieldInfo("name", new StringFormatInfo()), new FieldInfo("age", new IntFormatInfo()) ); List<FieldRelation> relations = Arrays .asList( new FieldRelation(new FieldInfo("name", new StringFormatInfo()), new FieldInfo("name", new StringFormatInfo())), new FieldRelation(new FieldInfo("age", new IntFormatInfo()), new FieldInfo("age", new IntFormatInfo())) ); List<FilterFunction> filters = Arrays.asList( new SingleValueFilterFunction(EmptyOperator.getInstance(), new FieldInfo("age", new IntFormatInfo()), LessThanOperator.getInstance(), new ConstantParam(25)), new SingleValueFilterFunction(AndOperator.getInstance(), new FieldInfo("age", new IntFormatInfo()), MoreThanOrEqualOperator.getInstance(), new ConstantParam(18)) ); return new KafkaLoadNode("2", "kafka_output", fields, relations, filters, filterStrategy, "topic1", "localhost:9092", new CanalJsonFormat(), null, null, "id"); } ``` 生成的 SQL 为:ss3 ``` CREATE TABLE `kafka_3` (`name` string,`age` int) with ( 'connector' = 'kafka-inlong', 'topic' = 'topic1', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'canal-json-inlong', 'canal-json-inlong.ignore-parse-errors' = 'true', 'canal-json-inlong.map-null-key.mode' = 'DROP', 'canal-json-inlong.encode.decimal-as-plain-number' = 'true', 'canal-json-inlong.timestamp-format.standard' = 'SQL', 'canal-json-inlong.map-null-key.literal' = 'null' ) ``` **4.2 字段 T 生成 SQ****L** ----------------------- ### 4.2.1 过滤算子 相关配置见 4.1 节点配置 生成的 SQL 分别为:ss4 ``` INSERT INTO `kafka_3` SELECT `name` AS `name`,`age` AS `age` FROM `mysql_1` WHERE `age` < 25 AND `age` >= 18 ``` 4.2.2 水位线 GroupInfo 完整配置如下:nodeconfig3 ``` private Node buildMySqlExtractNode() { List<FieldInfo> fields = Arrays.asList( new FieldInfo("name", new StringFormatInfo()), new FieldInfo("age", new IntFormatInfo()), new FieldInfo("ts", new TimestampFormatInfo())); WatermarkField wk = new WatermarkField(new FieldInfo("ts", new TimestampFormatInfo()), new StringConstantParam("1"), new TimeUnitConstantParam(TimeUnit.MINUTE)); return new MySqlExtractNode("1", "mysql_input", fields, wk, null, "id", Collections.singletonList("tableName"), "localhost", "root", "password", "inlong", null, null, null, null); } private Node buildKafkaNode() { List<FieldInfo> fields = Arrays.asList( new FieldInfo("name", new StringFormatInfo()), new FieldInfo("age", new IntFormatInfo()), new FieldInfo("ts", new TimestampFormatInfo())); List<FieldRelation> relations = Arrays .asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()), new FieldInfo("name", new StringFormatInfo())), new FieldRelation(new FieldInfo("age", new IntFormatInfo()), new FieldInfo("age", new IntFormatInfo())) ); return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null, "topic", "localhost:9092", new JsonFormat(), 1, null, "id"); } private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) { List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList()); List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList()); return new NodeRelation(inputIds, outputIds); } @Override public GroupInfo getTestObject() { Node input = buildMySqlExtractNode(); Node output = buildKafkaNode(); StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output), Collections.singletonList( buildNodeRelation(Collections.singletonList(input), Collections.singletonList(output)))); return new GroupInfo("1", Collections.singletonList(streamInfo)); } ```
yg9538
2022年9月12日 16:41
557
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
PDF文档
PDF文档(打印)
分享
链接
类型
密码
更新密码