
系统架构
-
每个用户有一个专属的队列用于接收消息
-
有一个广播交换机(fanout)用于群发消息
-
有一个直接交换机(direct)用于私聊消息
server.go实现
用于创建消息队列
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type Message struct {
Sender string `json:"sender"`
Recipient string `json:"recipient"` // "all" for broadcast
Content string `json:"content"`
Timestamp int64 `json:"timestamp"`
}
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func setupRabbitMQ() (*amqp.Connection, *amqp.Channel) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
// 声明广播交换机
err = ch.ExchangeDeclare(
"broadcast", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 声明私聊交换机
err = ch.ExchangeDeclare(
"direct_msg", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
return conn, ch
}
func handleUserRegistration(ch *amqp.Channel, username string) {
// 为每个用户创建一个队列
_, err := ch.QueueDeclare(
username, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 将队列绑定到广播交换机
err = ch.QueueBind(
username, // queue name
"", // routing key
"broadcast", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
// 将队列绑定到私聊交换机,使用用户名作为routing key
err = ch.QueueBind(
username, // queue name
username, // routing key
"direct_msg", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
}
func publishMessage(ch *amqp.Channel, msg Message) {
body, err := json.Marshal(msg)
failOnError(err, "Failed to marshal message")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if msg.Recipient == "all" {
// 广播消息
err = ch.PublishWithContext(ctx,
"broadcast", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
})
failOnError(err, "Failed to publish a message")
} else {
// 私聊消息
err = ch.PublishWithContext(ctx,
"direct_msg", // exchange
msg.Recipient, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
})
failOnError(err, "Failed to publish a message")
}
}
func main() {
conn, ch := setupRabbitMQ()
defer conn.Close()
defer ch.Close()
log.Println("IM Server started. Press CTRL+C to exit.")
// 模拟用户注册
users := []string{"user1", "user2", "user3"}
for _, user := range users {
handleUserRegistration(ch, user)
}
// 模拟消息发送
for i := 0; i < 5; i++ {
msg := Message{
Sender: "system",
Recipient: "all",
Content: fmt.Sprintf("System message %d", i+1),
Timestamp: time.Now().Unix(),
}
publishMessage(ch, msg)
time.Sleep(2 * time.Second)
}
// 保持服务器运行
select {}
}
client/receiver.go实现
用于接收消息
package main
import (
"encoding/json"
"fmt"
"log"
"os"
amqp "github.com/rabbitmq/amqp091-go"
)
type Message struct {
Sender string `json:"sender"`
Recipient string `json:"recipient"`
Content string `json:"content"`
Timestamp int64 `json:"timestamp"`
}
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func setupReceiver(username string) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明交换机(幂等操作)
err = ch.ExchangeDeclare(
"broadcast", "fanout", true, false, false, false, nil)
failOnError(err, "Failed to declare broadcast exchange")
err = ch.ExchangeDeclare(
"direct_msg", "direct", true, false, false, false, nil)
failOnError(err, "Failed to declare direct message exchange")
// 声明队列
_, err = ch.QueueDeclare(
username, true, false, false, false, nil)
failOnError(err, "Failed to declare a queue")
// 绑定队列
err = ch.QueueBind(
username, "", "broadcast", false, nil)
failOnError(err, "Failed to bind to broadcast exchange")
err = ch.QueueBind(
username, username, "direct_msg", false, nil)
failOnError(err, "Failed to bind to direct message exchange")
msgs, err := ch.Consume(
username, "", true, false, false, false, nil)
failOnError(err, "Failed to register a consumer")
fmt.Printf("用户 [%s] 的消息接收端已启动...\n", username)
fmt.Println("正在等待消息...")
for msg := range msgs {
var message Message
err := json.Unmarshal(msg.Body, &message)
if err != nil {
log.Printf("Error decoding message: %v", err)
continue
}
timestamp := time.Unix(message.Timestamp, 0).Format("15:04:05")
if message.Recipient == "all" {
fmt.Printf("[%s] %s (广播): %s\n", timestamp, message.Sender, message.Content)
} else {
fmt.Printf("[%s] %s (私聊): %s\n", timestamp, message.Sender, message.Content)
}
}
}
func main() {
fmt.Print("请输入你的用户名: ")
var username string
fmt.Scanln(&username)
if username == "" {
fmt.Println("用户名不能为空")
os.Exit(1)
}
setupReceiver(username)
}
client/sender.go实现
用于发送消息
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type Message struct {
Sender string `json:"sender"`
Recipient string `json:"recipient"`
Content string `json:"content"`
Timestamp int64 `json:"timestamp"`
}
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func setupSender() *amqp.Channel {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
// 声明交换机(幂等操作)
err = ch.ExchangeDeclare(
"broadcast", "fanout", true, false, false, false, nil)
failOnError(err, "Failed to declare broadcast exchange")
err = ch.ExchangeDeclare(
"direct_msg", "direct", true, false, false, false, nil)
failOnError(err, "Failed to declare direct message exchange")
return ch
}
func sendMessage(ch *amqp.Channel, sender, recipient, content string) {
msg := Message{
Sender: sender,
Recipient: recipient,
Content: content,
Timestamp: time.Now().Unix(),
}
body, err := json.Marshal(msg)
failOnError(err, "Failed to marshal message")
if recipient == "all" {
err = ch.Publish(
"broadcast", "", false, false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
})
} else {
err = ch.Publish(
"direct_msg", recipient, false, false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
})
}
failOnError(err, "Failed to publish a message")
fmt.Printf("消息已发送给 %s\n", recipient)
}
func main() {
fmt.Print("请输入你的用户名: ")
var sender string
fmt.Scanln(&sender)
if sender == "" {
fmt.Println("用户名不能为空")
os.Exit(1)
}
ch := setupSender()
defer ch.Close()
scanner := bufio.NewScanner(os.Stdin)
fmt.Println("输入消息格式:")
fmt.Println("1. 广播消息: 直接输入消息内容")
fmt.Println("2. 私聊消息: @用户名 消息内容")
fmt.Println("按Ctrl+C退出")
for scanner.Scan() {
text := scanner.Text()
if text == "" {
continue
}
var recipient, content string
if len(text) > 1 && text[0] == '@' {
// 私聊消息
spaceIndex := -1
for i, c := range text {
if c == ' ' {
spaceIndex = i
break
}
}
if spaceIndex == -1 {
recipient = text[1:]
content = " "
} else {
recipient = text[1:spaceIndex]
content = text[spaceIndex+1:]
}
} else {
// 广播消息
recipient = "all"
content = text
}
sendMessage(ch, sender, recipient, content)
}
if err := scanner.Err(); err != nil {
log.Printf("读取输入错误: %v", err)
}
}
广告:
© 版权声明
THE END
暂无评论内容