Follow Slashdot blog updates by subscribing to our blog RSS feed

 



Forgot your password?
typodupeerror
×
User Journal

Journal KWTm's Journal: processing GnuCash files (XML): a Python script 3

I was asked for the Python code I wrote to process GnuCash files. I figured I may as well post it publicly so that more than one person can benefit from the work I put in. I cleaned it up a bit, but if you have any questions, please contact me. My program here is called GnuCash_Fix.

As mentioned in the comments: When GnuCash imports QFX/OFX/QIF files (say, for a bank account or credit card account), each transaction is between 2 accounts but GnuCash can only identify 1 of the accounts, that of the bank account/credit card account in question. If the QFX/OFX/QIF file is imported without further intervention, the other account is marked as "Unbalanced" and is assigned to this account (for example, "Unbalanced-USD" if in US Dollars) after doing Main Menu > Actions > Check & Repair > All transactions.

This program tries to identify the other account for each transaction (e.g. "groceries" or "salary") depending on the description of the transaction and optionally the range of the amount. It also finds two complementary unbalanced transactions and matches them together. E.g.
# transaction #1 on 2003-06-21, for $123.45 from Account A to ...somewhere unknown!!!; and then
# transaction #2 on 2003-06-21, for $123.45 from somewhere unknown ... to Account B.
Gee, I wonder what could have happened with those two seemingly unrelated transactions???? (duhhh...)

