如何绑定ZeroMQ套接字与棘轮网络套接字库,使实时应用程序的PHP应用程序?

问题描述:

我只是涉及websocket,Ratchet和ZeroMQ这个整个领域的初学者。如何绑定ZeroMQ套接字与棘轮网络套接字库,使实时应用程序的PHP应用程序?

为了我的基本认识:

websocket的东西,有助于创建服务器和客户机之间的连接断开。

Ratchet是一种使用PHP的核心socket函数创建一个PHP插座框架允许我们在PHP套接字编程缓解基于PHP库。

ZeroMQ是一个套接字库,帮助非棘轮应用程序(其他PHP脚本)通过棘轮套接字和网络套接字发送数据。

我下面的棘轮发现关于“世界你好”和“推”,但他们两人的教程似乎是不完整的,只是教你如何只用控制台的工作。我还在github中找到棘轮示例,但它没有正确记录。我正在寻找一个完整的例子(与一个专门的HTML页面和JavaScript)

以下是我工作的代码:这是我正在作出Ajax请求的控制器的方法之一。这种方法将创建一个新的职位(让我们说)。我想通过在ZeroMq的帮助下进行广播/推送,在多个客户端浏览器中动态更新发布列表。

在控制器的方法:

public function create_new_post(){ 
    // ------ 
    // code to create a new post. 
    // ------- 

    // After creating a post 
    $response = [ 
     'new_post_title' => $title, 
     'post_id'   => $id 
    ]; 

    $context = new ZMQContext(); 
    $socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher'); 
    $socket->connect("tcp://localhost:8000"); 
    $socket->send(json_encode($response)); 

} 

推杆文件:

use Ratchet\ConnectionInterface; 
use Ratchet\Wamp\WampServerInterface; 

class Pusher implements WampServerInterface{ 

    public function onPostEntry($data){ 
     // Data that were sent by ZeroMQ through create_new_post() method 
     $entry_data = json_decode($data);  

     // AND AFTER THIS, I DONT HAVE CLUE OF WHAT TO DO NEXT !!    

    } 
} 

Shell脚本来运行服务器:

require dirname(__DIR__) . '/vendor/autoload.php'; 

$loop = React\EventLoop\Factory::create(); 
$pusher = new MyApp\Pusher; 

// Listen for the web server to make a ZeroMQ push after an ajax request 
$context = new React\ZMQ\Context($loop); 
$pull = $context->getSocket(ZMQ::SOCKET_PULL); 
$pull->bind('tcp://127.0.0.1:8000'); 
$pull->on('message', array($pusher, 'onBidEntry')); 

// Set up our WebSocket server for clients wanting real-time updates 
$webSock = new React\Socket\Server($loop); 
$webSock->listen(8080, '0.0.0.0'); 
$webServer = new Ratchet\Server\IoServer(
    new Ratchet\Http\HttpServer(
     new Ratchet\WebSocket\WsServer(
      new Ratchet\Wamp\WampServer(
       $pusher 
      ) 
     ) 
    ), 
    $webSock 
); 

$loop->run(); 

shell脚本只告诉,将在港口服务8080,但是我怎么会提到我的路线。比方说,我只想在'mysite/allposts'这个页面打开连接。另外,我必须在客户端编写脚本(JavaScript文件)以及如何通过客户端更新特定DOM对象来接收这些新数据。

我跟着你所谈论的例子。他们对我来说似乎并不完整,但我明白你的意思。棘轮是一个服务器端脚本,只是允许你编写一个服务,实现websockets,并能够收听ZMQ消息。您将在命令行上启动您的Ratchet脚本,并将其作为与Apache平行的服务运行。

这是所有独立于的WebSocket的客户端。就像他们推荐的那样,我在客户端使用了Autobahn.js。这个库实现了WAMP协议。它将客户端代码简化到最大。

