// main.go
1、func main()
- .....
- 首先调用sm, err := newSubnetManager()创建subnet manager
- ....
- 调用ctx, cancel := context.WithCancel(context.Background())
- 调用nm, err := network.NewNetworkManager(ctx, sm)
- 创建runFunc = func(ctx context.Context) {
nm.Run(ctx)
}
- 创建一个goroutine,其中调用runFunc(ctx)
// main.go
2、func newSubnetManager() (subnet.Manager, error)
- 当opts.kubeSubnetMgr为true时,调用return kube.NewSubnetManager()
- 否则,创建cfg := &etcdv2.EtcdConfig{...}
- 最后return etcdv2.NewLocalManager(cfg)
subnet.Manager数据结构如下所示:
type Manager interface { GetNetworkConfig(ctx context.Context, network string) (*Config, error) AcquireLease(ctx context.Context, network string, attrs *LeaseAttrs) (*Lease, error) RenewLease(ctx context.Context, network string, lease *Lease) error RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error) WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error) WatchNetworks(ctx context.Context, cursor interface{}) (NetworkWatchResult, error) AddReservation(ctx context.Context, network string, r *Reservation) error RemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error ListReservations(ctx context.Context, network string) ([]Reservation, error)}
// subnet/etcdv2/local_manager.go
func NewLocalManager(config *EtcdConfig) (Manager, error)
- 首先调用r, err := newEtcdSubnetRegistry(config, nil)
- 再调用return newLocalManager(r), nil --> newLocalManager的作用仅仅只是返回一个&LocalManager{registry: r}
LocalManager的结构如下所示:
type LocalManager struct { registry Registry}
// subnet/etcdv2/registry.go
func newEtcdSubnetRegistry(config *EtcdConfig, cliNewFunc etcdNewFunc) (Registry, error)
- 创建r := &etcdSubnetRegistry{
etcdCfg: config,
networkRegex: regexp.MustCompile(config.Prefix + `/([^/]*)(/|/config)?$`),
}
- 当cliNewFunc不为空时,设置r.cliNewFunc为cliNewFunc,否则设置r.cliNewFunc为newEtcdClient
- 调用r.cli, err = r.cliNewFunc(config)
- 最后返回return r, nil
etcdSubnetRegistry结构如下所示:
type etcdSubnetRegistry struct { cliNewFunc etcdNewFunc mux sync.Mutex cli etcd.KeysAPI etcdCfg *EtcdConfig networkRegex *regexp.Regexp}
// subnet/etcdv2/registry.go
func newEtcdClient(c *EtcdConfig) (etcd.KeysAPI, error)
- 该函数根据EtcdConfig中的配置,和etcd集群建立连接,创建一个client对象,最后调用etcd.NewKeysAPI(cli)返回对于etcd集群操作的主要的API:包括Get,Set,Delete等等
// network/manager.go
func NewNetworkManager(ctx context.Context, sm subnet.Manager) (*Manager, error)
- 调用extIface, err := lookupExtIface(opts.iface),其中iface是用于宿主机直接通信的网卡
- 调用bm := backend.NewManager(ctx, sm, extIface)
- 创建manager := &Manager{
ctx: ctx,
sm: sm,
bm: bm,
allowedNetworks: make(map[string]bool),
networks: make(map[string]*Network),
watch: opts.watchNetworks,
ipMasq: opts.ipMasq,
extIface: extIface,
}
- 遍历opts.networks,将对应的manager.allowedNetworks[name]都置为true
- 最后,返回return manager, nil
// network/manager.go
func lookupExtIface(ifname string) (*backend.ExternalInterface, error)
- 若ifname不为空则:
- 先调用ifaceAddr = net.ParseIP(ifname),若ifaceAddr不为空,则调用iface, err = ip.GetInterfaceByIP(ifaceAddr),否则调用iface, err = net.InterfaceByName(ifname)
- 否则,调用iface, err = ip.GetDefaultGatewayIface()
- 若ifaceAddr为nil,则调用ifaceAddr, err = ip.GetIfaceIP4Addr(iface)
- 若用户指定了opts.publicIP则尝试将extAddr设置为它,否则默认将external address设置为ifaceAddr
- 最后返回&backend.ExternalInterface{
Iface: iface,
IfaceAddr: ifaceAddr,
ExtAddr: extAddr,
}, nil
// backend/backend.go
func NewManager(ctx context.Context, sm subnet.Manager, extIface *ExternalInterface) Manager
- 该函数仅仅只是用参数填充manager结构并返回
backend.manager数据结构如下:
type manager struct { ctx context.Context sm subnet.Manager extIface *ExternalInterface mux sync.Mutex active map[string]Backend wg sync.WaitGroup}
// network/manager.go
func (m *Manager) Run(ctx context.Context)
- 当m.isMultiNetwork()为false时,调用m.networks[""] = NewNetwork(ctx, m.sm, m.bm, "", m.ipMasq) --->仅仅只是用参数填充一个Network数据结构
- 运行所有现存的network,调用m.forEachNetwork(func(n *Network){...}),func中生成一个goroutine,调用m.runNetwork(n)
Network结构如下所示:
type Network struct { Name string Config *subnet.Config ctx context.Context cancelFunc context.CancelFunc sm subnet.Manager bm backend.Manager ipMasq bool bn backend.Network}
// network/manager.go
func (m *Manager) runNetwork(n *Network)
- 该函数先调用n.Run(m.extIface, func(bn backend.Network){}), 其中func在非MultiNetwork的情况下,依次调用writeSubnetFile(opts.subnetFile, n.Config.Network, m.ipMasq, bn )以及daemon.SdNotify(false, "READY=1")
- 调用m.delNetwork(n)
// network/network.go
func (n *Network) Run(extIface *backend.ExternalInterface, inited func(bn backend.Network))
- 该函数仅仅调用一个无限的for循环,调用n.runOnce(extIface, inited)
// network/network.go
func (n *Network) runOnce(extIface *backend.ExternalInterface, inited func(bn backend.Network)) error
- 首先调用n.retryInit(),再调用inited(n.bn) ---> retryInit每隔一秒调用一次n.init(),直到成功或者连接断开
- 启动一个goroutine,其中运行n.bn.Run(ctx)
- 设置evts := make(chan subnet.Event),再启动一个goroutine,运行subnet.WatchLease(ctx, n.sm, n.Name, n.bn.Lease().Subnet, evts)
- 最后,调用一个无限for循环,等待超时,或者subnet.Event
// network/network.go
func (n *Network) init() error
- 先调用n.Config, err = n.sm.GetNetworkConfig(n.ctx, n.Name)获取网络配置
- 调用be, err := n.bm.GetBackend(n.Config.BackendType)
- 调用n.bn, err = be.RegisterNetwork(n.ctx, n.Name, n.Config)
- 最后,若n.ipMasq为真,则调用setupIPMasq(n.Config.Network)
Config结构如下所示:
type Config struct { Network ip.IP4Net SubnetMin ip.IP4 SubnetMax ip.IP4 SubnetLen uint BackendType string Backend json.RawMessage}
// backend/manager.go
func (bm *manager) GetBackend(backendType string) (Backend, error)
- 调用betype := strings.ToLower(backendType)将backend类型转换为小写
- 调用be, ok := bm.active[betype]判断是否已经有该backend在运行,是则直接返回
- 若为第一次请求,则调用befunc, ok := backendCtors[betype]和be, err := befunc(bm.sm, bm.extIface)创建Backend interface,并添加到bm.active[]中
- 对于backend udp其实就是调用一个New函数,返回一个*UdpBackend类型
- 创建一个goroutine,调用be.Run(bm.ctx),之后再调用delete(bm.active, betype) ---> Run函数仅仅调用 <-ctx.Done()
Backend结构如下所示:
// Besides the entry points in the Backend interface, the backend's New()
// function receives static network interface information (like internal and
// external IP address, MTU, etc) which it should cache for later use if needed.
//
// To implement a singleton backend which manages multiple networks, the
// New() function should create the singleton backend object once, and return
// that object on further calls to New(). The backend is guaranteed that the arguments
// passed via New() will not change across invocations. Also, since multiple RegisterNetwork()
// and Run() calls may be in-flight at any given time for a singleton backend, it must protect these
// calls with a mutex
type Backend interface {
	// Called first to start the necessary event loops and such
	Run(ctx context.Context)
	// Called when the backend should create or begin managing a new network
	RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (Network, error)
}
---------------------------------------------------- backend 为udp 的情况-------------------------------------------------------------
// backend/udp/udp.go
func (be *UdpBackend) RegisterNetwork(ctx context.Context, netname string, config *subnet.Config) (backend.Network, error)
- 首先调用attrs := subnet.LeaseAttrs{PublicIP: ...}和l, err := be.sm.AcquireLease(ctx, netname, &attrs)来acquire the lease from subnet manager
- 创建tunNet := ip.IP4Net{
IP: l.Subnet.IP,
PrefixLen: config.Network.PrefixLen,
} --> Tunnel's subnet is that of the whole overlay network (e.g, /16) and not that of the individual host (e.g. /24)
- 最后return newNetwork(netname, be.sm, be.extIface, cfg.Port, tunNet, l)
// backend/udp/network.go
func newNetwork(name string, sm subnet.Manager, extIface *backend.ExternalInterface, port int, nw ip.IP4Net, l *subnet.Lease) (*network)
- 创建n := &network{
SimpleNetwork: backend.SimpleNetwork{
SubnetLease: l,
ExtIface: extIface,
}
name: name,
port: port,
sm: sm,
},并且将n.tunNet设置为nw
- 调用n.initTun()
- 调用n.conn, err = net.ListenUDP("udp4", &net.UDPAddr{IP: extIface.IfaceAddr, Port: port})
- 调用n.ctl, n.ctl2, err = newCtlSockets()
// backend/udp/network.go
func (n *network) initTun() error
- 调用n.tun, tunName, err = ip.OpenTun("flannel%d") --->创建tun设备
- 调用err = configureIface(tunName, n.tunNet, n.MTU()) ---> 配置tun设备的地址,MTU,路由并启动
// pkg/ip/tun.go
func OpenTun(name string) (*os.File, string, error)
- 首先调用tun, err := os.OpenFile(tunDevice, os.O_RDWR, 0),其中tunDevice为/dev/net/tun
- 创建变量var ifr ifreqFlags,调用copy(ifr.IfrnName[:len(ifr.IfrnName) -1 ], []byte(name+"\000")),并设置ifr.IfruFlags = syscall.IFF_TUN | syscall.IFF_NO_PI
- 调用err = ioctl(int(tun.Fd()), syscall.TUNSETIFF, uintptr(unsafe.Pointer(&ifr)))
- 调用ifname := fromZeroTerm(ifr.IfrnName[:ifnameSize])
- 最后返回return tun, ifname, nil
// backend/udp/network.go
func configureIface(ifname string, ipn ip.IP4Net, mtu int) error
- 首先调用iface, err := netlink.LinkByName(ifname)找到相应的设备
- 再调用err = netlink.AddrAdd(iface, &netlink.Addr{IPNet: ipn.ToIPNet(), Label: ""})
- 依次调用netlink.LinkSetMTU(iface, mtu)和netlink.LinkSetUp(iface)设置设备的MTU并启动
- 调用err = netlink.RouteAdd(&netlink.Route{
LinkIndex: iface.Attrs().Index,
Scope: netlink.SCOPE_UNIVERSE,
Dst: ipn.Network().ToIPNet(),
})添加路由
// backend/udp/network.go
func (n *network) Run(ctx context.Context)
- 创建一个goroutine,调用runCProxy(n.tun, n.conn, n.ctl2, n.tunNet.IP, n.MTU())
- 再创建一个goroutine,调用subnet.WatchLeases(ctx, n.sm, n.name, n.SubnetLease, evts)
- 最后,一个无限for循环,等待subnet.Event,调用n.processSubnetEvents(evtBatch)或者等待ctx.Done(),之后调用stopProxy(n.ctl)
// backend/udp/cproxy.go
func runCProxy(tun *os.File, conn *net.UDPConn, ctl *os.File, tunIP ip.IP4, tunMTU int)
- 该函数首先调用c, err := conn.File()获取连接的文件描述符
- 接着调用C.run_proxy(
C.int(tun.Fd()),
C.int(c.Fd()),
C.int(ctl.Fd()),
C.in_addr_t(tunIP.NetworkOrder()),
C.size_t(tunMTU),
C.int(log_errors),
)
// backend/udp/proxy.c
void run_proxy(int tun, int sock, int ctl, in_addr_t tun_ip, size_t tun_mtu, int log_errors)
- 创建struct pollfd fds[PFD_CNT],并将tun, sock和ctl都填充进去
- 创建buf = (char *)malloc(tun_mtu);并且设置fcntl(tun, F_SETFL, O_NONBLOCK);
- 进入while循环,每次调用nfds = poll(fds, PFD_CNT, -1),当fds[PFD_CTL].revents & POLLIN时,调用process_cmd(ctl),若fds[PFD_TUN].revents & POLLIN 或者 fds[PFD_SOCK].revents & POLLIN则进入一个do-while循环,每轮先设置activity = 0,再依次调用activity += tun_to_udp(tun, sock, buf, tun_mtu)和activity += udp_to_tun(sock, tun, buf, tun_mtu),只要activity不为0(即tun或sock仍有数据可读)就继续循环,直到activity为0时退出
// backend/udp/proxy.c
static int tun_to_udp(int tun, int sock, char *buf, size_t buflen)
- 首先调用ssize_t pktlen = tun_recv_packet(tun, buf, buflen)从tun中读取数据 -->该函数仅仅调用nread = read(tun, buf, buflen);而已,并且如果nread < sizeof(struct iphdr)会认为出错
- 设置iph = (struct iphdr* ) buf
- 并调用next_hop = find_route((in_addr_t)iph->daddr);
- 若next_hop为假,则调用send_net_unreachable(tun, buf);并返回1 --> next_hop就是将原本的目的地址IP(即目的容器IP)转换为目的容器所在宿主机的IP
- 调用decrement_ttl(iph),若返回值为0,则本函数返回1
- 最后调用sock_send_packet(sock, buf, pktlen, next_hop) -->该函数仅仅将数据从sock中发送出去
udp_to_tun的操作类似,仅仅只是换为调用sock_recv_packet(sock, buf, buflen)和tun_send_packet(tun, buf, pktlen)而已
routes的创建为struct route_entry *routes,route_entry如下所示:
struct route_entry { struct ip_net dst; struct sockaddr_in next_hop;}
ip_net的结构如下所示:
struct ip_net { in_addr_t ip; in_addr_t mask;}
// backend/udp/proxy.c
static struct sockaddr_in *find_route(in_addr_t dst)
- 该函数仅仅只是遍历routes_cnt个routes[]选项,当routes[i].dst包含dst时,将routes[i]和routes[0]互换,并返回&routes[0].next_hop
route_entry结构如下所示:
struct route_entry { struct ip_net dst; struct sockaddr_in next_hop;};
ip_net结构如下所示:
struct ip_net { in_addr_t ip; in_addr_t mask;}
----------------------------------------------------- 路由的添加 --------------------------------------------------
// backend/udp/network.go
func (n *network) processSubnetEvents(batch []subnet.Event)
- 该函数调用for _, evt := range batch,当evt.Type为subnet.EventAdded时
调用setRoute(n.ctl, evt.Lease.Subnet, evt.Lease.Attrs.PublicIP, n.port)
// backend/udp/cproxy.go
func setRoute(ctl *os.File, dst ip.IP4Net, nextHopIP ip.IP4, nextHopPort int)
该函数仅仅调用cmd := C.command{
cmd: C.CMD_SET_ROUTE,
dest_net: C.in_addr_t(dst.IP.NetworkOrder()),
dest_net_len: C.int(dst.PrefixLen),
next_hop_ip: C.in_addr_t(nextHopIP.NetworkOrder()),
next_hop_port: C.short(nextHopPort),
}接着调用writeCommand(ctl, &cmd)写入sock,之后进入下一个函数进行处理
// backend/udp/proxy.c
static void process_cmd(int ctl)
- ...
- 调用ssize_t nrecv = recv(ctl, (char *) &cmd, sizeof(cmd), 0)
- 当cmd.cmd == CMD_SET_ROUTE时,设置ipn.mask = netmask(cmd.dest_net_len)和ipn.ip = cmd.dest_net & ipn.mask;
- 调用sa.sin_addr.s_addr = cmd.next_hop_ip;和sa.sin_port = htons(cmd.next_hop_port);
- 最后调用set_route(ipn, &sa)
// backend/udp/proxy.c
static int set_route(struct ip_net dst, struct sockaddr_in *next_hop)
- 首先遍历routes,当存在目的网络相同的路由时,则仅仅改变该路由的next_hop
- 当routes_alloc和routes_cnt相等时,调用realloc进行扩展,并且更新routes和routes_alloc
- 最后将dst和next_hop添加到routes中,并且更新routes_cnt