rabbitmqctl status | Server status |
rabbitmqctl list_queues | List queues |
rabbitmqctl list_exchanges | List exchanges |
rabbitmqctl list_bindings | List bindings |
rabbitmqctl list_connections | List connections |
rabbitmqctl list_channels | List channels |
rabbitmqctl add_user user pass | Add user |
rabbitmqctl set_permissions -p / user ".*" ".*" ".*" | Set permissions |
rabbitmqctl delete_queue queue_name | Delete queue |
rabbitmqctl purge_queue queue_name | Purge queue |
rabbitmq-plugins enable rabbitmq_management | Enable management UI |
rabbitmq-plugins list | List plugins |
rabbitmq-plugins disable plugin_name | Disable plugin |
import pika

# Open a blocking connection to the broker on localhost.
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
try:
    channel = connection.channel()

    # Declare queue
    channel.queue_declare(queue='hello', durable=True)

    # Publish message
    channel.basic_publish(
        exchange='',
        routing_key='hello',
        body='Hello World!',
        properties=pika.BasicProperties(
            delivery_mode=2,  # Persistent
        )
    )
finally:
    # Close the connection even if declaring or publishing raised.
    connection.close()

import pika
# Connect to the broker on localhost and open a channel on that connection.
params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare the durable 'hello' queue that the consumer reads from.
channel.queue_declare(queue='hello', durable=True)
def callback(ch, method, properties, body):
    """Print a received message, then acknowledge it.

    Args:
        ch: Channel object; its ``basic_ack`` is called to acknowledge.
        method: Delivery metadata; ``method.delivery_tag`` is the tag that
            gets acknowledged.
        properties: Message properties (not used by this handler).
        body: Message payload; printed as-is.
    """
    print(f"Received: {body}")
    # Acknowledge only after the message has been handled (printed).
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Set prefetch_count=1, register `callback` for the 'hello' queue, then start consuming.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='hello', on_message_callback=callback)
channel.start_consuming()

# Routing key must match exactly
# Declare the 'direct_logs' exchange with type 'direct'
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Bind queue with routing key
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')

# Publish with routing key
channel.basic_publish(exchange='direct_logs', routing_key='error', body=message)

# Pattern matching with * and #
# * = one word, # = zero or more words
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# Bind with patterns
binding_patterns = (
    '*.error',  # matches: app.error
    'logs.#',   # matches: logs.app.error
)
for pattern in binding_patterns:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=pattern,
    )

# Broadcast to all bound queues (ignores routing key)
channel.exchange_declare(
exchange='logs',
exchange_type='fanout'
)
# Bind queue (routing_key ignored)
channel.queue_bind(
exchange='logs',
queue=queue_name
)
# Publish (routing_key ignored)
channel.basic_publish(
exchange='logs',
routing_key='',
body=message
) const amqp = require('amqplib');
/**
 * Publish a single persistent 'Hello!' message to the durable `hello` queue.
 *
 * Connects to amqp://localhost, asserts the queue, sends the message, then
 * closes the channel and the connection. If any step after connecting fails,
 * the connection is still closed and the original error is rethrown.
 *
 * @returns {Promise<void>} Resolves after the connection has been closed.
 * @throws Rejects with the first error raised by connect/assert/send/close.
 */
async function send() {
  const conn = await amqp.connect('amqp://localhost');
  try {
    const channel = await conn.createChannel();
    const queue = 'hello';
    await channel.assertQueue(queue, { durable: true });
    channel.sendToQueue(queue, Buffer.from('Hello!'), {
      persistent: true,
    });
    await channel.close();
  } catch (err) {
    // Best-effort cleanup: a secondary close failure must not mask the
    // error that actually caused the send to fail.
    await conn.close().catch(() => {});
    throw err;
  }
  await conn.close();
}
// Run the publisher; report a failure instead of leaving the promise unhandled.
send().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

const amqp = require('amqplib');
/**
 * Consume messages from the durable `hello` queue, one unacked message at a
 * time (prefetch 1), logging each payload and acknowledging it.
 *
 * @returns {Promise<void>} Resolves once the consumer has been registered;
 *   rejects if connecting, asserting the queue, or subscribing fails.
 */
async function receive() {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();
  const queue = 'hello';
  await channel.assertQueue(queue, { durable: true });
  await channel.prefetch(1);
  // Await the subscription so a failed consume rejects receive() instead of
  // becoming an unhandled rejection.
  await channel.consume(queue, (msg) => {
    // amqplib delivers `null` when the server cancels the consumer; there is
    // no content to read and nothing to ack in that case.
    if (msg === null) {
      console.warn('Consumer cancelled by server');
      return;
    }
    console.log(`Received: ${msg.content.toString()}`);
    channel.ack(msg);
  });
}
receive();
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management | Run with management |
docker exec -it rabbitmq rabbitmqctl status | Check status |
# Compose file defining one `rabbitmq` service backed by a named volume.
version: "3"
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      # NOTE(review): example credentials; replace before exposing this broker.
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin
    volumes:
      # Mount the named volume at /var/lib/rabbitmq inside the container.
      - rabbitmq_data:/var/lib/rabbitmq
volumes:
  rabbitmq_data: