Nette RabbitMQ

Nette extension for RabbitMQ (using composer package jakubkulhan/bunny)

Example setup

Downloading composer package

composer require gamee/nette-rabbitmq

Extension registration

config.neon:

extensions:
	rabbitmq: Gamee\RabbitMQ\DI\RabbitMQExtension

Example configuration

services:
	- TestConsumer

rabbitmq:
	connections:
		default:
			user: guest
			password: guest
			host: localhost
			port: 5672

	queues:
		testQueue:
			connection: default

	exchanges:
		testExchange:
			type: fanout
			queueBindings:
				testQueue:

	producers:
		testProducer:
			exchange: testExchange
			# queue: testQueue
			contentType: application/json
			deliveryMode: 2 # Producer::DELIVERY_MODE_PERSISTENT

	consumers:
		testConsumer:
			queue: testQueue
			callback: [@TestConsumer, consume]

Publishing messages

Note: The queue is created automatically when the first message is published.

services.neon:

services:
	- TestQueue(@Gamee\RabbitMQ\Client::getProducer(testProducer))

TestQueue.php:

<?php

declare(strict_types=1);

use Gamee\RabbitMQ\Producer\Producer;

final class TestQueue
{

	/**
	 * @var Producer
	 */
	private $testProducer;


	public function __construct(Producer $testProducer)
	{
		$this->testProducer = $testProducer;
	}


	public function publish(string $message): void
	{
		$json = json_encode(['message' => $message]);
		$headers = [];

		$this->testProducer->publish($json, $headers);
	}

}
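
json_encode() returns false when a value cannot be encoded (for example, a string that is not valid UTF-8), so it can be safer to fail early than to hand that result to the producer. A minimal sketch of such a guard follows; the helper name encodePayload() is hypothetical and not part of the extension:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the extension): builds the JSON body
 * used by TestQueue::publish() and fails loudly if encoding is impossible.
 */
function encodePayload(string $message): string
{
	$json = json_encode(['message' => $message]);

	// json_encode() returns false on failure, e.g. for malformed UTF-8 input
	if ($json === false) {
		throw new InvalidArgumentException(
			'Message could not be encoded as JSON: ' . json_last_error_msg()
		);
	}

	return $json;
}
```

Inside TestQueue::publish(), the first line could then become `$json = encodePayload($message);`, leaving the rest of the method unchanged.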

Consuming messages

Your consumer callback has to return a status that tells the extension how the message was handled: acknowledged (IConsumer::MESSAGE_ACK), not acknowledged (IConsumer::MESSAGE_NACK), or rejected (IConsumer::MESSAGE_REJECT).

TestConsumer.php:

<?php

declare(strict_types=1);

use Bunny\Message;
use Gamee\RabbitMQ\Consumer\IConsumer;

final class TestConsumer implements IConsumer
{

	public function consume(Message $message): int
	{
		$messageData = json_decode($message->content);

		$headers = $message->headers;

		/**
		 * @todo Some logic here...
		 */

		return IConsumer::MESSAGE_ACK; // Or ::MESSAGE_NACK || ::MESSAGE_REJECT
	}

}
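
The "Some logic here" step usually starts by checking that the body is usable at all. The sketch below shows one way to do that, assuming the payload is a JSON object as in the publishing example; decodePayload() is a hypothetical helper, not part of the extension:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the extension): decodes a message body
 * into an associative array, or returns null when the body is not a JSON
 * object/array.
 */
function decodePayload(string $content): ?array
{
	// The second argument makes json_decode() return arrays instead of stdClass
	$data = json_decode($content, true);

	// Invalid JSON yields null; scalar JSON values are treated as malformed too
	return is_array($data) ? $data : null;
}
```

In consume(), a null result from `decodePayload($message->content)` could be answered with IConsumer::MESSAGE_REJECT, which (as of v1.2.2) rejects the message without requeueing it, while a valid payload proceeds to the real logic and returns IConsumer::MESSAGE_ACK.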

Running a consumer through the CLI

Two consumer commands are provided. rabbitmq:consumer consumes messages for a specified amount of time (in seconds). The following command consumes messages for one hour:

php index.php rabbitmq:consumer testConsumer 3600

rabbitmq:staticConsumer consumes a specified number of messages. The following example consumes just 20 messages:

php index.php rabbitmq:staticConsumer testConsumer 20

Release notes

  • v1.2.5

    Enhancements:

    • Added support for ContentType and DeliveryMode properties from producer config
    • Improved StaticConsumer behavior when the queue is empty
    • DI extension is loaded using the loadConfiguration method

    Thanks, @pavelkovar!

  • v1.2.4

    Abandon kdyby/console (optional)

    • kdyby/console is no longer strictly required; you can use kdyby/console or, for example, contributte/console
    • composer.json now requires symfony/console

  • v1.2.3

    Extension fix

    • ConnectionsHelper: default port fix (string -> int)

  • v1.2.2

    Consumer.php (Channel::reject()) fix

    • Consumer fix: reject the message from the queue and do not requeue it when returning IConsumer::MESSAGE_REJECT

  • v1.2.0

    kdyby/console v2.7.x support

  • v1.1.0

    First official release on GitHub

    • Added license (MIT)
    • Added proper README
    • StaticConsumer: added the ability to consume just a specified number of messages
