跳到主要内容

蚂蚁云原生应用运行时的探索和实践 - ArchSummit 上海

· 阅读需 22 分钟

Mesh 模式的引入是实现应用云原生的关键路径,蚂蚁集团已在内部实现大规模落地。随着 Message、DB、Cache Mesh 等更多的中间件能力的下沉,从 Mesh 演进而来的应用运行时将是中间件技术的未来形态。应用运行时旨在帮助开发人员快速的构建云原生应用,帮助应用和基础设施进一步解耦,而应用运行时最核心是 API 标准,期望社区一起共建。

蚂蚁集团 Mesh 化介绍

蚂蚁是一家技术和创新驱动的公司,从最早淘宝里的一个支付应用,到现在服务 全球十二亿用户的大型公司,蚂蚁的技术架构演进大概会分为如下几个阶段:

2006 之前,最早的支付宝就是一个集中式的单体应用,不同的业务做了模块化的开发。

2007 年的时候,随着更多场景支付的推广,开始做了一下应用、数据的拆分,做了 SOA 化的一些改造。

2010 年之后,推出了快捷支付,移动支付,支撑双十一,还有余额宝等现象级产品,用户数到了亿这个级别,蚂蚁的应用数也数量级的增长,蚂蚁自研了很多全套的微服务中间件去支撑蚂蚁的业务;

2014 年,像借呗花呗、线下支付、更多场景更多业务形态的出现,对蚂蚁的可用性和稳定性提出更高的要求,蚂蚁对微服务中间件进行了 LDC 单元化的支持,支撑业务的异地多活,以及为了支撑双十一超大流量的混合云上的弹性扩缩容。

2018 年,蚂蚁的业务不仅仅是数字金融,还有数字生活、国际化等一些新战略的出现,促使我们要有更加高效的技术架构能让业务跑得更快更稳,所以蚂蚁结合业界比较流行的云原生的理念,在内部进行了 Service Mesh、Serverless、可信原生方向的一些落地。

可以看到蚂蚁的技术架构也是跟随公司的业务创新不断演进的,前面的从集中式到 SOA 再到微服务的过程,相信搞过微服务的同学都深有体会,而从微服务到云原生的实践是蚂蚁近几年自己探索出来的。

为什么要引入 Service Mesh

蚂蚁既然有一套完整的微服务治理中间件,那为什么还需要引入 Service Mesh 呢?

拿蚂蚁自研的服务框架 SOFARPC 为例,它是一个功能强大的 SDK,包含了服务发现、路由、熔断限流等一系列能力。在一个基本的 SOFA(Java) 应用里,业务代码集成了 SOFARPC 的 SDK,两者在一个进程里运行。在蚂蚁的大规模落地微服务之后,我们就面临了如下的一些问题:

升级成本高:SDK 是需要业务代码引入的,每次的升级都需要应用修改代码进行发布。由于应用规模较大,在一些大的技术变更或者安全问题修复的时候。每次需要数千个应用一起升级,费时费力。 版本碎片化:由于升级成本高,SDK 版本碎片化严重,这就导致我们写代码的时候需要兼容历史逻辑,整体技术演进困难。 跨语言无法治理:蚂蚁的中后台在线应用大多使用 Java 作为技术栈,但是在前台、AI、大数据等领域有很多的跨语言应用,例如 C++/Python/Golang 等等,由于没有对应语言的 SDK,他们的服务治理能力其实是缺失的。

我们注意到云原生里有 Service Mesh 一些理念开始出现,所以开始往这个方向探索。在 Service Mesh 的理念里,有两个概念,一个是 Control Plane 控制平面,一个是 Data Plane 数据平面。控制面这里暂时不展开,其中数据平面的核心思想就是解耦,将一些业务无需关系的复杂逻辑(如 RPC 调用里的服务发现、服务路由、熔断限流、安全)抽象到一个独立进程里去。只要保持业务和独立进程的通信协议不变,这些能力的演进可以跟随这个独立的进程自主升级,整个 Mesh 就可以做到统一演进。而我们的跨语言应用,只要流量是经过我们的 Data Plane 的,都可以享受到刚才提到的各种服务治理相关的能力,应用对底层的基础设施能力是透明的,真正的云原生的。

蚂蚁 Mesh 落地过程

所以从 2017 年底开始,蚂蚁就开始探索 Service Mesh 的技术方向,并提出了 基础设施统一,业务无感升级 的愿景。主要的里程碑就是:

2017 年底开始技术预研 Service Mesh 技术,并确定为未来发展方向;

2018 年初开始用 Golang 自研 Sidecar MOSN 并开源,主要支持 RPC 在双十一小范围试点;

2019 年 618,新增 Message Mesh 和 DB Mesh 的形态,覆盖若干核心链路,支撑 618 大促;

2019 年双十一,覆盖了所有大促核心链路几百个应用,支撑当时的双十一大促;

2020 年双十一,全站超过 80% 的在线应用接入了 Mesh 化,整套 Mesh 体系也具备了 2 个月从能力开发到全站升级完成的能力。

蚂蚁 Mesh 落地架构

目前 Mesh 化在蚂蚁落地规模是应用约数千个,容器数十万的级别,这个规模的落地,在业界是数一数二的,根本就没有前人的路可以学习,所以蚂蚁在落地过程中,也建设一套完整的研发运维体系去支撑蚂蚁的 Mesh 化。

蚂蚁 Mesh 架构大概如图所示,底下是我们的控制平面,里面部署了服务治理中心、PaaS、监控中心等平台的服务端,都是现有的一些产品。还有就是我们的运维体系,包括研发平台和 PaaS 平台。那中间是我们的主角数据平面 MOSN,里面管理了 RPC、消息、MVC、任务四种流量,还有健康检查、监控、配置、安全、技术风险都下沉的基础能力,同时 MOSN 也屏蔽了业务和基础平台的一些交互。DBMesh 在蚂蚁是一个独立的产品,图里就没画出来。然后最上层是我们的一些应用,目前支持 Java、Nodejs 等多种语言的接入。 对应用来说,Mesh 虽然能做到基础设施解耦,但是接入还是需要一次额外的升级成本,所以为了推进应用的接入,蚂蚁做了整个研发运维流程的打通,包括在现有框架上做最简化的接入,通过分批推进把控风险和进度,让新应用默认接入 Mesh 化等一些事情。

同时随着下沉能力的越来越多,各个能力之前也面临了研发协作的一些问题,甚至互相影响性能和稳定性的问题,所以对于 Mesh 自身的研发效能,我们也做了一下模块化隔离、新能力动态插拔、自动回归等改进,目前一个下沉能力从开发到全站推广完成可以在 2 个月内完成。

云原生应用运行时上的探索

大规模落地后的新问题与思考

蚂蚁 Mesh 大规模落地之后,目前我们遇到了一些新的问题: 跨语言 SDK 的维护成本高:拿 RPC 举例,大部分逻辑已经下沉到了 MOSN 里,但是还有一部分通信编解码协议的逻辑是在 Java 的一个轻量级 SDK 里的,这个 SDK 还是有一定的维护成本的,有多少个语言就有多少个轻量级 SDK,一个团队不可能有精通所有语言的研发,所以这个轻量级 SDK 的代码质量就是一个问题。

业务兼容不同环境的新场景:蚂蚁的一部分应用是既部署在蚂蚁内部,也对外输出到金融机构的。当它们部署到蚂蚁时,对接的是蚂蚁的控制面,当对接到银行的时候,对接的是银行已有的控制面。目前大多数应用的做法是自己在代码里封装一层,遇到不支持的组件就临时支持对接一下。

从 Service Mesh 到 Multi-Mesh:蚂蚁最早的场景是 Service Mesh,MOSN 通过网络连接代理的方式进行了流量拦截,其它的中间件都是通过原始的 SDK 与服务端进行交互。而现在的 MOSN 已经不仅仅是 Service Mesh 了,而是 Multi-Mesh,因为除了 RPC,我们还支持了更多中间件的 Mesh 化落地,包括消息、配置、缓存的等等。可以看到每个下沉的中间件,在应用侧几乎都有一个对应的轻量级 SDK 存在,这个在结合刚才的第一问题,就发现有非常多的轻量级 SDK 需要维护。为了保持功能不互相影响,每个功能它们开启不同的端口,通过不同的协议去和 MOSN 进行调用。例如 RPC 用的 RPC 协议,消息用的 MQ 协议,缓存用的 Redis 协议。然后现在的 MOSN 其实也不仅仅是面向流量了,例如配置就是暴露了一下 API 给业务代码去使用。

为了解决刚才的问题和场景,我们就在思考如下的几个点:

1.不同中间件、不同语言的 SDK 能否风格统一?

2.各个下沉能力的交互协议能否统一?

3.我们的中间件下沉是面向组件还是面向能力?

4.底层的实现是否可以替换?

蚂蚁云原生应用运行时架构

从去年的 3 月份开始,经过内部的多轮讨论,以及对业界一些新理念的调研,我们提出了一个“云原生应用运行时”(下称运行时)的概念。顾名思义,我们希望这个运行时能够包含应用所关心的所有分布式能力,帮助开发人员快速的构建云原生应用,帮助应用和基础设施进一步解耦!

云原生应用运行时设计里核心的几个点如下:

第一,由于有了 MOSN 规模化落地的经验和配套的运维体系,我们决定基于 MOSN 内核去开发我们的云原生应用运行时。

第二,面向能力,而不是面向组件,统一定义出这个运行时的 API 能力。

第三,业务代码和 Runtime API 之间的交互采用统一的 gRPC 协议,这样的话,业务端侧可以直接通过 proto 文件去反向生成一个客户端,直接进行调用。

第四,能力后面对应的组件实现是可以替换的,例如注册服务的提供者可以是 SOFARegistry,也可以是 Nacos 或者 Zookeeper。

运行时能力抽象

为了抽象出云原生应用最需要的一些能力,我们先定了几个原则:

1.关注分布式应用所需的 API 和场景而不是组件; 2.API 符合直觉,开箱即用,约定优于配置; 3.API 不绑定实现,实现差异化使用扩展字段。

有了原则之后,我们就抽象出了三组 API,分别是应用调用运行时的 mosn.proto,运行时调用应用的 appcallback.proto,运行时运维相关的 actuator.proto。例如 RPC 调用、发消息、读缓存、读配置这些都属于应用到运行时的,而 RPC 收请求、收消息、接收任务调度这些属于运行时调应用的,其它监控检查、组件管理、流量控制这些则属于运行时运维相关的。

这三个 proto 的示例可以看下图:

运行时组件管控

另外一方面,为了实现运行时的实现可替换,我们也在 MOSN 提了两个概念,我们把一个个分布式能力称为 Service,然后有不同的 Component 去实现这个 Service,一个 Service 可以有多个组件实现它,一个组件可以实现多个 Service。例如图里的示例就是有“MQ-pub” 这个发消息的 Service 有 SOFAMQ 和 Kafka 两个 Component 去实现,而 Kafka Component 则实现了发消息和健康检查两个 Service。 当业务真正通过 gRPC 生成的客户端发起请求的时候,数据就会通过 gRPC 协议发送给 Runtime,并且分发到后面一个具体的实现上去。这样的话,应用只需要使用同一套 API,通过请求里的参数或者运行时的配置,就对接到不同的实现。

运行时和 Mesh 的对比

综上所述, 云原生应用运行时和刚才 Mesh 简单对比如下:

云原生应用运行时落地场景 从去年中开始研发,运行时目前在蚂蚁内部主要落地了下面几个场景。

异构技术栈接入

在蚂蚁,不同的语言的应用除了 RPC 服务治理、消息等的需求之外,还希望使用上蚂蚁统一的中间件等基础设施能力,Java 和 Nodejs 是有对应的 SDK 的,而其他语言是没有的对应的 SDK 的。有了应用运行时之后,这些异构语言就可以直接通过 gRPC Client 调用运行时,对接上蚂蚁的基础设施。

解除厂商绑定

刚才提到,蚂蚁的区块链、风控、智能客服、金融中台等等业务是既在主站部署,又有阿里云或者专有云部署的场景。有了运行时之后,应用可以一套代码和运行时一起出一个镜像,通过配置去决定调用哪个底层的实现,不跟具体的实现绑定。例如在蚂蚁内部对接的是 SOFARegistry 和 SOFAMQ 等产品,而到云上对接的是 Nacos、RocketMQ 等产品,到专有云对接的又是 Zookeeper、Kafka 等。这个场景我们正在落地当中。当然这个也可以用在遗留系统治理上,例如从 SOFAMQ 1.0 升级到 SOFAMQ 2.0,接了运行时的应用也无需升级。

FaaS 冷启预热池

FaaS 冷启预热池也是我们近期在探索的一个场景,大家知道 FaaS 里的 Function 在冷启的时候,是需要从创建 Pod 到下载 Function 再到启动的,这个过程会比较长。有了运行时之后,我们可以提前把 Pod 创建出来并启动好运行时,等到应用启动的时候其实已经非常简单的应用逻辑了,经过测试发现可以将从 5s 缩短 80% 到 1s。这个方向我们还会持续探索当中。

规划和展望

API 共建

运行时里最主要的一部分就是 API 的定义,为了落地内部,我们已经有一套较为完整的 API,但是我们也看到业界的很多产品有类似的诉求,例如 dapr、envoy 等等。所以接下来我们会去做的一件事情就是联合各个社区去推出一套大家都认可的云原生应用 API。

持续开源

另外我们近期也会将内部的运行时实践逐步开发出来,预计五六月份会发布 0.1 版本,并保持每月发布一个小版本的节奏,争取年底之前发布 1.0 版本。

总结

最后做一下小结:

1.Service Mesh 模式的引入是实现应用原云生的关键路径;

2.任何中间件兼可 Mesh 化,但研发效率问题依然部分存在;

3.Mesh 大规模落地是工程化的事情,需要完整的配套体系;

4.云原生应用运行时将是中间件等基础技术的未来形态,进一步解耦应用与分布式能力;

5.云原生应用运行时核心是 API,期望社区共建一个标准。

延伸阅读

源码解析 4层流量治理,tcp流量dump

· 阅读需 5 分钟

