تکثیر خودکار داده ها از AWS S3 تا Microsoft Azure Storage Made Easy

ممکن است یکی از نیازهای کسب و کار شما باشد که مقدار زیادی داده را بطور دوره ای از یک ابر عمومی به ابر دیگر منتقل کنید. به طور خاص ، ممکن است با دستوراتی روبرو شوید که به راه حل چند ابر نیاز دارند. این مقاله شامل یک رویکرد به صورت خودکار تکثیر داده ها از AWS S3 Bucket در Microsoft Azure Blob Storage Container با استفاده از Amazon S3 Inventory ، Amazon S3 Batch Operations ، Fargate و AzCopy است.

سناریو

شرکت شما هر روز پس از فشرده سازی ، پرونده های CSV جدید را در محل با اندازه کلی حدود 100 گیگابایت تولید می کند. اندازه همه پرونده ها 1–2 گیگابایت است و باید هر شب در یک ساعت مشخص بین 3 تا 5 صبح در Amazon S3 بارگذاری شوند. مشاغل شما تصمیم گرفته است پس از تمام پرونده های بارگذاری شده در S3 ، آن فایل های CSV را از S3 در Microsoft Azure Storage کپی کند. شما باید راهی آسان و سریع برای خودکار کردن گردش کار تکثیر داده ها پیدا کنید.

برای انجام این کار ، ما می توانیم یک خط لوله داده بسازیم تا داده ها را به صورت دوره ای از S3 به Azure Storage با استفاده از AWS کپی کند داده Wrangler ، Amazon S3 Inventory ، Amazon S3 Batch Operations ، Athena ، Fargate و AzCopy. div class = “image-container”>

آنچه را که ما پوشش خواهیم داد:

  • ایجاد VPC با زیر شبکه های خصوصی و عمومی ، نقاط انتهایی S3 و دروازه NAT .
  • یک حساب ذخیره سازی Azure و یک ظرف حباب ایجاد کنید ، یک رمز SAS ایجاد کنید ، سپس یک قانون فایروال اضافه کنید تا از AWS VPC به Azure Storage دسترسی داشته باشید.
  • پیکربندی گزارش های موجودی S3 روزانه در سطل S3.
  • از Athena برای استفاده استفاده کنید فقط اشیا جدید را از گزارش موجودی S3 فیلتر کنید و نام سطل و کلیدهای اشیا those آن اشیا to را به یک فایل مانیفست CSV صادر کنید.
  • برای ایجاد یک شغل کپی SUT Batch Operations S3 از اسناد صادر شده CSV استفاده کنید یک سطل مقصد S3 با قانون انقضا سیاست چرخه زندگی پیکربندی شده است.
  • برای اجرای وظیفه Fargate که از همه اشیا with با همان پیشوند در سطل مقصد در Azure Storage کپی می کند ، از قانون Eventbridge استفاده کنید.

پیش نیازها

  • راه اندازی یک حساب AWS
  • راه اندازی یک حساب Azure
  • نصب جدیدترین AWS-CLI
  • AWS CDK-CLI را نصب کنید
  • درک اساسی AWS CDK
  • درک اساسی داکر

شروع کنیم! < h2> ایجاد سطلهای منبع و مقصد S3

ما از CDK برای ایجاد زیرساخت های خود بر اساس AWS استفاده می کنیم. ابتدا ، بیایید یک سطل منبع ایجاد کنیم تا پرونده ها را از ارائه دهندگان خارجی یا موارد قبلی دریافت کنیم و گزارش های موجودی روزانه را تنظیم کنیم که لیستی از پرونده ها و ابرداده های شما را با یک پرونده تخت فراهم می کند.

بعدی ، یک سطل مقصد به عنوان ذخیره سازی موقت با قانون انقضاiration سیاست چرخه حیات که در پیشوند / tmp_transition پیکربندی شده ایجاد کنید همه پرونده های دارای پیشوند (به عنوان مثال

/tmp_transition/file1.csv) در Azure کپی می شوند و پس از 24 ساعت با سیاست چرخه عمر حذف می شوند.

برای ایجاد سطل های S3 از کد زیر استفاده کنید.

