为新加入IAM Group的用户自动打标签

本文介绍如何通过 CloudTrail + EventBridge + Lambda 实现 IAM User 加入 Group 时自动打标签。由于 IAM Group 不支持标签,手动为大量用户设置 S3 ABAC 权限标签效率低下。方案通过监听 AddUserToGroup/RemoveUserFromGroup 事件自动触发 Lambda,完成标签的添加与清除,并进一步使用 DynamoDB 存储映射关系,实现无需改代码即可动态调整标签规则。

一、背景

由于IAM Group不支持设置标签,因此使用IAM Group来管理大量IAM User时,当需要给IAM User打标签就需要手动进行。本过程可以使用AWS服务进行自动化,通过通过 CloudTrail + EventBridge + Lambda 实现自动化,当用户被加入某个 Group 时,Lambda 自动为该用户设置对应的标签。

本文以使用 S3 ABAC 标签管理权限的场景为例,为加入IAM Group的IAM User自动添加标签。


二、原理解析

1、整体流程

IAM AddUserToGroup API 调用
        │
        ▼
CloudTrail 记录事件
        │
        ▼
EventBridge 规则匹配事件
        │
        ▼
Lambda 函数触发
        │
        ├── 读取事件中的 GroupName 和 UserName
        ├── 根据 Group 名称查询标签映射表
        └── 调用 iam:TagUser 为用户设置 s3-rw-access 和/或 s3-ro-access 标签

同理,当用户从 Group 中移除时(RemoveUserFromGroup),Lambda 可以自动清除对应标签。

2、前提条件

  • 账户中已启用 CloudTrail(至少有一个 Trail 记录管理事件)
  • CloudTrail 默认记录 IAM 管理事件(AddUserToGroupRemoveUserFromGroup),无需额外配置
  • IAM 事件通过 CloudTrail 发送到 EventBridge 的 default event bus,事件来源为 aws.iam

参考文档:IAM events delivered via CloudTrail

三、配置过程

部署前请先替换以下变量,后续所有命令可直接复制执行:

export ACCOUNT_ID="123456789012"    # 替换为您的 AWS Account ID
export REGION="us-east-1"           # 必须在 us-east-1(因为 CloudTrail 的 IAM 事件仅在此区域生成)

1、创建 Lambda 执行角色和权限策略

# 创建 IAM Role(Trust Policy 允许 Lambda 服务 assume)
aws iam create-role \
  --role-name Lambda-IAM-AutoTag-Role \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Principal": {"Service": "lambda.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }]
  }'

# 创建权限策略(包含 iam:TagUser/UntagUser/ListUserTags/ListGroupsForUser 和 CloudWatch Logs)
aws iam create-policy \
  --policy-name Lambda-IAM-AutoTag-Policy \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Sid": "AllowTagUser",
        "Effect": "Allow",
        "Action": ["iam:TagUser", "iam:UntagUser", "iam:ListUserTags", "iam:ListGroupsForUser"],
        "Resource": "arn:aws:iam::*:user/*"
      },
      {
        "Sid": "AllowLogs",
        "Effect": "Allow",
        "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
        "Resource": "arn:aws:logs:*:*:*"
      }
    ]
  }'

# 附加 Policy 到 Role
aws iam attach-role-policy \
  --role-name Lambda-IAM-AutoTag-Role \
  --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/Lambda-IAM-AutoTag-Policy

由此创建IAM Role成功。

2、创建 Lambda 函数

使用 heredoc 生成 Python 代码文件、打包并部署,无需事先编辑文件:

# 生成 Lambda 函数代码
cat << 'PYTHON_EOF' > lambda_function.py
import json
import boto3
import os
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

iam_client = boto3.client('iam')

# Group 名称到标签的映射表
# Key 是 IAM Group 名称
# Value 是一个 dict,包含该 Group 对应的 s3-rw-access 和/或 s3-ro-access 标签值
# 根据实际环境修改
GROUP_TAG_MAPPING = {
    "group-01": {
        "s3-rw-access": "group-01",   # group-01 的用户对 s3-rw-access=group-01 的桶有读写权限
        "s3-ro-access": "group-01",   # group-01 的用户对 s3-ro-access=group-01 的桶有只读权限
    },
    "group-02": {
        "s3-rw-access": "group-02",
        "s3-ro-access": "group-02",
    },
    "group-03": {
        "s3-rw-access": "group-03",
        "s3-ro-access": "group-03",
    },
    # ... 按需添加更多 Group
}

