
Cilium Code Walk Through: Agent Start Process

Published at 2019-05-29 | Last Update 2020-09-01

This post walks through the cilium agent start process. Code is based on Cilium 1.8.2.

This post belongs to the Cilium Code Walk Through series.



runDaemon                                                                    //    daemon/cmd/daemon_main.go
  |-enableIPForwarding                                                       
  |-k8s.Init                                                                 // -> pkg/k8s/init.go
  |-NewDaemon                                                                // -> daemon/cmd/daemon.go
  |  |-d := Daemon{}
  |  |-d.initMaps                                                            //    daemon/cmd/datapath.go
  |  |-d.svc.RestoreServices                                                 // -> pkg/service/service.go
  |  |  |-restoreBackendsLocked
  |  |  |-restoreServicesLocked
  |  |-d.k8sWatcher.RunK8sServiceHandler                                     //    pkg/k8s/watchers/watcher.go
  |  |  |-k8sServiceHandler                                                  //    pkg/k8s/watchers/watcher.go
  |  |    |-eventHandler                                                     //    pkg/k8s/watchers/watcher.go
  |  |-k8s.RegisterCRDs
  |  |-d.bootstrapIPAM                                                       // -> daemon/cmd/ipam.go
  |  |-restoredEndpoints := d.restoreOldEndpoints                            // -> daemon/cmd/state.go
  |  |  |-ioutil.ReadDir                                                   
  |  |  |-endpoint.FilterEPDir // filter over endpoint directories
  |  |  |-for ep := range possibleEPs
  |  |      validateEndpoint(ep)
  |  |        |-allocateIPsLocked
  |  |-k8s.Client().AnnotateNode                                           
  |  |-d.bootstrapClusterMesh                                              
  |  |-d.init                                                                //    daemon/cmd/daemon.go
  |  |  |-os.MkdirAll(globalsDir)
  |  |  |-d.createNodeConfigHeaderfile
  |  |  |-d.Datapath().Loader().Reinitialize
  |  |-monitoragent.NewAgent
  |  |-d.syncEndpointsAndHostIPs                                             // -> daemon/cmd/datapath.go
  |  |  |-insert special identities to lxcmap, ipcache
  |  |-UpdateController("sync-endpoints-and-host-ips")
  |  |-loader.RestoreTemplates                                               // -> pkg/datapath/loader/cache.go
  |  |  |-os.RemoveAll()
  |  |-ipcache.InitIPIdentityWatcher                                         // -> pkg/ipcache/kvstore.go
  |     |-watcher = NewIPIdentityWatcher
  |     |-watcher.Watch
  |        |-IPIdentityCache.Upsert/Delete
  |-gc.Enable                                                                // -> pkg/maps/ctmap/gc/gc.go
  |   |-for { runGC() } // conntrack & nat gc
  |-initKVStore
  |  |-UpdateController("kvstore-locks-gc", RunLocksGC)
  |  |-kvstore.Setup
  |-initRestore(restoredEndpoints)
  |  |-regenerateRestoredEndpoints(restoredEndpoints)                        // daemon/cmd/state.go
  |  |-UpdateController("sync-lb-maps-with-k8s-services")
  |-initHealth
  |-startStatusCollector
  |  |-status.NewCollector(probes)                                           // pkg/status
  |-startAgentHealthHTTPService
  |-SendNotification
  |  |-monitorAgent.SendEvent(AgentNotifyStart)
  |-srv.Serve()  // start Cilium agent API server
  |-k8s.Client().MarkNodeReady()
  |-launchHubble()

0 Overview

The daemon process starts by executing command

$ cilium-agent --config-dir=/tmp/cilium/config-map

This triggers the runDaemon() method with the provided CLI arguments:

// daemon/cmd/daemon_main.go

func runDaemon() {
    enableIPForwarding()                                  // turn on ip forwarding in kernel
    k8s.Init(Config)                                      // init k8s utils
                                                          
    d, restoredEndpoints := NewDaemon()                   
    gc.Enable(restoredEndpoints.restored)                 // Starting connection tracking garbage collector
    d.initKVStore()                                       // init cilium-etcd
                                                          
    restoreComplete := d.initRestore(restoredEndpoints)

    d.initHealth()                                        // init cilium health-checking if enabled
    d.startStatusCollector()                              
    d.startAgentHealthHTTPService()                       
                                                          
    d.SendNotification(monitorAPI.AgentNotifyStart, repr)

    go func() { errs <- srv.Serve() }()                   // start Cilium HTTP API server

    k8s.Client().MarkNodeReady(nodeTypes.GetName())
    d.launchHubble()
}

This method performs the following tasks sequentially:

  1. Prepare to create daemon
    1. Enable IP forwarding in kernel
    2. Init k8s package
  2. Create new daemon, restore endpoints
  3. Enable conntrack/nat GC
  4. Init kvstore (cilium-etcd)
  5. Regenerate BPF for endpoints
  6. Init health checks, status checker, metrics
  7. Send agent started message to monitor
  8. Serve Cilium HTTP API
  9. Mark node as ready in k8s
  10. Start hubble if enabled

1 Prepare to create daemon

1.1 Enable IP forwarding: enableIPForwarding()

On Linux, this sets the following sysctl parameters on the node:

  • net.ipv4.ip_forward=1
  • net.ipv6.conf.all.forwarding=1
  • net.ipv4.conf.all.forwarding=1
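
Under the hood this is just writing "1" into the corresponding files under /proc/sys. A minimal standalone sketch (not Cilium's actual sysctl package) looks like this:

// sketch: enable IP forwarding by writing to /proc/sys (illustrative only)
package main

import (
    "os"
    "path/filepath"
    "strings"
)

// setSysctl writes value into the /proc/sys file corresponding to name,
// e.g. "net.ipv4.ip_forward" -> /proc/sys/net/ipv4/ip_forward.
func setSysctl(name, value string) error {
    path := filepath.Join("/proc/sys", strings.ReplaceAll(name, ".", "/"))
    return os.WriteFile(path, []byte(value), 0644)
}

func main() {
    for _, s := range []string{
        "net.ipv4.ip_forward",
        "net.ipv4.conf.all.forwarding",
        "net.ipv6.conf.all.forwarding",
    } {
        if err := setSysctl(s, "1"); err != nil {
            panic(err)
        }
    }
}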

1.2 Init k8s package: k8s.Init()

// Init initializes the Kubernetes package. It is required to call Configure() beforehand.
func Init(conf k8sconfig.Configuration) error {
    k8sRestClient := createDefaultClient()
    heartBeat := func() { k8sRestClient.Get().Resource("healthz").Do(ctx) }

    if Config.K8sHeartbeatTimeout != 0
        controller.NewManager().UpdateController("k8s-heartbeat", ControllerParams{
                DoFunc: ...
                RunInterval: Config.K8sHeartbeatTimeout,
            })

    k8sversion.Update(Client(), conf)
}

2 Create daemon: NewDaemon()

// daemon/cmd/daemon.go

func NewDaemon(epMgr *EndpointManager, dp Datapath) (*Daemon, *endpointRestoreState, error) {
    if Config.ReadCNIConfiguration != ""     // read custom cni conf if specified
        netConf = cnitypes.ReadNetConf(Config.ReadCNIConfiguration)

    d := Daemon{ netConf: netConf, datapath: dp }
    d.initMaps()                // Open or create BPF maps.

    if Config.RestoreState
        d.svc.RestoreServices() // Read the service IDs of existing services from the BPF map and reserve them.

    d.k8sWatcher.RunK8sServiceHandler()

    k8s.RegisterCRDs()
    if Config.IPAM == ipamIPAMOperator
        d.nodeDiscovery.UpdateCiliumNodeResource()

    d.k8sCachesSynced = d.k8sWatcher.InitK8sSubsystem()
    d.bootstrapIPAM()

    restoredEndpoints := d.restoreOldEndpoints(Config.StateDir, true)
    d.allocateIPs()

    // Annotation must happen after discovery of the PodCIDR range and allocation of the health IPs.
    k8s.Client().AnnotateNode(nodeName, GetIPv4AllocRange(), GetInternalIPv4())

    d.bootstrapClusterMesh(nodeMngr)
    d.init()
    d.syncEndpointsAndHostIPs()

    controller.NewManager().UpdateController("sync-endpoints-and-host-ips", ControllerParams{
            DoFunc: func() { return d.syncEndpointsAndHostIPs() }, })

    loader.RestoreTemplates(Config.StateDir) // restore previous BPF templates

    // Start watcher for endpoint IP --> identity mappings in key-value store.
    ipcache.InitIPIdentityWatcher()
    identitymanager.Subscribe(d.policy)

    return &d, restoredEndpoints, nil
}

2.1 Read custom CNI file if configured

See Trip.com: First Step towards Cloud-native Networking for how custom CNI can be used.

2.2 Create daemon instance

It then creates a Daemon instance and initializes its fields:

    d := Daemon{ netConf: netConf, datapath: dp }
    d.svc = service.NewService(&d)
    d.identityAllocator = cache.NewCachingIdentityAllocator(&d)
    d.policy = policy.NewPolicyRepository()
    ...

2.3 initMaps(): open or create all BPF maps

// daemon/cmd/datapath.go

// initMaps opens all BPF maps (and creates them if they do not exist).
func (d *Daemon) initMaps() error {
    lxcmap.LXCMap.OpenOrCreate()
    ipcachemap.IPCache.OpenParallel()
    ...
    d.svc.InitMaps(Config.EnableIPv4, createSockRevNatMaps, Config.RestoreState)
    policymap.InitCallMap()

    for ep := range d.endpointManager.GetEndpoints()
        ep.InitMap()

    for ep := range d.endpointManager.GetEndpoints()
        for m := range ctmap.LocalMaps(ep, Config.EnableIPv4)
            m.Create()

    for m := range ctmap.GlobalMaps(Config.EnableIPv4)
        m.Create()

    ipv4Nat := nat.GlobalMaps(Config.EnableIPv4)
    ipv4Nat.Create()
    if Config.EnableNodePort
       neighborsmap.InitMaps(Config.EnableIPv4)

    // Set up the list of IPCache listeners in the daemon, to be used by syncEndpointsAndHostIPs()
    ipcache.IPIdentityCache.SetListeners()

    if !Config.RestoreState
        lxcmap.LXCMap.DeleteAll() // If we are not restoring state, all endpoints can be deleted.

    if Config.EnableSessionAffinity {
        lbmap.AffinityMatchMap.OpenOrCreate()
        lbmap.Affinity4Map.OpenOrCreate()
    }
}

2.4 RestoreServices(): restore service <-> backend mappings from BPF maps

    if Config.RestoreState
        d.svc.RestoreServices() // Read the service IDs of existing services from the BPF map and reserve them.
// pkg/service/service.go

// RestoreServices restores services from BPF maps.
//
// The method should be called once before establishing a connectivity to kube-apiserver.
func (s *Service) RestoreServices() error {
    s.restoreBackendsLocked() // Restore backend IDs
    s.restoreServicesLocked() // Restore service cache from BPF maps

    if option.Config.EnableSessionAffinity
        s.deleteOrphanAffinityMatchesLocked() // Remove no longer existing affinity matches

    s.deleteOrphanBackends() // Remove obsolete backends and release their IDs
}

2.5 RunK8sServiceHandler(): listen to k8s Service changes

    d.k8sWatcher.RunK8sServiceHandler()
// pkg/k8s/watchers/watcher.go

func (k *K8sWatcher) RunK8sServiceHandler() {
    go k.k8sServiceHandler()
}

func (k *K8sWatcher) k8sServiceHandler() {
    eventHandler := func(event k8s.ServiceEvent) {
        svc := event.Service

        switch event.Action {
        case k8s.UpdateService:
            k.addK8sSVCs(event.ID, event.OldService, svc, event.Endpoints)
            translator := k8s.NewK8sTranslator(event.ID, *event.Endpoints, false, svc.Labels, true)
            result := k.policyRepository.TranslateRules(translator)
            if result.NumToServicesRules > 0 // Only trigger policy updates if ToServices rules are in effect
                k.policyManager.TriggerPolicyUpdates(true, "Kubernetes service endpoint added")

        case k8s.DeleteService:
            ...
        }
    }

    for {
        event := <-k.K8sSvcCache.Events // Kubernetes service definition changed
        eventHandler(event)
    }
}

2.6 RegisterCRDs

    k8s.RegisterCRDs()

    if Config.IPAM == ipamIPAMOperator
        d.nodeDiscovery.UpdateCiliumNodeResource()

2.7 bootstrapIPAM

    d.bootstrapIPAM()
// daemon/cmd/ipam.go

func (d *Daemon) bootstrapIPAM() {
    if Config.IPv4Range != AutoCIDR {
        allocCIDR := cidr.ParseCIDR(Config.IPv4Range)
        node.SetIPv4AllocRange(allocCIDR)
    }

    node.AutoComplete()

    // Set up ipam conf after init() because we might be running d.conf.KVStoreIPv4Registration
    d.ipam = ipam.NewIPAM(d.datapath.LocalNodeAddressing(), option.Config, d.nodeDiscovery, d.k8sWatcher)
}

2.8 restoreOldEndpoints(): extract endpoints info from local files

    restoredEndpoints := d.restoreOldEndpoints(Config.StateDir, true)
    d.allocateIPs()
// daemon/cmd/state.go

// Perform the first step in restoring the endpoint structure,
// allocating their existing IP out of the CIDR block and then inserting the
// endpoints into the endpoints list. It needs to be followed by a call to
// regenerateRestoredEndpoints() once the endpoint builder is ready.
func (d *Daemon) restoreOldEndpoints(dir string, clean bool) (*endpointRestoreState) {
    state := &endpointRestoreState{
        restored: []*endpoint.Endpoint{},
        toClean:  []*endpoint.Endpoint{},
    }

    existingEndpoints = lxcmap.DumpToMap()             // get previous endpoint IDs from BPF map
    dirFiles := ioutil.ReadDir(dir)                    // state dir: `/var/run/cilium/`
    eptsID := endpoint.FilterEPDir(dirFiles)           // `/var/run/cilium/<ep_id>/lxc_config.h`
                                                       
    possibleEPs := ReadEPsFromDirNames(dir, eptsID)    // parse endpoint ID from dir name
    for ep := range possibleEPs {                      
        ep.SetAllocator(d.identityAllocator)           
        d.validateEndpoint(ep)  // further call allocateIPsLocked() to retain IP for this endpoint
        ep.SetDefaultConfiguration(true)               
                                                       
        state.restored.append(ep)                      // insert into restored list, will regen bpf for them
        delete(existingEndpoints, ep.IPv4.String())
    }

    for hostIP, info := range existingEndpoints        // for the remaining endpoints, delete them
        if ip := net.ParseIP(hostIP) && !info.IsHost() // from endpoint map
            lxcmap.DeleteEntry(ip)

    return state
}

2.9 AnnotateNode

    // Annotation must happen after discovery of the PodCIDR range and allocation of the health IPs.
    k8s.Client().AnnotateNode(nodeName, GetIPv4AllocRange(), GetInternalIPv4())

See Cilium Code Walk Through: Agent CIDR Init.
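
For a concrete picture of what this step boils down to, the sketch below patches node annotations directly with client-go. This is illustrative only: the helper and the io.cilium.network.ipv4-pod-cidr annotation key are assumptions modeled on Cilium's annotation conventions, not the agent's actual code path.

// sketch: annotate the k8s node with the PodCIDR allocated to this node
package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func annotatePodCIDR(nodeName, podCIDR string) error {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        return err
    }
    cs, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        return err
    }
    // Strategic-merge patch that only touches the annotation we care about.
    patch := []byte(fmt.Sprintf(
        `{"metadata":{"annotations":{"io.cilium.network.ipv4-pod-cidr":%q}}}`, podCIDR))
    _, err = cs.CoreV1().Nodes().Patch(context.TODO(), nodeName,
        types.StrategicMergePatchType, patch, metav1.PatchOptions{})
    return err
}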

2.10 d.init(): clean state dir, setup sockops, init datapath loader

    d.bootstrapClusterMesh(nodeMngr)
    d.init()
    d.monitorAgent = monitoragent.NewAgent(context.TODO(), defaults.MonitorBufferPages)
    d.syncEndpointsAndHostIPs()

    controller.NewManager().UpdateController("sync-endpoints-and-host-ips", ControllerParams{
            DoFunc: func() { return d.syncEndpointsAndHostIPs() }, })

// daemon/cmd/daemon.go

func (d *Daemon) init() error {
    globalsDir := option.Config.GetGlobalsDir()
    os.MkdirAll(globalsDir, defaults.RuntimePathRights)
    os.Chdir(option.Config.StateDir)

    // Remove any old sockops and re-enable with _new_ programs if flag is set
    sockops.SockmapDisable()
    sockops.SkmsgDisable()

    d.createNodeConfigHeaderfile()

    if Config.SockopsEnable {
        eppolicymap.CreateEPPolicyMap()
        sockops.SockmapEnable()
    }

    d.Datapath().Loader().Reinitialize(d.ctx, d, mtu, d.Datapath(), d.l7Proxy, d.ipam)
}
// daemon/cmd/datapath.go

// syncLXCMap adds local host entries to bpf lxcmap, as well as ipcache, if needed, and also
// notifies the daemon and network policy hosts cache if changes were made.
func (d *Daemon) syncEndpointsAndHostIPs() error {
    specialIdentities := []identity.IPIdentityPair{}

    addrs := d.datapath.LocalNodeAddressing().IPv4().LocalAddresses()
    for ip := range addrs
        specialIdentities.append(IPIdentityPair{IP: ip, ID: identity.ReservedIdentityHost})

    specialIdentities.append(identity.IPIdentityPair{
            IP:   net.IPv4zero,
            Mask: net.CIDRMask(0, net.IPv4len*8),
            ID:   identity.ReservedIdentityWorld,
        })

    existingEndpoints := lxcmap.DumpToMap()

    for ipIDPair := range specialIdentities {
        hostKey := node.GetIPsecKeyIdentity()
        isHost := ipIDPair.ID == identity.ReservedIdentityHost
        if isHost
            lxcmap.SyncHostEntry(ipIDPair.IP) // Added local ip to endpoint map

        delete(existingEndpoints, ipIDPair.IP.String())

        // Upsert will not propagate (reserved:foo->ID) mappings across the cluster; this is by design.
        ipcache.IPIdentityCache.Upsert(ipIDPair.Prefix, hostKey,
            Identity{ID: ipIDPair.ID, Source: source.Local})
    }

    for hostIP, info := range existingEndpoints
        if ip := net.ParseIP(hostIP); info.IsHost() {
            lxcmap.DeleteEntry(ip) // Removed outdated host ip from endpoint map
            ipcache.IPIdentityCache.Delete(hostIP, source.Local)
        }
}

2.11 RestoreTemplates

    loader.RestoreTemplates(Config.StateDir) // restore previous BPF templates

    // Start watcher for endpoint IP --> identity mappings in key-value store.
    ipcache.InitIPIdentityWatcher()
    identitymanager.Subscribe(d.policy)
// pkg/datapath/loader/cache.go

// RestoreTemplates populates the object cache from templates on the filesystem at the specified path.
func RestoreTemplates(stateDir string) error {
    path := filepath.Join(stateDir, defaults.TemplatesDir)
    os.RemoveAll(path)
}
// pkg/ipcache/kvstore.go

// InitIPIdentityWatcher initializes the watcher for ip-identity mapping events in the key-value store.
func InitIPIdentityWatcher() {
    go func() {
        watcher = NewIPIdentityWatcher(kvstore.Client())
        watcher.Watch(context.TODO())
    }()
}

// When events are received from the kvstore, all IPIdentityMappingListeners are notified.
func (iw *IPIdentityWatcher) Watch(ctx context.Context) {
    watcher := iw.backend.ListAndWatch(ctx, "endpointIPWatcher", IPIdentitiesPath, 512)

    for {
        select {
        case event, ok := <-watcher.Events:
            // Synchronize local caching of endpoint IP to ipIDPair mapping with
            // operation key-value store has informed us about.
            switch event.Typ {
            case EventTypeListDone:
                for listener := range IPIdentityCache.listeners
                    listener.OnIPIdentityCacheGC()

            case EventTypeCreate, EventTypeModify:
                json.Unmarshal(event.Value, &ipIDPair)
                IPIdentityCache.Upsert(ip, HostIP, ipIDPair.Key, k8sMeta, Identity{ipIDPair.ID, source.KVStore})

            case kvstore.EventTypeDelete:
                ...
            }
        }
    }
}

3 gc.Enable(): enable conntrack/nat GC

// pkg/maps/ctmap/gc/gc.go

// Enable enables the connection tracking garbage collection.
func Enable(ipv4, ipv6 bool, restoredEndpoints []*endpoint.Endpoint, mgr EndpointManager) {
    go func() {
        for {
            epsMap = make(map[string]*endpoint.Endpoint) // contains an IP -> EP mapping.

            gcStart = time.Now()
            eps := mgr.GetEndpoints()
            for e := range eps
                epsMap[e.GetIPv4Address()] = e

            if len(eps) > 0 || initialScan
                maxDeleteRatio = runGC(nil, ipv4, createGCFilter(initialScan, restoredEndpoints, emitEntryCB))

            for e := range eps
                runGC(e, ipv4, &ctmap.GCFilter{RemoveExpired: true, EmitCTEntryCB: emitEntryCB})
        }
    }()
}

4 initKVStore(): setup some GC jobs, connect to kvstore

// daemon/cmd/daemon_main.go

func (d *Daemon) initKVStore() {
    controller.NewManager().UpdateController("kvstore-locks-gc", ControllerParams{
            // agent writes a file as lock before accessing a key in kvstore, run GC for those lock files
            DoFunc: func() { kvstore.RunLockGC() },
            RunInterval: defaults.KVStoreStaleLockTimeout,
        })

    kvstore.Setup(Config.KVStore, Config.KVStoreOpt) // connect to kvstore
}

5 initRestore(): regenerate BPF for restored endpoints

// daemon/cmd/state.go

func (d *Daemon) initRestore(restoredEndpoints *endpointRestoreState) chan struct{} {
    // When we regenerate restored endpoints, it is guaranteed that we have
    // received the full list of policies present at the time the daemon is bootstrapped.
    restoreComplete = d.regenerateRestoredEndpoints(restoredEndpoints)

    go func() {
        if d.clustermesh != nil               // wait for all cluster mesh to be synchronized
            d.clustermesh.ClustersSynced()    // with the datapath before proceeding.

        // Start controller which removes any leftover Kubernetes services that may have been deleted
        // while Cilium was not running. It will not run again unless updated elsewhere.
        // This means that if, for instance, a user manually adds a service via the CLI into the BPF maps,
        // that it will not be cleaned up by the daemon until it restarts.
        controller.NewManager().UpdateController("sync-lb-maps-with-k8s-services", ControllerParams{
                DoFunc: func() { return d.svc.SyncWithK8sFinished() },
            })
    }()
    return restoreComplete
}

5.1 Re-regenerate restored endpoints

This will regenerate BPF for all restored endpoints.

See Cilium Code Walk Through: Agent Restore Endpoints And Identities.

5.2 Init ClusterMesh if enabled

See Cilium Code Walk Through: ClusterMesh.

6 Init health checks, metrics, Cilium API server

6.1 Init health checks

See Cilium Code Walk Through: Cilium Health.

6.2 Init status collector

//daemon/cmd/status.go

func (d *Daemon) startStatusCollector() {
    probes := []status.Probe{
        {
            Name: "check-locks",
            Probe: func() (interface{}, error) { ... },
        }, {
            Name: "kvstore",
            Probe: func() (interface{}, error) { return kvstore.Client().Status() },
        }, {
            Name: "kubernetes",
            Probe: func() (interface{}, error) { return d.getK8sStatus() },
        }, {
            Name: "ipam", ...
        }, {
            Name: "node-monitor",
            Probe: func() (interface{}, error) { return d.monitorAgent.State() },
        }, {
            Name: "cluster",
            Probe: func() (interface{}, error) { return &models.ClusterStatus{Self: d.nodeDiscovery.LocalNode.Fullname()}, nil },
        }, {
            Name: "cilium-health", ...
        }, {
            Name: "l7-proxy",
            Probe: func() (interface{}, error) { return d.l7Proxy.GetStatusModel() },
        }, {
            Name: "controllers",
            Probe: func() (interface{}, error) { return controller.GetGlobalStatus(), nil },
        }, {
            Name: "clustermesh",
            Probe: func() (interface{}, error) { return d.clustermesh.Status(), nil },
        },
    }

    d.statusCollector = status.NewCollector(probes, status.Config{})
}

7 Send notification to monitor

Sends a "Cilium agent started" (monitorAPI.AgentNotifyStart) message to the monitor.

// daemon/cmd/daemon.go

// sends an agent notification to the monitor
func (d *Daemon) SendNotification(typ, text string) {
    event := monitorAPI.AgentNotify{Type: typ, Text: text}
    return d.monitorAgent.SendEvent(MessageTypeAgent, event)
}

8 Serve Cilium HTTP API

At this point, cilium CLI commands are handled correctly, e.g. cilium status --brief.
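
The cilium CLI talks to this API over the agent's local unix socket. As a quick sanity check that the server is up, one can query it directly; the socket path and the /v1/healthz endpoint below are assumptions based on Cilium's defaults:

// sketch: query the agent API over its unix socket (illustrative only)
package main

import (
    "context"
    "fmt"
    "io"
    "net"
    "net/http"
)

func main() {
    client := &http.Client{
        Transport: &http.Transport{
            // Route every request through the agent's unix socket.
            DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
                return net.Dial("unix", "/var/run/cilium/cilium.sock")
            },
        },
    }
    resp, err := client.Get("http://localhost/v1/healthz")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    body, _ := io.ReadAll(resp.Body)
    fmt.Println(resp.Status, string(body))
}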

9 Mark node ready

10 Launch hubble if configured

Misc

IPAM States Restoration

IPAM manages IP address allocation. It tracks two states:

{
    "allocated_ips": [], // IPs have been allocated out
    "available_ips": [], // IPs avaiable for allocation
}

IPAM stores these states in memory. How do they survive an agent restart or host reboot?

The secret lies in files on the local node:

  1. For each allocated IP, Cilium creates an endpoint for it and writes the endpoint info into a local file (a C header file).
  2. When the agent restarts or the host reboots, the in-memory IPAM state is reset. The agent then loops over all endpoint files, parses the IPs inside, and reserves them in IPAM, as sketched below.

In this way, IPAM restores its state.
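
A minimal sketch of that restore loop follows. The types and the readEndpointIP helper are hypothetical stand-ins, not Cilium's actual IPAM or endpoint-parsing code:

// sketch: re-reserve IPs found in per-endpoint state directories (illustrative only)
package main

import (
    "fmt"
    "net"
    "os"
    "path/filepath"
)

type ipam struct {
    allocated map[string]bool // IP -> already handed out
}

func (a *ipam) reserve(ip net.IP) error {
    key := ip.String()
    if a.allocated[key] {
        return fmt.Errorf("IP %s already allocated", key)
    }
    a.allocated[key] = true
    return nil
}

// restore walks stateDir (e.g. /var/run/cilium), reads the IP saved in each
// endpoint directory, and marks it as allocated again. readEndpointIP stands
// in for parsing the endpoint's saved C header file.
func restore(a *ipam, stateDir string, readEndpointIP func(dir string) (net.IP, error)) error {
    entries, err := os.ReadDir(stateDir)
    if err != nil {
        return err
    }
    for _, e := range entries {
        if !e.IsDir() {
            continue
        }
        ip, err := readEndpointIP(filepath.Join(stateDir, e.Name()))
        if err != nil {
            continue // not an endpoint directory, skip it
        }
        if err := a.reserve(ip); err != nil {
            return err
        }
    }
    return nil
}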