Scraping the Enron Corpus via Spark REPL

Intro

I wrote a number of Apache Hadoop applications in Java. I found writing these applications a frustrating exercise in verbosity.

One in particular involved some data mining of the Enron Corpus. This is my so-called Timeslicer, which can be found on GitHub.

On this page, I will attempt to do the same in spark. To accomplish this, I’ve downloaded and installed it on top of my Hadoop installation. As such, Spark can simply pull data from HDFS, where all my Enron data already lives.

Installation

I downloaded Apache Spark from here: spark.apache.org/downloads.html. and installed like so:

wget http://mirrors.ukfast.co.uk/sites/ftp.apache.org/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
#unzip and set symlink
sudo tar -xvzf spark-2.4.0-bin-hadoop2.7.tgz -C /opt
sudo ln -s spark-2.4.0-bin-hadoop2.7 /opt/spark

To launch the REPL, run the following in the terminal:

/opt/spark/bin/spark-shell 

You’ll see something like this:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 10.0.2)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Note: it’s :quit to quit the REPL. ctrl-c works too. If you’re like me, you’ll find yourself inadvertently quitting the REPL when attempting to copy some lines.

VS Code

I find it convenient to write my Scala in VS Code and take advantage of the built-in terminal. In this environment, I can simply highlight some code and hit a keybind of my chosing to run it in terminal. I’ve bound alt-X to do this as it’s what I’m familiar with from using SQL Server.

Data Pull

Now we can pull data from HDFS.

val seq20 = sc.wholeTextFiles("hdfs://localhost:54310/user/hduser/in/enron20.seq").
    first.
    _2.
    split("\n\n").
    filter(line => line.contains("Date:"))

sc.wholetextfiles returns a Resilient Distributed Dataset (RDD), which is Scala’s primary abstraction. RDDs contain a key and a value, where the key is the file path.

Since we are only interested in the value, we specify _2 to indicate the second column index.

This file contains only one tuple (=line). As such, first is redundant but nonetheless we can split and filter such that each block of email header is on its own line.

Parsing

We are interested in scraping three bits of information:

  • sender
  • recipient
  • date

Let’s look at how to do this in a single email. Given that Seq types are indexed, we just write:

val email = seq20(1)

This returns the second email in this collection, which looks like:

M.A.
??????? ?folder-2???�Message-ID: <4152796.1075843929751.JavaMail.evans@thyme>
Date: Sun, 29 Apr 2001 11:54:00 -0700 (PDT)
From: george.mcclellan@enron.com
To: sven.becker@enron.com
Subject: RE: Summary on Bremen Deal
Cc: stuart.staley@enron.com, manfred.ungethum@enron.com,
 mike.mcconnell@enron.com, jeffrey.shankman@enron.com
Mime-Version: 1.0
Content-Type: text/plain; charset=ANSI_X3.4-1968
Content-Transfer-Encoding: quoted-printable
Bcc: stuart.staley@enron.com, manfred.ungethum@enron.com,
 mike.mcconnell@enron.com, jeffrey.shankman@enron.com
X-From: George Mcclellan
X-To: Sven Becker
X-cc: Stuart Staley, Manfred Ungethum, Mike McConnell, Jeffrey A Shankman
X-bcc:
X-Folder: \Mark_McConnell_June2001\Notes Folders\Coal
X-Origin: MCCONNELL-M
X-FileName: mmcconn.nsf

Singleton Fields

To scrape the sender and date, we look for either From: and Date: and scrape the remainer of the line:

From

val sender = email.replace("\n\t","").
    split("\n").
    filter(line => line.startsWith("From:"))(0).
    substring(6).
    trim()

Date

val date = email.
    replace("\n\t","").
    split("\n").
    filter(line => line.startsWith("Date:"))(0).
    substring(6).
    trim() 

Multiple Elements

A recipient is denoted in multiple ways:

  • To:
  • Cc:
  • Bcc:

We want to scrape all these prefixes. So I just throw these in a collection:

val recipPrefix = Seq("To:","Cc:","Bcc:")

So we want to iterate through each prefix and scrape the header. We can do this via an anonymous function known as a For Comprehension. I suppose this is analogous to a List Comprehension in Python, apply in R, and lambda in Java.

The below is an example of For Comprehension

for (prefix <- recipPrefix){
	println(prefix)
}

