EXECSPARK table function

By using the SYSHADOOP.EXECSPARK table function, you can invoke Apache Spark jobs from Db2 Big SQL SELECT statements and join the results with other tables.

Syntax

EXECSPARK ( class [, arg]... )

Description

class
Specifies the name of the main class for the Spark job. This argument must be named class explicitly by using the arrow notation (for example, class => 'com.ibm.biginsights.bigsql.examples.DataSource').
arg
Specifies one or more optional arguments.

EXECSPARK returns a result set whose schema is determined at invocation time, based on the argument values.

Usage

The Java™ or Scala class that is specified in the class argument must implement the interface com.ibm.biginsights.bigsql.spark.SparkPtf, which contains the following methods:
StructType describe(SQLContext ctx, java.util.Map<String, Object> arguments);
Dataset<Row> execute(SQLContext ctx, java.util.Map<String, Object> arguments);
void destroy(SQLContext ctx, java.util.Map<String, Object> arguments);
long cardinality(SQLContext ctx, java.util.Map<String, Object> arguments);

Classes that implement this interface are referred to as polymorphic table functions (PTFs). All the methods in the SparkPtf interface have the same two parameters: an SQLContext object and a Java map that contains the arguments provided in the invocation. The SQLContext object that is passed to all the methods is Hive-enabled, and therefore can be used to query tables that are registered with the Hive metastore.
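
For example, inside the execute method, the PTF could query a table that is already registered in the Hive metastore (the table name mydb.sales here is hypothetical):

Dataset<Row> sales = ctx.sql("SELECT * FROM mydb.sales");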

The arguments object is a map that contains all the arguments to SYSHADOOP.EXECSPARK except class. If an argument is explicitly named by using the arrow syntax, the map key will be the specified name in uppercase characters; otherwise, the key will be the ordinal position (1, 2, and so on).

The PTF class is instantiated at query compilation time. The SQL compiler invokes the describe and the cardinality methods to obtain the schema of the result and the estimated cardinality. The latter can help the Db2 Big SQL optimizer make better query planning decisions if the PTF result is joined with other tables. At query execution time, the execute method is invoked on the same instance to trigger computation of the result. The Spark data frame that is returned by the execute method is mapped to a Db2 Big SQL result set on the fly. After the query is processed, the destroy method is invoked to give the user code an opportunity to perform resource cleanup.
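
The following Java class is a minimal sketch of a PTF that follows this lifecycle. The class name EchoPtf, its STRINGARG argument, and the fixed three-row result are illustrative only and are not part of the product API:

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import com.ibm.biginsights.bigsql.spark.SparkPtf;

public class EchoPtf implements SparkPtf {

    private static final StructType SCHEMA = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("N", DataTypes.IntegerType, false),
        DataTypes.createStructField("MESSAGE", DataTypes.StringType, true)));

    // Compile time: report the schema of the result set to the SQL compiler.
    public StructType describe(SQLContext ctx, Map<String, Object> arguments) {
        return SCHEMA;
    }

    // Compile time: provide a row-count estimate for the optimizer.
    public long cardinality(SQLContext ctx, Map<String, Object> arguments) {
        return 3;
    }

    // Execution time: compute the data frame that becomes the SQL result set.
    public Dataset<Row> execute(SQLContext ctx, Map<String, Object> arguments) {
        String msg = (String) arguments.get("STRINGARG"); // named arguments are keyed in uppercase
        return ctx.createDataFrame(Arrays.asList(
            RowFactory.create(1, msg),
            RowFactory.create(2, msg),
            RowFactory.create(3, msg)), SCHEMA);
    }

    // After the query: release any resources that this instance holds.
    public void destroy(SQLContext ctx, Map<String, Object> arguments) {
        // nothing to clean up in this sketch
    }
}

An invocation such as SELECT * FROM TABLE(SYSHADOOP.EXECSPARK(class => 'EchoPtf', stringarg => 'hello')) AS t would then return three rows with columns N and MESSAGE.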

If any user other than the bigsql user is to run this function, add the following proxy settings to the HDFS core-site.xml file:

hadoop core-site (safety valve):

hadoop.proxyuser.bigsql.groups=*
hadoop.proxyuser.bigsql.hosts=*
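
If you maintain core-site.xml by hand rather than through a cluster management console, the same settings take the standard Hadoop property form:

<property>
  <name>hadoop.proxyuser.bigsql.groups</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.bigsql.hosts</name>
  <value>*</value>
</property>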

Examples

The following example shows an invocation of SYSHADOOP.EXECSPARK that uses arguments of different data types. The keys in the argument map are INTARG, DECIMALARG, STRINGARG, and 5.

SELECT *
  FROM TABLE(SYSHADOOP.EXECSPARK(
    class      => 'org.myorg.MyPtf',
    intarg     => CAST(111 AS INT),
    decimalarg => 22.2,
    stringarg  => 'hello',
    CAST(33.3 AS DOUBLE)
    )
  ) AS j
The last argument gets a key value of 5 in the arguments map because it is the fifth argument, and SQL indexing starts at 1. The values of the arguments to SYSHADOOP.EXECSPARK must be constants. The SQL type of each argument is inferred by the compiler according to standard SQL rules, and you can use the CAST expression to cast a literal to a different type than the default. In this example, 33.3 would be interpreted as a decimal value by default, but the CAST expression turns it into a double value.
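
Inside the PTF, the arguments for this invocation could be retrieved from the map as follows. The Java classes shown for each SQL type are an assumption, not a documented mapping; verify the actual types that your runtime delivers:

Integer    intArg     = (Integer)    arguments.get("INTARG");     // named arguments are keyed in uppercase
BigDecimal decimalArg = (BigDecimal) arguments.get("DECIMALARG"); // assumed java.math.BigDecimal for DECIMAL
String     stringArg  = (String)     arguments.get("STRINGARG");
Double     doubleArg  = (Double)     arguments.get("5");          // the unnamed argument is keyed by its ordinal position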
The following example shows how to use SYSHADOOP.EXECSPARK to invoke a Spark job that reads a JSON file that is stored in HDFS. The demo.json file contains a sequence of records in the following format:
{"Country":null, "Direction":"UP", "Language":"English"}
The following SELECT statement returns five records that contain a country value that is not the null value:

SELECT *
  FROM TABLE(SYSHADOOP.EXECSPARK(
    class    => 'DataSource',
    format   => 'json',
    load     => 'hdfs://host.port.com:8020/user/bigsql/demo.json'
    )
  ) AS doc
  WHERE doc.country IS NOT NULL
  LIMIT 5
The output might look like the following text:

COUNTRY              DIRECTION            LANGUAGE
-------------------- -------------------- --------------------
DE                   UP                   German
RU                   DOWN                 Russian
US                   UP                   English
AU                   -                    English
US                   UP                   English
   5 record(s) selected.
In this example, each argument is explicitly named by using the arrow syntax to improve readability.

The class argument is mandatory because it specifies the executable code for the Spark job, in this case a Java class named DataSource. Other arguments are passed to the user code. In this example, when the Spark job instantiates class DataSource, the instance receives the specified format and load arguments, which are represented as entries in the arguments map.
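
A PTF along these lines can delegate most of the work to the Spark data source API. The following sketch is hypothetical (the actual com.ibm.biginsights.bigsql.examples.DataSource implementation may differ) and assumes only the FORMAT and LOAD keys shown in the query above:

import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;

import com.ibm.biginsights.bigsql.spark.SparkPtf;

public class DataSourcePtf implements SparkPtf {

    // Read whatever source the FORMAT and LOAD arguments describe.
    private Dataset<Row> load(SQLContext ctx, Map<String, Object> arguments) {
        String format = (String) arguments.get("FORMAT"); // for example, "json"
        String path = (String) arguments.get("LOAD");     // for example, an hdfs:// URI
        return ctx.read().format(format).load(path);
    }

    public StructType describe(SQLContext ctx, Map<String, Object> arguments) {
        return load(ctx, arguments).schema(); // let Spark infer the schema from the data
    }

    public long cardinality(SQLContext ctx, Map<String, Object> arguments) {
        return 1000; // crude placeholder estimate for the optimizer
    }

    public Dataset<Row> execute(SQLContext ctx, Map<String, Object> arguments) {
        return load(ctx, arguments);
    }

    public void destroy(SQLContext ctx, Map<String, Object> arguments) {
        // nothing to clean up
    }
}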

For more information, see Apache Spark.