基于RabbitMQ实现的简易IM通讯系统

基于RabbitMQ实现的简易IM通讯系统

系统架构

  1. 每个用户有一个专属的队列用于接收消息

  2. 有一个广播交换机(fanout)用于群发消息

  3. 有一个直接交换机(direct)用于私聊消息

server.go实现

用于声明交换机、为每个用户创建并绑定消息队列,并模拟发送系统广播消息

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// Message is the JSON payload exchanged between the server and the clients.
type Message struct {
	// Sender is the name of the originating user; the simulated messages
	// published by this server use "system".
	Sender    string `json:"sender"`
	// Recipient is either the literal "all", which routes the message through
	// the broadcast exchange, or a user name used as the routing key on the
	// direct exchange.
	Recipient string `json:"recipient"` // "all" for broadcast
	// Content is the message text.
	Content   string `json:"content"`
	// Timestamp is the send time in Unix seconds (set from time.Now().Unix()).
	Timestamp int64  `json:"timestamp"`
}

// failOnError logs "<msg>: <err>" and panics when err is non-nil.
// A nil err is a no-op.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Panicf("%s: %s", msg, err)
}

// setupRabbitMQ dials the local RabbitMQ broker, opens a channel on the
// connection and declares the two durable exchanges the IM system relies on:
// the fanout exchange "broadcast" and the direct exchange "direct_msg".
//
// Every failure is passed to failOnError. The connection and channel are
// returned to the caller, which is responsible for closing them.
func setupRabbitMQ() (*amqp.Connection, *amqp.Channel) {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")

	// Declared in order: the broadcast exchange first, then the private-chat one.
	exchanges := []struct{ name, kind string }{
		{"broadcast", "fanout"},
		{"direct_msg", "direct"},
	}
	for _, ex := range exchanges {
		// durable=true; auto-delete, internal and no-wait are all false.
		err = ch.ExchangeDeclare(ex.name, ex.kind, true, false, false, false, nil)
		failOnError(err, "Failed to declare an exchange")
	}

	return conn, ch
}

// handleUserRegistration declares a durable queue named after username and
// binds it to both exchanges: to "broadcast" with an empty routing key, and to
// "direct_msg" with the user name as routing key.
//
// Every failure is passed to failOnError.
func handleUserRegistration(ch *amqp.Channel, username string) {
	// durable=true; auto-delete, exclusive and no-wait are all false.
	_, err := ch.QueueDeclare(username, true, false, false, false, nil)
	failOnError(err, "Failed to declare a queue")

	bindings := []struct{ routingKey, exchange string }{
		{"", "broadcast"},        // group messages
		{username, "direct_msg"}, // private messages addressed to this user
	}
	for _, b := range bindings {
		err = ch.QueueBind(username, b.routingKey, b.exchange, false, nil)
		failOnError(err, "Failed to bind a queue")
	}
}

// publishMessage serialises msg as JSON and publishes it on ch.
//
// A message whose Recipient is "all" goes to the "broadcast" exchange with an
// empty routing key; any other message goes to "direct_msg" with the recipient
// as routing key. The publish call is bounded by a 5 second context timeout.
// Marshalling and publishing failures are passed to failOnError.
func publishMessage(ch *amqp.Channel, msg Message) {
	payload, err := json.Marshal(msg)
	failOnError(err, "Failed to marshal message")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Default to a private message and switch to broadcast for "all".
	exchange, routingKey := "direct_msg", msg.Recipient
	if msg.Recipient == "all" {
		exchange, routingKey = "broadcast", ""
	}

	err = ch.PublishWithContext(
		ctx,
		exchange,
		routingKey,
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			ContentType: "application/json",
			Body:        payload,
		},
	)
	failOnError(err, "Failed to publish a message")
}

// main sets up the exchanges, registers three sample users, publishes five
// system broadcasts two seconds apart and then blocks forever.
func main() {
	conn, ch := setupRabbitMQ()
	defer conn.Close()
	defer ch.Close()

	log.Println("IM Server started. Press CTRL+C to exit.")

	// Simulated user registration.
	for _, name := range []string{"user1", "user2", "user3"} {
		handleUserRegistration(ch, name)
	}

	// Simulated message traffic: "System message 1" .. "System message 5".
	for seq := 1; seq <= 5; seq++ {
		publishMessage(ch, Message{
			Sender:    "system",
			Recipient: "all",
			Content:   fmt.Sprintf("System message %d", seq),
			Timestamp: time.Now().Unix(),
		})
		time.Sleep(2 * time.Second)
	}

	// Keep the process alive. This never returns, so the deferred Close calls
	// above are not reached through a normal return from main.
	select {}
}