It iterates through recipPrefix and prints each element:

To:
Cc:
Bcc:

But if I want it to return something (instead of printing), I can write yield:

for (prefix <- recipPrefix) yield{
	prefix
}

This will return the same type as the input. In this case, Seq.

Now we can perform a similar regex as before:

for (prefix <- recipPrefix) yield{
	email.
	replace("\n\t","").
	split("\n").
	filter(line => line.startsWith(prefix))
}

This returns a Seq of three elements (one for each prefix).

res0: Seq[Array[String]] = List(Array(To: george.mcclellan@enron.com, daniel.reck@enron.com, stuart.staley@enron.com, michael.beyer@enron.com, kevin.mcgowan@enron.com, jeffrey.shankman@enron.com, mike.mcconnell@enron.com, paula.harris@enron.com), Array(), Array())

There are some issues with this output

  • Each element is of type Array[String]. This is due to our use of split().
  • Notice how some of these arrays are empty. This is because Cc: and Bcc: prefixes for this email did not appear in the header (because those fields had not been populated).
  • Lastly, the prefixes have not yet been removed from the string. I.e., To: is still present in the first element.

Let’s first tackle the last bullet first. As with any regex problem like this, we can solve with substring(), which returns a smaller portion of the string – a substring!

val recipients =  for (prefix <- recipPrefix) yield{
	email.
	replace("\n\t","").
	split("\n").
	filter(line => line.startsWith(prefix)).
	map(_.substring(prefix.length + 1)).
}
recipients: Seq[Array[String]] = List(Array(sven.becker@enron.com), Array(stuart.staley@enron.com, manfred.ungethum@enron.com, mike.mcconnell@enron.com, jeffrey.shankman@enron.com), Array(stuart.staley@enron.com, manfred.ungethum@enron.com, mike.mcconnell@enron.com, jeffrey.shankman@enron.com))

Great, the prefix no longer appears in each Array[String].

Nested For Comprehension

A For Comprehension translates to:

recipPrefix.forEach(prefix => println(prefix))

Indeed, we could have nested For Comprehension. This is especially useful in our case, as we now need to loop through each element in Array[String] and split() it:

val recipients =  for {
	prefix <- recipPrefix
	x <- email.
		replace("\n\t","").
		split("\n").
		filter(line => line.startsWith(prefix)).
		map(_.substring(prefix.length + 1))
	y <- x.split(", ")
	}
	yield{
	y
}
recipients: Seq[String] = List(sven.becker@enron.com, stuart.staley@enron.com, manfred.ungethum@enron.com, mike.mcconnell@enron.com, jeffrey.shankman@enron.com,stuart.staley@enron.com, manfred.ungethum@enron.com, mike.mcconnell@enron.com, jeffrey.shankman@enron.com)

We cast (over the entire comprehension) to Set to remove duplicates and then cast back to Seq. the final For Comprehension is written thus:

val recipientSeq = (
for {
	prefix <- recipPrefix
	x <- email. // this is a forEach -- iterating through Seq
		replace("\n\t","").
		split("\n").
		filter(line => line.startsWith(prefix)).
		map(_.substring(prefix.length + 1))
		// the below also worked in place of map()
		// collect { case s if (s.contains(":")) =>
			//s.substring(prefix.length + 1)
	y <- x.split(", ") //another forEach -- iterating through Array[String]
} yield 
{
	y	// return y
}
).toSet.toSeq // cast remove duplicates and cast again to initial type

It looks like:

recipientSeq: Seq[String] = ArrayBuffer(stuart.staley@enron.com, mike.mcconnell@enron.com, jeffrey.shankman@enron.com, sven.becker@enron.com, manfred.ungethum@enron.com)

This returns nine unique email address!

scala> recipients.size
res19: Int = 9

Encapsulation

We can wrap all of the above in a class:

