long blogs

进一步有进一步惊喜


  • Home
  • Archive
  • Tags
  •  

© 2025 long

Theme Typography by Makito

Proudly published with Hexo

KCP网络

Posted at 2021-01-25 网络 kcp 

简介

地址,一句话描述,牺牲带宽占用来降低延迟。

KCP 源码注释

头文件ikcp.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
//=====================================================================
//
// KCP - A Better ARQ Protocol Implementation
// skywind3000 (at) gmail.com, 2010-2011
//
// Features:
// + Average RTT reduce 30% - 40% vs traditional ARQ like tcp.
// + Maximum RTT reduce three times vs tcp.
// + Lightweight, distributed as a single source file.
//
//=====================================================================
#ifndef __IKCP_H__
#define __IKCP_H__

#include <stddef.h>
#include <stdlib.h>
#include <assert.h>


//=====================================================================
// 32BIT INTEGER DEFINITION
//=====================================================================
#ifndef __INTEGER_32_BITS__
#define __INTEGER_32_BITS__
#if defined(_WIN64) || defined(WIN64) || defined(__amd64__) || \
defined(__x86_64) || defined(__x86_64__) || defined(_M_IA64) || \
defined(_M_AMD64)
typedef unsigned int ISTDUINT32;
typedef int ISTDINT32;
#elif defined(_WIN32) || defined(WIN32) || defined(__i386__) || \
defined(__i386) || defined(_M_X86)
typedef unsigned long ISTDUINT32;
typedef long ISTDINT32;
#elif defined(__MACOS__)
typedef UInt32 ISTDUINT32;
typedef SInt32 ISTDINT32;
#elif defined(__APPLE__) && defined(__MACH__)
#include <sys/types.h>
typedef u_int32_t ISTDUINT32;
typedef int32_t ISTDINT32;
#elif defined(__BEOS__)
#include <sys/inttypes.h>
typedef u_int32_t ISTDUINT32;
typedef int32_t ISTDINT32;
#elif (defined(_MSC_VER) || defined(__BORLANDC__)) && (!defined(__MSDOS__))
typedef unsigned __int32 ISTDUINT32;
typedef __int32 ISTDINT32;
#elif defined(__GNUC__)
#include <stdint.h>
typedef uint32_t ISTDUINT32;
typedef int32_t ISTDINT32;
#else
typedef unsigned long ISTDUINT32;
typedef long ISTDINT32;
#endif
#endif


//=====================================================================
// Integer Definition
//=====================================================================
#ifndef __IINT8_DEFINED
#define __IINT8_DEFINED
typedef char IINT8;
#endif

#ifndef __IUINT8_DEFINED
#define __IUINT8_DEFINED
typedef unsigned char IUINT8;
#endif

#ifndef __IUINT16_DEFINED
#define __IUINT16_DEFINED
typedef unsigned short IUINT16;
#endif

#ifndef __IINT16_DEFINED
#define __IINT16_DEFINED
typedef short IINT16;
#endif

#ifndef __IINT32_DEFINED
#define __IINT32_DEFINED
typedef ISTDINT32 IINT32;
#endif

#ifndef __IUINT32_DEFINED
#define __IUINT32_DEFINED
typedef ISTDUINT32 IUINT32;
#endif

#ifndef __IINT64_DEFINED
#define __IINT64_DEFINED
#if defined(_MSC_VER) || defined(__BORLANDC__)
typedef __int64 IINT64;
#else
typedef long long IINT64;
#endif
#endif

#ifndef __IUINT64_DEFINED
#define __IUINT64_DEFINED
#if defined(_MSC_VER) || defined(__BORLANDC__)
typedef unsigned __int64 IUINT64;
#else
typedef unsigned long long IUINT64;
#endif
#endif

#ifndef INLINE
#if defined(__GNUC__)

#if (__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 1))
#define INLINE __inline__ __attribute__((always_inline))
#else
#define INLINE __inline__
#endif

#elif (defined(_MSC_VER) || defined(__BORLANDC__) || defined(__WATCOMC__))
#define INLINE __inline
#else
#define INLINE
#endif
#endif

#if (!defined(__cplusplus)) && (!defined(inline))
#define inline INLINE
#endif


//=====================================================================
// QUEUE DEFINITION
//=====================================================================
#ifndef __IQUEUE_DEF__
#define __IQUEUE_DEF__

struct IQUEUEHEAD {
struct IQUEUEHEAD *next, *prev;
};

typedef struct IQUEUEHEAD iqueue_head;


//---------------------------------------------------------------------
// queue init
//---------------------------------------------------------------------
#define IQUEUE_HEAD_INIT(name) { &(name), &(name) }
#define IQUEUE_HEAD(name) \
struct IQUEUEHEAD name = IQUEUE_HEAD_INIT(name)

#define IQUEUE_INIT(ptr) ( \
(ptr)->next = (ptr), (ptr)->prev = (ptr))

#define IOFFSETOF(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)

#define ICONTAINEROF(ptr, type, member) ( \
(type*)( ((char*)((type*)ptr)) - IOFFSETOF(type, member)) )

#define IQUEUE_ENTRY(ptr, type, member) ICONTAINEROF(ptr, type, member)


//---------------------------------------------------------------------
// queue operation
//---------------------------------------------------------------------
#define IQUEUE_ADD(node, head) ( \
(node)->prev = (head), (node)->next = (head)->next, \
(head)->next->prev = (node), (head)->next = (node))

#define IQUEUE_ADD_TAIL(node, head) ( \
(node)->prev = (head)->prev, (node)->next = (head), \
(head)->prev->next = (node), (head)->prev = (node))

#define IQUEUE_DEL_BETWEEN(p, n) ((n)->prev = (p), (p)->next = (n))

#define IQUEUE_DEL(entry) (\
(entry)->next->prev = (entry)->prev, \
(entry)->prev->next = (entry)->next, \
(entry)->next = 0, (entry)->prev = 0)

#define IQUEUE_DEL_INIT(entry) do { \
IQUEUE_DEL(entry); IQUEUE_INIT(entry); } while (0)

#define IQUEUE_IS_EMPTY(entry) ((entry) == (entry)->next)

#define iqueue_init IQUEUE_INIT
#define iqueue_entry IQUEUE_ENTRY
#define iqueue_add IQUEUE_ADD
#define iqueue_add_tail IQUEUE_ADD_TAIL
#define iqueue_del IQUEUE_DEL
#define iqueue_del_init IQUEUE_DEL_INIT
#define iqueue_is_empty IQUEUE_IS_EMPTY

#define IQUEUE_FOREACH(iterator, head, TYPE, MEMBER) \
for ((iterator) = iqueue_entry((head)->next, TYPE, MEMBER); \
&((iterator)->MEMBER) != (head); \
(iterator) = iqueue_entry((iterator)->MEMBER.next, TYPE, MEMBER))

#define iqueue_foreach(iterator, head, TYPE, MEMBER) \
IQUEUE_FOREACH(iterator, head, TYPE, MEMBER)

#define iqueue_foreach_entry(pos, head) \
for( (pos) = (head)->next; (pos) != (head) ; (pos) = (pos)->next )


#define __iqueue_splice(list, head) do { \
iqueue_head *first = (list)->next, *last = (list)->prev; \
iqueue_head *at = (head)->next; \
(first)->prev = (head), (head)->next = (first); \
(last)->next = (at), (at)->prev = (last); } while (0)

#define iqueue_splice(list, head) do { \
if (!iqueue_is_empty(list)) __iqueue_splice(list, head); } while (0)

#define iqueue_splice_init(list, head) do { \
iqueue_splice(list, head); iqueue_init(list); } while (0)


#ifdef _MSC_VER
#pragma warning(disable:4311)
#pragma warning(disable:4312)
#pragma warning(disable:4996)
#endif

#endif


//---------------------------------------------------------------------
// BYTE ORDER & ALIGNMENT
//---------------------------------------------------------------------
#ifndef IWORDS_BIG_ENDIAN
#ifdef _BIG_ENDIAN_
#if _BIG_ENDIAN_
#define IWORDS_BIG_ENDIAN 1
#endif
#endif
#ifndef IWORDS_BIG_ENDIAN
#if defined(__hppa__) || \
defined(__m68k__) || defined(mc68000) || defined(_M_M68K) || \
(defined(__MIPS__) && defined(__MIPSEB__)) || \
defined(__ppc__) || defined(__POWERPC__) || defined(_M_PPC) || \
defined(__sparc__) || defined(__powerpc__) || \
defined(__mc68000__) || defined(__s390x__) || defined(__s390__)
#define IWORDS_BIG_ENDIAN 1
#endif
#endif
#ifndef IWORDS_BIG_ENDIAN
#define IWORDS_BIG_ENDIAN 0
#endif
#endif

#ifndef IWORDS_MUST_ALIGN
#if defined(__i386__) || defined(__i386) || defined(_i386_)
#define IWORDS_MUST_ALIGN 0
#elif defined(_M_IX86) || defined(_X86_) || defined(__x86_64__)
#define IWORDS_MUST_ALIGN 0
#elif defined(__amd64) || defined(__amd64__)
#define IWORDS_MUST_ALIGN 0
#else
#define IWORDS_MUST_ALIGN 1
#endif
#endif


