前言

《Unix 环境高级编程》 读书笔记,仅供个人 review 使用。


ch3. 文件 I/O

概念:文件 IO 操作对应系统调用,即无缓冲 IO

文件描述符:系统为每个进程都维护一张 fd 表,表中 0,1,2 的 fd 被预分配给 stdin, stdout, stderr 设备文件,新 fd 总是表中最小未使用的正整数

open:指明权限打开指定路径文件。权限分 2 部分

  • 必单选的 access mode:O_RDONLY 只读,O_WRONLY 只写,O_RDWR 读写,O_EXEC 只执行
  • 可多选的 control mode:O_APPEND 原子追加写,O_CREAT 原子创建文件,O_EXCL 存在判断,O_DIRECTORY 目录判断,O_NONBLOCK 非阻塞操作 IO,O_SYNC 持久化写

openat:处理相对路径的 at 家族函数。解决问题:进程内的多线程会共享 cwd,可用相对路径让多线程工作在不同目录下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void *thread_openat(void *arg) {
char buf[256];
int dir_fd, fd;

dir_fd = (long) arg;
sleep(1);
if (getcwd(buf, 256) != NULL)
printf("thread cwd: %s\n", buf); // /tmp

// 若 dif_fd 是 AT_FDCWD 常量则打开 /tmp/x.txt
if ((fd = openat(dir_fd, "x.txt", O_RDONLY)) < 0) // /home/wuyin/x.txt
err_sys("openat failed");
read(fd, buf, 256)
printf("thread_1.txt: %s\n", buf);
}

void proc_open() {
pthread_t tid;
long dir_fd;
if ((dir_fd = open("./", O_RDONLY | O_DIRECTORY)) < 0) // /home/wuyin
err_sys("open dir failed");
if (pthread_create(&tid, NULL, thread_openat, (void *) dir_fd) != 0)
err_sys("create thread failed");
if (chdir("/tmp") < 0)
err_sys("chdir failed");
sleep(2);
}

creat:等效于 open(path, o_WRONLY|O_CREATE|O_TRUNC, mode),注意是只写方式打开:

1
2
3
4
5
6
7
8
9
10
11
12
13
void create_file() {
int fd, flag;
char buf[1024];

if ((fd = creat("/tmp/z", 0644)) < 0) // write-only
err_sys("creat failed");
if ((flag = fcntl(fd, F_GETFL, 0)) < 0)
err_sys("fcntl failed");
printf("write-only? : %d\n", (flag & O_ACCMODE) == O_WRONLY); // 1
if (read(fd, buf, 1024) < 0)
err_sys("read failed"); // Bad file descriptor
// close(fd); // 进程退出自动关闭
}

read:从 fd 中读取 nbytes 到指定缓冲区 void* buf,到文件尾返回 0,出错返回 EOF(-1)

write:将缓冲区 buf 的 nbytes 写入 fd 文件,返回实际写入字节数,一般少于 nbytes 即出错

文件共享:

lseek:重置 fd 的 file offset 到指定的偏移量

  • 偏移量:可正可负,可在文件尾向后偏移,但不能在文件头向前偏移(报错:Invalid argument)
  • 方向:SEEK_SET 指相对文件头,SEEK_CUR 指相对当前 offset,SEEK_END 指相对文件尾

文件原子 I/O 操作

  • O_APPEND:以此 mode 打开的 fd 每次写都会原子地 lseek 到文件尾,再执行写操作
  • O_CREAT | O_EXCL:检查文件是否存在,不存在则创建

dup:复制出的新 fd 指向同一个 Open File Table Entry

fcntl:读取或更新 fd 的 access/control mode,对于 F_SETFL

1
2
old_flags &= ~clr_flags; // 清除指定 mode
old_flags |= O_APPEND; // 新增新 mode

ch4. 文件和目录

stat 函数族,获取指定 path 或 fd 的文件信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct stat {
ino_t st_ino; /* File serial number. */ // i-node 编号
mode_t st_mode; /* File mode. */ // 文件类型,访问权限
nlink_t st_nlink; /* Link count. */ // 引用次数
uid_t st_uid; /* User ID of the file's owner. */ // owner uid
gid_t st_gid; /* Group ID of the file's group. */ // owner gid
dev_t st_rdev; /* Device number, if device. */ // 特殊设备文件 id
off_t st_size; /* Size of file, in bytes. */ // 文件字节大小
blksize_t st_blksize; /* Optimal block size for I/O. */ // 文件所占 block 数量
blkcnt_t st_blocks; /* Number 512-byte blocks allocated. */ // 最佳 IO block 字节数
struct timespec st_atim; /* Time of last access. */ // 最后读内容时间
struct timespec st_mtim; /* Time of last modification. */ // 最后写内容时间
struct timespec st_ctim; /* Time of last status change. */ // i-node 最后更新时间
};

文件类型:检查值 stat.st_mode

1
2
3
4
S_ISREG(), S_ISDIR()  // regular file, directory
S_ISCHR(), S_ISBLK() // (char, block) special file
S_ISFIFO(), S_IS[SOCK,MQ,SEM, SHM]() // (named pipe, socket, mq, semaphore, share mem) ipc
S_ISLNK() // symbolic/soft link

实现:与文件类型屏蔽字 S_IFMT 取与运算,再与具体类型比较:

1
#define S_ISREG(m)  (((m) & S_IFMT) == S_IFREG)

进程的 3 个 user/group 概念:

  • real uid/ruid:执行进程的用户
  • effective uid/euid:判断进程操作权限的用户身份,另外可用 access 检查 ruid 操作权限
  • saved set-uid/set-gid:文件开启 set-uid mode 后,euid 即文件 owner uid,常用于 root 可控提权

文件访问权限:9 个 st_mode 权限屏蔽字

  • owner / USR:S_IRUSR 用户读,S_IWUSR 用户写,S_IXUSR 用户执行,S_IRWXU 用户读写执行
  • group / GRP:S_IRGRP 组内用户读,S_IWGRP 组写,S_IXGRP 组执行,S_IRWXG 组读写执行
  • other / OTH:S_IROTH 其他用户读,S_IWOTH 其他写,S_IXOTH 其他执行,S_IRWXO 其他读写执行

目录是内容为文件列表信息的特殊文件:

Read / O_RDONLY, O_RDWR Write / O_WRONLY, O_RDWR Execute / O_EXEC
文件 读文件内容 写文件内容 执行文件
目录 读文件列表 新建、删除、修改目录内文件 进入目录

文件操作接口:umask 创建文件时屏蔽掉指定的权限位;chmod 运行时修改文件权限(进程 e-uid 须为文件 owner uid);truncate 截断或扩展文件(填充内容为 0);link/unlink 递增或递减某个文件的引用计数 stat.st_nlink,当计数归 0 后,内核检查无进程已打开该文件才会删除


ch5. 标准 I/O

概念:在操作系统层的文件 I/O 上,将文件描述符分配缓冲区封装为“流”,并实现最佳块大小执行 IO 等多项优化

三种缓冲类型:

  • 全缓冲 _IOFBF:在填满缓冲区后才执行 IO
  • 行缓冲 _IOLBF:在输入或输出中遇到换行符 \n 或缓冲区已满,才执行 IO
  • 无缓冲 _IONBF:穿透到文件 IO

标准 I/O 的默认缓冲规则:stderr 必须无缓冲;若流指向 terminal 则行缓冲;否则全缓冲。可用 setvbuf 强制流以指定的缓冲类型使用用户分配的、长度固定为 BUFSIZ 的缓冲区

流的打开关闭

  • fopen:以指定的权限 type 打开文件,注意 w 会创建或清空文件,值为 O_WRONLY|O_CREAT|OTRUNC + 意为读写即 O_RDWRa 的追加操作也是原子执行
  • fdopen 在 fd 上手动关联 I/O 流,反之 fileno 可从流中取出对应的 fd
  • fclose:关闭流之前会 flush 输出数据,discard 输入数据

流的读写

  • 面向字符 IO,每次读写一个字符:getc, fgetc, getchar 读,putc, fputc, putchar 写;即使 EOF 也还用 ungetc 回送一个字符到缓冲区

  • 面向行 IO,每次读写一行:fgetsfputs 需自行处理尾端的换行符

  • 二进制 IO / struct IO,每次读写一个完整的结构:fread, fwrite

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    struct user {
    int id;
    char name[32];
    };

    void struct_io() {
    FILE *fp;
    struct user dst[2];
    struct user src[2] = {{1, "user_1"}, {2, "user_2"}};

    if ((fp = fopen("/tmp/users.txt", "w+")) == NULL)
    err_sys("fopen failed");

    // 从 &src 开始读取 2 个 user 长度的内存,写入流
    if (fwrite(&src, sizeof(struct user), 2, fp) != 2)
    err_sys("fwrite failed");
    rewind(fp);

    // 读取流,类似反序列化
    if (fread(&dst, sizeof(struct user), 2, fp) != 2) {
    if (feof(fp))
    err_ret("fread EOF");
    else if (ferror(fp))
    err_sys("fread failed");
    }
    printf("u1:<%d,%s>\tu2:<%d,%s>\n", dst[0].id, dst[0].name, dst[1].id, dst[1].name);
    }

出错检测:流内部维护 EOF 和出错标志,分别用 feofferror 检测,可用 clearerr 一并清除,类似 errno=0

流的格式化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 格式化输出
printf(char *fmt, ...); // 写到 stdout
fprintf(FILE* fp, char *fmt, ...) // 写到指定流
dprintf(int fd, char *fmt, ... ) // 写到指定 fd
sprintf(char *buf, char *fmt, ...) // 写到指定的 buf 缓冲区,必须保证足够大才不溢出
snprintf(char *buf, size_t n, char *fmt, ...) // 写到长度为 n 的 buf 缓冲区,超出则丢弃,更安全

// 可变参数可由调用方自行解析为 va_list
vprintf(char *fmt, va_list arg); // vfprintf, vdprintf; vsprintf, vsnprintf 同理

