Skip to content

Commit

Permalink
Modified http message output based on number of messages in record.
Browse files Browse the repository at this point in the history
  • Loading branch information
dirk29 committed Jul 18, 2024
1 parent 181453f commit 80c1a1d
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 47 deletions.
2 changes: 1 addition & 1 deletion plugins
34 changes: 22 additions & 12 deletions src/kflowd.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -943,9 +943,17 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
key_alt = crc64(0, (const u8 *)stuple, sizeof(*stuple));
sinfo = bpf_map_lookup_elem(&hash_socks, &key_alt);
if (!sinfo || sinfo->sock != sock) {
bpf_printk("WARNING: Failed lookup to add tcp client socket for alt key %lx and pid %u\n", key_alt,
pid);
return 0;
/* try again without lport */
u16 lport = stuple->lport;
stuple->lport = 0;
key_alt = crc64(0, (const u8 *)stuple, sizeof(*stuple));
stuple->lport = lport;
sinfo = bpf_map_lookup_elem(&hash_socks, &key_alt);
if (!sinfo || sinfo->sock != sock) {
bpf_printk("WARNING: Failed lookup to add tcp client socket for alt key %lx and pid %u\n", key_alt,
pid);
return 0;
}
}
sinfo->state = tcp_state;
bpf_probe_read_kernel(sinfo->laddr, sizeof(stuple->laddr), stuple->laddr);
Expand All @@ -960,13 +968,14 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
bpf_printk("Added new tcp client socket for alt key %lx, key %lx and pid %u\n", key_alt, key,
sinfo->pid);
} else
bpf_printk("WARNING: Failed to add new tcp client socket for alt key %lx and pid %u\n", key_alt,
sinfo->pid);
bpf_printk("WARNING: Failed to add new tcp client socket for alt key %lx, key %lx and pid %u\n",
key_alt, key, sinfo->pid);
} else if ((tcp_state_old == TCP_LAST_ACK && tcp_state == TCP_CLOSE) ||
(tcp_state_old == TCP_FIN_WAIT2 && tcp_state == TCP_CLOSE)) {
sinfo = bpf_map_lookup_elem(&hash_socks, &key);
if (!sinfo || sinfo->sock != sock) {
bpf_printk("WARNING: Failed lookup to delete tcp socket for key %lx and pid %u", key, pid);
bpf_printk("WARNING: Failed lookup to delete tcp socket for key %lx, lport %u and pid %u", key,
stuple->lport, pid);
return 0;
}
/* submit final record and delete closed client and server sockets */
Expand Down Expand Up @@ -1029,7 +1038,7 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
/* update hash tables */
if (!bpf_map_update_elem(&hash_socks, &key, sinfo, BPF_ANY)) {
if (debug_proc(sinfo->comm, NULL))
bpf_printk("Added new tcp server socket for key %lx and pid %u\n", key, pid);
bpf_printk("Added new tcp server socket for key %lx, rport %u and pid %u\n", key, sinfo->rport, pid);
} else
bpf_printk("WARNING: Failed to add new tcp server socket for key %lx and pid %u\n", key, pid);
}
Expand Down Expand Up @@ -1498,7 +1507,7 @@ static __always_inline int handle_udp_event(void *ctx, const struct SOCK_EVENT_I
bpf_probe_read_kernel(sinfo->app_msg.data[num], MIN((__u16)data_len, sizeof(sinfo->app_msg.data[num])),
dnshdr);
/* export record on max application messages */
if (sinfo->app_msg.cnt == APP_MSG_MAX) {
if (sinfo->app_msg.cnt >= APP_MSG_MAX) {
submit_sock_record(sinfo);
if (bpf_map_delete_elem(&hash_socks, &key))
bpf_printk("WARNING: Failed to delete %s socket for key %lx and pid %u\n",
Expand Down Expand Up @@ -1848,18 +1857,18 @@ int handle_skb(struct __sk_buff *skb) {
stuple->proto = proto;
pkey = bpf_map_lookup_elem(&hash_tuples, stuple);
if (!pkey) {
bpf_printk("WARNING: Failed to lookup tcp socket for tuple\n");
bpf_printk("WARNING: Failed to lookup tcp socket tuple for lport %u and rport %u\n", lport, rport);
return skb->len;
}
bpf_probe_read_kernel(&key, sizeof(key), pkey);
sinfo = bpf_map_lookup_elem(&hash_socks, &key);
if (!sinfo) {
bpf_printk("WARNING: Failed to lookup tcp socket %lx\n", key);
bpf_printk("WARNING: Failed to lookup tcp socket key %lx for lport %u and rport %u\n", key, lport, rport);
return skb->len;
}

/* capture payloads */
num = sinfo->app_msg.cnt++;
num = sinfo->app_msg.cnt;
if (num >= APP_MSG_MAX) {
bpf_printk("WARNING: Failed to capture %u application messages\n", num);
return skb->len;
Expand All @@ -1869,6 +1878,7 @@ int handle_skb(struct __sk_buff *skb) {
sinfo->app_msg.ts[num] = bpf_ktime_get_ns();
sinfo->app_msg.len[num] = data_len;
sinfo->app_msg.isrx[num] = isrx;
sinfo->app_msg.cnt++;
bpf_skb_load_bytes(skb, data_ofs, sinfo->app_msg.data[num], MIN(data_len, sizeof(sinfo->app_msg.data[num]) - 1));
if (!bpf_map_update_elem(&hash_socks, &key, sinfo, BPF_ANY)) {
if (debug_proc(sinfo->comm, NULL))
Expand All @@ -1878,7 +1888,7 @@ int handle_skb(struct __sk_buff *skb) {
sinfo->pid);

/* submit record on application message limit */
if (sinfo->app_msg.cnt == APP_MSG_MAX) {
if (sinfo->app_msg.cnt >= APP_MSG_MAX) {
submit_sock_record(sinfo);
if (debug_proc(sinfo->comm, NULL))
bpf_printk("Submitted %s socket due to app message limit for key %lx and pid %u\n",
Expand Down
66 changes: 32 additions & 34 deletions src/kflowd.c
Original file line number Diff line number Diff line change
Expand Up @@ -604,14 +604,15 @@ static int handle_event(void *ctx, void *data, size_t data_sz) {
char *app_rx_msg[APP_MSG_MAX] = {0};
int app_tx_msg_cnt = 0;
int app_rx_msg_cnt = 0;
char *msg = NULL;

/* decode first tx and then rx messages */
for (cntm = 0; cntm < app_msg->cnt * 2; cntm++) {
struct APP_MSG_DNS dns = {0};
struct APP_MSG_HTTP http = {0};
char *msg = NULL;
int mc = app_msg->cnt;
int idx = cntm % mc;

if ((app_msg->isrx[idx] && cntm < mc) || (!app_msg->isrx[idx] && cntm >= mc))
continue;
ofs = (rs->proto == IPPROTO_TCP ? 2 : 0); /* for dns over tcp omit first 2 bytes contaiing length */
Expand Down Expand Up @@ -666,11 +667,10 @@ static int handle_event(void *ctx, void *data, size_t data_sz) {
dns.flags.qr ? J_UINT : J_IGN_UINT, "AnswerCount", dns.ancount,
J_JSON, "ResourceRecords", dns_rr[0] ? dns_rr : "[]");
} else if (app_msg->type == APP_DNS) {
msg = mkjson(MKJ_OBJ, 1,
J_STRING, "_Exception", "ERROR");
msg = mkjson(MKJ_OBJ, 1, J_STRING, "_Exception", "DNS Message Decoder");
}
else if (app_msg->type == APP_HTTP && !plugin_http_decode(app_msg->data[idx], app_msg->len[idx], &http)) {
int msg_size = APP_MSG_LEN_MAX;
int msg_size = APP_MSG_LEN_MAX / app_msg->cnt;
snprintf(ts1, sizeof(ts1), "%.09f", (app_msg->ts[idx] - r->ts_first) / 1e9);
char *msg_http = mkjson(MKJ_OBJ, 6,
J_TIMESTAMP, "_Timestamp", app_msg->ts[idx] - r->ts_first ? ts1 : "0",
Expand All @@ -679,44 +679,42 @@ static int handle_event(void *ctx, void *data, size_t data_sz) {
strlen(http.version) ? J_STRING : J_IGN_STRING, "_Version", http.version,
http.status ? J_UINT : J_IGN_UINT, "_Status", http.status,
strlen(http.reason) ? J_STRING : J_IGN_STRING, "_Reason", http.reason);
msg = calloc(msg_size, sizeof(char));
snprintf(msg, msg_size, "%s", msg_http);
free(msg_http);
for (cnth = 0; cnth < HTTP_HEADERS_MAX; cnth++) {
if (!http.header_name[cnth][0])
break;
len = strlen(msg);
if(!cnth)
len -= 1;
if(msg_size - len - 1 > (int)strlen(http.header_name[cnth]) + (int)strlen(http.header_value[cnth]) + 32)
snprintf(msg + len, msg_size - len, ", \"%s\": \"%s\"", http.header_name[cnth], http.header_value[cnth]);
}
if(strlen(http.body)) {
len = strlen(msg_http);
if(len && len < msg_size && msg_http[len - 1] == '}') {
msg_http[len - 1] = 0; /* remove closed brace */
msg = calloc(msg_size, sizeof(char));
snprintf(msg, msg_size, "%s", msg_http);
free(msg_http);
for (cnth = 0; cnth < HTTP_HEADERS_MAX; cnth++) {
if (!http.header_name[cnth][0])
break;
len = strlen(msg);
if(msg_size - len - 1 > (int)strlen(http.header_name[cnth]) + (int)strlen(http.header_value[cnth]) + 32)
snprintf(msg + len, msg_size - len, ", \"%s\": \"%s\"", http.header_name[cnth], http.header_value[cnth]);
}
if(strlen(http.body)) {
len = strlen(msg);
if(msg_size - len - 1 > (int)strlen(http.body) + 32)
snprintf(msg + len , msg_size - len, ", \"_Body\": \"%s\"", http.body);
}
len = strlen(msg);
if(msg_size - len - 1 > (int)strlen(http.body) + 32)
snprintf(msg + len , msg_size - len, ", \"_Body\": \"%s\"", http.body);
snprintf(msg + len, msg_size - len, "}");
}
len = strlen(msg);
snprintf(msg + len, msg_size - len, "}");
} else if (app_msg->type == APP_HTTP) {
if(idx > 1)
msg = mkjson(MKJ_OBJ, 1, J_STRING, "_Exception", "HTTP Message Fragmentation");
else
msg = mkjson(MKJ_OBJ, 1, J_STRING, "_Exception", "ERROR");
fprintf(stderr, "Invalid http message with len %u and index %u out of %u discarded: %s\n", len, idx, app_msg->cnt, msg_http);
} else if (app_msg->type == APP_HTTP) {
msg = mkjson(MKJ_OBJ, 1, J_STRING, "_Exception", "HTTP Message Decoder");
}

if(cntm < mc) {
if(app_tx_msg_cnt < APP_MSG_MAX)
if(msg) {
if(cntm < mc)
app_tx_msg[app_tx_msg_cnt++] = msg;
else
break;
}
else {
if(app_rx_msg_cnt < APP_MSG_MAX)
app_rx_msg[app_rx_msg_cnt++] = msg;
else
break;
}
else
fprintf(stderr, "Ignored %s application message with index %u out of %u\n", app_msg->type == APP_HTTP ?
"http" : (app_msg->type == APP_DNS ? "dns" : "unknown"), idx, app_msg->cnt);
}

/* tx and rx message list */
Expand Down Expand Up @@ -1733,7 +1731,7 @@ static char *mkjson_prettify(const char *s, char *r) {

/* iterate over JSON string.*/
for (const char *x = s; *x != '\0'; x++) {
if (*x == '"')
if (*x == '"' && (x == s || *(x - 1) != '\\'))
quoted = !quoted;
if (quoted) {
*r++ = *x;
Expand Down

0 comments on commit 80c1a1d

Please sign in to comment.