
admin · 2022-03-01

  

Previously we covered Airflow's basic architecture, and then how to deploy Airflow in containers. Today let's look at how to build a robust distributed scheduling cluster with Airflow and Celery.

1 Cluster Environment

As before, we install the Airflow cluster on Ubuntu 20.04.3 LTS machines; this time we prepare three identically configured servers for testing. In the previous article [1] we already installed all of the Airflow components on the Bigdata1 server (if you haven't read it, follow the link first); now we only need to install the worker component on the other two nodes.

              Bigdata1(A)   Bigdata2(B)   Bigdata3(C)
Webserver     √
Scheduler     √
Worker        √             √             √

The docker-compose.yml in the previous article did not separate the deployment files from the data directories, which makes later management inconvenient. So we can stop the services and split the database and data directories away from the deployment files:

Deployment files (docker-compose.yaml / .env): stored under /apps/airflow
MySQL data and config files: under /data/mysql
Airflow data directory: under /data/airflow

Splitting things apart this way makes unified management easier later on, as sketched below.
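A minimal sketch of that migration, assuming the stack was previously started from a single directory (the old source paths here are hypothetical):

docker-compose down                            # stop all services before moving anything
mkdir -pv /apps/airflow /data/mysql /data/airflow
mv docker-compose.yaml .env /apps/airflow/     # deployment files
mv ./mysql/* /data/mysql/                      # hypothetical old MySQL data dir
mv ./dags ./plugins /data/airflow/             # hypothetical old airflow data dirs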

2 Deploying the Worker Service

Preparation:

  

mkdir -pv /data/airflow/{dags,plugins}
mkdir -pv /apps/airflow
mkdir -pv /logs/airflow
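The compose file below runs the containers as ${AIRFLOW_UID}, so these directories must be writable by that user. A hedged example, assuming you deploy as the same non-root user whose UID will later go into .env:

chown -R "$(id -u):0" /data/airflow /logs/airflow /apps/airflow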

 

The worker's deployment file:

  

---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow  # change to your MySQL account and password
    AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow       # change to your MySQL account and password
    AIRFLOW__CELERY__BROKER_URL: redis://:xxxx@$${REDIS_HOST}:7480/0                           # change to your Redis password
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - /data/airflow/dags:/opt/airflow/dags
    - /logs/airflow:/opt/airflow/logs
    - /data/airflow/plugins:/opt/airflow/plugins
    - /data/airflow/airflow.cfg:/opt/airflow/airflow.cfg
  user: "${AIRFLOW_UID:-50000}:0"

services:
  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    hostname: bigdata-20-194  # set the container hostname so you can tell which worker it is in flower
    depends_on:
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      # (the init command body is truncated in the source)

 

Run the initialization check to verify the environment is ready:

  

cd /apps/airflow/
# Note: AIRFLOW_UID must be the UID of a regular (non-root) user,
# and that user must have permission to create the persistence directories
echo -e "AIRFLOW_UID=$(id -u)" > .env
docker-compose up airflow-init

 

If the database already exists, the initialization check does not affect the existing data. Next, bring up the airflow-worker service:

  

docker-compose up -d

Then install the airflow-worker service on the bigdata3 node in the same way. Once deployment is complete, you can check the broker status through flower.
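Besides the flower UI, you can ping the registered workers from any node with the same command the compose healthcheck uses; a quick sketch, assuming the airflow-worker service name from the file above:

docker-compose exec airflow-worker \
  celery --app airflow.executors.celery_executor.app inspect ping
# each live worker should reply, e.g. celery@bigdata-20-194: OK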

  

3 Persisting the Configuration File

In most cases, when running an Airflow cluster with multiple worker nodes we need to persist Airflow's configuration file and sync it to all nodes. So we modify the volumes of x-airflow-common in docker-compose.yaml to mount airflow.cfg into the container; the file itself can first be copied out of a running container and then modified, as sketched below.
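One way to get an initial airflow.cfg to mount, assuming the airflow-worker service from the compose file is already running:

docker cp "$(docker-compose ps -q airflow-worker)":/opt/airflow/airflow.cfg /data/airflow/airflow.cfg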

For later use we need to write the values of some of the environment variables from the docker-compose file into airflow.cfg, for example the following:

  

[core]
dags_folder = /opt/airflow/dags
hostname_callable = socket.getfqdn
default_timezone = Asia/Shanghai  # change the timezone
executor = CeleryExecutor
sql_alchemy_conn = mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
sql_engine_encoding = utf-8
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
parallelism = 32
max_active_tasks_per_dag = 16
dags_are_paused_at_creation = True
max_active_runs_per_dag = 16
load_examples = True
load_default_connections = True
plugins_folder = /opt/airflow/plugins
execute_tasks_new_python_interpreter = False
fernet_key =
donot_pickle = True
dagbag_import_timeout = 30.0
dagbag_import_error_tracebacks = True
dagbag_import_error_traceback_depth = 2
dag_file_processor_timeout = 50
task_runner = StandardTaskRunner
default_impersonation =
security =
unit_test_mode = False
enable_xcom_pickling = False
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = True
dag_discovery_safe_mode = True
default_task_retries = 0
default_task_weight_rule = downstream
min_serialized_dag_update_interval = 30
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom
lazy_load_plugins = True
lazy_discover_providers = True
max_db_retries = 3
hide_sensitive_var_conn_fields = True
sensitive_var_conn_names =
default_pool_task_slot_count = 128

[logging]
base_log_folder = /opt/airflow/logs
remote_logging = False
remote_log_conn_id =
google_key_path =
remote_base_log_folder =
encrypt_s3_logs = False
logging_level = INFO
fab_logging_level = WARNING
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
task_log_prefix_template =
log_filename_template = {{ti.dag_id}}/{{ti.task_id}}/{{ts}}/{{try_number}}.log
log_processor_filename_template = {{filename}}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793

[metrics]
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
statsd_allow_list =
stat_name_handler =
statsd_datadog_enabled = False
statsd_datadog_tags =

[secrets]
backend =
backend_kwargs =

[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080

[debug]
fail_fast = False

[api]
enable_experimental_api = False
auth_backend = airflow.api.auth.backend.deny_all
maximum_page_limit = 100
fallback_page_limit = 100
google_oauth2_audience =
google_key_path =
access_control_allow_headers =
access_control_allow_methods =
access_control_allow_origins =

[lineage]
backend =

[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =

[operators]
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
default_queue = default
allow_illegal_arguments = False

[hive]
default_hive_mapred_queue =

[webserver]
base_url = https://devopsman.cn/airflow  # custom airflow domain
default_ui_timezone = Asia/Shanghai      # set the default timezone
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
web_server_master_timeout = 120
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 6000
reload_on_plugin_change = False
secret_key = emEfndkf3QWZ5zVLE1kVMg==
workers = 4
worker_class = sync
access_logfile = -
error_logfile = -
access_logformat =
expose_config = False
expose_hostname = True
expose_stacktrace = True
dag_default_view = tree
dag_orientation = LR
log_fetch_timeout_sec = 5
log_fetch_delay_sec = 2
log_auto_tailing_offset = 30
log_animation_speed = 1000
hide_paused_dags_by_default = False
page_size = 100
navbar_color = #fff
default_dag_run_display_number = 25
enable_proxy_fix = False
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1
proxy_fix_x_port = 1
proxy_fix_x_prefix = 1
cookie_secure = False
cookie_samesite = Lax
default_wrap = False
x_frame_enabled = True
show_recent_stats_for_completed_runs = True
update_fab_perms = True
session_lifetime_minutes = 43200
auto_refresh_interval = 3

[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
default_email_on_retry = True
default_email_on_failure = True

[smtp]
# email settings
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_port = 25
smtp_mail_from = airflow@example.com
smtp_timeout = 30
smtp_retry_limit = 5

[sentry]
sentry_on = false
sentry_dsn =

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16
worker_umask = 0o077
broker_url = redis://:xxxx@$${REDIS_HOST}:7480/0
result_backend = db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
flower_host = 0.0.0.0
flower_url_prefix =
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_track_started = True
task_adoption_timeout = 600
task_publish_max_retries = 3
worker_precheck = False

[celery_broker_transport_options]

[dask]
cluster_address = 127.0.0.1:8786
tls_ca =
tls_cert =
tls_key =

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 30
dag_dir_list_interval = 300
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /opt/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
catchup_by_default = True
max_tis_per_query = 512
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = True
parsing_processes = 2
file_parsing_sort_mode = modified_time
use_job_schedule = True
allow_trigger_in_future = False
dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
trigger_timeout_check_interval = 15

[triggerer]
default_capacity = 1000

[kerberos]
ccache = /tmp/airflow_krb5_ccache
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
forwardable = True
include_ip = True

[github_enterprise]
api_rev = v3

[elasticsearch]
host =
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
end_of_log_mark = end_of_log
frontend =
write_stdout = False
json_format = False
json_fields = asctime, filename, lineno, levelname, message
host_field = host
offset_field = offset

[elasticsearch_configs]
use_ssl = False
verify_certs = True

[kubernetes]
pod_template_file =
worker_container_repository =
worker_container_tag =
namespace = default
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_pending_timeout = 300
worker_pods_pending_timeout_check_interval = 120
worker_pods_queued_check_interval = 60
worker_pods_pending_timeout_batch_size = 100

[smart_sensor]
use_smart_sensor = False
shard_code_upper_limit = 10000
shards = 5
sensors_enabled = NamedHivePartitionSensor
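To confirm the mounted file is actually being read, you can query a value from inside a container; a sketch using airflow config get-value (available in Airflow 2.1+):

docker-compose exec airflow-worker airflow config get-value core executor
# expected output: CeleryExecutor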

 

After making the changes, restart the services:

  

docker-compose restart

4 Data Synchronization

 

Because Airflow here uses three worker nodes, a config change on any one node must be synced to the others, and the DAGs and plugins directories need real-time synchronization as well: if the scheduler dispatches a task to a node that cannot find the corresponding DAG file, the task errors out. So we use lsyncd for real-time data sync:

  

apt-get install lsyncd -y

 

Set up public-key SSH access between the nodes:

  

ssh-keygen -t rsa -b 4096 -C "airflow-sync" -f ~/.ssh/airflow-sync  # generate a key pair named airflow-sync
for ip in 100 200; do
  ssh-copy-id -i ~/.ssh/airflow-sync.pub -p 12022 ${USERNAME}@192.168.0.$ip
done

 

After that we can reach the other nodes with the private key.
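A quick check that key-based login works before wiring up lsyncd (USERNAME, IPs, and port are the values used above):

for ip in 100 200; do
  ssh -o BatchMode=yes -p 12022 -i ~/.ssh/airflow-sync ${USERNAME}@192.168.0.$ip hostname
done
# BatchMode=yes makes ssh fail instead of prompting if the key is not accepted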

Edit the sync configuration file; for more lsyncd configuration options, go straight to the official documentation [2]:

  

settings {
    logfile = "/var/log/lsyncd.log",        -- log file
    statusFile = "/var/log/lsyncd.status",  -- sync status information
    pidfile = "/var/run/lsyncd.pid",
    statusInterval = 1,
    nodaemon = false,                        -- run as a daemon
    inotifyMode = "CloseWrite",
    maxProcesses = 1,
    maxDelays = 1,
}
sync {
    default.rsync,
    source = "/data/airflow",
    target = "192.168.0.100:/data/airflow",
    delete = true,
    rsync = {
        binary = "/usr/bin/rsync",
        compress = false,
        archive = true,
        owner = true,
        perms = true,
        whole_file = false,
        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync"
    },
}
sync {
    default.rsync,
    source = "/data/airflow",
    target = "192.168.0.200:/data/airflow",
    delete = true,
    rsync = {
        binary = "/usr/bin/rsync",
        compress = false,
        archive = true,
        owner = true,
        perms = true,
        whole_file = false,
        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync"
    },
}

 

The official docs explain what each of these parameters means. Here we define the ssh command through rsync's rsh option, which covers setups that use a private key, a custom port, and similar hardening; of course you can also configure passwordless access and then use default.rsync or default.rsyncssh.

Set up systemd management for the lsyncd service:

  

cat <<EOF > /etc/systemd/system/lsyncd.service
[Unit]
Description=lsyncd
ConditionFileIsExecutable=/usr/bin/lsyncd
After=network-online.target
Wants=network-online.target

[Service]
StartLimitBurst=10
ExecStart=/usr/bin/lsyncd /etc/lsyncd.conf
Restart=on-failure
RestartSec=120
EnvironmentFile=-/etc/sysconfig/aliyun
KillMode=process

[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable --now lsyncd.service  # start the service and enable it at boot
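To confirm the unit came up cleanly:

systemctl status lsyncd.service --no-pager
journalctl -u lsyncd.service -n 20  # recent unit logs if it failed to start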

 

This takes care of synchronizing the data (dags, plugins, airflow.cfg). Later, in a CI/CD setup, you can simply upload DAG files to the Bigdata1 node and the other two nodes will sync automatically. If anything goes wrong, you can debug by checking the logs:

  

lsyncd -log all /etc/lsyncd.conf
tail -f /var/log/lsyncd.log
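A simple end-to-end smoke test, assuming the layout above: create a file on Bigdata1 and look for it on another node.

touch /data/airflow/dags/sync_test.txt
sleep 2  # give lsyncd a moment to pick up the change
ssh -p 12022 -i ~/.ssh/airflow-sync ${USERNAME}@192.168.0.100 ls -l /data/airflow/dags/sync_test.txt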

5 Reverse Proxy [3]

If you need to run Airflow behind a reverse proxy, e.g. at https://lab.mycompany.com/myorg/airflow/, you can do so with the following configuration.

Configure base_url in airflow.cfg:

  

base_url = http://my_host/myorg/airflow
enable_proxy_fix = True

 

The nginx configuration:

  

server {
    listen 80;
    server_name lab.mycompany.com;

    location /myorg/airflow/ {
        proxy_pass http://localhost:8080;
        proxy_set_header Host $http_host;
        proxy_redirect off;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
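A quick way to verify the proxy end to end is Airflow's /health endpoint, which returns the metadatabase and scheduler status as JSON:

curl -s http://lab.mycompany.com/myorg/airflow/health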

 

That basically completes the installation of the Airflow distributed scheduling cluster. The end result is shown below.

  

If you've read this far, chances are you're already using Airflow or interested in it; here's some Airflow study material to take with you:

  https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-12/1

References

[1] Airflow 2.2.3 + MySQL 8.0.27: https://mp.weixin.qq.com/s/VncpyXcTtlvnDkFrsAZ5lQ

[2] lsyncd config file: https://lsyncd.github.io/lsyncd/manual/config/file/

[3] airflow-behind-proxy: https://airflow.apache.org/docs/apache-airflow/stable/howto/run-behind-proxy.html
