WEBKT

巧用eBPF监控K8s Pod网络流量:TCP连接数与流量大小全掌握

217 0 0 0

在云原生时代,Kubernetes (K8s) 已成为容器编排的事实标准。然而,随着微服务架构的普及,服务间的网络通信变得日益复杂,监控和分析 K8s 集群中 Pod 的网络流量变得至关重要。本文将探讨如何利用 eBPF (extended Berkeley Packet Filter) 技术,高效、安全地监控 K8s Pod 的网络流量,特别是统计每个 Pod 的 TCP 连接数和流量大小等关键指标。

为什么选择 eBPF?

传统的网络监控方法,例如使用 tcpdump 抓包,会对系统性能产生较大影响,尤其是在高负载环境下。eBPF 则提供了一种更优雅的解决方案。它允许用户在内核态动态地注入自定义的代码,而无需修改内核源码或加载内核模块。eBPF 程序运行在沙箱环境中,具有很高的安全性,并且能够以极低的开销进行网络数据包的过滤、修改和分析。

相比于传统的用户态网络监控工具,eBPF 具有以下优势:

  • 高性能: eBPF 程序直接运行在内核态,减少了用户态和内核态之间的数据拷贝和上下文切换。
  • 低开销: eBPF 程序可以有选择性地捕获和处理网络数据包,避免了不必要的数据处理。
  • 安全性: eBPF 程序运行在沙箱环境中,受到内核的严格安全检查,防止恶意代码对系统造成损害。
  • 灵活性: eBPF 允许用户自定义网络监控逻辑,满足各种不同的需求。

实现步骤

1. 准备工作

  • 安装 bpftool: bpftool 是一个用于管理和调试 eBPF 程序的命令行工具。可以使用包管理器进行安装,例如在 Ubuntu 上可以使用 apt install bpftool 命令。
  • 安装 libbpf: libbpf 是一个用于加载、验证和管理 eBPF 程序的 C 库。许多 eBPF 工具都依赖于它。通常,libbpf 会随 Linux 内核一起发布,但为了获得最新的功能和修复,建议手动安装最新版本。
  • 安装 clang 和 llvm: clangllvm 是用于编译 eBPF 程序的编译器工具链。可以使用包管理器进行安装,例如在 Ubuntu 上可以使用 apt install clang llvm 命令。
  • 确保内核支持 eBPF: 检查内核版本是否大于 4.14,这是 eBPF 功能较为完善的版本。可以通过 uname -r 命令查看内核版本。

2. 编写 eBPF 程序

以下是一个使用 eBPF 监控 TCP 连接数和流量大小的示例程序 (example.c):

#include <linux/bpf.h>
#include <bpf_helpers.h>
#include <linux/tcp.h>
#include <linux/ip.h>

#define MAX_ENTRIES 65536

struct flow_key {
    __u32 src_addr;
    __u32 dst_addr;
    __u16 src_port;
    __u16 dst_port;
    __u32 pod_id; // Add Pod ID
};

struct data_t {
    __u64 packets;
    __u64 bytes;
};

BPF_HASH(flow_stats, struct flow_key, struct data_t, MAX_ENTRIES);
BPF_HASH(pod_ip_map, __u32, __u32, MAX_ENTRIES); // IP to Pod ID mapping

static inline int parse_ip(void *data, u64 nh_off, void *data_end, struct iphdr **iphdrpp)
{
    struct iphdr *iph = data + nh_off;

    if ((void*)iph + sizeof(*iph) > data_end)
        return -1;

    *iphdrpp = iph;
    return 0;
}

static inline int parse_tcp(void *data, u64 nh_off, void *data_end, struct tcphdr **tcphdrpp)
{
    struct tcphdr *tcph;
    struct iphdr *iph;

    int err = parse_ip(data, nh_off, data_end, &iph);
    if (err)
        return -1;

    tcph = (void*)iph + sizeof(struct iphdr);
    if ((void*)tcph + sizeof(*tcph) > data_end)
        return -1;

    *tcphdrpp = tcph;
    return 0;
}

SEC("tracepoint/syscalls/sys_enter_connect")
int bpf_prog1(void *ctx) {
    struct flow_key key = {};
    struct data_t *data;
    struct sock *sk = (struct sock *)PT_REGS_PARM1(ctx);
    __u32 saddr = sk->__sk_common.skc_rcv_saddr;

    // Get Pod ID from IP
    __u32 *pod_id = bpf_map_lookup_elem(&pod_ip_map, &saddr);
    if (!pod_id) {
        return 0; // If no Pod ID, ignore
    }

    key.src_addr = sk->__sk_common.skc_rcv_saddr;
    key.dst_addr = sk->__sk_common.skc_daddr;
    key.src_port = sk->__sk_common.skc_num;
    key.dst_port = sk->__sk_common.skc_dport;
    key.pod_id = *pod_id; // Assign Pod ID to the key

    data = bpf_map_lookup_elem(&flow_stats, &key);
    if (!data) {
        struct data_t zero = {0};
        bpf_map_update_elem(&flow_stats, &key, &zero, BPF_NOEXIST);
        data = bpf_map_lookup_elem(&flow_stats, &key);
        if (!data)
            return 0; // Should not happen, but check anyway
    }

    data->packets++;
    return 0;
}

