中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

說說 MQ 之 Kafka(二)

2018-10-31    來源:importnew

容器云強勢上線!快速搭建集群,上萬Linux鏡像隨意使用

Kafka 的工具和編程接口

Kafka 的工具

Kafka 提供的工具還是比較全的,bin/?目錄下的工具有以下一些,

bin/connect-distributed.sh     bin/kafka-consumer-offset-checker.sh     bin/kafka-replica-verification.sh   bin/kafka-verifiable-producer.sh
bin/connect-standalone.sh      bin/kafka-consumer-perf-test.sh          bin/kafka-run-class.sh              bin/zookeeper-security-migration.sh
bin/kafka-acls.sh              bin/kafka-mirror-maker.sh                bin/kafka-server-start.sh           bin/zookeeper-server-start.sh
bin/kafka-configs.sh           bin/kafka-preferred-replica-election.sh  bin/kafka-server-stop.sh            bin/zookeeper-server-stop.sh
bin/kafka-console-consumer.sh  bin/kafka-producer-perf-test.sh          bin/kafka-simple-consumer-shell.sh  bin/zookeeper-shell.sh
bin/kafka-console-producer.sh  bin/kafka-reassign-partitions.sh         bin/kafka-topics.sh
bin/kafka-consumer-groups.sh   bin/kafka-replay-log-producer.sh         bin/kafka-verifiable-consumer.sh

我常用的命令有以下幾個,

bin/kafka-server-start.sh -daemon config/server.properties &
bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
bin/kafka-topics.sh --list --zookeeper 192.168.232.23:2181
bin/kafka-topics.sh --delete --zookeeper 192.168.232.23:2181 --topic topic1
bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1
bin/kafka-console-consumer.sh --zookeeper 192.168.232.23:2181 --topic topic1 --from-beginning
bin/kafka-console-producer.sh --broker-list 192.168.232.23:9092 --topic topic1

kafka-server-start.sh?是用于 Kafka 的 Broker 啟動的,主要就一個參數(shù)?config/server.properties,該文件中的配置項待會再說.還有一個?-daemon?參數(shù),這個是將 Kafka 放在后臺用守護進程的方式運行,如果不加這個參數(shù),Kafka 會在運行一段時間后自動退出,據(jù)說這個是 0.10.0.0 版本才有的問題?5。kafka-topics.sh?是用于管理 Topic 的工具,我主要用的?--describe--list、--delete--create?這4個功能,上述的例子基本是不言自明的,--replication-factor 3、--partitions 2?這兩個參數(shù)分別表示3個副本(含 Leader),和2個分區(qū)。kafka-console-consumer.sh?和?kafka-console-producer.sh?是生產(chǎn)者和消費者的簡易終端工具,在調(diào)試的時候比較有用,我常用的是?kafka-console-consumer.sh。我沒有用 Kafka 自帶的 zookeeper,而是用的 zookeeper 官方的發(fā)布版本 3.4.8,端口是默認2181,與 Broker 在同一臺機器上。

下面說一下 Broker 啟動的配置文件?config/server.properties,我在默認配置的基礎上,修改了以下一些,

broker.id=0
listeners=PLAINTEXT://192.168.232.23:9092
log.dirs=/tmp/kafka-logs
delete.topic.enable=true

broker.id?是 Kafka 集群中的 Broker ID,不可重復,我在多副本的實驗中,將他們分別設置為0、1、2;listeners?是 Broker 監(jiān)聽的地址,默認是監(jiān)聽?localhost:9092,因為我不是單機實驗,所以修改為本機局域網(wǎng)地址,當然,如果要監(jiān)聽所有地址的話,也可以設置為?0.0.0.0:9092,多副本實驗中,將監(jiān)聽端口分別設置為 9092、9093、9094;log.dirs?是 Broker 的 log 的目錄,多副本實驗中,不同的 Broker 需要有不同的 log 目錄;delete.topic.enable?設為 true 后,可以刪除 Topic,并且連帶 Topic 中的消息也一并刪掉,否則,即使調(diào)用?kafka-topics.sh --delete?也無法刪除 Topic,這是一個便利性的設置,對于開發(fā)環(huán)境可以,生產(chǎn)環(huán)境一定要設為 false(默認)。實驗中發(fā)現(xiàn), 如果有消費者在消費這個 Topic,那么也無法刪除,還是比較安全的。

剩下的工具多數(shù)在文檔中也有提到。如果看一下這些腳本的話,會發(fā)現(xiàn)多數(shù)腳本的寫法都是一致的,先做一些參數(shù)的校驗,最后運行?exec $base_dir/kafka-run-class.sh XXXXXXXXX "$@",可見,這些工具都是使用運行 Java Class 的方式調(diào)用的。

