diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml
index c1c41c3..fb293b8 100644
--- a/.github/workflows/codecov.yml
+++ b/.github/workflows/codecov.yml
@@ -17,6 +17,7 @@ jobs:
php:
- "8.2"
- "8.3"
+ - "8.4"
steps:
- name: Checkout
@@ -26,6 +27,7 @@ jobs:
uses: shivammathur/setup-php@v2
with:
php-version: "${{ matrix.php }}"
+ extensions: swoole
coverage: pcov
ini-values: assert.exception=1, zend.assertions=1, error_reporting=-1, log_errors_max_len=0, display_errors=On
tools: composer:v2, cs2pr
diff --git a/.github/workflows/qodana_code_quality.yml b/.github/workflows/qodana_code_quality.yml
index 325174a..d341e2d 100644
--- a/.github/workflows/qodana_code_quality.yml
+++ b/.github/workflows/qodana_code_quality.yml
@@ -15,7 +15,7 @@ jobs:
checks: write
strategy:
matrix:
- php-versions: [ '8.2', '8.3']
+ php-versions: [ '8.2', '8.3', '8.4']
steps:
- uses: actions/checkout@v4
with:
diff --git a/.github/workflows/static-analysis.yml b/.github/workflows/static-analysis.yml
index 8a1e162..b9d4c50 100644
--- a/.github/workflows/static-analysis.yml
+++ b/.github/workflows/static-analysis.yml
@@ -17,6 +17,7 @@ jobs:
php:
- "8.2"
- "8.3"
+ - "8.4"
steps:
- name: Checkout
@@ -50,4 +51,4 @@ jobs:
mv config/autoload/swoole.local.php.dist config/autoload/swoole.local.php
mv config/autoload/log.local.php.dist config/autoload/log.local.php
- name: Run static analysis with PHPStan
- run: vendor/bin/phpstan analyse
+ run: vendor/bin/phpstan analyse
diff --git a/.laminas-ci.json b/.laminas-ci.json
index 64be8c2..af5962f 100644
--- a/.laminas-ci.json
+++ b/.laminas-ci.json
@@ -1,8 +1,10 @@
{
"additional_composer_arguments": [
- "--no-scripts",
- "--no-plugins"
+ "--no-scripts"
+ ],
+ "extensions": [
+ "redis"
],
"ignore_php_platform_requirements": {
}
-}
\ No newline at end of file
+}
diff --git a/.laminas-ci/pre-run.sh b/.laminas-ci/pre-run.sh
index 8b8528d..b7ba4af 100755
--- a/.laminas-ci/pre-run.sh
+++ b/.laminas-ci/pre-run.sh
@@ -1,5 +1,5 @@
-#!/bin/bash
+JOB=$3
+PHP_VERSION=$(echo "${JOB}" | jq -r '.php')
-# Due to the fact that we are disabling plugins when installing/updating/downgrading composer dependencies
-# we have to manually enable the coding standard here.
-composer enable-codestandard
+apt update
+apt install -y "php${PHP_VERSION}-swoole"
diff --git a/README.md b/README.md
index abe26d3..e719d43 100644
--- a/README.md
+++ b/README.md
@@ -3,10 +3,20 @@
> [!IMPORTANT]
> Dotkernel component used to queue tasks to be processed asynchronously based on [netglue/laminas-messenger](https://github.com/netglue/laminas-messenger)
+A queue system is a vital part in modern web applications that enables the decoupling of certain tasks from the regular request-response cycle.
+
+This is especially useful for time-consuming and resource-intensive operations which are thus handled asynchronously by background workers on a separate system.
+
+The greatest benefit is to application responsiveness, which allows faster execution, while the heavy lifting is scheduled in the queue based on available resources.
+
+The queue system uses logs to ensure maintainability and implements retry features for reliability and stability.
+
+
+
## Badges

-
+
[](https://github.com/dotkernel/queue/issues)
[](https://github.com/dotkernel/queue/network)
@@ -18,10 +28,6 @@
[](https://github.com/dotkernel/queue/actions/workflows/qodana_code_quality.yml)
[](https://github.com/dotkernel/queue/actions/workflows/static-analysis.yml)
-## Installation
-
-> Until we have a compiled documentation, read the files from /doc/book/v1 folder
-
## Documentation
Documentation is available at: https://docs.dotkernel.org/queue-documentation
diff --git a/composer.json b/composer.json
index c0c1ab6..46ae5ed 100644
--- a/composer.json
+++ b/composer.json
@@ -43,7 +43,7 @@
}
},
"require": {
- "php": "~8.2.0 || ~8.3.0 || ~8.4",
+ "php": "~8.2.0 || ~8.3.0 || ~8.4.0",
"dotkernel/dot-cli": "^3.9",
"dotkernel/dot-dependency-injection": "^1.2",
"dotkernel/dot-errorhandler": "4.2.1",
@@ -55,12 +55,12 @@
},
"require-dev": {
"laminas/laminas-coding-standard": "^3.0",
- "phpunit/phpunit": "^10.5.45",
- "roave/security-advisories": "dev-master",
- "swoole/ide-helper": "~5.0.0",
"phpstan/phpstan": "^2.0",
"phpstan/phpstan-doctrine": "^2.0",
- "phpstan/phpstan-phpunit": "^2.0"
+ "phpstan/phpstan-phpunit": "^2.0",
+ "phpunit/phpunit": "^10.5.45",
+ "roave/security-advisories": "dev-master",
+ "swoole/ide-helper": "~5.0.0"
},
"autoload": {
"psr-4": {
@@ -69,7 +69,8 @@
},
"autoload-dev": {
"psr-4": {
- "QueueTest\\Swoole\\": "test/Swoole"
+ "QueueTest\\App\\": "test/App/",
+ "QueueTest\\Swoole\\": "test/Swoole/"
}
},
"scripts": {
@@ -82,6 +83,6 @@
"cs-fix": "phpcbf",
"test": "phpunit --colors=always",
"test-coverage": "phpunit --colors=always --coverage-clover clover.xml",
- "static-analysis": "phpstan analyse"
+ "static-analysis": "phpstan analyse --memory-limit 1G"
}
}
diff --git a/config/autoload/cli.global.php b/config/autoload/cli.global.php
index 08644ea..aefbeb9 100644
--- a/config/autoload/cli.global.php
+++ b/config/autoload/cli.global.php
@@ -3,6 +3,9 @@
declare(strict_types=1);
use Dot\Cli\FileLockerInterface;
+use Queue\Swoole\Command\GetFailedMessagesCommand;
+use Queue\Swoole\Command\GetProcessedMessagesCommand;
+use Queue\Swoole\Command\GetQueuedMessagesCommand;
use Queue\Swoole\Command\StartCommand;
use Queue\Swoole\Command\StopCommand;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
@@ -17,6 +20,9 @@
"swoole:stop" => StopCommand::class,
"messenger:start" => ConsumeMessagesCommand::class,
"messenger:debug" => DebugCommand::class,
+ "processed" => GetProcessedMessagesCommand::class,
+ "failed" => GetFailedMessagesCommand::class,
+ "inventory" => GetQueuedMessagesCommand::class,
],
],
FileLockerInterface::class => [
diff --git a/config/autoload/local.php.dist b/config/autoload/local.php.dist
index 11fc0cb..3de5fb1 100644
--- a/config/autoload/local.php.dist
+++ b/config/autoload/local.php.dist
@@ -9,7 +9,7 @@
declare(strict_types=1);
-$baseUrl = 'http://queue.dotkernel.net';
+$baseUrl = 'https://queue.dotkernel.net';
$databases = [
'default' => [
@@ -43,7 +43,13 @@ return [
'protocol' => 'tcp',
'host' => 'localhost',
'port' => '8556',
- 'eof' => "\n",
+ 'eof' => PHP_EOL,
],
],
+ //delay time until the message is added back to the queue if an error occurs during processing
+ 'fail-safe' => [
+ 'first_retry' => 3600000, // 1h
+ 'second_retry' => 43200000, // 12h
+ 'third_retry' => 86400000, // 24h
+ ],
];
diff --git a/config/autoload/log.local.php.dist b/config/autoload/log.local.php.dist
index 1422cc8..e88e592 100644
--- a/config/autoload/log.local.php.dist
+++ b/config/autoload/log.local.php.dist
@@ -1,5 +1,7 @@
[
'writers' => [
'FileWriter' => [
- 'name' => 'stream',
- 'level' => \Dot\Log\Logger::ALERT, // this is equal to 1
+ 'name' => 'stream',
+ 'level' => Logger::ALERT,
'options' => [
- 'stream' => __DIR__ . '/../../log/queue-log.log',
+ 'stream' => __DIR__ . '/../../log/queue-log.log',
'formatter' => [
'name' => Json::class,
],
],
],
],
- ]
+ ],
],
],
];
diff --git a/config/autoload/messenger.local.php.dist b/config/autoload/messenger.local.php.dist
index 4268d3b..78a2c4a 100644
--- a/config/autoload/messenger.local.php.dist
+++ b/config/autoload/messenger.local.php.dist
@@ -1,25 +1,41 @@
[
+ "symfony" => [
"messenger" => [
- "transports" => [
- "redis_transport" => [
- 'dsn' => 'redis://127.0.0.1:6379/messages',
- 'options' => [], // Redis specific options
+ 'transports' => [
+ 'redis_transport' => [
+ 'dsn' => 'redis://127.0.0.1:6379/messages',
+ 'serializer' => SymfonySerializer::class,
+ 'retry_strategy' => [
+ 'max_retries' => 3, //maximum number of times a message will be retried after the first failure
+ 'delay' => 1000, // initial delay before retrying a failed message, in milliseconds
+ 'multiplier' => 2, // factor to increase the delay for each subsequent retry
+ 'max_delay' => 0, // maximum delay between retries, in milliseconds
+ ],
+ ],
+ // separate transport for failed messages
+ 'failed' => [
+ 'dsn' => 'redis://127.0.0.1:6379/failed',
'serializer' => SymfonySerializer::class,
- ]
- ]
- ]
+ ],
+ ],
+ // tells Messenger that the transport to store failed messages is "failed"
+ 'failure_transport' => 'failed',
+ ],
],
"dependencies" => [
"factories" => [
- "redis_transport" => [TransportFactory::class, 'redis_transport'],
- SymfonySerializer::class => fn(\Psr\Container\ContainerInterface $container) => new PhpSerializer()
- ]
- ]
-];
\ No newline at end of file
+ "redis_transport" => [TransportFactory::class, 'redis_transport'],
+ "failed" => [TransportFactory::class, 'failed'],
+ SymfonySerializer::class => fn(ContainerInterface $container) => new PhpSerializer(),
+ ],
+ ],
+];
diff --git a/config/autoload/swoole.local.php.dist b/config/autoload/swoole.local.php.dist
index 84262fe..7085ee0 100644
--- a/config/autoload/swoole.local.php.dist
+++ b/config/autoload/swoole.local.php.dist
@@ -1,9 +1,10 @@
[
- // Available in Swoole 4.1 and up; enables coroutine support
- // for most I/O operations:
+ // Available in Swoole 4.1 and up; enables coroutine support for most I/O operations:
'enable_coroutine' => true,
// Configure Swoole TCP Server:
@@ -13,17 +14,17 @@ return [
'mode' => SWOOLE_BASE, // SWOOLE_BASE or SWOOLE_PROCESS;
// SWOOLE_BASE is the default
'protocol' => SWOOLE_SOCK_TCP, // SWOOLE_SSL, // SSL-enable the server
- 'options' => [
+ 'options' => [
// Set the SSL certificate and key paths for SSL support:
//'ssl_cert_file' => 'path/to/ssl.crt',
//'ssl_key_file' => 'path/to/ssl.key',
- // Whether or not the HTTP server should use coroutines;
+ // Whether the HTTP server should use coroutines;
// enabled by default, and generally should not be disabled:
- 'package_eof' => "\n",
- 'open_eof_check' => true,
+ 'package_eof' => PHP_EOL,
+ 'open_eof_check' => true,
'open_length_check' => true,
- // in order to run swoole as daemon
+ // to run swoole as a daemon
'daemonize' => true,
// Overwrite the default location of the pid file;
diff --git a/config/autoload/templates.global.php b/config/autoload/templates.global.php
deleted file mode 100644
index 6b1f459..0000000
--- a/config/autoload/templates.global.php
+++ /dev/null
@@ -1,43 +0,0 @@
- [
- 'factories' => [
- DateExtension::class => InvokableFactory::class,
- Environment::class => TwigEnvironmentFactory::class,
- TemplateRendererInterface::class => TwigRendererFactory::class,
- TranslationExtension::class => InvokableFactory::class,
- ],
- ],
- 'debug' => false,
- 'templates' => [
- 'extension' => 'html.twig',
- ],
- 'twig' => [
- 'assets_url' => '/',
- 'assets_version' => null,
- 'auto_reload' => true,
- 'autoescape' => 'html',
- 'cache_dir' => 'data/cache/twig',
- 'extensions' => [
- DateExtension::class,
- TranslationExtension::class,
- ],
- 'globals' => [
- 'appName' => $app['name'] ?? '',
- ],
- 'optimizations' => -1,
- 'runtime_loaders' => [],
-// 'timezone' => '',
- ],
-];
diff --git a/daemon/messenger.service b/daemon/messenger.service
index f1dcea0..daa7fb2 100644
--- a/daemon/messenger.service
+++ b/daemon/messenger.service
@@ -12,4 +12,4 @@ WorkingDirectory=/home/dotkernel/queue/
ExecStart=/usr/bin/php /home/dotkernel/queue/bin/cli.php messenger:start
[Install]
-WantedBy=swoole.service
\ No newline at end of file
+WantedBy=swoole.service
diff --git a/daemon/swoole.service b/daemon/swoole.service
index 21ad0c2..b6d2c90 100644
--- a/daemon/swoole.service
+++ b/daemon/swoole.service
@@ -12,4 +12,4 @@ WorkingDirectory=/home/dotkernel/queue/
ExecStart=/usr/bin/php /home/dotkernel/queue/bin/cli.php swoole:start
[Install]
-WantedBy=multi-user.target
\ No newline at end of file
+WantedBy=multi-user.target
diff --git a/docs/book/index.md b/docs/book/index.md
deleted file mode 100644
index ae42a26..0000000
--- a/docs/book/index.md
+++ /dev/null
@@ -1 +0,0 @@
-../../README.md
diff --git a/docs/book/v1/installation.md b/docs/book/v1/installation.md
deleted file mode 100644
index f594fb7..0000000
--- a/docs/book/v1/installation.md
+++ /dev/null
@@ -1,43 +0,0 @@
-## INSTALLATION
-
-### git clone
-
-`git clone -b default-queue https://github.com/dotkernel/queue.git`
-
-### Edit .dist files from config/autoload folder
-
-- local.php.dist
-- log.local.dist
-- messenger.local.php.dist
-- swoole.local.php.dist
-
-## Run Composer
-
-`composer install --no-dev`
-
-### Create services ( daemon)
-
-- Edit the files from `/daemon` folder and set proper paths
-- copy them in /etc/systemd/system/
-
-`sudo cp /home/dotkernel/queue/daemon\swoole.service`
-
-`sudo cp /home/dotkernel/queue/daemon\messenger.service`
-
-
-### Start the daemon
-
-`sudo systemctl daemon-reload`
-
-`sudo systemctl enable swoole.service`
-
-`sudo systemctl start swoole.service`
-
-`sudo systemctl status swoole.service`
-
-
-### Testing the installation
-
-Send a request from your local machine
-
-`echo "Hello" | socat -T1 - TCP:SERVER-IP:8556`
diff --git a/docs/book/v1/overview.md b/docs/book/v1/overview.md
deleted file mode 100644
index f6f4aeb..0000000
--- a/docs/book/v1/overview.md
+++ /dev/null
@@ -1,17 +0,0 @@
-> [!IMPORTANT]
-> Dotkernel component used to queue tasks to be processed asynchronously based on [netglue/laminas-messenger](https://github.com/netglue/laminas-messenger)
-
-## Badges
-
-
-
-
-[](https://github.com/dotkernel/queue/issues)
-[](https://github.com/dotkernel/queue/network)
-[](https://github.com/dotkernel/queue/stargazers)
-[](https://github.com/dotkernel/queue/blob/1.0/LICENSE.md)
-
-[](https://github.com/mezzio/mezzio-skeleton/actions/workflows/continuous-integration.yml)
-[](https://codecov.io/gh/dotkernel/queue)
-[](https://github.com/dotkernel/queue/actions/workflows/qodana_code_quality.yml)
-[](https://github.com/dotkernel/queue/actions/workflows/static-analysis.yml)
\ No newline at end of file
diff --git a/docs/book/v1/server-setup.md b/docs/book/v1/server-setup.md
deleted file mode 100644
index f1b1f33..0000000
--- a/docs/book/v1/server-setup.md
+++ /dev/null
@@ -1,101 +0,0 @@
-## Server setup
-
-> Below instructions are working only on **AlmaLinux 9**
->
->For other OS's need to be adapted accordingly
-
-## Starting point
-A server with AlmaLinux 9 freshly installed, with root access and updated
-
-### Update OS
-dnf update
-
-### Create a new user with sudo permissions
-
-`useradd dotkernel`
-
-`passwd dotkernel`
-
-`usermod -aG wheel dotkernel`
-
-### reboot
-
-### SSH to the server as new user
-
-### Install various utilities
-
-`sudo dnf install -y dnf-utils`
-
-`sudo dnf install zip unzip socat wget`
-
-### PHP
-
-`sudo dnf install -y https://rpms.remirepo.net/enterprise/remi-release-9.rpm`
-
-`sudo dnf module enable php:remi-8.4`
-
-`sudo dnf install -y php php-cli php-common php-intl`
-
-### Start PHP-FPM
-
-`sudo systemctl start php-fpm`
-
-`sudo systemctl enable php-fpm`
-
-### Install and verify swoole
-
-`sudo dnf install php-pecl-swoole6`
-
-`php -i | grep swoole`
-
-### Valkey
-
-`sudo dnf install valkey`
-
-`sudo systemctl enable valkey`
-
-`sudo systemctl start valkey`
-
-`sudo valkey-cli ping`
-
-### Valkey PHP module
-
-`sudo dnf install php-pecl-redis`
-
-`php -i | grep redis`
-
-### GIT
-
-`sudo dnf install git`
-
-### Composer
-`wget https://getcomposer.org/installer -O composer-installer.php`
-
-`sudo chmod 777 /usr/local/bin`
-
-`php composer-installer.php --filename=composer --install-dir=/usr/local/bin`
-
-### Firewall setup
-
-In order to add a minimum level of security, a firewall needs to be installed and allows
-connections from outside only to certain ports, from certain IP's.
-
-> Firewall setup is not mandatory
-
-`sudo dnf install firewalld`
-
-`sudo systemctl enable firewalld`
-
-> Before starting the firewall, be sure you will not be locked outside
-
-`sudo firewall-offline-cmd --zone=public --add-port=22/tcp --permanent`
-
-`sudo systemctl start firewalld`
-
-> By default, Swoole runs on port 8556. You can change that in the configuration file.
-
-`sudo firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="YOUR_IP_ADDRESS" port port="8556" protocol="tcp" accept'`
-
-`sudo firewall-cmd --reload`
-
-## NOW THE SERVER IS READY
diff --git a/phpcs.xml b/phpcs.xml
index 1295591..1d560a0 100644
--- a/phpcs.xml
+++ b/phpcs.xml
@@ -13,16 +13,15 @@
config
+ public
src
test
config/config.php
- config/routes.php
- src/Core/src/App/src/Migration/*
config/pipeline.php
diff --git a/phpstan.neon b/phpstan.neon
index ed056f0..9da6dbc 100644
--- a/phpstan.neon
+++ b/phpstan.neon
@@ -4,6 +4,8 @@ includes:
parameters:
level: 5
paths:
+ - config
+ - public
- src
- test
treatPhpDocTypesAsCertain: false
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index 324aa42..7a397a0 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -10,8 +10,7 @@
displayDetailsOnTestsThatTriggerErrors="true"
displayDetailsOnTestsThatTriggerNotices="true"
displayDetailsOnTestsThatTriggerWarnings="true"
- colors="true"
->
+ colors="true">
./test
diff --git a/public/index.php b/public/index.php
index 1c0c9bc..ee49413 100644
--- a/public/index.php
+++ b/public/index.php
@@ -2,6 +2,10 @@
declare(strict_types=1);
+use Mezzio\Application;
+use Mezzio\MiddlewareFactory;
+use Psr\Container\ContainerInterface;
+
// Delegate static file requests back to the PHP built-in webserver
if (PHP_SAPI === 'cli-server' && $_SERVER['SCRIPT_FILENAME'] !== __FILE__) {
return false;
@@ -14,17 +18,15 @@
* Self-called anonymous function that creates its own scope and keeps the global namespace clean.
*/
(function () {
- /** @var \Psr\Container\ContainerInterface $container */
+ /** @var ContainerInterface $container */
$container = require 'config/container.php';
- /** @var \Mezzio\Application $app */
- $app = $container->get(\Mezzio\Application::class);
- $factory = $container->get(\Mezzio\MiddlewareFactory::class);
+ /** @var Application $app */
+ $app = $container->get(Application::class);
+ $factory = $container->get(MiddlewareFactory::class);
- // Execute programmatic/declarative middleware pipeline and routing
- // configuration statements
+ // Execute programmatic/declarative middleware pipeline and routing configuration statements
(require 'config/pipeline.php')($app, $factory, $container);
- (require 'config/routes.php')($app, $factory, $container);
$app->run();
})();
diff --git a/renovate.json b/renovate.json
deleted file mode 100644
index 7a58182..0000000
--- a/renovate.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{
- "$schema": "https://docs.renovatebot.com/renovate-schema.json",
- "extends": [
- "local>mezzio/.github:renovate-config"
- ],
- "constraints": {
- "composer": "^2.2",
- "php": "^7.4"
- }
-}
diff --git a/src/App/ConfigProvider.php b/src/App/ConfigProvider.php
index 9d6963d..8040eb4 100644
--- a/src/App/ConfigProvider.php
+++ b/src/App/ConfigProvider.php
@@ -10,8 +10,8 @@
use Netglue\PsrContainer\Messenger\Container\Middleware\MessageHandlerMiddlewareStaticFactory;
use Netglue\PsrContainer\Messenger\Container\Middleware\MessageSenderMiddlewareStaticFactory;
use Netglue\PsrContainer\Messenger\HandlerLocator\OneToManyFqcnContainerHandlerLocator;
-use Queue\App\Message\ExampleMessage;
-use Queue\App\Message\ExampleMessageHandler;
+use Queue\App\Message\Message;
+use Queue\App\Message\MessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
class ConfigProvider
@@ -19,37 +19,27 @@ class ConfigProvider
public function __invoke(): array
{
return [
- "dependencies" => $this->getDependencies(),
+ 'dependencies' => $this->getDependencies(),
'symfony' => [
'messenger' => [
'buses' => $this->busConfig(),
],
],
- 'templates' => $this->getTemplates(),
];
}
private function getDependencies(): array
{
return [
- "factories" => [
- "message_bus" => [MessageBusStaticFactory::class, "message_bus"],
- "message_bus_stamp_middleware" => [BusNameStampMiddlewareStaticFactory::class, "message_bus"],
- "message_bus_sender_middleware" => [MessageSenderMiddlewareStaticFactory::class, "message_bus"],
- "message_bus_handler_middleware" => [MessageHandlerMiddlewareStaticFactory::class, "message_bus"],
- ExampleMessageHandler::class => AttributedServiceFactory::class,
+ 'factories' => [
+ 'message_bus' => [MessageBusStaticFactory::class, 'message_bus'],
+ 'message_bus_stamp_middleware' => [BusNameStampMiddlewareStaticFactory::class, 'message_bus'],
+ 'message_bus_sender_middleware' => [MessageSenderMiddlewareStaticFactory::class, 'message_bus'],
+ 'message_bus_handler_middleware' => [MessageHandlerMiddlewareStaticFactory::class, 'message_bus'],
+ MessageHandler::class => AttributedServiceFactory::class,
],
- "aliases" => [
- MessageBusInterface::class => "message_bus",
- ],
- ];
- }
-
- public function getTemplates(): array
- {
- return [
- 'paths' => [
- 'notification-email' => [__DIR__ . '/templates'],
+ 'aliases' => [
+ MessageBusInterface::class => 'message_bus',
],
];
}
@@ -57,7 +47,7 @@ public function getTemplates(): array
private function busConfig(): array
{
return [
- "message_bus" => [
+ 'message_bus' => [
// Means that it's an error if no handlers are defined for a given message
'allows_zero_handlers' => false,
@@ -68,7 +58,7 @@ private function busConfig(): array
*/
'middleware' => [
// … Middleware that inspects the message before it has been sent to a transport would go here.
- "message_bus_stamp_middleware",
+ 'message_bus_stamp_middleware',
'message_bus_sender_middleware', // Sends messages via a transport if configured.
'message_bus_handler_middleware', // Executes the handlers configured for the message
],
@@ -81,23 +71,23 @@ private function busConfig(): array
*/
'handler_locator' => OneToManyFqcnContainerHandlerLocator::class,
'handlers' => [
- ExampleMessage::class => [ExampleMessageHandler::class],
+ Message::class => [MessageHandler::class],
],
/**
* Routes define which transport(s) that messages dispatched on this bus should be sent with.
*
* The * wildcard applies to all messages.
- * The transport for each route must be an array of one or more transport identifiers. Each transport
- * is retrieved from the DI container by this value.
+ * The transport for each route must be an array of one or more transport identifiers.
+ * This value retrieves each transport from the DI container.
*
* An empty routes definition would mean that messages would be handled immediately and synchronously,
- * i.e. no transport would be used.
+ * i.e., no transport would be used.
*
* Route specific messages to specific transports by using the message name as the key.
*/
'routes' => [
- ExampleMessage::class => ["redis_transport"],
+ Message::class => ['redis_transport'],
],
],
];
diff --git a/src/App/Message/ExampleMessageHandler.php b/src/App/Message/ExampleMessageHandler.php
deleted file mode 100644
index e2414a0..0000000
--- a/src/App/Message/ExampleMessageHandler.php
+++ /dev/null
@@ -1,26 +0,0 @@
-logger->info("message: " . $message->getPayload()['foo']);
- }
-}
diff --git a/src/App/Message/ExampleMessage.php b/src/App/Message/Message.php
similarity index 66%
rename from src/App/Message/ExampleMessage.php
rename to src/App/Message/Message.php
index d094f4d..2ac080b 100644
--- a/src/App/Message/ExampleMessage.php
+++ b/src/App/Message/Message.php
@@ -4,7 +4,7 @@
namespace Queue\App\Message;
-class ExampleMessage
+class Message
{
public function __construct(
private array $payload,
@@ -15,4 +15,9 @@ public function getPayload(): array
{
return $this->payload;
}
+
+ public function setPayload(array $payload): void
+ {
+ $this->payload = $payload;
+ }
}
diff --git a/src/App/Message/MessageHandler.php b/src/App/Message/MessageHandler.php
new file mode 100644
index 0000000..39a0d2b
--- /dev/null
+++ b/src/App/Message/MessageHandler.php
@@ -0,0 +1,60 @@
+getPayload();
+
+ try {
+ if ($payload['foo'] === 'control') {
+ //user control message to log successfully processed message
+ $this->logger->info($payload['foo'] . ' processed successfully');
+ } elseif ($payload['foo'] === 'retry') {
+ //user retry message to test retry functionality
+ throw new \RuntimeException("Intentional failure for testing retries");
+ }
+ } catch (\Throwable $e) {
+ $retryCount = $payload['retry_count'] ?? 0;
+
+ if ($retryCount === 0) {
+ $this->logger->error(
+ "Message '{$payload['foo']}' failed because: " . $e->getMessage()
+ );
+ } else {
+ $this->logger->error(
+ "Message '{$payload['foo']}' failed because: " . $e->getMessage() . " Retry {$retryCount}"
+ );
+ }
+
+ $payload['retry_count'] = $retryCount + 1;
+ $message->setPayload($payload);
+
+ throw $e;
+ }
+ }
+}
diff --git a/src/Swoole/Command/Factory/StopCommandFactory.php b/src/Swoole/Command/Factory/StopCommandFactory.php
index d9d3de4..f21ee9f 100644
--- a/src/Swoole/Command/Factory/StopCommandFactory.php
+++ b/src/Swoole/Command/Factory/StopCommandFactory.php
@@ -4,12 +4,18 @@
namespace Queue\Swoole\Command\Factory;
+use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
+use Psr\Container\NotFoundExceptionInterface;
use Queue\Swoole\Command\StopCommand;
use Queue\Swoole\PidManager;
class StopCommandFactory
{
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ */
public function __invoke(ContainerInterface $container): StopCommand
{
return new StopCommand($container->get(PidManager::class));
diff --git a/src/Swoole/Command/GetFailedMessagesCommand.php b/src/Swoole/Command/GetFailedMessagesCommand.php
new file mode 100644
index 0000000..02c6638
--- /dev/null
+++ b/src/Swoole/Command/GetFailedMessagesCommand.php
@@ -0,0 +1,133 @@
+setDescription('Get processing failure messages.')
+ ->addOption('start', null, InputOption::VALUE_OPTIONAL, 'Start timestamp (Y-m-d H:i:s)')
+ ->addOption('end', null, InputOption::VALUE_OPTIONAL, 'End timestamp (Y-m-d H:i:s)')
+ ->addOption('limit', null, InputOption::VALUE_OPTIONAL, 'Limit in days');
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output): int
+ {
+ try {
+ $startOption = $input->getOption('start');
+ $endOption = $input->getOption('end');
+ $limit = $input->getOption('limit');
+
+ $startDate = $startOption ? new DateTimeImmutable($startOption) : null;
+ $endDate = $endOption ? new DateTimeImmutable($endOption) : null;
+ } catch (Exception) {
+ $output->writeln('Invalid date format provided.');
+ return Command::FAILURE;
+ }
+
+ if ($startDate && $startDate->format('H:i:s') === '00:00:00') {
+ $startDate = $startDate->setTime(0, 0);
+ }
+
+ if ($endDate && $endDate->format('H:i:s') === '00:00:00') {
+ $endDate = $endDate->setTime(23, 59, 59);
+ }
+
+ if ($limit && is_numeric($limit)) {
+ if ($startDate && ! $endDate) {
+ $endDate = $startDate->modify("+$limit days");
+ } elseif (! $startDate && $endDate) {
+ $startDate = $endDate->modify("-$limit days");
+ }
+ }
+
+ if (! $endDate) {
+ $endDate = new DateTime();
+ }
+
+ if ($startDate > $endDate) {
+ $output->writeln('The start date cannot be after the end date.');
+ return Command::FAILURE;
+ }
+
+ $logPath = dirname(__DIR__, 3) . '/log/queue-log.log';
+
+ if (! file_exists($logPath)) {
+ $output->writeln("Log file was not found: $logPath");
+ return Command::FAILURE;
+ }
+
+ $lines = file($logPath, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
+
+ $startTimestamp = $startDate?->getTimestamp();
+ $endTimestamp = $endDate->getTimestamp();
+
+ $found = false;
+
+ foreach ($lines as $line) {
+ $entry = json_decode($line, true);
+
+ if (! $entry || ! isset($entry['levelName'], $entry['timestamp'])) {
+ continue;
+ }
+
+ if (strtolower($entry['levelName']) !== 'error') {
+ continue;
+ }
+
+ $logTimestamp = strtotime($entry['timestamp']);
+ if (
+ ($startTimestamp && $logTimestamp < $startTimestamp) ||
+ ($endTimestamp && $logTimestamp > $endTimestamp)
+ ) {
+ continue;
+ }
+
+ $output->writeln($line);
+ $found = true;
+ }
+
+ if (! $found) {
+ $output->writeln('No matching log entries found.');
+ }
+
+ return Command::SUCCESS;
+ }
+}
diff --git a/src/Swoole/Command/GetProcessedMessagesCommand.php b/src/Swoole/Command/GetProcessedMessagesCommand.php
new file mode 100644
index 0000000..d92ff4b
--- /dev/null
+++ b/src/Swoole/Command/GetProcessedMessagesCommand.php
@@ -0,0 +1,133 @@
+setDescription('Get successfully processed messages')
+ ->addOption('start', null, InputOption::VALUE_OPTIONAL, 'Start timestamp (Y-m-d H:i:s)')
+ ->addOption('end', null, InputOption::VALUE_OPTIONAL, 'End timestamp (Y-m-d H:i:s)')
+ ->addOption('limit', null, InputOption::VALUE_OPTIONAL, 'Limit in days');
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output): int
+ {
+ try {
+ $startOption = $input->getOption('start');
+ $endOption = $input->getOption('end');
+ $limit = $input->getOption('limit');
+
+ $startDate = $startOption ? new DateTimeImmutable($startOption) : null;
+ $endDate = $endOption ? new DateTimeImmutable($endOption) : null;
+ } catch (Exception) {
+ $output->writeln('Invalid date format provided.');
+ return Command::FAILURE;
+ }
+
+ if ($startDate && $startDate->format('H:i:s') === '00:00:00') {
+ $startDate = $startDate->setTime(0, 0);
+ }
+
+ if ($endDate && $endDate->format('H:i:s') === '00:00:00') {
+ $endDate = $endDate->setTime(23, 59, 59);
+ }
+
+ if ($limit && is_numeric($limit)) {
+ if ($startDate && ! $endDate) {
+ $endDate = $startDate->modify("+$limit days");
+ } elseif (! $startDate && $endDate) {
+ $startDate = $endDate->modify("-$limit days");
+ }
+ }
+
+ if (! $endDate) {
+ $endDate = new DateTime();
+ }
+
+ if ($startDate > $endDate) {
+ $output->writeln('The start date cannot be after the end date.');
+ return Command::FAILURE;
+ }
+
+ $logPath = dirname(__DIR__, 3) . '/log/queue-log.log';
+
+ if (! file_exists($logPath)) {
+ $output->writeln("Log file was not found: $logPath");
+ return Command::FAILURE;
+ }
+
+ $lines = file($logPath, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
+
+ $startTimestamp = $startDate?->getTimestamp();
+ $endTimestamp = $endDate->getTimestamp();
+
+ $found = false;
+
+ foreach ($lines as $line) {
+ $entry = json_decode($line, true);
+
+ if (! $entry || ! isset($entry['levelName'], $entry['timestamp'])) {
+ continue;
+ }
+
+ if (strtolower($entry['levelName']) !== 'info') {
+ continue;
+ }
+
+ $logTimestamp = strtotime($entry['timestamp']);
+ if (
+ ($startTimestamp && $logTimestamp < $startTimestamp) ||
+ ($endTimestamp && $logTimestamp > $endTimestamp)
+ ) {
+ continue;
+ }
+
+ $output->writeln($line);
+ $found = true;
+ }
+
+ if (! $found) {
+ $output->writeln('No matching log entries found.');
+ }
+
+ return Command::SUCCESS;
+ }
+}
diff --git a/src/Swoole/Command/GetQueuedMessagesCommand.php b/src/Swoole/Command/GetQueuedMessagesCommand.php
new file mode 100644
index 0000000..ed039cd
--- /dev/null
+++ b/src/Swoole/Command/GetQueuedMessagesCommand.php
@@ -0,0 +1,69 @@
+redis = $redis;
+ }
+
+ protected function configure(): void
+ {
+ $this->setDescription('Get all queued messages from Redis stream "messages"');
+ }
+
+ /**
+ * @throws RedisException
+ */
+ protected function execute(InputInterface $input, OutputInterface $output): int
+ {
+ $entries = $this->redis->xRange('messages', '-', '+');
+
+ if (empty($entries)) {
+ $output->writeln('No messages queued found in Redis stream "messages".');
+ return Command::SUCCESS;
+ }
+
+ foreach ($entries as $id => $entry) {
+ $output->writeln("Message ID: $id");
+ $output->writeln(json_encode($entry, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));
+ $output->writeln(str_repeat('-', 40));
+ }
+
+ $total = count($entries);
+ $output->writeln("Total queued messages in stream 'messages': $total");
+ $output->writeln(str_repeat('-', 40));
+
+ return Command::SUCCESS;
+ }
+}
diff --git a/src/Swoole/Command/IsRunningTrait.php b/src/Swoole/Command/IsRunningTrait.php
index 6e7966a..a7ceb02 100644
--- a/src/Swoole/Command/IsRunningTrait.php
+++ b/src/Swoole/Command/IsRunningTrait.php
@@ -9,7 +9,7 @@
trait IsRunningTrait
{
/**
- * Is the swoole server running?
+ * Is the Swoole server running?
*/
public function isRunning(): bool
{
diff --git a/src/Swoole/Command/StartCommand.php b/src/Swoole/Command/StartCommand.php
index 62ac0b2..c1eb1cd 100644
--- a/src/Swoole/Command/StartCommand.php
+++ b/src/Swoole/Command/StartCommand.php
@@ -4,7 +4,9 @@
namespace Queue\Swoole\Command;
+use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
+use Psr\Container\NotFoundExceptionInterface;
use Queue\Swoole\PidManager;
use Swoole\Server as SwooleServer;
use Symfony\Component\Console\Attribute\AsCommand;
@@ -24,8 +26,6 @@ class StartCommand extends Command
public const DEFAULT_PROCESS_NAME = 'dotkernel-queue';
- public const DEFAULT_NUM_WORKERS = 4;
-
public const HELP = <<<'EOH'
Start the web server. If --daemonize is provided, starts the server as a
background process and returns handling to the shell; otherwise, the
@@ -35,8 +35,7 @@ class StartCommand extends Command
do not provide the option, 4 workers will be started.
EOH;
- /** @var ContainerInterface */
- private $container;
+ private ContainerInterface $container;
public function __construct(ContainerInterface $container, string $name = 'start')
{
@@ -51,6 +50,10 @@ protected function configure(): void
$this->setHelp(self::HELP);
}
+ /**
+ * @throws NotFoundExceptionInterface
+ * @throws ContainerExceptionInterface
+ */
protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->pidManager = $this->container->get(PidManager::class);
@@ -61,8 +64,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$server = $this->container->get(SwooleServer::class);
$config = $this->container->get('config');
- $processName = $config['dotkernel-queue-swoole']['swoole-server']['process-name']
- ?? self::DEFAULT_PROCESS_NAME;
+ $processName = $config['dotkernel-queue-swoole']['swoole-server']['process-name'] ?? self::DEFAULT_PROCESS_NAME;
$pidManager = $this->pidManager;
diff --git a/src/Swoole/Command/StopCommand.php b/src/Swoole/Command/StopCommand.php
index 614b544..b64d2b3 100644
--- a/src/Swoole/Command/StopCommand.php
+++ b/src/Swoole/Command/StopCommand.php
@@ -28,22 +28,20 @@ class StopCommand extends Command
/**
* @internal
*
- * @var Closure Callable to execute when attempting to kill the server
- * process. Generally speaking, this is SwooleProcess::kill; only
- * change the value when testing.
+ * Callable to execute when attempting to kill the server process.
+ * Generally speaking, this is SwooleProcess::kill; only change the value when testing.
*/
- public $killProcess;
+ public Closure $killProcess;
/**
* @internal
*
- * @var int How long to wait for the server process to end. Only change
- * the value when testing.
+ * How long to wait for the server process to end.
+ * Only change the value when testing.
*/
- public $waitThreshold = 60;
+ public int $waitThreshold = 60;
- /** @var PidManager */
- private $pidManager;
+ private PidManager $pidManager;
public function __construct(PidManager $pidManager, string $name = 'stop')
{
diff --git a/src/Swoole/ConfigProvider.php b/src/Swoole/ConfigProvider.php
index a468298..4a30284 100644
--- a/src/Swoole/ConfigProvider.php
+++ b/src/Swoole/ConfigProvider.php
@@ -4,8 +4,12 @@
namespace Queue\Swoole;
+use Dot\DependencyInjection\Factory\AttributedServiceFactory;
use Queue\Swoole\Command\Factory\StartCommandFactory;
use Queue\Swoole\Command\Factory\StopCommandFactory;
+use Queue\Swoole\Command\GetFailedMessagesCommand;
+use Queue\Swoole\Command\GetProcessedMessagesCommand;
+use Queue\Swoole\Command\GetQueuedMessagesCommand;
use Queue\Swoole\Command\StartCommand;
use Queue\Swoole\Command\StopCommand;
use Queue\Swoole\Delegators\TCPServerDelegator;
@@ -23,16 +27,18 @@ public function __invoke(): array
public function getDependencies(): array
{
return [
- "delegators" => [
+ 'delegators' => [
TCPSwooleServer::class => [TCPServerDelegator::class],
],
- "factories" => [
- TCPSwooleServer::class => ServerFactory::class,
- PidManager::class => PidManagerFactory::class,
- StartCommand::class => StartCommandFactory::class,
- StopCommand::class => StopCommandFactory::class,
+ 'factories' => [
+ TCPSwooleServer::class => ServerFactory::class,
+ PidManager::class => PidManagerFactory::class,
+ StartCommand::class => StartCommandFactory::class,
+ StopCommand::class => StopCommandFactory::class,
+ GetProcessedMessagesCommand::class => AttributedServiceFactory::class,
+ GetFailedMessagesCommand::class => AttributedServiceFactory::class,
+ GetQueuedMessagesCommand::class => AttributedServiceFactory::class,
],
- "aliases" => [],
];
}
}
diff --git a/src/Swoole/Delegators/TCPServerDelegator.php b/src/Swoole/Delegators/TCPServerDelegator.php
index a5e6a48..5bcbf84 100644
--- a/src/Swoole/Delegators/TCPServerDelegator.php
+++ b/src/Swoole/Delegators/TCPServerDelegator.php
@@ -4,14 +4,36 @@
namespace Queue\Swoole\Delegators;
+use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
-use Queue\App\Message\ExampleMessage;
+use Psr\Container\NotFoundExceptionInterface;
+use Queue\App\Message\Message;
+use Queue\Swoole\Command\GetFailedMessagesCommand;
+use Queue\Swoole\Command\GetProcessedMessagesCommand;
+use Queue\Swoole\Command\GetQueuedMessagesCommand;
use Swoole\Server as TCPSwooleServer;
+use Symfony\Component\Console\Application;
+use Symfony\Component\Console\Input\ArrayInput;
+use Symfony\Component\Console\Output\BufferedOutput;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
+use Throwable;
+
+use function array_merge;
+use function array_shift;
+use function explode;
+use function ltrim;
+use function str_starts_with;
+use function trim;
+
+use const PHP_EOL;
class TCPServerDelegator
{
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ */
public function __invoke(ContainerInterface $container, string $serviceName, callable $callback): TCPSwooleServer
{
/** @var TCPSwooleServer $server */
@@ -20,29 +42,65 @@ public function __invoke(ContainerInterface $container, string $serviceName, cal
/** @var MessageBusInterface $bus */
$bus = $container->get(MessageBusInterface::class);
- $logger = $container->get("dot-log.queue-log");
+ $logger = $container->get('dot-log.queue-log');
- $server->on('Connect', function ($server, $fd) {
- echo "Client: Connect.\n";
+ $server->on('connect', function (TCPSwooleServer $server, int $fd) {
+ echo 'Client: Connect.' . PHP_EOL;
});
- // Register the function for the event `receive`
- $server->on('receive', function ($server, $fd, $fromId, $data) use ($logger, $bus) {
- $bus->dispatch(new ExampleMessage(["foo" => $data]));
- $bus->dispatch(new ExampleMessage(["foo" => "with 5 seconds delay"]), [
- new DelayStamp(5000),
- ]);
-
- $server->send($fd, "Server: {$data}");
- $logger->notice("Request received on receive", [
- 'fd' => $fd,
- 'from_id' => $fromId,
- ]);
+ $server->on('receive', function ($server, $fd, $fromId, $data) use ($logger, $bus, $container) {
+ $commandMap = [
+ 'processed' => GetProcessedMessagesCommand::class,
+ 'failed' => GetFailedMessagesCommand::class,
+ 'inventory' => GetQueuedMessagesCommand::class,
+ ];
+
+ $message = trim($data);
+ $args = explode(' ', $message);
+ $commandName = array_shift($args);
+
+ if (isset($commandMap[$commandName])) {
+ $commandClass = $commandMap[$commandName];
+ $application = new Application();
+ $commandInstance = $container->get($commandClass);
+ $application->add($commandInstance);
+
+ $parsedOptions = [];
+ foreach ($args as $arg) {
+ if (str_starts_with($arg, '--')) {
+ [$key, $value] = explode('=', ltrim($arg, '-'), 2) + [null, null];
+ $parsedOptions["--$key"] = $value;
+ }
+ }
+
+ $inputData = array_merge(['command' => $commandName], $parsedOptions);
+ $input = new ArrayInput($inputData);
+ $output = new BufferedOutput();
+
+ try {
+ $application->setAutoExit(false);
+ $application->run($input, $output);
+ $response = $output->fetch();
+ $server->send($fd, $response);
+ } catch (Throwable $e) {
+ $logger->error('Error running command: ' . $e->getMessage());
+ }
+ } else {
+ $bus->dispatch(new Message(['foo' => $message]));
+ $bus->dispatch(new Message(['foo' => 'with 5 seconds delay']), [
+ new DelayStamp(5000),
+ ]);
+
+ $logger->notice('TCP request received', [
+ 'fd' => $fd,
+ 'from_id' => $fromId,
+ 'data' => $data,
+ ]);
+ }
});
- // Listen for the 'Close' event.
- $server->on('Close', function ($server, $fd) {
- echo "Client: Close.\n";
+ $server->on('close', function (TCPSwooleServer $server, int $fd) {
+ echo 'Client: Close.' . PHP_EOL;
});
return $server;
diff --git a/src/Swoole/PidManager.php b/src/Swoole/PidManager.php
index b21c1ab..427d529 100644
--- a/src/Swoole/PidManager.php
+++ b/src/Swoole/PidManager.php
@@ -22,7 +22,7 @@ public function __construct(private string $pidFile)
}
/**
- * Write master pid and manager pid to pid file
+ * Write master pid and manager pid to a pid file
*
* @throws RuntimeException When $pidFile is not writable.
*/
@@ -36,7 +36,7 @@ public function write(int $masterPid, int $managerPid): void
}
/**
- * Read master pid and manager pid from pid file
+ * Read master pid and manager pid from a pid file
*
* @return string[] Array with master and manager PID values as strings
*/
diff --git a/src/Swoole/PidManagerFactory.php b/src/Swoole/PidManagerFactory.php
index 9036816..8c06498 100644
--- a/src/Swoole/PidManagerFactory.php
+++ b/src/Swoole/PidManagerFactory.php
@@ -4,15 +4,22 @@
namespace Queue\Swoole;
+use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
+use Psr\Container\NotFoundExceptionInterface;
use function sys_get_temp_dir;
class PidManagerFactory
{
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ */
public function __invoke(ContainerInterface $container): PidManager
{
$config = $container->get('config');
+
return new PidManager(
$config['dotkernel-queue-swoole']['swoole-tcp-server']['options']['pid_file']
?? sys_get_temp_dir() . '/dotkernel-queue-swoole.pid'
diff --git a/src/Swoole/ServerFactory.php b/src/Swoole/ServerFactory.php
index 63354c1..5f7a59f 100644
--- a/src/Swoole/ServerFactory.php
+++ b/src/Swoole/ServerFactory.php
@@ -5,7 +5,10 @@
namespace Queue\Swoole;
use ArrayAccess;
+use ErrorException;
+use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
+use Psr\Container\NotFoundExceptionInterface;
use Queue\Swoole\Exception\InvalidArgumentException;
use Swoole\Runtime as SwooleRuntime;
use Swoole\Server as SwooleServer;
@@ -66,10 +69,12 @@ class ServerFactory
* @see https://www.swoole.co.uk/docs/modules/swoole-server-methods#swoole_server-__construct
* @see https://www.swoole.co.uk/docs/modules/swoole-server/predefined-constants for $mode and $protocol constant
*
+ * @throws ContainerExceptionInterface
+ * @throws ErrorException
* @throws InvalidArgumentException For invalid $port values.
* @throws InvalidArgumentException For invalid $mode values.
* @throws InvalidArgumentException For invalid $protocol values.
- * @throws \ErrorException
+ * @throws NotFoundExceptionInterface
*/
public function __invoke(ContainerInterface $container): SwooleServer
{
diff --git a/test/App/AppConfigProviderTest.php b/test/App/AppConfigProviderTest.php
new file mode 100644
index 0000000..62590b6
--- /dev/null
+++ b/test/App/AppConfigProviderTest.php
@@ -0,0 +1,23 @@
+config = (new ConfigProvider())();
+ }
+
+ public function testHasDependencies(): void
+ {
+ $this->assertArrayHasKey('dependencies', $this->config);
+ }
+}
diff --git a/test/App/Message/MessageHandlerTest.php b/test/App/Message/MessageHandlerTest.php
new file mode 100644
index 0000000..4c2277a
--- /dev/null
+++ b/test/App/Message/MessageHandlerTest.php
@@ -0,0 +1,108 @@
+bus = $this->createMock(MessageBusInterface::class);
+ $this->logger = new Logger([
+ 'writers' => [
+ 'FileWriter' => [
+ 'name' => 'null',
+ 'level' => Logger::ALERT,
+ ],
+ ],
+ ]);
+ $this->config = [
+ 'fail-safe' => [
+ 'first_retry' => 1000,
+ 'second_retry' => 2000,
+ 'third_retry' => 3000,
+ ],
+ 'notification' => [
+ 'server' => [
+ 'protocol' => 'tcp',
+ 'host' => 'localhost',
+ 'port' => '8556',
+ 'eof' => "\n",
+ ],
+ ],
+ 'application' => [
+ 'name' => 'dotkernel',
+ ],
+ ];
+
+ $this->handler = new MessageHandler($this->bus, $this->logger, $this->config);
+ }
+
+ public function testControlMessageDoesNotThrowAndDoesNotSetRetryCount(): void
+ {
+ $handler = $this->handler;
+
+ $message = new Message(['foo' => 'control']);
+ $handler($message);
+
+ $payload = $message->getPayload();
+ $this->assertArrayNotHasKey('retry_count', $payload);
+ }
+
+ public function testRetryMessageThrowsExceptionAndSetsRetryCount(): void
+ {
+ $handler = $this->handler;
+
+ $message = new Message(['foo' => 'retry']);
+
+ $this->expectException(RuntimeException::class);
+ $this->expectExceptionMessage("Intentional failure for testing retries");
+
+ try {
+ $handler($message);
+ } finally {
+ $payload = $message->getPayload();
+ $this->assertArrayHasKey('retry_count', $payload);
+ $this->assertEquals(1, $payload['retry_count']); // first retry
+ }
+ }
+
+ public function testRetryMessageWithExistingRetryCountIncrementsIt(): void
+ {
+ $handler = $this->handler;
+
+ $message = new Message([
+ 'foo' => 'retry',
+ 'retry_count' => 2,
+ ]);
+
+ $this->expectException(RuntimeException::class);
+
+ try {
+ $handler($message);
+ } finally {
+ $payload = $message->getPayload();
+ $this->assertEquals(3, $payload['retry_count']); // incremented from 2 → 3
+ }
+ }
+}
diff --git a/test/App/Message/MessageTest.php b/test/App/Message/MessageTest.php
new file mode 100644
index 0000000..8c733a6
--- /dev/null
+++ b/test/App/Message/MessageTest.php
@@ -0,0 +1,17 @@
+ 'test message payload']);
+ $this->assertSame(['payload' => 'test message payload'], $admin->getPayload());
+ }
+}
diff --git a/test/Swoole/Command/Factory/StartCommandFactoryTest.php b/test/Swoole/Command/Factory/StartCommandFactoryTest.php
new file mode 100644
index 0000000..40e63a0
--- /dev/null
+++ b/test/Swoole/Command/Factory/StartCommandFactoryTest.php
@@ -0,0 +1,27 @@
+createMock(ContainerInterface::class);
+
+ $factory = new StartCommandFactory();
+ $command = $factory($container);
+
+ $this->assertContainsOnlyInstancesOf(StartCommand::class, [$command]);
+ }
+}
diff --git a/test/Swoole/Command/Factory/StopCommandFactoryTest.php b/test/Swoole/Command/Factory/StopCommandFactoryTest.php
new file mode 100644
index 0000000..a2ca055
--- /dev/null
+++ b/test/Swoole/Command/Factory/StopCommandFactoryTest.php
@@ -0,0 +1,35 @@
+createMock(PidManager::class);
+
+ $container = $this->createMock(ContainerInterface::class);
+ $container->expects($this->once())
+ ->method('get')
+ ->with(PidManager::class)
+ ->willReturn($pidManager);
+
+ $factory = new StopCommandFactory();
+
+ $command = $factory($container);
+
+ $this->assertContainsOnlyInstancesOf(StopCommand::class, [$command]);
+ }
+}
diff --git a/test/Swoole/Command/GetDataFromLogsCommandTest.php b/test/Swoole/Command/GetDataFromLogsCommandTest.php
new file mode 100644
index 0000000..14f5e99
--- /dev/null
+++ b/test/Swoole/Command/GetDataFromLogsCommandTest.php
@@ -0,0 +1,204 @@
+logPath = $logDir . '/queue-log.log';
+ if (! is_dir($logDir)) {
+ mkdir($logDir, 0777, true);
+ }
+ file_put_contents($this->logPath, '');
+ }
+
+ protected function tearDown(): void
+ {
+ if (file_exists($this->logPath)) {
+ unlink($this->logPath);
+ }
+ }
+
+ /**
+ * @dataProvider commandProvider
+ */
+ public function testInvalidDateFormat(string $commandClass): void
+ {
+ $command = new $commandClass();
+ $input = new ArrayInput(['--start' => 'not-a-date']);
+ $output = new BufferedOutput();
+
+ $exit = $command->run($input, $output);
+
+ $this->assertEquals(Command::FAILURE, $exit);
+ $this->assertStringContainsString('Invalid date format', $output->fetch());
+ }
+
+ /**
+ * @dataProvider commandProvider
+ */
+ public function testStartAfterEnd(string $commandClass): void
+ {
+ $command = new $commandClass();
+ $input = new ArrayInput([
+ '--start' => '2024-01-02 00:00:00',
+ '--end' => '2024-01-01 00:00:00',
+ ]);
+ $output = new BufferedOutput();
+
+ $exit = $command->run($input, $output);
+
+ $this->assertEquals(Command::FAILURE, $exit);
+ $this->assertStringContainsString('start date cannot be after the end date', $output->fetch());
+ }
+
+ /**
+ * @dataProvider commandProvider
+ */
+ public function testMissingLogFile(string $commandClass): void
+ {
+ unlink($this->logPath);
+
+ $command = new $commandClass();
+ $input = new ArrayInput([]);
+ $output = new BufferedOutput();
+
+ $exit = $command->run($input, $output);
+
+ $this->assertEquals(Command::FAILURE, $exit);
+ $this->assertStringContainsString('Log file was not found', $output->fetch());
+ }
+
+ /**
+ * @dataProvider commandProvider
+ */
+ public function testNoMatchingEntries(string $commandClass): void
+ {
+ file_put_contents($this->logPath, json_encode([
+ 'levelName' => 'debug',
+ 'timestamp' => '2024-01-01 12:00:00',
+ ]) . PHP_EOL);
+
+ $command = new $commandClass();
+ $input = new ArrayInput([]);
+ $output = new BufferedOutput();
+
+ $exit = $command->run($input, $output);
+
+ $this->assertEquals(Command::SUCCESS, $exit);
+ $this->assertStringContainsString('No matching log entries found', $output->fetch());
+ }
+
+ /**
+ * @dataProvider commandProvider
+ */
+ public function testMalformedLogLineIgnored(string $commandClass): void
+ {
+ file_put_contents($this->logPath, 'not-a-json' . PHP_EOL);
+
+ $command = new $commandClass();
+ $input = new ArrayInput([]);
+ $output = new BufferedOutput();
+
+ $exit = $command->run($input, $output);
+
+ $this->assertEquals(Command::SUCCESS, $exit);
+ $this->assertStringContainsString('No matching log entries found', $output->fetch());
+ }
+
+ /**
+ * @dataProvider levelProvider
+ */
+ public function testMatchEntryOutput(string $commandClass, string $expectedLevel): void
+ {
+ $line = json_encode([
+ 'levelName' => $expectedLevel,
+ 'timestamp' => (new DateTime())->format('Y-m-d H:i:s'),
+ 'message' => 'Message here',
+ ]);
+ file_put_contents($this->logPath, $line . PHP_EOL);
+
+ $command = new $commandClass();
+ $input = new ArrayInput([]);
+ $output = new BufferedOutput();
+
+ $exit = $command->run($input, $output);
+
+ $this->assertEquals(Command::SUCCESS, $exit);
+ $this->assertStringContainsString('Message here', $output->fetch());
+ }
+
+ /**
+ * @throws Exception
+ * @throws ExceptionInterface
+ */
+ public function testLimitAddsDaysToStartDateOnly(): void
+ {
+ $start = '2024-01-01 00:00:00';
+ $limit = 5;
+
+ $command = new GetProcessedMessagesCommand();
+ $input = new ArrayInput([
+ '--start' => $start,
+ '--limit' => $limit,
+ ]);
+
+ $output = new BufferedOutput();
+ $logDate = (new DateTimeImmutable($start))->modify("+$limit days")->format('Y-m-d H:i:s');
+
+ file_put_contents($this->logPath, json_encode([
+ 'levelName' => 'info',
+ 'timestamp' => $logDate,
+ 'message' => 'Auto-inferred end',
+ ]) . PHP_EOL);
+
+ $exit = $command->run($input, $output);
+
+ $this->assertEquals(Command::SUCCESS, $exit);
+ $this->assertStringContainsString('Auto-inferred end', $output->fetch());
+ }
+
+ public static function commandProvider(): array
+ {
+ return [
+ [GetProcessedMessagesCommand::class],
+ [GetFailedMessagesCommand::class],
+ ];
+ }
+
+ public static function levelProvider(): array
+ {
+ return [
+ [GetProcessedMessagesCommand::class, 'info'],
+ [GetFailedMessagesCommand::class, 'error'],
+ ];
+ }
+}
diff --git a/test/Swoole/Command/GetQueuedMessagesCommandTest.php b/test/Swoole/Command/GetQueuedMessagesCommandTest.php
new file mode 100644
index 0000000..969842a
--- /dev/null
+++ b/test/Swoole/Command/GetQueuedMessagesCommandTest.php
@@ -0,0 +1,106 @@
+redisMock = $this->createMock(Redis::class);
+ }
+
+ /**
+ * @throws ExceptionInterface
+ */
+ public function testExecuteWithNoMessages(): void
+ {
+ $this->redisMock
+ ->expects($this->once())
+ ->method('xRange')
+ ->with('messages', '-', '+')
+ ->willReturn([]);
+
+ $command = new GetQueuedMessagesCommand($this->redisMock);
+ $input = new ArrayInput([]);
+ $output = new BufferedOutput();
+
+ $exitCode = $command->run($input, $output);
+ $outputText = $output->fetch();
+
+ $this->assertEquals(Command::SUCCESS, $exitCode);
+ $this->assertStringContainsString('No messages queued found', $outputText);
+ }
+
+ /**
+ * @throws ExceptionInterface
+ */
+ public function testExecuteWithMessages(): void
+ {
+ $fakeMessages = [
+ '1691000000000-0' => ['type' => 'testEmail', 'payload' => '{"to":"test@dotkernel.com"}'],
+ '1691000000001-0' => ['type' => 'testSms', 'payload' => '{"to":"+123456789"}'],
+ ];
+
+ $this->redisMock
+ ->expects($this->once())
+ ->method('xRange')
+ ->with('messages', '-', '+')
+ ->willReturn($fakeMessages);
+
+ $command = new GetQueuedMessagesCommand($this->redisMock);
+ $input = new ArrayInput([]);
+ $output = new BufferedOutput();
+
+ $exitCode = $command->run($input, $output);
+ $outputText = $output->fetch();
+
+ $this->assertEquals(Command::SUCCESS, $exitCode);
+
+ foreach (array_keys($fakeMessages) as $id) {
+ $this->assertStringContainsString('Message ID:', $outputText);
+ $this->assertStringContainsString($id, $outputText);
+ }
+
+ $this->assertStringContainsString('Total queued messages in stream', $outputText);
+ $this->assertStringContainsString((string) count($fakeMessages), $outputText);
+ }
+
+ /**
+ * @throws ExceptionInterface
+ */
+ public function testRedisThrowsException(): void
+ {
+ $this->redisMock
+ ->expects($this->once())
+ ->method('xRange')
+ ->willThrowException(new RedisException('Redis unavailable'));
+
+ $command = new GetQueuedMessagesCommand($this->redisMock);
+ $input = new ArrayInput([]);
+ $output = new BufferedOutput();
+
+ $this->expectException(RedisException::class);
+ $command->run($input, $output);
+ }
+}
diff --git a/test/Swoole/Command/IsRunningTraitTest.php b/test/Swoole/Command/IsRunningTraitTest.php
new file mode 100644
index 0000000..ef59e0d
--- /dev/null
+++ b/test/Swoole/Command/IsRunningTraitTest.php
@@ -0,0 +1,41 @@
+traitUser = new class {
+ use IsRunningTrait;
+
+ public PidManager $pidManager;
+ };
+
+ $this->traitUser->pidManager = $this->createMock(PidManager::class);
+ }
+
+ public function testIsRunningReturnsFalseWhenNoPids(): void
+ {
+ $this->traitUser->pidManager->method('read')->willReturn([]);
+ $this->assertFalse($this->traitUser->isRunning());
+ }
+
+ public function testIsRunningReturnsFalseWhenPidsAreZero(): void
+ {
+ $this->traitUser->pidManager->method('read')->willReturn([0, 0]);
+ $this->assertFalse($this->traitUser->isRunning());
+ }
+}
diff --git a/test/Swoole/Command/StartCommandTest.php b/test/Swoole/Command/StartCommandTest.php
new file mode 100644
index 0000000..246976b
--- /dev/null
+++ b/test/Swoole/Command/StartCommandTest.php
@@ -0,0 +1,92 @@
+createMock(InputInterface::class);
+ $output = $this->createMock(OutputInterface::class);
+ $pidManager = $this->createMock(PidManager::class);
+ $server = $this->createMock(Server::class);
+
+ $pidManager->method('read')->willReturn([]);
+
+ $server->master_pid = 1234;
+ $server->manager_pid = 4321;
+
+ $server->expects($this->once())->method('on');
+ $server->expects($this->once())->method('start');
+
+ $config = [
+ 'dotkernel-queue-swoole' => [
+ 'swoole-server' => [
+ 'process-name' => 'test-process',
+ ],
+ ],
+ ];
+
+ $container = $this->createMock(ContainerInterface::class);
+ $container->method('get')->willReturnCallback(function (string $id) use ($pidManager, $server, $config) {
+ return match ($id) {
+ PidManager::class => $pidManager,
+ Server::class => $server,
+ 'config' => $config,
+ default => null,
+ };
+ });
+
+ $command = new StartCommand($container);
+ $statusCode = $command->run($input, $output);
+
+ $this->assertSame(0, $statusCode);
+ }
+
+ /**
+ * @throws ExceptionInterface
+ * @throws Exception
+ */
+ public function testExecuteWhenServerIsAlreadyRunning(): void
+ {
+ $container = $this->createMock(ContainerInterface::class);
+ $pidManager = $this->createMock(PidManager::class);
+ $container->method('get')
+ ->with(PidManager::class)
+ ->willReturn($pidManager);
+
+ $input = $this->createMock(InputInterface::class);
+ $output = $this->createMock(OutputInterface::class);
+
+ $output->expects($this->once())
+ ->method('writeln')
+ ->with('Server is already running!');
+
+ $command = $this->getMockBuilder(StartCommand::class)
+ ->setConstructorArgs([$container])
+ ->onlyMethods(['isRunning'])
+ ->getMock();
+
+ $command->method('isRunning')->willReturn(true);
+
+ $exitCode = $command->run($input, $output);
+
+ $this->assertSame(1, $exitCode);
+ }
+}
diff --git a/test/Swoole/Command/StopCommandTest.php b/test/Swoole/Command/StopCommandTest.php
new file mode 100644
index 0000000..4e0dd12
--- /dev/null
+++ b/test/Swoole/Command/StopCommandTest.php
@@ -0,0 +1,88 @@
+createMock(PidManager::class);
+
+ $command = $this->getMockBuilder(StopCommand::class)
+ ->setConstructorArgs([$pidManager])
+ ->onlyMethods(['isRunning'])
+ ->getMock();
+
+ $command->method('isRunning')->willReturn(false);
+
+ $tester = new CommandTester($command);
+ $exitCode = $tester->execute([]);
+
+ $this->assertSame(0, $exitCode);
+ $this->assertStringContainsString('Server is not running', $tester->getDisplay());
+ }
+
+ /**
+ * @throws Exception
+ */
+ public function testExecuteWhenServerStopsSuccessfully(): void
+ {
+ $pidManager = $this->createMock(PidManager::class);
+ $pidManager->method('read')->willReturn(['1234']);
+ $pidManager->expects($this->once())->method('delete');
+
+ $command = $this->getMockBuilder(StopCommand::class)
+ ->setConstructorArgs([$pidManager])
+ ->onlyMethods(['isRunning'])
+ ->getMock();
+
+ $command->method('isRunning')->willReturn(true);
+
+ $command->killProcess = fn (): bool => true;
+
+ $tester = new CommandTester($command);
+ $exitCode = $tester->execute([]);
+
+ $this->assertSame(0, $exitCode);
+ $this->assertStringContainsString('Server stopped', $tester->getDisplay());
+ }
+
+ /**
+ * @throws Exception
+ */
+ public function testExecuteWhenServerFailsToStop(): void
+ {
+ $pidManager = $this->createMock(PidManager::class);
+ $pidManager->method('read')->willReturn(['1234']);
+ $pidManager->expects($this->never())->method('delete');
+
+ $command = $this->getMockBuilder(StopCommand::class)
+ ->setConstructorArgs([$pidManager])
+ ->onlyMethods(['isRunning'])
+ ->getMock();
+
+ $command->method('isRunning')->willReturn(true);
+ $command->waitThreshold = 1;
+
+ $command->killProcess = function (int $pid, ?int $signal = null): bool {
+ return $signal === 0;
+ };
+
+ $tester = new CommandTester($command);
+ $exitCode = $tester->execute([]);
+
+ $this->assertSame(1, $exitCode);
+ $this->assertStringContainsString('Error stopping server', $tester->getDisplay());
+ }
+}
diff --git a/test/Swoole/Delegators/DummySwooleServer.php b/test/Swoole/Delegators/DummySwooleServer.php
new file mode 100644
index 0000000..d864bb9
--- /dev/null
+++ b/test/Swoole/Delegators/DummySwooleServer.php
@@ -0,0 +1,37 @@
+ */
+ public array $callbacks = [];
+
+ public function __construct()
+ {
+ }
+
+ /**
+ * @param string $eventName
+ * @param callable $callback
+ */
+ public function on($eventName, $callback): bool
+ {
+ $this->callbacks[$eventName] = $callback;
+ return true;
+ }
+
+ /**
+ * @param int|string $fd
+ * @param string $data
+ * @param int $serverSocket
+ */
+ public function send($fd, $data, $serverSocket = -1): bool
+ {
+ return true;
+ }
+}
diff --git a/test/Swoole/Delegators/TCPServerDelegatorTest.php b/test/Swoole/Delegators/TCPServerDelegatorTest.php
new file mode 100644
index 0000000..15d690e
--- /dev/null
+++ b/test/Swoole/Delegators/TCPServerDelegatorTest.php
@@ -0,0 +1,256 @@
+logger = new Logger([
+ 'writers' => [
+ 'FileWriter' => [
+ 'name' => 'null',
+ 'level' => Logger::ALERT,
+ ],
+ ],
+ ]);
+
+ $this->bus = $this->createMock(MessageBusInterface::class);
+ $this->container = $this->createMock(ContainerInterface::class);
+ $this->server = new DummySwooleServer();
+ }
+
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ */
+ public function testCallbacksAreRegistered(): void
+ {
+ $callback = fn() => $this->server;
+
+ $this->container->method('get')->willReturnMap([
+ [MessageBusInterface::class, $this->bus],
+ ['dot-log.queue-log', $this->logger],
+ ]);
+
+ $delegator = new TCPServerDelegator();
+ $result = $delegator($this->container, 'tcp-server', $callback);
+
+ $this->assertSame($this->server, $result);
+ $this->assertArrayHasKey('connect', $this->server->callbacks);
+ $this->assertArrayHasKey('receive', $this->server->callbacks);
+ $this->assertArrayHasKey('close', $this->server->callbacks);
+
+ foreach (['connect', 'receive', 'close'] as $event) {
+ $this->assertIsCallable($this->server->callbacks[$event]);
+ }
+ }
+
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ */
+ public function testConnectOutputsExpectedString(): void
+ {
+ $callback = fn() => $this->server;
+
+ $this->container->method('get')->willReturnMap([
+ [MessageBusInterface::class, $this->bus],
+ ['dot-log.queue-log', $this->logger],
+ ]);
+
+ $delegator = new TCPServerDelegator();
+ $delegator($this->container, 'tcp-server', $callback);
+
+ $this->expectOutputString('Client: Connect.' . PHP_EOL);
+
+ $connectCb = $this->server->callbacks['connect'];
+ $connectCb($this->server, 1);
+ }
+
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ */
+ public function testCloseOutputsExpectedString(): void
+ {
+ $callback = fn() => $this->server;
+
+ $this->container->method('get')->willReturnMap([
+ [MessageBusInterface::class, $this->bus],
+ ['dot-log.queue-log', $this->logger],
+ ]);
+
+ $delegator = new TCPServerDelegator();
+ $delegator($this->container, 'tcp-server', $callback);
+
+ $this->expectOutputString('Client: Close.' . PHP_EOL);
+
+ $closeCb = $this->server->callbacks['close'];
+ $closeCb($this->server, 1);
+ }
+
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ */
+ public function testReceiveDispatchesMessagesAndLogsWhenUnknownCommand(): void
+ {
+ $callback = fn() => $this->server;
+
+ $this->bus->expects($this->exactly(2))
+ ->method('dispatch')
+ ->willReturnCallback(function ($message) {
+ static $callCount = 0;
+ $callCount++;
+
+ if ($callCount === 1) {
+ $this->assertInstanceOf(Message::class, $message);
+ $this->assertEquals('hello', $message->getPayload()['foo']);
+ } elseif ($callCount === 2) {
+ $this->assertInstanceOf(Message::class, $message);
+ $this->assertEquals('with 5 seconds delay', $message->getPayload()['foo']);
+ } else {
+ $this->fail('dispatch called more than twice');
+ }
+
+ return new Envelope($message);
+ });
+
+ $this->container->method('get')->willReturnMap([
+ [MessageBusInterface::class, $this->bus],
+ ['dot-log.queue-log', $this->logger],
+ ]);
+
+ $delegator = new TCPServerDelegator();
+ $delegator($this->container, 'tcp-server', $callback);
+
+ $receiveCb = $this->server->callbacks['receive'];
+
+ $receiveCb($this->server, 42, 5, "hello");
+ }
+
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ */
+ public function testReceiveExecutesKnownCommandSuccessfully(): void
+ {
+ $callback = fn() => $this->server;
+
+ $commandMock = $this->getMockBuilder(GetProcessedMessagesCommand::class)
+ ->onlyMethods(['execute'])
+ ->getMock();
+
+ $commandMock->method('execute')->willReturnCallback(function ($input, $output) {
+ $output->writeln('processed output text');
+ return 0;
+ });
+
+ $this->server = new class extends DummySwooleServer {
+ public ?string $sentData = null;
+
+ /**
+ * @param int $fd
+ * @param string $data
+ * @param int $serverSocket
+ */
+ public function send($fd, $data, $serverSocket = -1): bool
+ {
+ $this->sentData = $data;
+ return true;
+ }
+ };
+
+ $this->container->method('get')->willReturnMap([
+ [MessageBusInterface::class, $this->bus],
+ ['dot-log.queue-log', $this->logger],
+ [GetProcessedMessagesCommand::class, $commandMock],
+ ]);
+
+ $delegator = new TCPServerDelegator();
+ $delegator($this->container, 'tcp-server', $callback);
+
+ $receiveCb = $this->server->callbacks['receive'];
+
+ $receiveCb($this->server, 1, 1, "processed");
+
+ $this->assertNotNull($this->server->sentData);
+ $this->assertStringContainsString('processed output text', $this->server->sentData);
+ }
+
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ */
+ public function testReceiveParsesKnownOptions(): void
+ {
+ $callback = fn() => $this->server;
+
+ $this->server = new class extends DummySwooleServer {
+ public ?string $sentData = null;
+
+ /**
+ * @param int $fd
+ * @param string $data
+ * @param int $serverSocket
+ */
+ public function send($fd, $data, $serverSocket = -1): bool
+ {
+ $this->sentData = $data;
+ return true;
+ }
+ };
+
+ $commandMock = $this->getMockBuilder(GetProcessedMessagesCommand::class)
+ ->onlyMethods(['execute'])
+ ->getMock();
+
+ $commandMock->method('execute')->willReturnCallback(function ($input, $output) {
+ $output->writeln('processed output text with known options');
+ return 0;
+ });
+
+ $this->container->method('get')->willReturnMap([
+ [MessageBusInterface::class, $this->bus],
+ ['dot-log.queue-log', $this->logger],
+ [GetProcessedMessagesCommand::class, $commandMock],
+ ]);
+
+ $delegator = new TCPServerDelegator();
+ $delegator($this->container, 'tcp-server', $callback);
+
+ $receiveCb = $this->server->callbacks['receive'];
+
+ $receiveCb($this->server, 1, 1, "processed --start=1 --end=5");
+
+ $this->assertNotNull($this->server->sentData);
+ $this->assertStringContainsString('processed output text with known options', $this->server->sentData);
+ }
+}
diff --git a/test/Swoole/Exception/InvalidStaticResourceMiddlewareExceptionTest.php b/test/Swoole/Exception/InvalidStaticResourceMiddlewareExceptionTest.php
new file mode 100644
index 0000000..35cf1f9
--- /dev/null
+++ b/test/Swoole/Exception/InvalidStaticResourceMiddlewareExceptionTest.php
@@ -0,0 +1,33 @@
+assertContainsOnlyInstancesOf(InvalidStaticResourceMiddlewareException::class, [$exception]);
+
+ $expectedMessage = sprintf(
+ 'Static resource middleware must be callable; received middleware of type "%s" in position %s',
+ get_debug_type($middleware),
+ $position
+ );
+
+ $this->assertSame($expectedMessage, $exception->getMessage());
+ }
+}
diff --git a/test/Swoole/PidManagerFactoryTest.php b/test/Swoole/PidManagerFactoryTest.php
new file mode 100644
index 0000000..39f78b4
--- /dev/null
+++ b/test/Swoole/PidManagerFactoryTest.php
@@ -0,0 +1,57 @@
+ [
+ 'swoole-tcp-server' => [
+ 'options' => [
+ 'pid_file' => $expectedPath,
+ ],
+ ],
+ ],
+ ];
+
+ $container = $this->createMock(ContainerInterface::class);
+ $container->method('get')
+ ->with('config')
+ ->willReturn($config);
+
+ $factory = new PidManagerFactory();
+ $pidManager = $factory($container);
+
+ $pidFilePath = $this->getPrivateProperty($pidManager);
+ $this->assertSame($expectedPath, $pidFilePath);
+ }
+
+ /**
+ * @throws ReflectionException
+ */
+ private function getPrivateProperty(object $object): mixed
+ {
+ $reflection = new ReflectionClass($object);
+ $property = $reflection->getProperty('pidFile');
+ return $property->getValue($object);
+ }
+}
diff --git a/test/Swoole/PidManagerTest.php b/test/Swoole/PidManagerTest.php
new file mode 100644
index 0000000..f6c22e4
--- /dev/null
+++ b/test/Swoole/PidManagerTest.php
@@ -0,0 +1,92 @@
+tempPidFile = sys_get_temp_dir() . '/test.pid';
+ if (file_exists($this->tempPidFile)) {
+ unlink($this->tempPidFile);
+ }
+ }
+
+ protected function tearDown(): void
+ {
+ if (file_exists($this->tempPidFile)) {
+ unlink($this->tempPidFile);
+ }
+ }
+
+ public function testWriteAndReadPids(): void
+ {
+ $manager = new PidManager($this->tempPidFile);
+
+ $manager->write(12345, 67890);
+
+ $result = $manager->read();
+
+ $this->assertSame(['12345', '67890'], $result);
+ $this->assertFileExists($this->tempPidFile);
+ }
+
+ public function testDeleteRemovesPidFile(): void
+ {
+ file_put_contents($this->tempPidFile, 'dummyData');
+
+ $manager = new PidManager($this->tempPidFile);
+ $deleted = $manager->delete();
+
+ $this->assertTrue($deleted);
+ $this->assertFileDoesNotExist($this->tempPidFile);
+ }
+
+ public function testDeleteReturnsFalseIfFileNotWritable(): void
+ {
+ file_put_contents($this->tempPidFile, 'dummyData');
+ chmod($this->tempPidFile, 0444);
+
+ $manager = new PidManager($this->tempPidFile);
+ $result = $manager->delete();
+
+ $this->assertFalse($result);
+
+ chmod($this->tempPidFile, 0644);
+ }
+
+ public function testWriteThrowsWhenFileNotWritable(): void
+ {
+ $unwritableDir = sys_get_temp_dir() . '/unwritable_dir';
+ mkdir($unwritableDir, 0444);
+ $unwritableFile = $unwritableDir . '/file.pid';
+
+ $manager = new PidManager($unwritableFile);
+
+ $this->expectException(RuntimeException::class);
+ $this->expectExceptionMessageMatches('/not writable/');
+
+ try {
+ $manager->write(1, 2);
+ } finally {
+ chmod($unwritableDir, 0755);
+ rmdir($unwritableDir);
+ }
+ }
+}
diff --git a/test/Swoole/ServerFactoryTest.php b/test/Swoole/ServerFactoryTest.php
new file mode 100644
index 0000000..618c86c
--- /dev/null
+++ b/test/Swoole/ServerFactoryTest.php
@@ -0,0 +1,155 @@
+factory = new ServerFactory();
+ }
+
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ * @throws ErrorException
+ */
+ #[RunInSeparateProcess]
+ public function testInvokeWithMinimalValidConfig(): void
+ {
+ $config = [
+ 'dotkernel-queue-swoole' => [
+ 'swoole-tcp-server' => [],
+ ],
+ ];
+
+ $container = $this->createMock(ContainerInterface::class);
+ $container->method('get')->with('config')->willReturn($config);
+
+ $server = $this->factory->__invoke($container);
+
+ $this->assertContainsOnlyInstancesOf(Server::class, [$server]);
+ }
+
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ * @throws ErrorException
+ */
+ #[RunInSeparateProcess]
+ public function testInvokeWithCustomValidConfig(): void
+ {
+ $config = [
+ 'dotkernel-queue-swoole' => [
+ 'enable_coroutine' => true,
+ 'swoole-tcp-server' => [
+ 'host' => '127.0.0.1',
+ 'port' => 9502,
+ 'mode' => SWOOLE_BASE,
+ 'protocol' => SWOOLE_SOCK_TCP,
+ 'options' => [
+ 'worker_num' => 1,
+ ],
+ ],
+ ],
+ ];
+
+ $container = $this->createMock(ContainerInterface::class);
+ $container->method('get')->with('config')->willReturn($config);
+
+ $server = $this->factory->__invoke($container);
+
+ $this->assertContainsOnlyInstancesOf(Server::class, [$server]);
+ }
+
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ * @throws ErrorException
+ */
+ public function testThrowsOnInvalidPort(): void
+ {
+ $this->expectException(InvalidArgumentException::class);
+ $this->expectExceptionMessage('Invalid port');
+
+ $config = [
+ 'dotkernel-queue-swoole' => [
+ 'swoole-tcp-server' => [
+ 'port' => 70000,
+ ],
+ ],
+ ];
+
+ $container = $this->createMock(ContainerInterface::class);
+ $container->method('get')->with('config')->willReturn($config);
+
+ $this->factory->__invoke($container);
+ }
+
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ * @throws ErrorException
+ */
+ public function testThrowsOnInvalidMode(): void
+ {
+ $this->expectException(InvalidArgumentException::class);
+ $this->expectExceptionMessage('Invalid server mode');
+
+ $config = [
+ 'dotkernel-queue-swoole' => [
+ 'swoole-tcp-server' => [
+ 'mode' => -1,
+ ],
+ ],
+ ];
+
+ $container = $this->createMock(ContainerInterface::class);
+ $container->method('get')->with('config')->willReturn($config);
+
+ $this->factory->__invoke($container);
+ }
+
+ /**
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
+ * @throws ErrorException
+ */
+ public function testThrowsOnInvalidProtocol(): void
+ {
+ $this->expectException(InvalidArgumentException::class);
+ $this->expectExceptionMessage('Invalid server protocol');
+
+ $config = [
+ 'dotkernel-queue-swoole' => [
+ 'swoole-tcp-server' => [
+ 'protocol' => -99,
+ ],
+ ],
+ ];
+
+ $container = $this->createMock(ContainerInterface::class);
+ $container->method('get')->with('config')->willReturn($config);
+
+ $this->factory->__invoke($container);
+ }
+}