# svnlook.py   [plain text]


#!/usr/bin/env python
#
# svnlook.py : alternative svnlook in Python with library API
#
######################################################################
#    Licensed to the Apache Software Foundation (ASF) under one
#    or more contributor license agreements.  See the NOTICE file
#    distributed with this work for additional information
#    regarding copyright ownership.  The ASF licenses this file
#    to you under the Apache License, Version 2.0 (the
#    "License"); you may not use this file except in compliance
#    with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing,
#    software distributed under the License is distributed on an
#    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#    KIND, either express or implied.  See the License for the
#    specific language governing permissions and limitations
#    under the License.
######################################################################
#
# $HeadURL: https://svn.apache.org/repos/asf/subversion/branches/1.10.x/tools/examples/svnlook.py $
# $LastChangedDate: 2013-11-14 11:11:07 +0000 (Thu, 14 Nov 2013) $
# $LastChangedRevision: 1541878 $

"""
svnlook.py can also be used as a Python module::

  >>> import svnlook
  >>> svnlook = svnlook.SVNLook("/testrepo")
  >>> svnlook.get_author()
  'randomjoe'


Accessible API::

[x] author
[x] changed
[x] date
[ ] diff
[x] dirs-changed
[ ] ids
[x] info
[x] log
[ ] tree
---
[ ] generator API to avoid passing lists
"""


import sys
import time
import os

from svn import core, fs, delta, repos

