diff --git a/README.md b/README.md index 8ff7d55..5a49fe2 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ # huawei-mrs-kafka-demo +华为大数据平台 kerberos 认证 kafka对接示例 \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..2954eda --- /dev/null +++ b/pom.xml @@ -0,0 +1,88 @@ + + + 4.0.0 + + com.bcrjl.mrs + huawei-mrs-kafka + 1.0-SNAPSHOT + + org.springframework.boot + spring-boot-starter-parent + 2.2.0.RELEASE + + + + + 8 + 8 + UTF-8 + 1.8 + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.springframework.kafka + spring-kafka + + + org.apache.kafka + kafka-clients + + + + + org.apache.kafka + kafka-clients + 2.4.0-hw-ei-312005 + + + + org.projectlombok + lombok + 1.18.28 + provided + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + + huawei-cloud-sdk + HuaWei Cloud Mirrors + https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/ + + true + + + true + + + + huawei-mirrors + HuaWei Mirrors + https://repo.huaweicloud.com/repository/maven/ + + + + \ No newline at end of file diff --git a/src/main/java/com/bcrjl/mrs/Application.java b/src/main/java/com/bcrjl/mrs/Application.java new file mode 100644 index 0000000..be8ef3e --- /dev/null +++ b/src/main/java/com/bcrjl/mrs/Application.java @@ -0,0 +1,17 @@ +package com.bcrjl.mrs; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * 功能描述:项目启动类 + * + * @author yanqs + * @since 2023-08-28 + */ +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} diff --git a/src/main/java/com/bcrjl/mrs/config/HuaWeiMrsKafkaConfiguration.java b/src/main/java/com/bcrjl/mrs/config/HuaWeiMrsKafkaConfiguration.java new file mode 100644 index 0000000..f46931f --- /dev/null +++ b/src/main/java/com/bcrjl/mrs/config/HuaWeiMrsKafkaConfiguration.java @@ -0,0 +1,82 @@ +package com.bcrjl.mrs.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.*; +import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.support.converter.StringJsonMessageConverter; + +import java.util.HashMap; +import java.util.Map; + +/** + * 功能描述:华为Mrs kafka配置 + * + * @author yanqs + * @since 2023-08-28 + */ +@Configuration +public class HuaWeiMrsKafkaConfiguration { + + @Value("${huawei.mrs.kafka.enable}") + public Boolean enable; + @Value("${huawei.mrs.kafka.bootstrap-servers}") + public String boostrapServers; + + @Value("${huawei.mrs.kafka.security.protocol}") + public String securityProtocol; + + @Value("${huawei.mrs.kafka.kerberos.domain.name}") + public String kerberosDomainName; + + @Value("${huawei.mrs.kafka.sasl.kerberos.service.name}") + public String kerberosServiceName; + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConcurrentKafkaListenerContainerFactoryConfigurer configurer, + ConsumerFactory kafkaConsumerFactory, KafkaTemplate template) { + + ConcurrentKafkaListenerContainerFactory factory + = new ConcurrentKafkaListenerContainerFactory<>(); + configurer.configure(factory, kafkaConsumerFactory); + //禁止消费者自启动,达到动态启动消费者的目的 + factory.setAutoStartup(enable); + return factory; + } + + + @Bean + public ConsumerFactory consumerFactory() { + Map configs = new HashMap<>(); + configs.put("security.protocol", securityProtocol); + configs.put("kerberos.domain.name", kerberosDomainName); + configs.put("bootstrap.servers", boostrapServers); + configs.put("sasl.kerberos.service.name", kerberosServiceName); + configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + return new DefaultKafkaConsumerFactory<>(configs); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + Map configs = new HashMap<>(); + configs.put("security.protocol", securityProtocol); + configs.put("kerberos.domain.name", kerberosDomainName); + configs.put("bootstrap.servers", boostrapServers); + configs.put("sasl.kerberos.service.name", kerberosServiceName); + configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(configs); + return new KafkaTemplate<>(producerFactory); + } + + @Bean + public RecordMessageConverter converter() { + return new StringJsonMessageConverter(); + } + +} diff --git a/src/main/java/com/bcrjl/mrs/consumer/KafkaConsumer.java b/src/main/java/com/bcrjl/mrs/consumer/KafkaConsumer.java new file mode 100644 index 0000000..20c6cfd --- /dev/null +++ b/src/main/java/com/bcrjl/mrs/consumer/KafkaConsumer.java @@ -0,0 +1,23 @@ +package com.bcrjl.mrs.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +/** + * 功能描述:kafka消费订阅 + * + * @author yanqs + * @since 2023-08-28 + */ +@Slf4j +@Component +public class KafkaConsumer { + + @KafkaListener(topics = "topic1") + public void topicMessage(ConsumerRecord record) { + log.info("Kafka->topic:‘topic1’-->{}", record.value()); + //TODO:业务逻辑 + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..a7fd22a --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,14 @@ +huawei: + mrs: + kafka: + enable: false + bootstrap-servers: 10.244.231.2:21007,10.244.230.202:21007,10.244.230.125:21007 + security: + protocol: SASL_PLAINTEXT + kerberos: + domain: + name: hadoop.hadoop_651_arm.com + sasl: + kerberos: + service: + name: kafka \ No newline at end of file