Kafka source code reading: Connect, distributed mode


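Running bin/connect-distributed.sh config/connect-distributed.properties puts the jars from the share/java subdirectories "confluent-common", "kafka-serde-tools", "monitoring-interceptors" and kafka-connect-* on the CLASSPATH, so they are loaded into memory once the program starts. The connect-distributed.properties used here: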

bootstrap.servers=localhost:9092
group.id=eoi-connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=eoi-connect-offsets
config.storage.topic=eoi-connect-configs
status.storage.topic=eoi-connect-status
offset.flush.interval.ms=10000
rest.host.name=0.0.0.0
rest.port=7583

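AbstractConfig: the base class; holds the default configuration values and the variables used to parse configuration.

WorkerConfig: the base configuration class for Kafka Connect workers; it mainly reads the settings in connect-distributed.properties.

CommonClientConfigs: configuration shared by the Kafka producer and consumer.

DistributedConfig: configuration for distributed mode.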

I. Starting Connect from the command line
1 Instantiate the factory class ConnectorFactory

2 Load connect-distributed.properties into a Map<String, String>
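Step 2 can be sketched with plain java.util.Properties. As far as I recall, upstream ConnectDistributed does this with Kafka's own helpers (Utils.loadProps followed by Utils.propsToStringMap); the sketch below is a stdlib-only stand-in, and the class WorkerPropsLoader is hypothetical.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: reads worker properties into a Map<String, String>,
// the form that is later handed to DistributedConfig.
class WorkerPropsLoader {
    static Map<String, String> load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader); // parses key=value lines, skips blank lines and # comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> workerProps = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            workerProps.put(name, props.getProperty(name));
        }
        return workerProps;
    }

    public static void main(String[] args) {
        String file = "bootstrap.servers=localhost:9092\n\n"
                + "group.id=eoi-connect-cluster\n\n"
                + "rest.port=7583\n";
        Map<String, String> workerProps = load(new StringReader(file));
        // prints: eoi-connect-cluster 7583
        System.out.println(workerProps.get("group.id") + " " + workerProps.get("rest.port"));
    }
}
```

For a real file, pass a reader such as Files.newBufferedReader(Paths.get("config/connect-distributed.properties")). Values stay strings at this point; typing and defaults are applied later by the config classes.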

3 Initialize the plugin loader for connector classes:

Plugins plugins = new Plugins(workerProps);

4 Instantiate DistributedConfig

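5 Instantiate RestServer. The listening address and port come from the config file (rest.host.name and rest.port, here 0.0.0.0:7583); the resulting object, server, accepts connections.

1) It creates a Jetty HTTP server that exposes a REST-style API, accepts HTTP requests, and passes them on to a Handler.

A ThreadPool accepts POST, GET and other requests and hands them to the Handler; connections are accepted with NIO.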
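The request-handling model described above (a listener bound to host:port, a thread pool, and a handler per path) can be illustrated with the JDK's built-in com.sun.net.httpserver.HttpServer. This is a swapped-in stand-in for Jetty, not how RestServer is written: MiniRestServer and its canned "[]" response are hypothetical, and only the path /connectors mirrors the real Connect REST API.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for RestServer: bind host:port, dispatch requests
// on a thread pool, and answer them from a handler.
class MiniRestServer {
    private final HttpServer server;
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    MiniRestServer(String host, int port) {
        try {
            server = HttpServer.create(new InetSocketAddress(host, port), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // GET /connectors: returns an empty JSON list (hypothetical handler).
        server.createContext("/connectors", exchange -> {
            byte[] body = "[]".getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.setExecutor(pool); // requests are handled on this pool's threads
    }

    // Starts listening and returns the bound port (useful when port 0 was requested).
    int start() {
        server.start();
        return server.getAddress().getPort();
    }

    void stop() {
        server.stop(0);
        pool.shutdown();
    }

    // Minimal client used to exercise the server.
    static String get(String url) {
        try {
            HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
            try (InputStream in = conn.getInputStream()) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            } finally {
                conn.disconnect();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the configuration above this would be new MiniRestServer("0.0.0.0", 7583); the test binds 127.0.0.1 on an ephemeral port instead.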

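6 KafkaOffsetBackingStore stores offsets in a Kafka topic (offset.storage.topic, here eoi-connect-offsets); the offset maps of many tasks all go through this one object. Key fields: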

private KafkaBasedLog<byte[], byte[]> offsetLog;

private HashMap<ByteBuffer, ByteBuffer> data;

If the topic does not exist, it is created automatically.
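A toy model of what the two fields above do: writes are appended to a log (the offsets topic in the real store), and a HashMap<ByteBuffer, ByteBuffer> keeps the latest value per key, rebuilt by replaying the log on start. MiniOffsetStore and its methods are hypothetical; the List stands in for the Kafka topic, whereas the real store reads and writes asynchronously through KafkaBasedLog.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the idea behind KafkaOffsetBackingStore.
class MiniOffsetStore {
    private final List<byte[][]> log; // stand-in for the offsets topic: {key, value} records
    private final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();

    MiniOffsetStore(List<byte[][]> log) {
        this.log = log;
    }

    // Like start(): replay the whole log so the map holds the last value per key.
    void start() {
        data.clear();
        for (byte[][] record : log) {
            data.put(ByteBuffer.wrap(record[0]), ByteBuffer.wrap(record[1]));
        }
    }

    void set(byte[] key, byte[] value) {
        log.add(new byte[][] {key, value});
        data.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
    }

    // ByteBuffer.equals/hashCode compare the remaining bytes, so a fresh wrap of
    // an equal byte[] finds the entry (a raw byte[] key would use identity instead).
    String get(String key) {
        ByteBuffer v = data.get(ByteBuffer.wrap(utf8(key)));
        return v == null ? null : StandardCharsets.UTF_8.decode(v.duplicate()).toString();
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

A second instance built on the same list and started afterwards sees only the newest value for each key, which is the property a restarted worker relies on when it re-reads the offsets topic.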

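7 Instantiate Worker. A worker dynamically runs a set of tasks on a group of threads to perform source or sink work; each task has its own dedicated thread, so the worker acts as a container. The object defines the following fields: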

private final ExecutorService executor;

private final Time time;
private final String workerId;
private final Plugins plugins;
private final WorkerConfig config;
private final Converter defaultKeyConverter;
private final Converter defaultValueConverter;
private final Converter internalKeyConverter;
private final Converter internalValueConverter;
private final OffsetBackingStore offsetBackingStore;
private final Map<String, Object> producerProps;
private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;

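1) Define the thread pool (ExecutorService). A CachedThreadPool is used here; its characteristics:

- The number of threads in the pool grows as the amount of work grows.
- Threads are not created unconditionally: when a new task arrives, the pool first looks for an idle thread whose idle timeout (60 seconds) has not yet expired and reuses it; only if there is none does it create a new thread and add it to the pool.
- It is meant for many short-lived asynchronous tasks. With long-blocking work such as I/O it can end up creating a very large number of threads and exhaust system resources.
- It uses a SynchronousQueue as its work queue. A SynchronousQueue holds no elements: a submitted task is handed directly to a waiting idle thread, and if no thread is waiting, a new thread is created for it.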
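The pool described in the list can be built explicitly: Executors.newCachedThreadPool() is documented as a ThreadPoolExecutor with core size 0, an unbounded maximum, a 60-second keep-alive and a SynchronousQueue. The sketch (class name hypothetical) submits tasks that block, much like long-running Connect tasks, and checks that each one got its own thread while nothing waited in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Builds a pool with the same parameters as Executors.newCachedThreadPool().
class CachedPoolDemo {
    static ThreadPoolExecutor newCachedPool() {
        // core 0, max Integer.MAX_VALUE, idle threads die after 60s,
        // and the SynchronousQueue hands each task directly to a thread.
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    // Submits n tasks that block until released; returns {poolSize, queuedTasks}.
    static int[] runBlockingTasks(int n) {
        ThreadPoolExecutor pool = newCachedPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < n; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // simulates a task that runs for a long time
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // No thread was ever idle and waiting on the queue, so every
            // execute() had to create a new thread.
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

In the Worker this growth is what gives the one-thread-per-task model described above; in practice the thread count is bounded by the number of tasks assigned to the worker.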

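2) time: the Time clock abstraction

3) plugins: the Plugins instance created in step 3

4) workerId: the worker's identifier; in the status records below it appears as host:port, for example 192.168.31.30:7583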

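8 Initialize KafkaStatusBackingStore, which keeps connector and task status in the status topic (status.storage.topic, here eoi-connect-status). Sample values from that topic: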

{"state":"RUNNING","trace":null,"worker_id":"192.168.31.30:7583","generation":7}

{"state":"UNASSIGNED","trace":null,"worker_id":"192.168.31.30:7583","generation":7}

{"state":"RUNNING","trace":null,"worker_id":"192.168.31.31:7583","generation":8}

{"state":"UNASSIGNED","trace":null,"worker_id":"192.168.31.31:7583","generation":8}

{"state":"RUNNING","trace":null,"worker_id":"192.168.31.31:7583","generation":9}

{"state":"UNASSIGNED","trace":null,"worker_id":"192.168.31.31:7583","generation":9}

{"state":"RUNNING","trace":null,"worker_id":"192.168.31.30:7583","generation":10}

{"state":"UNASSIGNED","trace":null,"worker_id":"192.168.31.30:7583","generation":10}

{"state":"RUNNING","trace":null,"worker_id":"192.168.31.30:7583","generation":11}

{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure\n\nThe last packet successfully received from the server was 10,002 milliseconds ago.  The last packet sent successfully to the server was 10,001 milliseconds ago.\n\tat com.eoitek.dc.connect.jdbc.util.ConnectionProvider.getValidConnection(ConnectionProvider.java:59)\n\tat com.eoitek.dc.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:168)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure\n\nThe last packet successfully received from the server was 10,002 milliseconds ago.  The last packet sent successfully to the server was 10,001 milliseconds ago.\n\tat sun.reflect.GeneratedConstructorAccessor32.newInstance(Unknown Source)\n\tat sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)\n\tat java.lang.reflect.Constructor.newInstance(Constructor.java:423)\n\tat com.mysql.jdbc.Util.handleNewInstance(Util.java:425)\n\tat com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989)\n\tat com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3556)\n\tat com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3456)\n\tat com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3897)\n\tat com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:873)\n\tat com.mysql.jdbc.MysqlIO.proceedHandshakeWithPluggableAuthentication(MysqlIO.java:1710)\n\tat com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1226)\n\tat com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2253)\n\tat com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2284)\n\tat com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2083)\n\tat com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:806)\n\tat com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)\n\tat sun.reflect.GeneratedConstructorAccessor21.newInstance(Unknown Source)\n\tat sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)\n\tat java.lang.reflect.Constructor.newInstance(Constructor.java:423)\n\tat com.mysql.jdbc.Util.handleNewInstance(Util.java:425)\n\tat com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:410)\n\tat com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:328)\n\tat java.sql.DriverManager.getConnection(DriverManager.java:664)\n\tat java.sql.DriverManager.getConnection(DriverManager.java:247)\n\tat com.eoitek.dc.connect.jdbc.util.ConnectionProvider.newConnection(ConnectionProvider.java:66)\n\tat com.eoitek.dc.connect.jdbc.util.ConnectionProvider.getValidConnection(ConnectionProvider.java:56)\n\t... 9 more\nCaused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.\n\tat com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:3008)\n\tat com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3466)\n\t... 29 more\n","worker_id":"192.168.31.30:7583","generation":11}
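In the records above the state alternates between RUNNING and UNASSIGNED while the generation grows from 7 to 11 and worker_id switches between 192.168.31.30 and 192.168.31.31, which is consistent with the work being reassigned on each rebalance; the last record is FAILED with a MySQL connection error. A minimal sketch of reading such a sequence, under a simplified rule that is an assumption and not the upstream logic (the real KafkaStatusBackingStore handles more cases, such as which worker may overwrite a status). StatusRecord is hypothetical.

```java
import java.util.List;

// Hypothetical model of one status-topic value, e.g.
// {"state":"RUNNING","trace":null,"worker_id":"192.168.31.30:7583","generation":7}
class StatusRecord {
    final String state;
    final String workerId;
    final int generation;

    StatusRecord(String state, String workerId, int generation) {
        this.state = state;
        this.workerId = workerId;
        this.generation = generation;
    }

    // Simplified rule (assumption): a record from an older generation never
    // replaces one from a newer generation; within a generation the later record wins.
    static StatusRecord latest(List<StatusRecord> records) {
        StatusRecord current = null;
        for (StatusRecord r : records) {
            if (current == null || r.generation >= current.generation) {
                current = r;
            }
        }
        return current;
    }
}
```

Feeding it the generation 9 to 11 records ends on the FAILED record from 192.168.31.30:7583, and a late-arriving record from generation 9 does not override it.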

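9 Initialize KafkaConfigBackingStore, which keeps connector configurations, task configurations, task-count commit records ({"tasks":1}) and target states ({"state":"PAUSED"}) in the config topic (config.storage.topic, here eoi-connect-configs). Sample values from that topic: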

{"properties":{"connector.class":"com.eoitek.dc.connect.jdbc.JdbcSourceConnector","incrementing.column.name":"id","connection.password":"[email protected]","tasks.max":"1","query":"select * from wkm1","poll.now":"true","mode":"bulk","topic.prefix":"testhirdpart_my20180628095745891","task.class":"com.eoitek.dc.connect.jdbc.source.JdbcSourceTask","connection.user":"root","poll.interval.cron":"0 0/4 * * * ? *","name":"jdbc-sync-192193601130496-all","publish.fields":"{\"@@datasetid\":192193601130496,\"@dataset\":\"my20180628095745891\",\"@hostname\":\"192.168.31.136\",\"@sourcetype\":\"mysql\",\"@topic\":\"testhirdpart_my20180628095745891\"}","connection.url":"jdbc:mysql://192.168.31.136:3306/test_data"}}

{"tasks":1}

{"properties":{"connection.password":"[email protected]","connection.url":"jdbc:mysql://192.168.31.136:3306/test_data","connection.user":"root","connector.class":"com.eoitek.dc.connect.jdbc.JdbcSourceConnector","incrementing.column.name":"id","mode":"bulk","poll.interval.cron":"0 0/2 * * * ? *","poll.now":"true","publish.fields":"{\"@@datasetid\":192187803377664,\"@dataset\":\"my20180628093410414\",\"@hostname\":\"192.168.31.136\",\"@sourcetype\":\"mysql\",\"@topic\":\"testhirdpart_my20180628093410414\"}","query":"select * from wkm1","tasks.max":"1","topic.prefix":"testhirdpart_my20180628093410414","name":"jdbc-sync-192187803377664-all"}}

{"properties":{"connection.password":"[email protected]","connection.url":"jdbc:mysql://192.168.31.136:3306/test_data","connection.user":"root","connector.class":"com.eoitek.dc.connect.jdbc.JdbcSourceConnector","incrementing.column.name":"id","mode":"bulk","poll.interval.cron":"0 0/4 * * * ? *","poll.now":"true","publish.fields":"{\"@@datasetid\":192193601130496,\"@dataset\":\"my20180628095745891\",\"@hostname\":\"192.168.31.136\",\"@sourcetype\":\"mysql\",\"@topic\":\"testhirdpart_my20180628095745891\"}","query":"select * from wkm1","tasks.max":"1","topic.prefix":"testhirdpart_my20180628095745891","name":"jdbc-sync-192193601130496-all"}}

{"state":"PAUSED"}

{"properties":{"connection.password":"[email protected]","connection.url":"jdbc:mysql://192.168.31.136:3306/test_data","connection.user":"root","connector.class":"com.eoitek.dc.connect.jdbc.JdbcSourceConnector","incrementing.column.name":"id","mode":"bulk","poll.interval.cron":"0 0/4 * * * ? *","poll.now":"true","publish.fields":"{\"@@datasetid\":192188845568000,\"@dataset\":\"my20180628093824865\",\"@hostname\":\"192.168.31.136\",\"@sourcetype\":\"mysql\",\"@topic\":\"testhirdpart_my20180628093824865\"}","query":"select * from wkm1","tasks.max":"1","topic.prefix":"testhirdpart_my20180628093824865","name":"jdbc-sync-192188845568000-all"}}

{"state":"PAUSED"}

{"state":"PAUSED"}

{"state":"PAUSED"}

{"state":"PAUSED"}

{"properties":{"connector.class":"com.eoitek.dc.connect.kafka.KafkaSourceConnector","kafka.consume.offset":"latest","kafka.poll.interval":"10000","kafka.publish.fields":"{\"@@datasetid\":197141768261632,\"@dataset\":\"kfk20180712093154506\",\"@record\":\"kfk20180712093154506\",\"@sourcetype\":\"kafka\",\"@topic\":\"jan_kfk20180712093154506\"}","kafka.publish.topic":"jan_kfk20180712093154506","kafka.source.servers":"192.168.31.186:9092","kafka.source.topics":"GW_TO_ITOA_REQ_NEW","kafka.version":">0.10.0","streaming.id":"197141768261632","tasks.max":"1","name":"kafka-sync-197141768261632-all"}}

{"properties":{"connector.class":"com.eoitek.dc.connect.kafka.KafkaSourceConnector","kafka.consume.offset":"latest","kafka.poll.interval":"10000","streaming.id":"197141768261632","kafka.source.topics":"GW_TO_ITOA_REQ_NEW","task.class":"com.eoitek.dc.connect.kafka.KafkaSourceTask","tasks.max":"1","kafka.publish.fields":"{\"@@datasetid\":197141768261632,\"@dataset\":\"kfk20180712093154506\",\"@record\":\"kfk20180712093154506\",\"@sourcetype\":\"kafka\",\"@topic\":\"jan_kfk20180712093154506\"}","name":"kafka-sync-197141768261632-all","kafka.source.servers":"192.168.31.186:9092","kafka.publish.topic":"jan_kfk20180712093154506","kafka.version":">0.10.0"}}

{"tasks":1}

{"state":"PAUSED"}
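The values alone do not say which record is which: a {"properties":...} value that contains task.class appears to be a task configuration, one without it a connector configuration, {"tasks":1} a commit record stating how many task configurations the connector has, and {"state":"PAUSED"} a target state. The record key is what tells them apart. A sketch, assuming the key prefixes used by upstream KafkaConfigBackingStore as far as I recall them (connector-, task-, commit-, target-state-); check them against your Kafka version. The enum and the example keys in the test are hypothetical.

```java
// Hypothetical classifier for config-topic records by key prefix.
enum ConfigRecordKind {
    CONNECTOR_CONFIG, // value like {"properties":{...}} without task.class
    TASK_CONFIG,      // value like {"properties":{..., "task.class":...}}
    TASK_COMMIT,      // value like {"tasks":1}
    TARGET_STATE,     // value like {"state":"PAUSED"}
    UNKNOWN;

    static ConfigRecordKind of(String key) {
        // The prefixes do not overlap, so the check order does not matter.
        if (key.startsWith("connector-")) return CONNECTOR_CONFIG;
        if (key.startsWith("task-")) return TASK_CONFIG;
        if (key.startsWith("commit-")) return TASK_COMMIT;
        if (key.startsWith("target-state-")) return TARGET_STATE;
        return UNKNOWN;
    }
}
```

Because the config topic is replayed in order on startup, the store can rebuild connector configs, task configs and target states from these records alone.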

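10 DistributedHerder: coordinates this worker with the rest of the Connect cluster through the worker group (group.id). Its fields and constructor: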


public class DistributedHerder extends AbstractHerder implements Runnable {
private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
private static final int START_STOP_THREAD_POOL_SIZE = 8;
private final AtomicLong requestSeqNum = new AtomicLong();
private final Time time;
private final String workerGroupId;
private final int workerSyncTimeoutMs;
private final long workerTasksShutdownTimeoutMs;
private final int workerUnsyncBackoffMs;
private final ExecutorService herderExecutor;
private final ExecutorService forwardRequestExecutor;
private final ExecutorService startAndStopExecutor;
private final WorkerGroupMember member;
private final AtomicBoolean stopping;
// Track enough information about the current membership state to be able to determine which requests via the API
// and from other nodes are safe to process
private boolean rebalanceResolved;
private ConnectProtocol.Assignment assignment;
private boolean canReadConfigs;
private ClusterConfigState configState; // snapshot of the connectors and tasks in the Connect cluster
// To handle most external requests, like creating or destroying a connector, we can use a generic request where
// the caller specifies all the code that should be executed.
final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>();
// Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
// needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
private Set<String> connectorConfigUpdates = new HashSet<>();
// Similarly collect target state changes (when observed by the config storage listener) for handling in the
// herder's main thread.
private Set<String> connectorTargetStateChanges = new HashSet<>();
private boolean needsReconfigRebalance;
private volatile int generation;

// ... excerpt from the constructor body:
this.time = time;
this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
this.workerTasksShutdownTimeoutMs = config.getLong(DistributedConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configBackingStore, new RebalanceListener(), time);
this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(1),
new ThreadFactory() {
@Override
public Thread newThread(Runnable herder) {
return new Thread(herder, "DistributedHerder");
}
});
this.forwardRequestExecutor = Executors.newSingleThreadExecutor();
this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE);
stopping = new AtomicBoolean(false);
configState = ClusterConfigState.EMPTY;
rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks
needsReconfigRebalance = false;
canReadConfigs = true; // We didn't try yet, but Configs are readable until proven otherwise
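The comments above describe external requests kept in a NavigableSet (a ConcurrentSkipListSet) for the herder thread, with requestSeqNum handing out sequence numbers. A sketch of that ordering, assuming requests sort by scheduled time and then by sequence number, which is how I understand upstream HerderRequest to compare; MiniHerderQueue, Request and pollDue are hypothetical.

```java
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the herder's request queue.
class MiniHerderQueue {
    static final class Request implements Comparable<Request> {
        final long at;     // when the request should run (ms)
        final long seq;    // tie-breaker so equal times do not collapse in the set
        final String name; // label for the work to do

        Request(long at, long seq, String name) {
            this.at = at;
            this.seq = seq;
            this.name = name;
        }

        @Override
        public int compareTo(Request o) {
            int byTime = Long.compare(at, o.at);
            return byTime != 0 ? byTime : Long.compare(seq, o.seq);
        }
    }

    private final AtomicLong requestSeqNum = new AtomicLong();
    private final ConcurrentSkipListSet<Request> requests = new ConcurrentSkipListSet<>();

    // Any thread (for example a REST handler) may enqueue work.
    void add(long at, String name) {
        requests.add(new Request(at, requestSeqNum.incrementAndGet(), name));
    }

    // Called only from the herder thread: take the earliest request if it is due.
    // Other threads only add, so first() cannot fail after the isEmpty() check.
    String pollDue(long now) {
        if (requests.isEmpty() || requests.first().at > now) {
            return null;
        }
        return requests.pollFirst().name;
    }
}
```

Without the sequence number, two requests scheduled for the same millisecond would compare as equal and the set would silently keep only one of them.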

11 Initialize the Connect class, which ties all the core components together and manages their lifecycle as one unit. Its fields:


private final Herder herder;
private final RestServer rest;
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch stopLatch = new CountDownLatch(1);
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final ShutdownHook shutdownHook;

connect.start() is the master switch that starts everything:



public void start() {
try {
log.info("Kafka Connect starting");
Runtime.getRuntime().addShutdownHook(shutdownHook); // register the hook that stops Connect when the process exits
herder.start(); // start the herder
rest.start(herder); // start the Jetty server that accepts HTTP requests
log.info("Kafka Connect started");
} finally {
startLatch.countDown();
}
}
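Only startLatch appears in start(); the other fields serve shutdown. A minimal sketch of how such fields fit together, assuming the shutdown hook waits for startup to finish and then calls an idempotent stop() guarded by the AtomicBoolean, which as far as I know is also what upstream Connect does. MiniConnect and all its members are hypothetical, and the herder and REST calls are replaced by comments.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, trimmed-down version of the lifecycle fields shown above.
class MiniConnect {
    private final CountDownLatch startLatch = new CountDownLatch(1);
    private final CountDownLatch stopLatch = new CountDownLatch(1);
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final AtomicInteger stopWorkRuns = new AtomicInteger(); // for illustration only

    void start() {
        try {
            // herder.start(); rest.start(herder);
        } finally {
            startLatch.countDown(); // mark startup finished, even if it threw
        }
    }

    void stop() {
        try {
            // Only the first caller performs the real shutdown work.
            if (!shutdown.getAndSet(true)) {
                stopWorkRuns.incrementAndGet(); // rest.stop(); herder.stop();
            }
        } finally {
            stopLatch.countDown(); // release anyone blocked in awaitStop()
        }
    }

    // What a shutdown hook could run: wait until start() finished, then stop.
    Runnable shutdownHook() {
        return () -> {
            await(startLatch, Long.MAX_VALUE);
            stop();
        };
    }

    boolean awaitStop(long timeoutMs) {
        return await(stopLatch, timeoutMs);
    }

    int stopWorkRuns() {
        return stopWorkRuns.get();
    }

    private static boolean await(CountDownLatch latch, long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Counting down startLatch in finally means a hook fired during a failed startup still proceeds to stop() instead of waiting forever.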

1 herder.start()


public void start() {
this.herderExecutor.submit(this);
}

This runs DistributedHerder's run() method on herderExecutor. herderExecutor has exactly one worker thread, so tasks run one at a time in submission order (FIFO, like a queue).
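The herderExecutor built in the constructor can be reproduced with the same arguments. The sketch below (class name hypothetical) checks two consequences: submitted work runs on the single thread named "DistributedHerder", and because the LinkedBlockingDeque holds only one task, a third submission while the thread is busy and the queue is full is rejected. In practice the herder submits only itself, once, in start(), so run() occupies that thread for the lifetime of the process.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Rebuilds the herderExecutor configuration: one thread, a one-slot queue.
class HerderExecutorDemo {
    static ThreadPoolExecutor newHerderExecutor() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<Runnable>(1),
                r -> new Thread(r, "DistributedHerder"));
    }

    // Returns the name of the thread that ran a submitted task.
    static String threadName() {
        ThreadPoolExecutor executor = newHerderExecutor();
        AtomicReference<String> name = new AtomicReference<>();
        try {
            executor.submit(() -> name.set(Thread.currentThread().getName())).get();
            return name.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // Busy thread + full queue: the default AbortPolicy rejects the next task.
    static boolean thirdSubmitRejected() {
        ThreadPoolExecutor executor = newHerderExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            executor.execute(() -> { // occupies the only thread
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            executor.execute(() -> { }); // fills the one-slot queue
            try {
                executor.execute(() -> { });
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }
}
```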


@Override
public void run() {
try {
log.info("Herder starting");
startServices();
log.info("Herder started");
while (!stopping.get()) {
tick();
}
halt();
log.info("Herder stopped");
} catch (Throwable t) {
log.error("Uncaught exception in herder work thread, exiting: ", t);
Exit.exit(1);
}
}

protected void startServices() {
this.worker.start();
this.statusBackingStore.start();
this.configBackingStore.start();
}
worker.start()

/**
 * Start worker.
 */
public void start() {
    log.info("Worker starting");

    offsetBackingStore.start();
    sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);

    log.info("Worker started");
}

offsetBackingStore.start(): reads the offset topic in Kafka up to its last message, then keeps polling the topic in a loop for new messages.

sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config); an important class that schedules the periodic offset commits of source tasks. Its fields:

private final WorkerConfig config;
private final ScheduledExecutorService commitExecutorService;
private final ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers;
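The three fields suggest the mechanism: one scheduler, plus one ScheduledFuture per task (keyed by ConnectorTaskId) so that a task's periodic commit can be cancelled when the task goes away. A minimal sketch under that reading; as far as I know the interval comes from offset.flush.interval.ms (10000 in the config above) and upstream uses scheduleWithFixedDelay, but MiniOffsetCommitter, the String task ids and the Runnable commit action are hypothetical simplifications.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified committer with the same three kinds of fields.
class MiniOffsetCommitter {
    private final long commitIntervalMs;
    private final ScheduledExecutorService commitExecutorService =
            Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<String, ScheduledFuture<?>> committers = new ConcurrentHashMap<>();

    MiniOffsetCommitter(long commitIntervalMs) {
        this.commitIntervalMs = commitIntervalMs; // e.g. offset.flush.interval.ms=10000
    }

    // commitAction stands in for "commit this source task's offsets".
    void schedule(String taskId, Runnable commitAction) {
        ScheduledFuture<?> future = commitExecutorService.scheduleWithFixedDelay(
                commitAction, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
        committers.put(taskId, future);
    }

    void remove(String taskId) {
        ScheduledFuture<?> future = committers.remove(taskId);
        if (future != null) {
            future.cancel(false); // let a commit that is already running finish
        }
    }

    int scheduledTasks() {
        return committers.size();
    }

    void close() {
        commitExecutorService.shutdownNow();
    }
}
```

A fixed delay (rather than a fixed rate) means a slow commit pushes the next one back instead of letting commits pile up.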

this.statusBackingStore.start(); reads the status topic up to its last message and keeps polling it for new values.

this.configBackingStore.start(); reads the config topic up to its last message and keeps polling it for new values.


while (!stopping.get()) {
    tick();
}