当前位置: 首页 > news >正文

DataX的python3使用

DataX 自带的启动脚本是用 Python 2 写的,直接用 Python 3 运行会报各种语法错误;而现在的工程基本都在用 Python 3,这就很难受……

网上找到一篇帖子,可以解决这个问题:

原帖:python3执行datax报错问题解决办法 - 卷心菜的奇妙历险 - 博客园

担心以后原帖无法访问(之前收藏的一些帖子就突然打不开了),所以在这里再记录一遍。

解决办法

路径 datax/bin/  下

需要替换的三个文件,注意备份:

1、datax.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys
import os
import signal
import subprocess
import time
import re
import socket
import json
from optparse import OptionParser
from optparse import OptionGroup
from string import Template
import codecs
import platform

def isWindows():
    """Return True when ``platform.system()`` reports 'Windows'."""
    system_name = platform.system()
    return system_name == 'Windows'

# Installation root: the parent of the directory that contains this script.
DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Version label shown by printCopyright().
DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
    # Register a codec search function that maps the name 'cp65001' to the
    # UTF-8 codec and declines (returns None for) every other name.
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    # The non-Windows class path additionally appends ":." to the lib wildcard.
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
# Default JVM flags; also used as the default value of the -j/--jvm option.
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
# -D system properties that are always placed on the java command line.
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
# string.Template text: ${jvm}, ${params}, ${mode}, ${jobid} and ${job} are
# substituted by buildStartCommand().
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
# Extra JVM flags appended when -d/--debug is given.
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"

# Named state codes. This script itself uses "KILL", "FAIL" and "OK" as
# process exit codes.
RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,
    "RUN": 1,
    "RETRY": 2
}


def getLocalIp():
    """Return the IP address that the local host name resolves to.

    Returns:
        str: the resolved address, or the literal "Unknown" when the lookup
        fails for any reason.
    """
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except Exception:
        # Best effort only, but unlike a bare ``except:`` this does not
        # swallow KeyboardInterrupt or SystemExit.
        return "Unknown"


def suicide(signum, e):
    """Signal handler: terminate the DataX child process, then exit.

    Args:
        signum: number of the signal that was received.
        e: second argument supplied to signal handlers; not used here.

    Exits the interpreter with ``RET_STATE["KILL"]``.
    """
    global child_process
    # ``print >> sys.stderr, ...`` is Python 2 statement syntax; under
    # Python 3 it raises TypeError. Writing to the stream works on both.
    sys.stderr.write("[Error] DataX receive unexpected signal %d, starts to suicide.\n" % (signum))

    if child_process:
        # Send SIGQUIT first and wait one second before the hard kill.
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    sys.stderr.write("DataX Process was killed ! you did ?\n")
    sys.exit(RET_STATE["KILL"])


def register_signal():
    """Install suicide() as the handler for signals 2, 3 and 15.

    Does nothing when isWindows() is true.
    """
    global child_process
    if isWindows():
        return
    for signum in (2, 3, 15):
        signal.signal(signum, suicide)


def getOptionParser():
    """Build the OptionParser that defines the datax.py command line.

    Returns:
        OptionParser: parser with a "Product Env Options" group
        (-j, --jobid, -m, -p, -r, -w) and a "Develop/Debug Options" group
        (-d, --loglevel).
    """
    cli = OptionParser(usage="usage: %prog [options] job-url-or-path")

    prodGroup = OptionGroup(
        cli,
        "Product Env Options",
        "Normal user use these options to set jvm parameters, job runtime mode etc. "
        "Make sure these options can be used in Product Env.")
    prodGroup.add_option(
        "-j", "--jvm",
        action="store", dest="jvmParameters", default=DEFAULT_JVM,
        metavar="<jvm parameters>",
        help="Set jvm parameters if necessary.")
    prodGroup.add_option(
        "--jobid",
        action="store", dest="jobid", default="-1",
        metavar="<job unique id>",
        help="Set job unique id when running by Distribute/Local Mode.")
    # No explicit dest is given for -m/--mode.
    prodGroup.add_option(
        "-m", "--mode",
        action="store", default="standalone",
        metavar="<job runtime mode>",
        help="Set job runtime mode such as: standalone, local, distribute. "
             "Default mode is standalone.")
    prodGroup.add_option(
        "-p", "--params",
        action="store", dest="params",
        metavar="<parameter used in job config>",
        help='Set job parameter, eg: the source tableName you want to set it by command, '
             'then you can use like this: -p"-DtableName=your-table-name", '
             'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
             'Note: you should config in you job tableName with ${tableName}.')
    prodGroup.add_option(
        "-r", "--reader",
        action="store", dest="reader", type="string",
        metavar="<parameter used in view job config[reader] template>",
        help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodGroup.add_option(
        "-w", "--writer",
        action="store", dest="writer", type="string",
        metavar="<parameter used in view job config[writer] template>",
        help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    cli.add_option_group(prodGroup)

    devGroup = OptionGroup(
        cli,
        "Develop/Debug Options",
        "Developer use these options to trace more details of DataX.")
    devGroup.add_option(
        "-d", "--debug",
        action="store_true", dest="remoteDebug",
        help="Set to remote debug mode.")
    devGroup.add_option(
        "--loglevel",
        action="store", dest="loglevel", default="info",
        metavar="<log level>",
        help="Set log level such as: debug, info, all etc.")
    cli.add_option_group(devGroup)
    return cli

def generateJobConfigTemplate(reader, writer):
    """Print a skeleton job configuration for the given plugins.

    Prints documentation links for both plugins, a usage hint, and then a
    JSON job template whose reader/writer sections are loaded from each
    plugin's ``plugin_job_template.json`` under DATAX_HOME.

    Args:
        reader: reader plugin name, e.g. ``mysqlreader``.
        writer: writer plugin name, e.g. ``mysqlwriter``.

    If a plugin template cannot be loaded, an error line is printed and that
    section stays an empty ``{}`` in the printed template. Previously the
    function crashed with NameError in that case.
    """
    readerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (reader,reader,reader)
    writerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (writer,writer,writer)
    print(readerRef)
    print(writerRef)
    jobGuid = 'Please save the following configuration as a json file and  use\n     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
    print(jobGuid)
    jobTemplate = {
      "job": {
        "setting": {
          "speed": {
            "channel": ""
          }
        },
        "content": [
          {
            "reader": {},
            "writer": {}
          }
        ]
      }
    }
    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME,reader)
    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME,writer)
    content = jobTemplate['job']['content'][0]
    try:
        content['reader'] = readPluginTemplate(readerTemplatePath)
    except Exception:
        # Keep the empty placeholder so the rest of the template still prints.
        print("Read reader[%s] template error: can\'t find file %s" % (reader,readerTemplatePath))
    try:
        content['writer'] = readPluginTemplate(writerTemplatePath)
    except Exception:
        print("Read writer[%s] template error: : can\'t find file %s" % (writer,writerTemplatePath))
    print(json.dumps(jobTemplate, indent=4, sort_keys=True))

