/ / Python mrjob mapreduce wie man die Eingabedatei vorverarbeitet - python, hadoop, mrjob, bigdata

Python mrjob mapreduce wie man die Eingabedatei vorverarbeitet - python, hadoop, mrjob, bigdata

Ich versuche, eine XML-Datei vorzuverarbeiten, um bestimmte Knoten zu extrahieren, bevor sie in MapReduce eingefügt werden. Ich habe den folgenden Code:

from mrjob.compat import jobconf_from_env
from mrjob.job import MRJob
from mrjob.util import cmd_line, bash_wrap

class MRCountLinesByFile(MRJob):
def configure_options(self):
super(MRCountLinesByFile, self).configure_options()
self.add_file_option("--filter")

def mapper_cmd(self):
cmd = cmd_line([self.options.filter, jobconf_from_env("mapreduce.map.input.file"])
return cmd



if __name__ == "__main__":
MRCountLinesByFile.run()

Und in der Befehlszeile tippe ich:

python3 test_job_conf.py --filter ./filter.py -r local < test.txt

test.txt ist eine normale XML-Datei wie Hier. Während filter.py ist ein Skript, um alle Titelinformationen zu finden.

Ich erhalte jedoch die folgenden Fehler:

Creating temp directory /tmp/test_job_conf.vagrant.20160406.042648.689625
Running step 1 of 1...
Traceback (most recent call last):
File "./filter.py", line 8, in <module>
with open(filename) as f:
FileNotFoundError: [Errno 2] No such file or directory: "None"
Step 1 of 1 failed: Command "["./filter.py", "None"]" returned non-zero exit status 1

Es sieht aus wie mapreduce.map.input.file machen None in diesem Fall. Wie kann ich das fragen? mapper_cmd Funktion, um die Datei zu lesen mrjob liest gerade?

Antworten:

0 für die Antwort № 1

Nach meinem Verständnis geht das in Ihre Selbst. Add_file_option sollte den Pfad zu Ihrer Datei haben.

self.add_file_option("--items", help="Path to u.item")

Ich verstehe Ihr Szenario nicht richtig, aber hierist mein Verständnis. Sie verwenden die configure-Option, um sicherzustellen, dass eine bestimmte Datei zur Verarbeitung an alle Mapper gesendet wird, wenn Sie beispielsweise eine zusätzliche Suche nach Daten in einer anderen Datei als der Quelle durchführen möchten. Diese zusätzliche Lookup-Datei wird von self.add_file_option zur Verfügung gestellt ("- items", help = "Pfad zu u.item").

Um etwas vorzusprechen, sagen Sie vor einem Reduzierer oder einemMapper-Phase verwenden Sie den Reducer_init oder den Mapper_init. Diese Init- oder die Verarbeitungsschritte müssen auch in Ihrer Schrittfunktion erwähnt werden, wie zum Beispiel unten gezeigt.

def steps(self):
return [
MRStep(mapper=self.mapper_get_name,
reducer_init=self.reducer_init,
reducer=self.reducer_count_name),
MRStep(reducer = self.reducer_find_maxname)
]

Innerhalb Ihrer Init-Funktion tun Sie das AktuellePre-processing von was Sie vor dem Senden an Mapper oder Reducer tun müssen. Sagen Sie zum Beispiel öffnen Sie eine Datei xyz und kopieren Sie die Werte in das erste Feld in einem anderen Feld, das ich in meinem Reduzierer verwenden würde und die gleiche ausgeben.

def reducer_init(self):
self.movieNames = {}
with open("xyz") as f:
for line in f:
fields = line.split("|")
self.myNames[fields[0]] = fields[1]

Hoffe das hilft!!