It looks like you can still use socket_get_status if you just make a little addition in the package itself.
These two functions are in the streamreader class, a socket handler is available here.
public function consume(callable $handler) { $this->running = true; while ($this->running) { /// while $this->running is true socket will try to reconnect always. $this->consumeOnce($handler); } } protected function consumeOnce(callable $handler) { $this->connection = $this->connect(); $lastStreamActivity = time(); $this->connection->read(function ($tweet) use (&$lastStreamActivity, $handler) { $idle = (time() - $lastStreamActivity); $this->monitor->stat('max_idle_time', $idle); $this->monitor->stat('idle_time', $idle); $this->monitor->stat('tweets', 1); $lastStreamActivity = time(); call_user_func($handler, $tweet, $this->monitor); }); $this->connection->close(); }
In the connection class, you have a socket handler so that you can capture the status of the socket when you try to read data from the socket. Below is a slightly modified read function
public function read(callable $callback, $timeout = 5) { $this->pool = [$this->connection]; stream_set_timeout($this->connection, $timeout); $info = stream_get_meta_data($this->connection); while ($this->connection !== null && !feof($this->connection) && stream_select($this->pool, $fdw, $fde, $timeout) !== false && $info['timed_out']!==true) { // @todo safeguard no tweets but connection OK. (reconnect) $this->pool = [$this->connection]; $chunkInfo = trim(fgets($this->connection)); if (!$chunkInfo) { continue; } $len = hexdec($chunkInfo) + 2; $streamInput = ''; while (!feof($this->connection)) { $streamInput .= fread($this->connection, $len-strlen($streamInput)); if (strlen($streamInput)>=$len) { break; } } $this->buffer .= substr($streamInput, 0, -2); $data = json_decode($this->buffer, true); if ($data) { call_user_func($callback, $data); $this->buffer = ''; } } }
Peter Darmis
source share