Kafka 的 Java API

在編程接口方面,官方提供了 Scala 和 Java 的接口,社區(qū)提供了更多的其他語言的接口,基本上,無論用什么語言開發(fā),都能找到相應的 API。下面說一下 Java 的 API 接口。

生產(chǎn)者的 API 只有一種,相對比較簡單,代碼如下,

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SimpleProducerDemo {
    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.232.23:9092,192.168.232.23:9093,192.168.232.23:9094");
        props.put("zookeeper.connect", "192.168.232.23:2181");
        props.put("client.id", "DemoProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        String topic = "topic1";
        Boolean isAsync = false;
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + String.format("%05d",messageNo);
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ++messageNo;
        }
    }
}
class DemoCallBack implements Callback {
    private final long startTime;
    private final int key;
    private final String message;
    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "Send     message: (" + String.format("%05d",key) + ", " + message + ") at offset "+ metadata.offset() +
                            " to partition(" + metadata.partition() +
                            ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}

上例中使用了同步和異步發(fā)送兩種方式。在多副本的情況下,如果要指定同步復制還是異步復制,可以使用?acks?參數(shù),詳細參考官方文檔 Producer Configs 部分的內(nèi)容;在多分區(qū)的情況下,如果要指定發(fā)送到哪個分區(qū),可以使用?partitioner.class?參數(shù),其值是一個實現(xiàn)了?org.apache.kafka.clients.producer.Partitioner?接口的類,用于根據(jù)不同的消息指定分區(qū)6。消費者的 API 有幾種,比較新的 API 如下,

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.232.23:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3"));
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + String.format("%05d", record.key()) + ", " + record.value() + ") at offset " + record.offset());
            }
        }
    }
}

消費者還有舊的 API,比如?Consumer?和?SimpleConsumer?API,這些都可以從 Kafka 代碼的 kafka-example 中找到,上述的兩個例子也是改寫自 kafka-example。使用新舊 API 在功能上都能滿足消息收發(fā)的需要,但新 API 只依賴?kafka-clients,打包出來的 jar 包會小很多,以我的測試,新 API 的消費者 jar 包大約有 2M 左右,而舊 API 的消費者 jar 包接近 16M。

其實,Kafka 也提供了按分區(qū)訂閱,可以一次訂閱多個分區(qū)?TopicPartition[];也支持手動提交 offset,需要調(diào)用?consumer.commitSync。

Kafka 似乎沒有公開 Topic 創(chuàng)建以及修改的 API(至少我沒有找到),如果生產(chǎn)者向 Broker 寫入的 Topic 是一個新 Topic,那么 Broker 會創(chuàng)建這個 Topic。創(chuàng)建的過程中會使用默認參數(shù),例如,分區(qū)個數(shù),會使用 Broker 配置中的?num.partitions?參數(shù)(默認1);副本個數(shù),會使用?default.replication.factor?參數(shù)。但是通常情況下,我們會需要創(chuàng)建自定義的 Topic,那官方的途徑是使用 Kafka 的工具。也有一些非官方的途徑?7,例如可以這樣寫,

String[] options = new String[]{
        "--create",
        "--zookeeper",
        "192.168.232.23:2181",
        "--partitions",
        "2",
        "--replication-factor",
        "3",
        "--topic",
        "topic1"
};
TopicCommand.main(options);

但是這樣寫有一個問題,在執(zhí)行完?TopicCommand.main(options);?之后,系統(tǒng)會自動退出,原因是執(zhí)行完指令之后,會調(diào)用?System.exit(exitCode);?系統(tǒng)直接退出。這樣當然不行,我的辦法是,把相關(guān)的執(zhí)行代碼挖出來,寫一個 TopicUtils 類,如下,