# 也可以通过环境变量传入映射表(JSON 格式)
# GROUP_TAG_MAPPING = json.loads(os.environ.get('GROUP_TAG_MAPPING', '{}'))

def lambda_handler(event, context):
    logger.info(f"Received event: {json.dumps(event)}")

    detail = event.get('detail', {})
    event_name = detail.get('eventName')
    request_params = detail.get('requestParameters', {})

    group_name = request_params.get('groupName')
    user_name = request_params.get('userName')

    if not group_name or not user_name:
        logger.error(f"Missing groupName or userName in event: {request_params}")
        return {"statusCode": 400, "body": "Missing parameters"}

    logger.info(f"Event: {event_name}, Group: {group_name}, User: {user_name}")

    if event_name == 'AddUserToGroup':
        return handle_add_user_to_group(user_name, group_name)
    elif event_name == 'RemoveUserFromGroup':
        return handle_remove_user_from_group(user_name, group_name)
    else:
        logger.warning(f"Unexpected event: {event_name}")
        return {"statusCode": 200, "body": "No action taken"}

def handle_add_user_to_group(user_name, group_name):
    """用户加入 Group 时,自动设置对应的 s3-rw-access 和 s3-ro-access 标签"""
    tag_mapping = GROUP_TAG_MAPPING.get(group_name)

    if not tag_mapping:
        logger.info(f"Group {group_name} not in mapping table, skipping")
        return {"statusCode": 200, "body": f"Group {group_name} not mapped"}

    try:
        tags = []
        for key, value in tag_mapping.items():
            tags.append({'Key': key, 'Value': value})

        iam_client.tag_user(UserName=user_name, Tags=tags)

        tag_str = ", ".join([f"{k}={v}" for k, v in tag_mapping.items()])
        logger.info(f"Tagged user {user_name} with {tag_str}")
        return {
            "statusCode": 200,
            "body": f"Tagged {user_name} with {tag_str}"
        }
    except Exception as e:
        logger.error(f"Failed to tag user {user_name}: {str(e)}")
        raise

def handle_remove_user_from_group(user_name, group_name):
    """用户从 Group 移除时,检查是否需要清除标签"""
    tag_mapping = GROUP_TAG_MAPPING.get(group_name)

    if not tag_mapping:
        logger.info(f"Group {group_name} not in mapping table, skipping")
        return {"statusCode": 200, "body": f"Group {group_name} not mapped"}

    try:
        # 获取用户当前所属的所有 Group
        response = iam_client.list_groups_for_user(UserName=user_name)
        remaining_groups = [g['GroupName'] for g in response['Groups']]

        # 检查每个标签键,判断是否还有其他 Group 提供相同的标签
        tags_to_remove = []
        for tag_key, tag_value in tag_mapping.items():
            still_needed = False
            for g in remaining_groups:
                other_mapping = GROUP_TAG_MAPPING.get(g, {})
                if other_mapping.get(tag_key) == tag_value:
                    still_needed = True
                    break

            if not still_needed:
                tags_to_remove.append(tag_key)

        if tags_to_remove:
            iam_client.untag_user(UserName=user_name, TagKeys=tags_to_remove)
            logger.info(f"Removed tags {tags_to_remove} from user {user_name}")
        else:
            logger.info(f"All tags still needed for user {user_name}, no removal")

        return {
            "statusCode": 200,
            "body": f"Processed tag removal for {user_name}"
        }
    except Exception as e:
        logger.error(f"Failed to process user {user_name}: {str(e)}")
        raise
PYTHON_EOF

# 打包并部署 Lambda 函数
zip function.zip lambda_function.py

aws lambda create-function \
  --function-name IAM-Group-AutoTag \
  --runtime python3.12 \
  --handler lambda_function.lambda_handler \
  --role arn:aws:iam::${ACCOUNT_ID}:role/Lambda-IAM-AutoTag-Role \
  --zip-file fileb://function.zip \
  --timeout 30 \
  --region ${REGION}

# 清理临时文件
rm -f lambda_function.py function.zip

创建Lambda后建议等待1分钟,待函数创建完成,再继续操作。

3、创建 EventBridge 规则并关联 Lambda