def readPluginTemplate(plugin):
    """Load a plugin job-template file and return the parsed JSON.

    Args:
        plugin: path of the JSON template file.

    Returns:
        The object produced by ``json.load`` for the file content.

    Raises:
        IOError/OSError if the file cannot be opened, ValueError if the
        content is not valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the locale's default
    # encoding, so non-ASCII text in a template loads on any system.
    with codecs.open(plugin, 'r', encoding='utf-8') as f:
        return json.load(f)

def isUrl(path):
    """Tell whether *path* looks like an http:// or https:// URL.

    Empty or None input yields False. Any other non-str input trips the
    assertion below. Matching is done on the lower-cased text.
    """
    if not path:
        return False

    assert (isinstance(path, str))
    return re.match(r"^http[s]?://\S+\w*", path.lower()) is not None


def buildStartCommand(options, args):
    """Assemble the ``java ...`` command line that launches the DataX engine.

    Args:
        options: parsed option values from getOptionParser(); reads
            jvmParameters, remoteDebug, loglevel, mode, params and jobid.
        args: positional arguments; ``args[0]`` is the job resource, either
            an http(s) URL or a local path (optionally ``file://``-prefixed).

    Returns:
        str: ENGINE_COMMAND with its ${...} placeholders substituted.

    Raises:
        KeyError: from ``Template.substitute`` when ``options.mode`` or
            ``options.jobid`` is empty, because that placeholder is then
            never filled in.
    """
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print('local ip: ', getLocalIp())

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource may be a URL or a local file path (relative or absolute).
    jobResource = args[0]
    if not isUrl(jobResource):
        # Strip a file:// prefix BEFORE making the path absolute. After
        # os.path.abspath() the value can no longer start with "file://",
        # so the original ordering made this check dead code.
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]
        jobResource = os.path.abspath(jobResource)

    # The log file name is derived from the last 20 characters of the job.
    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    # NOTE(review): the result is executed with shell=True by the caller, and
    # the job path and -p params are inserted unquoted.
    return Template(ENGINE_COMMAND).substitute(**commandMap)


def printCopyright():
    """Print the DataX banner (including DATAX_VERSION) and flush stdout."""
    banner = '''
DataX (%s), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
''' % DATAX_VERSION
    print(banner)
    sys.stdout.flush()


# Script entry point: parse the command line, then either print a job
# template or launch the DataX engine as a child process.
if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    # When both -r and -w are given, only print a job template and exit.
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader,options.writer)
        sys.exit(RET_STATE['OK'])
    # Exactly one positional argument (the job URL or path) is required.
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    # print startCommand

    # child_process is a module-level name; suicide() reads it, so it is
    # assigned before the signal handlers are installed.
    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    # No pipes are requested above, so communicate() just waits for exit.
    (stdout, stderr) = child_process.communicate()

    # Propagate the engine's exit status.
    sys.exit(child_process.returncode)

2、dxprof.py

#! /usr/bin/env python
# vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:

import re
import sys
import time

# Patterns used by the parse_* functions below to recognise log lines.
# Reader side: start / end markers of a SQL read.
REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
# Captures the word that follows "from" (used as the table name).
REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
# Captures the text between the brackets of "jdbcUrl: [...]".
REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
# Task identifiers: digit groups joined by '-' followed by reader / writer.
REG_SQL_UUID = re.compile(r'(\d+\-)+reader')
REG_COMMIT_UUID = re.compile(r'(\d+\-)+writer')
# Writer side: start / end markers of a block commit.
REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)

# {{{ function parse_timestamp() #
def parse_timestamp(line):
    """Convert the leading ``YYYY-mm-dd HH:MM:SS`` of a log line to epoch seconds.

    Args:
        line: a log line; only its first 19 characters are parsed.

    Returns:
        int: local-time epoch seconds, or 0 when the prefix is not a valid
        timestamp.
    """
    try:
        return int(time.mktime(time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')))
    except (ValueError, OverflowError, TypeError):
        # Malformed or missing timestamp. The previous bare ``except:``
        # also hid KeyboardInterrupt and SystemExit.
        return 0

# }}} #

# {{{ function parse_query_host() #
def parse_query_host(line):
    """Extract the lower-cased host part of the jdbcUrl found in *line*.

    The text captured by REG_SQL_JDBC is cut at the first '?'. Everything
    after the first '@' is returned if one is present; otherwise everything
    after the first '//'. Returns '' when the line has no jdbcUrl.
    """
    found = REG_SQL_JDBC.search(line)
    if found is None:
        return ''

    url = found.group(1).split('?')[0]
    at_pos = url.find('@')
    if at_pos > -1:
        return url[at_pos + 1:].lower()

    slash_pos = url.find('//')
    if slash_pos > -1:
        url = url[slash_pos + 2:]
    return url.lower()
# }}} #

# {{{ function parse_query_table() #
def parse_query_table(line):
    """Return the lower-cased table name matched by REG_SQL_PATH, or ''."""
    found = REG_SQL_PATH.search(line)
    if not found:
        return ''
    return found.group(1).lower() or ''
# }}} #

# {{{ function parse_reader_task() #
def parse_task(fname):
    """Scan a DataX log file and fill the module-level job dictionaries.

    Resets LAST_SQL_UUID, LAST_COMMIT_UUID, DATAX_JOBDICT,
    DATAX_JOBDICT_COMMIT and UNIXTIME, then hands every line containing
    'CommonRdbmsReader$Task' to parse_read_task() and every line containing
    'commit blocks' to parse_write_task(). In both cases the marker must not
    be at column 0.

    Args:
        fname: path of the log file to read.
    """
    global LAST_SQL_UUID
    global LAST_COMMIT_UUID
    global DATAX_JOBDICT
    global DATAX_JOBDICT_COMMIT
    global UNIXTIME
    LAST_SQL_UUID, LAST_COMMIT_UUID = '', ''
    DATAX_JOBDICT, DATAX_JOBDICT_COMMIT = {}, {}
    UNIXTIME = int(time.time())

    with open(fname, 'r') as log:
        for raw in log.readlines():
            entry = raw.strip()

            # Pending host lookup for the task remembered in LAST_SQL_UUID.
            if LAST_SQL_UUID and LAST_SQL_UUID in DATAX_JOBDICT:
                DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(entry)
                LAST_SQL_UUID = ''

            if entry.find('CommonRdbmsReader$Task') > 0:
                parse_read_task(entry)
            elif entry.find('commit blocks') > 0:
                parse_write_task(entry)
# }}} #

# {{{ function parse_read_task() #
def parse_read_task(line):
    """Record the start or end of a reader task found in *line*.

    A "begin to read" line creates the DATAX_JOBDICT entry for the task id
    matched by REG_SQL_UUID. A "finished read" line for a known task marks
    it done. Lines without a task id are ignored.
    """
    matched = REG_SQL_UUID.search(line)
    if matched is None:
        return

    # NOTE(review): the original assigned this to a *local* named
    # LAST_SQL_UUID (no ``global`` statement), so the module-level
    # LAST_SQL_UUID is never updated from here. That behaviour is kept.
    task_id = matched.group()
    if REG_SQL_WAKE.search(line):
        DATAX_JOBDICT[task_id] = {
            'stat' : 'R',
            'wake' : parse_timestamp(line),
            'done' : UNIXTIME,
            'host' : parse_query_host(line),
            'path' : parse_query_table(line)
        }
        return

    if task_id in DATAX_JOBDICT and REG_SQL_DONE.search(line):
        record = DATAX_JOBDICT[task_id]
        record['stat'] = 'D'
        record['done'] = parse_timestamp(line)
# }}} #

# {{{ function parse_write_task() #
def parse_write_task(line):
    """Record the start or end of a writer commit found in *line*.

    A "begin to commit blocks" line creates the DATAX_JOBDICT_COMMIT entry
    for the task id matched by REG_COMMIT_UUID. A "commit blocks ok" line
    for a known task marks it done. Lines without a task id are ignored.
    """
    matched = REG_COMMIT_UUID.search(line)
    if matched is None:
        return

    # NOTE(review): as in the original, this is a local value; the
    # module-level LAST_COMMIT_UUID is not updated from here.
    task_id = matched.group()
    if REG_COMMIT_WAKE.search(line):
        DATAX_JOBDICT_COMMIT[task_id] = {
            'stat' : 'R',
            'wake' : parse_timestamp(line),
            'done' : UNIXTIME,
        }
        return

    if task_id in DATAX_JOBDICT_COMMIT and REG_COMMIT_DONE.search(line):
        record = DATAX_JOBDICT_COMMIT[task_id]
        record['stat'] = 'D'
        record['done'] = parse_timestamp(line)
# }}} #

# {{{ function result_analyse() #
def result_analyse():
    """Print per-task timings and summary statistics gathered by parse_task().

    Reads the module-level DATAX_JOBDICT and DATAX_JOBDICT_COMMIT, adds
    'uuid' and 'cost' to every entry, prints one line per reader task
    (most expensive first) and then the summary. Nothing after the task list
    is printed when no reader task has a usable cost. The commit summary is
    skipped when no commit task has a usable cost.
    """
    def cost_of(task):
        return task['cost']

    tasklist = []
    seen_hosts = set()
    statvars = {'sum' : 0, 'cnt' : 0, 'svr' : 0, 'max' : 0, 'min' : int(time.time())}
    tasklist_commit = []
    statvars_commit = {'sum' : 0, 'cnt' : 0}

    for idx in DATAX_JOBDICT:
        item = DATAX_JOBDICT[idx]
        item['uuid'] = idx
        item['cost'] = item['done'] - item['wake']
        tasklist.append(item)

        if item['host'] not in seen_hosts:
            seen_hosts.add(item['host'])
            statvars['svr'] += 1

        # Only costs in the range 0..863999 seconds enter the statistics.
        if (item['cost'] > -1 and item['cost'] < 864000):
            statvars['sum'] += item['cost']
            statvars['cnt'] += 1
            statvars['max'] = max(statvars['max'], item['done'])
            statvars['min'] = min(statvars['min'], item['wake'])

    for idx in DATAX_JOBDICT_COMMIT:
        itemc = DATAX_JOBDICT_COMMIT[idx]
        itemc['uuid'] = idx
        itemc['cost'] = itemc['done'] - itemc['wake']
        tasklist_commit.append(itemc)

        if (itemc['cost'] > -1 and itemc['cost'] < 864000):
            statvars_commit['sum'] += itemc['cost']
            statvars_commit['cnt'] += 1

    ttl = (statvars['max'] - statvars['min']) or 1
    ratio = float(statvars['cnt']) / (statvars['sum'] or ttl)

    # Python 3 removed the positional cmp argument of list.sort(); sorting
    # by key in reverse gives the same descending-by-cost, stable order.
    tasklist.sort(key=cost_of, reverse=True)
    for item in tasklist:
        print('%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' %(item['stat'], item['host'], item['path'],
                                                           time.strftime('%H:%M:%S', time.localtime(item['wake'])),
                                                           (('D' == item['stat']) and time.strftime('%H:%M:%S', time.localtime(item['done']))) or '--',
                                                           item['cost'], 100.0 * item['cost'] / ttl, ratio * item['cost']))

    if (not len(tasklist) or not statvars['cnt']):
        return

    print('\n--- DataX Profiling Statistics ---')
    print('%d task(s) on %d server(s), Total elapsed %d second(s), %.2f second(s) per task in average' %(statvars['cnt'],
                                                                                                         statvars['svr'], statvars['sum'], float(statvars['sum']) / statvars['cnt']))
    print('Actually cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' %(ttl,
                                                                                             time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
                                                                                             time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
                                                                                             float(statvars['sum']) / ttl, ratio * tasklist[0]['cost']))

    # Without any usable commit task the average below would divide by zero
    # and tasklist_commit[0] could be missing, so skip the line.
    if not statvars_commit['cnt']:
        return

    ratio_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
    tasklist_commit.sort(key=cost_of, reverse=True)
    print('%d task(s) done odps comit, Total elapsed %d second(s), %.2f second(s) per task in average, tilt index: %.2f' % (
        statvars_commit['cnt'],
        statvars_commit['sum'], float(statvars_commit['sum']) / statvars_commit['cnt'],
        ratio_commit * tasklist_commit[0]['cost']))

# }}} #

# Script body: the first command-line argument is the log file to analyse.
if (len(sys.argv) < 2):
    print("Usage: %s filename" %(sys.argv[0]))
    # sys.exit() instead of quit(): quit is only injected by the ``site``
    # module and is not guaranteed to exist (e.g. ``python -S``).
    sys.exit(1)
else:
    parse_task(sys.argv[1])
    result_analyse()

3、perftrace.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-


"""
   Life's short, Python more.
"""

import re
import os
import sys
import json
import uuid
import signal
import time
import subprocess
from optparse import OptionParser
# Python 2 only: force the implicit str/unicode codec to UTF-8. On Python 3
# there is no reload() builtin (calling it raises NameError) and
# sys.setdefaultencoding() does not exist, so this step is skipped there.
if sys.version_info[0] < 3:
    reload(sys)
    sys.setdefaultencoding('utf8')

##begin cli & help logic
def getOptionParser():
    """Build the OptionParser for perftrace.py.

    Returns:
        OptionParser: parser using getUsage() as usage text and defining
        -r/--reader, -w/--writer, -c/--channel, -f/--file, -t/--type and
        -d/--delete.
    """
    cli = OptionParser(usage = getUsage())
    optionTable = (
        #rdbms reader and writer
        (('-r', '--reader'), dict(action='store', dest='reader', help='trace datasource read performance with specified !json! string')),
        (('-w', '--writer'), dict(action='store', dest='writer', help='trace datasource write performance with specified !json! string')),
        (('-c', '--channel'), dict(action='store', dest='channel', default='1', help='the number of concurrent sync thread, the default is 1')),
        (('-f', '--file'), dict(action='store', help='existing datax configuration file, include reader and writer params')),
        (('-t', '--type'), dict(action='store', default='reader', help='trace which side\'s performance, cooperate with -f --file params, need to be reader or writer')),
        (('-d', '--delete'), dict(action='store', default='true', help='delete temporary files, the default value is true')),
    )
    for flags, settings in optionTable:
        cli.add_option(*flags, **settings)
    return cli

def getUsage():
    """Return the multi-line help text describing the perftrace options."""
    usageText = '''
The following params are available for -r --reader:
    [these params is for rdbms reader, used to trace rdbms read performance, it's like datax's key]
    *datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for read data
    column:          column to be read, the default value is ['*']
    splitPk:         the splitPk column of rdbms table
    where:           limit the scope of the performance data set
    fetchSize:       how many rows to be fetched at each communicate
    [these params is for stream reader, used to trace rdbms write performance]
    reader-sliceRecordCount:  how man test data to mock(each channel), the default value is 10000
    reader-column          :  stream reader while generate test data(type supports: string|long|date|double|bool|bytes; support constant value and random function),demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]
The following params are available for -w --writer:
    [these params is for rdbms writer, used to trace rdbms write performance, it's like datax's key]
    datasourceType:  datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2|ads etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for write data
    column:          column to be writed, the default value is ['*']
    batchSize:       how many rows to be storeed at each communicate, the default value is 512
    preSql:          prepare sql to be executed before write data, the default value is ''
    postSql:         post sql to be executed end of write data, the default value is ''
    url:             required for ads, pattern is ip:port
    schme:           required for ads, ads database name
    [these params is for stream writer, used to trace rdbms read performance]
    writer-print:           true means print data read from source datasource, the default value is false
The following params are available global control:
    -c --channel:    the number of concurrent tasks, the default value is 1
    -f --file:       existing completely dataX configuration file path
    -t --type:       test read or write performance for a datasource, couble be reader or writer, in collaboration with -f --file
    -h --help:       print help message
some demo:
perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'
perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'
perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
some example jdbc url pattern, may help:
jdbc:oracle:thin:@ip:port:database
jdbc:mysql://ip:port/database
jdbc:sqlserver://ip:port;DatabaseName=database
jdbc:postgresql://ip:port/database
warn: ads url pattern is ip:port
warn: test write performance will write data into your table, you can use a temporary table just for test.
'''
    return usageText

def printCopyright():
    """Print the perftrace banner to stdout and flush it."""
    versionLabel = 'UNKNOWN_DATAX_VERSION'
    banner = '''
DataX Util Tools (%s), From Alibaba !
Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.''' % versionLabel
    print(banner)
    sys.stdout.flush()


def yesNoChoice():
    """Ask on stdin until the user answers yes or no.

    Accepted answers (case-insensitive): 'yes', 'y', 'ye' or an empty line
    for yes; 'no' or 'n' for no. Any other answer prints a hint and the
    question is read again. Previously the function fell through and
    returned None after a single invalid answer.

    Returns:
        bool: True for yes, False for no.

    Raises:
        EOFError: if stdin is exhausted before a valid answer is read.
    """
    yes = set(['yes','y', 'ye', ''])
    no = set(['no','n'])
    # raw_input() was renamed to input() in Python 3. Python 2's input()
    # evaluates what is typed, so prefer raw_input when it exists.
    try:
        readLine = raw_input
    except NameError:
        readLine = input
    while True:
        choice = readLine().lower()
        if choice in yes:
            return True
        if choice in no:
            return False
        sys.stdout.write("Please respond with 'yes' or 'no'")
        sys.stdout.flush()
##end cli & help logic


##begin process logic
def suicide(signum, e):
    """Signal handler: terminate the child process, then exit with -1.

    Args:
        signum: number of the signal that was received.
        e: second argument supplied to signal handlers; not used here.
    """
    global childProcess
    # ``print >> sys.stderr, ...`` is Python 2 statement syntax; under
    # Python 3 it raises TypeError. Writing to the stream works on both.
    sys.stderr.write("[Error] Receive unexpected signal %d, starts to suicide.\n" % (signum))
    if childProcess:
        # Send SIGQUIT first and wait one second before the hard kill.
        childProcess.send_signal(signal.SIGQUIT)
        time.sleep(1)
        childProcess.kill()
    sys.stderr.write("DataX Process was killed ! you did ?\n")
    sys.exit(-1)


def registerSignal():
    """Install suicide() as the handler for signals 2, 3 and 15."""
    global childProcess
    for signum in (2, 3, 15):
        signal.signal(signum, suicide)


def fork(command, isShell=False):
    """Run *command* as a child process and return its exit status.

    The Popen object is stored in the module-level ``childProcess`` so that
    suicide() can reach it, and the signal handlers are installed right
    after the child has been started.

    Args:
        command: the command handed to ``subprocess.Popen``.
        isShell: passed through as Popen's ``shell`` argument.

    Returns:
        The child's return code.
    """
    global childProcess
    childProcess = subprocess.Popen(command, shell = isShell)
    registerSignal()
    childProcess.communicate()
    # Block until the child process has finished.
    exitCode = childProcess.wait()
    return exitCode
##end process logic


##begin datax json generate logic
#warn: if not '': -> true;   if not None: -> true
def notNone(obj, context):
    """Raise Exception naming *context* when *obj* is falsy (None, '', ...)."""
    if obj:
        return
    raise Exception("Configuration property [%s] could not be blank!" % (context))

def attributeNotNone(obj, attributes):
    """Check via notNone() that ``obj.get(name)`` is set for every name given."""
    for attributeName in attributes:
        attributeValue = obj.get(attributeName)
        notNone(attributeValue, attributeName)

def isBlank(value):
    """Return True when *value* is None or contains only whitespace."""
    return value is None or len(value.strip()) == 0

def parsePluginName(jdbcUrl, pluginType):
    """Derive a plugin name such as ``mysqlreader`` from a JDBC URL.

    Args:
        jdbcUrl: JDBC connection string, e.g. ``jdbc:mysql://ip:port/db``.
        pluginType: suffix appended to the database name (the caller passes
            'reader' or 'writer').

    Returns:
        str: '<database><pluginType>', where <database> is the literal
        'pluginName' when the URL matches none of the known prefixes.
    """
    import re
    #warn: drds
    knownPatterns = (
        ('mysql', 'jdbc:(mysql)://.*'),
        ('postgresql', 'jdbc:(postgresql)://.*'),
        ('oracle', 'jdbc:(oracle):.*'),
        ('sqlserver', 'jdbc:(sqlserver)://.*'),
        ('db2', 'jdbc:(db2)://.*'),
    )
    name = 'pluginName'
    # Every pattern is tried in order; a later match overrides an earlier one.
    for candidate, pattern in knownPatterns:
        if re.compile(pattern).match(jdbcUrl):
            name = candidate
    return "%s%s" % (name, pluginType)

def renderDataXJson(paramsDict, readerOrWriter = 'reader', channel = 1):
    """Render a DataX job JSON that traces one side of a datasource.

    The traced side receives every entry of *paramsDict*; the opposite side
    becomes ``streamwriter`` (when tracing a reader) or ``streamreader``
    (when tracing a writer).

    Args:
        paramsDict: settings for the traced plugin. The keys 'writer-print',
            'reader-column' and 'reader-sliceRecordCount' are moved to the
            stream side instead of staying on the traced side.
        readerOrWriter: 'reader' or 'writer', the side being traced.
        channel: value stored as the job's speed channel.

    Returns:
        str: the job configuration serialised with ``json.dumps(indent=4)``.
    """
    readerSection = {
        "name": "",
        "parameter": {
            "username": "",
            "password": "",
            "sliceRecordCount": "10000",
            "column": [
                "*"
            ],
            "connection": [
                {
                    "table": [],
                    "jdbcUrl": []
                }
            ]
        }
    }
    writerSection = {
        "name": "",
        "parameter": {
            "print": "false",
            "connection": [
                {
                    "table": [],
                    "jdbcUrl": ''
                }
            ]
        }
    }
    sides = {"reader": readerSection, "writer": writerSection}
    jobDocument = {
        "job": {
            "setting": {"speed": {"channel": channel}},
            "content": [sides]
        }
    }

    # Plugin name: explicit datasource type first, then the JDBC URL, then
    # the ads 'url' setting.
    pluginName = ''
    if paramsDict.get('datasourceType'):
        pluginName = '%s%s' % (paramsDict['datasourceType'], readerOrWriter)
    elif paramsDict.get('jdbcUrl'):
        pluginName = parsePluginName(paramsDict['jdbcUrl'], readerOrWriter)
    elif paramsDict.get('url'):
        pluginName = 'adswriter'

    theOtherSide = 'writer' if readerOrWriter == 'reader' else 'reader'
    ownParams = sides.get(readerOrWriter).get('parameter')
    ownParams.update(paramsDict)
    otherParams = sides.get(theOtherSide).get('parameter')

    if readerOrWriter == 'reader':
        readerSection['name'] = pluginName
        writerSection['name'] = 'streamwriter'
        if paramsDict.get('writer-print'):
            otherParams['print'] = paramsDict['writer-print']
            del ownParams['writer-print']
        del otherParams['connection']
    elif readerOrWriter == 'writer':
        readerSection['name'] = 'streamreader'
        writerSection['name'] = pluginName
        # Settings prefixed with "reader-" belong to the stream reader.
        for sourceKey, targetKey in (('reader-column', 'column'),
                                     ('reader-sliceRecordCount', 'sliceRecordCount')):
            if paramsDict.get(sourceKey):
                otherParams[targetKey] = paramsDict[sourceKey]
                del ownParams[sourceKey]
        del otherParams['connection']

    connection = None
    if paramsDict.get('jdbcUrl'):
        connection = ownParams['connection'][0]
        if readerOrWriter == 'reader':
            # The reader template keeps a list of URLs, the writer a string.
            connection['jdbcUrl'].append(paramsDict['jdbcUrl'])
        else:
            connection['jdbcUrl'] = paramsDict['jdbcUrl']
    if paramsDict.get('table'):
        ownParams['connection'][0]['table'].append(paramsDict['table'])

    return json.dumps(jobDocument, indent = 4)

def isUrl(path):
    """Tell whether *path* looks like an http:// or https:// URL.

    Empty or None input yields False. Matching is done on the lower-cased
    text.

    Raises:
        Exception: when *path* is non-empty but not a str.
    """
    if not path:
        return False
    if not isinstance(path, str):
        raise Exception('Configuration file path required for the string, you configure is:%s' % path)
    return re.match(r"^http[s]?://\S+\w*", path.lower()) is not None


def readJobJsonFromLocal(jobConfigPath):
    """Read a job configuration file from the local file system.

    Args:
        jobConfigPath: relative or absolute path of the configuration file.

    Returns:
        The file content as text, decoded as UTF-8.

    Raises:
        Exception: when the file is empty.
        IOError/OSError: when the file cannot be opened.
    """
    import io
    jobConfigPath = os.path.abspath(jobConfigPath)
    # Decode explicitly as UTF-8 rather than with the locale's default
    # encoding; ``with`` closes the file and avoids shadowing ``file``.
    with io.open(jobConfigPath, encoding='utf-8') as jobFile:
        jobConfigContent = jobFile.read()
    if not jobConfigContent:
        raise Exception("Your job configuration file read the result is empty, please check the configuration is legal, path: [%s]\nconfiguration:\n%s" % (jobConfigPath, str(jobConfigContent)))
    return jobConfigContent


def readJobJsonFromRemote(jobConfigPath):
    """Download a job configuration from a URL.

    Args:
        jobConfigPath: URL of the configuration.

    Returns:
        The response body as text, decoded as UTF-8.
    """
    import contextlib
    # urllib.urlopen moved to urllib.request.urlopen in Python 3.
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib import urlopen
    # closing() releases the connection on both Python versions.
    with contextlib.closing(urlopen(jobConfigPath)) as conn:
        jobJson = conn.read()
    if isinstance(jobJson, bytes):
        jobJson = jobJson.decode('utf-8')
    return jobJson

def parseJson(strConfig, context):
    """Parse *strConfig* as JSON, exiting the program when it is invalid.

    Args:
        strConfig: the JSON text to parse.
        context: short description of where the text came from; used in the
            error message.

    Returns:
        The parsed JSON value.

    On a parse failure the traceback and an explanatory line are written to
    stderr and the process exits with status -1.
    """
    try:
        return json.loads(strConfig)
    except Exception:
        import traceback
        traceback.print_exc()
        sys.stdout.flush()
        # ``print >> sys.stderr, ...`` is Python 2 statement syntax and
        # raises TypeError under Python 3; write to the stream instead.
        sys.stderr.write('%s %s need in line with json syntax\n' % (context, strConfig))
        sys.exit(-1)

def convert(options, args):
    """Produce the JSON text of the trace job described by the CLI options.

    With ``--file`` the existing job is loaded (from a URL or a local path)
    and the side that is NOT being traced is replaced by a stream plugin.
    Without it, the job is rendered from the ``--reader`` or ``--writer``
    JSON string. With none of the three, the usage text is printed and the
    process exits with -1.

    Args:
        options: parsed option values (file, type, reader, writer, channel).
        args: positional arguments; not used.

    Returns:
        str: the job configuration as JSON text.
    """
    if options.file:
        if isUrl(options.file):
            rawJob = readJobJsonFromRemote(options.file)
        else:
            rawJob = readJobJsonFromLocal(options.file)
        jobDict = parseJson(rawJob, '%s content' % options.file)

        # Validate the required structure level by level.
        attributeNotNone(jobDict, ['job'])
        job = jobDict['job']
        attributeNotNone(job, ['content'])
        firstContent = job['content'][0]
        attributeNotNone(firstContent, ['reader', 'writer'])
        readerPart = firstContent['reader']
        writerPart = firstContent['writer']
        attributeNotNone(readerPart, ['name', 'parameter'])
        attributeNotNone(writerPart, ['name', 'parameter'])

        if options.type == 'reader':
            # Tracing the reader: the writer becomes a stream writer.
            writerPart['name'] = 'streamwriter'
            printFlag = 'false'
            if options.reader:
                readerOverrides = parseJson(options.reader, 'reader config')
                if readerOverrides.get('writer-print') is not None:
                    printFlag = readerOverrides.get('writer-print')
            writerPart['parameter']['print'] = printFlag
        elif options.type == 'writer':
            # Tracing the writer: the reader becomes a stream reader.
            readerPart['name'] = 'streamreader'
            readerParams = readerPart['parameter']
            if options.writer:
                writerOverrides = parseJson(options.writer, 'writer config')
                if writerOverrides.get('reader-column'):
                    readerParams['column'] = writerOverrides['reader-column']
                if writerOverrides.get('reader-sliceRecordCount'):
                    readerParams['sliceRecordCount'] = writerOverrides['reader-sliceRecordCount']
            else:
                # One random "long" column per configured writer column.
                columnSize = len(writerPart['parameter']['column'])
                readerParams['column'] = [{"type": "long", "random": "2,10"} for _ in range(columnSize)]
                readerParams['sliceRecordCount'] = 10000
        return json.dumps(jobDict, indent = 4)

    if options.reader:
        readerSettings = parseJson(options.reader, 'reader config')
        return renderDataXJson(readerSettings, 'reader', options.channel)

    if options.writer:
        writerSettings = parseJson(options.writer, 'writer config')
        return renderDataXJson(writerSettings, 'writer', options.channel)

    print(getUsage())
    sys.exit(-1)
    #dataxParams = {}
    #for opt, value in options.__dict__.items():
    #    dataxParams[opt] = value
##end datax json generate logic


# Script entry point: build the trace job JSON, write it to a temporary file
# in the current directory, run datax.py on it and exit with its status.
if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()

    options, args = parser.parse_args(sys.argv[1:])
    #print options, args
    dataxTraceJobJson = convert(options, args)

    # uuid1 is generated from the MAC address, the current timestamp and a
    # random number, which makes the file name unique.
    dataxJobPath = os.path.join(os.getcwd(), "perftrace-" + str(uuid.uuid1()))
    jobConfigOk = True
    if os.path.exists(dataxJobPath):
        print("file already exists, truncate and rewrite it? %s" % dataxJobPath)
        if yesNoChoice():
            jobConfigOk = True
        else:
            print("exit failed, because of file conflict")
            sys.exit(-1)
    fileWriter = open(dataxJobPath, 'w')
    fileWriter.write(dataxTraceJobJson)
    fileWriter.close()


    print("trace environments:")
    print("dataxJobPath:  %s" % dataxJobPath)
    # DataX home is the parent of the directory that contains this script.
    dataxHomePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    print("dataxHomePath: %s" % dataxHomePath)

    # datax.py is invoked directly as the command, run through the shell.
    dataxCommand = "%s %s" % (os.path.join(dataxHomePath, "bin", "datax.py"), dataxJobPath)
    print("dataxCommand:  %s" % dataxCommand)

    returncode = fork(dataxCommand, True)
    # -d/--delete is a string option; only the exact value 'true' removes
    # the temporary job file.
    if options.delete == 'true':
        os.remove(dataxJobPath)
    sys.exit(returncode)

ok,把这三个文件给换成python3版本,就可以正常使用了。

相关文章:

  • Android Dagger 2 框架的注解模块源码分析(一)
  • 【QT】-解析打包json
  • Java 8 + Tomcat 9.0.102 的稳定环境搭建方案,适用于生产环境
  • Linux centos 7 grub引导故障恢复
  • celery入门
  • 探讨Deveco Studio常见问题及解决方案,分享快速排障方法
  • Android Framework 之了解系统启动流程一
  • Shp文件转坐标并导出到Excel和JSON(arcMap + excel)
  • NCCL如何打印XML拓扑文件,操作说明和源码展示
  • VIC水文模型
  • 打靶练习-W1R3S、JARBAS、SickOS、Prime
  • 【每日学点HarmonyOS Next知识】tab对齐、相对布局、自定义弹窗全屏、动画集合、回到桌面
  • 【Unity网络同步框架 - Nakama研究(二)】
  • 基于NXP+FPGA永磁同步电机牵引控制单元(单板结构/机箱结构)
  • Simulink指导手册笔记②--快捷键及基本操作
  • C51 Proteus仿真实验17:数码管显示4×4键盘矩阵按键
  • CesiumforUE中Cesium3DTileset中高频使用的组件概述
  • 【最新】 ubuntu24安装 1panel 保姆级教程
  • Flutter PopScope对于iOS设置canPop为false无效问题
  • 网络安全信息收集[web子目录]:dirsearch子目录爆破全攻略以及爆破字典结合
  • 沙县小吃中东首店在沙特首都利雅得开业,首天营业额超5万元
  • 从600名外到跻身大满贯,孙发京:走过的路成就了现在的我
  • 上海浦东机场1号、2号航站楼均推出国内出发安检24小时服务
  • 全球医药股普跌,A股创新药板块下挫
  • 德国将不再公布对乌克兰军事支持的细节
  • 非洲雕刻艺术有着怎样的“变形之美”