//=====================================================================
// SEGMENT
//=====================================================================
struct IKCPSEG
{
// 链表节点
struct IQUEUEHEAD node;

// 会话ID,发送方和接收方的会话必须要一致才能收发
IUINT32 conv;

// 报文类型,
// ICKP_CMD_PUSH(数据报文)、
// IKCP_CMD_ACK(确认报文)、
// IKCP_CMD_WASK(窗口探测报文),询问对端剩余接收窗口大小
// IKCP_CMD_WINS(窗口通知报文),
IUINT32 cmd;

// 分片数量,表示后面还有多少报文属于同一个包,0表示最后一个包,应用层的数据会被分包
IUINT32 frg;

// 发送方剩余接收窗口大小, 接收窗口大小-接收窗口队列大小。发送方发送窗口不能大于接收方接收窗口大小
IUINT32 wnd;

// 发送的时间戳
IUINT32 ts;

// 报文的编号,按1累加,和frg有啥区别。该编号用于ack
IUINT32 sn;

// 待接收的消息的编号,ack相关,表示我想要收数据报的sn=una的没有收到。
// 未发生丢包的话,una为下一个可接收数据包序号。当前收到的sn = 10, una = 11。
IUINT32 una;

// 数据data的长度
IUINT32 len;

// 下一次超时重发的时间戳
IUINT32 resendts;

// 该分片超时重传的等待时间
IUINT32 rto;

// ack被跳过大小,会影响重传。如果发送1,2,3,4,接收到1,3,4。那么2的数据包已经被跳过2次了(4 - 2 = 2)
IUINT32 fastack;

// 发送该分片的次数
IUINT32 xmit;

// data指针
char data[1];
};


//---------------------------------------------------------------------
// IKCPCB
//---------------------------------------------------------------------
struct IKCPCB
{
// conv : 会话标识
// mtu : 最大传输单元(Maximum Transmission Unit)
// mss : 最大报文段大小, mss = mtu - 包头长度(24)
// state : 连接状态, 0 连接建立, -1(0xffffffff) 断开连接
IUINT32 conv, mtu, mss, state;

// snd_una : 发送缓冲区中最小还未确认送达的报文段标号,比它小的已经确认送达了
// snd_nxt : 下一个等待发送的报文段的编号
// rcv_nxt : 下一个等待接收的报文段的编号
IUINT32 snd_una, snd_nxt, rcv_nxt;

// ts_recent : 未使用
// ts_lastack : 未使用
// ssthresh : 慢启动阈值Slow Start Threshold
IUINT32 ts_recent, ts_lastack, ssthresh;


// rx_rttval、rx_srtt、rx_minrto 计算出 => rx_rto
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;

// snd_wnd : 发送窗口大小
// rcv_wnd : 接收窗口大小
// rmt_wnd : 对端剩余接收窗口大小
// cwnd : congestion window 拥塞窗口,用于拥塞控制
// probe : 是否要发送控制报文的标志
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;

// current : 当前时间
// interval : flush的时间粒度,多长进行flush
// ts_flush : 下一次flush时间
// xmit : 该链接超时重传的总次数
IUINT32 current, interval, ts_flush, xmit;

// nrcv_buf、nsnd_buf 接收发送缓冲区长度
IUINT32 nrcv_buf, nsnd_buf;

// 接收/发送队列长度
IUINT32 nrcv_que, nsnd_que;

// nodelay : 使用启用快速模式
// updated : 使用调用过ikcp_update
IUINT32 nodelay, updated;

// ts_probe、probe_wait :用于确定何时需要发送窗口询问报文
IUINT32 ts_probe, probe_wait;

// dead_link : 当一个报文发送超时次数达到deal_link次,则可以认为连接断开 (state = 0xffffffff)
// incr : 用于计算cwnd
IUINT32 dead_link, incr;

// 发送队列
struct IQUEUEHEAD snd_queue;

// 接收队列
struct IQUEUEHEAD rcv_queue;

// 发送缓冲区,能在发送缓冲区的数据
// 1. 新加入snd_buf中的,从未发送过的报文
// 2. 发送过,在RTO内未收到ACK报文
// 3. 发送过,ACK失序报文,需要执行快速重传
struct IQUEUEHEAD snd_buf;

// 接收缓冲区
struct IQUEUEHEAD rcv_buf;

// ACK列表,待发送的ACK的信息会存在列表中,flush时一并发送
IUINT32 *acklist;

// ACK列表长度
IUINT32 ackcount;

// ACK列表容量
IUINT32 ackblock;

// 自定义的数据
void *user;

// flush 临时缓冲区
char *buffer;

// ACK失序次数大于等于fastresend,触发快速重传
int fastresend;

// 传输次数小于fastlimit,执行快速重传
int fastlimit;

// nocwnd : 是否不考虑拥塞窗口
// stream : 是否开启流模式,开启后可能会粘包
int nocwnd, stream;

// 用于控制日志
int logmask;

// 下层传输协议输出函数
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);

// 日志函数
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};


typedef struct IKCPCB ikcpcb;

#define IKCP_LOG_OUTPUT 1
#define IKCP_LOG_INPUT 2
#define IKCP_LOG_SEND 4
#define IKCP_LOG_RECV 8
#define IKCP_LOG_IN_DATA 16
#define IKCP_LOG_IN_ACK 32
#define IKCP_LOG_IN_PROBE 64
#define IKCP_LOG_IN_WINS 128
#define IKCP_LOG_OUT_DATA 256
#define IKCP_LOG_OUT_ACK 512
#define IKCP_LOG_OUT_PROBE 1024
#define IKCP_LOG_OUT_WINS 2048

#ifdef __cplusplus
extern "C" {
#endif

//---------------------------------------------------------------------
// interface
//---------------------------------------------------------------------

// create a new kcp control object, 'conv' must equal in two endpoint
// from the same connection. 'user' will be passed to the output callback
// output callback can be setup like this: 'kcp->output = my_udp_output'
ikcpcb* ikcp_create(IUINT32 conv, void *user);

// release kcp control object
void ikcp_release(ikcpcb *kcp);

// set output callback, which will be invoked by kcp
void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
ikcpcb *kcp, void *user));

// user/upper level recv: returns size, returns below zero for EAGAIN
// 用户上层调用的接收数据的函数,这个函数得到的数据是通过kcp算法处理的
// 可以认为该数据一定是对端发送成功的,
int ikcp_recv(ikcpcb *kcp, char *buffer, int len);

// user/upper level send, returns below zero for error
// 用户上层调用的发送数据的函数,该函数内创建报文实例,将报文放入sdn_queue中
// 错误小于0
int ikcp_send(ikcpcb *kcp, const char *buffer, int len);

// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
// 更新函数,需要不断重复调用该函数,用来驱动数据的收发。会调用ikcp_flush函数进行数据发送
// 会根据传进来的时间和上一次进行flush的时间进行比较,需要满足
// 当前时间戳(current) - 上一次flush的时间戳(kcp->ts_flush) >= 0,
// 则说明需要发送了
// 将kcp->ts_flush += kcp->interval之后 和 kcp->current进行比较。
// 如果current还是大过kcp->ts_flush,将kcp->ts_flush设置为kcp->current + kcp->interval
// 为什么要这样?需要控制ikcp_flush发送间隔,
// 如果调用ikcp_update的时间间隔太短(距离上一次调用间隔小于interval时间),就不会触发ikcp_flush了
// 我要怎么知道我调用update的时机?
// 通过调用ickp_check函数可以知道下一次调用update生效的时间距离现在多久
void ikcp_update(ikcpcb *kcp, IUINT32 current);

// Determine when should you invoke ikcp_update:
// returns when you should invoke ikcp_update in millisec, if there
// is no ikcp_input/_send calling. you can call ikcp_update in that
// time, instead of call update repeatly.
// Important to reduce unnacessary ikcp_update invoking. use it to
// schedule ikcp_update (eg. implementing an epoll-like mechanism,
// or optimize ikcp_update when handling massive kcp connections)
IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current);

// when you received a low level packet (eg. UDP packet), call it
// 接收到数据之后需要调用该函数
// 如果是UDP的话,调用recvfrom之后得到的数据需要调用这个函数
int ikcp_input(ikcpcb *kcp, const char *data, long size);

// flush pending data
// 该函数会完成如下动作
// + 发送ACK列表中的ACK
//
// + 检查是否需要发送窗口探测和通知报文
//
// + 计算拥塞窗口大小
// 从自身的发送窗口大小和远端接收窗口中选择最小的作为拥塞窗口
//
// + 根据上一步计算得到的拥塞窗口,从发送队列中取出符合拥塞发送窗口大小的数据放入发送缓冲区kcp->snd_buf中,
// 上一步计算是为了这一步取出的数据大小能够发送到对端
//
// + 从kcp->snd_buf中取出需要先发送的数据
// 新报文,需要重传的报文,ACK失序的报文
//
// + 将kcp->snd_buf中剩余的数据发送出去
//
// + 根据丢包计算ssthresh和cwnd
//
// 数据发送的优先级:
// 1. ACK报文
// 2. 响应对端窗口询问报文
// 3. 询问对端窗口大小报文
// 4. 需要先发送的数据报文
// 5. 剩余的报文
//
void ikcp_flush(ikcpcb *kcp);

// check the size of next message in the recv queue
int ikcp_peeksize(const ikcpcb *kcp);

// change MTU size, default is 1400
// 更改KCP的MTU,注意MTU不能小于50或小于24,
// 其中24为IKCP_OVERHEAD,KCP协议的头部大小,小过这个值,这个协议没法用的。
// 为什么不能小于50呢??
// 设置MTU会影响切片的分包,最大分片大小kcp->mss = kcp->mtu - IKCP_OVERHEAD
int ikcp_setmtu(ikcpcb *kcp, int mtu);

// set maximum window size: sndwnd=32, rcvwnd=32 by default
// 设置发送和接收窗口大小
// 发送窗口>0就行,但是接收窗口大小有要求,必须>=128(IKCP_WND_RCV),传小了被设成IKCP_WND_RCV
int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd);

// get how many packet is waiting to be sent
// 获得当前等待发送数据量大小:发送队列大小 + 发送缓冲区大小
int ikcp_waitsnd(const ikcpcb *kcp);

// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), 1:enable fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
// 设置ikcp的模式
int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc);


// ikcp_log日志模块,用来查看日志kcp的日志,
// 使用前需要先设置kcp->writelog函数
void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...);

// setup allocator
// 设置内存分配和释放器,默认使用malloc
void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*));