client/receiver.go实现

用于接收消息

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// Message is the JSON payload decoded from each delivery on the user's queue.
type Message struct {
	// Sender is the name shown as the author of the message.
	Sender    string `json:"sender"`
	// Recipient is "all" for a broadcast; any other value is displayed as a
	// private message.
	Recipient string `json:"recipient"`
	// Content is the message text.
	Content   string `json:"content"`
	// Timestamp is interpreted as Unix seconds when the message is displayed.
	Timestamp int64  `json:"timestamp"`
}

// failOnError aborts via log.Panicf with "<msg>: <err>" if err is non-nil;
// otherwise it does nothing.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Panicf("%s: %s", msg, err)
}

// setupReceiver connects to the local RabbitMQ broker, makes sure the
// exchanges and the queue named after username exist and are bound, and then
// prints every message delivered to that queue.
//
// The function blocks while deliveries keep arriving and returns once the
// delivery channel is closed. The connection and channel are closed on
// return. Setup failures are passed to failOnError.
//
// This block uses the "time" package, which must be present in the file's
// import list.
func setupReceiver(username string) {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Declare the exchanges; re-declaring with identical settings is harmless.
	err = ch.ExchangeDeclare(
		"broadcast", "fanout", true, false, false, false, nil)
	failOnError(err, "Failed to declare broadcast exchange")

	err = ch.ExchangeDeclare(
		"direct_msg", "direct", true, false, false, false, nil)
	failOnError(err, "Failed to declare direct message exchange")

	// Declare the user's durable queue.
	_, err = ch.QueueDeclare(
		username, true, false, false, false, nil)
	failOnError(err, "Failed to declare a queue")

	// Bind the queue: empty key on the fanout exchange, user name on the
	// direct exchange.
	err = ch.QueueBind(
		username, "", "broadcast", false, nil)
	failOnError(err, "Failed to bind to broadcast exchange")

	err = ch.QueueBind(
		username, username, "direct_msg", false, nil)
	failOnError(err, "Failed to bind to direct message exchange")

	// Consume with auto-ack enabled (third argument).
	msgs, err := ch.Consume(
		username, "", true, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")

	fmt.Printf("用户 [%s] 的消息接收端已启动...\n", username)
	fmt.Println("正在等待消息...")

	for msg := range msgs {
		var message Message
		if err := json.Unmarshal(msg.Body, &message); err != nil {
			// A malformed payload is reported and skipped; the loop goes on.
			log.Printf("Error decoding message: %v", err)
			continue
		}

		kind := "私聊"
		if message.Recipient == "all" {
			kind = "广播"
		}
		timestamp := time.Unix(message.Timestamp, 0).Format("15:04:05")
		fmt.Printf("[%s] %s (%s): %s\n", timestamp, message.Sender, kind, message.Content)
	}

	// Reaching this point means the delivery channel was closed. Report it so
	// the client does not exit without any explanation.
	log.Printf("delivery channel for queue %q closed; receiver stopping", username)
}

// main asks for a user name on stdin and starts the receiver for that user.
// It exits with status 1 when no name was entered.
func main() {
	var username string

	fmt.Print("请输入你的用户名: ")
	// The Scanln error is not checked; a read that leaves username empty is
	// rejected by the check below.
	fmt.Scanln(&username)

	if len(username) == 0 {
		fmt.Println("用户名不能为空")
		os.Exit(1)
	}

	setupReceiver(username)
}

client/sender.go实现

用于发送消息

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// Message is the JSON payload published for every outgoing message.
type Message struct {
	// Sender is the user name of the author.
	Sender    string `json:"sender"`
	// Recipient is "all" for a broadcast, otherwise the user name used as the
	// routing key on the direct exchange.
	Recipient string `json:"recipient"`
	// Content is the message text.
	Content   string `json:"content"`
	// Timestamp is the send time in Unix seconds (set from time.Now().Unix()).
	Timestamp int64  `json:"timestamp"`
}

// failOnError is a no-op for a nil err; for any other err it logs
// "<msg>: <err>" and panics.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Panicf("%s: %s", msg, err)
}