def emitRecipTriplet (email:String) : Seq[String] = {
	// These prefixes denote recipients.
	val recipPrefix = Seq("To:","Cc:","Bcc:") 
	
	// Parse sender
	val sender = email.replace("\n\t","").split("\n").filter(line => line.startsWith("From:"))(0).substring(6).trim()

	//Parse Date
	val date = email.replace("\n\t","").split("\n").filter(line => line.startsWith("Date:"))(0).substring(6).trim() 

	// For Comprehension. Parse Recipients
	val recipientSeq = (
		for {
		prefix <- recipPrefix
		x <- email. // this is a flatmap
			replace("\n\t","").
			split("\n").
			filter(line => line.startsWith(prefix)).
			map(_.substring(prefix.length + 1))
		y <- x.split(", ") //another flatmap
		} yield 
		{
			y
		}// comprehension "body"	
	).toSet.toSeq
		
	return recipientSeq.map(r => sender + "\t" + r + "\t" + date)
}

This class takes a single email as input. For each input, it returns a collection (Seq) of tuples. Each tuple would look like:

from:			to:				date:
bill.cordes@enron.com	mike.mcconnell@enron.com	Mon, 24 Jul 2000 00:28:00 -0700 (PDT)

Now we can use this class to iterate and scrape entire corpus.

val buf = scala.collection.mutable.ListBuffer.empty[String]

for(email <- seq20) {
	buf ++= emitRecipTriplet(email)
}

++= lets me add (union?) a collection to a collection. In this case, I’m adding a Seq to a ListBuffer. Each element in buf is a String and not a Seq (I didn’t believe it myself, so I checked).

