Cilium源码系列一:Cilium Network Policy

daemon初始化

// daemon/cmd/daemon_main.go
var (
	log = logging.DefaultLogger.WithField(logfields.LogSubsys, daemonSubsys)

	bootstrapTimestamp = time.Now()

	// RootCmd represents the base command when called without any subcommands
	RootCmd = &cobra.Command{
		Use:   "cilium-agent",
		Short: "Run the cilium agent",
		Run: func(cmd *cobra.Command, args []string) {
			cmdRefDir := viper.GetString(option.CMDRef)
			if cmdRefDir != "" {
				genMarkdown(cmd, cmdRefDir)
				os.Exit(0)
			}

			// Open socket for using gops to get stacktraces of the agent.
			addr := fmt.Sprintf("127.0.0.1:%d", viper.GetInt(option.GopsPort))
			addrField := logrus.Fields{"address": addr}
			if err := gops.Listen(gops.Options{
				Addr:                   addr,
				ReuseSocketAddrAndPort: true,
			}); err != nil {
				log.WithError(err).WithFields(addrField).Fatal("Cannot start gops server")
			}
			log.WithFields(addrField).Info("Started gops server")

			bootstrapStats.earlyInit.Start()
			initEnv(cmd)
			bootstrapStats.earlyInit.End(true)
			runDaemon()
		},
	}

	bootstrapStats = bootstrapStatistics{}
)

