创建 Twitter 生产者

17 Mar 2025 | 4 分钟阅读

在本节中,我们将学习创建 Twitter 生产者。

创建 Twitter 生产者基本上有三个步骤

  1. 创建 Twitter 客户端。
  2. 创建生产者
  3. 发送推文

步骤 1: 创建一个新的 java 包,遵循包命名约定规则。 然后,在其中创建一个 java 类,例如 'tweetproducer.java'。

步骤 2: 通过创建一个方法来创建 Twitter 客户端。 现在,将 'github twitter java' 中的 Quickstart 代码复制到 Twitter 客户端方法中,如下所示

Creating Twitter Producer

将其粘贴到新创建的方法中。 此代码将在客户端和 hbc 主机之间创建连接。 当队列为空或已满时,BlockingQueue 将阻止客户端对消息进行出队或入队。 由于我们使用 hbc-core,因此只需要 msgQueue。 另外,我们将关注术语,而不是人。 因此,仅复制突出显示的代码。

现在,复制连接代码下方的 '创建客户端' 代码,如下所示

Creating Twitter Producer

将代码粘贴到连接代码下方。 此代码将通过客户端构建器创建一个 Twitter 客户端。 由于我们使用的是 msgQueue,因此请勿复制红色突出显示的代码,该代码用于 eventMessageQueue。 不需要它。

步骤 3: 以我们在前几节中学到的类似方式创建生产者,并建立引导服务器连接。

步骤 4: 创建 Kafka 生产者后,就可以将推文发送到 Kafka 了。 从 'github twitter java' 复制 while 循环代码,该代码在 '创建客户端' 代码下方给出。 粘贴到生产者代码下方。

Creating Twitter Producer

现在,我们已准备好从 Twitter 读取推文。 虽然,Kafka 生产者从主题读取消息。 因此,请使用 CLI 上的 '-create' 命令创建指定的主题。 另外,请指定分区值和复制因子。

例如:

Creating Twitter Producer

这里,已创建主题 'twitter_topic',其分区值为 6,复制因子为 1。 最后,执行代码并在实际应用中体验 Kafka。

创建 Twitter 客户端的完整代码如下

在上面的代码中,用户将指定 consumerKey、consumerSecret 密钥、token 密钥以及密钥。 由于它是敏感信息,因此无法显示。 从 'developer.twitter.com' 复制密钥并粘贴到各自的位置。

Creating Twitter Producer

从 'Keys and Tokens' 复制密钥并粘贴到代码中。

上述代码的输出将显示为

Creating Twitter Producer

客户端与 Hosebird 建立连接。 此后,我们可以看到在 'India' 上生成了太多推文。 在任何指定主题上发布一些推文并尝试一下。

在 CLI 上尝试 'kafka-console-consumer -bootstrap-server 127.0.0.1:9092 -topic twitter_topic' 命令。 输出将与 IntelliJ IDEA 终端上的输出相同

Creating Twitter Producer

通过这种方式,我们可以创建一个真实的 Twitter-Kafka-Producer 并将推文发送到 Kafka。


下一主题Kafka 监控