c语言sscanf函数的用法是什么
213
2022-10-31
函数计算自动化运维实战1 -- 定时任务
函数计算
阿里云函数计算是一个事件驱动的全托管计算服务。通过函数计算,您无需管理服务器等基础设施,只需编写代码并上传。函数计算会为您准备好计算资源,以弹性、可靠的方式运行您的代码,并提供日志查询,性能监控,报警等功能。借助于函数计算,您可以快速构建任何类型的应用和服务,无需管理和运维。更棒的是,您只需要为代码实际运行消耗的资源付费,而代码未运行则不产生费用。
函数计算中的TimeTrigger
触发器是触发函数执行的方式。有时候您不想手动调用函数执行,您希望当某件事情发生时自动触发函数的执行,这个事情就是事件源。您可以通过配置触发器的方式设置事件源触发函数执行。
例如,设置定时触发器,可以在某个时间点触发函数执行或者每隔5分钟触发函数一次;函数计算timetrigger
专题传送门 => 函数计算进行自动化运维专题
定时任务自动化场景分析
定时任务示例场景1
某些账号 ak 需要定期更换,以确保 ak 安全;在下面的代码示例中,授权 service 具有访问 kms 权限的能力,使用 kms,先对一个具有创建和删除 ak 权限的ak加密密文解密,获取具有创建和删除 ak 权限的AK, 之后利用这个AK进行 ak 的创建和删除操作
说明: 除了使用kms加密解密来获取较大权限的AK, 通过函数计算环境变量的设置也是一种很好的方法
操作步骤
创建函数,函数创建可参考函数计算helloworld 注:记得给函数的 service 的 role 设置访问 kms 权限 给函数配置定时器,详情可参考定时触发函数
函数代码(函数计算已经内置了相关 sdk,直接使用下面的代码即可)
# -*- coding: utf-8 -*- import logging, time, json from aliyunsdkcore import client from aliyunsdkram.request.v20150501.CreateAccessKeyRequest import CreateAccessKeyRequest from aliyunsdkram.request.v20150501.DeleteAccessKeyRequest import DeleteAccessKeyRequest from aliyunsdkkms.request.v20160120.EncryptRequest import EncryptRequest from aliyunsdkkms.request.v20160120.DecryptRequest import DecryptRequest from aliyunsdkcore.auth.credentials import StsTokenCredential # ak Encrypt content AK_CiphertextBlob = "NmQyY2ZhODMtMTlhYS00MTNjLTlmZjAtZTQxYTFiYWVmMzZmM1B1NXhTZENCNXVWd1dhdTNMWVRvb3V6dU9QcVVlMXRBQUFBQUFBQUFBQ3gwZTkzeGhDdHVzMWhDUCtZeVVuMWlobzlCa3VxMlErOXFHWWdXXXHELLwL1NSZTFvUURYSW9lak5Hak1lMnF0R2I1TWUxMEJiYmkzVnBwZHlrWGYzc3kyK2tQbGlKb2lHQ3lrZUdieHN2eXZwSVYzN2Qyd1cydz09" USER_NAME = "ls-test" # sub-account name LOGGER = logging.getLogger() def handler(event, context): creds = context.credentials sts_token_credential = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token) # this demo ecs and function in same region, if not in same region, you need change region_id to your ecs instance's region_id clt = client.AcsClient(region_id=context.region, credential=sts_token_credential) request = DecryptRequest() request.set_CiphertextBlob(AK_CiphertextBlob) response = _send_request(clt, request) ak_info = json.loads(response.get("Plaintext","{}")) if not ak_info: return "KMS Decrypt ERROR" ak_id = ak_info["ak_id"] ak_secret = ak_info["ak_secret"] LOGGER.info("Decrypt sucessfully with key id: {}".format(response.get("KeyId","{}"))) clt2 = client.AcsClient(ak_id, ak_secret, context.region) request = CreateAccessKeyRequest() request.set_UserName(USER_NAME) # 给子账号ls-test创建AK response = _send_request(clt2, request) create_ak_id = response.get("AccessKey",{}).get("AccessKeyId") if not create_ak_id: return LOGGER.info("create ak {} sucess!".format(create_ak_id)) time.sleep(10) request = DeleteAccessKeyRequest() request.set_UserName(USER_NAME) request.set_UserAccessKeyId(create_ak_id) response = _send_request(clt2, request) LOGGER.info("delete ak {} sucess!".format(create_ak_id)) return "OK" # send open api request def _send_request(clt, request): request.set_accept_format('json') try: response_str = clt.do_action_with_exception(request) LOGGER.debug(response_str) response_detail = json.loads(response_str) return response_detail except Exception as e: LOGGER.error(e)
AK 存在环境变量版本
# -*- coding: utf-8 -*- import os, logging, time, json from aliyunsdkcore import client from aliyunsdkram.request.v20150501.CreateAccessKeyRequest import CreateAccessKeyRequest from aliyunsdkram.request.v20150501.DeleteAccessKeyRequest import DeleteAccessKeyRequest USER_NAME = "ls-test" # sub-account name LOGGER = logging.getLogger() def handler(event, context): ak_id = os.environ['AK_ID'] ak_secret = os.environ['AK_SECRET'] clt = client.AcsClient(ak_id, ak_secret, context.region) request = CreateAccessKeyRequest() request.set_UserName(USER_NAME) # 给子账号USER_NAME创建AK response = _send_request(clt, request) create_ak_id = response.get("AccessKey", "").get("AccessKeyId") if not create_ak_id: return LOGGER.info("create ak {} sucess!".format(create_ak_id)) time.sleep(5) request = DeleteAccessKeyRequest() request.set_UserName(USER_NAME) request.set_UserAccessKeyId(create_ak_id) response = _send_request(clt, request) LOGGER.info("delete ak {} sucess!".format(create_ak_id)) return "OK" # send open api request def _send_request(clt, request): request.set_accept_format('json') try: response_str = clt.do_action_with_exception(request) LOGGER.info(response_str) response_detail = json.loads(response_str) return response_detail except Exception as e: LOGGER.error(e)
定时任务示例场景2
定期检查自己 ecs 对应暴露的端口,确保安全,比如你的 ecs 是一个网站服务器,可能只需要对外暴露 80 端口就行,如果出现 0.0.0.0/0 这种允许所有人访问的,需要出现报警或者自动修复
操作步骤
创建函数,函数创建可参考函数计算helloworld 注:记得给函数的 service 的 role 设置管理 ecs 权限 给函数配置定时器,详情可参考定时触发函数 # -*- coding: utf-8 -*- import logging import json, random, string, time from aliyunsdkcore import client from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest from aliyunsdkecs.request.v20140526.DescribeSecurityGroupAttributeRequest import DescribeSecurityGroupAttributeRequest from aliyunsdkcore.auth.credentials import StsTokenCredential LOGGER = logging.getLogger() clt = None # 需要检查的ecs列表, 修改成你的ecs id 列表 ECS_INST_IDS = ["i-uf6h07zdscdg9g55zkxx", "i-uf6bwkxfxh847a1e2xxx"] def handler(event, context): creds = context.credentials global clt sts_token_credential = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token) # this demo ecs and function in same region, if not in same region, you need change region_id to your ecs instance's region_id clt = client.AcsClient(region_id=context.region, credential=sts_token_credential) invalid_perssions = {} for ecs_id in ECS_INST_IDS: ret = check_and_modify_security_rule(ecs_id) if ret: invalid_perssions[ecs_id] = ret return invalid_perssions def check_and_modify_security_rule(instance_id): LOGGER.info("check_and_modify_security_rule, instance_id is %s ", instance_id) request = DescribeInstancesRequest() request.set_InstanceIds(json.dumps([instance_id])) response = _send_request(request) SecurityGroupIds = [] if response is not None: instance_list = response.get('Instances', {}).get('Instance') for item in instance_list: SecurityGroupIds = item.get('SecurityGroupIds', {}).get("SecurityGroupId", []) break if not SecurityGroupIds: LOGGER.error("ecs {} do not have SecurityGroupIds".format(instance_id)) return invalid_perssions = [] for sg_id in SecurityGroupIds: request = DescribeSecurityGroupAttributeRequest() request.set_SecurityGroupId(sg_id) response = _send_request(request) LOGGER.info("Find a securityGroup id {}".format(sg_id)) permissions = response.get("Permissions", {}).get("Permission",[]) if not permissions: continue for permission in permissions: if permission["Direction"] == "ingress" and permission["SourceCidrIp"] == "0.0.0.0/0": LOGGER.error("ecs {0} , SecurityGroup id {1}, have a risk, need fix; permission = {2}".format(instance_id, sg_id, permission)) invalid_perssions.append(permission) return invalid_perssions # send open api request def _send_request(request): request.set_accept_format('json') try: response_str = clt.do_action_with_exception(request) LOGGER.debug(response_str) response_detail = json.loads(response_str) return response_detail except Exception as e: LOGGER.error(e)
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~