Source Code Analysis: Cilium Network Policy Implementation
daemon initialization
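cilium-agent's entry point lives in daemon/cmd/daemon_main.go. The cobra RootCmd opens a gops server for live stack traces, performs early initialization through initEnv(), and then hands off to runDaemon():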
// daemon/cmd/daemon_main.go
var (
log = logging.DefaultLogger.WithField(logfields.LogSubsys, daemonSubsys)
bootstrapTimestamp = time.Now()
// RootCmd represents the base command when called without any subcommands
RootCmd = &cobra.Command{
Use: "cilium-agent",
Short: "Run the cilium agent",
Run: func(cmd *cobra.Command, args []string) {
cmdRefDir := viper.GetString(option.CMDRef)
if cmdRefDir != "" {
genMarkdown(cmd, cmdRefDir)
os.Exit(0)
}
// Open socket for using gops to get stacktraces of the agent.
addr := fmt.Sprintf("127.0.0.1:%d", viper.GetInt(option.GopsPort))
addrField := logrus.Fields{"address": addr}
if err := gops.Listen(gops.Options{
Addr: addr,
ReuseSocketAddrAndPort: true,
}); err != nil {
log.WithError(err).WithFields(addrField).Fatal("Cannot start gops server")
}
log.WithFields(addrField).Info("Started gops server")
bootstrapStats.earlyInit.Start()
initEnv(cmd)
bootstrapStats.earlyInit.End(true)
runDaemon()
},
}
bootstrapStats = bootstrapStatistics{}
)
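// runDaemon drives the agent bootstrap: datapath and iptables setup, daemon
// construction, endpoint restore, health checking, and finally serving the API.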
func runDaemon() {
datapathConfig := linuxdatapath.DatapathConfiguration{
HostDevice: option.Config.HostDevice,
}
log.Info("Initializing daemon")
option.Config.RunMonitorAgent = true
if err := enableIPForwarding(); err != nil {
log.WithError(err).Fatal("Error when enabling sysctl parameters")
}
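// Set up the iptables manager responsible for Cilium's base chains and rules.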
iptablesManager := &iptables.IptablesManager{}
iptablesManager.Init()
var wgAgent *wireguard.Agent
if option.Config.EnableWireguard {
switch {
case option.Config.EnableIPSec:
log.Fatalf("Wireguard (--%s) cannot be used with IPSec (--%s)",
option.EnableWireguard, option.EnableIPSecName)
case option.Config.EnableL7Proxy:
log.Fatalf("Wireguard (--%s) is not compatible with L7 proxy (--%s)",
option.EnableWireguard, option.EnableL7Proxy)
}
var err error
privateKeyPath := filepath.Join(option.Config.StateDir, wireguardTypes.PrivKeyFilename)
wgAgent, err = wireguard.NewAgent(privateKeyPath)
if err != nil {
log.WithError(err).Fatal("Failed to initialize wireguard")
}
cleaner.cleanupFuncs.Add(func() {
_ = wgAgent.Close()
})
} else {
// Delete wireguard device from previous run (if such exists)
link.DeleteByName(wireguardTypes.IfaceName)
}
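// Bring up the Kubernetes client subsystem before the daemon itself is constructed.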
if k8s.IsEnabled() {
bootstrapStats.k8sInit.Start()
if err := k8s.Init(option.Config); err != nil {
log.WithError(err).Fatal("Unable to initialize Kubernetes subsystem")
}
bootstrapStats.k8sInit.End(true)
}
// Instantiate the daemon
ctx, cancel := context.WithCancel(server.ServerCtx)
d, restoredEndpoints, err := NewDaemon(ctx, cancel,
WithDefaultEndpointManager(ctx, endpoint.CheckHealth),
linuxdatapath.NewDatapath(datapathConfig, iptablesManager, wgAgent))
if err != nil {
select {
case <-server.ServerCtx.Done():
log.WithError(err).Debug("Error while creating daemon")
default:
log.WithError(err).Fatal("Error while creating daemon")
}
return
}
// This validation needs to be done outside of the agent until
// datapath.NodeAddressing is used consistently across the code base.
log.Info("Validating configured node address ranges")
if err := node.ValidatePostInit(); err != nil {
log.WithError(err).Fatal("postinit failed")
}
bootstrapStats.enableConntrack.Start()
log.Info("Starting connection tracking garbage collector")
gc.Enable(option.Config.EnableIPv4, option.Config.EnableIPv6,
restoredEndpoints.restored, d.endpointManager)
bootstrapStats.enableConntrack.End(true)
bootstrapStats.k8sInit.Start()
if k8s.IsEnabled() {
// Wait only for certain caches, but not all!
// (Check Daemon.InitK8sSubsystem() for more info)
<-d.k8sCachesSynced
}
bootstrapStats.k8sInit.End(true)
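// Restore endpoints and identities preserved from the previous agent run.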
restoreComplete := d.initRestore(restoredEndpoints)
if wgAgent != nil {
if err := wgAgent.RestoreFinished(); err != nil {
log.WithError(err).Error("Failed to set up wireguard peers")
}
}
if d.endpointManager.HostEndpointExists() {
d.endpointManager.InitHostEndpointLabels(d.ctx)
} else {
log.Info("Creating host endpoint")
if err := d.endpointManager.AddHostEndpoint(
d.ctx, d, d, d.l7Proxy, d.identityAllocator,
"Create host endpoint", nodeTypes.GetName(),
); err != nil {
log.WithError(err).Fatal("Unable to create host endpoint")
}
}
if option.Config.EnableIPMasqAgent {
ipmasqAgent, err := ipmasq.NewIPMasqAgent(option.Config.IPMasqAgentConfigPath)
if err != nil {
log.WithError(err).Fatal("Failed to create ip-masq-agent")
}
ipmasqAgent.Start()
}
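// Outside of dry mode: once endpoint restore completes, finish DNS-proxy
// bootstrap, sweep stale BPF maps, and release restored CIDR identities
// after the grace period.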
if !option.Config.DryMode {
go func() {
if restoreComplete != nil {
<-restoreComplete
}
d.dnsNameManager.CompleteBootstrap()
ms := maps.NewMapSweeper(&EndpointMapManager{
EndpointManager: d.endpointManager,
})
ms.CollectStaleMapGarbage()
ms.RemoveDisabledMaps()
if len(d.restoredCIDRs) > 0 {
// Release restored CIDR identities after a grace period (default 10
// minutes). Any identities actually in use will still exist after
// this.
//
// This grace period is needed when running on an external workload
// where policy synchronization is not done via k8s. Also in k8s
// case it is prudent to allow concurrent endpoint regenerations to
// (re-)allocate the restored identities before we release them.
time.Sleep(option.Config.IdentityRestoreGracePeriod)
log.Debugf("Releasing reference counts for %d restored CIDR identities", len(d.restoredCIDRs))
ipcache.ReleaseCIDRIdentitiesByCIDR(d.restoredCIDRs)
// release the memory held by restored CIDRs
d.restoredCIDRs = nil
}
}()
d.endpointManager.Subscribe(d)
defer d.endpointManager.Unsubscribe(d)
}
// Migrating the ENI datapath must happen before the API is served to
// prevent endpoints from being created. It also must be before the health
// initialization logic which creates the health endpoint, for the same
// reasons as the API being served. We want to ensure that this migration
// logic runs before any endpoint creates.
if option.Config.IPAM == ipamOption.IPAMENI {
migrated, failed := linuxrouting.NewMigrator(
&eni.InterfaceDB{},
).MigrateENIDatapath(option.Config.EgressMultiHomeIPRuleCompat)
switch {
case failed == -1:
// No need to handle this case specifically because it is handled
// in the call already.
case migrated >= 0 && failed > 0:
log.Errorf("Failed to migrate ENI datapath. "+
"%d endpoints were successfully migrated and %d failed to migrate completely. "+
"The original datapath is still in-place, however it is recommended to retry the migration.",
migrated, failed)
case migrated >= 0 && failed == 0:
log.Infof("Migration of ENI datapath successful, %d endpoints were migrated and none failed.",
migrated)
}
}
bootstrapStats.healthCheck.Start()
if option.Config.EnableHealthChecking {
d.initHealth()
}
bootstrapStats.healthCheck.End(true)
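// Start the status collector plus the metrics and agent-health HTTP endpoints.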
d.startStatusCollector()
metricsErrs := initMetrics()
d.startAgentHealthHTTPService()
if option.Config.KubeProxyReplacementHealthzBindAddr != "" {
if option.Config.KubeProxyReplacement != option.KubeProxyReplacementDisabled {
d.startKubeProxyHealthzHTTPService(fmt.Sprintf("%s", option.Config.KubeProxyReplacementHealthzBindAddr))
}
}
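// Serve the Cilium API over the local unix socket used by the cilium CLI.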
bootstrapStats.initAPI.Start()
srv := server.NewServer(d.instantiateAPI())
srv.EnabledListeners = []string{"unix"}
srv.SocketPath = option.Config.SocketPath
srv.ReadTimeout = apiTimeout
srv.WriteTimeout = apiTimeout
defer srv.Shutdown()
srv.ConfigureAPI()
bootstrapStats.initAPI.End(true)
err = d.SendNotification(monitorAPI.StartMessage(time.Now()))
if err != nil {
log.WithError(err).Warn("Failed to send agent start monitor message")
}
if !d.datapath.Node().NodeNeighDiscoveryEnabled() {
// Remove all non-GC'ed neighbor entries that might have previously set
// by a Cilium instance.
d.datapath.Node().NodeCleanNeighbors(false)
} else {
// If we came from an agent upgrade, migrate entries.
d.datapath.Node().NodeCleanNeighbors(true)
// Start periodical refresh of the neighbor table from the agent if needed.
if option.Config.ARPPingRefreshPeriod != 0 && !option.Config.ARPPingKernelManaged {
d.nodeDiscovery.Manager.StartNeighborRefresh(d.datapath.Node())
}
}
log.WithField("bootstrapTime", time.Since(bootstrapTimestamp)).
Info("Daemon initialization completed")
if option.Config.WriteCNIConfigurationWhenReady != "" {
input, err := os.ReadFile(option.Config.ReadCNIConfiguration)
if err != nil {
log.WithError(err).Fatal("Unable to read CNI configuration file")
}
if err = os.WriteFile(option.Config.WriteCNIConfigurationWhenReady, input, 0644); err != nil {
log.WithError(err).Fatalf("Unable to write CNI configuration file to %s", option.Config.WriteCNIConfigurationWhenReady)
} else {
log.Infof("Wrote CNI configuration file to %s", option.Config.WriteCNIConfigurationWhenReady)
}
}
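// Serve the API in the background; any error surfaces in the select at the end.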
errs := make(chan error, 1)
go func() {
errs <- srv.Serve()
}()
bootstrapStats.overall.End(true)
bootstrapStats.updateMetrics()
go d.launchHubble()
err = option.Config.StoreInFile(option.Config.StateDir)
if err != nil {
log.WithError(err).Error("Unable to store Cilium's configuration")
}
err = option.StoreViperInFile(option.Config.StateDir)
if err != nil {
log.WithError(err).Error("Unable to store Viper's configuration")
}
select {
case err := <-metricsErrs:
if err != nil {
log.WithError(err).Fatal("Cannot start metrics server")
}
case err := <-errs:
if err != nil {
log.WithError(err).Fatal("Error returned from non-returning Serve() call")
}
}
}
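runDaemon delegates construction of the daemon object to NewDaemon in daemon/cmd/daemon.go. For network policy, the two most relevant pieces assembled here are the policy repository (initPolicy) and the k8s watcher that subscribes to Kubernetes and Cilium CRD events: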
// daemon/cmd/daemon.go
func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointmanager.EndpointManager, dp datapath.Datapath) (*Daemon, *endpointRestoreState, error) {
...
d := Daemon{
ctx: ctx,
cancel: cancel,
prefixLengths: createPrefixLengthCounter(),
buildEndpointSem: semaphore.NewWeighted(int64(numWorkerThreads())),
compilationMutex: new(lock.RWMutex),
netConf: netConf,
mtuConfig: mtuConfig,
datapath: dp,
deviceManager: NewDeviceManager(),
nodeDiscovery: nd,
endpointCreations: newEndpointCreationManager(),
apiLimiterSet: apiLimiterSet,
}
...
d.identityAllocator = NewCachingIdentityAllocator(&d)
// Initialize the daemon's core policy component
if err := d.initPolicy(epMgr); err != nil {
return nil, nil, fmt.Errorf("error while initializing policy subsystem: %w", err)
}
nodeMngr = nodeMngr.WithSelectorCacheUpdater(d.policy.GetSelectorCache()) // must be after initPolicy
nodeMngr = nodeMngr.WithPolicyTriggerer(d.policyUpdater)
...
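// The k8s watcher delivers Kubernetes and Cilium CRD events (including
// CiliumNetworkPolicy) to the subsystems wired in below, among them the
// policy repository.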
d.k8sWatcher = watchers.NewK8sWatcher(
d.endpointManager,
d.nodeDiscovery.Manager,
&d,
d.policy,
d.svc,
d.datapath,
d.redirectPolicyManager,
d.bgpSpeaker,
d.egressGatewayManager,
option.Config,
)
nd.RegisterK8sNodeGetter(d.k8sWatcher)
ipcache.IPIdentityCache.RegisterK8sSyncedChecker(&d)
...
if k8s.IsEnabled() {
bootstrapStats.k8sInit.Start()
// Errors are handled inside WaitForCRDsToRegister. It will fatal on a
// context deadline or if the context has been cancelled, the context's
// error will be returned. Otherwise, it succeeded.
if err := d.k8sWatcher.WaitForCRDsToRegister(d.ctx); err != nil {
return nil, restoredEndpoints, err
}
// Launch the K8s node watcher so we can start receiving node events.
// Launching the k8s node watcher at this stage will prevent all agents
// from performing Gets directly into kube-apiserver to get the most up
// to date version of the k8s node. This allows for better scalability
// in large clusters.
d.k8sWatcher.NodesInit(k8s.Client())
if option.Config.IPAM == ipamOption.IPAMClusterPool {
// Create the CiliumNode custom resource. This call will block until
// the custom resource has been created
d.nodeDiscovery.UpdateCiliumNodeResource()
}
if err := k8s.WaitForNodeInformation(d.ctx, d.k8sWatcher); err != nil {
log.WithError(err).Error("unable to connect to get node spec from apiserver")
return nil, nil, fmt.Errorf("unable to connect to get node spec from apiserver: %w", err)
}
// Kubernetes demands that the localhost can always reach local
// pods. Therefore unless the AllowLocalhost policy is set to a
// specific mode, always allow localhost to reach local
// endpoints.
if option.Config.AllowLocalhost == option.AllowLocalhostAuto {
option.Config.AllowLocalhost = option.AllowLocalhostAlways
log.Info("k8s mode: Allowing localhost to reach local endpoints")
}
bootstrapStats.k8sInit.End(true)
}
...
// Initialize k8s-related configuration
if k8s.IsEnabled() {
bootstrapStats.k8sInit.Start()
// Initialize d.k8sCachesSynced before any k8s watchers are alive, as they may
// access it to check the status of k8s initialization
cachesSynced := make(chan struct{})
d.k8sCachesSynced = cachesSynced
// Initialize the core k8s and Cilium resources and start watching for changes to them
d.k8sWatcher.InitK8sSubsystem(d.ctx, cachesSynced)
bootstrapStats.k8sInit.End(true)
}
...
}
k8s resource initialization
Taking a CNP creation event as an example
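Before tracing Cilium's own handler, the sketch below shows what consuming a CNP creation event looks like at the client-go level. This is an illustration only, not Cilium's watcher code: it assumes a dynamic informer watching the cilium.io/v2 ciliumnetworkpolicies resource, and the AddFunc body is a hypothetical placeholder for where the agent would parse the CNP and update the policy repository.
// cnp_watch_sketch.go — standalone illustration, not part of the Cilium tree.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig; cilium-agent instead goes
	// through its own pkg/k8s client setup (k8s.Init in the code above).
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// GroupVersionResource of the CiliumNetworkPolicy CRD.
	cnpGVR := schema.GroupVersionResource{
		Group:    "cilium.io",
		Version:  "v2",
		Resource: "ciliumnetworkpolicies",
	}

	factory := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)
	informer := factory.ForResource(cnpGVR).Informer()

	// A CNP "creation event" is simply the informer's Add callback. The real
	// agent parses the object into internal rule types and pushes them into
	// the policy repository; this placeholder only logs the event.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			fmt.Printf("CNP created: %v\n", obj)
		},
	})

	stop := make(chan struct{})
	factory.Start(stop)
	factory.WaitForCacheSync(stop)
	<-stop
}
In the agent, the corresponding handlers are registered by the k8sWatcher constructed in NewDaemon above, and the event ultimately flows into the policy repository that initPolicy set up.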
datapath loader initialization
daemon policy repository initialization