为新加入IAM Group的用户自动打标签
本文介绍如何通过 CloudTrail + EventBridge + Lambda 实现 IAM User 加入 Group 时自动打标签。由于 IAM Group 不支持标签,手动为大量用户设置 S3 ABAC 权限标签效率低下。方案通过监听 AddUserToGroup/RemoveUserFromGroup 事件自动触发 Lambda,完成标签的添加与清除,并进一步使用 DynamoDB 存储映射关系,实现无需改代码即可动态调整标签规则。
一、背景
由于IAM Group不支持设置标签,因此使用IAM Group来管理大量IAM User时,当需要给IAM User打标签就需要手动进行。本过程可以使用AWS服务进行自动化,通过通过 CloudTrail + EventBridge + Lambda 实现自动化,当用户被加入某个 Group 时,Lambda 自动为该用户设置对应的标签。
本文以使用 S3 ABAC 标签管理权限的场景为例,为加入IAM Group的IAM User自动添加标签。
二、原理解析
1、整体流程
IAM AddUserToGroup API 调用
│
▼
CloudTrail 记录事件
│
▼
EventBridge 规则匹配事件
│
▼
Lambda 函数触发
│
├── 读取事件中的 GroupName 和 UserName
├── 根据 Group 名称查询标签映射表
└── 调用 iam:TagUser 为用户设置 s3-rw-access 和/或 s3-ro-access 标签
同理,当用户从 Group 中移除时(RemoveUserFromGroup),Lambda 可以自动清除对应标签。
2、前提条件
- 账户中已启用 CloudTrail(至少有一个 Trail 记录管理事件)
- CloudTrail 默认记录 IAM 管理事件(
AddUserToGroup、RemoveUserFromGroup),无需额外配置 - IAM 事件通过 CloudTrail 发送到 EventBridge 的 default event bus,事件来源为
aws.iam
参考文档:IAM events delivered via CloudTrail
三、配置过程
部署前请先替换以下变量,后续所有命令可直接复制执行:
export ACCOUNT_ID="123456789012" # 替换为您的 AWS Account ID
export REGION="us-east-1" # 必须在 us-east-1(因为 CloudTrail 的 IAM 事件仅在此区域生成)
1、创建 Lambda 执行角色和权限策略
# 创建 IAM Role(Trust Policy 允许 Lambda 服务 assume)
aws iam create-role \
--role-name Lambda-IAM-AutoTag-Role \
--assume-role-policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole"
}]
}'
# 创建权限策略(包含 iam:TagUser/UntagUser/ListUserTags/ListGroupsForUser 和 CloudWatch Logs)
aws iam create-policy \
--policy-name Lambda-IAM-AutoTag-Policy \
--policy-document '{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowTagUser",
"Effect": "Allow",
"Action": ["iam:TagUser", "iam:UntagUser", "iam:ListUserTags", "iam:ListGroupsForUser"],
"Resource": "arn:aws:iam::*:user/*"
},
{
"Sid": "AllowLogs",
"Effect": "Allow",
"Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
"Resource": "arn:aws:logs:*:*:*"
}
]
}'
# 附加 Policy 到 Role
aws iam attach-role-policy \
--role-name Lambda-IAM-AutoTag-Role \
--policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/Lambda-IAM-AutoTag-Policy
由此创建IAM Role成功。
2、创建 Lambda 函数
使用 heredoc 生成 Python 代码文件、打包并部署,无需事先编辑文件:
# 生成 Lambda 函数代码
cat << 'PYTHON_EOF' > lambda_function.py
import json
import boto3
import os
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
iam_client = boto3.client('iam')
# Group 名称到标签的映射表
# Key 是 IAM Group 名称
# Value 是一个 dict,包含该 Group 对应的 s3-rw-access 和/或 s3-ro-access 标签值
# 根据实际环境修改
GROUP_TAG_MAPPING = {
"group-01": {
"s3-rw-access": "group-01", # group-01 的用户对 s3-rw-access=group-01 的桶有读写权限
"s3-ro-access": "group-01", # group-01 的用户对 s3-ro-access=group-01 的桶有只读权限
},
"group-02": {
"s3-rw-access": "group-02",
"s3-ro-access": "group-02",
},
"group-03": {
"s3-rw-access": "group-03",
"s3-ro-access": "group-03",
},
# ... 按需添加更多 Group
}
# 也可以通过环境变量传入映射表(JSON 格式)
# GROUP_TAG_MAPPING = json.loads(os.environ.get('GROUP_TAG_MAPPING', '{}'))
def lambda_handler(event, context):
logger.info(f"Received event: {json.dumps(event)}")
detail = event.get('detail', {})
event_name = detail.get('eventName')
request_params = detail.get('requestParameters', {})
group_name = request_params.get('groupName')
user_name = request_params.get('userName')
if not group_name or not user_name:
logger.error(f"Missing groupName or userName in event: {request_params}")
return {"statusCode": 400, "body": "Missing parameters"}
logger.info(f"Event: {event_name}, Group: {group_name}, User: {user_name}")
if event_name == 'AddUserToGroup':
return handle_add_user_to_group(user_name, group_name)
elif event_name == 'RemoveUserFromGroup':
return handle_remove_user_from_group(user_name, group_name)
else:
logger.warning(f"Unexpected event: {event_name}")
return {"statusCode": 200, "body": "No action taken"}
def handle_add_user_to_group(user_name, group_name):
"""用户加入 Group 时,自动设置对应的 s3-rw-access 和 s3-ro-access 标签"""
tag_mapping = GROUP_TAG_MAPPING.get(group_name)
if not tag_mapping:
logger.info(f"Group {group_name} not in mapping table, skipping")
return {"statusCode": 200, "body": f"Group {group_name} not mapped"}
try:
tags = []
for key, value in tag_mapping.items():
tags.append({'Key': key, 'Value': value})
iam_client.tag_user(UserName=user_name, Tags=tags)
tag_str = ", ".join([f"{k}={v}" for k, v in tag_mapping.items()])
logger.info(f"Tagged user {user_name} with {tag_str}")
return {
"statusCode": 200,
"body": f"Tagged {user_name} with {tag_str}"
}
except Exception as e:
logger.error(f"Failed to tag user {user_name}: {str(e)}")
raise
def handle_remove_user_from_group(user_name, group_name):
"""用户从 Group 移除时,检查是否需要清除标签"""
tag_mapping = GROUP_TAG_MAPPING.get(group_name)
if not tag_mapping:
logger.info(f"Group {group_name} not in mapping table, skipping")
return {"statusCode": 200, "body": f"Group {group_name} not mapped"}
try:
# 获取用户当前所属的所有 Group
response = iam_client.list_groups_for_user(UserName=user_name)
remaining_groups = [g['GroupName'] for g in response['Groups']]
# 检查每个标签键,判断是否还有其他 Group 提供相同的标签
tags_to_remove = []
for tag_key, tag_value in tag_mapping.items():
still_needed = False
for g in remaining_groups:
other_mapping = GROUP_TAG_MAPPING.get(g, {})
if other_mapping.get(tag_key) == tag_value:
still_needed = True
break
if not still_needed:
tags_to_remove.append(tag_key)
if tags_to_remove:
iam_client.untag_user(UserName=user_name, TagKeys=tags_to_remove)
logger.info(f"Removed tags {tags_to_remove} from user {user_name}")
else:
logger.info(f"All tags still needed for user {user_name}, no removal")
return {
"statusCode": 200,
"body": f"Processed tag removal for {user_name}"
}
except Exception as e:
logger.error(f"Failed to process user {user_name}: {str(e)}")
raise
PYTHON_EOF
# 打包并部署 Lambda 函数
zip function.zip lambda_function.py
aws lambda create-function \
--function-name IAM-Group-AutoTag \
--runtime python3.12 \
--handler lambda_function.lambda_handler \
--role arn:aws:iam::${ACCOUNT_ID}:role/Lambda-IAM-AutoTag-Role \
--zip-file fileb://function.zip \
--timeout 30 \
--region ${REGION}
# 清理临时文件
rm -f lambda_function.py function.zip
创建Lambda后建议等待1分钟,待函数创建完成,再继续操作。
3、创建 EventBridge 规则并关联 Lambda
# 创建 EventBridge 规则
aws events put-rule \
--name "IAM-Group-Membership-Change" \
--description "Auto-tag IAM users when added to or removed from groups" \
--event-pattern '{
"source": ["aws.iam"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["iam.amazonaws.com"],
"eventName": ["AddUserToGroup", "RemoveUserFromGroup"]
}
}' \
--state ENABLED \
--region ${REGION}
# 添加 Lambda 权限,允许 EventBridge 调用
aws lambda add-permission \
--function-name IAM-Group-AutoTag \
--statement-id EventBridgeInvoke \
--action lambda:InvokeFunction \
--principal events.amazonaws.com \
--source-arn arn:aws:events:${REGION}:${ACCOUNT_ID}:rule/IAM-Group-Membership-Change \
--region ${REGION}
# 将 Lambda 设置为规则的 Target
aws events put-targets \
--rule "IAM-Group-Membership-Change" \
--targets "Id"="1","Arn"="arn:aws:lambda:${REGION}:${ACCOUNT_ID}:function:IAM-Group-AutoTag" \
--region ${REGION}
四、测试验证
首先新创建用户和组,然后将用户加入组,并验证是否触发自动打标签行为。
# 创建测试用的 IAM Group(如果已存在可跳过)
aws iam create-group --group-name group-01
# 创建测试用的 IAM User(如果已存在可跳过)
aws iam create-user --user-name user01
# 将 user01 加入 group-01
aws iam add-user-to-group --user-name user01 --group-name group-01
# 等待约 1-2 分钟(CloudTrail 事件传递到 EventBridge 有短暂延迟)
# 验证标签是否自动设置
aws iam list-user-tags --user-name user01
预期输出:
{
"Tags": [
{
"Key": "s3-rw-access",
"Value": "group-01"
},
{
"Key": "s3-ro-access",
"Value": "group-01"
}
]
}
查看 Lambda 日志确认执行情况:
aws logs tail /aws/lambda/IAM-Group-AutoTag --follow --region <region>
五、注意事项
-
IAM 是全局服务:IAM API 调用的 CloudTrail 事件只在
us-east-1区域生成。因此直接在us-east-1创建 EventBridge 规则和 Lambda 函数。 -
事件延迟:CloudTrail 事件传递到 EventBridge 通常在 1 分钟以内(best effort delivery),不是实时的。用户加入 Group 后,标签可能需要1分钟才会自动设置。
-
多 Group 场景:如果一个用户同时属于多个 Group,当前代码会为每次
AddUserToGroup事件分别设置标签。后加入的 Group 的标签会覆盖先前的标签。因此,如果需要不要在User上增加多个Group标签,而是在要访问的S3 Bucket上使用ABAC策略,为S3 Bucket打多个Group标签即可。 -
幂等性:
iam:TagUser操作是幂等的,重复设置相同标签不会报错,只会覆盖。 -
错误处理:Lambda 函数执行失败时,EventBridge 默认会重试。建议配置 Dead Letter Queue(DLQ)捕获持续失败的事件。
六、方案改进:使用 DynamoDB 存储IAM Group和IAM User的标签映射表
1、标签设计
在以上方案中,要为IAM User添加的Tag字段是硬编码的,用户进入组之后,会被添加固定的标签。假如有多个IAM Group,希望用户进入特定的组之后自动加载特定的标签,那么可以使用DynamoDB来存储这种映射关系。
假设标签设计如下。
第一组:
- IAM Group名称:group-01
- 标签名称/值:s3-rw-access/group-01
- 标签名称/值:s3-ro-access/group-01
第二组:
- IAM Group名称:group-02
- 标签名称/值:s3-rw-access/group-02
- 标签名称/值:s3-ro-access/group-02
这里可以看到,组名和标签的值始终是完全对应的,这样代表本组内的所有用户都将获得一致的权限。
2、创建DynamoDB库
配置钱先替换以下变量,后续所有命令可直接复制执行:
export ACCOUNT_ID="123456789012" # 替换为您的 AWS Account ID
export REGION="us-east-1" # 必须在 us-east-1(因为 CloudTrail 的 IAM 事件仅在此区域生成)
接下来创建DynamoDB表,用于存储 IAM Group 到标签的映射关系。表结构设计如下:
- 分区键(Partition Key):
GroupName(String)— 对应 IAM Group 名称 - 属性
Tags(Map)— 存储该 Group 对应的标签键值对,Lambda 读取后直接用于iam:TagUser
根据章节 1 的标签设计,group-01 和 group-02 各有两个标签(s3-rw-access 和 s3-ro-access),标签值与组名一致。
# 创建 DynamoDB 表
aws dynamodb create-table \
--table-name IAM-Group-Tag-Mapping \
--attribute-definitions AttributeName=GroupName,AttributeType=S \
--key-schema AttributeName=GroupName,KeyType=HASH \
--billing-mode PAY_PER_REQUEST \
--region ${REGION}
# 等待表创建完成
aws dynamodb wait table-exists \
--table-name IAM-Group-Tag-Mapping \
--region ${REGION}
# 写入 group-01 的映射数据(s3-rw-access=group-01, s3-ro-access=group-01)
aws dynamodb put-item \
--table-name IAM-Group-Tag-Mapping \
--item '{
"GroupName": {"S": "group-01"},
"Tags": {"M": {
"s3-rw-access": {"S": "group-01"},
"s3-ro-access": {"S": "group-01"}
}}
}' \
--region ${REGION}
# 写入 group-02 的映射数据(s3-rw-access=group-02, s3-ro-access=group-02)
aws dynamodb put-item \
--table-name IAM-Group-Tag-Mapping \
--item '{
"GroupName": {"S": "group-02"},
"Tags": {"M": {
"s3-rw-access": {"S": "group-02"},
"s3-ro-access": {"S": "group-02"}
}}
}' \
--region ${REGION}
如需新增更多 Group 的映射,按相同格式执行 put-item 即可,Tags Map 中的键值对与标签设计保持一致。
验证写入结果:
# 查询 group-01 的映射
aws dynamodb get-item \
--table-name IAM-Group-Tag-Mapping \
--key '{"GroupName": {"S": "group-01"}}' \
--region ${REGION}
# 查询 group-02 的映射
aws dynamodb get-item \
--table-name IAM-Group-Tag-Mapping \
--key '{"GroupName": {"S": "group-02"}}' \
--region ${REGION}
上一步创建好的 Lambda 代码采用硬编码映射表,现在需要改为从 DynamoDB 读取映射规则。修改分三步:补充 IAM 权限、更新 Lambda 代码、部署新版本。
3、为 Lambda 执行角色补充 DynamoDB 读取权限
章节三创建的 Lambda-IAM-AutoTag-Policy 只包含 IAM 和 CloudWatch Logs 权限,需要追加 DynamoDB GetItem 权限。创建一个新的 Inline Policy 附加到现有 Role 上:
aws iam put-role-policy \
--role-name Lambda-IAM-AutoTag-Role \
--policy-name Lambda-DynamoDB-Read-Policy \
--policy-document '{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowDynamoDBRead",
"Effect": "Allow",
"Action": ["dynamodb:GetItem"],
"Resource": "arn:aws:dynamodb:'"${REGION}"':'"${ACCOUNT_ID}"':table/IAM-Group-Tag-Mapping"
}
]
}'
不返回错误就表示执行成功。
4、更新 Lambda 函数代码
cat << 'PYTHON_EOF' > lambda_function.py
import json
import boto3
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
iam_client = boto3.client('iam')
dynamodb_client = boto3.client('dynamodb')
DYNAMODB_TABLE_NAME = 'IAM-Group-Tag-Mapping'
def get_tags_from_dynamodb(group_name):
"""从 DynamoDB 查询指定 Group 的标签映射"""
try:
response = dynamodb_client.get_item(
TableName=DYNAMODB_TABLE_NAME,
Key={'GroupName': {'S': group_name}}
)
item = response.get('Item')
if not item or 'Tags' not in item:
logger.info(f"Group {group_name} not found in DynamoDB or has no Tags attribute")
return None
# 解析 Tags Map:{"s3-rw-access": {"S": "group-01"}, "s3-ro-access": {"S": "group-01"}}
tags_map = item['Tags'].get('M', {})
tags = {}
for key, value in tags_map.items():
tags[key] = value.get('S', '')
logger.info(f"Retrieved tags for group {group_name}: {tags}")
return tags
except Exception as e:
logger.error(f"Failed to query DynamoDB for group {group_name}: {str(e)}")
raise
def lambda_handler(event, context):
logger.info(f"Received event: {json.dumps(event)}")
detail = event.get('detail', {})
event_name = detail.get('eventName')
request_params = detail.get('requestParameters', {})
group_name = request_params.get('groupName')
user_name = request_params.get('userName')
if not group_name or not user_name:
logger.error(f"Missing groupName or userName in event: {request_params}")
return {"statusCode": 400, "body": "Missing parameters"}
logger.info(f"Event: {event_name}, Group: {group_name}, User: {user_name}")
if event_name == 'AddUserToGroup':
return handle_add_user_to_group(user_name, group_name)
elif event_name == 'RemoveUserFromGroup':
return handle_remove_user_from_group(user_name, group_name)
else:
logger.warning(f"Unexpected event: {event_name}")
return {"statusCode": 200, "body": "No action taken"}
def handle_add_user_to_group(user_name, group_name):
"""用户加入 Group 时,从 DynamoDB 读取标签映射并设置到用户上"""
tag_mapping = get_tags_from_dynamodb(group_name)
if not tag_mapping:
return {"statusCode": 200, "body": f"Group {group_name} not mapped in DynamoDB"}
try:
tags = [{'Key': k, 'Value': v} for k, v in tag_mapping.items()]
iam_client.tag_user(UserName=user_name, Tags=tags)
tag_str = ", ".join([f"{k}={v}" for k, v in tag_mapping.items()])
logger.info(f"Tagged user {user_name} with {tag_str}")
return {"statusCode": 200, "body": f"Tagged {user_name} with {tag_str}"}
except Exception as e:
logger.error(f"Failed to tag user {user_name}: {str(e)}")
raise
def handle_remove_user_from_group(user_name, group_name):
"""用户从 Group 移除时,检查是否需要清除标签"""
tag_mapping = get_tags_from_dynamodb(group_name)
if not tag_mapping:
return {"statusCode": 200, "body": f"Group {group_name} not mapped in DynamoDB"}
try:
# 获取用户当前所属的所有 Group
response = iam_client.list_groups_for_user(UserName=user_name)
remaining_groups = [g['GroupName'] for g in response['Groups']]
# 逐个检查被移除 Group 的标签,判断是否还有其他 Group 提供相同标签
tags_to_remove = []
for tag_key, tag_value in tag_mapping.items():
still_needed = False
for g in remaining_groups:
other_mapping = get_tags_from_dynamodb(g)
if other_mapping and other_mapping.get(tag_key) == tag_value:
still_needed = True
break
if not still_needed:
tags_to_remove.append(tag_key)
if tags_to_remove:
iam_client.untag_user(UserName=user_name, TagKeys=tags_to_remove)
logger.info(f"Removed tags {tags_to_remove} from user {user_name}")
else:
logger.info(f"All tags still needed for user {user_name}, no removal")
return {"statusCode": 200, "body": f"Processed tag removal for {user_name}"}
except Exception as e:
logger.error(f"Failed to process user {user_name}: {str(e)}")
raise
PYTHON_EOF
执行如下命令打包并更新 Lambda 函数
# 打包代码
zip function.zip lambda_function.py
# 更新已有的 Lambda 函数代码
aws lambda update-function-code \
--function-name IAM-Group-AutoTag \
--zip-file fileb://function.zip \
--region ${REGION}
# 清理临时文件
rm -f lambda_function.py function.zip
5、验证更新
更新完成后,可以通过将用户加入 Group 来触发测试:
# 创建测试用的 IAM Group(如果已存在可跳过)
aws iam create-group --group-name group-02
# 创建测试用的 IAM User(如果已存在可跳过)
aws iam create-user --user-name user02
# 将 user02 加入 group-02
aws iam add-user-to-group --user-name user02 --group-name group-02
# 等待 1-2 分钟后验证标签
aws iam list-user-tags --user-name user02
预期输出与章节四一致,标签值来自 DynamoDB 而非硬编码:
{
"Tags": [
{
"Key": "s3-rw-access",
"Value": "group-02"
},
{
"Key": "s3-ro-access",
"Value": "group-02"
}
]
}
后续如需新增或修改 Group 的标签映射,只需更新 DynamoDB 表中的记录,无需重新部署 Lambda 代码。
七、参考文档
- IAM events delivered via CloudTrail to EventBridge
- Amazon EventBridge rules
- AWS Lambda - Creating functions
- IAM TagUser API Reference
- IAM UntagUser API Reference
- IAM ListGroupsForUser API Reference
- Boto3 IAM Client - tag_user
- Boto3 DynamoDB Client - get_item
- Amazon DynamoDB Developer Guide
- AWS CloudTrail User Guide
最后修改于 2026-04-08