作者简介: 龚中强,是开源社区的爱好者,致力于拥抱开源。

写作时间: 2022年4月26日

Overview

此文档的目的在于分析 tcp 流量 dump 的实现

前提:

文档内容所涉及代码版本如下

https://github.com/mosn/layotto

Layotto 0e97e970dc504e0298017bd956d2841c44c0810b(main分支)

源码分析

代码均在: tcpcopy代码

model.go分析

此类是 tcpcopy 的配置对象的核心类

type DumpConfig struct {
Switch string `json:"switch"` // dump 开关.配置值:'ON' 或 'OFF'
Interval int `json:"interval"` // dump 采样间隔, 单位: 秒
Duration int `json:"duration"` // 单个采样周期, 单位: 秒
CpuMaxRate float64 `json:"cpu_max_rate"` // cpu 最大使用率。当超过此阈值,dump 功能将停止
MemMaxRate float64 `json:"mem_max_rate"` // mem 最大使用率。当超过此阈值,dump 功能将停止
}

type DumpUploadDynamicConfig struct {
Unique_sample_window string // 指定采样窗口
BusinessType _type.BusinessType // 业务类型
Port string // 端口
Binary_flow_data []byte // 二进制数据
Portrait_data string // 用户上传的数据
}

persistence.go分析

此类是 tcpcopy 的 dump 持久化核心处理类

// 该方法在 tcpcopy.go 中 OnData 中调用
func IsPersistence() bool {
// 判断 dump 开关是否开启
if !strategy.DumpSwitch {
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("%s the dump switch is %t", model.LogDumpKey, strategy.DumpSwitch)
}
return false
}

// 判断是否在采样窗口中
if atomic.LoadInt32(&strategy.DumpSampleFlag) == 0 {
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("%s the dump sample flag is %d", model.LogDumpKey, strategy.DumpSampleFlag)
}
return false
}

// 判断是否 dump 功能停止(获取系统负载判断处理器和内存是否超过 tcpcopy 的阈值,如果超过则停止)
if !strategy.IsAvaliable() {
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("%s the system usages are beyond max rate.", model.LogDumpKey)
}
return false
}

return true
}

// 根据配置信息持久化数据
func persistence(config *model.DumpUploadDynamicConfig) {
// 1.持久化二进制数据
if config.Binary_flow_data != nil && config.Port != "" {
if GetTcpcopyLogger().GetLogLevel() >= log.INFO {
GetTcpcopyLogger().Infof("[%s][%s]% x", config.Unique_sample_window, config.Port, config.Binary_flow_data)
}
}
if config.Portrait_data != "" && config.BusinessType != "" {
// 2. 持久化用户定义的数据
if GetPortraitDataLogger().GetLogLevel() >= log.INFO {
GetPortraitDataLogger().Infof("[%s][%s][%s]%s", config.Unique_sample_window, config.BusinessType, config.Port, config.Portrait_data)
}

// 3. 增量持久化内存中的配置信息的变动内容
buf, err := configmanager.DumpJSON()
if err != nil {
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("[dump] Failed to load mosn config mem.")
}
return
}
// 3.1. 如果数据变化则 dump
tmpMd5ValueOfMemDump := common.CalculateMd5ForBytes(buf)
memLogger := GetMemLogger()
if tmpMd5ValueOfMemDump != md5ValueOfMemDump ||
(tmpMd5ValueOfMemDump == md5ValueOfMemDump && common.GetFileSize(getMemConfDumpFilePath()) <= 0) {
md5ValueOfMemDump = tmpMd5ValueOfMemDump
if memLogger.GetLogLevel() >= log.INFO {
memLogger.Infof("[%s]%s", config.Unique_sample_window, buf)
}
} else {
if memLogger.GetLogLevel() >= log.INFO {
memLogger.Infof("[%s]%+v", config.Unique_sample_window, incrementLog)
}
}
}
}

tcpcopy.go分析

此类为是 tcpcopy 的核心类。

// 向 Mosn 注册 NetWork 
func init() {
api.RegisterNetwork("tcpcopy", CreateTcpcopyFactory)
}

// 返回 tcpcopy 工厂
func CreateTcpcopyFactory(cfg map[string]interface{}) (api.NetworkFilterChainFactory, error) {
tcpConfig := &config{}
// dump 策略转静态配置
if stg, ok := cfg["strategy"]; ok {
...
}
// TODO extract some other fields
return &tcpcopyFactory{
cfg: tcpConfig,
}, nil
}

// 供 pkg/configmanager/parser.go 调用添加或者更新Network filter factory
func (f *tcpcopyFactory) Init(param interface{}) error {
// 设置监听的地址和端口配置
...
return nil
}

// 实现的是 ReadFilter 的 OnData 接口,每次从连接拿到数据都进方法进行处理
func (f *tcpcopyFactory) OnData(data types.IoBuffer) (res api.FilterStatus) {
// 判断当前请求数据是否需要采样 dump
if !persistence.IsPersistence() {
return api.Continue
}

// 异步的采样 dump
config := model.NewDumpUploadDynamicConfig(strategy.DumpSampleUuid, "", f.cfg.port, data.Bytes(), "")
persistence.GetDumpWorkPoolInstance().Schedule(config)
return api.Continue
}

最后我们再来回顾一下整体流程走向:

  1. 从 tcpcopy.go 的初始化函数init() 开始,程序向 CreateGRPCServerFilterFactory 传入 CreateTcpcopyFactory.

  2. Mosn 创建出一个filter chain(代码位置factory.go) ,通过循环调用CreateFilterChain将所有的filter加入到链路结构包括本文的 tcpcopy.

  3. 当流量通过 mosn 将会进入到 tcpcopy.go 的 OnData 方法进行 tcpdump 的逻辑处理.

MOSN 子项目 Layotto:开启服务网格+应用运行时新篇章

· 阅读需 32 分钟

作者简介: 马振军,花名古今,在基础架构领域耕耘多年,对 Service Mesh 有深度实践经验,目前在蚂蚁集团中间件团队负责 MOSN、Layotto 等项目的开发工作。 Layotto 官方 GitHub 地址:https://github.com/mosn/layotto

点击链接即可观看现场视频:https://www.bilibili.com/video/BV1hq4y1L7FY/

Service Mesh 在微服务领域已经非常流行,越来越多的公司开始在内部落地,蚂蚁从 Service Mesh 刚出现的时候开始,就一直在这个方向上大力投入,到目前为止,内部的 Mesh 方案已经覆盖数千个应用、数十万容器并且经过了多次大促考验,Service Mesh 带来的业务解耦,平滑升级等优势大大提高了中间件的迭代效率。

在大规模落地以后,我们又遇到了新的问题,本文主要对 Service Mesh 在蚂蚁内部落地情况进行回顾总结,并分享对 Service Mesh 落地后遇到的新问题的解决方案。

一、Service Mesh 回顾与总结

A、Service Mesh 的初衷

在微服务架构下,基础架构团队一般会为应用提供一个封装了各种服务治理能力的 SDK,这种做法虽然保障了应用的正常运行,但缺点也非常明显,每次基础架构团队迭代一个新功能都需要业务方参与升级才能使用,尤其是 bugfix 版本,往往需要强推业务方升级,这里面的痛苦程度每一个基础架构团队成员都深有体会。

伴随着升级的困难,随之而来的就是应用使用的 SDK 版本差别非常大,生产环境同时跑着各种版本的 SDK,这种现象又会让新功能的迭代必须考虑各种兼容,就好像带着枷锁前进一般,这样随着不断迭代,会让代码维护非常困难,有些祖传逻辑更是一不小心就会掉坑里。

同时这种“重”SDK 的开发模式,导致异构语言的治理能力非常薄弱,如果想为各种编程语言都提供一个功能完整且能持续迭代的 SDK 其中的成本可想而知。

18 年的时候,Service Mesh 在国内持续火爆,这种架构理念旨在把服务治理能力跟业务解耦,让两者通过进程级别的通信方式进行交互。在这种架构模式下,服务治理能力从应用中剥离,运行在独立的进程中,迭代升级跟业务进程无关,这就可以让各种服务治理能力快速迭代,并且由于升级成本低,因此每个版本都可以全部升级,解决了历史包袱问题,同时 SDK 变“轻”直接降低了异构语言的治理门槛,再也不用为需要给各个语言开发相同服务治理能力的 SDK 头疼了。

B、Service Mesh 落地现状

蚂蚁很快意识到了 Service Mesh 的价值,全力投入到这个方向,用 Go 语言开发了 MOSN 这样可以对标 envoy 的优秀数据面,全权负责服务路由,负载均衡,熔断限流等能力的建设,大大加快了公司内部落地 Service Mesh 的进度。

现在 MOSN 在蚂蚁内部已经覆盖了数千个应用、数十万容器,新创建的应用默认接入 MOSN,形成闭环。而且在大家最关心的资源占用、性能损耗方面 MOSN 也交出了一份让人满意的答卷:

  1. RT 小于 0.2ms

  2. CPU 占用增加 0%~2%

  3. 内存消耗增长小于 15M

由于 Service Mesh 降低了异构语言的服务治理门槛,NodeJS、C++等异构技术栈也在持续接入到 MOSN 中。

在看到 RPC 能力 Mesh 化带来的巨大收益之后,蚂蚁内部还把 MQ,Cache,Config 等中间件能力都进行了 Mesh 化改造,下沉到 MOSN,提高了中间件产品整体的迭代效率。

C、新的挑战

  1. 应用跟基础设施强绑定

一个现代分布式应用,往往会同时依赖 RPC、Cache、MQ、Config 等各种分布式能力来完成业务逻辑的处理。

当初看到 RPC 下沉的红利以后,其他各种能力也都快速下沉。初期,大家都会以自己最熟悉的方式来开发,这就导致没有统一的规划管理,如上图所示,应用依赖了各种基础设施的 SDK,而每种 SDK 又以自己特有的方式跟 MOSN 进行交互,使用的往往都是由原生基础设施提供的私有协议,这直接导致了复杂的中间件能力虽然下沉,但应用本质上还是被绑定到了基础设施,比如想把缓存从 Redis 迁移到 Memcache 的话,仍旧需要业务方升级 SDK,这种问题在应用上云的大趋势下表现的更为突出,试想一下,如果一个应用要部署在云上,由于该应用依赖了各种基础设施,势必要先把整个基础设施搬到云上才能让应用顺利部署,这其中的成本可想而知。 因此如何让应用跟基础设施解绑,使其具备可移植能力,能够无感知跨平台部署是我们面临的第一个问题。

  1. 异构语言接入成本高

事实证明 Service Mesh 确实降低了异构语言的接入门槛,但在越来越多的基础能力下沉到 MOSN 以后,我们逐渐意识到为了让应用跟 MOSN 交互,各种 SDK 里都需要对通信协议,序列化协议进行开发,如果再加上需要对各种异构语言都提供相同的功能,那维护难度就会成倍上涨,

Service Mesh 让重 SDK 成为了历史,但对于现在各种编程语言百花齐放、各种应用又强依赖基础设施的场景来说,我们发现现有的 SDK 还不够薄,异构语言接入的门槛还不够低,如何进一步降低异构语言的接入门槛是我们面临的第二个问题。

二、Multi Runtime 理论概述

A、什么是 Runtime?

20 年初的时候,Bilgin lbryam 发表了一篇名为 Multi-Runtime Microservices Architecture 的文章,里面对微服务架构下一阶段的形态进行了讨论。

如上图所示,作者把分布式服务的需求进行了抽象,总共分为了四大类:

  1. 生命周期(Lifecycle) 主要指应用的编译、打包、部署等事情,在云原生的大趋势下基本被 docker、kubernetes 承包。

  2. 网络(Networking) 可靠的网络是微服务之间进行通信的基本保障,Service Mesh 正是在这方面做了尝试,目前 MOSN、envoy 等流行的数据面的稳定性、实用性都已经得到了充分验证。

  3. 状态(State) 分布式系统需要的服务编排,工作流,分布式单例,调度,幂等性,有状态的错误恢复,缓存等操作都可以统一归为底层的状态管理。

  4. 绑定(Binding) 在分布式系统中,不仅需要跟其他系统通信,还需要集成各种外部系统,因此对于协议转换,多种交互模型、错误恢复流程等功能也都有强依赖。

明确了需求以后,借鉴了 Service Mesh 的思路,作者对分布式服务的架构演进进行了如下总结:

第一阶段就是把各种基础设施能力从应用中剥离解耦,通通变成独立 sidecar 模型伴随着应用一起运行。

第二阶段是把各种 sidecar 提供的能力统一抽象成若干个 Runtime,这样应用从面向基础组件开发就演变成了面向各种分布式能力开发,彻底屏蔽掉了底层实现细节,而且由于是面向能力,除了调用提供各种能力的 API 之外,应用再也不需要依赖各种各样基础设施提供的 SDK 了。

作者的思路跟我们希望解决的问题一致,我们决定使用 Runtime 的理念来解决 Service Mesh 发展到现在所遇到的新问题。

B、Service Mesh vs Runtime

为了让大家对 Runtime 有一个更加清晰的认识,上图针对 Service Mesh 跟 Runtime 两种理念的定位、交互方式、通信协议以及能力丰富度进行了总结,可以看到相比 Service Mesh 而言,Runtime 提供了语义明确、能力丰富的 API,可以让应用跟它的交互变得更加简单直接。

三、MOSN 子项目 Layotto

A、dapr 调研

dapr 是社区中一款知名的 Runtime 实现产品,活跃度也比较高,因此我们首先调研了 dapr 的情况,发现 dapr 具有如下优势:

  1. 提供了多种分布式能力,API 定义清晰,基本能满足一般的使用场景。

  2. 针对各种能力都提供了不同的实现组件,基本涵盖了常用的中间件产品,用户可以根据需要自由选择。

当考虑如何在公司内部落地 dapr 时,我们提出了两种方案,如上图所示:

  1. 替换:废弃掉现在的 MOSN,用 dapr 进行替换,这种方案存在两个问题:

a. dapr 虽然提供了很多分布式能力,但目前并不具备 Service Mesh 包含的丰富的服务治理能力。

