点亮 ⭐️ Star · 照亮开源之路
重要功能介绍
2.0.5 版本更新之后,Apache DolphinScheduler 新增了 Python API 功能,用户可以通过 Python 脚本编排工作流,最后实现工作流的创建、更新、调度等操作,这给 Python 用户带来了很多便利。
1
DolphinScheduler Python
API
01
安装
python -m pip install apache-dolphinscheduler
02
运行
wget https://raw.githubusercontent.com/apache/dolphinscheduler/dev/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial.py
python tutorial.py
2
Python API 功能
3
示例
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.python import Python
from pydolphinscheduler.tasks.shell import Shell
download_dir = "/tmp/demo"
store_dir = "dolphinscheduler"
download_link = "https://github.com/apache/dolphinscheduler/archive/refs/heads/dev.zip"
file_name = download_link.split("/")[-1]
def largest_size():
from pathlib import Path
download_dir = "/tmp/demo"
store_dir = "dolphinscheduler"
result = (None, 0)
paths = Path(download_dir).joinpath(store_dir).glob("**/*")
for path in paths:
# skip is path is directory
if path.is_dir():
continue
file_size = path.stat().st_size
if result[0] is None or file_size > result[1]:
result = (path.name, file_size)
print(result)
def most_frequently():
from pathlib import Path
download_dir = "/tmp/demo"
store_dir = "dolphinscheduler"
ext_cnt = {}
paths = Path(download_dir).joinpath(store_dir).glob("**/*")
for path in paths:
# skip is path is directory
if path.is_dir():
continue
ext = path.suffix
ext_cnt[ext] = ext_cnt[ext] + 1 if ext in ext_cnt else 1
print(max(ext_cnt.items(), key=lambda p: p[1]))
with ProcessDefinition(
name="top_ten_size_files",
tenant="zhongjiajie",
) as pd:
prepare = Shell(
name="prepare_dir",
command=f"mkdir -p {download_dir}; rm -rf {download_dir}/*"
)
download = Shell(
name="download_resources",
command=f"wget -P {download_dir} {download_link}"
)
compress = Shell(
name="compress_tar",
command=f"cd {download_dir}; unzip {file_name} -d {store_dir}"
)
largest_file = Python(
name="largest_file",
definition=largest_size,
)
most_type = Python(
name="most_type",
definition=most_frequently,
)
prepare >> download >> compress >> [
largest_file,
most_type,
]
pd.run()
4
下载资源
download = Shell(
name="download_resources",
command=f"wget -P {download_dir} {download_link}"
)
01
下载资源前后需要做的事
prepare = Shell(
name="prepare_dir",
command=f"mkdir -p {download_dir}; rm -rf {download_dir}/*"
)
compress = Shell(
name="compress_tar",
command=f"cd {download_dir}; unzip {file_name} -d {store_dir}"
)
02
计算体积最大的文件
def largest_size():
from pathlib import Path
download_dir = "/tmp/demo"
store_dir = "dolphinscheduler"
result = (None, 0)
paths = Path(download_dir).joinpath(store_dir).glob("**/*")
for path in paths:
# skip is path is directory
if path.is_dir():
continue
file_size = path.stat().st_size
if result[0] is None or file_size > result[1]:
result = (path.name, file_size)
print(result)
largest_file = Python(
name="largest_file",
definition=largest_size,
)
03
计算出现频率最高的文件类型
def most_frequently():
from pathlib import Path
download_dir = "/tmp/demo"
store_dir = "dolphinscheduler"
ext_cnt = {}
paths = Path(download_dir).joinpath(store_dir).glob("**/*")
for path in paths:
# skip is path is directory
if path.is_dir():
continue
ext = path.suffix
# skip file without suffix
if ext == "":
continue
ext_cnt[ext] = ext_cnt[ext] + 1 if ext in ext_cnt else 1
print(max(ext_cnt.items(), key=lambda p: p[1]))
most_type = Python(
name="most_type",
definition=most_frequently,
)
04
设置任务依赖
05
运行!
pd.run()
http://127.0.0.1:12345/dolphinscheduler。
python3 demo.py
5
总结
参与贡献
随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。
参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:
贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。
社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689
非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22
如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html
来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。
参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。
添加小助手微信时请说明想参与贡献。
来吧,开源社区非常期待您的参与。
☞Apache DolphinScheduler 2.0.7 发布,修复补数及容错故障问题
☞挑战海量数据:基于Apache DolphinScheduler对千亿级数据应用实践
☞金融科技数据中台基于 DolphinScheduler 的应用改造