Automating Cheminformatics with Apache Airflow — Step 1: Preparing SMILES Transformations on Large Scale

Sulstice
7 min readMar 21, 2022

One day during graduate school I was assigned to process ~1.7 billion compounds in SMILES format (the EnamineDB) through a data pipeline and collect the resulting output. To actually accomplish this, I realized I needed a robust pipeline data management system that would help me navigate large chunks of data with appropriate tracking. If you google the term “Laboratory Information Management Systems” (LIMS) you arrive at a genre of software toolkits built around automating scientific workflows for specific purposes or data capture. One system that seemed to be an all-around good fit for me was Apache Airflow. The entire system could be configured with configuration files and Python; I just needed to learn the module design.

pip install apache-airflow

And other requirements:

pip install -r Cheminformatic-Airflow/requirements.txt

I have made the full code available here: https://github.com/Sulstice/Cheminformatic-Airflow, so go ahead and clone the repository after Airflow is installed:

git clone https://github.com/Sulstice/Cheminformatic-Airflow.git

Then initialize Airflow with a database. This should result in a file structure like the one below, and under the hood we’ve got SQLite mixed with SQLAlchemy (an object API to talk to the database from Python):

airflow db init
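For reference, on a default install the initialized home directory (usually ~/airflow) contains something along these lines; the exact contents vary a bit by Airflow version:

airflow/
    airflow.cfg     # main configuration file
    airflow.db      # SQLite metadata database backing the SQLAlchemy models
    logs/           # scheduler and task logs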

airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email spiderman@superhero.org

nohup airflow webserver --port 8080 &
nohup airflow scheduler &

And the first step is complete; the web UI should now be reachable at http://localhost:8080:

And for future reference, to stop your webserver or scheduler I have found these to be sufficient:

cat airflow/airflow-webserver.pid | xargs kill
kill $(ps -ef | grep "airflow scheduler" | awk '{print $2}')

To organize the workflows, Airflow uses Directed Acyclic Graphs (DAGs) to break an overall workflow down into individual tasks. This makes each task easy to debug and maintain, and gives you a place to swap different tasks in and out at different times.
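As a minimal sketch of the idea (toy task names of my own, not part of this repository), two Python callables become nodes of the graph and the >> operator declares the edge between them:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt

# Illustrative callables; any function can be wrapped into a task.
def read_smiles():
    print("read the raw SMILES file")

def convert_smiles():
    print("convert each SMILES into another format")

with DAG('toy_pipeline',
         start_date=dt.datetime(2022, 3, 21),
         schedule_interval=None) as dag:

    read_task = PythonOperator(task_id='read_smiles', python_callable=read_smiles)
    convert_task = PythonOperator(task_id='convert_smiles', python_callable=convert_smiles)

    # The bitshift operator wires the dependency: convert runs only after read succeeds.
    read_task >> convert_task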

The airflow repository file structure looks something like this:

So you set your DAG folder in the airflow.cfg file to point to where you cloned the repository; Airflow coins this collection of DAG definitions the “DagBag”.

dags_folder = Cheminformatic-Airflow/dags

For any updates to the DAG, a quick flush of the DagBag will do for now. This doesn’t scale well and will be handled elsewhere, but for initial development purposes it is fine:

python -c "from airflow.models import DagBag; d = DagBag();"

This can actually be refactored into a proper update or migration for your DAGs, and now you should have something like this:

Where I’m headed is a couple of pipelines I implemented as data is being processed; here’s an example of an old dashboard I made for myself. I wanted to keep my base input as SMILES because of its effectiveness at translating into a lot of different functionalities and toolkits, and at giving access to different experimental information if needed down the road. One of my pipelines required all the SMILES to be translated into Structure Data Format (SDF) files with added hydrogens and the SMILES meta information maintained in the file. The files needed to be individual because of an additional software dependency later in the pipeline. The file structure for the dags is relatively simple:
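Reconstructed from the paths used in the task scripts (the repository layout may differ slightly in the details, and the DAG definition file itself sits at the top level of dags/), it looks roughly like this:

dags/
    task_1/
        task_1.py             # SMILES -> SDF transformation
        data/
            smiles.txt        # example input shipped with the repository
            sdf_files/        # one SDF file written per molecule
    task_2/
        task_2.py             # PCA analysis and Bokeh plot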

Each task for me is a directory because in my internal pipelines I have certain file transformations for large datasets allocated to different tasks. In my example, I am going to highlight one simple transformation which is SMILES to SDF.

So let’s take a look at one of the tasks:

# Task 1: To Convert the SMILES to SDF

# Imports
# -------
import os
import pandas as pd

# RDKit & Configurations
# ----------------------
from rdkit import Chem
from rdkit.Chem import Draw, AllChem
from rdkit.Chem.Draw import DrawingOptions

# SMILES to SDF
# -------------

def smiles_to_sdf(row):

    # Parse the SMILES, add explicit hydrogens, and keep the original
    # SMILES string as a property of the SDF record.
    molecule = Chem.MolFromSmiles(row['smiles'])
    molecule_with_hs = Chem.AddHs(molecule)
    molecule_with_hs.SetProp('smiles', row['smiles'])

    # One SDF file per molecule, named by the dataframe row index.
    sdwriter = Chem.SDWriter(
        os.path.join(
            'dags/task_1/data/sdf_files',
            str(row.name) + '.sdf')
    )
    sdwriter.write(molecule_with_hs)
    sdwriter.close()

def step_1():

    smiles_dataframe = pd.read_csv(
        'dags/task_1/data/smiles.txt',
        sep='\n',
        header=None,
        names=['smiles']
    )
    _ = smiles_dataframe.apply(smiles_to_sdf, axis=1)

So the function is pretty easy: it takes a data frame built from a SMILES text file (one example is provided in the GitHub repository) and passes each row through the smiles_to_sdf function. Apache Airflow uses “Operators”, typically one per language or service, to wrap task scripts; you can apply these Operators to just about any script and turn it into a task.

So for our case we are only using the Python Operator.

# Standard Python Internal Packages
# ---------------------------------
import os, sys
import pandas as pd
import datetime as dt

sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

# Airflow & Configurations
# ------------------------
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'sulstice',
    'start_date': dt.datetime(2018, 9, 24, 10, 00, 00),
    'concurrency': 1,
    'retries': 0
}

# Airflow Steps
# -------------

from task_1.task_1 import *
from task_2.task_2 import *

with DAG('smiles_to_sdf',
         default_args=default_args,
         schedule_interval='*/10 * * * *',
         catchup=False,
         ) as dag:

    step1 = PythonOperator(
        task_id='smiles_to_sdf',
        python_callable=step_1
    )

    step1
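Notice that step_2 from task_2 is imported as well but not registered yet; when the PCA task gets wired in, a second PythonOperator can be added inside the same with block and chained downstream. A sketch of mine (the task_id is just illustrative, not the repository’s exact wiring):

    # Hypothetical second task; task_id chosen for illustration only.
    step2 = PythonOperator(
        task_id='pca_analysis',
        python_callable=step_2
    )

    # step_1 must finish writing the SDF files before the PCA plot runs.
    step1 >> step2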

So here I set up the DAG workflow for what will happen. Essentially, on one thread Airflow will run the full SMILES to SDF conversion and output the files. Toggle the DAG on and watch the process run:
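If you prefer the command line over the UI toggle, the same DAG can be kicked off or dry-run directly; these subcommands assume Airflow 2.x (older releases spell them slightly differently):

# queue a run of the whole DAG (Airflow 2.x syntax)
airflow dags trigger smiles_to_sdf

# run a single task in isolation, without the scheduler
airflow tasks test smiles_to_sdf smiles_to_sdf 2022-03-21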

Back in the UI, when the run turns dark green you are done! And now you have an automated system for processing SMILES to SDF. The parallelization I do under the hood is homework for a future blog post. One thing that is very powerful about Apache Airflow is its audit log system: if you are following FDA 21 CFR Part 11 compliance for electronic records, you need an appropriate logging system.

This data is crucial for any sufficient LIMS to retrace steps of data usage and loss. Basically a treasure trove of information for a DevOps engineer.
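Those audit events live in the metadata database alongside everything else. A minimal sketch, assuming the default SQLite database created earlier and Airflow's standard log table (the exact columns vary a little between versions):

import sqlite3

# Path assumes the default Airflow home from the setup above.
connection = sqlite3.connect('/home/sulstice/airflow/airflow.db')

# The log table records who did what, to which DAG and task, and when.
for row in connection.execute(
        "SELECT dttm, dag_id, task_id, event, owner "
        "FROM log ORDER BY dttm DESC LIMIT 10"):
    print(row)

connection.close()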

Okay, let’s move on to something more robust: their plugin system and the ability to write individual applications. Basically, a lot of my academic lab mates just want to visit a browser and see their information via an applet. I usually use Plotly or Bokeh because of this interoperability. You can set your plugin directory in airflow.cfg:

plugins_folder = Cheminformatic-Airflow/plugins

So first we need to set up a Flask app to get the plugin running; the infrastructure did take me a while to configure:

from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView

from airflow.plugins_manager import AirflowPlugin
from flask_admin.base import MenuLink

bp = Blueprint(
    "plotting_plugins", __name__,
    template_folder='templates',  # registers airflow/plugins/templates as a Jinja template folder
    static_folder='static',
    static_url_path='/static/plotting_plugins')

# Creating a flask appbuilder BaseView
class PCAAnalysisAppBuilderBaseView(AppBuilderBaseView):

    template_folder = 'Cheminformatic-Airflow/plugins/plotting_plugins/templates'

    @expose("/")
    def list(self):

        return self.render_template("pca_analysis.html")

pca_analysis_appbuilder_view = PCAAnalysisAppBuilderBaseView()
pca_analysis_appbuilder_package = {
    "name": "PCA Analysis",
    "category": "Pipeline Plots",
    "view": pca_analysis_appbuilder_view
}

# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "plotting_plugins"
    # operators = []
    # flask_blueprints = [bp]
    # hooks = []
    appbuilder_views = [pca_analysis_appbuilder_package]
    # executors = []
    # admin_views = []

We will be using a Jinja template to render the HTML. Inside the script that generates the plot I am doing a PCA analysis, which I’ve chatted about before here, and it lives in task script 2:

# Pipeline Configurations
# -----------------------
morgan_radius = 1
bit_representation = 512

# Imports
# -------
import sys

# Scientific Imports
# ------------------
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA

# RDkit Imports
# -------------
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem.Draw import rdDepictor
from rdkit.Chem.Draw import rdMolDraw2D
from rdkit.Chem import Draw
from rdkit.Chem import DataStructs
rdDepictor.SetPreferCoordGen(True)

# Graphing Imports
# ----------------

from bokeh.plotting import ColumnDataSource, figure, output_notebook, output_file, show, save
from bokeh.io import output_notebook, export_png
from bokeh.layouts import gridplot
output_notebook()

# Global Configs
# --------------

TOOLTIPS = """<div>\nMolID: @ids<br>\n@img{safe}\n</div>\n"""
colormaps = {0: '#e6194b', 1: '#3cb44b', 2: '#ffe119', 3: '#4363d8', 4: '#f58231', 5: '#911eb4'}

# Standard Functions
# ------------------

def mol2svg(mol):

    AllChem.Compute2DCoords(mol)
    d2d = rdMolDraw2D.MolDraw2DSVG(200, 100)
    d2d.DrawMolecule(mol)
    d2d.FinishDrawing()
    return d2d.GetDrawingText()

def mol2fparr(mol):

    global morgan_radius
    global bit_representation

    arr = np.zeros((0,))
    fp = AllChem.GetMorganFingerprintAsBitVect(mol, morgan_radius, nBits=bit_representation)
    DataStructs.ConvertToNumpyArray(fp, arr)
    return arr

def step_2():

    smiles_dataframe = pd.read_csv('/home/sulstice/airflow/dags/task_1/data/smiles.txt', sep='\n', header=None, names=['smiles'])
    smiles_list = smiles_dataframe['smiles'].to_list()

    molecules_list = [Chem.MolFromSmiles(i) for i in smiles_list]
    fingerprints_list = np.array([mol2fparr(m) for m in molecules_list])

    pca = PCA(n_components=0.95)
    chemicalspace = pca.fit_transform(fingerprints_list)
    kmean = KMeans(n_clusters=5, random_state=0)
    kmean.fit(fingerprints_list)
    kmeanc = [colormaps[i] for i in kmean.labels_]

    kmean_data = dict(
        x=chemicalspace[:, 0],
        y=chemicalspace[:, 2],
        img=[mol2svg(m) for m in molecules_list],
        ids=[str(i) for i in range(0, len(smiles_list))],
        fill_color=kmeanc,
    )

    source = ColumnDataSource(kmean_data)
    plot = figure(plot_width=500, plot_height=500, tooltips=TOOLTIPS, title='error_compounds')
    plot.circle('x', 'y', color='fill_color', size=10, fill_alpha=0.2, source=source)

    plot = gridplot([
        [plot]
    ])

    output_file(filename="Cheminformatic-Airflow/plugins/plotting_plugins/templates/pca_analysis.html", title="Static HTML file")

    save(plot)

And Bam!

And that’s how I started automating and distributing analysis and file transformations at high throughput for chemical data.