// read conv
// 可以传入UDP接收到数据,会返回数据包的conv值,
// 可以在调用ikcp_input的时候做不同的过滤,只把符合kcp协议的包放入
// 可以用来融合其他协议
IUINT32 ikcp_getconv(const void *ptr);


#ifdef __cplusplus
}
#endif

#endif


源文件ickp.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
//=====================================================================
//
// KCP - A Better ARQ Protocol Implementation
// skywind3000 (at) gmail.com, 2010-2011
//
// Features:
// + Average RTT reduce 30% - 40% vs traditional ARQ like tcp.
// + Maximum RTT reduce three times vs tcp.
// + Lightweight, distributed as a single source file.
//
//=====================================================================
#include "ikcp.h"

#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <stdio.h>



//=====================================================================
// KCP BASIC
//=====================================================================
const IUINT32 IKCP_RTO_NDL = 30; // no delay min rto
const IUINT32 IKCP_RTO_MIN = 100; // normal min rto
const IUINT32 IKCP_RTO_DEF = 200;
const IUINT32 IKCP_RTO_MAX = 60000;
const IUINT32 IKCP_CMD_PUSH = 81; // cmd: push data
const IUINT32 IKCP_CMD_ACK = 82; // cmd: ack
const IUINT32 IKCP_CMD_WASK = 83; // cmd: window probe (ask)
const IUINT32 IKCP_CMD_WINS = 84; // cmd: window size (tell)
const IUINT32 IKCP_ASK_SEND = 1; // need to send IKCP_CMD_WASK
const IUINT32 IKCP_ASK_TELL = 2; // need to send IKCP_CMD_WINS
const IUINT32 IKCP_WND_SND = 32;
const IUINT32 IKCP_WND_RCV = 128; // must >= max fragment size
const IUINT32 IKCP_MTU_DEF = 1400;
const IUINT32 IKCP_ACK_FAST = 3;
const IUINT32 IKCP_INTERVAL = 100;
const IUINT32 IKCP_OVERHEAD = 24;
const IUINT32 IKCP_DEADLINK = 20;
const IUINT32 IKCP_THRESH_INIT = 2;
const IUINT32 IKCP_THRESH_MIN = 2;
const IUINT32 IKCP_PROBE_INIT = 7000; // 7 secs to probe window size
const IUINT32 IKCP_PROBE_LIMIT = 120000; // up to 120 secs to probe window
const IUINT32 IKCP_FASTACK_LIMIT = 5; // max times to trigger fastack


//---------------------------------------------------------------------
// encode / decode
//---------------------------------------------------------------------

/* encode 8 bits unsigned int */
static inline char *ikcp_encode8u(char *p, unsigned char c)
{
*(unsigned char*)p++ = c;
return p;
}

/* decode 8 bits unsigned int */
static inline const char *ikcp_decode8u(const char *p, unsigned char *c)
{
*c = *(unsigned char*)p++;
return p;
}

/* encode 16 bits unsigned int (lsb) */
static inline char *ikcp_encode16u(char *p, unsigned short w)
{
#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
*(unsigned char*)(p + 0) = (w & 255);
*(unsigned char*)(p + 1) = (w >> 8);
#else
memcpy(p, &w, 2);
#endif
p += 2;
return p;
}

/* decode 16 bits unsigned int (lsb) */
static inline const char *ikcp_decode16u(const char *p, unsigned short *w)
{
#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
*w = *(const unsigned char*)(p + 1);
*w = *(const unsigned char*)(p + 0) + (*w << 8);
#else
memcpy(w, p, 2);
#endif
p += 2;
return p;
}

/* encode 32 bits unsigned int (lsb) */
static inline char *ikcp_encode32u(char *p, IUINT32 l)
{
#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
*(unsigned char*)(p + 0) = (unsigned char)((l >> 0) & 0xff);
*(unsigned char*)(p + 1) = (unsigned char)((l >> 8) & 0xff);
*(unsigned char*)(p + 2) = (unsigned char)((l >> 16) & 0xff);
*(unsigned char*)(p + 3) = (unsigned char)((l >> 24) & 0xff);
#else
memcpy(p, &l, 4);
#endif
p += 4;
return p;
}

/* decode 32 bits unsigned int (lsb) */
static inline const char *ikcp_decode32u(const char *p, IUINT32 *l)
{
#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN
*l = *(const unsigned char*)(p + 3);
*l = *(const unsigned char*)(p + 2) + (*l << 8);
*l = *(const unsigned char*)(p + 1) + (*l << 8);
*l = *(const unsigned char*)(p + 0) + (*l << 8);
#else
memcpy(l, p, 4);
#endif
p += 4;
return p;
}

static inline IUINT32 _imin_(IUINT32 a, IUINT32 b) {
return a <= b ? a : b;
}

static inline IUINT32 _imax_(IUINT32 a, IUINT32 b) {
return a >= b ? a : b;
}

static inline IUINT32 _ibound_(IUINT32 lower, IUINT32 middle, IUINT32 upper)
{
return _imin_(_imax_(lower, middle), upper);
}

static inline long _itimediff(IUINT32 later, IUINT32 earlier)
{
return ((IINT32)(later - earlier));
}

//---------------------------------------------------------------------
// manage segment
//---------------------------------------------------------------------
typedef struct IKCPSEG IKCPSEG;

static void* (*ikcp_malloc_hook)(size_t) = NULL;
static void (*ikcp_free_hook)(void *) = NULL;

// internal malloc
static void* ikcp_malloc(size_t size) {
if (ikcp_malloc_hook)
return ikcp_malloc_hook(size);
return malloc(size);
}

// internal free
static void ikcp_free(void *ptr) {
if (ikcp_free_hook) {
ikcp_free_hook(ptr);
} else {
free(ptr);
}
}

// redefine allocator
void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*))
{
ikcp_malloc_hook = new_malloc;
ikcp_free_hook = new_free;
}

// allocate a new kcp segment
static IKCPSEG* ikcp_segment_new(ikcpcb *kcp, int size)
{
return (IKCPSEG*)ikcp_malloc(sizeof(IKCPSEG) + size);
}

// delete a segment
static void ikcp_segment_delete(ikcpcb *kcp, IKCPSEG *seg)
{
ikcp_free(seg);
}

// write log
void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...)
{
char buffer[1024];
va_list argptr;
if ((mask & kcp->logmask) == 0 || kcp->writelog == 0) return;
va_start(argptr, fmt);
vsprintf(buffer, fmt, argptr);
va_end(argptr);
kcp->writelog(buffer, kcp, kcp->user);
}

// check log mask
static int ikcp_canlog(const ikcpcb *kcp, int mask)
{
if ((mask & kcp->logmask) == 0 || kcp->writelog == NULL) return 0;
return 1;
}

// output segment
static int ikcp_output(ikcpcb *kcp, const void *data, int size)
{
assert(kcp);
assert(kcp->output);
if (ikcp_canlog(kcp, IKCP_LOG_OUTPUT)) {
ikcp_log(kcp, IKCP_LOG_OUTPUT, "[RO] %ld bytes", (long)size);
}
if (size == 0) return 0;
return kcp->output((const char*)data, size, kcp, kcp->user);
}

// output queue
void ikcp_qprint(const char *name, const struct IQUEUEHEAD *head)
{
#if 0
const struct IQUEUEHEAD *p;
printf("<%s>: [", name);
for (p = head->next; p != head; p = p->next) {
const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node);
printf("(%lu %d)", (unsigned long)seg->sn, (int)(seg->ts % 10000));
if (p->next != head) printf(",");
}
printf("]\n");
#endif
}


//---------------------------------------------------------------------
// create a new kcpcb
//---------------------------------------------------------------------
ikcpcb* ikcp_create(IUINT32 conv, void *user)
{
ikcpcb *kcp = (ikcpcb*)ikcp_malloc(sizeof(struct IKCPCB));
if (kcp == NULL) return NULL;
kcp->conv = conv;
kcp->user = user;
kcp->snd_una = 0;
kcp->snd_nxt = 0;
kcp->rcv_nxt = 0;
kcp->ts_recent = 0;
kcp->ts_lastack = 0;
kcp->ts_probe = 0;
kcp->probe_wait = 0;
kcp->snd_wnd = IKCP_WND_SND;
kcp->rcv_wnd = IKCP_WND_RCV;
kcp->rmt_wnd = IKCP_WND_RCV;
kcp->cwnd = 0;
kcp->incr = 0;
kcp->probe = 0;
kcp->mtu = IKCP_MTU_DEF;
kcp->mss = kcp->mtu - IKCP_OVERHEAD;
kcp->stream = 0;

kcp->buffer = (char*)ikcp_malloc((kcp->mtu + IKCP_OVERHEAD) * 3);
if (kcp->buffer == NULL) {
ikcp_free(kcp);
return NULL;
}

iqueue_init(&kcp->snd_queue);
iqueue_init(&kcp->rcv_queue);
iqueue_init(&kcp->snd_buf);
iqueue_init(&kcp->rcv_buf);
kcp->nrcv_buf = 0;
kcp->nsnd_buf = 0;
kcp->nrcv_que = 0;
kcp->nsnd_que = 0;
kcp->state = 0;
kcp->acklist = NULL;
kcp->ackblock = 0;
kcp->ackcount = 0;
kcp->rx_srtt = 0;
kcp->rx_rttval = 0;
kcp->rx_rto = IKCP_RTO_DEF;
kcp->rx_minrto = IKCP_RTO_MIN;
kcp->current = 0;
kcp->interval = IKCP_INTERVAL;
kcp->ts_flush = IKCP_INTERVAL;
kcp->nodelay = 0;
kcp->updated = 0;
kcp->logmask = 0;
kcp->ssthresh = IKCP_THRESH_INIT;
kcp->fastresend = 0;
kcp->fastlimit = IKCP_FASTACK_LIMIT;
kcp->nocwnd = 0;
kcp->xmit = 0;
kcp->dead_link = IKCP_DEADLINK;
kcp->output = NULL;
kcp->writelog = NULL;

return kcp;
}