# 创建 EventBridge 规则
aws events put-rule \
  --name "IAM-Group-Membership-Change" \
  --description "Auto-tag IAM users when added to or removed from groups" \
  --event-pattern '{
    "source": ["aws.iam"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
      "eventSource": ["iam.amazonaws.com"],
      "eventName": ["AddUserToGroup", "RemoveUserFromGroup"]
    }
  }' \
  --state ENABLED \
  --region ${REGION}

# 添加 Lambda 权限,允许 EventBridge 调用
aws lambda add-permission \
  --function-name IAM-Group-AutoTag \
  --statement-id EventBridgeInvoke \
  --action lambda:InvokeFunction \
  --principal events.amazonaws.com \
  --source-arn arn:aws:events:${REGION}:${ACCOUNT_ID}:rule/IAM-Group-Membership-Change \
  --region ${REGION}

# 将 Lambda 设置为规则的 Target
aws events put-targets \
  --rule "IAM-Group-Membership-Change" \
  --targets "Id"="1","Arn"="arn:aws:lambda:${REGION}:${ACCOUNT_ID}:function:IAM-Group-AutoTag" \
  --region ${REGION}

四、测试验证

首先新创建用户和组,然后将用户加入组,并验证是否触发自动打标签行为。

# 创建测试用的 IAM Group(如果已存在可跳过)
aws iam create-group --group-name group-01

# 创建测试用的 IAM User(如果已存在可跳过)
aws iam create-user --user-name user01

# 将 user01 加入 group-01
aws iam add-user-to-group --user-name user01 --group-name group-01

# 等待约 1-2 分钟(CloudTrail 事件传递到 EventBridge 有短暂延迟)
# 验证标签是否自动设置
aws iam list-user-tags --user-name user01

预期输出:

{
    "Tags": [
        {
            "Key": "s3-rw-access",
            "Value": "group-01"
        },
        {
            "Key": "s3-ro-access",
            "Value": "group-01"
        }
    ]
}

查看 Lambda 日志确认执行情况:

aws logs tail /aws/lambda/IAM-Group-AutoTag --follow --region <region>

五、注意事项

  1. IAM 是全局服务:IAM API 调用的 CloudTrail 事件只在 us-east-1 区域生成。因此直接在 us-east-1 创建 EventBridge 规则和 Lambda 函数。

  2. 事件延迟:CloudTrail 事件传递到 EventBridge 通常在 1 分钟以内(best effort delivery),不是实时的。用户加入 Group 后,标签可能需要1分钟才会自动设置。

  3. 多 Group 场景:如果一个用户同时属于多个 Group,当前代码会为每次 AddUserToGroup 事件分别设置标签。后加入的 Group 的标签会覆盖先前的标签。因此,如果需要不要在User上增加多个Group标签,而是在要访问的S3 Bucket上使用ABAC策略,为S3 Bucket打多个Group标签即可。

  4. 幂等性iam:TagUser 操作是幂等的,重复设置相同标签不会报错,只会覆盖。

  5. 错误处理:Lambda 函数执行失败时,EventBridge 默认会重试。建议配置 Dead Letter Queue(DLQ)捕获持续失败的事件。

六、方案改进:使用 DynamoDB 存储IAM Group和IAM User的标签映射表

1、标签设计

在以上方案中,要为IAM User添加的Tag字段是硬编码的,用户进入组之后,会被添加固定的标签。假如有多个IAM Group,希望用户进入特定的组之后自动加载特定的标签,那么可以使用DynamoDB来存储这种映射关系。

假设标签设计如下。

第一组:

  • IAM Group名称:group-01
  • 标签名称/值:s3-rw-access/group-01
  • 标签名称/值:s3-ro-access/group-01

第二组:

  • IAM Group名称:group-02
  • 标签名称/值:s3-rw-access/group-02
  • 标签名称/值:s3-ro-access/group-02

这里可以看到,组名和标签的值始终是完全对应的,这样代表本组内的所有用户都将获得一致的权限。

2、创建DynamoDB库

配置钱先替换以下变量,后续所有命令可直接复制执行:

export ACCOUNT_ID="123456789012"    # 替换为您的 AWS Account ID
export REGION="us-east-1"           # 必须在 us-east-1(因为 CloudTrail 的 IAM 事件仅在此区域生成)

接下来创建DynamoDB表,用于存储 IAM Group 到标签的映射关系。表结构设计如下:

  • 分区键(Partition Key):GroupName(String)— 对应 IAM Group 名称
  • 属性 Tags(Map)— 存储该 Group 对应的标签键值对,Lambda 读取后直接用于 iam:TagUser

