Apache Sentry初讀

1. 背景

????Apache Sentry 是Cloudera公司發(fā)布的一個Hadoop開源組件,提供了細(xì)粒度級、基于角色的授權(quán)以及多租戶的管理模式,主要針對存儲在Hadoop集群上的數(shù)據(jù)和元數(shù)據(jù)。它可以和Hive/Hcatalog、Apache Solr 和Cloudera Impala等集成,未來可以擴(kuò)展到其他Hadoop生態(tài)系統(tǒng)組件,如HDFS和HBase。

????Sentry旨在成為可插拔授權(quán)引擎的Hadoop組件。允許定義授權(quán)規(guī)則以驗證用戶或應(yīng)用程序?qū)adoop資源的訪問請求。Sentry是高度模塊化的,可以支持Hadoop中各種數(shù)據(jù)模型的授權(quán)。

2. 本文的目標(biāo)

? ? 本文目標(biāo)人群為初次接觸apache sentry的開發(fā)人員,幫助其找到代碼的入口函數(shù),快速摸清代碼的架構(gòu),梳理系統(tǒng)的結(jié)構(gòu),追蹤代碼的調(diào)用路徑,為之后的深入閱讀打下基礎(chǔ)。建議閱讀之前,在網(wǎng)上搜索sentry相關(guān)的資料,了解其架構(gòu),原理,以便更好地了解源代碼。寫這篇文章時,本人也是剛接觸sentry,有些心得,與大家分享。更多深入的理解,請關(guān)注下一次的分享

3. sentry簡單介紹

? ? 權(quán)限模型,在很多系統(tǒng)我們都見過,先是資源,然后是權(quán)限,權(quán)限是對資源的訪問規(guī)則,然后有角色,角色是一組權(quán)限,最后是用戶,角色賦予給用戶,有時候也會有組的概念,相同屬性的用戶劃成一組,角色賦予給組。sentry的權(quán)限模型也是同樣的原理。它包括一下元素:

? ? a. Resource:權(quán)限的對象,包括server, 庫,表,行,uri等

? ? b. Privilege:? 權(quán)限,可以連接成一組訪問規(guī)則。比如用戶A可以對表T進(jìn)行讀訪問,但不能刪除。

? ? c. Role:角色是一組權(quán)限的集合

? ? d. Group:相當(dāng)于用戶這個概念,sentry不存在用戶這個概念。都是以Group來表達(dá)。你可以理解成sentry的組就是用戶或者賬號的概念。角色賦予給Group。

4. 代碼初讀

4.1 入口函數(shù)

? ? 閱讀源代碼,最好的方式是找到入口函數(shù),從入口函數(shù)一步步往下閱讀,自然能夠梳理出代碼的整體架構(gòu)。

? ? Sentry入口函數(shù)位于:sentry-tools/src/main/java/org/apache/sentry/SentryMain.java

為什么我知道在這兒,根據(jù)啟動時采用命令行的方式,帶參數(shù)。一般都會有main函數(shù),還是經(jīng)驗問題哈。



public static void main(String[] args)