از aws_cdk import ( aws_s3 as s3 ، هسته،
) s3_destination = s3.Bucket (خود ، “dataBucketInventory” ، قوانین چرخه عمر = [ { ‘انقضا’ : core.Duration.days ( 1.0 ) ، ‘پیشوند’ : ‘tmp_transition’ } ، ]) s3_source = s3.Bucket (خود ، “demoDataBucket” ، bucket_name = self.s3_source_bucket_name ، رمزگذاری = s3.BucketEncryption.S3_MANAGED ، موجودی = [ { “فرکانس” : s3.InventoryFrequency.DAILY ، “include_object_versions” : s3.InventoryObjectVersion.CURRENT ، “مقصد” : { “سطل” : s3_destination } } ])

ایجاد AWS VPC

بعد ، ما باید VPC را با دو زیرشبکه عمومی و خصوصی ، NAT Gateway ، نقطه پایانی S3 ایجاد کنیم و خط مشی پایانی را ضمیمه کنیم که اجازه می دهد دسترسی به ظرف فارگیت که سطل S3 به آن از Azure کپی می کنیم.

اکنون VPC و منابع مربوطه خود را با استفاده از کد زیر تعریف کنید.

از aws_cdk وارد کردن ( aws_ec2 as ec2 ، هسته،
)
vpc = ec2.Vpc (خود ، “demoVPC” ، max_azs = 2 ، cidr = “10.0.0.0/16” ، nat_gateways = 1 ، subnet_configuration = [{ “cidrMask” : 24 ، “name” : “private” ، “subnetType” : ec2.SubnetType.PRIVATE } ، { “cidrMask” : 24 ، “نام” : “عمومی” ، “subnetType” : ec2.SubnetType.PUBLIC }] )
زیر شبکه = vpc.select_subnets ( subnet_type = ec2.SubnetType.PRIVATE). زیرشبکه ها
endpoint = vpc.add_gateway_endpoint ( ‘s3Endpoint’ ، service = ec2.GatewayVpcEndpointAwsService.S3 ، زیر شبکه = [{ “subnet_id” : زیر شبکه [ 0 ] .subnet_id } ، { “subnet_id” : زیر شبکه ها [ 1 ] .subnet_id }]) endpoint.add_to_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ سطل_کار ، f “{bucket_arn} / *” ] ، principals = [iam.ArnPrincipal ( “*” )] ، اقدامات = [ “s3: GetObject” ، “s3: GetObjects” ، “s3: ListObjects” ، “S3: ListBucket” ] ،
)) # دسترسی به سطل آمازون S3 را شامل لایه های برای هر تصویر Docker از ECR می کند.
endpoint.add_to_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ f “arn: aws: s3 ::: prod- {self.region} -starport-layer-bucket / *” ] ، principals = [iam.ArnPrincipal ( “*” )] ، اقدامات = [ “s3: GetObject” ] ،
))

هنگام ایجاد NAT Gateway ، یک آدرس IP الاستیک در AWS ایجاد می کند. برای تنظیم قانون Azure Storage Firewall در مرحله 3 به آدرس IP نیاز خواهیم داشت.

استقرار حساب ذخیره سازی Azure

برای ساده سازی مدیریت منابع ، می توانیم از Azure استفاده کنیم الگوی مدیر منابع (الگوی ARM) برای استقرار منابع در سطح اشتراک Azure ما.

من فرض می کنم شما از قبل یک تنظیم اشتراک Azure دارید. ما برای ایجاد ترافیک از یک آدرس IP خاص از پوسته Cloud برای استقرار یک گروه منابع ، حساب Azure Storage ، یک کانتینر و قانون فایروال استفاده خواهیم کرد.

روی نماد Cloud Shell در نماد کلیک کنید نوار هدر Azure Portal ، و Cloud Shell را باز می کند.

/ p>

az group create –name examplegroup –location australiaeast گروه استقرار az ایجاد – گروه -مثال گروه گروه –template-uri https://raw.githubusercontent.com/yai333/DataPipelineS32Blob/master/Azure-Template-DemoRG/template.json – پارامترهای ذخیره سازیAccounts_mydemostroageaccounty> نام = / div>