//---------------------------------------------------------------------
// release a new kcpcb
//---------------------------------------------------------------------
void ikcp_release(ikcpcb *kcp)
{
assert(kcp);
if (kcp) {
IKCPSEG *seg;
while (!iqueue_is_empty(&kcp->snd_buf)) {
seg = iqueue_entry(kcp->snd_buf.next, IKCPSEG, node);
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
}
while (!iqueue_is_empty(&kcp->rcv_buf)) {
seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
}
while (!iqueue_is_empty(&kcp->snd_queue)) {
seg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
}
while (!iqueue_is_empty(&kcp->rcv_queue)) {
seg = iqueue_entry(kcp->rcv_queue.next, IKCPSEG, node);
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
}
if (kcp->buffer) {
ikcp_free(kcp->buffer);
}
if (kcp->acklist) {
ikcp_free(kcp->acklist);
}

kcp->nrcv_buf = 0;
kcp->nsnd_buf = 0;
kcp->nrcv_que = 0;
kcp->nsnd_que = 0;
kcp->ackcount = 0;
kcp->buffer = NULL;
kcp->acklist = NULL;
ikcp_free(kcp);
}
}


//---------------------------------------------------------------------
// set output callback, which will be invoked by kcp
//---------------------------------------------------------------------
void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
ikcpcb *kcp, void *user))
{
kcp->output = output;
}


//---------------------------------------------------------------------
// user/upper level recv: returns size, returns below zero for EAGAIN
//---------------------------------------------------------------------
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
struct IQUEUEHEAD *p;
int ispeek = (len < 0)? 1 : 0;
int peeksize;
int recover = 0;
IKCPSEG *seg;
assert(kcp);

if (iqueue_is_empty(&kcp->rcv_queue))
return -1;

if (len < 0) len = -len;

peeksize = ikcp_peeksize(kcp);

if (peeksize < 0)
return -2;

if (peeksize > len)
return -3;

if (kcp->nrcv_que >= kcp->rcv_wnd)
recover = 1;

// merge fragment
for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
int fragment;
seg = iqueue_entry(p, IKCPSEG, node);
p = p->next;

if (buffer) {
memcpy(buffer, seg->data, seg->len);
buffer += seg->len;
}

len += seg->len;
fragment = seg->frg;

if (ikcp_canlog(kcp, IKCP_LOG_RECV)) {
ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn);
}

if (ispeek == 0) {
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
kcp->nrcv_que--;
}

if (fragment == 0)
break;
}

assert(len == peeksize);

// move available data from rcv_buf -> rcv_queue
while (! iqueue_is_empty(&kcp->rcv_buf)) {
seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
break;
}
}

// fast recover
if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
kcp->probe |= IKCP_ASK_TELL;
}

return len;
}


//---------------------------------------------------------------------
// peek data size
//---------------------------------------------------------------------
int ikcp_peeksize(const ikcpcb *kcp)
{
struct IQUEUEHEAD *p;
IKCPSEG *seg;
int length = 0;

assert(kcp);

if (iqueue_is_empty(&kcp->rcv_queue)) return -1;

seg = iqueue_entry(kcp->rcv_queue.next, IKCPSEG, node);
if (seg->frg == 0) return seg->len;

if (kcp->nrcv_que < seg->frg + 1) return -1;

for (p = kcp->rcv_queue.next; p != &kcp->rcv_queue; p = p->next) {
seg = iqueue_entry(p, IKCPSEG, node);
length += seg->len;
if (seg->frg == 0) break;
}

return length;
}


//---------------------------------------------------------------------
// user/upper level send, returns below zero for error
//---------------------------------------------------------------------
int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
IKCPSEG *seg;
int count, i;

assert(kcp->mss > 0);
if (len < 0) return -1;

// append to previous segment in streaming mode (if possible)
if (kcp->stream != 0) {
if (!iqueue_is_empty(&kcp->snd_queue)) {
IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
if (old->len < kcp->mss) {
int capacity = kcp->mss - old->len;
int extend = (len < capacity)? len : capacity;
seg = ikcp_segment_new(kcp, old->len + extend);
assert(seg);
if (seg == NULL) {
return -2;
}
iqueue_add_tail(&seg->node, &kcp->snd_queue);
memcpy(seg->data, old->data, old->len);
if (buffer) {
memcpy(seg->data + old->len, buffer, extend);
buffer += extend;
}
seg->len = old->len + extend;
seg->frg = 0;
len -= extend;
iqueue_del_init(&old->node);
ikcp_segment_delete(kcp, old);
}
}
if (len <= 0) {
return 0;
}
}

if (len <= (int)kcp->mss) count = 1;
else count = (len + kcp->mss - 1) / kcp->mss;

if (count >= (int)IKCP_WND_RCV) return -2;

if (count == 0) count = 1;

// fragment
for (i = 0; i < count; i++) {
int size = len > (int)kcp->mss ? (int)kcp->mss : len;
seg = ikcp_segment_new(kcp, size);
assert(seg);
if (seg == NULL) {
return -2;
}
if (buffer && len > 0) {
memcpy(seg->data, buffer, size);
}
seg->len = size;
seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
iqueue_init(&seg->node);
iqueue_add_tail(&seg->node, &kcp->snd_queue);
kcp->nsnd_que++;
if (buffer) {
buffer += size;
}
len -= size;
}

return 0;
}


//---------------------------------------------------------------------
// parse ack
//---------------------------------------------------------------------
static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt)
{
IINT32 rto = 0;
if (kcp->rx_srtt == 0) {
kcp->rx_srtt = rtt;
kcp->rx_rttval = rtt / 2;
} else {
long delta = rtt - kcp->rx_srtt;
if (delta < 0) delta = -delta;
kcp->rx_rttval = (3 * kcp->rx_rttval + delta) / 4;
kcp->rx_srtt = (7 * kcp->rx_srtt + rtt) / 8;
if (kcp->rx_srtt < 1) kcp->rx_srtt = 1;
}
rto = kcp->rx_srtt + _imax_(kcp->interval, 4 * kcp->rx_rttval);
kcp->rx_rto = _ibound_(kcp->rx_minrto, rto, IKCP_RTO_MAX);
}

static void ikcp_shrink_buf(ikcpcb *kcp)
{
struct IQUEUEHEAD *p = kcp->snd_buf.next;
if (p != &kcp->snd_buf) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
kcp->snd_una = seg->sn;
} else {
kcp->snd_una = kcp->snd_nxt;
}
}

static void ikcp_parse_ack(ikcpcb *kcp, IUINT32 sn)
{
struct IQUEUEHEAD *p, *next;

if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0)
return;

for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
next = p->next;
if (sn == seg->sn) {
iqueue_del(p);
ikcp_segment_delete(kcp, seg);
kcp->nsnd_buf--;
break;
}
if (_itimediff(sn, seg->sn) < 0) {
break;
}
}
}

static void ikcp_parse_una(ikcpcb *kcp, IUINT32 una)
{
struct IQUEUEHEAD *p, *next;
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
next = p->next;
if (_itimediff(una, seg->sn) > 0) {
iqueue_del(p);
ikcp_segment_delete(kcp, seg);
kcp->nsnd_buf--;
} else {
break;
}
}
}

static void ikcp_parse_fastack(ikcpcb *kcp, IUINT32 sn, IUINT32 ts)
{
struct IQUEUEHEAD *p, *next;

if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0)
return;

for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
next = p->next;
if (_itimediff(sn, seg->sn) < 0) {
break;
}
else if (sn != seg->sn) {
#ifndef IKCP_FASTACK_CONSERVE
seg->fastack++;
#else
if (_itimediff(ts, seg->ts) >= 0)
seg->fastack++;
#endif
}
}
}


//---------------------------------------------------------------------
// ack append
//---------------------------------------------------------------------
static void ikcp_ack_push(ikcpcb *kcp, IUINT32 sn, IUINT32 ts)
{
IUINT32 newsize = kcp->ackcount + 1;
IUINT32 *ptr;

if (newsize > kcp->ackblock) {
IUINT32 *acklist;
IUINT32 newblock;

for (newblock = 8; newblock < newsize; newblock <<= 1);
acklist = (IUINT32*)ikcp_malloc(newblock * sizeof(IUINT32) * 2);

if (acklist == NULL) {
assert(acklist != NULL);
abort();
}

if (kcp->acklist != NULL) {
IUINT32 x;
for (x = 0; x < kcp->ackcount; x++) {
acklist[x * 2 + 0] = kcp->acklist[x * 2 + 0];
acklist[x * 2 + 1] = kcp->acklist[x * 2 + 1];
}
ikcp_free(kcp->acklist);
}

kcp->acklist = acklist;
kcp->ackblock = newblock;
}

ptr = &kcp->acklist[kcp->ackcount * 2];
ptr[0] = sn;
ptr[1] = ts;
kcp->ackcount++;
}

static void ikcp_ack_get(const ikcpcb *kcp, int p, IUINT32 *sn, IUINT32 *ts)
{
if (sn) sn[0] = kcp->acklist[p * 2 + 0];
if (ts) ts[0] = kcp->acklist[p * 2 + 1];
}


//---------------------------------------------------------------------
// parse data
//---------------------------------------------------------------------
void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg)
{
struct IQUEUEHEAD *p, *prev;
IUINT32 sn = newseg->sn;
int repeat = 0;

if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) >= 0 ||
_itimediff(sn, kcp->rcv_nxt) < 0) {
ikcp_segment_delete(kcp, newseg);
return;
}

for (p = kcp->rcv_buf.prev; p != &kcp->rcv_buf; p = prev) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
prev = p->prev;
if (seg->sn == sn) {
repeat = 1;
break;
}
if (_itimediff(sn, seg->sn) > 0) {
break;
}
}

