
Ant Cloud Native Apps Exploring and Practice - ArchiSummit

· 14 min read

The introduction of the Service Mesh model is a key step on an application's path to cloud native, and Ant Group has achieved large-scale adoption internally. Sinking more middleware capabilities, such as Message, DB, and Cache Mesh, will be the future shape of middleware technology as applications evolve beyond Mesh. The cloud native application runtime helps developers build cloud native applications quickly and further decouples applications from infrastructure; the core of the application runtime is its API standard, which we hope to build together with the community.

Ant Group Mesh Introduction

Ant is a technology- and innovation-driven company. From its earliest days as a payment application on Taobao to a large company serving 1.2 billion users worldwide, the evolution of Ant's technical architecture can roughly be divided into the following stages:

Prior to 2006, the earliest payment system was a centralized monolithic application, with different businesses developed as modules.

In 2007, as payment was promoted in more scenarios, applications and data began to be split, and some SOA-style transformation was done.

After 2010, quick payments, mobile payments, support for Double Eleven, and Yu'e Bao were introduced; users reached the hundreds of millions; the number of Ant applications grew, and Ant developed a full set of microservice middleware to support its business;

In 2014, with the arrival of more business forms and scenarios such as offline payments, higher requirements were placed on Ant's availability and stability; Ant added LDC unitization support to its microservice middleware, supporting cross-region active-active disaster recovery and elastic scale-out on hybrid cloud to absorb Double Eleven's massive traffic.

In 2020, Ant's business was no longer only digital finance; new strategies such as digital life and internationalization emerged, which pushed us toward a more efficient technical architecture that lets the business run faster and more steadily, so Ant embraced the cloud native concept that was becoming popular in the industry.

As you can see, Ant's technical architecture evolved along with the company's business innovation, from a centralized monolith to SOA to microservices. The path from microservices to cloud native is something Ant has explored itself over recent years.

Why introduce Service Mesh

Since Ant already had a complete set of microservice governance middleware, why did it need to introduce Service Mesh?

Take Ant's self-developed service framework SOFARPC as an example: it is a powerful SDK that includes a range of capabilities such as service discovery, routing, and circuit breaking/rate limiting. In a typical SOFA (Java) application, the business code integrates the SOFARPC SDK, and both run in the same process. After microservices were adopted at large scale, we faced the following problems:

Upgrade cost: the SDK is embedded in business code, and every upgrade requires code changes and a new release. Because of the large number of applications, a major technical upgrade or security fix takes thousands of applications and a long time each round.

Version fragmentation: because upgrades are expensive, SDK versions are highly fragmented, which makes it hard to rely on new logic when writing code and to evolve the technology as a whole.

Cross-language applications are hard to govern: Ant's online applications mostly use Java as the technology stack, but there are many cross-language applications in the front office, AI, and big data domains, for example C++/Python/Golang. Because there is no SDK for those languages, their service governance capability is missing.

We noticed that the Service Mesh concept was beginning to emerge in the cloud native community, so we began exploring this direction. In Service Mesh there are two concepts: the control plane and the data plane. The core idea of the data plane is decoupling: abstracting the complex logic unrelated to business (such as service discovery, service routing, circuit breaking, and security in RPC calls) into an independent process. As long as the communication protocol between the business process and the independent process does not change, these capabilities can evolve with the autonomous upgrade of that independent process, and the whole Mesh can be upgraded in a unified way. Cross-language applications, as long as their traffic passes through our data plane, can enjoy the service governance capabilities just mentioned, and the application's dependence on the underlying infrastructure becomes transparent—truly cloud native.

Ant Mesh adoption process

Starting at the end of 2017, Ant began exploring the technical direction of Service Mesh and proposed a vision of unified infrastructure with business-imperceptible upgrades. The main milestones are:

At the end of 2017, pre-research on Service Mesh technology was launched, setting the direction for the future;

At the beginning of 2018, the self-developed Golang sidecar MOSN was started and open-sourced, mainly supporting RPC, piloted at small scale during Double Eleven;

At 618 in 2019, new Mesh forms such as Message Mesh and DB Mesh were added, covering a number of core links and supporting the 618 big promotion;

At Double Eleven 2019, Mesh covered hundreds of applications on all core links of the big promotion, supporting that year's Double Eleven;

At Double Eleven 2020, more than 80% of online applications were connected to the Mesh system, and a capability can go from development to full-site rollout within 2 months.

Ant Mesh Adoption Architecture

The scale of Mesh at Ant is thousands of applications and hundreds of thousands of containers, a scale few in the industry had reached, with no prior path to learn from. Ant therefore built a complete R&D and delivery system to support the Mesh rollout.

As shown in the diagram, at the top of the Ant Mesh architecture is our control plane, where the server sides of existing products such as the service governance center, PaaS, and monitoring center are deployed, together with our operations systems, including the R&D platform and the PaaS platform. In the middle is the protagonist, the data plane MOSN, which manages four kinds of traffic—RPC, messaging, MVC, and tasks—and also carries basic capabilities for health checks, monitoring, configuration, security, and technical risk; MOSN shields the application from direct interaction with the basic platforms. DBMesh is an independent product within Ant and is not drawn in the diagram. The top layer is our applications, which currently support access from many languages such as Java and Node.js. For an application, although it is decoupled from infrastructure, access still carries an extra upgrade cost, so to promote adoption Ant streamlined the entire R&D delivery workflow: making access with existing frameworks as simple as possible, pushing upgrades in batches to manage risk and progress, and having new applications access Mesh by default.

At the same time, as more capabilities sank into MOSN, the teams behind each capability faced collaboration problems in R&D and could even affect one another's performance and stability. So for the development efficiency of Mesh itself, we made improvements such as module isolation, dynamic plugging of new capabilities, and automatic regression, allowing a capability to go from development to full-site rollout within two months.

Exploring the Cloud Native Application Runtime

New problems and reflections after large-scale adoption

After the large-scale rollout, Ant Mesh encountered some new problems:

High cross-language SDK maintenance cost: take RPC as an example. Most of the logic has already sunk into MOSN, but some communication encoding/decoding protocol logic remains in a lightweight SDK on the application side, and that SDK still carries maintenance cost. However thin the SDK, one is needed per language, and one team cannot have developers for every language, so the code quality of these lightweight SDKs is a problem.

Businesses need to be compatible with different deployment environments: some Ant applications are deployed both inside Ant and exported to external financial institutions. When deployed inside Ant they face Ant's control plane; when deployed at a bank they face the bank's existing control plane. Today most of these applications carry a compatibility layer in their own code and patch things case by case whenever they run into unsupported components.

From Service Mesh to Multi-Mesh: Ant's earliest scenario was Service Mesh, with MOSN intercepting traffic as a network proxy, while other middleware still interacted with their servers through the original SDKs. Now MOSN is more than Service Mesh; it is Multi-Mesh, because besides RPC we have landed more Mesh forms, including messaging, configuration, and caching. As you can see, almost every sunk middleware has a lightweight SDK on the application side; combined with the first problem above, that is a very large number of lightweight SDKs to maintain. To keep the features from affecting one another, each feature opens a different port and talks to MOSN with a different protocol, e.g. the RPC protocol for RPC, the MQ protocol for messaging, the Redis protocol for caching. And today's MOSN is no longer only traffic-oriented; the configuration capability, for example, exposes an API for business code to use.

To solve these problems and scenarios, we considered the following questions:

Can the SDKs be thinned down into one, across different middleware and different languages?

Can the interaction protocol be unified?

Should what we sink be oriented toward components, or toward capabilities?

Can the underlying implementation be replaced?

Ant Cloud Native Application Runtime Architecture

Beginning in March last year, after several rounds of internal discussion and research into new ideas in the industry, we introduced the concept of the "cloud native application runtime" (hereinafter "runtime"). By definition, we want this runtime to include all the distributed capabilities an application cares about, helping developers build cloud native applications quickly and further decoupling applications from infrastructure.

The core design points of the cloud native application runtime are as follows:

First, given the experience of MOSN's large-scale rollout and the associated operations system, we decided to build the cloud native application runtime on the MOSN kernel.

Second, define the runtime APIs oriented toward capabilities rather than components.

Third, the interaction between business code and the runtime API uses the unified gRPC protocol, so the business side can generate a client directly from the proto file and call it.

Fourth, the component implementation behind a capability is replaceable: for example, the registry provider could be SOFARegistry, Nacos, or ZooKeeper.
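Putting the second, third, and fourth points together, here is a minimal sketch of what the business side could look like. It reuses the runtimev1pb package and port that appear in the Layotto example later in this document; the service ID and payload are illustrative:

package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/anypb"

	runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1"
)

func main() {
	// The runtime sidecar listens locally; 34904 is the port used by the
	// Layotto demo later in this document.
	conn, err := grpc.Dial("localhost:34904", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// The client is generated directly from the proto file.
	cli := runtimev1pb.NewRuntimeClient(conn)

	// Capability-oriented call: which backend actually serves this request
	// is decided by runtime configuration, not by this code.
	resp, err := cli.InvokeService(context.Background(), &runtimev1pb.InvokeServiceRequest{
		Id: "example-service", // hypothetical service ID
		Message: &runtimev1pb.CommonInvokeRequest{
			Method: "GetUser",
			Data:   &anypb.Any{Value: []byte(`["A003"]`)},
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println(string(resp.Data.GetValue()))
}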

Runtime capability abstraction

To abstract the capabilities most needed by cloud native applications, we set a few principles:

  1. Oriented toward the APIs and scenarios a distributed application needs, not toward components;
  2. APIs are intuitive and work out of the box; convention over configuration;
  3. APIs are not bound to any implementation; differences between implementations are handled with extension fields.

With these principles, we abstracted the first batch of APIs: mosn.proto for the application calling the runtime, appcallback.proto for the runtime calling the application, and actuator.proto for the runtime's operational concerns. For example, RPC calls, publishing messages, reading caches, and reading configuration belong to application-calls-runtime; receiving RPC requests, receiving messages, and receiving task schedules belong to runtime-calls-application; health checks, component management, and traffic control belong to the runtime's operational concerns.

The three proto files can be viewed at:

Runtime component management

On the implementation side, to make implementations replaceable, the runtime has two concepts in MOSN: we call a distributed capability a Service, and different Components implement that Service; one Service can be implemented by multiple Components, and one Component can implement multiple Services. In the example in the diagram, the messaging Service "MQ-pub" is implemented by both the SOFAMQ Component and the Kafka Component, while the Kafka Component implements both the messaging Service and the health-check Service. When the application sends a request through the gRPC-generated client, the data reaches the runtime via the gRPC protocol and is then dispatched to a concrete implementation. In this way the application only needs one set of APIs, and the concrete implementation is selected by request parameters or by runtime configuration.
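The Service/Component relationship can be pictured with a small, purely hypothetical Go sketch (this is not Layotto's or MOSN's actual registration API; it only illustrates the dispatch idea):

package main

import "fmt"

// PubService stands for the distributed capability ("MQ-pub" above).
type PubService interface {
	Publish(topic string, data []byte) error
}

// Two components implement the same service.
type SOFAMQComponent struct{}

func (SOFAMQComponent) Publish(topic string, data []byte) error {
	fmt.Println("publish via SOFAMQ:", topic)
	return nil
}

type KafkaComponent struct{}

func (KafkaComponent) Publish(topic string, data []byte) error {
	fmt.Println("publish via Kafka:", topic)
	return nil
}

// registry: service name -> component name -> implementation.
var registry = map[string]map[string]PubService{
	"MQ-pub": {
		"sofamq": SOFAMQComponent{},
		"kafka":  KafkaComponent{},
	},
}

// dispatch picks the concrete component by a request parameter or by
// runtime configuration, so application code never changes.
func dispatch(service, component string) (PubService, error) {
	impl, ok := registry[service][component]
	if !ok {
		return nil, fmt.Errorf("no component %q for service %q", component, service)
	}
	return impl, nil
}

func main() {
	svc, err := dispatch("MQ-pub", "kafka")
	if err != nil {
		panic(err)
	}
	svc.Publish("demo-topic", []byte("hello"))
}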

Comparison between the runtime and Mesh

Based on the above, the cloud native application runtime and Mesh can be compared as follows:

We began researching the cloud native application runtime last year, and the following scenarios are currently being rolled out inside Ant.

Heterogeneous technology stack access

At Ant, applications in different languages need not only RPC service governance, messaging, and so on, but also the same infrastructure capabilities as Java applications. Java and Node.js have corresponding SDKs, while other languages do not. With the application runtime, these heterogeneous-language applications can directly use a gRPC client to access Ant's infrastructure.

Vendor unbinding

As mentioned earlier, Ant's blockchain, risk control, intelligent customer service, financial middleware, and other businesses are deployed both on the main site and on Alibaba Cloud or proprietary clouds. With the runtime, an application can use one set of code and one image, deciding by configuration which underlying implementation to call, without being bound to a specific one. For example, inside Ant it faces products such as SOFARegistry and SOFAMQ, while on the cloud it faces products such as Nacos, RocketMQ, ZooKeeper, and Kafka. This scenario is in the process of being rolled out. The same applies to legacy system migration: for example, when upgrading from SOFAMQ 1.0 to SOFAMQ 2.0, applications on the runtime need not upgrade.
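As an illustration only (the field names below are hypothetical, not a real Ant or Layotto schema), switching vendors could then be a pure configuration change, with the same code and the same image:

# deployed inside Ant
{ "mq": { "service": "MQ-pub", "component": "sofamq" } }

# deployed on the cloud: only the component value changes
{ "mq": { "service": "MQ-pub", "component": "rocketmq" } }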

FaaS cold start pre-warm pool

FaaS cold start is another scenario we have been exploring recently. A Function in FaaS needs to go through Pod creation, function download, and startup, a process that is fairly long. With the runtime, we can create the Pod and start the runtime in advance, so that when the application starts it only needs to load very simple application logic; in tests, cold start time can be shortened from 5s to 1s. We will continue exploring this direction.

Planning and outlook

API

The most important part of the runtime is the definition of its APIs. We already have a relatively complete set of APIs for internal use, but we also see that many products in the industry have similar demands, such as dapr and envoy. So one of the next things we will do is work with the community to launch a set of widely recognized cloud native APIs.

Continuous Open Source

We will also gradually open-source our internal runtime practice in the near future, with a 0.1 release in May or June, keeping a monthly minor release pace, aiming to release 1.0 by the end of the year.

Summary

Finally, a summary:

1. The introduction of the Service Mesh model is a key step on an application's path to cloud native;

2. Mesh makes large-scale sinking of middleware capabilities possible, but the R&D efficiency problem partially remains;

3. Large-scale Mesh rollout is an engineering problem and requires a complete supporting system;

4. The cloud native application runtime will be the future shape of middleware and other basic technologies, further decoupling applications from distributed capabilities;

5. The core of the cloud native application runtime is its API; we hope to build a standard together with the community.


Source Parsing: 4-Layer Traffic Governance, TCP Traffic Dump

· 4 min read

Author profile: Giggon, an open source community enthusiast committed to embracing open source.

Written on: April 26, 2022

Overview

The purpose of this document is to analyze the implementation of tcpcopy (TCP traffic dump).

Prerequisite:

Document content refers to the following version of the code

https://github.com/mosn/layotto

Layotto 0e97e97e970dc504e0298017bd956d2841c44c0810b (main)

Source analysis

Code location: tcpcopy

model.go analysis

This file defines tcpcopy's core configuration objects.

type DumpConfig struct {
	Switch     string  `json:"switch"`       // dump switch. Values: 'ON' or 'OFF'
	Interval   int     `json:"interval"`     // dump sampling interval. Unit: second
	Duration   int     `json:"duration"`     // single sampling duration. Unit: second
	CpuMaxRate float64 `json:"cpu_max_rate"` // max CPU usage. When this threshold is exceeded, the dump feature will stop
	MemMaxRate float64 `json:"mem_max_rate"` // max memory usage. When this threshold is exceeded, the dump feature will stop
}

type DumpUploadDynamicConfig struct {
	Unique_sample_window string             // specifies the sampling window
	BusinessType         _type.BusinessType // business type
	Port                 string             // port
	Binary_flow_data     []byte             // binary data
	Portrait_data        string             // user-uploaded data
}
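Based on the json tags above, a dump strategy configuration would look roughly like the following (the values are illustrative):

{
  "switch": "ON",
  "interval": 30,
  "duration": 10,
  "cpu_max_rate": 0.8,
  "mem_max_rate": 0.8
}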

persistence.go analysis

This file is tcpcopy's core dump-persistence handling code.

// This method is called in OnData in tcpcopy.go
func IsPersistence() bool {
// Check whether the dump switch is enabled
if !strategy.DumpSwitch {
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("%s the dump switch is %t", model.LogDumpKey, strategy.DumpSwitch)
}
return false
}

// Check whether it is in the sampling window
if atomic.LoadInt32(&strategy.DumpSampleFlag) == 0 {
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("%s the dump sample flag is %d", model.LogDumpKey, strategy.DumpSampleFlag)
}
return false
}

// Check whether the dump function is stopped. Obtain the system load and check whether the processor and memory exceeds the threshold of the tcpcopy. If yes, stop the dump function.
if !strategy.IsAvaliable() {
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("%s the system usages are beyond max rate.", model.LogDumpKey)
}
return false
}

return true
}

// Persist data based on configuration information
func persistence(config *model.DumpUploadDynamicConfig) {
// 1.Persisting binary data
if config.Binary_flow_data != nil && config.Port != "" {
if GetTcpcopyLogger().GetLogLevel() >= log.INFO {
GetTcpcopyLogger().Infof("[%s][%s]% x", config.Unique_sample_window, config.Port, config.Binary_flow_data)
}
}
if config.Portrait_data != "" && config.BusinessType != "" {
// 2. Persist user-defined data
if GetPortraitDataLogger().GetLogLevel() >= log.INFO {
GetPortraitDataLogger().Infof("[%s][%s][%s]%s", config.Unique_sample_window, config.BusinessType, config.Port, config.Portrait_data)
}

// 3. Incrementally persist configuration changes in memory
buf, err := configmanager.DumpJSON()
if err != nil {
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("[dump] Failed to load mosn config mem.")
}
return
}
// 3.1. dump if the data changes
tmpMd5ValueOfMemDump := common.CalculateMd5ForBytes(buf)
memLogger := GetMemLogger()
if tmpMd5ValueOfMemDump != md5ValueOfMemDump ||
(tmpMd5ValueOfMemDump == md5ValueOfMemDump && common.GetFileSize(getMemConfDumpFilePath()) <= 0) {
md5ValueOfMemDump = tmpMd5ValueOfMemDump
if memLogger.GetLogLevel() >= log.INFO {
memLogger.Infof("[%s]%s", config.Unique_sample_window, buf)
}
} else {
if memLogger.GetLogLevel() >= log.INFO {
memLogger.Infof("[%s]%+v", config.Unique_sample_window, incrementLog)
}
}
}
}

tcpcopy.go analysis

This file implements tcpcopy's core logic.

// Register tcpcopy into the network filter factories
func init() {
	api.RegisterNetwork("tcpcopy", CreateTcpcopyFactory)
}

// Return the tcpcopy factory
func CreateTcpcopyFactory(cfg map[string]interface{}) (api.NetworkFilterChainFactory, error) {
	tcpConfig := &config{}
	// Convert the dump strategy into static configuration
	if stg, ok := cfg["strategy"]; ok {
		...
	}
	// TODO extract some other fields
	return &tcpcopyFactory{
		cfg: tcpConfig,
	}, nil
}

// Called by pkg/configmanager/parser.go to add or update the network filter factory
func (f *tcpcopyFactory) Init(param interface{}) error {
	// Set the listening address and port configuration
	...
	return nil
}

// OnData implements the ReadFilter interface and processes the request data
func (f *tcpcopyFactory) OnData(data types.IoBuffer) (res api.FilterStatus) {
	// Determine whether the current request data needs to be sampled and dumped
	if !persistence.IsPersistence() {
		return api.Continue
	}

	// Asynchronously sample and dump the data
	config := model.NewDumpUploadDynamicConfig(strategy.DumpSampleUuid, "", f.cfg.port, data.Bytes(), "")
	persistence.GetDumpWorkPoolInstance().Schedule(config)
	return api.Continue
}

Finally, let's review the overall process:

  1. Starting from the init() function of tcpcopy.go, the program registers CreateTcpcopyFactory.

  2. Mosn creates the filter chain (code location: factory.go), looping through CreateFilterChain to add all filters into the chain structure, including tcpcopy.

  3. When traffic passes through Mosn, it enters the OnData method of tcpcopy.go for the tcpcopy dump logic.
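Putting the pieces together, a listener configuration enabling this filter might look roughly like the following; the "tcpcopy" type and the "strategy" field come from the factory code above, while the surrounding structure follows MOSN's usual filter-chain layout and the values are illustrative:

{
  "filter_chains": [{
    "filters": [{
      "type": "tcpcopy",
      "config": {
        "strategy": {
          "switch": "ON",
          "interval": 30,
          "duration": 10,
          "cpu_max_rate": 0.8,
          "mem_max_rate": 0.8
        }
      }
    }]
  }]
}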

MOSN subproject Layotto: opening a new chapter of Service Mesh + Application Runtime

· 21 min read

Author profile: Ma Zhenjun (nickname: Gujin), who has worked for many years in the infrastructure domain with deep practical experience of Service Mesh, currently responsible for the development of MOSN, Layotto, and other projects in Ant Group's middleware team. Layotto official GitHub address: https://github.com/mosn/layotto

Click the link to view the talk video: https://www.bilibili.com/video/BV1hq4y1L7FY/

Service Mesh is already very popular in the microservices area, and a growing number of companies have started rolling it out internally. Ant has invested heavily in this direction from the very beginning of Service Mesh. So far, the internal Mesh solution has covered thousands of applications and hundreds of thousands of containers and has been battle-tested through many Double Elevens; the business decoupling, smooth upgrades, and other advantages brought by Service Mesh have greatly increased middleware iteration efficiency.

We encountered new problems after the large-scale rollout. This paper reviews Service Mesh's internal adoption and shares our solutions to the new problems that followed.

Service Mesh Review and Summary

A. Pain points of the traditional microservice architecture

Under the microservice architecture, the infrastructure team typically provides an SDK that encapsulates the various service governance capabilities. While this ensures the proper functioning of applications, it also means that every new feature the infrastructure team iterates requires the business side's involvement to be used; in particular, for bug-fix versions of the framework, the business side often has to be forced to upgrade—a pain every member of the infrastructure team knows deeply.

Because upgrading is so hard, the SDK versions used by applications diverge widely, and the production environment runs all sorts of SDK versions at once. New feature iterations must therefore account for compatibility, as if walking in shackles; code maintenance becomes very difficult, and some ancestral logic becomes untouchable.

The "heavy" SDK development model also makes governance of heterogeneous languages very weak; the cost of providing a functionally complete and continuously iterated SDK for every programming language can be imagined.

In 2018 Service Mesh exploded in China. This architectural concept aims to decouple service governance capabilities from the business and let them interact through process-level communication. Under this model, service governance capabilities are isolated from the application and run in an independent process; iterative upgrades are unrelated to the business process, allowing rapid iteration of service governance capabilities, and because upgrade costs are low, each version can be fully rolled out, eliminating the historical burden. Meanwhile, the "thin" SDK directly lowers the governance threshold for heterogeneous languages; there is no longer the pain of developing the same service governance capability for every language.

B. Current status of Service Mesh adoption

Ant quickly recognized the value of Service Mesh and invested fully in this direction, developing MOSN in Go as an excellent data plane on par with envoy, fully responsible for building capabilities such as service routing, load balancing, and circuit breaking/rate limiting, which greatly accelerated the company's internal adoption of Service Mesh.

Today MOSN covers thousands of applications and hundreds of thousands of containers inside Ant, and newly created applications access MOSN by default, forming a closed loop. MOSN has also handed in a satisfactory answer on what everyone cares about most—resource occupation and performance loss:

  1. RT increase is less than 0.2 ms

  2. CPU usage increase is 0% to 2%

  3. Memory consumption growth is less than 15M

Heterogeneous technology stacks such as Node.js and C++ are also continuously connecting to MOSN, thanks to Service Mesh lowering the service governance threshold for heterogeneous languages.

After seeing the huge benefits of Mesh-ifying the RPC capability, Ant also Mesh-ified middleware capabilities such as MQ, Cache, and Config, sinking them into MOSN and improving the overall iteration efficiency of middleware products.

C. New challenges

  1. Applications strongly bound to infrastructure

A modern distributed application often relies on RPC, Cache, MQ, Config, and other distributed capabilities to complete its business logic.

When RPC was first Mesh-ified, the other capabilities quickly followed. Initially, each was developed in whatever way its team was most familiar with, leading to a lack of unified planning and management. As the diagram above shows, the application depends on SDKs of various infrastructures, and each SDK interacts with MOSN in its own way, often using the private protocol provided by the original infrastructure. This means that although middleware capabilities have sunk, in essence the application is still bound to the infrastructure: for example, switching the cache from Redis to Memcache requires upgrading the SDK. The problem is more pronounced in the larger trend of applications moving to the cloud: if an application is to be deployed on the cloud, because it depends on various infrastructures, the entire infrastructure would have to be moved to the cloud first. So how to unbind applications from infrastructure, making them portable and freely deployable across platforms, is the first problem we face.

  2. High access costs for heterogeneous languages

Practice has proved that Service Mesh does reduce the access threshold for heterogeneous languages, but as more and more basic capabilities sink into MOSN, we gradually realized that for applications to interact with MOSN, each lightweight SDK must implement the communication and serialization protocols; multiplied across the functionality to be provided for every heterogeneous language, the maintenance difficulty grows exponentially.

Service Mesh made the SDK thin, but for today's many programming languages and strongly infrastructure-dependent applications, the existing SDK is still not thin enough, and the access threshold for heterogeneous languages is still not low enough. How to further lower that threshold is the second problem we face.

Multi Runtime Theory Overview

A. What is Runtime?

In early 2020, Bilgin Ibryam published an article called Multi-Runtime Microservices Architecture, discussing the shape of the next stage of microservice architecture.

As shown in the diagram above, the author abstracts the needs of distributed services into four categories:

  1. Lifecycle: mainly compilation, packaging, deployment, and the like; in the broad cloud native sense these are largely taken over by docker and kubernetes.

  2. Networking: a reliable network is the basic guarantee of communication between microservices. Service Mesh works exactly in this area, and the stability and usability of the currently popular data planes MOSN and envoy have been fully verified.

  3. State: the service orchestration, workflow, distributed singletons, scheduling, idempotency, stateful error recovery, caching, and so on that distributed systems need can be uniformly classified as underlying state management.

  4. Binding: in a distributed system one needs not only to communicate with other systems but also to integrate various external systems, so there is strong reliance on protocol conversion, multiple interaction models, error recovery processes, and the like.

With the needs clarified, and drawing on the ideas of Service Mesh, the author summarizes the evolution of distributed service architecture as below.

Phase one decouples infrastructure capabilities from the application into independent sidecars that run alongside it.

Phase two unifies the capabilities provided by the various sidecars into a single runtime, moving from component-oriented to capability-oriented for the various distributed capabilities and completely shielding the underlying details; and because the APIs are capability-oriented, the application no longer needs to depend on the SDKs of a wide range of infrastructures, only on the APIs that provide the capabilities.

The author's thinking coincides with what we want to solve, so we decided to use the Runtime concept to solve the new problems Service Mesh has met so far.

B. Service Mesh vs Runtime

To give a clearer understanding of Runtime, the table compares the two concepts of Service Mesh and Runtime in terms of positioning, interaction mode, communication protocol, and capability richness. As can be seen, compared with Service Mesh, Runtime provides clearly defined, capability-oriented APIs, making it more straightforward for applications to interact with it.

MOSN sub-project Layotto

A. dapr research

dapr is a well-known Runtime product in the community with high activity, so we first investigated dapr and found it has the following advantages:

  1. It provides a variety of distributed capabilities, with clearly defined APIs that generally meet common usage scenarios.

  2. For each capability it provides multiple implementation components, essentially covering commonly used middleware products, which users can choose freely as needed.

When considering how to land dapr inside the company, we proposed two options, as shown in the chart above:

  1. Replace: drop the current MOSN and replace it with dapr. This option has two problems:

a. Although dapr provides many distributed capabilities, it currently lacks the complete service governance capabilities included in Service Mesh.

b. MOSN has been rolled out at large scale within the company and battle-tested many times; directly replacing MOSN with dapr poses stability risks.

  2. Coexist: add a dapr container and deploy it alongside MOSN in a two-sidecar mode. This option also has two problems:

a. Introducing a new sidecar means upgrades, monitoring, injection, and so on all need to be considered, and the operations cost soars.

b. Maintaining one more container means one more potential point of failure, which reduces the availability of the current system.

Similarly, if you currently use envoy as the data plane, you face the same problems. We therefore wanted to combine Runtime with Service Mesh and deploy them as one complete sidecar, maximizing reuse of the existing Mesh capabilities while ensuring stability and keeping operations costs unchanged. In addition, we hope the Runtime part can in the future combine not only with MOSN but also with envoy, solving the problems of more scenarios. Against this background, Layotto was born.

B. Layotto architecture

As shown in the chart above, Layotto sits on top of the various infrastructures and provides the upper-layer application with a unified, standard API for various distributed capabilities. For applications accessing Layotto, developers no longer need to care about differences in the underlying component implementations; they only care about what capabilities the application needs and then call the corresponding API, completely unbinding from the underlying infrastructure.

For applications, interaction is divided into two parts: one acts as a gRPC client calling Layotto's standard APIs, the other acts as a gRPC server implementing Layotto's callbacks. Thanks to gRPC's excellent cross-language support, applications no longer need to care about communication, serialization, and other details, further lowering the threshold for heterogeneous technology stacks.

Besides the application-facing side, Layotto also provides a unified interface to the platform, exposing the running state of the app and the sidecar, making it convenient for SRE colleagues to understand the app's state and take different actions for different states; considering integration with existing platforms such as k8s, we provide HTTP protocol access.

Beyond the design of Layotto itself, the project involves two pieces of standardization work. First, defining a set of widely applicable, semantically clear APIs is not easy; we have had in-depth discussions with the Alibaba and dapr communities, hoping to push forward the construction of the Runtime API standard. Second, for capability components already implemented in the dapr community, our principle is reuse first, develop second, minimizing effort wasted on reinventing existing components.

Finally, although Layotto is currently built on MOSN, we want Layotto to be able to run on envoy as well, so that whichever data plane you use for Service Mesh, you can add Runtime capabilities on top of it.

C. Layotto portability

As shown in the diagram above, once the Runtime API standardization is complete, applications accessing Layotto are naturally portable: applications can be deployed on private clouds and various public clouds without modification, and because standard APIs are used, applications can also switch freely between Layotto and dapr without code changes.

D. The meaning of the name

As can be seen from the schematic above, the Layotto project itself is meant to shield the details of infrastructure and provide various distributed capabilities to the upper-layer application. This is like adding a layer of abstraction between the application and the infrastructure, so we borrowed from the OSI seven-layer network model and positioned Layotto as the eighth layer, serving applications. "Otto" means eight in Italian; "Layer otto" simplifies to Layotto, and the project codename L8 likewise refers to the eighth layer—this is the source of the inspiration.

Below is an overview of what the project has completed, followed by a detailed introduction to four of its main functions.

E. Configuration primitives

First is the configuration function commonly used in distributed systems: applications generally use a configuration center to switch features or dynamically adjust their running state. The implementation of the configuration module in Layotto consists of two parts: one is the thinking on how to define the API for this capability, the other is the concrete implementation; each is discussed below.

Defining a configuration API that satisfies most real production needs is not easy. dapr currently lacks this capability, so we engaged in intense discussions with the Alibaba and dapr communities on how to define a reasonable configuration API.

Since the outcome of the discussion has not yet been finalized, Layotto is currently based on the first version of the draft we submitted to the community; a brief description of our draft follows.

We first defined the basic elements of general configuration:

  1. appId: indicates which application the configuration belongs to

  2. key: the key of the configuration

  3. content: the value of the configuration

  4. group: the group a configuration belongs to; if an appId has too many configurations, we can group them for easier maintenance.

In addition, we added two advanced features to fit more complex configuration usage scenarios:

  1. label, used to tag a configuration, such as which environment it belongs to; when querying, we use label + key to look up the configuration.

  2. tags, extra information attached by users, such as a description, creator information, and last modification time, to facilitate configuration management and auditing.

As for the concrete implementation of the configuration API defined above, we currently support five kinds of operations: query, subscribe, delete, create, and modify; subscription to configuration changes uses gRPC's stream feature. For the component implementing the configuration capability at the bottom, we chose the domestically popular apollo, and may add others later according to demand.
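To summarize the draft in code form, here is a hypothetical Go view of the API surface described above; the names mirror the basic elements and the five operations, not the final proto:

package config

import "context"

// ConfigurationItem mirrors the basic and advanced elements described above.
type ConfigurationItem struct {
	Key     string
	Content string
	Group   string
	Label   string            // e.g. which environment the config belongs to
	Tags    map[string]string // description, creator, last modification time, ...
}

// ConfigurationAPI sketches the five operations; create and modify share a
// single save semantic, and subscribe uses gRPC's stream feature underneath.
type ConfigurationAPI interface {
	Get(ctx context.Context, appID string, keys []string) ([]ConfigurationItem, error)
	Save(ctx context.Context, appID string, items []ConfigurationItem) error
	Delete(ctx context.Context, appID string, keys []string) error
	// Changes are pushed to the returned channel as they happen.
	Subscribe(ctx context.Context, appID string, keys []string) (<-chan ConfigurationItem, error)
}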

F. Pub/Sub primitives

For the Pub/Sub capability, we studied dapr's current implementation and found that it largely met our needs, so we directly reused dapr's API and components with suitable adaptation in Layotto. This saved us a great deal of duplicated work, and we want to keep collaborating with the dapr community rather than reinventing the wheel.

Pub is an interface the app calls on Layotto to publish events; the Sub function is implemented by the app as a gRPC server exposing ListTopicSubscriptions and OnTopicEvent: the former tells Layotto which topics the app needs to subscribe to, while the latter is the callback Layotto invokes when it receives a change on a topic.
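On the application side, the Sub half could look like the sketch below, assuming the appcallback proto generates a server interface with these two methods (the names follow the description above; the generated types may differ in detail):

package main

import (
	"context"
	"log"

	"google.golang.org/protobuf/types/known/emptypb"

	runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1"
)

type subscriber struct{}

// ListTopicSubscriptions tells Layotto which topics the app subscribes to.
func (s *subscriber) ListTopicSubscriptions(ctx context.Context, _ *emptypb.Empty) (*runtimev1pb.ListTopicSubscriptionsResponse, error) {
	return &runtimev1pb.ListTopicSubscriptionsResponse{
		Subscriptions: []*runtimev1pb.TopicSubscription{
			{PubsubName: "demo-mq", Topic: "demo-topic"},
		},
	}, nil
}

// OnTopicEvent is called back by Layotto when a message arrives on a
// subscribed topic.
func (s *subscriber) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) {
	log.Printf("event from topic %s: %s", in.Topic, string(in.Data))
	return &runtimev1pb.TopicEventResponse{}, nil
}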

dapr's definition of Pub/Sub basically meets our needs, but it still falls short in some scenarios: dapr uses the CloudEvent standard, so the pub interface has no return value, which does not meet our production requirement that publishing a message returns a messageID. We have submitted this need to the dapr community and are waiting for feedback; given the community's asynchronous collaboration mechanism, we may first add the feature internally and then explore a better-compatible solution with the community.

G. RPC primitives

The RPC capability needs no introduction; it may be the most basic need under the microservice architecture. For the RPC interface definition we likewise referred to the dapr community's definition, and since it fully met our needs, we reused it directly. However, the RPC implementation dapr currently provides is still weak, while MOSN has matured over years of hardening. This was a bold combination of Runtime and Service Mesh: we use MOSN itself as the component implementing our RPC capability, so when Layotto receives an RPC request it hands it to MOSN for the actual data transfer. With this scheme, routing rules can be changed through istio, traffic can be downgraded, and so on, which amounts to directly reusing Service Mesh's various capabilities. This also shows that Runtime is not about overthrowing Service Mesh, but a step forward on that basis.

In terms of details, to integrate better with MOSN we added a Channel to the RPC implementation, with default support for three common RPC protocols: dubbo, bolt, and HTTP. If these still do not meet the user's scenario, we also added Before/After filters so users can make custom extensions and implement protocol conversion, etc.
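A Before filter of the kind mentioned above could be as small as the following hypothetical sketch (the types here are illustrative, not Layotto's actual rpc package):

package main

import "fmt"

// RPCRequest is an illustrative request shape.
type RPCRequest struct {
	Method string
	Header map[string]string
	Data   []byte
}

// BeforeInvoke filters run before the Channel hands the request to MOSN;
// AfterInvoke filters run symmetrically on the response path.
type BeforeInvoke func(*RPCRequest) error

// toJSONRPC is a protocol-conversion style filter: it wraps the raw payload
// into a dubbo JSON-RPC body (the body shape is illustrative).
func toJSONRPC(req *RPCRequest) error {
	req.Header["content-type"] = "application/json"
	req.Data = []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":%q,"params":[%s]}`, req.Method, req.Data))
	return nil
}

func main() {
	req := &RPCRequest{Method: "GetUser", Header: map[string]string{}, Data: []byte(`"A003"`)}
	if err := toJSONRPC(req); err != nil {
		panic(err)
	}
	fmt.Println(string(req.Data))
}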

H. Actuator

In real production environments, besides the various distributed capabilities an application needs, we often need to understand its running state. Based on this need we abstracted an Actuator interface; dapr does not currently provide this capability, so we designed it from internal demand scenarios to expose the application's full information at the startup and running stages.

Layotto divides the exposed information into two modules:

  1. Health: this module determines whether the app is healthy; for example, if a strongly depended-on component fails to initialize, the app should be reported unhealthy. For the types of health check we referred to k8s:

a. Readiness: indicates the app has finished starting and can begin processing requests.

b. Liveness: indicates the liveness of the app; if it is not alive, traffic needs to be cut off.

  2. Info: this module is expected to expose some of the app's dependency information, such as the services the app depends on and the configurations it subscribes to, for troubleshooting.

The health status exposed by Health is divided into the following three kinds:

  1. INIT: the app is still starting. If the app returns this value during a release, the PaaS platform should keep waiting for the app to finish starting.

  2. UP: the app has started normally. If the app returns this value, the PaaS platform can start directing traffic.

  3. DOWN: the app failed to start. If the app returns this value during a release, PaaS needs to stop the release and notify the app's owner.
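To make the contract concrete, here is a sketch of how a release pipeline might poll these states over the HTTP access mentioned earlier; the endpoint URL and response shape are hypothetical, and only the three states come from the definition above:

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"time"
)

// The three health states defined above.
const (
	statusInit = "INIT" // still starting: keep waiting
	statusUp   = "UP"   // started normally: begin directing traffic
	statusDown = "DOWN" // failed to start: stop the release, notify the owner
)

// waitUntilUp polls a health endpoint until the app reports UP,
// fails fast on DOWN, and keeps waiting on INIT.
func waitUntilUp(url string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		resp, err := http.Get(url)
		if err == nil {
			var body struct {
				Status string `json:"status"`
			}
			decodeErr := json.NewDecoder(resp.Body).Decode(&body)
			resp.Body.Close()
			if decodeErr == nil {
				switch body.Status {
				case statusUp:
					return nil
				case statusDown:
					return errors.New("app failed to start: stop the release")
				}
			}
		}
		time.Sleep(2 * time.Second) // INIT or a transient error: keep waiting
	}
	return fmt.Errorf("app not UP within %s", timeout)
}

func main() {
	// Hypothetical endpoint, for illustration only.
	if err := waitUntilUp("http://127.0.0.1:34999/actuator/health/readiness", 5*time.Minute); err != nil {
		panic(err)
	}
}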

At this point, Layotto's exploration in the Runtime direction is largely complete. By using gRPC as the standard interaction protocol and defining a set of clearly defined APIs, we addressed the current problems of infrastructure binding and the high cost of heterogeneous-language access. As the API is standardized in the future, applications built on Layotto can be deployed on various private and public clouds on the one hand, and switch freely between Layotto and dapr on the other, giving more efficient R&D.

The Serverless field is also in full bloom with no settled solution, so besides the investment in the Runtime direction described above, Layotto makes some attempts in the Serverless direction.

Exploring WebAssembly

A. Introduction to WebAssembly

WebAssembly, abbreviated WASM, is a binary instruction set that initially ran in browsers to solve JavaScript's performance problems. Because of its good safety, isolation, and language neutrality, people quickly began running it outside the browser; with the advent of the WASI specification, only a WASM runtime is needed to execute a WASM file anywhere.

Since WebAssembly can run outside the browser, can we use it in the Serverless field? Some attempts have already been made in this direction, but for this to become a real solution, the first problem to solve is how a running WebAssembly application depends on infrastructure.

B. Principles of WebAssembly adoption

Currently MOSN integrates a WASM runtime to let WASM run on MOSN, meeting the need for custom extensions to MOSN. Layotto is also built on MOSN, so we considered combining the two to implement the architecture in the following diagram:

Developers can write code in their preferred language, such as Go/C++/Rust, compile it into a WASM file that runs on MOSN, and when the WASM-form application needs various distributed capabilities while processing a request, it calls Layotto's standard APIs through local function calls, thereby directly solving the infrastructure dependency problem of the WASM form.

Layotto currently supports running WASM built from Go and Rust; although only demo-level functions are supported for now, it is enough for us to see the potential value of this solution.

In addition, the WASM community is still in its early stages with much to be refined; we have also submitted some PRs to the community to help build the WASM ecosystem.

C. WebAssembly outlook

Although the use of WASM in Layotto is still experimental, we hope it will eventually become a form of Serverless development: as shown in the diagram above, functions are developed in various programming languages and compiled into WASM files that run on Layotto+MOSN, while application lifecycle management is handled by products such as k8s, docker, and prometheus.

Community planning

Finally, let's look at Layotto's planning in the community.

A. Layotto vs dapr

The chart compares Layotto's existing capabilities with dapr's. In Layotto's development we always aim at the goal of building standards together, following the principle of reuse first, develop second. For capabilities under construction or planned for the future, we intend to implement them in Layotto first and then submit them to the community to be merged into the standard API; so in the short term the Layotto API may run ahead of the community, but given the community's asynchronous collaboration mechanism, they will certainly be unified in the long term.

B. Standard API

We have had extensive discussions in the community about how to define a standard API and how Layotto can run on envoy, and we will continue to do so.

C. Roadmap

Layotto currently supports four major functions: RPC, Config, Pub/Sub, and Actuator. Around September we expect to focus on distributed locks and observability; around December we plan Layotto pluggability, so that it can run on envoy; and we hope to produce further output on the WebAssembly exploration.

Official open source

The above gave a detailed introduction to the Layotto project. Most importantly, the project is officially open-sourced today as a sub-project of MOSN, and we provide detailed documentation and demo examples for a quick experience.

API standardization is something that must be pushed over the long term; standardization means not just meeting one or two scenarios, but fitting most usage scenarios as well as possible. So we hope more people can participate in the Layotto project, describe your usage scenarios, discuss the API definition options, reach agreement in the community, and finally achieve the ultimate goal of Write once, Run anywhere!

Source Parsing: 7-Layer Traffic Governance, Interface Rate Limiting

· 2 min read

Author profile: an open source community enthusiast committed to embracing open source, hoping to progress and grow together with fellow open source enthusiasts.

Written on: April 20, 2022

Overview

The purpose of this document is to analyze the implementation of interface rate limiting.

Prerequisite:

Document content refers to the following version of the code

https://github.com/mosn/mosn

Mosn d11b5a638a137045c2fb03d9d8ca36ecc0def11 (branch develop)

Source analysis

Overall analysis

Reference to
https://mosn.io/docs/concept/extensions/

Mosn Stream Filter Extension

01.png

Code location: flowcontrol

stream_filter_factory.go analysis

This is the factory class that creates the StreamFilter.

Some constants are defined as default values:

02.png

It defines the rate-limiting config struct, which loads the YAML definition and parses it to produce the corresponding functions:

03.png

init() stores the name and the corresponding constructor into the stream filter factory map:

04.png

The highlight is createRpcFlowControlFilterFactory, which produces the RPC rate-limiting factory:

05.png

Before looking at the stream filter itself, let's see how the factory class produces the rate limiter:

06.png

The rate limiter is added to the filter chain and takes effect in sequential order.

The CreateFilterChain method adds multiple filters into the chain structure:

07.png

We can see that this interface is implemented by a wide variety of factory types, including the rate-limiting factory we are studying today.

08.png

stream_filter.go analysis

09.png

Overall process:

  1. Starting from the init() function of stream_filter_factory.go, the program registers createRpcFlowControlFilterFactory.

  2. Mosn creates the filter chain (code location: factory.go), looping through CreateFilterChain to add all filters into the chain structure, including today's protagonist, the rate-limiting filter.

  3. The rate limiter is created via NewStreamFilter().

  4. When traffic arrives, OnReceive() is invoked, and the decision is ultimately made by sentinel: whether the threshold has been reached, and whether to release or stop the traffic (StreamFilterContinue or StreamFilterStop).
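For a feel of what step 4's sentinel decision looks like outside MOSN, here is a minimal standalone sketch using sentinel-golang's v1 API (the resource name and rule values are illustrative); in the stream filter, "blocked" corresponds to StreamFilterStop and "passed" to StreamFilterContinue:

package main

import (
	"fmt"

	sentinel "github.com/alibaba/sentinel-golang/api"
	"github.com/alibaba/sentinel-golang/core/base"
	"github.com/alibaba/sentinel-golang/core/flow"
)

func main() {
	if err := sentinel.InitDefault(); err != nil {
		panic(err)
	}
	// Allow at most 10 requests per second on this resource.
	if _, err := flow.LoadRules([]*flow.Rule{{
		Resource:               "GET:/demo",
		Threshold:              10,
		TokenCalculateStrategy: flow.Direct,
		ControlBehavior:        flow.Reject,
		StatIntervalInMs:       1000,
	}}); err != nil {
		panic(err)
	}

	for i := 0; i < 15; i++ {
		e, blockErr := sentinel.Entry("GET:/demo", sentinel.WithTrafficType(base.Inbound))
		if blockErr != nil {
			fmt.Println("blocked") // the filter would return StreamFilterStop
			continue
		}
		fmt.Println("passed") // the filter would return StreamFilterContinue
		e.Exit()
	}
}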

Layotto Source Parsing — Processing RPC requests

· 21 min read

This paper takes Dubbo JSON RPC as an example to analyze Layotto's RPC processing flow.

By: Wang Zhilong | April 21, 2022

  • overview
  • source analysis
    • 0x00 Layotto initializes RPC
    • 0x01 Dubbo-go-sample client sends a request
    • 0x02 Mosn EventLoop reads and processes the request data
    • 0x03 Grpc Server as a NetworkFilter processes the request
    • 0x04 Layotto sends the RPC request and writes it to the local virtual connection
    • 0x05 Mosn reads the remote connection and executes Filters and proxy forwarding
    • 0x06 Dubbo-go-sample server receives the request and returns a response
    • 0x07 Mosn framework handles the response and writes it back to the remote virtual connection
    • 0x08 Layotto receives the RPC response and reads the local virtual connection
    • 0x09 Grpc Server processes the data frames and returns them to the client
    • 0x10 Dubbo-go-sample client receives the response
  • summary

Overview

Unlike a network-proxy-style Service Mesh, Layotto, as a collection of distributed primitives, provides clear and rich semantics through standard-protocol APIs, of which the RPC API is a part. Through the RPC API, application developers can interact with a local Layotto instance that also uses the sidecar architecture, thereby indirectly calling the methods of different services, and can use its built-in capabilities for distributed tracing and diagnosis, traffic control, error handling, secure links, and so on. Layotto's RPC handling is designed around the gRPC handler; besides HTTP/gRPC for communicating with other services, it uses the X-Protocol protocol for secure and reliable communication. As shown in the code below, the RPC API's interface is consistent with dapr's, and RPC calls can be made through the gRPC interface InvokeService.

type DaprClient interface {
// Invokes a method on a remote Dapr app.
InvokeService(ctx context.Context, in *InvokeServiceRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error)
...
}

Source analysis

For ease of understanding, the analysis follows the flow from the outside in and back out again: from the client, through layer after layer, to the server receiving the request and returning a response, then layer by layer back to the client. The Layotto RPC processing flow is split into 10 steps. Since the gRPC client/server handshake and interaction details are not the focus of this paper, those steps are analyzed relatively briefly, while the other steps are covered in more detail; readers can jump from the table of contents to the step they care about.

Note: based on commit hash 1d2bed68c3b2372c34a12aeed41be125a4fdd15a.

0x00 Layotto initializes RPC

Layotto's startup involves many procedures; here we analyze only the RPC-related initialization. Because Layotto is built on Mosn, the flow starts from the main function, where the urfave/cli library invokes Mosn's StageManager to start Mosn, which in turn initializes the GrpcServer in Mosn's NetworkFilter, as follows.

mosn.io/mosn/pkg/stagemanager.(*StageManager).runInitStage at stage_manager.go
=>
mosn.io/mosn/pkg/mosn.(*Mosn).initServer at mosn.go
=>
mosn.io/mosn/pkg/filter/network/grpc.(*grpcServerFilterFactory).Init at factory.go
=>
mosn.io/mosn/pkg/filter/network/grpc.(*Handler).New at factory.go
// Create a new Grpc server with the given address. The same address returns the same server, which can only be started once
func (s *Handler) New(addr string, conf json.RawMessage, options ...grpc.ServerOption) (*registerServerWrapper, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
sw, ok := s.servers[addr]
if ok {
return sw, nil
}
ln, err := NewListener(addr)
if err != nil {
log.DefaultLogger.Errorf("create a listener failed: %v", err)
return nil, err
}
// Call NewRuntimeGrpcServer
srv, err := s.f(conf, options...)
if err != nil {
log.DefaultLogger.Errorf("create a registered server failed: %v", err)
return nil, err
}
sw = &registerServerWrapper{
server: srv,
ln: ln,
}
s.servers[addr] = sw
return sw, nil
}
=>
main.NewRuntimeGrpcServer at main.go
=>
mosn.io/layotto/pkg/runtime.(*MosnRuntime).initRuntime at runtime.go
=>
mosn.io/layotto/pkg/runtime.(*MosnRuntime).initRpcs at runtime.go
=>
mosn.io/layotto/components/rpc/invoker/mosn.(*mosnInvoker).Init at mosninvoker.go
func (m *mosnInvoker) Init(conf rpc.RpcConfig) error {
var config mosnConfig
if err := json.Unmarshal(conf.Config, &config); err != nil {
return err
}

// Initialize the Filters that run before the RPC call
for _, before := range config.Before {
m.cb.AddBeforeInvoke(before)
}

// Initialize the Filters that run after the RPC call
for _, after := range config.After {
m.cb.AddAfterInvoke(after)
}

if len(config.Channel) == 0 {
return errors.New("missing channel config")
}

// Initialize the channel, protocol, and corresponding port used to communicate with Mosn
channel, err := channel.GetChannel(config.Channel[0])
if err != nil {
return err
}
m.channel = channel
return nil
}
...
// After completing the series of initializations, start the Grpc Server in grpcServerFilter
mosn.io/mosn/pkg/filter/network/grpc.(*grpcServerFilterFactory).Init at factory.go
func (f *grpcServerFilterFactory) Init(param interface{}) error {
...
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(f.UnaryInterceptorFilter),
grpc.StreamInterceptor(f.StreamInterceptorFilter),
}
// After the above initialization, the Grpc registerServerWrapper is initialized
sw, err := f.handler.New(addr, f.config.GrpcConfig, opts...)
if err != nil {
return err
}
// Start the Grpc server
sw.Start(f.config.GracefulStopTimeout)
f.server = sw
log.DefaultLogger.Debugf("grpc server filter initialized success")
return nil
}
...
// After runInitStage, StageManager enters runStartStage to start Mosn
func (stm *StageManager) runStartStage() {
st := time.Now()
stm.SetState(Starting)
for _, f := range stm.startupStages {
f(stm.app)
}

stm.wg.Add(1)
// Start Mosn after all startup stages complete
stm.app.Start()
...
}

0x01 Dubbo-go-sample client sends a request

Following the Dubbo JSON RPC example:

go run demo/rpc/dubbo_json_rpc/dubbo_json_client/client.go -d '{"jsonrpc": "2.0", "method":"GetUser", "params":["A003"],"id":9527}'

The app uses Layotto's gRPC API InvokeService to initiate the RPC call; after the data is filled in and the connection established, the data is sent to Layotto via SendMsg of the gRPC clientStream, as follows.


func main() {
data := flag.String("d", `{"jsonrpc":"2.0","method":"GetUser","params":["A003"],"id":9527}`, "-d")
flag.Parse()

conn, err := grpc.Dial("localhost:34904", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}

cli := runtimev1pb.NewRuntimeClient(conn)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
// Make the RPC call through the Grpc interface InvokeService
resp, err := cli.InvokeService(
ctx,
// Initiate the Grpc request using runtimev1pb.InvokeServiceRequest
&runtimev1pb.InvokeServiceRequest{
// The ID of the server interface to request
Id: "org.apache.dubbo.samples.UserProvider",
Message: &runtimev1pb.CommonInvokeRequest{
// The method name of the requested interface
Method: "GetUser",
ContentType: "",
Data: &anypb.Any{Value: []byte(*data)},
HttpExtension: &runtimev1pb.HTTPExtension{Verb: runtimev1pb.HTTPExtension_POST},
},
},
)
if err != nil {
log.Fatal(err)
}

fmt.Println(string(resp.Data.GetValue()))
}
=>
mosn.io/layotto/spec/proto/runtime/v1.(*runtimeClient).InvokeService at runtime.pb.go
=>
google.golang.org/grpc.(*ClientConn).Invoke at call.go
=>
google.golang.org/grpc.(*clientStream).SendMsg at stream.go
=>
google.golang.org/grpc.(*csAttempt).sendMsg at stream.go
=>
google.golang.org/grpc/internal/transport.(*http2Client).Write at http2_client.go

0x02 Mosn EventLoop reads and processes the request data

As mentioned above, the kernel of Layotto is Mosn, so when network connection data arrives, it is first read and written at Mosn's L4 network layer, as follows.

mosn.io/mosn/pkg/network.(*listener).accept at listener.go
=>
mosn.io/mosn/pkg/server.(*activeListener).OnAccept at handler.go
=>
mosn.io/mosn/pkg/server.(*activeRawConn).ContinueFilterChain at handler.go
=>
mosn.io/mosn/pkg/server.(*activeListener).newConnection at handler.go
=>
mosn.io/mosn/pkg/network.(*connection).Start at connection.go
=>
mosn.io/mosn/pkg/network.(*connection).startRWLoop at connection.go
func (c *connection) startRWLoop(lctx context.Context) {
c.internalLoopStarted = true

utils.GoWithRecover(func() {
// read goroutine
c.startReadLoop()
}, func(r interface{}) {
c.Close(api.NoFlush, api.LocalClose)
})

if c.checkUseWriteLoop() {
c.useWriteLoop = true
utils.GoWithRecover(func() {
// write goroutine
c.startWriteLoop()
}, func(r interface{}) {
c.Close(api.NoFlush, api.LocalClose)
})
}
}

In the startRWLoop method we can see that two separate goroutines are started to handle the read and write operations on this connection: startReadLoop and startWriteLoop. In startReadLoop, the data read at the network layer is handed to the filterManager filter chain for processing, as follows.

mosn.io/mosn/pkg/network.(*connection).doRead at connection.go
=>
mosn.io/mosn/pkg/network.(*connection).onRead at connection.go
=>
mosn.io/mosn/pkg/network.(*filterManager).OnRead at filtermanager.go
=>
mosn.io/mosn/pkg/network.(*filterManager).onContinueReading at filtermanager.go
func (fm *filterManager) onContinueReading(filter *activeReadFilter) {
var index int
var uf *activeReadFilter

if filter != nil {
index = filter.index + 1
}

// filterManager iterates over the filters to process the data
for ; index < len(fm.upstreamFilters); index++ {
uf = fm.upstreamFilters[index]
uf.index = index
// for filters that are not yet initialized, call their initialization method OnNewConnection; in this example it is func (f *grpcFilter) OnNewConnection() api.FilterStatus (it sends the grpc connection to the Listener to wake up the Listener's Accept)
if !uf.initialized {
uf.initialized = true

status := uf.filter.OnNewConnection()

if status == api.Stop {
return
}
}

buf := fm.conn.GetReadBuffer()

if buf != nil && buf.Len() > 0 {
// notify the corresponding filter to process the data
status := uf.filter.OnData(buf)

if status == api.Stop {
return
}
}
}
}
=>
mosn.io/mosn/pkg/filter/network/grpc.(*grpcFilter).OnData at filter.go
=>
mosn.io/mosn/pkg/filter/network/grpc.(*grpcFilter).dispatch at filter.go
func (f *grpcFilter) dispatch(buf buffer.IoBuffer) {
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("grpc get datas: %d", buf.Len())
}
// send the data to wake up the connection read
f.conn.Send(buf)
if log.DefaultLogger.GetLogLevel() >= log.DEBUG {
log.DefaultLogger.Debugf("read dispatch finished")
}
}

0x03 Grpc Server processes the request as a NetworkFilter

The data read from the raw connection in the previous phase enters the gRPC Server's handling: the Serve method loops on the net.Listener, starting a new goroutine to handle each new connection (handleRawConn), and establishes an HTTP2-based transport on which the RPC call proceeds, as follows.

google.golang.org/grpc.(*Server).handleRawConn at server.go
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
// check the server state
if s.quit.HasFired() {
rawConn.Close()
return
}
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
...
}
// HTTP2 handshake: create the Http2Server and exchange initialization information with the client, such as frame and window sizes
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}

rawConn.SetDeadline(time.Time{})
if !s.addConn(lisAddr, st) {
return
}
// create a goroutine for stream processing
go func() {
s.serveStreams(st)
s.removeConn(lisAddr, st)
}()
...
}
=>
google.golang.org/grpc.(*Server).serveStreams at server.go
=>
google.golang.org/grpc.(*Server).handleStream at server.go
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
// find the FullMethod to call, in this example spec.proto.runtime.v1.Runtime/InvokeService
sm := stream.Method()
if sm != "" && sm[0] == '/' {
sm = sm[1:]
}
...
service := sm[:pos]
method := sm[pos+1:]

// find the corresponding serviceInfo object in the registered service list
srv, knownService := s.services[service]
if knownService {
// find the unary request's md (MethodDesc) by method name; in this demo it is mosn.io/layotto/spec/proto/runtime/v1._Runtime_InvokeService_Handler
if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
// streaming request
if sd, ok := srv.streams[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
...
=>
google.golang.org/grpc.(*Server).processUnaryRPC at server.go
=>
mosn.io/layotto/spec/proto/runtime/v1._Runtime_InvokeService_Handler at runtime.pb.go
=>
google.golang.org/grpc.chainUnaryServerInterceptors at server.go
=>
// server-side unary interceptor, used to invoke Mosn's streamfilter
mosn.io/mosn/pkg/filter/network/grpc.(*grpcServerFilterFactory).UnaryInterceptorFilter at factory.go
=>
google.golang.org/grpc.getChainUnaryHandler at server.go
// recursively generate the chained UnaryHandler
func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
if curr == len(interceptors)-1 {
return finalHandler
}

return func(ctx context.Context, req interface{}) (interface{}, error) {
// finalHandler is mosn.io/layotto/spec/proto/runtime/v1._Runtime_InvokeService_Handler
return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
}
}
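To make the recursion concrete, here is a minimal, self-contained sketch of the same chaining technique with simplified types (not the grpc-go source): each interceptor receives a next handler that is itself the chain of the remaining interceptors plus the final handler.

package main

import "fmt"

type handler func(req string) string
type interceptor func(req string, next handler) string

// chain recursively composes interceptors around a final handler,
// mirroring the idea of getChainUnaryHandler above.
func chain(interceptors []interceptor, final handler) handler {
	if len(interceptors) == 0 {
		return final
	}
	return func(req string) string {
		return interceptors[0](req, chain(interceptors[1:], final))
	}
}

func main() {
	logging := func(req string, next handler) string {
		fmt.Println("before:", req)
		resp := next(req)
		fmt.Println("after:", req)
		return resp
	}
	h := chain([]interceptor{logging, logging}, func(req string) string {
		return "resp for " + req
	})
	fmt.Println(h("InvokeService"))
}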

0x04 Layotto sends the RPC request and writes it to the local virtual connection

Following 0x03, the flow reaches Runtime_InvokeService_Handler, where the request is converted from the default gRPC API to the Dapr API and enters the lightweight RPC framework that Layotto provides on top of Mosn, as follows.

mosn.io/layotto/spec/proto/runtime/v1._Runtime_InvokeService_Handler at runtime.pb.go
=>
mosn.io/layotto/pkg/grpc/default_api.(*api).InvokeService at api.go
=>
mosn.io/layotto/pkg/grpc/dapr.(*daprGrpcAPI).InvokeService at dapr_api.go
=>
mosn.io/layotto/components/rpc/invoker/mosn.(*mosnInvoker).Invoke at mosninvoker.go
// request the Mosn base and return the response
func (m *mosnInvoker) Invoke(ctx context.Context, req *rpc.RPCRequest) (resp *rpc.RPCResponse, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("[runtime][rpc]mosn invoker panic: %v", r)
log.DefaultLogger.Errorf("%v", err)
}
}()

// 1. if the timeout is 0, set a default timeout of 3000ms
if req.Timeout == 0 {
req.Timeout = 3000
}
req.Ctx = ctx
log.DefaultLogger.Debugf("[runtime][rpc]request %+v", req)
// 2. trigger the custom logic that runs before the request executes
req, err = m.cb.BeforeInvoke(req)
if err != nil {
log.DefaultLogger.Errorf("[runtime][rpc]before filter error %s", err.Error())
return nil, err
}
// 3. the core call, analyzed in detail below
resp, err = m.channel.Do(req)
if err != nil {
log.DefaultLogger.Errorf("[runtime][rpc]error %s", err.Error())
return nil, err
}
resp.Ctx = req.Ctx
// 4. trigger the custom logic that runs after the response returns
resp, err = m.cb.AfterInvoke(resp)
if err != nil {
log.DefaultLogger.Errorf("[runtime][rpc]after filter error %s", err.Error())
}
return resp, err
}
=>
mosn.io/layotto/components/rpc/invoker/mosn/channel.(*httpChannel).Do at httpchannel.go
func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
// 1. set the context timeout using the default timeout configured in the previous stage
timeout := time.Duration(req.Timeout) * time.Millisecond
ctx, cancel := context.WithTimeout(req.Ctx, timeout)
defer cancel()

// 2. get a connection and start the readloop goroutine for the read/write interaction between Layotto and Mosn (analyzed below)
conn, err := h.pool.Get(ctx)
if err != nil {
return nil, err
}

// 3. set the timeout for writing data to the connection
hstate := conn.state.(*hstate)
deadline, _ := ctx.Deadline()
if err = conn.SetWriteDeadline(deadline); err != nil {
hstate.close()
h.pool.Put(conn, true)
return nil, common.Error(common.UnavailebleCode, err.Error())
}
// 4. since Layotto was configured at init time to talk to Mosn over the Http protocol, an Http request is constructed here
httpReq := h.constructReq(req)
defer fasthttp.ReleaseRequest(httpReq)

// write the request body to the virtual connection with the help of fasthttp
if _, err = httpReq.WriteTo(conn); err != nil {
hstate.close()
h.pool.Put(conn, true)
return nil, common.Error(common.UnavailebleCode, err.Error())
}

// 5. construct a fasthttp.Response struct to read and parse what hstate returns, and set the read timeout
httpResp := &fasthttp.Response{}
hstate.reader.SetReadDeadline(deadline)

// this blocks until Mosn returns data; for what happens after the readloop goroutine reads the data returned by Mosn, see phase 0x08 below
if err = httpResp.Read(bufio.NewReader(hstate.reader)); err != nil {
hstate.close()
h.pool.Put(conn, true)
return nil, common.Error(common.UnavailebleCode, err.Error())
}
h.pool.Put(conn, false)
...
}
=>
mosn.io/layotto/components/rpc/invoker/mosn/channel.(*connPool).Get at connpool.go
// Get is get wrapConn by context.Context
func (p *connPool) Get(ctx context.Context) (*wrapConn, error) {
if err := p.waitTurn(ctx); err != nil {
return nil, err
}

p.mu.Lock()
// 1. get a connection from the connection pool
if ele := p.free.Front(); ele != nil {
p.free.Remove(ele)
p.mu.Unlock()
wc := ele.Value.(*wrapConn)
if !wc.isClose() {
return wc, nil
}
} else {
p.mu.Unlock()
}

// 2. create a new connection
c, err := p.dialFunc()
if err != nil {
p.freeTurn()
return nil, err
}
wc := &wrapConn{Conn: c}
if p.stateFunc != nil {
wc.state = p.stateFunc()
}
// 3. start a separate readloop goroutine to read the data returned by Mosn
if p.onDataFunc != nil {
utils.GoWithRecover(func() {
p.readloop(wc)
}, nil)
}
return wc, nil
}

Note that creating a new connection in step 2 above calls the dialFunc func() (net.Conn, error) that was set up during the Init phase. Since the configured channel between Layotto and Mosn uses the Http protocol, this is newHttpChannel; Bolt, Dubbo and other protocols are also supported.

mosn.io/layotto/components/rpc/invoker/mosn/channel.newHttpChannel at httpchannel.go
// newHttpChannel is used to create rpc.Channel according to ChannelConfig
func newHttpChannel(config ChannelConfig) (rpc.Channel, error) {
hc := &httpChannel{}
// a connection pool that reduces the overhead of connection creation, defined in mosn.io/layotto/components/rpc/invoker/mosn/channel/connpool.go
hc.pool = newConnPool(
config.Size,
// dialFunc
func() (net.Conn, error) {
_, _, err := net.SplitHostPort(config.Listener)
if err == nil {
return net.Dial("tcp", config.Listener)
}
// create a pair of virtual connections (net.Pipe): Layotto holds local, Mosn holds remote; when Layotto writes to local, Mosn receives the data; Mosn reads from remote, executes the filter logic and proxies the request, then writes the response to remote, and finally Layotto reads from local to get the response
local, remote := net.Pipe()
localTcpConn := &fakeTcpConn{c: local}
remoteTcpConn := &fakeTcpConn{c: remote}
// acceptFunc is a closure defined in mosn.io/layotto/components/rpc/invoker/mosn/channel.go that listens on the remote virtual connection
if err := acceptFunc(remoteTcpConn, config.Listener); err != nil {
return nil, err
}
// the goroutine model is:
// request goroutine ---> localTcpConn ---> mosn
// ^ |
// | |
// | |
// hstate <-- readloop goroutine <------
return localTcpConn, nil
},
// stateFunc
func() interface{} {
// hstate is the pipe through which the readloop goroutine communicates with the request goroutine; it is a read/write pair of net.Conn: the request goroutine reads data from the reader net.Conn, and the readloop goroutine writes data to the writer net.Conn
s := &hstate{}
s.reader, s.writer = net.Pipe()
return s
},
hc.onData,
hc.cleanup,
)
return hc, nil
}
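The key point above is that Layotto and Mosn live in the same process and talk over an in-memory net.Pipe rather than a real socket. A minimal standalone sketch of this pattern (illustrative only, not Layotto code):

package main

import (
	"bufio"
	"fmt"
	"net"
)

func main() {
	// net.Pipe returns the two ends of a synchronous, in-memory duplex
	// connection: whatever is written to one end is read from the other.
	local, remote := net.Pipe()

	// The "Mosn" side holds remote: read a request and write a response back.
	go func() {
		req, _ := bufio.NewReader(remote).ReadString('\n')
		fmt.Fprintf(remote, "echo: %s", req)
	}()

	// The "Layotto" side holds local: write a request, then block on the response.
	fmt.Fprintln(local, "hello")
	resp, _ := bufio.NewReader(local).ReadString('\n')
	fmt.Print(resp)
}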

0x05 Mosn reads the remote connection, executes filters and proxies the request

(1) Similar to 0x02, the filterManager runs the filter processing phase, where this time the proxy filter performs the proxy forwarding, with the following code.

...
mosn.io/mosn/pkg/network.(*filterManager).onContinueReading at filtermanager.go
=>
mosn.io/mosn/pkg/proxy.(*proxy).OnData at proxy.go
func (p *proxy) OnData(buf buffer.IoBuffer) api.FilterStatus {
if p.fallback {
return api.Continue
}

if p.serverStreamConn == nil {
...
p.serverStreamConn = stream.CreateServerStreamConnection(p.context, proto, p.readCallbacks.Connection(), p)
}
// dispatch the data to the decoder of the corresponding protocol; since the request here is POST /org.apache.dubbo.samples.UserProvider HTTP/1.1, it goes to mosn.io/mosn/pkg/stream/http.(*serverStreamConnection).serve at stream.go
p.serverStreamConn.Dispatch(buf)

return api.Stop
}

(2) serverStreamConnection.serve listens for and handles the request, dispatching it to downstream OnReceive, as follows.

mosn.io/mosn/pkg/stream/http.(*serverStream).handleRequest at stream.go
func (s *serverStream) handleRequest(ctx context.Context) {
if s.request != nil {
// set non-header info in request-line, like method, uri
injectCtxVarFromProtocolHeaders(ctx, s.header, s.request.URI())
hasData := true
if len(s.request.Body()) == 0 {
hasData = false
}

if hasData {
// enter downstream OnReceive here
s.receiver.OnReceive(s.ctx, s.header, buffer.NewIoBufferBytes(s.request.Body()), nil)
} else {
s.receiver.OnReceive(s.ctx, s.header, nil, nil)
}
}
}
=>
mosn.io/mosn/pkg/proxy.(*downStream).OnReceive at downstream.go
func (s *downStream) OnReceive(ctx context.Context, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) {
...
var task = func() {
...

phase := types.InitPhase
for i := 0; i < 10; i++ {
s.cleanNotify()

phase = s.receive(s.context, id, phase)
...
}
}
}

if s.proxy.serverStreamConn.EnableWorkerPool() {
if s.proxy.workerpool != nil {
// use the worker pool for current proxy
s.proxy.workerpool.Schedule(task)
} else {
// use the global shared worker pool
pool.ScheduleAuto(task)
}
return
}

task()
return

}

(3) After ScheduleAuto schedules the task above, the receive loop of the downstream stream processes its phases, handles the upstream request, passes through the network layer, finally sends the data out via connection.Write, and enters the WaitNotify phase to block waiting, as follows.

mosn.io/mosn/pkg/sync.(*workerPool).ScheduleAuto at workerpool.go
=>
mosn.io/mosn/pkg/sync.(*workerPool).spawnWorker at workerpool.go
=>
mosn.io/mosn/pkg/proxy.(*downStream).receive at downstream.go
=>
InitPhase=>DownFilter=>MatchRoute=>DownFilterAfterRoute=>ChooseHost=>DownFilterAfterChooseHost=>DownRecvHeader=>DownRecvData
=>
mosn.io/mosn/pkg/proxy.(*downStream).receiveData at downstream.go
=>
mosn.io/mosn/pkg/proxy.(*upstreamRequest).appendData at upstream.go
=>
mosn.io/mosn/pkg/stream/http.(*clientStream).doSend at stream.go
=>
github.com/valyala/fasthttp.(*Request).WriteTo at http.go
=>
mosn.io/mosn/pkg/stream/http.(*streamConnection).Write at stream.go
=>
mosn.io/mosn/pkg/network.(*connection).Write at connection.go
=>
mosn.io/mosn/pkg/proxy.(*downStream).receive at downstream.go
func (s *downStream) receive(ctx context.Context, id uint32, phase types.Phase) types.Phase {
for i := 0; i <= int(types.End-types.InitPhase); i++ {
s.phase = phase

switch phase {
...
case types.WaitNotify:
s.printPhaseInfo(phase, id)
if p, err := s.waitNotify(id); err != nil {
return p
}

if log.Proxy.GetLogLevel() >= log.DEBUG {
log.Proxy.Debugf(s.context, "[proxy] [downstream] OnReceive send downstream response %+v", s.downstreamRespHeaders)
}
...
}
=>
func (s *downStream) waitNotify(id uint32) (phase types.Phase, err error) {
if atomic.LoadUint32(&s.ID) != id {
return types.End, types.ErrExit
}

if log.Proxy.GetLogLevel() >= log.DEBUG {
log.Proxy.Debugf(s.context, "[proxy] [downstream] waitNotify begin %p, proxyId = %d", s, s.ID)
}
select {
// block and wait
case <-s.notify:
}
return s.processError(id)
}

0x06 Dubbo-go-sample server receives the request and returns a response

This part is the dubbo-go-sample server-side handling; we will not expand on it here and only post the log messages. Interested readers can check its source code.

[2022-04-18/21:03:18 github.com/apache/dubbo-go-samples/rpc/jsonrpc/go-server/pkg.(*UserProvider2).GetUser: user_provider2.go: 53] userID: "A003"
[2022-04-18/21:03:18 github.com/apache/dubbo-go-samples/rpc/jsonrpc/go-server/pkg.(*UserProvider2).GetUser: user_provider2.go: 56] rsp: &pkg.User{ID:"113", Name:"Moorse", Age:30, Birth:703391943, Sex:"MAN"}
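For reference, a minimal sketch of what the provider behind these logs might look like (not the actual dubbo-go-samples code; the field names and types are assumptions inferred from the printed response and the JSON-RPC result in 0x10):

package pkg

import "context"

// User mirrors the fields visible in the logged response; assumptions only.
type User struct {
	ID   string
	Name string
	Age  int64
	Time int64 // unix timestamp, rendered as "time" in the JSON-RPC response
	Sex  string
}

type UserProvider2 struct{}

// GetUser looks up a user by the requested ID ("A003" in this demo)
// and returns a fixed record.
func (u *UserProvider2) GetUser(ctx context.Context, req []interface{}) (*User, error) {
	return &User{ID: "113", Name: "Moorse", Age: 30, Time: 703391943, Sex: "MAN"}, nil
}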

0x07 Mosn framework handles the response and writes it back to the remote virtual connection

Following phase (3) of 0x05 above, the response logic enters the UpRecvData phase of the receive loop and, through a series of steps, finally writes the response back to the remote virtual connection created in 0x04, as follows.

mosn.io/mosn/pkg/proxy.(*downStream).receive at downstream.go
func (s *downStream) waitNotify(id uint32) (phase types.Phase, err error) {
if atomic.LoadUint32(&s.ID) != id {
return types.End, types.ErrExit
}

if log.Proxy.GetLogLevel() >= log.DEBUG {
log.Proxy.Debugf(s.context, "[proxy] [downstream] waitNotify begin %p, proxyId = %d", s, s.ID)
}
// the response has returned
select {
case <-s.notify:
}
return s.processError(id)
}
=>
UpFilter
=>
UpRecvHeader
=>
func (s *downStream) receive(ctx context.Context, id uint32, phase types.Phase) types.Phase {
for i := 0; i <= int(types.End-types.InitPhase); i++ {
s.phase = phase

switch phase {
...
case types.UpRecvData:
if s.downstreamRespDataBuf != nil {
s.printPhaseInfo(phase, id)
s.upstreamRequest.receiveData(s.downstreamRespTrailers == nil)
if p, err := s.processError(id); err != nil {
return p
}
}
...
}
=>
mosn.io/mosn/pkg/proxy.(*upstreamRequest).receiveData at upstream.go
=>
mosn.io/mosn/pkg/proxy.(*downStream).onUpstreamData at downstream.go
=>
mosn.io/mosn/pkg/proxy.(*downStream).appendData at downstream.go
=>
mosn.io/mosn/pkg/stream/http.(*serverStream).AppendData at stream.go
=>
mosn.io/mosn/pkg/stream/http.(*serverStream).endStream at stream.go
=>
mosn.io/mosn/pkg/stream/http.(*serverStream).doSend at stream.go
=>
github.com/valyala/fasthttp.(*Response).WriteTo at http.go
=>
github.com/valyala/fasthttp.writeBufio at http.go
=>
github.com/valyala/fasthttp.(*statsWriter).Write at http.go
=>
mosn.io/mosn/pkg/stream/http.(*streamConnection).Write at stream.go

0x08 Layotto receives the RPC response and reads the local virtual connection

The readloop goroutine started in 0x04 above is woken up by read IO: it reads the data returned by Mosn from the connection, then forwards it to the hstate pipe to hand it back to the request goroutine, as follows.

mosn.io/layotto/components/rpc/invoker/mosn/channel.(*connPool).readloop at connpool.go
// readloop is loop to read connected then exec onDataFunc
func (p *connPool) readloop(c *wrapConn) {
var err error

defer func() {
c.close()
if p.cleanupFunc != nil {
p.cleanupFunc(c, err)
}
}()

c.buf = buffer.NewIoBuffer(defaultBufSize)
for {
// read data from the connection
n, readErr := c.buf.ReadOnce(c)
if readErr != nil {
err = readErr
if readErr == io.EOF {
log.DefaultLogger.Debugf("[runtime][rpc]connpool readloop err: %s", readErr.Error())
} else {
log.DefaultLogger.Errorf("[runtime][rpc]connpool readloop err: %s", readErr.Error())
}
}

if n > 0 {
// onDataFunc delegates the data to hstate for processing
if onDataErr := p.onDataFunc(c); onDataErr != nil {
err = onDataErr
log.DefaultLogger.Errorf("[runtime][rpc]connpool onData err: %s", onDataErr.Error())
}
}

if err != nil {
break
}

if c.buf != nil && c.buf.Len() == 0 && c.buf.Cap() > maxBufSize {
c.buf.Free()
c.buf.Alloc(defaultBufSize)
}
}
}
=>
mosn.io/layotto/components/rpc/invoker/mosn/channel.(*httpChannel).onData at httpchannel.go
=>
mosn.io/layotto/components/rpc/invoker/mosn/channel.(*hstate).onData at httpchannel.go
=>
net.(*pipe).Write at pipe.go
=>
mosn.io/layotto/components/rpc/invoker/mosn/channel.(*httpChannel).Do at httpchannel.go
func (h *httpChannel) Do(req *rpc.RPCRequest) (*rpc.RPCResponse, error) {
...
// continuing phase 0x04 above: after mosn returns data, read from hstate the data that the readloop goroutine got back from mosn
if err = httpResp.Read(bufio.NewReader(hstate.reader)); err != nil {
hstate.close()
h.pool.Put(conn, true)
return nil, common.Error(common.UnavailebleCode, err.Error())
}
h.pool.Put(conn, false)

// get the fasthttp body and parse the status code; on failure, return the error message and status code
body := httpResp.Body()
if httpResp.StatusCode() != http.StatusOK {
return nil, common.Errorf(common.UnavailebleCode, "http response code %d, body: %s", httpResp.StatusCode(), string(body))
}

// 6. convert the result into an rpc.RPCResponse and return it
rpcResp := &rpc.RPCResponse{
ContentType: string(httpResp.Header.ContentType()),
Data: body,
Header: map[string][]string{},
}
httpResp.Header.VisitAll(func(key, value []byte) {
rpcResp.Header[string(key)] = []string{string(value)}
})
return rpcResp, nil
}

0x09 Grpc Server writes the response data frames back to the client

Grpc does not write data directly to the connection; instead, a separate loopy-writer goroutine loops fetching frames from the control buffer and writes them back to the client, as follows.

google.golang.org/grpc/internal/transport.NewServerTransport at http2_server.go
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
...
// asynchronous loop goroutine
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
}
t.conn.Close()
t.controlBuf.finish()
close(t.writerDone)
}()
go t.keepalive()
return t, nil
}
=>
google.golang.org/grpc/internal/transport.(*loopyWriter).run at controlbuf.go
=>
google.golang.org/grpc/internal/transport.(*bufWriter).Flush at http_util.go
=>
mosn.io/mosn/pkg/filter/network/grpc.(*Connection).Write at conn.go
=>
mosn.io/mosn/pkg/network.(*connection).Write at connection.go
=>
mosn.io/mosn/pkg/network.(*connection).writeDirectly at connection.go
=>
mosn.io/mosn/pkg/network.(*connection).doWrite at connection.go

0x10 Dubbo-go-sample client receives the response

After sending the request in 0x01 above, the client blocks in the gRPC bottom-layer read; once Layotto returns the data through the layers above, the client's read IO is woken up, as follows.

google.golang.org/grpc.(*ClientConn).Invoke at call.go
=>
google.golang.org/grpc.invoke at call.go
=>
google.golang.org/grpc.(*clientStream).RecvMsg at stream.go
=>
google.golang.org/grpc.(*clientStream).withRetry at stream.go
=>
google.golang.org/grpc.(*csAttempt).recvMsg at stream.go
=>
google.golang.org/grpc.recv at rpc_util.go
=>
google.golang.org/grpc.recvAndDecompress at rpc_util.go
=>
google.golang.org/grpc.(*parser).recvMsg at rpc_util.go
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
	if _, err := p.r.Read(p.header[:]); err != nil {
		return 0, nil, err
	}
	...
}

Last returned data:

{"jsonrpc": "2.0", "id":9527, "result":{"id":"113", "name":"Moorse", "age":30,"time":703394193,"sex":"MAN"}}

Summary

The Layotto RPC processing flow involves knowledge of gRPC, Dapr, Mosn and more. Although the overall flow is long, the main thread is fairly clear and simple: Layotto implements an abstract, lightweight RPC framework on top of Mosn, with some innovative designs that are well worth further study. This article analyzed the path of a Layotto RPC request; given limited time, it is not a fully comprehensive, in-depth profile, and if there are any mistakes, feel free to reach out at rayo.wangzl@gmail.com. We also hope more people will take part in source-code analysis and the open source community, learning and making progress together.

Source parsing: Layotto startup process

· Read will take 6 min

Author: Libin, https://github.com/ZLBer

Written: 4 May 2022

  • Overview
  • Source analysis
    • 1. cmd analysis
    • 2. Callback function NewRuntimeGrpcServer analysis
    • 3. runtime analysis
  • Summary

Overview

Layotto "Parasite" in MOSN. The start process is in effect starting MOSN, MOSN back Layotto during startup to get Layotto start.

Source analysis

Everything originates from our command line: layotto start -c configpath

1.cmd analysis

The init function in main executes first:

func init() {
	// pass layotto's initialization function to mosn, so that mosn calls it back during startup
	mgrpc.RegisterServerHandler("runtime", NewRuntimeGrpcServer)
	...
}

Then the cmd Action starts executing:

Action: func(c *cli.Context) error {
	app := mosn.NewMosn()
	// stagemanager manages each stage of mosn startup; stage functions can be appended, such as ParamsParsedStage, InitStage, PreStartStage and AfterStartStage below
	// configpath is passed to mosn here; everything below is mosn-related logic
	stm := stagemanager.InitStageManager(c, c.String("config"), app)
	stm.AppendParamsParsedStage(ExtensionsRegister)
	stm.AppendParamsParsedStage(func(c *cli.Context) {
		err := featuregate.Set(c.String("feature-gates"))
		if err != nil {
			os.Exit(1)
		}
	})
	stm.AppendInitStage(mosn.DefaultInitStage)
	stm.AppendPreStartStage(mosn.DefaultPreStartStage)
	stm.AppendStartStage(mosn.DefaultStartStage)
	// layotto's health check mechanism is added here
	stm.AppendAfterStartStage(SetActuatorAfterStart)
	stm.Run()
	// wait mosn finished
	stm.WaitFinish()
	return nil
},

2. Callback function NewRuntimeGrpcServer analysis

MOSN calls back NewRuntimeGrpcServer when it launches: data is the raw, unparsed configuration, opts are the grpc options, and it returns a Grpc server.

func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrpc.RegisteredServer, error) {
	// parse the raw configuration into a struct
	cfg, err := runtime.ParseRuntimeConfig(data)
	// create the layotto runtime, which holds the registries and instances of all components
	rt := runtime.NewMosnRuntime(cfg)
	// 3. start the runtime
	server, err := rt.Run(
		...
		// 4. add the initialization functions of all components
		// taking the File component as an example, its NewXXX() constructors are added to the component Factory
		runtime.WithFileFactory(
			file.NewFileFactory("aliyun.oss", alicloud.NewAliCloudOSS),
			file.NewFileFactory("minio", minio.NewMinioOss),
			file.NewFileFactory("aws.s3", aws.NewAwsOss),
			file.NewFileFactory("tencent.oss", tencentcloud.NewTencentCloudOSS),
			file.NewFileFactory("local", local.NewLocalStore),
			file.NewFileFactory("qiniu.oss", qiniu.NewQiniuOSS),
		),
		...
	)

	return server, err
}

3. runtime analysis

First look at the structure of MosnRuntime to see, at a high level, what the runtime is composed of:

type MosnRuntime struct {
// includes the components' config
runtimeConfig *MosnRuntimeConfig
info *info.RuntimeInfo
srv mgrpc.RegisteredServer
// component registries, used to register and create components; they hold the components' NewXXX() functions
helloRegistry hello.Registry
configStoreRegistry configstores.Registry
rpcRegistry rpc.Registry
pubSubRegistry runtime_pubsub.Registry
stateRegistry runtime_state.Registry
lockRegistry runtime_lock.Registry
sequencerRegistry runtime_sequencer.Registry
fileRegistry file.Registry
bindingsRegistry mbindings.Registry
secretStoresRegistry msecretstores.Registry
customComponentRegistry custom.Registry
hellos map[string]hello.HelloService
// the component instances
configStores map[string]configstores.Store
rpcs map[string]rpc.Invoker
pubSubs map[string]pubsub.PubSub
states map[string]state.Store
files map[string]file.File
locks map[string]lock.LockStore
sequencers map[string]sequencer.Store
outputBindings map[string]bindings.OutputBinding
secretStores map[string]secretstores.SecretStore
customComponent map[string]map[string]custom.Component
AppCallbackConn *rawGRPC.ClientConn
errInt ErrInterceptor
started bool
// initialization functions
initRuntimeStages []initRuntimeStage
}

The logic of the runtime's Run function is as follows:

func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) {
	// launch flag
	m.started = true
	// create the runtime options
	o := newRuntimeOptions()
	// apply the options passed in earlier, which actually registers the various component factories
	for _, opt := range opts {
		opt(o)
	}
	// initialize the components
	if err := m.initRuntime(o); err != nil {
		return nil, err
	}

	// initialize Grpc and assign the APIs
	var grpcOpts []grpc.Option
	if o.srvMaker != nil {
		grpcOpts = append(grpcOpts, grpc.WithNewServer(o.srvMaker))
	}
	var apis []grpc.GrpcAPI
	ac := &grpc.ApplicationContext{
		m.runtimeConfig.AppManagement.AppId,
		m.hellos,
		m.configStores,
		m.rpcs,
		m.pubSubs,
		m.states,
		m.files,
		m.locks,
		m.sequencers,
		m.sendToOutputBinding,
		m.secretStores,
		m.customComponent,
	}
	// each registered factory generates its component's GrpcAPI
	for _, apiFactory := range o.apiFactorys {
		api := apiFactory(ac)
		// init the GrpcAPI
		if err := api.Init(m.AppCallbackConn); err != nil {
			return nil, err
		}
		apis = append(apis, api)
	}
	// pass the api interfaces and configuration to grpc
	grpcOpts = append(grpcOpts,
		grpc.WithGrpcOptions(o.options...),
		grpc.WithGrpcAPIs(apis),
	)
	// start grpc
	var err error = nil
	m.srv, err = grpc.NewGrpcServer(grpcOpts...)
	return m.srv, err
}

The component initialization function initRuntime:

func (m *MosnRuntime) initRuntime(r *runtimeOptions) error {
	st := time.Now()
	if len(m.initRuntimeStages) == 0 {
		m.initRuntimeStages = append(m.initRuntimeStages, DefaultInitRuntimeStage)
	}
	// call DefaultInitRuntimeStage
	for _, f := range m.initRuntimeStages {
		err := f(r, m)
		if err != nil {
			return err
		}
	}
	...
	return nil
}

DefaultInitRuntimeStage contains the component initialization logic, calling each component's init method:

func DefaultInitRuntimeStage(o *runtimeOptions, m *MosnRuntime) error {
...
// initialize the config/state/file/lock/sequencer/secret and other components
if err := m.initCustomComponents(o.services.custom); err != nil {
return err
}
if err := m.initHellos(o.services.hellos...); err != nil {
return err
}
if err := m.initConfigStores(o.services.configStores...); err != nil {
return err
}
if err := m.initStates(o.services.states...); err != nil {
return err
}
if err := m.initRpcs(o.services.rpcs...); err != nil {
return err
}
if err := m.initOutputBinding(o.services.outputBinding...); err != nil {
return err
}
if err := m.initPubSubs(o.services.pubSubs...); err != nil {
return err
}
if err := m.initFiles(o.services.files...); err != nil {
return err
}
if err := m.initLocks(o.services.locks...); err != nil {
return err
}
if err := m.initSequencers(o.services.sequencers...); err != nil {
return err
}
if err := m.initInputBinding(o.services.inputBinding...); err != nil {
return err
}
if err := m.initSecretStores(o.services.secretStores...); err != nil {
return err
}
return nil
}

Taking the file component as an example, look at its initialization function:

func (m *MosnRuntime) initFiles(files ...file.FileFactory) error {

	// register the configured components
	m.fileRegistry.Register(files...)
	for name, config := range m.runtimeConfig.Files {
		// create a new component instance
		c, err := m.fileRegistry.Create(name)
		if err != nil {
			m.errInt(err, "create files component %s failed", name)
			return err
		}
		if err := c.Init(context.TODO(), &config); err != nil {
			m.errInt(err, "init files component %s failed", name)
			return err
		}
		// assign it to the runtime
		m.files[name] = c
	}
	return nil
}

At this point MOSN, Grpc and Layotto have all been started, and the components' code logic can be invoked through the Grpc interface.
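As a quick illustration, once everything is up, a client can exercise a configured component through the same runtime gRPC port, in the spirit of the quickstart demos. This is only a sketch: the store name "redis", the key and the exact request fields follow the Dapr-style runtime proto and are assumptions here.

package main

import (
	"context"
	"fmt"
	"log"

	"google.golang.org/grpc"
	runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1"
)

func main() {
	// 34904 is the runtime gRPC port used in the quickstart configs
	conn, err := grpc.Dial("localhost:34904", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	cli := runtimev1pb.NewRuntimeClient(conn)
	// GetState is served by the state component registered as "redis" (assumption)
	resp, err := cli.GetState(context.TODO(), &runtimev1pb.GetStateRequest{
		StoreName: "redis",
		Key:       "hello",
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(resp.Data))
}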

Summary

Looking at the startup process as a whole, Layotto integrates its startup into MOSN: it parses the configuration file, instantiates the components declared there, and exposes them through the Grpc API.

Layotto Source Parsing — WebAssembly

· Read will take 17 min

This article mainly analyzes the implementation and application of WASM in Layotto.

by: Wang Zhilong | 18 May 2022

General description

WebAssembly, abbreviated WASM, is a portable, compact binary format that runs in a sandboxed execution environment. It was originally designed for high-performance applications in web browsers; thanks to its good isolation and security, multi-language support, fast cold start, flexibility and agility, it can also be embedded into other applications for better extensibility, and obviously we can embed it into Layotto. Layotto supports loading compiled WASM files and interacting with them through APIs of the proxy_abi_version_0_2_0 version; Layotto also supports loading and running functions carried in WASM, with interfaces for functions to call each other and access infrastructure; and the Layotto community is exploring compiling components into WASM modules to strengthen isolation between modules. This article uses Layotto's official quickstart example of accessing redis to analyze the WASM-related implementation and application in Layotto.

Source analysis

Note: the analysis below is based on commit hash f1cf350a52b5a1a0b3788a31681007a056e332ef.

Framework init

Since the bottom layer of Layotto is Mosn, the WASM extension framework also reuses Mosn's, as shown in Figure 1, the Layotto & Mosn WASM framework [1].

mosn_wasm_ext_framework_module

Figure 1 Layotto & Mosn WASM framework

Among them, Manager is responsible for managing and dynamically updating WASM plugins; VM is responsible for managing WASM virtual machines, modules and instances; ABI serves as the application binary interface, providing the external interface [2].

Here is a brief review of the related concepts:

Proxy-Wasm: WebAssembly for Proxies (an ABI specification), a proxy-agnostic ABI standard that defines how proxies and WASM modules interact in terms of functions and callbacks [3].
proxy-wasm-go-sdk: defines the interface for functions to access system resources and infrastructure services; it is implemented based on proxy-wasm/spec and integrates the Runtime API on top of it to add infrastructure access.
proxy-wasm-go-host: WebAssembly for Proxies (GoLang host implementation), the golang implementation of Proxy-Wasm, used to implement the Runtime ABI logic in Layotto.
VM: the virtual machine; available Runtime types include wasmtime, wasmer, V8, Lucet, WAMR and wasm3.

1. First look at the stream filter configuration in the quickstart example below. It defines two WASM plugins, each started as a single instance in a wasmer VM:

 "stream_filters": [
LO
"type": "Layotto",
"config": API
"Function1": LOs
"name": "function1", // Plugin name
"instance_num": 1, // Number of sandbox instances
"vm_config": LO
"engine": "waste", // Virtual Machine Type Runtime Type
"path": "demo/faas/code/golang/client/function_1. asm" /waste file path
}
},
"Function2": LO
"name": "function2", // Plugin name
"instance_num": 1, // Number of sandbox instances
"vm_config": LO
"engine": "waste", // Virtual Machine Type Runtime Type
"path": "demo/faas/code/golang/server/function_2. asm" /wasm file path
}
}
}
}
]

Function1's primary logic is to receive the HTTP request, call function2 through the ABI, and return function2's result, as shown in the code below:

func (ctx *httpHeaders) OnHttpRequestBody(bodySize int, endOfStream bool) types.Action {
	//1. get request body
	body, err := proxywasm.GetHttpRequestBody(0, bodySize)
	if err != nil {
		proxywasm.LogErrorf("GetHttpRequestBody failed: %v", err)
		return types.ActionPause
	}

	//2. parse request param
	bookName, err := getQueryParam(string(body), "name")
	if err != nil {
		proxywasm.LogErrorf("param not found: %v", err)
		return types.ActionPause
	}

	//3. request function2 through ABI
	inventories, err := proxywasm.InvokeService("id_2", "", bookName)
	if err != nil {
		proxywasm.LogErrorf("invoke service failed: %v", err)
		return types.ActionPause
	}

	//4. return result
	proxywasm.AppendHttpResponseBody([]byte("There are " + inventories + " inventories for " + bookName + "."))
	return types.ActionContinue
}

Function2's primary logic is to receive the HTTP request, get the state from redis through the ABI, and return the redis result, as shown in the code below:

func (ctx *httpHeaders) OnHttpRequestBody(bodySize int, endOfStream bool) types.Action {
	//1. get request body
	body, err := proxywasm.GetHttpRequestBody(0, bodySize)
	if err != nil {
		proxywasm.LogErrorf("GetHttpRequestBody failed: %v", err)
		return types.ActionPause
	}
	bookName := string(body)

	//2. get request state from redis by specific key through ABI
	inventories, err := proxywasm.GetState("redis", bookName)
	if err != nil {
		proxywasm.LogErrorf("GetState failed: %v", err)
		return types.ActionPause
	}

	//3. return result
	proxywasm.AppendHttpResponseBody([]byte(inventories))
	return types.ActionContinue
}
2. The Manager component of the Figure 1 WASM framework is initialized during the Mosn filter Init stage, as shown in the code below:

// Create a proxy factory for WasmFilter
func createProxyWasmFilterFactory(confs map[string]interface{}) (api.StreamFilterChainFactory, error) {
factory := &FilterConfigFactory{
config: make([]*filterConfigItem, 0, len(confs)),
RootContextID: 1,
plugins: make(map[string]*WasmPlugin),
router: &Router{routes: make(map[string]*Group)},
}

for configID, confIf := range confs {
conf, ok := confIf.(map[string]interface{})
if !ok {
log.DefaultLogger.Errorf("[proxywasm][factory] createProxyWasmFilterFactory config not a map, configID: %s", configID)
return nil, errors.New("config not a map")
}
// Parse the wasm filter configuration
config, err := parseFilterConfigItem(conf)
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][factory] createProxyWasmFilterFactory fail to parse config, configID: %s, err: %v", configID, err)
return nil, err
}

var pluginName string
if config.FromWasmPlugin == "" {
pluginName = utils.GenerateUUID()

// The WASM plug-in configuration is initialized according to the stream filter configuration. VmConfig is vm_config, and InstanceNum is instance_num
v2Config := v2.WasmPluginConfig{
PluginName: pluginName,
VmConfig: config.VmConfig,
InstanceNum: config.InstanceNum,
}

// The WasmManager instance manages the configuration of all plug-ins in a unified manner by managing the PluginWrapper object, providing the ability to add, delete, check and modify. Continue 3
err = wasm.GetWasmManager().AddOrUpdateWasm(v2Config)
if err != nil {
config.PluginName = pluginName
addWatchFile(config, factory)
continue
}

addWatchFile(config, factory)
} else {
pluginName = config.FromWasmPlugin
}
config.PluginName = pluginName

// PluginWrapper wraps the plug-in and configuration in AddOrUpdateWasm above to complete the initialization, which is pulled from sync.Map according to the plug-in name to manage and register the PluginHandler
pw := wasm.GetWasmManager().GetWasmPluginWrapperByName(pluginName)
if pw == nil {
return nil, errors.New("plugin not found")
}

config.VmConfig = pw.GetConfig().VmConfig
factory.config = append(factory.config, config)

wasmPlugin := &WasmPlugin{
pluginName: config.PluginName,
plugin: pw.GetPlugin(),
rootContextID: config.RootContextID,
config: config,
}
factory.plugins[config.PluginName] = wasmPlugin
// Register PluginHandler to provide extended callback capabilities for the plug-in's life cycle, such as the plug-in starting OnPluginStart and updating OnConfigUpdate. Continue 4
pw.RegisterPluginHandler(factory)
}

return factory, nil
}

3. Corresponding to the VM component of the Figure 1 WASM framework, NewWasmPlugin creates and initializes the WASM plugin, where VM, Module and Instance refer to the virtual machine, module and instance in WASM, as shown in the code below:

func NewWasmPlugin(wasmConfig v2.WasmPluginConfig) (types.WasmPlugin, error) {
// check instance num
instanceNum := wasmConfig.InstanceNum
if instanceNum <= 0 {
instanceNum = runtime.NumCPU()
}

wasmConfig.InstanceNum = instanceNum

// Get the wasmer compilation and execution engine according to the configuration
vm := GetWasmEngine(wasmConfig.VmConfig.Engine)
if vm == nil {
log.DefaultLogger.Errorf("[wasm][plugin] NewWasmPlugin fail to get wasm engine: %v", wasmConfig.VmConfig.Engine)
return nil, ErrEngineNotFound
}

// load wasm bytes
var wasmBytes []byte
if wasmConfig.VmConfig.Path != "" {
wasmBytes = loadWasmBytesFromPath(wasmConfig.VmConfig.Path)
} else {
wasmBytes = loadWasmBytesFromUrl(wasmConfig.VmConfig.Url)
}

if len(wasmBytes) == 0 {
log.DefaultLogger.Errorf("[wasm][plugin] NewWasmPlugin fail to load wasm bytes, config: %v", wasmConfig)
return nil, ErrWasmBytesLoad
}

md5Bytes := md5.Sum(wasmBytes)
newMd5 := hex.EncodeToString(md5Bytes[:])
if wasmConfig.VmConfig.Md5 == "" {
wasmConfig.VmConfig.Md5 = newMd5
} else if newMd5 != wasmConfig.VmConfig.Md5 {
log.DefaultLogger.Errorf("[wasm][plugin] NewWasmPlugin the hash(MD5) of wasm bytes is incorrect, config: %v, real hash: %s",
wasmConfig, newMd5)
return nil, ErrWasmBytesIncorrect
}

// Create the WASM module, which is the stateless binary code that has been compiled
module := vm.NewModule(wasmBytes)
if module == nil {
log.DefaultLogger.Errorf("[wasm][plugin] NewWasmPlugin fail to create module, config: %v", wasmConfig)
return nil, ErrModuleCreate
}

plugin := &wasmPluginImpl{
config: wasmConfig,
vm: vm,
wasmBytes: wasmBytes,
module: module,
}

plugin.SetCpuLimit(wasmConfig.VmConfig.Cpu)
plugin.SetMemLimit(wasmConfig.VmConfig.Mem)

// Create instances containing the module and runtime state; notably, proxywasm.RegisterImports is called here to register the Imports functions implemented by the user, such as proxy_invoke_service and proxy_get_state
actual := plugin.EnsureInstanceNum(wasmConfig.InstanceNum)
if actual == 0 {
log.DefaultLogger.Errorf("[wasm][plugin] NewWasmPlugin fail to ensure instance num, want: %v got 0", instanceNum)
return nil, ErrInstanceCreate
}

return plugin, nil
}

4. Corresponding to the ABI component of the Figure 1 WASM framework, the OnPluginStart method calls proxy-wasm-go-host to set up the corresponding ABI Exports and Imports, etc.:

// Execute the plugin of FilterConfigFactory
func (f *FilterConfigFactory) OnPluginStart(plugin types.WasmPlugin) {
plugin.Exec(func(instance types.WasmInstance) bool {
wasmPlugin, ok := f.plugins[plugin.PluginName()]
if !ok {
log.DefaultLogger.Errorf("[proxywasm][factory] createProxyWasmFilterFactory fail to get wasm plugin, PluginName: %s",
plugin.PluginName())
return true
}

// Get the proxy_abi_version_0_2_0 version of the API for interacting with WASM
a := abi.GetABI(instance, AbiV2)
a.SetABIImports(f)
exports := a.GetABIExports().(Exports)
f.LayottoHandler.Instance = instance

instance.Lock(a)
defer instance.Unlock()

// Use the exports function proxy_get_id (which corresponds to the GetID function in the WASM plug-in) to get the ID of WASM
id, err := exports.ProxyGetID()
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][factory] createProxyWasmFilterFactory fail to get wasm id, PluginName: %s, err: %v",
plugin.PluginName(), err)
return true
}
// If you register the ID and the corresponding plug-in in the route, the route can be performed using the key-value pair in the http Header. For example, 'id:id_1' is routed to Function1 based on id_1
f.router.RegisterRoute(id, wasmPlugin)

// The root context is created by proxy_on_context_create when the first plug-in is loaded with the given root ID and persists for the entire life of the virtual machine until proxy_on_delete is deleted
// It is worth noting that the first plug-in here refers to a use case where multiple loosely bound plug-ins (accessed via the SDK using the Root ID to the Root Context) share data within the same configured virtual machine [4]
err = exports.ProxyOnContextCreate(f.RootContextID, 0)
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][factory] OnPluginStart fail to create root context id, err: %v", err)
return true
}

vmConfigSize := 0
if vmConfigBytes := wasmPlugin.GetVmConfig(); vmConfigBytes != nil {
vmConfigSize = vmConfigBytes.Len()
}

// ProxyOnVmStart is called when the VM starts as the plugin is loaded
_, err = exports.ProxyOnVmStart(f.RootContextID, int32(vmConfigSize))
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][factory] OnPluginStart fail to create root context id, err: %v", err)
return true
}

pluginConfigSize := 0
if pluginConfigBytes := wasmPlugin.GetPluginConfig(); pluginConfigBytes != nil {
pluginConfigSize = pluginConfigBytes.Len()
}

// Called when the plug-in loads or reloads its configuration
_, err = exports.ProxyOnConfigure(f.RootContextID, int32(pluginConfigSize))
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][factory] OnPluginStart fail to create root context id, err: %v", err)
return true
}

return true
})
}

Workflow

The workflow of WASM in Layotto is broadly as shown in Figure 2, the Layotto & Mosn WASM workflow. The configuration-update part is largely covered by the initialization described above, so the focus here is on the request processing flow.

mosn_wasm_ext_framework_workflow

Figure 2 Layotto & Mosn WAS Workflow

1. Mosn, which sits underneath Layotto, schedules a task via the workerpool and, in the proxy downstream, invokes the OnReceive method of the Wasm StreamFilter in the StreamFilterChain, as configured above and shown in the code below:

func (f *Filter) OnReceive(ctx context.Context, headers api.HeaderMap, buf buffer.IoBuffer, trailers api.HeaderMap) api.StreamFilterStatus {
// Gets the id of the WASM plug-in
id, ok := headers.Get("id")
if !ok {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive call ProxyOnRequestHeaders no id in headers")
return api.StreamFilterStop
}

// Obtain the WASM plug-in from the router based on its id
wasmPlugin, err := f.router.GetRandomPluginByID(id)
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive call ProxyOnRequestHeaders id, err: %v", err)
return api.StreamFilterStop
}
f.pluginUsed = wasmPlugin

plugin := wasmPlugin.plugin
// Obtain an instance of WasmInstance
instance := plugin.GetInstance()
f.instance = instance
f.LayottoHandler.Instance = instance

// The ABI consists of Exports and Imports, through which users interact with the WASM extension
pluginABI := abi.GetABI(instance, AbiV2)
if pluginABI == nil {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive fail to get instance abi")
plugin.ReleaseInstance(instance)
return api.StreamFilterStop
}
// Set the Imports section. The import section is provided by the user. The execution of the virtual machine depends on some of the capabilities provided by the host Layotto, such as obtaining request information, which are provided by the user through the import section and invoked by the WASM extension
pluginABI.SetABIImports(f)

// The Exports section is provided by the WASM plug-in and can be called directly by the user to wake up the WASM virtual machine and execute the corresponding WASM plug-in code in the virtual machine
exports := pluginABI.GetABIExports().(Exports)
f.exports = exports

instance.Lock(pluginABI)
defer instance.Unlock()

// Create the current plug-in context according to rootContextID and contextID
err = exports.ProxyOnContextCreate(f.contextID, wasmPlugin.rootContextID)
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][filter] NewFilter fail to create context id: %v, rootContextID: %v, err: %v",
f.contextID, wasmPlugin.rootContextID, err)
return api.StreamFilterStop
}

endOfStream := 1
if (buf != nil && buf.Len() > 0) || trailers != nil {
endOfStream = 0
}

// Call proxy-wasm-go-host, encoding the request header in the format specified by the specification
action, err := exports.ProxyOnRequestHeaders(f.contextID, int32(headerMapSize(headers)), int32(endOfStream))
if err != nil || action != proxywasm.ActionContinue {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive call ProxyOnRequestHeaders err: %v", err)
return api.StreamFilterStop
}

endOfStream = 1
if trailers != nil {
endOfStream = 0
}

if buf == nil {
arg, _ := variable.GetString(ctx, types.VarHttpRequestArg)
f.requestBuffer = buffer.NewIoBufferString(arg)
} else {
f.requestBuffer = buf
}

if f.requestBuffer != nil && f.requestBuffer.Len() > 0 {
// Call proxy-wasm-go-host, encoding the request body in the format specified by the specification
action, err = exports.ProxyOnRequestBody(f.contextID, int32(f.requestBuffer.Len()), int32(endOfStream))
if err != nil || action != proxywasm.ActionContinue {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive call ProxyOnRequestBody err: %v", err)
return api.StreamFilterStop
}
}

if trailers != nil {
// Call proxy-wasm-go-host, encoding the request tail in the format specified by the specification
action, err = exports.ProxyOnRequestTrailers(f.contextID, int32(headerMapSize(trailers)))
if err != nil || action != proxywasm.ActionContinue {
log.DefaultLogger.Errorf("[proxywasm][filter] OnReceive call ProxyOnRequestTrailers err: %v", err)
return api.StreamFilterStop
}
}

return api.StreamFilterContinue
}

2. proxy-wasm-go-host encodes Mosn's request triplets (headers, body, trailers) into the format specified by the spec, calls the corresponding Proxy-Wasm ABI interface such as proxy_on_request_headers, and invokes the wasmer virtual machine to pass the request information into the WASM plugin.

func (a *ABIContext) CallWasmFunction(functionName string, args ...interface{}) (interface{}, Action, error) {
	ff, err := a.Instance.GetExportsFunc(functionName)
	if err != nil {
		return nil, ActionContinue, err
	}

	// call the wasmer virtual machine (github.com/wasmerio/wasmer-go/wasmer.(*Function).Call at function.go)
	res, err := ff.Call(args...)
	if err != nil {
		a.Instance.HandleError(err)
		return nil, ActionContinue, err
	}

	// if we have a sync call, e.g. HttpCall, then unlock the wasm instance and wait until it responds
	action := a.Imports.Wait()

	return res, action, nil
}
3. The wasmer virtual machine then calls the specific function of the WASM plugin, in this example the OnHttpRequestBody function.

// function, _ := instance.Exports.GetFunction("exported_function")
// nativeFunction = function.Native()
// _ = nativeFunction(1, 2, 3)
// Native converts Function into a native Go function that can be called
func (self *Function) Native() NativeFunction {
...
self.lazyNative = func(receivedParameters ...interface{}) (interface{}, error) {
numberOfReceivedParameters := len(receivedParameters)
numberOfExpectedParameters := len(expectedParameters)
...
results := C.wasm_val_vec_t{}
C.wasm_val_vec_new_uninitialized(&results, C.size_t(len(ty.Results())))
defer C.wasm_val_vec_delete(&results)

arguments := C.wasm_val_vec_t{}
defer C.wasm_val_vec_delete(&arguments)

if numberOfReceivedParameters > 0 {
C.wasm_val_vec_new(&arguments, C.size_t(numberOfReceivedParameters), (*C.wasm_val_t)(unsafe.Pointer(&allArguments[0])))
}

// Call functions inside the WASM plug-in
trap := C.wasm_func_call(self.inner(), &arguments, &results)

runtime.KeepAlive(arguments)
runtime.KeepAlive(results)
...
}

return self.lazyNative
}

4. proxy-wasm-go-sdk converts the request data from the spec format into a user-friendly format and then calls the user extension code. proxy-wasm-go-sdk is implemented based on proxy-wasm/spec: it defines the interface for functions to access system resources and infrastructure services, and integrates the Runtime API on top of it, adding ABIs for infrastructure access.

// function1's main logic is to receive the HTTP request, call function2 through the ABI, and return function2's result. The code is as follows
func (ctx *httpHeaders) OnHttpRequestBody(bodySize int, endOfStream bool) types.Action {
//1. get request body
body, err := proxywasm.GetHttpRequestBody(0, bodySize)
if err != nil {
proxywasm.LogErrorf("GetHttpRequestBody failed: %v", err)
return types.ActionPause
}

//2. parse request param
bookName, err := getQueryParam(string(body), "name")
if err != nil {
proxywasm.LogErrorf("param not found: %v", err)
return types.ActionPause
}

//3. request function2 through ABI
inventories, err := proxywasm.InvokeService("id_2", "", bookName)
if err != nil {
proxywasm.LogErrorf("invoke service failed: %v", err)
return types.ActionPause
}

//4. return result
proxywasm.AppendHttpResponseBody([]byte("There are " + inventories + " inventories for " + bookName + "."))
return types.ActionContinue
}

5. The Imports functions registered via RegisterFunc at initialization are what the WASM plugin actually calls into: for example, ProxyInvokeService behind Function1's RPC call, and ProxyGetState behind Function2's read of the specified key in Redis, as shown below:

Function1 calls Function2 through proxywasm.InvokeService, which corresponds to the Imports function proxy_invoke_service:

func ProxyInvokeService(instance common.WasmInstance, idPtr int32, idSize int32, methodPtr int32, methodSize int32, paramPtr int32, paramSize int32, resultPtr int32, resultSize int32) int32 {
	id, err := instance.GetMemory(uint64(idPtr), uint64(idSize))
	if err != nil {
		return WasmResultInvalidMemoryAccess.Int32()
	}

	method, err := instance.GetMemory(uint64(methodPtr), uint64(methodSize))
	if err != nil {
		return WasmResultInvalidMemoryAccess.Int32()
	}

	param, err := instance.GetMemory(uint64(paramPtr), uint64(paramSize))
	if err != nil {
		return WasmResultInvalidMemoryAccess.Int32()
	}

	ctx := getImportHandler(instance)

	// the Layotto rpc call
	ret, res := ctx.InvokeService(string(id), string(method), string(param))
	if res != WasmResultOk {
		return res.Int32()
	}

	return copyIntoInstance(instance, ret, resultPtr, resultSize).Int32()
}

Function2 gets the value of the specified key in Redis through proxywasm.GetState, which corresponds to the Imports function proxy_get_state:

func ProxyGetState(instance common.WasmInstance, storeNamePtr int32, storeNameSize int32, keyPtr int32, keySize int32, valuePtr int32, valueSize int32) int32 {
	storeName, err := instance.GetMemory(uint64(storeNamePtr), uint64(storeNameSize))
	if err != nil {
		return WasmResultInvalidMemoryAccess.Int32()
	}

	key, err := instance.GetMemory(uint64(keyPtr), uint64(keySize))
	if err != nil {
		return WasmResultInvalidMemoryAccess.Int32()
	}

	ctx := getImportHandler(instance)

	ret, res := ctx.GetState(string(storeName), string(key))
	if res != WasmResultOk {
		return res.Int32()
	}

	return copyIntoInstance(instance, ret, valuePtr, valueSize).Int32()
}

The Layotto rpc process above, briefly, is implemented over a pair of virtual connections using the Dapr API and the underlying Mosn [5]; for details see the earlier article [Layotto source parsing — processing RPC requests](https://mosn.io/layotto/#/blog/code/layotto-rpc/index). Getting data from Redis goes directly through the Dapr State component code and is not expanded here.

FaaS Mode

Looking back at WASM's features: bytecode with performance close to machine code; good isolation and security guaranteed by the sandbox; cross-platform compilation, easy distribution and load-and-run; lightweight and multi-language flexibility. It seems naturally suited to FaaS.

So Layotto also explores support for a WASM FaaS mode, loading and running functions carried in WASM, and supporting interfaces for functions to call each other and access infrastructure. Since the core logic of loading WASM is unchanged, and only the usage and deployment methods differ from those described above, the WASM-loading part of Layotto is not repeated here.

Beyond the Wasm-Proxy implementation, the core logic of FaaS mode is to extend Containerd with a multi-runtime shim plugin, containerd-shim-layotto-v2 [6], and through this "thread-the-needle" trick reuse Docker's image capabilities to manage *.wasm packages and Kubernetes' excellent orchestration capabilities to schedule functions. The specific architecture and workflow can be seen in Figure 3, the Layotto FaaS workflow.

layotto_faas_workflow

Figure 3 Layotto FaaS Workflow

Here is a quick look at the main function of containerd-shim-layotto-v2. You can see that shim.Run sets io.containerd.layotto.v2 as the WASM runtime, which corresponds to the runtime_type of the matching plugins.cri.containerd.runtimes entry in the containerd configuration. When creating a Pod, you specify runtimeClassName: layotto in the yaml spec (see the sketch after the code below); eventually, when the kubelet drives the cri-plugin to call containerd-shim-layotto-v2, the WASM function is loaded and run.

func main() {
startLayotto()
// parse the input arguments, initialize the runtime environment, and call wasm.New to instantiate the service object
shim.Run("io.containerd.layotto.v2", wasm.New)
}

func startLayotto() {
conn, err := net.Dial("tcp", "localhost:2045")
if err == nil {
conn.Close()
return
}

cmd := exec.Command("layotto", "start", "-c", "/home/docker/config.json")
cmd.Start()
}
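For illustration, a Pod declaration using this runtime class might look like the following sketch (it assumes a RuntimeClass named layotto has been created for the shim, and the image name is hypothetical):

apiVersion: v1
kind: Pod
metadata:
  name: function-1
spec:
  runtimeClassName: layotto   # routes CRI calls to containerd-shim-layotto-v2
  containers:
    - name: function-1
      image: example/function_1:latest   # hypothetical image packaging the *.wasm file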

Summary

Layotto WebAssembly involves quite a bit of basic WASM knowledge, but the examples go from shallow to deep and should be easy to follow. Looking around, WASM technology has already been applied in many fields such as the web front end, serverless, game scenes, edge computing and service mesh. Docker founder Solomon Hykes even recently said: "If WASM had existed in 2008, I wouldn't have needed to create Docker" (he later added that Docker will not be replaced and will walk side by side with WASM). WASM seems to be becoming a lighter, better-performing cloud-native technology after the VM and the Container, and will be applied in ever more areas; with the push of the Mosn community and Layotto's continued exploration, there will surely be more usage scenarios and users. The source-code analysis of WebAssembly in Layotto ends here. Given time and length, a more comprehensive and in-depth profile has not been carried out, and if there are any mistakes, feel free to point them out at rayo.wangzl@gmail.com.

References