这是一份 RDMA Write 的 demo,实现了这样的功能:

  • 发送端向接收端 RDMA Write,并通过 SEND/RECV 通知接收端写入完成
  • 发送端向接收端 RDMA Write,并通过内存 Doorbell 通知接收端写入完成

解析见代码中的注释,完整源码如下:

#define _GNU_SOURCE
#include <arpa/inet.h>
#include <endian.h>
#include <errno.h>
#include <infiniband/verbs.h>
#include <netdb.h>
#include <rdma/rdma_cma.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define BUF_SIZE 4096
#define CQ_DEPTH 16
#define TIMEOUT_MS 2000
#define DOORBELL_VALUE 1

/*
 * Information exchanged through RDMA CM private_data while the connection
 * is being established.
 *
 * RDMA WRITE/READ are one-sided operations: the initiator does not need the
 * peer to post a receive, but it must know the virtual address and the rkey
 * of the peer's registered memory. Besides the data buffer, a second
 * "doorbell" buffer is advertised to demonstrate notification by writing a
 * flag into remote memory.
 *
 * Every field is stored in big-endian (network) byte order, and the struct
 * is packed so that no padding is inserted between fields.
 */
struct remote_info {
    uint64_t addr_be;           /* data buffer: virtual address */
    uint32_t rkey_be;           /* data buffer: remote access key */
    uint32_t size_be;           /* data buffer: size in bytes */
    uint64_t doorbell_addr_be;  /* doorbell: virtual address */
    uint32_t doorbell_rkey_be;  /* doorbell: remote access key */
    uint32_t doorbell_size_be;  /* doorbell: size in bytes */
} __attribute__((packed));

/* How the client announces that its RDMA WRITE of the payload has finished. */
enum notify_mode {
    NOTIFY_SEND = 0,      /* ordinary two-sided SEND message */
    NOTIFY_DOORBELL = 1,  /* small RDMA WRITE into the peer's doorbell word */
};

/*
 * One RDMA connection maps to one RC QP. A QP is not consumed per message;
 * it is reused for the whole lifetime of the connection:
 *
 *   - SEND / RDMA_WRITE / RDMA_READ requests go to the Send Queue
 *   - RECV buffers go to the Receive Queue
 *   - completed signaled WRs are polled out of the Completion Queue
 *
 * A real high-performance program would additionally maintain a buffer pool,
 * credits/windows and a count of outstanding WRs.
 */
struct conn {
    /* Connection-level verbs objects. */
    struct rdma_cm_id *id;
    struct ibv_pd *pd;
    struct ibv_cq *cq;
    struct ibv_qp *qp;

    /* Data region that is the source/target of the payload RDMA WRITE. */
    char *buf;
    struct ibv_mr *mr;

    /* Landing buffer for an incoming SEND. */
    char *recv_buf;
    struct ibv_mr *recv_mr;

    /* Staging buffer for an outgoing SEND. */
    char *send_buf;
    struct ibv_mr *send_mr;

    /* Doorbell flag region and its registration. */
    uint64_t *doorbell;
    struct ibv_mr *doorbell_mr;
};

/*
 * Fatal-error helper: print msg together with the current errno text via
 * perror(), then terminate the process with a failure exit status.
 * Never returns.
 */
static void die(const char *msg)
{
    perror(msg);
    exit(EXIT_FAILURE);
}

/*
 * Convert a 64-bit value from host byte order to big-endian (network) order.
 *
 * The value is serialised most-significant byte first and the resulting byte
 * image is reinterpreted as a uint64_t. On a little-endian host this is a
 * byte swap; on a big-endian host it is the identity.
 */
static uint64_t htonll_u64(uint64_t x)
{
    unsigned char wire[sizeof(uint64_t)];
    uint64_t result;

    for (unsigned i = 0; i < sizeof(wire); i++)
        wire[i] = (unsigned char)(x >> (8u * (sizeof(wire) - 1u - i)));

    memcpy(&result, wire, sizeof(result));
    return result;
}

