当前位置:网站首页>Mqant in-depth analysis
Mqant in-depth analysis
2022-07-19 04:12:00 【sandyznb】
This article mainly focuses on BaseModule Of OnInit Carried out by
// OnInit When the module is initialized, call
func (m *BaseModule) OnInit(subclass module.RPCModule, app module.App, settings *conf.ModuleSettings, opt ...server.Option) {
// Initialization module
m.App = app
m.subclass = subclass
m.settings = settings
// Create a remote call RPC
opts := server.Options{
Metadata: map[string]string{},
}
for _, o := range opt {
o(&opts)
}
if opts.Registry == nil {
opt = append(opt, server.Registry(app.Registry()))
}
if opts.RegisterInterval == 0 {
opt = append(opt, server.RegisterInterval(app.Options().RegisterInterval))
}
if opts.RegisterTTL == 0 {
opt = append(opt, server.RegisterTTL(app.Options().RegisterTTL))
}
if len(opts.Name) == 0 {
opt = append(opt, server.Name(subclass.GetType()))
}
if len(opts.ID) == 0 {
opt = append(opt, server.ID(mqanttools.GenerateID().String()))
}
if len(opts.Version) == 0 {
opt = append(opt, server.Version(subclass.Version()))
}
server := server.NewServer(opt...)
err := server.OnInit(subclass, app, settings)
if err != nil {
log.Warning("server OnInit fail id(%s) error(%s)", m.GetServerID(), err)
}
hostname, _ := os.Hostname()
server.Options().Metadata["hostname"] = hostname
server.Options().Metadata["pid"] = fmt.Sprintf("%v", os.Getpid())
ctx, cancel := context.WithCancel(context.Background())
m.exit = cancel
m.serviceStopeds = make(chan bool)
m.service = service.NewService(
service.Server(server),
service.RegisterTTL(app.Options().RegisterTTL),
service.RegisterInterval(app.Options().RegisterInterval),
service.Context(ctx),
)
go func() {
err := m.service.Run()
if err != nil {
log.Warning("service run fail id(%s) error(%s)", m.GetServerID(), err)
}
close(m.serviceStopeds)
}()
m.GetServer().SetListener(m)
}
All the deeper things in the framework It's all from here , Mainly a service,service Depend on server, So let's take a look at server Creation and initialization of
One :server Creation and initialization of
// NewServer returns a new server with options passed in
func NewServer(opt ...Option) Server {
return newRPCServer(opt...)
}
func newRPCServer(opts ...Option) Server {
options := newOptions(opts...)
return &rpcServer{
opts: options,
exit: make(chan chan error),
}
}
func newOptions(opt ...Option) Options {
opts := Options{
Metadata: map[string]string{},
}
for _, o := range opt {
o(&opts)
}
if opts.Registry == nil {
opts.Registry = registry.DefaultRegistry
}
if len(opts.Address) == 0 {
opts.Address = DefaultAddress
}
if len(opts.Name) == 0 {
opts.Name = DefaultName
}
if len(opts.ID) == 0 {
opts.ID = DefaultID
}
if len(opts.Version) == 0 {
opts.Version = DefaultVersion
}
return opts
}
func (s *rpcServer) OnInit(module module.Module, app module.App, settings *conf.ModuleSettings) error {
server, err := defaultrpc.NewRPCServer(app, module) // By default, a local RPC
if err != nil {
log.Warning("Dial: %s", err)
}
s.server = server
s.opts.Address = server.Addr()
if err := s.ServiceRegister(); err != nil {
return err
}
return nil
}
This server What's done in the bag is actually right rpc_server and consul Packaging ,OnInit Only then created a local RPC, And then assign it to server object
server, err := defaultrpc.NewRPCServer(app, module) // By default, a local RPC
SetListener、Register、RegisterGO All calls are created above rpc It's the same as api
ServiceRegister and ServiceDeregister Mainly based on consul Requirements for registration of relevant information and elimination of registration information
OnDestroy() Mainly called Stop()---> s.server.Done() call rpcserver Of Done()
func (s *RPCServer) Done() (err error) {
// Wait for the request being executed to complete
//close(s.mq_chan) // close mq_chan passageway
//<-s.call_chan_done //mq_chan The information of the channel has been processed
s.wg.Wait()
//s.call_chan_done <- nil
// Close the queue link
if s.nats_server != nil {
err = s.nats_server.Shutdown()
}
return
}
server This package There are so many things , That's right rpcserver Packaging + Service registration and discovery Provide the interface
Two :service The creation of
// NewService NewService
func NewService(opts ...Option) Service {
return newService(opts...)
}
func newService(opts ...Option) Service {
options := newOptions(opts...)
return &service{
opts: options,
}
}
func newOptions(opts ...Option) Options {
opt := Options{
Registry: registry.DefaultRegistry,
Context: context.Background(),
}
for _, o := range opts {
o(&opt)
}
return opt
}
service The creation of It's all old school ,service After creation, it is called Run()
func (s *service) Run() error {
if err := s.Start(); err != nil {
return err
}
// start reg loop
ex := make(chan bool)
go s.run(ex)
//ch := make(chan os.Signal, 1)
//signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
select {
// wait on kill signal
//case <-ch:
// wait on context cancel
case <-s.opts.Context.Done():
}
// exit reg loop
close(ex)
return s.Stop()
}
Run() There is nothing in it , The first is to call service Their own Start(), Mainly call server Of Start()
func (s *service) Start() error {
for _, fn := range s.opts.BeforeStart {
if err := fn(); err != nil {
return err
}
}
if err := s.opts.Server.Start(); err != nil {
return err
}
if err := s.opts.Server.ServiceRegister(); err != nil {
return err
}
for _, fn := range s.opts.AfterStart {
if err := fn(); err != nil {
return err
}
}
return nil
}
server Of Start() In fact, nothing was done inside , however server Of ServiceRegister() But it's done , As already said . Actually server It has been executed once after the creation ServiceRegister, then serverice There's another one inside , this 2 Time Just remove it once , Personally, I think it's a little redundant .BeforeStart() and AfterStart() Set a callback and rent a call , It is seldom used
service Of Start() When it's done, you'll call your own run() 了
func (s *service) run(exit chan bool) {
if s.opts.RegisterInterval <= time.Duration(0) {
return
}
t := time.NewTicker(s.opts.RegisterInterval)
for {
select {
case <-t.C:
err := s.opts.Server.ServiceRegister()
if err != nil {
log.Warning("service run Server.Register error: ", err)
}
case <-exit:
t.Stop()
return
}
}
}
The main thing here is to turn on the timer , Timed call ServiceRegister()
then select Keep blocking waiting for the one created in front ctx Completion
That is to say, at this time service I'm running normally , When the customized module is OnDestroy()
m.exit() It's called cancel(), Called cancel() after service Of Run() Medium select There will be no more congestion
That is, the one in front ctx complete ,ex Then it is closed , that run() The timer in will be stopped ,s.Stop() Mainly called server Of ServiceDeregister() and Stop() It's called (Stop What I did has been said before ).Run() The life cycle of is coming ,serviceStopeds Then it is closed . Then it will be executed m.GetServer().OnDestroy() ,server Of OnDestroy() Also call server Of Stop(), Actually, I've done it once before .
边栏推荐
- 关于数据库的问题,唯一和非重复的概念
- leetcode7-dfs+动态规划+双指针
- Which computer painting software works well: try artweaver plus, which is comparable to Sai painting software | download the latest version of artweaver
- 小程序毕设作品之微信电子书阅读小程序毕业设计(3)后台功能
- Matlab drawing activation function sigmoid, relu
- [database] must know and know at the end of the period ----- Chapter 12 database recovery
- Idea configures SFTP and SSH, which is very convenient to deploy and locate error logs
- 学术分享 | 基于OpenVINO的多染色病理图像信息评估系统设计与开发
- Welcome to Hensen_ Blog directory of (full site navigation)
- How to realize the association between interfaces in JMeter?
猜你喜欢
学术分享 | 基于OpenVINO的多染色病理图像信息评估系统设计与开发
小程序毕设作品之微信在线教育视频点播学习小程序毕业设计(4)开题报告
Wechat online education video on demand learning applet graduation design (1) development outline
Wechat online education video on demand learning applet graduation design (3) background function
Timeline components
《云智面对面》直播等你来: 算力重新定义生产力
基于OpenVINO Model Server打造人像抠图服务器
英特尔助力开立医疗推动超声产检智能化
Chapter 1 performance platform godeye source code analysis - overall architecture
PAC十年:见证HPC从CPU时代走向XPU纪元
随机推荐
priority_queue的介绍及其使用
How to do clear scanning: try scanning tailor scantailor advanced | including the usage of scantailor
小程序毕设作品之微信在线教育视频点播学习小程序毕业设计(4)开题报告
micro、M3O微服务系列(三)
ASP.NET1==visual studio创建asp.net demo
7.16 simulation summary
小程序毕设作品之微信在线教育视频点播学习小程序毕业设计(3)后台功能
通过Dao投票STI的销毁,SeekTiger真正做到由社区驱动
Timeline components
Buddy: initialize memory domain
Openresty 做静态资源服务器
Cocos creator 3.0 foundation - event system
Laradock restart MySQL found
小程序毕设作品之微信电子书阅读小程序毕业设计(5)任务书
Chapter 0 performance platform godeye source code analysis - Course Introduction
基于stm32f103的智能风扇系统
关于数据库的问题,唯一和非重复的概念
Openresty as a static resource server
wpf 找不到资源文件问题
Typescript数组/对象/字符串/函数参数的解构使用