? ? ? ? ? ? throws Exception {

? ? ? ? CommandLineParser parser = new GnuParser();

? ? ? ? Options options = new Options();

? ? ? ? options.addOption(HELP_SHORT, HELP_LONG, false, "Print this help text");

? ? ? ? options.addOption(VERSION_SHORT, VERSION_LONG, false,

? ? ? ? ? ? ? ? "Print Sentry version");

? ? ? ? options.addOption(HIVE_CONF, true, "Set hive configuration variables");

? ? ? ? options.addOption(null, COMMAND, true, "Command to run. Options: " + COMMANDS);

? ? ? ? options.addOption(null, LOG4J_CONF, true, "Location of log4j properties file");

? ? ? ? //Ignore unrecognized options: service and config-tool options

? ? ? ? CommandLine commandLine = parser.parse(options, args, true);

? ? ? ? String log4jconf = commandLine.getOptionValue(LOG4J_CONF);

? ? ? ? if (log4jconf != null && log4jconf.length() > 0) {

? ? ? ? ? ? Properties log4jProperties = new Properties();

? ? ? ? ? ? // Firstly load log properties from properties file

? ? ? ? ? ? try (InputStream istream = Files.newInputStream(Paths.get(log4jconf))) {

? ? ? ? ? ? ? ? log4jProperties.load(istream);

? ? ? ? ? ? }

? ? ? ? ? ? // Set the log level of DataNucleus.Query to INFO only if it is not set in the

? ? ? ? ? ? // properties file

? ? ? ? ? ? if (!log4jProperties.containsKey(LOG4J_DATANUCLEUS)) {

? ? ? ? ? ? ? ? log4jProperties.setProperty(LOG4J_DATANUCLEUS, "INFO");

? ? ? ? ? ? ? ? // Enable debug log for DataNucleus.Query only when log.threshold is TRACE

? ? ? ? ? ? ? ? String logThreshold = log4jProperties.getProperty("log.threshold");

? ? ? ? ? ? ? ? if (logThreshold != null && logThreshold.equalsIgnoreCase("TRACE")) {

? ? ? ? ? ? ? ? ? ? log4jProperties.setProperty(LOG4J_DATANUCLEUS, "DEBUG");

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? PropertyConfigurator.configure(log4jProperties);

? ? ? ? ? ? Logger sentryLogger = LoggerFactory.getLogger(SentryMain.class);

? ? ? ? ? ? sentryLogger.info("Configuring log4j to use [" + log4jconf + "]");

? ? ? ? }

? ? ? ? //Print sentry help only if commandName was not given,

? ? ? ? // otherwise we assume the help is for the sub command

? ? ? ? String commandName = commandLine.getOptionValue(COMMAND);

? ? ? ? if (commandName == null && (commandLine.hasOption(HELP_SHORT) ||

? ? ? ? ? ? ? ? commandLine.hasOption(HELP_LONG))) {

? ? ? ? ? ? printHelp(options, "Command name is missing.");

? ? ? ? } else if (commandLine.hasOption(VERSION_SHORT) ||

? ? ? ? ? ? ? ? commandLine.hasOption(VERSION_LONG)) {

? ? ? ? ? ? printVersion();

? ? ? ? }

? ? ? ? Command command = null;

? ? ? ? switch (commandName){

? ? ? ? ? ? case "service":

? ? ? ? ? ? ? ? command = new SentryService.CommandImpl();

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? case "config-tool":

? ? ? ? ? ? ? ? command = new SentryConfigTool.CommandImpl();

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? case "schema-tool":

? ? ? ? ? ? ? ? command = new SentrySchemaTool.CommandImpl();

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? default:

? ? ? ? ? ? ? ? printHelp(options, "Unknown command " + commandName + "\n");

? ? ? ? ? ? ? ? break;

? ? ? ? }

? ? ? ? ((Command)command).run(commandLine.getArgs());

? ? }



這段代碼主要是根據(jù)傳入的參數(shù),解析參數(shù),讀取配置文件,初始化日志。其他的大致看一眼,最主要的是這個函數(shù):command = new SentryService.CommandImpl();

4.2 啟動過程:

? ? sentry啟動過程主要看這個類:sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryService.java。



? ??
public SentryService(Configuration conf) throws Exception {

? ? this.conf = conf;

? ? int port = conf

? ? ? ? .getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);

? ? if (port == 0) {

? ? ? port = findFreePort();

? ? ? conf.setInt(ServerConfig.RPC_PORT, port);

? ? }

? ? this.address = NetUtils.createSocketAddr(

? ? ? ? conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT),

? ? ? ? port);

? ? LOGGER.info("Configured on address {}", address);

? ? kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(

? ? ? ? conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim());

? ? maxThreads = conf.getInt(ServerConfig.RPC_MAX_THREADS,

? ? ? ? ServerConfig.RPC_MAX_THREADS_DEFAULT);

? ? minThreads = conf.getInt(ServerConfig.RPC_MIN_THREADS,

? ? ? ? ServerConfig.RPC_MIN_THREADS_DEFAULT);

? ? maxMessageSize = conf.getLong(ServerConfig.SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE,

? ? ? ? ServerConfig.SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);

? ? if (kerberos) {

? ? ? // Use Hadoop libraries to translate the _HOST placeholder with actual hostname

? ? ? try {

? ? ? ? String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL), ServerConfig.PRINCIPAL + " is required");

? ? ? ? principal = SecurityUtil.getServerPrincipal(rawPrincipal, address.getAddress());

? ? ? } catch(IOException io) {

? ? ? ? throw new RuntimeException("Can't translate kerberos principal'", io);

? ? ? }