/*
 * Convert a 64-bit big-endian (network order) value to host byte order.
 * The conversion is its own inverse, so this delegates to htonll_u64().
 */
static uint64_t ntohll_u64(uint64_t x)
{
    const uint64_t host_order = htonll_u64(x);

    return host_order;
}

/*
 * Allocate `size` bytes aligned to 4096 bytes and store the result in *ptr.
 * posix_memalign() reports failure through its return value, so that value
 * is copied into errno before die() prints it. Does not return on failure.
 */
static void alloc_aligned(void **ptr, size_t size)
{
    const int rc = posix_memalign(ptr, 4096, size);

    if (rc == 0)
        return;

    errno = rc;
    die("posix_memalign");
}

/*
* 创建 PD/CQ/QP。max_*_wr 表示队列里最多同时挂多少个未完成 WQE,
* 不是这条连接总共能发多少消息。完成后 WQE 槽位会被释放并复用。
*/
static void build_qp(struct conn *c)
{
c->pd = ibv_alloc_pd(c->id->verbs);
if (!c->pd) die("ibv_alloc_pd");

c->cq = ibv_create_cq(c->id->verbs, CQ_DEPTH, NULL, NULL, 0);
if (!c->cq) die("ibv_create_cq");

struct ibv_qp_init_attr attr;
memset(&attr, 0, sizeof(attr));
attr.send_cq = c->cq;
attr.recv_cq = c->cq;
attr.qp_type = IBV_QPT_RC;
attr.cap.max_send_wr = 8;
attr.cap.max_recv_wr = 8;
attr.cap.max_send_sge = 1;
attr.cap.max_recv_sge = 1;

if (rdma_create_qp(c->id, c->pd, &attr))
die("rdma_create_qp");

c->qp = c->id->qp;
}

/*
 * Allocate, zero and register every buffer used by connection `c`.
 * Memory must be registered as an MR before the NIC can DMA to or from it.
 *
 *   - c->buf       data region targeted by RDMA WRITE, hence REMOTE_WRITE
 *   - c->recv_buf  / c->send_buf: only used for SEND/RECV control messages
 *   - c->doorbell  small remotely writable flag/counter used to demonstrate
 *                  completion notification without a RECV WQE
 *
 * Any allocation or registration failure is fatal.
 */
static void reg_buffers(struct conn *c)
{
    enum { CTRL_MSG_LEN = 128, DOORBELL_REGION_LEN = 4096 };
    const int remote_rw_access = IBV_ACCESS_LOCAL_WRITE |
                                 IBV_ACCESS_REMOTE_WRITE |
                                 IBV_ACCESS_REMOTE_READ;

    alloc_aligned((void **)&c->buf, BUF_SIZE);
    alloc_aligned((void **)&c->recv_buf, CTRL_MSG_LEN);
    alloc_aligned((void **)&c->send_buf, CTRL_MSG_LEN);
    alloc_aligned((void **)&c->doorbell, DOORBELL_REGION_LEN);

    memset(c->buf, 0, BUF_SIZE);
    memset(c->recv_buf, 0, CTRL_MSG_LEN);
    memset(c->send_buf, 0, CTRL_MSG_LEN);
    memset(c->doorbell, 0, DOORBELL_REGION_LEN);

    c->mr = ibv_reg_mr(c->pd, c->buf, BUF_SIZE, remote_rw_access);
    if (c->mr == NULL)
        die("ibv_reg_mr data");

    c->recv_mr = ibv_reg_mr(c->pd, c->recv_buf, CTRL_MSG_LEN,
                            IBV_ACCESS_LOCAL_WRITE);
    if (c->recv_mr == NULL)
        die("ibv_reg_mr recv");

    c->send_mr = ibv_reg_mr(c->pd, c->send_buf, CTRL_MSG_LEN,
                            IBV_ACCESS_LOCAL_WRITE);
    if (c->send_mr == NULL)
        die("ibv_reg_mr send");

    c->doorbell_mr = ibv_reg_mr(c->pd, c->doorbell, DOORBELL_REGION_LEN,
                                remote_rw_access);
    if (c->doorbell_mr == NULL)
        die("ibv_reg_mr doorbell");
}

