mirror of
https://github.com/sys-fs/seddy
synced 2024-11-22 11:54:16 -05:00
Compare commits
No commits in common. "07f5679bedf5103cd30db95407d1546d0d65efb3" and "bebadf0bfb17041ffa2422901a0127ff43a2a715" have entirely different histories.
07f5679bed
...
bebadf0bfb
1
.gitignore
vendored
1
.gitignore
vendored
@ -1 +0,0 @@
|
|||||||
*~
|
|
143
seddy.py
Executable file
143
seddy.py
Executable file
@ -0,0 +1,143 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
# See the LICENSE file for licensing information
|
||||||
|
import re
|
||||||
|
from time import sleep
|
||||||
|
from random import choice
|
||||||
|
|
||||||
|
nick = 'seddy'
|
||||||
|
server = 'chat.freenode.net'
|
||||||
|
channel = '#example'
|
||||||
|
gecos = 'A text processing bot'
|
||||||
|
|
||||||
|
receive = '/tmp/{0}.in'.format(server)
|
||||||
|
send = '/tmp/{0}.out'.format(server)
|
||||||
|
|
||||||
|
parse_msg = re.compile('.* PRIVMSG ' + channel + ' :(.*)')
|
||||||
|
parse_sed = re.compile('(?<!\\\\)/')
|
||||||
|
is_sed = re.compile('^s/.*/.*')
|
||||||
|
ping = re.compile('PING :(.*)')
|
||||||
|
|
||||||
|
class Queue:
|
||||||
|
size = 0
|
||||||
|
data = []
|
||||||
|
head = 0
|
||||||
|
tail = 0
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
def __init__(self, size):
|
||||||
|
self.size = size
|
||||||
|
self.data = [None]*size
|
||||||
|
|
||||||
|
def enqueue(self, s):
|
||||||
|
if not self.full():
|
||||||
|
self.count += 1
|
||||||
|
self.data[self.tail] = s
|
||||||
|
self.tail = (self.tail + 1) % self.size
|
||||||
|
|
||||||
|
def dequeue(self):
|
||||||
|
if not self.empty():
|
||||||
|
self.count -= 1
|
||||||
|
s = self.data[self.head]
|
||||||
|
self.head = (self.head + 1) % self.size
|
||||||
|
return s
|
||||||
|
|
||||||
|
def full(self):
|
||||||
|
return self.size == self.count
|
||||||
|
|
||||||
|
def empty(self):
|
||||||
|
return self.head == self.count
|
||||||
|
|
||||||
|
def find(self, s, f=0):
|
||||||
|
i = self.tail-1
|
||||||
|
while True:
|
||||||
|
if i == -1:
|
||||||
|
i = self.size-1
|
||||||
|
if re.search(s, self.data[i], f):
|
||||||
|
return self.data[i]
|
||||||
|
i -= 1
|
||||||
|
if i == self.tail-1 or self.data[i] is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def msg_replace(find, replace, msg, f, n=1):
|
||||||
|
try:
|
||||||
|
if msg[:7] == '\x01ACTION':
|
||||||
|
res = '\x01ACTION {0}'.format(re.sub(find, replace, msg[8:], count=n, flags=f))
|
||||||
|
else:
|
||||||
|
res = re.sub(find, replace, msg, count=n, flags=f)
|
||||||
|
except:
|
||||||
|
return False
|
||||||
|
return res
|
||||||
|
|
||||||
|
def seddy(sed, history, parser):
|
||||||
|
f = 0
|
||||||
|
regex = parser.split(sed)
|
||||||
|
|
||||||
|
if len(regex) < 4:
|
||||||
|
return False
|
||||||
|
if 'i' in regex[3]:
|
||||||
|
f |= re.I
|
||||||
|
try:
|
||||||
|
msg = history.find(regex[1], f)
|
||||||
|
except:
|
||||||
|
return False
|
||||||
|
if "g" in regex[3]:
|
||||||
|
res = msg_replace(regex[1], regex[2], msg, f, n=0)
|
||||||
|
else:
|
||||||
|
res = msg_replace(regex[1], regex[2], msg, f)
|
||||||
|
if res:
|
||||||
|
res = res.replace('\0', re.search(regex[1], msg, f).group(0))
|
||||||
|
return res
|
||||||
|
|
||||||
|
def notice(msg, channel):
|
||||||
|
with open(send, 'w', encoding='utf-8') as g:
|
||||||
|
g.write('NOTICE ' + channel + ' :' + msg + '\r\n')
|
||||||
|
|
||||||
|
def privmsg(msg, channel):
|
||||||
|
if msg[:7] == '\x01ACTION':
|
||||||
|
with open(send, 'w', encoding='utf-8') as g:
|
||||||
|
g.write('PRIVMSG ' + channel + ' :' + '\x01' +
|
||||||
|
''.join(c for c in msg if c.isprintable()) + '\x01\r\n')
|
||||||
|
else:
|
||||||
|
with open(send, 'w', encoding='utf-8') as g:
|
||||||
|
g.write('PRIVMSG ' + channel + ' :' +
|
||||||
|
msg.replace('\n', ' ',).replace('\r', '') + '\r\n')
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
history = Queue(48)
|
||||||
|
|
||||||
|
with open(send, 'w', encoding='utf-8') as f:
|
||||||
|
f.write('NICK ' + nick + '\r\n')
|
||||||
|
f.write('USER ' + nick + ' * 8 :' + gecos + '\r\n')
|
||||||
|
sleep(5)
|
||||||
|
f.write('JOIN ' + channel + '\r\n')
|
||||||
|
|
||||||
|
with open(receive, 'r', encoding='utf-8') as f:
|
||||||
|
for line in f:
|
||||||
|
if 'PING' in line:
|
||||||
|
with open(send, 'w', encoding='utf-8') as g:
|
||||||
|
g.write('PONG {0}\r\n'.format(ping.split(line)[1]))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = parse_msg.match(line)
|
||||||
|
try:
|
||||||
|
msg = m.group(1)
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if history.full():
|
||||||
|
history.dequeue()
|
||||||
|
if 'PRIVMSG' in line and not is_sed.match(msg):
|
||||||
|
history.enqueue(msg)
|
||||||
|
|
||||||
|
if '.bots' in msg[:5] or '.bot ' + nick in msg[:5 + len(nick)]:
|
||||||
|
notice("I was written to correct your mistakes.", channel)
|
||||||
|
if '.source ' + nick in msg[:8 + len(nick)]:
|
||||||
|
notice('[Python] https://github.com/sys-fs/seddy', channel)
|
||||||
|
elif is_sed.match(msg):
|
||||||
|
res = seddy(msg, history, parse_sed)
|
||||||
|
if res:
|
||||||
|
res = re.sub('\\\\/', '/', res)
|
||||||
|
if history.full():
|
||||||
|
history.dequeue()
|
||||||
|
history.enqueue(res)
|
||||||
|
privmsg(res, channel)
|
23
setup.py
23
setup.py
@ -1,23 +0,0 @@
|
|||||||
'''Setup file for seddy.'''
|
|
||||||
|
|
||||||
from setuptools import setup, find_packages
|
|
||||||
|
|
||||||
setup(
|
|
||||||
name = 'seddy',
|
|
||||||
version = '0.1.0',
|
|
||||||
license = 'BSD',
|
|
||||||
description = 'An IRC bot for tubes.',
|
|
||||||
|
|
||||||
author='Thomas Mannay',
|
|
||||||
author_email='tfm@airmail.cc',
|
|
||||||
url='https://github.com/sys-fs/seddy',
|
|
||||||
|
|
||||||
packages=find_packages(where='src'),
|
|
||||||
package_dir={'': 'src'},
|
|
||||||
|
|
||||||
entry_points={
|
|
||||||
'console_scripts': [
|
|
||||||
'seddy = seddy.main:main',
|
|
||||||
]
|
|
||||||
},
|
|
||||||
)
|
|
@ -1,5 +0,0 @@
|
|||||||
'''An IRC bot for tubes.'''
|
|
||||||
|
|
||||||
from . import main
|
|
||||||
from . import tubes
|
|
||||||
from . import sed
|
|
@ -1,4 +0,0 @@
|
|||||||
# pylint:disable=missing-docstring
|
|
||||||
VERSION = (0, 1, 0)
|
|
||||||
|
|
||||||
__version__ = '.'.join(map(str, VERSION))
|
|
@ -1,102 +0,0 @@
|
|||||||
# See the LICENSE file for licensing information
|
|
||||||
'''The bot itself.'''
|
|
||||||
|
|
||||||
import re
|
|
||||||
from time import sleep
|
|
||||||
from . import tubes
|
|
||||||
from . import sed
|
|
||||||
|
|
||||||
class Queue:
|
|
||||||
size = 0
|
|
||||||
data = []
|
|
||||||
head = 0
|
|
||||||
tail = 0
|
|
||||||
count = 0
|
|
||||||
|
|
||||||
def __init__(self, size):
|
|
||||||
self.size = size
|
|
||||||
self.data = [None]*size
|
|
||||||
|
|
||||||
def enqueue(self, s):
|
|
||||||
if not self.full():
|
|
||||||
self.count += 1
|
|
||||||
self.data[self.tail] = s
|
|
||||||
self.tail = (self.tail + 1) % self.size
|
|
||||||
|
|
||||||
def dequeue(self):
|
|
||||||
if not self.empty():
|
|
||||||
self.count -= 1
|
|
||||||
s = self.data[self.head]
|
|
||||||
self.head = (self.head + 1) % self.size
|
|
||||||
return s
|
|
||||||
|
|
||||||
def full(self):
|
|
||||||
return self.size == self.count
|
|
||||||
|
|
||||||
def empty(self):
|
|
||||||
return self.head == self.count
|
|
||||||
|
|
||||||
def find(self, s, f=0):
|
|
||||||
i = self.tail-1
|
|
||||||
while True:
|
|
||||||
if i == -1:
|
|
||||||
i = self.size-1
|
|
||||||
if re.search(s, self.data[i], f):
|
|
||||||
return self.data[i]
|
|
||||||
i -= 1
|
|
||||||
if i == self.tail-1 or self.data[i] is None:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
nick = 'seddy'
|
|
||||||
server = 'lain.church'
|
|
||||||
channel = '#lain'
|
|
||||||
gecos = 'A text processing bot'
|
|
||||||
|
|
||||||
history = Queue(48)
|
|
||||||
tube = tubes.Tube(server)
|
|
||||||
ping = re.compile('PING :(.*)')
|
|
||||||
parse_message = re.compile('.* PRIVMSG {} :(.*)'.format(channel))
|
|
||||||
slash_sed = re.compile('^s/.*/.*')
|
|
||||||
comma_sed = re.compile('^s,.*,.*')
|
|
||||||
slash_parser = re.compile('(?<!\\\\)/')
|
|
||||||
comma_parser = re.compile('(?<!\\\\),')
|
|
||||||
|
|
||||||
tube.identify(nick, gecos)
|
|
||||||
sleep(5)
|
|
||||||
tube.join(channel)
|
|
||||||
|
|
||||||
with open(tube.incoming, 'r', encoding='utf-8') as receive:
|
|
||||||
for line in receive:
|
|
||||||
if re.match(ping, line):
|
|
||||||
tube.pong(ping.split(line)[1])
|
|
||||||
continue
|
|
||||||
|
|
||||||
tmp = parse_message.match(line)
|
|
||||||
try:
|
|
||||||
message = tmp.group(1)
|
|
||||||
except:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if history.full():
|
|
||||||
history.dequeue()
|
|
||||||
if not slash_sed.match(message) and not comma_sed.match(message):
|
|
||||||
history.enqueue(message)
|
|
||||||
|
|
||||||
if slash_sed.match(message):
|
|
||||||
response = sed.seddy(message, history, slash_parser)
|
|
||||||
if response:
|
|
||||||
response = re.sub('\\\\/', '/', response)
|
|
||||||
if history.full():
|
|
||||||
history.dequeue()
|
|
||||||
history.enqueue(response)
|
|
||||||
tube.privmsg(response, channel)
|
|
||||||
elif comma_sed.match(message):
|
|
||||||
response = sed.seddy(message, history, comma_parser)
|
|
||||||
if response:
|
|
||||||
response = re.sub('\\\\,', ',', response)
|
|
||||||
if history.full():
|
|
||||||
history.dequeue()
|
|
||||||
history.enqueue(response)
|
|
||||||
tube.privmsg(response, channel)
|
|
@ -1,49 +0,0 @@
|
|||||||
'''Core functionality of seddy.'''
|
|
||||||
|
|
||||||
import re
|
|
||||||
|
|
||||||
def replacer(find, replace, message, flags, count=1):
|
|
||||||
try:
|
|
||||||
if message[:7] == '\x01ACTION':
|
|
||||||
tmp = re.sub(find, replace, message[8:], flags=flags,
|
|
||||||
count=count)
|
|
||||||
replaced = '\x01ACTION {}'.format(tmp)
|
|
||||||
|
|
||||||
else:
|
|
||||||
replaced = re.sub(find, replace, message, flags=flags, count=count, )
|
|
||||||
except:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if '\0' in message:
|
|
||||||
whole_match = re.search(find, message, flags).group(0)
|
|
||||||
replaced = replaced.replace('\0', whole_match)
|
|
||||||
|
|
||||||
return replaced
|
|
||||||
|
|
||||||
|
|
||||||
def seddy(sed, history, parser):
|
|
||||||
f = 0
|
|
||||||
regex = parser.split(sed)
|
|
||||||
|
|
||||||
if len(regex) < 4:
|
|
||||||
return False
|
|
||||||
|
|
||||||
find = regex[1]
|
|
||||||
replace = regex[2]
|
|
||||||
flags = regex[3]
|
|
||||||
|
|
||||||
if 'i' in flags:
|
|
||||||
f |= re.I
|
|
||||||
|
|
||||||
try:
|
|
||||||
message = history.find(find, f)
|
|
||||||
except:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
if 'g' in flags:
|
|
||||||
replaced = replacer(find, replace, message, f, count=0)
|
|
||||||
else:
|
|
||||||
replaced = replacer(find, replace, message, f)
|
|
||||||
|
|
||||||
return replaced
|
|
@ -1,48 +0,0 @@
|
|||||||
'''Wrappers for interacting with tubes.'''
|
|
||||||
import os
|
|
||||||
|
|
||||||
class Tube:
|
|
||||||
'''Object for interacting with tubes.'''
|
|
||||||
def __init__(self, server):
|
|
||||||
'''Takes a string containing a server name, e.g. chat.freenode.net, and
|
|
||||||
gets the correct filenames for tubes' named pipes for that server.
|
|
||||||
'''
|
|
||||||
self.incoming = os.path.join('/tmp', '{}.in'.format(server))
|
|
||||||
self.outgoing = os.path.join('/tmp', '{}.out'.format(server))
|
|
||||||
|
|
||||||
|
|
||||||
def identify(self, nick, gecos):
|
|
||||||
'''Identify ourselves to the ircd.'''
|
|
||||||
with open(self.outgoing, 'w', encoding='utf-8') as send:
|
|
||||||
send.write('NICK {}\r\n'.format(nick))
|
|
||||||
send.write('USER {} * 8 :{}\r\n'.format(nick, gecos))
|
|
||||||
|
|
||||||
|
|
||||||
def join(self, channel):
|
|
||||||
'''Join a channel.'''
|
|
||||||
with open(self.outgoing, 'w', encoding='utf-8') as send:
|
|
||||||
send.write('JOIN {}\r\n'.format(channel))
|
|
||||||
|
|
||||||
|
|
||||||
def notice(self, message, channel):
|
|
||||||
'''Send a notice.'''
|
|
||||||
with open(self.outgoing, 'w', encoding='utf-8') as send:
|
|
||||||
send.write('NOTICE {} :{}\r\n'.format(channel, message))
|
|
||||||
|
|
||||||
|
|
||||||
def privmsg(self, message, channel):
|
|
||||||
'''Send a message.'''
|
|
||||||
if message[:7] == '\x01ACTION':
|
|
||||||
with open(self.outgoing, 'w', encoding='utf-8') as send:
|
|
||||||
sanitised = ''.join(c for c in msg if c.isprintable())
|
|
||||||
send.write('PRIVMSG {} :\x01{}\x01\r\n'.format(channel, sanitised))
|
|
||||||
else:
|
|
||||||
with open(self.outgoing, 'w', encoding='utf-8') as send:
|
|
||||||
sanitised = message.replace('\n', ' ',).replace('\r', '')
|
|
||||||
send.write('PRIVMSG {} :{}\r\n'.format(channel, sanitised))
|
|
||||||
|
|
||||||
|
|
||||||
def pong(self, reply):
|
|
||||||
'''Reply to a ping.'''
|
|
||||||
with open(self.outgoing, 'w', encoding='utf-8') as send:
|
|
||||||
send.write('PONG {0}\r\n'.format(reply))
|
|
Loading…
Reference in New Issue
Block a user