Teragrep Archive Datasource

Limitations

Archive access is currently enabled for all languages that belong to the Spark group. Only the Spark MicroBatch interface is implemented.

The Trigger.ProcessingTime trigger keeps processing indefinitely, but it is currently the only trigger that processes all the data. Trigger.Once processes only the number of objects specified as partitions, so it will likely process fewer objects than you want.

These limitations do not apply to DPL. Workarounds can be produced for the other languages.

The Archive Datasource is actively being developed to overcome these obstacles. If you require any of the missing features, please submit an issue on Teragrep's GitHub.
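
As an illustration of the trigger behaviour described above, here is a minimal Scala sketch. It assumes that df is a streaming DataFrame loaded from the Archive Datasource, as shown in the Scala example later in this document; the query names are placeholders.

%spark

import org.apache.spark.sql.streaming.Trigger

// Alternative 1: Trigger.ProcessingTime(0) keeps polling micro-batches
// indefinitely and eventually processes all matching archive objects.
df.writeStream
  .format("memory")
  .queryName("AllObjectsExample")
  .trigger(Trigger.ProcessingTime(0))
  .start()

// Alternative 2: Trigger.Once processes a single micro-batch of at most
// num_partitions objects and then stops, so the result is likely incomplete.
df.writeStream
  .format("memory")
  .queryName("SingleBatchExample")
  .trigger(Trigger.Once())
  .start()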

Archive Query

Archive queries are written in XML, which can represent complex query structures. You pass a query to the Archive Datasource with the query option.

This DPL query:

index=f17

…is translated to the following:

"""<index value="f17" operation="EQUALS"/>"""

This DPL query:

index=f17 earliest="01/01/1970:00:00:00" latest="04/27/2021:08:55:54"

…is translated to the following:

"""<AND><AND><index value="f17" operation="EQUALS"/><earliest value="0" operation="GE"/></AND><latest value="1619513754" operation="LE"/></AND>"""

Languages for Archive Access

DPL

DPL automatically connects to the Teragrep Archive without any extra configuration.

Scala

Scala can be used to access the Teragrep Archive with the following code:

%spark

import org.apache.spark.SparkConf // for accessing properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._ // provides the count() aggregation
import org.apache.spark.sql.streaming.Trigger
import java.sql.Timestamp

val df = spark
  .readStream
  .format("com.teragrep.pth06.ArchiveSourceProvider")
  .option("username", sc.getConf.get("dpl.archive.db.username"))
  .option("password", sc.getConf.get("dpl.archive.db.password"))
  .option("url", sc.getConf.get("dpl.archive.db.url"))
  .option("streamdbname", sc.getConf.get("dpl.archive.db.streamdb.name"))
  .option("journaldbname", sc.getConf.get("dpl.archive.db.journaldb.name"))
  .option("num_partitions", "16")
  .option("query", """<index value="f17" operation="EQUALS"/>""")
  .load()

val df2 = df.agg(count($"_time").as("count"))

val query = df2
  .writeStream
  .outputMode("complete")
  .format("memory")
  .trigger(Trigger.ProcessingTime(0))
  .queryName("ArchiveAccessExample")
  .start()


// update the display every second until the query terminates
while (!query.awaitTermination(1000)) {
  val dfOut = sqlContext.sql("SELECT * FROM ArchiveAccessExample")
  // use the zeppelin context "z" to render the output
  z.getInterpreterContext.out.clear(true)
  z.show(dfOut)
}
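
The memory sink keeps the accumulated result available as an in-memory table named after queryName. Below is a minimal follow-up sketch, assuming the query and session from the example above, for stopping the stream (which the ProcessingTime trigger would otherwise keep running indefinitely) and reading the result afterwards.

%spark

// Stop the streaming query explicitly; with Trigger.ProcessingTime(0) it
// would otherwise keep running indefinitely.
if (query.isActive) {
  query.stop()
}

// The results written so far remain readable from the in-memory table.
sqlContext.sql("SELECT * FROM ArchiveAccessExample").show()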

PySpark

PySpark can be used to access the Teragrep Archive with the following code:

%pyspark

from pyspark.sql.functions import *
from pyspark.sql.types import *

# reuse the SparkSession and SparkContext provided by Zeppelin
spark_session = spark.builder.getOrCreate()
spark_context = spark_session.sparkContext

Schema = StructType([StructField("_time", TimestampType(), True),
    StructField("_raw", StringType(), True),
    StructField("directory", StringType(), True),
    StructField("stream", StringType(), True),
    StructField("host", StringType(), True),
    StructField("input", StringType(), True),
    StructField("partition", StringType(), False),
    StructField("offset", LongType(), True)])


df = spark_session.readStream.schema(Schema).format("com.teragrep.pth06.ArchiveSourceProvider") \
    .option("username", spark_context.getConf().get("dpl.archive.db.username")) \
    .option("password", spark_context.getConf().get("dpl.archive.db.password")) \
    .option("url", spark_context.getConf().get("dpl.archive.db.url")) \
    .option("streamdbname", spark_context.getConf().get("dpl.archive.db.streamdb.name")) \
    .option("journaldbname", spark_context.getConf().get("dpl.archive.db.journaldb.name")) \
    .option("num_partitions", "16") \
    .option("query", """<index value="f17" operation="EQUALS"/>""") \
    .load()

df2 = df.agg(count(col("_time")))

query = df2.writeStream \
    .trigger(processingTime="0 seconds") \
    .outputMode("complete") \
    .format("memory") \
    .queryName("PySparkExample") \
    .start()

# update display every second
while not query.awaitTermination(1):
    # use zeppelin context "z" to render the output
    z.getInterpreterContext().out().clear(True)
    z.show(spark_session.sql("SELECT * FROM PySparkExample"))