for (tuple <- buf){
	println(tuple)
}
mark.rodriguez@enron.com        michael.beyer@enron.com Mon, 28 Aug 2000 06:57:00 -0700 (PDT)
mark.rodriguez@enron.com        kevin.mcgowan@enron.com Mon, 28 Aug 2000 06:57:00 -0700 (PDT)
mark.rodriguez@enron.com        stuart.staley@enron.com Mon, 28 Aug 2000 06:57:00 -0700 (PDT)
mark.rodriguez@enron.com        george.mcclellan@enron.com      Mon, 28 Aug 2000 06:57:00 -0700 (PDT)
mark.rodriguez@enron.com        mike.mcconnell@enron.com        Mon, 28 Aug 2000 06:57:00 -0700 (PDT)
mark.rodriguez@enron.com        paula.harris@enron.com  Mon, 28 Aug 2000 06:57:00 -0700 (PDT)
mark.rodriguez@enron.com        jeffrey.shankman@enron.com      Mon, 28 Aug 2000 06:57:00 -0700 (PDT)
mark.rodriguez@enron.com        daniel.reck@enron.com   Mon, 28 Aug 2000 06:57:00 -0700 (PDT)
george.mcclellan@enron.com      stuart.staley@enron.com Sun, 29 Apr 2001 11:54:00 -0700 (PDT)
george.mcclellan@enron.com      mike.mcconnell@enron.com        Sun, 29 Apr 2001 11:54:00 -0700 (PDT)
george.mcclellan@enron.com      jeffrey.shankman@enron.com      Sun, 29 Apr 2001 11:54:00 -0700 (PDT)
george.mcclellan@enron.com      sven.becker@enron.com   Sun, 29 Apr 2001 11:54:00 -0700 (PDT)
george.mcclellan@enron.com      manfred.ungethum@enron.com      Sun, 29 Apr 2001 11:54:00 -0700 (PDT)
stuart.staley@enron.com mike.mcconnell@enron.com        Fri, 4 May 2001 09:10:00 -0700 (PDT)
stuart.staley@enron.com jeffrey.shankman@enron.com      Fri, 4 May 2001 09:10:00 -0700 (PDT)
stuart.staley@enron.com george.mcclellan@enron.com      Sun, 3 Dec 2000 14:59:00 -0800 (PST)
stuart.staley@enron.com jeffrey.shankman@enron.com      Sun, 3 Dec 2000 14:59:00 -0800 (PST)
stuart.staley@enron.com mike.mcconnell@enron.com        Sun, 3 Dec 2000 14:59:00 -0800 (PST)
george.mcclellan@enron.com      michael.beyer@enron.com Mon, 31 Jul 2000 05:48:00 -0700 (PDT)
george.mcclellan@enron.com      kevin.mcgowan@enron.com Mon, 31 Jul 2000 05:48:00 -0700 (PDT)
george.mcclellan@enron.com      stuart.staley@enron.com Mon, 31 Jul 2000 05:48:00 -0700 (PDT)
george.mcclellan@enron.com      mike.mcconnell@enron.com        Mon, 31 Jul 2000 05:48:00 -0700 (PDT)
george.mcclellan@enron.com      jeffrey.shankman@enron.com      Mon, 31 Jul 2000 05:48:00 -0700 (PDT)
george.mcclellan@enron.com      daniel.reck@enron.com   Mon, 31 Jul 2000 05:48:00 -0700 (PDT)
george.mcclellan@enron.com      jordan.mintz@enron.com  Sat, 16 Sep 2000 03:00:00 -0700 (PDT)
george.mcclellan@enron.com      angie.collins@enron.com Sat, 16 Sep 2000 03:00:00 -0700 (PDT)
george.mcclellan@enron.com      matthew.arnold@enron.com        Sat, 16 Sep 2000 03:00:00 -0700 (PDT)
george.mcclellan@enron.com      mike.mcconnell@enron.com        Sat, 16 Sep 2000 03:00:00 -0700 (PDT)
george.mcclellan@enron.com      jeffrey.shankman@enron.com      Sat, 16 Sep 2000 03:00:00 -0700 (PDT)
george.mcclellan@enron.com      daniel.reck@enron.com   Sat, 16 Sep 2000 03:00:00 -0700 (PDT)
george.mcclellan@enron.com      michael.beyer@enron.com Fri, 1 Sep 2000 06:04:00 -0700 (PDT)
george.mcclellan@enron.com      tom.kearney@enron.com   Fri, 1 Sep 2000 06:04:00 -0700 (PDT)
george.mcclellan@enron.com      kevin.mcgowan@enron.com Fri, 1 Sep 2000 06:04:00 -0700 (PDT)
george.mcclellan@enron.com      stuart.staley@enron.com Fri, 1 Sep 2000 06:04:00 -0700 (PDT)
george.mcclellan@enron.com      mike.mcconnell@enron.com        Fri, 1 Sep 2000 06:04:00 -0700 (PDT)
george.mcclellan@enron.com      jeffrey.shankman@enron.com      Fri, 1 Sep 2000 06:04:00 -0700 (PDT)
george.mcclellan@enron.com      daniel.reck@enron.com   Fri, 1 Sep 2000 06:04:00 -0700 (PDT)
george.mcclellan@enron.com      mike.mcconnell@enron.com        Thu, 30 Nov 2000 07:19:00 -0800 (PST)
george.mcclellan@enron.com      jeffrey.shankman@enron.com      Thu, 30 Nov 2000 07:19:00 -0800 (PST)
mary.joyce@enron.com    mike.mcconnell@enron.com        Mon, 11 Sep 2000 02:13:00 -0700 (PDT)
d.hall@enron.com        mike.mcconnell@enron.com        Thu, 24 Aug 2000 00:40:00 -0700 (PDT)
d.hall@enron.com        cathy.phillips@enron.com        Thu, 24 Aug 2000 00:40:00 -0700 (PDT)
cathy.phillips@enron.com        mike.mcconnell@enron.com        Thu, 7 Sep 2000 13:15:00 -0700 (PDT)
cathy.phillips@enron.com        deb.gebhardt@enron.com  Thu, 7 Sep 2000 13:15:00 -0700 (PDT)
jay.hatfield@enron.com  mike.mcconnell@enron.com        Mon, 11 Sep 2000 02:04:00 -0700 (PDT)
bill.cordes@enron.com   mike.mcconnell@enron.com        Mon, 24 Jul 2000 00:28:00 -0700 (PDT)
enron.announcements@enron.com   all.houston@enron.com   Fri, 8 Sep 2000 14:59:00 -0700 (PDT)
john.haggerty@enron.com mike.mcconnell@enron.com        Sun, 10 Sep 2000 01:13:00 -0700 (PDT)
john.nowlan@enron.com   jeffrey.shankman@enron.com      Thu, 21 Sep 2000 10:25:00 -0700 (PDT)
john.nowlan@enron.com   mike.mcconnell@enron.com        Thu, 21 Sep 2000 10:25:00 -0700 (PDT)

Lastly, I sort the collection, cast to RDD type and save in HDFS.

val enronRDD = sc.parallelize(buf.sorted)
enronRDD.saveAsTextFile("hdfs://localhost:54310/user/hduser/out/enronRDD1")

Problem with this is it does not overwrite. I’ll have to write this in terminal:

hdfs dfs -rm -r /user/hduser/out/enronRDD1/