if (repeat == 0) {
iqueue_init(&newseg->node);
iqueue_add(&newseg->node, p);
kcp->nrcv_buf++;
} else {
ikcp_segment_delete(kcp, newseg);
}

#if 0
ikcp_qprint("rcvbuf", &kcp->rcv_buf);
printf("rcv_nxt=%lu\n", kcp->rcv_nxt);
#endif

// move available data from rcv_buf -> rcv_queue
while (! iqueue_is_empty(&kcp->rcv_buf)) {
IKCPSEG *seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
break;
}
}

#if 0
ikcp_qprint("queue", &kcp->rcv_queue);
printf("rcv_nxt=%lu\n", kcp->rcv_nxt);
#endif

#if 1
// printf("snd(buf=%d, queue=%d)\n", kcp->nsnd_buf, kcp->nsnd_que);
// printf("rcv(buf=%d, queue=%d)\n", kcp->nrcv_buf, kcp->nrcv_que);
#endif
}


//---------------------------------------------------------------------
// input data
//---------------------------------------------------------------------
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
IUINT32 prev_una = kcp->snd_una;
IUINT32 maxack = 0, latest_ts = 0;
int flag = 0;

if (ikcp_canlog(kcp, IKCP_LOG_INPUT)) {
ikcp_log(kcp, IKCP_LOG_INPUT, "[RI] %d bytes", (int)size);
}

if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1;

while (1) {
IUINT32 ts, sn, len, una, conv;
IUINT16 wnd;
IUINT8 cmd, frg;
IKCPSEG *seg;

if (size < (int)IKCP_OVERHEAD) break;

data = ikcp_decode32u(data, &conv);
if (conv != kcp->conv) return -1;

data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);

size -= IKCP_OVERHEAD;

if ((long)size < (long)len || (int)len < 0) return -2;

if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS)
return -3;

kcp->rmt_wnd = wnd;
ikcp_parse_una(kcp, una);
ikcp_shrink_buf(kcp);

if (cmd == IKCP_CMD_ACK) {
if (_itimediff(kcp->current, ts) >= 0) {
ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
}
ikcp_parse_ack(kcp, sn);
ikcp_shrink_buf(kcp);
if (flag == 0) {
flag = 1;
maxack = sn;
latest_ts = ts;
} else {
if (_itimediff(sn, maxack) > 0) {
#ifndef IKCP_FASTACK_CONSERVE
maxack = sn;
latest_ts = ts;
#else
if (_itimediff(ts, latest_ts) > 0) {
maxack = sn;
latest_ts = ts;
}
#endif
}
}
if (ikcp_canlog(kcp, IKCP_LOG_IN_ACK)) {
ikcp_log(kcp, IKCP_LOG_IN_ACK,
"input ack: sn=%lu rtt=%ld rto=%ld", (unsigned long)sn,
(long)_itimediff(kcp->current, ts),
(long)kcp->rx_rto);
}
}
else if (cmd == IKCP_CMD_PUSH) {
if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) {
ikcp_log(kcp, IKCP_LOG_IN_DATA,
"input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts);
}
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
ikcp_ack_push(kcp, sn, ts);
if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
seg = ikcp_segment_new(kcp, len);
seg->conv = conv;
seg->cmd = cmd;
seg->frg = frg;
seg->wnd = wnd;
seg->ts = ts;
seg->sn = sn;
seg->una = una;
seg->len = len;

if (len > 0) {
memcpy(seg->data, data, len);
}

ikcp_parse_data(kcp, seg);
}
}
}
else if (cmd == IKCP_CMD_WASK) {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
kcp->probe |= IKCP_ASK_TELL;
if (ikcp_canlog(kcp, IKCP_LOG_IN_PROBE)) {
ikcp_log(kcp, IKCP_LOG_IN_PROBE, "input probe");
}
}
else if (cmd == IKCP_CMD_WINS) {
// do nothing
if (ikcp_canlog(kcp, IKCP_LOG_IN_WINS)) {
ikcp_log(kcp, IKCP_LOG_IN_WINS,
"input wins: %lu", (unsigned long)(wnd));
}
}
else {
return -3;
}

data += len;
size -= len;
}

if (flag != 0) {
ikcp_parse_fastack(kcp, maxack, latest_ts);
}

if (_itimediff(kcp->snd_una, prev_una) > 0) {
if (kcp->cwnd < kcp->rmt_wnd) {
IUINT32 mss = kcp->mss;
if (kcp->cwnd < kcp->ssthresh) {
kcp->cwnd++;
kcp->incr += mss;
} else {
if (kcp->incr < mss) kcp->incr = mss;
kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
if ((kcp->cwnd + 1) * mss <= kcp->incr) {
#if 1
kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1);
#else
kcp->cwnd++;
#endif
}
}
if (kcp->cwnd > kcp->rmt_wnd) {
kcp->cwnd = kcp->rmt_wnd;
kcp->incr = kcp->rmt_wnd * mss;
}
}
}

return 0;
}


//---------------------------------------------------------------------
// ikcp_encode_seg
//---------------------------------------------------------------------
static char *ikcp_encode_seg(char *ptr, const IKCPSEG *seg)
{
ptr = ikcp_encode32u(ptr, seg->conv);
ptr = ikcp_encode8u(ptr, (IUINT8)seg->cmd);
ptr = ikcp_encode8u(ptr, (IUINT8)seg->frg);
ptr = ikcp_encode16u(ptr, (IUINT16)seg->wnd);
ptr = ikcp_encode32u(ptr, seg->ts);
ptr = ikcp_encode32u(ptr, seg->sn);
ptr = ikcp_encode32u(ptr, seg->una);
ptr = ikcp_encode32u(ptr, seg->len);
return ptr;
}

static int ikcp_wnd_unused(const ikcpcb *kcp)
{
if (kcp->nrcv_que < kcp->rcv_wnd) {
return kcp->rcv_wnd - kcp->nrcv_que;
}
return 0;
}


//---------------------------------------------------------------------
// ikcp_flush
//---------------------------------------------------------------------
void ikcp_flush(ikcpcb *kcp)
{
IUINT32 current = kcp->current;
char *buffer = kcp->buffer;
char *ptr = buffer;
int count, size, i;
IUINT32 resent, cwnd;
IUINT32 rtomin;
struct IQUEUEHEAD *p;
int change = 0;
int lost = 0;
IKCPSEG seg;

// 'ikcp_update' haven't been called.
if (kcp->updated == 0) return;

seg.conv = kcp->conv;
seg.cmd = IKCP_CMD_ACK;
seg.frg = 0;
seg.wnd = ikcp_wnd_unused(kcp);
seg.una = kcp->rcv_nxt;
seg.len = 0;
seg.sn = 0;
seg.ts = 0;

// flush acknowledges
count = kcp->ackcount;
for (i = 0; i < count; i++) {
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
ptr = ikcp_encode_seg(ptr, &seg);
}

kcp->ackcount = 0;

// probe window size (if remote window size equals zero)
if (kcp->rmt_wnd == 0) {
if (kcp->probe_wait == 0) {
kcp->probe_wait = IKCP_PROBE_INIT;
kcp->ts_probe = kcp->current + kcp->probe_wait;
}
else {
if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {
if (kcp->probe_wait < IKCP_PROBE_INIT)
kcp->probe_wait = IKCP_PROBE_INIT;
kcp->probe_wait += kcp->probe_wait / 2;
if (kcp->probe_wait > IKCP_PROBE_LIMIT)
kcp->probe_wait = IKCP_PROBE_LIMIT;
kcp->ts_probe = kcp->current + kcp->probe_wait;
kcp->probe |= IKCP_ASK_SEND;
}
}
} else {
kcp->ts_probe = 0;
kcp->probe_wait = 0;
}

// flush window probing commands
if (kcp->probe & IKCP_ASK_SEND) {
seg.cmd = IKCP_CMD_WASK;
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}

// flush window probing commands
if (kcp->probe & IKCP_ASK_TELL) {
seg.cmd = IKCP_CMD_WINS;
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}

kcp->probe = 0;

// calculate window size
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);

// move data from snd_queue to snd_buf
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;

newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);

iqueue_del(&newseg->node);
iqueue_add_tail(&newseg->node, &kcp->snd_buf);
kcp->nsnd_que--;
kcp->nsnd_buf++;

newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd;
newseg->ts = current;
newseg->sn = kcp->snd_nxt++;
newseg->una = kcp->rcv_nxt;
newseg->resendts = current;
newseg->rto = kcp->rx_rto;
newseg->fastack = 0;
newseg->xmit = 0;
}

// calculate resent
resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;

// flush data segments
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
int needsend = 0;
if (segment->xmit == 0) {
// 新的报文,需要发送
needsend = 1;
segment->xmit++;
segment->rto = kcp->rx_rto;
// 失败的话下一次重传时间 = 当前时间 + rto + rtomin
segment->resendts = current + segment->rto + rtomin;
}
else if (_itimediff(current, segment->resendts) >= 0) {
// current - segment->resendts >= 0
// 当前的segment已经到了重传的时间,需要发送
needsend = 1;
// 发送次数 + 1
segment->xmit++;
// 总的重传次数 + 1
kcp->xmit++;
// 计算该segment的RTO
if (kcp->nodelay == 0) {
segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
} else {
IINT32 step = (kcp->nodelay < 2)?
((IINT32)(segment->rto)) : kcp->rx_rto;
segment->rto += step / 2;
}
// 设置下一次重传的时间
segment->resendts = current + segment->rto;
lost = 1;
}
else if (segment->fastack >= resent) {
// 这个segment的ack已经失序了,出现了发送1,2,3,4 收到ack 1,3,4情况
// 编号2的segment的ack已经失序(4 - 2 = 2)次了
if ((int)segment->xmit <= kcp->fastlimit ||
kcp->fastlimit <= 0) {
// 该segment的发送次数小于快速重传次数或者需要快速重传
needsend = 1;
// 发送次数+1,
// 如果发现 segment->xmit > kcp->fastlimit了该segment会怎么处理
segment->xmit++;
segment->fastack = 0;
segment->resendts = current + segment->rto;
change++;
}
// 发现 segment->xmit > kcp->fastlimit了该segment会怎么处理,
// 这个segment不用快速重传了,留到后面发
}

