Google Cloud Pubsub数据丢失

问题描述:

我遇到了GCP pubsub的问题,在几秒钟内发布数千条消息时,一小部分数据丢失。Google Cloud Pubsub数据丢失

我登录既message_id从发布订阅和session_id独特到两者上出版端以及接收端的每个消息,并且我看到的结果是,在接收端的一些消息具有相同session_id,但不同message_id。此外,一些消息丢失。

例如,在一次测试中,我发送了5,000条消息到pubsub,并且正好收到了5,000条消息,丢失了8条消息。日志丢失的邮件是这样的:

MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API) 

messageId FOUND: messageId:108562396466545 

API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API) 

Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request) 

而且重复的样子:

======= Duplicates FOUND on sessionId: 730======= 

sessionId: 730, messageId:108562396466545 

sessionId: 730, messageId:108561339282318 

(both are logs from pull request) 

所有丢失的数据,并重复这个样子。

从上面的例子可以看出,有些消息已经采取了其他消息的message_id,并且已经用两个不同的message_id发送了两次。

我想知道有没有人会帮我弄清楚发生了什么事?提前致谢。

代码

我有一个API发送消息的PubSub,它看起来像这样:

from flask import Flask, request, jsonify, render_template 
from flask_cors import CORS, cross_origin 
import simplejson as json 
from google.cloud import pubsub 
from functools import wraps 
import re 
import json 


app = Flask(__name__) 
ps = pubsub.Client() 

... 

@app.route('/publish', methods=['POST']) 
@cross_origin() 
@json_validator 
def publish_test_topic(): 
    pubsub_topic = 'test_topic' 
    data = request.data 

    topic = ps.topic(pubsub_topic) 

    event = json.loads(data) 

    messageId = topic.publish(data) 
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******" 

这是我以前从发布 - 订阅阅读代码:

从谷歌.cloud import pubsub import re import json

ps = pubsub.Client() 
topic = ps.topic('test-xiu') 
sub = topic.subscription('TEST-xiu') 

max_messages = 1 
stop = False 

messages = [] 

class Message(object): 
    """docstring for Message.""" 
    def __init__(self, sessionId, messageId): 
     super(Message, self).__init__() 
     self.seesionId = sessionId 
     self.messageId = messageId 


def pull_all(): 
    while stop == False: 

     m = sub.pull(max_messages = max_messages, return_immediately = False) 

     for data in m: 
      ack_id = data[0] 
      message = data[1] 
      messageId = message.message_id 
      data = message.data 
      event = json.loads(data) 
      sessionId = str(event["sessionId"]) 
      messages.append(Message(sessionId = sessionId, messageId = messageId)) 

      print '200 **** sessionId: ' + sessionId + ", messageId:" + messageId + " ******" 

      sub.acknowledge(ack_ids = [ack_id]) 

pull_all() 

对于产生的session_id,从发送API请求&测井响应:

// generate trackable sessionId 
var sessionId = 0 

var increment_session_id = function() { 
    sessionId++; 
    return sessionId; 
} 

var generate_data = function() { 
    var data = {}; 
    // data.sessionId = faker.random.uuid(); 
    data.sessionId = increment_session_id(); 
    data.user = get_rand(userList); 
    data.device = get_rand(deviceList); 
    data.visitTime = new Date; 
    data.location = get_rand(locationList); 
    data.content = get_rand(contentList); 

    return data; 
} 

var sendData = function (url, payload) { 
    var request = $.ajax({ 
    url: url, 
    contentType: 'application/json', 
    method: 'POST', 
    data: JSON.stringify(payload), 
    error: function (xhr, status, errorThrown) { 
     console.log(xhr, status, errorThrown); 
     $('.result').prepend("<pre id='json'>" + JSON.stringify(xhr, null, 2) + "</pre>") 
     $('.result').prepend("<div>errorThrown: " + errorThrown + "</div>") 
     $('.result').prepend("<div>======FAIL=======</div><div>status: " + status + "</div>") 
    } 
    }).done(function (xhr) { 
    console.log(xhr); 
    $('.result').prepend("<div>======SUCCESS=======</div><pre id='json'>" + JSON.stringify(payload, null, 2) + "</pre>") 
    }) 
} 