根据章节 1 的标签设计,group-01 和 group-02 各有两个标签(s3-rw-accesss3-ro-access),标签值与组名一致。

# 创建 DynamoDB 表
aws dynamodb create-table \
  --table-name IAM-Group-Tag-Mapping \
  --attribute-definitions AttributeName=GroupName,AttributeType=S \
  --key-schema AttributeName=GroupName,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST \
  --region ${REGION}

# 等待表创建完成
aws dynamodb wait table-exists \
  --table-name IAM-Group-Tag-Mapping \
  --region ${REGION}

# 写入 group-01 的映射数据(s3-rw-access=group-01, s3-ro-access=group-01)
aws dynamodb put-item \
  --table-name IAM-Group-Tag-Mapping \
  --item '{
    "GroupName": {"S": "group-01"},
    "Tags": {"M": {
      "s3-rw-access": {"S": "group-01"},
      "s3-ro-access": {"S": "group-01"}
    }}
  }' \
  --region ${REGION}

# 写入 group-02 的映射数据(s3-rw-access=group-02, s3-ro-access=group-02)
aws dynamodb put-item \
  --table-name IAM-Group-Tag-Mapping \
  --item '{
    "GroupName": {"S": "group-02"},
    "Tags": {"M": {
      "s3-rw-access": {"S": "group-02"},
      "s3-ro-access": {"S": "group-02"}
    }}
  }' \
  --region ${REGION}

如需新增更多 Group 的映射,按相同格式执行 put-item 即可,Tags Map 中的键值对与标签设计保持一致。

验证写入结果:

# 查询 group-01 的映射
aws dynamodb get-item \
  --table-name IAM-Group-Tag-Mapping \
  --key '{"GroupName": {"S": "group-01"}}' \
  --region ${REGION}

# 查询 group-02 的映射
aws dynamodb get-item \
  --table-name IAM-Group-Tag-Mapping \
  --key '{"GroupName": {"S": "group-02"}}' \
  --region ${REGION}

上一步创建好的 Lambda 代码采用硬编码映射表,现在需要改为从 DynamoDB 读取映射规则。修改分三步:补充 IAM 权限、更新 Lambda 代码、部署新版本。

3、为 Lambda 执行角色补充 DynamoDB 读取权限

章节三创建的 Lambda-IAM-AutoTag-Policy 只包含 IAM 和 CloudWatch Logs 权限,需要追加 DynamoDB GetItem 权限。创建一个新的 Inline Policy 附加到现有 Role 上:

aws iam put-role-policy \
  --role-name Lambda-IAM-AutoTag-Role \
  --policy-name Lambda-DynamoDB-Read-Policy \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Sid": "AllowDynamoDBRead",
        "Effect": "Allow",
        "Action": ["dynamodb:GetItem"],
        "Resource": "arn:aws:dynamodb:'"${REGION}"':'"${ACCOUNT_ID}"':table/IAM-Group-Tag-Mapping"
      }
    ]
  }'

不返回错误就表示执行成功。

4、更新 Lambda 函数代码

cat << 'PYTHON_EOF' > lambda_function.py
import json
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

iam_client = boto3.client('iam')
dynamodb_client = boto3.client('dynamodb')

DYNAMODB_TABLE_NAME = 'IAM-Group-Tag-Mapping'


def get_tags_from_dynamodb(group_name):
    """从 DynamoDB 查询指定 Group 的标签映射"""
    try:
        response = dynamodb_client.get_item(
            TableName=DYNAMODB_TABLE_NAME,
            Key={'GroupName': {'S': group_name}}
        )
        item = response.get('Item')
        if not item or 'Tags' not in item:
            logger.info(f"Group {group_name} not found in DynamoDB or has no Tags attribute")
            return None

        # 解析 Tags Map:{"s3-rw-access": {"S": "group-01"}, "s3-ro-access": {"S": "group-01"}}
        tags_map = item['Tags'].get('M', {})
        tags = {}
        for key, value in tags_map.items():
            tags[key] = value.get('S', '')

        logger.info(f"Retrieved tags for group {group_name}: {tags}")
        return tags

    except Exception as e:
        logger.error(f"Failed to query DynamoDB for group {group_name}: {str(e)}")
        raise


