Tuesday, June 2, 2015

Migrate Large table from Hive/Impala to Netezza

We can follow below steps to migrate a table from Hive to Netezza/IBM Puredata (without sqoop).
It is very useful when the table size if large (more that 100 GB) as the full operation doesn't required local filesystem.


0. Create the target table in netezza.

1. create the target directory in HDFS
hadoop fs -mkdir /user/etl/temp/my_table

2. Login to hive/beeline export the target table/query to the hdfs lcation created above. Here, we can are using csv-serde to put customised quote string [https://drone.io/github.com/ogrodnek/csv-serde/files/target/csv-serde-1.1.2-0.11.0-all.jar]


beeline -u jdbc:hive2://hiveSeverHost:10000/db_name -n etl

> add jar /var/lib/hive/jars/csv-serde-1.1.2-0.11.0-all.jar;

> create table exp_my_table 
row format serde 'com.bizo.hive.serde.csv.CSVSerde'
with serdeproperties(
"separatorChar" = "\,",
"quoteChar" = "\"",
"escapeChar" = "\\")
STORED AS textfile 
LOCATION '/user/etl/temp/my_table/' 
as select * from my_table;

3. Create a unix named pipe:
mkfifo /stage/AIU/ETL/temp/mypipe1

4. Issue below command in background of in a seperate session, loading data from the named pipe create above(this command will stuck until data inserts to the named pipe). Make sure -delim & -quotedValue matching with previous step. Put other nzload options as per requirements.

nzload -host nzhost_IP -u NZ_User -db DB_NAME -t my_table -delim "," -quotedValue DOUBLE -escapeChar '\' -df /stage/AIU/ETL/temp/mypipe1 -ignoreZero YES -ctrlChars -crInString &

To allow loading un-printable strings and ignore "null byte", specially in URLs:
-ignoreZero YES : "|<nul>AB<nul>CDEF|" will become "|ABCDEF|"
-ctrlChars : accept control chars in char/varchar fields
-crInString : allow unescaped carriage returns (cr), ine feed (LF) becomes the default end-of-row indicator

ref: https://www-304.ibm.com/support/knowledgecenter/SSULQD_7.1.0/com.ibm.nz.load.doc/c_load_handle_legal_characters.html

5. Now from a separate session(if step 4 is not in background), copy the hive table HDFS location(step 2) to the named pipe(created in step 3).

hadoop fs -cat /user/etl/temp/my_table/* > /stage/AIU/ETL/temp/mypipe1

6. It will start flowing data to the named pipe and naload was waiting to get data in the named pipe(step 4). So, at this point data starts flowing to netezza. When data loading done or any error encountered both nzload and hdfs copy commands will exit. Please check nzload log and on screen unix related error for further problem solving.


HDFS Directories Under Metastore(hive) Location (/user/hive/warehouse/) Without Corresponding Metastore Tables

Some times it happens that for some reason we drop tables from hive/impala but the corresponding HDFS directories are not removed.
The issue mainly as per my and also from below link, is Trash directory. Either the permission on the directory or there are no HDFS home directory exists for the user dropping a table.

https://groups.google.com/a/cloudera.org/forum/#!topic/cdh-user/8J0IZf8E1yc

In our case we suspect that, it was a HUE user without HDFS home directory.

Whatever it is, we have to keep track of these orphan directories.
Manually identifying these HDFS location too tiring to a lazy guys like me. So, I  prepared below simple python script for this. Hope it will help others too.

Please comment if you have better idea or there is something wrong with my script.
Please test properly before you proceed with any action based on this script.

#!/usr/bin/python

# Script to find HDFS directories under metastore location (/user/hive/warehouse/) without corresponding tables
# just call call the script with the target database name as mandatory parameter, if no parameter it will show error
# it the db name it wrong it will show "No such file or directory" & "Database does not exist"
# Prerequisites: hive CLI (NOT beeline), python 2.6, dircet access to the target hadoop cluster

import sys

# Convert a String having words separated by new line(\n) to list
# Example input : convertNewLineStringsToList('abc\n123\n')
# Example output: ['abc', '123']
def convertNewLineStringsToList(stringWithNewLine):
out = []
buff = []
for c in stringWithNewLine:
if c == '\n':
out.append(''.join(buff))
buff = []
else:
buff.append(c)
else:
if buff and buff[0]:
out.append(''.join(buff))
return out

# Get list of all directory names for corresponding table from HDFS for a metastore database
# Example input : getHDFSDirForDB("dbname"))
# Example output: ['table_a', 'table_b'] 
def getHDFSDirForDB(dbname):
import os
# get HDFS list by running hadoop fs command from OS
hdfs_list = os.popen("hadoop fs -ls /user/hive/warehouse/"+ dbname +".db/|awk '{print $8}'|cut -d'/' -f6")
hdfs_list = hdfs_list.read()
# collect HDFS dirs as list
out = convertNewLineStringsToList(hdfs_list)
# return HDFS dirs as list which are non-empty, keep tha case intact, as HDFS location should be exactly match
return [x for x in out if x]


# Get list of all table names for a metastore database
# Example input : getTablesForDB("dbname")
# Example output: ['table_a', 'table_b'] 
def getTablesForDB(dbname):
import os
# get HDFS list by running hive command from OS
# here we are using silent mode(-S) to get rid of unwanted texts
table_list = os.popen("hive --database "+dbname+" -S -e \"show tables\"|sed -n '1!p'")
table_list = table_list.read()
# collect table names as list
out = convertNewLineStringsToList(table_list)
  # return table names as list which are non-empty and make in lower case return [x.lower() for x in out if x]



# main program called with the target DB name for which we want to find HDFS directories without corresponding tables
# Example input: main ("dbname")
# Example output:
# table_a 
# table_b  
def main(dbname):
HDFS_LIST = getHDFSDirForDB(dbname)
DB_LIST = getTablesForDB(dbname)

# directories that do not have corresponding tables
# It mean, components of HDFS_LIST which are not common in DB_LIST
# make HDFS location lowercase while comparing with DB table list(already lowercase) diff = [ x for x in HDFS_LIST if x.lower() not in DB_LIST ]

print  '\n'.join(diff)


#________________main________________
if __name__ == "__main__":
    main(sys.argv[1])


Thanks for the idea of set base minus operation http://stackoverflow.com/questions/13169725/how-to-convert-a-string-that-has-newline-characters-in-it-into-a-list-in-python