? ? ? LOGGER.info("Using kerberos principal: {}", principal);

? ? ? principalParts = SaslRpcServer.splitKerberosName(principal);

? ? ? Preconditions.checkArgument(principalParts.length == 3,

? ? ? ? ? "Kerberos principal should have 3 parts: " + principal);

? ? ? keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB),

? ? ? ? ? ServerConfig.KEY_TAB + " is required");

? ? ? File keytabFile = new File(keytab);

? ? ? Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),

? ? ? ? ? "Keytab %s does not exist or is not readable.", keytab);

? ? } else {

? ? ? principal = null;

? ? ? principalParts = null;

? ? ? keytab = null;

? ? }

? ? ThreadFactory sentryServiceThreadFactory = new ThreadFactoryBuilder()

? ? ? ? .setNameFormat(SENTRY_SERVICE_THREAD_NAME)

? ? ? ? .build();

? ? serviceExecutor = Executors.newSingleThreadExecutor(sentryServiceThreadFactory);

? ? this.sentryStore = getSentryStore(conf);

? ? sentryStore.setPersistUpdateDeltas(SentryServiceUtil.isHDFSSyncEnabled(conf));

? ? this.leaderMonitor = LeaderStatusMonitor.getLeaderStatusMonitor(conf);

? ? status = Status.NOT_STARTED;

? ? // Enable signal handler for HA leader/follower status if configured

? ? String sigName = conf.get(ServerConfig.SERVER_HA_STANDBY_SIG);

? ? if ((sigName != null) && !sigName.isEmpty()) {

? ? ? LOGGER.info("Registering signal handler {} for HA", sigName);

? ? ? try {

? ? ? ? registerSigListener(sigName, this);

? ? ? } catch (Exception e) {

? ? ? ? LOGGER.error("Failed to register signal", e);

? ? ? }

? ? }

? }



? ? 首先看下這個類的構(gòu)造函數(shù),主要是根據(jù)傳入的配置文件,確定thrift的端口,最大線程數(shù),最小線程數(shù),最大消息size。還有監(jiān)控,kerberos驗證,可用行HA的配置。最主要的是看:?this.sentryStore = getSentryStore(conf);這個函數(shù)。由于是采用thrift這種rpc框架,所以要注冊processor,以及根據(jù)thrift IDL生成實際的處理函數(shù)。要想弄懂Sentry,建議先去弄懂Thrift這個RPC框架。



