
ممکن است یکی از نیازهای کسب و کار شما باشد که مقدار زیادی داده را بطور دوره ای از یک ابر عمومی به ابر دیگر منتقل کنید. به طور خاص ، ممکن است با دستوراتی روبرو شوید که به راه حل چند ابر نیاز دارند. این مقاله شامل یک رویکرد به صورت خودکار تکثیر داده ها از AWS S3 Bucket در Microsoft Azure Blob Storage Container با استفاده از Amazon S3 Inventory ، Amazon S3 Batch Operations ، Fargate و AzCopy است.
سناریو
شرکت شما هر روز پس از فشرده سازی ، پرونده های CSV جدید را در محل با اندازه کلی حدود 100 گیگابایت تولید می کند. اندازه همه پرونده ها 1–2 گیگابایت است و باید هر شب در یک ساعت مشخص بین 3 تا 5 صبح در Amazon S3 بارگذاری شوند. مشاغل شما تصمیم گرفته است پس از تمام پرونده های بارگذاری شده در S3 ، آن فایل های CSV را از S3 در Microsoft Azure Storage کپی کند. شما باید راهی آسان و سریع برای خودکار کردن گردش کار تکثیر داده ها پیدا کنید.
برای انجام این کار ، ما می توانیم یک خط لوله داده بسازیم تا داده ها را به صورت دوره ای از S3 به Azure Storage با استفاده از AWS کپی کند داده Wrangler ، Amazon S3 Inventory ، Amazon S3 Batch Operations ، Athena ، Fargate و AzCopy. div class = “image-container”>
آنچه را که ما پوشش خواهیم داد:
- ایجاد VPC با زیر شبکه های خصوصی و عمومی ، نقاط انتهایی S3 و دروازه NAT .
- یک حساب ذخیره سازی Azure و یک ظرف حباب ایجاد کنید ، یک رمز SAS ایجاد کنید ، سپس یک قانون فایروال اضافه کنید تا از AWS VPC به Azure Storage دسترسی داشته باشید.
- پیکربندی گزارش های موجودی S3 روزانه در سطل S3.
- از Athena برای استفاده استفاده کنید فقط اشیا جدید را از گزارش موجودی S3 فیلتر کنید و نام سطل و کلیدهای اشیا those آن اشیا to را به یک فایل مانیفست CSV صادر کنید.
- برای ایجاد یک شغل کپی SUT Batch Operations S3 از اسناد صادر شده CSV استفاده کنید یک سطل مقصد S3 با قانون انقضا سیاست چرخه زندگی پیکربندی شده است.
- برای اجرای وظیفه Fargate که از همه اشیا with با همان پیشوند در سطل مقصد در Azure Storage کپی می کند ، از قانون Eventbridge استفاده کنید.
پیش نیازها
- راه اندازی یک حساب AWS
- راه اندازی یک حساب Azure
- نصب جدیدترین AWS-CLI
- AWS CDK-CLI را نصب کنید
- درک اساسی AWS CDK
- درک اساسی داکر
شروع کنیم! < h2> ایجاد سطلهای منبع و مقصد S3
- راه اندازی یک حساب AWS
- راه اندازی یک حساب Azure
- نصب جدیدترین AWS-CLI
- AWS CDK-CLI را نصب کنید
- درک اساسی AWS CDK
- درک اساسی داکر
شروع کنیم! < h2> ایجاد سطلهای منبع و مقصد S3
ما از CDK برای ایجاد زیرساخت های خود بر اساس AWS استفاده می کنیم. ابتدا ، بیایید یک سطل منبع ایجاد کنیم تا پرونده ها را از ارائه دهندگان خارجی یا موارد قبلی دریافت کنیم و گزارش های موجودی روزانه را تنظیم کنیم که لیستی از پرونده ها و ابرداده های شما را با یک پرونده تخت فراهم می کند.
بعدی ، یک سطل مقصد به عنوان ذخیره سازی موقت با قانون انقضاiration سیاست چرخه حیات که در پیشوند / tmp_transition پیکربندی شده ایجاد کنید همه پرونده های دارای پیشوند (به عنوان مثال
/tmp_transition/file1.csv) در Azure کپی می شوند و پس از 24 ساعت با سیاست چرخه عمر حذف می شوند.
برای ایجاد سطل های S3 از کد زیر استفاده کنید.
) s3_destination = s3.Bucket (خود ، “dataBucketInventory” ، قوانین چرخه عمر = [ { ‘انقضا’ : core.Duration.days ( 1.0 ) ، ‘پیشوند’ : ‘tmp_transition’ } ، ]) s3_source = s3.Bucket (خود ، “demoDataBucket” ، bucket_name = self.s3_source_bucket_name ، رمزگذاری = s3.BucketEncryption.S3_MANAGED ، موجودی = [ { “فرکانس” : s3.InventoryFrequency.DAILY ، “include_object_versions” : s3.InventoryObjectVersion.CURRENT ، “مقصد” : { “سطل” : s3_destination } } ])
ایجاد AWS VPC
بعد ، ما باید VPC را با دو زیرشبکه عمومی و خصوصی ، NAT Gateway ، نقطه پایانی S3 ایجاد کنیم و خط مشی پایانی را ضمیمه کنیم که اجازه می دهد دسترسی به ظرف فارگیت که سطل S3 به آن از Azure کپی می کنیم.
اکنون VPC و منابع مربوطه خود را با استفاده از کد زیر تعریف کنید.
)
vpc = ec2.Vpc (خود ، “demoVPC” ، max_azs = 2 ، cidr = “10.0.0.0/16” ، nat_gateways = 1 ، subnet_configuration = [{ “cidrMask” : 24 ، “name” : “private” ، “subnetType” : ec2.SubnetType.PRIVATE } ، { “cidrMask” : 24 ، “نام” : “عمومی” ، “subnetType” : ec2.SubnetType.PUBLIC }] )
زیر شبکه = vpc.select_subnets ( subnet_type = ec2.SubnetType.PRIVATE). زیرشبکه ها
endpoint = vpc.add_gateway_endpoint ( ‘s3Endpoint’ ، service = ec2.GatewayVpcEndpointAwsService.S3 ، زیر شبکه = [{ “subnet_id” : زیر شبکه [ 0 ] .subnet_id } ، { “subnet_id” : زیر شبکه ها [ 1 ] .subnet_id }]) endpoint.add_to_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ سطل_کار ، f “{bucket_arn} / *” ] ، principals = [iam.ArnPrincipal ( “*” )] ، اقدامات = [ “s3: GetObject” ، “s3: GetObjects” ، “s3: ListObjects” ، “S3: ListBucket” ] ،
)) # دسترسی به سطل آمازون S3 را شامل لایه های برای هر تصویر Docker از ECR می کند.
endpoint.add_to_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ f “arn: aws: s3 ::: prod- {self.region} -starport-layer-bucket / *” ] ، principals = [iam.ArnPrincipal ( “*” )] ، اقدامات = [ “s3: GetObject” ] ،
))
هنگام ایجاد NAT Gateway ، یک آدرس IP الاستیک در AWS ایجاد می کند. برای تنظیم قانون Azure Storage Firewall در مرحله 3 به آدرس IP نیاز خواهیم داشت.
استقرار حساب ذخیره سازی Azure
برای ساده سازی مدیریت منابع ، می توانیم از Azure استفاده کنیم الگوی مدیر منابع (الگوی ARM) برای استقرار منابع در سطح اشتراک Azure ما.
من فرض می کنم شما از قبل یک تنظیم اشتراک Azure دارید. ما برای ایجاد ترافیک از یک آدرس IP خاص از پوسته Cloud برای استقرار یک گروه منابع ، حساب Azure Storage ، یک کانتینر و قانون فایروال استفاده خواهیم کرد.
روی نماد Cloud Shell در نماد کلیک کنید نوار هدر Azure Portal ، و Cloud Shell را باز می کند.

