Skip to content

Commit

Permalink
Fixed duplicate socket buffers from raw socket eBPF filter.
Browse files Browse the repository at this point in the history
  • Loading branch information
dirk29 committed Jul 31, 2024
1 parent 59155d5 commit c6cd167
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 72 deletions.
2 changes: 1 addition & 1 deletion plugins
3 changes: 2 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ echo "- Enable kflowd systemd service to start automatically after reboot
sudo systemctl enable kflowd\n"
echo "- Start kflowd systemd service and check status with the following commands:
sudo systemctl start kflowd
sudo systemctl status kflowd\n"
sudo systemctl status kflowd
sudo journalctl -fe -u kflowd\n"
endef
export NFPM
export NFPM_PRE
Expand Down
116 changes: 67 additions & 49 deletions src/kflowd.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -843,17 +843,27 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
}

if (tcp_state_old == TCP_SYN_RECV && tcp_state == TCP_ESTABLISHED) {
sinfo = bpf_map_lookup_elem(&heap_sock, &zero);
/* check if alternate key from application message exists already */
key_alt = crc64(0, (const u8 *)stuple, sizeof(*stuple));
sinfo = bpf_map_lookup_elem(&hash_socks, &key_alt);
if (!sinfo) {
bpf_printk("WARNING: Failed to allocate new tcp server socket for pid %u\n", pid);
return 0;
sinfo = bpf_map_lookup_elem(&heap_sock, &zero);
if (!sinfo) {
bpf_printk("WARNING: Failed to allocate new tcp server socket for pid %u\n", pid);
return 0;
}
sinfo->app_msg.cnt = 0;
}
/* prepare new tcp server socket with unknown pid by remembering socket */
sinfo->sock = sock;
sinfo->family = family;
sinfo->proto = IPPROTO_TCP;
sinfo->role = ROLE_TCP_SERVER;
sinfo->state = tcp_state;
bpf_probe_read_kernel(sinfo->laddr, sizeof(stuple->laddr), stuple->laddr);
bpf_probe_read_kernel(sinfo->raddr, sizeof(stuple->raddr), stuple->raddr);
sinfo->lport = stuple->lport;
sinfo->rport = stuple->rport;
/* add rx syn, ack and tx syn-ack packet since otherwise undetected due to linux syn cookies */
sinfo->rx_ts = bpf_ktime_get_ns();
sinfo->rx_ts_first = sinfo->rx_ts;
Expand All @@ -878,10 +888,10 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
sinfo->tx_event[0] = 1;
sinfo->tx_flags_map[0] = TCP_SYN | TCP_ACK;
sinfo->tx_flags_map_cnt = 1;
sinfo->app_msg.cnt = 0;
if (!bpf_map_update_elem(&hash_socks, &key, sinfo, BPF_ANY)) {
if (debug_proc(sinfo->comm, NULL))
bpf_printk("Prepared new tcp server socket for pid %u\n", pid);
bpf_printk("Prepared %s server socket for pid %u\n",
sinfo->app_msg.cnt ? "new tcp" : "tcp application", pid);
} else
bpf_printk("WARNING: Failed to prepare new tcp server socket for pid %u\n", pid);
} else if (tcp_state_old == TCP_CLOSE && tcp_state == TCP_SYN_SENT) {
Expand All @@ -897,6 +907,10 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
sinfo->proto = IPPROTO_TCP;
sinfo->role = ROLE_TCP_CLIENT;
sinfo->state = tcp_state;
bpf_probe_read_kernel(sinfo->laddr, sizeof(stuple->laddr), stuple->laddr);
bpf_probe_read_kernel(sinfo->raddr, sizeof(stuple->raddr), stuple->raddr);
sinfo->lport = stuple->lport;
sinfo->rport = stuple->rport;
/* add tx syn, ack and rx syn-ack packet since otherwise undetected due to linux syn cookies */
sinfo->tx_ts = bpf_ktime_get_ns();
sinfo->tx_ts_first = sinfo->tx_ts;
Expand Down Expand Up @@ -940,7 +954,7 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
bpf_printk("WARNING: Failed to prepare new tcp client socket for alt key %lx and pid %u\n", key_alt,
pid);
} else if (tcp_state_old == TCP_SYN_SENT && tcp_state == TCP_ESTABLISHED) {
/* get alternate key based on tuple without local port */
/* get alternate key */
key_alt = crc64(0, (const u8 *)stuple, sizeof(*stuple));
sinfo = bpf_map_lookup_elem(&hash_socks, &key_alt);
if (!sinfo || sinfo->sock != sock) {
Expand All @@ -957,11 +971,6 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
}
}
sinfo->state = tcp_state;
bpf_probe_read_kernel(sinfo->laddr, sizeof(stuple->laddr), stuple->laddr);
bpf_probe_read_kernel(sinfo->raddr, sizeof(stuple->raddr), stuple->raddr);
sinfo->lport = stuple->lport;
sinfo->rport = stuple->rport;

