Running Spark applications interactively

You can run your Spark applications interactively by leveraging the Kernel API.

Using the Kernel API

You can run a Spark application interactively by using the Kernel API. Each application runs in a kernel in a dedicated cluster. Any configuration settings that you pass through the API will override the default configurations.

To run a Spark application interactively:

  1. Generate an access token if you haven't already done so. See Generate an access token.
  2. Export the token into a variable:
     export TOKEN=<token generated>
    
  3. Provision an Analytics Engine powered by Apache Spark instance. You need the Administer role in the analytics project or deployment space to provision an instance. See Provisioning an instance.
  4. Get the Spark kernel endpoint for the instance.

    1. From the Navigation menu on the IBM Cloud Pak for Data web user interface, click Services > Instances, find the instance and click it to view the instance details.
    2. Under Access information, copy and save the Spark kernel endpoint.
  5. Create a kernel by using the kernel endpoint and the access token that you generated. This example includes only the mandatory parameters; the response includes a kernel ID that you can capture as shown below:
     curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer <ACCESS_TOKEN>" -d '{"name":"scala" }'
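    The create kernel call returns a JSON document (see the sample response later in this topic) that includes the kernel ID, which you need for the status and delete calls. One way to capture it directly is with jq, assuming jq is installed (a sketch):

     export KERNEL_ID=$(curl -s -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer <ACCESS_TOKEN>" -d '{"name":"scala" }' | jq -r '.id')
     echo $KERNEL_ID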
    

Create kernel JSON and supported parameters

If you use the Kernel service to invoke the create kernel REST API, you can add advanced configurations to the payload.

Here is a sample payload and a list of the parameters that you can pass in the create kernel API:

curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer <ACCESS_TOKEN>" -d '{
    "name": "scala",
    "kernel_size": {
        "cpu": 1,
        "memory": "1g"
    },
    "engine": {
        "type": "spark",
        "template_id": "spark-3.0.0-ws-cp4d-template",
        "conf": {
            "spark.ui.reverseProxy": "false",
            "spark.eventLog.enabled": "false"
        },
        "size": {
            "num_workers": "2",
            "worker_size": {
                "cpu": 1,
                "memory": "1g"
            }
        }
    }
}'

Response:

{
    "id": "<kernel_id>",
    "name": "scala",
    "connections": 0,
    "last_activity": "2021-07-16T11:06:26.266275Z",
    "execution_state": "starting"
}

Spark kernels API parameters

These are the parameters you can use in the Kernel API:

| Name | Required/Optional | Type | Description |
| --- | --- | --- | --- |
| name | Required | String | Specifies the kernel name. Supported values are scala, r, python39, python38, and python37 (deprecated). |
| engine | Optional | Key-value pairs | Specifies the Spark runtime with configuration and version information. |
| type | Required if engine is specified | String | Specifies the kernel runtime type. Currently, only spark is supported. |
| template_id | Optional | String | Specifies the Spark version and preinstalled system libraries. The default is spark-3.0.0-ws-cp4d-template. The Spark 2.4 template spark-2.4.0-jaas-v2-cp4d-template can only be used on Cloud Pak for Data versions earlier than 4.0.7. |
| conf | Optional | Key-value JSON object | Specifies the Spark configuration values that override the predefined values. |
| env | Optional | Key-value JSON object | Specifies the Spark environment variables required for the job. |
| size | Optional | Key-value JSON object | Takes the parameters num_workers, worker_size, and master_size. |
| num_workers | Required if size is specified | Integer | Specifies the number of worker nodes in the Spark cluster. num_workers equals the number of executors you want; the default is 1 executor per worker node. The maximum number of executors supported is 50. |
| worker_size | Required if size is specified | Key-value JSON object | Takes the parameters cpu and memory. |
| cpu | Required if worker_size is specified | Integer | Specifies the amount of CPU for each worker node. The default is 1 CPU; the maximum is 10 CPU. |
| memory | Required if worker_size is specified | String | Specifies the amount of memory for each worker node. The default is 1 GB; the maximum is 40 GB. |
| master_size | Required if size is specified | Key-value JSON object | Takes the parameters cpu and memory. |
| cpu | Required if master_size is specified | Integer | Specifies the amount of CPU for the master node. The default is 1 CPU; the maximum is 10 CPU. |
| memory | Required if master_size is specified | String | Specifies the amount of memory for the master node. The default is 1 GB; the maximum is 40 GB. |
| kernel_size | Required if size is specified | Key-value JSON object | Takes the parameters cpu and memory. |
| cpu | Required if kernel_size is specified | Integer | Specifies the amount of CPU for the kernel node. The default is 1 CPU; the maximum is 10 CPU. |
| memory | Required if kernel_size is specified | String | Specifies the amount of memory for the kernel node. The default is 1 GB; the maximum is 40 GB. |
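
The sample payload earlier in this topic does not show env, master_size, or kernel_size together. The following sketch shows one way to combine them, based on the parameter table above; the environment variable name and value are illustrative, and env is placed under engine alongside conf:

curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer <ACCESS_TOKEN>" -d '{
    "name": "python39",
    "kernel_size": {
        "cpu": 2,
        "memory": "2g"
    },
    "engine": {
        "type": "spark",
        "env": {
            "SAMPLE_ENV_VAR": "sample_value"
        },
        "size": {
            "num_workers": "2",
            "worker_size": {
                "cpu": 1,
                "memory": "1g"
            },
            "master_size": {
                "cpu": 1,
                "memory": "1g"
            }
        }
    }
}'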

Viewing kernel status

After you have created your Spark kernel, you can view the kernel details.

To view the kernel status, enter:

curl -k -X GET <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer <ACCESS_TOKEN>"

Example response:

{
  "id": "<kernel_id>",
  "name": "scala",
  "connections": 0,
  "last_activity": "2021-07-16T11:06:26.266275Z",
  "execution_state": "starting"
}

Deleting a kernel

You can delete a Spark kernel by entering the following:

curl -k -X DELETE <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer <ACCESS_TOKEN>"
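
If the delete succeeds, the service returns an empty response body. To confirm the result, you can print the HTTP status code; this sketch assumes the standard Jupyter Kernel Gateway behavior of answering a successful delete with status 204:

curl -k -s -o /dev/null -w "%{http_code}\n" -X DELETE <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer <ACCESS_TOKEN>"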

Listing kernels

You can list all the active Spark kernels by entering the following:

curl -k -X GET <KERNEL_ENDPOINT> -H "Authorization: Bearer <ACCESS_TOKEN>"
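
The response is a JSON array of kernel objects in the same format as the create kernel response. For example, to print just the ID and execution state of each active kernel (a sketch, assuming jq is installed):

curl -k -s -X GET <KERNEL_ENDPOINT> -H "Authorization: Bearer <ACCESS_TOKEN>" | jq -r '.[] | "\(.id) \(.execution_state)"'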

Using Spark kernels

You can use the Kernel API provided by Analytics Engine powered by Apache Spark to create a kernel, submit code to it over a WebSocket connection, check its status, and delete it.

The following Python sample code uses Tornado libraries to make HTTP and WebSocket calls to a Jupyter Kernel Gateway service. You need a Python runtime environment with the Tornado package installed to run this sample code.

To create a Spark application:

  1. Install the Python Tornado package. If you don't already have pip, install it first:
     yum install -y python-pip; pip install tornado
    
  2. In a working directory, create a file called client.py containing the following code.

    The following sample code creates a Spark Scala kernel and submits Scala code to it for execution:

     from uuid import uuid4
     from tornado import gen
     from tornado.escape import json_encode, json_decode, url_escape
     from tornado.httpclient import AsyncHTTPClient, HTTPRequest
     from tornado.ioloop import IOLoop
     from tornado.websocket import websocket_connect
     @gen.coroutine
     def main():
         token = "<token>"                  # access token generated earlier
         kg_http_url = "<kernel_endpoint>"  # Spark kernel endpoint (https://...)
         kg_ws_url = "<ws_kernel_endpoint>" # same endpoint with the wss:// prefix
         headers = {"Authorization": 'Bearer {}'.format(token) , "Content-Type": "application/json"}  
         validate_cert = False
         kernel_name="scala"
         kernel_payload={"name":"scala","kernel_size":{"cpu":3}}
         print(kernel_payload)
         code = """
             print(s"Spark Version: ${sc.version}")
             print(s"Application Name: ${sc.appName}")
             print(s"Application ID: ${sc.applicationId}")
             print(sc.parallelize(1 to 5).count())
             import org.apache.spark.sql.SQLContext;
             val sqlContext = new SQLContext(sc);
             val data_df_0 = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").csv("/opt/ibm/spark/examples/src/main/resources/people.csv");
             data_df_0.show(5)    
         """
         print("Using kernel gateway URL: {}".format(kg_http_url))
         print("Using kernel websocket URL: {}".format(kg_ws_url))
         # Remove "/" if exists in JKG url's
         if kg_http_url.endswith("/"):
             kg_http_url=kg_http_url.rstrip('/')
         if kg_ws_url.endswith("/"):
             kg_ws_url=kg_ws_url.rstrip('/')
         client = AsyncHTTPClient()
         # Create kernel
         # POST /api/kernels
         print("Creating kernel {}...".format(kernel_name))
         response = yield client.fetch(
             kg_http_url,
             method='POST',
             headers = headers,
             validate_cert=validate_cert,
             body=json_encode(kernel_payload)
         )
         kernel = json_decode(response.body)
         kernel_id = kernel['id']
         print("Created kernel {0}.".format(kernel_id))
         # Connect to kernel websocket
         # GET /api/kernels/<kernel-id>/channels
         # Upgrade: websocket
         # Connection: Upgrade
         print("Connecting to kernel websocket...")
         ws_req = HTTPRequest(url='{}/{}/channels'.format(
             kg_ws_url,
             url_escape(kernel_id)
         ),
             headers = headers,
             validate_cert=validate_cert
         )
         ws = yield websocket_connect(ws_req)
         print("Connected to kernel websocket.")
         # Submit code to websocket on the 'shell' channel
         print("Submitting code: \n{}\n".format(code))
         msg_id = uuid4().hex
         req = json_encode({
             'header': {
                 'username': '',
                 'version': '5.0',
                 'session': '',
                 'msg_id': msg_id,
                 'msg_type': 'execute_request'
             },
             'parent_header': {},
             'channel': 'shell',
             'content': {
                 'code': code,
                 'silent': False,
                 'store_history': False,
                 'user_expressions': {},
                 'allow_stdin': False
             },
             'metadata': {},
             'buffers': {}
         })
         # Send an execute request
         ws.write_message(req)
         print("Code submitted. Waiting for response...")
         # Read websocket output until kernel status for this request becomes 'idle'
         kernel_idle = False
         while not kernel_idle:
             msg = yield ws.read_message()
             msg = json_decode(msg)
             msg_type = msg['msg_type']
             print ("Received message type: {}".format(msg_type))
             if msg_type == 'error':
                 print('ERROR')
                 print(msg)
                 break
             # evaluate messages that correspond to our request
             if 'msg_id' in msg['parent_header'] and \
                             msg['parent_header']['msg_id'] == msg_id:
                 if msg_type == 'stream':
                     print("  Content: {}".format(msg['content']['text']))
                 elif msg_type == 'status' and \
                                 msg['content']['execution_state'] == 'idle':
                     kernel_idle = True
         # close websocket
         ws.close()
         # Delete kernel
         # DELETE /api/kernels/<kernel-id>
         print("Deleting kernel...")
         yield client.fetch(
             '{}/{}'.format(kg_http_url, kernel_id),
             method='DELETE',
             headers = headers,
             validate_cert=validate_cert
         )
         print("Deleted kernel {0}.".format(kernel_id))
     if __name__ == '__main__':
         IOLoop.current().run_sync(main)
    

    where:

    • <token> is the access token you generated in Using the Kernel API.
    • <kernel_endpoint> is the Spark kernel endpoint you got in Using the Kernel API. Example of a kernel endpoint: https://<cp4d_route>/v2/spark/ae/<instance_id>/jkg/api/kernels
    • <ws_kernel_endpoint> is the websocket endpoint which you create by taking the Spark kernel endpoint and changing the https prefix to wss. Based on the previous example, change to: wss://<cp4d_route>/v2/spark/ae/<instance_id>/jkg/api/kernels
  3. Run the script file (sample output is sketched below):
     python client.py
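    If the script runs successfully, the console output, which comes from the script's print statements, resembles the following sketch; the exact sequence of received message types depends on the kernel and the Jupyter messaging traffic:

     {'name': 'scala', 'kernel_size': {'cpu': 3}}
     Using kernel gateway URL: https://<cp4d_route>/v2/spark/ae/<instance_id>/jkg/api/kernels
     Using kernel websocket URL: wss://<cp4d_route>/v2/spark/ae/<instance_id>/jkg/api/kernels
     Creating kernel scala...
     Created kernel <kernel_id>.
     Connecting to kernel websocket...
     Connected to kernel websocket.
     Submitting code: 
     ...
     Code submitted. Waiting for response...
     Received message type: status
     Received message type: stream
       Content: Spark Version: ...
     Received message type: status
     Deleting kernel...
     Deleted kernel <kernel_id>.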
    

For Python and R kernels, you modify the kernel name, kernel payload, and code variables in the client.py file. The following sketch shows what those changes might look like; the submitted code is illustrative:
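
# Python kernel (illustrative sketch; assumes the kernel predefines a SparkContext named sc, as in the Scala example)
kernel_name = "python39"
kernel_payload = {"name": "python39", "kernel_size": {"cpu": 3}}
code = """
print("Spark version: " + sc.version)
print(sc.parallelize(range(1, 6)).count())
"""

# R kernel (illustrative sketch; assumes the kernel initializes a SparkR session)
kernel_name = "r"
kernel_payload = {"name": "r", "kernel_size": {"cpu": 3}}
code = """
print(sparkR.version())
"""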

Parent topic: Getting started with Spark applications