b. MOSN 在公司内部已经大规模落地,并且经过了多次大促考验,直接用 dapr 来替换 MOSN 稳定性有待验证。

  1. 共存:新增一个 dapr 容器,跟 MOSN 以两个 sidecar 的模式进行部署。这种方案同样存在两个问题:

a. 引入一个新的 sidecar,我们就需要考虑它配套的升级、监控、注入等等事情,运维成本飙升。

b. 多维护一个容器意味着多了一层挂掉的风险,这会降低现在的系统可用性。

同样的,如果你目前正在使用 envoy 作为数据面,也会面临上述问题。 因此我们希望把 Runtime 跟 Service Mesh 两者结合起来,通过一个完整的 sidecar 进行部署,在保证稳定性、运维成本不变的前提下,最大程度复用现有的各种 Mesh 能力。此外我们还希望这部分 Runtime 能力除了跟 MOSN 结合起来之外,未来也可以跟 envoy 结合起来,解决更多场景中的问题,Layotto 就是在这样的背景下诞生。

B、Layotto 架构

如上图所示,Layotto 是构建在 MOSN 之上,在下层对接了各种基础设施,向上层应用提供了统一的,具有各种各样分布式能力的标准 API。对于接入 Layotto 的应用来说,开发者不再需要关心底层各种组件的实现差异,只需要关注应用需要什么样的能力,然后调用对应能力的 API 即可,这样可以彻底跟底层基础设施解绑。

对应用来说,交互分为两块,一个是作为 gRPC Client 调用 Layotto 的标准 API,一个是作为 gRPC Server 来实现 Layotto 的回调,得利于gRPC 优秀的跨语言支持能力,应用不再需要关心通信、序列化等细节问题,进一步降低了异构技术栈的使用门槛。

除了面向应用,Layotto 也向运维平台提供了统一的接口,这些接口可以把应用跟 sidecar 的运行状态反馈给运维平台,方便 SRE 同学及时了解应用的运行状态并针对不同状态做出不同的举措,该功能考虑到跟 k8s 等已有的平台集成,因此我们提供了 HTTP 协议的访问方式。

除了 Layotto 本身设计以外,项目还涉及两块标准化建设,首先想要制定一套语义明确,适用场景广泛的 API 并不是一件容易的事情,为此我们跟阿里、 dapr 社区进行了合作,希望能够推进 Runtime API 标准化的建设,其次对于 dapr 社区已经实现的各种能力的 Components 来说,我们的原则是优先复用、其次开发,尽量不把精力浪费在已有的组件上面,重复造轮子。

最后 Layotto 目前虽然是构建在 MOSN 之上,未来我们希望 Layotto 可以跑在 envoy 上,这样只要应用接入了 Service Mesh,无论数据面使用的是 MOSN 还是 envoy,都可以在上面增加 Runtime能力。

C、Layotto 的移植性

如上图所示,一旦完成 Runtime API 的标准化建设,接入 Layotto 的应用天然具备了可移植性,应用不需要任何改造就可以在私有云以及各种公有云上部署,并且由于使用的是标准 API,应用也可以无需任何改造就在 Layotto 跟 dapr 之间自由切换。

D、名字含义

从上面的架构图可以看出,Layotto 项目本身是希望屏蔽基础设施的实现细节,向上层应用统一提供各种分布式能力,这种做法就好像是在应用跟基础设施之间加了一层抽象,因此我们借鉴了 OSI 对网络定义七层模型的思路,希望 Layotto 可以作为第八层对应用提供服务,otto 是意大利语中8的意思,Layer otto 就是第八层的意思,简化了一下变成了 Layotto,同时项目代号 L8,也是第八层的意思,这个代号也是设计我们项目 LOGO 时灵感的来源。

介绍完项目的整体情况,下面对其中四个主要功能的实现细节进行说明。

E、配置原语

首先是分布式系统中经常使用的配置功能,应用一般使用配置中心来做开关或者动态调整应用的运行状态。Layotto 中配置模块的实现包括两部分,一个是对如何定义配置这种能力的 API 的思考,一个是具体的实现,下面逐个来看。

想要定义一个能满足大部分实际生产诉求的配置 API 并不是一件容易的事,dapr 目前也缺失这个能力,因此我们跟阿里以及 dapr 社区一起合作,为如何定义一版合理的配置 API 进行了激烈讨论。

目前讨论结果还没有最终确定,因此 Layotto 是基于我们提给社区的第一版草案进行实现,下面对我们的草案进行简要说明。

我们先定义了一般配置所需的基本元素:

  1. appId:表示配置属于哪个应用

  2. key:配置的 key

  3. content:配置的值

  4. group:配置所属的分组,如果一个 appId 下面的配置过多,我们可以给这些配置进行分组归类,便于维护。

此外我们追加了两种高级特性,用来适配更加复杂的配置使用场景:

  1. label,用于给配置打标签,比如该配置属于哪个环境,在进行配置查询的时候,我们会使用 label + key 来查询配置。

  2. tags,用户给配置追加的一些附加信息,如描述信息、创建者信息,最后修改时间等等,方便配置的管理,审计等。

对于上述定义的配置 API 的具体实现,目前支持查询、订阅、删除、创建、修改五种操作,其中订阅配置变更后的推送使用的是 gRPC 的 stream 特性,而底层实现这些配置能力的组件,我们选择了国内流行的 apollo,后面也会根据需求增加其他实现。

F、Pub/Sub 原语

对于 Pub/Sub 能力的支持,我们调研了 dapr 现在的实现,发现基本上已经可以满足我们的需求,因此我们直接复用了 dapr 的 API 以及 components,只是在 Layotto 里面做了适配,这为我们节省了大量的重复劳动,我们希望跟 dapr 社区保持一种合作共建的思路,而不是重复造轮子。

其中 Pub 功能是 App 调用 Layotto 提供的 PublishEvent 接口,而 Sub 功能则是应用通过 gRPC Server 的形式实现了 ListTopicSubscriptions 跟 OnTopicEvent 两个接口,一个用来告诉 Layotto 应用需要订阅哪些 topic,一个用于接收 topic 变化时 Layotto 的回调事件。

dapr 对于 Pub/Sub 的定义基本满足我们的需求,但在某些场景下仍有不足,dapr 采用了 CloudEvent 标准,因此 Pub 接口没有返回值,这无法满足我们生产场景中要求 Pub 消息以后服务端返回对应的 messageID 的需求,这一点我们已经把需求提交给了 dapr 社区,还在等待反馈,考虑到社区异步协作的机制,我们可能会先社区一步增加返回结果,然后再跟社区探讨一种更好的兼容方案。

G、RPC 原语

RPC 的能力大家不会陌生,这可能是微服务架构下最最基础的需求,对于 RPC 接口的定义,我们同样参考了 dapr 社区的定义,发现完全可以满足我们的需求,因此接口定义就直接复用 dapr 的,但目前 dapr 提供的 RPC 实现方案还比较薄弱,而 MOSN 经过多年迭代,能力已经非常成熟完善,因此我们大胆把 Runtime 跟 Service Mesh 两种思路结合在一起,把 MOSN 本身作为我们实现 RPC 能力的一个 Component,这样 Layotto 在收到 RPC 请求以后交给 MOSN 进行实际数据传输,这种方案可以通过 istio 动态改变路由规则,降级限流等等设置,相当于直接复用了 Service Mesh 的各种能力,这也说明 Runtime 不是要推翻 Service Mesh,而是要在此基础上继续向前迈一步。

具体实现细节上,为了更好的跟 MOSN 融合,我们在 RPC 的实现上面加了一层 Channel,默认支持dubbo,bolt,http 三种常见的 RPC 协议,如果仍然不能满足用户场景,我们还追加了 Before/After 两种 Filter,可以让用户做自定义扩展,实现协议转换等需求。

H、Actuator 原语

在实际生产环境中,除了应用所需要的各种分布式能力以外,PaaS 等运维平台往往需要了解应用的运行状态,基于这种需求,我们抽象了一套 Actuator 接口,目前 dapr 还没有提供这方面的能力,因此我们根据内部的需求场景进行了设计,旨在把应用在启动期、运行期等阶段各种各样的信息暴露出去,方便 PaaS 了解应用的运行情况。

Layotto 把暴露信息分为两大类:

  1. Health:该模块判断应用当前运行状态是否健康,比如某个强依赖的组件如果初始化失败就需要表示为非健康状态,而对于健康检查的类型我们参考了 k8s,分为:

a. Readiness:表示应用启动完成,可以开始处理请求。

b. Liveness:表示应用存活状态,如果不存活则需要切流等。

  1. Info:该模块预期会暴露应用的一些依赖信息出去,如应用依赖的服务,订阅的配置等等,用于排查问题。

Health 对外暴露的健康状态分为以下三种:

  1. INIT:表示应用还在启动中,如果应用发布过程中返回该值,这个时候 PaaS 平台应该继续等待应用完成启动。

  2. UP:表示应用启动正常,如果应用发布过程中返回该值,意味着 PasS 平台可以开始放入流量。

  3. DOWN:表示应用启动失败,如果应用发布过程中返回该值,意味着 PaaS 需要停止发布并通知应用 owner。

到这里关于 Layotto 目前在 Runtime 方向上的探索基本讲完了,我们通过定义明确语义的 API,使用 gRPC 这种标准的交互协议解决了目前面临的基础设施强绑定、异构语言接入成本高两大问题。随着未来 API 标准化的建设,一方面可以让接入 Layotto 的应用无感知的在各种私有云、公有云上面部署,另一方面也能让应用在 Layotto,dapr 之间自由切换,提高研发效率。

目前 Serverless 领域也是百花齐放,没有一种统一的解决方案,因此 Layotto 除了在上述 Runtime 方向上的投入以外,还在 Serverless 方向上也进行了一些尝试,下面就尝试方案进行介绍。

四、WebAssembly 的探索

A、WebAssembly 简介

WebAssembly,简称 WASM,是一个二进制指令集,最初是跑在浏览器上来解决 JavaScript 的性能问题,但由于它良好的安全性,隔离性以及语言无关性等优秀特性,很快人们便开始让它跑在浏览器之外的地方,随着 WASI 定义的出现,只需要一个 WASM 运行时,就可以让 WASM 文件随处执行。

既然 WebAssembly 可以在浏览器以外的地方运行,那么我们是否能把它用在 Serverless 领域?目前已经有人在这方面做了一些尝试,不过如果这种方案真的想落地的话,首先要考虑的就是如何解决运行中的 WebAssembly 对各种基础设施的依赖问题。

B、WebAssembly 落地原理

目前 MOSN 通过集成 WASM Runtime 的方式让 WASM 跑在 MOSN 上面,以此来满足对 MOSN 做自定义扩展的需求。同时,Layotto 也是构建在 MOSN 之上,因此我们考虑把二者结合在一起,实现方案如下图所示:

开发者可以使用 Go/C++/Rust 等各种各样自己喜欢的语言来开发应用代码,然后把它们编译成 WASM 文件跑在 MOSN 上面,当 WASM 形态的应用在处理请求的过程中需要依赖各种分布式能力时就可以通过本地函数调用的方式调用 Layotto 提供的标准 API,这样直接解决了 WASM 形态应用的依赖问题。

目前 Layotto 提供了 Go 跟 Rust 版 WASM 的实现,虽然只支持 demo 级功能,但已经足够让我们看到这种方案的潜在价值。

此外,WASM 社区目前还处于初期阶段,有很多地方需要完善,我们也给社区提交了一些 PR共同建设,为 WASM 技术的落地添砖加瓦。

C、WebAssembly 落地展望

虽然现在 Layotto 中对 WASM 的使用还处于试验阶段,但我们希望它最终可以成为 Serverless 的一种实现形态,如上图所示,应用通过各种编程语言开发,然后统一编译成 WASM 文件,最后跑在 Layotto+MOSN 上面,而对于应用的运维管理统一由 k8s、docker、prometheus 等产品负责。

五、社区规划

最后来看下 Layotto 在社区的做的一些事情。

A、Layotto vs Dapr

上图列出了 Layotto 跟 dapr 现有的能力对比,在 Layotto 的开发过程中,我们借鉴 dapr 的思路,始终以优先复用、其次开发为原则,旨在达成共建的目标,而对于正在建设或者未来要建设的能力来说,我们计划优先在 Layotto 上落地,然后再提给社区,合并到标准 API,鉴于社区异步协作的机制,沟通成本较高,因此短期内可能 Layotto 的 API 会先于社区,但长期来看一定会统一。

B、API 共建计划

关于如何定义一套标准的 API 以及如何让 Layotto 可以跑在 envoy 上等等事项,我们已经在各个社区进行了深入讨论,并且以后也还会继续推进。

C、Roadmap

Layotto 在目前主要支持 RPC、Config、Pub/Sub、Actuator 四大功能,预计在九月会把精力投入到分布式锁、State、可观测性上面,十二月份会支持 Layotto 插件化,也就是让它可以跑在 envoy 上,同时希望对 WebAssembly 的探索会有进一步的产出。

D、正式开源

前面详细介绍了 Layotto 项目,最重要的还是该项目今天作为 MOSN 的子项目正式开源,我们提供了详细的文档以及 demo 示例方便大家快速上手体验。

对于 API 标准化的建设是一件需要长期推动的事情,同时标准化意味着不是满足一两种场景,而是尽可能的适配大多数使用场景,为此我们希望更多的人可以参与到 Layotto 项目中,描述你的使用场景,讨论 API 的定义方案,一起提交给社区,最终达成 Write once, Run anywhere 的终极目标!

源码解析 7层流量治理,接口限流

· 阅读需 3 分钟

作者简介: 张晨,是开源社区的爱好者,致力于拥抱开源,希望能和社区的各位开源爱好者互相交流互相进步和成长。

写作时间: 2022年4月20日

Overview

此文档的目的在于分析接口限流的实现

前提:

文档内容所涉及代码版本如下

https://github.com/mosn/mosn

Mosn d11b5a638a137045c2fbb03d9d8ca36ecc0def11(develop分支)

源码分析

总体分析

参考
https://mosn.io/docs/concept/extensions/

Mosn 的 Stream Filter 扩展机制

