Airflow
Connect Apache Airflow with the ByteHouse CLI.
What is Airflow?
Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows.
Prerequisites
- Install pip in your virtual/local environment.
- Install ByteHouse CLI in your virtual/local environment and log in with your own ByteHouse account. On macOS, you can install it directly with Homebrew. For other installation approaches, check the ByteHouse CLI documentation for reference.
brew install bytehouse-cli
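To confirm that the CLI is installed and can reach your ByteHouse account, you can run a trivial query. This is a minimal sketch that reuses the -cf (config file) and -q (query) flags shown later in this tutorial; the conf.toml path is an assumption and should point to your own connection settings.
# hypothetical config path; replace with the location of your own conf.toml
bytehouse-cli -cf ~/bytehouse-cli/conf.toml -q "SELECT 1"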
Airflow Installation
First, install Apache Airflow in your local or virtual environment. In this tutorial, we use pip to install it. For other installation methods, check out the official Airflow documentation for detailed information.
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.1.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.6
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
Note: If the installation fails with pip, try pip3 install instead, depending on your Python version.
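For example, using the same AIRFLOW_VERSION and CONSTRAINT_URL variables set above, the install command becomes:
pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"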
After installation, run the command airflow info to get more information about your Airflow environment.
Airflow Initialization
Initialize the metadata database, create an admin user, and start the Airflow web server by executing the following commands:
# initialize the database
airflow db init
airflow users create \
--username admin \
--firstname admin \
--lastname admin \
--role Admin \
--email admin
# start the web server, default port is 8080
# or modify airflow.cfg set web_server_port
airflow webserver --port 8080
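As an alternative to the --port flag, the comment above mentions setting the port in airflow.cfg. A minimal sketch of that option, which lives in the [webserver] section of the config file:
[webserver]
web_server_port = 8080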
After setting up the webserver, you can visit http://localhost:8080/ to log in to the Airflow console with the username and password set previously.

Open a new terminal and start the Airflow scheduler with the command below, then refresh http://localhost:8080/.
# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler
# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page
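If you prefer not to keep two terminals open, both components can also be run in the background with the -D (daemon) option mentioned in the comment above:
airflow webserver --port 8080 -D
airflow scheduler -D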
Airflow Configuration
Enter the Airflow home folder with the command cd ~/airflow and open the configuration file named airflow.cfg. Then add the configuration below to connect to a database: SQLite is used by default, and you can connect to MySQL as well.
# sqlite by default, you can also connect to mysql
sql_alchemy_conn = mysql+pymysql://<username>:<password>@<host>:<port>/airflow
# authenticate = False
# disable alchemy pool to prevent failure when setting up airflow scheduler
# https://github.com/apache/airflow/issues/10055
sql_alchemy_pool_enabled = False
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/admin/airflow/dags
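Note that the mysql+pymysql connection string requires the PyMySQL driver, and Airflow only creates its tables in the newly configured database once it is initialized again. A minimal sketch, assuming you switch from the default SQLite to MySQL:
pip install pymysql
# re-initialize the metadata database against the new connection
airflow db init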
Create a DAG Job
Create a folder named dags under the Airflow home path, then create test_bytehouse.py inside it to start a new DAG job.
cd ~/airflow
mkdir dags
cd dags
nano test_bytehouse.py
Add the following code to test_bytehouse.py. The job connects to the ByteHouse CLI and uses BashOperator to run tasks that execute queries or load data into ByteHouse.
from datetime import timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'test_bytehouse',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['example'],
) as dag:
    # load a local CSV file into ByteHouse through the CLI
    tImport = BashOperator(
        task_id='ch_import',
        depends_on_past=False,
        bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml "INSERT INTO korver.cell_towers_1 FORMAT csv INFILE \'/opt/bytehousecli/data.csv\' "',
    )
    # query ByteHouse and write the result to a local CSV file
    tSelect = BashOperator(
        task_id='ch_select',
        depends_on_past=False,
        bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml -q "select * from korver.cell_towers_1 limit 10 into outfile \'/opt/bytehousecli/dataout.csv\' format csv "',
    )
    # run the select task first, then the import task
    tSelect >> tImport
Run python test_bytehouse.py under the current file path to make sure the script does not raise errors and the DAG can be created in Airflow.
Refresh the web page in the browser; you should see the newly created DAG named test_bytehouse in the DAG list.
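You can also confirm from the terminal that Airflow has picked up the new DAG:
# list all DAGs known to Airflow; test_bytehouse should appear
airflow dags list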

Execute the DAG
In the terminal, run the following Airflow commands to check the task list and hierarchy of the test_bytehouse DAG. You can also test the query execution and data import tasks separately, as shown after the listing below.
# prints the list of tasks in the "test_bytehouse" DAG
$ airflow tasks list test_bytehouse
ch_import
ch_select

# prints the hierarchy of tasks in the "test_bytehouse" DAG
$ airflow tasks list test_bytehouse --tree
<Task(BashOperator): ch_select>
    <Task(BashOperator): ch_import>
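To run the tasks individually without waiting for the schedule, you can use airflow tasks test, which executes a single task for a given date without recording state in the database; the date below is just an example. You can also trigger a full run of the DAG manually.
# test each task on its own for an example execution date
airflow tasks test test_bytehouse ch_select 2021-10-01
airflow tasks test test_bytehouse ch_import 2021-10-01

# or trigger a complete run of the DAG
airflow dags trigger test_bytehouse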
After running the DAG, check the query history page and the database module in your ByteHouse account; you should see that the data has been queried/loaded successfully.
