view elpa/elpy-1.26.0/elpy/server.py @ 183:3de719fb264a

elpy: version 1.26
author Jordi Gutiérrez Hermoso <jordigh@octave.org>
date Wed, 21 Nov 2018 14:39:16 -0500
parents elpa/elpy-1.25.0/elpy/server.py@c3bd84985977
children
line wrap: on
line source

"""Method implementations for the Elpy JSON-RPC server.

This file implements the methods exported by the JSON-RPC server. It
handles backend selection and passes methods on to the selected
backend.

"""
import io
import os
import pydoc

from elpy.pydocutils import get_pydoc_completions
from elpy.rpc import JSONRPCServer, Fault
from elpy.auto_pep8 import fix_code
from elpy.yapfutil import fix_code as fix_code_with_yapf
from elpy.blackutil import fix_code as fix_code_with_black


# The Jedi backend is optional: if its module cannot be imported, fall
# back to None so that rpc_init can report 'jedi_available': False
# instead of the whole server failing at import time.
try:
    from elpy import jedibackend
except ImportError:  # pragma: no cover
    jedibackend = None


class ElpyRPCServer(JSONRPCServer):
    """The RPC server for elpy.

    See the rpc_* methods for exported method documentation.

    Instance attributes:

    backend -- object the backend-specific rpc_* calls are forwarded
               to, or None until rpc_init has selected one (and also
               None when the jedibackend module is unavailable).
    project_root -- the "project_root" option passed to rpc_init, or
                    None before rpc_init was called.

    """
    def __init__(self, *args, **kwargs):
        super(ElpyRPCServer, self).__init__(*args, **kwargs)
        self.backend = None
        self.project_root = None

    def _call_backend(self, method, default, *args, **kwargs):
        """Call the backend method with args.

        If there is currently no backend, or the backend has no
        attribute named method, return default instead.

        """
        # getattr on a None backend simply yields the None fallback, so
        # "no backend" and "method not supported" share one code path.
        meth = getattr(self.backend, method, None)
        if meth is None:
            return default
        return meth(*args, **kwargs)

    def rpc_echo(self, *args):
        """Return the arguments.

        This is a simple test method to see if the protocol is
        working.

        """
        return args

    def rpc_init(self, options):
        """Initialize the server for a project and select the backend.

        options is a dict which must contain the key "project_root".

        Returns a dict whose 'jedi_available' entry tells whether a
        Jedi backend could be set up.

        """
        self.project_root = options["project_root"]

        if jedibackend:
            self.backend = jedibackend.JediBackend(self.project_root)
        else:
            self.backend = None

        return {
            'jedi_available': (self.backend is not None)
        }

    def rpc_get_calltip(self, filename, source, offset):
        """Get the calltip for the function at the offset.

        Returns None if the backend does not provide calltips.

        """
        return self._call_backend("rpc_get_calltip", None, filename,
                                  get_source(source), offset)

    def rpc_get_oneline_docstring(self, filename, source, offset):
        """Get a oneline docstring for the symbol at the offset.

        Returns None if the backend does not provide this.

        """
        return self._call_backend("rpc_get_oneline_docstring", None, filename,
                                  get_source(source), offset)

    def rpc_get_completions(self, filename, source, offset):
        """Get a list of completion candidates for the symbol at offset.

        Candidates are dicts with at least a 'name' key. The result
        holds one candidate per name and is sorted with _pysymbol_key.
        Without a suitable backend, the result is an empty list.

        """
        results = self._call_backend("rpc_get_completions", [], filename,
                                     get_source(source), offset)
        # Uniquify by name; for duplicate names the last candidate wins.
        by_name = dict((res['name'], res) for res in results)
        return sorted(by_name.values(),
                      key=lambda cand: _pysymbol_key(cand["name"]))

    def rpc_get_completion_docstring(self, completion):
        """Return documentation for a previously returned completion.

        Returns None if the backend does not provide this.

        """
        return self._call_backend("rpc_get_completion_docstring",
                                  None, completion)

    def rpc_get_completion_location(self, completion):
        """Return the location for a previously returned completion.

        This returns a list of [file name, line number].

        """
        return self._call_backend("rpc_get_completion_location", None,
                                  completion)

    def rpc_get_definition(self, filename, source, offset):
        """Get the location of the definition for the symbol at the offset.

        Returns None if the backend does not provide this.

        """
        return self._call_backend("rpc_get_definition", None, filename,
                                  get_source(source), offset)

    def rpc_get_assignment(self, filename, source, offset):
        """Get the location of the assignment for the symbol at the offset.

        Returns None if the backend does not provide this.

        """
        return self._call_backend("rpc_get_assignment", None, filename,
                                  get_source(source), offset)

    def rpc_get_docstring(self, filename, source, offset):
        """Get the docstring for the symbol at the offset.

        Returns None if the backend does not provide this.

        """
        return self._call_backend("rpc_get_docstring", None, filename,
                                  get_source(source), offset)

    def rpc_get_pydoc_completions(self, name=None):
        """Return a list of possible strings to pass to pydoc.

        If name is given, the strings are under name. If not, top
        level modules are returned.

        """
        return get_pydoc_completions(name)

    def rpc_get_pydoc_documentation(self, symbol):
        """Get the Pydoc documentation for the given symbol.

        Uses pydoc and can return a string with backspace characters
        for bold highlighting.

        Returns None if pydoc could not import what symbol refers to.

        """
        try:
            docstring = pydoc.render_doc(str(symbol),
                                         "Elpy Pydoc Documentation for %s",
                                         False)
        except (ImportError, pydoc.ErrorDuringImport):
            return None
        # Always hand text back to the client, never raw bytes.
        if isinstance(docstring, bytes):
            docstring = docstring.decode("utf-8", "replace")
        return docstring

    def rpc_get_refactor_options(self, filename, start, end=None):
        """Return a list of possible refactoring options.

        This list will be filtered depending on whether it's
        applicable at the point START and possibly the region between
        START and END.

        Raises ImportError if the elpy.refactor module cannot be
        loaded.

        """
        try:
            from elpy import refactor
        except Exception:
            # Any failure while loading the refactoring support is
            # reported the same way, but KeyboardInterrupt and
            # SystemExit are no longer disguised as an ImportError.
            raise ImportError("Rope not installed, refactorings unavailable")
        ref = refactor.Refactor(self.project_root, filename)
        return ref.get_refactor_options(start, end)

    def rpc_refactor(self, filename, method, args):
        """Return a list of changes from the refactoring action.

        A change is a dictionary describing the change. See
        elpy.refactor.translate_changes for a description.

        args is the sequence of positional arguments for the
        refactoring method; None means no arguments.

        Raises ImportError if the elpy.refactor module cannot be
        loaded.

        """
        try:
            from elpy import refactor
        except Exception:
            # See rpc_get_refactor_options: do not swallow
            # KeyboardInterrupt / SystemExit here.
            raise ImportError("Rope not installed, refactorings unavailable")
        if args is None:
            args = ()
        ref = refactor.Refactor(self.project_root, filename)
        return ref.get_changes(method, *args)

    def rpc_get_usages(self, filename, source, offset):
        """Get usages for the symbol at point.

        Raises a Fault with code 400 if the current backend does not
        implement rpc_get_usages.

        """
        # Resolve the source first, even if the backend turns out not
        # to support the call: get_source may have to remove a
        # temporary file handed over by the client.
        source = get_source(source)
        if not hasattr(self.backend, "rpc_get_usages"):
            raise Fault("get_usages not implemented by current backend",
                        code=400)
        return self.backend.rpc_get_usages(filename, source, offset)

    def rpc_get_names(self, filename, source, offset):
        """Get all possible names

        Raises a Fault with code 400 if the current backend does not
        implement rpc_get_names.

        """
        # As in rpc_get_usages, consume the source before checking the
        # backend so a delete_after_use file is always cleaned up.
        source = get_source(source)
        if not hasattr(self.backend, "rpc_get_names"):
            raise Fault("get_names not implemented by current backend",
                        code=400)
        return self.backend.rpc_get_names(filename, source, offset)

    def rpc_fix_code(self, source, directory):
        """Format Python code using elpy.auto_pep8.fix_code.

        source is a string or a file description dict as understood
        by get_source; directory is passed through to the formatter.

        """
        source = get_source(source)
        return fix_code(source, directory)

    def rpc_fix_code_with_yapf(self, source, directory):
        """Format Python code using elpy.yapfutil.fix_code (yapf).

        source is a string or a file description dict as understood
        by get_source; directory is passed through to the formatter.

        """
        source = get_source(source)
        return fix_code_with_yapf(source, directory)

    def rpc_fix_code_with_black(self, source, directory):
        """Format Python code using elpy.blackutil.fix_code (black).

        source is a string or a file description dict as understood
        by get_source; directory is passed through to the formatter.

        """
        source = get_source(source)
        return fix_code_with_black(source, directory)