01.png

代码均在: flowcontrol代码

stream_filter_factory.go分析

此类为一个工厂类,用于创建 StreamFilter

定义了一些常量用作默认值

02.png

定义了限流配置类用作加载yaml定义并且解析生产出对应的功能

03.png

init() 初始化内部就是将 name 和 对应构造函数存储到 filter拦截工厂的map中

04.png

着重讲一下 createRpcFlowControlFilterFactory 生产出rpc流控工厂

05.png

在查看streamFilter之前我们来看看工厂类是如何生产出限流器的

06.png

限流器加入到限流链路结构中按照设定顺序依次生效。

CreateFilterChain 方法将多个filter 加入到链路结构中

07.png

我们可以看到有各种各样的工厂类包括我们今天研究的限流工厂类实现了此接口

08.png

Stream_filter.go分析

09.png

整体流程:

最后我们再来回顾一下整体流程走向:

  1. 从stream_filter_factory.go的初始化函数init() 开始,程序向creatorStreamFactory(map类型)插入了 createRpcFlowControlFilterFactory.

  2. Mosn 创建出一个filter chain(代码位置factory.go) ,通过循环调用CreateFilterChain将所有的filter加入到链路结构包括我们今天的主人公限流器.

  3. 创建限流器 NewStreamFilter().

  4. 当流量通过mosn 将会进入到限流器的方法 OnReceive() 中并最终借助sentinel实现限流逻辑(是否已经达到阈值,是放行流量还是拦截流量, StreamFilterStop or StreamFilterContinue).

Layotto 源码解析 —— 处理 RPC 请求

· 阅读需 24 分钟

本文主要以 Dubbo Json RPC 为例来分析 Layotto RPC 处理流程。

作者:王志龙 | 2022年4月21日

概述

Layotto 作为区别于网络代理 Service Mesh 的分布式原语集合且使用标准协议的 Runtime,具有明确和丰富的语义 API,而 RPC API 就是众多 API 中的一种。通过 RPC API 应用程序开发者可以通过与同样使用 Sidecar 架构的应用本地 Layotto 实例进行交互,从而间接的调用不同服务的方法,并可以利用内置能力完成分布式追踪和诊断,流量调控,错误处理,安全链路等操作。并且 Layotto 的 RPC API 基于 Mosn 的 Grpc handler 设计,除了 Http/Grpc,与其它服务通信时还可以利用Mosn的多协议机制,使用 X-Protocol 协议进行安全可靠通信。如下代码所示,RPC API 的接口与 Dapr 一致,通过 Grpc 接口 InvokeService 即可进行 RPC 调用。

type DaprClient interface {
// Invokes a method on a remote Dapr app.
InvokeService(ctx context.Context, in *InvokeServiceRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error)
...
}

源码分析

为了便于理解,这里从外到内,再从内到外,由数据流转映射到源代码,也就是从Client发起请求,穿越一层一层的逻辑,到 Server 收到请求返回响应,再一层层的回到 Client 收到响应,一层层来分析 Layotto 的 RPC 流程,总共拆分成十步。另外因为 Grpc Client 和 Server 握手及交互相关的内容不是本文重点,所以分析的相对简略一些,其它步骤内容相对详细一些,大家也可以根据自己的情况直接从目录跳转到相应步骤。

备注:本文基于 commit hash:1d2bed68c3b2372c34a12aeed41be125a4fdd15a

0x00 Layotto 初始化 RPC

Layotto 启动流程涉及众多本流程,在此只分析下跟 RPC 相关的及下述流程用的初始化,因为 Layotto 是建立在 Mosn 之上,所以从 Main 函数出发,urfave/cli 库会调用 Mosn 的 StageManager 初始化 Mosn, 进而在 Mosn NetworkFilter 中初始化 GrpcServer,具体流程如下。

mosn.io/mosn/pkg/stagemanager.(*StageManager).runInitStage at stage_manager.go
=>
mosn.io/mosn/pkg/mosn.(*Mosn).initServer at mosn.go
=>
mosn.io/mosn/pkg/filter/network/grpc.(*grpcServerFilterFactory).Init at factory.go
=>
mosn.io/mosn/pkg/filter/network/grpc.(*Handler).New at factory.go
// 新建一个带有地址的 Grpc 服务器。同一个地址返回同一个服务器,只能启动一次
func (s *Handler) New(addr string, conf json.RawMessage, options ...grpc.ServerOption) (*registerServerWrapper, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
sw, ok := s.servers[addr]
if ok {
return sw, nil
}
ln, err := NewListener(addr)
if err != nil {
log.DefaultLogger.Errorf("create a listener failed: %v", err)
return nil, err
}
// 调用 NewRuntimeGrpcServer
srv, err := s.f(conf, options...)
if err != nil {
log.DefaultLogger.Errorf("create a registered server failed: %v", err)
return nil, err
}
sw = &registerServerWrapper{
server: srv,
ln: ln,
}
s.servers[addr] = sw
return sw, nil
}
=
main.NewRunvtimeGrpcServer at main.go
=>
mosn.io/layotto/pkg/runtime.(*MosnRuntime).initRuntime at runtime.go
=>
mosn.io/layotto/pkg/runtime.(*MosnRuntime).initRpcs at runtime.go
=>
mosn.io/layotto/components/rpc/invoker/mosn.(*mosnInvoker).Init at mosninvoker.go
func (m *mosnInvoker) Init(conf rpc.RpcConfig) error {
var config mosnConfig
if err := json.Unmarshal(conf.Config, &config); err != nil {
return err
}

// 初始化 RPC 调用前的 Filter
for _, before := range config.Before {
m.cb.AddBeforeInvoke(before)
}

// 初始化 RPC 调用后的 Filter
for _, after := range config.After {
m.cb.AddAfterInvoke(after)
}

if len(config.Channel) == 0 {
return errors.New("missing channel config")
}

// 初始化与 Mosn 通信使用的通道、协议及对应端口
channel, err := channel.GetChannel(config.Channel[0])
if err != nil {
return err
}
m.channel = channel
return nil
}
...
// 完成一些列初始化后在 grpcServerFilter 中启动 Grpc Server
mosn.io/mosn/pkg/filter/network/grpc.(*grpcServerFilterFactory).Init at factory.go
func (f *grpcServerFilterFactory) Init(param interface{}) error {
...
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(f.UnaryInterceptorFilter),
grpc.StreamInterceptor(f.StreamInterceptorFilter),
}
// 经过上述初始化,完成 Grpc registerServerWrapper 的初始化
sw, err := f.handler.New(addr, f.config.GrpcConfig, opts...)
if err != nil {
return err
}
// 启动 Grpc sever
sw.Start(f.config.GracefulStopTimeout)
f.server = sw
log.DefaultLogger.Debugf("grpc server filter initialized success")
return nil
}
...
// StageManager 在 runInitStage 之后进入 runStartStage 启动 Mosn
func (stm *StageManager) runStartStage() {
st := time.Now()
stm.SetState(Starting)
for _, f := range stm.startupStages {
f(stm.app)
}

stm.wg.Add(1)
// 在所有启动阶段完成后启动 Mosn
stm.app.Start()
...
}

0x01 Dubbo-go-sample client 发起请求

根据 Dubbo Json Rpc Example 例子运行如下命令

go run demo/rpc/dubbo_json_rpc/dubbo_json_client/client.go -d '{"jsonrpc":"2.0","method":"GetUser","params":["A003"],"id":9527}'

使用 Layotto 对 App 提供的 Grpc API InvokeService 发起 RPC 调用,经过数据填充和连接建立等流程,最终通过 Grpc clientStream 中调用 SendMsg 向 Layotto 发送数据,具体流程如下。


func main() {
data := flag.String("d", `{"jsonrpc":"2.0","method":"GetUser","params":["A003"],"id":9527}`, "-d")
flag.Parse()

conn, err := grpc.Dial("localhost:34904", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}

cli := runtimev1pb.NewRuntimeClient(conn)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
// 通过 Grpc 接口 InvokeService 进行 RPC 调用
resp, err := cli.InvokeService(
ctx,
// 使用 runtimev1pb.InvokeServiceRequest 发起 Grpc 请求
&runtimev1pb.InvokeServiceRequest{
// 要请求的 server 接口 ID
Id: "org.apache.dubbo.samples.UserProvider",
Message: &runtimev1pb.CommonInvokeRequest{
// 要请求的接口对应的方法名
Method: "GetUser",
ContentType: "",
Data: &anypb.Any{Value: []byte(*data)},
HttpExtension: &runtimev1pb.HTTPExtension{Verb: runtimev1pb.HTTPExtension_POST},
},
},
)
if err != nil {
log.Fatal(err)
}

fmt.Println(string(resp.Data.GetValue()))
}
=>
mosn.io/layotto/spec/proto/runtime/v1.(*runtimeClient).InvokeService at runtime.pb.go
=>
google.golang.org/grpc.(*ClientConn).Invoke at call.go
=>
google.golang.org/grpc.(*clientStream).SendMsg at stream.go
=>
google.golang.org/grpc.(*csAttempt).sendMsg at stream.go
=>
google.golang.org/grpc/internal/transport.(*http2Client).Write at http2_client.go

0x02 Mosn EventLoop 读协程处理请求数据

上文说过 Layotto 的内核相当于是 Mosn,所以当网络连接数据到达时,会先到 Mosn 的 L4 网络层进行读写,具体流程如下。

mosn.io/mosn/pkg/network.(*listener).accept at listener.go
=>
mosn.io/mosn/pkg/server.(*activeListener).OnAccept at handler.go
=>
mosn.io/mosn/pkg/server.(*activeRawConn).ContinueFilterChain at handler.go
=>
mosn.io/mosn/pkg/server.(*activeListener).newConnection at handler.go
=>
mosn.io/mosn/pkg/network.(*connection).Start at connection.go
=>
mosn.io/mosn/pkg/network.(*connection).startRWLoop at connection.go
func (c *connection) startRWLoop(lctx context.Context) {
c.internalLoopStarted = true

utils.GoWithRecover(func() {
// 读协程
c.startReadLoop()
}, func(r interface{}) {
c.Close(api.NoFlush, api.LocalClose)
})

if c.checkUseWriteLoop() {
c.useWriteLoop = true
utils.GoWithRecover(func() {
// 写协程
c.startWriteLoop()
}, func(r interface{}) {
c.Close(api.NoFlush, api.LocalClose)
})
}
}

在 startRWLoop 方法中我们可以看到会分别开启两个协程来分别处理该连接上的读写操作,即 startReadLoop 和 startWriteLoop,在 startReadLoop 中经过如下流转,把网络层读到的数据,由 filterManager 过滤器管理器把数据交由过滤器链进行处理,具体流程如下。

mosn.io/mosn/pkg/network.(*connection).doRead at connection.go
=>
mosn.io/mosn/pkg/network.(*connection).onRead at connection.go
=>
mosn.io/mosn/pkg/network.(*filterManager).OnRead at filtermanager.go
=>
mosn.io/mosn/pkg/network.(*filterManager).onContinueReading at filtermanager.go
func (fm *filterManager) onContinueReading(filter *activeReadFilter) {
var index int
var uf *activeReadFilter

if filter != nil {
index = filter.index + 1
}

// filterManager遍历过滤器进行数据处理
for ; index < len(fm.upstreamFilters); index++ {
uf = fm.upstreamFilters[index]
uf.index = index
// 对没有初始化的过滤器调用其初始化方法 OnNewConnection,本例为func (f *grpcFilter) OnNewConnection() api.FilterStatus(向 Listener 发送 grpc 连接以唤醒 Listener 的 Accept)
if !uf.initialized {
uf.initialized = true

status := uf.filter.OnNewConnection()

if status == api.Stop {
return
}
}

buf := fm.conn.GetReadBuffer()

if buf != nil && buf.Len() > 0 {
// 通知相应过滤器处理
status := uf.filter.OnData(buf)

if status == api.Stop {
return
}
}
}
}
=>
mosn.io/mosn/pkg/filter/network/grpc.(*grpcFilter).OnData at filter.go
=>
mosn.io/mosn/pkg/filter/network/grpc.(*grpcFilter).dispatch at filter.go
func (f *grpcFilter) dispatch(buf buffer.IoBuffer) {
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("grpc get datas: %d", buf.Len())
}
// 发送数据唤醒连接读取
f.conn.Send(buf)
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("read dispatch finished")
}
}

0x03 Grpc Sever 作为 NetworkFilter 处理请求

第一阶段中从原始连接读取数据,会进入 Grpc Serve 处理,Serve 方法通过 net.Listener 监听连接,每次启动一个新的协程来处理新的连接(handleRawConn),建立一个基于Http2 的 Transport 进行传输层的 RPC 调用,具体流程如下。

google.golang.org/grpc.(*Server).handleRawConn at server.go
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
// 校验服务状态
if s.quit.HasFired() {
rawConn.Close()
return
}
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
...
}
// HTTP2 握手,创建 Http2Server 与客户端交换帧的初始化信息,帧和窗口大小等
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}

rawConn.SetDeadline(time.Time{})
if !s.addConn(lisAddr, st) {
return
}
// 创建一个协程进行流处理
go func() {
s.serveStreams(st)
s.removeConn(lisAddr, st)
}()
...
}
=>
google.golang.org/grpc.(*Server).serveStreams at server.go
=>
google.golang.org/grpc.(*Server).handleStream at server.go
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
// 找到到需要调用的 FullMethod,此例为 spec.proto.runtime.v1.Runtime/InvokeService
sm := stream.Method()
if sm != "" && sm[0] == '/' {
sm = sm[1:]
}
...
service := sm[:pos]
method := sm[pos+1:]

