Connect Airflow to ByteHouse and Schedule Queries/Data Loading

This tutorial shows how to connect Apache Airflow with the ByteHouse CLI and schedule queries and data loading into ByteHouse.

Prerequisites

  • Install pip in your virtual/local environment
  • Install the ByteHouse CLI in your virtual/local environment and log in with your own ByteHouse account. On macOS, you can install it directly with Homebrew (see the command below); for other installation approaches, refer to the ByteHouse CLI documentation.
brew install bytehouse-cli
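
To verify the installation and login, you can run a trivial query through the CLI. The configuration path below is only a placeholder; point -cf at your own config file (the -cf and -q flags are the same ones used in the DAG later in this tutorial).

# sanity check: run a trivial query through the ByteHouse CLI
# (replace the config path with your own)
bytehouse-cli -cf ~/bytehouse/conf.toml -q "SELECT 1"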

Airflow Installation

First, install Apache Airflow in your local or virtual environment. This tutorial uses pip to install it; for other installation methods, see the official Airflow documentation.

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.1.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.6
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Note: If the installation fails with pip, try pip3 install instead, depending on your Python version.
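
For example, on systems where pip is mapped to Python 2, the equivalent command is:

# same installation command, using pip3 explicitly
pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"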

After installation, run the command airflow info to see more information about your Airflow environment:
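
# print version, paths, and configuration details of the installation
airflow info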

Airflow Initialization

Initialize the Airflow metadata database, create an admin user, and start the web server by executing the following commands:

# initialize the database
airflow db init

# create an admin user (you will be prompted to enter a password)
airflow users create \
    --username admin \
    --firstname admin \
    --lastname admin \
    --role Admin \
    --email admin
# start the web server, default port is 8080
# or modify web_server_port in airflow.cfg
airflow webserver --port 8080

After setting up the webserver, you can visit http://localhost:8080/ to log in to the Airflow console with the username and password set previously.

Open a new terminal and start the Airflow scheduler with the command below, then refresh http://localhost:8080/.

# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler
# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page
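
If you prefer the command line over the home-page toggle mentioned above, the example DAG can also be enabled with the standard Airflow CLI (optional):

# unpause (enable) the example DAG from the CLI instead of the web UI
airflow dags unpause example_bash_operator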

Airflow Configuration

Go to the Airflow home folder with cd ~/airflow and open the configuration file airflow.cfg. Add the following configuration to connect Airflow to its metadata database; SQLite is used by default, but you can connect to MySQL as well.

# sqlite by default, you can also connect to mysql
# replace <password> and <host> with your own MySQL credentials
sql_alchemy_conn = mysql+pymysql://airflow:<password>@<host>:8080/airflow

# authenticate = False

# disable alchemy pool to prevent failure when setting up airflow scheduler
# https://github.com/apache/airflow/issues/10055
sql_alchemy_pool_enabled = False

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/admin/airflow/dags
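
After editing airflow.cfg, restart the web server and scheduler so the changes take effect. If you switch sql_alchemy_conn to MySQL, a MySQL driver such as pymysql must be installed as well; the check below (assuming the Airflow 2.x CLI) prints the connection string Airflow actually picked up.

# driver needed for mysql+pymysql connection strings
pip install pymysql
# confirm the connection string Airflow is using
airflow config get-value core sql_alchemy_conn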

Create a DAG Job

Create a folder named dags in the path of airflow, then create test_bytehouse.py to start a new DAG job.

cd ~/airflow
mkdir dags
cd dags
nano test_bytehouse.py

Add the following code to test_bytehouse.py. The job connects to ByteHouse through the ByteHouse CLI and uses BashOperator to run tasks that execute queries or load data into ByteHouse.

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    'test_bytehouse',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['example'],
) as dag:
    
    # load data.csv into the test.product table through the ByteHouse CLI
    tImport = BashOperator(
        task_id='ch_import',
        depends_on_past=False,
        bash_command='/home/admin/bytehousecli/bytehouse-cli -cf /home/admin/bytehousecli/conf.toml -q "INSERT INTO test.product FORMAT csv INFILE \'/home/admin/bytehousecli/data.csv\' "',
    )

    # query test.product and export the result to data.csv through the ByteHouse CLI
    tSelect = BashOperator(
        task_id='ch_select',
        depends_on_past=False,
        bash_command='/home/admin/bytehousecli/bytehouse-cli -cf /home/admin/bytehousecli/conf.toml -q "select * from test.product limit 10 into outfile \'/home/admin/bytehousecli/data.csv\' format csv "',
    )
    
    # run the export query first, then the import task
    tSelect >> tImport

Run python test_bytehouse.py in the current folder to make sure the DAG file contains no errors; the scheduler loads the DAG from the dags_folder automatically.
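
If the file parses cleanly, the command exits without output; any syntax or import error is printed to the terminal.

cd ~/airflow/dags
# a clean exit (no output) means the DAG file has no errors
python test_bytehouse.py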

Refresh the web page in the browser; you should see the newly created DAG named test_bytehouse in the DAG list.

Execute the DAG

In the terminal, run the following Airflow commands to list the DAGs and test the sub-tasks in the test_bytehouse DAG. You can test the query execution and data import tasks separately.

# print the list of active DAGs
airflow dags list

# prints the list of tasks in the "test_bytehouse" DAG
airflow tasks list test_bytehouse

# prints the hierarchy of tasks in the "test_bytehouse" DAG
airflow tasks list test_bytehouse --tree

# command layout: command subcommand dag_id task_id date
# testing task ch_select 
airflow tasks test test_bytehouse ch_select 2021-09-27
# testing task ch_import
airflow tasks test test_bytehouse ch_import 2021-09-27
# testing whole dag
airflow dags test test_bytehouse 2021-09-26
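
Once each task behaves as expected, you can enable the DAG so the scheduler runs it on its daily schedule, or trigger a run manually; both commands are part of the standard Airflow CLI, and the web UI toggle works as well.

# enable the DAG so the scheduler picks it up
airflow dags unpause test_bytehouse
# or kick off a manual run immediately
airflow dags trigger test_bytehouse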

After running the DAG, check the query history page and database module in your ByteHouse account; you should see that the data has been queried and loaded successfully.
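
As an additional check from the terminal, you can count the rows in the target table with the same -cf/-q flags used in the DAG (adjust the paths to your own setup):

# count the rows loaded into the target table
bytehouse-cli -cf /home/admin/bytehousecli/conf.toml -q "SELECT count(*) FROM test.product"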

