Go操作各大消息队列教程(RabbitMQ、Kafka)
package RabbitMQ
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
//amqp:// 账号 密码@地址:端口号/vhost
const MQURL = "amqp://ziyi:ziyi@10.253.50.145:5672/ziyi"
type RabbitMQ struct {
//连接
conn *amqp.Connection
//管道
channel *amqp.Channel
//队列名称
QueueName string
//交换机
Exchange string
//key Simple模式 几乎用不到
Key string
//连接信息
Mqurl string
}
//创建RabbitMQ结构体实例
func NewRabbitMQ(queuename string, exchange string, key string) *RabbitMQ {
rabbitmq := &RabbitMQ{QueueName: queuename, Exchange: exchange, Key: key, Mqurl: MQURL}
var err error
//创建rabbitmq连接
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "创建连接错误!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "获取channel失败")
return rabbitmq
}
//断开channel和connection
func (r *RabbitMQ) Destory() {
r.channel.Close()
r.conn.Close()
}
//错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
}
}
//简单模式step:1。创建简单模式下RabbitMQ实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
return NewRabbitMQ(queueName, "", "")
}
//订阅模式创建rabbitmq实例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
//创建rabbitmq实例
rabbitmq := NewRabbitMQ("", exchangeName, "")
var err error
//获取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connecct rabbitmq!")
//获取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel!")
return rabbitmq
}
//订阅模式生成
func (r *RabbitMQ) PublishPub(message string) {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 广播类型
"fanout",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2 发送消息
err = r.channel.Publish(
r.Exchange,
"",
false,
false,
amqp.Publishing{
//类型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//订阅模式消费端代码
func (r *RabbitMQ) RecieveSub() {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 广播类型
"fanout",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2试探性创建队列,创建队列
q, err := r.channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//绑定队列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,这里的key要为空
"",
r.Exchange,
false,
nil,
)
//消费消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出请按 Ctrl+C")
<-forever
}
//话题模式 创建RabbitMQ实例
func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ {
//创建rabbitmq实例
rabbitmq := NewRabbitMQ("", exchagne, routingKey)
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbingmq!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
//话题模式发送信息
func (r *RabbitMQ) PublishTopic(message string) {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 话题模式
"topic",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "topic failed to declare an excha"+"nge")
//2发送信息
err = r.channel.Publish(
r.Exchange,
//要设置
r.Key,
false,
false,
amqp.Publishing{
//类型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//话题模式接收信息
//要注意key
//其中* 用于匹配一个单词,#用于匹配多个单词(可以是零个)
//匹配 表示匹配imooc.* 表示匹配imooc.hello,但是imooc.hello.one需要用imooc.#才能匹配到
func (r *RabbitMQ) RecieveTopic() {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 话题模式
"topic",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "failed to declare an exchange")
//2试探性创建队列,创建队列
q, err := r.channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//绑定队列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,这里的key要为空
r.Key,
r.Exchange,
false,
nil,
)
//消费消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出请按 Ctrl+C")
<-forever
}
//路由模式 创建RabbitMQ实例
func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ {
//创建rabbitmq实例
rabbitmq := NewRabbitMQ("", exchagne, routingKey)
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbingmq!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
//路由模式发送信息
func (r *RabbitMQ) PublishRouting(message string) {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 广播类型
"direct",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//发送信息
err = r.channel.Publish(
r.Exchange,
//要设置
r.Key,
false,
false,
amqp.Publishing{
//类型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//路由模式接收信息
func (r *RabbitMQ) RecieveRouting() {
//尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
//交换机名称
r.Exchange,
//交换机类型 广播类型
"direct",
//是否持久化
true,
//是否字段删除
false,
//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false,
//是否阻塞 true表示要等待服务器的响应
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2试探性创建队列,创建队列
q, err := r.channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//绑定队列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,这里的key要为空
r.Key,
r.Exchange,
false,
nil,
)
//消费消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出请按 Ctrl+C")
<-forever
}
//简单模式Step:2、简单模式下生产代码
func (r *RabbitMQ) PublishSimple(message string) {
//1、申请队列,如果队列存在就跳过,不存在创建
//优点:保证队列存在,消息能发送到队列中
_, err := r.channel.QueueDeclare(
//队列名称
r.QueueName,
//是否持久化
false,
//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除
false,
//是否具有排他性 true表示自己可见 其他用户不能访问
false,
//是否阻塞 true表示要等待服务器的响应
false,
//额外数据
nil,
)
if err != nil {
fmt.Println(err)
}
//2.发送消息到队列中
r.channel.Publish(
//默认的Exchange交换机是default,类型是direct直接类型
r.Exchange,
//要赋值的队列名称
r.QueueName,
//如果为true,根据exchange类型和routkey规则,如果无法找到符合条件的队列那么会把发送的消息返回给发送者
false,
//如果为true,当exchange发送消息到队列后发现队列上没有绑定消费者,则会把消息还给发送者
false,
//消息
amqp.Publishing{
//类型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
func (r *RabbitMQ) ConsumeSimple() {
//1、申请队列,如果队列存在就跳过,不存在创建
//优点:保证队列存在,消息能发送到队列中
_, err := r.channel.QueueDeclare(
//队列名称
r.QueueName,
//是否持久化
false,
//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除
false,
//是否具有排他性
false,
//是否阻塞
false,
//额外数据
nil,
)
if err != nil {
fmt.Println(err)
}
//接收消息
msgs, err := r.channel.Consume(
r.QueueName,
//用来区分多个消费者
"",
//是否自动应答
true,
//是否具有排他性
false,
//如果设置为true,表示不能同一个connection中发送的消息传递给这个connection中的消费者
false,
//队列是否阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//启用协程处理
go func() {
for d := range msgs {
//实现我们要处理的逻辑函数
log.Printf("Received a message:%s", d.Body)
//fmt.Println(d.Body)
}
}()
log.Printf("【*】warting for messages, To exit press CCTRAL+C")
<-forever
}
func (r *RabbitMQ) ConsumeWorker(consumerName string) {
//1、申请队列,如果队列存在就跳过,不存在创建
//优点:保证队列存在,消息能发送到队列中
_, err := r.channel.QueueDeclare(
//队列名称
r.QueueName,
//是否持久化
false,
//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除
false,
//是否具有排他性
false,
//是否阻塞
false,
//额外数据
nil,
)
if err != nil {
fmt.Println(err)
}
//接收消息
msgs, err := r.channel.Consume(
r.QueueName,
//用来区分多个消费者
consumerName,
//是否自动应答
true,
//是否具有排他性
false,
//如果设置为true,表示不能同一个connection中发送的消息传递给这个connection中的消费者
false,
//队列是否阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//启用协程处理
go func() {
for d := range msgs {
//实现我们要处理的逻辑函数
log.Printf("%s Received a message:%s", consumerName, d.Body)
//fmt.Println(d.Body)
}
}()
log.Printf("【*】warting for messages, To exit press CCTRAL+C")
<-forever
}