// 从注册的 service 列表中找到对应 serviceInfo 对象
srv, knownService := s.services[service]
if knownService {
// 根据方法名找到单向请求的 md——MethodDesc,此 demo 为 mosn.io/layotto/spec/proto/runtime/v1._Runtime_InvokeService_Handler
if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
// 流式请求
if sd, ok := srv.streams[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
...
=>
google.golang.org/grpc.(*Server).processUnaryRPC at server.go
=>
mosn.io/layotto/spec/proto/runtime/v1._Runtime_InvokeService_Handler at runtime.pb.go
=>
google.golang.org/grpc.chainUnaryServerInterceptors at server.go
=>
// 服务端单向调用拦截器,用以调用 Mosn 的 streamfilter
mosn.io/mosn/pkg/filter/network/grpc.(*grpcServerFilterFactory).UnaryInterceptorFilter at factory.go
=>
google.golang.org/grpc.getChainUnaryHandler at server.go
// 递归生成链式UnaryHandler
func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
if curr == len(interceptors)-1 {
return finalHandler
}

return func(ctx context.Context, req interface{}) (interface{}, error) {
// finalHandler就是mosn.io/layotto/spec/proto/runtime/v1._Runtime_InvokeService_Handler
return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
}
}

0x04 Layotto 发送 RPC 请求并写入 Local 虚拟连接

接上述 0x03 流程,从 Runtime_InvokeService_Handler 起,由 GRPC 默认 API 转换为 Dapr API,进入 Layotto 提供的对接 Mosn 的轻量 RPC 框架,具体流程如下。

mosn.io/layotto/spec/proto/runtime/v1._Runtime_InvokeService_Handler at runtime.pb.go
=>
mosn.io/layotto/pkg/grpc/default_api.(*api).InvokeService at api.go
=>
mosn.io/layotto/pkg/grpc/dapr.(*daprGrpcAPI).InvokeService at dapr_api.go
=>
mosn.io/layotto/components/rpc/invoker/mosn.(*mosnInvoker).Invoke at mosninvoker.go
// 请求 Mosn 底座和返回响应
func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rpc.RPCResponse, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("[runtime][rpc]mosn invoker panic: %v", r)
log.DefaultLogger.Errorf("%v", err)
}
}()

// 1. 如果超时时间为 0,设置默认 3000ms 超时
if req.Timeout == 0 {
req.Timeout = 3000
}
req.Ctx = ctx
log.DefaultLogger.Debugf("[runtime][rpc]request %+v", req)
// 2. 触发请求执行前的自定义逻辑
req, err = m.cb.BeforeInvoke(req)
if err != nil {
log.DefaultLogger.Errorf("[runtime][rpc]before filter error %s", err.Error())
return nil, err
}
// 3. 核心调用,下文会进行详细分析
resp, err = m.channel.Do(req)
if err != nil {
log.DefaultLogger.Errorf("[runtime][rpc]error %s", err.Error())
return nil, err
}
resp.Ctx = req.Ctx
// 4. 触发请求返回后的自定义逻辑
resp, err = m.cb.AfterInvoke(resp)
if err != nil {
log.DefaultLogger.Errorf("[runtime][rpc]after filter error %s", err.Error())
}
return resp, err
}
=>
mosn.io/layotto/components/rpc/invoker/mosn/channel.(*httpChannel).Do at httpchannel.go
func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
// 1. 使用上一阶段设置的默认超时设置 context 超时
timeout := time.Duration(req.Timeout) * time.Millisecond
ctx, cancel := context.WithTimeout(req.Ctx, timeout)
defer cancel()

// 2. 创建连接得到,启动 readloop 协程进行 Layotto 和 Mosn 的读写交互(具体见下文分析)
conn, err := h.pool.Get(ctx)
if err != nil {
return nil, err
}

// 3. 设置数据写入连接的超时时间
hstate := conn.state.(*hstate)
deadline, _ := ctx.Deadline()
if err = conn.SetWriteDeadline(deadline); err != nil {
hstate.close()
h.pool.Put(conn, true)
return nil, common.Error(common.UnavailebleCode, err.Error())
}
// 4. 因为初始化时配置的 Layotto 与 Mosn 交互使用的是 Http 协议,所以这里会构造 Http 请求
httpReq := h.constructReq(req)
defer fasthttp.ReleaseRequest(httpReq)

// 借助 fasthttp 请求体写入虚拟连接
if _, err = httpReq.WriteTo(conn); err != nil {
hstate.close()
h.pool.Put(conn, true)
return nil, common.Error(common.UnavailebleCode, err.Error())
}

// 5. 构造 fasthttp.Response 结构体读取和解析 hstate 的返回,并设置读取超时时间
httpResp := &fasthttp.Response{}
hstate.reader.SetReadDeadline(deadline)

// 在 Mosn 数据返回前这里会阻塞,readloop 协程读取 Mosn 返回的数据之后流程见下述 0x08 阶段
if err = httpResp.Read(bufio.NewReader(hstate.reader)); err != nil {
hstate.close()
h.pool.Put(conn, true)
return nil, common.Error(common.UnavailebleCode, err.Error())
}
h.pool.Put(conn, false)
...
}
=>
mosn.io/layotto/components/rpc/invoker/mosn/channel.(*connPool).Get at connpool.go
// Get is get wrapConn by context.Context
func (p *connPool) Get(ctx context.Context) (*wrapConn, error) {
if err := p.waitTurn(ctx); err != nil {
return nil, err
}

p.mu.Lock()
// 1. 从连接池获取连接
if ele := p.free.Front(); ele != nil {
p.free.Remove(ele)
p.mu.Unlock()
wc := ele.Value.(*wrapConn)
if !wc.isClose() {
return wc, nil
}
} else {
p.mu.Unlock()
}

// 2. 创建新的连接
c, err := p.dialFunc()
if err != nil {
p.freeTurn()
return nil, err
}
wc := &wrapConn{Conn: c}
if p.stateFunc != nil {
wc.state = p.stateFunc()
}
// 3. 启动 readloop 独立协程读取 Mosn 返回的数据
if p.onDataFunc != nil {
utils.GoWithRecover(func() {
p.readloop(wc)
}, nil)
}
return wc, nil
}
=>

上面第二步创建新的连接需要注意下,是调用了 init 阶段的 RegistChannel 初始化的协议中的 dialFunc func() (net.Conn, error),因为配置里与 Mosn 交互用的是 Http 协议,所以这里是 newHttpChanel,目前还支持 Bolt,Dubbo 等,详见如下代码。

mosn.io/layotto/components/rpc/invoker/mosn/channel.newHttpChannel at httpchannel.go
// newHttpChannel is used to create rpc.Channel according to ChannelConfig
func newHttpChannel(config ChannelConfig) (rpc.Channel, error) {
hc := &httpChannel{}
// 为减少连接创建开销的连接池,定义在 mosn.io/layotto/components/rpc/invoker/mosn/channel/connpool.go
hc.pool = newConnPool(
config.Size,
// dialFunc
func() (net.Conn, error) {
_, _, err := net.SplitHostPort(config.Listener)
if err == nil {
return net.Dial("tcp", config.Listener)
}
//创建一对虚拟连接(net.Pipe),Layotto 持有 local,Mosn 持有 remote, Layotto 向 local 写入,Mosn 会收到数据, Mosn 从 remote读取,执行 filter 逻辑并进行代理转发,再将响应写到 remote ,最后 Layotto 从 remote 读取,获得响应
local, remote := net.Pipe()
localTcpConn := &fakeTcpConn{c: local}
remoteTcpConn := &fakeTcpConn{c: remote}
// acceptFunc 是定义在 mosn.io/layotto/components/rpc/invoker/mosn/channel.go 中的闭包,闭包中监听了 remote 虚拟连接
if err := acceptFunc(remoteTcpConn, config.Listener); err != nil {
return nil, err
}
// the goroutine model is:
// request goroutine ---> localTcpConn ---> mosn
// ^ |
// | |
// | |
// hstate <-- readloop goroutine <------
return localTcpConn, nil
},
// stateFunc
func() interface{} {
// hstate 是 readloop 协程与 request 协程通信的管道,是一对读写 net.Conn,请求协程从 reader net.Conn 中读数据,readloop 协程序往 writer net.Conn 写数据
s := &hstate{}
s.reader, s.writer = net.Pipe()
return s
},
hc.onData,
hc.cleanup,
)
return hc, nil
}

0x05 Mosn 读取 Remote 并执行 Filter 和代理转发

(1) 与 0x02 类似,filtermanager 执行过滤器处理阶段,这里会到 proxy 中进行代理转发,详见如下代码。

...
mosn.io/mosn/pkg/network.(*filterManager).onContinueReading at filtermanager.go
=>
mosn.io/mosn/pkg/proxy.(*proxy).OnData at proxy.go
func (p *proxy) OnData(buf buffer.IoBuffer) api.FilterStatus {
if p.fallback {
return api.Continue
}

if p.serverStreamConn == nil {
...
p.serverStreamConn = stream.CreateServerStreamConnection(p.context, proto, p.readCallbacks.Connection(), p)
}
//把数据分发到对应协议的解码器,在这里因为是 POST /org.apache.dubbo.samples.UserProvider HTTP/1.1,所以是 mosn.io/mosn/pkg/stream/http.(*serverStreamConnection).serve at stream.go
p.serverStreamConn.Dispatch(buf)

return api.Stop
}
=>

(2) serverStreamConnection.serve 监听并处理请求到 downstream OnReceive,详见如下代码。

mosn.io/mosn/pkg/stream/http.(*serverStream).handleRequest at stream.go
func (s *serverStream) handleRequest(ctx context.Context) {
if s.request != nil {
// set non-header info in request-line, like method, uri
injectCtxVarFromProtocolHeaders(ctx, s.header, s.request.URI())
hasData := true
if len(s.request.Body()) == 0 {
hasData = false
}

if hasData {
//在此进入 downstream OnReceive
s.receiver.OnReceive(s.ctx, s.header, buffer.NewIoBufferBytes(s.request.Body()), nil)
} else {
s.receiver.OnReceive(s.ctx, s.header, nil, nil)
}
}
}
=>
mosn.io/mosn/pkg/proxy.(*downStream).OnReceive at downstream.go
func (s *downStream) OnReceive(ctx context.Context, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) {
...
var task = func() {
...

phase := types.InitPhase
for i := 0; i < 10; i++ {
s.cleanNotify()

phase = s.receive(s.context, id, phase)
...
}
}
}

if s.proxy.serverStreamConn.EnableWorkerPool() {
if s.proxy.workerpool != nil {
// use the worker pool for current proxy
s.proxy.workerpool.Schedule(task)
} else {
// use the global shared worker pool
pool.ScheduleAuto(task)
}
return
}

task()
return

}

(3) 上述 ScheduleAuto 调度后,经过 downStream 的 reveive 的各个阶段处理,经过 upstreamRequest、http clientStream 等处理,最终从网络层的 connection.Write 发送数据并进入 WaitNotify 阶段阻塞,详见如下代码。

