1. 背景
在https://blog.51cto.com/u_15327484/8153877文章中,介绍了在Java中,客户端通过JAAS框架向AS认证获取TGT,再通过GSSAPI on SASL获取service ticket并向服务端进行认证。
Hadoop中整合Kerberos安全认证机制,当HDFS客户端访问NameNode服务端时,HDFS客户端先获取TGT,再获取service ticket并向NameNode进行认证。
本文不会继续深究Kerberos自身的机制,而是探索Hadoop封装Kerberos进行通信的框架逻辑。
2. Hadoop封装Kerberos类
Hadoop中,将Kerberos信息封装在UserGroupInformation类中。其中,包含keytab、认证用户、Subject等成员:
private static UserGroupInformation loginUser = null;
private static String keytabPrincipal = null;
private static String keytabFile = null;
//Kerberos信息
private final Subject subject;
// All non-static fields must be read-only caches that come from the subject.
private final User user;
private final boolean isKeytab;
private final boolean isKrbTkt;
private static String OS_LOGIN_MODULE_NAME;
private static Class<? extends Principal> OS_PRINCIPAL_CLASS;
注意,Subject是JAAS框架中的类,它用于保存principal和TGT等信息:
Set<Principal> principals;
transient Set<Object> pubCredentials;
transient Set<Object> privCredentials;
3. Hadoop基于UserGroupInformation的认证流程
先从使用场景开始,以下是org.apache.hadoop.fs.FileSystem#get
方法。FileSystem#get
提供了多种重载方法,以以下方法为例。它用于创建HDFS客户端,查找并创建进最符合指定user的UGI,基于UGI进行kerberos认证:
public static FileSystem get(final URI uri, final Configuration conf,
final String user) throws IOException, InterruptedException {
String ticketCachePath =
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
UserGroupInformation ugi =
UserGroupInformation.getBestUGI(ticketCachePath, user);
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws IOException {
return get(uri, conf);
}
});
}
UserGroupInformation.getBestUGI()方法开始进行kerberos认证,获取用户所属UGI,它依次进行以下尝试:
- 如果本地磁盘存储了该用户对应的TGT,则从本地磁盘中读取。
- 如果输入的用户为空,直接使用当前登陆的用户创建UGI。
- 如果输入的用户不为空,创建AuthMethod.SIMPLE类型的UGI,即不使用Kerberos认证。
public static UserGroupInformation getBestUGI(
String ticketCachePath, String user) throws IOException {
if (ticketCachePath != null) {
return getUGIFromTicketCache(ticketCachePath, user);
} else if (user == null) {
return getCurrentUser();
} else {
return createRemoteUser(user);
}
}
首先:在认证kerberos时,先尝试从本地磁盘中获取TGT。如下所示:当使用kinit进行认证后,linux会将TGT保存到本地的/tmp/krb5cc_{UID}
文件名中:
其次:执行UserGroupInformation.getCurrentUser()
方法开始进行认证,后续调用UserGroupInformation.doSubjectLogin
方法开始认证。
该方法会构建HadoopLoginContext
进行kerberos认证:
HadoopLoginContext login = newLoginContext(
authenticationMethod.getLoginAppName(), subject, loginConf);
login.login();
3.1 HadoopLoginContext初始化过程
HadoopLoginContext构造方法中,将认证方式封装成为AppConfigurationEntry,将AppConfigurationEntry封装成为ModuleInfo,即Hadoop中可能包含多种认证方式。如下所示,将所有认证方式封装成ModuleInfo:
// get the LoginModules configured for this application
AppConfigurationEntry[] entries = config.getAppConfigurationEntry(name);
moduleStack = new ModuleInfo[entries.length];
for (int i = 0; i < entries.length; i++) {
// clone returned array
moduleStack[i] = new ModuleInfo
(new AppConfigurationEntry
(entries[i].getLoginModuleName(),
entries[i].getControlFlag(),
entries[i].getOptions()),
null);
}
AppConfigurationEntry中每个认证方式的定义是在UserGroupInformation$HadoopConfiguration
中。如下所示,如果是SIMPLE认证,则使用OS_LOGIN_MODULE_NAME登陆;如果是kerberos认证,则使用KRB5_LOGIN_MODULE登陆;最后,增加HadoopLoginModule方式登陆:
private static final AppConfigurationEntry OS_SPECIFIC_LOGIN =
new AppConfigurationEntry(OS_LOGIN_MODULE_NAME,
LoginModuleControlFlag.REQUIRED,
BASIC_JAAS_OPTIONS);
private static final AppConfigurationEntry HADOOP_LOGIN =
new AppConfigurationEntry(HadoopLoginModule.class.getName(),
LoginModuleControlFlag.REQUIRED,
BASIC_JAAS_OPTIONS);
public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
ArrayList<AppConfigurationEntry> entries = new ArrayList<>();
// login of external subject passes no params. technically only
// existing credentials should be used but other components expect
// the login to succeed with local user fallback if no principal.
if (params == null || appName.equals(SIMPLE_CONFIG_NAME)) {
//不进行安全认证
entries.add(OS_SPECIFIC_LOGIN);
} else if (appName.equals(KERBEROS_CONFIG_NAME)) {
// existing semantics are the initial default login allows local user
// fallback. this is not allowed when a principal explicitly
// specified or during a relogin.
if (!params.containsKey(LoginParam.PRINCIPAL)) {
entries.add(OS_SPECIFIC_LOGIN);
}
//kerberos认证
entries.add(getKerberosEntry());
}
//这里添加了Hadooplogin的entry
entries.add(HADOOP_LOGIN);
return entries.toArray(new AppConfigurationEntry[0]);
3.2 HadoopLoginContext认证过程
执行HadoopLoginContext在执行login方法时,调用LoginContext.invoke方法,便利entrys中的认证方式,通过反射的方式执行不同实现类的认证:
for (int i = moduleIndex; i < moduleStack.length; i++, moduleIndex++) {
try {
int mIndex = 0;
Method[] methods = null;
if (moduleStack[i].module != null) {
methods = moduleStack[i].module.getClass().getMethods();
} else {
// instantiate the LoginModule
//
// Allow any object to be a LoginModule as long as it
// conforms to the interface.
Class<?> c = Class.forName(
moduleStack[i].entry.getLoginModuleName(),
true,
contextClassLoader);
Constructor<?> constructor = c.getConstructor(PARAMS);
Object[] args = { };
moduleStack[i].module = constructor.newInstance(args);
// call the LoginModule's initialize method
methods = moduleStack[i].module.getClass().getMethods();
methods[mIndex].invoke(moduleStack[i].module, initArgs);
}
随后,HadoopLoginContext.login先后调用LOGIN_METHOD和COMMIT_METHOD方法:
public void login() throws LoginException {
loginSucceeded = false;
if (subject == null) {
subject = new Subject();
}
try {
// module invoked in doPrivileged
invokePriv(LOGIN_METHOD);
invokePriv(COMMIT_METHOD);
loginSucceeded = true;
} catch (LoginException le) {
try {
invokePriv(ABORT_METHOD);
} catch (LoginException le2) {
throw le;
}
throw le;
}
}
最后认证的顺序如下所示:
Krb5LoginModule#login
HadoopLoginModule#login
Krb5LoginModule#commit
HadoopLoginModule#commit
3.3 HadoopLoginModule认证过程
认证时,HadoopLoginContext会先后调用HadoopLoginModule#login和HadoopLoginModule#commit方法。本节探索这两个方法。
login()方法不执行任何认证操作:
public boolean login() throws LoginException {
if (LOG.isDebugEnabled()) {
LOG.debug("hadoop login");
}
return true;
}
commit方法用于记录认证用户。认证用户获取流程如下:
- 如果使用kerberos进行认证,直接获取kerberos认证时的principal,即使用kerberos用户记录为当前HDFS客户端访问NameNode的用户。
- 如果没有使用kerberos进行认证,使用系统变量或者时配置文件中的HADOOP_USER_NAME值作为访问NameNode的用户。
- 如果没有配置HADOOP_USER_NAME,直接使用执行命令时的操作系统用户作为访问用户。
public boolean commit() throws LoginException {
if (LOG.isDebugEnabled()) {
LOG.debug("hadoop login commit");
}
// if we already have a user, we are done.
if (!subject.getPrincipals(User.class).isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("using existing subject:"+subject.getPrincipals());
}
return true;
}
//获取kerberos用户
Principal user = getCanonicalUser(KerberosPrincipal.class);
if (user != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("using kerberos user:"+user);
}
}
//If we don't have a kerberos user and security is disabled, check
//if user is specified in the environment or properties
if (!isSecurityEnabled() && (user == null)) {
//kerberos用户为空,就使用HADOOP_USER_NAME变量值
String envUser = System.getenv(HADOOP_USER_NAME);
if (envUser == null) {
envUser = System.getProperty(HADOOP_USER_NAME);
}
user = envUser == null ? null : new User(envUser);
}
//如果没有设置HADOOP_USER_NAME变量,就是用操作系统用户
// use the OS user
if (user == null) {
user = getCanonicalUser(OS_PRINCIPAL_CLASS);
if (LOG.isDebugEnabled()) {
LOG.debug("using local user:"+user);
}
}
// if we found the user, add our principal
if (user != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using user: \"" + user + "\" with name " + user.getName());
}
User userEntry = null;
try {
// LoginContext will be attached later unless it's an external
// subject.
//根据user类型判断使用哪种真正方式
AuthenticationMethod authMethod = (user instanceof KerberosPrincipal)
? AuthenticationMethod.KERBEROS : AuthenticationMethod.SIMPLE;
userEntry = new User(user.getName(), authMethod, null);
} catch (Exception e) {
throw (LoginException)(new LoginException(e.toString()).initCause(e));
}
if (LOG.isDebugEnabled()) {
LOG.debug("User entry: \"" + userEntry.toString() + "\"" );
}
//将用户信息添加到subject中
subject.getPrincipals().add(userEntry);
return true;
}
LOG.error("Can't find user in " + subject);
throw new LoginException("Can't find user name");
}
Hadoop使用User实现了Principal接口,它保存了用户名,在subject中获取用户都通过User.class获取:
public User(String name, AuthenticationMethod authMethod, LoginContext login) {
try {
shortName = new HadoopKerberosName(name).getShortName();
} catch (IOException ioe) {
throw new IllegalArgumentException("Illegal principal name " + name
+": " + ioe.toString(), ioe);
}
fullName = name;
this.authMethod = authMethod;
this.login = login;
}
4. Hadoop代理机制
4.1 背景
在Hadoop体系中,有一些服务除了接收客户端的请求,服务还会访问其他组件。以Oozie为例,Oozie是一个Hadoop的作业平台。它接收不同用户的请求,执行作业,在作业中,可能会访问NameNode组件。如下所示:
在上述执行过程中,Oozie访问NameNode时的认证是难点。
这时因为,当User A访问Oozie服务端时,User A客户端会使用TGT获取service ticket向Oozie服务端进行认证。认证完后,Oozie不会保存User A的keytab信息。这时,在Oozie中有User A部署的作业,该作业中User A需要获取HDFS数据,由于Oozie服务端没有User A的keytab信息,因此不能向Hadoop认证。同时,也不能直接使用Oozie的TGT认证,因为Hadoop直接解析Oozie用户作业访问用户,鉴权也使用Oozie用户而不是User A用户,这明显不符合安全性设计。
同样在Yarn中也存在类似的情景:User A向RM提交作业,RM中没有存储User A的keytab,导致在RM服务中,无法使用User A的身份向NodeManager进行Kerberos认证,无法启动container。
为了解决这个问题,Hadoop提出了代理机制。它的基本思路是:Oozie服务端启动时,会使用Oozie的用户向其他组件进行kerberos。在User A访问Hadoop时,使用Oozie自身的TGT向Hadoop进行认证。不过会额外携带User A的信息,服务端完成认证后,直接解析User A作为执行用户,进行鉴权。
如下所示:使用SuperUser凭证认证User B,最终使用User B访问HDFS:
因此,代理机制定义如下:
代理机制由当前的系统用户/真实用户realuser/超级用户,如 hive/sqoop 等服务的进程对应的用户,代理最终的业务用户比如dap/cic等,对底层的 hdfs进行访问:
- 经过代理后,hdfs 进行权限校验时,是针对最终业务用户比如 zhangsan/lisi/wangwu,进行权限校验。
- 没有使用代理时,hdfs 进行权限校验时,是针对系统用户比如 hive/hue/sqoop,进行权限校验。
4.2 代理机制传输流程
在Oozie服务端,当User A访问NameNode时,会调用UserGroupInformation.createProxyUser方法创建代理用户。注意方法参数,可以看到user就是提交作业的用户,它没有TGT信息,只能是String类型;realUser是Oozie用户,它是服务端启动用户,包含TGT信息,因此是UserGroupInformation类型。
将作业提交用户的认真方法设置为AuthenticationMethod.PROXY,创建一个principal放在ugi中;realUser携带了TGT,直接放到ugi中:
public static UserGroupInformation createProxyUser(String user,
UserGroupInformation realUser) {
if (user == null || user.isEmpty()) {
throw new IllegalArgumentException("Null user");
}
if (realUser == null) {
throw new IllegalArgumentException("Null real user");
}
Subject subject = new Subject();
Set<Principal> principals = subject.getPrincipals();
//作业提交用户,
principals.add(new User(user, AuthenticationMethod.PROXY, null));
principals.add(new RealUser(realUser));
return new UserGroupInformation(subject);
}
注意,上述的user就是proto中定义的effectiveUser:
message UserInformationProto {
optional string effectiveUser = 1;
optional string realUser = 2;
}
org.apache.hadoop.ipc.Client连接NameNode时,使用的是realUser的ugi信息进行认证:
private synchronized void setupIOstreams(
AtomicBoolean fallbackToSimpleAuth) {
if (socket != null || shouldCloseConnection.get()) {
return;
}
UserGroupInformation ticket = remoteId.getTicket();
if (ticket != null) {
//获取realUser的UGI信息
final UserGroupInformation realUser = ticket.getRealUser();
if (realUser != null) {
ticket = realUser;
}
}
//省略
while (true) {
setupConnection(ticket);
ipcStreams = new IpcStreams(socket, maxResponseLength);
writeConnectionHeader(ipcStreams);
if (authProtocol == AuthProtocol.SASL) {
try {
//使用realUser的ugi信息进行认证
authMethod = ticket
.doAs(new PrivilegedExceptionAction<AuthMethod>() {
@Override
public AuthMethod run()
throws IOException, InterruptedException {
return setupSaslConnection(ipcStreams);
}
});
//省略
writeConnectionContext(remoteId, authMethod);
//省略
start();
return;
//省略
}
org.apache.hadoop.ipc.Server的processConnectionContext方法中,通过从客户端获取的数据流中拿到effective User和Real User,然后对Real User进行Kerberos认证:
//获取客户端发送的数据
connectionContext = getMessage(IpcConnectionContextProto.getDefaultInstance(), buffer);
//获取客户端执行的协议名称
protocolName = connectionContext.hasProtocol() ? connectionContext
.getProtocol() : null;
//获取客户端携带的ugi信息
UserGroupInformation protocolUser = ProtoUtil.getUgi(connectionContext);
// user is authenticated
user.setAuthenticationMethod(authMethod);
//Now we check if this is a proxy user case. If the protocol user is
//different from the 'user', it is a proxy user scenario. However,
//this is not allowed if user authenticated with DIGEST.
//如果客户端携带了ugi信息,并且ugi中的user和认证的user不一致,就是proxy认证
if ((protocolUser != null)
&& (!protocolUser.getUserName().equals(user.getUserName()))) {
if (authMethod == AuthMethod.TOKEN) {
// Not allowed to doAs if token authentication is used
//proxy认证就不允许认证方式是TOKEN
throw new FatalRpcServerException(
RpcErrorCodeProto.FATAL_UNAUTHORIZED,
new AccessControlException("Authenticated user (" + user
+ ") doesn't match what the client claims to be ("
+ protocolUser + ")"));
} else {
// Effective user can be different from authenticated user
// for simple auth or kerberos auth
// The user is the real user. Now we create a proxy user
//如果是proxy认证方式,将提交作业的用户作为effective user,服务端还原客户端创建的proxyuser
UserGroupInformation realUser = user;
user = UserGroupInformation.createProxyUser(protocolUser
.getUserName(), realUser);
}
}
//开始认证
authorizeConnection();
注意:上述方法的realUser是通过SaslServer.getAuthorizationID获取的,AuthorizationID就是认证的用户名,例如Oozie:
/**
* Reports the authorization ID in effect for the client of this
* session.
* This method can only be called if isComplete() returns true.
* @return The authorization ID of the client.
* @exception IllegalStateException if this authentication session has not completed
*/
public String getAuthorizationID();
服务端构建好客户端发送的user和RealUser后,开始验证。验证方法中,只使用RealUser进行验证,这是已经和effective user无关了:
//注意:这里的user对象就是客户端传过来的RealUser
private void authorizeConnection() throws RpcServerException {
try {
// If auth method is TOKEN, the token was obtained by the
// real user for the effective user, therefore not required to
// authorize real user. doAs is allowed only for simple or kerberos
// authentication
if (user != null && user.getRealUser() != null
&& (authMethod != AuthMethod.TOKEN)) {
ProxyUsers.authorize(user, this.getHostAddress());
}
authorize(user, protocolName, getHostInetAddress());
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully authorized " + connectionContext);
}
rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) {
LOG.info("Connection from " + this
+ " for protocol " + connectionContext.getProtocol()
+ " is unauthorized for user " + user);
rpcMetrics.incrAuthorizationFailures();
throw new FatalRpcServerException(
RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae);
}
}
4.3 代理机制相关配置
core-site.xml文件:
hadoop.proxyuser.$superuser.hosts
:超级用户可以在哪些主机上执行代理。hadoop.proxyuser.$superuser.groups
:超级用户能为哪些用户组中的用户进行代理。hadoop.proxyuser.$superuser.users
: 超级用户可以代理哪些用户。
例如:
<property>
<name>hadoop.proxyuser.${SuperUserName}.hosts</name>
<value>${HostLists}</value>
</property>
<property>
<name>hadoop.proxyuser.${SuperUserName}.groups</name>
<value>${Groups}</value>
</property>
<!-- 说明 -->
<!-- ${SuperUserName} 为具有代理功能的用户,通常也是超级用户, 这意味着并不是每个用户都能成为代理用户 -->
<!-- ${HostLists} 为代理用户能正确完成代理功能的主机地址列表 -->
<!-- ${Groups} 为代理用户能代理的用户组, 也就是能为那些用户组中的用户进行代理 -->
<!-- 示例 -->
<!-- hadoop用户为代理用户, 可以为任意用户进行代理, 但仅在hive-server.hncscwc主机地址上能够正确完成代理工作 -->
<property>
<name>hadoop.proxyuser.hadoop.hosts</name>
<value>hive-server2.hncscwc</value>
</property>
<property>
<name>hadoop.proxyuser.hadoop.groups</name>
<value>*</value>
</property>
5. 扩展:多租户和多用户
多租户是一种软件架构,软件只有一个实例运行在服务器中,服务于多个租户。一个租户包含一组用户,他们拥有指定权限访问软件实例。租户通常指一个企业或者组织,对资源具有独占性和排它性。
多租户在数据存储上有三种方案:
- 独立数据库。一个租户一个数据库,这种隔离性最好。
- 共享数据库,独立Schema:在一个database里面,可以创建多schema,一个schema给一个租户对于若干表、是图、存储过程、索引的权限,其他租户无法查看。缺点:跨租户join很难。
- 共享数据库,共享schema,共享数据表,字段隔离。增加TenantId字段,用来表示该条数据属于哪一个租户。
为了实现多租户,提供资源的独占性效果。Hadoop提供:
- Kerberos+ranger提供最基础的认证+鉴权功能。保证资源在不同租户间具有隔离性。
- 对于存储资源,可以使用独立集群,或者共享集群+ranger鉴权实现资源隔离。
- Yarn不同的队列属于不同的租户,每个租户不能占用其他租户队列的资源,保证了资源隔离性。