IBM Streams 4.2.1

Dynamic application composition

When a composite operator in the source code is intended as the main operator in a compiled application, its graph clause typically invokes operators for edge adaptation, that is, ingesting external data as well as producing data that can be consumed by external parties. In particular, it uses various source and sink operators for I/O to URLs, files, databases, and so on; and it uses the special operators Import and Export for I/O to other jobs on the same streaming middleware.

Streaming applications commonly run for long periods of time. Streams that are generated by one application often serve as input to another, and the second application might be deployed when the first is already running. The Import and Export special operators support dynamic application composition, where one application exports a stream, another application imports the stream, but both applications are instantiated separately and might even be mutually anonymous. For example, the streaming middleware might host one or more long-running backbone applications that carry out the bulk of the data processing, and users might launch transient applications that import streams, for example, to visualize results in a dashboard.

As a concrete example of property-based application composition, a hedge fund might be interested in locating bargains in transactions from the stock market, but it might be interested specifically in bargains in two particular sectors (healthcare and technology). Furthermore, the firm might be interested in transactions that are associated with a few specific companies (like IBM® or WellPoint). In these situations, you might deploy two bargain finding applications (one per sector) and have their results exported with tags that indicate the sector and the particular stock symbol they carry. A third application, responsible for the actual trading, would then import streams that match the specific sectors/stock symbols that the firm cares about. The following code exports anonymous streams by exporting their properties to any applications that might be interested:
type BargainType = rstring symbol, decimal32 price;
composite ExportingMain {
  graph
    stream<BargainType> TechSectorBargains = FileSource(){param file:"tech";}
    stream<BargainType> HealthCareSectorBargains = FileSource(){param file:"health";}
    () as ExportOp1 = Export(TechSectorBargains) {
      param properties : { kind="bargains", category="tech",
                          tickers=["IBM", "GOOG", "MSFT"] };
    }
    () as ExportOp2 = Export(HealthCareSectorBargains) {
      param properties : { kind="bargains", category="healthcare",
                          tickers=["AET", "UNH", "WLP"] };
    }
}
The properties parameter specifies properties of the stream as a tuple literal with name-value pairs. Here are examples of importing streams from a different application instance:
type BargainType = rstring symbol, decimal32 price;
composite ImportingMain {
  graph
    stream<BargainType> TechBargains = Import() {
      param subscription : kind == "bargains" && category == "tech";
    }
    stream<BargainType> IBMOrWLPBargains = Import() {
      param subscription : "IBM" in tickers || "WLP" in tickers;
    }
}

Using special operators for dynamic application composition provides a consistent syntax for specifying types and parameters. The middleware connects the Import and Export streams if the subscription predicate matches the exported stream properties. If the Import predicate matches multiple streams that are exported by jobs that are running in the middleware, they are all connected. If there are no matching streams, nothing arrives at the Import operator. Properties can also be added, updated, or removed at run time, and so can subscriptions. The compile-time properties and subscriptions serve as initial settings. Since the compiler sees only one application at a time, it cannot statically check whether the types in publication properties and subscription predicates in other applications match. The output from an Import operator cannot be fed into an input port that expects window punctuation. Also, a port of a down-stream operator that is connected to the output stream of an Import operator stays open forever, no matter how many final punctuation markers it receives.

Besides this property-based anonymous connection, SPL also supports connecting streams across application instances by name. Thus an Export operator invocation does not need a properties parameter:
namespace some.nameSpace;
composite Comp(input E) {
  graph () as ExportInvoke = Export(E) {
         param streamId : "StreamName";
        }
}
public composite ExportingMain {
  graph stream<int32 x> A = Beacon() { }
        () as CompInvoke = Comp(A) { }
}
    
The streamId parameter specifies the external name of the stream that is being exported. Only one of streamId or properties can be specified on an Export operator. If neither streamId nor properties are specified, the stream is exported by properties with an initially empty selection of properties; which is useful only if properties are later set at run time. The importing application instance again uses the Import operator, but this time with a different set of parameters:
composite ImportingMain {
  graph
    stream<int32 x> I2 = Import() {
      param applicationScope : "myApplicationScope";
             //application scope selected when exporting application launched
           applicationName  : "some.nameSpace::ExportingMain";
             //main operator selected when exporting application launched
           streamId : "StreamName";
             //string agreed upon by exporting and importing application
    }
}

If the explicit applicationScope is omitted, it is implicitly bound to the scope in which the current application was launched. An explicit applicationScope can be used equally with property-based and name-based subscription. The optional applicationName selects the main operator that is selected when the exporting application is launched. It is only valid when streamId is specified. You cannot run two applications that specify the same streamId; IBM Streams rejects the submission of the second application instance.

Tip: When streams are exported and imported across jobs, there might be a dual maintenance problem; you must keep both types in sync. Therefore, consider defining a public tuple type. This type can then be used from both jobs to ensure consistency. There might also be unintended circles of imported and exported streams, so look out to avoid this case.
Note: In the current implementation, the SPL compiler represents properties parameters internally as XML and represents subscription parameters internally as XPath. As a result, only a restricted set of SPL expressions are valid for these two parameters, and the compiler emits error messages if others are used. Also, while the language represents Import and Export as operators, the implementation erases them (treats them as ghost operators). This situation has the consequence that it is not valid to directly export an imported stream without sending it through another operator first. Dynamic optimization in the transport fabric ensures that the bandwidth for dynamic application composition connections is only used when a connection is in place. As a rule of thumb, it is more efficient to establish a name-based subscription than a property-based anonymous subscription, though when the connections are established, they perform equally well.