// 格式化输入
scanf(char *fmt, ...) // 从 stdin 读入
fscanf(FILE* fp, char *fmt, ...) // 从指定流读入
sprintf(char *buf, char *fmt, ...) // 从指定缓冲区读入

临时文件:tmpnam 生成临时文件名,tmpfile 创建临时文件流,进程退出自动清理


ch6. 系统文件与时间

文件内容 位置 头文件 struct 搜索函数
用户数据 /etc/passwd <pwd.h> passwd getpwnam, getpwid
用户组数据 /etc/group <grp.h> group getgrnam, getgrid
主机地址信息 /etc/hosts <netdb.h> hostent getnameinfo, getaddrinfo
本地网络信息 /etc/networks <netdb.h> netent getnetbyname, getnetbyaddr
常见网络协议 /etc/protocols <netdb.h> protent getprotobyname, getprotobynumber
本地服务信息 /etc/services <netdb.h> servent getservbyname, getservbyport

以上系统数据文件都有各自的遍历函数,其中文件的打开和读取由系统负责,由 3 类函数实现遍历:

  • set 函数:系统打开文件,rewind offset 从头开始读取
  • get 函数:读取下一条数据 entry,返回指向系统静态存储区的指针,若要修改则需手动复制
  • end 函数:系统关闭文件

如用遍历实现按用户名搜索用户数据:

1
2
3
4
5
6
7
8
9
const struct passwd *getpwnam(char *name) {
struct passwd *pw;
setpwent();
while ((pw = getpwent()) != NULL)
if (strcmp(pw->pw_name, name) == 0)
break;
endpwent();
return pw;
}

系统时间:按精度分为 3 类,及其读取当前时间戳的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include<time.h>
typedef long time_t // 秒级
time_t time(time_t *tp)

#include<sys/time.h>
struct timeval {
time_t tv_sec; // 秒单位 time_t
int32_t tv_usec; // 微秒级
};
int gettimeofday(struct timeval *tvp, void * tzone);

struct timespec {
time_t tv_sec;
long tv_nsec; // 纳秒级
};
int clock_gettime(clockid_t cid, struct timespec *tsp);

时间格式化:三种时间戳格式化前需转为统一的 broken-down time,即 tm

1
2
3
4
5
6
7
struct tm {
int tm_sec, tm_min, tm_hour; // 时分秒
int tm_mday, tm_mon, tm_year; // 年月日
int tm_wday, tm_yday; // 周日,年日
long tm_gmtoff; // 时区及其偏移秒数
char *tm_zone;
};

转换关系:


ch7. 进程环境

进程终止:相比_exit, _Exitexit 退出前会执行标准 IO 库的清理关闭,还会以栈的方式调用 atexit 注册的 exit handlers

1
2
3
4
5
int main(int argc, char **argv) {
printf("abc");
// _exit(0); _Exit(0); // 内核直接退出
exit(0); // 输出 abc
}

环境表:类比字符串实现为 \0 结尾的字符数组,对于 char **argv, **environ 都保证最后一个参数值为 NULL 来标记数组结束,遍历方式为:

1
2
3
extern char **environ;
char **env;
for (env = environ; *env != NULL; env++);

内存布局:每个进程都有一段连续的内核、用户虚拟地址空间,后者结构如下:

  • Text Segment,正文段:存储只读的可执行代码,可在多进程间安全共享(如父子进程)

  • Initialized Data Segment,数据段:存储初始化数据,显式初始化的全局变量、static 变量

  • BSS,未初始化数据段:存储未初始化的全局、static 变量,由系统自动初始化为零值或 NULL

  • Stack,栈:以栈帧(frame)为单位存储函数调用的上下文信息(局部变量,返回地址)

  • Heap,堆:存储运行时动态内存。malloc 分配指定字节数,calloc 会额外零初始化,realloc 用于扩缩容

    1
    2
    3
    4
    5
    char *s1; // NULL
    int f() {
    // 字符指针变量 s2 存储在数据段,但指向的字符串常量存储在正文段,无法修改其值
    static char *s2 = "read-only text data";
    }

    图源:gabrieletolomei

非局部 goto:跨函数丢弃栈帧跳转,需注意存储在不同区域、不同修饰符声明的变量,在跳转前后值的变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
jmp_buf jmp_buffer;
static int global_var;
static void f2() { longjmp(jmp_buffer, 1); }
static void f1(int auto_val, int reg, int volatile_val, int static_val) {
// 10, 20, 30, 40, 50
printf("before jump: %d\t%d\t%d\t%d\t%d\n", global_var, auto_val, reg, volatile_val, static_val);
f2();
}

void jump_frames() {
global_var = 1;
int auto_var = 2;
register int reg_var = 3;
volatile int volatile_val = 4;
static int static_var = 5;

if (setjmp(jmp_buffer) != 0) {
// 局部变量、register 变量在 longjmp 后会被重置为调用 setjmp 时的值
// 全局变量、静态变量不受跳转影响
// 局部变量可用 volatile 声明易失特性,每次更新值都有效
// 10, 2, 3, 40, 50
printf("after jump: %d\t%d\t%d\t%d\t%d\n", global_var, auto_var, reg_var, volatile_val, static_var);
exit(0);
}
global_var = 10;auto_var = 20; reg_var = 30; volatile_val = 40; static_var = 50;
f1(auto_var, reg_var, volatile_val, static_var);
exit(0);
}

资源限制:每个进程都有一组资源限制值,可用 getrlimit, setrlimit 读写,常用于 runtime patch

1
2
3
4
struct rlimit {
rlim_t rlim_cur; /* current (soft) limit */ // 软限制
rlim_t rlim_max; /* maximum value for rlim_cur */ // 硬限制
};

ch8. 进程控制

fork:创建子进程,返回 2 次(父进程能 fork 多个子进程,而子进程只会有一个父进程)

  • 父进程:返回子进程 pid
  • 子进程:返回 0,可用 getppid 读取唯一的父进程 pid

除 pending sigset 和 file lock 等会冲突的资源外,子进程会继承属性并拷贝数据段和堆栈副本

另外,fork 常组合 exec* 执行新程序会导致副本被覆盖,故用 cow(copy on write) 技术只在子进程真正修改时,内核才为修改区域制作副本,避免无意义的拷贝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int global_num = 10;
char global_buf[] = "111\n";

void fork_process() {
int num = 1;
pid_t pid;

// 无缓冲的文件 IO 直接输出,输出一次
if (write(STDOUT_FILENO, global_buf, sizeof(global_buf) - 1) != sizeof(global_buf) - 1)
err_sys("write failed");

// 标准 IO 非终端默认全缓冲,若 stdout 被重定向到文件,则缓冲区内容会会被子进程拷贝,输出两次
printf("222\n");
if ((pid = fork()) < 0)
err_sys("fork failed");
else if (pid == 0)
num++, global_num++;
else
sleep(1);
printf("pid:%d, num:%d, global_num:%d\n", getpid(), num, global_num);
}

vfork:fork 出的子进程在父进程空间内运行,直到调用 exec / exit ,之后父进程才被调度。

1
2
3
4
5
6
7
8
9
10
11
12
13
int global_num = 10;
void vfork_spawn() {
int num = 1;
pid_t pid;
if ((pid = vfork()) < 0) {
err_sys("vfork failed");
} else if (pid == 0) {
++global_num;
++num;
_exit(0);
}
printf("%d, %d\n", num, global_num); // 2 11 // 子进程在 exit 前修改了父进程数据
}

wait, waitpid:子进程在 exit 正常退出或 terminate 异常终止后,内核为其保存终止状态、CPU 时间等信息,等待父进程 wait* 读取,读取前终止的子进程称为僵尸进程 zombie;反之若父进程先于子进程终止,则子进程被 pid 为 1 的 init 进程收养,称为孤儿进程 orphan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void zombie_orphan_process() {
pid_t pid;
int status;

if ((pid = fork()) < 0) {
err_sys("fork failed");
} else if (pid == 0) {
if ((pid = fork()) < 0) {
err_sys("fork failed");
} else if (pid > 0) {
printf("child pid %d exited\n", getpid()); // 5075
exit(10); // 子进程结束
}
sleep(1); // 子进程的子进程成为孤儿进程,被 init 收养
printf("child*2 pid %d, parent pid: %d\n", getpid(), getppid()); // 5076, 1
sleep(5);
exit(2);
}
if (waitpid(pid1, &status, 0) > 0)
// 进程退出状态需用 WIF* 宏检查,并用对应的宏取值
if (WIFEXITED(status))
printf("parent got status: %d\n", WEXITSTATUS(status)); // 10
sleep(5);
}

ps 能看到父进程 stat 为 S/sleep,而子进程为 Z/zombie

1
2
3
wuyin		5075   0.0  0.0        0      0 s001  Z+	(all)
wuyin 5076 0.0 0.0 4277516 392 s001 S+ ./all
wuyin 5072 0.0 0.0 4268300 712 s001 S+ ./all

waitid 能额外获取导致子进程状态变化的信号信息,wait3,wait4 能额外获取子进程 CPU 时间等资源统计信息

exec 家族函数:execl, execle, execlp; execv, execve; execvp; fexecve,区别在于入参,函数名中除 exec 外的后缀:

  • l,list:多个参数以可变参数表的形式传入,注意最后一个参数须为 (char*)0 做边界检测

  • v,vector:多个参数组成数组,传入数组指针,同理最后一个参数须为 NULL

  • p,PATH:在 $PATH 环境变量中搜索该文件

  • e,environ:传入指定的环境变量表,否则使用现成的 char **environ

此外 fexecve 多用于配合 fcntl 为文件 F_SETLK 线程安全地加锁后再执行。只有 execve 是系统调用,其他六个函数转换关系如下:

进程权限:setuid 更新实际 ruid;seteuid 更新有效 euid;setreuid 交换实际 ruid 和有效 euid,用于 set-user-ID 的进程主动取消提权