/*
* SEND/RECV 是双边语义:sender 发 SEND 之前,receiver 必须先 post_recv,
* 让接收端 RNIC 知道“下一个 SEND 到了该放进哪块内存”。
*
* 如果没有提前 post_recv,RC QP 上可能触发 RNR NAK,sender 会重试,
* 重试耗尽后 SEND 失败。RDMA WRITE/READ 不消耗 Receive Queue,
* 所以这里的 RECV 只服务于 NOTIFY_SEND 模式下的完成通知。
*/
static void post_recv(struct conn *c)
{
struct ibv_sge sge;
memset(&sge, 0, sizeof(sge));
sge.addr = (uintptr_t)c->recv_buf;
sge.length = 128;
sge.lkey = c->recv_mr->lkey;

struct ibv_recv_wr wr;
struct ibv_recv_wr *bad = NULL;
memset(&wr, 0, sizeof(wr));
wr.wr_id = 100;
wr.sg_list = &sge;
wr.num_sge = 1;

if (ibv_post_recv(c->qp, &wr, &bad))
die("ibv_post_recv");
}

/*
* 双边控制面通知。对端必须已经 post_recv,才能收到这个 SEND。
* 本 demo 用它发送 “RDMA_WRITE_DONE”,告诉 server 可以读取 c->buf 了。
*/
static void post_send(struct conn *c, const char *msg)
{
snprintf(c->send_buf, 128, "%s", msg);

struct ibv_sge sge;
memset(&sge, 0, sizeof(sge));
sge.addr = (uintptr_t)c->send_buf;
sge.length = strlen(c->send_buf) + 1;
sge.lkey = c->send_mr->lkey;

struct ibv_send_wr wr;
struct ibv_send_wr *bad = NULL;
memset(&wr, 0, sizeof(wr));
wr.wr_id = 200;
wr.opcode = IBV_WR_SEND;
wr.sg_list = &sge;
wr.num_sge = 1;
wr.send_flags = IBV_SEND_SIGNALED;

if (ibv_post_send(c->qp, &wr, &bad))
die("ibv_post_send");
}

/*
* 单边数据面:本端 RNIC 根据 remote_addr + rkey 直接写远端 MR。
*
* 对端应用不会因为内存被 RDMA WRITE 写了就自动得到 CQE;
* completion 只出现在发起端的 CQ 上。因此真实系统通常还需要
* SEND、doorbell、轮询 flag 或其他协议来通知“数据已经到达”。
*/
static void post_rdma_write_raw(struct conn *c, void *local_addr, uint32_t length,
uint32_t lkey, uint64_t remote_addr,
uint32_t rkey, uint64_t wr_id)
{
struct ibv_sge sge;
memset(&sge, 0, sizeof(sge));
sge.addr = (uintptr_t)local_addr;
sge.length = length;
sge.lkey = lkey;

struct ibv_send_wr wr;
struct ibv_send_wr *bad = NULL;
memset(&wr, 0, sizeof(wr));
wr.wr_id = wr_id;
wr.opcode = IBV_WR_RDMA_WRITE;
wr.sg_list = &sge;
wr.num_sge = 1;
wr.send_flags = IBV_SEND_SIGNALED;
wr.wr.rdma.remote_addr = remote_addr;
wr.wr.rdma.rkey = rkey;

if (ibv_post_send(c->qp, &wr, &bad))
die("ibv_post_send RDMA_WRITE");
}

/*
 * Stage `msg` (truncated to BUF_SIZE including the NUL) in c->buf and
 * RDMA WRITE the string, terminator included, to remote_addr/rkey with
 * wr_id 300.
 */