func runDaemon() {
	datapathConfig := linuxdatapath.DatapathConfiguration{
		HostDevice: option.Config.HostDevice,
	}

	log.Info("Initializing daemon")

	option.Config.RunMonitorAgent = true

	if err := enableIPForwarding(); err != nil {
		log.WithError(err).Fatal("Error when enabling sysctl parameters")
	}

	iptablesManager := &iptables.IptablesManager{}
	iptablesManager.Init()

	var wgAgent *wireguard.Agent
	if option.Config.EnableWireguard {
		switch {
		case option.Config.EnableIPSec:
			log.Fatalf("Wireguard (--%s) cannot be used with IPSec (--%s)",
				option.EnableWireguard, option.EnableIPSecName)
		case option.Config.EnableL7Proxy:
			log.Fatalf("Wireguard (--%s) is not compatible with L7 proxy (--%s)",
				option.EnableWireguard, option.EnableL7Proxy)
		}

		var err error
		privateKeyPath := filepath.Join(option.Config.StateDir, wireguardTypes.PrivKeyFilename)
		wgAgent, err = wireguard.NewAgent(privateKeyPath)
		if err != nil {
			log.WithError(err).Fatal("Failed to initialize wireguard")
		}

		cleaner.cleanupFuncs.Add(func() {
			_ = wgAgent.Close()
		})
	} else {
		// Delete wireguard device from previous run (if such exists)
		link.DeleteByName(wireguardTypes.IfaceName)
	}

	if k8s.IsEnabled() {
		bootstrapStats.k8sInit.Start()
		if err := k8s.Init(option.Config); err != nil {
			log.WithError(err).Fatal("Unable to initialize Kubernetes subsystem")
		}
		bootstrapStats.k8sInit.End(true)
	}

    // 实例化daemon
	ctx, cancel := context.WithCancel(server.ServerCtx)
	d, restoredEndpoints, err := NewDaemon(ctx, cancel,
		WithDefaultEndpointManager(ctx, endpoint.CheckHealth),
		linuxdatapath.NewDatapath(datapathConfig, iptablesManager, wgAgent))
	if err != nil {
		select {
		case <-server.ServerCtx.Done():
			log.WithError(err).Debug("Error while creating daemon")
		default:
			log.WithError(err).Fatal("Error while creating daemon")
		}
		return
	}

	// This validation needs to be done outside of the agent until
	// datapath.NodeAddressing is used consistently across the code base.
	log.Info("Validating configured node address ranges")
	if err := node.ValidatePostInit(); err != nil {
		log.WithError(err).Fatal("postinit failed")
	}

	bootstrapStats.enableConntrack.Start()
	log.Info("Starting connection tracking garbage collector")
	gc.Enable(option.Config.EnableIPv4, option.Config.EnableIPv6,
		restoredEndpoints.restored, d.endpointManager)
	bootstrapStats.enableConntrack.End(true)

	bootstrapStats.k8sInit.Start()
	if k8s.IsEnabled() {
		// Wait only for certain caches, but not all!
		// (Check Daemon.InitK8sSubsystem() for more info)
		<-d.k8sCachesSynced
	}
	bootstrapStats.k8sInit.End(true)
	restoreComplete := d.initRestore(restoredEndpoints)
	if wgAgent != nil {
		if err := wgAgent.RestoreFinished(); err != nil {
			log.WithError(err).Error("Failed to set up wireguard peers")
		}
	}

	if d.endpointManager.HostEndpointExists() {
		d.endpointManager.InitHostEndpointLabels(d.ctx)
	} else {
		log.Info("Creating host endpoint")
		if err := d.endpointManager.AddHostEndpoint(
			d.ctx, d, d, d.l7Proxy, d.identityAllocator,
			"Create host endpoint", nodeTypes.GetName(),
		); err != nil {
			log.WithError(err).Fatal("Unable to create host endpoint")
		}
	}

	if option.Config.EnableIPMasqAgent {
		ipmasqAgent, err := ipmasq.NewIPMasqAgent(option.Config.IPMasqAgentConfigPath)
		if err != nil {
			log.WithError(err).Fatal("Failed to create ip-masq-agent")
		}
		ipmasqAgent.Start()
	}

	if !option.Config.DryMode {
		go func() {
			if restoreComplete != nil {
				<-restoreComplete
			}
			d.dnsNameManager.CompleteBootstrap()

			ms := maps.NewMapSweeper(&EndpointMapManager{
				EndpointManager: d.endpointManager,
			})
			ms.CollectStaleMapGarbage()
			ms.RemoveDisabledMaps()

			if len(d.restoredCIDRs) > 0 {
				// Release restored CIDR identities after a grace period (default 10
				// minutes).  Any identities actually in use will still exist after
				// this.
				//
				// This grace period is needed when running on an external workload
				// where policy synchronization is not done via k8s. Also in k8s
				// case it is prudent to allow concurrent endpoint regenerations to
				// (re-)allocate the restored identities before we release them.
				time.Sleep(option.Config.IdentityRestoreGracePeriod)
				log.Debugf("Releasing reference counts for %d restored CIDR identities", len(d.restoredCIDRs))

				ipcache.ReleaseCIDRIdentitiesByCIDR(d.restoredCIDRs)
				// release the memory held by restored CIDRs
				d.restoredCIDRs = nil
			}
		}()
		d.endpointManager.Subscribe(d)
		defer d.endpointManager.Unsubscribe(d)
	}

	// Migrating the ENI datapath must happen before the API is served to
	// prevent endpoints from being created. It also must be before the health
	// initialization logic which creates the health endpoint, for the same
	// reasons as the API being served. We want to ensure that this migration
	// logic runs before any endpoint creates.
	if option.Config.IPAM == ipamOption.IPAMENI {
		migrated, failed := linuxrouting.NewMigrator(
			&eni.InterfaceDB{},
		).MigrateENIDatapath(option.Config.EgressMultiHomeIPRuleCompat)
		switch {
		case failed == -1:
			// No need to handle this case specifically because it is handled
			// in the call already.
		case migrated >= 0 && failed > 0:
			log.Errorf("Failed to migrate ENI datapath. "+
				"%d endpoints were successfully migrated and %d failed to migrate completely. "+
				"The original datapath is still in-place, however it is recommended to retry the migration.",
				migrated, failed)

		case migrated >= 0 && failed == 0:
			log.Infof("Migration of ENI datapath successful, %d endpoints were migrated and none failed.",
				migrated)
		}
	}

	bootstrapStats.healthCheck.Start()
	if option.Config.EnableHealthChecking {
		d.initHealth()
	}
	bootstrapStats.healthCheck.End(true)

	d.startStatusCollector()

	metricsErrs := initMetrics()

	d.startAgentHealthHTTPService()
	if option.Config.KubeProxyReplacementHealthzBindAddr != "" {
		if option.Config.KubeProxyReplacement != option.KubeProxyReplacementDisabled {
			d.startKubeProxyHealthzHTTPService(fmt.Sprintf("%s", option.Config.KubeProxyReplacementHealthzBindAddr))
		}
	}

	bootstrapStats.initAPI.Start()
	srv := server.NewServer(d.instantiateAPI())
	srv.EnabledListeners = []string{"unix"}
	srv.SocketPath = option.Config.SocketPath
	srv.ReadTimeout = apiTimeout
	srv.WriteTimeout = apiTimeout
	defer srv.Shutdown()

	srv.ConfigureAPI()
	bootstrapStats.initAPI.End(true)

	err = d.SendNotification(monitorAPI.StartMessage(time.Now()))
	if err != nil {
		log.WithError(err).Warn("Failed to send agent start monitor message")
	}

	if !d.datapath.Node().NodeNeighDiscoveryEnabled() {
		// Remove all non-GC'ed neighbor entries that might have previously set
		// by a Cilium instance.
		d.datapath.Node().NodeCleanNeighbors(false)
	} else {
		// If we came from an agent upgrade, migrate entries.
		d.datapath.Node().NodeCleanNeighbors(true)
		// Start periodical refresh of the neighbor table from the agent if needed.
		if option.Config.ARPPingRefreshPeriod != 0 && !option.Config.ARPPingKernelManaged {
			d.nodeDiscovery.Manager.StartNeighborRefresh(d.datapath.Node())
		}
	}

	log.WithField("bootstrapTime", time.Since(bootstrapTimestamp)).
		Info("Daemon initialization completed")

	if option.Config.WriteCNIConfigurationWhenReady != "" {
		input, err := os.ReadFile(option.Config.ReadCNIConfiguration)
		if err != nil {
			log.WithError(err).Fatal("Unable to read CNI configuration file")
		}

		if err = os.WriteFile(option.Config.WriteCNIConfigurationWhenReady, input, 0644); err != nil {
			log.WithError(err).Fatalf("Unable to write CNI configuration file to %s", option.Config.WriteCNIConfigurationWhenReady)
		} else {
			log.Infof("Wrote CNI configuration file to %s", option.Config.WriteCNIConfigurationWhenReady)
		}
	}

	errs := make(chan error, 1)

	go func() {
		errs <- srv.Serve()
	}()

	bootstrapStats.overall.End(true)
	bootstrapStats.updateMetrics()
	go d.launchHubble()

	err = option.Config.StoreInFile(option.Config.StateDir)
	if err != nil {
		log.WithError(err).Error("Unable to store Cilium's configuration")
	}

	err = option.StoreViperInFile(option.Config.StateDir)
	if err != nil {
		log.WithError(err).Error("Unable to store Viper's configuration")
	}

	select {
	case err := <-metricsErrs:
		if err != nil {
			log.WithError(err).Fatal("Cannot start metrics server")
		}
	case err := <-errs:
		if err != nil {
			log.WithError(err).Fatal("Error returned from non-returning Serve() call")
		}
	}
}

