在本博客系列的第一篇中我介紹了兩個自然語言處理庫(John Snow Labs的Apache Spark NLP和Explosion AI的spaCy),并用它們訓練了分詞和詞性標注的模型。在本篇中我將繼續(xù)構(gòu)建并運行一個自然語言處理(NLP)管道,來把這些訓練好的模型應用于新的文本數(shù)據(jù)。
導入測試數(shù)據(jù)是一個具有挑戰(zhàn)性的步驟,因為我的測試數(shù)據(jù)是由未格式化的、未進行句子邊界界定的文本構(gòu)成的,是粗糙的和異構(gòu)的。我要處理一個包含.txt文件的文件夾,并且需要將結(jié)果保存為文字標簽的格式以便將其與正確的答案進行比較。下面讓我們來解決它:
spaCy
start = time.time()
path = “./target/testing/”
files = sorted([path + f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
prediction = {}
for file in files:
? ? fo = io.open(file, mode=’r’, encoding=’utf-8′)
? ? content = []
? ? for doc in nlp_model(re.sub(“\\s+”, ‘ ‘, fo.read())):
? ? ? ? content.append((doc.text, doc.tag_))
? ? prediction[file] = content
? ? fo.close()
?? ?
print (time.time() – start)
另一種并行計算方法是使用generator和spaCy的語言管道。 像下面這樣的方式也可以解決數(shù)據(jù)導入的問題。
spaCy
from spacy.language import Language
import itertools
def genf():
? ? path = “./target/testing/”
? ? files = sorted([path + f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
? ? for file in files:
? ? ? ? fo = io.open(file, mode=’r’, encoding=’utf-8′)
? ? ? ? t = re.sub(“\\s+”, ‘ ‘, fo.read())
? ? ? ? fo.close()
? ? ? ? yield (file, t)
?? ? ? ?
gen1, gen2 = itertools.tee(genf())
files = (file for (file, text) in gen1)
texts = (text for (file, text) in gen2)
start = time.time()
prediction = {}
for file, doc in zip(files, nlp_model.pipe(texts, batch_size=10, n_threads=12)):
? ? content = []
? ? for d in doc:
? ? ? ? content.append((d.text, d.tag_))
? ? prediction[file] = content
print (time.time() – start)
Spark-NLP
var data = spark.read.textFile(“./target/testing”).as[String]
? ? .map(_.trim()).filter(_.nonEmpty)
? ? .withColumnRenamed(“value”, “text”)
? ? .withColumn(“filename”, regexp_replace(input_file_name(), “file://”, “”))
data = data.groupBy(“filename”).agg(concat_ws(” “, collect_list(data(“text”))).as(“text”))
? ? .repartition(12)
? ? .cache
val files = data.select(“filename”).distinct.as[String].collect
val result = model.transform(data)
val prediction = Benchmark.time(“Time to collect predictions”) {
? ? result
? ? ? ? .select(“finished_token”, “finished_pos”, “filename”).as[(Array[String], Array[String], String)]
? ? ? ? .map(wt => (wt._3, wt._1.zip(wt._2)))
? ? ? ? .collect.toMap
}
和Apache Spark一樣,可以用textFile()來從文件夾讀取文本文件,雖然它是逐行讀取的。 我需要識別每行的文件名,從而可以再次將它們組合起來。幸運的是,input_file_name()這就是這樣實現(xiàn)的。 在讀取文件后,繼續(xù)分組,并用空格連接這些行。
請注意,上面的兩個代碼片段都沒有使用NLP庫特有的代碼。spaCy的代碼里用的是Python的文件操作,而Spark-NLP代碼則使用了Spark的原生數(shù)據(jù)加載和處理的原語??梢钥吹?,如果正確配置了Spark的群集,則上面的Spark代碼在導入10KB、10MB或10 TB的文件上的工作方式是完全相同的。此外,你對每種庫的學習曲線取決于你對這個庫的生態(tài)系統(tǒng)的熟悉程度。
測量結(jié)果
如果我們迄今為止所做的看起來很難懂,那么這一部分會非常困難懂。當我們的答案以不同方式分詞時,我們?nèi)绾魏饬縋OS準確性? 我不得不在這里施展一些“魔術(shù)”,而我相信這會帶來爭議。沒有簡單的方法可以客觀地計算結(jié)果,公平地比較,所以下面是我想出的方法。
spaCy
首先,我需要處理結(jié)果文件夾。該文件夾里有很多.txt文件,看起來與訓練數(shù)據(jù)完全一樣,有與上一步中使用的測試數(shù)據(jù)相同的文件名。
answers = {}
total_words = 0
for file in result_files:
? ? fo = io.open(file, mode=’r’, encoding=’utf-8′)
? ? file_tags = []
? ? for pair in re.split(“\\s+”, re.sub(“\\s+”, ‘ ‘, fo.read())):
? ? ? ? total_words += 1
? ? ? ? tag = pair.strip().split(“|”)
? ? ? ? file_tags.append((tag[0], tag[-1]))
? ? answers[file] = file_tags
? ? fo.close()
print(total_words)
Spark-NLP
對于Spark-NLP,我用了與解析POS注釋器元組相同的函數(shù)。它屬于一個名為ResourceHelper的助手對象。ResourceHelper中有許多類似的幫助功能。
var total_words = 0
val answer = files.map(_.replace(“testing”, “answer”)).map(file => {
? ? val content = ResourceHelper
? ? ? ? .parseTupleSentences(file, “TXT”, ‘|’, 500).flatMap(_.tupleWords)
? ? ? ? .flatMap(_.tupleWords)
? ? total_words += content.length
? ? (file, content)
}).toMap
println()
println(total_words)
下面就是“魔術(shù)”發(fā)生的地方。我有這樣的答案,在spaCy和Spark-NLP中都有像(文件名,數(shù)組((word,tag))的字典。這與我們在預測步驟中使用的格式相同。
所以,我創(chuàng)建了一個小算法。在該算法中,我比較每一對預測和答案中的單詞。 對于每對匹配的單詞,我加一個匹配的標記。并對于每對匹配的POS標識,我都計一次成功。
但是,由于單詞的分詞方式與ANC的結(jié)果不同,因此我需要設(shè)置一個小窗口,讓單詞有機會與ANC中特定數(shù)量的分詞相匹配,或者知道在文件的哪里繼續(xù)開始搜索。
如果預測的單詞是次分詞的(比ANC少的分詞數(shù)),那么這個單詞將永遠不會匹配并被忽略。例如,two-legged|JJ在ANC中是two| NN和legged| JJ,那么我將收集并拼接ANC分詞直到確認它被次分詞了,然后忽略它(下面代碼里的construct是次分詞的集合)。
然后,將索引放置在最近的匹配位置。所以如果預測的詞出現(xiàn)較晚(由于前一個詞被次分詞),它最終將能在此范圍內(nèi)被找到,并對其進行計數(shù)。
以下是上述算法在代碼中的部分:
spaCy
start = time.time()
word_matches = 0
tag_matches = 0
for file in list(prediction.keys()):
? ? last_match = 0
? ? print(“analyzing: ” + file)
? ? for pword, ptag in answers[file.replace(‘testing’, ‘answer’)]:
? ? ? ? print(“target word is: ” + pword)
? ? ? ? last_attempt = 0
? ? ? ? construct = ”
? ? ? ? for word, tag in prediction[file][last_match:]:
? ? ? ? ? ? if word.strip() == ”:
? ? ? ? ? ? ? ? last_match += 1
? ? ? ? ? ? ? ? continue
? ? ? ? ? ? construct += word
? ? ? ? ? ? print(“against: ” + word + ” or completion of construct ” + pword)
? ? ? ? ? ? last_attempt += 1
? ? ? ? ? ? if pword == word:
? ? ? ? ? ? ? ? print(“word found: ” + word)
? ? ? ? ? ? ? ? if ptag == tag:
? ? ? ? ? ? ? ? ? ? print(“match found: ” + word + ” as ” + tag)
? ? ? ? ? ? ? ? ? ? tag_matches += 1
? ? ? ? ? ? ? ? word_matches += 1
? ? ? ? ? ? ? ? last_match += last_attempt
? ? ? ? ? ? ? ? break
? ? ? ? ? ? elif pword == construct:
? ? ? ? ? ? ? ? print(pword + ” construct complete. No point in keeping the search”)
? ? ? ? ? ? ? ? last_match += last_attempt
? ? ? ? ? ? ? ? break
? ? ? ? ? ? elif len(pword) <= len(construct):
? ? ? ? ? ? ? ? print(pword + ” construct larger than target. No point in keeping the search”)
? ? ? ? ? ? ? ? if (pword in construct):
? ? ? ? ? ? ? ? ? ? last_match += last_attempt
? ? ? ? ? ? ? ? break
print (time.time() – start)
運行時間
analyzing: ./target/testing/20000424_nyt-NEW.txt
target word is: IRAQ-POVERTY
against: IRAQ-POVERTY or completion of construct IRAQ-POVERTY
word found: IRAQ-POVERTY
target word is: (
against: ( or completion of construct (
word found: (
match found: ( as (
target word is: Washington
against: Washington or completion of construct Washington
word found: Washington
match found: Washington as NNP
target word is: )
against: ) or completion of construct )
word found: )
match found: ) as )
target word is: Rep.
against: Rep or completion of construct Rep.
against: . or completion of construct Rep.
Rep. construct complete. No point in keeping the search
正如代碼里所示,它會給分詞一個找到匹配的機會。這個算法基本上能保持分詞的同步,僅此而已。
print(“Total words: ” + str(total_words))
print(“Total token matches: ” + str(word_matches))
print(“Total tag matches: ” + str(tag_matches))
print(“Simple Accuracy: ” + str(tag_matches / word_matches))
運行時間
Total words: 21381
Total token matches: 20491
Total tag matches: 17056
Simple Accuracy: 0.832365428724806
Spark-NLP
我們在Spark-NLP中看到類似的邏輯,只是代碼是命令式和函數(shù)式編程的一點混合。
var misses = 0
var token_matches = 0
var tag_matches = 0
for (file <- prediction.keys) {
? ? var current = 0
? ? for ((pword, ptag) <- prediction(file)) {
? ? ? ? println(s”analyzing: ${pword}”)
? ? ? ? var found = false
? ? ? ? val tags = answer(file.replace(“testing”, “answer”))
? ? ? ? var construct = “”
? ? ? ? var attempt = 0
? ? ? ? tags.takeRight(tags.length – current).iterator.takeWhile(_ => !found && construct != pword).foreach { case (word, tag) => {
? ? ? ? ? ? construct += word
? ? ? ? ? ? println(s”against: $word or if matches construct $construct”)
? ? ? ? ? ? if (word == pword) {
? ? ? ? ? ? ? ? println(s”word match: $pword”)
? ? ? ? ? ? ? ? token_matches += 1
? ? ? ? ? ? ? ? if (tag == ptag) {
? ? ? ? ? ? ? ? ? ? println(s”tag match: $tag”)
? ? ? ? ? ? ? ? ? ? tag_matches += 1
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? found = true
? ? ? ? ? ? }
? ? ? ? ? ? else if (pword.length < construct.length) {
? ? ? ? ? ? ? ? if (attempt > 0) {
? ? ? ? ? ? ? ? ? ? println(s”construct $construct too long for word $pword against $word”)
? ? ? ? ? ? ? ? ? ? attempt -= attempt
? ? ? ? ? ? ? ? ? ? misses += 1
? ? ? ? ? ? ? ? ? ? println(s”failed match our $pword against their $word or $construct”)
? ? ? ? ? ? ? ? ? ? found = true
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? attempt += 1
? ? ? ? }}
? ? ? ? current += attempt
? ? }
}
運行時間
analyzing: NYT20020731.0371
against: NYT20020731.0371 or if matches construct NYT20020731.0371
word match: NYT20020731.0371
analyzing: 2002-07-31
against: 2002-07-31 or if matches construct 2002-07-31
word match: 2002-07-31
analyzing: 23:38
against: 23:38 or if matches construct 23:38
word match: 23:38
analyzing: A4917
against: A4917 or if matches construct A4917
word match: A4917
analyzing: &
against: & or if matches construct &
word match: &
tag match: CC
下面計算指標。
println(“Total words: ” + (total_words))
println(“Total token matches: ” + (token_matches))
println(“Total tag matches: ” + (tag_matches))
println(“Simple Accuracy: ” + (tag_matches * 1.0 / token_matches))
運行時間
Total words: 21362
Total token matches: 18201
Total tag matches: 15318
Simple Accuracy: 0.8416021097741883
Spark-NLP,但是Spark在哪里?
至此我并沒有討論太多Spark在這里的作用。但是,為了不讓你進一步感到無聊,讓我把所有內(nèi)容都放在下面易于閱讀的格式里:
- 你猜對了!這里所做的對Spark來說是殺雞用牛刀。你不需要用四把錘子來砸一個釘子。我的意思是,Spark在默認情況下是以分布式方式在工作,用于處理非常大的數(shù)據(jù)(例如默認的spark.sql.shuffle.partitions為200)。在本文的場景里,我試圖控制我使用的數(shù)據(jù)量,而不是擴大太多。 “大數(shù)據(jù)”可能會成為敵人,內(nèi)存問題、糟糕的并行性或者慢的算法可能會讓你掉到坑里。
- 您可以輕松地將HDFS用于這里的管道。增加文件的數(shù)量或增加數(shù)據(jù)量,同樣的代碼也能夠處理。你也可以充分利用恰當?shù)姆謪^(qū)數(shù)和內(nèi)存計算。
- 也同樣可以將此算法用于一個分布式群集,將其提交到任何節(jié)點上的driver,Spark將自動進行分布式的工作負載分配。
- 這個特定的NLP程序并不是一個好的可擴展的解決方案。基本上,如果應用到幾百MB的文本文件中,那么使用collect()操作會將所有單詞和標簽收集到driver里,就會導致driver的內(nèi)存不足。在這種情況下,測量分詞的表現(xiàn)就需要用MapReduce任務來計算POS匹配的數(shù)量。這就解釋了為什么Spark-NLP需要比SpaCy更長的時間才能將少量的預測結(jié)果傳入driver,但是對于大數(shù)據(jù)量的輸入這就會有優(yōu)勢了。
下一步做什么?
在這篇文章中,我們比較了在兩個庫上運行和評估基準NLP管道的工作。 總體來說,個人偏好或經(jīng)驗可能會使用戶更傾向于使用核心Python庫和spaCy這樣的命令式編程,或是選擇核心Spark庫和Spark-NLP這樣的函數(shù)式編程。
對于我們在這里測試的小數(shù)據(jù)集,兩種庫的運行時間均少于1秒,并且準確度相當。在此博客系列的第三篇中,我們將用其他大小的數(shù)據(jù)和參數(shù)繼續(xù)評估。
相關(guān)資料:
Saif Addin Ellafi
Saif Addin Ellafi是一名軟件開發(fā)者、分析師、數(shù)據(jù)科學家,并永遠是一名學生。他同時還是一名極限運動和游戲愛好者。他在銀行和金融行業(yè)的數(shù)據(jù)領(lǐng)域擁有豐富的解決問題和測試的經(jīng)驗?,F(xiàn)在他在John Snow Labs,并是Spark-NLP的主要貢獻者。