import joptsimple.OptionSpecBuilder;
import kafka.admin.TopicCommand;
import kafka.admin.TopicCommand$;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import scala.runtime.Nothing$;
public class TopicUtils {
    // from: http://blog.csdn.net/changong28/article/details/39325079
    // from: http://www.cnblogs.com/davidwang456/p/4313784.html
    public static void createTopic(){
        String[] options = new String[]{
                "--create",
                "--zookeeper",
                KafkaProperties.ZOOKEEPER_URL,
                "--partitions",
                "2",
                "--replication-factor",
                "3",
                "--topic",
                KafkaProperties.TOPIC
        };
//        TopicCommand.main(options);
        oper(options);
    }
    public static void listTopic(){
        String[] options = new String[]{
                "--list",
                "--zookeeper",
                KafkaProperties.ZOOKEEPER_URL
        };
//        TopicCommand.main(options);
        oper(options);
    }
    public static void deleteTopic(){
        String[] options = new String[]{
                "--delete",
                "--zookeeper",
                KafkaProperties.ZOOKEEPER_URL,
                "--topic",
                KafkaProperties.TOPIC
        };
//        TopicCommand.main(options);
        oper(options);
    }
    public static void describeTopic(){
        String[] options = new String[]{
                "--describe",
                "--zookeeper",
                KafkaProperties.ZOOKEEPER_URL,
                "--topic",
                KafkaProperties.TOPIC
        };
//        TopicCommand.main(options);
        oper(options);
    }
    public static void main(String[] args){
        listTopic();
        createTopic();
        listTopic();
        describeTopic();
        deleteTopic();
        try {
            Thread.sleep(3*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        listTopic();
    }
    /** copied & modified from kafka.admin.TopicCommand$.main
     *
     * @param args
     */
    public static void oper(String args[]){
        try {
        TopicCommand$ topicCommand$ = TopicCommand$.MODULE$;
        final TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(args);
        if(args.length == 0) {
            throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), "Create, delete, describe, or change a topic.");
        } else {
            int actions =0;
            OptionSpecBuilder[] optionSpecBuilders = {opts.createOpt(), opts.listOpt(), opts.alterOpt(), opts.describeOpt(), opts.deleteOpt()};
            for (OptionSpecBuilder temp:optionSpecBuilders){
                if (opts.options().has(temp)) {
                    actions++;
                }
            }
            if(actions != 1) {
                throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), "Command must include exactly one action: --list, --describe, --create, --alter or --delete");
            } else {
                opts.checkArgs();
                ZkUtils zkUtils = kafka.utils.ZkUtils$.MODULE$.apply((String)opts.options().valueOf(opts.zkConnectOpt()), 30000, 30000, JaasUtils.isZkSecurityEnabled());
                byte exitCode = 0;
                try {
                    try {
                        if(opts.options().has(opts.createOpt())) {
                            topicCommand$.createTopic(zkUtils, opts);
                        } else if(opts.options().has(opts.alterOpt())) {
                            topicCommand$.alterTopic(zkUtils, opts);
                        } else if(opts.options().has(opts.listOpt())) {
                            topicCommand$.listTopics(zkUtils, opts);
                        } else if(opts.options().has(opts.describeOpt())) {
                            topicCommand$.describeTopic(zkUtils, opts);
                        } else if(opts.options().has(opts.deleteOpt())) {
                            topicCommand$.deleteTopic(zkUtils, opts);
                        }
                    } catch (final Throwable var12) {
                        scala.Predef$.MODULE$.println((new StringBuilder()).append("Error while executing topic command : ").append(var12.getMessage()).toString());
                        System.out.println(var12);
                        exitCode = 1;
                        return;
                    }
                } finally {
                    zkUtils.close();
//                    System.exit(exitCode);
                }
            }
        }
        } catch (Nothing$ nothing$) {
            nothing$.printStackTrace();
        }
    }
}

以上的?oper?方法改寫自?kafka.admin.TopicCommand$.main?方法。可以發(fā)現(xiàn)這部分代碼非常怪異,原因是?TopicCommand$?是 Scala 寫的,再編譯成 Java class 字節(jié)碼,然后我根據(jù)這些字節(jié)碼反編譯得到 Java 代碼,并以此為基礎進行修改,等于是我在用 Java 的方式改寫 Scala 的代碼,難免會覺得詭異。當然,這種寫法用在生產(chǎn)環(huán)境的話是不太合適的,因為調(diào)用的?topicCommand$.createTopic?等方法都沒有拋出異常,例如參數(shù)不合法的情況,而且也沒有使用 log4j 之類的 log 庫,只是用?System.out.println?這樣的方法屏顯,在出現(xiàn)錯誤的時候,比較難以定位。

參考文章

  1. http://kafka.apache.org/documentation.html?
  2. http://www.jianshu.com/p/453c6e7ff81c?
  3. http://www.infoq.com/cn/author/%E9%83%AD%E4%BF%8A#文章?
  4. http://developer.51cto.com/art/201501/464491.htm?
  5. https://segmentfault.com/q/1010000004292925?
  6. http://www.cnblogs.com/gnivor/p/5318319.html?
  7. http://www.cnblogs.com/davidwang456/p/4313784.html?
  8. http://www.jianshu.com/p/8689901720fd?
  9. http://zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/?
  10. http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/?

標簽: 安全 代碼 腳本

版權(quán)申明:本站文章部分自網(wǎng)絡,如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:說說 MQ 之 Kafka(三)

下一篇:說說 MQ 之 Kafka(一)