Mqant in-depth analysis

2022-07-19 04:12:00 sandyznb

This article focuses on what BaseModule's OnInit does:

// OnInit is called when the module is initialized
func (m *BaseModule) OnInit(subclass module.RPCModule, app module.App, settings *conf.ModuleSettings, opt ...server.Option) {
	// Initialize the module
	m.App = app
	m.subclass = subclass
	m.settings = settings
	// Create the RPC server used for remote calls

	opts := server.Options{
		Metadata: map[string]string{},
	}
	for _, o := range opt {
		o(&opts)
	}
	if opts.Registry == nil {
		opt = append(opt, server.Registry(app.Registry()))
	}

	if opts.RegisterInterval == 0 {
		opt = append(opt, server.RegisterInterval(app.Options().RegisterInterval))
	}

	if opts.RegisterTTL == 0 {
		opt = append(opt, server.RegisterTTL(app.Options().RegisterTTL))
	}

	if len(opts.Name) == 0 {
		opt = append(opt, server.Name(subclass.GetType()))
	}

	if len(opts.ID) == 0 {
		opt = append(opt, server.ID(mqanttools.GenerateID().String()))
	}

	if len(opts.Version) == 0 {
		opt = append(opt, server.Version(subclass.Version()))
	}
	server := server.NewServer(opt...)
	err := server.OnInit(subclass, app, settings)
	if err != nil {
		log.Warning("server OnInit fail id(%s) error(%s)", m.GetServerID(), err)
	}
	hostname, _ := os.Hostname()
	server.Options().Metadata["hostname"] = hostname
	server.Options().Metadata["pid"] = fmt.Sprintf("%v", os.Getpid())
	ctx, cancel := context.WithCancel(context.Background())
	m.exit = cancel
	m.serviceStopeds = make(chan bool)
	m.service = service.NewService(
		service.Server(server),
		service.RegisterTTL(app.Options().RegisterTTL),
		service.RegisterInterval(app.Options().RegisterInterval),
		service.Context(ctx),
	)

	go func() {
		err := m.service.Run()
		if err != nil {
			log.Warning("service run fail id(%s) error(%s)", m.GetServerID(), err)
		}
		close(m.serviceStopeds)
	}()
	m.GetServer().SetListener(m)
}

Everything deeper in the framework starts here. The key piece is the service, and the service depends on the server, so let's first look at how the server is created and initialized.
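The option handling at the top of OnInit is worth a closer look: it applies the caller's options to a scratch Options value only to find out which fields are still unset, then appends a default option for each of them. Below is a minimal sketch of that pattern. The Options, Option, withDefaults and build names are hypothetical stand-ins that model just two fields; they are not mqant's actual types.

```go
package main

import "fmt"

// Options and Option are hypothetical stand-ins for server.Options and
// server.Option; only two fields are modelled here.
type Options struct {
	Name    string
	Version string
}

type Option func(*Options)

func Name(n string) Option    { return func(o *Options) { o.Name = n } }
func Version(v string) Option { return func(o *Options) { o.Version = v } }

// withDefaults applies the caller's options to a scratch Options value to
// see which fields are still empty, then appends a default option for each.
func withDefaults(opt []Option, defName, defVersion string) []Option {
	var probe Options
	for _, o := range opt {
		o(&probe)
	}
	if len(probe.Name) == 0 {
		opt = append(opt, Name(defName))
	}
	if len(probe.Version) == 0 {
		opt = append(opt, Version(defVersion))
	}
	return opt
}

// build applies every option in order and returns the final Options.
func build(opt ...Option) Options {
	var o Options
	for _, fn := range opt {
		fn(&o)
	}
	return o
}

func main() {
	opts := withDefaults([]Option{Name("gate")}, "module-type", "1.0.0")
	fmt.Printf("%+v\n", build(opts...))
}
```

Because a default is appended only when the probe shows the field is empty, a value the caller set explicitly is never overwritten.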

One: Creating and initializing the server

// NewServer returns a new server with options passed in
func NewServer(opt ...Option) Server {
	return newRPCServer(opt...)
}

func newRPCServer(opts ...Option) Server {
	options := newOptions(opts...)
	return &rpcServer{
		opts: options,
		exit: make(chan chan error),
	}
}

func newOptions(opt ...Option) Options {
	opts := Options{
		Metadata: map[string]string{},
	}

	for _, o := range opt {
		o(&opts)
	}

	if opts.Registry == nil {
		opts.Registry = registry.DefaultRegistry
	}

	if len(opts.Address) == 0 {
		opts.Address = DefaultAddress
	}

	if len(opts.Name) == 0 {
		opts.Name = DefaultName
	}

	if len(opts.ID) == 0 {
		opts.ID = DefaultID
	}

	if len(opts.Version) == 0 {
		opts.Version = DefaultVersion
	}

	return opts
}

func (s *rpcServer) OnInit(module module.Module, app module.App, settings *conf.ModuleSettings) error {
	server, err := defaultrpc.NewRPCServer(app, module) // Create a local RPC server by default
	if err != nil {
		log.Warning("Dial: %s", err)
	}
	s.server = server
	s.opts.Address = server.Addr()
	if err := s.ServiceRegister(); err != nil {
		return err
	}
	return nil
}

What the server package does is essentially wrap rpc_server and consul. OnInit only creates a local RPC server and assigns it to the server object:

server, err := defaultrpc.NewRPCServer(app, module) // Create a local RPC server by default

SetListener, Register, and RegisterGO all simply call the same-named APIs of the RPC server created above.
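This kind of delegation can be sketched as follows. The innerRPC and wrapper types and the string-based handler signature are hypothetical and only illustrate the shape: the outer server owns the inner RPC server and forwards calls to the method of the same name.

```go
package main

import "fmt"

// innerRPC is a hypothetical stand-in for the local RPC server that
// OnInit creates.
type innerRPC struct {
	handlers map[string]func(string) string
}

func (r *innerRPC) Register(id string, fn func(string) string) {
	r.handlers[id] = fn
}

func (r *innerRPC) Call(id, arg string) (string, error) {
	fn, ok := r.handlers[id]
	if !ok {
		return "", fmt.Errorf("no handler %q", id)
	}
	return fn(arg), nil
}

// wrapper plays the role of the outer server: it owns the inner RPC
// server and forwards Register to the method of the same name.
type wrapper struct {
	server *innerRPC
}

// OnInit creates the inner RPC server and stores it on the wrapper.
func (w *wrapper) OnInit() {
	w.server = &innerRPC{handlers: map[string]func(string) string{}}
}

func (w *wrapper) Register(id string, fn func(string) string) {
	w.server.Register(id, fn)
}

func main() {
	w := &wrapper{}
	w.OnInit()
	w.Register("hello", func(name string) string { return "hi " + name })
	out, err := w.server.Call("hello", "mqant")
	fmt.Println(out, err)
}
```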

ServiceRegister and ServiceDeregister register the service's information with consul and remove it again, in the form consul requires.

OnDestroy() mainly calls Stop(), which in turn calls s.server.Done(), that is, the Done() of the RPCServer:

func (s *RPCServer) Done() (err error) {
	// Wait for in-flight requests to complete
	//close(s.mq_chan)   // close the mq_chan channel
	//<-s.call_chan_done // all messages in mq_chan have been processed
	s.wg.Wait()
	//s.call_chan_done <- nil
	// Close the queue connection
	if s.nats_server != nil {
		err = s.nats_server.Shutdown()
	}
	return
}

That is all there is to the server package: a wrapper around rpcserver, plus an interface for service registration and discovery.
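The core of Done() is the s.wg.Wait() call: shutdown blocks until every request that is still executing has finished. Here is a minimal sketch of that idea using sync.WaitGroup. The worker type and its handle method are hypothetical, and the sleep only simulates request work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// worker is a hypothetical stand-in for the RPC server; wg tracks the
// requests that are currently executing.
type worker struct {
	wg        sync.WaitGroup
	completed int32
}

// handle runs one simulated request (taking ms milliseconds) in its own
// goroutine and registers it with wg before the goroutine starts.
func (w *worker) handle(ms int) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		time.Sleep(time.Duration(ms) * time.Millisecond)
		atomic.AddInt32(&w.completed, 1)
	}()
}

// Done blocks until every in-flight request has finished and returns
// how many requests completed.
func (w *worker) Done() int32 {
	w.wg.Wait()
	return atomic.LoadInt32(&w.completed)
}

func main() {
	w := &worker{}
	for i := 0; i < 3; i++ {
		w.handle(10)
	}
	fmt.Println("completed before shutdown:", w.Done())
}
```

Calling wg.Add before starting the goroutine matters: if Add ran inside the goroutine, Wait could return before the request was ever counted.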

Two: Creating the service

// NewService NewService
func NewService(opts ...Option) Service {
	return newService(opts...)
}
func newService(opts ...Option) Service {
	options := newOptions(opts...)

	return &service{
		opts: options,
	}
}
func newOptions(opts ...Option) Options {
	opt := Options{
		Registry: registry.DefaultRegistry,
		Context:  context.Background(),
	}

	for _, o := range opts {
		o(&opt)
	}

	return opt
}

Creating the service follows the same familiar pattern. Once the service has been created, its Run() is called:

func (s *service) Run() error {
	if err := s.Start(); err != nil {
		return err
	}

	// start reg loop
	ex := make(chan bool)
	go s.run(ex)

	//ch := make(chan os.Signal, 1)
	//signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)

	select {
	// wait on kill signal
	//case <-ch:
	// wait on context cancel
	case <-s.opts.Context.Done():
	}

	// exit reg loop
	close(ex)
	return s.Stop()
}

Run() itself does very little. It first calls the service's own Start(), which mainly calls the server's Start():

func (s *service) Start() error {
	for _, fn := range s.opts.BeforeStart {
		if err := fn(); err != nil {
			return err
		}
	}

	if err := s.opts.Server.Start(); err != nil {
		return err
	}

	if err := s.opts.Server.ServiceRegister(); err != nil {
		return err
	}

	for _, fn := range s.opts.AfterStart {
		if err := fn(); err != nil {
			return err
		}
	}

	return nil
}

The server's Start() actually does nothing, but the server's ServiceRegister() does the real work, as described above. In fact, the server has already executed ServiceRegister once right after it was created (in OnInit), and the service calls it again here. One of these two calls could be removed; personally I find it a little redundant. BeforeStart and AfterStart are lists of callbacks that are invoked before and after the server starts; they are seldom used.
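The ordering in Start() (before-hooks, the start step, after-hooks, aborting on the first error) can be sketched in isolation. The startWithHooks and trace helpers below are hypothetical, and the sketch folds the server's Start() and ServiceRegister() into a single start step.

```go
package main

import (
	"errors"
	"fmt"
)

// startWithHooks mirrors the order used by Start(): before-hooks, the
// start step itself, then after-hooks. The first error aborts the run.
func startWithHooks(before []func() error, start func() error, after []func() error) error {
	for _, fn := range before {
		if err := fn(); err != nil {
			return err
		}
	}
	if err := start(); err != nil {
		return err
	}
	for _, fn := range after {
		if err := fn(); err != nil {
			return err
		}
	}
	return nil
}

// trace runs startWithHooks with recording hooks and returns the call
// order. If failBefore is true, the before-hook returns an error.
func trace(failBefore bool) ([]string, error) {
	var calls []string
	before := []func() error{func() error {
		calls = append(calls, "before")
		if failBefore {
			return errors.New("before hook failed")
		}
		return nil
	}}
	start := func() error { calls = append(calls, "start"); return nil }
	after := []func() error{func() error { calls = append(calls, "after"); return nil }}
	err := startWithHooks(before, start, after)
	return calls, err
}

func main() {
	calls, err := trace(false)
	fmt.Println(calls, err)
	calls, err = trace(true)
	fmt.Println(calls, err)
}
```

A failing before-hook stops the sequence, so neither the start step nor the after-hooks run in the second call.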

Once the service's Start() has finished, Run() launches the service's own run() in a goroutine:

func (s *service) run(exit chan bool) {
	if s.opts.RegisterInterval <= time.Duration(0) {
		return
	}

	t := time.NewTicker(s.opts.RegisterInterval)

	for {
		select {
		case <-t.C:
			err := s.opts.Server.ServiceRegister()
			if err != nil {
				log.Warning("service run Server.Register error: ", err)
			}
		case <-exit:
			t.Stop()
			return
		}
	}
}

The main job here is to start a ticker that calls ServiceRegister() at a fixed interval.
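The ticker loop can be tried out on its own. The sketch below assumes a register callback in place of ServiceRegister(); runLoop and countTicks are hypothetical helpers, and the intervals are shortened to milliseconds so the example finishes quickly.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// runLoop calls register on every tick until exit is closed.
// A non-positive interval disables the loop, as in run().
func runLoop(interval time.Duration, register func(), exit <-chan bool) {
	if interval <= 0 {
		return
	}
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			register()
		case <-exit:
			return
		}
	}
}

// countTicks runs the loop for roughly runMs milliseconds with a tick
// every intervalMs milliseconds and reports how often register ran.
func countTicks(intervalMs, runMs int) int32 {
	var n int32
	exit := make(chan bool)
	done := make(chan struct{})
	go func() {
		runLoop(time.Duration(intervalMs)*time.Millisecond, func() { atomic.AddInt32(&n, 1) }, exit)
		close(done)
	}()
	time.Sleep(time.Duration(runMs) * time.Millisecond)
	close(exit)
	<-done // the loop has returned, so n is final
	return atomic.LoadInt32(&n)
}

func main() {
	fmt.Println("re-registrations:", countTicks(10, 105))
}
```

Closing the exit channel, rather than sending on it, means the loop is released even if it is not receiving at that exact moment, which is the same reason Run() uses close(ex).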

Meanwhile, the select in Run() keeps blocking until the ctx created earlier in OnInit is done.

At this point the service is running normally. When the custom module's OnDestroy() is called,

m.exit() is invoked, which is the cancel() function. After cancel() is called, the select in the service's Run() stops blocking.

In other words, the ctx created earlier is done, and ex is then closed, so the ticker in run() is stopped. s.Stop() mainly calls the server's ServiceDeregister() and Stop() (what Stop() does was described above). Run() then reaches the end of its life cycle, and serviceStopeds is closed. After that, m.GetServer().OnDestroy() is executed. The server's OnDestroy() also calls the server's Stop(), which has in fact already been executed once.
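The whole shutdown handshake can be condensed into a small sketch: a cancel function that unblocks the running goroutine, and a channel that is closed once the stop work is finished. The lifecycle type is hypothetical, and the sketch assumes that the destroying side waits on the stopped channel before it continues.

```go
package main

import (
	"context"
	"fmt"
)

// lifecycle is a hypothetical, stripped-down stand-in for the module's
// exit / serviceStopeds pair.
type lifecycle struct {
	exit    context.CancelFunc
	stopped chan bool
	events  []string
}

// start launches a goroutine that blocks until ctx is cancelled, then
// records the stop steps and closes stopped to signal completion.
func (l *lifecycle) start() {
	ctx, cancel := context.WithCancel(context.Background())
	l.exit = cancel
	l.stopped = make(chan bool)
	go func() {
		<-ctx.Done() // blocks like the select in Run()
		l.events = append(l.events, "deregister", "stop")
		close(l.stopped)
	}()
}

// destroy cancels the context and waits (an assumption of this sketch)
// until the goroutine has finished its stop steps.
func (l *lifecycle) destroy() []string {
	l.exit()
	<-l.stopped
	return l.events
}

func main() {
	l := &lifecycle{}
	l.start()
	fmt.Println(l.destroy())
}
```

Reading events after receiving from the closed channel is safe, because the close happens after the goroutine has finished writing them.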

 


Copyright notice
This article was written by [sandyznb]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/200/202207170329454331.html