$(submit_button).click(function() { 
    var request_num = get_request_num(); 
    var request_url = get_url(); 
    for (var i = 0; i < request_num; i++) { 
    var data = generate_data(); 
    var loadData = changeVerb(data, 'load'); 
    sendData(request_url, loadData); 
    } 
}) 

UPDATE

我做的API的变化,这个问题似乎消失。我所做的更改是不是使用一个pubsub.Client()所有要求,我初始化客户端就进来了,每一个请求新的API看起来像:

from flask import Flask, request, jsonify, render_template 
from flask_cors import CORS, cross_origin 
import simplejson as json 
from google.cloud import pubsub 
from functools import wraps 
import re 
import json 


app = Flask(__name__) 

... 

@app.route('/publish', methods=['POST']) 
@cross_origin() 
@json_validator 
def publish_test_topic(): 

    ps = pubsub.Client() 


    pubsub_topic = 'test_topic' 
    data = request.data 

    topic = ps.topic(pubsub_topic) 

    event = json.loads(data) 

    messageId = topic.publish(data) 
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******" 
+0

根据你的代码,你两次发布的每封邮件。这只是一个错字,或者你是否确实多次发布每封邮件? –

+0

良好的捕获,这是一个错字。每个请求只发送一次。现在已修复 –

+0

这是另一个错字吗? self.seesionId = sessionId。它应该是self.sessionId吗? –

曾与谷歌从一些家伙,这似乎是与Python客户端的问题:

在我们身边的共识是,没有在当前的Python客户端线程安全问题。当我们说话时,客户端库几乎从头开始被重写,所以我不想在当前版本中进行任何修复。我们预计新版本将在6月底之前上市。

在app.yaml中运行带有thread_safe:false的当前代码或更好,但只是在每个调用中实例化客户端应该是解决方法 - 您找到的解决方案。

对于详细的解决方案,请参阅更新在这个问题

谷歌Cloud发布/订阅消息ID都是唯一的。它不应该是可能的“一些消息[采取另一个消息的message_id」。消息ID 108562396466545似乎收到的事实意味着Pub/Sub确实将消息传递给订阅者并且没有丢失。

我建议你检查你的session_id是如何生成的,以确保它们确实是唯一的,并且每个消息只有一个。通过正则表达式搜索在您的JSON中搜索sessionId看起来有点奇怪。将这个JSON解析为一个实际的对象并以这种方式访问​​字段会更好。

通常,Cloud Pub/Sub中的重复邮件总是可能的;该系统保证至少一次交付。如果复制发生在订阅方(例如,不及时处理ack)或具有不同的消息ID(例如,如果消息的发布在类似错误之后重试超过截止日期)。

+0

谢谢你的回应。 'session_id'是每个消息使用递增数字生成器唯一生成的,我检查App Engine中的日志并确认每个发送的'session_id'都是唯一的。我也使用JSON解析器运行实验,并且我仍然观察到相同的行为。我知道pubsub中的重复消息,我根本不担心它们。这是缺少的消息,以及重复正在使我感到担忧的'message_id'丢失消息。 –

+0

假设self.seesionId = sessionId的拼写错误不负责,我建议您查看变量的范围。发布端或订阅端是否有messageId和sessionId全局变量?如果是这样,可以通过并发调用pull_all或publish_test_topic来覆盖它们。看起来消息是一个全局变量,所以任何并发的pull_all调用都可能导致争用情况。 –

+0

既不是messageId也不是sessionId是全局的。 'messages'是全局的,但不能在任何地方使用。订阅方不在多线程上,所以在那里不应该有任何竞争条件。我可以看到的唯一可能的竞争条件是在API方面,因为App Engine可能有多个实例来处理流量,所以有几个发布请求可能会同时发送到pubsub。 –

您不应该为每个发布操作创建一个新客户端。我敢打赌,“解决问题”的原因是因为它缓解了出版商客户端存在的竞争。我也不相信你已经在发布商端显示的日志行:

API:200 ****的sessionId:731,邮件ID:108562396466545 ******

相当于publish_test_topic()成功发布sessionId 731。在什么条件下打印日志行?迄今为止提出的代码没有显示这一点。

+0

'publish_test_topic()'方法返回'return'200 **** sessionId:'+ str(event [“sessionId”])+“,messageId:”+ messageId +“******”',返回到应用程序的前端,作为每个每个请求对'/ publish'的响应,并且一旦接收到它就立即打印在控制台中。 –

+0

添加了用于记录该消息的代码。 –