Google Cloud Pubsub数据丢失
我遇到了GCP pubsub的问题,在几秒钟内发布数千条消息时,一小部分数据丢失。Google Cloud Pubsub数据丢失
我登录既message_id
从发布订阅和session_id
独特到两者上出版端以及接收端的每个消息,并且我看到的结果是,在接收端的一些消息具有相同session_id
,但不同message_id
。此外,一些消息丢失。
例如,在一次测试中,我发送了5,000条消息到pubsub,并且正好收到了5,000条消息,丢失了8条消息。日志丢失的邮件是这样的:
MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API)
messageId FOUND: messageId:108562396466545
API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API)
Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request)
而且重复的样子:
======= Duplicates FOUND on sessionId: 730=======
sessionId: 730, messageId:108562396466545
sessionId: 730, messageId:108561339282318
(both are logs from pull request)
所有丢失的数据,并重复这个样子。
从上面的例子可以看出,有些消息已经采取了其他消息的message_id
,并且已经用两个不同的message_id
发送了两次。
我想知道有没有人会帮我弄清楚发生了什么事?提前致谢。
代码
我有一个API发送消息的PubSub,它看起来像这样:
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json
app = Flask(__name__)
ps = pubsub.Client()
...
@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
pubsub_topic = 'test_topic'
data = request.data
topic = ps.topic(pubsub_topic)
event = json.loads(data)
messageId = topic.publish(data)
return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"
这是我以前从发布 - 订阅阅读代码:
从谷歌.cloud import pubsub import re import json
ps = pubsub.Client()
topic = ps.topic('test-xiu')
sub = topic.subscription('TEST-xiu')
max_messages = 1
stop = False
messages = []
class Message(object):
"""docstring for Message."""
def __init__(self, sessionId, messageId):
super(Message, self).__init__()
self.seesionId = sessionId
self.messageId = messageId
def pull_all():
while stop == False:
m = sub.pull(max_messages = max_messages, return_immediately = False)
for data in m:
ack_id = data[0]
message = data[1]
messageId = message.message_id
data = message.data
event = json.loads(data)
sessionId = str(event["sessionId"])
messages.append(Message(sessionId = sessionId, messageId = messageId))
print '200 **** sessionId: ' + sessionId + ", messageId:" + messageId + " ******"
sub.acknowledge(ack_ids = [ack_id])
pull_all()
对于产生的session_id,从发送API请求&测井响应:
// generate trackable sessionId
var sessionId = 0
var increment_session_id = function() {
sessionId++;
return sessionId;
}
var generate_data = function() {
var data = {};
// data.sessionId = faker.random.uuid();
data.sessionId = increment_session_id();
data.user = get_rand(userList);
data.device = get_rand(deviceList);
data.visitTime = new Date;
data.location = get_rand(locationList);
data.content = get_rand(contentList);
return data;
}
var sendData = function (url, payload) {
var request = $.ajax({
url: url,
contentType: 'application/json',
method: 'POST',
data: JSON.stringify(payload),
error: function (xhr, status, errorThrown) {
console.log(xhr, status, errorThrown);
$('.result').prepend("<pre id='json'>" + JSON.stringify(xhr, null, 2) + "</pre>")
$('.result').prepend("<div>errorThrown: " + errorThrown + "</div>")
$('.result').prepend("<div>======FAIL=======</div><div>status: " + status + "</div>")
}
}).done(function (xhr) {
console.log(xhr);
$('.result').prepend("<div>======SUCCESS=======</div><pre id='json'>" + JSON.stringify(payload, null, 2) + "</pre>")
})
}
$(submit_button).click(function() {
var request_num = get_request_num();
var request_url = get_url();
for (var i = 0; i < request_num; i++) {
var data = generate_data();
var loadData = changeVerb(data, 'load');
sendData(request_url, loadData);
}
})
UPDATE
我做的API的变化,这个问题似乎消失。我所做的更改是不是使用一个pubsub.Client()
所有要求,我初始化客户端就进来了,每一个请求新的API看起来像:
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json
app = Flask(__name__)
...
@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
ps = pubsub.Client()
pubsub_topic = 'test_topic'
data = request.data
topic = ps.topic(pubsub_topic)
event = json.loads(data)
messageId = topic.publish(data)
return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"
曾与谷歌从一些家伙,这似乎是与Python客户端的问题:
在我们身边的共识是,没有在当前的Python客户端线程安全问题。当我们说话时,客户端库几乎从头开始被重写,所以我不想在当前版本中进行任何修复。我们预计新版本将在6月底之前上市。
在app.yaml中运行带有thread_safe:false的当前代码或更好,但只是在每个调用中实例化客户端应该是解决方法 - 您找到的解决方案。
对于详细的解决方案,请参阅更新在这个问题
谷歌Cloud发布/订阅消息ID都是唯一的。它不应该是可能的“一些消息[采取另一个消息的message_id
」。消息ID 108562396466545似乎收到的事实意味着Pub/Sub确实将消息传递给订阅者并且没有丢失。
我建议你检查你的session_id
是如何生成的,以确保它们确实是唯一的,并且每个消息只有一个。通过正则表达式搜索在您的JSON中搜索sessionId看起来有点奇怪。将这个JSON解析为一个实际的对象并以这种方式访问字段会更好。
通常,Cloud Pub/Sub中的重复邮件总是可能的;该系统保证至少一次交付。如果复制发生在订阅方(例如,不及时处理ack)或具有不同的消息ID(例如,如果消息的发布在类似错误之后重试超过截止日期)。
谢谢你的回应。 'session_id'是每个消息使用递增数字生成器唯一生成的,我检查App Engine中的日志并确认每个发送的'session_id'都是唯一的。我也使用JSON解析器运行实验,并且我仍然观察到相同的行为。我知道pubsub中的重复消息,我根本不担心它们。这是缺少的消息,以及重复正在使我感到担忧的'message_id'丢失消息。 –
假设self.seesionId = sessionId的拼写错误不负责,我建议您查看变量的范围。发布端或订阅端是否有messageId和sessionId全局变量?如果是这样,可以通过并发调用pull_all或publish_test_topic来覆盖它们。看起来消息是一个全局变量,所以任何并发的pull_all调用都可能导致争用情况。 –
既不是messageId也不是sessionId是全局的。 'messages'是全局的,但不能在任何地方使用。订阅方不在多线程上,所以在那里不应该有任何竞争条件。我可以看到的唯一可能的竞争条件是在API方面,因为App Engine可能有多个实例来处理流量,所以有几个发布请求可能会同时发送到pubsub。 –
您不应该为每个发布操作创建一个新客户端。我敢打赌,“解决问题”的原因是因为它缓解了出版商客户端存在的竞争。我也不相信你已经在发布商端显示的日志行:
API:200 ****的sessionId:731,邮件ID:108562396466545 ******
相当于publish_test_topic()成功发布sessionId 731。在什么条件下打印日志行?迄今为止提出的代码没有显示这一点。
'publish_test_topic()'方法返回'return'200 **** sessionId:'+ str(event [“sessionId”])+“,messageId:”+ messageId +“******”',返回到应用程序的前端,作为每个每个请求对'/ publish'的响应,并且一旦接收到它就立即打印在控制台中。 –
添加了用于记录该消息的代码。 –
根据你的代码,你两次发布的每封邮件。这只是一个错字,或者你是否确实多次发布每封邮件? –
良好的捕获,这是一个错字。每个请求只发送一次。现在已修复 –
这是另一个错字吗? self.seesionId = sessionId。它应该是self.sessionId吗? –