// daemon/cmd/daemon.go
func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointmanager.EndpointManager, dp datapath.Datapath) (*Daemon, *endpointRestoreState, error) {
    ...
    
   	d := Daemon{
		ctx:               ctx,
		cancel:            cancel,
		prefixLengths:     createPrefixLengthCounter(),
		buildEndpointSem:  semaphore.NewWeighted(int64(numWorkerThreads())),
		compilationMutex:  new(lock.RWMutex),
		netConf:           netConf,
		mtuConfig:         mtuConfig,
		datapath:          dp,
		deviceManager:     NewDeviceManager(),
		nodeDiscovery:     nd,
		endpointCreations: newEndpointCreationManager(),
		apiLimiterSet:     apiLimiterSet,
	}
    ...
    d.identityAllocator = NewCachingIdentityAllocator(&d)
    // 初始化daemon核心组件policy
	if err := d.initPolicy(epMgr); err != nil {
		return nil, nil, fmt.Errorf("error while initializing policy subsystem: %w", err)
	}
	nodeMngr = nodeMngr.WithSelectorCacheUpdater(d.policy.GetSelectorCache()) // must be after initPolicy
	nodeMngr = nodeMngr.WithPolicyTriggerer(d.policyUpdater) 
    ...
    
    d.k8sWatcher = watchers.NewK8sWatcher(
		d.endpointManager,
		d.nodeDiscovery.Manager,
		&d,
		d.policy,
		d.svc,
		d.datapath,
		d.redirectPolicyManager,
		d.bgpSpeaker,
		d.egressGatewayManager,
		option.Config,
	)
	nd.RegisterK8sNodeGetter(d.k8sWatcher)
	ipcache.IPIdentityCache.RegisterK8sSyncedChecker(&d)
    ...
    
    if k8s.IsEnabled() {
		bootstrapStats.k8sInit.Start()
		// Errors are handled inside WaitForCRDsToRegister. It will fatal on a
		// context deadline or if the context has been cancelled, the context's
		// error will be returned. Otherwise, it succeeded.
		if err := d.k8sWatcher.WaitForCRDsToRegister(d.ctx); err != nil {
			return nil, restoredEndpoints, err
		}

		// Launch the K8s node watcher so we can start receiving node events.
		// Launching the k8s node watcher at this stage will prevent all agents
		// from performing Gets directly into kube-apiserver to get the most up
		// to date version of the k8s node. This allows for better scalability
		// in large clusters.
		d.k8sWatcher.NodesInit(k8s.Client())

		if option.Config.IPAM == ipamOption.IPAMClusterPool {
			// Create the CiliumNode custom resource. This call will block until
			// the custom resource has been created
			d.nodeDiscovery.UpdateCiliumNodeResource()
		}

		if err := k8s.WaitForNodeInformation(d.ctx, d.k8sWatcher); err != nil {
			log.WithError(err).Error("unable to connect to get node spec from apiserver")
			return nil, nil, fmt.Errorf("unable to connect to get node spec from apiserver: %w", err)
		}

		// Kubernetes demands that the localhost can always reach local
		// pods. Therefore unless the AllowLocalhost policy is set to a
		// specific mode, always allow localhost to reach local
		// endpoints.
		if option.Config.AllowLocalhost == option.AllowLocalhostAuto {
			option.Config.AllowLocalhost = option.AllowLocalhostAlways
			log.Info("k8s mode: Allowing localhost to reach local endpoints")
		}

		bootstrapStats.k8sInit.End(true)
	}
    ...
    
    // 初始化k8s相关配置
    if k8s.IsEnabled() {
		bootstrapStats.k8sInit.Start()

		// Initialize d.k8sCachesSynced before any k8s watchers are alive, as they may
		// access it to check the status of k8s initialization
		cachesSynced := make(chan struct{})
		d.k8sCachesSynced = cachesSynced

        // 初始换k8s相关核心资源以及cilium资源并开始watch资源修改
		d.k8sWatcher.InitK8sSubsystem(d.ctx, cachesSynced)
		bootstrapStats.k8sInit.End(true)
	}
    ...
}

