diff --git a/src/Pools/Connection.php b/src/Pools/Connection.php index 3d2ba4d..0da6bef 100644 --- a/src/Pools/Connection.php +++ b/src/Pools/Connection.php @@ -16,6 +16,8 @@ class Connection */ protected ?Pool $pool = null; + protected float $checkedOutAt = 0; + /** * @param TResource $resource */ @@ -23,6 +25,27 @@ public function __construct(protected mixed $resource) { } + /** + * Mark the connection as checked out (record timestamp) + * + * @return $this + */ + public function markCheckedOut(): static + { + $this->checkedOutAt = microtime(true); + return $this; + } + + /** + * Get the timestamp when this connection was checked out + * + * @return float + */ + public function getCheckedOutAt(): float + { + return $this->checkedOutAt; + } + /** * @return string */ diff --git a/src/Pools/Pool.php b/src/Pools/Pool.php index 541d91e..781c0ed 100644 --- a/src/Pools/Pool.php +++ b/src/Pools/Pool.php @@ -44,6 +44,14 @@ class Pool */ protected int $synchronizedTimeout = 3; // seconds + /** + * Maximum time (seconds) a connection can be checked out before being + * considered leaked. 0 means disabled (no stale detection). + * + * @var int + */ + protected int $maxUseTime = 0; + protected PoolAdapter $pool; /** @@ -189,6 +197,26 @@ public function getSynchronizationTimeout(): int return $this->synchronizedTimeout; } + /** + * Set the maximum time a connection can be checked out before being + * considered leaked. When the pool is exhausted, connections exceeding + * this time will have their slots reclaimed, allowing new connections + * to be created. + * + * @param int $seconds Maximum use time in seconds. 0 to disable. + * @return $this + */ + public function setMaxUseTime(int $seconds): static + { + $this->maxUseTime = $seconds; + return $this; + } + + public function getMaxUseTime(): int + { + return $this->maxUseTime; + } + /** * @param Telemetry $telemetry * @return $this @@ -272,6 +300,7 @@ public function pop(): Connection if ($shouldCreateConnections) { try { $connection = $this->createConnection(); + $connection->markCheckedOut(); $this->pool->synchronized(function () use ($connection) { $this->active[$connection->getID()] = $connection; }); @@ -289,6 +318,16 @@ public function pop(): Connection $connection = $this->pool->pop($this->getSynchronizationTimeout()); if ($connection === false || $connection === null) { + // Before giving up, try to recover slots from leaked connections + if ($this->maxUseTime > 0) { + $recovered = $this->recoverStaleConnections(); + if ($recovered > 0) { + // Slots freed, don't count this as a retry attempt + $attempts--; + continue; + } + } + if ($attempts >= $this->getRetryAttempts()) { $activeCount = count($this->active); $idleCount = $this->pool->count(); @@ -300,6 +339,7 @@ public function pop(): Connection sleep($this->getRetrySleep()); } else { if ($connection instanceof Connection) { + $connection->markCheckedOut(); $this->pool->synchronized(function () use ($connection) { $this->active[$connection->getID()] = $connection; }); @@ -466,6 +506,31 @@ public function isFull(): bool return count($this->active) === 0; } + /** + * Detect and recover slots from connections that have been checked out + * longer than maxUseTime (likely leaked). Returns the number of + * recovered slots. + * + * @return int Number of recovered connection slots + */ + private function recoverStaleConnections(): int + { + $now = microtime(true); + $recovered = 0; + + return $this->pool->synchronized(function () use ($now, &$recovered): int { + foreach ($this->active as $id => $connection) { + $checkedOutAt = $connection->getCheckedOutAt(); + if ($checkedOutAt > 0 && ($now - $checkedOutAt) > $this->maxUseTime) { + unset($this->active[$id]); + $this->connectionsCreated--; + $recovered++; + } + } + return $recovered; + }); + } + private function recordPoolTelemetry(): void { $activeConnections = count($this->active); diff --git a/tests/Pools/Scopes/PoolTestScope.php b/tests/Pools/Scopes/PoolTestScope.php index c40a77b..2d06135 100644 --- a/tests/Pools/Scopes/PoolTestScope.php +++ b/tests/Pools/Scopes/PoolTestScope.php @@ -384,6 +384,75 @@ public function testUseReclainsConnectionOnCallbackException(): void }); } + public function testPoolRecoversStaleConnections(): void + { + $this->execute(function (): void { + $pool = new Pool($this->getAdapter(), 'test-stale', 2, fn () => 'resource'); + $pool->setRetryAttempts(1); + $pool->setRetrySleep(0); + $pool->setMaxUseTime(1); // 1 second max use time + + // Pop both connections to exhaust the pool + $conn1 = $pool->pop(); + $conn2 = $pool->pop(); + + $this->assertSame(0, $pool->count()); + + // Simulate time passing by backdating the checkout timestamp + // so connections appear stale + $reflection = new \ReflectionProperty($conn1, 'checkedOutAt'); + $reflection->setValue($conn1, microtime(true) - 2); // 2 seconds ago + $reflection = new \ReflectionProperty($conn2, 'checkedOutAt'); + $reflection->setValue($conn2, microtime(true) - 2); // 2 seconds ago + + // Without stale recovery, this would throw "Pool is empty" + // With stale recovery, it should detect the leaked connections, + // free their slots, and create a new connection + $conn3 = $pool->pop(); + $this->assertSame('resource', $conn3->getResource()); + }); + } + + public function testPoolDoesNotRecoverNonStaleConnections(): void + { + $this->execute(function (): void { + $pool = new Pool($this->getAdapter(), 'test-no-stale', 2, fn () => 'resource'); + $pool->setRetryAttempts(1); + $pool->setRetrySleep(0); + $pool->setMaxUseTime(60); // 60 seconds - connections won't be stale + + // Pop both connections + $conn1 = $pool->pop(); + $conn2 = $pool->pop(); + + $this->assertSame(0, $pool->count()); + + // Connections are not stale (just checked out), so pool should still throw + $this->expectException(Exception::class); + $this->expectExceptionMessage("Pool 'test-no-stale' is empty"); + $pool->pop(); + }); + } + + public function testPoolStaleRecoveryDisabledByDefault(): void + { + $this->execute(function (): void { + $pool = new Pool($this->getAdapter(), 'test-disabled', 1, fn () => 'resource'); + $pool->setRetryAttempts(1); + $pool->setRetrySleep(0); + // maxUseTime defaults to 0 (disabled) + + $conn1 = $pool->pop(); + + // Even if we backdate the checkout time, stale recovery shouldn't trigger + $reflection = new \ReflectionProperty($conn1, 'checkedOutAt'); + $reflection->setValue($conn1, microtime(true) - 100); + + $this->expectException(Exception::class); + $pool->pop(); + }); + } + public function testPoolTelemetry(): void { $this->execute(function (): void {