private void runServer() throws Exception {

? ? startSentryStoreCleaner(conf);

? ? startHMSFollower(conf);

? ? Iterable<String> processorFactories = ConfUtilties.CLASS_SPLITTER

? ? ? ? .split(conf.get(ServerConfig.PROCESSOR_FACTORIES,

? ? ? ? ? ? ServerConfig.PROCESSOR_FACTORIES_DEFAULT).trim());

? ? TMultiplexedProcessor processor = new TMultiplexedProcessor();

? ? boolean registeredProcessor = false;

? ? for (String processorFactory : processorFactories) {

? ? ? Class<?> clazz = conf.getClassByName(processorFactory);

? ? ? if (!ProcessorFactory.class.isAssignableFrom(clazz)) {

? ? ? ? throw new IllegalArgumentException("Processor Factory "

? ? ? ? ? ? + processorFactory + " is not a "

? ? ? ? ? ? + ProcessorFactory.class.getName());

? ? ? }

? ? ? try {

? ? ? ? Constructor<?> constructor = clazz

? ? ? ? ? ? .getConstructor(Configuration.class);

? ? ? ? LOGGER.info("ProcessorFactory being used: " + clazz.getCanonicalName());

? ? ? ? ProcessorFactory factory = (ProcessorFactory) constructor

? ? ? ? ? ? .newInstance(conf);

? ? ? ? boolean registerStatus = factory.register(processor, sentryStore);

? ? ? ? if (!registerStatus) {

? ? ? ? ? LOGGER.error("Failed to register " + clazz.getCanonicalName());

? ? ? ? }

? ? ? ? registeredProcessor = registerStatus || registeredProcessor;

? ? ? } catch (Exception e) {

? ? ? ? throw new IllegalStateException("Could not create "

? ? ? ? ? ? + processorFactory, e);

? ? ? }

? ? }

? ? if (!registeredProcessor) {

? ? ? throw new IllegalStateException(

? ? ? ? ? "Failed to register any processors from " + processorFactories);

? ? }

? ? addSentryServiceGauge();

? ? TServerTransport serverTransport = new TServerSocket(address);

? ? TTransportFactory transportFactory = null;

? ? if (kerberos) {

? ? ? TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();

? ? ? saslTransportFactory.addServerDefinition(AuthMethod.KERBEROS

? ? ? ? ? .getMechanismName(), principalParts[0], principalParts[1],

? ? ? ? ? ? ? ServerConfig.SASL_PROPERTIES, new GSSCallback(conf));

? ? ? transportFactory = saslTransportFactory;

? ? } else {

? ? ? transportFactory = new TTransportFactory();

? ? }

? ? TThreadPoolServer.Args args = new TThreadPoolServer.Args(

? ? ? ? serverTransport).processor(processor)

? ? ? ? .transportFactory(transportFactory)

? ? ? ? .protocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))

? ? ? ? .minWorkerThreads(minThreads).maxWorkerThreads(maxThreads);

? ? thriftServer = new TThreadPoolServer(args);

? ? LOGGER.info("Serving on {}", address);

? ? startSentryWebServer();

? ? // thriftServer.serve() does not return until thriftServer is stopped. Need to log before

? ? // calling thriftServer.serve()

? ? LOGGER.info("Sentry service is ready to serve client requests");

? ? // Allow clients/users watching the console to know when sentry is ready

? ? System.out.println("Sentry service is ready to serve client requests");

? ? SentryStateBank.enableState(SentryServiceState.COMPONENT, SentryServiceState.SERVICE_RUNNING);

? ? thriftServer.serve();

? }


? ? 這段代碼其他的不太重要,主要的看看,接受到RPC請求后怎么處理。既然采用了Thrift框架,必然對應(yīng)的有根據(jù)IDL生成處理函數(shù)。processor就是處理函數(shù),進(jìn)一步去看看processFactories,從配置文件中可以看到采用的這個processorFactory:org.apache.sentry.api.service.thrift.SentryPolicyStoreProcessorFactory,進(jìn)去看看里面干了啥?



public SentryPolicyStoreProcessorFactory(Configuration conf) {

? ? super(conf);

? }

? public boolean register(TMultiplexedProcessor multiplexedProcessor,

? ? ? ? ? ? ? ? ? ? ? ? ? SentryStoreInterface sentryStore) throws Exception {

? ? SentryPolicyStoreProcessor sentryServiceHandler =

? ? ? ? new SentryPolicyStoreProcessor(SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME,

? ? ? ? ? ? conf, sentryStore);

? ? TProcessor processor =

? ? ? new SentryProcessorWrapper<SentryPolicyService.Iface>(sentryServiceHandler);

? ? multiplexedProcessor.registerProcessor(

? ? ? SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME, processor);

? ? return true;

? }



? ? 這個類在注冊時,指定具體的處理類:SentryPolicyStoreProcessor,這個類里面都是thrift生成的每個接口的處理函數(shù),如:drop_sentry_role,這個接口時刪除角色的RPC接口。之后的邏輯,大家可以看具體的函數(shù)了,再次我不進(jìn)一步分享了


5. 總結(jié)

? ? 今天主要時跟大家簡單分享了下sentry的背景和原理,以及對讀源碼進(jìn)行一件簡單的引路。本人能力有限,提出了粗鄙的見解,有不足之處,請大家指出,分享交流!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容