/* add new tcp client socket */
if (!bpf_map_update_elem(&hash_socks, &key, sinfo, BPF_ANY)) {
if (debug_proc(sinfo->comm, NULL))
Expand Down Expand Up @@ -1006,19 +1015,6 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
bpf_probe_read_kernel_str(&sinfo->comm_parent, sizeof(sinfo->comm_parent),
BPF_CORE_READ(task, real_parent, mm, exe_file, f_path.dentry, d_name.name));
sinfo->ts_proc = BPF_CORE_READ(task, start_time);
if (sinfo->family == AF_INET) {
__u32 laddr = BPF_CORE_READ(sock, __sk_common.skc_rcv_saddr);
__u32 raddr = BPF_CORE_READ(sock, __sk_common.skc_daddr);
bpf_probe_read_kernel(sinfo->laddr, sizeof(laddr), &laddr);
bpf_probe_read_kernel(sinfo->raddr, sizeof(raddr), &raddr);
} else {
bpf_probe_read_kernel(sinfo->laddr, sizeof(sinfo->laddr),
BPF_CORE_READ(sock, __sk_common.skc_v6_rcv_saddr.in6_u.u6_addr8));
bpf_probe_read_kernel(sinfo->raddr, sizeof(sinfo->raddr),
BPF_CORE_READ(sock, __sk_common.skc_v6_daddr.in6_u.u6_addr8));
}
sinfo->lport = BPF_CORE_READ(sock, __sk_common.skc_num);
sinfo->rport = bpf_ntohs(BPF_CORE_READ(sock, __sk_common.skc_dport));

/* debug */
if (debug_proc(sinfo->comm, NULL)) {
Expand Down Expand Up @@ -1772,7 +1768,7 @@ int handle_skb(struct __sk_buff *skb) {
__u16 sport;
__u16 dport;
bool is_app_port[APP_MAX] = {0};
struct SOCK_INFO *sinfo;
struct SOCK_INFO *sinfo = NULL;
struct SOCK_TUPLE *stuple;
__u32 zero = 0;
__u64 key = 0;
Expand All @@ -1782,6 +1778,7 @@ int handle_skb(struct __sk_buff *skb) {
__u32 cnta;
__u32 cntl = 0;
__u8 num;
__u32 seq;
bool isrx = (skb->ingress_ifindex == skb->ifindex);
bool found = false;

Expand Down Expand Up @@ -1871,7 +1868,7 @@ int handle_skb(struct __sk_buff *skb) {
data_len = ip_len - (iphdr_len + tcphdr_len);
else
return skb->len;
if (data_len < APP_MSG_LEN_MIN || data_len > APP_MSG_LEN_MAX)
if (data_len < APP_MSG_LEN_MIN)
return skb->len;

/* check data length and dns port */
Expand Down Expand Up @@ -1903,58 +1900,79 @@ int handle_skb(struct __sk_buff *skb) {
stuple->rport = rport;
stuple->proto = proto;
pkey = bpf_map_lookup_elem(&hash_tuples, stuple);
if (!pkey) {
bpf_printk("WARNING: Failed to lookup tcp socket tuple for lport %u and rport %u\n", lport, rport);
return skb->len;
if (pkey) {
bpf_probe_read_kernel(&key, sizeof(key), pkey);
sinfo = bpf_map_lookup_elem(&hash_socks, &key);
if (!sinfo) {
bpf_printk("WARNING: Failed to lookup tcp socket key %lx for lport %u and rport %u\n", key, lport, rport);
return skb->len;
}
}
bpf_probe_read_kernel(&key, sizeof(key), pkey);
sinfo = bpf_map_lookup_elem(&hash_socks, &key);
if (!sinfo || sinfo->lport != lport || sinfo->rport != rport) {
bpf_printk("WARNING: Failed to lookup tcp socket key %lx for lport %u and rport %u\n", key, lport, rport);
return skb->len;
if (!sinfo) {
/* prepare socket for alternate key when tcp server handshake not yet finished */
sinfo = bpf_map_lookup_elem(&heap_sock, &zero);
if (!sinfo) {
bpf_printk("WARNING: Failed to allocate new tcp application server socket\n");
return 0;
}
sinfo->pid = 0;
sinfo->comm[0] = 0;
sinfo->family = family;
sinfo->role = ROLE_TCP_SERVER;
sinfo->proto = IPPROTO_TCP;
bpf_probe_read_kernel(sinfo->laddr, sizeof(stuple->laddr), laddr);
bpf_probe_read_kernel(sinfo->raddr, sizeof(stuple->raddr), raddr);
stuple->lport = lport;
stuple->rport = rport;
sinfo->ts_first = bpf_ktime_get_ns();
sinfo->app_msg.cnt = 0;
key = crc64(0, (const u8 *)stuple, sizeof(*stuple));
}

/* capture payloads */
/* capture application message */
num = sinfo->app_msg.cnt;
if (num >= APP_MSG_MAX) {
bpf_printk("WARNING: Failed to capture %u application messages\n", num);
if (num >= APP_MSG_MAX)
return skb->len;
} else if (!num)
else if (!num)
sinfo->app_msg.type = cnta;

/* discard if duplicate packet from raw socket */
bpf_skb_load_bytes(skb, tcphdr_ofs + offsetof(struct tcphdr, seq), &seq, 4);
sinfo->app_msg.seq[num] = bpf_ntohl(seq);
if (num - 1 >= 0 && num - 1 < APP_MSG_MAX && sinfo->app_msg.seq[num] == sinfo->app_msg.seq[num - 1])
return skb->len;

/* get application data */
sinfo->app_msg.ts[num] = bpf_ktime_get_ns();
sinfo->app_msg.len[num] = data_len;
sinfo->app_msg.isrx[num] = isrx;
sinfo->app_msg.cnt++;
bpf_skb_load_bytes(skb, data_ofs, sinfo->app_msg.data[num], MIN(data_len, sizeof(sinfo->app_msg.data[num]) - 1));
if (data_len > APP_MSG_LEN_MAX)
data_len = APP_MSG_LEN_MAX;
if (data_len >= APP_MSG_LEN_MIN)
bpf_skb_load_bytes(skb, data_ofs, sinfo->app_msg.data[num], data_len);
else
return skb->len;
if (!bpf_map_update_elem(&hash_socks, &key, sinfo, BPF_ANY)) {
if (debug_proc(sinfo->comm, NULL))
bpf_printk("Captured payload for %s socket %lx and pid %u", GET_ROLE_STR(sinfo->role), key, sinfo->pid);
} else
bpf_printk("WARNING: Failed to capture payload for %s socket %lx and pid %u\n", GET_ROLE_STR(sinfo->role), key,
sinfo->pid);

/* submit record on application message limit */
if (sinfo->app_msg.cnt >= APP_MSG_MAX) {
submit_sock_record(sinfo);
if (debug_proc(sinfo->comm, NULL))
bpf_printk("Submitted %s socket due to app message limit for key %lx and pid %u\n",
GET_ROLE_STR(sinfo->role), key, sinfo->pid);
}

/* debug for socket filter */
if (debug_proc(sinfo->comm, NULL)) {
bpf_printk("HANDLE_SKB %s:", isrx ? "RX" : "TX");
bpf_printk(" PID: %u KEY: %lx", sinfo->pid, key);
bpf_printk(" PROTO: %u FAMILY: %u ", sinfo->proto, sinfo->family);
bpf_printk(" PROTO: %u FAMILY: %u ", proto, family);
if (family == AF_INET) {
bpf_printk(" LOCAL: %pI4:%u", laddr, lport);
bpf_printk(" REMOTE: %pI4:%u", raddr, rport);
} else {
bpf_printk(" LOCAL: %pI6c:%u", laddr, lport);
bpf_printk(" REMOTE: %pI6c:%u", raddr, rport);
}
bpf_printk(" APP: MESSAGE %u LEN %u", num, sinfo->app_msg.len[num]);
bpf_printk(" APP: MESSAGE %u LEN %u (%u)\n", sinfo->app_msg.cnt, sinfo->app_msg.len[num], data_len);
}

return skb->len;
Expand Down
Loading

0 comments on commit c6cd167

Please sign in to comment.