Cilium Code Walk Through: Restore Endpoints and Identities
This post walks you through the endpoint and identity restoring process during
Cilium agent restart. Code bases on Cilium 1.8.2
.
It’s recommended to read Cilium Code Walk Through: Agent Start Process before reading this post.
This post is included in Cilium Code Walk Through Series.
- 1 Daemon/agent restart
- 2 Extract endpoints info from local files:
restoreOldEndpoints()
- 3 Reserve IP addresses for existing endpoints
- 4 Regenerate BPF for restored endpoints
runDaemon // daemon/cmd/daemon_main.go
|-NewDaemon // -> daemon/cmd/daemon.go
| |-restoredEndpoints := d.restoreOldEndpoints
| |-ioutil.ReadDir
| |-endpoint.FilterEPDir // filter over endpoint directories
| |-for ep := range possibleEPs
| validateEndpoint(ep)
| |-allocateIPsLocked
|-initRestore(restoredEndpoints) // -> daemon/cmd/state.go
|-regenerateRestoredEndpoints // daemon/cmd/state.go
|-for ep
| Expose
| |-NewEventQueueBuffered
| |-eventQueue.Run ------------------>---|
| |
|-for ep |
RegenerateAfterRestore | // -> pkg/endpoint/restore.go
|-restoreIdentity | //
|-Regenerate | // -> pkg/endpoint/policy.go
|-eventQueue.Enqueue(epEvent) |
/\ |
|| |
\/ |
eventQueue.Run() <-----------------------<-----| // pkg/endpoint/events.go
|-for ev := range q.events
metadata.Handle
|-EndpointRegenerationEvent.Handle // pkg/endpoint/events.go
|-regenerate // -> pkg/endpoint/policy.go
|-runPreCompilationSteps
|-updateAndOverrideEndpointOptions
|-writeHeaderfile
| |-ctmap.WriteBPFMacros()
|-regenerateBPF // pkg/endpoint/bpf.go
|-realizeBPFState // pkg/endpoint/bpf.go
|-if CompileAndLoad // pkg/datapath/loader/loader.go
|-compileDatapath // -> pkg/datapath/loader/compile.go
|-reloadDatapath // pkg/datapath/loader/loader.go
|-replaceDatapath // pkg/datapath/loader/loader.go
|-cmd.exec("cilium-map-migrate -s <objPath>")
|-cmd.exec("tc filter replace xx ..")
|-cmd.exec("cilium-map-migrate -e <objPath> -r <retCode>")
elif CompileOrLoad // pkg/datapath/loader/loader.go
|-ReloadDatapath // pkg/datapath/loader/loader.go
|-reloadDatapath // pkg/datapath/loader/loader.go
|-replaceDatapath
|-cmd.exec("cilium-map-migrate -s <objPath>")
|-cmd.exec("tc filter replace xx ..")
|-cmd.exec("cilium-map-migrate -e <objPath> -r <retCode>")
else ReloadDatapath // pkg/datapath/loader/loader.go
|-reloadDatapath // pkg/datapath/loader/loader.go
|-replaceDatapath
|-cmd.exec("cilium-map-migrate -s <objPath>")
|-cmd.exec("tc filter replace xx ..")
|-cmd.exec("cilium-map-migrate -e <objPath> -r <retCode>")
Major steps:
- Restore endpoint info from files:
restoreOldEndpoints()
- Re-generate BPF for endpoints:
regenerateRestoredEndpoints()
- Allocate identity for endpoints (policy based on identies, not on endpoints)
- Regenerate BPF for endpoints
1 Daemon/agent restart
On Cilium agent restarting, method runDaemon()
will be invoked.
runDaemon()
first calls NewDaemon()
to create the daemon process, then
restores endpoint info from files in the local file system;
then, it calls regenerateRestoredEndpoints()
to allocate new identies for
the endpoints, and regenerate BPF for identities.
func NewDaemon(dp datapath.Datapath) (*Daemon, *endpointRestoreState, error) {
d := Daemon{ ... }
d.initMaps() // Open or create BPF maps.
...
// restore endpoints before any IPs are allocated to avoid eventual IP conflicts later on,
// otherwise any IP conflict will result in the endpoint not being able to be restored.
restoredEndpoints := d.restoreOldEndpoints(option.Config.StateDir, true)
...
}
2 Extract endpoints info from local files: restoreOldEndpoints()
By default, the value of option.Config.StateDir
passed to method
restoreOldEndpoints()
is /var/run/cilium/
.
This method performs the first step in restoring the endpoint structure, allocating their existing IP out of the CIDR block and then inserting the endpoints into the endpoints list.
// daemon/cmd/state.go
func (d *Daemon) restoreOldEndpoints(dir string, clean bool) (*endpointRestoreState) {
state := &endpointRestoreState{
restored: []*endpoint.Endpoint{},
toClean: []*endpoint.Endpoint{},
}
existingEndpoints = lxcmap.DumpToMap() // get previous endpoint IDs from BPF map
dirFiles := ioutil.ReadDir(dir) // state dir: `/var/run/cilium/`
eptsID := endpoint.FilterEPDir(dirFiles) // `/var/run/cilium/<ep_id>/lxc_config.h`
possibleEPs := ReadEPsFromDirNames(dir, eptsID) // parse endpoint ID from dir name
for ep := range possibleEPs {
ep.SetAllocator(d.identityAllocator)
d.validateEndpoint(ep) // further call allocateIPsLocked() to retain IP for this endpoint
ep.SetDefaultConfiguration(true)
state.restored.append(ep) // insert into restored list, will regen bpf for them
delete(existingEndpoints, ep.IPv4.String())
}
for hostIP, info := range existingEndpoints // for the remaining endpoints, delete them
if ip := net.ParseIP(hostIP) && !info.IsHost() // from endpoint map
lxcmap.DeleteEntry(ip)
return state
}
Function FilterEPDir()
selects all the directories whose name match one of the
following patterns:
- file name consists of only numbers, e.g.
2303/
, full path/var/run/cilium/2303
*_next/
, full path/var/run/cilium/*_next/
*_next_fail/
, full path/var/run/cilium/*_next_fail/
Then it finds the endpoint C header files in the directories,
with full path like /var/run/cilium/<ep_id>/lxc_config.h
.
3 Reserve IP addresses for existing endpoints
On agent restart, IPAM states are reset, which makes all IP addresses in IPAM available for allocating - including those that already being used by running containers on this host.
From the previous step, agent has recovered the IP addresses (IPv4 and/or IPv6)
that an endpoint is using now by parsing file
/var/run/cilium/<ep_id>/lxc_config.h
.
Now, it must reserve, or re-allocate, the IP addresses from
IPAM to prevent them from being allocated out again. This is accomplished by
calling validateEndpoint(ep) -> allocateIPsLocked()
.
4 Regenerate BPF for restored endpoints
Resources in k8s cluster may have changed during agent restart, e.g. the Service to backend mappings. Besides, there may also be configuration changes for the agent.
So to keep endpoint states up to date, we need to regenerate the BPF code & map for this endpoint:
// daemon/cmd/state.go
func (d *Daemon) regenerateRestoredEndpoints(state) (restoreComplete chan struct{}) {
epRegenerated := make(chan bool, len(state.restored))
for i := len(state.restored)-1; i >= 0; i-- {
ep := state.restored[i]
ep.Expose(d.endpointManager) // Insert ep into endpoint manager so it can be regenerated
} // later with RegenerateAllEndpoints().
for ep := range state.restored // loop over restored endpoints
go func() {
ep.RegenerateAfterRestore() // perform BPF regeneration
epRegenerated <- true
}(ep, epRegenerated)
for ep := range state.toClean // clean the endpoints that no need to restore
d.deleteEndpointQuiet(ep)
go func() {
for buildSuccess := range epRegenerated
if total++ >= len(state.restored) break
log.Info("Finished regenerating restored endpoints")
}()
}
Major steps:
- Insert all waiting-to-be-restored endpoints into endpoint manager via
Expose()
; this will create an event queue for each endpoint, and the queue will listen to BPF regenerate events. - Restore identity for endpoint
- Enqueue an
EndpointRegenerationEvent
into endpoint’s event queue, which is created in step 1. - On receiving
EndpointRegenerationEvent
, handler performs BPF regeneration.
Let’s see them in detail.
4.1 Create per-endpoint event queue
// pkg/endpoint/manager.go
// Expose exposes the endpoint to the endpointmanager
func (e *Endpoint) Expose(mgr endpointManager) error {
newID := mgr.AllocateID(e.ID)
e.ID = newID
e.eventQueue = eventqueue.NewEventQueueBuffered("endpoint-"+e.ID, Config.EndpointQueueSize)
e.eventQueue.Run()
e.updateReferences(mgr)
e.getLogger().Info("New endpoint")
mgr.RunK8sCiliumEndpointSync(e, option.Config)
}
// pkg/eventqueue/eventqueue.go
func (q *EventQueue) Run() {
go q.eventQueueOnce.Do(func() {
for ev := range q.events {
select {
default:
ev.Metadata.Handle(ev.eventResults)
}
}
})
}
4.2 Restore identities for endpoints
// pkg/endpoint/restore.go
// RegenerateAfterRestore performs the following operations on the specified Endpoint:
// * allocates an identity for the Endpoint
// * regenerates the endpoint
func (e *Endpoint) RegenerateAfterRestore() error {
e.restoreIdentity()
regenerationMetadata := ®eneration.ExternalRegenerationMetadata{
Reason: "syncing state to host",
RegenerationLevel: regeneration.RegenerateWithDatapathRewrite,
}
buildSuccess := <-e.Regenerate(regenerationMetadata)
log.Info("Restored endpoint")
}
4.3 Enqueue EndpointRegenerationEvent
event into endpoint’s queue
Regenerate()
is defined as:
// pkg/endpoint/policy.go
// Regenerate forces the regeneration of endpoint programs & policy
// Should only be called with e.state at StateWaitingToRegenerate, StateWaitingForIdentity, or StateRestoring
func (e *Endpoint) Regenerate(regenMetadata *regeneration.ExternalRegenerationMetadata) <-chan bool {
done := make(chan bool, 1)
regenContext := ParseExternalRegenerationMetadata(ctx, cFunc, regenMetadata)
epEvent := eventqueue.NewEvent(&EndpointRegenerationEvent{regenContext: regenContext, ep: e})
resChan := e.eventQueue.Enqueue(epEvent)
return done
}
As can be seen, Regenerate()
enqueues an EndpointRegenerationEvent
into the
the endpoint’s event queue.
Now let’s go to the receiving side of this event queue.
4.4 Event handler: perform BPF regeneration
Handler for the EndpointRegenerationEvent
type event:
// pkg/endpoint/events.go
// Handle handles the regeneration event for the endpoint.
func (ev *EndpointRegenerationEvent) Handle(res chan interface{}) {
e := ev.ep
doneFunc := e.owner.QueueEndpointBuild(e.ID)
if doneFunc != nil { // dequeued endpoint from build queue
ev.ep.regenerate(ev.regenContext)
doneFunc()
e.notifyEndpointRegeneration(err)
}
res <- &EndpointRegenerationResult{ err: err, }
}
where regenerate()
is defined as:
// pkg/endpoint/policy.go
func (e *Endpoint) regenerate(context *regenerationContext) (retErr error) {
origDir := e.StateDirectoryPath()
tmpDir := e.NextDirectoryPath()
e.removeDirectory(tmpDir)
os.MkdirAll(tmpDir, 0777)
defer func() { e.removeDirectory(tmpDir) }()
revision, stateDirComplete = e.regenerateBPF(context)
return e.updateRealizedState(stats, origDir, revision, stateDirComplete)
}
It creates a temporary directory for creating new BPFs, then calls regenerateBPF()
.
The latter will further call more deeper methods to fulfull the BPF
regeneration. As this call stack is fairly deep, we list them in a dedicated
section in the below.
4.5 From regenerateBPF()
to the eventual clang/tc
commands
ReloadDatapath()
(or compileAndLoad()
) occupies most of the total endpoint regeneration time,
recorded as scope=bpfLoadProg
in metric cilium_endpoint_regeneration_time_stats_seconds_bucket
.
// pkg/endpoint/bpf.go
// regenerateBPF rewrites all headers and updates all BPF maps to reflect the specified endpoint.
// ReloadDatapath forces the datapath progs to be reloaded. It does not guarantee recompilation of the programs.
//
// Returns the policy revision number when the regeneration has called,
// Whether the new state dir is populated with all new BPF state files.
func (e *Endpoint) regenerateBPF(regenContext) (revnum uint64, stateDirComplete bool) {
headerfileChanged = e.runPreCompilationSteps() // execute clang commands to compile BPF
<-datapathRegenCtxt.ctCleaned // Wait for conntrack cleaning to complete
compilationExecuted = e.realizeBPFState() // execute tc commands to reload BPF
if !datapathRegenCtxt.epInfoCache.IsHost() {
// Hook the endpoint into the endpoint and endpoint to policy tables then expose it
epErr := eppolicymap.WriteEndpoint(epInfoCache, e.policyMap)
lxcmap.WriteEndpoint(epInfoCache)
}
e.closeBPFProgramChannel() // Signal that BPF program has been generated.
// The endpoint has at least L3/L4 connectivity at this point.
e.syncPolicyMap() // Synchronously try to update PolicyMap for this endpoint.
stateDirComplete = headerfileChanged && compilationExecuted
return datapathRegenCtxt.epInfoCache.revision, stateDirComplete, err
}
// pkg/endpoint/bpf.go
func (e *Endpoint) realizeBPFState() (compilationExecuted bool, err error) {
if regenLevel == RegenerateWithDatapathRebuild { // compile AND load
e.owner.Datapath().Loader().CompileAndLoad()
compilationExecuted = true
} else if regenLevel == RegenerateWithDatapathRewrite { // compile OR load
e.owner.Datapath().Loader().CompileOrLoad()
compilationExecuted = true
} else { // RegenerateWithDatapathLoad // reload
e.owner.Datapath().Loader().ReloadDatapath()
}
e.bpfHeaderfileHash = datapathRegenCtxt.bpfHeaderfilesHash
return compilationExecuted, nil
}
// pkg/datapath/loader/loader.go
func (l *Loader) CompileOrLoad(ctx context.Context, ep datapath.Endpoint, stats *metrics.SpanStat) error {
templatePath := l.templateCache.fetchOrCompile(ctx, ep, stats)
template := elf.Open(templatePath)
symPath := path.Join(ep.StateDir(), defaults.TemplatePath)
os.RemoveAll(symPath)
os.Symlink(templatePath, symPath)
epObj := endpointObj
if ep.IsHost()
epObj = hostEndpointObj
dstPath := path.Join(ep.StateDir(), epObj)
opts, strings := ELFSubstitutions(ep)
template.Write(dstPath, opts, strings)
return l.ReloadDatapath(ctx, ep, stats)
}
// ReloadDatapath reloads the BPF datapath pgorams for the specified endpoint.
func (l *Loader) ReloadDatapath(ctx context.Context, ep datapath.Endpoint, stats *metrics.SpanStat) (err error) {
dirs := directoryInfo{
Library: option.Config.BpfDir,
Runtime: option.Config.StateDir,
State: ep.StateDir(),
Output: ep.StateDir(),
}
return l.reloadDatapath(ctx, ep, &dirs)
}
4.5.1 Pre-compile BPF
See runPreCompilationSteps()
.
4.5.2 Compile BPF
See realizeBPFState() -> CompileAndLoad() -> compileAndLoad() -> compileDatapath()
.
// pkg/datapath/loader/loader.go
// CompileAndLoad compiles the BPF datapath programs for the specified endpoint
// and loads it onto the interface associated with the endpoint.
//
// Expects the caller to have created the directory at the path ep.StateDir().
func (l *Loader) CompileAndLoad(ctx context.Context, ep datapath.Endpoint, stats *metrics.SpanStat) error {
dirs := directoryInfo{
Library: option.Config.BpfDir,
Runtime: option.Config.StateDir,
State: ep.StateDir(),
Output: ep.StateDir(),
}
return l.compileAndLoad(ctx, ep, &dirs, stats)
}
func (l *Loader) compileAndLoad(ctx context.Context, ep datapath.Endpoint, dirs *directoryInfo, stats *metrics.SpanStat) error {
compileDatapath(ctx, dirs, ep.IsHost(), debug, ep.Logger(Subsystem))
return l.reloadDatapath(ctx, ep, dirs)
}
4.5.3 Reload BPF
All the above three cases will eventually call reloadDatapath()
to fulfill the
BPF reloading for the endpoint.
// pkg/datapath/loader/loader.go
func (l *Loader) reloadDatapath(ctx context.Context, ep datapath.Endpoint, dirs *directoryInfo) error {
objPath := path.Join(dirs.Output, endpointObj)
if ep.IsHost() {
objPath = path.Join(dirs.Output, hostEndpointObj)
l.reloadHostDatapath(ctx, ep, objPath)
} else if ep.HasIpvlanDataPath() {
...
} else {
l.replaceDatapath(ctx, ep.InterfaceName(), objPath, symbolFromEndpoint, dirIngress)
if ep.RequireEgressProg()
l.replaceDatapath(ctx, ep.InterfaceName(), objPath, symbolToEndpoint, dirEgress)
}
if ep.RequireEndpointRoute() {
if ip := ep.IPv4Address(); ip.IsSet()
upsertEndpointRoute(ep, *ip.IPNet(32))
}
}
// pkg/datapath/loader/netlink.go
// replaceDatapath the qdisc and BPF program for a endpoint
func (l *Loader) replaceDatapath(ctx context.Context, ifName, objPath, progSec, progDirection string) error {
replaceQdisc(ifName)
cmd := exec.CommandContext(ctx, "cilium-map-migrate", "-s", objPath)
cmd.CombinedOutput(log, true)
defer func() {
if err == nil
retCode = "0"
else
retCode = "1"
args := []string{"-e", objPath, "-r", retCode}
cmd := exec.CommandContext(ctx, "cilium-map-migrate", args...)
cmd.CombinedOutput(log, true) // ignore errors
}()
args := []string{"filter", "replace", "dev", ifName, progDirection,
"prio", "1", "handle", "1", "bpf", "da", "obj", objPath, "sec", progSec, }
cmd = exec.CommandContext(ctx, "tc", args...).WithFilters(libbpfFixupMsg)
cmd.CombinedOutput(log, true)
}