system:执行给定字符串命令,即 fork+exec+waitpid 并进行了出错和信号处理,简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int system(char *cmd_str) {
pid_t pid;
int status;

if (cmd_str == NULL || strlen(cmd_str) == 0)
return 1;
if ((pid = fork()) < 0) {
return -1;
} else if (pid == 0) {
execl("/bin/sh", "SHELL", "-c", cmd_str, (char *) 0); // sh -c 取命令行参数为命令输入
_exit(127); // execl 出错,避免 flush cmd_str 执行后的标准 IO
} else {
if (waitpid(pid, &status, 0) < 0)
if (errno != EINTR)
return -1;
}
return status;
}

nice, getpriority, setpriority:显式修改进程的调度优先级,注意参数意为对 CPU 的友好度,值越低即运行优先级越低

times:返回当前的 CPU 时钟,其值与时间戳无关,取 diff 相对值才有意义,入参为:

1
2
3
4
5
6
struct tms {
clock_t tms_utime; /* User CPU time */
clock_t tms_stime; /* System CPU time */
clock_t tms_cutime; /* Terminated children user CPU time */
clock_t tms_cstime; /* Terminated children System CPU time */
};

如取进程执行前后的 2 个 tms,计算期间的 CPU 时间

1
2
3
4
5
6
7
8
9
10
11
12
void pr_times(clock_t times_diff, struct tms *start, struct tms *end) {
static long clock_tick = 0;
if (clock_tick == 0)
if ((clock_tick = sysconf(_SC_CLK_TCK)) < 0)
err_sys("clk_tck conf failed");

printf("real: %7.2f\n", times_diff / (double) clock_tick);
printf("user: %7.2f\n", (double) (end->tms_utime - start->tms_utime) / (double) clock_tick);
printf("system: %7.2f\n", (double) (end->tms_stime - start->tms_stime) / (double) clock_tick);
printf("child user: %7.2f\n", (double) (end->tms_cutime - start->tms_cutime) / (double) clock_tick);
printf("child system: %7.2f\n", (double) (end->tms_cstime - start->tms_cstime) / (double) clock_tick);
}

ch9. 进程关系

setsid, getsid:进程会话 session 即多个进程组的集合,对应一个控制终端(controlling terminal),与该终端建立连接的会话首进程称为控制进程(controlling process);此外,可将会话内的进程,按是否运行在交互前台,分为前台进程(foreground process group)和后台进程(background process group),对应 fg, bg 切换命令

对于 fg 进程,相关的作业控制信号:

控制字符 动作 信号
Ctrl+Z suspend 进程 SIGSTP
Ctrl+\ 退出进程 SIGQUIT
Ctrl+C 中断进程 SIGINT

ch10. 信号

概念:系统通过异步信号通知进程对应事件发生。信号产生:终端键入某些控制字符、除零等硬件异常、主动 kill 发送具体信号等

信号处理

  • 忽略:注意 SIGKILL, SIGSTOP 无法忽略或捕捉,内核保留二者来实现进程的可靠停止
  • 捕捉:内核异步通知进程某个信号发生,执行用户预先注册的 handler 函数
  • 默认:执行系统默认 handler,大多数信号默认会终止进程,如下:
名字 说明 默认动作
SIGABRT 进程异常终止 abort 终止+core(dump)
SIGALRM 注册的定时器超时 alarm 终止
SIGBUS 硬件故障,如不对齐的访问内存 终止+core
SIGCANCEL 线程内 pthread_cancel 忽略
SIGCHILD 子进程状态改变,如 exit 忽略
SIGFPE 算数异常,如除 0 终止+core
SIGHUP 连接断开 终止
SIGINT 中断控制符 终止
SIGKILL, SIGSTOP 停止进程 停止
SIGPIPE 写对端无读进程的 pipe 终止
SIGPOLL 有可轮询事件 终止
SIGQUIT 退出控制符 终止+core
SIGSEGV 无效内存引用,如读 NULL 指针 终止+core
SIGTERM 终止进程 终止
SIGUSR1, SIGUSR2 2 个用户自定义信号 终止
SIGXCPU, SIGXRES 超过 CPU 时间,其他资源限制 终止,忽略

signalvoid (*signal(int signo, void(*func)(int)))(int) ,注册新的 handler 返回旧的,出错返回 SIG_ERR,此外还有 2 个特殊 action handler

1
2
3
4
typedef void (*sighandler_t) (int);
#define SIG_ERR ((sighandler_t) -1) /* Error return. */
#define SIG_DFL ((sighandler_t) 0) /* Default action. */ // signo 的系统默认 handler
#define SIG_IGN ((sighandler_t) 1) /* Ignore signal. */ // 显式忽略 signo

当 exec 执行新程序后,原进程的 handler 函数地址将不再有效,会原捕捉信号会被内核置为默认 handler;同理 fork 的子进程会继承 handler

1
2
3
4
5
6
7
8
9
10
11
12
13
void exec_do() {
printf("SIGTEM is default: %d\n", signal(SIGTERM, SIG_IGN) == SIG_DFL); // 1
printf("SIGINT is ignored: %d\n", signal(SIGTERM, SIG_IGN) == SIG_IGN); // 1
}
void sig_term(int signo) {}
void reset_signo() {
if (signal(SIGTERM, sig_term) == SIG_ERR)
err_sys("sigterm failed");
if (signal(SIGINT, SIG_IGN) == SIG_ERR) // 系统级 SIG_IGN 在 exec 后依旧有效
err_sys("sigint failed");
if (execl("exec_do", "", (char *) 0) < 0)
err_sys("execl failed");
}

可重入信号处理函数:进程被中断执行 handler 前,若调用会导致取值覆盖、数据结构破坏的函数,则不可重入:使用了静态内存数据如 getpwnam、使用了全局数据结构的大量标准 IO 库函数、调用了 malloc, free

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void tick_alarm(int signo) {
if (getpwnam("root") == NULL)
err_sys("getpwdnam(root) failed");
alarm(1);
}

void not_reentrant() {
struct passwd *pwd;
if (signal(SIGALRM, tick_alarm) == SIG_ERR)
err_sys("sginal failed");
alarm(1);
while (1) {
if ((pwd = getpwnam("wuyin")) == NULL)
err_sys("getpwnam failed");
if (strcmp(pwd->pw_name, "wuyin") != 0)
printf("broken passwd structure, got '%s'\n", pwd->pw_name); // root
}
}

kill:向指定进程或进程组发送指定的信号,其中空信号 0 可检测指定的进程是否存活

raise:向自身发送信号,等效于 kill(getpid(), signo)

alarm:设置秒级定时器,到期后产生 SIGALRM 信号,默认会终止进程,每个进程只有一个定时器

pause:suspend 进程,直到捕捉到某个信号并执行完 handler。如简易版的 sleep 实现

1
2
3
4
5
6
7
8
static void empty_alarm() {}
unsigned int sleep(unsigned int sec) {
if (signal(SIGALRM, empty_alarm) == SIG_ERR) // 会覆盖旧 handler
err_sys("signal failed");
alarm(sec); // 会覆盖已有定时器的问题
pause();
return (alarm(0)); // 返回剩余时间
}

sigset_t :标识多个信号的集合,在某些只有 31 个信号的平台可用 int32 类型标识

  • 初始化:sigemptyset 置空,sigfillset 填入所有信号
  • 增删:sigaddset 增加,sigdelset 删除。信号集的函数可能增删信号,参数都传址 sigset_t*
  • 查询:sigismember

sigprocmask:类比创建文件时的 umask 权限屏蔽,进程可选择屏蔽掉不关心的信号,避免被信号中断;sigpending:获取被 blocked 信号集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void reset_sig_quit(int signo) {
printf("caught SIGQUIT\n");
if (signal(SIGQUIT, SIG_DFL) == SIG_ERR) // 本次捕捉,重置给下次
err_sys("signal failed");
}

void sigset_pending() {
sigset_t new_set, old_set, pending_set;

if (signal(SIGQUIT, reset_sig_quit) == SIG_ERR)
err_sys("signal failed");
sigemptyset(&new_set);
sigaddset(&new_set, SIGQUIT);
if (sigprocmask(SIG_BLOCK, &new_set, &old_set) < 0) // 屏蔽 SIGQUIT
err_sys("sigprocmask failed");
printf("SIGQUIT now blocked\n");

sleep(5);
// pending 的多个 SIGQUIT 信号不会被排队,最终只有 1 个信号会被 handler 处理一次
if (sigpending(&pending_set) < 0)
err_sys("sigpending failed");
if (sigismember(&pending_set, SIGQUIT))
printf("SIGQUIT pending\n");

// 解除屏蔽,不能直接 SET_UNBLOCK,调用方可能也 block SIGQUIT
if (sigprocmask(SIG_SETMASK, &old_set, NULL) < 0)
err_sys("sigprocmask failed");
printf("SIGQUIT now unblocked\n");
}
1
2
3
4
SIGQUIT now blocked
^\^\^\^\^\^\SIGQUIT pending
caught SIGQUIT
SIGQUIT now unblocked

sigsuspend:原子地恢复信号屏蔽字,并挂起进程,解决先 pause 后 SET_UNLOCK 导致的永久性阻塞问题

信号 IPC 实例:实现两个进程的同步

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static volatile sig_atomic_t sig_flag;
static sigset_t new_mask, old_mask, zero_mask;

static void sig_usr(int signo) { sig_flag = 1; }
void TELL_WAIT() {
if (signal(SIGUSR1, sig_usr) < 0)
err_sys("signal failed");
if (signal(SIGUSR2, sig_usr) < 0)
err_sys("signal failed");

sigemptyset(&zero_mask);
sigemptyset(&new_mask);
sigaddset(&new_mask, SIGUSR1);
sigaddset(&new_mask, SIGUSR2);
if (sigprocmask(SIG_BLOCK, &new_mask, &old_mask) < 0) // 屏蔽 SIGUSR1, SIGUSR2
err_sys("sigprocmask failed");

}

