"""
phdfs is a python module that wraps the libhdfs C library using ctypes.
It allows a more direct interface with the files in the Hadoop file system.
(There is a similar attempt at http://www.stat.purdue.edu/~sguha/code.html
using SWIG.)

In order to use phdfs, you will have to set the CLASSPATH environment
variable to point to all hadoop jar files, and make the module load the
correct libjvm.so and libhdfs.so: either set the PHDFS_LIBJVM and
PHDFS_LIBHDFS environment variables to their paths, or edit the defaults
(./libjvm.so and ./libhdfs.so) below.

Copyright 2008 Chen-Shan Chin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import ctypes
import os

# Library locations; overridable from the environment so the module does not
# have to be edited per installation. Relative defaults are resolved against
# the current working directory, not against this file's directory.
_LIBJVM_PATH = os.environ.get('PHDFS_LIBJVM', './libjvm.so')
_LIBHDFS_PATH = os.environ.get('PHDFS_LIBHDFS', './libhdfs.so')

# libjvm is loaded first and with RTLD_GLOBAL so that the symbols it exports
# are visible when libhdfs.so is loaded, even if libhdfs.so was not itself
# linked against libjvm. (The default CDLL mode keeps them private.)
_libjvm = ctypes.CDLL(_LIBJVM_PATH, mode=ctypes.RTLD_GLOBAL)
_libhdfs = ctypes.CDLL(_LIBHDFS_PATH)

# time_t has no ctypes alias before Python 3.12; C ``long`` matches it on the
# usual Unix ABIs.
_c_time_t = getattr(ctypes, 'c_time_t', ctypes.c_long)


class _hdfsFileInfo(ctypes.Structure):
    """ctypes mirror of the libhdfs ``hdfsFileInfo`` struct.

        tObjectKind mKind;    /* file or directory */
        char *mName;          /* the name of the file */
        tTime mLastMod;       /* the last modification time for the file */
        tOffset mSize;        /* the size of the file in bytes */
        short mReplication;   /* the count of replicas */
        tOffset mBlockSize;   /* the block size for the file */

    The field types must reproduce the C layout exactly: libhdfs returns
    arrays of this struct, and if the ctypes size differs from the C size,
    every entry after the first is read from the wrong offset.
    """
    _fields_ = [('mKind', ctypes.c_int),         # tObjectKind is a C enum
                ('mName', ctypes.c_char_p),
                ('mLastMod', _c_time_t),         # tTime is time_t
                ('mSize', ctypes.c_int64),       # tOffset is int64_t
                ('mReplication', ctypes.c_short),
                ('mBlockSize', ctypes.c_int64)]  # tOffset is int64_t


class _hdfsFile_internal(ctypes.Structure):
    """ctypes mirror of the struct behind a libhdfs ``hdfsFile`` handle.

        void* file;
        enum hdfsStreamType type;

    The second C field is exposed here under the name ``hdfsStreamType``.
    """
    _fields_ = [('file', ctypes.c_void_p),
                ('hdfsStreamType', ctypes.c_int)]


class hdfsFileInfo(object):
    """Plain-Python copy of one ``_hdfsFileInfo`` entry.

    Reading the fields here copies them into Python objects (``mName``
    becomes a Python string), so the underlying C memory can be released
    with ``hdfsFreeFileInfo`` as soon as this object has been built.

    ``mKind`` is stored as a one-character string (libhdfs uses ``'F'`` for
    files and ``'D'`` for directories).
    """

    def __init__(self, fileInfo):
        self.mKind = chr(fileInfo.mKind)
        self.mName = fileInfo.mName
        self.mLastMod = fileInfo.mLastMod
        self.mSize = fileInfo.mSize
        self.mReplication = fileInfo.mReplication
        self.mBlockSize = fileInfo.mBlockSize

    def __repr__(self):
        return 'hdfsFileInfo(mKind=%r, mName=%r, mSize=%r)' % (
            self.mKind, self.mName, self.mSize)
# Helpers --------------------------------------------------------------------

def _cstr(s):
    """Return *s* as a byte string for a ``const char *`` argument.

    Byte strings (``str`` on Python 2, ``bytes`` on Python 3) and ``None``
    pass through unchanged; text strings are UTF-8 encoded.
    """
    if s is None or isinstance(s, bytes):
        return s
    return s.encode('utf-8')


def _bind(name, restype, argtypes):
    """Look up *name* in libhdfs and declare its C return and argument types.

    Declaring ``argtypes`` matters: without it ctypes passes Python ints as a
    C ``int`` (masking the value), which truncates 64-bit ``hdfsFS`` pointers
    and ``tOffset`` values.
    """
    func = getattr(_libhdfs, name)
    func.restype = restype
    func.argtypes = argtypes
    return func


# libhdfs C types (see hdfs.h).
_hdfsFS_t = ctypes.c_void_p                                   # hdfsFS
_hdfsFile_t = ctypes.POINTER(_hdfsFile_internal)              # hdfsFile
_tSize = ctypes.c_int32
_tOffset = ctypes.c_int64
_tPort = ctypes.c_uint16
_FileInfoPtr = ctypes.POINTER(_hdfsFileInfo)                  # hdfsFileInfo *
_HostsPtr = ctypes.POINTER(ctypes.POINTER(ctypes.c_char_p))   # char ***

# HDFS Connection
_hdfsConnect = _bind('hdfsConnect', _hdfsFS_t, [ctypes.c_char_p, _tPort])
_hdfsDisconnect = _bind('hdfsDisconnect', ctypes.c_int, [_hdfsFS_t])

# HDFS File
_hdfsOpenFile = _bind('hdfsOpenFile', _hdfsFile_t,
                      [_hdfsFS_t, ctypes.c_char_p, ctypes.c_int,
                       ctypes.c_int, ctypes.c_short, _tSize])
_hdfsCloseFile = _bind('hdfsCloseFile', ctypes.c_int,
                       [_hdfsFS_t, _hdfsFile_t])
_hdfsExists = _bind('hdfsExists', ctypes.c_int,
                    [_hdfsFS_t, ctypes.c_char_p])
_hdfsSeek = _bind('hdfsSeek', ctypes.c_int,
                  [_hdfsFS_t, _hdfsFile_t, _tOffset])
_hdfsTell = _bind('hdfsTell', _tOffset, [_hdfsFS_t, _hdfsFile_t])
_hdfsRead = _bind('hdfsRead', _tSize,
                  [_hdfsFS_t, _hdfsFile_t, ctypes.c_void_p, _tSize])
_hdfsPread = _bind('hdfsPread', _tSize,
                   [_hdfsFS_t, _hdfsFile_t, _tOffset, ctypes.c_void_p,
                    _tSize])
_hdfsWrite = _bind('hdfsWrite', _tSize,
                   [_hdfsFS_t, _hdfsFile_t, ctypes.c_void_p, _tSize])
_hdfsFlush = _bind('hdfsFlush', ctypes.c_int, [_hdfsFS_t, _hdfsFile_t])
_hdfsAvailable = _bind('hdfsAvailable', ctypes.c_int,
                       [_hdfsFS_t, _hdfsFile_t])

# Utilities
_hdfsCopy = _bind('hdfsCopy', ctypes.c_int,
                  [_hdfsFS_t, ctypes.c_char_p, _hdfsFS_t, ctypes.c_char_p])
_hdfsMove = _bind('hdfsMove', ctypes.c_int,
                  [_hdfsFS_t, ctypes.c_char_p, _hdfsFS_t, ctypes.c_char_p])
_hdfsDelete = _bind('hdfsDelete', ctypes.c_int,
                    [_hdfsFS_t, ctypes.c_char_p])
_hdfsRename = _bind('hdfsRename', ctypes.c_int,
                    [_hdfsFS_t, ctypes.c_char_p, ctypes.c_char_p])
_hdfsGetWorkingDirectory = _bind('hdfsGetWorkingDirectory', ctypes.c_char_p,
                                 [_hdfsFS_t, ctypes.c_char_p,
                                  ctypes.c_size_t])
_hdfsSetWorkingDirectory = _bind('hdfsSetWorkingDirectory', ctypes.c_int,
                                 [_hdfsFS_t, ctypes.c_char_p])
_hdfsCreateDirectory = _bind('hdfsCreateDirectory', ctypes.c_int,
                             [_hdfsFS_t, ctypes.c_char_p])
_hdfsSetReplication = _bind('hdfsSetReplication', ctypes.c_int,
                            [_hdfsFS_t, ctypes.c_char_p, ctypes.c_int16])
_hdfsListDirectory = _bind('hdfsListDirectory', _FileInfoPtr,
                           [_hdfsFS_t, ctypes.c_char_p,
                            ctypes.POINTER(ctypes.c_int)])
_hdfsFreeFileInfo = _bind('hdfsFreeFileInfo', None,
                          [_FileInfoPtr, ctypes.c_int])
_hdfsGetPathInfo = _bind('hdfsGetPathInfo', _FileInfoPtr,
                         [_hdfsFS_t, ctypes.c_char_p])
_hdfsGetHosts = _bind('hdfsGetHosts', _HostsPtr,
                      [_hdfsFS_t, ctypes.c_char_p, _tOffset, _tOffset])
_hdfsFreeHosts = _bind('hdfsFreeHosts', None, [_HostsPtr])
_hdfsGetDefaultBlockSize = _bind('hdfsGetDefaultBlockSize', _tOffset,
                                 [_hdfsFS_t])
_hdfsGetCapacity = _bind('hdfsGetCapacity', _tOffset, [_hdfsFS_t])
_hdfsGetUsed = _bind('hdfsGetUsed', _tOffset, [_hdfsFS_t])


class hdfFS(object):
    """A connection to an HDFS file system (wraps a libhdfs ``hdfsFS``).

    Path arguments may be byte strings or text (text is UTF-8 encoded).
    Integer-returning methods pass through the libhdfs status code.
    """

    def __init__(self, host, port):
        """Connect to *host*:*port*.

        Raises:
            IOError: if ``hdfsConnect`` returns a NULL handle.
        """
        self.host = host
        self.port = port
        self.handle = _hdfsConnect(_cstr(host), port)
        if not self.handle:
            raise IOError("could not connect to HDFS at %s:%s" % (host, port))

    def disconnect(self):
        """Close the connection; returns the ``hdfsDisconnect`` status."""
        return _hdfsDisconnect(self.handle)

    def exists(self, path):
        """Return 1 if *path* exists (``hdfsExists`` returned 0), else 0."""
        return 1 if _hdfsExists(self.handle, _cstr(path)) == 0 else 0

    def copy(self, src, dstFS, dst):
        """Copy *src* on this file system to *dst* on *dstFS*."""
        return _hdfsCopy(self.handle, _cstr(src), dstFS.handle, _cstr(dst))

    def move(self, src, dstFS, dst):
        """Move *src* on this file system to *dst* on *dstFS*."""
        return _hdfsMove(self.handle, _cstr(src), dstFS.handle, _cstr(dst))

    def delete(self, path):
        return _hdfsDelete(self.handle, _cstr(path))

    def rename(self, oldPath, newPath):
        return _hdfsRename(self.handle, _cstr(oldPath), _cstr(newPath))

    def getWorkingDirectory(self, bufferSize=4096):
        """Return the current working directory (empty string on failure).

        Args:
            bufferSize: size of the C buffer libhdfs writes the path into.
        """
        buf = ctypes.create_string_buffer(bufferSize)
        _hdfsGetWorkingDirectory(self.handle, buf, bufferSize)
        # .value stops at the first NUL (bounded by the buffer size).
        return buf.value

    def setWorkingDirectory(self, path):
        return _hdfsSetWorkingDirectory(self.handle, _cstr(path))

    def createDirectory(self, path):
        return _hdfsCreateDirectory(self.handle, _cstr(path))

    def setReplication(self, path, replication):
        """Set the replication factor of *path*; returns the libhdfs status."""
        return _hdfsSetReplication(self.handle, _cstr(path), replication)

    def listDirectory(self, path):
        """Return a list of ``hdfsFileInfo`` for the entries of *path*.

        Returns None if *path* does not exist, and an empty list if libhdfs
        returns no entries.
        """
        if not self.exists(path):
            return None
        count = ctypes.c_int(0)
        entries = _hdfsListDirectory(self.handle, _cstr(path),
                                     ctypes.byref(count))
        if not entries:
            return []
        try:
            # Copy every entry out before the C array is freed.
            return [hdfsFileInfo(entries[i]) for i in range(count.value)]
        finally:
            _hdfsFreeFileInfo(entries, count.value)

    def getPathInfo(self, path):
        """Return an ``hdfsFileInfo`` for *path*, or None if libhdfs fails."""
        info = _hdfsGetPathInfo(self.handle, _cstr(path))
        if not info:
            return None
        try:
            return hdfsFileInfo(info[0])
        finally:
            _hdfsFreeFileInfo(info, 1)

    def getHosts(self, path, start, length):
        """Return the hostnames holding the first block of the given range.

        libhdfs returns a NULL-terminated array (one entry per block) of
        NULL-terminated hostname arrays; only the first block is reported.

        Returns None if *path* does not exist or libhdfs returns NULL, and an
        empty list if the range covers no blocks.
        """
        if not self.exists(path):
            return None
        blocks = _hdfsGetHosts(self.handle, _cstr(path), start, length)
        if not blocks:
            return None
        try:
            hosts = []
            first = blocks[0]
            if first:
                i = 0
                while first[i] is not None:
                    hosts.append(first[i])
                    i += 1
            return hosts
        finally:
            _hdfsFreeHosts(blocks)

    def getDefaultBlockSize(self):
        return _hdfsGetDefaultBlockSize(self.handle)

    def getCapacity(self):
        return _hdfsGetCapacity(self.handle)

    def getUsed(self):
        return _hdfsGetUsed(self.handle)


# Correctly spelled alias for the connection class.
hdfsFS = hdfFS

O_RDONLY = 0
O_WRONLY = 1


class hdfsFile(object):
    """An open HDFS file (wraps a libhdfs ``hdfsFile``).

    Usable as a context manager; the file is closed on exit.
    """

    def __init__(self, FS, filename, flag=O_RDONLY, bufferSize=0,
                 replication=0, blockSize=0):
        """Open *filename* on the ``hdfFS`` *FS* with ``hdfsOpenFile``.

        Raises:
            IOError: if ``hdfsOpenFile`` returns a NULL handle.
        """
        self.FS = FS
        self.flag = flag
        self.handle = _hdfsOpenFile(FS.handle, _cstr(filename), flag,
                                    bufferSize, replication, blockSize)
        if not self.handle:
            raise IOError("could not open HDFS file %r" % (filename,))
        info = FS.getPathInfo(filename)
        # Prefer the name libhdfs reports; fall back to what we were given.
        self.path = info.mName if info is not None else filename

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def close(self):
        return _hdfsCloseFile(self.FS.handle, self.handle)

    def seek(self, position):
        return _hdfsSeek(self.FS.handle, self.handle, position)

    def tell(self):
        return _hdfsTell(self.FS.handle, self.handle)

    def flush(self):
        return _hdfsFlush(self.FS.handle, self.handle)

    def available(self):
        return _hdfsAvailable(self.FS.handle, self.handle)

    def read(self, length=0):
        """Read up to *length* bytes from the current position.

        A *length* of 0 means "the whole file size". Returns
        ``(buffer, n)``: a ctypes char buffer and the ``hdfsRead`` result,
        which may be less than *length*, or -1 on error. Only
        ``buffer.raw[:n]`` holds data.
        """
        if length == 0:
            length = self.FS.getPathInfo(self.path).mSize
        # create_string_buffer zero-fills, so no memset is needed.
        buf = ctypes.create_string_buffer(length)
        n = _hdfsRead(self.FS.handle, self.handle, buf, length)
        return buf, n

    def Pread(self, position, length):
        """Read up to *length* bytes at *position*; returns ``(buffer, n)``."""
        buf = ctypes.create_string_buffer(length)
        n = _hdfsPread(self.FS.handle, self.handle, position, buf, length)
        return buf, n

    def write(self, data):
        """Write *data* (bytes, or text encoded as UTF-8) and flush.

        Returns the ``hdfsWrite`` result (bytes written, or -1).
        """
        data = _cstr(data)
        buf = ctypes.create_string_buffer(data)
        written = _hdfsWrite(self.FS.handle, self.handle, buf, len(data))
        self.flush()
        return written

    def readline(self, bufferSize=256):
        """Return the next line including its newline, or None at EOF.

        A final line without a trailing newline is returned as-is. Reads in
        chunks of *bufferSize* and seeks back to just past the newline.
        """
        start = self.tell()
        chunks = []
        while True:
            buf, n = self.read(bufferSize)
            if n <= 0:  # EOF (0) or error (-1)
                break
            data = buf.raw[:n]  # ignore the zero padding beyond n
            end = data.find(b"\n")
            if end >= 0:
                chunks.append(data[:end + 1])
                break
            chunks.append(data)
        if not chunks:
            return None
        line = b"".join(chunks)
        # The last read may have gone past the newline; rewind to it.
        self.seek(start + len(line))
        return line

    def xreadlines(self):
        """Yield lines from the current position until EOF."""
        line = self.readline()
        while line is not None:
            yield line
            line = self.readline()

    def readlines(self):
        return list(self.xreadlines())

    def freadlines(self):
        """Read the rest of the file in bulk and split it into lines.

        Newlines are removed; a single trailing newline does not produce an
        empty final element. Returns [] when nothing remains.
        """
        remaining = self.FS.getPathInfo(self.path).mSize - self.tell()
        chunks = []
        # hdfsRead may return fewer bytes than requested, so keep reading.
        while remaining > 0:
            buf, n = self.read(remaining)
            if n <= 0:
                break
            chunks.append(buf.raw[:n])
            remaining -= n
        data = b"".join(chunks)
        if not data:
            return []
        if data.endswith(b"\n"):
            data = data[:-1]
        return data.split(b"\n")


def test():
    """Smoke test against a namenode on localhost:9000.

    Creates ``test/testfile`` relative to the HDFS working directory, reads
    it back several ways, then deletes it.
    """
    def show(result):
        buf, n = result
        print(buf.raw[:max(n, 0)])

    fs = hdfFS('localhost', 9000)
    fs.createDirectory('test')
    fs.setWorkingDirectory('test')
    print(fs.getWorkingDirectory())

    f = hdfsFile(fs, "testfile", flag=O_WRONLY, replication=1,
                 bufferSize=1024)
    f.write("01234567890123456789\n" * 10)
    f.close()

    f = hdfsFile(fs, "testfile")
    print("test Pread")
    show(f.Pread(5, 25))
    print("")

    print("test read")
    f.seek(0)
    show(f.read(100))
    print("")

    print("test readline")
    f.seek(0)
    print(f.readline())
    print(f.readline())
    print("")

    print("test xreadline")
    for line in f.xreadlines():
        print(line)
    print("")

    print("test readlines")
    f.seek(0)
    for line in f.readlines():
        print(line)
    f.close()

    r = fs.getHosts('testfile', 0, 15)
    print(r)
    r = fs.listDirectory('./')
    print([e.mName for e in r])
    fs.delete('./testfile')
    r = fs.listDirectory('../')
    print([e.mName for e in r])
    fs.disconnect()


if __name__ == '__main__':
    test()