if (needsend) {
// 该segment需要先发送
int need;
segment->ts = current;
segment->wnd = seg.wnd;
segment->una = kcp->rcv_nxt;

size = (int)(ptr - buffer);
need = IKCP_OVERHEAD + segment->len;

if (size + need > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}

ptr = ikcp_encode_seg(ptr, segment);

if (segment->len > 0) {
memcpy(ptr, segment->data, segment->len);
ptr += segment->len;
}

// 该segment的发送次超过限制了(默认20次),
// 出现这种情况可以认为网络出现了问题,一个包发送20几次都发送不出去是真的有问题
if (segment->xmit >= kcp->dead_link) {
// kcp->state = 0xffffffff
kcp->state = (IUINT32)-1;
}
}
}

// flash remain segments
size = (int)(ptr - buffer);
if (size > 0) {
ikcp_output(kcp, buffer, size);
}

// update ssthresh
if (change) {
IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
kcp->ssthresh = inflight / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = kcp->ssthresh + resent;
kcp->incr = kcp->cwnd * kcp->mss;
}

if (lost) {
kcp->ssthresh = cwnd / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}

if (kcp->cwnd < 1) {
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}
}


//---------------------------------------------------------------------
// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
//---------------------------------------------------------------------
void ikcp_update(ikcpcb *kcp, IUINT32 current)
{
IINT32 slap;

kcp->current = current;

if (kcp->updated == 0) {
kcp->updated = 1;
kcp->ts_flush = kcp->current;
}

slap = _itimediff(kcp->current, kcp->ts_flush);

if (slap >= 10000 || slap < -10000) {
kcp->ts_flush = kcp->current;
slap = 0;
}

if (slap >= 0) {
kcp->ts_flush += kcp->interval;
if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
kcp->ts_flush = kcp->current + kcp->interval;
}
ikcp_flush(kcp);
}
}


//---------------------------------------------------------------------
// Determine when should you invoke ikcp_update:
// returns when you should invoke ikcp_update in millisec, if there
// is no ikcp_input/_send calling. you can call ikcp_update in that
// time, instead of call update repeatly.
// Important to reduce unnacessary ikcp_update invoking. use it to
// schedule ikcp_update (eg. implementing an epoll-like mechanism,
// or optimize ikcp_update when handling massive kcp connections)
//---------------------------------------------------------------------
IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current)
{
IUINT32 ts_flush = kcp->ts_flush;
IINT32 tm_flush = 0x7fffffff;
IINT32 tm_packet = 0x7fffffff;
IUINT32 minimal = 0;
struct IQUEUEHEAD *p;

if (kcp->updated == 0) {
return current;
}

if (_itimediff(current, ts_flush) >= 10000 ||
_itimediff(current, ts_flush) < -10000) {
ts_flush = current;
}

if (_itimediff(current, ts_flush) >= 0) {
return current;
}

tm_flush = _itimediff(ts_flush, current);

for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node);
IINT32 diff = _itimediff(seg->resendts, current);
if (diff <= 0) {
return current;
}
if (diff < tm_packet) tm_packet = diff;
}

minimal = (IUINT32)(tm_packet < tm_flush ? tm_packet : tm_flush);
if (minimal >= kcp->interval) minimal = kcp->interval;

return current + minimal;
}



int ikcp_setmtu(ikcpcb *kcp, int mtu)
{
char *buffer;
if (mtu < 50 || mtu < (int)IKCP_OVERHEAD)
return -1;
buffer = (char*)ikcp_malloc((mtu + IKCP_OVERHEAD) * 3);
if (buffer == NULL)
return -2;
kcp->mtu = mtu;
kcp->mss = kcp->mtu - IKCP_OVERHEAD;
ikcp_free(kcp->buffer);
kcp->buffer = buffer;
return 0;
}

int ikcp_interval(ikcpcb *kcp, int interval)
{
if (interval > 5000) interval = 5000;
else if (interval < 10) interval = 10;
kcp->interval = interval;
return 0;
}

int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc)
{
if (nodelay >= 0) {
kcp->nodelay = nodelay;
if (nodelay) {
kcp->rx_minrto = IKCP_RTO_NDL;
}
else {
kcp->rx_minrto = IKCP_RTO_MIN;
}
}
if (interval >= 0) {
if (interval > 5000) interval = 5000;
else if (interval < 10) interval = 10;
kcp->interval = interval;
}
if (resend >= 0) {
kcp->fastresend = resend;
}
if (nc >= 0) {
kcp->nocwnd = nc;
}
return 0;
}


int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd)
{
if (kcp) {
if (sndwnd > 0) {
kcp->snd_wnd = sndwnd;
}
if (rcvwnd > 0) { // must >= max fragment size
// 接收大小需要大过128
kcp->rcv_wnd = _imax_(rcvwnd, IKCP_WND_RCV);
}
}
return 0;
}

int ikcp_waitsnd(const ikcpcb *kcp)
{
return kcp->nsnd_buf + kcp->nsnd_que;
}


// read conv
IUINT32 ikcp_getconv(const void *ptr)
{
IUINT32 conv;
ikcp_decode32u((const char*)ptr, &conv);
return conv;
}



收发demo

使用lsof -i:8080

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include "ikcp.c"

#include <string.h>

#include <sys/time.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <stdbool.h>
#include <errno.h>

typedef struct {
unsigned char *ipstr;
int port;
ikcpcb *pkcpb;
int sockfd;
struct sockaddr_in addr;
struct sockaddr_in clientAddr;
socklen_t clientAddrLen;

volatile bool is_avliable;
volatile bool run_sendread;
volatile bool run_send;
volatile bool run_read;
pthread_mutex_t lock;
pthread_t thread_send;
pthread_t thread_read;
pthread_t thread_update;
}my_kcp_t;

/* get system time */
void itimeofday(long *sec, long *usec)
{
#if defined(__unix)
struct timeval time;
gettimeofday(&time, NULL);
if (sec) *sec = time.tv_sec;
if (usec) *usec = time.tv_usec;
#else
static long mode = 0, addsec = 0;
BOOL retval;
static IINT64 freq = 1;
IINT64 qpc;
if (mode == 0) {
retval = QueryPerformanceFrequency((LARGE_INTEGER*)&freq);
freq = (freq == 0)? 1 : freq;
retval = QueryPerformanceCounter((LARGE_INTEGER*)&qpc);
addsec = (long)time(NULL);
addsec = addsec - (long)((qpc / freq) & 0x7fffffff);
mode = 1;
}
retval = QueryPerformanceCounter((LARGE_INTEGER*)&qpc);
retval = retval * 2;
if (sec) *sec = (long)(qpc / freq) + addsec;
if (usec) *usec = (long)((qpc % freq) * 1000000 / freq);
#endif
}

/* get clock in millisecond 64 */
IINT64 iclock64(void)
{
long s, u;
IINT64 value;
itimeofday(&s, &u);
value = ((IINT64)s) * 1000 + (u / 1000);
return value;
}

IUINT32 iclock()
{
return (IUINT32)(iclock64() & 0xfffffffful);
}

/* sleep in millisecond */
void isleep(unsigned long millisecond)
{
#ifdef __unix /* usleep( time * 1000 ); */
struct timespec ts;
ts.tv_sec = (time_t)(millisecond / 1000);
ts.tv_nsec = (long)((millisecond % 1000) * 1000000);
/*nanosleep(&ts, NULL);*/
usleep((millisecond << 10) - (millisecond << 4) - (millisecond << 3));
#elif defined(_WIN32)
Sleep(millisecond);
#endif
}


int udpOutPut(const char *buf, int len, ikcpcb *kcp, void *user){
my_kcp_t *send = (my_kcp_t *)user;
//发送信息
int n = sendto(send->sockfd, buf, len, MSG_DONTWAIT, (struct sockaddr *)&send->clientAddr, sizeof(struct sockaddr_in));
if (n >= 0)
{
printf("server udp send: ip = %s port = %d 字节 =%d bytes 内容=[%s]\n",inet_ntoa(send->clientAddr.sin_addr), ntohs(send->clientAddr.sin_port), n ,buf+24);//24字节的KCP头部
return n;
}else{
printf("error: %d bytes send, error\n", n);
return -1;
}
}
void kcp_write_log(const char *log, struct IKCPCB *kcp, void *user){
printf("kcp_write_log %s", log);
}

static size_t memsize = 0;
static int needsend = 0;
void* my_ikcp_mem_malloc(size_t size) {
void *ptr = NULL;
ptr = malloc(size);
printf("申请内存:%p 大小:%lu\n",ptr, size);
memsize ++;
printf("memsize %ld\n", memsize);
return ptr;
}

void my_ikcp_mem_free(void *ptr){
printf("释放内存:%p\n", ptr);
memsize --;
printf("memsize %ld\n", memsize);
free(ptr);
}