void TELL_PARENT(pid_t pid) { kill(pid, SIGUSR1); }
void WAIT_PARENT() {
while (sig_flag == 0) // 检查 flag 避免是从其他信号中恢复
sigsuspend(&zero_mask); // 等待对方执行完毕,发来信号
if (sigprocmask(SIG_SETMASK, &old_mask, NULL) < 0)
err_sys("sigprocmask failed");
}

void TELL_CHILD(pid_t pid) { kill(pid, SIGUSR2); }
void WAIT_CHILD() {
while (sig_flag == 0)
sigsuspend(&zero_mask);
if (sigprocmask(SIG_SETMASK, &old_mask, NULL) < 0)
err_sys("sigprocmask failed");
}

ch11. 线程

pthread_t:实现多为 unsigned long ,需用 pthread_equal 比较; pthread_self 读取当前线程 tid

pthread_create:指定属性创建执行入口为 start_routine 函数且实参为 args 的线程,线程标识存入 tidp

1
2
3
4
5
// 与进程级函数检查 errno 不同,线程函数大多直接返回错误标记,须尽早检查处理
int pthread_create(pthread_t *restrict tidp,
const pthread_attr_t *restrict attr,
void *(*start_routine) (void *),
void *restrict args);

pthread_exit:线程退出时可指定 void* 终止值,但需保证值线程退出后地址仍有效

pthread_join:阻塞等待线程退出并获取返回值,此操作会导致调用线程处于 detached 分离状态

pthread_cleanup_push, pthread_cleanup_pop:注册线程清理函数,退出前以栈顺序执行,宏实现需配对

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void *fn(void *arg) {
pthread_cleanup_push(cleanup, "handler_1 arg");
pthread_cleanup_push(cleanup, "handler_2 arg"); // 先被执行
sleep(1);
pthread_exit((void *) 20);
pthread_cleanup_pop(0); // 仅用于宏对齐,已提前 exit,实际不 pop handler
pthread_cleanup_pop(0);
}

void do_pthread() {
pthread_t tid1, tid2;
void *rval1, *rval2;
int err;
if ((err = pthread_create(&tid1, NULL, fn, NULL)) != 0) err_exit(err, "create failed");
if ((err = pthread_create(&tid2, NULL, fn, NULL)) != 0) err_exit(err, "create failed");
if ((err = pthread_cancel(tid1)) != 0) err_exit(err, "cancel failed"); // 线程 1 被中断
if ((err = pthread_join(tid1, &rval1)) != 0) err_exit(err, "join failed");
printf("thread canceled: %d\n", (void *) rval1 == PTHREAD_CANCELED); // 1

if ((err = pthread_detach(tid2)) != 0) err_exit(err, "detached failed"); // 分离线程,线程结束即清理返回值等资源
if ((err = pthread_join(tid2, &rval2)) != 0) err_exit(err, "join failed"); // Invalid argument
}

控制原语对比:

进程原语 线程原语 描述
fork pthread_create 创建线程
exit pthread_exit 线程退出
waitpid pthread_join 等待指定线程退出
atexit pthread_cancel_push 注册线程退出 hook 函数
getpid pthread_self 获取线程 id
abort pthread_cancel 请求线程退出

线程同步

线程安全:资源只读、原子操作、资源加锁

1. 互斥量 Mutex

概念:访问共享资源前对互斥量加锁,完成后释放锁,同一时间保证只有一个线程访问共享资源

初始化:锁结构类型为 pthread_mutex_t,静态锁可用常量 PTHREAD_MUTEX_INITIALIZER 初始化。也能用 pthread_mutex_init 指定属性来初始化,但需在退出前 pthread_mutex_destroy 清理锁资源

加锁:pthread_mutex_lock 阻塞加锁,pthread_mutex_trylock 尝试加锁失败返回 EBUSYpthread_mutex_timedlock 在绝对时间戳前加锁仍失败返回 ETIMEDUT

解锁:pthread_mutex_unlock 释放锁

避免死锁:严格保证有序加锁(即范围大锁);若多把锁的加锁顺序无法排序时,可用 trylock 若加锁失败则主动释放可能造成死锁的锁,稍后再试

2. 读写锁 RWLock

概念:多个线程可以读模式加锁,但只有一个线程可以写模式加锁,二者互斥。提高了读并行性,适合读次数远多于写的场景

pthread_rwlock_t 操作:

1
2
3
4
5
PTHREAD_RWLOCK_INITIALIZER // 静态初始化
pthread_rwlock_init, pthread_rwlock_destroy // 动态初始化与清理
pthread_rwlock_rdlock, pthread_rwlock_wrlock, pthread_rwlock_unlock // 加读锁、加写锁、解锁
pthread_rwlock_tryrdlock, pthread_rwlock_trywrlock // 尝试加读锁、写锁
pthread_rwlock_timedrdlock, pthread_rwlock_timedwrlock // 带超时的读锁、写锁

示例:读写锁实现的 job queue,主线程新增 job,worker 线程处理自己的 job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
struct job {
struct job *prev, *next;
pthread_t worker_tid; // 主线程指定处理此 job 的线程
};

struct queue {
struct job *head, *tail;
pthread_rwlock_t rwlock;
};

int queue_init(struct queue *q) {
int err;
q->head = NULL;
q->tail = NULL;
if ((err = pthread_rwlock_init(&q->rwlock, NULL)) != 0)
return err;
return 0;
}

void job_append(struct queue *q, struct job *j);
void job_insert(struct queue *q, struct job *j) {
pthread_rwlock_wrlock(&q->rwlock);
j->prev = NULL;
j->next = q->head;
if (q->head == NULL)
q->tail = j;
else
q->head->prev = j;
q->head = j; // 更新队列头结点
pthread_rwlock_unlock(&q->rwlock);
}

void job_remove(struct queue *q, struct job *j) {
pthread_rwlock_wrlock(&q->rwlock);
if (j == q->head) {
q->head = q->head->next;
if (j == q->tail)
q->tail = NULL;
else
j->next->prev = NULL;
} else if (j == q->tail) {
j->prev->next = NULL; // 已判断过 j 不是 head
} else {
j->prev->next = j->next;
j->next->prev = j->prev;
}
pthread_rwlock_unlock(&q->rwlock);
}

struct job *job_find(struct queue *q, pid_t worker_tid) {
struct job *j;
pthread_rwlock_rdlock(&q->rwlock);
for (j = q->head; j != NULL; j = j->next)
if (pthread_equal(worker_tid, j->worker_tid)) // pthread_t 比较函数
break;
pthread_rwlock_unlock(&q->rwlock);
return j;
}

3. 条件变量 ConditionVariable

概念:多线程以无竞争的方式等待特定条件发生,依赖互斥量保护。条件不满足时线程进入等待集,临时释放锁,条件满足时被唤醒并带上锁返回

pthread_cond_t 操作:

1
2
3
4
PTHREAD_COND_INITIALIZER // 静态初始化
pthread_cond_init, pthread_cond_destroy // 动态初始化与清理
pthread_cond_wait, pthread_cond_timedwait // 阻塞等待条件满足唤醒,带绝对超时时间的等待
pthread_cond_signal, pthread_cond_broadcast // 通知单个线程条件已满足,广播唤醒所有线程竞争

示例:条件变量实现 message queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
struct msg {
struct msg *next;
char *content;
};

struct msg *mq;
pthread_mutex_t cond_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void msg_process() {
struct msg *msg_ptr;
while (1) {
pthread_mutex_lock(&cond_lock);
while (mq == NULL)
pthread_cond_wait(&cond, &cond_lock);
msg_ptr = mq;
mq = mq->next;
pthread_mutex_unlock(&cond_lock);
printf("handling: %s\n", msg_ptr->content);
}
}

void msg_enqueue(struct msg *new_msg) {
pthread_mutex_lock(&cond_lock);
new_msg->next = mq;
mq->next = new_msg;
// pthread_cond_signal(&cond); // 被唤醒的线程条件必定已满足
pthread_mutex_unlock(&cond_lock);
pthread_cond_signal(&cond); // 锁可能已被抢占,被唤醒后条件可能不再满足
}

4. 自旋锁 SpinLock

概念:获取锁失败后线程并不挂起等待,而是 BusyWaiting 等待锁释放,适用于锁占用时间极短的场景

5. 屏障 Barrier

概念:协调多线程阶段性合作。屏障逐个阻挡(挂起)线程,直到所有合作线程都达到某个条件,才都继续执行

pthread_barrier_t 操作:

1
2
3
4
pthread_barrier_init // 指定屏障允许所有线程恢复运行之前,必须到达屏障的线程数
pthread_barrier_destroy // 清理屏障
// 指明当前线程已完成工作,等待其他线程赶上来
pthread_barrier_wait // 唯一返回 PTHREAD_BARRIER_SERIAL_THREAD 的线程可做调度

示例:屏障协调多线程分区排序。8 个线程对各自区间内的整数排序,最后由主线程 merge 最终有序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#define THREADS 8
#define NUMS 8000000L
#define NUM_PER_THREAD (NUMS/THREADS)

long nums[NUMS];
long sorted_nums[NUMS];
pthread_barrier_t barrier;

int cmp_long(const void *v1, const void *v2) {
long n1 = *(long *) v1;
long n2 = *(long *) v2;
if (n1 == n2) return 0;
else if (n1 < n2) return -1;
else return 1;
}

void *thread_sort_task(void *arg) {
long start_idx = (long) arg;
heapsort(&nums[start_idx], NUM_PER_THREAD, sizeof(long), cmp_long);
pthread_barrier_wait(&barrier); // 本区间排序完成,等待其他线程
return (void *) 0;
}

void merge_sorted_sections() {
long start_idxs[THREADS];
for (int i = 0; i < THREADS; ++i)
start_idxs[i] = i * NUM_PER_THREAD;

long i, j;
long min_val, min_idx;
for (i = 0; i < NUMS; ++i) {
min_val = LONG_MAX;
for (j = 0; j < THREADS; j++) {
// 区间不越界,比较各区间头,取最小值
if ((start_idxs[j] < (j + 1) * NUM_PER_THREAD) && (nums[start_idxs[j]]) < min_val) {
min_val = nums[start_idxs[j]];
min_idx = j;
}
}
sorted_nums[i] = min_val;
// 被取走的区间继续向后推进
start_idxs[min_idx]++;
}
}

