IDC

使用AirFlow调度MaxCompute

作者:admin 2021-08-10 我要评论

简介: airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任...

在说正事之前,我要推荐一个福利:你还在原价购买阿里云、腾讯云、华为云服务器吗?那太亏啦!来这里,新购、升级、续费都打折,能够为您省60%的钱呢!2核4G企业级云服务器低至69元/年,点击进去看看吧>>>)

简介: airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务。

背景

airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务。

一、环境准备

  • Python 2.7.5 PyODPS支持Python2.6以上版本
  • Airflow apache-airflow-1.10.7

1.安装MaxCompute需要的包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10 # 可选,安装后能加速Tunnel上传。

pip install cython>=0.19.0 # 可选,不建议Windows用户安装。

pip install pyodps

注意:如果requests包冲突,先卸载再安装对应的版本

2.执行如下命令检查安装是否成功

python -c "from odps import ODPS"

二、开发步骤

image.png

1.在Airflow家目录编写python调度脚本Airiflow_MC.py

# -*- coding: UTF-8 -*-

import sys

import os

from odps import ODPS

from odps import options

from airflow import DAG

from airflow.operators.python_operator import PythonOperator

from datetime import datetime, timedelta

from configparser import ConfigParser

import time

reload(sys)

sys.setdefaultencoding('utf8')

#修改系统默认编码。

# MaxCompute参数设置

options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}

cfg = ConfigParser()

cfg.read("odps.ini")

print(cfg.items())

odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))

default_args = {

    'owner': 'airflow',

    'depends_on_past': False,

    'retry_delay': timedelta(minutes=5),

    'start_date':datetime(2020,1,15)

    # 'email': ['airflow@example.com'],

    # 'email_on_failure': False,

    # 'email_on_retry': False,

    # 'retries': 1,

    # 'queue': 'bash_queue',

    # 'pool': 'backfill',

    # 'priority_weight': 10,

    # 'end_date': datetime(2016, 1, 1),

}

dag = DAG(

    'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))

def read_sql(sqlfile):

    with io.open(sqlfile, encoding='utf-8', mode='r') as f:

        sql=f.read()

    f.closed

    return sql

def get_time():

    print '当前时间是{}'.format(time.time())

    return time.time()

def mc_job ():



    project = odps.get_project()  # 取到默认项目。

    instance=odps.run_sql("select * from long_chinese;")

    print(instance.get_logview_address())

    instance.wait_for_success()

    with instance.open_reader() as reader:

        count = reader.count

    print("查询表数据条数:{}".format(count))

    for record in reader:

        print record

    return count

t1 = PythonOperator (

    task_id = 'get_time' ,

    provide_context = False ,

    python_callable = get_time,

    dag = dag )



t2 = PythonOperator (

    task_id = 'mc_job' ,

    provide_context = False ,

    python_callable = mc_job ,

    dag = dag )

t2.set_upstream(t1)

2.提交

python Airiflow_MC.py

3.进行测试

# print the list of active DAGs

airflow list_dags

# prints the list of tasks the "tutorial" dag_id

airflow list_tasks Airiflow_MC


# prints the hierarchy of tasks in the tutorial DAG

airflow list_tasks Airiflow_MC --tree

#测试task

airflow test Airiflow_MC get_time 2010-01-16

airflow test Airiflow_MC mc_job 2010-01-16

4.运行调度任务

登录到web界面点击按钮运行

image.png

5.查看任务运行结果

(1)点击view log

image.png

(2)查看结果
image.png

原文链接
本文为阿里云原创内容,未经允许不得转载。


本文转自网络,版权归原作者所有,原文链接:https://segmentfault.com/a/1190000040490345

版权声明:本文转载自网络,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本站转载出于传播更多优秀技术知识之目的,如有侵权请联系QQ/微信:153890879删除

相关文章
  • 使用AirFlow调度MaxCompute

    使用AirFlow调度MaxCompute

  • CSS 即将支持嵌套,SASS/LESS 等预处理

    CSS 即将支持嵌套,SASS/LESS 等预处理

  • CSS filter 生成不规则边框.md

    CSS filter 生成不规则边框.md

  • 前端常见算法题(树篇)

    前端常见算法题(树篇)

腾讯云代理商
海外云服务器