Python - R interoperability in Azure HDInsight with Reticulate

Running pyspark scripts through R


Author: Ashish Menkudale Published on: January 10, 2020

When it comes to picking a favorite language for data science, both the Python and R communities present strong arguments. Broadly, Python has the upper hand in infrastructure setup, while R has the upper hand in core statistical functionality, although this has changed a lot recently.

If big data is in the picture, PySpark blows the competition out of the park. Neither SparkR nor sparklyr offers comparable functionality for data manipulation. Wouldn't it be ideal if a project could combine the infrastructure setup in Python/PySpark with the data science models developed in R?

Reticulate lets us use Python from R, while rpy2 lets us access R functionality from Python.

Problem statement

For one of my big data projects, I had to set up an R - Python interoperable framework on the Azure HDInsight platform. The objective was to use Python / PySpark for faster data queries and data transformations and, once the data ETL was complete, to execute the core algorithms in R. Installing and setting up reticulate is easy. The challenge arises when using reticulate to run Python scripts that depend on a PySpark / Hive context.

What this post is about

Right off the bat, there were issues: the PySpark and Hadoop configuration on an HDInsight cluster targets Python 2, while the framework I wanted to set up was in Python 3. Things started to fail as soon as I tried to execute a PySpark script. This post covers the core issues and how to configure an HDInsight cluster for Python 3 - R interoperability.

This post is mainly a solution to this reticulate issue.

1. Reticulate setup

Installing reticulate on the R Server running on HDInsight was easy. With developer-level access to the cluster, the package can be installed through a bash script run with sudo su. Note: running install.packages('reticulate') as a regular user installs the package only into that user's library. The package needs to be installed at root level.

Once the package is installed, we can verify it with:

library(reticulate)
reticulate::py_config()

Output:

python:         /usr/bin/python
libpython:      /usr/lib/python2.7/config-x86_64-linux-gnu/libpython2.7.so
pythonhome:     /usr:/usr
version:        2.7.12 (default, Dec  4 2017, 14:50:18)  [GCC 5.4.0 20160609]
numpy:           [NOT FOUND]

python versions found: 
 /usr/bin/python
 /usr/bin/python3
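
The output above shows reticulate bound to Python 2.7 even though /usr/bin/python3 is also present. As a quick cross-check from the Python side, a snippet along these lines reports which interpreter is actually running. It is only a minimal sketch: the helper name interpreter_summary is hypothetical, and the assumption is that the snippet is executed through reticulate (for example with py_run_string or source_python).

```python
import sys


def interpreter_summary():
    """Return the path and major.minor version of the running interpreter."""
    version = "{}.{}".format(sys.version_info[0], sys.version_info[1])
    return {"executable": sys.executable, "version": version}


summary = interpreter_summary()
print("{} (Python {})".format(summary["executable"], summary["version"]))
```

If the reported version starts with 2, the PySpark scripts discussed later would still run under Python 2 rather than Python 3.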

2. Calling simple python3 scripts through Reticulate

Here's how reticulate can be used in simple cases.

A Python script called calc.py:

class calculator:

    def __init__(self, x = 10, y = 8):
        self.x = x
        self.y = y

    def add(self, x = None, y = None):
        """add function"""
        # fall back to the values stored on the instance
        if x is None:
            x = self.x
        if y is None:
            y = self.y

        print("this is hello from python")
        return x + y

We will call this script from R:

library(reticulate)
reticulate::source_python('calc.py')
 
inst = calculator(x = 14, y = 16)
inst$add()

#Output
#this is hello from python
#[1] 30

3. When pyspark is in the picture

Everything so far works fine. However, when a Python script depends on Spark, issues start to surface. For example, take a PySpark script test.py like this:


from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext

spark = SparkSession.builder.appName("utils").enableHiveSupport().getOrCreate()

and accessing it through R,

reticulate::py_run_file("test.py")

it will throw an error.

Furthermore, if we do something like,

library(reticulate)
ps <- import("pyspark")
py_run_file("/home/sshuser/test.py")

the error will be,

  File "/usr/bin/hdp-select", line 242
    print "ERROR: Invalid package - " + name
    
SyntaxError: Missing parentheses in call to 'print'
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.3.84-1/spark2/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.3.84-1/spark_llap/spark-llap-assembly-1.0.0.2.6.3.84-1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Upon investigation with the Microsoft support team, the root cause turned out to be the way HDInsight clusters are configured for Hadoop and Spark: the config files are only Python 2 compatible, which is why /usr/bin/hdp-select fails with a SyntaxError on its print statement in the log above.

The fix is to make two config files on HDInsight compatible with Python 3, namely /etc/hadoop/conf/topology_script.py and /usr/bin/hdp-select.

For /etc/hadoop/conf/topology_script.py, change the file to:
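
To see why a Python 2 style script breaks under a Python 3 interpreter, the following minimal sketch compiles two short source strings and reports any syntax error. The helper name python3_syntax_error and the sample strings are hypothetical; the sketch assumes it is run with Python 3, and the exact error message text varies between Python versions.

```python
def python3_syntax_error(source, filename="<script>"):
    """Return None if `source` compiles under the running interpreter,
    otherwise a short description of the syntax error."""
    try:
        compile(source, filename, "exec")
    except SyntaxError as err:
        return "{}:{}: {}".format(filename, err.lineno, err.msg)
    return None


# Python 2 print statement, similar in shape to the line hdp-select fails on
py2_style = 'name = "spark"\nprint "ERROR: Invalid package - " + name\n'
# The same line written as a function call, valid in Python 3
py3_style = 'name = "spark"\nprint("ERROR: Invalid package - " + name)\n'

print(python3_syntax_error(py2_style, "py2_style"))
print(python3_syntax_error(py3_style, "py3_style"))
```

Under Python 3 the first call reports a syntax error on line 2, while the second returns None because the source compiles cleanly.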

#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

#import sys, os
#from string import join
#import ConfigParser

from __future__ import print_function
import sys, os
try:
    from string import join
except ImportError:
    join = lambda s: " ".join(s)
try:
    import configparser as ConfigParser  # Python 3 module name
except ImportError:
    import ConfigParser  # Python 2 module name

DEFAULT_RACK = "/default-rack"
DATA_FILE_NAME =  os.path.dirname(os.path.abspath(__file__)) + "/topology_mappings.data"
SECTION_NAME = "network_topology"

class TopologyScript():
    def load_rack_map(self):
        try:
            #RACK_MAP contains both host name vs rack and ip vs rack mappings
            mappings = ConfigParser.ConfigParser()
            mappings.read(DATA_FILE_NAME)
            return dict(mappings.items(SECTION_NAME))
        except ConfigParser.NoSectionError:
            return {}
    
    def get_racks(self, rack_map, args):
        if len(args) == 1:
            return DEFAULT_RACK
        else:
            return join([self.lookup_by_hostname_or_ip(input_argument, rack_map) for input_argument in args[1:]],)

    def lookup_by_hostname_or_ip(self, hostname_or_ip, rack_map):
        #try looking up by hostname
        rack = rack_map.get(hostname_or_ip)
        if rack is not None:
            return rack
        #try looking up by ip
        rack = rack_map.get(self.extract_ip(hostname_or_ip))
        #try by localhost since hadoop could be passing in 127.0.0.1 which might not be mapped
        return rack if rack is not None else rack_map.get("localhost.localdomain", DEFAULT_RACK)

    #strips out port and slashes in case hadoop passes in something like 127.0.0.1/127.0.0.1:50010
    def extract_ip(self, container_string):
        return container_string.split("/")[0].split(":")[0]

    def execute(self, args):
        rack_map = self.load_rack_map()
        rack = self.get_racks(rack_map, args)
        print (rack)

if __name__ == "__main__":
    TopologyScript().execute(sys.argv)

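Notice the first few lines, which take care of the print function (a statement in Python 2, a function in Python 3), the join helper (no longer available from the string module in Python 3) and the configparser module (named ConfigParser in Python 2). Also, in the execute() method, print has to be called as a function, as Python 3 requires.

For /usr/bin/hdp-select, change it to: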
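
The lookup logic of the topology script can be tried out in isolation. The sketch below reuses the same section name and default rack, but the mapping data, host names and rack names are hypothetical and are written to a temporary file purely for illustration; it is not the script shipped on the cluster.

```python
import os
import tempfile

try:
    import configparser as ConfigParser  # Python 3 module name
except ImportError:
    import ConfigParser  # Python 2 module name

DEFAULT_RACK = "/default-rack"
SECTION_NAME = "network_topology"


def load_rack_map(path):
    """Read host/IP -> rack mappings; return {} if the section is missing."""
    mappings = ConfigParser.ConfigParser()
    mappings.read(path)
    try:
        return dict(mappings.items(SECTION_NAME))
    except ConfigParser.NoSectionError:
        return {}


def extract_ip(container_string):
    """Strip slashes and port from strings like 127.0.0.1/127.0.0.1:50010."""
    return container_string.split("/")[0].split(":")[0]


# Hypothetical mapping data, written to a temporary file for illustration.
sample = "[network_topology]\n10.0.0.5=/rack-a\nworker1.example.com=/rack-b\n"
with tempfile.NamedTemporaryFile("w", suffix=".data", delete=False) as handle:
    handle.write(sample)
    data_file = handle.name

rack_map = load_rack_map(data_file)
os.remove(data_file)

rack_for_ip = rack_map.get(extract_ip("10.0.0.5/10.0.0.5:50010"), DEFAULT_RACK)
rack_for_unknown = rack_map.get("unknown-host", DEFAULT_RACK)
print("{} {}".format(rack_for_ip, rack_for_unknown))
```

A known address resolves to its configured rack, and anything unmapped falls back to the default rack.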

#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function

import optparse
import copy
import os
import re
import sys

# The global prefix and current directory
root = "/usr/hdp"
current = root + "/current"
versionRegex = re.compile('[-.]')

...
...

And, for users other than sshuser, we need to change the permissions from the shell (755, so that all users can read and execute the files):

sudo chmod 755 /etc/hadoop/conf/topology_script.py
sudo chmod 755 /usr/bin/hdp-select
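
To confirm that a permission change took effect, the mode bits can be inspected from Python. This is a minimal sketch assuming a POSIX file system; the helper name readable_and_executable_by_all is hypothetical, and it is demonstrated on a temporary file rather than on the real cluster files.

```python
import os
import stat
import tempfile


def readable_and_executable_by_all(path):
    """True if owner, group and others all have read and execute permission."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    required = 0o555  # r-x for owner, group and others
    return mode & required == required


# Demonstrated on a temporary file so the sketch does not touch system files.
with tempfile.NamedTemporaryFile(delete=False) as handle:
    demo_path = handle.name

os.chmod(demo_path, 0o600)  # owner-only access
before = readable_and_executable_by_all(demo_path)
os.chmod(demo_path, 0o755)  # same mode as the chmod commands above
after = readable_and_executable_by_all(demo_path)
os.remove(demo_path)

print("{} {}".format(before, after))
```

On the cluster, the same check could be pointed at /etc/hadoop/conf/topology_script.py and /usr/bin/hdp-select.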

Finally, in every PySpark script, we have to set SPARK_HOME and SPARK_CLASSPATH explicitly. This can be done through the os module:


import sys
sys.path.append("/usr/hdp/current/spark2-client/python")
sys.path.append("/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip")

import os
os.environ['PYTHON'] = "/usr/bin/anaconda/envs/py35/bin/python"
os.environ['PYSPARK_PYTHON'] = "/usr/bin/anaconda/envs/py35/bin/python"
os.environ['SPARK_HOME'] = "/usr/hdp/current/spark2-client"
os.environ['SPARK_CLASSPATH'] = "/var/lib/ambari-server/resources/sqljdbc41.jar"

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext

spark = SparkSession.builder.appName("utils").enableHiveSupport().getOrCreate()
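
The py4j zip name in the snippet above is tied to version 0.10.4, so it would need updating whenever the bundled py4j version changes. One possible alternative, sketched here under the assumption that the archive follows the py4j-*-src.zip naming under python/lib, is to look the file up with glob. The helper name find_py4j_zip is hypothetical, and the example runs against a throwaway directory that only mimics the Spark layout.

```python
import glob
import os
import shutil
import tempfile


def find_py4j_zip(spark_home):
    """Return the py4j source zip under spark_home/python/lib, or None."""
    pattern = os.path.join(spark_home, "python", "lib", "py4j-*-src.zip")
    matches = sorted(glob.glob(pattern))
    return matches[-1] if matches else None


# Throwaway directory that mimics the layout under SPARK_HOME.
fake_home = tempfile.mkdtemp()
lib_dir = os.path.join(fake_home, "python", "lib")
os.makedirs(lib_dir)
open(os.path.join(lib_dir, "py4j-0.10.4-src.zip"), "w").close()

found = find_py4j_zip(fake_home)
missing = find_py4j_zip(os.path.join(fake_home, "does-not-exist"))
shutil.rmtree(fake_home)

print("{} {}".format(os.path.basename(found), missing))
```

In a real script, the returned path would be appended to sys.path in place of the hard-coded zip path, with a clear error raised when nothing is found.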

This is how an Azure HDInsight cluster can be configured so that PySpark functionality is accessible through reticulate.