// setupSender dials the local RabbitMQ broker, opens a channel and declares
// the "broadcast" (fanout) and "direct_msg" (direct) exchanges, then returns
// the channel. Failures are passed to failOnError.
//
// NOTE(review): the connection is neither returned nor closed here, so the
// caller can only close the channel; confirm whether that is intended.
func setupSender() *amqp.Channel {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")

	// Declare both exchanges with the same flags: durable only.
	exchanges := []struct{ name, kind, failure string }{
		{"broadcast", "fanout", "Failed to declare broadcast exchange"},
		{"direct_msg", "direct", "Failed to declare direct message exchange"},
	}
	for _, ex := range exchanges {
		err = ch.ExchangeDeclare(ex.name, ex.kind, true, false, false, false, nil)
		failOnError(err, ex.failure)
	}

	return ch
}

// sendMessage builds a Message stamped with the current Unix time, encodes it
// as JSON and publishes it on ch, then prints a confirmation line.
//
// recipient "all" publishes to the "broadcast" exchange with an empty routing
// key; any other recipient publishes to "direct_msg" with the recipient as
// routing key. Marshalling and publishing failures are passed to failOnError.
//
// NOTE(review): newer amqp091-go releases appear to deprecate Publish in
// favour of PublishWithContext; confirm against the library version in use.
func sendMessage(ch *amqp.Channel, sender, recipient, content string) {
	payload, err := json.Marshal(Message{
		Sender:    sender,
		Recipient: recipient,
		Content:   content,
		Timestamp: time.Now().Unix(),
	})
	failOnError(err, "Failed to marshal message")

	// Default to a private message and switch to broadcast for "all".
	exchange, routingKey := "direct_msg", recipient
	if recipient == "all" {
		exchange, routingKey = "broadcast", ""
	}

	err = ch.Publish(
		exchange, routingKey, false, false,
		amqp.Publishing{
			ContentType: "application/json",
			Body:        payload,
		})
	failOnError(err, "Failed to publish a message")

	fmt.Printf("消息已发送给 %s\n", recipient)
}

// parseInput splits one non-empty line of user input into a recipient and a
// message body.
//
// A line longer than one byte that starts with '@' is a private message: the
// text between '@' and the first space is the recipient and the rest is the
// content. Without a space the whole remainder is the recipient and the
// content is a single space. Every other line is a broadcast addressed to
// "all". The returned recipient is empty for input such as "@ hello".
func parseInput(text string) (recipient, content string) {
	if len(text) <= 1 || text[0] != '@' {
		return "all", text
	}
	// Scanning bytes is safe here: a space is a single-byte rune and never
	// occurs inside a multi-byte UTF-8 sequence.
	for i := 1; i < len(text); i++ {
		if text[i] == ' ' {
			return text[1:i], text[i+1:]
		}
	}
	return text[1:], " "
}

// main asks for the sender's user name, opens a channel and then publishes
// every non-empty line read from stdin, either as a broadcast or as a private
// message of the form "@user text". It exits with status 1 when no name was
// entered.
func main() {
	fmt.Print("请输入你的用户名: ")
	var sender string
	// The Scanln error is not checked; a read that leaves sender empty is
	// rejected by the check below.
	fmt.Scanln(&sender)

	if sender == "" {
		fmt.Println("用户名不能为空")
		os.Exit(1)
	}

	ch := setupSender()
	defer ch.Close()

	scanner := bufio.NewScanner(os.Stdin)
	fmt.Println("输入消息格式:")
	fmt.Println("1. 广播消息: 直接输入消息内容")
	fmt.Println("2. 私聊消息: @用户名 消息内容")
	fmt.Println("按Ctrl+C退出")

	for scanner.Scan() {
		text := scanner.Text()
		if text == "" {
			continue
		}

		recipient, content := parseInput(text)
		if recipient == "" {
			// "@ text" names no user. Publishing it would use an empty routing
			// key on the direct exchange while still reporting success.
			fmt.Println("私聊消息缺少用户名, 格式: @用户名 消息内容")
			continue
		}

		sendMessage(ch, sender, recipient, content)
	}

	if err := scanner.Err(); err != nil {
		log.Printf("读取输入错误: %v", err)
	}
}

广告:

© 版权声明
THE END
喜欢就支持一下吧
点赞5打赏 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容