一般来说两台服务器之间的通信都可以简单抽象成两个进程之间的通信,而进程之间的通信有管道,消息队列,共享内存等各种的通信方式。我们的故事先从两个进程之间的通信说起。进程之间的通信有管道,消息队列,共享内存等各种通信方式,其中消息队列是一种非常方便于我们理解的方式。比如下面这个例子,我们可以首先创建一个消息队列,然后载创建两个线程 send_tid, receive_tid,它们两个线程分别承担了生产者和消费者的角色,前者发送消息到消息队列,后者从消息队列中获取消息。
在开始之前我们先定义下面的一些表示,包括消息的封装(分为消息类型和内容),消息队列的标识(用来唯一标识这个消息队列)和epoll描述符(我们生产者需要不断从消息队列中epoll出数据)。
struct message {
long msg_type;
char msg_text[256];
};
int msg_id; // 消息队列标识符
int epoll_fd; // epoll描述符
然后我们实现第一个线程用来发送消息到消息队列,在下面这个例子中每隔 2S 写一条消息到消息队列中,因为是 while (1),所以这个线程永远不会退出。
void *send_thread(void *arg) {
struct message msg;
int msg_count = 1;
while (1) {
msg.msg_type = 1;
// 构造消息内容
snprintf(msg.msg_text, sizeof(msg.msg_text), "Message %d from send_thread", msg_count);
// 发送消息到消息队列
if (msgsnd(msg_id, &msg, sizeof(msg), 0) == -1) {
perror("msgsnd");
exit(1);
}
printf("Sent: %s\n", msg.msg_text);
msg_count++;
sleep(2);
}
return NULL;
}
接收消息的线程相比较于发送消息的线程稍微复杂了一点,因为这里用到了 epoll 的系统调用。当消息队列中有事件被监听到之后会通知消费者去处理消息。在这个例子中增加 epoll 的原因是在没有事件发生时,线程会自动进入睡眠状态,不会浪费资源。一旦有事件发生,线程将被唤醒来处理这些事件。这使得 epoll 成为处理大量并发连接的有效工具,因为它能够高效地管理和处理事件而不会占用大量线程资源。
void *receive_thread(void *arg) {
struct epoll_event events[MAX_EVENT_SIZE];
while (1) {
// 使用epoll等待消息队列的可读事件
int num_events = epoll_wait(epoll_fd, events, MAX_EVENT_SIZE, -1);
if (num_events == -1) {
perror("epoll_wait");
exit(1);
}
for (int i = 0; i < num_events; i++) {
if (events[i].data.fd == msg_id && events[i].events & (EPOLLIN | EPOLLOUT)) {
struct message msg;
if (msgrcv(msg_id, &msg, sizeof(msg), 1, 0) == -1) {
perror("msgrcv");
exit(1);
}
printf("Received from Queue: %s\n", msg.msg_text);
}
}
}
return NULL;
}
最后是这个例子的 main 函数,它首先创建了一个消息队列,然后创建了 epoll 实例,并且添加消息队列的可读和可写端到 epoll 事件监听,最后创建了生产者和消费者两个线程。
int main() {
// 创建消息队列
key_t key = ftok("/tmp", 'A');
if (key == -1) {
perror("ftok");
exit(1);
}
msg_id = msgget(key, IPC_CREAT | 0666);
if (msg_id == -1) {
perror("msgget");
exit(1);
}
// 创建epoll实例
epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
exit(1);
}
// 添加消息队列的可读和可写端到epoll事件监听
struct epoll_event event;
event.events = EPOLLIN | EPOLLOUT;
event.data.fd = msg_id;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, msg_id, &event) == -1) {
perror("epoll_ctl");
exit(1);
}
// 创建发送线程和接收线程
pthread_t send_tid, receive_tid;
pthread_create(&send_tid, NULL, send_thread, NULL);
pthread_create(&receive_tid, NULL, receive_thread, NULL);
// 等待线程结束
pthread_join(send_tid, NULL);
pthread_join(receive_tid, NULL);
// 删除消息队列
msgctl(msg_id, IPC_RMID, NULL);
// 关闭epoll描述符
close(epoll_fd);
return 0;
}
代码执行的效果如下图所示:
上面这个例子其实就完成了下图所示的这一件事,生产者发送消息到消息队列,然后消费者从消息队列中接收消息。这是进程间通信的一个例子,假入我们把两个进程抽象成不同服务器上的两个进程,是不是也可以通过一个消息队列的方式来实现两个服务器之间的通信呢?答案当然也是肯定的,我们可以把消息队列也放在单独的一台服务器上,这就出现了三台服务器之间在不停地进行信息交互。
以上就是消息服务的开始,主要谈了使用消息队列进行进程间通信。当然上面的示例也可以用其他语言来实现,一般都会比 C 语言简单很多。比如下面的例子就是先创建一个通道,然后一个 goroutine 发消息到通道,主 goroutine 从通道接收消息。值得一提的是下面的例子中在发完消息后关闭了通道,这个时候再往里面写消息就会引发 panic,但是仍然可以从通道里读数据。
package main
import (
"fmt"
"time"
)
var (
messageChannel chan string
)
func sendMsg() {
for i := 1; i <= 3; i++ {
message := fmt.Sprintf("Message %d", i)
messageChannel <- message // 将消息发送到通道
time.Sleep(time.Second) // 等待一秒钟
}
close(messageChannel) // 关闭通道
}
func main() {
messageChannel = make(chan string)
// 启动一个 goroutine 来发送消息到通道
go func() {
sendMsg()
}()
for message := range messageChannel {
fmt.Println("Received:", message)
}
}
代码执行效果如下图所示
Last modified on 2023-09-16