Configure message queue with Redis

If several processes are to be used to process messages from the Shopware 6 message queue, it must be ensured that identical messages are not processed by the same consumers. This requires the use of a technology that supports the atomic blocking of messages. Redis](https://marketplace.creoline.com/de/app/redis) or RabbitMQ are suitable for this purpose.


Prerequisites

  • Redis server
  • Shopware server

Preparation in Shopware

First, the Admin Worker must be deactivated so that the message queue can be processed via a background process. More information on deactivating the Admin Worker and the initial setup via a 'supervisor process' can be found here: Deactivate Shopware 6 Frontend Message Queue Worker.


Configuration of the Shopware 6 Background Queue Worker

The configuration of the supervisor processes is covered in the following help center articles:



Configuration in Redis

As message queue jobs do not have a TTL as in Redis by default, it is recommended to use suitable key-eviction-policies, the configuration of which is described in the linked help center article.


If the message queue is operated on a dedicated server that is used exclusively for this purpose, the standard noeviction policy can be used without any problems.


Alternatively, it is possible to operate the message queue together with the app cache on the same Redis server. In this case, however, the key eviction policy volatile-lru should be selected. This ensures that only cache entries are removed when the maxmemory limit is reached, as only these have a TTL. The jobs in the message queue are therefore retained and are not deleted unintentionally.



Configuration in Shopware

Create a new configuration file under the path: config/packages/messenger.yaml. This configuration file can be used to configure the Shopware Messenger transport service.


Then add the following content to the file:

Up to Shopware 6.4

parameters:
  env(MESSENGER_CONSUMER_NAME): 'consumer'

framework:
  messenger:
    transports:
      default:
        dsn: "redis://%env(REDIS_HOST_MESSENGER)%:%env(REDIS_PORT_MESSENGER)%/messages/symfony/consumer-%env(MESSENGER_CONSUMER_NAME)%/?delete_after_ack=true&delete_after_reject=true&dbindex=%env(REDIS_DB_MESSENGER)%"


From Shopware 6.5

# File: config/packages/messenger.yaml

parameters:
  env(MESSENGER_CONSUMER_NAME): 'consumer'

framework:
  messenger:
    transports:
      # shopware default transports
      async:
        dsn: "redis://%env(REDIS_HOST_MESSENGER)%:%env(REDIS_PORT_MESSENGER)%/messages_async?dbindex=%env(REDIS_DB_MESSENGER)%"
        options:
          consumer: "%env(MESSENGER_CONSUMER_NAME)%"
      failed:
        dsn: "redis://%env(REDIS_HOST_MESSENGER)%:%env(REDIS_PORT_MESSENGER)%/messages_failed?dbindex=%env(REDIS_DB_MESSENGER)%"
        options:
          consumer: "%env(MESSENGER_CONSUMER_NAME)%"
      low_priority:
        dsn: "redis://%env(REDIS_HOST_MESSENGER)%:%env(REDIS_PORT_MESSENGER)%/messages_low_priority?dbindex=%env(REDIS_DB_MESSENGER)%"
        options:
          consumer: "%env(MESSENGER_CONSUMER_NAME)%"


If further (future new) transports have to be processed via Redis, these must also be configured as specified above in messenger.yaml.


Description of a transport

The transport DSN variant we use for Redis is structured as follows:

<transport-name>:
  dsn: "redis://<hostname>:<port>/<stream_key_name>?<options>"
  options:
    consumer: "custom-consumer-name-defined-in-environment-variable"


Example for async

async:
  dsn: "redis://%env(REDIS_HOST_MESSENGER)%:%env(REDIS_PORT_MESSENGER)%/messages_async?dbindex=%env(REDIS_DB_MESSENGER)%"
  options:
    consumer: "%env(MESSENGER_CONSUMER_NAME)%"


Variable Description
redis:// Defines the protocol for Symfony Redis transport
%(REDIS_HOST_MESSENGER)% Hostname or IP address to the Redis server definition in the .env.local
%(REDIS_PORT_MESSENGER)% TCP port for Redis connections (default: 6379) Definition in the .env.local
messages_async Redis Keyname (recommended to be named in the format after the transport)
%(MESSENGER_CONSUMER_NAME)% Unique name of the consumer (per consumer)Definition in the supervisor processes
dbindex Defines the Redis database index
%(REDIS_DB_MESSENGER)% Redis database for the messenger definition in the .env.local


The environment variable MESSENGER_CONSUMER_NAME must be overwritten by the supervisor service so that the execution of several simultaneous consumers can also be supported.


The variables used here must then be added to the Shopware environment configuration .env.local. To do this, open the .env.local file in the Shopware root directory and add the following part to the existing configuration:

# [...]

REDIS_HOST_MESSENGER=127.0.0.1
REDIS_PORT_MESSENGER=6379
REDIS_DB_MESSENGER=1


Make sure that the Redis database 1 is free and is not used by any other configuration.


Then save the file and empty the Shopware cache via the console:

bin/console cache:clear


The message queue is then executed via Redis.



Processing of old jobs in the database (not Redis, but MariaDB/MySQL)

After switching the message queue to Redis, it is possible that old jobs are forgotten in the database, the execution of which may be in the distant future.


How do I check whether old jobs need to be processed?

To answer this question, we recommend taking a look at the Shopware database table messenger_messages.

Plesk

Call up your Plesk Control Panel, navigate to the database overview of the corresponding Plesk subscription (Websites & Domains โ†’ Domain โ†’ Dashboard โ†’ Databases) and call up the phpMyAdmin button for your Shopware database.


Next, enter messenger_messages in the search text field at the top left, confirm the entry with Enter and select the table in the left column with the mouse as shown in the screenshot:




Command line

Log in to your database server via SSH and execute the following command:


MariaDB

mariadb -u <database_user> -p -h 127.0.0.1 --port 3306 <database_name>

MySQL

mysql -u <database_user> -p -h 127.0.0.1 --port 3306 <database_name>


Within the MySQL or MariaDB shell, you can output the jobs as follows:

SELECT * FROM messenger_messages;



How do I work off the old jobs?

As you can see in our example, there are some jobs that are still in the database after the changeover. In order to complete the changeover to Redis, these jobs should still be processed in the future, especially if they are jobs such as payment reminders after 14 days.

For this purpose, separate transports must be defined, through which the bin/console messenger:consume command accesses the database and not Redis during processing.

The queue_name column in the database table is essential here. In our example screenshot, we find default and low_priority.


Accordingly, we must define two user-defined transports in messenger.yaml as follows:

# File: config/packages/messenger.yaml
# [...]
      # user-defined transport 1
      doctrine_low_priority: # Name of the transport, visible in messenger:consume (freely selectable)
        dsn: "%env(MESSENGER_TRANSPORT_DSN)%" # URL for processing
        options:    # Option definition
          queue_name: low_priority # Definition of the queue name

      # user-defined transport 2
      doctrine_default:
        dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
        options:
          queue_name: default
# [...]


Please pay attention to the correct YAML syntax. The spaces before the definitions must consist of spaces, 2 spaces for each level.


The environment variable MESSENGER_TRANSPORT_DSN is predefined and already contains the value doctrine://default required for accessing Doctrine.


A transport must be defined for each different queue_name in the database. If there are other names in the queue_name column, these must be adapted in the queue_name: option.


Then save the file and empty the Shopware cache via the console:

bin/console cache:clear


The doctrine_low_priority and doctrine_default transports must then be added to the existing supervisor configuration:

# Shopware Consumer
[program:shopware-consumer-doctrine-old]
process_name=%(program_name)s_%(process_num)02d
numprocs=1
command= /var/www/vhosts/<your-domain.com>/.phpenv/shims/php /var/www/vhosts/<your-domain.com>/httpdocs/bin/console messenger:consume doctrine_default doctrine_low_priority --time-limit=120 --memory-limit=512M
environment=MESSENGER_CONSUMER_NAME="%(program_name)s_%(process_num)02d"
user=<ssh-user>
autostart=true
autorestart=true
startsecs=5
startretries=10
stopwaitsecs=3600
redirect_stderr=true
stdout_logfile=/var/log/supervisor/%(program_name)s.log


Further information on setting up the background queue workers can be found in this Help Center article.



How long do I have to process the user-defined transports with the Messenger Consumer?

The following command outputs the line with the oldest date in the available_at column, which you can use to see when the last job should be executed.

SELECT * FROM messenger_messages ORDER BY available_at DESC LIMIT 1;


If the date is already in the past, these entries are quickly removed from the database after the transports have been processed in the message consumer. Entries with a date in the future remain in the database until this time.


Plesk

Select the SQL button in the top bar, insert above command in the text field and confirm the entry with the OK button at the bottom right.



SQL

Open the MySQL or MariaDB shell as described in the check and execute the above command in it.


Result

The output is a line whose available_at column contains the date on which the last job from the message queue was processed in the database.


If no more jobs appear in the database after the date of the last one, the changeover has been successfully implemented and you can remove the receivers from the Background Queue Workers again. To do this, simply remove the above supervisor configuration from the corresponding file.