k8s资源初始化

// pkg/k8s/watchers/watcher.go
func (k *K8sWatcher) InitK8sSubsystem(ctx context.Context, cachesSynced chan struct{}) {
	// 注册所有需要watch的资源
	resources := k.resourceGroups()
	// 启动informer去watch所有注册的资源
	if err := k.EnableK8sWatcher(ctx, resources); err != nil {
		if !errors.Is(err, context.Canceled) {
			log.WithError(err).Fatal("Unable to start K8s watchers for Cilium")
		}
		// If the context was canceled it means the daemon is being stopped
		return
	}

	...
}

func (k *K8sWatcher) EnableK8sWatcher(ctx context.Context, resources []string) error {
	...
	ciliumNPClient := k8s.CiliumClient()
	asyncControllers := &sync.WaitGroup{}

	serviceOptModifier, err := utils.GetServiceListOptionsModifier(k.cfg)
	if err != nil {
		return fmt.Errorf("error creating service list option modifier: %w", err)
	}
	// 包含所有资源informer的创建和启动
	for _, r := range resources {
		switch r {
		// Core Cilium
		case K8sAPIGroupPodV1Core:
			asyncControllers.Add(1)
			go k.podsInit(k8s.WatcherClient(), asyncControllers)
		case k8sAPIGroupNodeV1Core:
			k.NodesInit(k8s.Client())
		case k8sAPIGroupNamespaceV1Core:
			asyncControllers.Add(1)
			go k.namespacesInit(k8s.WatcherClient(), asyncControllers)
		case k8sAPIGroupCiliumNodeV2:
			asyncControllers.Add(1)
			go k.ciliumNodeInit(ciliumNPClient, asyncControllers)
		// Kubernetes built-in resources
		case k8sAPIGroupNetworkingV1Core:
			swgKNP := lock.NewStoppableWaitGroup()
			k.networkPoliciesInit(k8s.WatcherClient(), swgKNP)
		case K8sAPIGroupServiceV1Core:
			swgSvcs := lock.NewStoppableWaitGroup()
			k.servicesInit(k8s.WatcherClient(), swgSvcs, serviceOptModifier)
		case K8sAPIGroupEndpointSliceV1Beta1Discovery:
			// no-op; handled in K8sAPIGroupEndpointV1Core.
		case K8sAPIGroupEndpointSliceV1Discovery:
			// no-op; handled in K8sAPIGroupEndpointV1Core.
		case K8sAPIGroupEndpointV1Core:
			k.initEndpointsOrSlices(k8s.WatcherClient(), serviceOptModifier)
		// Custom resource definitions
        // cilium network policy的informer
		case k8sAPIGroupCiliumNetworkPolicyV2:
			k.ciliumNetworkPoliciesInit(ciliumNPClient)
		case k8sAPIGroupCiliumClusterwideNetworkPolicyV2:
			k.ciliumClusterwideNetworkPoliciesInit(ciliumNPClient)
		case k8sAPIGroupCiliumEndpointV2:
			k.initCiliumEndpointOrSlices(ciliumNPClient, asyncControllers)
		case k8sAPIGroupCiliumEndpointSliceV2Alpha1:
			// no-op; handled in k8sAPIGroupCiliumEndpointV2
		case k8sAPIGroupCiliumLocalRedirectPolicyV2:
			k.ciliumLocalRedirectPolicyInit(ciliumNPClient)
		case k8sAPIGroupCiliumEgressNATPolicyV2:
			k.ciliumEgressNATPolicyInit(ciliumNPClient)
		default:
			log.WithFields(logrus.Fields{
				logfields.Resource: r,
			}).Fatal("Not listening for Kubernetes resource updates for unhandled type")
		}
	}

	asyncControllers.Wait()
	close(k.controllersStarted)

	return nil
}
// pkg/k8s/watchers/cilium_network_policy.go
// cilium network policy资源的controller初始化和启动
func (k *K8sWatcher) ciliumNetworkPoliciesInit(ciliumNPClient *k8s.K8sCiliumClient) {
	cnpStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)

	ciliumV2Controller := informer.NewInformerWithStore(
		cache.NewListWatchFromClient(ciliumNPClient.CiliumV2().RESTClient(),
			cilium_v2.CNPPluralName, v1.NamespaceAll, fields.Everything()),
		&cilium_v2.CiliumNetworkPolicy{},
		0,
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				var valid, equal bool
				defer func() { k.K8sEventReceived(metricCNP, metricCreate, valid, equal) }()
				if cnp := k8s.ObjToSlimCNP(obj); cnp != nil {
					valid = true
					if cnp.RequiresDerivative() {
						return
					}

					// We need to deepcopy this structure because we are writing
					// fields.
					// See https://github.com/cilium/cilium/blob/27fee207f5422c95479422162e9ea0d2f2b6c770/pkg/policy/api/ingress.go#L112-L134
					cnpCpy := cnp.DeepCopy()
					// 处理cnp数据
					err := k.addCiliumNetworkPolicyV2(ciliumNPClient, cnpCpy)
                    // 事件统计
					k.K8sEventProcessed(metricCNP, metricCreate, err == nil)
				}
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				var valid, equal bool
				defer func() { k.K8sEventReceived(metricCNP, metricUpdate, valid, equal) }()
				if oldCNP := k8s.ObjToSlimCNP(oldObj); oldCNP != nil {
					if newCNP := k8s.ObjToSlimCNP(newObj); newCNP != nil {
						valid = true
						if oldCNP.DeepEqual(newCNP) {
							equal = true
							return
						}

						if newCNP.RequiresDerivative() {
							return
						}

						// We need to deepcopy this structure because we are writing
						// fields.
						// See https://github.com/cilium/cilium/blob/27fee207f5422c95479422162e9ea0d2f2b6c770/pkg/policy/api/ingress.go#L112-L134
						oldCNPCpy := oldCNP.DeepCopy()
						newCNPCpy := newCNP.DeepCopy()

						err := k.updateCiliumNetworkPolicyV2(ciliumNPClient, oldCNPCpy, newCNPCpy)
						k.K8sEventProcessed(metricCNP, metricUpdate, err == nil)
					}
				}
			},
			DeleteFunc: func(obj interface{}) {
				var valid, equal bool
				defer func() { k.K8sEventReceived(metricCNP, metricDelete, valid, equal) }()
				cnp := k8s.ObjToSlimCNP(obj)
				if cnp == nil {
					return
				}
				valid = true
				err := k.deleteCiliumNetworkPolicyV2(cnp)
				k.K8sEventProcessed(metricCNP, metricDelete, err == nil)
			},
		},
		k8s.ConvertToCNP,
		cnpStore,
	)

	k.blockWaitGroupToSyncResources(wait.NeverStop, nil, ciliumV2Controller.HasSynced, k8sAPIGroupCiliumNetworkPolicyV2)
	go ciliumV2Controller.Run(wait.NeverStop)
	k.k8sAPIGroups.AddAPI(k8sAPIGroupCiliumNetworkPolicyV2)
}

