Nette RabbitMQ

Nette extension for RabbitMQ (using composer package jakubkulhan/bunny)

Example setup

Downloading composer package

composer require gamee/nette-rabbitmq

Extension registration

config.neon:

extensions:
	rabbitmq: Gamee\RabbitMQ\DI\RabbitMQExtension

Example configuration

services:
	- TestConsumer

rabbitmq:
	connections:
		default:
			user: guest
			password: guest
			host: localhost
			port: 5672

	queues:
		testQueue:
			connection: default
			# force queue declare on first queue operation during request
			# autoCreate: true 

	exchanges:
		testExchange:
			connection: default
			type: fanout
			queueBindings:
				testQueue:
					routingKey: testRoutingKey
			# force exchange declare on first exchange operation during request
			# autoCreate: true

	producers:
		testProducer:
			exchange: testExchange
			# queue: testQueue
			contentType: application/json
			deliveryMode: 2 # Producer::DELIVERY_MODE_PERSISTENT

	consumers:
		testConsumer:
			queue: testQueue
			callback: [@TestConsumer, consume]
			qos:
				prefetchSize: 0
				prefetchCount: 5

Declaring Queues and Exchanges

Since v3.0, all queues and exchanges are by default declared on demand using the console command:

php index.php rabbitmq:declareQueuesAndExchanges

It's intended to be a part of the deploy process to make sure all the queues and exchanges are prepared for use.

If you need to override this behavior (for example, to declare only the queues that are used during a request and nothing else), add the autoCreate: true parameter to the queue or exchange of your choice.

You can also declare the queues and exchanges through the RabbitMQ management interface or a script. If you do none of these (no manual declaration, no declare console command, and no autoCreate: true), exceptions will be thrown when accessing undeclared queues or exchanges.

Publishing messages

services.neon:

services:
	- TestQueue(@Gamee\RabbitMQ\Client::getProducer(testProducer))

TestQueue.php:

<?php

declare(strict_types=1);

use Gamee\RabbitMQ\Producer\Producer;

final class TestQueue
{

	/**
	 * @var Producer
	 */
	private $testProducer;


	public function __construct(Producer $testProducer)
	{
		$this->testProducer = $testProducer;
	}


	public function publish(string $message): void
	{
		$json = json_encode(['message' => $message]);
		$headers = [];

		$this->testProducer->publish($json, $headers);
	}

}
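
json_encode() returns false when encoding fails, and with strict_types enabled that value should not be passed on as a message body. The following is a minimal sketch of one way to guard against this, assuming PHP 7.3 or newer (for JSON_THROW_ON_ERROR). buildPayload() is a hypothetical helper for illustration, not part of the extension:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of gamee/nette-rabbitmq): encodes the
 * message body as JSON and throws JsonException instead of returning false.
 */
function buildPayload(string $message): string
{
	return json_encode(['message' => $message], JSON_THROW_ON_ERROR);
}

// Valid input is encoded as a JSON object with a single "message" key.
echo buildPayload('hello'), PHP_EOL;
```

In TestQueue::publish(), the result of such a helper would take the place of the bare json_encode() call.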

Consuming messages

Your consumer callback has to return a status confirming that the message has been acknowledged (or one of the other states: nack, reject).

TestConsumer.php:

<?php

declare(strict_types=1);

use Bunny\Message;
use Gamee\RabbitMQ\Consumer\IConsumer;

final class TestConsumer implements IConsumer
{

	public function consume(Message $message): int
	{
		$messageData = json_decode($message->content);

		$headers = $message->headers;

		/**
		 * @todo Some logic here...
		 */

		return IConsumer::MESSAGE_ACK; // Or ::MESSAGE_NACK || ::MESSAGE_REJECT
	}

}
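
One possible way to fill in the "some logic" placeholder is to validate the payload before deciding what to return. The sketch below is only an illustration: decideOutcome() is a hypothetical helper, and it returns the strings 'ack' and 'reject' as stand-ins for IConsumer::MESSAGE_ACK and IConsumer::MESSAGE_REJECT so that it runs without the extension installed.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns 'ack' or 'reject' as stand-ins for
 * IConsumer::MESSAGE_ACK and IConsumer::MESSAGE_REJECT.
 */
function decideOutcome(string $content): string
{
	$data = json_decode($content, true);

	// Malformed JSON or a missing "message" key will not succeed on a
	// retry either, so such messages are rejected.
	if (!is_array($data) || !isset($data['message'])) {
		return 'reject';
	}

	return 'ack';
}
```

Inside TestConsumer::consume(), the same check would run on $message->content and return the matching IConsumer constant. According to the v1.2.2 release notes, a message answered with IConsumer::MESSAGE_REJECT is not requeued.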

Running a consumer through the CLI

Two consumer commands are available. rabbitmq:consumer consumes messages for a specified amount of time (in seconds). The following command consumes messages for one hour:

php index.php rabbitmq:consumer testConsumer 3600

rabbitmq:staticConsumer consumes a given number of messages. The following example consumes just 20 messages:

php index.php rabbitmq:staticConsumer testConsumer 20

Releases

  • v3.0.0

    Added:

    • CLI command to declare queues/exchanges by config file, thanks @31vi5

    BC Breaks:

    • queues and exchanges are no longer declared automatically (default parameter autoCreate is false now)
  • v2.1.0

    • New option for disabling queue/exchange auto creation, thanks @dada-amater
    • New option for enabling persistent connections, thanks @dada-amater
  • v2.0.0 Fixed exchange message publishing

    Fixed publishing of messages to exchanges.

    Exchanges now accept connection parameter.

    Breaking changes:

    • Producer::publish() now accepts a third parameter, $routingKey, which allows messages to be routed correctly through exchanges.
    • Exchanges now publish messages in a different way. If you relied on the old (incorrect) behavior, you will probably have to adjust your code. The old version published one message for each bound queue, using the routing key defined in the config for the queue binding instead of the message's routing key. In the better case this effectively turned every exchange into a fanout exchange; in the worse case, if two or more queues shared a routing key, each of them received the message N times, where N is the number of queues bound with the same key.
  • v1.3.2

    • Allow Symfony 4 and bunny 0.3||0.4, thanks @pavelkovar
  • v1.3.1

    • Added config options: heartbeat, timeout
  • v1.3.0

    • Added option to specify qos (prefetchSize, prefetchCount)
  • v1.2.9

    • Functionality and checks related to adding producer data to ProducersDataBag moved into ProducersDataBag
  • v1.2.5

    Enhancements:

    • Added support for ContentType and DeliveryMode properties from producer config
    • Improved StaticConsumer behavior when queue is empty
    • DI extension is loaded using loadConfiguration method

    Thanks, @pavelkovar!

  • v1.2.4

    Abandon kdyby/console (optional)

    • kdyby/console is no longer strictly required. You can use kdyby/console or, for example, contributte/console
    • composer.json now requires symfony/console
  • v1.2.3

    Extension fix

    • ConnectionsHelper: default port fix (string -> int)
  • v1.2.2

    Consumer.php (Channel::reject()) fix

    • Consumer fix: reject message from queue and do not requeue when returning IConsumer::MESSAGE_REJECT
  • v1.2.0

    kdyby/console v2.7.x support

  • v1.1.0

    First official release on GitHub

    • Added license (MIT)
    • Added proper README
    • StaticConsumer - added the ability to consume just a given number of messages
