Teragrep Functions

Recall

High Recall sizes (exceeding 5000) are known to slow down processing in the current version of Teragrep, because class instantiation takes a long time.

Recall in DPL

Recall is applied automatically when a non-aggregating dataset arrives at the end of the data processing flow.
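For example, a plain search that ends without any aggregation is returned to the notebook through Recall. A minimal sketch, assuming the %dpl interpreter and the f17 index used in the later examples:

%dpl
index=f17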

Recall size can be adjusted with the following Spark configuration:

%spark.conf
dpl.recall-size=1000
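
Because the setting is a plain Spark property, the value in effect can be read back from a notebook paragraph in the same way the examples below read the dpl.archive.db.* properties. A minimal sketch, assuming the property has been set as above:

%spark
// read the configured recall size back from the Spark configuration
sc.getConf.get("dpl.recall-size")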

Recall in Scala

%spark

import org.apache.spark.sql.streaming.Trigger
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._
import org.apache.spark.sql.{Row, SparkSession, Dataset}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer
import spark.implicits._

import com.teragrep.pth_09.functions.scala.Recall

// use Scala Archive access to get the df

val df = spark
  .readStream
  .format("com.teragrep.pth06.ArchiveSourceProvider")
  .option("username", sc.getConf.get("dpl.archive.db.username"))
  .option("password", sc.getConf.get("dpl.archive.db.password"))
  .option("url", sc.getConf.get("dpl.archive.db.url"))
  .option("streamdbname", sc.getConf.get("dpl.archive.db.streamdb.name"))
  .option("journaldbname", sc.getConf.get("dpl.archive.db.journaldb.name"))
  .option("num_partitions", "16")
  .option("query", """<index value="f17" operation="EQUALS"/>""")
  .load()

// create Recall function with df specific parameters
// schema, number of recalled rows, sort column, descending sort boolean
val recallUdaf = new Recall(df.schema, 100, "_time", false)

// register udaf
spark.udf.register("Recall", recallUdaf)

// create new column with the recalled data
val df4 = df.selectExpr("Recall(_time, _raw, index, sourcetype, host, source, partition, offset)")

// rename the recalled column to recallColumn
val df5 = df4.toDF("recallColumn")

// execute
val query = df5
  .na.fill(0)
  .na.fill("")
  .writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("RecallQuery")
  .trigger(Trigger.ProcessingTime(0))
  .start()

// create column to dataframe method
def makeResSeqDF(input : Row) : Dataset[Row] = {
    // the recall column wraps the recalled rows as a sequence
    val firstRow = input.getAs[Seq[Row]](0)
    // rebuild a dataframe with the original schema from the recalled rows
    val df1 = spark.createDataFrame(sc.parallelize(firstRow), df.schema)
    df1
}

while (!query.awaitTermination(1000)) {
    val result = sqlContext.sql("SELECT recallColumn FROM RecallQuery")
    if (result.count > 0) {
        // create new dataframe using the recall column
        val dfOut = makeResSeqDF(result.first)
        z.getInterpreterContext.out.clear(true)
        // use zeppelin context to render the output
        z.show(dfOut)
    }
}
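
If the paragraph is interrupted or the result is no longer needed, the streaming query can be stopped explicitly so it no longer writes to the memory sink. A short follow-up, assuming the query handle from the paragraph above:

%spark
// stop the streaming query and release the in-memory result table
query.stop()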

Recall in PySpark

The Recall function can be used from PySpark by first registering it in Scala.

The schema must be the final schema of the data on which Recall is called.

%spark
// Recall function

import com.teragrep.pth_09.functions.scala.Recall
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.RowFactory

val Schema: StructType = new StructType(Array[StructField](
    StructField("_time", DataTypes.TimestampType, true, new MetadataBuilder().build),
    StructField("_raw", DataTypes.StringType, true, new MetadataBuilder().build),
    StructField("directory", DataTypes.StringType, true, new MetadataBuilder().build),
    StructField("stream", DataTypes.StringType, true, new MetadataBuilder().build),
    StructField("host", DataTypes.StringType, true, new MetadataBuilder().build),
    StructField("input", DataTypes.StringType, true, new MetadataBuilder().build),
    StructField("partition", DataTypes.StringType, false, new MetadataBuilder().build),
    StructField("offset", DataTypes.LongType, true, new MetadataBuilder().build)
))

val recall = new Recall(Schema, 100, "_time", false) // schema, number of recalled rows, sort column, descending sort (true = descending, false = ascending)
spark.udf.register("Recall", recall)

After registering the Recall function in Scala, it is accessible from PySpark.

%pyspark

from pyspark.sql.functions import *
from pyspark.sql.types import *

spark_session = spark.builder.getOrCreate()

spark_context = spark_session.sparkContext

Schema = StructType([StructField("_time", TimestampType(), True),
    StructField("_raw", StringType(), True),
    StructField("directory", StringType(), True),
    StructField("stream", StringType(), True),
    StructField("host", StringType(), True),
    StructField("input", StringType(), True),
    StructField("partition", StringType(), False),
    StructField("offset", LongType(), True)])


df = spark_session.readStream.schema(Schema).format("com.teragrep.pth06.ArchiveSourceProvider") \
    .option("username", spark_context.getConf().get("dpl.archive.db.username")) \
    .option("password", spark_context.getConf().get("dpl.archive.db.password")) \
    .option("url", spark_context.getConf().get("dpl.archive.db.url")) \
    .option("streamdbname", spark_context.getConf().get("dpl.archive.db.streamdb.name")) \
    .option("journaldbname", spark_context.getConf().get("dpl.archive.db.journaldb.name")) \
    .option("num_partitions", "16") \
    .option("query", """<index value="f17" operation="EQUALS"/>""") \
    .load()

# apply the registered Recall function
df2 = df.selectExpr("Recall(_time, _raw, directory, stream, host, input, partition, offset)")

query = df2.writeStream.trigger(processingTime="0 seconds").outputMode("complete").format("memory").queryName("PySparkExample")\
    .start()

# extract data from the wrapped array into a new dataframe
def makeResSeqDF(row):
    row_list = row[0]
    return spark_session.createDataFrame(spark_context.parallelize(row_list), Schema)

# update the display every second
while not query.awaitTermination(1):
    df = spark_session.sql("SELECT * FROM PySparkExample")
    if df.count() > 0:
        row = df.first()
        df2 = makeResSeqDF(row)
        # use zeppelin context "z" to render the output
        z.getInterpreterContext().out().clear(True)
        z.show(df2)