static void post_rdma_write(struct conn *c, uint64_t remote_addr, uint32_t rkey, const char *msg)
{
    snprintf(c->buf, BUF_SIZE, "%s", msg);

    const uint32_t wire_len = (uint32_t)(strlen(c->buf) + 1);

    post_rdma_write_raw(c, c->buf, wire_len, c->mr->lkey,
                        remote_addr, rkey, 300);
}

/*
 * Doorbell notification: instead of a SEND, the client issues one more tiny
 * RDMA WRITE (wr_id 400) that sets the doorbell flag exposed by the server
 * to DOORBELL_VALUE. The value is staged in the local doorbell buffer in
 * big-endian order and written from there.
 *
 * The upside is that the receiver needs no RECV WQE and gets no RECV CQE;
 * the cost is that the receiver's CPU has to poll this memory. Many
 * high-performance systems extend the pattern with a ring plus tail/seq.
 */
static void post_doorbell_write(struct conn *c, uint64_t remote_addr, uint32_t rkey)
{
    uint64_t *staged_flag = c->doorbell;

    *staged_flag = htonll_u64(DOORBELL_VALUE);

    post_rdma_write_raw(c, staged_flag, sizeof(*staged_flag),
                        c->doorbell_mr->lkey, remote_addr, rkey, 400);
}

/*
* 轮询 CQ,获取本端 signaled WR 的完成事件。
*
* 注意区分:rdma_get_cm_event 管连接管理事件,比如地址解析、连接建立;
* ibv_poll_cq 管数据路径完成事件,比如 SEND 完成、RECV 完成、RDMA_WRITE 完成。
*/
static void poll_one(struct conn *c, const char *what)
{
struct ibv_wc wc;

while (1) {
int n = ibv_poll_cq(c->cq, 1, &wc);
if (n < 0) die("ibv_poll_cq");
if (n == 0) continue;

if (wc.status != IBV_WC_SUCCESS) {
fprintf(stderr, "%s failed: status=%s opcode=%d\n",
what, ibv_wc_status_str(wc.status), wc.opcode);
exit(1);
}

return;
}
}

/*
* server 同时支持两种完成通知:
*
* - NOTIFY_SEND:client 发普通 SEND,server 从 CQ poll 到 RECV completion
* - NOTIFY_DOORBELL:client RDMA_WRITE server 的 doorbell,server 轮询内存
*
* RDMA_WRITE payload 本身不会让 server CQ 出现 completion,所以这里等的是
* “完成通知”,不是 payload 写入本身。
*/
static enum notify_mode wait_for_done(struct conn *c)
{
volatile uint64_t *doorbell = (volatile uint64_t *)c->doorbell;
struct ibv_wc wc;

while (1) {
int n = ibv_poll_cq(c->cq, 1, &wc);
if (n < 0) die("ibv_poll_cq");

if (n > 0) {
if (wc.status != IBV_WC_SUCCESS) {
fprintf(stderr, "recv done message failed: status=%s opcode=%d\n",
ibv_wc_status_str(wc.status), wc.opcode);
exit(1);
}
return NOTIFY_SEND;
}

if (ntohll_u64(*doorbell) == DOORBELL_VALUE) {
__sync_synchronize();
return NOTIFY_DOORBELL;
}
}
}

/*
 * Resolve host/port to an IPv4 stream address list with getaddrinfo().
 * When `passive` is non-zero AI_PASSIVE is requested. On resolver failure
 * the error is printed and the process exits. The caller owns the returned
 * list and releases it with freeaddrinfo().
 */
static struct addrinfo *resolve_addr(const char *host, const char *port, int passive)
{
    const struct addrinfo hints = {
        .ai_family = AF_INET,
        .ai_socktype = SOCK_STREAM,
        .ai_flags = passive ? AI_PASSIVE : 0,
    };
    struct addrinfo *result = NULL;

    const int rc = getaddrinfo(host, port, &hints, &result);
    if (rc == 0)
        return result;

    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rc));
    exit(1);
}