class SVNLook(object):
  """Inspect a single revision or transaction of a Subversion repository.

  The get_* methods return the requested data; every cmd_* method prints
  the corresponding report to stdout.
  """

  def __init__(self, path, rev=None, txn=None, cmd=None):
    """
    path  - path to repository
    rev   - revision number
    txn   - name of transaction (usually the one about to be committed)
    cmd   - if set, specifies cmd_* method to execute; dashes may be used
            in place of underscores (e.g. 'dirs-changed')

    txn takes precedence over rev; if both are None, inspect the head revision
    """
    path = core.svn_path_canonicalize(path)
    repos_ptr = repos.open(path)
    self.fs_ptr = repos.fs(repos_ptr)

    # if set, txn takes precedence
    if txn:
      self.txn_ptr = fs.open_txn(self.fs_ptr, txn)
    else:
      self.txn_ptr = None
      if rev is None:
        rev = fs.youngest_rev(self.fs_ptr)
      else:
        rev = int(rev)
    self.rev = rev

    if cmd is not None:
      # method names cannot contain '-', so map the command-line spelling
      # onto the method spelling before dispatching
      getattr(self, 'cmd_' + cmd.replace('-', '_'))()

  def cmd_default(self):
    """print the 'info' report followed by the 'tree' report"""
    self.cmd_info()
    self.cmd_tree()

  def cmd_author(self):
    """print the author, or an empty line if there is none"""
    print(self.get_author() or '')

  def cmd_changed(self):
    """print one 'STATUS path' line per changed path"""
    for status, path in self.get_changed():
      print("%-3s %s" % (status, path))

  def cmd_date(self):
    """print the commit date in local time, or an empty line if unknown

    Duplicates the original svnlook format, which is
    2010-02-08 21:53:15 +0200 (Mon, 08 Feb 2010)
    """
    secs = self.get_date(unixtime=True)
    if secs is None:
      print("")
      return

    # convert to tuple, detect time zone and format
    stamp = time.localtime(secs)
    # time.timezone and time.altzone count seconds *west* of UTC, hence
    # the negation below
    if time.daylight and stamp.tm_isdst:
      seconds_west = time.altzone
    else:
      seconds_west = time.timezone

    outstr = time.strftime('%Y-%m-%d %H:%M:%S ', stamp)
    outstr += self._format_utc_offset(-seconds_west)
    outstr += time.strftime(' (%a, %d %b %Y)', stamp)
    print(outstr)

  @staticmethod
  def _format_utc_offset(offset_secs):
    """return offset_secs (seconds east of UTC) as '+HHMM' or '-HHMM'

    The sign is handled separately from the digits: floor-dividing a
    negative offset would round the hour away from zero and turn
    -03:30 into -0430.
    """
    sign = '-' if offset_secs < 0 else '+'
    hours, minutes = divmod(abs(offset_secs) // 60, 60)
    return "%s%02d%02d" % (sign, hours, minutes)

  def cmd_diff(self):
    """print diffs of changed files and properties"""
    self._print_tree(DiffEditor, pass_root=1)

  def cmd_dirs_changed(self):
    """print each changed directory on its own line"""
    for changed_dir in self.get_changed_dirs():
      print(changed_dir)

  def cmd_ids(self):
    """print the tree, with node ids"""
    self._print_tree(Editor, base_rev=0, pass_root=1)

  def cmd_info(self):
    """print the author, date, log_size, and log message"""
    self.cmd_author()
    self.cmd_date()
    log = self.get_log() or ''
    print(len(log))
    print(log)

  def cmd_log(self):
    """print the log message, or an empty line if there is none"""
    print(self.get_log() or '')

  def cmd_tree(self):
    """print the tree"""
    self._print_tree(Editor, base_rev=0)


  # --- API getters
  def get_author(self):
    """return string with the author name or None"""
    return self._get_property(core.SVN_PROP_REVISION_AUTHOR)

  def get_changed(self):
    """return list of tuples (status, path)"""
    ret = []
    def list_callback(status, path):
      ret.append((status, path))
    self._walk_tree(ChangedEditor, pass_root=1, callback=list_callback)
    return ret

  def get_date(self, unixtime=False):
    """return commit timestamp in RFC 3339 format (2010-02-08T20:37:25.195000Z)
       if unixtime is True, return unix timestamp
       return None for a txn, or if date property is not set
    """
    if self.txn_ptr:
      return None

    date = self._get_property(core.SVN_PROP_REVISION_DATE)
    if not unixtime or date is None:
      return date

    # convert to unix time
    aprtime = core.svn_time_from_cstring(date)
    # ### convert to a time_t; this requires intimate knowledge of
    # ### the apr_time_t type
    secs = aprtime / 1000000  # aprtime is microseconds; make seconds
    return secs

  def get_changed_dirs(self):
    """return list of changed dirs
       dir names end with trailing forward slash even on windows
    """
    dirlist = []
    def list_callback(item):
      dirlist.append(item)
    self._walk_tree(DirsChangedEditor, callback=list_callback)
    return dirlist

  def get_log(self):
    """return log message string or None if not present"""
    return self._get_property(core.SVN_PROP_REVISION_LOG)


  # --- Internal helpers
  def _get_property(self, name):
    """return property `name` of the transaction if one is open,
       otherwise of revision self.rev"""
    if self.txn_ptr:
      return fs.txn_prop(self.txn_ptr, name)
    return fs.revision_prop(self.fs_ptr, self.rev, name)

  def _print_tree(self, e_factory, base_rev=None, pass_root=0):
    """run _walk_tree with a callback that prints each message"""
    def print_callback(msg):
      print(msg)
    self._walk_tree(e_factory, base_rev, pass_root, callback=print_callback)

  # svn fs, delta, repos calls need review according to DeltaEditor documentation
  def _walk_tree(self, e_factory, base_rev=None, pass_root=0, callback=None):
    """drive an editor over the delta between a base revision and the
       inspected revision/transaction

    e_factory - editor class (or factory) to instantiate
    base_rev  - revision to compare against; when None, the transaction
                base or the previous revision is used
    pass_root - if true, call e_factory(root, base_root, callback),
                otherwise e_factory(callback=callback)
    callback  - handed to the editor; defaults to a no-op
    """
    if base_rev is None:
      # a specific base rev was not provided. use the transaction base,
      # or the previous revision
      if self.txn_ptr:
        base_rev = fs.txn_base_revision(self.txn_ptr)
      elif self.rev == 0:
        base_rev = 0
      else:
        base_rev = self.rev - 1

    # get the current root
    if self.txn_ptr:
      root = fs.txn_root(self.txn_ptr)
    else:
      root = fs.revision_root(self.fs_ptr, self.rev)

    # the base of the comparison
    base_root = fs.revision_root(self.fs_ptr, base_rev)

    if callback is None:
      callback = lambda msg: None

    if pass_root:
      editor = e_factory(root, base_root, callback)
    else:
      editor = e_factory(callback=callback)

    # construct the editor for printing these things out
    e_ptr, e_baton = delta.make_editor(editor)

    # compute the delta, printing as we go
    def authz_cb(root, path, pool):
      # grant access to every path
      return 1
    repos.dir_delta(base_root, '', '', root, '',
                    e_ptr, e_baton, authz_cb, 0, 1, 0, 0)


# ---------------------------------------------------------
# Delta Editors. For documentation see:
# http://subversion.apache.org/docs/community-guide/#docs

# NOTE(review): Editor does not implement delete_entry, change_dir_prop,
# apply_textdelta, change_file_prop, close_file, close_edit or abort_edit
# (and possibly not set_target_revision either).
# This still needs review.
class Editor(delta.Editor):
  """Tree printer: writes every visited node to stdout, one per line,
  indented by one space per directory level; node ids are appended when
  a root was supplied."""

  def __init__(self, root=None, base_root=None, callback=None):
    """root enables node-id output; base_root and callback are accepted
    but not used by this editor"""
    self.indent = ''
    self.root = root

  def open_root(self, base_revision, dir_pool):
    print('/' + self._get_id('/'))
    # everything below the root is one level deeper
    self.indent += ' '

  def add_directory(self, path, *args):
    node_id = self._get_id(path)
    print('%s%s/%s' % (self.indent, _basename(path), node_id))
    # children of this directory are one level deeper
    self.indent += ' '

  # opening an existing directory is reported exactly like adding one
  open_directory = add_directory

  def close_directory(self, baton):
    # drop one level; slicing an already-empty indent yields '' again
    self.indent = self.indent[:-1]

  def add_file(self, path, *args):
    node_id = self._get_id(path)
    print('%s%s%s' % (self.indent, _basename(path), node_id))

  # opening an existing file is reported exactly like adding one
  open_file = add_file

  def _get_id(self, path):
    """return ' <ID>' for path, or '' when no root was supplied"""
    if not self.root:
      return ''
    return ' <%s>' % fs.unparse_id(fs.node_id(self.root, path))

# NOTE(review): DirsChangedEditor does not implement close_directory,
# apply_textdelta, change_file_prop, close_file, close_edit or abort_edit
# (and possibly not set_target_revision either).
class DirsChangedEditor(delta.Editor):
  """report each changed directory once by calling callback(dir)

  Directory batons are two-item lists [pending, path]; pending is 1
  until the directory has been reported and 0 afterwards.
  """

  def __init__(self, callback):
    self.callback = callback

  def open_root(self, base_revision, dir_pool):
    return self._new_baton('')

  def delete_entry(self, path, revision, parent_baton, pool):
    self._dir_changed(parent_baton)

  def add_directory(self, path, parent_baton,
                    copyfrom_path, copyfrom_revision, dir_pool):
    # adding a child changes the parent directory
    self._dir_changed(parent_baton)
    return self._new_baton(path)

  def open_directory(self, path, parent_baton, base_revision, dir_pool):
    return self._new_baton(path)

  def change_dir_prop(self, dir_baton, name, value, pool):
    self._dir_changed(dir_baton)

  def add_file(self, path, parent_baton,
               copyfrom_path, copyfrom_revision, file_pool):
    self._dir_changed(parent_baton)

  def open_file(self, path, parent_baton, base_revision, file_pool):
    # a file is only opened because something about it is changing
    self._dir_changed(parent_baton)

  def _new_baton(self, path):
    """build the baton for a directory that has not been reported yet"""
    return [1, path]

  def _dir_changed(self, baton):
    """report the baton's directory unless that already happened"""
    if not baton[0]:
      return
    self.callback(baton[1] + '/')
    baton[0] = 0

class ChangedEditor(delta.Editor):
  """report every changed path by calling callback(status, path)

  Directory batons are [pending, path] lists (pending is 0 once the
  directory has been reported); file batons are
  [text_flag, prop_flag, path] lists, where path is None for files that
  were just added.
  """

  def __init__(self, root, base_root, callback):
    """callback(status, path) is a printer function"""
    self.callback = callback
    self.root = root
    self.base_root = base_root

  def open_root(self, base_revision, dir_pool):
    return [1, '']

  def delete_entry(self, path, revision, parent_baton, pool):
    ### need more logic to detect 'replace'
    # deleted directories are reported with a trailing slash
    suffix = '/' if fs.is_dir(self.base_root, '/' + path) else ''
    self.callback('D', path + suffix)

  def add_directory(self, path, parent_baton,
                    copyfrom_path, copyfrom_revision, dir_pool):
    self.callback('A', path + '/')
    # already reported as added, so mark the baton as not pending
    return [0, path]

  def open_directory(self, path, parent_baton, base_revision, dir_pool):
    return [1, path]

  def change_dir_prop(self, dir_baton, name, value, pool):
    if not dir_baton[0]:
      # this directory has been reported before
      return
    self.callback('_U', dir_baton[1] + '/')
    dir_baton[0] = 0

  def add_file(self, path, parent_baton,
               copyfrom_path, copyfrom_revision, file_pool):
    self.callback('A', path)
    # no path in the baton: close_file must not report this file again
    return ['_', ' ', None]

  def open_file(self, path, parent_baton, base_revision, file_pool):
    return ['_', ' ', path]

  def apply_textdelta(self, file_baton, base_checksum):
    # remember that the text changed; no delta handler is provided
    file_baton[0] = 'U'
    return None

  def change_file_prop(self, file_baton, name, value, pool):
    # remember that a property changed
    file_baton[1] = 'U'

  def close_file(self, file_baton, text_checksum):
    text_flag, prop_flag, path = file_baton
    if not path:
      # added files carry no path and were reported by add_file
      return
    status = text_flag + prop_flag
    if status == '_ ':
      # opened, but neither text nor properties were touched
      return
    self.callback(status.rstrip(), path)


class DiffEditor(delta.Editor):
  """print diffs of changed files and property changes to stdout

  Directory batons are [1, path] lists; file batons are
  ['_', ' ', path] lists where path is None for newly added files.
  """

  def __init__(self, root, base_root, callback=None):
    """callback argument is unused for this editor"""
    self.root = root
    self.base_root = base_root
    self.target_revision = 0

  def _do_diff(self, base_path, path):
    """print a header and the unified diff between base_path (in the
    base root) and path (in the new root)

    base_path is None for an added file, path is None for a removed one.
    """
    if base_path is None:
      print("Added: " + path)
      label = path
    elif path is None:
      print("Removed: " + base_path)
      label = base_path
    else:
      print("Modified: " + path)
      label = path
    print("===============================================================" + \
          "===============")
    args = ["-L", label + "\t(original)",
            "-L", label + "\t(new)",
            "-u"]
    differ = fs.FileDiff(self.base_root, base_path, self.root,
                         path, diffoptions=args)
    pobj = differ.get_pipe()
    while True:
      line = pobj.readline()
      if not line:
        break
      if not isinstance(line, str):
        # NOTE(review): assumes the pipe may yield bytes on Python 3
        # bindings; decode so the text is written rather than its repr.
        # Confirm against the bindings in use.
        line = line.decode('utf-8', 'replace')
      # write the line untouched: appending anything after its newline
      # would shift the next diff line and corrupt the output
      sys.stdout.write(line)
    print("")

  def _do_prop_diff(self, path, prop_name, prop_val, pool):
    """print how property prop_name on path changed relative to the
    base root; prop_val is the new value"""
    print("Property changes on: " + path)
    print("_______________________________________________________________" + \
          "_______________")

    old_prop_val = None

    try:
      old_prop_val = fs.node_prop(self.base_root, path, prop_name, pool)
    except core.SubversionException:
      pass # Must be a new path

    # compare against None rather than truthiness: an empty string is a
    # legitimate property value and must not be mistaken for "absent"
    if old_prop_val is not None:
      if prop_val is not None:
        print("Modified: " + prop_name)
        print("   - " + str(old_prop_val))
        print("   + " + str(prop_val))
      else:
        print("Deleted: " + prop_name)
        print("   - " + str(old_prop_val))
    else:
      print("Added: " + prop_name)
      print("   + " + str(prop_val))

    print("")

  def delete_entry(self, path, revision, parent_baton, pool):
    ### need more logic to detect 'replace'
    # only removed files produce a diff; removed directories are skipped
    if not fs.is_dir(self.base_root, '/' + path):
      self._do_diff(path, None)

  def add_directory(self, path, parent_baton, copyfrom_path,
                    copyfrom_revision, dir_pool):
    return [ 1, path ]

  def add_file(self, path, parent_baton,
               copyfrom_path, copyfrom_revision, file_pool):
    self._do_diff(None, path)
    # no path in the baton: the file has already been diffed as "Added"
    return [ '_', ' ', None ]

  def open_root(self, base_revision, dir_pool):
    return [ 1, '' ]

  def open_directory(self, path, parent_baton, base_revision, dir_pool):
    return [ 1, path ]

  def open_file(self, path, parent_baton, base_revision, file_pool):
    return [ '_', ' ', path ]

  def apply_textdelta(self, file_baton, base_checksum):
    # added files (path None) were diffed in add_file already
    if file_baton[2] is not None:
      self._do_diff(file_baton[2], file_baton[2])
    return None

  def change_file_prop(self, file_baton, name, value, pool):
    if file_baton[2] is not None:
      self._do_prop_diff(file_baton[2], name, value, pool)
    return None

  def change_dir_prop(self, dir_baton, name, value, pool):
    if dir_baton[1] is not None:
      self._do_prop_diff(dir_baton[1], name, value, pool)
    return None

  def set_target_revision(self, target_revision):
    self.target_revision = target_revision

def _basename(path):
  "Return the basename for a '/'-separated path."
  idx = path.rfind('/')
  if idx == -1:
    return path
  return path[idx+1:]


def usage(exit):
  """Write the help text and terminate with status `exit`.

  The text goes to stderr for a non-zero (true) status and to stdout
  otherwise.
  """
  output = sys.stderr if exit else sys.stdout
  prog = sys.argv[0]

  template = (
     "usage: %s REPOS_PATH rev REV [COMMAND] - inspect revision REV\n"
     "       %s REPOS_PATH txn TXN [COMMAND] - inspect transaction TXN\n"
     "       %s REPOS_PATH [COMMAND] - inspect the youngest revision\n"
     "\n"
     "REV is a revision number > 0.\n"
     "TXN is a transaction name.\n"
     "\n"
     "If no command is given, the default output (which is the same as\n"
     "running the subcommands `info' then `tree') will be printed.\n"
     "\n"
     "COMMAND can be one of: \n"
     "\n"
     "   author:        print author.\n"
     "   changed:       print full change summary: all dirs & files changed.\n"
     "   date:          print the timestamp (revisions only).\n"
     "   diff:          print GNU-style diffs of changed files and props.\n"
     "   dirs-changed:  print changed directories.\n"
     "   ids:           print the tree, with nodes ids.\n"
     "   info:          print the author, data, log_size, and log message.\n"
     "   log:           print log message.\n"
     "   tree:          print the tree.\n"
     "\n")
  output.write(template % (prog, prog, prog))

  sys.exit(exit)

def main():
  """Parse sys.argv and run the requested svnlook command.

  Accepted forms are REPOS_PATH [COMMAND], REPOS_PATH rev REV [COMMAND]
  and REPOS_PATH txn TXN [COMMAND]; anything else prints the usage text
  and exits with status 1.
  """
  argv = sys.argv
  if len(argv) < 2:
    usage(1)

  rev = txn = None

  remaining = argv[2:]
  selector = remaining[0] if remaining else None
  if selector in ('rev', 'txn'):
    # both selectors need a value right after them
    if len(remaining) == 1:
      usage(1)
    if selector == 'rev':
      try:
        rev = int(remaining[1])
      except ValueError:
        usage(1)
    else:
      txn = remaining[1]
    remaining = remaining[2:]

  # at most one command may follow
  if len(remaining) > 1:
    usage(1)
  if remaining:
    # command names use dashes, method names use underscores
    cmd = remaining[0].replace('-', '_')
  else:
    cmd = 'default'

  if not hasattr(SVNLook, 'cmd_' + cmd):
    usage(1)

  SVNLook(argv[1], rev, txn, cmd)

# Run the command-line interface only when this file is executed as a
# script; importing it as a module has no side effects.
if __name__ == '__main__':
  main()