使用ResultSetFuture修改集合
问题描述:
我尝试使用 java-driver-async-queries执行异步查询。我正在修改FutureCallback中的列表,但似乎不起作用 -使用ResultSetFuture修改集合
List<Product> products = new ArrayList<Product>();
for (// iterating over a Map) {
key = entry.getKey();
String query = "SELECT id,desc,category FROM products where id=?";
ResultSetFuture future = session.executeAsync(query, key);
Futures.addCallback(future,
new FutureCallback<ResultSet>() {
@Override public void onSuccess(ResultSet result) {
Row row = result.one();
if (row != null) {
Product product = new Product();
product.setId(row.getString("id"));
product.setDesc(row.getString("desc"));
product.setCategory(row.getString("category"));
products.add(product);
}
}
@Override public void onFailure(Throwable t) {
// log error
}
},
MoreExecutors.sameThreadExecutor()
);
}
System.out.println("Product List : " + products); // Not printing correct values. Sometimes print blank
是否有其他方法?
基于米哈伊尔Baksheev答案我执行,现在得到正确的结果。 只是一个转折点。我需要实现一些额外的逻辑。我想知道如果我可以使用List<MyClass>
代替List<ResultSetFuture>
和MyClass的作为 -
public class MyClass {
private Integer productCount;
private Integer stockCount;
private ResultSetFuture result;
}
然后同时迭代设置FutureList
为 -
ResultSetFuture result = session.executeAsync(query, key.get());
MyClass allResult = new MyClass();
allResult.setInCount(inCount);
allResult.setResult(result);
allResult.setSohCount(values.size() - inCount);
futuresList.add(allResult);
答
正如@RussS提到,是不是等待所有期货交易代码完成。
有许多方法可以同步异步代码。例如,使用CountDownLatch:
编辑: 也请使用separte线程回调和使用并发收集产品。
ConcurrentLinkedQueue<Product> products = new ConcurrentLinkedQueue<Product>();
final Executor callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch doneSignal = new CountDownLatch(/*the Map size*/);
for (// iterating over a Map) {
key = entry.getKey();
String query = "SELECT id,desc,category FROM products where id=?";
ResultSetFuture future = session.executeAsync(query, key);
Futures.addCallback(future,
new FutureCallback<ResultSet>() {
@Override public void onSuccess(ResultSet result) {
Row row = result.one();
if (row != null) {
Product product = new Product();
product.setId(row.getString("id"));
product.setDesc(row.getString("desc"));
product.setCategory(row.getString("category"));
products.add(product);
}
doneSignal.countDown();
}
@Override public void onFailure(Throwable t) {
// log error
doneSignal.countDown();
}
},
callbackExecutor
);
}
doneSignal.await(); // wait for all async requests to finish
System.out.println("Product List : " + products);
另一种方法是收集所有期货清单,然后等待所有结果作为一个未来番石榴的Futures.allAsList,如:
List<ResultSetFuture> futuresList = new ArrayList<>(/*Map size*/);
for (/* iterating over a Map*/) {
key = entry.getKey();
String query = "SELECT id,desc,category FROM products where id=?";
futuresList.add(session.executeAsync(query, key));
}
ListenableFuture<List<ResultSet>> allFuturesResult = Futures.allAsList(futuresList);
List<Product> products = new ArrayList<>();
try {
final List<ResultSet> resultSets = allFuturesResult.get();
for (ResultSet rs : resultSets) {
if (null != rs) {
Row row = rs.one();
if (row != null) {
Product product = new Product();
product.setId(row.getString("id"));
product.setDesc(row.getString("desc"));
product.setCategory(row.getString("category"));
products.add(product);
}
}
}
} catch (InterruptedException | ExecutionException e) {
System.out.println(e);
}
System.out.println("Product List : " + products);
EDIT 2
我想知道如果我可以使用List而不是List和MyClass作为
技术上是可以的,但你不能在这种情况下,通过在Futures.allAsList
List<MyClass>
或MyClass
应该实现ListenableFuture
接口
什么是“不工作”的定义是什么?发布预期行为和实际行为。 –
你是不是在等你的未来?看起来你创建了一堆期货,并立即打印结果结构。结果结构尚未填充,因为大部分期货仍在运行。 – RussS
谢谢。什么是修正。有没有我可以参考的代码示例? – Saurabh