static int parse_notify_mode(const char *arg, enum notify_mode *mode)
{
if (!strcmp(arg, "send")) {
*mode = NOTIFY_SEND;
return 0;
}

if (!strcmp(arg, "doorbell")) {
*mode = NOTIFY_DOORBELL;
return 0;
}

return -1;
}

/*
 * Human-readable name of a notification mode: "doorbell" for
 * NOTIFY_DOORBELL, "send" for every other value.
 */
static const char *notify_mode_name(enum notify_mode mode)
{
    if (mode != NOTIFY_DOORBELL)
        return "send";

    return "doorbell";
}

/*
 * Server role: listen on `port`, accept a single RDMA CM connection,
 * advertise the data buffer and the doorbell to the client through
 * private_data, wait for the client's "write done" notification (SEND or
 * doorbell), print what was received and tear everything down.
 *
 * Every setup failure and every unexpected CM event terminates the process.
 */
static void run_server(const char *port)
{
    struct rdma_event_channel *ec;
    struct rdma_cm_id *listener;
    struct rdma_cm_event *event;
    struct addrinfo *addr;

    ec = rdma_create_event_channel();
    if (!ec) die("rdma_create_event_channel");

    if (rdma_create_id(ec, &listener, NULL, RDMA_PS_TCP))
        die("rdma_create_id");

    addr = resolve_addr(NULL, port, 1);

    if (rdma_bind_addr(listener, addr->ai_addr))
        die("rdma_bind_addr");

    if (rdma_listen(listener, 1))
        die("rdma_listen");

    printf("[server] listening on port %s\n", port);

    if (rdma_get_cm_event(ec, &event))
        die("rdma_get_cm_event");

    if (event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
        fprintf(stderr, "unexpected event: %d\n", event->event);
        exit(1);
    }

    struct conn c;
    memset(&c, 0, sizeof(c));
    c.id = event->id;

    /*
     * Acking a CM event only releases the event object. It neither accepts
     * the connection nor is it a network-level ACK; what lets the connection
     * proceed is the rdma_accept() call further down.
     *
     * Acking first and then building the QP / registering memory / posting
     * the receive therefore cannot let the client race ahead with its SEND.
     */
    rdma_ack_cm_event(event);

    build_qp(&c);
    reg_buffers(&c);

    /*
     * Post the receive before rdma_accept(). As soon as the connection is
     * ESTABLISHED the client may send its notification; having the RECV WQE
     * ready avoids a receiver-not-ready condition.
     */
    post_recv(&c);

    struct remote_info info;
    memset(&info, 0, sizeof(info));
    info.addr_be = htonll_u64((uintptr_t)c.buf);
    info.rkey_be = htonl(c.mr->rkey);
    info.size_be = htonl(BUF_SIZE);
    info.doorbell_addr_be = htonll_u64((uintptr_t)c.doorbell);
    info.doorbell_rkey_be = htonl(c.doorbell_mr->rkey);
    info.doorbell_size_be = htonl(sizeof(*c.doorbell));

    struct rdma_conn_param param;
    memset(&param, 0, sizeof(param));
    /*
     * private_data is a small piece of control-plane data carried along with
     * connection establishment. It hands the addr/rkey of the server's data
     * buffer and doorbell buffer to the client.
     */
    param.private_data = &info;
    param.private_data_len = sizeof(info);
    param.responder_resources = 1;
    param.initiator_depth = 1;

    /*
     * rdma_accept() is the protocol action that accepts the connection. Only
     * after it does the client receive RDMA_CM_EVENT_ESTABLISHED and start
     * real data-path operations.
     */
    if (rdma_accept(c.id, &param))
        die("rdma_accept");

    if (rdma_get_cm_event(ec, &event))
        die("rdma_get_cm_event established");

    if (event->event != RDMA_CM_EVENT_ESTABLISHED) {
        fprintf(stderr, "unexpected event after accept: %d\n", event->event);
        exit(1);
    }

    rdma_ack_cm_event(event);

    printf("[server] connected\n");
    printf("[server] waiting for client RDMA WRITE notification...\n");

    enum notify_mode notify = wait_for_done(&c);

    if (notify == NOTIFY_SEND)
        printf("[server] received normal SEND: %s\n", c.recv_buf);
    else
        /* Cast so the argument matches the conversion on every ABI. */
        printf("[server] received doorbell value: %llu\n",
               (unsigned long long)ntohll_u64(*c.doorbell));
    printf("[server] RDMA-written buffer: %s\n", c.buf);

    rdma_disconnect(c.id);

    /*
     * Teardown order: the QP goes first, then the CQ it was attached to,
     * then the MRs, then the PD they belong to, and finally the memory that
     * backed the MRs.
     */
    rdma_destroy_qp(c.id);
    ibv_destroy_cq(c.cq);
    ibv_dereg_mr(c.mr);
    ibv_dereg_mr(c.recv_mr);
    ibv_dereg_mr(c.send_mr);
    ibv_dereg_mr(c.doorbell_mr);
    ibv_dealloc_pd(c.pd);
    free(c.buf);
    free(c.recv_buf);
    free(c.send_buf);
    free(c.doorbell);
    rdma_destroy_id(c.id);
    rdma_destroy_id(listener);
    rdma_destroy_event_channel(ec);
    freeaddrinfo(addr);
}