mosn.io/mosn/pkg/sync.(*workerPool).ScheduleAuto at workerpool.go
=>
mosn.io/mosn/pkg/sync.(*workerPool).spawnWorker at workerpool.go
=>
mosn.io/mosn/pkg/proxy.(*downStream).receive at downstream.go
=>
InitPhase=>DownFilter=>MatchRoute=>DownFilterAfterRoute=>ChooseHost=>DownFilterAfterChooseHost=>DownRecvHeader=>DownRecvData
=>
mosn.io/mosn/pkg/proxy.(*downStream).receiveData at downstream.go
=>
mosn.io/mosn/pkg/proxy.(*upstreamRequest).appendData at upstream.go
=>
mosn.io/mosn/pkg/stream/http.(*clientStream).doSend at stream.go
=>
github.com/valyala/fasthttp.(*Request).WriteTo at http.go
=>
mosn.io/mosn/pkg/stream/http.(*streamConnection).Write at stream.go
>
mosn.io/mosn/pkg/network.(*connection).Write at connection.go
=>
mosn.io/mosn/pkg/proxy.(*downStream).receive at downstream.go
func (s *downStream) receive(ctx context.Context, id uint32, phase types.Phase) types.Phase {
for i := 0; i <= int(types.End-types.InitPhase); i++ {
s.phase = phase

switch phase {
...
case types.WaitNotify:
s.printPhaseInfo(phase, id)
if p, err := s.waitNotify(id); err != nil {
return p
}

if log.Proxy.GetLogLevel() >= log.DEBUG {
log.Proxy.Debugf(s.context, "[proxy] [downstream] OnReceive send downstream response %+v", s.downstreamRespHeaders)
}
...
}
=>
func (s *downStream) waitNotify(id uint32) (phase types.Phase, err error) {
if atomic.LoadUint32(&s.ID) != id {
return types.End, types.ErrExit
}

if log.Proxy.GetLogLevel() >= log.DEBUG {
log.Proxy.Debugf(s.context, "[proxy] [downstream] waitNotify begin %p, proxyId = %d", s, s.ID)
}
select {
// 阻塞等待
case <-s.notify:
}
return s.processError(id)
}

0x06 Dubbo-go-sample server 收到请求返回响应

这里就是 dubbo-go-sample server的处理,暂不展开,贴下日志信息,感兴趣的同学可以回去翻看源码。

[2022-04-18/21:03:18 github.com/apache/dubbo-go-samples/rpc/jsonrpc/go-server/pkg.(*UserProvider2).GetUser: user_provider2.go: 53] userID:"A003"
[2022-04-18/21:03:18 github.com/apache/dubbo-go-samples/rpc/jsonrpc/go-server/pkg.(*UserProvider2).GetUser: user_provider2.go: 56] rsp:&pkg.User{ID:"113", Name:"Moorse", Age:30, sex:0, Birth:703394193, Sex:"MAN"}

0x07 Mosn 框架处理响应并写回 Remote 虚拟连接

接上述 0x05 第三阶段,在 reveive 的循环阶段的 UpRecvData 阶段进入处理响应逻辑,经过一系列处理最终 Response 写回 0x04 中的 remote 虚拟连接,具体流程如下。

mosn.io/mosn/pkg/proxy.(*downStream).receive at downstream.go
func (s *downStream) waitNotify(id uint32) (phase types.Phase, err error) {
if atomic.LoadUint32(&s.ID) != id {
return types.End, types.ErrExit
}

if log.Proxy.GetLogLevel() >= log.DEBUG {
log.Proxy.Debugf(s.context, "[proxy] [downstream] waitNotify begin %p, proxyId = %d", s, s.ID)
}
// 返回响应
select {
case <-s.notify:
}
return s.processError(id)
}
=>
UpFilter
=>
UpRecvHeader
=>
func (s *downStream) receive(ctx context.Context, id uint32, phase types.Phase) types.Phase {
for i := 0; i <= int(types.End-types.InitPhase); i++ {
s.phase = phase

switch phase {
...
case types.UpRecvData:
if s.downstreamRespDataBuf != nil {
s.printPhaseInfo(phase, id)
s.upstreamRequest.receiveData(s.downstreamRespTrailers == nil)
if p, err := s.processError(id); err != nil {
return p
}
}
...
}
=>
mosn.io/mosn/pkg/proxy.(*upstreamRequest).receiveData at upstream.go
=>
mosn.io/mosn/pkg/proxy.(*downStream).onUpstreamData at downstream.go
=>
mosn.io/mosn/pkg/proxy.(*downStream).appendData at downstream.go
=>
mosn.io/mosn/pkg/stream/http.(*serverStream).AppendData at stream.go
=>
mosn.io/mosn/pkg/stream/http.(*serverStream).endStream at stream.go
=>
mosn.io/mosn/pkg/stream/http.(*serverStream).doSend at stream.go
=>
github.com/valyala/fasthttp.(*Response).WriteTo at http.go
=>
github.com/valyala/fasthttp.writeBufio at http.go
=>
github.com/valyala/fasthttp.(*statsWriter).Write at http.go
=>
mosn.io/mosn/pkg/stream/http.(*streamConnection).Write at stream.go

0x08 Layotto 接收 RPC 响应并读取 Local 虚拟连接

上述 0x04 启动的 readloop 协程读IO被激活,从连接读取数Mosn 传回的数据,然后交给 hstate 管道中转处理再返回给请求协程,具体流程如下。

mosn.io/layotto/components/rpc/invoker/mosn/channel.(*connPool).readloop at connpool.go
// readloop is loop to read connected then exec onDataFunc
func (p *connPool) readloop(c *wrapConn) {
var err error

defer func() {
c.close()
if p.cleanupFunc != nil {
p.cleanupFunc(c, err)
}
}()

c.buf = buffer.NewIoBuffer(defaultBufSize)
for {
// 从连接读取数据
n, readErr := c.buf.ReadOnce(c)
if readErr != nil {
err = readErr
if readErr == io.EOF {
log.DefaultLogger.Debugf("[runtime][rpc]connpool readloop err: %s", readErr.Error())
} else {
log.DefaultLogger.Errorf("[runtime][rpc]connpool readloop err: %s", readErr.Error())
}
}

if n > 0 {
// 在onDataFunc 委托给 hstate 处理数据
if onDataErr := p.onDataFunc(c); onDataErr != nil {
err = onDataErr
log.DefaultLogger.Errorf("[runtime][rpc]connpool onData err: %s", onDataErr.Error())
}
}

if err != nil {
break
}

if c.buf != nil && c.buf.Len() == 0 && c.buf.Cap() > maxBufSize {
c.buf.Free()
c.buf.Alloc(defaultBufSize)
}
}
}
=>
mosn.io/layotto/components/rpc/invoker/mosn/channel.(*httpChannel).onData at httpchannel.go
=>
mosn.io/layotto/components/rpc/invoker/mosn/channel.(*hstate).onData at httpchannel.go
=>
net.(*pipe).Write at pipe.go
=>
mosn.io/layotto/components/rpc/invoker/mosn/channel.(*httpChannel).Do at httpchannel.go
func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
...
// 接上述0x04阶段,mosn 数据返回后,从 hstate 读取 readloop 协程从 mosn 返回的数据
if err = httpResp.Read(bufio.NewReader(hstate.reader)); err != nil {
hstate.close()
h.pool.Put(conn, true)
return nil, common.Error(common.UnavailebleCode, err.Error())
}
h.pool.Put(conn, false)

// 获取 fasthttp 的数据部分,解析状态码,失败返回错误信息和状态码
body := httpResp.Body()
if httpResp.StatusCode() != http.StatusOK {
return nil, common.Errorf(common.UnavailebleCode, "http response code %d, body: %s", httpResp.StatusCode(), string(body))
}

// 6. 将结果转换为 rpc.RPCResponse 返回
rpcResp := &rpc.RPCResponse{
ContentType: string(httpResp.Header.ContentType()),
Data: body,
Header: map[string][]string{},
}
httpResp.Header.VisitAll(func(key, value []byte) {
rpcResp.Header[string(key)] = []string{string(value)}
})
return rpcResp, nil

0x09 Grpc Sever 处理数据帧返回给客户端

Grpc 并没有直接写入数据到连接,而是用协程异步 loop 循环从一个缓存结构里面获取帧然后写回到客户端,具体流程如下。

google.golang.org/grpc/internal/transport.NewServerTransport at http2_server.go
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
...
// 协程异步loop循环
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
}
t.conn.Close()
t.controlBuf.finish()
close(t.writerDone)
}()
go t.keepalive()
return t, nil
}
=>
google.golang.org/grpc/internal/transport.(*loopyWriter).run at controlbuf.go
=>
google.golang.org/grpc/internal/transport.(*bufWriter).Flush at http_util.go
=>
mosn.io/mosn/pkg/filter/network/grpc.(*Connection).Write at conn.go
=>
mosn.io/mosn/pkg/network.(*connection).Write at connection.go
=>
mosn.io/mosn/pkg/network.(*connection).writeDirectly at connection.go
=>
mosn.io/mosn/pkg/network.(*connection).doWrite at connection.go

0x10 dubbo-go-sample client 接收响应

接上述 0x01 发送数据之后会阻塞在 Client grpc 底层读IO中, Layotto经过上述一些列处理层层返回数据激活Client底层Read IO,具体流程如下。

google.golang.org/grpc.(*ClientConn).Invoke at call.go
=>
google.golang.org/grpc.(*ClientConn).Invoke at call.go
=>
google.golang.org/grpc.(*clientStream).RecvMsg at stream.go
=>
google.golang.org/grpc.(*clientStream).withRetry at stream.go
=>
google.golang.org/grpc.(*csAttempt).recvMsg at stream.go
=>
google.golang.org/grpc.recvAndDecompress at rpc_util.go
=>
google.golang.org/grpc.recv at rpc_util.go
=>
google.golang.org/grpc.(*parser).recvMsg at rpc_util.go
=>
google.golang.org/grpc.(*csAttempt).recvMsg at stream.go
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := p.r.Read(p.header[:]); err != nil {
return 0, nil, err
}
...
}

最终收到返回数据:

{"jsonrpc":"2.0","id":9527,"result":{"id":"113","name":"Moorse","age":30,"time":703394193,"sex":"MAN"}}

总结

Layotto RPC 处理流程涉及 GRPC、Dapr、Mosn 等相关的知识,整体流程较长,不过单纯看 Layotto 针对 Mosn 抽象的轻量 RPC 框架还是比较清晰和简单的,与 Mosn 集成的方式也比较新颖,值得进一步研读。至此 Layotto RPC 请求处理就分析完了,时间有限,没有进行一些更全面和深入的剖析,如有纰漏之处,欢迎指正,联系方式:rayo.wangzl@gmail.com。另外在此也希望大家能踊跃参与源码分析和开源社区来,一起学习,共同进步。

源码解析 layotto启动流程

· 阅读需 6 分钟

作者简介: 张立斌,https://github.com/ZLBer

写作时间: 2022年5月4日

Overview

Layotto “寄生”在 MOSN 里,启动流程其实是先启动 MOSN, MOSN 在启动过程中回调 Layotto ,让 Layotto 启动。

源码分析

一切起源于我们的命令行: layotto start -c configpath

1.cmd分析

main 的 init 函数首先运行:

func init() {   
//将layotto的初始化函数传给mosn,让mosn启动的时候进行回调
mgrpc.RegisterServerHandler("runtime", NewRuntimeGrpcServer)
....
}

cmd 的 action 开始执行:

	Action: func(c *cli.Context) error {
app := mosn.NewMosn()
//stagemanager用于管理mosn启动的每个阶段,可以添加相应的阶段函数,比如下面的ParamsParsedStage、InitStage、PreStartStage、AfterStartStage
//这里是将configpath传给mosn,下面都是mosn相关的逻辑
stm := stagemanager.InitStageManager(c, c.String("config"), app)
stm.AppendParamsParsedStage(ExtensionsRegister)
stm.AppendParamsParsedStage(func(c *cli.Context) {
err := featuregate.Set(c.String("feature-gates"))
if err != nil {
os.Exit(1)
}
})·
stm.AppendInitStage(mosn.DefaultInitStage)
stm.AppendPreStartStage(mosn.DefaultPreStartStage)
stm.AppendStartStage(mosn.DefaultStartStage)
//这里添加layotto的健康检查机制
stm.AppendAfterStartStage(SetActuatorAfterStart)
stm.Run()
// wait mosn finished
stm.WaitFinish()
return nil
},

2.回调函数NewRuntimeGrpcServer分析

MOSN 启动的时候回调 NewRuntimeGrpcServer ,data 是未解析的配置文件,opts 是 grpc 的配置项,返回 Grpc server

func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrpc.RegisteredServer, error) {
// 将原始的配置文件解析成结构体形式。
cfg, err := runtime.ParseRuntimeConfig(data)
// 新建layotto runtime, runtime包含各种组件的注册器和各种组件的实例。
rt := runtime.NewMosnRuntime(cfg)
// 3.runtime开始启动
server, err := rt.Run(
...
// 4. 添加所有组件的初始化函数
// 我们只看下File组件的,将NewXXX()添加到组件Factory里
runtime.WithFileFactory(
file.NewFileFactory("aliyun.oss", alicloud.NewAliCloudOSS),
file.NewFileFactory("minio", minio.NewMinioOss),
file.NewFileFactory("aws.s3", aws.NewAwsOss),
file.NewFileFactory("tencent.oss", tencentcloud.NewTencentCloudOSS),
file.NewFileFactory("local", local.NewLocalStore),
file.NewFileFactory("qiniu.oss", qiniu.NewQiniuOSS),
),
...
return server, err

)

//
}

3.runtime分析

看一下 runtime 的结构体,从整体上把握 runtime 的构成:

type MosnRuntime struct {
// 包括组件的config
runtimeConfig *MosnRuntimeConfig
info *info.RuntimeInfo
srv mgrpc.RegisteredServer
// 组件注册器,用来注册和新建组件,里面有组件的NewXXX()函数
helloRegistry hello.Registry
configStoreRegistry configstores.Registry
rpcRegistry rpc.Registry
pubSubRegistry runtime_pubsub.Registry
stateRegistry runtime_state.Registry
lockRegistry runtime_lock.Registry
sequencerRegistry runtime_sequencer.Registry
fileRegistry file.Registry
bindingsRegistry mbindings.Registry
secretStoresRegistry msecretstores.Registry
customComponentRegistry custom.Registry
hellos map[string]hello.HelloService
// 各种组件
configStores map[string]configstores.Store
rpcs map[string]rpc.Invoker
pubSubs map[string]pubsub.PubSub
states map[string]state.Store
files map[string]file.File
locks map[string]lock.LockStore
sequencers map[string]sequencer.Store
outputBindings map[string]bindings.OutputBinding
secretStores map[string]secretstores.SecretStore
customComponent map[string]map[string]custom.Component
AppCallbackConn *rawGRPC.ClientConn
errInt ErrInterceptor
started bool
//初始化函数
initRuntimeStages []initRuntimeStage
}

runtime 的 run 函数逻辑如下:

func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) {
// 启动标志
m.started = true
// 新建runtime配置
o := newRuntimeOptions()
//这里运行我们之前传入的option函数,其实就是将各种组件Factory注册进来
for _, opt := range opts {
opt(o)
}
//初始化组件
if err := m.initRuntime(o); err != nil {
return nil, err
}

//初始化Grpc,api赋值
var grpcOpts []grpc.Option
if o.srvMaker != nil {
grpcOpts = append(grpcOpts, grpc.WithNewServer(o.srvMaker))
}
var apis []grpc.GrpcAPI
ac := &grpc.ApplicationContext{
m.runtimeConfig.AppManagement.AppId,
m.hellos,
m.configStores,
m.rpcs,
m.pubSubs,
m.states,
m.files,
m.locks,
m.sequencers,
m.sendToOutputBinding,
m.secretStores,
m.customComponent,
}
//调用组件的factory生成每个组件
for _, apiFactory := range o.apiFactorys {
api := apiFactory(ac)
// init the GrpcAPI
if err := api.Init(m.AppCallbackConn); err != nil {
return nil, err
}
apis = append(apis, api)
}
// 将api接口和配置传给grpc
grpcOpts = append(grpcOpts,
grpc.WithGrpcOptions(o.options...),
grpc.WithGrpcAPIs(apis),
)
//启动grpc
var err error = nil
m.srv, err = grpc.NewGrpcServer(grpcOpts...)
return m.srv, err
}

组件的初始化函数 initRuntime :

func (m *MosnRuntime) initRuntime(r *runtimeOptions) error {
st := time.Now()
if len(m.initRuntimeStages) == 0 {
m.initRuntimeStages = append(m.initRuntimeStages, DefaultInitRuntimeStage)
}
// 调用DefaultInitRuntimeStage
for _, f := range m.initRuntimeStages {
err := f(r, m)
if err != nil {
return err
}
}
...
return nil
}

DefaultInitRuntimeStage 组件初始化逻辑,调用每个组件的 init 方法:

func DefaultInitRuntimeStage(o *runtimeOptions, m *MosnRuntime) error {
...
//初始化config/state/file/lock/sequencer/secret等各种组件
if err := m.initCustomComponents(o.services.custom); err != nil {
return err
}
if err := m.initHellos(o.services.hellos...); err != nil {
return err
}
if err := m.initConfigStores(o.services.configStores...); err != nil {
return err
}
if err := m.initStates(o.services.states...); err != nil {
return err
}
if err := m.initRpcs(o.services.rpcs...); err != nil {
return err
}
if err := m.initOutputBinding(o.services.outputBinding...); err != nil {
return err
}
if err := m.initPubSubs(o.services.pubSubs...); err != nil {
return err
}
if err := m.initFiles(o.services.files...); err != nil {
return err
}
if err := m.initLocks(o.services.locks...); err != nil {
return err
}
if err := m.initSequencers(o.services.sequencers...); err != nil {
return err
}
if err := m.initInputBinding(o.services.inputBinding...); err != nil {
return err
}
if err := m.initSecretStores(o.services.secretStores...); err != nil {
return err
}
return nil
}