int main() {
srandom(1);
for (int i = 0; i < NUMS; ++i)
nums[i] = random();

struct timeval start_time, end_time;
gettimeofday(&start_time, NULL);
pthread_barrier_init(&barrier, NULL, THREADS + 1); // +1 主线程也参与排序

pthread_t tid;
int err;
for (int i = 0; i < THREADS; ++i) {
err = pthread_create(&tid, NULL, thread_sort_task, (void *) (i * NUM_PER_THREAD));
if (err != 0)
return -1;
}
pthread_barrier_wait(&barrier); // 主线程确保 8 个线程都完成

merge_sorted_sections(); // 合并 8 个有序区间
gettimeofday(&end_time, NULL);

long long start_us = start_time.tv_sec * 1000000 + start_time.tv_usec;
long long end_us = end_time.tv_sec * 1000000 + end_time.tv_usec;
double cost = (double) (end_us - start_us) / 1000000.0;
printf("cost: %.4f seconds\n", cost);

for (int i = 0; i < 10; ++i)
printf("%ld, ", sorted_nums[i]);
printf("\n");
}

效果:8 线程排序比单线程快六倍


ch12. 线程控制

线程、同步对象的属性操作模式:关联属性类型、初始化函数设为默认值,清理函数,属性 settergetter

线程四个属性:detachstate 分离状态、guardsize 栈末尾警戒缓冲区大小,stacksize 栈大小,stackaddr 栈最低地址。用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void pthread_attr() {
pthread_attr_t pa;
int detach_state;
size_t stack_size, guard_size;

if (pthread_attr_init(&pa) != 0) err_sys("init failed");
pthread_attr_setdetachstate(&pa, PTHREAD_CREATE_DETACHED);
if (pthread_attr_getdetachstate(&pa, &detach_state) == 0)
printf("detached: %d\n", detach_state == PTHREAD_CREATE_JOINABLE); // 0
if (pthread_attr_getstacksize(&pa, &stack_size) == 0)
printf("stack_size: %zuMB\n", stack_size/1024/1024); // 8MB
if (pthread_attr_getguardsize(&pa, &guard_size) == 0)
printf("guard_size: %zuKB\n", guard_size/1024); // 4KB
pthread_attr_destroy(&pa);
}

线程私有数据:相同 key 每个线程可存储私有数据,仅自己可访问不存在同步问题,如 errno

1
2
3
4
pthread_key_create // 创建 pthread_key_t 类型的公共 key,并指定析构函数
pthread_key_delete // 解除 key 的关联关系
pthread_setspecific // 设置当前线程 key 的关联数据
pthread_getspecifi // 读取关联数据

pthread_once_t:线程安全地保证指定操作仅且执行一次

示例:线程安全的 getenv 实现,各线程包含私有缓冲区指向 env 字符串值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
pthread_once_t ponce = PTHREAD_ONCE_INIT;
pthread_mutex_t env_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_key_t env_key;
void thread_init() { pthread_key_create(&env_key, NULL); }

char *safe_getenv(const char *s) {
char *env_buf;
size_t len;
int i;

pthread_once(&ponce, thread_init); // 保证 key 只会创建一次
pthread_mutex_lock(&env_lock);
env_buf = (char *) pthread_getspecific(env_key);
if (env_buf == NULL) { // 当前线程尚未关联 env_key
if ((env_buf = malloc(1024)) == NULL) {
pthread_mutex_unlock(&env_lock);
err_sys("malloc failed");
}
pthread_setspecific(env_key, env_buf); // set 私有数据
}
len = strlen(s);
for (i = 0; environ[i] != NULL; ++i) {
if (strncmp(s, environ[i], len) == 0 && environ[i][len] == '=') {
strncpy(env_buf, &environ[i][len - 1], 1024); // 拷贝 env 值作为线程私有数据
pthread_mutex_unlock(&env_lock);
return env_buf;
}
}
pthread_mutex_unlock(&env_lock);
return NULL;
}

线程取消:线程可用 pthread_setcancelstate 配置响应 pthread_cancel 请求,默认是 PTHREAD_CANCEL_ENABLE,可配置为 PTHREAD_CANCEL_DISABLE 忽略此请求

线程信号:线程共享进程的 signal handler,无法对同一个信号独立注册不同 handler;但线程可单独用 pthread_sigmask 屏蔽指定的信号集;用 sigwait 等待信号集中的信号发生;用 pthread_kill 发送具体信号给指定的线程

线程和 IO:pread[lseek,read] 的原子操作,同理有线程安全的写 pwrite


ch13. 守护进程

系统日志:后台进程记录日志;可选的 openlog 设置进程名和固定字段,syslog 记录日志,日志等级:

  • EMERG, ALERT, CRIT:系统不可用致命错误,需立刻修复的报警,严重情况
  • ERR, WARNING, NOTICE, INFO, DEBUG:出错,警告,重要信息,普通信息,调试信息
1
2
3
4
5
6
void logging() {
openlog("logging", LOG_PID, LOG_DAEMON);
errno = ENOENT;
syslog(LOG_ERR, "open %s failed: %m", "non-exist.txt");
}
// AUG 14 13:22:03 mesa logging[629796]: open non-exist.txt failed: No such file or directory

ch14. 高级 I/O

非阻塞 IO:在 open 时指定 O_NONBLOCK 打开文件,或 fcntl 为已打开文件新增此标记,之后对此文件的低速 IO 操作若不能完成会立即出错返回,不再阻塞

记录锁 record lock:又名 byte-range lock,用来锁定文件中指定的字节区域。结构为:

1
2
3
4
5
6
7
struct flock {
short l_type; /* lock type: read/write, etc. */
off_t l_start; /* starting offset */
short l_whence; /* type of l_start */
off_t l_len; /* len = 0 means until end of file */
pid_t l_pid; /* lock owner */
};
  • l_type:锁类型,F_RDLCK 共享读锁,F_WRLCK 独占写锁,F_UNLCK 解锁
  • l_start, l_whence:描述锁字节偏移量和相对位置,whence 取值同 lseek,同理能向后越过文件尾,却不能向前越过文件头
  • l_len:锁区域长度,0 值表示延伸到 EOF,追加写的新数据依旧在锁范围内

fcntl:操作指定 fd 对应的记录锁,F_GETLK 测试是否能加锁;F_SETLK 加锁失败返回 EAGAIN, EACCESS;F_SETLKW 阻塞等待加锁直到锁释放唤醒。进程可多次加记录锁来替换旧锁,即进程检测自己持有的锁无意义,故 F_GETLK 不予报告返回

记录锁的继承和释放,三条规则:

  • 子进程不继承父进程的记录锁:继承则父子进程都能写同一 byte-range,违背记录锁初衷
  • FD_CLOEXEC 标记:exec 执行的新程序默认继承记录锁,但若 fd 设置此标记则记录锁会被释放
  • 锁由进程持有:进程退出时持有的记录锁都会解锁释放。由于记录锁信息存入 i-Node,当进程打开同一文件多次生成多个 fd,关闭其中任一 fd,都会导致其他 fd 上的记录锁被释放

I/O Multiplex,IO 多路复用

问题:从多个 fd 同时读写数据

  • 方案1:多进程阻塞读多个 fd,存在信号传递问题
  • 方案2:多线程阻塞读多个 fd,存在线程间资源同步的问题
  • 方案3:单进程使用非阻塞 IO 轮询读多个 fd,存在 CPU 空转、等待时间不好估算的问题
  • 方案4:异步 IO,当某个 fd 准备好读写后,内核发信号异步通知进程,存在信号数量过少的问题
  • 方案5(最佳):构造要读写的 fd 表,由 I/O multiplex 函数去监测并在有 fd 准备好读写后返回告知进程

多路复用 1:select, pselect

1
2
3
4
5
6
7
8
9
10
11
12
// maxfdp1: 关注的最大描述符编号加 1,即要检查的 fd 个数,描述 fd 区间
// rfds, wfds, expfds:关注的可读、可写、异常等条件的就绪 fd_set
// timeout: 微秒级超时(0 值不等待,NULL 则无限等待),就绪返回则存入剩余时间
// 返回值:就绪 fd 个数,超时返回 0,出错返回 -1
int select (int maxfdp1, fd_set *restrict readfds,
fd_set *restrict writefds, fd_set *restrict exceptfds,
struct timeval *restrict timeout);
// 纳秒级超时且为 const,可屏蔽某些信号
int pselect (int maxfdp1, fd_set *restrict readfds,
fd_set *restrict writefds, fd_set *restrict exceptfds,
const struct timespec *restrict timeout,
const sigset_t *restrict sigmask);

fd_set:文件描述符集合,可视为足够大的字节数组,每个 fd 占 1 位。操作函数:FD_ZERO 将集合置零,FD_SET 将 fd 加入集合,FD_CLR 从集合删除 fd,FD_ISSET 查询 fd 是否在集合中

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void do_select() {
struct timeval tv;
struct timespec tp;
int ready, fd, maxfdp1;
fd_set rs, ws;
sigset_t sigset;

if ((fd = open("/tmp/x.txt", O_RDWR)) < 0)
err_sys("open failed");
maxfdp1 = fd + 1; // 3+1

FD_ZERO(&rs); FD_ZERO(&ws);
FD_SET(STDIN_FILENO, &rs); FD_SET(STDOUT_FILENO, &ws);
FD_SET(fd, &rs); FD_SET(fd, &ws);

tp.tv_sec = 2;
tp.tv_nsec = 0;
sigemptyset(&sigset);
sigaddset(&sigset, SIGQUIT);
sigprocmask(SIG_BLOCK, &sigset, NULL);
pselect(maxfdp1, NULL, NULL, NULL, &tp, &sigset); // 精确等待 2s,屏蔽 SIGQUIT

tv.tv_sec = 2;
tv.tv_usec = 0;
ready = select(maxfdp1, &rs, &ws, NULL, &tv);
printf("select ready: %d\n", ready); // 4 // 同一 fd 可读可写,会被计入 2 次就绪数
if (ready > 0) {
for (int i = 0; i < maxfdp1; ++i) {
if (FD_ISSET(i, &rs))
printf("fd %d readable\n", i);
if (FD_ISSET(i, &ws))
printf("fd %d writeable\n", i);
}
}
}