پس از استقرار الگو ، می توانیم با کاوش در گروه منابع پورتال Azure استقرار را تأیید کنیم. تمام منابع مستقر شده در بخش نمای کلی گروه منابع نمایش داده می شوند.

> برای کلاس ایجاد کنیم”> برای کلاس ایجاد کنیم “> برای کلاس ایجاد کنیم”> برای کلاس ایجاد کنیم “> برای کلاس ایجاد کنیم”> برای کلاس ایجاد می کنیم “> حساب ذخیره سازی ما:

  • در مرحله اول ، به حساب ذخیره سازی که تازه مستقر کرده ایم بروید.
  • ثانیا ، روی منوی تنظیمات به نام Firewalls و شبکه های مجازی کلیک کنید.
  • ثالثاً ، بررسی کنید که اجازه دسترسی از شبکه های انتخاب شده را انتخاب کرده باشید.
  • سپس ، برای اعطای دسترسی به محدوده IP اینترنت ، آدرس IP عمومی AWS VPC را وارد کنید (مرحله 2) و ذخیره کنید. < div class = "image-container">
  • ما سپس برای دسترسی محدود به منابع ذخیره سازی Azure ، امضاهای دسترسی مشترک (SAS) ایجاد کنید.

    اجرای دستور زیر در Cloudshell:

    RG_NAME = ” گروه نمونه “
    ACCOUNT_NAME = ‘mydemostorageaccountaiyi’
    ACCOUNT_KEY = فهرست کلیدهای حساب ذخیره سازی –account-name = $ ACCOUNT_NAME – پرس و جو [0] .value -o tsv`
    BLOB_CONTAINER = ظرف نگهدارنده democ STORAGE_CONN_STRING = حساب ذخیره سازی show-connection-string – نام $ ACCOUNT_NAME – گروه منبع $ RG_NAME – خروجی tsv` SAS = ظرف ذخیره سازی تولید-sas – رشته اتصال $ STORAGE_CONN_STRING -n $ BLOB_CONTAINER – انقضا ‘2021-06-30’ – مجوزهای ACLRW – خروجی tsv` echo $SAS

    ما SAS و URL های موردنیاز را که اعطا می کنند دریافت خواهیم کرد (a) dd (d) elete (r) ead (w) دسترسی آیینی به یک ظرف نگهدارنده مخزن حباب.

    se = 2021 -06 -30&sp=racwl&sv=2018-11 -09 & sr = c & sig = xxxxbBfqfEppPpBZPOTRiwvkh69xxxx / xxxxQA0Ytspan> 3 D

    بیایید دوباره به AWS برگردیم و SAS را به فروشگاه پارامتر AWS SSM قرار دهیم.

    اجرای دستور زیر در local terminator .

    aws ssm put-parameter –cli-input-json ‘{ “نام”: “/ s3toblob / azure / storage / sas” ، “مقدار”: “se = 2021-06-30 & sp = racwl & sv = 2018-11-09 & sr = c & sig = xxxxbBfqfEppPpBZPOTRiwvkh69xxxx / xxxxQA0YtKo٪ 3D” ، “Type”: “SecureString”
    } ‘

    تعریف توابع Lambda و لایه AWS Data Wrangler

    اکنون ، بیایید به سمت توابع lambda حرکت کنیم. ما سه توابع lambda و یک لایه lambda ایجاد خواهیم کرد: strong> fn_create_s3batch_manifest و AWS Data Wrangler layer

    این تابع lambda با استفاده از ماژول Athena داده Wrangler از AWS برای فیلتر کردن پرونده های جدید در تاریخ UTC گذشته و ذخیره لیست پرونده ها در یک CSV پرونده.

    کد زیر را در CDK stack.py کپی کنید. پرونده awswranger-layerzip را از اینجا بارگیری کنید.

    datawrangler_layer = lambda_.LayerVersion (خود ، “DataWranglerLayer” ، کد = lambda_.Code.from_asset ( “./layers/awswrangler-layer-1.9.6-py3.6.zip” ) ، زمانهای_فقیت = = lambda_.Runtime.PYTHON_3_6] ) fn_create_s3batch_manifest = lambda_.Function (خود ، “CreateS3BatchManifest” ، زمان اجرا = lambda_.Runtime.PYTHON_3_6 ، handler = “lambda_create_s3batch_manifest.handler” ، timeout = core.Duration.minutes ( 15 ) ، کد = lambda_.Code.from_asset ( “./src” ) ، لایه ها = [ datawrangler_layer] ) fn_create_s3batch_manifest.add_en Environment ( “DESTINATION_BUCKET_NAME” ، s3_destination_bucket_name)
    fn_create_s3batch_manifest.add_en Environment ( “SOURCE_BUCKET_NAME” ، self.s3_source_bucket_name) fn_create_s3batch_manifest.ad__rool_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ “*” ] ، اقدامات = [ “چسب: GetTable” ، “چسب: جدول ایجاد” ، “athena: StartQueryExecution” ، “athena: CancelQueryExecution” ، “athena: StopQueryExecution” ، “athena: GetQueryExecution” ، “athena: GetQueryResults” ] ،
    )) fn_create_s3batch_manifest.ad__rool_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ f “arn: aws: glue: {self.region} : {self.account} : کاتالوگ “ ، f “arn: aws: glue: {self.region} : {self.account} : پایگاه داده / * “ ، f “arn: aws: glue: {self.region} : {self.account} : table / * “ ] ، اقدامات = [ “چسب: GetDatabases” ، “چسب: GetDatabase” ، “چسب: BatchCreatePartition” ، “چسب: GetPartitions” ، “چسب: CreateDatabase” ، “چسب: GetPartition” ] ،
    )) s3_destination.add_event_notification (s3.EventType.OBJECT_CREATED ، s3n.LambdaDestination ( fn_create_s3batch_manifest) ، { “پیشوند” : f ‘ {self.s3_source_bucket_name} / demoDataBucketInventory0 /’ ، “پسوند “: ‘.json’ })

    سپس

    را ایجاد کنید. / src/lambda_create_s3batch_manifest.py با کد زیر: < div class = "code-container" readability = "30"> وارد کردن json
    وارد کردن ورود به سیستم
    وارد کردن سیستم عامل
    از datetime import datetime ، timedelta
    import awswrangler as wr logger = logging.getLogger ()
    logger.setLevel (logging.DEBUG) DATABASE_NAME = “s3datademo”
    TABLE_NAME = “موضوعات روزانه” def handler (رویداد ، زمینه) : logger.info ( “رویداد دریافت شده:” + json.dumps (event، indent = 2 ))) اگر DATABASE_NAME نیست در wr.catalog.databases (). مقادیر: wr.catalog.create_database (DATABASE_NAME) event_date = datetime.strptime ( event [ “Records” ] [ 0 ] [ “eventTime” ] ، “٪ Y-٪ m-٪ dT٪ H :٪ M:٪ S.٪ fZ “) partition_dt = f ‘ {(event_date – timedelta (days = 1 ))). strftime ( “٪ Y-٪ m-٪ d “)} -00-00″ previous_partition_dt = f ‘ {(event_date – timedelta (days = 2 ))). strftime ( “٪ Y-٪ m-٪ d “)} -00-00″ logger.debug ( f “partition_dt: {partition_dt} ) اگر نیست wr.catalog.does_table_exist (پایگاه داده = DATABASE_NAME ، جدول = TABLE_NAME): table_query_exec_id = wr.athena.start_query_execution (s3_output = f “s3: // {os.getenv ( ‘DESTINATION_BUCKET_NAME’ )} / athena_output “ ، sql = f “ایجاد جدول خارجی {TABLE_NAME} (\ رشته `سطل` ، رشته کلید ، \ رشته version_id ، \ _ جدیدترین بولی ، \ is_delete_marker boolean است ، \ اندازه bigint ، \ آخرین مهلت تاریخ تغییر یافته ، \ رشته e_tag \ ) \ مشارکت شده توسط (رشته dt) \ ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.OpenCSVSerde’ \ ذخیره شده به عنوان INPUTFORMAT ‘org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat’ \ OUTPUTFORMAT ‘org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat’ \ LOCATION ‘s3: // {os.getenv (‘ DESTINATION_BUCKET_NAME ‘)}} / {os .getenv ( ‘SOURCE_BUCKET_NAME’ )} / demoDataBucketInventory0 / hive / ‘؛ “ ، پایگاه داده = DATABASE_NAME) wr.athena.wait_query (query_execution_id = table_query_exec_id) partition_query_exec_id = wr.athena.start_query_execution ( sql = f “ALTER TABLE {TABLE_NAME} اگر قسمت موجود نباشد ، اضافه کنید (dt = \ ‘ {partition_dt} < / span> \ ‘)؛ “ ، s3_output = f “s3: // {os.getenv ( ‘DESTINATION_BUCKET_NAME’ )} / athena_output” ، پایگاه داده = DATABASE_NAME) wr.athena.wait_query (query_execution_id = partition_query_exec_id) select_query_exec_id = wr.athena.start_query_execution (sql = “سطل DISTINCT را به عنوان” “انتخاب کنید + os.getenv ( “SOURCE_BUCKET_NAME” ) + ‘”، کلید را به عنوان” dump.txt “FROM” + کلید دهید TABLE_NAME + “where dt = ‘” + partition_dt + “” و is_delete_marker = false “است + “به جز” + “سطل DISTINCT را به عنوان” “انتخاب کنید” + os.getenv ( “SOURCE_BUCKET_NAME” ) + ‘”، کلید را به عنوان” dump.txt “FROM” + کلید دهید TABLE_NAME + “where dt = ‘” + پارتیشن_ قبلی + dt + “” and is_delete_marker = false؛ “ ، پایگاه داده = DATABASE_NAME ، s3_output = f “s3: // {os.getenv ( ‘DESTINATION_BUCKET_NAME’ )} / csv_manifest / dt = {partition_dt} ) return select_query_exec_id

    در کدگذاری فوق ، از کوئری آتنا برای ایجاد Glue Database، Table و افزودن پارتیشن به آن جدول استفاده می کنیم. سپس lambda به جز کوئری اجرا می کند تا تفاوت بین دو پارتیشن تاریخ را برگرداند.

    توجه داشته باشید که start_query_execution ناهمزمان است ، بنابراین نیازی به انتظار برای نتیجه در Lambda نیست. پس از اجرای درخواست ، نتیجه به عنوان پرونده CSV در s3_output = f “s3: // {os.getenv (‘DESTINATION_BUCKET_NAME’)} / csv_manifest / dt = {partition_dt}” ذخیره می شود.

    fn_create_batch_job و S3 Notification

    در این بخش ، ما یک تابع lambda ایجاد خواهیم کرد fn_create_batch_job و Amazon S3 را قادر می سازد هنگام ارسال فایل CSV به سطل آمازون S3 اعلانی را برای ایجاد fn_create_batch_job ارسال کند / پیشوند csv_manifest. کد زیر را در CDK stack.py:

    fn_create_batch_job = lambda_.Function قرار دهید (خود ، “CreateS3BatchJobFunction” ، زمان اجرا = lambda_.Runtime.PYTHON_3_6 ، handler = “lambda_create_batch_job.handler” ، timeout = core.Duration.minutes ( 5 ) ، کد = lambda_.Code.from_asset ( “./src” ))
    fn_create_batch_job.add_en Environment ( “ROLE_ARN” ، s3_batch_role.role_arn)
    fn_create_batch_job.add_en Environment ( “SOURCE_BUCKET_NAME” ، self.s3_source_bucket_name) fn_create_batch_job.add_to_role_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، اقدامات = [ “s3: CreateJob” ] ، منابع = [ “*” ] )) fn_create_batch_job.add_to_role_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، اقدامات = [ “iam: PassRole” ] ، منابع = [s3_batch_role.role_arn]
    )) s3_destination.add_event_notification (s3.EventType.OBJECT_CREATED ، s3n.LambdaDestination ( fn_create_batch_job) ، { “پیشوند” : f’csv_manifest / ‘ ، “پسوند” : ‘ .csv ‘}) < / div>

    ایجاد ./src/lambda_create_batch_job.py با کد زیر:

    import json
    وارد کردن boto3
    وارد کردن ورود به سیستم
    وارد کردن سیستم عامل
    از urllib.parse وارد کردن بدون نقل قول logger = logging.getLogger ()
    logger.setLevel (logging.DEBUG) s3_control_client = boto3.client ( “s3control” )
    s3_cli = boto3.client ( ‘s3’ ) def هندلر (رویداد ، زمینه) : logger.info ( “رویداد دریافت شده:” + json.dumps (event، indent = 2 ))) account_id = boto3.client ( ‘sts’ ) .get_caller_identity (). get ( ‘Account’ ) bucket_name = event [ “Records” ] [ 0 ] [ “s3” ] [ “سطل” ] [ “نام” ] bucket_arn = event [ “Records” ] [ 0 ] [ “s3” ] [ “سطل” ] [ ‘arn’ ] file_key = event [ “Records” ] [ 0 ] [ “s3” ] [ “object” ] [ “کلید” ] e_tag = event [ “Records” ] [ 0 ] [ ‘s3’ ] [ ‘object’ ] [ ‘eTag’ ] logger.info ( “خواندن {} از {}” .format (file_key ، bucket_name)) پاسخ = s3_control_client.create_job ( AccountId = حساب_id ، ConfirmationRequired = False ، عملیات = { ‘S3PutObjectCopy’ : { ‘TargetResource’ : bucket_arn ، ‘StorageClass’ : ‘STANDARD’ ، ‘TargetKeyPrefix’ : ‘tmp_transition’ } ، } ، گزارش = { “سطل” : bucket_arn ، ‘Format’ : ‘Report_CSV_20180820’ ، “فعال” : درست است ، ‘پیشوند’ : f’report / {os.getenv ( “SOURCE_BUCKET_NAME” )} ، ‘ReportScope’ : ‘FailedTasksOlyly’ } ، مانیفست = { “Spec” : { ‘Format’ : ‘S3BatchOperations_CSV_20180820’ ، “زمینه ها” : [ “سطل” ، “کلید” ] } ، “مکان” : { ‘ObjectArn’ : f ‘ {bucket_arn} / {unquote (file_key)} ، ‘ETag’ : e_tag } } ، اولویت = 10 ، RoleArn = os.getenv ( “ROLE_ARN” ) ، برچسب ها = [ { “کلید” : “مهندس” ، “ارزش” : “yiai” } ، ] ) logger.info ( “پاسخ شغلی S3 barch:” + json.dumps (پاسخ ، indent = 2 )) بازگشت

    Lambda

    fn_create_batch_job تابع ایجاد S3 Batch Operation Job ، تمام فایل های ذکر شده در مانیفست CSV را در S3 Destination Bucket / tmp_transition پیشوند کپی کنید. < p class = "paragraph"> S3 Batch Operations یک ویژگی مدیریت داده S3 آمازون است که به شما امکان می دهد میلیاردها شی را در مقیاس مدیریت کنید. برای شروع کار S3 Batch Operation Job ، همچنین باید یک نقش IAM S3BatchRole با سیاست های مربوطه تنظیم کنیم:

    s3_batch_role = iam.Role (خود ، < span> “S3BatchRole” ، assulated_by = iam.ServicePrincipal ( “batchoperations.s3.amazonaws.com” ) ) s3_batch_role.add_to_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ s3_destination.bucket_arn ، f “ {s3_destination.bucket_arn} / *” ] ، اقدامات = [ “s3: PutObject” ، “s3: PutObjectAcl” ، “s3: PutObjectTagging” ، “s3: PutObjectLegalHold” ، “s3: PutObjectRetention” ، “s3: GetBucketObjectLockConfiguration” ] ،
    )) s3_batch_role.add_to_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ s3_source.bucket_arn ، f “ {s3_source.bucket_arn} / *” ] ، اقدامات = [ “s3: GetObject” ، “s3: GetObjectAcl” ، “s3: GetObjectTagging” ] ،
    )) s3_batch_role.add_to_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ f “ {s3_destination.bucket_arn} / *” ] ، اقدامات = [ “s3: GetObject” ، “s3: GetObjectVersion” ، “s3: GetBucketLocation” ] ،
    )) s3_batch_role.add_to_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ f “ {s3_destination.bucket_arn} / report / {self.s3_source_bucket_name} / *” < / span> ] ، اقدامات = [ “s3: PutObject” ، “s3: GetBucketLocation” ] ،
    ))

    fn_process_transfer_task و قانون سفارشی Eventbridge

    ما یک قانون سفارشی Eventbridge ایجاد خواهیم کرد که یک کار Batch Operations S3 را در Amazon EventBridge از طریق AWS CloudTrail ردیابی می کند و رویدادها را به وضعیت تکمیل شده در منبع اطلاع رسانی هدف fn_process_transfer_task.

    Lambda

    fn_process_transfer_task سپس یک Fargate Task را برای کپی کردن پرونده ها در پیشوند / tmp_transition به نسخه آزمایشی Azure Storage Container. = “code-container” readability = “35”> fn_process_transfer_task = lambda_.Function (خود ، “ProcessS3TransferFunction” ، زمان اجرا = lambda_.Runtime.PYTHON_3_6 ، handler = “lambda_process_s3transfer_task.handler” ، timeout = core.Duration.minutes ( 5 ) ، کد = lambda_.Code.from_asset ( “./src” ))
    fn_process_transfer_task.add_en Environment ( “CLUSTER_NAME” ، نام خوشه) fn_process_transfer_task.add_en Environment ( “PRIVATE_SUBNET1” ، زیر شبکه [ 0 ] .subnet_id)
    fn_process_transfer_task.add_en Environment ( “PRIVATE_SUBNET2” ، زیر شبکه ها [ 1 ] .subnet_id)
    fn_process_transfer_task.add_en Environment ( “TASK_DEFINITION” ، task_definition.task_definition_arn)
    fn_process_transfer_task.add_en Environment ( “S3_BUCKET_NAME” ، s3_destination_bucket_name) fn_process_transfer_task.ad__rool_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ task_definition.task_definition_arn ] ، اقدامات = [ “ecs: RunTask” ] ،
    )) fn_process_transfer_task.add_to_role_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، اقدامات = [ “iam: PassRole” ] ، منابع = [task_definition.execution_role.role_arn]
    )) fn_process_transfer_task.add_to_role_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، اقدامات = [ “iam: PassRole” ] ، منابع = [task_definition.task_role.role_arn]
    )) دنباله = دنباله. دنباله ( خود ، “CloudTrail” ، ارسال_به_ Cloud_watch_logs = درست است ) event_rule = trail.on_event (خود ، “S3JobEvent” ، هدف = اهداف. عملکرد Lambda ( کنترل کننده = fn_process_transfer_task) )
    event_rule.add_event_pattern ( منبع = [ “aws.s3” ] ، نوع جزئیات = [ “رویداد خدمات AWS از طریق CloudTrail” ] ، جزئیات = { “eventSource” : [ “s3.amazonaws.com” ] ، “eventName” : [ “JobStatusChanged” ] ، “serviceEventDetails” : { “وضعیت” : [ “کامل” ] } }
    )

    ایجاد ./src/lambda_process_s3transfer_task.py با کد زیر:

    import < / span> json
    وارد کردن boto3
    وارد کردن ورود به سیستم
    وارد کردن سیستم عامل logger = logging.getLogger ()
    logger.setLevel (ورود به سیستم DEBUG) ecs = boto3.client ( “ecs” ) def handler (رویداد ، زمینه) : logger.info ( “رویداد دریافت شده:” + json.dumps (event، indent = 2 ))) logger.info ( “ENV SUBNETS:” + json.dumps (os.getenv ( “SUBNETS” ) ، indent = 3 )) پاسخ = ecs.run_task ( خوشه = os.getenv ( “CLUSTER_NAME” ) ، taskDefinition = os.getenv ( “TASK_DEFINITION” ) ، launchType = ‘FARGATE’ ، تعداد = 1 ، platformVersion = “LATEST” ، پیکربندی شبکه = { ‘awsvpcConfiguration’ : { “زیر شبکه” : [ os.getenv ( “PRIVATE_SUBNET1” ) ، os.getenv ( “PRIVATE_SUBNET2” ) ، ] ، ‘assignPublicIp’ : ‘DISABLED’ } } ، overrides = { “containerOverrides” : [{ “name” : “azcopy” ، “حافظه” : 512 ، ‘memoryReservation’ : 512 ، ‘cpu’ : 2 ، “محیط” : [ { ‘name’ : ‘S3_SOURCE’ ، ‘مقدار’ : f’https: // s3. {os.getenv ( “AWS_REGION” )}} < /span>.amazonaws.com/ {os.getenv ( “S3_BUCKET_NAME” )} / tmp_transition “ } ] ، }]}) return خیابان (پاسخ)

    اکنون ، ما قسمت بدون سرور را تنظیم کرده ایم. بیایید به کار Fargate برویم و تکثیر داده ها را پردازش کنیم.

    ایجاد یک کار AWS Fargate

    ما ایجاد خواهیم کرد:

    • یک تصویر ECR با AzCopy نصب شد. AzCopy یک ابزار خط فرمان است که می توانید برای کپی کردن حباب ها یا پرونده ها از یک حساب ذخیره سازی یا از آن استفاده کنید.
    • یک خوشه ECS با یک کار Fargte.

    بیایید شروع کنیم.

    1) ECS ، ECR و Fargate پشته بسازید.

    از aws_cdk وارد کردن ( aws_iam به عنوان iam ، aws_ecr as ecr_ ، aws_ecs as ecs ، هسته،
    ) ecr = ecr_.Repository (خود ، “azcopy” )
    خوشه = ecs.Cluster (خود ، “DemoCluster” ، vpc = vpc، container_insights = True ) task_definition = ecs.FargateTaskDefinition ( خود ، “azcopyTaskDef” )
    task_definition.add_container ( “azcopy” ، image = ecs.ContainerImage.from_registry ( ecr.repository_uri) ، logging = ecs.LogDrivers.aws_logs (stream_prefix = “s32blob” ) ، محیط = { ‘AZURE_BLOB_URL’ : ‘https://mydemostroageaccount.blob.core.windows.net/democontainer/’ } ، اسرار = { ‘SAS_TOKEN’ : ecs.Secret.from_ssm_parameter ( ssm.StringParameter.from_secure_string_parameter_attributes (خود ، “sas” ، parameter_name = ‘/ azure / storage / sas’ ، نسخه = 2 )))
    }) task_definition.task_role.add_to_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ سطل_کار ، f “ {bucket_arn} / *” ] ، اقدامات = [ “s3: GetObject” ، “s3: GetObjects” ، “s3: ListObjects” ، “S3: ListBucket” ] ،
    ))
    ecr.grant_pull (task_definition.obtain_execution_role ())

    2) یک تصویر داکر بسازید و Azcopy را در آنجا نصب کنید.

    از alpine AS azcopy
    RUN apk اضافه کردن –no-cache wget \ && wget https://aka.ms/downloadazcopy-v10-linux -O /tmp/azcopy.tgz \ && صادرات BIN_LOCATION = $ (tar -tzf /tmp/azcopy.tgz | grep “/ azcopy” ) \ && tar -xzf /tmp/azcopy.tgz $ BIN_LOCATION –strip-components = 1 -C / usr / bin

    FROM آلپ: 3.9
    RUN به روز رسانی APK و & amp ؛ apk اضافه کردن libc6-compat ca گواهینامه jq curl
    COPY – از = azcopy / usr / bin / azcopy / usr / محلی / bin / azcopy
    RUN ldd / usr / local / bin / azcopy
    COPY entrypoint.sh /
    RUN chmod 777 /entrypoint.sh

    ENTRYPOINT [ “sh” ، “/entrypoint.sh “ ]
    #! / bin / bash

    echo “export AWS_CONTAINER_CREDENTIALS_RELATIVE_URI = $ AWS_CONTAINER_CREDENTIALS_RELATIVE_URI >> /root/.profile
    json = $ (curl “http://169.254.170.2 $ {AWS_CONTAINER_CREDENTIALS_RELATIVE_URI} ) صادرات AWS_ACCESS_KEY_ID = $ ( echo $ json | jq -r “.AccessKeyId” < / span>) نصب pip -r Requisations.txt