以CNP创建事件为例进行分析

// pkg/k8s/watchers/cilium_network_policy.go
func (k *K8sWatcher) addCiliumNetworkPolicyV2(ciliumNPClient clientset.Interface, cnp *types.SlimCNP) error {
	...
    // 解析cnp并返回api.Rules列表
	var rev uint64
	rules, policyImportErr := cnp.Parse()
	if policyImportErr == nil {
		policyImportErr = k8s.PreprocessRules(rules, &k.K8sSvcCache)
		// Replace all rules with the same name, namespace and
		// resourceTypeCiliumNetworkPolicy
		if policyImportErr == nil {
            // 添加rules到policy repository队列,用于分发给daemon
			rev, policyImportErr = k.policyManager.PolicyAdd(rules, &policy.AddOptions{
				ReplaceWithLabels: cnp.GetIdentityLabels(),
				Source:            metrics.LabelEventSourceK8s,
			})
		}
	}

	if policyImportErr != nil {
		metrics.PolicyImportErrorsTotal.Inc()
		scopedLog.WithError(policyImportErr).Warn("Unable to add CiliumNetworkPolicy")
	} else {
		scopedLog.Info("Imported CiliumNetworkPolicy")
	}

	// Upsert to rule revision cache outside of controller, because upsertion
	// *must* be synchronous so that if we get an update for the CNP, the cache
	// is populated by the time updateCiliumNetworkPolicyV2 is invoked.
	importMetadataCache.upsert(cnp, rev, policyImportErr)

	if !option.Config.DisableCNPStatusUpdates {
		updateContext := &k8s.CNPStatusUpdateContext{
			CiliumNPClient:              ciliumNPClient,
			NodeName:                    nodeTypes.GetName(),
			NodeManager:                 k.nodeDiscoverManager,
			UpdateDuration:              spanstat.Start(),
			WaitForEndpointsAtPolicyRev: k.endpointManager.WaitForEndpointsAtPolicyRev,
		}

		ctrlName := cnp.GetControllerName()
		k8sCM.UpdateController(ctrlName,
			controller.ControllerParams{
				DoFunc: func(ctx context.Context) error {
					return updateContext.UpdateStatus(ctx, cnp, rev, policyImportErr)
				},
			},
		)
	}

	return policyImportErr
}
// daemon/cmd/policy.go
// rules列表添加到policy repository的RepositoryChangeQueue队列
func (d *Daemon) PolicyAdd(rules policyAPI.Rules, opts *policy.AddOptions) (newRev uint64, err error) {
	p := &PolicyAddEvent{
		rules: rules,
		opts:  opts,
		d:     d,
	}
	polAddEvent := eventqueue.NewEvent(p)
    // 入队改rules的事件对象
	resChan, err := d.policy.RepositoryChangeQueue.Enqueue(polAddEvent)
	if err != nil {
		return 0, fmt.Errorf("enqueue of PolicyAddEvent failed: %s", err)
	}

	res, ok := <-resChan
	if ok {
		pRes := res.(*PolicyAddResult)
		return pRes.newRev, pRes.err
	}
	return 0, fmt.Errorf("policy addition event was cancelled")
}