结果:

1
2
3
4
5
6
^\aaa
select ready: 4
fd 0 readable
fd 1 writeable
fd 3 readable # 普通文件的 fd 读写总是就绪
fd 3 writeable

select 问题:

  • 轮询耗时:select 会逐个遍历并检测 [0, maxfdp1) 内的 fd 是否就绪,fd 越多耗时越长
  • 轮询的 fd 数量有上限:fd_set 最多能容纳 FD_SETSIZE,1024 个 fd,超过此限制可能返回 EBADF 错误
  • fd 拷贝开销:select 需将 fd_set 在用户空间和内核空间拷贝,fd 越多开销越大
  • 水平触发:若 fd 返回就绪但未进行 IO 操作,下次 select 还会报告此 fd 就绪

多路复用2:poll

1
2
3
4
// fds:关注的用 pollfd 描述事件的 fd 数组
// timeout:毫秒级超时,0 不等待,-1 阻塞等待
// 返回值:就绪 fd 个数,超时返回 0,出错返回 -1
int poll (struct pollfd *fds, nfds_t nfds, int timeout);

相比 select 为集合区间内的每个 fd 都预留条件位,poll 用 pollfd 来精确描述关注的 fd 及其事件类型:

1
2
3
4
5
struct pollfd {
int fd; /* File descriptor to poll. */
short int events; /* Types of events poller cares about. */ // 注册给 poll 的事件类型
short int revents; /* Types of events that actually occurred. */ // ready 就绪的事件类型
};

事件类型:POLL_IN 可读,POLLOUT 可写,POLLERR 出错,POLLHUP 连接断开(不可写,但可能仍可读)

  • 优点: 注册事件的 fd 作为参数传入,数量不受限制

  • 缺点:与 select 类似也是轮询检测 fd 是否就绪,fd 越多耗时越长;同理也存在 fd 数组在用户态和内核态间的拷贝开销

异步 I/O

概念:相比 select, poll 主动注册并查询事件状态,异步 IO 则是让内核在事件就绪后用信号异步通知进程。更多细节参考手册 man aio

内存映射(memory-mapped)I/O

概念:将磁盘文件的字节区域映射到内存中,读写内存会自动读写文件。示意图:

实例:用 mmap 拷贝文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#define COPY_STEP 1024*1024*1024 // 1GB
void copy_file_by_mmap(const char *file_src, const char *file_dst) {
int fd_src, fd_dst;
void *src, *dst;
struct stat st;
off_t fsz = 0;
size_t cpysz;

if ((fd_src = open(file_src, O_RDONLY)) < 0)
err_sys("open src failed");
if ((fd_dst = open(file_dst, O_RDWR | O_CREAT | O_TRUNC)) < 0)
err_sys("open src failed");
if (fstat(fd_src, &st) < 0)
err_sys("fstat failed");
if (ftruncate(fd_dst, st.st_size) < 0)
err_sys("extend dst file failed"); // 尝试扩展预留大小
while (fsz < st.st_size) {
if ((st.st_size - fsz) > COPY_STEP) // 每次最多映射 1GB
cpysz = COPY_STEP;
else
cpysz = st.st_size - fsz;
if ((src = mmap(0, cpysz, PROT_READ, MAP_SHARED, fd_src, fsz)) == MAP_FAILED)
err_sys("mmap src failed");
// 系统分配映射区起始地址,可读可写,且写操作会写底层文件,指定 fd 从偏移量开始映射 cpysz 长的字节区域
if ((dst = mmap(0, cpysz, PROT_READ|PROT_WRITE, MAP_SHARED, fd_dst, fsz)) == MAP_FAILED)
err_sys("mmap dst failed");

memcpy(dst, src, cpysz);
munmap(src, cpysz); // 解除映射区
munmap(dst, cpysz);
fsz += cpysz;
}
}

readv, writev:按顺序读写多个非连续的缓冲区,类比 Java 中 Bytebuffer 的 ScatterRead 和 GatherWrite 操作;多个缓冲区用 iovec 描述

1
2
3
4
struct iovec {
void *iov_base; /* Pointer to data. */ // 缓冲区数组
size_t iov_len; /* Length of data. */ // 数组长度
};

ch15. 本地 IPC

1. Pipe 匿名管道

概念:半双工单向读写数据的 IPC,只能在具有共同祖先的进程间使用(fork 时复制的 fd);管道本身是由内核维护的大小为 _PC_PIPE_BUF,4KB 的缓冲区

pipe(int fd[2])fd[0] 读打开,fd[1] 写打开,进程通常会 close 不使用的一端:

  • 写端已关闭,read 返回 0 标识 EOF
  • 读端已关闭,write 返回 -1,errno 为 EPIPE 标识出错

示例:管道实现进程同步

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static int pfd[2], cfd[2];
void TELL_WAIT() {
if (pipe(pfd) < 0 || pipe(cfd) < 0)
err_sys("pipe failed");
}

void TELL_CHILD() {
if (write(pfd[1], "P", 1) != 1)
err_sys("write failed");
}

void WAIT_CHILD() {
char buf[1];
if (read(cfd[0], buf, 1) != 1)
err_sys("read error");
if (buf[0] != 'C')
err_quit("WAIT_CHILD: invalid data");
}

void TELL_PARENT() {
if (write(cfd[1], "C", 1) != 1)
err_sys("write failed");
}

void WAIT_PARENT() {
char buf[1];
if (read(pfd[0], buf, 1) != 1)
err_sys("read error");
if (buf[0] != 'P')
err_quit("WAIT_CHILD: invalid data");
}

popen, pclose:即 pipe+spawn(fork+exec); 创建子进程执行指定命令,读其 stdout 或写其 stdin

2. FIFO 命名管道

概念:特殊文件,用于在不相关进程间交换数据

mkfifo:与 pipe 不同,只读 open 会阻塞直到其他进程写 open,反之亦然。若指定 mode O_NONBLOCK 则只读 open 立刻返回,只写 open 返回 -1,errno 置于 ENXIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void do_named_pipe(int flag) {
pid_t pid;

if (mkfifo("/tmp/x", 0644) < 0) // prw-r--r--
err_sys("mkfifo failed");
if ((pid = fork()) < 0) {
err_sys("fork failed");
} else if (pid == 0) {
if (flag == 1) {
if (open("/tmp/x", O_RDONLY) < 0) // 无写阻塞;O_NONBLOCK 直接返回
err_sys("open failed");
} else {
if (open("/tmp/x", O_WRONLY | O_NONBLOCK) < 0) // 无读则阻塞; O_NONBLOCK 报错
err_sys("open failed"); // No such device or address
}
printf("child: opened\n");
}
waitpid(pid, NULL, 0);
}

消息队列、信号量、共享存储等 IPC 在 XSI 标准中的共同特征:

1. IPC 标识:内核对内用非负整数标识 ipc,对外则将 ipc 与 key_t 键关联作为外部名。取值如下

  • IPC_PRIVATE:key 由内核生成,进程可将其值写到文件,供其他进程读取,或直接在父子进程间共享
  • ftok:使用存在文件的路径和给定的 id,生成唯一的 key_t

2. IPC 操作权限

1
2
3
4
5
6
7
8
struct ipc_perm {
key_t key; /* Key. */
uid_t uid; /* Owner's user ID. */
gid_t gid; /* Owner's group ID. */
uid_t cuid; /* Creator's user ID. */
gid_t cgid; /* Creator's group ID. */
mode_t mode; /* Read/write permission. */
}

3. MessageQueue 消息队列

队列信息定义:

1
2
3
4
5
6
7
8
9
10
struct msqid_ds {
struct ipc_perm msg_perm; /* structure describing operation permission */
time_t msg_stime; /* time of last msgsnd command */
time_t msg_rtime; /* time of last msgrcv command */
time_t msg_ctime; /* time of last change */
msgqnum_t msg_qnum; /* number of messages currently on queue */
msglen_t msg_qbytes; /* max number of bytes allowed on queue */
pid_t msg_lspid; /* pid of last msgsnd() */
pid_t msg_lrpid; /* pid of last msgrcv() */
};
  • msgget:创建新队列,或指定 key 打开现有队列,返回消息队列 id

  • msgctlIPC_STAT 读取队列信息,IPC_SET 更新队列信息,IPC_RMID 删除指定队列

  • msgsnd:向指定队列发送消息,消息体的前 sizeof(long) 个字节表明消息 type,之后才是的变长消息数据

  • msgrcv:从指定队列接收消息,可指定消息 type 做过滤,IPC_NOWAIT 无消息可用也不阻塞

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
struct msg_t {
long type;
int data;
};

void do_mq() {
key_t ipc_key;
int msq_id, msg_id;
struct msg_t sent_msg, rcv_msg;
struct ipc_perm perm;
struct msqid_ds ds;

ipc_key = ftok("/tmp", 100);
if ((msq_id = msgget(ipc_key, IPC_CREAT | 0666)) < 0)
err_sys("msgget failed");

sent_msg.type = 1;
sent_msg.data = 999;
if (msgsnd(msq_id, &sent_msg, sizeof(struct msg_t), IPC_NOWAIT) < 0)
err_sys("msgsnd failed");
memset(&rcv_msg, 0, sizeof(rcv_msg));
if (msgrcv(msq_id, &rcv_msg, sizeof(struct msg_t), 0, IPC_NOWAIT) == -1)
err_sys("msgrcv failed");
printf("rcv data: %d\n", rcv_msg.data); // 999

memset(&perm, 0, sizeof(struct ipc_perm));
ds.msg_perm = perm;
if (msgctl(msq_id, IPC_STAT, &ds) == -1)
err_sys("msgctl failed");
printf("last read pid: %d, %d\n", ds.msg_lrpid, getpid()); // 13081, 13081
}

