1
0
mirror of https://github.com/sys-fs/seddy synced 2024-11-30 22:41:32 -05:00
This commit is contained in:
Thomas Mannay 2019-01-30 19:43:35 +00:00
parent bebadf0bfb
commit e42a917cd4
8 changed files with 234 additions and 143 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*~

143
seddy.py
View File

@ -1,143 +0,0 @@
#!/usr/bin/python3
# See the LICENSE file for licensing information
import re
from time import sleep
from random import choice
nick = 'seddy'
server = 'chat.freenode.net'
channel = '#example'
gecos = 'A text processing bot'
receive = '/tmp/{0}.in'.format(server)
send = '/tmp/{0}.out'.format(server)
parse_msg = re.compile('.* PRIVMSG ' + channel + ' :(.*)')
parse_sed = re.compile('(?<!\\\\)/')
is_sed = re.compile('^s/.*/.*')
ping = re.compile('PING :(.*)')
class Queue:
size = 0
data = []
head = 0
tail = 0
count = 0
def __init__(self, size):
self.size = size
self.data = [None]*size
def enqueue(self, s):
if not self.full():
self.count += 1
self.data[self.tail] = s
self.tail = (self.tail + 1) % self.size
def dequeue(self):
if not self.empty():
self.count -= 1
s = self.data[self.head]
self.head = (self.head + 1) % self.size
return s
def full(self):
return self.size == self.count
def empty(self):
return self.head == self.count
def find(self, s, f=0):
i = self.tail-1
while True:
if i == -1:
i = self.size-1
if re.search(s, self.data[i], f):
return self.data[i]
i -= 1
if i == self.tail-1 or self.data[i] is None:
return False
def msg_replace(find, replace, msg, f, n=1):
try:
if msg[:7] == '\x01ACTION':
res = '\x01ACTION {0}'.format(re.sub(find, replace, msg[8:], count=n, flags=f))
else:
res = re.sub(find, replace, msg, count=n, flags=f)
except:
return False
return res
def seddy(sed, history, parser):
f = 0
regex = parser.split(sed)
if len(regex) < 4:
return False
if 'i' in regex[3]:
f |= re.I
try:
msg = history.find(regex[1], f)
except:
return False
if "g" in regex[3]:
res = msg_replace(regex[1], regex[2], msg, f, n=0)
else:
res = msg_replace(regex[1], regex[2], msg, f)
if res:
res = res.replace('\0', re.search(regex[1], msg, f).group(0))
return res
def notice(msg, channel):
with open(send, 'w', encoding='utf-8') as g:
g.write('NOTICE ' + channel + ' :' + msg + '\r\n')
def privmsg(msg, channel):
if msg[:7] == '\x01ACTION':
with open(send, 'w', encoding='utf-8') as g:
g.write('PRIVMSG ' + channel + ' :' + '\x01' +
''.join(c for c in msg if c.isprintable()) + '\x01\r\n')
else:
with open(send, 'w', encoding='utf-8') as g:
g.write('PRIVMSG ' + channel + ' :' +
msg.replace('\n', ' ',).replace('\r', '') + '\r\n')
if __name__ == "__main__":
history = Queue(48)
with open(send, 'w', encoding='utf-8') as f:
f.write('NICK ' + nick + '\r\n')
f.write('USER ' + nick + ' * 8 :' + gecos + '\r\n')
sleep(5)
f.write('JOIN ' + channel + '\r\n')
with open(receive, 'r', encoding='utf-8') as f:
for line in f:
if 'PING' in line:
with open(send, 'w', encoding='utf-8') as g:
g.write('PONG {0}\r\n'.format(ping.split(line)[1]))
continue
m = parse_msg.match(line)
try:
msg = m.group(1)
except:
continue
if history.full():
history.dequeue()
if 'PRIVMSG' in line and not is_sed.match(msg):
history.enqueue(msg)
if '.bots' in msg[:5] or '.bot ' + nick in msg[:5 + len(nick)]:
notice("I was written to correct your mistakes.", channel)
if '.source ' + nick in msg[:8 + len(nick)]:
notice('[Python] https://github.com/sys-fs/seddy', channel)
elif is_sed.match(msg):
res = seddy(msg, history, parse_sed)
if res:
res = re.sub('\\\\/', '/', res)
if history.full():
history.dequeue()
history.enqueue(res)
privmsg(res, channel)

25
setup.py Normal file
View File

@ -0,0 +1,25 @@
'''Setup file for seddy.'''
from setuptools import setup, find_packages
setup(
name = 'seddy',
version = '0.1.0',
license = 'MIT',
description = 'Deployment tool for Drupal sites.',
author='Thomas Mannay',
author_email='tfm@airmail.cc',
url='https://github.com/sys-fs/seddy',
packages=find_packages(where='src'),
package_dir={'': 'src'},
# install_requires=['boto3', 'fabric', 'rocketchat-API'],
entry_points={
'console_scripts': [
'seddy = seddy.main:main',
]
},
)

5
src/seddy/__init__.py Normal file
View File

@ -0,0 +1,5 @@
'''An IRC bot for tubes.'''
from . import main
from . import tubes
from . import sed

4
src/seddy/__version__.py Normal file
View File

@ -0,0 +1,4 @@
# pylint:disable=missing-docstring
VERSION = (0, 1, 0)
__version__ = '.'.join(map(str, VERSION))