与您的代码的问题是,class Pusher implements WampServerInterface没有一个public function onPostEntry。这个类必须实现WampServerInterface,这意味着它必须至少有以下功能:

  • onSubscribe(ConnectionInterface $康恩,$话题)
  • onUnSubscribe(ConnectionInterface $康恩,$话题)
  • 的OnOpen( ConnectionInterface $康恩)
  • 的OnClose(ConnectionInterface $康恩)
  • onPublish(ConnectionInterface $康恩,$主题,$事件,数组$排除,数组$资格
  • 的onError(ConnectionInterface $康恩,\ $异常E)
  • onZMQMessage($ jsondata)

可以有其他更先进的功能,如荷兰国际集团call客户端上的远程过程。

在发送器侧(ZMQ消息),把这个代码:

$zmq = new ZMQWrapper; 
$zqm->publish('posts', $response); 

class ZMQWrapper { 
    function __construct(){ 
     $this->context = new ZMQContext(); 
     $this->socket = $this->context->getSocket(ZMQ::SOCKET_PUSH); 
     $this->socket->setSockOpt(ZMQ::SOCKOPT_LINGER, 500); 
     $this->socket->connect("tcp://127.0.0.1:" . ZMQ_PORT); 
    } 
    function publish($topic, $msg){ 
     $data = ['topic' => "mb.$topic", 'msg' => $msg]; 
     $this->socket->send(json_encode($data), ZMQ::MODE_DONTWAIT); 
    } 
} 

在推动器文件放像成才:

public function onSubscribe(ConnectionInterface $conn, $topic) { 
    $log = $this->getLogger(); 
    $topicId = $topic->getId(); 
    $log->info(sprintf('A client subscribed to %s', $topicId)); 
    // you could broadcast that user x joined the discussion 
} 
public function onUnSubscribe(ConnectionInterface $conn, $topic) { 
    $log = $this->getLogger(); 
    $topicId = $topic->getId(); 
    $log->info(sprintf('A client unsubscribed from %s', $topicId)); 
    // you could broadcast that user x leaved the discussion 
} 
public function onOpen(ConnectionInterface $conn) { 
    $log = $this->getLogger(); 
    $log->info(sprintf('Client %d connected', $conn->resourceId)); 
    $this->clients[$conn->resourceId] = array(); // this will allow you to save state information of the client, you can modify in onSubscribe and onUnsubscribe 
    // clients will contain the list of all clients 
} 
public function onClose(ConnectionInterface $conn) { 
    $log = $this->getLogger(); 
    $log->info(sprintf('Client %d disconnected', $conn->resourceId)); 
    // you could broadcast that user x leaved the discussion 
} 
public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) { 
    $log = $this->getLogger(); 
    $topicId = $topic->getId(); 
    $log->info(sprintf('Client %d published to %s : %s', $conn->resourceId, $topicId, json_encode($event))); 
    foreach($topic->getIterator() as $peer){ 
     if(!in_array($peer->WAMP->sessionId, $exclude)){ 
      $peer->event($topicId, $event); 
     } 
    } 
} 

最后是在客户机上。如果用户打开页面mysite/allposts,则在JavaScript中包含autobahn.js。 websocket将在变量ab下提供。然后,您这样做:

当打开页面:

var currentSession; 
ab.connect(
    Paths.ws, 
    function(session) { // onconnect 
     currentSession = session 
     onWsConnect(session) 
    }, 
    function(code, reason, detail) {// onhangup 
     onWsDisconnect(code, reason, detail) 
    },{ 
     maxRetries: 60, 
     retryDelay: 2000, 
     skipSubprotocolCheck: true 
    } 
) 
currentSession.subscribe('posts', onPostReceived) 

function onPostReceived(topic, message){ 
    //display the new post 
} 

当关闭的页面:

currentSession.unsubscribe(topic) 

你注意,我一直都非常一般。这允许我有几种类型的消息由同一个系统处理。 ZMQ消息和currentSession.subscribe的参数有什么不同。

我的实现,我也跟踪打开连接的登录用户,但我剥夺了这部分代码。

我希望这会帮助你。

+0

(untaggable)我知道的一件重要事情是'Ratchet是一个能够收听ZMQ消息的服务器端脚本'。在概念上我明白这一点。但我完全不明白它是如何以技术方式(编码)工作的。我认为在控制器的方法中(在上述问题中)正在创建ZMQ消息,该消息正在作为Json编码消息传输给Ratchet。在那里 !我不明白棘轮在哪里以及如何收到这些编码信息以及棘轮如何知道该方法已经运行。代码的哪一部分可以做到这一点? –

+1

最后我让我的应用程序与棘轮工作,虽然这次我没有使用ZMQ。我的应用程序很小,所以这次选择不使用ZMQ。你给出的关于autobahn.js与web-socket一致性的解释是非常有用的。如果我的应用程序变得更大,我将在未来再次提及这个答案,当我必须在我的应用程序上使用ZMQ时。 –