T4L2

Time to learn for everything

0%

KubeEdge源码分析之CloudCore

阅读本文前,建议先看《KubeEdge概念一览》中的beehive和ChannelContext部分。

1. CloudCore概览

CloudCore是Cloud部分各功能模块的集合,其入口代码在cloudcore.go文件中,如下所示:

1
func main() {
2
   command := app.NewCloudCoreCommand() // 此函数是对cobra调用的封装
3
   logs.InitLogs()
4
   defer logs.FlushLogs()
5
6
   if err := command.Execute(); err != nil {
7
      os.Exit(1)
8
   }
9
}

可见,cloudcore的重点在于app.NewCloudCoreCommand()这个函数,具体如下:

1
func NewCloudCoreCommand() *cobra.Command {
2
   opts := options.NewCloudCoreOptions()
3
   cmd := &cobra.Command{
4
      // ...
5
      Run: func(cmd *cobra.Command, args []string) {
6
         // ...
7
         registerModules(config)
8
         // start all modules
9
         core.Run()
10
      },
11
   }
12
   // ...
13
   return cmd
14
}

这个函数很长,由于是对cobra调用的封装,这里只分析其中的重点部分,cobra知识不再细说。这里的重点就在于两个函数:registerModules注册cloudcore中的功能模块;core.Run启动注册的这些功能模块。截至KubeEdge v1.2,这里共注册启动四大功能模块:CloudHub、EdgeController、DeviceController和SyncController。

2. 注册功能模块

注册功能模块的函数如下:

1
func registerModules(c *v1alpha1.CloudCoreConfig) {
2
   cloudhub.Register(c.Modules.CloudHub, c.KubeAPIConfig)
3
   edgecontroller.Register(c.Modules.EdgeController, c.KubeAPIConfig, "", false)
4
   devicecontroller.Register(c.Modules.DeviceController, c.KubeAPIConfig)
5
   synccontroller.Register(c.Modules.SyncController, c.KubeAPIConfig)
6
}

实际上就是调用了各个模块自己的注册函数,而且各个模块的注册函数也基本一致,以CloudHub为例:

1
func Register(hub *v1alpha1.CloudHub, kubeAPIConfig *v1alpha1.KubeAPIConfig) {
2
   hubconfig.InitConfigure(hub, kubeAPIConfig)
3
   core.Register(newCloudHub(hub.Enable))
4
}

注册做了两件事:初始化配置信息、将模块注册进cloudcore中,具体来看core.Register这个通用注册函数:

1
var (
2
	// Modules map
3
	modules         map[string]Module
4
	disabledModules map[string]Module
5
)
6
7
// Register register module
8
func Register(m Module) {
9
   if m.Enable() {
10
      modules[m.Name()] = m
11
      klog.Infof("Module %v registered successfully", m.Name())
12
   } else {
13
      disabledModules[m.Name()] = m
14
      klog.Warningf("Module %v is disabled, do not register", m.Name())
15
   }
16
}

就是将模块注册到一个全局唯一的map中去。注册中各模块的构造函数也很简单,之需要填充一个enable布尔值,不过SyncController是个特例,在它的博文中会具体分析。

这样,各模块就注册完成了,可以启动了。

3. 功能模块启动

1
// Run starts the modules and in the end does module cleanup
2
func Run() {
3
   // Address the module registration and start the core
4
   StartModules()
5
   // monitor system signal and shutdown gracefully
6
   GracefulShutdown()
7
}

启动也做了两件事:启动各个模块、监听模块停止。其中启动函数如下:

1
// StartModules starts modules that are registered
2
func StartModules() {
3
   beehiveContext.InitContext(beehiveContext.MsgCtxTypeChannel)
4
5
   modules := GetModules()
6
   for name, module := range modules {
7
      //Init the module
8
      beehiveContext.AddModule(name)
9
      //Assemble typeChannels for sendToGroup
10
      beehiveContext.AddModuleGroup(name, module.Group())
11
      go module.Start()
12
      klog.Infof("Starting module %v", name)
13
   }
14
}

首先初始化了beehiveContext,它负责各个模块间的通信,具体可见《KubeEdge概念一览》中对beehive和ChannelContext的分析,而modules就是之前注册模块的map了,将各个模块和所在组注册到beehiveContext中,才能实现通信。最后,启动每一个模块,这里实际用的就是各个模块自己的Start()函数了,将在单独的博文中进行分析。

监听模块停止函数如下:

1
// GracefulShutdown is if it gets the special signals it does modules cleanup
2
func GracefulShutdown() {
3
   c := make(chan os.Signal)
4
   signal.Notify(c, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM,
5
      syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT)
6
   select {
7
   case s := <-c:
8
      klog.Infof("Get os signal %v", s.String())
9
      //Cleanup each modules
10
      beehiveContext.Cancel()
11
      modules := GetModules()
12
      for name, _ := range modules {
13
         klog.Infof("Cleanup module %v", name)
14
         beehiveContext.Cleanup(name)
15
      }
16
   }
17
}

在收到某些信号后,终止各个模块,并清理beehiveContext。

4. 总结

以上就是CloudCore大致的内容了,可见它的作用就是注册并启动云端的四大功能模块,以及实现模块间通信的beehiveContext。而对四大功能模块具体做了什么,将单独的博文中具体分析。