Works for GnuCash v2.2.6, and also for earlier versions that I used (I can't tell which versions now, but at least one of them was a v1.x.x version).

The main tricky thing I found out from working with GnuCash files is that the tags contain the colon character (':') which Python's XML libraries don't like, so I used "sed" to convert them to double underscores ("__")

In addition to depending on some publicly available software such as "sed" and various Python libraries, it also depends on two Python modules of my own creation. I will post these in subsequent entries. It's easy to rewrite the code so it does not have these dependencies, but if you wanted to make sure the code was working as-is, you'll need these modules. One module is just a debugging script (decide whether to display debugging messages depending on a pre-set "debugging level of detail), and the other is just a personalized version of "getopt". (In fact, I think you can just replace it with the official Python "getopt" --but the version with GNU compatibility.)

Oh, it also depends on a pre-defined file containing parameters that allow this GnuCash_Fix program to identify and classify the transactions. This is called descr_acct.py, and I will post it in a subsequent entry.

#!/usr/bin/env python
#
# Copyright KWTm 2006-2009
 
# released under GPL v3. See http://www.gnu.org/copyleft/gpl.html
# Those who don't like this license may contact me for other licensing options, such as my special I-Don't-Like-GPL-And-Want-To-Give-You-Lots-Of-Money license.
 
#
# When GnuCash imports QFX/OFX/QIF files (say, for a bank account or credit card account), each transaction is between 2 accounts but GnuCash can only identify 1 of the accounts, that of the bank account/credit card account in question. If the QFX/OFX/QIF file is imported without further intervention, the other account is marked as "Unbalanced" and is assigned to this account (for example, "Unbalanced-USD" if in US Dollars) after doing Main Menu > Actions > Check & Repair > All transactions.
 
# This program tries to identify the other account for each transaction (e.g. "groceries" or "salary") depending on the description of the transaction and optionally the range of the amount.
#
# Also finds two complementary unbalanced transactions and matches them together. E.g.
# transaction #1 on 2003-06-21, for $123.45 from Account A to ...somewhere unknown!!!; and then
# transaction #2 on 2003-06-21, for $123.45 from somewhere unknown ... to Account B.
# Gee, I wonder what could have happened with those two seemingly unrelated transactions???? (duhhh...)
 
# Works for GnuCash v2.2.6, and also for earlier versions that I used (I can't tell which versions now, but at least one of them was a v1.x.x version).
 
# Dependencies:
# standard Python libraries, especially xml.dom.ext and xml.dom.minidom
# my own modules: kwdebug, kwgetopt (but this script can be easily rewritten to eliminate these dependences)
# standard GNU/Linux command-line utilities: gzip, sed
 
# The Python xml.dom.minidom module doesn't like colons as part of the XML tag name, so we need to s/:/__/ or something.
 
"""
I'll set up a naming convention:
 
_Ac = instance of Account class
_Class = a class
_Cr = instance of Criterion class
_Gn = instance of Gnucash class
_Tr = instance of Transaction class
_Ts = instance of Timestamp class
_Xn = xml_node
 
"""
 
import solitime
import sys
sys.path.append('/usr/lib/python%s/site-packages/oldxml' % sys.version[:3])
# The above 2 lines are necessary because Ubuntu 8.04 moved the python-xml package, breaking old code. Stupid.
import xml.dom.minidom
import xml.dom.ext
 
import kwdebug
MyDebug = kwdebug.Debug(0)
 
class Global_values_Class:
 
    import kwgetopt
    cmd_line_opts = kwgetopt.kwgetopt("a:d:i:o:")
    # Remember: the colon after the option letters mean that there is an argument after those options
    # -a = account containing unbalanced transactions
    # -d = description / account dictionary file
    # -i = input file
    # -o = output file
    #imbal_acct_re = cmd_line_opts.getopt("-a", r"(?i)imbalance.usd") # The other one is "Unspecified" --make sure it doesn't accidentally refer to "Cu Cdn unspecified", by capitalizing "Unspecified")
 
    descr_acct_fname = cmd_line_opts.getopt("-d", "descr_acct.py")
    input_fname = cmd_line_opts.getopt("-i", "gnucash.xml")
    output_fname = cmd_line_opts.getopt("-o", "gnucash-out.xml")
 
    input_unzipped_fname = input_fname + "_unzipped"
    input_unzipped_nocolons_fname = input_unzipped_fname + "_nocolons"
    output_unzipped_fname = output_fname + "_unzipped"
    output_unzipped_nocolons_fname = output_unzipped_fname + "_nocolons"
 
    preamble=''' GnuCashFIX (with transaction description notes, pair matching and RegExp lookup; updated 954JHq)
: This matches corresponding transactions in Gnucash from a designated account.
: It then takes all the transactions in that account
: and moves them to the appropriate accounts based on the description of
: the transaction.
: (-i) GnuCash input: %s
: (-o) GnuCash output: %s
: (-d) Description -> Account matchfile: %s
: (-a) Account to be fixed will be 'Imbalance-USD'
''' % (input_fname, output_fname, descr_acct_fname )
 
    def __init__(self):
        import os.path
        print(self.preamble)
 
        if os.path.abspath(self.input_fname) == os.path.abspath(self.output_fname) :
            print("\n! *Warning*: output will overwrite the input file %s" % self.input_fname)
        else :
            print(": To force output file to overwrite the input file, use the command-line option '-o %s'" % self.input_fname)
 
        try:
            raw_input("> ENTER to continue, or Ctrl-C to abort")
        except KeyboardInterrupt :
            print("\n! Interrupted by keyboard")
            raise SystemExit
 
Global = Global_values_Class()
 
class GnuCashXML_Class:
 
    # I'll make it so that only this class handles XML, and any other classes do not need to access the XML document at all
 
    acct_denominator_tagname = "act__commodity-scu"
    acct_description_tagname = "act__description"
    acct_id_tagname = "act__id"
    acct_name_tagname = "act__name" # Note that it's abbreviated "act", not "acct", in the GnuCash file
    gnc_acct_tagname = "gnc__account"
    #imbal_acct_re = r"(?i)unspecified"
    #imbal_acct_re = r"Uchq" # for testing only
    split_acct_tagname = "split__account"
    #split_id_tagname = "split__id" # This is not used and may be confused with split_acct_tagname
    split_quantity_tagname = "split__quantity"
    split_value_tagname = "split__value"
    transaction_tagname = "gnc__transaction"
    trn_dateposted_tagname = "trn__date-posted"
    trn_description_tagname = "trn__description"
    trn_id_tagname = "trn__id"
    ts_date_tagname = "ts__date"
 
    # To experiment with XML document in Python IDE, use xml.dom.minidom.parse("/home/kwtm1/finances/gnucash-in.conv")
 
    def __init__(self, in_fname = None):
 
        if None != in_fname:
            MyDebug.debug_msg(": Now converting colons")
            conv_fname = Global.input_unzipped_nocolons_fname
            self.colon_to_double_underscore( in_fname, conv_fname )
            MyDebug.debug_msg(": Now importing %s" % conv_fname )
            self.xmldoc = xml.dom.minidom.parse( conv_fname )
        else:
            self.xmldoc = xml.dom.minidom.Document()
 
        self.acct_dict_AcDict = AccountDict_Class({})
 
    def colon_to_double_underscore(self, in_fname, out_fname):
        conv_cmd = "sed -e 's/:/__/g' %s >%s"
        import os
        os.system( conv_cmd % (in_fname, out_fname) )
 
    def double_underscore_to_colon(self, in_fname, out_fname):
        conv_cmd = "sed -e 's/__/:/g' %s >%s"
        import os
        os.system( conv_cmd % (in_fname, out_fname) )
 
    def convert_acct_xml(self, ac):
        # See convert_trans_xml to see how ridiculously nested the XML is.
        ac_name_contentsnode_val = ac.getElementsByTagName(self.acct_name_tagname)[0].childNodes[0].nodeValue
        ac_id_contentsnode_val = ac.getElementsByTagName(self.acct_id_tagname)[0].childNodes[0].nodeValue
        return Account_Class( ac_id_contentsnode_val, ac_name_contentsnode_val, ac)
 
    def convert_trans_xml(self, tr_Tr):
        # Convert from the transactions themselves back to XML
 
        tr_Tr.node.getElementsByTagName(self.trn_dateposted_tagname)[0].getElementsByTagName(self.ts_date_tagname)[0].childNodes[0].nodeValue = tr_Tr.date
 
        tr_Tr.node.getElementsByTagName(self.trn_id_tagname)[0].childNodes[0].nodeValue = tr_Tr.trn_id
 
        try:
            tr_Tr.node.getElementsByTagName(self.trn_description_tagname)[0].childNodes[0].nodeValue = tr_Tr.description
        except IndexError:
            MyDebug.debug_msg( "Bloody transaction %s had no description so we can't modify that." % tr_Tr ,2)
 
        tr_acctids_list = tr_Tr.node.getElementsByTagName(self.split_acct_tagname) # NOT the self.split_id_tagname which is the id of the split itself, not the accounts involved in the transaction
        tr_values_list = tr_Tr.node.getElementsByTagName(self.split_value_tagname)
        tr_quantities_list = tr_Tr.node.getElementsByTagName(self.split_quantity_tagname) # In all GnuCash transactions I've seen, value = quantity, so we can get the value from either one; but when we modify, then we must update both elements
        tr_num_accts_between = len(tr_acctids_list)
 
        if ( tr_Tr.num_accts_between != tr_num_accts_between ):
            MyDebug.debug_msg( "Funny... I thought transaction %s was between exactly %s accounts, but the XML says it's between %s accounts." % (tr_Tr, tr_Tr.num_accts, tr_num_accts_between ),1)
        else:
 
            if ( tr_num_accts_between >= 1):
                tr_acctids_list[0].childNodes[0].nodeValue = tr_Tr.acct1_Ac.acctid
                tr_values_list[0].childNodes[0].nodeValue = self.derive_valuetext(tr_Tr.value)
                tr_quantities_list[0].childNodes[0].nodeValue= self.derive_valuetext(tr_Tr.value)
 
            if ( tr_num_accts_between >= 2):
                tr_acctids_list[1].childNodes[0].nodeValue = tr_Tr.acct2_Ac.acctid
                tr_values_list[1].childNodes[0].nodeValue = self.derive_valuetext( -tr_Tr.value) # The second value is negative to balance the first.
                tr_quantities_list[1].childNodes[0].nodeValue= self.derive_valuetext( -tr_Tr.value) # The second value is negative to balance the first.
 
    def convert_xml_trans(self, tr):
 
        """
        # The XML is nested to a ridiculous degree. To get the date-posted of the transaction, we would have to do:
        tr_dateposted_list = tr.getElementsByTagName(self.trn_dateposted_tagname)
        tr_dateposted = tr_dateposted_list[0]
        tr_dateposted_date_list = tr_dateposted.getElementsByTagName(self.ts_date_tagname)
        tr_dateposted_date = tr_dateposted_date_list[0]
        tr_dateposted_date_contentsnode_list = tr_dateposted_date.childNodes
        tr_dateposted_date_contentsnode = tr_dateposted_date_contentsnode_list[0]
        tr_dateposted_date_contentsnode_val = tr_dateposted_date_contentsnode.nodeValue
        """
 
        tr_dateposted_date_contentsnode_val = tr.getElementsByTagName(self.trn_dateposted_tagname)[0].getElementsByTagName(self.ts_date_tagname)[0].childNodes[0].nodeValue
        #dateposted = self.extract_date(tr_dateposted_date_contentsnode_val)
        #MyDebug.debug_msg( "Extracted date %s" % dateposted,4)
        # No. There is no need to interpret the date ... yet. Just store the date string as is. When we have to compare, only then do we need to interpret the date string stored in a Transaction.
 
        tr_id_contentsnode_val = tr.getElementsByTagName(self.trn_id_tagname)[0].childNodes[0].nodeValue
 
        try:
            tr_description_contentsnode_val = tr.getElementsByTagName(self.trn_description_tagname)[0].childNodes[0].nodeValue
        except:
            tr_description_contentsnode_val = ""
            MyDebug.debug_msg( "Bloody transaction %s has no description!?" % tr_id_contentsnode_val ,2)
 
        tr_acctids_list = tr.getElementsByTagName(self.split_acct_tagname) # NOT the self.split_id_tagname which is the id of the split itself, not the accounts involved in the transaction
        tr_num_accts_between = len(tr_acctids_list)
 
        if ( 2 != tr_num_accts_between ):
            MyDebug.debug_msg( "Funny... transaction %s (%s) is not between exactly 2 accounts, but with %s accounts." % (tr_id_contentsnode_val, tr_description_contentsnode_val, tr_num_accts_between ),4)
 
        if ( tr_num_accts_between >= 1):
            tr_acctid1_contentsnode_val = tr_acctids_list[0].childNodes[0].nodeValue
            acct1_Ac = self.acct_dict_AcDict.get(tr_acctid1_contentsnode_val, None) # Get the account, or return None if no such account
        else:
            acct1_Ac = None
 
        if ( tr_num_accts_between >= 2):
            tr_acctid2_contentsnode_val = tr_acctids_list[1].childNodes[0].nodeValue
            acct2_Ac = self.acct_dict_AcDict.get(tr_acctid2_contentsnode_val, None)
        else:
            acct2_Ac = None
 
        tr_splitvalue_contentsnode_val = tr.getElementsByTagName(self.split_value_tagname)[0].childNodes[0].nodeValue
        value = self.extract_value(tr_splitvalue_contentsnode_val)
 
        # Now to create an instance of Transaction_Class based on the values above
        return Transaction_Class(tr, acct1_Ac, acct2_Ac, tr_dateposted_date_contentsnode_val, tr_description_contentsnode_val, tr_num_accts_between, tr_id_contentsnode_val, value)
 
    def derive_valuetext(self, value, denominator=100):
        return "%d/%d" % (value*denominator, denominator)
 
    def extract_value(self, text):
        # The value text is something like "12345/100" which means "$123.45", but we have to divide numerator by denominator.
 
        try:
            return eval( text.strip() + ".0")
            # If we don't add ".0", then we will be doing integer arithmetic
        except:
            # Okay, just evaluating didn't work, so now we actually have to decode the thing
 
            import re
            value_re = "(\D|^)(?P<numerator>\d+)/(?P<denominator>\d+)(\D|$)"
            match_result = re.search( value_re, text)
            MyDebug.debug_msg("Extracting value from %s" % text,5)
            if match_result == None:
                return None
 
            numerator = float( match_result.groupdict("0").get( "numerator" ) )
            # Get the text that matches the groupname "numerator" in the regular expression (if can't find it, default to "0") and convert it to a float(ing point number) in preparation for calculating the value of this transaction.
            denominator = float( match_result.groupdict("100").get( "denominator" ) )
 
            MyDebug.debug_msg("Value is %s over %s" % (numerator,denominator),5)
            return (numerator/denominator)
 
    def get_acct_dict(self):
        MyDebug.debug_msg("Creating dict of all accounts", 1)
        acct_dict = AccountDict_Class({})
        acct_list = self.xmldoc.getElementsByTagName(self.gnc_acct_tagname)
        for acct in acct_list:
            acct_dict.add_account( self.convert_acct_xml(acct) )
 
        self.acct_dict_AcDict = acct_dict
 
    def get_all_trans_list(self):
        MyDebug.debug_msg("Creating list of all transactions", 1)
        trn_list = self.xmldoc.getElementsByTagName(self.transaction_tagname)
        all_trans_list = Trans_list_Class([])
        for trn in trn_list:
            all_trans_list.append( self.convert_xml_trans(trn) )
        return all_trans_list
 
    def get_unbal_trans_list(self):
        raise NotImplementedError, "Hey, this function is not going to be used."
        #stub
        return []
 
    def modify_original_trans(self, trans_list_TrList):
        for trans_Tr in trans_list_TrList:
            if trans_Tr.need_to_delete_original():
                MyDebug.debug_msg("I would delete %s." % (trans_Tr) ,4)
                trans_Tr.node.parentNode.removeChild(trans_Tr.node)
                trans_Tr.node.unlink()
            elif trans_Tr.need_to_update_original():
                MyDebug.debug_msg("I would update %s." % (trans_Tr) ,4)
                self.convert_trans_xml(trans_Tr)
            else:
                MyDebug.debug_msg("No changes in %s." % (trans_Tr) ,4)
        #stub
 
    def save_changes(self):
        #stub
        pass
 
    def write_to_file(self, out_fname):
        conv_fname = Global.output_unzipped_nocolons_fname
        MyDebug.debug_msg("Writing to file %s" % conv_fname ,1)
        xml.dom.ext.PrettyPrint(self.xmldoc, open(conv_fname, "wb"))
        MyDebug.debug_msg("Converting to %s" % out_fname ,1)
        self.double_underscore_to_colon(conv_fname, out_fname)
 
class Account_Class():
 
    acctid = None
    acctname = None
    acctnode = None
 
    def __init__(self, acctid=None, acctname=None, acctnode=None):
        self.acctid = acctid
        self.acctname = acctname
        self.acctnode = acctnode
 
    def __eq__(self, other):
        if type(other) == type(None):
            return False
 
        if type(other) != type(self):
            raise TypeError, "%s needs to be of type %s" % (other, type(self))
 
        return self.acctid == other.acctid
 
    def __ne__(self, other):
        return not (self == other)
 
    def __repr__(self):
        return "Account %s, which is named %s and resides in node %s" % (self.acctid, self.acctname, self.acctnode)
 
class AccountDict_Class(dict):
    # The dictionary key shall be the account id; the value corresponding to that key shall be an instance of Account_Class that has info about that account.
 
    def __init__(self, *other_args):
        dict.__init__(self, *other_args)
 
    def add_account(self, account):
        self[ account.acctid ] = account
 
    def lookup_by_re(self, name_re):
        # Yes, partial matches do match (ie. if name_re is "bcd" it will find account "abcde")
        import re
        for acct in self.values():
            match_result = re.search( name_re, acct.acctname )
            if match_result != None:
                break
 
        if match_result == None:
            # We didn't find it. We got here because the loop ended, not because we found it and broke out of the loop
            acct=None
 
        return acct
 
class Criteria_Class:
 
    def __init__(self, criteria):
        import re
 
        self.re_c = None
        self.lower_lim = None
        self.upper_lim = None
 
        re_text = "^$" # Initialize to a regular expression that represents the null string
        if str == type(criteria):
            re_text = criteria
        elif tuple == type(criteria) or list == type(criteria):
            if 3 <= len(criteria):
                (re_text, self.lower_lim, self.upper_lim) = tuple(criteria)[:3]
            elif 2 == len(criteria):
                (re_text, self.lower_lim) = tuple(criteria)[:2]
                self.upper_lim = self.lower_lim
            elif 1 == len(criteria):
                re_text = criteria[0]
            # If 0 == len(criteria_text), then we can just use the default values, so we don't have to do anything.
 
        self.re_c = re.compile(re_text)
 
    def __repr__(self):
        return "<criteria: re_c %s, lower_lim %s, upper_lim %s>" % (self.re_c, self.lower_lim, self.upper_lim)
 
    def fulfills_limit_criteria(self, trn_Tr):
 
        # Based on the description of the transaction, such as "MEGA FUEL",
        # as well as upper/lower limits of the transaction amount,
        # figure out whether a given transaction fulfills the criteria
 
        # The order of these tests matter as we go through trying to decide whether the transaction fulfills criteria
 
        #MyDebug.debug_msg("crit.re_c is %s" % self.re_c,3)
        # First check that there's a valid description in the criterion
        if None == self.re_c:
            return False
 
        # Now see if the criterion description matches that of the transaction
 
        if None == self.re_c.search(trn_Tr.description):
            MyDebug.msg_no_cr("N",4)
            return False
        MyDebug.debug_msg("crit matches %s" % trn_Tr.description,4)
        # Now see if there's a valid upper limit (if there's just a lower limit, the upper limit should already have been set to equal the lower limit)
        if None == self.upper_lim:
            MyDebug.msg_no_cr("-",4)
            return True
        elif trn_Tr.value > self.upper_lim:
            MyDebug.msg_no_cr(">",4)
            return False
        elif trn_Tr.value < self.lower_lim:
            MyDebug.msg_no_cr("<",4)
            return False
        else:
            MyDebug.msg_no_cr("=",4)
            return True
 
class Criteria_list_Class(list):
    # contains these properties:
    # crit_dict = the compiled version of the critera. Key = compiled regexp; Value = the account
    # text_dict = the text of the criteria, in a dictionary, read from file
 
    def __init__(self, descr_fname=None, account_dict=None, *other_args):
        list.__init__(self, *other_args)
 
        if descr_fname != None:
            print(": Reading account description from file %s" % descr_fname)
            try:
                self.text_list = self.eval_py_expr_file(descr_fname)
            except TypeError, errmsg:
                print("\n! ERROR: %s" % errmsg)
                print("! Looks like account description file %s is not a correct Python expression. Most likely there is a comma missing at the end of a line." % Global.descr_acct_fname)
                print("! Try looking for the regexp '^[^#]*\)[^,]*($|#)'")
                print("! (that is, a line where a comma does not occur between the closing parenthesis and the end of line/start of comment)")
                raise SystemExit
            except SyntaxError, errmsg:
                print("\n! ERROR: %s" % errmsg)
                print("! Account description file %s is not a correct Python expression. This is not just a comma missing at the end of a line; some extra character got inserted or something." % Global.descr_acct_fname)
                print("! Try deleting parts of the file and retrying, to narrow down where the problem might be.")
                raise SystemExit
 
        else:
            self.text_list = []
 
        if account_dict != None:
            self.acct_dict = account_dict
        else:
            self.acct_dict = AccountDict_Class({})
 
        MyDebug.debug_msg(": initialized crit list" ,2)
        self.initialize_crit_list()
 
    def eval_py_expr_file(self, fname):
        file = open(fname, 'rb')
        py_expr = file.read()
        return eval(py_expr)
 
    def initialize_crit_list(self):
        import re
 
        try:
            for text_list_element in self.text_list:
                (crit_key, acctname_re) = text_list_element[:2]
                acct_Ac = self.acct_dict.lookup_by_re(acctname_re)
                if acct_Ac == None :
                    print("\n! Unidentified account '%s' --ignoring criteria '%s'" % (acctname_re, crit_key))
                else :
                    criteria_Cr = Criteria_Class(crit_key)
                    self.append( (criteria_Cr, acct_Ac) + tuple(text_list_element[2:]) )
                    # Add the criteria and the account name, but also any other elements in the tuple which might correspond to other features
 
        except ValueError, errmsg:
            print("\n! ERROR: %s" % errmsg)
            print("! Is the account description file %s in the wrong format? As of 2009-05-03, it needs to be a list, not a dictionary." % Global.descr_acct_fname)
            raise SystemExit
 
class Timestamp_Class(solitime.Solitime_class):
 
    def __init__(self, *other_args):
        return solitime.Solitime_class.__init__(self, *other_args)
 
    def formatted_timestamp(self, *other_args):
        return solitime.Solitime_class.formatted_solitime(self, *other_args)
 
class Transaction_Class():
 
    node = None
    acct1_Ac = None
    acct2_Ac = None
    #book_Gn = None
    date = None
    description = None
    num_accts_between = 0
    trn_id = None
    value = None
 
    def __init__(self, node=None, acct1_Ac=None, acct2_Ac=None, date=None, description=None, num_accts_between=0, trn_id=None, value=None):
        self.node = node
        self.acct1_Ac = acct1_Ac
        self.acct2_Ac = acct2_Ac
        self.date = date
        self.description = description
        self.num_accts_between = num_accts_between
        self.trn_id = trn_id
        self.value = value
        self.update_flag = False # If this is None, then the transaction is to be deleted
 
    def __repr__(self):
        return "Transaction, with id %s, for amt$ %s, on date %s, between acct %s and acct %s, covering %s accounts, described as %s. Stored in XML node %s." % (self.trn_id, self.value, self.date, self.acct1_Ac, self.acct2_Ac, self.num_accts_between, self.description, self.node)
 
    def __str__(self):
        return "Transaction %s: on %s, $%s from %s to %s." % (self.trn_id, self.date, self.value, self.acct1_Ac.acctname, self.acct2_Ac.acctname)
 
    def extract_timestamp(self):
        mytimestamp_Ts = Timestamp_Class()
        mytimestamp_Ts.set_time_from_string(self.date)
        MyDebug.debug_msg("Converting date from '%s' to '%s'" % (self.date, mytimestamp_Ts.formatted_timestamp()) ,4)
        return mytimestamp_Ts
 
    def flag_to_delete_original(self):
        self.update_flag = None
 
    def flag_to_preserve_original(self):
        self.update_flag = False
 
    def flag_to_update_original(self):
        self.update_flag = True
 
    def need_to_delete_original(self):
        return ( None == self.update_flag)
 
    def need_to_update_original(self):
        return ( True == self.update_flag)
 
class Trans_list_Class(list):
 
    def __init__(self, *other_args):
        list.__init__(self, *other_args)
 
    def find_matching_trans(self, unbal_trans_Tr):
        #found_it = False # We don't need this yet
        for poss_matching_trans_Tr in self:
            if self.the_dates_match(unbal_trans_Tr, poss_matching_trans_Tr) and \
                self.the_accounts_and_amounts_match(unbal_trans_Tr, poss_matching_trans_Tr):
                return poss_matching_trans_Tr
 
        return None
 
    def the_accounts_and_amounts_match(self, unbal_trans_Tr, poss_matching_trans_Tr):
        # Two cases: both transactions share the same Acct1, or same Acct2; then first amount should be negative of the second amount
        # Otherwise if the Acct1 of one is the same as the Acct2 of the other, or vice versa, then the amounts should be identical (not negatives of each other)
        if (unbal_trans_Tr.acct1_Ac == poss_matching_trans_Tr.acct1_Ac) or (unbal_trans_Tr.acct2_Ac == poss_matching_trans_Tr.acct2_Ac):
            return unbal_trans_Tr.value == -(poss_matching_trans_Tr.value)
        elif (unbal_trans_Tr.acct1_Ac == poss_matching_trans_Tr.acct2_Ac) or (unbal_trans_Tr.acct2_Ac == poss_matching_trans_Tr.acct1_Ac):
            return unbal_trans_Tr.value == poss_matching_trans_Tr.value
 
    def the_dates_match(self, unbal_trans_Tr, poss_matching_trans_Tr):
        unbal_trans_timestamp = unbal_trans_Tr.extract_timestamp()
        poss_matching_trans_timestamp = poss_matching_trans_Tr.extract_timestamp()
 
        return_val= abs(unbal_trans_timestamp.time_tuple[0]-poss_matching_trans_timestamp.time_tuple[0]) <= 1 and \
            unbal_trans_timestamp.time_tuple[1] == poss_matching_trans_timestamp.time_tuple[1] and \
            unbal_trans_timestamp.time_tuple[2] == poss_matching_trans_timestamp.time_tuple[2]
        # The dates will be considered matching if the month and day match exactly, and if the years match or are exactly 1 apart (my own special code)
        return return_val
 
class Transfix_Class():
 
    def __init__(self):
        pass
 
    def classify_trans_by_descr(self, trn_Tr):
        # Based on the description of the transaction, such as "MEGA FUEL",
        # figure out which income/expense account this should be, such as "Expenses:consumables:gasoline"
        #description_node = trn.getElementsByTagName(Global.trn_description_tagname)
        #if 0 >= len(description_node):
        # raise AccountError, "Can't find transaction description"
        #description = description_node[0].childNodes[0].nodeValue
 
        return_acct_Ac = None
        return_trans_descr = None
        for element in self.criteria_list_CrList:
            (criterion_Cr, acct_Ac) = element[:2]
            if criterion_Cr.fulfills_limit_criteria( trn_Tr ):
                # The transaction description matches, so let's see what account this transaction should be under
                return_acct_Ac = acct_Ac
                if len(element) >=3:
                    return_trans_descr = element[2] + " - " + trn_Tr.description
                    # ToDo: change this to a more sophisticated substitution
                break
 
        return (return_acct_Ac, return_trans_descr)
 
    def classify_trans_of_translist_by_descr(self, remaining_unbal_trans_list_TrList):
 
        print("")
        for unbal_trans_Tr in remaining_unbal_trans_list_TrList:
            (corrected_acct_Ac, corrected_trans_descr) = self.classify_trans_by_descr(unbal_trans_Tr)
            if None != corrected_acct_Ac:
                # Do this only if we've actually found the corrected acct id, or else leave the acct id alone
 
                # But is it acct1 or acct2 of the transaction that we need to correct? The following takes care of this.
                if unbal_trans_Tr.acct1_Ac in self.unbal_acct_list:
                    unbal_trans_Tr.acct1_Ac = corrected_acct_Ac
                elif unbal_trans_Tr.acct2_Ac in self.unbal_acct_list:
                    unbal_trans_Tr.acct2_Ac = corrected_acct_Ac
 
                if corrected_trans_descr != None:
                    unbal_trans_Tr.description = corrected_trans_descr
 
                unbal_trans_Tr.flag_to_update_original()
                MyDebug.msg_no_cr("!", 8)
                print(": (%s) $%0.2f\t%s -> '%s'" % (unbal_trans_Tr.date, unbal_trans_Tr.value, unbal_trans_Tr.description, corrected_acct_Ac.acctname))
            else:
                print("! (%s) $%0.2f *\t*\t* no account found for '%s'" % (unbal_trans_Tr.date, unbal_trans_Tr.value, unbal_trans_Tr.description))
 
    def extract_unbal_trans_list(self, all_transactions, unbal_acct_list):
        # First, clean up unbal_acct_list because it might contain None's
        no_more_Nones = False
        while not no_more_Nones:
            try:
                unbal_acct_list.remove(None)
            except:
                # We couldn't remove any more Nones from the list, so we must have gotten rid of all of them
                no_more_Nones = True
 
        MyDebug.debug_msg("After removing Nones, our unbal_acct_list is %s" % unbal_acct_list,4)
 
        unbal_trans_list = Trans_list_Class([])
        for tr in all_transactions:
            MyDebug.msg_no_cr("t", 3)
            #MyDebug.debug_msg("Checking tr %s." % tr,4)
            for unbal_acct in unbal_acct_list:
                MyDebug.debug_msg("Checking with account %s." % unbal_acct,4)
                if (tr.acct1_Ac == unbal_acct) or (tr.acct2_Ac == unbal_acct):
                    MyDebug.debug_msg("Found that transaction %s is unbalanced." % tr, 3)
                    unbal_trans_list.append(tr)
 
        return unbal_trans_list
 
    def find_matching_pairs_of_unbal_trans(self, unbal_trans_list_TrList):
    #def get_book_fn(self):
    # return "gnucash.xml"
        self.matched_unbal_trans_list = [] # This will contain tuples (pairs) of the matched accounts
        self.unmatched_unbal_trans_list_TrList = Trans_list_Class([])
        self.already_matched_unbal_trans_list_TrList = Trans_list_Class([])
        for unbal_trans_Tr in unbal_trans_list_TrList:
            if unbal_trans_Tr not in self.already_matched_unbal_trans_list_TrList :
                match_unbal_trans_Tr = unbal_trans_list_TrList.find_matching_trans(unbal_trans_Tr)
                if None != match_unbal_trans_Tr:
                    # We found a match! Now put the match in already_matched_unbal_trans_list_TrList so we don't try to look for a match (which would produce duplicate match)
                    self.already_matched_unbal_trans_list_TrList.append( match_unbal_trans_Tr )
                    self.matched_unbal_trans_list.append( (unbal_trans_Tr, match_unbal_trans_Tr) )
                    #my_gnucash_book_Gn.reconcile(unbal_trans_Tr, match_unbal_trans_Tr)
                    MyDebug.debug_msg("Found match. Currently %s unbalanced and %s pairs of matched transactions." % ( len(self.unmatched_unbal_trans_list_TrList),len( self.matched_unbal_trans_list ) ) ,4)
                else:
                    self.unmatched_unbal_trans_list_TrList.append( unbal_trans_Tr )
                    MyDebug.debug_msg("Could not find match for %s" % unbal_trans_Tr,4)
                    MyDebug.debug_msg("No match. Currently %s unbalanced and %s pairs of matched transactions." % ( len(self.unmatched_unbal_trans_list_TrList),len( self.matched_unbal_trans_list ) ) ,4)
        print(": Could not find match for %s unbalanced transactions." % len(self.unmatched_unbal_trans_list_TrList))
        print(": %s pairs of transactions to be reconciled." % len( self.matched_unbal_trans_list ))
 
        for (tr1_Tr, tr2_Tr) in self.matched_unbal_trans_list:
            self.reconcile_matching_unbal_trans(tr1_Tr, tr2_Tr)
 
        return self.unmatched_unbal_trans_list_TrList
 
    def reconcile_matching_unbal_trans(self,tr1_Tr, tr2_Tr):
        # The goal is to modify the transaction where the *second* account (not the first) is the shared one.
        # Then we delete the transaction where the first account is the shared one. So, if we have:
        # tr1: $10 from Acct A to Acct B
        # tr2: $10 from Acct B to Acct C
        # Then we delete tr2, and change tr1 to: $10 from Acct A to Acct C
 
        if (tr1_Tr.acct2_Ac == tr2_Tr.acct1_Ac):
            tr1_Tr.acct2_Ac = tr2_Tr.acct2_Ac
            tr2_Tr.acct1_Ac = tr1_Tr.acct1_Ac
        elif (tr1_Tr.acct2_Ac == tr2_Tr.acct2_Ac):
            tr1_Tr.acct2_Ac = tr2_Tr.acct1_Ac
            tr2_Tr.acct2_Ac = tr1_Tr.acct1_Ac
        elif (tr1_Tr.acct1_Ac == tr2_Tr.acct2_Ac):
            tr1_Tr.acct1_Ac = tr2_Tr.acct1_Ac
            tr2_Tr.acct2_Ac = tr1_Tr.acct2_Ac
        elif (tr1_Tr.acct2_Ac == tr2_Tr.acct2_Ac):
            tr1_Tr.acct2_Ac = tr2_Tr.acct1_Ac
            tr2_Tr.acct2_Ac = tr1_Tr.acct1_Ac
        else:
            raise AssertionError, "Looks like these aren't really matching transactions: %s and %s" % (tr1_Tr, tr2_Tr)
 
        # At this point each of the two transactions where one of the accounts was the Imbalance account has had its Imbalance account replaced with the correct account. So there are 2 duplicate transactions and we need to get rid of one.
        # Which do we keep? The one with the earlier date, since my special code is that I will put a date one year ahead. That is, if I deposit a cheque for $123.45 on 2003-06-21, then I'll put a reminder transaction that on 2004-06-21, $123.45 went somewhere. So even if the cheque doesn't show up right away, I will have a reminder that there is a transaction there... somewhere. I think. Does that work? Whatever. Just do it.
        if tr1_Tr.date < tr2_Tr.date :
            tr1_Tr.flag_to_update_original()
            tr2_Tr.flag_to_delete_original()
        else:
            tr2_Tr.flag_to_update_original()
            tr1_Tr.flag_to_delete_original()
 
        MyDebug.debug_msg("Reconciled transactions to: \n%s, and \n%s" % (tr1_Tr, tr2_Tr) ,4)
 
    def xml_manip_main(self):
        import os
 
        print(": Reading from finances file '%s'" % Global.input_fname)
        MyDebug.debug_msg("Now unzipping")
        result = os.system( "gunzip --to-stdout %s >%s" % (Global.input_fname, Global.input_unzipped_fname) )
        MyDebug.debug_msg("unzip result is %s" % result)
        if result != 0 :
            print(": %s does not appear to be zipped. Trying as unzipped file." % Global.input_fname)
            input_unzipped_fname = Global.input_fname
        else :
            input_unzipped_fname = Global.input_unzipped_fname
 
        self.my_gnucash_book_Gn = GnuCashXML_Class(input_unzipped_fname)
        self.my_gnucash_book_Gn.get_acct_dict()
        self.acct_dict_AcDict = self.my_gnucash_book_Gn.acct_dict_AcDict
        #MyDebug.debug_msg("Formed account dictionary of size %s" % len(acct_dict_AcDict) ,1)
        print(": Found %s accounts in %s" % (len(self.acct_dict_AcDict), Global.input_fname) )
        MyDebug.debug_msg(self.acct_dict_AcDict,4)
 
        self.all_trans_list_TrList = self.my_gnucash_book_Gn.get_all_trans_list()
        #MyDebug.debug_msg("There are %s transactions in total." % len(all_trans_list_TrList) ,1)
        print(": Found %s transactions in %s" % (len(self.all_trans_list_TrList) , Global.input_fname) )
 
        self.unbal_acct_list = [self.acct_dict_AcDict.lookup_by_re("Unspecified"),
                    self.acct_dict_AcDict.lookup_by_re("Imbalance-USD")]
        MyDebug.debug_msg("Unbal acct list is %s" % self.unbal_acct_list,2)
 
        self.unbal_trans_list_TrList = self.extract_unbal_trans_list( self.all_trans_list_TrList, self.unbal_acct_list )
        print(": There are %s unbalanced transactions." % len(self.unbal_trans_list_TrList))
 
        # We will store the original unbal_trans_list,
        # and then do various things like match up corresponding transactions and switch the account to the right account based on the description
        # Each time we do this, we maintain a list of transactions that are still not fixed, and pass it to the next function
 
        # Step 1
        #remaining_unmatched_unbal_trans_list_TrList = self.unbal_trans_list_TrList
        remaining_unmatched_unbal_trans_list_TrList = self.find_matching_pairs_of_unbal_trans(self.unbal_trans_list_TrList)
        #MyDebug.debug_msg(": skipping matching pairs. Revert this later on.")
 
        #remaining_unmatched_unbal_trans_list_TrList = self.fix_account_if_needed_based_on_descr(remaining_unbal_trans_list_TrList)
 
        # Step 2
        self.criteria_list_CrList = Criteria_list_Class(Global.descr_acct_fname, self.acct_dict_AcDict)
        remaining_unmatched_unbal_trans_list_TrList = self.classify_trans_of_translist_by_descr(remaining_unmatched_unbal_trans_list_TrList )
 
        self.my_gnucash_book_Gn.modify_original_trans( self.unbal_trans_list_TrList )
 
        print(": Done. Now writing to file '%s'" % Global.output_fname)
        self.my_gnucash_book_Gn.write_to_file(Global.output_unzipped_fname)
        MyDebug.debug_msg("Now zipping")
        result = os.system( "gzip --to-stdout %s >%s" % (Global.output_unzipped_fname, Global.output_fname) )
        MyDebug.debug_msg("zip result is %s" % result)
        if result != 0 :
            print(": %s may not have been zipped properly. Try using unzipped intermediate file %s" % (Global.output_fname, Global.output_unzipped_fname) )
 
def main():
    mytransfix = Transfix_Class()
    mytransfix.xml_manip_main()
 
if __name__ == "__main__":
    main()

End of listing for my Python program.

This discussion has been archived. No new comments can be posted.

processing GnuCash files (XML): a Python script

Comments Filter:
  • This is also released under GPLv3

    #!/usr/bin/env python

    """
    This class written by KWTm to help me debug stuff.
    Define an instance of class Debug, such as
    MyDebug = Debug(1)
    where 1 is the level of detail (can be set higher)
    Send debug messages with MyDebug.debug_msg("whatever debug message"), or
    MyDebug.debug_msg("whatever debug message", 2)
    where 2 is the minimum level of detail needed to display the message (default is 1)

    To clarify: each Debug msg has a "level". Then globally you set the Debug lev

  • I'm supplying this to make it possible to use my gnucash_fix.py program as-is. Right now this module doesn't do a whole lot more than the built-in gnu_getopt from Python.

    Released under GPLv3

    #!/usr/bin/env python

    # This is for getting command-line arguments, and not using the clumsy getopt method built-in

    """
    Here's how to use it:

    import kwgetopt

    and then, you can do this:

    cmd_line_opts = kwgetopt.kwgetopt( "", [] ) --WRONG! There must be arguments
    FirstArg = cmd_li

  • Here is an example of the accompanying descr_acct.py, which should be self-explanatory. This needs to be a valid Python file.

    # This file is for the gnucash_fix.py program.
    # Updated 2009-05-31 05:36:44

    # This is a list of tuples which contain 2 or 3 elements:
    # 1st element: may be one of 3 possibilities:
    # a regular expression (matching the description of the transaction), or
    # a tuple with 2 elements: that regular expression, and a numeric expression (the exact transaction amount), or
    # a tup

E = MC ** 2 +- 3db

Working...