4. Semaphore 信号量

概念:资源计数器,保护多进程对共享数据的访问。初始值表明资源个数,进程获取资源则计数减一,释放则加一,获取前若资源个数已归零则休眠等待其他进程释放资源。定义:

1
2
3
4
typedef union {
char size[32];
long int align;
} sem_t;

操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 以 init_val 为资源个数,mode 文件权限来创建指定路径名的信号量文件,oflag 需取值 O_CREAT
// 或读取路径为 name 的信号量
sem_t *sem_open (const char *name, int oflag, .../* mode_t mode, unsigned int init_val */);
int sem_close(sem_t *sem); // 释放信号量资源,进程退出系统会自动释放,类比 fd
int sem_unlink(sem_t *sem); // 删除信号量名字,若无引用则销毁,类比文件 unlink

int sem_init(sem_t *sem, int pshared, unsigned int init_val); // 创建内存态匿名信号量,类比匿名管道
int sem_destroy(sem_t *sem); // 销毁信号量,不能再使用

int sem_wait(sem_t *sem); // 阻塞获取信号量,计数 -1
int sem_trywait(sem_t *sem); // 非阻塞,计数为 0 则 errno=EAGAIN
int sem_timedwait(sem_t *sem, const struct timespec* tsp); // 绝对时间超时等待,超时则 ETIMEDOUT
int sem_post(sem_t *sem); // 释放信号量,计数 +1
int sem_getvalue(sem_t *sem, int *valp) // 调试用读取当前计数值

示例:用计数为 1 的二元信号量实现互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
struct slock {
sem_t *sem;
char name[255];
};

struct slock *s_create() {
struct slock *sp;
static int count;
if ((sp = malloc(sizeof(struct slock))) == NULL)
return NULL;
do {
snprintf(sp->name, 255, "/tmp/%d.%d", getpid(), count++); // 递增获取唯一文件名
sp->sem = sem_open(sp->name, O_CREAT | O_EXCL, S_IRWXU, 1); // 原子 create-if-not-exist 创建文件
} while (sp->sem == SEM_FAILED && errno == EEXIST); // 并发抢占创建文件失败可重试

if (sp->sem == SEM_FAILED) {
free(sp);
return NULL;
}
sem_unlink(sp->name);
return sp;
}

void s_free(struct slock *sp) {
sem_close(sp->sem);
free(sp);
}

int s_lock(struct slock *sp) {
return sem_wait(sp->sem);
}

int s_trylock(struct slock *sp) {
return sem_trywait(sp->sem);
}

int s_unlock(struct slock *sp) {
return sem_post(sp->sem);
}

5. ShareMemory 共享内存

概念:多进程共享同一片匿名内存区,独占写、共享读的约定需额外的互斥量或信号量保证。定义:

1
2
3
4
5
6
7
8
9
10
struct shmid_ds {
struct ipc_perm shm_perm; /* operation permission struct */
size_t shm_segsz; /* size of segment in bytes */
time_t shm_atime; /* time of last shmat() */
time_t shm_dtime; /* time of last shmdt() */
time_t shm_ctime; /* time of last change by shmctl() */
pid_t shm_cpid; /* pid of creator */
pid_t shm_lpid; /* pid of last shmop */
shmatt_t shm_nattch; /* number of current attaches */
};

操作:

  • shmget:指定内存长度和读写权限创建共享内存段,或引用 key 指定的内存
  • shmctl: 读写 shmid 指向的共享内存信息,或删除内存段
  • shmat attach 以只读或读写方式连接共享内存,shmdt detach 释放与共享内存的连接

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void pr_shm() {
int shmid;
char *shm1, *shm2, *cur;

if ((shmid = shmget(IPC_PRIVATE, 1024, S_IRWXU)) == -1)
err_sys("shmget failed");
if ((shm1 = shmat(shmid, 0, 0)) == NULL)
err_sys("shmat failed");
cur = shm1;
*cur++ = 'A';
*cur++ = 'B';
*cur = '\0';
shmdt(shm2); // 共享内存会一直存在直到显式 IPC_RMID
if (fork() == 0) {
if ((shm2 = shmat(shmid, 0, SHM_RDONLY)) == NULL)
err_sys("shmat failed");
printf("child read shm: %s\n", shm2); // AB
shmdt(shm2);
exit(0);
}
sleep(1);
shmctl(shmid, IPC_RMID, 0);
}

ch16. 网络 IPC

Socket:网络通信端点的抽象,通过 socket 描述符操作,可复用大多数针对 fd 的函数。创建 socket:

1
2
// 返回 socket fd,出错返回 -1
int socket (int domain, int type, int protocol);
  • domain 确定 socket 通信域(端点的地址类型),由地址族 address family 描述:AF_INET 即 IPV4 因特网,AF_INET6 即 IPV6,AF_UNIX 即本地 unix 通信域,AF_UPSPEC 未指定
  • type 确定 socket 通信数据特征:SOCK_STREAM 即面向连接、有序、可靠字节流的 TCP,SOCK_DGRAM 即面向报文不可靠无连接的 UDP,SOCK_SEQPACKET 面向连接、有序、可靠报文的协议如 SCTP,SOCK_RAW 即原生 IP 数据报
  • protocol:当 <domain, type> 支持多个协议时,由 protocol 选择特定协议,为 0 则使用默认值,如 SOCK_STREAM 默认 IPPROTO_TCPSOCK_DGRAM 默认 IPPROTO_UDP 等等

shutdown:定向关闭 socket 的端点,SHUT_RD 关闭读端,SHUT_WR 关闭写端,SHUT_RDWR 关闭读写

字节序:多于 1 字节类型的数据在内存中的存放顺序

  • Big-Endian 大端字节序:高位字节存于低地址,低位字节存于高地址;如 TCP 网络字节序

  • Little-Endian 小段字节序:低位字节存于低地址,高位字节存于高地址;如大多数 CPU 字节序

    参考进程虚拟内存布局,以 0x12345678 值为例,两种字节序布局如下:

    1
    2
    3
    4
    5
    6
    7
    8
    	 栈顶高地址		
    0x78 0x12 高位
    0x56 0x34
    0x34 0x56
    0x12 0x78 低位
    栈底低地址
    #大端:从零地址,即从高位开始读,能直接确定数据的符号位,另外也符合阅读习惯
    #小端:从零地址,即从低位开始读,能直接进行位运算,效率高

字节序转换函数

1
2
3
4
5
#include <arpa/inet.h>
uint32_t htonl(uint32_t host32) // h 即 host 小端序
uint16_t htons(uint32_t host16) // n 即 network 大端序
uint32_t ntohl(uint32_t net32) // l 即 uint32 整型
uint16_t ntohs(uint16_t net16) // s 即 uint16 短整型

地址格式:标识 socket 通信端点,通用地址标识,一般各个域的具体地址都需转为通用地址后传参:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct sockaddr {
sa_family_t sa_family; /* address family. */ // socket 域
char sa_data[]; /* variable-length address. */ // 各个域的自定义结构数据,长度不定
}

// IPV4 domain 的 socket 地址
struct in_addr {
in_addr_t s_addr; // ipv4 address
}
struct sockaddr_in {
sa_family_t sin_family;
struct in_addr sin_addr; // 地址
in_port_t sin_port; // 端口号
}
// IPV6 略

IP 地址的二进制格式与点分十进制的转换:

1
2
3
4
// 将 domain 域的、长度为 size 的二进制地址 addr,转为点分十进制存入 str
const char *inet_ntop(int domain, const void* addr, char *str, socklen_t size);
// 将点分十进制地址 str 解析到 domain 域下的二进制地址,存入 addr
int inet_pton(int domain, const char *str, void *addr);

地址查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include<netdb.h>
// 1. hostent: 本地主机信息
struct hostent { //
char *h_name; /* Official name of host. */
char **h_aliases; /* Alias list. */
int h_addrtype; /* Host address type. */
int h_length; /* Length of address. */
char **h_addr_list; /* List of addresses from name server. */
};
void sethostent (int stayopen); // 如 ch6 所述遍历本地主机信息 /etc/hosts
struct hostent *gethostent (void);
void endhostent (void);

// 2. netent: 网络主机信息
struct netent {
char *n_name; /* Official name of network. */
char **n_aliases; /* Alias list. */
int n_addrtype; /* Net address type. */
uint32_t n_net; /* Network number. */
};
struct netent *getnetbyaddr (uint32_t net, int type);
struct netent *getnetbyname (const char *name);

// 3. protent: 协议端口号与协议名转换
struct protoent {
char *p_name; /* Official protocol name. */
char **p_aliases; /* Alias list. */
int p_proto; /* Protocol number. */
};
void setprotoent (int __stay_open); // /etc/protocols
struct protoent *getprotoent (void);
void endprotoent (void);
struct protoent *getprotobyname (const char *name); // 互转
struct protoent *getprotobynumber (int proto);

// 4. serent: 协议端口号与服务名转换
struct servent {
char *s_name; /* Official service name. */
char **s_aliases; /* Alias list. */
int s_port; /* Port number. */
char *s_proto; /* Protocol to use. */
};
void setservent (int stay_open); // /etc/services
void endservent (void);
struct servent *getservent (void);
struct servent *getservbyname (const char *name, const char *proto); // 互转
struct servent *getservbyport (int port, const char *proto);

