Python Minifier
Transforms Python source code into its most compact representation.
Documentation
GitHub
Original Source
12239 Bytes
""" Lambda function backing cloudformation certificate resource Post-minification, this module must be less than 4KiB. """ import copy import hashlib import json import logging import time from boto3 import client from botocore.exceptions import ClientError, ParamValidationError from botocore.vendored import requests logger = logging.getLogger() logger.setLevel(logging.INFO) log_info = logger.info log_exception = logger.exception json_dumps = json.dumps shallow_copy = copy.copy sleep = time.sleep def handler(event, c, /): """ Cloudformation custom resource handler :param event: lambda event payload :param c: lambda execution context """ get_remaining_time_in_millis = c.get_remaining_time_in_millis log_info(event) def request_cert(): """ Create a certificate This create an ACM certificate and update the event payload with the PhysicalResourceId. The certificate will not yet be issued. """ api_request = shallow_copy(props) for key in ['ServiceToken', 'Region', 'Tags', 'Route53RoleArn']: api_request.pop(key, None) if 'ValidationMethod' in props: if props['ValidationMethod'] == 'DNS': # Check that we have all the hosted zone information we need to validate # before we create the certificate for name in set([props['DomainName']] + props.get('SubjectAlternativeNames', [])): get_zone_for(name) del api_request['DomainValidationOptions'] event['PhysicalResourceId'] = acm.request_certificate( IdempotencyToken=i_token, **api_request )['CertificateArn'] add_tags() def delete_certificate(a): """ Delete a certificate Attempts to delete a certificate. 
:param str a: Arn of the certificate to delete """ while True: try: acm.delete_certificate(**{'CertificateArn': a}) return except ClientError as exception: log_exception('') err_code = exception.response['Error']['Code'] if err_code == 'ResourceInUseException': if get_remaining_time_in_millis() / 1000 30: cert = acm.describe_certificate(**{'CertificateArn': event['PhysicalResourceId']})['Certificate'] log_info(cert) if cert['Status'] == 'ISSUED': return True elif cert['Status'] == 'FAILED': raise RuntimeError(cert.get('FailureReason', '')) sleep(5) return False def replace_cert(): """ Does the update require replacement of the certificate? Only tags can be updated without replacement :rtype: bool """ old = shallow_copy(event['Old' + 'ResourceProperties']) old.pop('Tags', None) new = shallow_copy(event['ResourceProperties']) new.pop('Tags', None) return old != new def validate(): """ Add DNS validation records for a certificate """ if 'ValidationMethod' in props and props['ValidationMethod'] == 'DNS': done = False while not done: done = True cert = acm.describe_certificate(**{'CertificateArn': event['PhysicalResourceId']})['Certificate'] log_info(cert) if cert['Status'] != 'PENDING_VALIDATION': return for validation_option in cert['DomainValidationOptions']: if 'ValidationStatus' not in validation_option or 'ResourceRecord' not in validation_option: done = False continue if validation_option['ValidationStatus'] == 'PENDING_VALIDATION': hosted_zone = get_zone_for(validation_option['DomainName']) role_arn = hosted_zone.get('Route53RoleArn', props.get('Route53RoleArn')) sts = client('sts').assume_role( RoleArn=role_arn, RoleSessionName=('Certificate' + event['LogicalResourceId'])[:64], DurationSeconds=900, )['Credentials'] if role_arn is not None else {} route53 = client('route53', aws_access_key_id=sts.get('AccessKeyId'), aws_secret_access_key=sts.get('SecretAccessKey'), aws_session_token=sts.get('SessionToken'), ).change_resource_record_sets(**{ 'HostedZoneId': 
hosted_zone['HostedZoneId'], 'ChangeBatch': { 'Comment': 'Domain validation for ' + event['PhysicalResourceId'], 'Changes': [{ 'Action': 'UPSERT', 'ResourceRecordSet': { 'Name': validation_option['ResourceRecord']['Name'], 'Type': validation_option['ResourceRecord']['Type'], 'TTL': 60, 'ResourceRecords': [{'Value': validation_option['ResourceRecord']['Value']}], }, }], }}, ) log_info(route53) sleep(1) def get_zone_for(n): """ Return the hosted zone to use for validating a name :param str n: The name to validate :rtype: dict """ n = n.rstrip('.') zones = {domain['DomainName'].rstrip('.'): domain for domain in props['DomainValidationOptions']} parts = n.split('.') while len(parts): if '.'.join(parts) in zones: return zones['.'.join(parts)] parts = parts[1:] raise RuntimeError('DomainValidationOptions' + ' missing' + ' for ' + n) hash = lambda v: hashlib.new('md5', json_dumps(v, sort_keys=True).encode()).hexdigest() def add_tags(): """ Add tags from the ResourceProperties to the Certificate Also adds logical-id, stack-id, stack-name and properties tags, which are used by the custom resource. 
""" tags = shallow_copy(event['ResourceProperties'].get('Tags', [])) tags += [ {'Key': 'cloudformation:' + 'logical-id', 'Value': event['LogicalResourceId']}, {'Key': 'cloudformation:' + 'stack-id', 'Value': event['StackId']}, {'Key': 'cloudformation:' + 'stack-name', 'Value': event['StackId'].split('/')[1]}, {'Key': 'cloudformation:' + 'properties', 'Value': hash(event['ResourceProperties'])} ] acm.add_tags_to_certificate(**{'CertificateArn': event['PhysicalResourceId'], 'Tags': tags}) def send_response(): """ Send a response to cloudformation """ log_info(event) response = requests.put(event['ResponseURL'], json=event, headers={'content-type': ''}) response.raise_for_status() try: i_token = hash(event['RequestId'] + event['StackId']) props = event['ResourceProperties'] acm = client('acm', region_name=props.get('Region')) event['Status'] = 'SUCCESS' if event['RequestType'] == 'Create': if event.get('Reinvoked', False) is False: event['PhysicalResourceId'] = 'None' request_cert() validate() if not wait_for_issuance(): return reinvoke() elif event['RequestType'] == 'Delete': if event['PhysicalResourceId'] != 'None': if event['PhysicalResourceId'].startswith('arn:'): delete_certificate(event['PhysicalResourceId']) else: delete_certificate(find_certificate(props)) elif event['RequestType'] == 'Update': if replace_cert(): log_info('Update') if find_certificate(props) == event['PhysicalResourceId']: # This is an update cancel request. 
# Try and delete the new certificate that is no longer required try: acm = client('acm', region_name=event['OldResourceProperties'].get('Region')) log_info('Delete') delete_certificate(find_certificate(event['OldResourceProperties'])) except: log_exception('') # return success for the update - nothing changed return send_response() if event.get('Reinvoked', False) is False: request_cert() validate() if not wait_for_issuance(): return reinvoke() else: if 'Tags' in event['Old' + 'ResourceProperties']: acm.remove_tags_from_certificate(**{ 'CertificateArn': event['PhysicalResourceId'], 'Tags': event['Old' + 'ResourceProperties']['Tags'] }) add_tags() else: raise RuntimeError(event['RequestType']) return send_response() except Exception as ex: log_exception('') event['Status'] = 'FAILED' event['Reason'] = str(ex) return send_response()
Options
Combine Import statements
Remove Pass statements
Remove literal statements (docstrings)
Hoist Literals
Rename Locals
Preserved Locals:
Rename Globals
Preserved Globals:
Convert Positional-Only Arguments to Normal Arguments
Preserve any shebang line
Remove assert statements
Remove if statements conditional on __debug__ being True
Remove return statements that are not required
Remove brackets when raising builtin exceptions
Evaluate constant expressions
Remove annotations
Remove variable annotations
Remove function return annotations
Remove function argument annotations
Remove class attribute annotations
Target python:
Python 3.12
Python 3.11 or below
Minify!
Minified Source
4065 Bytes
R=RuntimeError import copy,hashlib,json,logging as B,time from boto3 import client as L from botocore.exceptions import ClientError as A2,ParamValidationError as A3 from botocore.vendored import requests as A4 A=B.getLogger() A.setLevel(B.INFO) D=A.info Y=A.exception j=json.dumps Q=copy.copy Z=time.sleep def handler(A,k): A1='OldResourceProperties';A0='Update';z='Delete';y='None';x='acm';w='FAILED';v='properties';u='stack-id';t='logical-id';s='DNS';i='Old';h='Certificate';g='LogicalResourceId';f='DomainName';e='Route53RoleArn';d='Region';X='RequestType';W='Reinvoked';V='StackId';U='DomainValidationOptions';T='ValidationMethod';S=None;P='Status';O='Key';N='';M=True;K='ResourceProperties';J='cloudformation:';I='Value';H=False;G='CertificateArn';F='Tags';C='PhysicalResourceId';l=k.get_remaining_time_in_millis;D(A) def m(): D=Q(B) for H in['ServiceToken',d,F,e]:D.pop(H,S) if T in B: if B[T]==s: for I in set([B[f]]+B.get('SubjectAlternativeNames',[])):q(I) del D[U] A[C]=E.request_certificate(IdempotencyToken=A6,**D)[G];r() def a(a): while M: try:E.delete_certificate(**{G:a});return except A2 as B: Y(N);A=B.response['Error']['Code'] if A=='ResourceInUseException': if l()/100030: B=E.describe_certificate(**{G:A[C]})[h];D(B) if B[P]=='ISSUED':return M elif B[P]==w:raise R(B.get('FailureReason',N)) Z(5) return H def A5():B=Q(A[i+K]);B.pop(F,S);C=Q(A[K]);C.pop(F,S);return B!=C def p(): a='Type';Y='Name';X='HostedZoneId';W='ValidationStatus';V='PENDING_VALIDATION';J='ResourceRecord' if T in B and B[T]==s: K=H while not K: K=M;N=E.describe_certificate(**{G:A[C]})[h];D(N) if N[P]!=V:return for F in N[U]: if W not in F or J not in F:K=H;continue if F[W]==V:Q=q(F[f]);R=Q.get(e,B.get(e));O=L('sts').assume_role(RoleArn=R,RoleSessionName=(h+A[g])[:64],DurationSeconds=900)['Credentials']if R is not S 
else{};b=L('route53',aws_access_key_id=O.get('AccessKeyId'),aws_secret_access_key=O.get('SecretAccessKey'),aws_session_token=O.get('SessionToken')).change_resource_record_sets(**{X:Q[X],'ChangeBatch':{'Comment':'Domain validation for '+A[C],'Changes':[{'Action':'UPSERT','ResourceRecordSet':{Y:F[J][Y],a:F[J][a],'TTL':60,'ResourceRecords':[{I:F[J][I]}]}}]}});D(b) Z(1) def q(n): C='.';n=n.rstrip(C);D={A[f].rstrip(C):A for A in B[U]};A=n.split(C) while len(A): if C.join(A)in D:return D[C.join(A)] A=A[1:] raise R(U+' missing'+' for '+n) hash=lambda v:hashlib.new('md5',j(v,sort_keys=M).encode()).hexdigest() def r():B=Q(A[K].get(F,[]));B+=[{O:J+t,I:A[g]},{O:J+u,I:A[V]},{O:J+'stack-name',I:A[V].split('/')[1]},{O:J+v,I:hash(A[K])}];E.add_tags_to_certificate(**{G:A[C],F:B}) def c():D(A);B=A4.put(A['ResponseURL'],json=A,headers={'content-type':N});B.raise_for_status() try: A6=hash(A['RequestId']+A[V]);B=A[K];E=L(x,region_name=B.get(d));A[P]='SUCCESS' if A[X]=='Create': if A.get(W,H)is H:A[C]=y;m() p() if not o():return n() elif A[X]==z: if A[C]!=y: if A[C].startswith('arn:'):a(A[C]) else:a(b(B)) elif A[X]==A0: if A5(): D(A0) if b(B)==A[C]: try:E=L(x,region_name=A[A1].get(d));D(z);a(b(A[A1])) except:Y(N) return c() if A.get(W,H)is H:m() p() if not o():return n() else: if F in A[i+K]:E.remove_tags_from_certificate(**{G:A[C],F:A[i+K][F]}) r() else:raise R(A[X]) return c() except Exception as A7:Y(N);A[P]=w;A['Reason']=str(A7);return c()
Copy