以 file 组件为例,看下初始化函数:

func (m *MosnRuntime) initFiles(files ...*file.FileFactory) error {

//将配置的组件注册进去
m.fileRegistry.Register(files...)
for name, config := range m.runtimeConfig.Files {
//create调用NewXXX()函数新建一个组件实例
c, err := m.fileRegistry.Create(name)
if err != nil {
m.errInt(err, "create files component %s failed", name)
return err
}
if err := c.Init(context.TODO(), &config); err != nil {
m.errInt(err, "init files component %s failed", name)
return err
}
//赋值给runtime
m.files[name] = c
}
return nil
}

至此 MOSN、Grpc、Layotto 都已经启动完成,通过 Grpc 的接口就可以调用到组件的代码逻辑。

总结

总览整个启动流程,Layotto 结合 MOSN 来做启动,解析配置文件,生成配置文件中的组件类,将 Grpc 的 api 暴露出去。

Layotto 源码解析 —— WebAssembly

· 阅读需 22 分钟

本文主要分析 Layotto 中 WASM 的相关实现和应用。

作者:王志龙 | 2022年5月18日

概述

WebAssemly 简称 WASM,是一种运行在沙箱化的执行环境中的可移植、体积小、加载快的二进制格式,WASM最初设计是为了在网络浏览器中实现高性能应用,得益于它良好的隔离性和安全性、多语言支持、冷启动快等灵活性和敏捷性等特性,又被应用于嵌入其它应用程序中以获得较好的扩展能力,显然我们可以将它嵌入到 Layotto 中。Layotto 支持加载编译好的 WASM 文件,并通过 proxy_abi_version_0_2_0 的 API 与目标 WASM 进行交互; 另外 Layotto 也支持加载并运行以 WASM 为载体的 Function,并支持 Function 之间互相调用以及访问基础设施;同时 Layotto 社区也正在探索把 component 编译成 WASM 模块以此来增强模块间的隔离性。本文以 Layotto 官方 quickstart 即访问redis相关示例为例来分析 Layotto 中 WebAssemly 相关的实现和应用。

源码分析

备注:本文基于 commit hash:f1cf350a52b5a1a0b3788a31681007a056e332ef

框架INIT

由于 Layotto 的底层是 Mosn,WASM 的扩展框架也是复用 Mosn 的 WASM 扩展框架,如图1 Layotto & Mosn WASM 框架 [1] 所示。

mosn_wasm_ext_framework_module

图1 Layotto & Mosn WASM 框架

其中,Manager 负责对 WASM 插件进行管理和动态更新;VM 负责对 WASM 虚拟机、模块和实例进行管理;ABI 作为应用程序二进制接口,提供对外使用接口 [2]。

这里首先简单回顾下几个概念:
Proxy-Wasm :WebAssembly for Proxies (ABI specification) 是一个代理无关的 ABI 标准,它约定了代理和 WASM 模块如何以函数和回调的形式互动 [3]。
proxy-wasm-go-sdk :定义了函数访问系统资源及基础设施服务的接口,基于 proxy-wasm/spec 实现,在此基础上结合 Runtime API 增加了对基础设施访问的 ABI。
proxy-wasm-go-host WebAssembly for Proxies (GoLang host implementation):Proxy-Wasm 的 golang 实现,用以在 Layotto 中实现 Runtime ABI 的具体逻辑。
VM:Virtual Machine 虚拟机,Runtime类型有:wasmtime、Wasmer、V8、 Lucet、WAMR、wasm3,本文例子中使用 wasmer

1、首先看 quickstart例子 中 stream filter 的配置,如下可以看到配置中有两个 WASM 插件,使用 wasmer VM 分别启动一个实例,详见如下配置:

 "stream_filters": [
{
"type": "Layotto",
"config": {
"function1": {
"name": "function1", // 插件名
"instance_num": 1, // 沙箱实例个数
"vm_config": {
"engine": "wasmer", // 虚拟机 Runtime 类型
"path": "demo/faas/code/golang/client/function_1.wasm" // wasm 文件路径
}
},
"function2": {
"name": "function2", // 插件名
"instance_num": 1, // 沙箱实例个数
"vm_config": {
"engine": "wasmer", // 虚拟机 Runtime 类型
"path": "demo/faas/code/golang/server/function_2.wasm" // wasm 文件路径
}
}
}
}
]

上述配置中 function1 主要逻辑就是接收 HTTP 请求,然后通过 ABI 调用 function2,并返回 function2 结果,详见如下代码:

func (ctx *httpHeaders) OnHttpRequestBody(bodySize int, endOfStream bool) types.Action {
//1. get request body
body, err := proxywasm.GetHttpRequestBody(0, bodySize)
if err != nil {
proxywasm.LogErrorf("GetHttpRequestBody failed: %v", err)
return types.ActionPause
}

//2. parse request param
bookName, err := getQueryParam(string(body), "name")
if err != nil {
proxywasm.LogErrorf("param not found: %v", err)
return types.ActionPause
}

//3. request function2 through ABI
inventories, err := proxywasm.InvokeService("id_2", "", bookName)
if err != nil {
proxywasm.LogErrorf("invoke service failed: %v", err)
return types.ActionPause
}

//4. return result
proxywasm.AppendHttpResponseBody([]byte("There are " + inventories + " inventories for " + bookName + "."))
return types.ActionContinue
}

function2 主要逻辑就是接收 HTTP 请求,然后通过 ABI 调用 redis,并返回 redis 结果,详见如下代码:

func (ctx *httpHeaders) OnHttpRequestBody(bodySize int, endOfStream bool) types.Action {
//1. get request body
body, err := proxywasm.GetHttpRequestBody(0, bodySize)
if err != nil {
proxywasm.LogErrorf("GetHttpRequestBody failed: %v", err)
return types.ActionPause
}
bookName := string(body)

//2. get request state from redis by specific key through ABI
inventories, err := proxywasm.GetState("redis", bookName)
if err != nil {
proxywasm.LogErrorf("GetState failed: %v", err)
return types.ActionPause
}

//3. return result
proxywasm.AppendHttpResponseBody([]byte(inventories))
return types.ActionContinue
}

2、对应图1 WASM 框架 中的 Manager 部分,在 Mosn filter Init 阶段进行初始化,详见如下代码:

// Create a proxy factory for WasmFilter
func createProxyWasmFilterFactory(confs map[string]interface{}) (api.StreamFilterChainFactory, error) {
factory := &FilterConfigFactory{
config: make([]*filterConfigItem, 0, len(confs)),
RootContextID: 1,
plugins: make(map[string]*WasmPlugin),
router: &Router{routes: make(map[string]*Group)},
}

for configID, confIf := range confs {
conf, ok := confIf.(map[string]interface{})
if !ok {
log.DefaultLogger.Errorf("[proxywasm][factory] createProxyWasmFilterFactory config not a map, configID: %s", configID)
return nil, errors.New("config not a map")
}
// 解析 wasm filter 配置
config, err := parseFilterConfigItem(conf)
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][factory] createProxyWasmFilterFactory fail to parse config, configID: %s, err: %v", configID, err)
return nil, err
}

var pluginName string
if config.FromWasmPlugin == "" {
pluginName = utils.GenerateUUID()

// 根据 stream filter 的配置初始化 WASM 插件配置,VmConfig 即 vm_config,InstanceNum 即 instance_num
v2Config := v2.WasmPluginConfig{
PluginName: pluginName,
VmConfig: config.VmConfig,
InstanceNum: config.InstanceNum,
}

// WasmManager 实例通过管理 PluginWrapper 对象对所有插件的配置进行统一管理,提供增删查改能力。下接3
err = wasm.GetWasmManager().AddOrUpdateWasm(v2Config)
if err != nil {
config.PluginName = pluginName
addWatchFile(config, factory)
continue
}

addWatchFile(config, factory)
} else {
pluginName = config.FromWasmPlugin
}
config.PluginName = pluginName

// PluginWrapper 在上面的 AddOrUpdateWasm 中对插件及配置进行封装完成初始化,这里根据插件名从 sync.Map 拿出,以管理并注册 PluginHandler
pw := wasm.GetWasmManager().GetWasmPluginWrapperByName(pluginName)
if pw == nil {
return nil, errors.New("plugin not found")
}

config.VmConfig = pw.GetConfig().VmConfig
factory.config = append(factory.config, config)

wasmPlugin := &WasmPlugin{
pluginName: config.PluginName,
plugin: pw.GetPlugin(),
rootContextID: config.RootContextID,
config: config,
}
factory.plugins[config.PluginName] = wasmPlugin
// 注册 PluginHandler,以对插件的生命周期提供扩展回调能力,例如插件启动 OnPluginStart、更新 OnConfigUpdate。下接4
pw.RegisterPluginHandler(factory)
}

return factory, nil
}

3、对应图1 WASM 框架中 VM 部分,NewWasmPlugin 用来创建初始化 WASM 插件,其中 VM、Module 和 Instance 分别对应 WASM 中的虚拟机、模块和实例,详见如下代码:

func NewWasmPlugin(wasmConfig v2.WasmPluginConfig) (types.WasmPlugin, error) {
// check instance num
instanceNum := wasmConfig.InstanceNum
if instanceNum <= 0 {
instanceNum = runtime.NumCPU()
}

wasmConfig.InstanceNum = instanceNum

// 根据配置获取 wasmer 编译和执行引擎
vm := GetWasmEngine(wasmConfig.VmConfig.Engine)
if vm == nil {
log.DefaultLogger.Errorf("[wasm][plugin] NewWasmPlugin fail to get wasm engine: %v", wasmConfig.VmConfig.Engine)
return nil, ErrEngineNotFound
}

// load wasm bytes
var wasmBytes []byte
if wasmConfig.VmConfig.Path != "" {
wasmBytes = loadWasmBytesFromPath(wasmConfig.VmConfig.Path)
} else {
wasmBytes = loadWasmBytesFromUrl(wasmConfig.VmConfig.Url)
}

if len(wasmBytes) == 0 {
log.DefaultLogger.Errorf("[wasm][plugin] NewWasmPlugin fail to load wasm bytes, config: %v", wasmConfig)
return nil, ErrWasmBytesLoad
}

md5Bytes := md5.Sum(wasmBytes)
newMd5 := hex.EncodeToString(md5Bytes[:])
if wasmConfig.VmConfig.Md5 == "" {
wasmConfig.VmConfig.Md5 = newMd5
} else if newMd5 != wasmConfig.VmConfig.Md5 {
log.DefaultLogger.Errorf("[wasm][plugin] NewWasmPlugin the hash(MD5) of wasm bytes is incorrect, config: %v, real hash: %s",
wasmConfig, newMd5)
return nil, ErrWasmBytesIncorrect
}

// 创建 WASM 模块,WASM 模块是已被编译的无状态二进制代码
module := vm.NewModule(wasmBytes)
if module == nil {
log.DefaultLogger.Errorf("[wasm][plugin] NewWasmPlugin fail to create module, config: %v", wasmConfig)
return nil, ErrModuleCreate
}

plugin := &wasmPluginImpl{
config: wasmConfig,
vm: vm,
wasmBytes: wasmBytes,
module: module,
}

plugin.SetCpuLimit(wasmConfig.VmConfig.Cpu)
plugin.SetMemLimit(wasmConfig.VmConfig.Mem)

// 创建包含模块和运行时状态的实例,值得关注的是,这里最终会调用 proxywasm.RegisterImports 注册用户实现的 Imports 函数,比如示例中的 proxy_invoke_service 和 proxy_get_state
actual := plugin.EnsureInstanceNum(wasmConfig.InstanceNum)
if actual == 0 {
log.DefaultLogger.Errorf("[wasm][plugin] NewWasmPlugin fail to ensure instance num, want: %v got 0", instanceNum)
return nil, ErrInstanceCreate
}

return plugin, nil
}

4、 对应图1 WASM 框架 中的 ABI 部分,OnPluginStart 方法中会调用 proxy-wasm-go-host 的对应方法对 ABI 的 Exports 和 Imports 等进行相关设置。

// Execute the plugin of FilterConfigFactory
func (f *FilterConfigFactory) OnPluginStart(plugin types.WasmPlugin) {
plugin.Exec(func(instance types.WasmInstance) bool {
wasmPlugin, ok := f.plugins[plugin.PluginName()]
if !ok {
log.DefaultLogger.Errorf("[proxywasm][factory] createProxyWasmFilterFactory fail to get wasm plugin, PluginName: %s",
plugin.PluginName())
return true
}

// 获取 proxy_abi_version_0_2_0 版本的与 WASM 交互的 API
a := abi.GetABI(instance, AbiV2)
a.SetABIImports(f)
exports := a.GetABIExports().(Exports)
f.LayottoHandler.Instance = instance

instance.Lock(a)
defer instance.Unlock()

// 使用 exports 函数 proxy_get_id(对应到 WASM 插件中 GetID 函数)获取 WASM 的 ID
id, err := exports.ProxyGetID()
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][factory] createProxyWasmFilterFactory fail to get wasm id, PluginName: %s, err: %v",
plugin.PluginName(), err)
return true
}
// 把ID 和 对应的插件注册到路由中,即可通过 http Header 中的键值对进行路由,比如 'id:id_1' 就会根据 id_1 路由到上面的 Function1
f.router.RegisterRoute(id, wasmPlugin)

// 当第一个插件使用给定的根 ID 加载时通过 proxy_on_context_create 创建根上下文,并在虚拟机的整个生命周期中持续存在,直到 proxy_on_delete 删除
// 值得注意的是这里说的第一个插件指的是多个松散绑定的插件(通过 SDK 使用 Root ID 对 Root Context 访问)在同一已配置虚拟机内共享数据的使用场景 [4]
err = exports.ProxyOnContextCreate(f.RootContextID, 0)
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][factory] OnPluginStart fail to create root context id, err: %v", err)
return true
}

