IBM Streams 4.3.0

Operator FileSource

The FileSource operator reads data from a file and produces tuples as a result.

The FileSource operator accepts an optional output clause. Any SPL expression and supported custom output functions can be used in the output clause. Output attributes can be assigned values in the output clause. If an attribute has an assignment, the expression value is assigned to the attribute, and the attribute is not part of the value that is read from the source. For the line and block formats, there must be exactly one attribute that is not assigned by an output assignment, and this attribute must have type rstring (for line format) or blob (for block format). For all other formats, any attributes that are not assigned in the output clause are read from the input by using csv, txt, or bin format.
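
As a minimal sketch of an output assignment with the csv format, the following invocation assigns one attribute from an SPL expression, so only name and age are read from the file. The file name People.dat and the loadedAt attribute are assumptions made for illustration.

```spl
composite OutputClauseSketch {
  graph
    // name and age are parsed from each csv line; loadedAt is not read
    // from the file because it is assigned in the output clause.
    stream<rstring name, uint32 age, timestamp loadedAt> People = FileSource() {
      param
        file   : "People.dat";   // hypothetical file in the data directory
        format : csv;
      output
        People : loadedAt = getTimestamp();
    }
}
```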

Checkpointed data

When the FileSource operator is checkpointed in a consistent region, the position in the file that it is processing and logic state variables (if present) are saved in the checkpoint. When the FileSource operator is checkpointed in an autonomous region, logic state variables (if present) are saved in the checkpoint.

Behavior in a consistent region

The FileSource operator can be the start operator or an operator within the reachability graph of a consistent region.

  • When the operator is the start of a consistent region, its behavior depends on whether the region is periodic or operator-driven, and whether it has an input port.
    • If it has an input port, drain processing is allowed only at file boundaries.
    • If the consistent region trigger is periodic, the drain processing completes any file that it is processing. If it does not have an input port, drain processing is triggered based on the configured period. When checkpointing, the operator saves the position in the file that it is processing. During reset processing, the operator seeks its stream position back to its saved position. On a reset to its initial state, the operator seeks back to the beginning of the file.
    • If the consistent region trigger is operatorDriven, the operator initiates drain processing after each successfully processed file. During reset processing, the operator stops processing any file that it might be processing. The operator does not automatically reprocess any files in this configuration. If it does not have an input port, it performs drain processing at the end of the file. During reset processing, it rereads its file from the start.
  • When the operator is within the reachability graph, it has the following behavior.
    • During drain processing, it completes processing of the file it is processing. No reset can happen until the file is completely processed.
    • During reset processing, it depends on its input port to replay the names of the files to reprocess.
Tip:
  • If the FileSource operator has an input port, include the input port as part of the consistent region. Otherwise, the FileSource operator does not reprocess any files after a reset.
  • If the FileSource operator does not have an input port and a full file reprocessing upon a failure is not wanted, configure the region with a periodic trigger. Otherwise, an operator-driven trigger might be preferred.
  • If the FileSource operator is not the start operator of an operator-driven consistent region, fuse it with the start operator of the region (and do not use a threaded port) to avoid long drain times when processing large files.
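
The second tip can be sketched as follows. This is an illustration under stated assumptions, not a definitive configuration: the file names and the 10-second period are invented, and the sketch assumes the usual @consistent annotation syntax and the presence of a JobControlPlane operator, neither of which is described on this page.

```spl
use spl.control::JobControlPlane;

composite ConsistentRegionSketch {
  graph
    // Assumption: applications with a consistent region include a
    // JobControlPlane operator.
    () as JCP = JobControlPlane() {}

    // No input port and a periodic trigger: on a reset, the operator
    // seeks back to the checkpointed file position instead of
    // rereading the whole file.
    @consistent(trigger = periodic, period = 10.0)
    stream<rstring line> Lines = FileSource() {
      param
        file   : "big.log";      // hypothetical file name
        format : line;
    }

    () as Sink = FileSink(Lines) {
      param
        file : "out.txt";        // hypothetical output file
    }
}
```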

Checkpointing behavior in an autonomous region

When the FileSource operator is in an autonomous region and configured with a config checkpoint : periodic(T) clause, a background thread in the SPL runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its internal state to its initial state, and restores logic state variables (if present) from the last checkpoint.

When the FileSource operator is in an autonomous region and configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at runtime. Upon restart, the operator restores to its initial state.

Such checkpointing behavior is subject to change in the future.
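
A minimal sketch of periodic checkpointing in an autonomous region might look like the following. The file name, the 30-second period, and the linesSeen state variable are assumptions for illustration. After a restart, linesSeen would be restored from the last checkpoint, while the position in the file would not.

```spl
composite CheckpointSketch {
  graph
    stream<rstring line, int32 seq> Lines = FileSource() {
      logic
        state : mutable int32 linesSeen = 0;   // hypothetical logic state variable
      param
        file   : "input.txt";                  // hypothetical file name
        format : line;
      output
        Lines : seq = linesSeen++;
      config
        checkpoint : periodic(30.0);           // checkpoint every 30 seconds
    }
}
```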

Exceptions

The FileSource operator throws an exception in the following cases:
  • The input file cannot be opened for reading.
  • The moveFileToDirectory directory does not exist.
  • The moveFileToDirectory value is not a directory.

Examples

These examples use the FileSource operator.

The following example demonstrates the general usage of the FileSource operator:

composite Main {                                                                          
  graph                                                                                   
    // source operator with a relative file argument                                      
    stream<rstring name, uint32 age, uint64 salary> Beat = FileSource()                   
    {                                                                                     
      param                                                                               
        file : "People.dat"; // looks for <data-directory>/People.dat
    }                                                                                     
    // source operator with a default tuple for missing arguments                         
    stream<rstring name, uint32 age, uint64 salary> Beat1 = FileSource()                  
    {
      param                                                                               
        file : "People.dat";                                                              
        defaultTuple : {name="foo", age=19u, salary=10000ul};                             
    }                                                                                     
    // source operator with an absolute file argument and hot file option                 
    stream<Beat> Beat2 = FileSource()                                                     
    {                                                                                     
      param                                                                               
        file    : "/tmp/People.dat";                                                      
        hotFile : true;                                                                   
    }                                                                                     
    // source operator with a csv format specifier,                                       
    // hasDelayField option, and custom separator
    stream<Beat> Beat3 = FileSource()                                                     
    {                                                                                     
      param                                                                               
        file   : "People.dat";                                                            
        format : csv;                                                                     
        separator : "|";                                                                  
        hasDelayField : true;                                                             
    }                                                                                     
    // source operator with a txt format specifier and compression                        
    stream<Beat> Beat4 = FileSource()                                                     
    {                                                                                     
      param                                                                               
        file        : "People.dat";                                                       
        format      : txt;                                                                
        compression : zlib;                                                               
    }                                                                                     
    // source operator with a csv format specifier and with strict parsing, waiting       
    // 5 seconds before starting to process the file                                      
    stream<Beat> Beat5 = FileSource()                                                     
    {                                                                                     
      param                                                                               
        file    : "People.dat";                                                           
        format  : csv;                                                                    
        parsing : strict;                                                                 
        initDelay : 5.0;                                                                  
    }                                                                                     
    // source operator with a bin format specifier                                        
    stream<Beat> Beat6 = FileSource()                                                     
    {                                                                                     
      param                                                                               
        file   : "People.dat";                                                            
        format : bin;                                                                     
    }                                                                                     
    // source operator with a line format specifier                                       
    stream<rstring line> Beat7 = FileSource()                                             
    {                                                                                     
      param                                                                               
        file   : "People.dat";                                                            
        format : line;                                                                    
    }                                                                                     
    // source operator with a line format specifier, and an eolMarker option              
    stream<rstring line> Beat8 = FileSource()                                             
    {                                                                                     
      param                                                                               
        file      : "People.dat";                                                         
        format    : line;                                                                 
        eolMarker : "\r";
    }                                                                                     
    // source operator with a block format specifier                                      
    stream<blob block> Beat9 = FileSource()                                               
    {                                                                                     
      param                                                                               
        file      : "People.dat";                                                         
        format    : block;                                                                
        blockSize : 1024u;                                                                
    }                                                                                     
                                                                                          
    stream<rstring filename> Files = DirectoryScan() {                                    
        param directory: "foo";                                                           
    }                                                                                     
    // source operator reading tuples of 2 int32s from files in directory 'foo'           
    // Delete the files after processing is done                                          
    stream<int32 i, int32 j> Beat10 = FileSource(Files)                                   
    {                                                                                     
      param deleteFile : true;                                                            
    }                                                                                     
}                                                                                         

The following example uses the second output stream and shows how to get the string form of the reason for failure:

composite Main() {                                                              
  graph                                                                         
    stream <rstring f> A = Beacon () {                                          
      logic state : mutable int32 i = 0;                                        
      param iterations : 4;                                                     
      output A : f = "file." + (rstring)i++;                                    
    }                                                                           
                                                                              
    (stream<int32 a> B; stream<rstring f, int32 e> C) = FileSource (A) {         
      param ignoreOpenErrors: true;                                    
    }                                                                  
                                                                       
    stream<rstring f, int32 e, rstring reason> D = Functor (C) { 
      output D : reason = strerror (e);                                
    }                                                                  
                                                                       
    () as Nil = FileSink (D) {                                         
      param file : "out";                                              
    }                                                                  
}                                                                      

Summary

Ports
This operator has 1 input port and 2 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 18 parameters.

Optional: blockSize, compression, defaultTuple, deleteFile, encoding, eolMarker, file, format, hasDelayField, hasHeaderLine, hotFile, ignoreExtraCSVValues, ignoreOpenErrors, initDelay, moveFileToDirectory, parsing, readPunctuations, separator

Metrics
This operator reports 2 metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Input Ports

Ports (0)

The FileSource operator has one optional input port, which ingests tuples that contain the names of the files to be opened. If this port is present, the input port schema must be a tuple with a single rstring attribute. Each tuple holds the file name to be read by the FileSource operator. While processing the tuple, the entire file is read, and tuples are generated by the FileSource operator.

Output Ports

Output Functions
OutputFunctions
rstring FileName()

Returns the name of the file that the tuple is read from.

int64 TupleNumber()

Returns the number of the current tuple that is generated by the operator for the current file. The first tuple that is generated has number 0.

<any T> T AsIs(T)

Returns the input value.
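
As a sketch of how these output functions might be used, the following invocation tags every line with the file it came from and its zero-based tuple number. The file name and the source and lineNo attribute names are assumptions for illustration.

```spl
composite OutputFunctionsSketch {
  graph
    // line is the single unassigned rstring attribute that the line
    // format requires; the other attributes come from output functions.
    stream<rstring line, rstring source, int64 lineNo> Lines = FileSource() {
      param
        file   : "input.txt";    // hypothetical file name
        format : line;
      output
        Lines : source = FileName(), lineNo = TupleNumber();
    }
}
```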

Ports (0)

The FileSource operator is configurable with two output ports. The first output port produces tuples read from the file. The FileSource operator outputs a window marker punctuation after the contents of the file have been output in full (and the file has been moved or deleted, if that option has been selected), unless readPunctuations is true, in which case the window marker is output as directed by the input data.

Assignments
This port set allows any SPL expression of the correct type to be assigned to output attributes.

Ports (1)

This second output port is optional. Its schema must be a tuple with two attributes: one of type rstring and one of type int32. When the end of the file being read is reached, this stream generates a tuple with the file name and 0 as the attribute values. If a file fails to open, the stream generates a tuple with the file name and the system error code. This behavior allows a downstream operator to know which files were processed and which files could not be opened.

Assignments
This port set does not allow assignments to output attributes.

Parameters

This operator supports 18 parameters.

Optional: blockSize, compression, defaultTuple, deleteFile, encoding, eolMarker, file, format, hasDelayField, hasHeaderLine, hotFile, ignoreExtraCSVValues, ignoreOpenErrors, initDelay, moveFileToDirectory, parsing, readPunctuations, separator

blockSize

Specifies the block size. This parameter can be specified when the block format is specified; it cannot appear otherwise. If the blockSize parameter is not specified when using the block format, the entire file is read into a single tuple, and hotFile cannot be specified. In this case, the tuple size could become very large.

compression

Specifies that the source file is compressed. There are three valid values, representing available compression algorithms. These values are: zlib, gzip, and bzip2.

defaultTuple

Specifies the attribute values to use when there are missing values in the source data. This parameter is only valid for the csv format. It can take a single value of tuple type. This type must match the type of the output port tuples.

deleteFile

Specifies whether to remove the file after the operator finishes processing it. If you specify the hotFile or moveFileToDirectory parameters, you cannot specify the deleteFile parameter.

This parameter is not supported in a consistent region.

encoding

Specifies the character set encoding that is used in the input file. The contents of the file are converted to the UTF-8 character set from the specified character set after any decompression and before extraction of the tuples occurs. An example of a valid character set encoding is ISO_8859-9. You can retrieve a list of available encodings by using the iconv --list command.

This parameter is not valid with bin or block formats.
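
For illustration, a sketch that reads a file in the ISO_8859-9 character set line by line could look like this. The file name is an assumption.

```spl
composite EncodingSketch {
  graph
    stream<rstring line> Lines = FileSource() {
      param
        file     : "turkish.txt";   // hypothetical ISO_8859-9 encoded file
        format   : line;
        encoding : "ISO_8859-9";    // contents are converted to UTF-8 before tuples are extracted
    }
}
```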

eolMarker

Specifies the end of line marker. It can be used only when the line format is specified. This parameter value defaults to "\n". Valid values include strings with one or two characters, such as "\r" and "\r\n".

file

Specifies the name of the source file. If the FileSource operator has an input port, this parameter must not be present. Otherwise, it must be present.

The file parameter can refer to a named pipe unless the hotFile parameter is set to true. The hotFile behavior is implemented by using seek, and seek is not valid on a named pipe.

format

Specifies the format of the source file. Valid values are txt, csv, bin, line, and block. The default value is csv.

This parameter can take only a single value. The detailed descriptions of individual format options are as follows:
  • txt: This format expects the file to be structured as a series of lines, where each line is a tuple literal, free of any type suffixes. String literals must be in double quotation marks. The # character can be used to mark comment lines. For example:
      # tuple<rstring name, uint32 age>
      {name="John", age=40}
      {name="Mary", age=35}

    The txt format reads a well-formed XML document that is contained in a string (or XML) literal in double quotation marks into an SPL attribute of type xml. Newline characters can appear within the string, causing many physical lines to appear as one logical line. Errors in parsing XML are logged. The corresponding attribute in the output stream must be of type xml.

    The FileSink, TCPSink, and UDPSink operator output formats support the writing of XML data by using the txt format. Attributes of type xml are written out in the form of a tuple literal. For example, {a = 5, x = "<x>...</x>"x, c = "hi"}.

  • csv: This format expects the file to be structured as a series of lines, where each line is a list of comma-separated values. String literals that are used at the outermost level can appear without the double quotation marks, unless they have a ',' character or escaped characters, in which case double quotation marks are required. Optional type string values cannot appear without double quotation marks. The unquoted text null can be used to represent an optional type string with no value. To read the string "null" into an optional type string, use double quotation marks. Both rstring and ustring values should appear as UTF-8 encoded strings. For fields that are missing in the csv formatted line (as in , ,), default constructed values are used, unless the defaultTuple parameter is specified. The separator parameter can be used to change the default separator of ','. '.' is used as the decimal point for binary and decimal floating point data. The # character can be used to mark comment lines. An example is as follows:
      # tuple<rstring name, uint32 age, list<tuple<rstring city, rstring state> > locations>
      John, 40, [{city="New York City",state="NY"},{city="Atlanta",state="GA"}]
      "Mary, and co.", 35, [{city="Toronto",state="ON"},{city="White Plains",state="NY"}]

    The csv format reads a well-formed XML document, contained in a string (or XML) literal in double quotation marks, into an SPL attribute of type xml. A string that spans several physical lines must be explicitly quoted, therefore the exemption for quotations at the outermost level does not apply. Errors in parsing the XML are logged. The corresponding attribute in the output stream must be of type xml.

    The FileSink, TCPSink, and UDPSink operator output formats support the writing of XML data by using the csv format. Attributes of type xml are written out in the form of an XML literal, that is, a string literal of the form "<x>...</x>" followed by the x suffix (for example, 5,"<x>...</x>"x,"hi"). XML values are preserved from Sink to Source.

  • bin: This format expects the file to be structured as a series of tuples in binary, using network byte order. Tuple attributes are assumed to be serialized in sequence to form a tuple. If readPunctuations is true, tuples are preceded by a byte with value 0x00. A window punctuation is represented by a byte with value 0x01, and a final punctuation by a byte with value 0x02.

    The bin format reads an XML document into an SPL attribute of type xml. The binary data is in the serialized form of an attribute of type xml. No validation is done on binary data for any type.

    The FileSink, TCPSink, and UDPSink operator output formats support the writing of XML data by using the bin format.

  • line: This format expects the file to be structured as a series of lines. It also expects that there is only a single attribute of type rstring that is not assigned in the output clause. Each line is converted into a tuple, where the line text (excluding the end of line marker) becomes the rstring attribute in the output tuple. You can customize the end of line marker by using the eolMarker parameter.

    The line format reads a traditional XML document that spans multiple lines. This format must be used with the XMLParse operator, which supports the conversion of incoming XML directly into one or more streams of tuples. For more information, see XMLParse. The line format is useful for reading XML over a TCP connection, where the size of the XML is not known in advance. The line format does not allow direct access to SPL attributes of the type xml, as it has a fixed format.

    The FileSink, TCPSink, and UDPSink operator output formats do not support the writing of XML data by using the line format directly; the XML data must first be converted to an rstring.

  • block: This format expects the file to be structured as a series of binary blocks. It also expects that there is only a single attribute of type blob that is not assigned in the output clause. Each block is converted into a tuple. You can customize the block size by using the blockSize parameter. The last block that is read from the file can be less than blockSize bytes. If the blockSize parameter is not specified, the entire file is read into a single tuple, and hotFile cannot be specified. In this case, the tuple size could become very large.

    The block format reads XML one block at a time. This format must be used with the XMLParse operator, which supports the conversion of incoming XML directly into one or more streams of tuples. For more information, see XMLParse. The block format is useful for reading very large XML documents, where efficiency of reading the file is important. The block format does not allow direct access to SPL attributes of the type xml, as it has a fixed format.

    The FileSink, TCPSink, and UDPSink operator output formats do not support the writing of XML data by using the block format directly; the XML data must first be converted to a blob.
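
The pairing of the line format with the XMLParse operator might be sketched as follows. The file name, the element names, and the trigger path are assumptions. The trigger parameter and the XPath output function are used as they are commonly shown for XMLParse and should be checked against that operator's reference.

```spl
use spl.XML::XMLParse;

composite XmlLinesSketch {
  graph
    // Each physical line of the XML document becomes one rstring tuple.
    stream<rstring line> XmlLines = FileSource() {
      param
        file   : "people.xml";   // hypothetical file name
        format : line;
    }

    // Assumed document shape: <people><person><name>...</name></person>...</people>
    stream<rstring name> Names = XMLParse(XmlLines) {
      param
        trigger : "/people/person";
      output
        Names : name = XPath("name/text()");
    }
}
```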

hasDelayField

Specifies that the format contains inter-arrival delays as the first field. By default, this parameter value is false. This parameter can be used only with txt, csv, and bin formats.

When the parameter value is true, the FileSource operator expects an extra attribute. The attribute specifies a delay that is used to pace the generation of the output tuples. The type of the delay attribute must be float64 and it is assumed to be in seconds. The delay attribute must appear before the tuple. In the case of txt and csv formats, the delay attribute is separated from the tuple by using a single comma with optional spaces before and after it. For example, for txt format:
      # tuple<rstring name, uint32 age>
      1.50, {name="John", age=40}
      1.75, {name="Mary", age=35}

For example, for csv format:
      # tuple<rstring name, uint32 age>
      1.50, John, 40
      1.75, Mary, 35

hasHeaderLine

Specifies whether to ignore the first line or lines of a file in csv format. This attribute-free parameter has type boolean or uint32. It is valid only if the format is csv. The default value is false.

If the parameter value is true, then the first line in the file is read and ignored. If the parameter value is false, no lines are skipped.

If a uint32 expression is passed, that number of lines are skipped. This behavior allows column names to be present in the first several lines of the file.
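
Both forms of the parameter are sketched below. The file names and the number of skipped lines are assumptions for illustration.

```spl
composite HeaderLineSketch {
  graph
    // Skips a single header line, for example "name,age".
    stream<rstring name, uint32 age> People = FileSource() {
      param
        file          : "people.csv";   // hypothetical file name
        format        : csv;
        hasHeaderLine : true;
    }

    // Skips the first three lines, for example a title, a blank line,
    // and a line of column names.
    stream<rstring name, uint32 age> Report = FileSource() {
      param
        file          : "report.csv";   // hypothetical file name
        format        : csv;
        hasHeaderLine : 3u;
    }
}
```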

hotFile

Specifies whether the input file is hot, which means that data is appended to it continuously. Unlike regular files, hot files are not closed when the end of the file is reached for the first time. Instead, the file is continuously checked for more data. If the file size shrinks during these checks, the file offset is reset to the beginning of the file.

The default value for this parameter is false. When the parameter value is true, a final marker is not sent upon reaching the end of the file; hot files ignore that event. Instead, a final marker is sent upon shutdown after a window marker punctuation is sent. Additionally, if the file offset is ever reset, a window marker punctuation is sent.

The hotFile parameter cannot be specified when the FileSource operator has an input port, when the deleteFile or moveFileToDirectory parameter is specified, or when the block format is specified without the blockSize parameter.

The hotFile parameter is not supported in a consistent region.

ignoreExtraCSVValues

Specifies to skip extra fields before the end of line when reading in CSV format. This parameter is only relevant with format : csv. If the parameter value is true, extra data on the current input line after the last attribute read is skipped. If the parameter is not present or has a parameter value of false, extra data on a line in csv format causes an error to be logged (when parsing : permissive) or an exception raised (when parsing : strict).
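
A short sketch, assuming a hypothetical people.csv whose lines carry more fields than the output schema declares:

```spl
composite ExtraValuesSketch {
  graph
    // For an input line such as "John,40,NY,extra", name and age are
    // read and the remaining ",NY,extra" is skipped instead of being
    // reported as an error.
    stream<rstring name, uint32 age> People = FileSource() {
      param
        file                 : "people.csv";   // hypothetical file name
        format               : csv;
        ignoreExtraCSVValues : true;
    }
}
```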

ignoreOpenErrors

Specifies whether the FileSource operator continues executing if the input file cannot be opened. If the parameter value is true and an input file cannot be opened, the FileSource operator logs an error and proceeds with the next input file. If the parameter is not present or its parameter value is false, the FileSource operator logs an error and terminates. By default, the parameter value is false.

initDelay

Specifies the number of seconds that the operator delays before it starts to produce tuples. If the operator has an input stream, the delay happens on receipt of the first tuple. During the delay, the operator is blocked and any further input tuples are blocked as well.

moveFileToDirectory

Specifies the directory to which the file is moved after its processing is finished.

Any files of the same name in the directory are removed before the move is done.

If the hotFile or deleteFile parameter is specified, the moveFileToDirectory parameter cannot be specified.

If the target directory is on a different file system, a .rename subdirectory can be created in the target directory. This behavior is used to ensure that the files appear atomically at the target directory.

This parameter is not supported in a consistent region.
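
As a sketch, the following pipeline scans a directory and moves each file away once it has been read. The directory names and the schema are assumptions. The target directory must already exist, because the operator throws an exception otherwise.

```spl
composite MoveFilesSketch {
  graph
    stream<rstring filename> Files = DirectoryScan() {
      param
        directory : "incoming";                  // hypothetical source directory
    }

    // The file parameter is omitted because file names arrive on the input port.
    stream<rstring name, uint32 age> People = FileSource(Files) {
      param
        format              : csv;
        moveFileToDirectory : "/tmp/processed";  // hypothetical, must exist and be a directory
    }
}
```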

parsing

Customizes the parsing behavior of the FileSource operator. There are three valid values: strict, permissive, and fast.

When the parameter value is strict, incorrectly formatted tuples result in a runtime error and the operator terminates. When the parameter value is permissive, incorrectly formatted tuples result in the creation of a runtime log entry and the parser makes an effort to skip to the next tuple (for txt and csv formats) and continue. If the format is bin, the parser closes the current file. If the operator has an input stream, it then starts reading the next file.

The permissive parameter value can be used only with txt, csv, and bin formats.

When the parameter value is fast, the input file is assumed to be formatted correctly and no runtime checks are performed. Incorrect input in fast mode causes undefined behavior.

The default parsing mode is strict.

readPunctuations

Specifies whether to read punctuations from the input. This parameter is valid only if the format is bin.

If the parameter value is true, a marker byte in the input identifies each item as a tuple, a window punctuation, or a final punctuation. Window and final punctuation are generated only as directed by the input data.

If the parameter value is false, only tuple data is present and punctuation is not read. The default parameter value is false.
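
A minimal sketch of a reader for such a file follows. It assumes a hypothetical pairs.bin that was written in bin format with the punctuation marker bytes included.

```spl
composite ReadPunctuationsSketch {
  graph
    // Window and final punctuation on Pairs follow the marker bytes in
    // the file; no window marker is added at the end of the file.
    stream<int32 i, int32 j> Pairs = FileSource() {
      param
        file             : "pairs.bin";   // hypothetical file name
        format           : bin;
        readPunctuations : true;
    }
}
```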

separator

Specifies the separator character for the csv format. This parameter value can be an rstring, uint8, or int8 data type. It must be a single-byte constant. For example, "|".

For non-printable characters in the range of 0x00 through 0xff, the parameter can be specified by using an int8 value. For example, separator: 0xc7;.

For values in the range of 0x00 through 0x7f, the hexadecimal sequence \uhhhh can also be used within a string. For example, separator: "\u001e";.

The tab and space characters are not supported as a separator.

This parameter can be specified only when the format is csv.
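
The string forms described above can be sketched as follows. The file names and the schema are assumptions for illustration.

```spl
composite SeparatorSketch {
  graph
    // Printable single-byte separator.
    stream<rstring name, uint32 age> Piped = FileSource() {
      param
        file      : "people.psv";   // hypothetical file name
        format    : csv;
        separator : "|";
    }

    // Non-printable separator (ASCII record separator, 0x1e) written
    // with the \uhhhh escape inside a string.
    stream<rstring name, uint32 age> Records = FileSource() {
      param
        file      : "people.rs";    // hypothetical file name
        format    : csv;
        separator : "\u001e";
    }
}
```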

Code Templates

FileSource
stream<${streamType}> ${streamName} = FileSource() {
  param
    file : "${filename}";
}

Metrics

nFilesOpened - Counter

The number of files opened by the FileSource operator.

nInvalidTuples - Counter

The number of tuples that failed to read correctly in csv or txt format.

Libraries

spl-std-tk-lib
Library Name: streams-stdtk-runtime, streams_boost_iostreams, streams_boost_filesystem, streams_boost_system
Include Path: ../../../impl/include