Scala

Scala is the default language of Apache Spark, so it is selected by entering %spark in the paragraph’s code editor.

%spark

println(sc.version)
println(util.Properties.versionString)
println(java.net.InetAddress.getLocalHost().getHostName())
println(System.getProperty("user.name"))
println(java.time.LocalDateTime.now.format(java.time.format.DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss")))

Archive Access

Scala can be used to access the Teragrep Archive with the following code:

%spark

import org.apache.spark.SparkConf // for accessing properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.Trigger
import java.sql.Timestamp
import org.apache.spark.sql.streaming.StreamingQuery

private def isArchiveDone(streamingQuery: StreamingQuery) = {
  var archiveDone = true
  for (i <- streamingQuery.lastProgress.sources.indices) {
    val startOffset = streamingQuery.lastProgress.sources(i).startOffset
    val endOffset = streamingQuery.lastProgress.sources(i).endOffset
    val description = streamingQuery.lastProgress.sources(i).description
    if (description != null && description.startsWith("com.teragrep.pth06.ArchiveMicroBatchReader@")) { // ignore others than archive
      if (startOffset != null) {
        if (startOffset != endOffset) {
          archiveDone = false
        }
      }
      else {
        archiveDone = false
      }
    }
  }
  archiveDone
}

val s3identity: String = System.getProperty("user.name")
var s3credential: String = _

try {
    val path: String = "hdfs:///user/" + s3identity + "/s3credential"
    val df = spark.read.textFile(path)
    s3credential = df.first()
}
catch {
    case e: Exception => {
        println("Unable to get s3credential from HDFS: " + e)
        System.exit(1)
    }
}

val df = spark
  .readStream
  .format("com.teragrep.pth06.ArchiveSourceProvider")
  .option("S3endPoint", sc.getConf.get("fs.s3a.endpoint"))
  .option("S3identity", s3identity)
  .option("S3credential", s3credential)
  .option("DBusername", sc.getConf.get("dpl.archive.db.username"))
  .option("DBpassword", sc.getConf.get("dpl.archive.db.password"))
  .option("DBurl", sc.getConf.get("dpl.archive.db.url"))
  .option("DBstreamdbname", sc.getConf.get("dpl.archive.db.streamdb.name"))
  .option("DBjournaldbname", sc.getConf.get("dpl.archive.db.journaldb.name"))
  .option("num_partitions", "1")
  .option("queryXML", """<index value="f17" operation="EQUALS"/>""")
  .load()

val df2 = df.agg(count($"_time").as("count"))

val query = df2
  .writeStream
  .outputMode("complete")
  .format("memory")
  .trigger(Trigger.ProcessingTime(0))
  .queryName("ArchiveAccessExample")
  .start()


// update the display every second
while (!query.awaitTermination(1000)) {
  val dfOut = sqlContext.sql("SELECT * FROM ArchiveAccessExample")
  z.getInterpreterContext.out.clear(true)
  z.show(dfOut) // use the Zeppelin context "z" to render the output
  if (query.lastProgress != null && isArchiveDone(query))
    query.stop()
}
s3credential = "" // clear the credential so it is not shown in the output

PySpark

%pyspark
import sys
import datetime
import socket
import getpass
import numpy

print(sys.version)
print(numpy)
print(socket.gethostname())
print(getpass.getuser())
print(datetime.datetime.now())

Archive Access

PySpark can be used to access the Teragrep Archive with the following code:

%pyspark

from pyspark.sql.functions import *
from pyspark.sql.types import *
import getpass

def isArchiveDone(streaming_query):
    archive_done = True
    for datasource in streaming_query.lastProgress["sources"]:
        start_offset = datasource["startOffset"]
        end_offset = datasource["endOffset"]
        description = datasource["description"]

        if description is not None and not description.startswith("com.teragrep.pth06.ArchiveMicroBatchReader@"):
            # ignore others than archive
            continue

        if start_offset is not None:
            if not start_offset == end_offset:
                archive_done = False
        else:
            archive_done = False

    return archive_done

spark_session = spark.builder.getOrCreate()

spark_context = spark_session.sparkContext

Schema = StructType([StructField("_time", TimestampType(), True),
                     StructField("_raw", StringType(), True),
                     StructField("directory", StringType(), True),
                     StructField("stream", StringType(), True),
                     StructField("host", StringType(), True),
                     StructField("input", StringType(), True),
                     StructField("partition", StringType(), False),
                     StructField("offset", LongType(), True)])

s3identity = getpass.getuser()
s3credential = None

try:
    path = "hdfs:///user/" + s3identity + "/s3credential"
    df = spark_context.textFile(path)
    s3credential = df.first()
except Exception as e:
    print("Unable to get s3credential from HDFS: " + str(e))
    exit(1)

df = spark_session.readStream.schema(Schema).format("com.teragrep.pth06.ArchiveSourceProvider") \
    .option("S3endPoint", spark_context.getConf().get("fs.s3a.endpoint")) \
    .option("S3identity", s3identity) \
    .option("S3credential", s3credential) \
    .option("DBusername", spark_context.getConf().get("dpl.archive.db.username")) \
    .option("DBpassword", spark_context.getConf().get("dpl.archive.db.password")) \
    .option("DBurl", spark_context.getConf().get("dpl.archive.db.url")) \
    .option("DBstreamdbname", spark_context.getConf().get("dpl.archive.db.streamdb.name")) \
    .option("DBjournaldbname", spark_context.getConf().get("dpl.archive.db.journaldb.name")) \
    .option("num_partitions", "1") \
    .option("queryXML", """<index value="f17" operation="EQUALS"/>""") \
    .load()

df2 = df.agg(count(col("_time")))

query = df2.writeStream \
    .trigger(processingTime="0 seconds") \
    .outputMode("complete") \
    .format("memory") \
    .queryName("PySparkExample") \
    .start()

# update display every second
while not query.awaitTermination(1):
    # use zeppelin context "z" to render the output
    z.getInterpreterContext().out().clear(True)
    z.show(spark_session.sql("SELECT * FROM PySparkExample"))
    if query.lastProgress is not None and isArchiveDone(query):
        query.stop()

Spark SQL

Due to limitations in Spark, the data is wrapped into an array when selecting from a dataset that has been provided with the Recall function. SQL code for unwrapping the array can be provided on request if necessary.

%sql

SELECT * FROM `paragraph_1622045160279_903527815`;
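As a hedged sketch of the unwrapping (the column name "wrapped" below is hypothetical and depends on the schema produced by the Recall function), an array column can be flattened with the explode() generator so that each element becomes its own row:

%sql

-- hypothetical sketch: "wrapped" stands for the array column produced by the Recall function
-- explode() turns every element of the array into a separate row
SELECT explode(wrapped) AS value FROM `paragraph_1622045160279_903527815`;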

Spark SQL execution gives you detailed information about failures.


Kotlin

%kotlin

println("Hello this is Kotlin!")

Spark Configuration

You can alter the Spark configuration by using the %spark.conf interpreter.

Resource Allocation

The administrator of the Spark cluster may limit the resources available per user. A request that exceeds this limit will crash the interpreter.

Dynamic Allocation profile:

%spark.conf
spark.dynamicAllocation.enabled=true

Static Allocation profile:

%spark.conf
spark.executor.instances=8
spark.executor.cores=1
spark.executor.memory=1g
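
To check which values are actually in effect, the running SparkContext's configuration can be inspected from a %spark paragraph. A minimal sketch, assuming the Static Allocation profile above was applied before the interpreter started:

%spark

// print the effective allocation settings; keys that are not set fall back to the "not set" default
println(sc.getConf.get("spark.dynamicAllocation.enabled", "not set"))
println(sc.getConf.get("spark.executor.instances", "not set"))
println(sc.getConf.get("spark.executor.cores", "not set"))
println(sc.getConf.get("spark.executor.memory", "not set"))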