diff --git a/DependencyInjection/AmqpConfiguration.php b/DependencyInjection/AmqpConfiguration.php
index a9de8cf..955bf2c 100644
--- a/DependencyInjection/AmqpConfiguration.php
+++ b/DependencyInjection/AmqpConfiguration.php
@@ -16,11 +16,18 @@
class AmqpConfiguration implements TransportSectionConfiguration
{
private static $DEFAULT_CONNECTION = [
- 'host' => 'localhost',
- 'port' => '5672',
- 'virtual_host' => '/',
- 'user' => 'guest',
- 'password' => 'guest',
+ 'connection' => [
+ 'servers' => [
+ [
+ 'host' => 'localhost',
+ 'port' => '5672',
+ 'virtual_host' => '/',
+ 'user' => 'guest',
+ 'password' => 'guest',
+ ]
+ ],
+ 'strategy' => 'RoundRobin',
+ ],
'exchanges' => [],
'queues' => []
];
@@ -52,11 +59,26 @@ public function getSectionDefinition()
->addDefaultChildrenIfNoneSet('default')
->prototype('array')
->children()
- ->scalarNode('host')->defaultValue(self::$DEFAULT_CONNECTION['host'])->end()
- ->scalarNode('port')->defaultValue(self::$DEFAULT_CONNECTION['port'])->end()
- ->scalarNode('virtual_host')->defaultValue(self::$DEFAULT_CONNECTION['virtual_host'])->end()
- ->scalarNode('user')->defaultValue(self::$DEFAULT_CONNECTION['user'])->end()
- ->scalarNode('password')->defaultValue(self::$DEFAULT_CONNECTION['password'])->end()
+ ->arrayNode('connection')
+ ->addDefaultsIfNotSet()
+ ->children()
+ ->arrayNode('servers')
+ ->requiresAtLeastOneElement()
+ ->addDefaultChildrenIfNoneSet(0)
+ ->prototype('array')
+ ->children()
+ ->scalarNode('host')->defaultValue(self::$DEFAULT_CONNECTION['connection']['servers'][0]['host'])->end()
+ ->scalarNode('port')->defaultValue(self::$DEFAULT_CONNECTION['connection']['servers'][0]['port'])->end()
+ ->scalarNode('virtual_host')->defaultValue(self::$DEFAULT_CONNECTION['connection']['servers'][0]['virtual_host'])->end()
+ ->scalarNode('user')->defaultValue(self::$DEFAULT_CONNECTION['connection']['servers'][0]['user'])->end()
+ ->scalarNode('password')->defaultValue(self::$DEFAULT_CONNECTION['connection']['servers'][0]['password'])->end()
+ ->end()
+ ->end()
+ ->end()
+ ->scalarNode('strategy')->defaultValue('RoundRobin')->end()
+ ->end()
+ ->end()
+
->arrayNode('exchanges')
->useAttributeAsKey('name')
->prototype('array')
diff --git a/DependencyInjection/EventBandExtension.php b/DependencyInjection/EventBandExtension.php
index 6e61b6b..237e1ee 100644
--- a/DependencyInjection/EventBandExtension.php
+++ b/DependencyInjection/EventBandExtension.php
@@ -10,8 +10,10 @@
namespace EventBand\Bundle\DependencyInjection;
use EventBand\Bundle\DependencyInjection\Compiler\SerializerPass;
+use Symfony\Component\DependencyInjection\Alias;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\Config\FileLocator;
+use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\DefinitionDecorator;
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
use Symfony\Component\DependencyInjection\Reference;
@@ -67,15 +69,30 @@ private function loadAmqpTransport(array $config, ContainerBuilder $container)
return $camelized;
};
+ $camelizeKeyRecursive = function (array $config) use (&$camelizeKey) {
+ $camelized = [];
+ foreach ($config as $key => $value) {
+ if (is_array($value)) {
+ $camelized[lcfirst(ContainerBuilder::camelize($key))] = $camelizeKey($value);
+ } else {
+ $camelized[lcfirst(ContainerBuilder::camelize($key))] = $value;
+ }
+ }
+
+ return $camelized;
+ };
+
$definitions = [];
foreach ($config['connections'] as $name => $connectionConfig) {
$exchanges = $connectionConfig['exchanges'];
unset($connectionConfig['exchanges']);
$queues = $connectionConfig['queues'];
unset($connectionConfig['queues']);
+ $servers = $connectionConfig['connection']['servers'];
+ unset($connectionConfig['connection']['servers']);
$amqp = new DefinitionDecorator('event_band.transport.amqp.definition');
- $amqp->addMethodCall('connection', [$camelizeKey($connectionConfig)]);
+ $amqp->addMethodCall('connections', [$camelizeKeyRecursive($servers)]);
foreach ($exchanges as $exchange => $exchangeConfig) {
$exchangeType = $exchangeConfig['type'];
unset($exchangeConfig['type']);
@@ -89,21 +106,42 @@ private function loadAmqpTransport(array $config, ContainerBuilder $container)
$container->setDefinition($definitionId, $amqp);
$definitions[$name] = $definitionId;
- $connection = new DefinitionDecorator('event_band.transport.amqp.connection_definition');
- $connection->setFactoryService($definitionId);
- $connectionId = self::getAmqpConnectionDefinitionId($name);
- $container->setDefinition($connectionId, $connection);
+ $driverPoolStrategy = new Definition('EventBand\Transport\AmqpLib\Pool\Strategy\StrategyInterface');
+ $driverPoolStrategy->setPublic(false);
+ $driverPoolStrategy->addArgument($connectionConfig['connection']['strategy']);
+ $driverPoolStrategy->setFactory([new Reference(sprintf('event_band.transport.amqp.driver.driver_pool.strategy_factory.%s', $config['driver'])), 'create']);
+
+ $driverPool = new DefinitionDecorator(sprintf('event_band.transport.amqp.driver.driver_pool.%s', $config['driver']));
+ $driverPool->replaceArgument(0, $driverPoolStrategy);
+
+ $container->setDefinition(self::getAmqpDriverPoolId($name), $driverPool);
+
+ foreach ($servers as $index => $server) {
+ $connection = new Definition('EventBand\Transport\Amqp\Definition\ConnectionDefinition');
+ $connection->setPublic(false);
+ $connection->addArgument($index);
+ $connection->setFactory([new Reference($definitionId), 'getConnection']);
- $factory = new DefinitionDecorator(sprintf('event_band.transport.amqp.connection_factory.%s', $config['driver']));
- $factory->addMethodCall('setDefinition', [new Reference($connectionId)]);
- $container->setDefinition(self::getAmqpLibConnectionFactoryId($name), $factory);
+ $container->setDefinition(self::getAmqpConnectionDefinitionId($name, $index), $connection);
- $driver = new DefinitionDecorator('event_band.transport.amqp.driver.'.$config['driver']);
- $driver->replaceArgument(0, new Reference($this->getAmqpLibConnectionFactoryId($name)));
- $container->setDefinition($this->getAmqpDriverId($name), $driver);
+ $factory = new DefinitionDecorator(sprintf('event_band.transport.amqp.connection_factory.%s', $config['driver']));
+ $factory->addMethodCall('setDefinition', [new Reference(self::getAmqpConnectionDefinitionId($name, $index))]);
+ $container->setDefinition(self::getAmqpLibConnectionFactoryId($name, $index), $factory);
+
+ $parent = $container->getDefinition('event_band.transport.amqp.driver.'.$config['driver']);
+
+ $driver = new DefinitionDecorator('event_band.transport.amqp.driver.'.$config['driver']);
+ $driver->setClass($parent->getClass());
+ $driver->replaceArgument(0, new Reference(self::getAmqpLibConnectionFactoryId($name, $index)));
+ $container->setDefinition(self::getAmqpDriverId($name, $index), $driver);
+
+ $driverPool->addMethodCall('addDriver', [$driver]);
+ }
+
+ $container->setAlias(self::getAmqpDriverId($name), new Alias(self::getAmqpDriverPoolId($name)));
$configurator = new DefinitionDecorator('event_band.transport.amqp.configurator');
- $configurator->replaceArgument(0, new Reference($this->getAmqpDriverId($name)));
+ $configurator->replaceArgument(0, new Reference(self::getAmqpDriverId($name)));
$container->setDefinition(self::getTypedTransportConfiguratorId('amqp', $name), $configurator);
$container->getDefinition(self::getTransportConfiguratorId())
->addMethodCall('registerConfigurator', ['amqp.'.$name, new Reference(self::getTypedTransportConfiguratorId('amqp', $name))]);
@@ -287,13 +325,22 @@ public static function getTransportDefinitionId($type, $name)
return sprintf('event_band.transport.%s.definition.%s', $type, $name);
}
- private static function getAmqpConnectionDefinitionId($name)
+ private static function getAmqpConnectionDefinitionId($name, $index)
{
- return sprintf('event_band.transport.amqp.connection_definition.%s', $name);
+ return sprintf('event_band.transport.amqp.connection_definition.%s.%d', $name, $index);
}
- public static function getAmqpDriverId($connectionName)
+ public static function getAmqpDriverPoolId($connectionName)
{
+ return sprintf('event_band.transport.amqp.driver_pool.%s', $connectionName);
+ }
+
+ public static function getAmqpDriverId($connectionName, $index = null)
+ {
+ if ($index !== null) {
+ return sprintf('event_band.transport.amqp.connection_driver.%s.%d', $connectionName, $index);
+ }
+
return sprintf('event_band.transport.amqp.connection_driver.%s', $connectionName);
}
@@ -302,9 +349,9 @@ public static function getAmqpConverterId($name)
return sprintf('event_band.transport.amqp.converter.%s', $name);
}
- private static function getAmqpLibConnectionFactoryId($name)
+ private static function getAmqpLibConnectionFactoryId($name, $index)
{
- return sprintf('event_band.transport.amqplib.connection_factory.%s', $name);
+ return sprintf('event_band.transport.amqplib.connection_factory.%s.%d', $name, $index);
}
public static function getTransportConfiguratorId()
diff --git a/Resources/config/transport/amqp/amqp.xml b/Resources/config/transport/amqp/amqp.xml
index ce2951f..52e3664 100644
--- a/Resources/config/transport/amqp/amqp.xml
+++ b/Resources/config/transport/amqp/amqp.xml
@@ -12,8 +12,7 @@
+ abstract="true" public="false">
+ class="EventBand\Transport\AmqpLib\AmqpLibDriver"
+ public="false" abstract="true">
+
+
+
+
+
+
diff --git a/Tests/DependencyInjection/AmqpConfigurationTest.php b/Tests/DependencyInjection/AmqpConfigurationTest.php
index eff5a76..3b5eb86 100644
--- a/Tests/DependencyInjection/AmqpConfigurationTest.php
+++ b/Tests/DependencyInjection/AmqpConfigurationTest.php
@@ -56,20 +56,34 @@ public function connectionOptions()
{
$connections = [
'default' => [
- 'host' => 'def',
- 'port' => '100',
- 'virtual_host' => 'vh',
- 'user' => 'user',
- 'password' => 'pass',
+ 'connection' => [
+ 'servers' => [
+ [
+ 'host' => 'def',
+ 'port' => '100',
+ 'virtual_host' => 'vh',
+ 'user' => 'user',
+ 'password' => 'pass',
+ ]
+ ],
+ 'strategy' => 'RoundRobin',
+ ],
'exchanges' => [],
'queues' => []
],
'foo' => [
- 'host' => 'foo',
- 'port' => '200',
- 'virtual_host' => 'vh_foo',
- 'user' => 'user_foo',
- 'password' => 'pass_foo',
+ 'connection' => [
+ 'servers' => [
+ [
+ 'host' => 'foo',
+ 'port' => '200',
+ 'virtual_host' => 'vh_foo',
+ 'user' => 'user_foo',
+ 'password' => 'pass_foo',
+ ]
+ ],
+ 'strategy' => 'RoundRobin',
+ ],
'exchanges' => [],
'queues' => []
]
diff --git a/Tests/DependencyInjection/EventBandExtensionTest.php b/Tests/DependencyInjection/EventBandExtensionTest.php
index 269bd28..361dcf3 100644
--- a/Tests/DependencyInjection/EventBandExtensionTest.php
+++ b/Tests/DependencyInjection/EventBandExtensionTest.php
@@ -323,7 +323,7 @@ public function amqpPublishers()
$publisher = $this->container->get($id);
$this->assertInstanceOf('EventBand\Transport\Amqp\AmqpPublisher', $publisher);
$definition = $this->container->getDefinition($id);
- $this->assertEquals(EventBandExtension::getAmqpDriverId('con1'), (string) $definition->getArgument(0));
+ $this->assertEquals(EventBandExtension::getAmqpDriverPoolId('con1'), (string) $definition->getArgument(0));
$this->assertEquals(EventBandExtension::getAmqpConverterId('foo'), (string) $definition->getArgument(1));
$this->assertEquals('test', $definition->getArgument(2));
$this->assertEquals(EventBandExtension::getRouterId('router1'), $definition->getArgument(3));
@@ -404,7 +404,7 @@ public function amqpConsumers()
$consumer = $this->container->get($id);
$this->assertInstanceOf('EventBand\Transport\Amqp\AmqpConsumer', $consumer);
$definition = $this->container->getDefinition($id);
- $this->assertEquals(EventBandExtension::getAmqpDriverId('con1'), (string) $definition->getArgument(0));
+ $this->assertEquals(EventBandExtension::getAmqpDriverPoolId('con1'), (string) $definition->getArgument(0));
$this->assertEquals(EventBandExtension::getAmqpConverterId('foo'), (string) $definition->getArgument(1));
$this->assertEquals('test', $definition->getArgument(2));
}
diff --git a/composer.json b/composer.json
index 1ee43be..8d0b8a3 100644
--- a/composer.json
+++ b/composer.json
@@ -21,8 +21,8 @@
"require": {
"php": ">=5.4",
- "event-band/band-framework": "~1.0",
- "event-band/symfony-adapter": "~1.0",
+ "event-band/band-framework": "dev-2.0.x-connectionPool",
+ "event-band/symfony-adapter": "dev-2.0.x-connectionPool",
"symfony/framework-bundle": "2.*, >=2.5"
},
@@ -31,9 +31,9 @@
"che/console-signals": "dev-master",
- "event-band/symfony-adapter": "~1.0@dev",
- "event-band/amqplib-transport": "~1.0@dev",
- "event-band/jms-serializer": "~1.0@dev",
+ "event-band/symfony-adapter": "dev-2.0.x-connectionPool",
+ "event-band/amqplib-transport": "dev-2.0.x-connectionPool",
+ "event-band/jms-serializer": "dev-2.0.x-connectionPool",
"jms/serializer-bundle": "0.12.*@dev",
"jms/aop-bundle": "1.*"
@@ -47,4 +47,4 @@
"jmikola/wildcard-event-dispatcher-bundle": "Simple publisher event configuration with wildcards",
"jms/aop-bundle": "Required for amqp publication data collect"
}
-}
\ No newline at end of file
+}