Pydoop gets stuck on readline from HDFS files

I read the first line of every file in a directory. It works fine locally, but on EMR this test fails: it gets stuck around the 200th-300th file. Also, ps -eLF shows the number of children growing to as many as 3000 even at the 200th line.

Is this some kind of EMR error related to a maximum number of bytes that can be read? I am using pydoop==0.12.0.

import os
import sys
import shutil
import codecs
import subprocess

import pydoop.hdfs as hdfs


def prepare_data(hdfs_folder):
    """Build a local folder of test files and upload it to HDFS.

    Creates ``test_folder`` locally (replacing any existing one), fills it
    with 700 copies of the local file named ``file``, removes
    ``hdfs_folder`` from HDFS and copies the local folder there.

    The two ``hadoop fs`` invocations are best-effort: a non-zero exit
    status (e.g. removing a folder that does not exist yet) is ignored,
    as it was with ``os.system``.
    """
    folder = "test_folder"
    copies_count = 700
    src_file = "file"

    # 1) create a fresh, empty local folder
    if os.path.exists(folder):
        shutil.rmtree(folder)
    os.makedirs(folder)

    # 2) create copies_count copies of src_file in the folder
    for x in range(copies_count):
        shutil.copyfile(src_file, os.path.join(folder, "%s_%d" % (src_file, x)))

    # 3) copy the folder to HDFS, e.g.
    #    hadoop fs -copyFromLocal test_folder/ /maaz/test_aa
    # Argument lists are used instead of a concatenated shell string so
    # that paths containing spaces or shell metacharacters are passed
    # through unchanged.
    remove_command = ["hadoop", "fs", "-rmr", hdfs_folder]
    print(" ".join(remove_command))
    subprocess.call(remove_command)

    command = ["hadoop", "fs", "-copyFromLocal", folder, hdfs_folder]
    print(" ".join(command))
    subprocess.call(command)


def main(hdfs_folder):
    """Print the first line of every regular file in ``hdfs_folder``.

    Entries whose ``kind`` is not ``"file"`` are skipped. A failure while
    reading one file is reported and the loop moves on to the next file;
    any other failure is reported once and ends the run. Nothing is
    raised to the caller.
    """
    try:
        conn_hdfs = hdfs.fs.hdfs()
        try:
            if not conn_hdfs.exists(hdfs_folder):
                return
            for item in conn_hdfs.list_directory(hdfs_folder):
                if item["kind"] != "file":
                    continue
                file_name = item["name"]
                print("validating file : %s" % file_name)

                # Start from None so the cleanup below never touches an
                # unbound name or the previous iteration's handle when
                # open_file() itself fails.
                file_handle = None
                try:
                    file_handle = conn_hdfs.open_file(file_name)
                    print(file_handle.readline())
                except Exception as exp:
                    print("####Exception '%s' in reading file %s"
                          % (str(exp), file_name))
                finally:
                    if file_handle is not None:
                        file_handle.close()
        finally:
            # Release the HDFS connection on every exit path.
            conn_hdfs.close()
    except Exception as e:
        print("####Exception '%s' in validating files!" % str(e))


if __name__ == '__main__':
    hdfs_path = '/abc/xyz'
    prepare_data(hdfs_path)
    main(hdfs_path)
+7
python hadoop emr
source share
1 answer

I suggest using the subprocess module to read the first line instead of pydoop's conn_hdfs.open_file.

 import subprocess cmd='hadoop fs -cat {f}|head -1'.format(f=file_name) process=subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) stdout, stderr=process.communicate() if stderr!='': file_line=stdout.split('\n')[0] else: print "####Exception '{e}' in reading file {f}".format(f=file_name,e=stdout) continue 
+4
source share

All Articles