SEC("kprobe/tcp_rcv_space_adjust")
int bpf_prog2(struct pt_regs *ctx)
{
    struct sock *sk = (struct sock *)PT_REGS_PARM1(ctx);
    struct flow_key key = {};
    struct data_t *data;
    __u32 saddr = sk->__sk_common.skc_rcv_saddr;

    // Get Pod ID from IP
    __u32 *pod_id = bpf_map_lookup_elem(&pod_ip_map, &saddr);
    if (!pod_id) {
        return 0; // If no Pod ID, ignore
    }

    key.src_addr = sk->__sk_common.skc_rcv_saddr;
    key.dst_addr = sk->__sk_common.skc_daddr;
    key.src_port = sk->__sk_common.skc_num;
    key.dst_port = sk->__sk_common.skc_dport;
    key.pod_id = *pod_id; // Assign Pod ID to the key

    data = bpf_map_lookup_elem(&flow_stats, &key);
    if (!data) {
        struct data_t zero = {0};
        bpf_map_update_elem(&flow_stats, &key, &zero, BPF_NOEXIST);
        data = bpf_map_lookup_elem(&flow_stats, &key);
        if (!data)
            return 0; // Should not happen, but check anyway
    }

    long amount = PT_REGS_PARM2(ctx);
    if (amount > 0)
        data->bytes += amount;

    return 0;
}



// License macro - required for using BPF kernel helpers
char _license[] SEC("license") = "GPL";

代码解释:

  • flow_key 结构体: 用于标识一个网络流,包含源 IP 地址、目标 IP 地址、源端口、目标端口和 Pod ID。
  • data_t 结构体: 用于存储每个网络流的统计数据,包含数据包数量和字节数。
  • flow_stats BPF_HASH: 用于存储网络流的统计数据,key 为 flow_key 结构体,value 为 data_t 结构体。
  • pod_ip_map BPF_HASH: 用于存储 Pod IP 地址到 Pod ID 的映射关系。这需要你在程序外部维护,例如通过 Kubernetes API 获取 Pod 信息并更新这个 Map。
  • bpf_prog1 函数: 使用 tracepoint/syscalls/sys_enter_connect 跟踪点,在 TCP 连接建立时被调用,统计连接数。
  • bpf_prog2 函数: 使用 kprobe/tcp_rcv_space_adjust kprobe,在接收到 TCP 数据时被调用,统计流量大小。

注意:

  • 需要根据实际情况修改代码,例如调整 Map 的大小、添加更多的统计指标等。
  • 这个例子只是一个简单的示例,实际应用中可能需要考虑更多的情况,例如处理 UDP 流量、过滤特定的网络流等。
  • 务必进行充分的测试,确保 eBPF 程序的安全性和稳定性。

3. 编译 eBPF 程序

使用以下命令编译 eBPF 程序:

clang -O2 -target bpf -D__KERNEL__ -I/usr/include -c example.c -o example.o

4. 加载和运行 eBPF 程序

可以使用 bpftool 或其他 eBPF 库加载和运行编译后的 eBPF 程序。以下是一个使用 bpftool 的示例:

bpftool prog load example.o /sys/fs/bpf/example
bpftool prog attach tracepoint sys_enter_connect /sys/fs/bpf/example
bpftool prog attach kprobe tcp_rcv_space_adjust /sys/fs/bpf/example

5. 收集和展示数据

eBPF 程序会将统计数据存储在 flow_stats Map 中。可以使用用户态程序读取这个 Map,并将数据展示出来。以下是一个简单的 Python 示例:

from bcc import BPF
import socket
import struct

def int_to_ip(int_ip):
    return socket.inet_ntoa(struct.pack('!I', int_ip))

# Load the compiled eBPF program
b = BPF(src_file="example.c")

# Get the flow_stats map
flow_stats = b["flow_stats"]

# Print the header
print("%-16s %-16s %-6s %-6s %-10s %-10s" % ("SRC IP", "DST IP", "SPORT", "DPORT", "PACKETS", "BYTES"))

# Print the flow statistics every 2 seconds
while True:
    for key, value in flow_stats.items():
        src_ip = int_to_ip(key.src_addr)
        dst_ip = int_to_ip(key.dst_addr)
        src_port = key.src_port
        dst_port = key.dst_port
        packets = value.packets
        bytes = value.bytes
        print("%-16s %-16s %-6d %-6d %-10d %-10d" % (src_ip, dst_ip, src_port, dst_port, packets, bytes))
    flow_stats.clear()
    sleep(2)

代码解释:

  • 使用 bcc 库加载 eBPF 程序。
  • 获取 flow_stats Map。
  • 循环读取 Map 中的数据,并将数据打印到控制台。

6. 与 Kubernetes 集成

为了将 eBPF 程序与 Kubernetes 集成,需要解决以下几个问题:

  • Pod IP 地址到 Pod ID 的映射: 需要通过 Kubernetes API 获取 Pod 的 IP 地址和 ID,并将它们存储在 pod_ip_map Map 中。可以使用 Kubernetes 的 informer 机制监听 Pod 的创建、删除和更新事件,并动态更新 pod_ip_map Map。
  • eBPF 程序的部署: 可以使用 Kubernetes 的 DaemonSet 部署 eBPF 程序到每个节点上。DaemonSet 确保每个节点上都运行一个 Pod,该 Pod 负责加载和运行 eBPF 程序。
  • 数据收集和展示: 可以使用 Kubernetes 的 Service 暴露用户态程序,并将统计数据展示出来。可以使用 Prometheus 等监控系统收集和存储统计数据,并使用 Grafana 等可视化工具展示数据。

总结

本文介绍了如何使用 eBPF 技术监控 Kubernetes Pod 的网络流量。eBPF 具有高性能、低开销、安全性和灵活性等优点,是网络监控和性能分析的理想选择。通过将 eBPF 程序与 Kubernetes 集成,可以实现对 K8s 集群网络流量的全面监控和分析,为应用的性能优化和故障排查提供有力支持。

NetObserver eBPFKubernetes网络监控

评论点评