Cilium Code Walk Through: CNI Create Network
This post is included in Cilium Code Walk Through Series.
TL;DR
This post walks through the code of the Cilium CNI plugin creating the network for a Pod.
Call stack (code based on 1.8.2/1.5.1):
cmdAdd // plugins/cilium-cni/cilium-cni.go
|-loadNetConf(args.StdinData) // plugins/cilium-cni/cilium-cni.go
|-RemoveIfFromNetNSIfExists
|-SetupVeth or IPVLAN
|-IPAMAllocate // plugins/cilium-cni/cilium-cni.go
| |-PostIPAM // api/v1/client/ipam/i_p_a_m_client.go
| /\
| ||
| \/
| ServeHTTP // api/v1/server/restapi/ipam/post_ip_a_m.go
| |-Handler.Handle() // api/v1/server/restapi/ipam/post_ip_a_m.go
| |- Handle() // daemon/cmd/ipam.go
| |-AllocateNext // pkg/ipam/allocator.go
| |-AllocateNextFamily // pkg/ipam/allocator.go
| |-allocateNextFamily // pkg/ipam/allocator.go
| |-AllocateNext // k8s.io/kubernetes/pkg/registry/core/service/ipallocator
|-AllocateNext // k8s.io/kubernetes/pkg/registry/core/service/allocator/bitmap.go
|-configureIface //
| |-addIPConfigToLink
| |-netlink.AddrAdd
| |-netlink.RouteAdd
|-EndpointCreate
|-PutEndpointID
/\
||
\/
ServeHTTP // api/v1/server/restapi/endpoint/put_endpoint_id.go
|-Handler.Handle() // api/v1/server/restapi/endpoint/put_endpoint_id.go
|- Handle() // daemon/cmd/endpoint.go
|-createEndpoint // daemon/cmd/endpoint.go
|-NewEndpointFromChangeModel // pkg/endpoint/api.go
|-fetchK8sLabelsAndAnnotations // daemon/cmd/endpoint.go
|-endpointmanager.AddEndpoint
| |-Expose // pkg/endpointmanager/manager.go
| |-AllocateID // pkg/endpoint/manager.go
| |-RunK8sCiliumEndpointSync(e) // pkg/k8s/watchers/endpointsynchronizer.go
|-ep.UpdateLabels // pkg/endpoint/endpoint.go
| |-replaceInformationLabels // pkg/endpoint/endpoint.go
| |-ReplaceIdentityLabels // pkg/endpoint/endpoint.go
| |-RunIdentityResolver // pkg/endpoint/endpoint.go
| |-identityLabelsChanged // pkg/endpoint/endpoint.go
| |-AllocateIdentity // kvstore: reuse existing or create new one
| |-forcePolicyComputation // pkg/endpoint/endpoint.go
| |-SetIdentity
| |-runIPIdentitySync // pkg/endpoint/policy.go
| |-UpsertIPToKVStore // pkg/ipcache/kvstore.go
|-Regenerate // pkg/endpoint/policy.go
|-regenerate // pkg/endpoint/policy.go
|-regenerateBPF // pkg/endpoint/bpf.go
|-runPreCompilationSteps
| |-regeneratePolicy
| |-writeHeaderfile
|-realizeBPFState
|-CompileAndLoad // pkg/datapath/loader/loader.go
|-compileAndLoad // pkg/datapath/loader/loader.go
|-compileDatapath // pkg/datapath/loader/loader.go
| |-compile // pkg/datapath/loader/compile.go
| |-compileAndLink // pkg/datapath/loader/compile.go
|-reloadDatapath // pkg/datapath/loader/loader.go
|-replaceDatapath // pkg/datapath/loader/netlink.go
- TL;DR
- 0 High level overview
- 1 Create link device
- 2 Allocate IP address
- 3 Configure pod network
- 4 Create endpoint
- 5 Create CiliumEndpoint (CEP)
- 6 Retrieve or allocate identity
- 7 Calculate policy
- 8 Upsert IP information to kvstore
- 9 Re-generate BPF code
- 10 Conclusion
When creating a Pod in a k8s cluster, kubelet will call the CNI plugin to create the network for this Pod. The specific steps each CNI plugin performs vary a lot; in this post we will dig into Cilium's.
0 High level overview
Fig 0. Leftmost: what happens during Cilium CNI create-network (this picture is borrowed from a future post, so some parts are not directly related to this post; just ignore them)
As a high-level overview, the Cilium CNI plugin performs the following steps:
- Create link device (e.g. veth pair, IPVLAN device)
- Allocate IP
- Configure Pod network, e.g. IP address, route table, sysctl parameters
- Create Endpoint (node local) via Cilium agent API
- Create CiliumEndpoint (CEP, k8s CRD) via k8s apiserver
- Retrieve or allocate identity for this endpoint via kvstore
- Calculate network policy
- Save IP info (e.g. IP -> identity mapping) to kvstore
- Generate, compile and inject BPF code into kernel
0.1 Source code tree
- api/ - cilium REST API entrypoints
- daemon/cmd/ - cilium daemon (agent) implementation, including:
  - IPAM API handler implementation
  - endpoint API handler
  - others
- plugins/ - plugin implementations for CNI, docker, etc
- pkg/ - cilium core functionalities implementation
0.2 Add Network Skeleton Code
When kubelet calls the plugin to add network for a Pod, the cmdAdd() method will be invoked. The skeleton code is as follows:
// plugins/cilium-cni/cilium-cni.go
func cmdAdd(args *skel.CmdArgs) (err error) {
n = types.LoadNetConf(args.StdinData)
cniTypes.LoadArgs(args.Args, &cniArgs)
netns.RemoveIfFromNetNSIfExists(netNs, args.IfName)
ep := &models.EndpointChangeRequest{
ContainerID: args.ContainerID,
Labels: addLabels,
State: models.EndpointStateWaitingForIdentity,
K8sPodName: string(cniArgs.K8S_POD_NAME),
K8sNamespace: string(cniArgs.K8S_POD_NAMESPACE),
}
switch conf.DatapathMode {
case DatapathModeVeth:
veth, peer, tmpIfName = connector.SetupVeth(ep.ContainerID, DeviceMTU, ep)
netlink.LinkSetNsFd(*peer, int(netNs.Fd()))
connector.SetupVethRemoteNs(netNs, tmpIfName, args.IfName)
case DatapathModeIpvlan: ...
}
podName := cniArgs.K8S_POD_NAMESPACE + "/" + cniArgs.K8S_POD_NAME
ipam = c.IPAMAllocate("", podName, true) // c: cilium client
if ipv4IsEnabled(ipam) {
ep.Addressing.IPV4 = ipam.Address.IPV4
ipConfig, routes = prepareIP(ep.Addressing.IPV4, ...)
res.IPs = append(res.IPs, ipConfig)
res.Routes = append(res.Routes, routes...)
}
macAddrStr = configureIface(ipam, args.IfName, &state)
c.EndpointCreate(ep)
return cniTypes.PrintResult(res, n.CNIVersion)
}
Let’s walk through this process in detail.
1 Create link device
1.1 Parse CNI parameters
When requesting to add network for a Pod, kubelet passes a CmdArgs variable to the CNI plugin; the type is defined in github.com/containernetworking/cni/pkg/skel:
// CmdArgs captures all the arguments passed in to the plugin via both env vars and stdin
type CmdArgs struct {
ContainerID string // container ID
Netns string // container netns
IfName string // desired interface name for container, e.g. `eth0`
Args string // Platform-specific parameters of the container, e.g.
// `K8S_POD_NAMESPACE=xx;K8S_POD_NAME=xx;K8S_POD_INFRA_CONTAINER_ID=xx`
Path string // Path for locating CNI plugin binary, e.g. `/opt/cni/bin`
StdinData []byte // Network configurations
}
The StdinData field will be unmarshalled into a netConf variable:
type netConf struct {
cniTypes.NetConf
MTU int `json:"mtu"`
Args Args `json:"args"`
}
where cniTypes.NetConf is defined as:
// github.com/containernetworking/cni/pkg/types/types.go
// NetConf describes a network.
type NetConf struct {
CNIVersion string `json:"cniVersion,omitempty"`
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
Capabilities map[string]bool `json:"capabilities,omitempty"`
IPAM IPAM `json:"ipam,omitempty"`
DNS DNS `json:"dns"`
RawPrevResult map[string]interface{} `json:"prevResult,omitempty"`
PrevResult Result `json:"-"`
}
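To make the parsing step concrete, below is a minimal, self-contained sketch (not Cilium's actual loadNetConf; the struct is a simplified mirror and the stdin payload is hypothetical) showing how such a StdinData blob is unmarshalled:
package main

import (
	"encoding/json"
	"fmt"
)

// netConfSketch mirrors only the fields we care about here;
// the real netConf embeds cniTypes.NetConf.
type netConfSketch struct {
	CNIVersion string `json:"cniVersion"`
	Name       string `json:"name"`
	Type       string `json:"type"`
	MTU        int    `json:"mtu"`
}

func main() {
	// A hypothetical StdinData payload as kubelet would pass it.
	stdinData := []byte(`{"cniVersion":"0.3.1","name":"cilium","type":"cilium-cni","mtu":1450}`)

	var n netConfSketch
	if err := json.Unmarshal(stdinData, &n); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", n) // {CNIVersion:0.3.1 Name:cilium Type:cilium-cni MTU:1450}
}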
After parsing these network configurations, the plugin will create a network device (link device) for this Pod.
Cilium supports two kinds of datapath types (and thus link device types) at the time of writing this post:
- veth pair
- IPVLAN
We will take the veth pair as an example in the following.
1.2 Create veth pair
In Cilium, the virtual devices used to connect a container to the host are called a "connector".
The CNI plugin first calls connector.SetupVeth() to create a veth pair, taking the container ID, MTU and endpoint info as parameters.
veth, peer, tmpIfName := connector.SetupVeth(ep.ContainerID, DeviceMTU, ep)
Naming rule for the veth pair devices (see the sketch below):
- host side: lxc + first N digits of sha256(containerID), e.g. lxc12c45
- peer side (container side): tmp + first N digits of sha256(containerID), e.g. tmp12c45
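As an illustration of this naming scheme (the exact number of digits taken is an implementation detail; 5 hex characters and the container ID below are assumed just for the example):
package main

import (
	"crypto/sha256"
	"fmt"
)

func main() {
	containerID := "0c2431b6fa32" // hypothetical container ID

	// sha256 the container ID and keep a short hex prefix.
	sum := sha256.Sum256([]byte(containerID))
	digest := fmt.Sprintf("%x", sum)

	hostIfName := "lxc" + digest[:5] // host side, e.g. lxc12c45
	tmpIfName := "tmp" + digest[:5]  // temporary container side, e.g. tmp12c45
	fmt.Println(hostIfName, tmpIfName)
}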
Additional steps in connector (pkg/endpoint/connector/veth.go):
- Set sysctl parameters: /proc/sys/net/ipv4/conf/<veth>/rp_filter = 0
- Set MTU
- Fill endpoint info: MAC (container side), host side MAC, interface name, interface index
1.3 Move peer to container netns
Next, the CNI plugin puts the peer end of the veth pair into the container by setting the peer's network namespace to the container's netns:
netlink.LinkSetNsFd(*peer, int(netNs.Fd()))
As a consequence, the peer device will "disappear" from the host, namely, it will not be listed when executing ifconfig or ip link commands on the host. You must specify the netns in order to see it: ip netns exec <netns> ip link.
1.4 Rename peer
Next, the CNI plugin renames the peer to the name specified in the CNI arguments:
connector.SetupVethRemoteNs(netNs, tmpIfName, args.IfName)
This will, for example, rename tmp53057 to eth0 inside the container. And this is just how the eth0 device comes up in each container.
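Putting sections 1.2-1.4 together, the create / move / rename sequence can be sketched with the vishvananda/netlink and CNI ns libraries roughly as follows (a simplified sketch only: error paths, MTU, sysctl tuning and the endpoint bookkeeping that Cilium's connector does are omitted, and the names are illustrative):
package main

import (
	"github.com/containernetworking/plugins/pkg/ns"
	"github.com/vishvananda/netlink"
)

func setupVethSketch(netnsPath, hostName, tmpName, finalName string) error {
	// 1. Create the veth pair on the host: hostName <-> tmpName.
	veth := &netlink.Veth{
		LinkAttrs: netlink.LinkAttrs{Name: hostName},
		PeerName:  tmpName,
	}
	if err := netlink.LinkAdd(veth); err != nil {
		return err
	}
	peer, err := netlink.LinkByName(tmpName)
	if err != nil {
		return err
	}

	// 2. Move the peer end into the container's network namespace.
	netNs, err := ns.GetNS(netnsPath)
	if err != nil {
		return err
	}
	defer netNs.Close()
	if err := netlink.LinkSetNsFd(peer, int(netNs.Fd())); err != nil {
		return err
	}

	// 3. Inside the container netns: rename tmpName -> finalName (e.g. eth0) and bring it up.
	return netNs.Do(func(_ ns.NetNS) error {
		link, err := netlink.LinkByName(tmpName)
		if err != nil {
			return err
		}
		if err := netlink.LinkSetName(link, finalName); err != nil {
			return err
		}
		return netlink.LinkSetUp(link)
	})
}

func main() {
	// Example invocation; the netns path and interface names are made up.
	if err := setupVethSketch("/var/run/netns/pod1", "lxc12c45", "tmp12c45", "eth0"); err != nil {
		panic(err)
	}
}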
2 Allocate IP address
Next, the plugin will try to allocate IP addresses (IPv4 & IPv6) from the IPAM, which is embedded in the local cilium agent.
The cilium agent is a daemon process running on each host; it includes many services, for example the local IPAM, the endpoint manager, etc. These services expose REST APIs.
The IP allocation process is far more complicated than it looks. The code is just one line:
ipam := c.IPAMAllocate("")
But the call stack will jump among different places:
- Plugin - plugins/cilium-cni/
- Cilium client - pkg/client/
- Cilium REST API - api/v1/server/restapi/ipam/
- Cilium API server - api/v1/server/restapi/ipam
- Real HTTP handler - daemon/cmd/ipam.go
- Cilium IPAM implementation (actually only a wrapper) - pkg/ipam/
- Final IPAM implementation (k8s builtin) - k8s.io/kubernetes/pkg/registry/core/service/ipallocator
Let’s walk them through step by step.
2.1 Allocate IP address for given Pod
Start from plugins/cilium-cni/cilium-cni.go:
podName := cniArgs.K8S_POD_NAMESPACE + "/" + cniArgs.K8S_POD_NAME
ipam := c.IPAMAllocate("", podName) // c: cilium client.
2.2 Cilium client: IPAMAllocate()
IPAMAllocate() takes two parameters: address family and owner; if the address family is empty, both an IPv4 and an IPv6 address will be allocated. Each of them will be saved in the following fields:
ipam.Address.IPV4
ipam.Address.IPV6
The implementation:
// pkg/client/ipam.go
// IPAMAllocate allocates an IP address out of address family specific pool.
func (c *Client) IPAMAllocate(family, owner string) (*models.IPAMResponse, error) {
params := ipam.NewPostIPAMParams().WithTimeout(api.ClientTimeout)
if family != ""
params.SetFamily(&family)
if owner != ""
params.SetOwner(&owner)
resp, err := c.IPAM.PostIPAM(params)
return resp.Payload, nil
}
where the client structure is defined in pkg/client/client.go:
type Client struct {
clientapi.Cilium
}
The client API clientapi.Cilium is further defined in api/v1/client/cilium_client.go:
// clientapi
type Cilium struct {
Daemon *daemon.Client
Endpoint *endpoint.Client
IPAM *ipam.Client // implemented in "api/v1/client/ipam"
Metrics *metrics.Client
Policy *policy.Client
Prefilter *prefilter.Client
Service *service.Client
Transport runtime.ClientTransport
}
OK, now we need to move on to c.IPAM.PostIPAM(params).
2.3 Call REST API: allocate IP
The cilium API code is auto-generated with golang OpenAPI tools.
api/v1/client/ipam/i_p_a_m_client.go:
func (a *Client) PostIPAM(params *PostIPAMParams) (*PostIPAMCreated, error) {
result := a.transport.Submit(&runtime.ClientOperation{
ID: "PostIPAM",
Method: "POST",
PathPattern: "/ipam",
Params: params,
})
return result.(*PostIPAMCreated), nil
}
This will POST the request to the /ipam route on the server side.
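For a rough feel of what this boils down to on the wire, the following standalone sketch POSTs to the agent API directly over its unix socket (the socket path /var/run/cilium/cilium.sock and the /v1 base path are assumed defaults here; the family/owner parameters are omitted):
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
)

func main() {
	// Dial the local cilium agent's unix socket instead of TCP.
	sock := "/var/run/cilium/cilium.sock"
	client := &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return (&net.Dialer{}).DialContext(ctx, "unix", sock)
			},
		},
	}

	// Rough equivalent of PostIPAM(): POST /v1/ipam.
	resp, err := client.Post("http://localhost/v1/ipam", "application/json", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status)  // expect "201 Created" on success
	fmt.Println(string(body)) // IPAMResponse JSON with the allocated addresses
}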
2.4 IPAM API server
The HTTP request receiving side, api/v1/server/restapi/ipam/post_ip_a_m.go:
func (o *PostIPAM) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
route, rCtx, _ := o.Context.RouteInfo(r)
var Params = NewPostIPAMParams()
if err := o.Context.BindValidRequest(r, route, &Params); err != nil { // bind params
o.Context.Respond(rw, r, route.Produces, route, err)
return
}
res := o.Handler.Handle(Params) // actually handle the request
o.Context.Respond(rw, r, route.Produces, route, res)
}
The real processing is done in the o.Handler.Handle() method, which is implemented in the daemon code.
2.5 IPAM HTTP handler
We will only look at the IPv4 part below.
// daemon/cmd/ipam.go
// Handle incoming requests address allocation requests for the daemon.
func (h *postIPAM) Handle(params ipamapi.PostIPAMParams) middleware.Responder {
resp := &models.IPAMResponse{
HostAddressing: node.GetNodeAddressing(),
Address: &models.AddressPair{},
}
ipv4, ipv6 := h.daemon.ipam.AllocateNext(params.Family)
if ipv4 != nil
resp.Address.IPV4 = ipv4.String()
return ipamapi.NewPostIPAMCreated().WithPayload(resp)
}
where h.daemon.ipam is actually a CIDR-based allocator, initialized in pkg/ipam/ipam.go:
import "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
// NewIPAM returns a new IP address manager
func NewIPAM(nodeAddressing datapath.NodeAddressing, c Configuration) *IPAM {
ipam := &IPAM{
nodeAddressing: nodeAddressing,
config: c,
}
if c.EnableIPv4
ipam.IPv4Allocator = ipallocator.NewCIDRRange(nodeAddressing.IPv4().AllocationCIDR().IPNet)
return ipam
}
As we can see, it allocates IP addresses from the specified CIDR. One important thing needs to be noted here: this is an in-memory IPAM, namely, all of its state (e.g. allocated IPs, available IPs, reserved IPs) is stored in memory and thus will not survive a service restart.
If the IPAM state is stored in memory, then how does the cilium agent restore it on restart?
The answer is: the cilium agent records each allocated IP address in local files, to be more specific, in each endpoint's BPF header file. We will see this later.
Next, let’s move on, go to h.daemon.ipam.AllocateNext(params.Family)
.
2.6 IPAM implementation in pkg/ipam
// pkg/ipam/allocator.go
// AllocateNext allocates the next available IPv4 and IPv6 address out of the configured address pool.
func (ipam *IPAM) AllocateNext(family string) (ipv4 net.IP, ipv6 net.IP, err error) {
ipv4 = ipam.AllocateNextFamily(IPv4)
}
// AllocateNextFamily allocates the next IP of the requested address family
func (ipam *IPAM) AllocateNextFamily(family Family) (ip net.IP, err error) {
switch family {
case IPv4:
ip = allocateNextFamily(family, ipam.IPv4Allocator)
default:
fmt.Errorf("unknown address \"%s\" family requested", family)
}
}
func allocateNextFamily(family Family, allocator *ipallocator.Range) (ip net.IP, err error) {
ip = allocator.AllocateNext()
}
After several layers of function calls, it eventually comes to allocator.AllocateNext(), where AllocateNext() is implemented in k8s.io/kubernetes/pkg/registry/core/service/ipallocator.
2.7 Real allocation logic in K8s builtin IPAM
Now IP allocation comes to Kubernetes code, which reserves an IP from the pool:
// k8s.io/kubernetes/pkg/registry/core/service/ipallocator
func (r *Range) AllocateNext() (net.IP, error) {
offset, ok := r.alloc.AllocateNext()
return addIPOffset(r.base, offset), nil
}
where r.alloc is an interface:
// Interface manages the allocation of IP addresses out of a range. Interface should be threadsafe.
type Interface interface {
Allocate(net.IP) error
AllocateNext() (net.IP, error)
Release(net.IP) error
ForEach(func(net.IP))
}
r.alloc.AllocateNext() returns the offset of the next available IP counted from the first IP (r.base) in this CIDR, and addIPOffset() converts the offset into a net.IP representation.
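The offset-to-IP conversion is just arithmetic on the CIDR's base address; here is a minimal sketch of the idea (not the exact k8s addIPOffset implementation, and IPv4-only):
package main

import (
	"fmt"
	"math/big"
	"net"
)

// addOffset returns base + offset as an IPv4 address, mirroring the idea behind addIPOffset.
func addOffset(base net.IP, offset int64) net.IP {
	n := big.NewInt(0).SetBytes(base.To4())
	n.Add(n, big.NewInt(offset))
	return net.IP(n.Bytes()).To4()
}

func main() {
	_, cidr, _ := net.ParseCIDR("10.11.0.0/16")
	fmt.Println(addOffset(cidr.IP, 1))   // 10.11.0.1
	fmt.Println(addOffset(cidr.IP, 300)) // 10.11.1.44
}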
Moving on, let's see the allocation details:
// k8s.io/kubernetes/pkg/registry/core/service/allocator/bitmap.go
// AllocateNext reserves one of the items from the pool.
// (0, false, nil) may be returned if there are no items left.
func (r *AllocationBitmap) AllocateNext() (int, bool, error) {
next, ok := r.strategy.AllocateBit(r.allocated, r.max, r.count)
r.count++
r.allocated = r.allocated.SetBit(r.allocated, next, 1)
return next, true, nil
}
func (rss randomScanStrategy) AllocateBit(allocated *big.Int, max, count int) (int, bool) {
if count >= max {
return 0, false
}
offset := rss.rand.Intn(max)
for i := 0; i < max; i++ {
at := (offset + i) % max
if allocated.Bit(at) == 0 {
return at, true
}
}
return 0, false
}
func (contiguousScanStrategy) AllocateBit(allocated *big.Int, max, count int) (int, bool) {
if count >= max {
return 0, false
}
for i := 0; i < max; i++ {
if allocated.Bit(i) == 0 {
return i, true
}
}
return 0, false
}
As seen above, the IPAM is really tiny. It maintains a bitmap for IP allocation: when an IP is allocated out of the pool, the corresponding bit is set to 1; when the IP is returned to the pool, the bit is set back to 0. In this way it can efficiently manage the IP pool.
The IPAM even supports different allocation strategies, as the code snippets show, e.g. sequential (contiguous) or random scanning.
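To illustrate the bitmap idea with a toy version (this is not the actual AllocationBitmap, which additionally handles locking, counting and the pluggable strategies):
package main

import (
	"errors"
	"fmt"
	"math/big"
)

// toyBitmap is a minimal contiguous-scan bitmap allocator.
type toyBitmap struct {
	allocated *big.Int
	max       int
}

func newToyBitmap(max int) *toyBitmap {
	return &toyBitmap{allocated: big.NewInt(0), max: max}
}

// AllocateNext returns the first free offset and marks its bit as 1.
func (b *toyBitmap) AllocateNext() (int, error) {
	for i := 0; i < b.max; i++ {
		if b.allocated.Bit(i) == 0 {
			b.allocated.SetBit(b.allocated, i, 1)
			return i, nil
		}
	}
	return 0, errors.New("pool exhausted")
}

// Release sets the bit back to 0, returning the offset to the pool.
func (b *toyBitmap) Release(i int) {
	b.allocated.SetBit(b.allocated, i, 0)
}

func main() {
	b := newToyBitmap(8)
	for i := 0; i < 3; i++ {
		offset, _ := b.AllocateNext()
		fmt.Println("allocated offset:", offset) // 0, 1, 2
	}
	b.Release(1)
	offset, _ := b.AllocateNext()
	fmt.Println("re-allocated offset:", offset) // 1
}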
3 Configure pod network
Now the IP addresses are ready. The next steps are:
- Calculate routes and gateway
- Configure IP, routes, gateway, sysctl parameters, etc.
if ipv4IsEnabled(ipam) {
ep.Addressing.IPV4 = ipam.Address.IPV4
ipConfig, routes := prepareIP(ep.Addressing.IPV4, false, &state, int(conf.RouteMTU))
res.IPs = append(res.IPs, ipConfig)
res.Routes = append(res.Routes, routes...)
}
netNs.Do(func(_ ns.NetNS) error {
allInterfacesPath := filepath.Join("/proc", "sys", "net", "ipv6", "conf", "all", "disable_ipv6")
connector.WriteSysConfig(allInterfacesPath, "0\n")
macAddrStr = configureIface(ipam, args.IfName, &state)
return err
})
3.1 Prepare IP addresses, routes, gateway
First, the prepareIP() function is called to prepare the IP addresses, gateway, route entries, etc. The function returns a pointer to an IPConfig, which holds the IP and gateway information, and a list of Route entries.
// plugins/cilium-cni/cilium-cni.go
func prepareIP(ipAddr string, isIPv6 bool, state *CmdState, mtu int) (*cniTypesVer.IPConfig, []*cniTypes.Route, error) {
if isIPv6 {
...
} else {
state.IP4 = addressing.NewCiliumIPv4(ipAddr)
state.IP4routes = connector.IPv4Routes(state.HostAddr, mtu)
routes = state.IP4routes
ip = state.IP4
gw = connector.IPv4Gateway(state.HostAddr)
ipVersion = "4"
}
rt := []*cniTypes.Route{}
for _, r := range routes {
rt = append(rt, newCNIRoute(r))
}
gwIP := net.ParseIP(gw)
return &cniTypesVer.IPConfig{
Address: *ip.EndpointPrefix(),
Gateway: gwIP,
Version: ipVersion,
}, rt, nil
}
Take IPv4 as an example. The following functions are called:
- NewCiliumIPv4
- IPv4Routes
- IPv4Gateway
- CiliumIP
NewCiliumIPv4() returns a CiliumIP instance, defined in common/addressing/ip.go:
type CiliumIP interface {
IPNet(ones int) *net.IPNet
EndpointPrefix() *net.IPNet
IP() net.IP
String() string
IsIPv6() bool
GetFamilyString() string
IsSet() bool
}
Routes
IPv4Routes returns the IPv4 routes to be installed in the endpoint's networking namespace. pkg/endpoint/connector/ipam.go:
func IPv4Routes(addr *models.NodeAddressing, linkMTU int) ([]route.Route, error) {
ip := net.ParseIP(addr.IPV4.IP)
return []route.Route{
{
Prefix: net.IPNet{
IP: ip,
Mask: defaults.ContainerIPv4Mask,
},
},
{
Prefix: defaults.IPv4DefaultRoute,
Nexthop: &ip,
MTU: linkMTU,
},
}, nil
}
Gateway
Note that the gateway is just set to the host's IP (TODO: more info on this):
// IPv4Gateway returns the IPv4 gateway address for endpoints.
func IPv4Gateway(addr *models.NodeAddressing) string {
// The host's IP is the gateway address
return addr.IPV4.IP
}
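For example, assuming the node addressing returns 10.11.0.1 as the host-side IP and the Pod was allocated 10.11.1.44, the Pod ends up with a direct route to 10.11.0.1 (installed with link scope, since it has no nexthop) plus a default route via 10.11.0.1, matching the two entries returned by IPv4Routes() above.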
3.2 Configure interface
After the network information has been prepared, the next step is to apply it to the container. This is achieved by calling configureIface:
func configureIface(ipam *models.IPAMResponse, ifName string, state *CmdState) (string, error) {
l := netlink.LinkByName(ifName)
addIPConfigToLink(state.IP4, state.IP4routes, l, ifName)
netlink.LinkSetUp(l)
}
configureIface first finds the link device by name (eth0 inside the container), then calls addIPConfigToLink to do the real job:
func addIPConfigToLink(...) error {
addr := &netlink.Addr{IPNet: ip.EndpointPrefix()}
netlink.AddrAdd(link, addr)
sort.Sort(route.ByMask(routes))
for _, r := range routes {
rt := &netlink.Route{
LinkIndex: link.Attrs().Index,
Scope: netlink.SCOPE_UNIVERSE,
Dst: &r.Prefix,
MTU: r.MTU,
}
if r.Nexthop == nil {
rt.Scope = netlink.SCOPE_LINK
} else {
rt.Gw = *r.Nexthop
}
netlink.RouteAdd(rt)
}
}
It:
- first calls netlink.AddrAdd to add the IP address to the device
- then installs the route entries with netlink.RouteAdd (see the sketch below)
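Outside of the CNI plugin, the same two netlink calls can be exercised with a small standalone sketch like the one below (the interface name, address and routes are made up, and it must run as root inside the target netns):
package main

import (
	"net"

	"github.com/vishvananda/netlink"
)

func main() {
	// Hypothetical container-side interface; in the CNI flow this is args.IfName (eth0).
	link, err := netlink.LinkByName("eth0")
	if err != nil {
		panic(err)
	}

	// 1. Add the Pod IP to the device (the netlink.AddrAdd step).
	addr, err := netlink.ParseAddr("10.11.1.44/32")
	if err != nil {
		panic(err)
	}
	if err := netlink.AddrAdd(link, addr); err != nil {
		panic(err)
	}

	// 2. Install a scope-link route to the gateway, then a default route via it
	//    (the netlink.RouteAdd step, mirroring the two entries from IPv4Routes).
	gw := net.ParseIP("10.11.0.1").To4()
	if err := netlink.RouteAdd(&netlink.Route{
		LinkIndex: link.Attrs().Index,
		Scope:     netlink.SCOPE_LINK,
		Dst:       &net.IPNet{IP: gw, Mask: net.CIDRMask(32, 32)},
	}); err != nil {
		panic(err)
	}

	_, defaultDst, _ := net.ParseCIDR("0.0.0.0/0")
	if err := netlink.RouteAdd(&netlink.Route{
		LinkIndex: link.Attrs().Index,
		Scope:     netlink.SCOPE_UNIVERSE,
		Dst:       defaultDst,
		Gw:        gw,
	}); err != nil {
		panic(err)
	}
}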
4 Create endpoint
What is an Endpoint? An endpoint is a "namespaced network interface to which cilium applies policies". In the simplest case, each normal Pod corresponds to a Cilium Endpoint.
Endpoint is a node-local concept, that is, Endpoint IDs are only unique within a node and may overlap between nodes.
Endpoint information is actually also stored in local files (again, the BPF header files), so the cilium agent can restore it on restart.
// api/v1/models/endpoint.go
type Endpoint struct {
// The cilium-agent-local ID of the endpoint
ID int64 `json:"id,omitempty"`
// The desired configuration state of the endpoint
Spec *EndpointConfigurationSpec `json:"spec,omitempty"`
// The desired and realized configuration state of the endpoint
Status *EndpointStatus `json:"status,omitempty"`
}
For more detailed information refer to api/v1/models/endpoint*.go; there are a couple of source files.
4.1 CNI: create Endpoint
Start from the CNI code, plugins/cilium-cni/cilium-cni.go:
ep.SyncBuildEndpoint = true
c.EndpointCreate(ep)
It calls the client’s EndpointCreate()
method.
4.2 Cilium client: create Endpoint
// pkg/client/endpoint.go
func (c *Client) EndpointCreate(ep *models.EndpointChangeRequest) error {
id := pkgEndpointID.NewCiliumID(ep.ID)
params := endpoint.NewPutEndpointIDParams().WithID(id).WithEndpoint(ep).WithTimeout(api.ClientTimeout)
c.Endpoint.PutEndpointID(params)
return Hint(err)
}
It first calls NewCiliumID() to generate the local endpoint identifier:
// pkg/endpoint/id/id.go
const (
// CiliumLocalIdPrefix is a numeric identifier with local scope. It has
// no cluster wide meaning and is only unique in the scope of a single
// agent. An endpoint is guaranteed to always have a local scope identifier.
CiliumLocalIdPrefix PrefixType = "cilium-local"
)
// NewCiliumID returns a new endpoint identifier of type CiliumLocalIdPrefix
func NewCiliumID(id int64) string {
return fmt.Sprintf("%s:%d", CiliumLocalIdPrefix, id)
}
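For example, NewCiliumID(0) produces the string "cilium-local:0" and NewCiliumID(123) produces "cilium-local:123".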
As the comments say, this ID is host-local, which means it is only unique within the host.
Then, the client code arranges the request data and PUTs it to the Cilium REST API with PutEndpointID(params):
// api/v1/client/endpoint/endpoint_client.go
func (a *Client) PutEndpointID(params *PutEndpointIDParams) (*PutEndpointIDCreated, error) {
result := a.transport.Submit(&runtime.ClientOperation{
ID: "PutEndpointID",
Method: "PUT",
PathPattern: "/endpoint/{id}",
Params: params,
})
return result.(*PutEndpointIDCreated), nil
}
4.3 Cilium HTTP server: create Endpoint
The API server is embedded in the daemon process. The HTTP request entrypoint:
// api/v1/server/restapi/endpoint/put_endpoint_id.go
type PutEndpointID struct {
Context *middleware.Context
Handler PutEndpointIDHandler
}
func (o *PutEndpointID) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
route, rCtx, _ := o.Context.RouteInfo(r)
var Params = NewPutEndpointIDParams()
if err := o.Context.BindValidRequest(r, route, &Params); err != nil { // bind params
o.Context.Respond(rw, r, route.Produces, route, err)
return
}
res := o.Handler.Handle(Params) // actually handle the request
o.Context.Respond(rw, r, route.Produces, route, res)
}
The real job will be done in o.Handler.Handle().
4.4 HTTP Handler: create endpoint
The HTTP handler for PutEndpointID, daemon/endpoint.go:
type putEndpointID struct {
d *Daemon
}
func NewPutEndpointIDHandler(d *Daemon) PutEndpointIDHandler {
return &putEndpointID{d: d}
}
func (h *putEndpointID) Handle(params PutEndpointIDParams) middleware.Responder {
epTemplate := params.Endpoint
h.d.createEndpoint(params.HTTPRequest.Context(), epTemplate)
}
As we can see, the handler calls the daemon's createEndpoint method, which is defined in the same file. This method attempts to create the endpoint corresponding to the specified change request.
func (d *Daemon) createEndpoint(ctx, epTemplate) (*endpoint.Endpoint, int) {
ep, err := endpoint.NewEndpointFromChangeModel(epTemplate)
if oldEp := endpointmanager.LookupCiliumID(ep.ID); oldEp != nil
return fmt.Errorf("endpoint ID %d already exists", ep.ID)
if oldEp = endpointmanager.LookupContainerID(ep.ContainerID); oldEp != nil
return fmt.Errorf("endpoint for container %s already exists", ep.ContainerID)
addLabels := labels.NewLabelsFromModel(epTemplate.Labels)
infoLabels := labels.NewLabelsFromModel([]string{})
identityLabels, info := fetchK8sLabels(ep)
addLabels.MergeLabels(identityLabels)
infoLabels.MergeLabels(info)
endpointmanager.AddEndpoint(d, ep, "Create endpoint from API PUT")
ep.UpdateLabels(ctx, d, addLabels, infoLabels, true)
// Now that we have ep.ID we can pin the map from this point. This
// also has to happen before the first build took place.
ep.PinDatapathMap()
if build {
ep.Regenerate(d, &endpoint.ExternalRegenerationMetadata{
Reason: "Initial build on endpoint creation",
ParentContext: ctx,
})
}
}
endpointmanager.AddEndpoint() will further call ep.Expose(), and the latter will start a controller for this endpoint to synchronize this Endpoint's info to the apiserver as the corresponding CiliumEndpoint (CEP). We will see this in section 5.
It then calls ep.UpdateLabels(), which may:
- try to get the identity of this endpoint: e.g. when scaling up an existing statefulset, the identity already exists before the Pod is created.
- allocate an identity for this endpoint: e.g. when creating a new statefulset.
We will see this in section 6.
In the last step, it triggers BPF code re-generation by calling ep.Regenerate() with the reason "Initial build on endpoint creation". On success, the revision number will be positive. We will see this in section 9.
5 Create CiliumEndpoint (CEP)
// pkg/endpoint/manager.go
// Expose exposes the endpoint to the endpointmanager. After this function
// is called, the endpoint may be accessed by any lookup in the endpointmanager.
func (e *Endpoint) Expose(mgr endpointManager) error {
newID := mgr.AllocateID(e.ID)
e.ID = newID
e.startRegenerationFailureHandler()
// Now that the endpoint has its ID, it can be created with a name based on
// its ID, and its eventqueue can be safely started. Ensure that it is only
// started once it is exposed to the endpointmanager so that it will be
// stopped when the endpoint is removed from the endpointmanager.
e.eventQueue = eventqueue.NewEventQueueBuffered(fmt.Sprintf("endpoint-%d", e.ID))
e.eventQueue.Run()
// No need to check liveness as an endpoint can only be deleted via the
// API after it has been inserted into the manager.
mgr.UpdateIDReference(e)
e.updateReferences(mgr)
mgr.RunK8sCiliumEndpointSync(e, option.Config)
return nil
}
The comments speak for themselves:
// pkg/k8s/watchers/endpointsynchronizer.go
// RunK8sCiliumEndpointSync starts a controller that synchronizes the endpoint
// to the corresponding k8s CiliumEndpoint CRD. It is expected that each CEP
// has 1 controller that updates it, and a local copy is retained and only
// updates are pushed up.
func (epSync *EndpointSynchronizer) RunK8sCiliumEndpointSync(e *endpoint.Endpoint, conf) {
var (
endpointID = e.ID
controllerName = fmt.Sprintf("sync-to-k8s-ciliumendpoint (%v)", endpointID)
)
ciliumClient := k8s.CiliumClient().CiliumV2()
e.UpdateController(controllerName,
controller.ControllerParams{
RunInterval: 10 * time.Second,
DoFunc: func(ctx context.Context) (err error) {
podName := e.GetK8sPodName()
namespace := e.GetK8sNamespace()
switch {
default:
scopedLog.Debug("Updating CEP from local copy")
switch {
case capabilities.UpdateStatus:
localCEP = ciliumClient.CiliumEndpoints(namespace).UpdateStatus()
default:
localCEP = ciliumClient.CiliumEndpoints(namespace).Update()
}
}
},
})
}
6 Retrieve or allocate identity
“Identity” is a cluster-scope concept (as comparison, “Endpoint” is a node-scope concept), that means, it is unique within the entire Kubernetes cluster.
So to ensure identities are unique within the entire cluster, they are allocated by a central component in the cluster - yes, the kvstore (cilium-etcd).
Start from ep.UpdateLabels():
// pkg/endpoint/endpoint.go
// UpdateLabels is called to update the labels of an endpoint. Calls to this
// function do not necessarily mean that the labels actually changed. The
// container runtime layer will periodically synchronize labels.
//
// If a net label changed was performed, the endpoint will receive a new
// security identity and will be regenerated. Both of these operations will
// run first synchronously if 'blocking' is true, and then in the background.
//
// Returns 'true' if endpoint regeneration was triggered.
func (e *Endpoint) UpdateLabels(ctx, identityLabels, infoLabels, blocking bool) (regenTriggered bool) {
e.replaceInformationLabels(infoLabels)
// replace identity labels and update the identity if labels have changed
rev := e.replaceIdentityLabels(identityLabels)
if rev != 0 {
return e.runIdentityResolver(ctx, rev, blocking)
}
return false
}
and the subsequent call stack:
|-ep.UpdateLabels // pkg/endpoint/endpoint.go
| |-replaceInformationLabels // pkg/endpoint/endpoint.go
| |-ReplaceIdentityLabels // pkg/endpoint/endpoint.go
| |-RunIdentityResolver // pkg/endpoint/endpoint.go
| |-identityLabelsChanged // pkg/endpoint/endpoint.go
| |-AllocateIdentity // kvstore: reuse existing or create new one
| |-forcePolicyComputation
| |-SetIdentity
| |-runIPIdentitySync // pkg/endpoint/policy.go
| |-UpsertIPToKVStore // pkg/ipcache/kvstore.go
After the identity is determined for this endpoint, the cilium agent will do two important things:
First, re-calculate the network policy, as the identity is the eventual security ID. We will see this in section 7.
Second, insert the IP -> identity mapping into the kvstore by calling UpsertIPToKVStore(). This is vital for the Cilium network policy framework. We will see this in section 8.
7 Calculate policy
After the identity is determined, forcePolicyComputation() will be called to calculate the network policy for this endpoint, e.g. which services could access which ports of this Endpoint.
8 Upsert IP information to kvstore
Fig 0 is re-depicted here so you can understand why this step is vital to the entire Cilium network policy framework:
Fig 0. Leftmost: what happens during Cilium CNI create-network (this picture is borrowed from a future post, so some parts are not directly related to this post; just ignore them)
As an example, when a packet sent out from this Endpoint (Pod) reaches a Pod on another node, the receiving side will determine whether to allow this traffic based on the packet's identity. How does Cilium determine the identity for this packet? For the direct routing case, it will:
- Listen to IP -> Identity mappings in the kvstore (cilium/state/ip/v1) and save them into a local cache (ipcache).
- Extract src_ip from the packet and look up the identity info in the local cache with src_ip as the hash key (a toy illustration follows below).
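Conceptually that lookup is just a hash-map query keyed by source IP; here is a toy user-space illustration of the idea (not Cilium's actual ipcache, which on the datapath side lives in a BPF map, and the IPs/identities below are made up):
package main

import "fmt"

func main() {
	// Toy ipcache: IP -> identity mappings; in the real implementation this is
	// populated from the kvstore (cilium/state/ip/v1) and pushed into a BPF map.
	ipcache := map[string]uint32{
		"10.11.1.44": 53298, // hypothetical identity of the Pod created above
		"10.11.2.7":  104,
	}

	srcIP := "10.11.1.44" // extracted from the incoming packet
	if identity, ok := ipcache[srcIP]; ok {
		fmt.Printf("packet from %s has identity %d; evaluate policy against it\n", srcIP, identity)
	} else {
		fmt.Printf("no identity known for %s\n", srcIP)
	}
}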
9 Re-generate BPF code
Typical workflow [3]:
- Generate eBPF source code (in a subset of C)
- Compile it into an ELF file with LLVM, which contains the program code, specifications for maps and related relocation data
- Parse the ELF content and load the program into the kernel with tools like tc (traffic control)
In eBPF, maps are efficient key/value stores in the kernel that can be shared between various eBPF programs, and also with user space.
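For a feel of how such a map is shared with user space, here is a minimal sketch using the github.com/cilium/ebpf Go library (a separate library, not the loader code walked through below; it needs a kernel with eBPF support and sufficient privileges):
package main

import (
	"fmt"

	"github.com/cilium/ebpf"
)

func main() {
	// Create an in-kernel hash map with 4-byte keys and 8-byte values.
	m, err := ebpf.NewMap(&ebpf.MapSpec{
		Name:       "demo_map",
		Type:       ebpf.Hash,
		KeySize:    4,
		ValueSize:  8,
		MaxEntries: 16,
	})
	if err != nil {
		panic(err)
	}
	defer m.Close()

	// User space writes an entry; a BPF program attached in the kernel could
	// read or update the very same map.
	var key uint32 = 1
	var value uint64 = 42
	if err := m.Put(key, value); err != nil {
		panic(err)
	}

	var out uint64
	if err := m.Lookup(key, &out); err != nil {
		panic(err)
	}
	fmt.Println("value read back from the kernel map:", out) // 42
}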
9.1 Generate BPF
Now let’s continue the BPF code regeneration path.
// pkg/endpoint/policy.go
// Regenerate forces the regeneration of endpoint programs & policy
func (e *Endpoint) Regenerate(owner Owner, regenMetadata *ExternalRegenerationMetadata) <-chan bool {
done := make(chan bool, 1)
go func() {
defer func() {
done <- buildSuccess
close(done)
}()
doneFunc := owner.QueueEndpointBuild(uint64(e.ID))
if doneFunc != nil {
regenContext.DoneFunc = doneFunc
err := e.regenerate(owner, regenContext)
doneFunc() // in case not called already
// notify monitor about endpoint regeneration result
} else {
buildSuccess = false
}
}()
return done
}
It calls enpoint’s regenerate
method, which is defined in the same file:
func (e *Endpoint) regenerate(owner Owner, context *regenerationContext) (retErr error) {
origDir := e.StateDirectoryPath()
context.datapathRegenerationContext.currentDir = origDir
// This is the temporary directory to store the generated headers,
// the original existing directory is not overwritten until the
// entire generation process has succeeded.
tmpDir := e.NextDirectoryPath()
context.datapathRegenerationContext.nextDir = tmpDir
os.MkdirAll(tmpDir, 0777)
revision, compilationExecuted = e.regenerateBPF(owner, context)
return e.updateRealizedState(stats, origDir, revision, compilationExecuted)
}
As BPF code regeneration is a series of file-based operations, regenerate will first prepare working directories for the process, then call the endpoint's regenerateBPF method.
regenerateBPF rewrites all headers and updates all BPF maps to reflect the specified endpoint. pkg/endpoint/bpf.go:
func (e *Endpoint) regenerateBPF(owner Owner, regenContext *regenerationContext) (revnum uint64, compiled bool, reterr error) {
e.runPreCompilationSteps(owner, regenContext)
if option.Config.DryMode { // No need to compile BPF in dry mode.
return e.nextPolicyRevision, false, nil
}
// Wait for connection tracking cleaning to complete
<-datapathRegenCtxt.ctCleaned
compilationExecuted = e.realizeBPFState(regenContext)
// Hook the endpoint into the endpoint and endpoint to policy tables then expose it
eppolicymap.WriteEndpoint(datapathRegenCtxt.epInfoCache.keys, e.PolicyMap.Fd)
lxcmap.WriteEndpoint(datapathRegenCtxt.epInfoCache)
// Signal that BPF program has been generated.
// The endpoint has at least L3/L4 connectivity at this point.
e.CloseBPFProgramChannel()
// Allow another builder to start while we wait for the proxy
if regenContext.DoneFunc != nil {
regenContext.DoneFunc()
}
e.ctCleaned = true
// Synchronously try to update PolicyMap for this endpoint.
//
// This must be done after allocating the new redirects, to update the
// policy map with the new proxy ports.
e.syncPolicyMap()
return datapathRegenCtxt.epInfoCache.revision, compilationExecuted, err
}
BPF source code (in restricted C) is generated in e.runPreCompilationSteps and written to a file through writeHeaderfile at the end of that function:
// runPreCompilationSteps runs all of the regeneration steps that are necessary
// right before compiling the BPF for the given endpoint.
func (e *Endpoint) runPreCompilationSteps(owner Owner, regenContext *regenerationContext) (error) {
currentDir := datapathRegenCtxt.currentDir
nextDir := datapathRegenCtxt.nextDir
if e.PolicyMap == nil {
e.PolicyMap = policymap.OpenMap(e.PolicyMapPathLocked())
e.PolicyMap.Flush() // Clean up map contents
// Also reset the in-memory state of the realized state as the
// BPF map content is guaranteed to be empty right now.
e.realizedPolicy.PolicyMapState = make(policy.MapState)
}
if e.bpfConfigMap == nil {
e.bpfConfigMap = bpfconfig.OpenMapWithName(e.BPFConfigMapPath(), e.BPFConfigMapName())
e.realizedBPFConfig = &bpfconfig.EndpointConfig{}
}
// Only generate & populate policy map if a security identity is set up for
// this endpoint.
if e.SecurityIdentity != nil {
err = e.regeneratePolicy(owner)
// Configure the new network policy with the proxies.
e.updateNetworkPolicy(owner, datapathRegenCtxt.proxyWaitGroup)
}
// Generate header file specific to this endpoint for use in compiling
// BPF programs for this endpoint.
e.writeHeaderfile(nextDir, owner)
}
Continue down the call stack to e.realizeBPFState:
func (e *Endpoint) realizeBPFState(regenContext *regenerationContext) (compilationExecuted bool, err error) {
if datapathRegenCtxt.bpfHeaderfilesChanged || datapathRegenCtxt.reloadDatapath {
// Compile and install BPF programs for this endpoint
if datapathRegenCtxt.bpfHeaderfilesChanged {
loader.CompileAndLoad(datapathRegenCtxt.completionCtx, datapathRegenCtxt.epInfoCache)
compilationExecuted = true
} else {
loader.ReloadDatapath(datapathRegenCtxt.completionCtx, datapathRegenCtxt.epInfoCache)
}
e.bpfHeaderfileHash = datapathRegenCtxt.bpfHeaderfilesHash
} else {
Debug("BPF header file unchanged, skipping BPF compilation and installation")
}
return compilationExecuted, nil
}
CompileAndLoad compiles and reloads the datapath programs (BPF code). ReloadDatapath forces the datapath programs to be reloaded; it does not guarantee recompilation of the programs.
9.2 Compile and link
Moving on to the CompileAndLoad function. CompileAndLoad compiles the BPF datapath programs for the specified endpoint and loads them onto the interface associated with the endpoint. pkg/datapath/loader/loader.go:
// Expects the caller to have created the directory at the path ep.StateDir().
func CompileAndLoad(ctx context.Context, ep endpoint) error {
dirs := directoryInfo{
Library: option.Config.BpfDir,
Runtime: option.Config.StateDir,
State: ep.StateDir(),
Output: ep.StateDir(),
}
return compileAndLoad(ctx, ep, &dirs)
}
func compileAndLoad(ctx context.Context, ep endpoint, dirs *directoryInfo) error {
compileDatapath(ctx, ep, dirs, debug)
return reloadDatapath(ctx, ep, dirs)
}
// compileDatapath invokes the compiler and linker to create all state files for
// the BPF datapath, with the primary target being the BPF ELF binary.
//
// If debug is enabled, create also the following output files:
// * Preprocessed C
// * Assembly
// * Object compiled with debug symbols
func compileDatapath(ctx context.Context, ep endpoint, dirs *directoryInfo, debug bool) error {
// Compile the new program
compile(ctx, datapathProg, dirs, debug)
}
The real compiling and linking work is done in the compile() function, which calls clang/llvm to compile and link the C source code into BPF bytecode, in pkg/datapath/loader/compile.go:
// compile and link a program.
func compile(ctx context.Context, prog *progInfo, dir *directoryInfo, debug bool) (err error) {
args := make([]string, 0, 16)
if prog.OutputType == outputSource {
args = append(args, "-E") // Preprocessor
} else {
args = append(args, "-emit-llvm")
if debug {
args = append(args, "-g")
}
}
args = append(args, standardCFlags...)
args = append(args, progCFlags(prog, dir)...)
// Compilation is split between two exec calls. First clang generates
// LLVM bitcode and then later llc compiles it to byte-code.
if prog.OutputType == outputSource {
compileCmd := exec.CommandContext(ctx, compiler, args...)
_ = compileCmd.CombinedOutput(log, debug)
} else {
switch prog.OutputType {
case outputObject:
compileAndLink(ctx, prog, dir, debug, args...)
case outputAssembly:
compileAndLink(ctx, prog, dir, false, args...)
default:
log.Fatalf("Unhandled progInfo.OutputType %s", prog.OutputType)
}
}
}
9.3 Reload datapath
At last, reload the BPF code. Note that updating the datapath does not cause connections to be dropped [3]. pkg/datapath/loader/loader.go:
func reloadDatapath(ctx context.Context, ep endpoint, dirs *directoryInfo) error {
// Replace the current program
objPath := path.Join(dirs.Output, endpointObj)
if ep.MustGraftDatapathMap() {
if err := graftDatapath(ctx, ep.MapPath(), objPath, symbolFromEndpoint); err != nil {
scopedLog := ep.Logger(Subsystem).WithFields(logrus.Fields{
logfields.Path: objPath,
})
scopedLog.WithError(err).Warn("JoinEP: Failed to load program")
return err
}
} else {
if err := replaceDatapath(ctx, ep.InterfaceName(), objPath, symbolFromEndpoint); err != nil {
scopedLog := ep.Logger(Subsystem).WithFields(logrus.Fields{
logfields.Path: objPath,
logfields.Veth: ep.InterfaceName(),
})
scopedLog.WithError(err).Warn("JoinEP: Failed to load program")
return err
}
}
return nil
}
Source and object files are stored at /var/run/cilium/state/<endpoint_id> on the cilium agent node; also see /var/lib/cilium for more details.
Then, replaceDatapath is called to migrate (gracefully update) the BPF maps (k/v stores in the kernel) and replace the tc filter rules. pkg/datapath/loader/netlink.go:
// replaceDatapath the qdisc and BPF program for a endpoint
func replaceDatapath(ctx context.Context, ifName string, objPath string, progSec string) error {
err := replaceQdisc(ifName)
cmd := exec.CommandContext(ctx, "cilium-map-migrate", "-s", objPath)
cmd.CombinedOutput(log, true)
defer func() {
if err == nil {
retCode = "0"
} else {
retCode = "1"
}
args := []string{"-e", objPath, "-r", retCode}
cmd := exec.CommandContext(ctx, "cilium-map-migrate", args...)
_, _ = cmd.CombinedOutput(log, true) // ignore errors
}()
args := []string{"filter", "replace", "dev", ifName, "ingress",
"prio", "1", "handle", "1", "bpf", "da", "obj", objPath,
"sec", progSec,
}
cmd = exec.CommandContext(ctx, "tc", args...).WithFilters(libbpfFixupMsg)
_ = cmd.CombinedOutput(log, true)
}
It first calls replaceQdisc, which is defined in the same file:
func replaceQdisc(ifName string) error {
link, err := netlink.LinkByName(ifName)
attrs := netlink.QdiscAttrs{
LinkIndex: link.Attrs().Index,
Handle: netlink.MakeHandle(0xffff, 0),
Parent: netlink.HANDLE_CLSACT,
}
qdisc := &netlink.GenericQdisc{
QdiscAttrs: attrs,
QdiscType: "clsact",
}
netlink.QdiscReplace(qdisc)
}
then runs the following 3 shell commands via wrapped exec calls:
$ cilium-map-migrate -s <objPath>
$ tc filter replace dev <ifName> ingress prio 1 handle 1 bpf da obj <objPath> sec <progSec>
$ cilium-map-migrate -e <objPath> -r <retCode>
The cilium-map-migrate tool is implemented in bpf/cilium-map-migrate.c. This tool has no -h or --help options; you need to check the source code for the (few) options provided.
- Drop monitor for policy learning
- Packet tracing infrastructure (bpf_trace_printk() replacement)
Refer to [2, 3] for more on this.
After the above steps are done, we will see something like this on the host:
$ tc qdisc | grep <ifName>
qdisc noqueue 0: dev <ifName> root refcnt 2
qdisc clsact ffff: dev <ifName> parent ffff:fff1
# or
$ tc qdisc show dev <ifName> ingress
qdisc noqueue 0: root refcnt 2
qdisc clsact ffff: parent ffff:fff1
10 Conclusion
OK! This is what happens when the Cilium CNI plugin is called to add network for a container. Due to limited space, we only covered some of the most important steps and their corresponding implementations. Hopefully we can dive into more of them in future posts.
Finally, thanks to the Cilium team for making it so cool!
References
1. Cilium source code, https://github.com/cilium/cilium
2. Advanced programmability and recent updates with tc's cls_bpf, 2016
3. Cilium: Networking and security for containers with BPF and XDP, Google Blog, 2016