Running Spark applications interactively
You can run your Spark applications interactively by leveraging the Kernel API.
Kernel as a service provides:
- Jupyter kernels as first-class entities
- A dedicated cluster per kernel
- Clusters and kernels customized with user libraries
Using the Kernel API
You can run a Spark application interactively by using the Kernel API. Each application runs in a kernel in a dedicated cluster. Any configuration settings that you pass through the API will override the default configurations.
To run a Spark application interactively:
- Generate an access token if you haven't already done so. See Generate an access token.
- Export the token into a variable:
export TOKEN=<token generated>
- Provision an Analytics Engine powered by Apache Spark instance. You need the Administer role in the analytics project or in the deployment space to provision an instance. See Provisioning an instance.
- Get the Spark kernel endpoint for the instance.
- From the Navigation menu on the IBM Cloud Pak for Data web user interface, click Services > Instances, find the instance and click it to view the instance details.
- Under Access information, copy and save the Spark kernel endpoint.
- Create a kernel by using the kernel endpoint and the access token that you generated. This example includes only the mandatory parameters:
curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer <ACCESS_TOKEN>" -d '{"name":"scala" }'
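If you prefer to make the same call from a script, the following sketch shows an equivalent request in Python. It assumes that the requests package is installed and that the endpoint and token placeholders are replaced with your own values; the variable names are illustrative only.
# Minimal create-kernel request, equivalent to the curl example above.
# Replace the placeholder values with your Spark kernel endpoint and access token.
import requests

kernel_endpoint = "<KERNEL_ENDPOINT>"
token = "<ACCESS_TOKEN>"

response = requests.post(
    kernel_endpoint,
    headers={"Authorization": "Bearer {}".format(token), "Content-Type": "application/json"},
    json={"name": "scala"},
    verify=False,  # corresponds to the -k flag in the curl example
)
response.raise_for_status()
print(response.json())  # contains the kernel id and execution_state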
Create kernel JSON and supported parameters
If you use the Kernel service to invoke the create kernel REST API, you can add advanced configurations to the payload.
Here is a sample payload, followed by the list of parameters that you can pass to the create kernel API:
curl -k -X POST <KERNEL_ENDPOINT> -H "Authorization: Bearer <ACCESS_TOKEN>" -d '{
"name": "scala",
"kernel_size": {
"cpu": 1,
"memory": "1g"
},
"engine": {
"type": "spark",
"template_id": "spark-3.0.0-ws-cp4d-template",
"conf": {
"spark.ui.reverseProxy": "false",
"spark.eventLog.enabled": "false"
},
"size": {
"num_workers": "2",
"worker_size": {
"cpu": 1,
"memory": "1g"
}
}
}
}'
Response:
{
"id": "<kernel_id>",
"name": "scala",
"connections": 0,
"last_activity": "2021-07-16T11:06:26.266275Z",
"execution_state": "starting"
}
Spark kernels API parameters
These are the parameters you can use in the Kernel API:
| Name | Required/Optional | Type | Description |
|---|---|---|---|
| name | Required | String | Specifies the kernel name. Supported values are scala, r, python39, python38, and python37 (deprecated). |
| engine | Optional | Key-value pairs | Specifies the Spark runtime with configuration and version information. |
| type | Required if engine is specified | String | Specifies the kernel runtime type. Currently, only spark is supported. |
| template_id | Optional | String | Specifies the Spark version and preinstalled system libraries. The default is spark-3.0.0-ws-cp4d-template. The Spark 2.4 template spark-2.4.0-jaas-v2-cp4d-template can only be used if you are on a Cloud Pak for Data version prior to 4.0.7. |
| conf | Optional | Key-value JSON object | Specifies the Spark configuration values that override the predefined values. |
| env | Optional | Key-value JSON object | Specifies the Spark environment variables required for the job. |
| size | Optional | Key-value JSON object | Takes the parameters num_workers, worker_size, and master_size. |
| num_workers | Required if size is specified | Integer | Specifies the number of worker nodes in the Spark cluster. num_workers is equal to the number of executors you want. The default is 1 executor per worker node. The maximum number of executors supported is 50. |
| worker_size | Required if size is specified | Key-value JSON object | Takes the parameters cpu and memory. |
| cpu | Required if worker_size is specified | Integer | Specifies the amount of CPU for each worker node. Default is 1 CPU. Maximum is 10 CPU. |
| memory | Required if worker_size is specified | String | Specifies the amount of memory for each worker node, for example 1g. Default is 1 GB. Maximum is 40 GB. |
| master_size | Required if size is specified | Key-value JSON object | Takes the parameters cpu and memory. |
| cpu | Required if master_size is specified | Integer | Specifies the amount of CPU for the master node. Default is 1 CPU. Maximum is 10 CPU. |
| memory | Required if master_size is specified | String | Specifies the amount of memory for the master node, for example 1g. Default is 1 GB. Maximum is 40 GB. |
| kernel_size | Required if size is specified | Key-value JSON object | Takes the parameters cpu and memory. |
| cpu | Required if kernel_size is specified | Integer | Specifies the amount of CPU for the kernel node. Default is 1 CPU. Maximum is 10 CPU. |
| memory | Required if kernel_size is specified | String | Specifies the amount of memory for the kernel node, for example 1g. Default is 1 GB. Maximum is 40 GB. |
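To see how the parameters in this table nest inside the request body, here is a sketch of a create-kernel payload written as a Python dictionary. The specific CPU, memory, worker, and environment-variable values are illustrative only and must stay within the documented limits.
# Illustrative create-kernel payload showing how the parameters nest.
import json

kernel_payload = {
    "name": "scala",                                 # scala, r, python39, or python38
    "kernel_size": {"cpu": 1, "memory": "1g"},       # resources for the kernel node
    "engine": {
        "type": "spark",
        "template_id": "spark-3.0.0-ws-cp4d-template",
        "conf": {"spark.eventLog.enabled": "false"}, # Spark configuration overrides
        "env": {"SAMPLE_ENV_VAR": "sample-value"},   # hypothetical environment variable
        "size": {
            "num_workers": "2",
            "worker_size": {"cpu": 1, "memory": "1g"},
            "master_size": {"cpu": 1, "memory": "1g"},
        },
    },
}

# Serialize the payload to pass it as the request body, for example with
# curl -d or with the Tornado client shown later on this page.
print(json.dumps(kernel_payload, indent=2))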
Viewing kernel status
After you have created your Spark kernel, you can view the kernel details.
To view the kernel status, enter:
curl -k -X GET <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer <ACCESS_TOKEN>"
Example response:
{
"id": "<kernel_id>",
"name": "scala",
"connections": 0,
"last_activity": "2021-07-16T11:06:26.266275Z",
"execution_state": "starting"
}
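A newly created kernel reports the starting state, as in the example above. If you want to wait until the kernel is ready before submitting code, one approach is to poll this endpoint, as in the following sketch. It assumes that the requests package is installed and that the endpoint, token, and kernel ID placeholders are replaced with your own values.
# Poll the kernel status until it leaves the 'starting' state.
# Replace the placeholders with your own endpoint, token, and kernel ID.
import time
import requests

kernel_endpoint = "<KERNEL_ENDPOINT>".rstrip("/")
token = "<ACCESS_TOKEN>"
kernel_id = "<kernel_id>"

headers = {"Authorization": "Bearer {}".format(token)}

while True:
    kernel = requests.get("{}/{}".format(kernel_endpoint, kernel_id),
                          headers=headers, verify=False).json()
    print("Kernel {} is {}".format(kernel_id, kernel["execution_state"]))
    if kernel["execution_state"] != "starting":
        break
    time.sleep(10)  # wait before polling again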
Deleting a kernel
You can delete a Spark kernel by entering the following:
curl -k -X DELETE <KERNEL_ENDPOINT>/<kernel_id> -H "Authorization: Bearer <ACCESS_TOKEN>"
Listing kernels
You can list all the active Spark kernels by entering the following:
curl -k -X GET <KERNEL_ENDPOINT> -H "Authorization: Bearer <ACCESS_TOKEN>"
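The list call returns the active kernels in the same form as the create and status responses. As a sketch, assuming the response is a JSON array of kernel objects and the requests package is installed, you could use it to clean up all of your kernels at once; the placeholders are replaced with your own values.
# List all active kernels and delete each one.
# Replace the placeholders with your own endpoint and token.
import requests

kernel_endpoint = "<KERNEL_ENDPOINT>".rstrip("/")
headers = {"Authorization": "Bearer <ACCESS_TOKEN>"}

kernels = requests.get(kernel_endpoint, headers=headers, verify=False).json()
for kernel in kernels:
    print("Deleting kernel {} ({})".format(kernel["id"], kernel["name"]))
    requests.delete("{}/{}".format(kernel_endpoint, kernel["id"]),
                    headers=headers, verify=False)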
Using Spark kernels
You can use the Kernel API provided by Analytics Engine powered by Apache Spark to create a kernel, connect to it over a WebSocket connection to submit code, check the status of the kernel, and delete it.
The following Python sample code uses Tornado libraries to make HTTP and WebSocket calls to a Jupyter Kernel Gateway service. You need a Python runtime environment with the Tornado package installed to run this sample code.
To create a Spark application:
- Install pip if you don't have it, and then install the Python Tornado package by using this command:
yum install -y python-pip; pip install tornado
- In a working directory, create a file called client.py containing the following code. The following sample code creates a Spark Scala kernel and submits Scala code to it for execution:
from uuid import uuid4

from tornado import gen
from tornado.escape import json_encode, json_decode, url_escape
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.ioloop import IOLoop
from tornado.websocket import websocket_connect


@gen.coroutine
def main():
    token = "<token>"
    kg_http_url = "<kernel_endpoint>"
    kg_ws_url = "<ws_kernel_endpoint>"
    headers = {"Authorization": "Bearer {}".format(token), "Content-Type": "application/json"}
    validate_cert = False

    kernel_name = "scala"
    kernel_payload = {"name": "scala", "kernel_size": {"cpu": 3}}
    print(kernel_payload)

    code = """
    print(s"Spark Version: ${sc.version}")
    print(s"Application Name: ${sc.appName}")
    print(s"Application ID: ${sc.applicationId}")
    print(sc.parallelize(1 to 5).count())
    import org.apache.spark.sql.SQLContext;
    val sqlContext = new SQLContext(sc);
    val data_df_0 = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").csv("/opt/ibm/spark/examples/src/main/resources/people.csv");
    data_df_0.show(5)
    """

    print("Using kernel gateway URL: {}".format(kg_http_url))
    print("Using kernel websocket URL: {}".format(kg_ws_url))

    # Remove trailing "/" if it exists in the JKG URLs
    if kg_http_url.endswith("/"):
        kg_http_url = kg_http_url.rstrip('/')
    if kg_ws_url.endswith("/"):
        kg_ws_url = kg_ws_url.rstrip('/')

    client = AsyncHTTPClient()

    # Create kernel
    # POST /api/kernels
    print("Creating kernel {}...".format(kernel_name))
    response = yield client.fetch(
        kg_http_url,
        method='POST',
        headers=headers,
        validate_cert=validate_cert,
        body=json_encode(kernel_payload)
    )
    kernel = json_decode(response.body)
    kernel_id = kernel['id']
    print("Created kernel {0}.".format(kernel_id))

    # Connect to kernel websocket
    # GET /api/kernels/<kernel-id>/channels
    # Upgrade: websocket
    # Connection: Upgrade
    print("Connecting to kernel websocket...")
    ws_req = HTTPRequest(
        url='{}/{}/channels'.format(kg_ws_url, url_escape(kernel_id)),
        headers=headers,
        validate_cert=validate_cert
    )
    ws = yield websocket_connect(ws_req)
    print("Connected to kernel websocket.")

    # Submit code to websocket on the 'shell' channel
    print("Submitting code: \n{}\n".format(code))
    msg_id = uuid4().hex
    req = json_encode({
        'header': {
            'username': '',
            'version': '5.0',
            'session': '',
            'msg_id': msg_id,
            'msg_type': 'execute_request'
        },
        'parent_header': {},
        'channel': 'shell',
        'content': {
            'code': code,
            'silent': False,
            'store_history': False,
            'user_expressions': {},
            'allow_stdin': False
        },
        'metadata': {},
        'buffers': {}
    })

    # Send an execute request
    ws.write_message(req)
    print("Code submitted. Waiting for response...")

    # Read websocket output until kernel status for this request becomes 'idle'
    kernel_idle = False
    while not kernel_idle:
        msg = yield ws.read_message()
        msg = json_decode(msg)
        msg_type = msg['msg_type']
        print("Received message type: {}".format(msg_type))
        if msg_type == 'error':
            print('ERROR')
            print(msg)
            break
        # Evaluate only messages that correspond to our request
        if 'msg_id' in msg['parent_header'] and \
                msg['parent_header']['msg_id'] == msg_id:
            if msg_type == 'stream':
                print(" Content: {}".format(msg['content']['text']))
            elif msg_type == 'status' and \
                    msg['content']['execution_state'] == 'idle':
                kernel_idle = True

    # Close websocket
    ws.close()

    # Delete kernel
    # DELETE /api/kernels/<kernel-id>
    print("Deleting kernel...")
    yield client.fetch(
        '{}/{}'.format(kg_http_url, kernel_id),
        method='DELETE',
        headers=headers,
        validate_cert=validate_cert
    )
    print("Deleted kernel {0}.".format(kernel_id))


if __name__ == '__main__':
    IOLoop.current().run_sync(main)
where:
- <token> is the access token that you generated in Using the Kernel API.
- <kernel_endpoint> is the Spark kernel endpoint that you got in Using the Kernel API. Example of a kernel endpoint: https://<cp4d_route>/v2/spark/ae/<instance_id>/jkg/api/kernels
- <ws_kernel_endpoint> is the websocket endpoint, which you create by taking the Spark kernel endpoint and changing the https prefix to wss. Based on the previous example, this becomes: wss://<cp4d_route>/v2/spark/ae/<instance_id>/jkg/api/kernels
- Run the script file:
python client.py
Here are code snippets that show how the kernel_name, kernel_payload, and code variables can be modified in the client.py file for Python and R kernels:
- Python:
kernel_name="python38" kernel_payload={"name":"python38"} print(kernel_payload) code = '\n'.join(( "print(\"Spark Version: {}\".format(sc.version))", "print(\"Application Name: {}\".format(sc._jsc.sc().appName()))", "print(\"Application ID: {} \".format(sc._jsc.sc().applicationId()))", "sc.parallelize([1,2,3,4,5]).count()" )) - R:
kernel_name="r" kernel_payload={"name":"r"} code = """ cat("Spark Version: ", sparkR.version()) conf = sparkR.callJMethod(spark, "conf") cat("Application Name: ", sparkR.callJMethod(conf, "get", "spark.app.name")) cat("Application ID:", sparkR.callJMethod(conf, "get", "spark.app.id")) df <- as.DataFrame(list(1,2,3,4,5)) cat(count(df)) """
Parent topic: Getting started with Spark applications