void* th_read(void *arg) {
my_kcp_t *kcp = (my_kcp_t *)arg;
unsigned int len = sizeof(struct sockaddr_in);
int n, ret;
bool read_exit = false;
while (kcp->is_avliable && kcp->run_sendread) {
isleep(5);
char buf[1500] = {0};
// pthread_mutex_lock(&(kcp->lock));
int timeout = 300;
while (timeout --) {
// 收到UDP包
struct sockaddr_in callAddr;
socklen_t callAddrLen;
n = recvfrom(kcp->sockfd, buf, 567, MSG_CONFIRM, (struct sockaddr *)&callAddr, &callAddrLen);
if (n < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}else{
read_exit = true;
break;
}
}
// char* callAddrStr = inet_ntoa(callAddr.sin_addr);
// int callAddrPort = ntohs(callAddr.sin_port);
// if (0 != strcmp(callAddrStr, inet_ntoa(kcp->clientAddr.sin_addr)) || callAddrPort != ntohs(kcp->clientAddr.sin_port)) {
// printf("未知连接: ip =%s port=%d data=[%s]\n", callAddrStr, callAddrPort, buf);
// continue;
// }
break;
}
if (timeout <= 0) {
// pthread_mutex_unlock(&(kcp->lock));
printf("Recfrom %s %d Timeout\n", inet_ntoa(kcp->clientAddr.sin_addr), ntohs(kcp->clientAddr.sin_port));
kcp->is_avliable = false;
kcp->run_sendread = false;
break;
}
if (read_exit) {
kcp->is_avliable = false;
kcp->run_sendread = false;
break;
}

printf("recvfrom ip %s port %d data %s\n", inet_ntoa(kcp->clientAddr.sin_addr),ntohs(kcp->clientAddr.sin_port), buf);

// 将收到的数据包塞给kcp处理
ret = ikcp_input(kcp->pkcpb, buf, n);
// pthread_mutex_unlock(&(kcp->lock));
if (0 != ret) {
printf("ikcp_input errcode %d\n", ret);
continue;
}
// 再从kcp中获得数据原始数据
// pthread_mutex_lock(&(kcp->lock));
ret = ikcp_recv(kcp->pkcpb, buf, 1024);
// pthread_mutex_unlock(&(kcp->lock));
if (ret < 0) {
if (kcp->pkcpb->state == 0xffffffff) {
kcp->is_avliable = false;
kcp->run_sendread = false;
break;
}
continue;
}else{
printf("收到: ip = %s port = %d 数据:%s\n", inet_ntoa(kcp->clientAddr.sin_addr), ntohs(kcp->clientAddr.sin_port), buf);
// 返回响应
// pthread_mutex_lock(&(kcp->lock));
// needsend ++;
// pthread_mutex_lock(&(kcp->lock));
}
}
return NULL;
}

void* th_update(void *arg){
// my_kcp_t *kcp = (my_kcp_t *)arg;
// while (kcp->is_avliable && kcp->run_sendread && kcp->pkcpb->state != 0xffffffff) {
// usleep(10 * 1000);
// pthread_mutex_lock(&(kcp->lock));
// if (NULL != kcp->pkcpb) {
// ikcp_update(kcp->pkcpb, iclock());
// }else{
// kcp->is_avliable = false;
// kcp->run_sendread = false;
// }
// pthread_mutex_unlock(&(kcp->lock));
// }
return NULL;
}

static int sendNumber = 0;
void* th_send(void *arg) {
my_kcp_t *kcp = (my_kcp_t *)arg;
char sendbuf[100];
while (kcp->is_avliable && kcp->run_sendread) {
isleep(5);
// pthread_mutex_lock(&(kcp->lock));
// if (needsend > 0) {
// needsend --;
// }else{
// pthread_mutex_unlock(&(kcp->lock));
// continue;
// }
// pthread_mutex_unlock(&(kcp->lock));
memset(sendbuf, 0, sizeof(sendbuf));
sprintf(sendbuf, "This is Server Send Data n=[%d]", ++sendNumber);
if (0 != ikcp_send(kcp->pkcpb, sendbuf, sizeof(sendbuf))){
kcp->run_sendread = false;
kcp->is_avliable = false;
break;
}else{
// printf("服务器发送数据成功....");
}
ikcp_update(kcp->pkcpb,iclock());
}
return NULL;
}

// 服务器开启8080端口监听UDP
// 客户端向服务器发送数据,
// 服务器知道了客户端的IP和PORT了,
// 就可以向这个地址发送响应数据了
int main(int argc,char *argv[])
{
printf("this is kcpServer\n");

my_kcp_t send;
send.port = 8080;
send.pkcpb = NULL;

ikcpcb *kcp = ikcp_create(0x1, (void *)&send);//创建kcp对象把send传给kcp的user变量

// 设置日志回调函数
kcp->writelog = kcp_write_log;

//设置kcp对象的回调函数
ikcp_setoutput(kcp, udpOutPut);

//1, 10, 2, 1
ikcp_nodelay(kcp, 1, 10, 2, 1);

// 设置窗口大小
ikcp_wndsize(kcp, 1024, 1024);

// 设置MTU
ikcp_setmtu(kcp, 1400);

// 设置内存分配器
ikcp_allocator(my_ikcp_mem_malloc, my_ikcp_mem_free);

send.pkcpb = kcp;

int sockfd = socket(AF_INET,SOCK_DGRAM,0);
if(sockfd < 0)
{
perror("socket error!");
exit(1);
}
send.sockfd = sockfd;
bzero(&(send.addr), sizeof(send.addr));
send.addr.sin_family = AF_INET;
send.addr.sin_addr.s_addr = htonl(INADDR_ANY);//INADDR_ANY
send.addr.sin_port = htons(send.port);

printf("服务器socket: %d port:%d\n",send.sockfd, send.port);

if(send.sockfd<0){
perror("socket error!");
exit(1);
}

if(bind(send.sockfd,(struct sockaddr *)&(send.addr),sizeof(struct sockaddr_in)) < 0)
{
perror("bind failed.\n");
exit(1);
}

// 先读到Hello::Server之后再去开启读写线程
char buff[1024];
int buffer_size = 1024;
int rec = 0;
socklen_t socklen;
while(1){
usleep(20* 1000);
memset(buff, 0, sizeof(buff));
rec = recvfrom(send.sockfd, buff, 567, MSG_DONTWAIT, (struct sockaddr*)&(send.clientAddr), &socklen);
if (rec > 0) {
send.clientAddrLen = socklen;
if (0 == strcmp(buff, "Hello::Server")) {
printf("recvfrom ip %s port %d data=[%s] timestamp[%lu]\n", inet_ntoa(send.clientAddr.sin_addr), ntohs(send.clientAddr.sin_port), buff,iclock());
// 响应Hello::Client
memset(buff, 0, buffer_size);
strcpy(buff, "Hello::Client");
int n = sendto(send.sockfd, buff, strlen("Hello::Client") + 1, MSG_DONTWAIT, (struct sockaddr *)&(send.clientAddr), socklen);
printf("sendto ip %s port %d data=[%s]\n", inet_ntoa(send.clientAddr.sin_addr), ntohs(send.clientAddr.sin_port), buff);
if (n == strlen("Hello::Client") + 1){
printf("协商完成,开始使用kcp交互数据\n");
break;
}else{
printf("协商失败 n = %d failed.\n", n);
}
}else{
printf("UNKNOWN Data.... %s\n", buff);
}
}

}

send.is_avliable = true;
send.run_sendread = true;
send.run_send = false;
if (0 != pthread_create(&(send.thread_read), NULL, th_read, &send)) {
printf("创建读线程失败....");
ikcp_free(send.pkcpb);
return -1;
}
pthread_detach(send.thread_read);

if (0 != pthread_create(&(send.thread_send), NULL, th_send, &send)) {
printf("创建写线程失败.....");
ikcp_free(send.pkcpb);
return -1;
}
pthread_detach(send.thread_send);

if (0 != pthread_create(&(send.thread_update), NULL, th_update, &send)) {
printf("创建更新线程失败.....");
ikcp_free(send.pkcpb);
return -1;
}
pthread_detach(send.thread_update);

// 阻塞主线程
while (send.is_avliable) {
sleep(2);
if (!send.is_avliable){
// 发生错误,将收发线程关闭
send.run_sendread = false;
sleep(2); // 等待收发线程释放资源
}
}

ikcp_free(send.pkcpb);
close(send.sockfd);
return 0;
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include "ikcp.c"

#include <sys/time.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <stdbool.h>
#include <errno.h>


typedef struct {
unsigned char *ipstr;
int port;
ikcpcb *pkcpb;
int sockfd;
struct sockaddr_in addr;//存放服务器的结构体
char buff[488];

volatile bool is_avliable;
volatile bool run_sendread;
pthread_mutex_t lock;
pthread_t thread_send;
pthread_t thread_read;
pthread_t thread_update;
}my_kcp_t;


/* get system time */
void itimeofday(long *sec, long *usec)
{
#if defined(__unix)
struct timeval time;
gettimeofday(&time, NULL);
if (sec) *sec = time.tv_sec;
if (usec) *usec = time.tv_usec;
#else
static long mode = 0, addsec = 0;
BOOL retval;
static IINT64 freq = 1;
IINT64 qpc;
if (mode == 0) {
retval = QueryPerformanceFrequency((LARGE_INTEGER*)&freq);
freq = (freq == 0)? 1 : freq;
retval = QueryPerformanceCounter((LARGE_INTEGER*)&qpc);
addsec = (long)time(NULL);
addsec = addsec - (long)((qpc / freq) & 0x7fffffff);
mode = 1;
}
retval = QueryPerformanceCounter((LARGE_INTEGER*)&qpc);
retval = retval * 2;
if (sec) *sec = (long)(qpc / freq) + addsec;
if (usec) *usec = (long)((qpc % freq) * 1000000 / freq);
#endif
}

/* get clock in millisecond 64 */
IINT64 iclock64(void)
{
long s, u;
IINT64 value;
itimeofday(&s, &u);
value = ((IINT64)s) * 1000 + (u / 1000);
return value;
}

IUINT32 iclock()
{
return (IUINT32)(iclock64() & 0xfffffffful);
}

/* sleep in millisecond */
void isleep(unsigned long millisecond)
{
#ifdef __unix /* usleep( time * 1000 ); */
struct timespec ts;
ts.tv_sec = (time_t)(millisecond / 1000);
ts.tv_nsec = (long)((millisecond % 1000) * 1000000);
/*nanosleep(&ts, NULL);*/
usleep((millisecond << 10) - (millisecond << 4) - (millisecond << 3));
#elif defined(_WIN32)
Sleep(millisecond);
#endif
}
unsigned long malloc_num = 0;
void* my_ikcp_mem_malloc(size_t size) {
void *ptr = NULL;
ptr = malloc(size);
// printf("申请内存:%p 大小:%lu\n",ptr, size);
malloc_num ++;
return ptr;
}

void my_ikcp_mem_free(void *ptr){
// printf("释放内存:%p\n", ptr);
malloc_num --;
free(ptr);
}

int udpOutPut(const char *buf, int len, ikcpcb *kcp, void *user){
my_kcp_t *send = (my_kcp_t *)user;
int n = sendto(send->sockfd, buf, len, 0,(struct sockaddr *) &send->addr,sizeof(struct sockaddr_in));
if (n >= 0)
{
printf("client udp send: 字节 =%d bytes 内容=[%s]\n", n ,buf+24);//24字节的KCP头部
return n;
}else{
printf("udpOutPut: %d bytes send, error\n", n);
return -1;
}
}
void* th_read(void *arg) {
my_kcp_t *kcp = (my_kcp_t *)arg;
unsigned int len = sizeof(struct sockaddr_in);
int n, ret;
bool read_exit = false;
while (kcp->is_avliable && kcp->run_sendread) {
isleep(10);
char buf[1500] = {0};
int timeout = 2000;
while (timeout --) {
isleep(5);
// 收到UDP包
struct sockaddr_in callAddr;
socklen_t callAddrLen;
n = recvfrom(kcp->sockfd, buf, 1500, MSG_DONTWAIT, (struct sockaddr *)&callAddr, &callAddrLen);
if (n < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}else{
read_exit = true;
break;
}
}
// char* callAddrStr = inet_ntoa(callAddr.sin_addr);
// int callAddrPort = ntohs(callAddr.sin_port);
// if (0 != strcmp(callAddrStr, inet_ntoa(kcp->addr.sin_addr)) || callAddrPort != ntohs(kcp->addr.sin_port)){
// printf("未知连接: ip =%s port=%d data=[%s]\n", callAddrStr, callAddrPort, buf);
// continue;
// }
break;
}

if (timeout <= 0) {
// pthread_mutex_unlock(&(kcp->lock));
printf("Recfrom %s %d Timeout\n", inet_ntoa(kcp->addr.sin_addr), ntohs(kcp->addr.sin_port));
kcp->is_avliable = false;
kcp->run_sendread = false;
break;
}
if (read_exit) {
kcp->is_avliable = false;
kcp->run_sendread = false;
}

printf("recvfrom ip %s port %d data %s\n", inet_ntoa(kcp->addr.sin_addr),ntohs(kcp->addr.sin_port), buf);
// 将收到的数据包塞给kcp处理
// pthread_mutex_lock(&(kcp->lock));
ret = ikcp_input(kcp->pkcpb, buf, n);
if (kcp->pkcpb->state == 0xffffffff) {
kcp->is_avliable = false;
kcp->run_sendread = false;
break;
}
// pthread_mutex_unlock(&(kcp->lock));
if (-1 == ret) {
continue;
}
// 再从kcp中获得数据原始数据
// pthread_mutex_lock(&(kcp->lock));
ret = ikcp_recv(kcp->pkcpb, buf,576);
// pthread_mutex_unlock(&(kcp->lock));
if (ret < 0) {
continue;
}else{
printf("收到: ip = %s port = %d 数据:%s\n", inet_ntoa(kcp->addr.sin_addr), ntohs(kcp->addr.sin_port), buf);
// 返回响应
}
}
return NULL;
}