vmConfigSize := 0
if vmConfigBytes := wasmPlugin.GetVmConfig(); vmConfigBytes != nil {
vmConfigSize = vmConfigBytes.Len()
}

// VM 伴随启动的插件启动时调用
_, err = exports.ProxyOnVmStart(f.RootContextID, int32(vmConfigSize))
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][factory] OnPluginStart fail to create root context id, err: %v", err)
return true
}

pluginConfigSize := 0
if pluginConfigBytes := wasmPlugin.GetPluginConfig(); pluginConfigBytes != nil {
pluginConfigSize = pluginConfigBytes.Len()
}

// 当插件加载或重新加载其配置时调用
_, err = exports.ProxyOnConfigure(f.RootContextID, int32(pluginConfigSize))
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][factory] OnPluginStart fail to create root context id, err: %v", err)
return true
}

return true
})
}

工作流程

Layotto 中 WASM 的工作流程大致如下图2 Layotto & Mosn WASM 工作流程所示,其中配置更新在上述初始化环节基本已囊括,这里重点看一下请求处理流程。 mosn_wasm_ext_framework_workflow

图2 Layotto & Mosn WASM 工作流程

1、由 Layotto 底层 Mosn 收到请求,经过 workpool 调度,在 proxy downstream 中按照配置依次执行 StreamFilterChain 到 Wasm StreamFilter 的 OnReceive 方法,具体逻辑详见如下代码:

func (f *Filter) OnReceive(ctx context.Context, headers api.HeaderMap, buf buffer.IoBuffer, trailers api.HeaderMap) api.StreamFilterStatus {
// 获取 WASM 插件的 id
id, ok := headers.Get("id")
if !ok {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive call ProxyOnRequestHeaders no id in headers")
return api.StreamFilterStop
}

// 从 router 中根据 id 获取对应的 WASM 插件
wasmPlugin, err := f.router.GetRandomPluginByID(id)
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive call ProxyOnRequestHeaders id, err: %v", err)
return api.StreamFilterStop
}
f.pluginUsed = wasmPlugin

plugin := wasmPlugin.plugin
// 获取 WasmInstance 实例
instance := plugin.GetInstance()
f.instance = instance
f.LayottoHandler.Instance = instance

// ABI 包含 导出(Exports)和导入(Imports)两个部分,用户通过这它们与 WASM 扩展插件进行交互
pluginABI := abi.GetABI(instance, AbiV2)
if pluginABI == nil {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive fail to get instance abi")
plugin.ReleaseInstance(instance)
return api.StreamFilterStop
}
// 设置导入 Imports 部分,导入部分由用户提供,虚拟机的执行需要依赖宿主机 Layotto 提供的部分能力,例如获取请求信息,这些能力通过导入部分由用户提供,并由 WASM 扩展调用
pluginABI.SetABIImports(f)

// 导出 Exports 部分由 WASM 插件提供,用户可直接调用——唤醒 WASM 虚拟机,并在虚拟机中执行对应的 WASM 插件代码
exports := pluginABI.GetABIExports().(Exports)
f.exports = exports

instance.Lock(pluginABI)
defer instance.Unlock()

// 根据 rootContextID 和 contextID 创建当前插件上下文
err = exports.ProxyOnContextCreate(f.contextID, wasmPlugin.rootContextID)
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][filter] NewFilter fail to create context id: %v, rootContextID: %v, err: %v",
f.contextID, wasmPlugin.rootContextID, err)
return api.StreamFilterStop
}

endOfStream := 1
if (buf != nil && buf.Len() > 0) || trailers != nil {
endOfStream = 0
}

// 调用 proxy-wasm-go-host,编码请求头为规范指定的格式
action, err := exports.ProxyOnRequestHeaders(f.contextID, int32(headerMapSize(headers)), int32(endOfStream))
if err != nil || action != proxywasm.ActionContinue {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive call ProxyOnRequestHeaders err: %v", err)
return api.StreamFilterStop
}

endOfStream = 1
if trailers != nil {
endOfStream = 0
}

if buf == nil {
arg, _ := variable.GetString(ctx, types.VarHttpRequestArg)
f.requestBuffer = buffer.NewIoBufferString(arg)
} else {
f.requestBuffer = buf
}

if f.requestBuffer != nil && f.requestBuffer.Len() > 0 {
// 调用 proxy-wasm-go-host,编码请求体为规范指定的格式
action, err = exports.ProxyOnRequestBody(f.contextID, int32(f.requestBuffer.Len()), int32(endOfStream))
if err != nil || action != proxywasm.ActionContinue {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive call ProxyOnRequestBody err: %v", err)
return api.StreamFilterStop
}
}

if trailers != nil {
// 调用 proxy-wasm-go-host,编码请求尾为规范指定的格式
action, err = exports.ProxyOnRequestTrailers(f.contextID, int32(headerMapSize(trailers)))
if err != nil || action != proxywasm.ActionContinue {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive call ProxyOnRequestTrailers err: %v", err)
return api.StreamFilterStop
}
}

return api.StreamFilterContinue
}

2、proxy-wasm-go-host 将 Mosn 请求三元组编码成规范指定的格式,并调用Proxy-Wasm ABI 规范中的 proxy_on_request_headers 等对应接口,调用 WASMER 虚拟机将请求信息传至 WASM 插件。

func (a *ABIContext) CallWasmFunction(funcName string, args ...interface{}) (interface{}, Action, error) {
ff, err := a.Instance.GetExportsFunc(funcName)
if err != nil {
return nil, ActionContinue, err
}

// 调用 wasmer 虚拟机(github.com/wasmerio/wasmer-go/wasmer.(*Function).Call at function.go)
res, err := ff.Call(args...)
if err != nil {
a.Instance.HandleError(err)
return nil, ActionContinue, err
}

// if we have sync call, e.g. HttpCall, then unlock the wasm instance and wait until it resp
action := a.Imports.Wait()

return res, action, nil
}

3、WASMER 虚拟机经过处理调用 WASM 插件的具体函数,比如例子中的 OnHttpRequestBody 函数 // function, := instance.Exports.GetFunction("exported_function") // nativeFunction = function.Native() // = nativeFunction(1, 2, 3) // Native 会将 Function 转换为可以调用的原生 Go 函数

func (self *Function) Native() NativeFunction {
...
self.lazyNative = func(receivedParameters ...interface{}) (interface{}, error) {
numberOfReceivedParameters := len(receivedParameters)
numberOfExpectedParameters := len(expectedParameters)
...
results := C.wasm_val_vec_t{}
C.wasm_val_vec_new_uninitialized(&results, C.size_t(len(ty.Results())))
defer C.wasm_val_vec_delete(&results)

arguments := C.wasm_val_vec_t{}
defer C.wasm_val_vec_delete(&arguments)

if numberOfReceivedParameters > 0 {
C.wasm_val_vec_new(&arguments, C.size_t(numberOfReceivedParameters), (*C.wasm_val_t)(unsafe.Pointer(&allArguments[0])))
}

// 调用 WASM 插件内函数
trap := C.wasm_func_call(self.inner(), &arguments, &results)

runtime.KeepAlive(arguments)
runtime.KeepAlive(results)
...
}

return self.lazyNative
}

4、proxy-wasm-go-sdk 将请求数据从规范格式转换为便于用户使用的格式,然后调用用户扩展代码。proxy-wasm-go-sdk 基于 proxy-wasm/spec 实现,定义了函数访问系统资源及基础设施服务的接口,并在此基础上结合 Runtime API 的思路,增加了对基础设施访问的ABI。

// function1主要逻辑就是接收 HTTP 请求,然后通过 ABI 调用 function2,并返回 function2 结果,具体代码如下所示
func (ctx *httpHeaders) OnHttpRequestBody(bodySize int, endOfStream bool) types.Action {
//1. get request body
body, err := proxywasm.GetHttpRequestBody(0, bodySize)
if err != nil {
proxywasm.LogErrorf("GetHttpRequestBody failed: %v", err)
return types.ActionPause
}

//2. parse request param
bookName, err := getQueryParam(string(body), "name")
if err != nil {
proxywasm.LogErrorf("param not found: %v", err)
return types.ActionPause
}

//3. request function2 through ABI
inventories, err := proxywasm.InvokeService("id_2", "", bookName)
if err != nil {
proxywasm.LogErrorf("invoke service failed: %v", err)
return types.ActionPause
}

//4. return result
proxywasm.AppendHttpResponseBody([]byte("There are " + inventories + " inventories for " + bookName + "."))
return types.ActionContinue
}

5、WASM 插件通过初始化时 RegisterFunc 注册的 ABI Imports 函数,比如例子中 Function1 RPC 调用 Function2 的 ProxyInvokeService,Function2 用以获取 Redis 中指定 Key 的 Valye 的 ProxyGetState,具体代码如下所示:

Function1 通过 ProxyInvokeService 调用 Function2,ProxyInvokeService 对应 Imports 函数 proxy_invoke_service

func ProxyInvokeService(instance common.WasmInstance, idPtr int32, idSize int32, methodPtr int32, methodSize int32, paramPtr int32, paramSize int32, resultPtr int32, resultSize int32) int32 {
id, err := instance.GetMemory(uint64(idPtr), uint64(idSize))
if err != nil {
return WasmResultInvalidMemoryAccess.Int32()
}

method, err := instance.GetMemory(uint64(methodPtr), uint64(methodSize))
if err != nil {
return WasmResultInvalidMemoryAccess.Int32()
}

param, err := instance.GetMemory(uint64(paramPtr), uint64(paramSize))
if err != nil {
return WasmResultInvalidMemoryAccess.Int32()
}

ctx := getImportHandler(instance)

// Laytto rpc calls
ret, res := ctx.InvokeService(string(id), string(method), string(param))
if res != WasmResultOk {
return res.Int32()
}

return copyIntoInstance(instance, ret, resultPtr, resultSize).Int32()
}

Function2 通过 ProxyGetState 获取 Redis 中指定 Key 的 Valye, ProxyGetState 对应 Imports 函数 proxy_get_state

func ProxyGetState(instance common.WasmInstance, storeNamePtr int32, storeNameSize int32, keyPtr int32, keySize int32, valuePtr int32, valueSize int32) int32 {
storeName, err := instance.GetMemory(uint64(storeNamePtr), uint64(storeNameSize))
if err != nil {
return WasmResultInvalidMemoryAccess.Int32()
}

key, err := instance.GetMemory(uint64(keyPtr), uint64(keySize))
if err != nil {
return WasmResultInvalidMemoryAccess.Int32()
}

ctx := getImportHandler(instance)

ret, res := ctx.GetState(string(storeName), string(key))
if res != WasmResultOk {
return res.Int32()
}

return copyIntoInstance(instance, ret, valuePtr, valueSize).Int32()
}

以上 Layotto rpc 流程简要说是通过两个虚拟连接借助 Dapr API 和 底层 Mosn 实现 [5],具体可参见前序文章Layotto源码解析——处理RPC请求,从 Redis 中获取数据可直接阅读 Dapr State 相关代码,在此不一一展开了。

FaaS模式

回过头来再看 WASM 的特性:字节码有与机器码相匹敌的性能;沙箱中执行保证良好的隔离性和安全性;编译后跨平台、易分发和加载运行;具备轻量且多语言开发的灵活性,似乎天然的就适合做 FaaS。

所以 Layotto 也探索支持了 WASM FaaS 模式,即加载并运行以 WASM 为载体的 Function,并支持 Function 之间相互调用及访问基础设施。因加载 WASM 的核心逻辑并未变化,只是使用和部署方式上与上述方式有差别,故 Layotto 加载 WASM 部分逻辑不再赘述。

除 Wasm-Proxy 相关实现外,FaaS 模式核心逻辑是通过扩展 Containerd 实现多运行时插件 containerd-shim-layotto-v2 [6],并借此"穿针引线"的巧妙的利用了 Docker 的镜像能力来管理 *.wasm 包和 Kubernetes 优秀的编排能力来调度函数,具体架构和工作流可见图3 Layotto FaaS Workflow。

layotto_faas_workflow

图3 Layotto FaaS Workflow

这里简单看一下 containerd-shim-layotto-v2 的主函数,可以看到 shim.Run 设置的 WASM 的运行时为 io.containerd.layotto.v2,也就是 containerd 中 plugins.cri.containerd.runtimes 对应插件的 runtime_type。当创建 Pod 时,在 yaml 的 spec 中指定 runtimeClassName: layotto,经过调度,最终 kubelet 就会通过 cri-plugin 调用 containerd 中的 containerd-shim-layotto-v2 运行时来进行加载和运行等相关处理。

func main() {
startLayotto()
// 解析输入参数,初始化运行时环境,调用 wasm.New 实例化 service 对象
shim.Run("io.containerd.layotto.v2", wasm.New)
}

func startLayotto() {
conn, err := net.Dial("tcp", "localhost:2045")
if err == nil {
conn.Close()
return
}

cmd := exec.Command("layotto", "start", "-c", "/home/docker/config.json")
cmd.Start()
}

总结

Layotto WebAssemly 虽然涉及较多 WASM 相关的基础知识,但通过示例由浅入深,循序渐进也不难理解。最后整体看一下 WASM 技术,可以看到它已经被应用到Web前端、Serverless、游戏场景、边缘计算、服务网格等很多领域,甚至就连 Docker 之父 Solomon Hykes 在前不久都表示: "如果 WASM 这个技术在2008年就有的话,我就不搞Docker了"(后来又补充道:Docker 不会被替换,会与 WASM 并肩而行),不管怎么说,WASM 似乎在继 VM 和 Container 之后,正在成为更轻量及性能更好的云原生技术而被应用到更多的领域,与此同时,相信在 Mosn 社区的推动以及 Layotto 的继续探索中 WASM 也会有更多使用场景和用户,至此 Layotto WebAssemly 相关源码分析就完了,鉴于时间和篇幅,没有进行一些更全面和深入的剖析,如有纰漏之处,欢迎指正,联系方式:rayo.wangzl@gmail.com

参考资料