datapath loader初始化

// daemon/cmd/daemon_main.go
func runDaemon() {
	d, restoredEndpoints, err := NewDaemon(ctx, cancel,
		WithDefaultEndpointManager(ctx, endpoint.CheckHealth),
		linuxdatapath.NewDatapath(datapathConfig, iptablesManager, wgAgent))
}
-->
// pkg/datapath/linux/datapath.go
func NewDatapath(cfg DatapathConfiguration, ruleManager datapath.IptablesManager, wgAgent datapath.WireguardAgent) datapath.Datapath {
	dp := &linuxDatapath{
		ConfigWriter:    &config.HeaderfileWriter{},
		IptablesManager: ruleManager,
		nodeAddressing:  NewNodeAddressing(),
		config:          cfg,
		loader:          loader.NewLoader(canDisableDwarfRelocations),
		wgAgent:         wgAgent,
	}

	dp.node = NewNodeHandler(cfg, dp.nodeAddressing, wgAgent)
	return dp
}
-->
func (l *linuxDatapath) Loader() datapath.Loader {
	return l.loader
}
// pkg/datapath/loader/base.go
func (l *Loader) Reinitialize(ctx context.Context, o datapath.BaseProgramOwner, deviceMTU int, iptMgr datapath.IptablesManager, p datapath.Proxy) error {
}

daemon组件policy repository初始化

// daemon/cmd/policy.go
func (d *Daemon) initPolicy(epMgr *endpointmanager.EndpointManager) error {
	// Reuse policy.TriggerMetrics and PolicyTriggerInterval here since
	// this is only triggered by agent configuration changes for now and
	// should be counted in pol.TriggerMetrics.
	rt, err := trigger.NewTrigger(trigger.Parameters{
		Name:            "datapath-regeneration",
		MetricsObserver: &policy.TriggerMetrics{},
		MinInterval:     option.Config.PolicyTriggerInterval,
		TriggerFunc:     d.datapathRegen,
	})
	if err != nil {
		return fmt.Errorf("failed to create datapath regeneration trigger: %w", err)
	}
	d.datapathRegenTrigger = rt

    // 初始换policy组件
	d.policy = policy.NewPolicyRepository(d.identityAllocator,
		d.identityAllocator.GetIdentityCache(),
		certificatemanager.NewManager(option.Config.CertDirectory, k8s.Client()))
	d.policy.SetEnvoyRulesFunc(envoy.GetEnvoyHTTPRules)
	d.policyUpdater, err = policy.NewUpdater(d.policy, epMgr)
	if err != nil {
		return fmt.Errorf("failed to create policy update trigger: %w", err)
	}

	return nil
}
// daemon/cmd/policy.go
// 创建policy repository实例
func NewPolicyRepository(idAllocator cache.IdentityAllocator, idCache cache.IdentityCache, certManager CertificateManager) *Repository {
    // 实例化repoChangeQueue、ruleReactionQueue队列并启动
	repoChangeQueue := eventqueue.NewEventQueueBuffered("repository-change-queue", option.Config.PolicyQueueSize)
	ruleReactionQueue := eventqueue.NewEventQueueBuffered("repository-reaction-queue", option.Config.PolicyQueueSize)
	repoChangeQueue.Run()
	ruleReactionQueue.Run()
	selectorCache := NewSelectorCache(idAllocator, idCache)

	repo := &Repository{
		revision:              1,
		RepositoryChangeQueue: repoChangeQueue,
		RuleReactionQueue:     ruleReactionQueue,
		selectorCache:         selectorCache,
		certManager:           certManager,
	}
	repo.policyCache = NewPolicyCache(repo, true)
	return repo
}

// pkg/eventqueue/eventqueue.go
// 队列启动
func (q *EventQueue) Run() {
	if q.notSafeToAccess() {
		return
	}

	go q.run()
}

