Windows下 IDEA操作远程Linux服务器下Kafka服务
Windows下 IDEA操作远程Linux服务器下Kafka服务
1 修改Kafka配置文件config/server.properties
主要修改以下三个参数,listeners和host.name写上kafka broker主机的地址 这个地址不配置会造成远程无法访问
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://10.190.188.188:9092
host.name=10.190.188.188
2 启动Kafka服务
(1)启动zookeeper zkServer.sh start
(2)启动kafaka kafka-server-start.sh config/server.properties --daemon
3 使用命令行验证Kafka正常工作
产看Topic列表
kafka-topics.sh --list --zookeeper localhost:2181
创建主题
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
启动主题的发送者
kafka-console-producer.sh --broker-list localhost:9092 --topic test
启动简单的主题接受者
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
查看主题的信息
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
4 使用IDEA编写JAVA程序
图工程目录
生产者程序
import java.io.Serializable;
public class StockQuotationinfo implements Serializable {
public static final long serialVersionUID = 1L;
/*股票代码*/
public String stockCode;
/**
* 股票名称
*/
public String stockName;
/**
* 交易时间
*/
public long tradeTime;
/**
* 昨日 t&盘价
*/
public float preClosePrice;
/**
* 开盘价
*/
public float openPrice;
/**
* 当前价,收盘时即为当日收盘价
*/
public float currentPrice;
/*今日最高价 */
public float highPrice;
/*今日棋低价*/
private float lowPrice;
@Override
public String toString() {
return this.stockCode + " ! " + stockName + " | " + tradeTime + " ! " + preClosePrice
+ "|" + openPrice + " | " + currentPrice + "| " + highPrice + " | " + lowPrice;
}
public String getStockCode() {
return stockCode;
}
public void setStockCode(String stockCode) {
this.stockCode = stockCode;
}
public String getStockName() {
return stockName;
}
public void setStockName(String stockName) {
this.stockName = stockName;
}
public long getTradeTime() {
return tradeTime;
}
public void setTradeTime(long tradeTime) {
this.tradeTime = tradeTime;
}
public float getPreClosePrice() {
return preClosePrice;
}
public void setPreClosePrice(float preClosePrice) {
this.preClosePrice = preClosePrice;
}
public float getOpenPrice() {
return openPrice;
}
public void setOpenPrice(float openPrice) {
this.openPrice = openPrice;
}
public float getCurrentPrice() {
return currentPrice;
}
public void setCurrentPrice(float currentPrice) {
this.currentPrice = currentPrice;
}
public float getHighPrice() {
return highPrice;
}
public void setHighPrice(float highPrice) {
this.highPrice = highPrice;
}
public float getLowPrice() {
return lowPrice;
}
public void setLowPrice(float lowPrice) {
this.lowPrice = lowPrice;
}
}
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;
import java.text.DecimalFormat;
import java.util.Properties;
import java.util.Random;
public class QuotationProducer {
private static final Logger LOG = Logger.getLogger(QuotationProducer.class);
/*"设置实例生产消息的总数*/
private static final int MSG_SIZE = 100;
/*主题名称*/
private static final String TOPIC = "stock-quotation";
/*" Kafka 集群的地址如果有多个地址使用逗号分隔,可以写多个不需要把所有集群Broker全写出*/
private static final String BROKER_LIST = "10.190.188.188:9092";
private static KafkaProducer<String, String> producer = null;
static {
//构造用于实例化 KafkaProducer 的 Properties 信息
Properties configs = initConfiq();
//初始化一个 KafkaProducer 连接Kafka服务器
producer = new KafkaProducer<String, String>(configs);
}
/*初始化 Kafka 配置
* @return
*/
private static Properties initConfiq() {
Properties properties = new Properties();
//Kafka broker 列表
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
//设置Key和Value的序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties;
}
/*生产股票行情信息
* @return
*/
private static StockQuotationinfo createQuotationinfo() {
StockQuotationinfo quotationinfo = new StockQuotationinfo();
//随机产生 l 到 10 之间的整数,然后与 600100 相加组成股票代码
Random r = new Random();
Integer stockCode = 600100 + r.nextInt(10);
//随机产生一个 0 到 1 之间的浮点数
float random = (float) Math.random();
//设置涨跌规则
if (random / 2 < 0.5) {
random = -random;
}
DecimalFormat decimalFormat = new DecimalFormat(".00"); //设置保存两位有效数字
quotationinfo.setCurrentPrice(Float.valueOf(decimalFormat.format(11 + random))); // 设置最新价在 l l 元浮动
quotationinfo.setPreClosePrice(11.80f);
// 设置昨日收盘价为固定值
quotationinfo.setOpenPrice(11.5f);
// 设置开盘价
quotationinfo.setLowPrice(10.5f);
// 设置最低价,并不考虑 10 屯 限制,
//以及当前价是否已是最低价6.2 生俨者 API 应用 229
quotationinfo.setHighPrice(12.5f);
//设置最高价 , 并不考虑 10宅限制 ,
//以及当前价是否已是最高价
quotationinfo.setStockCode(stockCode.toString());
quotationinfo.setTradeTime(System.currentTimeMillis());
quotationinfo.setStockName("股票-" + stockCode);
return quotationinfo;
}
//使用 ProducerRecord 发送消息
public static void main(String[] args) {
ProducerRecord<String, String> record = null;
StockQuotationinfo quotationinfo = null;
try {
int num = 0;
for (int i = 0; i < MSG_SIZE; i++) {
quotationinfo = createQuotationinfo();
LOG.info("发出一条消息"+quotationinfo.toString());
record = new ProducerRecord<String, String>(TOPIC, 0, quotationinfo.getStockCode(), quotationinfo.toString());
producer.send(record); //异步发送消息
//异步发送添加回调函数,展示信息
producer.send(record,new Callback()
{
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(metadata!=null)
System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
else if(exception!=null)
System.out.println(exception.fillInStackTrace());
}
});
if (num++ % 10 == 0) {
Thread.sleep(2000L);
}
}
} catch (InterruptedException e) {
LOG.error(" Send message occurs exception", e);
} finally {
producer.close();
}
}
}