static void run_client(const char *server_ip, const char *port, const char *src_ip,
enum notify_mode notify)
{
struct rdma_event_channel *ec;
struct rdma_cm_id *id;
struct rdma_cm_event *event;
struct addrinfo *dst;
struct addrinfo *src = NULL;

ec = rdma_create_event_channel();
if (!ec) die("rdma_create_event_channel");

if (rdma_create_id(ec, &id, NULL, RDMA_PS_TCP))
die("rdma_create_id");

dst = resolve_addr(server_ip, port, 0);
if (src_ip)
src = resolve_addr(src_ip, "0", 0);

/*
* RDMA CM 负责连接管理:resolve addr、resolve route、connect、established。
* 这些事件通过 rdma_get_cm_event 取;后面的 SEND/RDMA_WRITE 完成仍然
* 要通过 ibv_poll_cq 取。
*/
if (rdma_resolve_addr(id,
src ? src->ai_addr : NULL,
dst->ai_addr,
TIMEOUT_MS))
die("rdma_resolve_addr");

if (rdma_get_cm_event(ec, &event))
die("rdma_get_cm_event addr");

if (event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
fprintf(stderr, "unexpected addr event: %d\n", event->event);
exit(1);
}

rdma_ack_cm_event(event);

struct conn c;
memset(&c, 0, sizeof(c));
c.id = id;

build_qp(&c);
reg_buffers(&c);

if (rdma_resolve_route(id, TIMEOUT_MS))
die("rdma_resolve_route");

if (rdma_get_cm_event(ec, &event))
die("rdma_get_cm_event route");

if (event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
fprintf(stderr, "unexpected route event: %d\n", event->event);
exit(1);
}

rdma_ack_cm_event(event);

struct rdma_conn_param param;
memset(&param, 0, sizeof(param));
param.responder_resources = 1;
param.initiator_depth = 1;

if (rdma_connect(id, &param))
die("rdma_connect");

if (rdma_get_cm_event(ec, &event))
die("rdma_get_cm_event established");

if (event->event != RDMA_CM_EVENT_ESTABLISHED) {
fprintf(stderr, "unexpected established event: %d\n", event->event);
exit(1);
}

printf("[client] private_data_len=%u expected=%zu\n",
event->param.conn.private_data_len,
sizeof(struct remote_info));

if (event->param.conn.private_data == NULL ||
event->param.conn.private_data_len < sizeof(struct remote_info)) {
fprintf(stderr, "server private_data missing or too small\n");
exit(1);
}

struct remote_info info;
memset(&info, 0, sizeof(info));
memcpy(&info, event->param.conn.private_data, sizeof(info));

uint64_t remote_addr = ntohll_u64(info.addr_be);
uint32_t rkey = ntohl(info.rkey_be);
uint32_t size = ntohl(info.size_be);
uint64_t doorbell_addr = ntohll_u64(info.doorbell_addr_be);
uint32_t doorbell_rkey = ntohl(info.doorbell_rkey_be);
uint32_t doorbell_size = ntohl(info.doorbell_size_be);

rdma_ack_cm_event(event);

printf("[client] connected\n");
printf("[client] remote addr=0x%lx rkey=0x%x size=%u\n",
remote_addr, rkey, size);
printf("[client] remote doorbell addr=0x%lx rkey=0x%x size=%u notify=%s\n",
doorbell_addr, doorbell_rkey, doorbell_size, notify_mode_name(notify));

/*
* 真正的数据传输走 RDMA WRITE:client 直接把 payload 写进 server 的 c->buf。
* 下面的 SEND 或 doorbell 只负责通知 server “写完了”。
*/
post_rdma_write(&c, remote_addr, rkey,
"hello from 108-41 client via RDMA WRITE");
poll_one(&c, "RDMA_WRITE completion");

printf("[client] RDMA WRITE completed\n");

if (notify == NOTIFY_SEND) {
post_send(&c, "RDMA_WRITE_DONE");
poll_one(&c, "SEND completion");
} else {
post_doorbell_write(&c, doorbell_addr, doorbell_rkey);
poll_one(&c, "doorbell RDMA_WRITE completion");
}

printf("[client] sent %s notification\n", notify_mode_name(notify));

rdma_disconnect(id);

rdma_destroy_qp(id);
ibv_dereg_mr(c.mr);
ibv_dereg_mr(c.recv_mr);
ibv_dereg_mr(c.send_mr);
ibv_dereg_mr(c.doorbell_mr);
ibv_dealloc_pd(c.pd);
rdma_destroy_id(id);
rdma_destroy_event_channel(ec);

freeaddrinfo(dst);
if (src) freeaddrinfo(src);
}