// 5. addrinfo: 详细主机地址
struct addrinfo {
int ai_flags; /* Input flags. */
int ai_family; /* Protocol family for socket. */
int ai_socktype; /* Socket type. */
int ai_protocol; /* Protocol for socket. */
socklen_t ai_addrlen; /* Length of socket address. */
struct sockaddr *ai_addr; /* Socket address for socket. */
char *ai_canonname; /* Canonical name for service location. */
struct addrinfo *ai_next; /* Pointer to next in list. */
};
// 主机名+服务名 -> 主机地址链表
int getaddrinfo (const char *name, const char * service,
const struct addrinfo *req, struct addrinfo **pai);
int gai_error (int errno); // 生成转换失败的错误消息
// 主机地址 --> 主机名+服务名
int getnameinfo (const struct sockaddr *sa,
socklen_t salen, char *host,
socklen_t hostlen, char *serv,
socklen_t servlen, int flags);

套接字操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 将 socket fd 绑定到指定地址
int bind(int sockfd, const struct sockaddr *addr, socklen_t len);
// 读取已绑定 socket fd 的地址
int getsockname (int sockfd, struct sockaddr *addr, socklen_t len);
// 读取连接到对方的 peer socket fd 对应的地址
int getpeername (int sockfd, struct sockaddr *addr, socklen_t *alenp);

// 使用本地 sockfd 连接到对端地址
int connect(int sockfd, const struct sockaddr *addr, socklen_t len);
// 监听 sockfd 绑定的地址,backlog 指示能入队的最大未完成连接请求数量
int listen (int sockfd, int backlog);
// 接受请求并建立连接,返回连接到 client 端的 socket fd
int accept(int sockfd, struct addrinfo *addr, socklen_t *alenp);

socket 数据读写专用接口:

1
2
3
4
5
6
// 类似 write,但 flags 控制数据发送方式,如 MSG_DONTWAIT 等价于 O_NONBLOCK
ssize_t send (int sockfd, const void *buf, size_t n, int flags); // sendto=udp_connect + send
// 类似 read,但 flags 控制数据接收方式,如 MSG_WAITALL 会等待收到完整包后才返回
ssize_t recv (int sockfd, void *buf, size_t n, int flags); // recvfrom = udp_connect + recv
// 一次发送 iovec 多个缓冲区数据
ssize_t sendmsg (int sockfd, const struct msghdr *msg, int flags); // recvmsg 接收

配置读取 socket 选项:如 SO_RCVBUF, SO_SNDBUF 设置接收、发送缓冲区的字节长度

1
2
int setsockopt (int fd, int level, int optname, const void *optval, socklen_t optlen);
int getsockopt (int fd, int level, int optname, void *optval, socklen_t *optlen);

示例:TCP EchoServer

epoll 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 通过 epollfd 增删 fd 上注册的事件
static void do_ctl(int epoll_fd, int fd, int events, int ctl) {
struct epoll_event event;
event.events = events;
event.data.fd = fd;
epoll_ctl(epoll_fd, ctl, fd, &event);
}

static void add_event(int epoll_fd, int fd, int events) {
do_ctl(epoll_fd, fd, events, EPOLL_CTL_ADD);
}
static void del_event(int epoll_fd, int fd, int events) {
do_ctl(epoll_fd, fd, events, EPOLL_CTL_DEL);
}
static void mod_event(int epoll_fd, int fd, int events) {
do_ctl(epoll_fd, fd, events, EPOLL_CTL_MOD);
}

server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// 创建 server socket fd 并监听
int create_server_socket_fd(const char *ip, int port) {
struct sockaddr_in server_addr;
// 1. 创建 socket
int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if (socket_fd == -1) {
perror("create server socket failed\n");
return -1;
}

// 2. 配置监听地址
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
inet_pton(AF_INET, ip, &server_addr.sin_addr);
server_addr.sin_port = htons(port);

// 3. 尝试绑定
int res = bind(socket_fd, (struct sockaddr *) &server_addr, sizeof(server_addr));
if (res == -1) {
perror("bind failed\n");
return -1;
}

// 4. 监听 socket
listen(socket_fd, 5);
printf("listen on: %s:%d\n", ip, port);
return socket_fd;
}

// 开启 eventloop
static void epoll_loop(int srv_socket_fd) {
// 1. 创建 epoll fd
int epoll_fd = epoll_create(1024);
// 2. 将 server socket fd 注册到 epoll fd
add_event(epoll_fd, srv_socket_fd, EPOLLIN);

struct epoll_event events[EVENT_CNT];
char buf[BUF_SIZE]; // 单进程复用单个读写缓冲区
memset(buf, 0, BUF_SIZE);
while (1) {
// 3. 等待 epoll 返回就绪事件
int ready_event_cnt = epoll_wait(epoll_fd, events, EVENT_CNT, -1);
// 4. 逐个处理读写
for (int i = 0; i < ready_event_cnt; ++i) {
struct epoll_event e = events[i];
int fd = e.data.fd;
if ((fd == srv_socket_fd) && (e.events & EPOLLIN)) {
// 5.1 有就绪则新建连接
handle_accept(epoll_fd, srv_socket_fd);
} else if (e.events & EPOLLIN) {
// 5.2 可读则读连接
handle_read(epoll_fd, fd, buf);
} else if (e.events & EPOLLOUT) {
// 5.3 可写则写数据
handle_write(epoll_fd, fd, buf);
}
}
}
close(epoll_fd);
}

static void handle_accept(int epoll_fd, int srv_socket_fd) {
struct sockaddr_in cli_addr;
socklen_t cli_add_len = sizeof(cli_addr);
int cli_socket_fd = accept(srv_socket_fd, (struct sockaddr *) &cli_addr, &cli_add_len);
if (cli_socket_fd == -1) {
perror("handle accept failed");
return;
}
printf("accept from: %s:%d, fd: %d\n", inet_ntoa(cli_addr.sin_addr), cli_addr.sin_port, cli_socket_fd);
add_event(epoll_fd, cli_socket_fd, EPOLLIN);
}

static void handle_read(int epoll_fd, int cli_socket_fd, char *buf) {
int n = read(cli_socket_fd, buf, BUFSIZ);
switch (n) {
case -1: // 读取失败则关闭连接
perror("read failed, conn closed");
close(cli_socket_fd);
del_event(epoll_fd, cli_socket_fd, EPOLLIN);
break;
case 0: // EOF 也关闭
printf("conn closed: fd:%d\n", cli_socket_fd);
close(cli_socket_fd);
del_event(epoll_fd, cli_socket_fd, EPOLLIN);
break;
default: // 读取成功
printf("[server] recv: %s\n", buf);
// 读取数据后服务端将做处理,为此连接提前注册写事件
mod_event(epoll_fd, cli_socket_fd, EPOLLOUT);
}
}

static void handle_write(int epoll_fd, int cli_socket_fd, char *buf) {
int n = write(cli_socket_fd, buf, strlen(buf));
if (n == -1) {
perror("write failed");
close(cli_socket_fd);
del_event(epoll_fd, cli_socket_fd, EPOLLOUT);
} else {
// 写成功,则切换注册读事件
mod_event(epoll_fd, cli_socket_fd, EPOLLIN);
memset(buf, 0, BUF_SIZE);
}
}

int main() {
int srv_socket_fd = create_server_socket_fd("127.0.0.1", 8082);
if (srv_socket_fd == -1) {
perror("create server socket failed");
return 1;
}
epoll_loop(srv_socket_fd);
close(srv_socket_fd);
}

client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
static void handle_read(int epoll_fd, int fd, int cli_socket_fd, char *buf) {
int n = read(fd, buf, BUF_SIZE);
switch (n) {
case -1:
perror("read failed, conn closed");
close(fd);
break;
case 0:
printf("conn closed\n");
close(fd);
break;
default:
if (fd == STDIN_FILENO) {
// 有可写数据则注册写事件
add_event(epoll_fd, cli_socket_fd, EPOLLOUT);
} else {
// 等数据写入到 client stdout 后才重新注册读事件
del_event(epoll_fd, cli_socket_fd, EPOLLIN);
add_event(epoll_fd, STDOUT_FILENO, EPOLLOUT);
}
}
}

static void handle_write(int epoll_fd, int fd, char *buf) {
int n = write(fd, buf, strlen(buf));
memset(buf, 0, BUF_SIZE);
if (n == -1) {
perror("write failed");
close(fd);
return;
}
if (fd == STDOUT_FILENO)
del_event(epoll_fd, STDOUT_FILENO, EPOLLOUT); // 本轮已写完毕
else
mod_event(epoll_fd, fd, EPOLLIN); // 为 sockfd 注册读事件,读取更多数据
}

// 开启 eventloop
static void epoll_loop(int cli_socket_fd) {
// 1. 为 client stdin 注册可写事件,等待用户输入
int epoll_fd = epoll_create(EVENT_SIZE);
add_event(epoll_fd, STDIN_FILENO, EPOLLIN);

struct epoll_event events[EVENT_SIZE];
char buf[BUF_SIZE];
memset(buf, 0, BUF_SIZE);
while (1) {
int ready_event_cnt = epoll_wait(epoll_fd, events, EVENT_SIZE, -1);
for (int i = 0; i < ready_event_cnt; ++i) {
struct epoll_event e = events[i];
if (e.events & EPOLLIN) {
// 2.1 stdin 或 sockfd 中可读
handle_read(epoll_fd, e.data.fd, cli_socket_fd, buf);
} else if (e.events & EPOLLOUT) {
// 2.2 sockfd 可写
handle_write(epoll_fd, e.data.fd, buf);
}
}
}
close(epoll_fd);
}

int main() {
int cli_socket_fd = create_client_socket_fd("127.0.0.1", 6379);
if (cli_socket_fd == -1) {
perror("create client socket failed");
return 1;
}
epoll_loop(cli_socket_fd);
close(cli_socket_fd);
}

总结

APUE 从 high-level 描述了 Unix 环境的概念与接口,包括四大块:

  • IO:无缓冲的文件 IO,有缓冲的标准 IO,多路复用 IO
  • 系统与信号:文件的信息与接口,信号操作
  • 进程与 IPC:进程环境及控制,5 种本地 IPC,与 socket 网络 IPC
  • 线程与同步:线程操作与资源共享,5 种多线程同步工具

It’s time to inspect Redis, let’s go.