102
src/seddy/main.py Executable file
View File

@ -0,0 +1,102 @@
# See the LICENSE file for licensing information
'''The bot itself.'''
import re
from time import sleep
from . import tubes
from . import sed
class Queue:
size = 0
data = []
head = 0
tail = 0
count = 0
def __init__(self, size):
self.size = size
self.data = [None]*size
def enqueue(self, s):
if not self.full():
self.count += 1
self.data[self.tail] = s
self.tail = (self.tail + 1) % self.size
def dequeue(self):
if not self.empty():
self.count -= 1
s = self.data[self.head]
self.head = (self.head + 1) % self.size
return s
def full(self):
return self.size == self.count
def empty(self):
return self.head == self.count
def find(self, s, f=0):
i = self.tail-1
while True:
if i == -1:
i = self.size-1
if re.search(s, self.data[i], f):
return self.data[i]
i -= 1
if i == self.tail-1 or self.data[i] is None:
return False
def main():
nick = 'sedward'
server = 'lain.church'
channel = '#sedward'
gecos = 'A text processing bot'
history = Queue(48)
tube = tubes.Tube(server)
ping = re.compile('PING :(.*)')
parse_message = re.compile('.* PRIVMSG {} :(.*)'.format(channel))
slash_sed = re.compile('^s/.*/.*')
comma_sed = re.compile('^s,.*,.*')
slash_parser = re.compile('(?<!\\\\)/')
comma_parser = re.compile('(?<!\\\\),')
tube.identify(nick, gecos)
sleep(5)
tube.join(channel)
with open(tube.incoming, 'r', encoding='utf-8') as receive:
for line in receive:
if re.match(ping, line):
tube.pong(ping.split(line)[1])
continue
tmp = parse_message.match(line)
try:
message = tmp.group(1)
except:
continue
if history.full():
history.dequeue()
if not slash_sed.match(message) and not comma_sed.match(message):
history.enqueue(message)
if slash_sed.match(message):
response = sed.seddy(message, history, slash_parser)
if response:
response = re.sub('\\\\/', '/', response)
if history.full():
history.dequeue()
history.enqueue(response)
tube.privmsg(response, channel)
elif comma_sed.match(message):
response = sed.seddy(message, history, comma_parser)
if response:
response = re.sub('\\\\,', ',', response)
if history.full():
history.dequeue()
history.enqueue(response)
tube.privmsg(response, channel)

49
src/seddy/sed.py Normal file
View File

@ -0,0 +1,49 @@
'''Core functionality of seddy.'''
import re
def replacer(find, replace, message, flags, count=1):
try:
if message[:7] == '\x01ACTION':
tmp = re.sub(find, replace, message[8:], flags=flags,
count=count)
replaced = '\x01ACTION {}'.format(tmp)
else:
replaced = re.sub(find, replace, message, flags=flags, count=count, )
except:
return False
if '\0' in message:
whole_match = re.search(find, message, flags).group(0)
replaced = replaced.replace('\0', whole_match)
return replaced
def seddy(sed, history, parser):
f = 0
regex = parser.split(sed)
if len(regex) < 4:
return False
find = regex[1]
replace = regex[2]
flags = regex[3]
if 'i' in flags:
f |= re.I
try:
message = history.find(find, f)
except:
return False
if 'g' in flags:
replaced = replacer(find, replace, message, f, count=0)
else:
replaced = replacer(find, replace, message, f)
return replaced

48
src/seddy/tubes.py Normal file
View File

@ -0,0 +1,48 @@
'''Wrappers for interacting with tubes.'''
import os
class Tube:
'''Object for interacting with tubes.'''
def __init__(self, server):
'''Takes a string containing a server name, e.g. chat.freenode.net, and
gets the correct filenames for tubes' named pipes for that server.
'''
self.incoming = os.path.join('/tmp', '{}.in'.format(server))
self.outgoing = os.path.join('/tmp', '{}.out'.format(server))
def identify(self, nick, gecos):
'''Identify ourselves to the ircd.'''
with open(self.outgoing, 'w', encoding='utf-8') as send:
send.write('NICK {}\r\n'.format(nick))
send.write('USER {} * 8 :{}\r\n'.format(nick, gecos))
def join(self, channel):
'''Join a channel.'''
with open(self.outgoing, 'w', encoding='utf-8') as send:
send.write('JOIN {}\r\n'.format(channel))
def notice(self, message, channel):
'''Send a notice.'''
with open(self.outgoing, 'w', encoding='utf-8') as send:
send.write('NOTICE {} :{}\r\n'.format(channel, message))
def privmsg(self, message, channel):
'''Send a message.'''
if message[:7] == '\x01ACTION':
with open(self.outgoing, 'w', encoding='utf-8') as send:
sanitised = ''.join(c for c in msg if c.isprintable())
send.write('PRIVMSG {} :\x01{}\x01\r\n'.format(channel, sanitised))
else:
with open(self.outgoing, 'w', encoding='utf-8') as send:
sanitised = message.replace('\n', ' ',).replace('\r', '')
send.write('PRIVMSG {} :{}\r\n'.format(channel, sanitised))
def pong(self, reply):
'''Reply to a ping.'''
with open(self.outgoing, 'w', encoding='utf-8') as send:
send.write('PONG {0}\r\n'.format(reply))