void* th_update(void *arg){
// my_kcp_t *kcp = (my_kcp_t *)arg;
// while (kcp->is_avliable && kcp->run_sendread) {
// usleep(10 * 1000);
// pthread_mutex_lock(&(kcp->lock));
// if (NULL != kcp->pkcpb) {
// ikcp_update(kcp->pkcpb, iclock());
// }
// pthread_mutex_unlock(&(kcp->lock));
// }
return NULL;
}
static int sendNumber = 0;
void* th_send(void *arg) {
my_kcp_t *kcp = (my_kcp_t *)arg;
char sendbuf[100];
while (kcp->is_avliable && kcp->run_sendread) {
memset(sendbuf, 0, sizeof(sendbuf));
sprintf(sendbuf, "This is Client Send Data n=[%d]", ++sendNumber);
// pthread_mutex_lock(&(kcp->lock));
if (kcp->pkcpb->state == 0xffffffff) {
kcp->is_avliable = false;
kcp->run_sendread = false;
break;
}
if (0 != ikcp_send(kcp->pkcpb, sendbuf, sizeof(sendbuf))){
printf("客户端发送数据失败....");
kcp->run_sendread = false;
kcp->is_avliable = false;
}else{
// printf("服务器发送数据成功....");
}

ikcp_update(kcp->pkcpb, iclock());
// pthread_mutex_unlock(&(kcp->lock));
isleep(20);
}
return NULL;
}
void kcp_write_log(const char *log, struct IKCPCB *kcp, void *user){
printf("kcp_write_log %s", log);
}

// 客户端向服务端发送数据的时候
// 已经知道服务器的IP和端口号了
int main(int argc,char *argv[])
{
printf("this is kcpClient\n");

my_kcp_t send;
send.ipstr = (unsigned char *)"127.0.0.1";
send.port = 8080;
int sockfd = socket(AF_INET,SOCK_DGRAM,0);
printf("sockfd %d\n", sockfd);
send.sockfd = sockfd;
if(send.sockfd < 0)
{
perror("socket error!");
exit(1);
}

bzero(&send.addr, sizeof(send.addr));

//设置服务器ip、port
send.addr.sin_family=AF_INET;
send.addr.sin_addr.s_addr = inet_addr((char*)send.ipstr);
send.addr.sin_port = htons(send.port);

printf("sockfd = %d ip = %s port = %d\n",send.sockfd, send.ipstr, send.port);


ikcp_allocator(my_ikcp_mem_malloc, my_ikcp_mem_free);

// 初始化kcp对象
ikcpcb *kcp = ikcp_create(0x1, (void *)&send);//创建kcp对象把send传给kcp的user变量
ikcp_setoutput(kcp, udpOutPut);//设置kcp对象的回调函数
ikcp_nodelay(kcp,1, 10, 2, 1);//(kcp1, 0, 10, 0, 0); 1, 10, 2, 1
ikcp_wndsize(kcp, 1024, 1024);
ikcp_setmtu(kcp, 1400);
kcp->writelog = kcp_write_log;


send.pkcpb = kcp;

// 先发送Hello::Client,然后等待接收Hello::Server
int rec = 0;
char buff[1024];
int buffer_size = 1024;
bool connectOK = false;
while (!connectOK) {
isleep(50);
memset(buff, 0, sizeof(buff));
strcpy(buff, "Hello::Server");
printf("尝试发送 ip %s port %d ==> Hello::Server数据....timestamp[%lu]\n", inet_ntoa(send.addr.sin_addr), ntohs(send.addr.sin_port), iclock());
int n = sendto(send.sockfd, buff, strlen("Hello::Server") + 1,MSG_DONTWAIT,(struct sockaddr*)&(send.addr), sizeof(send.addr));
printf("已经发送 ip %s port %d ==> Hello::Server数据....Return[%d]\n", inet_ntoa(send.addr.sin_addr), ntohs(send.addr.sin_port), n);

if (n == strlen("Hello::Server") + 1) {
int timeout = 50;
while (timeout --) {
isleep(5);
// 发送成功,接收响应
struct sockaddr_in callAddr;
socklen_t callAddrLen;
rec = recvfrom(send.sockfd, buff,657 ,MSG_DONTWAIT , (struct sockaddr *)&callAddr, &callAddrLen);
if (rec < 0) {
continue;
}
printf("收到ip %s port %d 数据包 %s\n", inet_ntoa(callAddr.sin_addr), ntohs(callAddr.sin_port),buff);
if (0 == strcmp("Hello::Client", buff)) {
printf("收到Hello::Client, 开始KCP\n");
connectOK = true;
break;
}else{
printf("收到其他数据:%s\n", buff);
memset(buff, 0, sizeof(buff));
}
}
}else{
printf("Client sendto ret = %d\n", n);
}
}

printf("协商完成,使用KCP通讯.....\n");
send.is_avliable = true;
send.run_sendread = true;
if (0 != pthread_create(&(send.thread_send), NULL, th_send, &send)) {
printf("创建写线程失败.....");
ikcp_free(send.pkcpb);
return -1;
}
pthread_detach(send.thread_send);

if (0 != pthread_create(&(send.thread_read), NULL, th_read, &send)) {
printf("创建读线程失败....");
ikcp_free(send.pkcpb);
return -1;
}
pthread_detach(send.thread_read);

if (0 != pthread_create(&(send.thread_update), NULL, th_update, &send)) {
printf("创建更新线程失败.....");
ikcp_free(send.pkcpb);
return -1;
}
pthread_detach(send.thread_update);

// 客户端只收发10s
int i = 0;
while (send.is_avliable) {
sleep(4);
if (!send.is_avliable){
// 发生错误,将收发线程关闭
send.run_sendread = false;
sleep(2); // 等待收发线程释放资源
ikcp_free(send.pkcpb);
return -1;
}
if (++i == 14) {
send.is_avliable = false;
send.run_sendread = false;
sleep(2);
ikcp_free(send.pkcpb);
break;
}
}
return 0;
}

现象和总结

收发速度不一致会导致缓存堆积,大量的内存被申请而不被释放。

使用sendto发送数据之后,可以直接使用recvfrom接收响应的数据了。太久不取数据会取不到。

udp响应包不一定通过目的port返回。有可能通过0端口返回。

只有服务端通过一个共享变量来控制收发会出现死锁问题。

Share 

 Previous post: 碎片化生存 Next post: C-Protoc 

© 2025 long

Theme Typography by Makito

Proudly published with Hexo