Connecting Apache Spark and SQL databases

Introduction

This blog post demonstrates how to connect to SQL databases using the Apache Spark JDBC data source. This lets us process data from HDFS and from SQL databases such as Oracle and MySQL in a single Spark SQL query.

Apache Spark SQL includes a JDBC data source that can read from (and write to) SQL databases. In all the examples below, the key steps are to get hold of the correct JDBC driver for your database version, formulate the database URL, and read a table (or query) into a Spark DataFrame. There are also options to parallelise the read and to set the fetchsize and batchsize, as you would in a Java application.
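As a rough sketch of the parallel-read options mentioned above, the helper below builds the option dictionary for a partitioned read. The option names (partitionColumn, lowerBound, upperBound, numPartitions, fetchsize) are Spark JDBC options; the helper name, the emp_no column and the bounds are assumptions made up for illustration, and credentials are left out of the URL.

```python
def jdbc_read_options(url, dbtable, partition_column=None, lower_bound=None,
                      upper_bound=None, num_partitions=None, fetchsize=None):
    """Build the options for spark.read.format('jdbc') (hypothetical helper)."""
    opts = {"url": url, "dbtable": dbtable}
    if partition_column is not None:
        # A partitioned read needs the column, both bounds and the partition count
        if any(v is None for v in (lower_bound, upper_bound, num_partitions)):
            raise ValueError(
                "lower_bound, upper_bound and num_partitions are required "
                "when partition_column is set")
        opts["partitionColumn"] = partition_column
        opts["lowerBound"] = str(lower_bound)
        opts["upperBound"] = str(upper_bound)
        opts["numPartitions"] = str(num_partitions)
    if fetchsize is not None:
        # Number of rows fetched per round trip to the database
        opts["fetchsize"] = str(fetchsize)
    return opts


opts = jdbc_read_options(
    "jdbc:postgresql://mydbserver.cern.ch:5432/mytestdb",
    "employees",
    partition_column="emp_no",  # assumed numeric column, not from the post
    lower_bound=1,
    upper_bound=100000,
    num_partitions=8,
    fetchsize=1000,
)
print(opts)
```

In a pyspark session the dictionary would be passed as spark.read.format('jdbc').options(**opts).load(). The bounds only decide how the column range is split across partitions; they do not filter rows.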

Connecting to PostgreSQL

Scala

First you need to download the PostgreSQL JDBC driver, ship it to all the executors using --jars and add it to the driver classpath using --driver-class-path. The key things to note are how you formulate the JDBC URL and that you can pass either a table or a query in parentheses to be loaded into the DataFrame.

curl -o /tmp/postgresql-42.1.4.jar https://jdbc.postgresql.org/download/postgresql-42.1.4.jar
spark-shell --driver-class-path /tmp/postgresql-42.1.4.jar --jars /tmp/postgresql-42.1.4.jar

// Connection url and the table (can also be query instead of table)
val url = "jdbc:postgresql://mydbserver.cern.ch:5432/mytestdb?user=postgres&password=newpassword"
val table = "employees"

// Load the table into DataFrame
val sDF = spark.read.format("jdbc").options(Map("url" -> url,"dbtable" -> table)).load()

// Print schema and display sample rows
sDF.printSchema()
sDF.show()

The JDBC driver can also be downloaded using its Maven coordinates, in which case the code changes slightly: you need to load the driver class with Class.forName and explicitly set the driver name.

spark-shell --driver-class-path ~/.ivy2/jars/org.postgresql_postgresql-42.1.4.jar --packages org.postgresql:postgresql:42.1.4

// Load postgresql jdbc driver
Class.forName("org.postgresql.Driver")

// Connection url and the table (can also be query instead of table)
val url = "jdbc:postgresql://mydbserver.cern.ch:5432/mytestdb?user=postgres&password=newpassword"
val table = "employees"

// Load the table into DataFrame
val sDF = spark.read.format("jdbc").options(Map("driver"-> "org.postgresql.Driver","url" -> url,"dbtable" -> table)).load()

// Print schema and display sample rows
sDF.printSchema()
sDF.show()
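The jar path given to --driver-class-path in the --packages examples appears to follow a fixed pattern, group_artifact-version.jar under ~/.ivy2/jars. The small Python helper below derives that path from the Maven coordinates. The pattern is inferred from the commands in this post and assumes the default Ivy directory; the helper name is made up for illustration.

```python
def ivy_jar_path(coordinates, ivy_home="~/.ivy2"):
    """Map 'group:artifact:version' to the jar path used in this post."""
    group, artifact, version = coordinates.split(":")
    return f"{ivy_home}/jars/{group}_{artifact}-{version}.jar"


print(ivy_jar_path("org.postgresql:postgresql:42.1.4"))
# ~/.ivy2/jars/org.postgresql_postgresql-42.1.4.jar
```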

 

PySpark

# download postgresql jdbc driver
curl -o /tmp/postgresql-42.1.4.jar https://jdbc.postgresql.org/download/postgresql-42.1.4.jar
pyspark --driver-class-path /tmp/postgresql-42.1.4.jar --jars /tmp/postgresql-42.1.4.jar

# connection url and query
url = 'jdbc:postgresql://mydbserver.cern.ch:5432/mytestdb?user=postgres&password=newpassword'
dbtable = '(select * from employees) as emp'

# load the query result into a dataframe
pgsql_df = spark.read.format('jdbc') \
    .options(url=url,dbtable=dbtable) \
    .load()

# print schema and inspect sample rows
pgsql_df.printSchema()
pgsql_df.show()
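When a query is passed instead of a table, it has to be wrapped in parentheses and given an alias, as in the dbtable value above. A minimal sketch of a helper that does the wrapping is shown below. The helper name and the column names in the example query are assumptions. It deliberately leaves out the "as" keyword, because Oracle does not accept "as" before a table alias, while the other databases in this post accept the bare alias too.

```python
def query_as_dbtable(query, alias="q"):
    """Wrap a SQL query so it can be used as the JDBC 'dbtable' option."""
    # Drop surrounding whitespace and a trailing semicolon, which is not
    # valid inside a subquery
    cleaned = query.strip().rstrip(";").rstrip()
    return f"({cleaned}) {alias}"


dbtable = query_as_dbtable(
    "select emp_no, first_name from employees where emp_no < 1000;", "emp")
print(dbtable)
# (select emp_no, first_name from employees where emp_no < 1000) emp
```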

Connecting to MySQL

Connecting to MySQL follows the same pattern as above: you can download the JDBC driver (MySQL Connector/J) and unpack it, or use the Maven coordinates.

Scala

spark-shell --driver-class-path ~/.ivy2/jars/mysql_mysql-connector-java-5.1.45.jar --packages mysql:mysql-connector-java:5.1.45

// Load MySQL jdbc driver
Class.forName("com.mysql.jdbc.Driver")

// Connection url and the table (can also be query instead of table)
val url = "jdbc:mysql://mydbserver.cern.ch:5504/metastore?user=spark&password=mypassword"
val table = "TBLS"
 
// Load the table into DataFrame
val sDF = spark.read.format("jdbc").options(Map("driver" -> "com.mysql.jdbc.Driver","url" -> url,"dbtable" -> table)).load()
 
// Print schema and display sample rows
sDF.printSchema()
sDF.show()

PySpark

# download MySQL jdbc driver
curl -o /tmp/mysql-connector-java-5.1.45-bin.jar https://cernbox.cern.ch/index.php/s/F3UKUDIK11MEFxF/download
pyspark --driver-class-path /tmp/mysql-connector-java-5.1.45-bin.jar --jars /tmp/mysql-connector-java-5.1.45-bin.jar

mysql_df = spark.read.format('jdbc') \
    .options(url='jdbc:mysql://mydbserver.cern.ch:5504/metastore?user=spark&password=mypassword',dbtable='TBLS') \
    .load()
mysql_df.printSchema()
mysql_df.show()
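The examples in this post embed the user and password in the URL for brevity. Spark's JDBC data source also accepts them as separate user and password options, which keeps the password out of the URL string. The sketch below reads the password from an environment variable; the helper name and the DB_PASSWORD variable are assumptions, not part of the original setup.

```python
import os


def jdbc_options_with_credentials(url, dbtable, user, password_env="DB_PASSWORD"):
    """Build JDBC options with the password taken from the environment."""
    password = os.environ.get(password_env)
    if password is None:
        raise KeyError(f"environment variable {password_env} is not set")
    return {"url": url, "dbtable": dbtable, "user": user, "password": password}


# Demo only: normally the variable is exported in the shell before starting pyspark
os.environ.setdefault("DB_PASSWORD", "mypassword")

opts = jdbc_options_with_credentials(
    "jdbc:mysql://mydbserver.cern.ch:5504/metastore", "TBLS", "spark")
print(sorted(opts))
# ['dbtable', 'password', 'url', 'user']
```

In a pyspark session this would be used as spark.read.format('jdbc').options(**opts).load().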

Connecting to MS SQL Server

Again, the only thing that changes is the way you formulate the JDBC URL and, thankfully, Microsoft makes the JDBC driver available through Maven.

Scala

spark-shell --driver-class-path ~/.ivy2/jars/com.microsoft.sqlserver_mssql-jdbc-6.2.2.jre8.jar --packages com.microsoft.sqlserver:mssql-jdbc:6.2.2.jre8

// Load MS SQL Server jdbc driver
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")

// Connection url and the table (can also be query instead of table)
val url = "jdbc:sqlserver://mydbserver.cern.ch;databaseName=UC;user=hadoop_user;password=mypassword"
val table = "HadoopDemoTable"
 
// Load the table into DataFrame
val sDF = spark.read.format("jdbc").options(Map("driver"->"com.microsoft.sqlserver.jdbc.SQLServerDriver","url" -> url,"dbtable" -> table)).load()
 
// Print schema and display sample rows
sDF.printSchema()
sDF.show()

PySpark

# download MS SQL jdbc driver
curl -o /tmp/sqljdbc42.jar https://cernbox.cern.ch/index.php/s/X23rvfIygUbLlAl/download

pyspark --driver-class-path /tmp/sqljdbc42.jar --jars /tmp/sqljdbc42.jar

mssql_df = spark.read.format('jdbc') \
 .options(url='jdbc:sqlserver://mydbserver.cern.ch;databaseName=UC;user=hadoop_user;password=mypassword',dbtable='HadoopDemoTable') \
 .load()
mssql_df.printSchema()
mssql_df.show()

Connecting to Oracle database

At the time of writing, the Oracle JDBC driver is not available on Maven Central, so you have to download it from OTN. Once you have downloaded it and made it available to the driver and executors with the --driver-class-path and --jars switches, the only thing that changes is the way you formulate the URL.

Scala

spark-shell --driver-class-path /tmp/ojdbc7.jar --jars /tmp/ojdbc7.jar

// Connection url and the table (can also be query instead of table)
val url = "jdbc:oracle:thin:username/password@//hostname:port/service_name"
val table = "applications"

// Load the table into DataFrame
val sDF = spark.read.format("jdbc").options(Map("driver"->"oracle.jdbc.driver.OracleDriver","url" -> url,"dbtable" -> table)).load()
 
// Print schema and display sample rows
sDF.printSchema()
sDF.show()

PySpark

pyspark --driver-class-path /tmp/ojdbc7.jar --jars /tmp/ojdbc7.jar

orcl_df = spark.read.format('jdbc') \
    .options(driver='oracle.jdbc.driver.OracleDriver',url='jdbc:oracle:thin:username/password@//hostname:port/service_name',dbtable='applications') \
    .load()
orcl_df.printSchema()
orcl_df.show()

Driver class and connection url for SQL databases

For future reference, the JDBC driver class and connection URL format for each of these databases are listed below.

PostgreSQL

Driver Class Name: org.postgresql.Driver
Format of the URL:

jdbc:postgresql://host:port/database[?propertyName1=propertyValue1][&propertyName2=propertyValue2]...

MySQL

Driver Class Name: com.mysql.jdbc.Driver
Format of the URL:

jdbc:mysql://host[,failoverhost]:port/database[?propertyName1=propertyValue1][&propertyName2=propertyValue2]...

MS SQL Server

Driver Class Name: com.microsoft.sqlserver.jdbc.SQLServerDriver
Format of the URL:

jdbc:sqlserver://[serverName[\instanceName][:portNumber]][;propertyName1=propertyValue1][;propertyName2=propertyValue2]...

Oracle

Driver Class Name: oracle.jdbc.driver.OracleDriver
Format of the URL:

jdbc:oracle:thin:[username/password]@[host][:port]:sid
jdbc:oracle:thin:[username/password]@//[host][:port]/service
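The URL formats above can be captured in one small function. This is only a sketch under simplifying assumptions: it always includes the port, uses the service-name form for Oracle, ignores extra properties for Oracle, and does no escaping of property values. The function name and the example hosts are made up for illustration.

```python
def jdbc_url(db, host, port, database, properties=None):
    """Build a JDBC URL for one of the four databases covered in this post."""
    props = properties or {}
    if db in ("postgresql", "mysql"):
        # Properties go in a query string: ?key=value&key=value
        url = f"jdbc:{db}://{host}:{port}/{database}"
        query = "&".join(f"{k}={v}" for k, v in props.items())
        return f"{url}?{query}" if query else url
    if db == "sqlserver":
        # The database and all other properties are separated by semicolons
        parts = [f"jdbc:sqlserver://{host}:{port}", f"databaseName={database}"]
        parts += [f"{k}={v}" for k, v in props.items()]
        return ";".join(parts)
    if db == "oracle":
        # Service-name form; 'database' is the service name here
        return f"jdbc:oracle:thin:@//{host}:{port}/{database}"
    raise ValueError(f"unsupported database: {db}")


print(jdbc_url("postgresql", "mydbserver.cern.ch", 5432, "mytestdb", {"user": "postgres"}))
# jdbc:postgresql://mydbserver.cern.ch:5432/mytestdb?user=postgres
print(jdbc_url("sqlserver", "mydbserver.cern.ch", 1433, "UC"))
# jdbc:sqlserver://mydbserver.cern.ch:1433;databaseName=UC
```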

Conclusion

In addition to the connection properties mentioned above, you can also include other JDBC options such as fetchsize and batchsize; the full list is in the Spark SQL documentation on JDBC data sources. There are many use cases for connecting to SQL databases from Apache Spark, such as moving data, persisting results and joining big data (facts) with SQL tables (dimensions), and I hope this blog post helps you achieve them.
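For the "persisting the results" use case, writing goes through the same data source. The sketch below builds the write options, including batchsize, and shows the DataFrameWriter call inside a function so that the snippet can be read without a running Spark session. The helper names, the employees_copy table and the batch size are assumptions for illustration.

```python
def jdbc_write_options(url, dbtable, batchsize=1000, driver=None):
    """Build the options for df.write.format('jdbc') (hypothetical helper)."""
    # batchsize is the number of rows sent per JDBC batch insert
    opts = {"url": url, "dbtable": dbtable, "batchsize": str(batchsize)}
    if driver is not None:
        opts["driver"] = driver
    return opts


def save_to_jdbc(df, opts, mode="append"):
    """Write a Spark DataFrame to the table named in opts."""
    df.write.format("jdbc").options(**opts).mode(mode).save()


opts = jdbc_write_options(
    "jdbc:postgresql://mydbserver.cern.ch:5432/mytestdb",
    "employees_copy",  # assumed target table, not from the post
    batchsize=5000,
    driver="org.postgresql.Driver",
)
print(opts["batchsize"])
# 5000
```

In a pyspark session, save_to_jdbc(pgsql_df, opts) would append the rows of a DataFrame to the target table.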