def get_source(fileobj):
    """Translate fileobj into file contents.

    fileobj is either a string or a dict. If it's a string, that's the
    file contents. If it's a dict, then the filename key contains
    the name of the file whose contents we are to use.

    The file is read as UTF-8; bytes that cannot be decoded are
    dropped. Anything that is not a dict is returned unchanged.

    If the dict contains a true value for the key delete_after_use,
    the file is deleted once read. The deletion is attempted even if
    reading failed, and a failure to delete is ignored.

    """
    if not isinstance(fileobj, dict):
        return fileobj
    try:
        with io.open(fileobj["filename"], encoding="utf-8",
                     errors="ignore") as f:
            return f.read()
    finally:
        if fileobj.get('delete_after_use'):
            try:
                os.remove(fileobj["filename"])
            except OSError:  # pragma: no cover
                # Best-effort cleanup only: a file that is already gone
                # or cannot be removed must not mask the read result.
                pass


def _pysymbol_key(name):
    """Return a sortable key index for name.

    Sorting is case-insensitive, with the first underscore counting as
    worse than any character, but subsequent underscores do not. This
    means that dunder symbols (like __init__) are sorted after symbols
    that start with an alphabetic character, but before those that
    start with only a single underscore.

    """
    if name.startswith("_"):
        name = "~" + name[1:]
    return name.lower()