Scala
Scala is the default language of Apache Spark, so you select it by entering %spark in the paragraph's code editor.
%spark
println(sc.version)                                         // Spark version
println(util.Properties.versionString)                      // Scala version
println(java.net.InetAddress.getLocalHost().getHostName())  // host running the interpreter
println(System.getProperty("user.name"))                    // current user
println(java.time.LocalDateTime.now.format(java.time.format.DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss"))) // current time, 24-hour clock

Archive Access
Scala can access the Teragrep Archive with the following code:
%spark
import org.apache.spark.SparkConf // for accessing properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.Trigger
import java.sql.Timestamp
import org.apache.spark.sql.streaming.StreamingQuery
private def isArchiveDone(streamingQuery: StreamingQuery) = {
  // the archive is done when every archive source has read up to its end offset
  var archiveDone = true
  for (i <- streamingQuery.lastProgress.sources.indices) {
    val startOffset = streamingQuery.lastProgress.sources(i).startOffset
    val endOffset = streamingQuery.lastProgress.sources(i).endOffset
    val description = streamingQuery.lastProgress.sources(i).description
    if (description != null && description.startsWith("com.teragrep.pth06.ArchiveMicroBatchReader@")) { // ignore others than archive
      if (startOffset != null) {
        if (startOffset != endOffset) {
          archiveDone = false
        }
      }
      else {
        archiveDone = false
      }
    }
  }
  archiveDone
}

// S3 identity is the current user; the credential is read from the user's HDFS home directory
val s3identity: String = System.getProperty("user.name")
var s3credential: String = _
try {
  val path: String = "hdfs:///user/" + s3identity + "/s3credential"
  val df = spark.read.textFile(path)
  s3credential = df.first()
}
catch {
  case e: Exception => {
    println("Unable to get s3credential from HDFS: " + e)
    System.exit(1)
  }
}

val df = spark
  .readStream
  .format("com.teragrep.pth06.ArchiveSourceProvider")
  .option("S3endPoint", sc.getConf.get("fs.s3a.endpoint"))
  .option("S3identity", s3identity)
  .option("S3credential", s3credential)
  .option("DBusername", sc.getConf.get("dpl.archive.db.username"))
  .option("DBpassword", sc.getConf.get("dpl.archive.db.password"))
  .option("DBurl", sc.getConf.get("dpl.archive.db.url"))
  .option("DBstreamdbname", sc.getConf.get("dpl.archive.db.streamdb.name"))
  .option("DBjournaldbname", sc.getConf.get("dpl.archive.db.journaldb.name"))
  .option("num_partitions", "1")
  .option("queryXML", """<index value="f17" operation="EQUALS"/>""")
  .load()

// count the records as they arrive
val df2 = df.agg(count($"_time").as("count"))

val query = df2
  .writeStream
  .outputMode("complete")
  .format("memory")
  .trigger(Trigger.ProcessingTime(0))
  .queryName("ArchiveAccessExample")
  .start()

// update the display every second until the archive has been fully read
while (!query.awaitTermination(1000)) {
  val dfOut = sqlContext.sql("SELECT * FROM ArchiveAccessExample")
  z.getInterpreterContext.out.clear(true)
  z.show(dfOut)
  if (query.lastProgress != null && isArchiveDone(query))
    query.stop()
}

s3credential = "" // clear so it's not present on the output
PySpark
You select PySpark by entering %pyspark in the paragraph's code editor.
%pyspark
import sys
import datetime
import socket
import getpass
import numpy
print(sys.version)
print(numpy)
print(socket.gethostname())
print(getpass.getuser())
print(datetime.datetime.now())

Archive Access
PySpark can access the Teragrep Archive with the following code:
%pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
import getpass
def isArchiveDone(streaming_query):
    # the archive is done when every archive source has read up to its end offset
    archive_done = True
    print(streaming_query.lastProgress)
    for datasource in streaming_query.lastProgress["sources"]:
        start_offset = datasource["startOffset"]
        end_offset = datasource["endOffset"]
        description = datasource["description"]
        if description is not None and not description.startswith("com.teragrep.pth06.ArchiveMicroBatchReader@"):
            # ignore others than archive
            continue
        if start_offset is not None:
            if not start_offset == end_offset:
                archive_done = False
        else:
            archive_done = False
    return archive_done

spark_session = spark.builder.getOrCreate()
spark_context = spark_session.sparkContext

Schema = StructType([StructField("_time", TimestampType(), True),
                     StructField("_raw", StringType(), True),
                     StructField("directory", StringType(), True),
                     StructField("stream", StringType(), True),
                     StructField("host", StringType(), True),
                     StructField("input", StringType(), True),
                     StructField("partition", StringType(), False),
                     StructField("offset", LongType(), True)])

# S3 identity is the current user; the credential is read from the user's HDFS home directory
s3identity = getpass.getuser()
s3credential = None
try:
    path = "hdfs:///user/" + s3identity + "/s3credential"
    df = spark_context.textFile(path)
    s3credential = df.first()
except Exception as e:
    print("Unable to get s3credential from HDFS: " + str(e))
    exit(1)

df = spark_session.readStream.schema(Schema).format("com.teragrep.pth06.ArchiveSourceProvider") \
    .option("S3endPoint", spark_context.getConf().get("fs.s3a.endpoint")) \
    .option("S3identity", s3identity) \
    .option("S3credential", s3credential) \
    .option("DBusername", spark_context.getConf().get("dpl.archive.db.username")) \
    .option("DBpassword", spark_context.getConf().get("dpl.archive.db.password")) \
    .option("DBurl", spark_context.getConf().get("dpl.archive.db.url")) \
    .option("DBstreamdbname", spark_context.getConf().get("dpl.archive.db.streamdb.name")) \
    .option("DBjournaldbname", spark_context.getConf().get("dpl.archive.db.journaldb.name")) \
    .option("num_partitions", "1") \
    .option("queryXML", """<index value="f17" operation="EQUALS"/>""") \
    .load()

# count the records as they arrive
df2 = df.agg(count(col("_time")))

query = df2.writeStream \
    .trigger(processingTime="0 seconds") \
    .outputMode("complete") \
    .format("memory") \
    .queryName("PySparkExample") \
    .start()

# update the display every second until the archive has been fully read
while not query.awaitTermination(1):
    # use the Zeppelin context "z" to render the output
    z.getInterpreterContext().out().clear(True)
    z.show(spark_session.sql("SELECT * FROM PySparkExample"))
    if query.lastProgress is not None and isArchiveDone(query):
        query.stop()
Spark SQL
You select Spark SQL by entering %sql in the paragraph's code editor. Due to limitations in Spark, the data is wrapped into an array when selecting from a dataset that is provided with a Recall function. SQL code for unwrapping it can be provided on request if necessary.
%sql
SELECT * FROM `paragraph_1622045160279_903527815`;
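As a starting point, the following sketch shows one possible way to do the unwrapping yourself with Spark SQL's explode function. It assumes the recalled dataset exposes the wrapped data as an array column, hypothetically named wrapped here; substitute the actual column name of your dataset:
%sql
SELECT explode(wrapped) AS unwrapped FROM `paragraph_1622045160279_903527815`;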

If a query fails, the Spark SQL execution output gives you detailed information about the failure.

Spark Configuration
You can alter the Spark configuration by using the %spark.conf interpreter.
Resource Allocation
The administrator of the Spark cluster may limit the resources available to each user. A request that exceeds this limit will crash the interpreter.
Dynamic Allocation profile:
%spark.conf
spark.dynamicAllocation.enabled=true
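If the cluster enforces a per-user resource limit, you can keep dynamic allocation within that limit by also capping the number of executors. This is a minimal sketch using the standard Spark properties spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors; the values shown are assumed example values, not defaults of your cluster:
%spark.conf
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=8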
Static Allocation profile:
%spark.conf
spark.executor.instances=8
spark.executor.cores=1
spark.executor.memory=1g