使用Spring Boot处理Twitter feed

使用Spring Boot处理Twitter feed 在我以前的一篇文章中,我已经讨论了如何使用Apache Storm处理Twitter示例提要,现在,我们将逐步完成创建示例Spring Boot应用程序的步骤,该应用程序使用Spring Social Twitter从Twitter示例提要中读取消息框架和写入数据的Neo4j使用弹簧数据Neo4j的

整个项目都可以在Github上找到,网址为https://github.com/davidkiss/twitter-keyword-graph ,但是我们将在这里一步一步地进行讨论。

什么是Spring Boot?

如果您想知道Spring Boot可能是什么,它是Spring堆栈的最新成员之一,它建立在Spring框架之上。 Spring Boot将软件开发的生产力提高到一个新的水平,同时还提供一些现成的生产工具(指标,运行状况检查,外部配置以及与数据库重构工具liquibase的集成)。

配置应用程序

所有与应用程序相关的配置都存储在src / main / resources / application.properties文件中,该文件必须从同一文件夹中的模板application-template.properties文件创建。 确保使用您自己的配置值更新属性文件以连接到Twitter Api( https://twittercommunity.com/t/how-to-get-my-api-key/7033 )。

neo4j.uri属性用于设置与Neo4J服务器的连接详细信息。

twitterProcessing.enabled属性设置为false将禁用对Twitter feed的处理,而我们仍然可以查询应用程序的REST api以获取已处理的数据。

taskExecutor.xyz属性用于TaskExecutorPool,在此我们配置了一个工作池,该工作池将并行处理来自Twitter feed的推文。

Spring Boot可以使用其注释来创造奇迹,它有助于通过几行代码来启动和运行Web应用程序。 有关如何使用application.properties配置文件将Neo4J和Twitter客户端连接在一起的信息 ,请参阅ApplicationNeo4JConfigTwitterConfigTaskExcutorConfig类。

阅读来自Twitter feed的消息

TwitterStreamIngester服务类使用Spring Social Twitter为Twitter示例提要设置了一个侦听器。 基于为TaskExecutor配置的工作程序数量,应用程序创建TweetProcessor类的多个实例,这些实例将异步并行处理tweet(如果启用了处理)。

异步处理使用Spring注入的BlockingQueueThreadPoolTask​​Executor bean完成。 如果推文的处理速度慢于传入推文的速率,则应用程序将删除新的推文(请参阅BlockingQueue#offer()方法),直到赶上为止。

这是从提要中读取消息并将其放入TwitterStreamIngester中的队列的代码:

public void run() {
        List<StreamListener> listeners = new ArrayList<>();
        listeners.add(this);
        twitter.streamingOperations().sample(listeners);
    }

    @PostConstruct
    public void afterPropertiesSet() throws Exception {
        if (processingEnabled) {
            for (int i = 0; i < taskExecutor.getMaxPoolSize(); i++) {
                taskExecutor.execute(new TweetProcessor(graphService, queue));
            }

            run();
        }
    }

    @Override
    public void onTweet(Tweet tweet) {
        queue.offer(tweet);
    }

这是TweetProcessor类中用于处理来自队列的消息的代码:

@Override
    public void run() {
        while (true) {
            try {
                Tweet tweet = queue.take();
                processTweet(tweet);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

与Neo4J数据库交谈

该应用程序需要运行独立的Neo4j服务器。 您可以从http://neo4j.com/download/下载最新版本的Neo4J Community Edition,然后通过运行bin / neo4j-community启动它。

在小对话框中,单击右下角的“开始”按钮,几秒钟后,数据库应已启动并在http:// localhost:7474 /上运行

回到代码, KeywordRepository类扩展了Spring Data Neo4J的存储库接口,使我们可以创建Cypher查询来从Neo4j检索数据,而无需任何样板代码。 使用@RepositoryRestResource批注,它还创建REST端点来访问twitter关键字数据:

@RepositoryRestResource(collectionResourceRel = "keywords", path = "keywords")
public interface KeywordRepository extends GraphRepository<Keyword>, RelationshipOperationsRepository<Keyword> {
    // Spring figures out Neo4j query based on method name:
    Keyword findByWord(String word);

    // Spring implements method using query defined in annotation:
    @Query("START n = node(*) MATCH n-[t:Tag]->c RETURN c.word as tag, count(t) AS tagCount ORDER BY tagCount DESC limit 10")
    List<Map> findTopKeywords();
    @Query("start n=node({0}) MATCH n-[*4]-(m:Keyword) WHERE n <> m RETURN DISTINCT m LIMIT 10")
    List<Keyword> findRelevantKeywords(long keywordId);
}

请注意,必须将Application类配置为查找@RepositoryRestResource批注:

...
@Import(RepositoryRestMvcConfiguration.class)
public class Application extends Neo4jConfiguration {
...

GraphService类封装了所有与Neo4j相关的操作-在数据库中创建节点和关系并查询现有记录。 这是班上的节选:

public Tag connectTweetWithTag(Tweet tweet, String word) {
        Keyword keyword = new Keyword(word);
        keyword = keywordRepository.save(keyword);
        Tag tag = tweetRepository.createRelationshipBetween(tweet, keyword, Tag.class, "Tag");
        return tag;
    }
// ...

    public List<Map> findTopKeywords() {
        return keywordRepository.findTopKeywords();
    }

REST API查询Neo4j

除了Spring Data自动提供的REST端点(例如: http:// localhost:8080 / keywords / )之外, TwitterController类还配置为使用Spring MVC注释处理自定义REST请求:

@RequestMapping("/keywords/relevants/{word}")
    @ResponseBody
    public Iterable<Keyword> findRelevantKeywords(@PathVariable("word") String word) {
        return graphService.findRelevantKeywords(word);
    }

一旦应用程序启动并在http:// localhost:8080 / keywords / relevants / <您的关键字>上运行,您就可以测试此端点。

构建应用程序

该示例应用程序使用Maven v3 +,如果您没有安装它,请点击以下链接下载: http : //maven.apache.org/download.cgi

pom.xml非常简单,它包含所有spring依赖项的列表。 请注意文件中spring-boot-maven-plugin的配置和start-class属性,该属性定义了spring boot maven插件可以从命令行启动的主类(Spring Boot使用嵌入式Tomcat服务器来服务HTTP请求。 )。

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.7</java.version>
    <start-class>com.kaviddiss.keywords.Application</start-class>
    <spring-data-neo4j.version>3.2.0.RELEASE</spring-data-neo4j.version>
</properties>
<build>
   <plugins>
      <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
   </plugins>
</build>

运行应用程序

要运行该应用程序,请执行以下命令:

mvn spring-boot:run

为了查看Neo4j中填充的现有数据,请访问http:// localhost:7474 / browser /并执行以下查询:

MATCH (N) return N;

结果将类似于下面的屏幕截图。

使用Spring Boot处理Twitter feed

摘要

这篇文章简要介绍了使用Spring的一些最令人兴奋的技术(Spring Boot和Spring Data)和Neo4j DB。 我希望您喜欢它,并且您有足够的信息来开始自己的项目。

您以前使用过Spring Boot吗? 您对Spring Boot或此处提到的任何其他技术有何经验? 在下面留下您的评论。

如果您需要帮助来构建高效且可扩展的基于Java的Web应用程序,请告诉我

翻译自: https://www.javacodegeeks.com/2015/08/processing-twitter-feed-using-spring-boot.html