source: tailor/vcpx/shwrap.py @ 850

Revision 850, 6.2 KB checked in by lele@…, 8 years ago (diff)

Use getpreferredencoding() instead of getdefaultencoding()
Accordingly to documentation, the former tells the encoding selected
by the user in his environment LANG variable, while the latter is what
Python uses internally to map non ascii strings to unicode.

Line 
1# -*- mode: python; coding: iso-8859-1 -*-
2# :Progetto: vcpx -- Tiny wrapper around external command
3# :Creato:   sab 10 apr 2004 16:43:48 CEST
4# :Autore:   Lele Gaifax <lele@nautilus.homeip.net>
5# :Licenza:  GNU General Public License
6#
7
8__docformat__ = 'reStructuredText'
9
10try:
11    # Python 2.4
12    from subprocess import Popen, PIPE, STDOUT
13except ImportError:
14    # Older snakes
15    from _process import Popen, PIPE, STDOUT
16
17class ReopenableNamedTemporaryFile:
18    """
19    This uses tempfile.mkstemp() to generate a secure temp file.  It
20    then closes the file, leaving a zero-length file as a placeholder.
21    You can get the filename with ReopenableNamedTemporaryFile.name.
22    When the ReopenableNamedTemporaryFile instance is garbage
23    collected or its shutdown() method is called, it deletes the file.
24
25    Copied from Zooko's pyutil.fileutil, http://zooko.com/repos/pyutil
26    """
27    def __init__(self, suffix=None, prefix=None, dir=None, text=None):
28        from tempfile import mkstemp
29        from os import close
30
31        fd, self.name = mkstemp(suffix, prefix, dir, text)
32        close(fd)
33
34    def __del__(self):
35        self.shutdown()
36
37    def shutdown(self):
38        from os import remove
39
40        remove(self.name)
41
42
43class ExternalCommand:
44    """Wrap a single command to be executed by the shell."""
45
46    VERBOSE = True
47    """Print the executed command on stderr, at each run."""
48
49    DEBUG = False
50    """Print the output of the command, when not PIPEd to the caller."""
51
52    DRY_RUN = False
53    """Don't really execute the command."""
54
55    FORCE_ENCODING = None
56    """Force the output encoding to some other charset instead of user prefs."""
57
58    def __init__(self, command=None, cwd=None):
59        """Initialize a ExternalCommand instance, specifying the command
60           to be executed and eventually the working directory."""
61
62        self.command = command
63        """The command to be executed."""
64
65        self.cwd = cwd
66        """The working directory, go there before execution."""
67
68        self.exit_status = None
69        """Once the command has been executed, this is its exit status."""
70
71        self._last_command = None
72        """Last executed command."""
73
74    def __str__(self):
75        result = []
76        if self.cwd:
77            result.append(self.cwd)
78            result.append(' ')
79        result.append('$')
80        needquote = False
81        for arg in self._last_command or self.command:
82            bs_buf = []
83
84            # Add a space to separate this argument from the others
85            result.append(' ')
86
87            needquote = (" " in arg) or ("\t" in arg)
88            if needquote:
89                result.append('"')
90
91            for c in arg:
92                if c == '\\':
93                    # Don't know if we need to double yet.
94                    bs_buf.append(c)
95                elif c == '"':
96                    # Double backspaces.
97                    result.append('\\' * len(bs_buf)*2)
98                    bs_buf = []
99                    result.append('\\"')
100                else:
101                    # Normal char
102                    if bs_buf:
103                        result.extend(bs_buf)
104                        bs_buf = []
105                    result.append(c)
106
107            # Add remaining backspaces, if any.
108            if bs_buf:
109                result.extend(bs_buf)
110
111            if needquote:
112                result.extend(bs_buf)
113                result.append('"')
114
115        return ''.join(result)
116
117    def execute(self, *args, **kwargs):
118        """Execute the command."""
119
120        from sys import stderr
121        from locale import getpreferredencoding
122        import os
123        from cStringIO import StringIO
124
125        self.exit_status = None
126
127        self._last_command = [chunk % kwargs for chunk in self.command]
128        if len(args) == 1 and type(args[0]) == type([]):
129            self._last_command.extend(args[0])
130        else:
131            self._last_command.extend(args)
132
133        if self.VERBOSE:
134            stderr.write(str(self))
135
136        if self.DRY_RUN:
137            return
138
139        if not kwargs.has_key('cwd') and self.cwd:
140            kwargs['cwd'] = self.cwd
141
142        if not kwargs.has_key('env'):
143            env = kwargs['env'] = {}
144            env.update(os.environ)
145
146            for v in ['LANG', 'TZ', 'PATH']:
147                if kwargs.has_key(v):
148                    env[v] = kwargs[v]
149
150        input = kwargs.get('input')
151        output = kwargs.get('stdout')
152        error = kwargs.get('stderr')
153
154        # When not in debug, redirect stderr and stdout to /dev/null
155        # when the caller didn't ask for them.
156        if not self.DEBUG:
157            devnull = getattr(os, 'devnull', '/dev/null')
158            if output is None:
159                output = open(devnull, 'w')
160            if error is None:
161                error = open(devnull, 'w')
162        try:
163            process = Popen(self._last_command,
164                            stdin=input and PIPE or None,
165                            stdout=output,
166                            stderr=error,
167                            env=kwargs.get('env'),
168                            cwd=kwargs.get('cwd'),
169                            universal_newlines=True)
170        except OSError, e:
171            from errno import ENOENT
172
173            if e.errno == ENOENT:
174                raise OSError("'%s' does not exist!" % self._last_command[0])
175            else:
176                raise
177
178        if input and isinstance(input, unicode):
179            input = input.encode(self.FORCE_ENCODING or getpreferredencoding())
180
181        out, err = process.communicate(input=input)
182
183        self.exit_status = process.returncode
184        if self.VERBOSE:
185            if not self.exit_status:
186                stderr.write(" [Ok]\n")
187            else:
188                stderr.write(" [Status %s]\n" % self.exit_status)
189
190        # For debug purposes, copy the output to our stderr when hidden above
191        if self.DEBUG:
192            if out and output == PIPE:
193                stderr.write('Output stream:\n')
194                stderr.write(out)
195            if err and error == PIPE:
196                stderr.write('Error stream:\n')
197                stderr.write(err)
198
199        if out is not None:
200            out = StringIO(out)
201        if err is not None:
202            err = StringIO(err)
203
204        return out, err
Note: See TracBrowser for help on using the repository browser.