def lambda_handler(event, context):
    logger.info(f"Received event: {json.dumps(event)}")

    detail = event.get('detail', {})
    event_name = detail.get('eventName')
    request_params = detail.get('requestParameters', {})

    group_name = request_params.get('groupName')
    user_name = request_params.get('userName')

    if not group_name or not user_name:
        logger.error(f"Missing groupName or userName in event: {request_params}")
        return {"statusCode": 400, "body": "Missing parameters"}

    logger.info(f"Event: {event_name}, Group: {group_name}, User: {user_name}")

    if event_name == 'AddUserToGroup':
        return handle_add_user_to_group(user_name, group_name)
    elif event_name == 'RemoveUserFromGroup':
        return handle_remove_user_from_group(user_name, group_name)
    else:
        logger.warning(f"Unexpected event: {event_name}")
        return {"statusCode": 200, "body": "No action taken"}


def handle_add_user_to_group(user_name, group_name):
    """用户加入 Group 时,从 DynamoDB 读取标签映射并设置到用户上"""
    tag_mapping = get_tags_from_dynamodb(group_name)

    if not tag_mapping:
        return {"statusCode": 200, "body": f"Group {group_name} not mapped in DynamoDB"}

    try:
        tags = [{'Key': k, 'Value': v} for k, v in tag_mapping.items()]
        iam_client.tag_user(UserName=user_name, Tags=tags)

        tag_str = ", ".join([f"{k}={v}" for k, v in tag_mapping.items()])
        logger.info(f"Tagged user {user_name} with {tag_str}")
        return {"statusCode": 200, "body": f"Tagged {user_name} with {tag_str}"}

    except Exception as e:
        logger.error(f"Failed to tag user {user_name}: {str(e)}")
        raise


def handle_remove_user_from_group(user_name, group_name):
    """用户从 Group 移除时,检查是否需要清除标签"""
    tag_mapping = get_tags_from_dynamodb(group_name)

    if not tag_mapping:
        return {"statusCode": 200, "body": f"Group {group_name} not mapped in DynamoDB"}

    try:
        # 获取用户当前所属的所有 Group
        response = iam_client.list_groups_for_user(UserName=user_name)
        remaining_groups = [g['GroupName'] for g in response['Groups']]

        # 逐个检查被移除 Group 的标签,判断是否还有其他 Group 提供相同标签
        tags_to_remove = []
        for tag_key, tag_value in tag_mapping.items():
            still_needed = False
            for g in remaining_groups:
                other_mapping = get_tags_from_dynamodb(g)
                if other_mapping and other_mapping.get(tag_key) == tag_value:
                    still_needed = True
                    break

            if not still_needed:
                tags_to_remove.append(tag_key)

        if tags_to_remove:
            iam_client.untag_user(UserName=user_name, TagKeys=tags_to_remove)
            logger.info(f"Removed tags {tags_to_remove} from user {user_name}")
        else:
            logger.info(f"All tags still needed for user {user_name}, no removal")

        return {"statusCode": 200, "body": f"Processed tag removal for {user_name}"}

    except Exception as e:
        logger.error(f"Failed to process user {user_name}: {str(e)}")
        raise
PYTHON_EOF

执行如下命令打包并更新 Lambda 函数

# 打包代码
zip function.zip lambda_function.py

# 更新已有的 Lambda 函数代码
aws lambda update-function-code \
  --function-name IAM-Group-AutoTag \
  --zip-file fileb://function.zip \
  --region ${REGION}

# 清理临时文件
rm -f lambda_function.py function.zip

5、验证更新

更新完成后,可以通过将用户加入 Group 来触发测试:

# 创建测试用的 IAM Group(如果已存在可跳过)
aws iam create-group --group-name group-02
# 创建测试用的 IAM User(如果已存在可跳过)
aws iam create-user --user-name user02

# 将 user02 加入 group-02
aws iam add-user-to-group --user-name user02 --group-name group-02

# 等待 1-2 分钟后验证标签
aws iam list-user-tags --user-name user02

预期输出与章节四一致,标签值来自 DynamoDB 而非硬编码:

{
    "Tags": [
        {
            "Key": "s3-rw-access",
            "Value": "group-02"
        },
        {
            "Key": "s3-ro-access",
            "Value": "group-02"
        }
    ]
}

后续如需新增或修改 Group 的标签映射,只需更新 DynamoDB 表中的记录,无需重新部署 Lambda 代码。

七、参考文档


最后修改于 2026-04-08