/*
 * Entry point. Dispatches on argv[1]:
 *
 *   server <port>
 *   client <server_ip> <port> [client_src_ip] [send|doorbell]
 *   client <server_ip> <port> [send|doorbell]
 *
 * For the client, argv[4] is taken as the notification mode if it parses as
 * one and as the source IP otherwise; argv[5], when present, must be a
 * notification mode. The default mode is SEND. Returns 0 on success and 1
 * on a usage error.
 */
int main(int argc, char **argv)
{
    if (argc < 3) {
        fprintf(stderr,
                "Usage:\n"
                " server: %s server <port>\n"
                " client: %s client <server_ip> <port> [client_src_ip] [send|doorbell]\n"
                " client: %s client <server_ip> <port> [send|doorbell]\n",
                argv[0], argv[0], argv[0]);
        return 1;
    }

    const char *role = argv[1];

    if (strcmp(role, "server") == 0) {
        run_server(argv[2]);
        return 0;
    }

    if (strcmp(role, "client") != 0) {
        fprintf(stderr, "unknown mode: %s\n", role);
        return 1;
    }

    if (argc < 4) {
        fprintf(stderr, "client mode needs server_ip and port\n");
        return 1;
    }

    enum notify_mode notify = NOTIFY_SEND;
    const char *src_ip = NULL;

    /* Fourth argument: a mode keyword if it parses, else the source IP. */
    if (argc >= 5 && parse_notify_mode(argv[4], &notify) != 0)
        src_ip = argv[4];

    if (argc >= 6 && parse_notify_mode(argv[5], &notify) != 0) {
        fprintf(stderr, "unknown notify mode: %s\n", argv[5]);
        return 1;
    }

    run_client(argv[2], argv[3], src_ip, notify);
    return 0;
}