Skip to content

Commit

Permalink
Implemented tcp socket expiration based on max number of http applica…
Browse files Browse the repository at this point in the history
…tion messages.
  • Loading branch information
dirk29 committed Jul 10, 2024
1 parent ead150b commit 181453f
Showing 1 changed file with 47 additions and 22 deletions.
69 changes: 47 additions & 22 deletions src/kflowd.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ static __always_inline int handle_fs_event(void *ctx, const struct FS_EVENT_INFO
} else {
/* get record storage on heap and populate initial data */
r = bpf_map_lookup_elem(&heap_record_fs, &zero);
if (!r)
if (!r) {
bpf_printk("WARNING: Failed to allocate new filesystem record for pid %u\n", pid);
return 0;
}
task = (struct task_struct *)bpf_get_current_task();
r->ino = ino;
r->rc.pid = pid;
Expand Down Expand Up @@ -551,8 +553,10 @@ static __always_inline int submit_sock_record(struct SOCK_INFO *sinfo) {
__u32 zero = 0;

r = bpf_map_lookup_elem(&heap_record_sock, &zero);
if (!r)
if (!r) {
bpf_printk("WARNING: Failed to allocate new socket record for pid %u\n", sinfo->pid);
return 0;
}

struct sock *sock = sinfo->sock;
__u32 output_len = sizeof(*r);
Expand Down Expand Up @@ -801,8 +805,10 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
sock = (struct sock *)BPF_CORE_READ(args, skaddr);
key = KEY_SOCK(BPF_CORE_READ(sock, __sk_common.skc_hash));
stuple = bpf_map_lookup_elem(&heap_tuple, &zero);
if (!stuple)
if (!stuple) {
bpf_printk("WARNING: Failed to allocate new tuple for pid %u\n", pid);
return 0;
}
// TBD: consolidate
if (family == AF_INET) {
bpf_probe_read_kernel(stuple->laddr, sizeof(args->saddr), BPF_CORE_READ(args, saddr));
Expand Down Expand Up @@ -836,8 +842,10 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I

if (tcp_state_old == TCP_SYN_RECV && tcp_state == TCP_ESTABLISHED) {
sinfo = bpf_map_lookup_elem(&heap_sock, &zero);
if (!sinfo)
if (!sinfo) {
bpf_printk("WARNING: Failed to allocate new tcp server socket for pid %u\n", pid);
return 0;
}
/* prepare new tcp server socket with unknown pid by remembering socket */
sinfo->sock = sock;
sinfo->family = family;
Expand Down Expand Up @@ -878,8 +886,10 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
} else if (tcp_state_old == TCP_CLOSE && tcp_state == TCP_SYN_SENT) {
// TBD: fix key zero
sinfo = bpf_map_lookup_elem(&heap_sock, &zero);
if (!sinfo)
if (!sinfo) {
bpf_printk("WARNING: Failed to allocate new tcp client socket for pid %u\n", pid);
return 0;
}
/* prepare new tcp client socket by remembering pid */
sinfo->sock = sock;
sinfo->family = family;
Expand Down Expand Up @@ -932,8 +942,11 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
/* get alternate key based on tuple without local port */
key_alt = crc64(0, (const u8 *)stuple, sizeof(*stuple));
sinfo = bpf_map_lookup_elem(&hash_socks, &key_alt);
if (!sinfo || sinfo->sock != sock)
if (!sinfo || sinfo->sock != sock) {
bpf_printk("WARNING: Failed lookup to add tcp client socket for alt key %lx and pid %u\n", key_alt,
pid);
return 0;
}
sinfo->state = tcp_state;
bpf_probe_read_kernel(sinfo->laddr, sizeof(stuple->laddr), stuple->laddr);
bpf_probe_read_kernel(sinfo->raddr, sizeof(stuple->raddr), stuple->raddr);
Expand All @@ -952,19 +965,19 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
} else if ((tcp_state_old == TCP_LAST_ACK && tcp_state == TCP_CLOSE) ||
(tcp_state_old == TCP_FIN_WAIT2 && tcp_state == TCP_CLOSE)) {
sinfo = bpf_map_lookup_elem(&hash_socks, &key);
if (sinfo && sinfo->sock == sock) {
/* submit final record and delete closed client and server sockets */
sinfo->state = tcp_state;
submit_sock_record(sinfo);
if (bpf_map_delete_elem(&hash_socks, &key))
bpf_printk("WARNING: Failed to delete %s socket for key %lx and pid %u\n",
GET_ROLE_STR(sinfo->role), key, sinfo->pid);
else if (debug_proc(sinfo->comm, NULL))
bpf_printk("Submitted and deleted %s socket for key %lx and pid %u\n", GET_ROLE_STR(sinfo->role),
key, sinfo->pid);
} else
bpf_printk("WARNING: Failed tcp socket lookup for key %lx and remote host %x:%u", key,
*((__u32 *)stuple->raddr), stuple->rport);
if (!sinfo || sinfo->sock != sock) {
bpf_printk("WARNING: Failed lookup to delete tcp socket for key %lx and pid %u", key, pid);
return 0;
}
/* submit final record and delete closed client and server sockets */
sinfo->state = tcp_state;
submit_sock_record(sinfo);
if (bpf_map_delete_elem(&hash_socks, &key))
bpf_printk("WARNING: Failed to delete %s socket for key %lx and pid %u\n", GET_ROLE_STR(sinfo->role),
key, sinfo->pid);
else if (debug_proc(sinfo->comm, NULL))
bpf_printk("Submitted and deleted %s socket for key %lx and pid %u\n", GET_ROLE_STR(sinfo->role), key,
sinfo->pid);
} else if (debug_proc(comm, NULL))
bpf_printk("Pass tcp state change for pid %u\n", pid);
} else {
Expand Down Expand Up @@ -1491,7 +1504,7 @@ static __always_inline int handle_udp_event(void *ctx, const struct SOCK_EVENT_I
bpf_printk("WARNING: Failed to delete %s socket for key %lx and pid %u\n",
GET_ROLE_STR(sinfo->role), key, sinfo->pid);
else if (debug_proc(sinfo->comm, NULL))
bpf_printk("Submitted and deleted %s socket for key %lx and pid %u\n",
bpf_printk("Submitted and deleted %s socket due to app message limit for key %lx and pid %u\n",
GET_ROLE_STR(sinfo->role), key, sinfo->pid);
return 0;
}
Expand All @@ -1515,8 +1528,10 @@ static __always_inline int handle_udp_event(void *ctx, const struct SOCK_EVENT_I
/* populate new socket and pid data */
sinfo = bpf_map_lookup_elem(&heap_sock, &zero);
stuple = bpf_map_lookup_elem(&heap_tuple, &zero);
if (!sinfo || !stuple)
if (!sinfo || !stuple) {
bpf_printk("WARNING: Failed to allocate new udp socket or tuple for pid %u\n", pid);
return 0;
}
sinfo->sock = sock;
sinfo->pid = pid;
sinfo->tid = bpf_get_current_pid_tgid();
Expand Down Expand Up @@ -1822,8 +1837,10 @@ int handle_skb(struct __sk_buff *skb) {

/* lookup socket via stuple */
stuple = bpf_map_lookup_elem(&heap_tuple, &zero);
if (!stuple)
if (!stuple) {
bpf_printk("WARNING: Failed to allocate new tuple for application message\n");
return skb->len;
}
bpf_probe_read_kernel(stuple->laddr, sizeof(stuple->laddr), laddr);
bpf_probe_read_kernel(stuple->raddr, sizeof(stuple->raddr), raddr);
stuple->lport = lport;
Expand Down Expand Up @@ -1860,6 +1877,14 @@ int handle_skb(struct __sk_buff *skb) {
bpf_printk("WARNING: Failed to capture payload for %s socket %lx and pid %u\n", GET_ROLE_STR(sinfo->role), key,
sinfo->pid);

/* submit record on application message limit */
if (sinfo->app_msg.cnt == APP_MSG_MAX) {
submit_sock_record(sinfo);
if (debug_proc(sinfo->comm, NULL))
bpf_printk("Submitted %s socket due to app message limit for key %lx and pid %u\n",
GET_ROLE_STR(sinfo->role), key, sinfo->pid);
}

/* debug for socket filter */
if (debug_proc(sinfo->comm, NULL)) {
bpf_printk("HANDLE_SKB %s:", isrx ? "RX" : "TX");
Expand Down

0 comments on commit 181453f

Please sign in to comment.