/ p>
پس از استقرار الگو ، می توانیم با کاوش در گروه منابع پورتال Azure استقرار را تأیید کنیم. تمام منابع مستقر شده در بخش نمای کلی گروه منابع نمایش داده می شوند.

> برای کلاس ایجاد کنیم”> برای کلاس ایجاد کنیم “> برای کلاس ایجاد کنیم”> برای کلاس ایجاد کنیم “> برای کلاس ایجاد کنیم”> برای کلاس ایجاد می کنیم “> حساب ذخیره سازی ما:

ما سپس برای دسترسی محدود به منابع ذخیره سازی Azure ، امضاهای دسترسی مشترک (SAS) ایجاد کنید.
اجرای دستور زیر در Cloudshell:
ACCOUNT_NAME = ‘mydemostorageaccountaiyi’
ACCOUNT_KEY = فهرست کلیدهای حساب ذخیره سازی –account-name = $ ACCOUNT_NAME – پرس و جو [0] .value -o tsv`
BLOB_CONTAINER = ظرف نگهدارنده democ STORAGE_CONN_STRING = حساب ذخیره سازی show-connection-string – نام $ ACCOUNT_NAME – گروه منبع $ RG_NAME – خروجی tsv` SAS = ظرف ذخیره سازی تولید-sas – رشته اتصال $ STORAGE_CONN_STRING -n $ BLOB_CONTAINER – انقضا ‘2021-06-30’ – مجوزهای ACLRW – خروجی tsv` echo $SAS
ما SAS و URL های موردنیاز را که اعطا می کنند دریافت خواهیم کرد (a) dd (d) elete (r) ead (w) دسترسی آیینی به یک ظرف نگهدارنده مخزن حباب.
بیایید دوباره به AWS برگردیم و SAS را به فروشگاه پارامتر AWS SSM قرار دهیم.
اجرای دستور زیر در local terminator .
} ‘
تعریف توابع Lambda و لایه AWS Data Wrangler
اکنون ، بیایید به سمت توابع lambda حرکت کنیم. ما سه توابع lambda و یک لایه lambda ایجاد خواهیم کرد: strong> fn_create_s3batch_manifest و AWS Data Wrangler layer
این تابع lambda با استفاده از ماژول Athena داده Wrangler از AWS برای فیلتر کردن پرونده های جدید در تاریخ UTC گذشته و ذخیره لیست پرونده ها در یک CSV پرونده.
کد زیر را در CDK stack.py کپی کنید. پرونده awswranger-layerzip را از اینجا بارگیری کنید.
fn_create_s3batch_manifest.add_en Environment ( “SOURCE_BUCKET_NAME” ، self.s3_source_bucket_name) fn_create_s3batch_manifest.ad__rool_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ “*” ] ، اقدامات = [ “چسب: GetTable” ، “چسب: جدول ایجاد” ، “athena: StartQueryExecution” ، “athena: CancelQueryExecution” ، “athena: StopQueryExecution” ، “athena: GetQueryExecution” ، “athena: GetQueryResults” ] ،
)) fn_create_s3batch_manifest.ad__rool_policy (iam.PolicyStatement ( اثر = iam. اثر اجازه می دهد ، منابع = [ f “arn: aws: glue: {self.region} : {self.account} : کاتالوگ “ ، f “arn: aws: glue: {self.region} : {self.account} : پایگاه داده / * “ ، f “arn: aws: glue: {self.region} : {self.account} : table / * “ ] ، اقدامات = [ “چسب: GetDatabases” ، “چسب: GetDatabase” ، “چسب: BatchCreatePartition” ، “چسب: GetPartitions” ، “چسب: CreateDatabase” ، “چسب: GetPartition” ] ،
)) s3_destination.add_event_notification (s3.EventType.OBJECT_CREATED ، s3n.LambdaDestination ( fn_create_s3batch_manifest) ، { “پیشوند” : f ‘ {self.s3_source_bucket_name} / demoDataBucketInventory0 /’ ، “پسوند “: ‘.json’ })
سپس
را ایجاد کنید. / src/lambda_create_s3batch_manifest.py با کد زیر: < div class = "code-container" readability = "30"> وارد کردن json
وارد کردن ورود به سیستم
وارد کردن سیستم عامل
از datetime import datetime ، timedelta
import awswrangler as wr logger = logging.getLogger ()
logger.setLevel (logging.DEBUG) DATABASE_NAME = “s3datademo”
TABLE_NAME = “موضوعات روزانه” def handler (رویداد ، زمینه) : logger.info ( “رویداد دریافت شده:” + json.dumps (event، indent = 2 ))) اگر DATABASE_NAME نیست در wr.catalog.databases (). مقادیر: wr.catalog.create_database (DATABASE_NAME) event_date = datetime.strptime ( event [ “Records” ] [ 0 ] [ “eventTime” ] ، “٪ Y-٪ m-٪ dT٪ H :٪ M:٪ S.٪ fZ “) partition_dt = f ‘ {(event_date – timedelta (days = 1 ))). strftime ( “٪ Y-٪ m-٪ d “)} -00-00″ previous_partition_dt = f ‘ {(event_date – timedelta (days = 2 ))). strftime ( “٪ Y-٪ m-٪ d “)} -00-00″ logger.debug ( f “partition_dt: {partition_dt} ” ) اگر نیست wr.catalog.does_table_exist (پایگاه داده = DATABASE_NAME ، جدول = TABLE_NAME): table_query_exec_id = wr.athena.start_query_execution (s3_output = f “s3: // {os.getenv ( ‘DESTINATION_BUCKET_NAME’ )} / athena_output “ ، sql = f “ایجاد جدول خارجی {TABLE_NAME} (\ رشته `سطل` ، رشته کلید ، \ رشته version_id ، \ _ جدیدترین بولی ، \ is_delete_marker boolean است ، \ اندازه bigint ، \ آخرین مهلت تاریخ تغییر یافته ، \ رشته e_tag \ ) \ مشارکت شده توسط (رشته dt) \ ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.OpenCSVSerde’ \ ذخیره شده به عنوان INPUTFORMAT ‘org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat’ \ OUTPUTFORMAT ‘org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat’ \ LOCATION ‘s3: // {os.getenv (‘ DESTINATION_BUCKET_NAME ‘)}} / {os .getenv ( ‘SOURCE_BUCKET_NAME’ )} / demoDataBucketInventory0 / hive / ‘؛ “ ، پایگاه داده = DATABASE_NAME) wr.athena.wait_query (query_execution_id = table_query_exec_id) partition_query_exec_id = wr.athena.start_query_execution ( sql = f “ALTER TABLE {TABLE_NAME} اگر قسمت موجود نباشد ، اضافه کنید (dt = \ ‘ {partition_dt} < / span> \ ‘)؛ “ ، s3_output = f “s3: // {os.getenv ( ‘DESTINATION_BUCKET_NAME’ )} / athena_output” ، پایگاه داده = DATABASE_NAME) wr.athena.wait_query (query_execution_id = partition_query_exec_id) select_query_exec_id = wr.athena.start_query_execution (sql = “سطل DISTINCT را به عنوان” “انتخاب کنید + os.getenv ( “SOURCE_BUCKET_NAME” ) + ‘”، کلید را به عنوان” dump.txt “FROM” + کلید دهید TABLE_NAME + “where dt = ‘” + partition_dt + “” و is_delete_marker = false “است + “به جز” + “سطل DISTINCT را به عنوان” “انتخاب کنید” + os.getenv ( “SOURCE_BUCKET_NAME” ) + ‘”، کلید را به عنوان” dump.txt “FROM” + کلید دهید TABLE_NAME + “where dt = ‘” + پارتیشن_ قبلی + dt + “” and is_delete_marker = false؛ “ ، پایگاه داده = DATABASE_NAME ، s3_output = f “s3: // {os.getenv ( ‘DESTINATION_BUCKET_NAME’ )} / csv_manifest / dt = {partition_dt} “) return select_query_exec_id