func (q *EventQueue) run() {
	q.eventQueueOnce.Do(func() {
		defer close(q.eventsClosed)
		for ev := range q.events {
			select {
			case <-q.drain:
				ev.stats.waitConsumeOffQueue.End(false)
				close(ev.cancelled)
				close(ev.eventResults)
				ev.printStats(q)
			default:
				ev.stats.waitConsumeOffQueue.End(true)
				ev.stats.durationStat.Start()
                // 消费队列数据
				ev.Metadata.Handle(ev.eventResults)
				// Always indicate success for now.
				ev.stats.durationStat.End(true)
				// Ensures that no more results can be sent as the event has
				// already been processed.
				ev.printStats(q)
				close(ev.eventResults)
			}
		}
	})
}

// 处理repoChangeQueue队列数据
func (p *PolicyAddEvent) Handle(res chan interface{}) {
	p.d.policyAdd(p.rules, p.opts, res)
}

// rules规则最终通过该函数通知到所有locally endpoint managed
func (d *Daemon) policyAdd(sourceRules policyAPI.Rules, opts *policy.AddOptions, resChan chan interface{}) {
	policyAddStartTime := time.Now()
	logger := log.WithField("policyAddRequest", uuid.New().String())

	if opts != nil && opts.Generated {
		logger.WithField(logfields.CiliumNetworkPolicy, sourceRules.String()).Debug("Policy Add Request")
	} else {
		logger.WithField(logfields.CiliumNetworkPolicy, sourceRules.String()).Info("Policy Add Request")
	}
	
    // 解析rules里面各种CIDR,无差别加到slice里面返回
	prefixes := policy.GetCIDRPrefixes(sourceRules)
	logger.WithField("prefixes", prefixes).Debug("Policy imported via API, found CIDR prefixes...")

	newPrefixLengths, err := d.prefixLengths.Add(prefixes)
	if err != nil {
		logger.WithError(err).WithField("prefixes", prefixes).Warn(
			"Failed to reference-count prefix lengths in CIDR policy")
		resChan <- &PolicyAddResult{
			newRev: 0,
			err:    api.Error(PutPolicyFailureCode, err),
		}
		return
	}
    // 判断CIDR是否改变
	if newPrefixLengths && !bpfIPCache.BackedByLPM() {
		// 重新编译并初始化基础程序,具体实现参考datapath部分
		if err := d.Datapath().Loader().Reinitialize(d.ctx, d, d.mtuConfig.GetDeviceMTU(), d.Datapath(), d.l7Proxy); err != nil {
			_ = d.prefixLengths.Delete(prefixes)
			err2 := fmt.Errorf("Unable to recompile base programs: %s", err)
			logger.WithError(err2).WithField("prefixes", prefixes).Warn(
				"Failed to recompile base programs due to prefix length count change")
			resChan <- &PolicyAddResult{
				newRev: 0,
				err:    api.Error(PutPolicyFailureCode, err),
			}
			return
		}
	}

	// Any newly allocated identities MUST be upserted to the ipcache if no error is returned.
	// With SelectiveRegeneration this is postponed to the rule reaction queue to be done
	// after the affected endpoints have been regenerated, otherwise new identities are
	// upserted to the ipcache before we return.
	//
	// Release of these identities will be tied to the corresponding policy
	// in the policy.Repository and released upon policyDelete().
	newlyAllocatedIdentities := make(map[string]*identity.Identity)
	if _, err := ipcache.AllocateCIDRs(prefixes, nil, newlyAllocatedIdentities); err != nil {
		_ = d.prefixLengths.Delete(prefixes)
		logger.WithError(err).WithField("prefixes", prefixes).Warn(
			"Failed to allocate identities for CIDRs during policy add")
		resChan <- &PolicyAddResult{
			newRev: 0,
			err:    err,
		}
		return
	}

	// No errors past this point!

	d.policy.Mutex.Lock()

	// removedPrefixes tracks prefixes that we replace in the rules. It is used
	// after we release the policy repository lock.
	var removedPrefixes []*net.IPNet

	// policySelectionWG is used to signal when the updating of all of the
	// caches of endpoints in the rules which were added / updated have been
	// updated.
	var policySelectionWG sync.WaitGroup

	// Get all endpoints at the time rules were added / updated so we can figure
	// out which endpoints to regenerate / bump policy revision.
	allEndpoints := d.endpointManager.GetPolicyEndpoints()

	// Start with all endpoints to be in set for which we need to bump their
	// revision.
	endpointsToBumpRevision := policy.NewEndpointSet(allEndpoints)

	endpointsToRegen := policy.NewEndpointSet(nil)

	if opts != nil {
		if opts.Replace {
			for _, r := range sourceRules {
				oldRules := d.policy.SearchRLocked(r.Labels)
				removedPrefixes = append(removedPrefixes, policy.GetCIDRPrefixes(oldRules)...)
				if len(oldRules) > 0 {
					deletedRules, _, _ := d.policy.DeleteByLabelsLocked(r.Labels)
					deletedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
				}
			}
		}
		if len(opts.ReplaceWithLabels) > 0 {
			oldRules := d.policy.SearchRLocked(opts.ReplaceWithLabels)
			removedPrefixes = append(removedPrefixes, policy.GetCIDRPrefixes(oldRules)...)
			if len(oldRules) > 0 {
				deletedRules, _, _ := d.policy.DeleteByLabelsLocked(opts.ReplaceWithLabels)
				deletedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)
			}
		}
	}

	addedRules, newRev := d.policy.AddListLocked(sourceRules)

	// The information needed by the caller is available at this point, signal
	// accordingly.
	resChan <- &PolicyAddResult{
		newRev: newRev,
		err:    nil,
	}

	addedRules.UpdateRulesEndpointsCaches(endpointsToBumpRevision, endpointsToRegen, &policySelectionWG)

	d.policy.Mutex.Unlock()

	if newPrefixLengths && !bpfIPCache.BackedByLPM() {
		// bpf_host needs to be recompiled whenever CIDR policy changed.
		if hostEp := d.endpointManager.GetHostEndpoint(); hostEp != nil {
			logger.Debug("CIDR policy has changed; regenerating host endpoint")
			endpointsToRegen.Insert(hostEp)
			endpointsToBumpRevision.Delete(hostEp)
		}
	}

	// Begin tracking the time taken to deploy newRev to the datapath. The start
	// time is from before the locking above, and thus includes all waits and
	// processing in this function.
	source := ""
	if opts != nil {
		source = opts.Source
	}
	d.endpointManager.CallbackForEndpointsAtPolicyRev(d.ctx, newRev, func(now time.Time) {
		duration, _ := safetime.TimeSinceSafe(policyAddStartTime, logger)
		metrics.PolicyImplementationDelay.WithLabelValues(source).Observe(duration.Seconds())
	})

	// remove prefixes of replaced rules above. Refcounts have been incremented
	// above, so any decrements here will be no-ops for CIDRs that are re-added,
	// and will trigger deletions for those that are no longer used.
	if len(removedPrefixes) > 0 {
		logger.WithField("prefixes", removedPrefixes).Debug("Decrementing replaced CIDR refcounts when adding rules")
		ipcache.ReleaseCIDRIdentitiesByCIDR(removedPrefixes)
		d.prefixLengths.Delete(removedPrefixes)
	}

	logger.WithField(logfields.PolicyRevision, newRev).Info("Policy imported via API, recalculating...")

	labels := make([]string, 0, len(sourceRules))
	for _, r := range sourceRules {
		labels = append(labels, r.Labels.GetModel()...)
	}
	err = d.SendNotification(monitorAPI.PolicyUpdateMessage(len(sourceRules), labels, newRev))
	if err != nil {
		logger.WithError(err).WithField(logfields.PolicyRevision, newRev).Warn("Failed to send policy update as monitor notification")
	}

	if option.Config.SelectiveRegeneration {
		// Only regenerate endpoints which are needed to be regenerated as a
		// result of the rule update. The rules which were imported most likely
		// do not select all endpoints in the policy repository (and may not
		// select any at all). The "reacting" to rule updates enqueues events
		// for all endpoints. Once all endpoints have events queued up, this
		// function will return.
		//
		// With selective regeneration upserting CIDRs to ipcache is performed after
		// endpoint regeneration and serialized with the corresponding ipcache deletes via
		// the policy reaction queue.
		r := &PolicyReactionEvent{
			wg:                &policySelectionWG,
			epsToBumpRevision: endpointsToBumpRevision,
			endpointsToRegen:  endpointsToRegen,
			newRev:            newRev,
			upsertIdentities:  newlyAllocatedIdentities,
		}

		ev := eventqueue.NewEvent(r)
		// This event may block if the RuleReactionQueue is full. We don't care
		// about when it finishes, just that the work it does is done in a serial
		// order.
		_, err := d.policy.RuleReactionQueue.Enqueue(ev)
		if err != nil {
			log.WithError(err).WithField(logfields.PolicyRevision, newRev).Error("enqueue of RuleReactionEvent failed")
		}
	} else {
		// Regenerate all endpoints unconditionally.
		d.TriggerPolicyUpdates(false, "policy rules added")
		// TODO: Remove 'enable-selective-regeneration' agent option.  Without selective
		// regeneration we retain the old behavior of upserting new identities to ipcache
		// before endpoint policy maps have been updated.
		ipcache.UpsertGeneratedIdentities(newlyAllocatedIdentities)
	}

	return
}

// 处理ruleReactionQueue队列数据
func (r *PolicyReactionEvent) Handle(res chan interface{}) {
	// Wait until we have calculated which endpoints need to be selected
	// across multiple goroutines.
	r.wg.Wait()
	reactToRuleUpdates(r.epsToBumpRevision, r.endpointsToRegen, r.newRev, r.upsertIdentities, r.releasePrefixes)
}

Last updated